fix(core): Prevent __default__ jobs in scaling mode (#12402)

This commit is contained in:
Iván Ovejero 2024-12-30 14:31:13 +01:00 committed by GitHub
parent 1e60bbcf16
commit 072664b40e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 29 additions and 8 deletions

View file

@ -2,7 +2,7 @@ import { GlobalConfig } from '@n8n/config';
import * as BullModule from 'bull'; import * as BullModule from 'bull';
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import { InstanceSettings } from 'n8n-core'; import { InstanceSettings } from 'n8n-core';
import { ApplicationError } from 'n8n-workflow'; import { ApplicationError, ExecutionCancelledError } from 'n8n-workflow';
import Container from 'typedi'; import Container from 'typedi';
import { mockInstance, mockLogger } from '@test/mocking'; import { mockInstance, mockLogger } from '@test/mocking';
@ -287,6 +287,8 @@ describe('ScalingService', () => {
const result = await scalingService.stopJob(job); const result = await scalingService.stopJob(job);
expect(job.progress).toHaveBeenCalledWith({ kind: 'abort-job' }); expect(job.progress).toHaveBeenCalledWith({ kind: 'abort-job' });
expect(job.discard).toHaveBeenCalled();
expect(job.moveToFailed).toHaveBeenCalledWith(new ExecutionCancelledError('123'), true);
expect(result).toBe(true); expect(result).toBe(true);
}); });

View file

@ -1,8 +1,15 @@
import { GlobalConfig } from '@n8n/config'; import { GlobalConfig } from '@n8n/config';
import { ErrorReporter, InstanceSettings, Logger } from 'n8n-core'; import { ErrorReporter, InstanceSettings, Logger } from 'n8n-core';
import { ApplicationError, BINARY_ENCODING, sleep, jsonStringify, ensureError } from 'n8n-workflow'; import {
ApplicationError,
BINARY_ENCODING,
sleep,
jsonStringify,
ensureError,
ExecutionCancelledError,
} from 'n8n-workflow';
import type { IExecuteResponsePromiseData } from 'n8n-workflow'; import type { IExecuteResponsePromiseData } from 'n8n-workflow';
import { strict } from 'node:assert'; import assert, { strict } from 'node:assert';
import Container, { Service } from 'typedi'; import Container, { Service } from 'typedi';
import { ActiveExecutions } from '@/active-executions'; import { ActiveExecutions } from '@/active-executions';
@ -206,7 +213,8 @@ export class ScalingService {
try { try {
if (await job.isActive()) { if (await job.isActive()) {
await job.progress({ kind: 'abort-job' }); // being processed by worker await job.progress({ kind: 'abort-job' }); // being processed by worker
this.logger.debug('Stopped active job', props); await job.discard(); // prevent retries
await job.moveToFailed(new ExecutionCancelledError(job.data.executionId), true); // remove from queue
return true; return true;
} }
@ -214,8 +222,15 @@ export class ScalingService {
this.logger.debug('Stopped inactive job', props); this.logger.debug('Stopped inactive job', props);
return true; return true;
} catch (error: unknown) { } catch (error: unknown) {
await job.progress({ kind: 'abort-job' }); assert(error instanceof Error);
this.logger.error('Failed to stop job', { ...props, error }); this.logger.error('Failed to stop job', {
...props,
error: {
message: error.message,
name: error.name,
stack: error.stack,
},
});
return false; return false;
} }
} }

View file

@ -66,10 +66,15 @@ export class WorkflowRunner {
// //
// FIXME: This is a quick fix. The proper fix would be to not remove // FIXME: This is a quick fix. The proper fix would be to not remove
// the execution from the active executions while it's still running. // the execution from the active executions while it's still running.
if (error instanceof ExecutionNotFoundError || error instanceof ExecutionCancelledError) { if (
error instanceof ExecutionNotFoundError ||
error instanceof ExecutionCancelledError ||
error.message.includes('cancelled')
) {
return; return;
} }
this.logger.error(`Problem with execution ${executionId}: ${error.message}. Aborting.`);
this.errorReporter.error(error, { executionId }); this.errorReporter.error(error, { executionId });
const isQueueMode = config.getEnv('executions.mode') === 'queue'; const isQueueMode = config.getEnv('executions.mode') === 'queue';
@ -413,7 +418,6 @@ export class WorkflowRunner {
data.workflowData, data.workflowData,
{ retryOf: data.retryOf ? data.retryOf.toString() : undefined }, { retryOf: data.retryOf ? data.retryOf.toString() : undefined },
); );
this.logger.error(`Problem with execution ${executionId}: ${error.message}. Aborting.`);
await this.processError(error, new Date(), data.executionMode, executionId, hooks); await this.processError(error, new Date(), data.executionMode, executionId, hooks);
reject(error); reject(error);