mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-12 21:37:32 -08:00
fix(core): Prevent false stalled jobs in queue mode from displaying as errored (#7435)
This is related to an issue with how Bull handles stalled jobs, see https://github.com/OptimalBits/bull/issues/1415 for reference. CPU intensive workflows can in certain cases take a long while to finish up, thereby blocking the thread and causing Bull queue to think the job has stalled, even though it finished successfully. In these cases the error handling could then overwrite the successful execution data with the error message.
This commit is contained in:
parent
c599006b91
commit
e01b9e5ae1
|
@ -86,6 +86,24 @@ export class WorkflowRunner {
|
|||
) {
|
||||
ErrorReporter.error(error);
|
||||
|
||||
const isQueueMode = config.getEnv('executions.mode') === 'queue';
|
||||
|
||||
// in queue mode, first do a sanity run for the edge case that the execution was not marked as stalled
|
||||
// by Bull even though it executed successfully, see https://github.com/OptimalBits/bull/issues/1415
|
||||
|
||||
if (isQueueMode && executionMode !== 'manual') {
|
||||
const executionWithoutData = await Container.get(ExecutionRepository).findSingleExecution(
|
||||
executionId,
|
||||
{
|
||||
includeData: false,
|
||||
},
|
||||
);
|
||||
if (executionWithoutData?.finished === true && executionWithoutData?.status === 'success') {
|
||||
// false positive, execution was successful
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const fullRunData: IRun = {
|
||||
data: {
|
||||
resultData: {
|
||||
|
|
83
packages/cli/test/unit/WorkflowRunner.test.ts
Normal file
83
packages/cli/test/unit/WorkflowRunner.test.ts
Normal file
|
@ -0,0 +1,83 @@
|
|||
import type { User } from '@db/entities/User';
|
||||
import * as testDb from '../integration/shared/testDb';
|
||||
import * as utils from '../integration/shared/utils/';
|
||||
import { createWorkflow, createExecution } from '../integration/shared/testDb';
|
||||
import { WorkflowRunner } from '@/WorkflowRunner';
|
||||
import { WorkflowHooks, type ExecutionError, type IWorkflowExecuteHooks } from 'n8n-workflow';
|
||||
import { Push } from '../../src/push';
|
||||
import { mockInstance } from '../integration/shared/utils';
|
||||
import Container from 'typedi';
|
||||
import config from '../../src/config';
|
||||
|
||||
let owner: User;
|
||||
let runner: WorkflowRunner;
|
||||
let hookFunctions: IWorkflowExecuteHooks;
|
||||
utils.setupTestServer({ endpointGroups: [] });
|
||||
|
||||
class Watchers {
|
||||
workflowExecuteAfter = jest.fn();
|
||||
}
|
||||
const watchers = new Watchers();
|
||||
const watchedWorkflowExecuteAfter = jest.spyOn(watchers, 'workflowExecuteAfter');
|
||||
|
||||
beforeAll(async () => {
|
||||
const globalOwnerRole = await testDb.getGlobalOwnerRole();
|
||||
owner = await testDb.createUser({ globalRole: globalOwnerRole });
|
||||
|
||||
mockInstance(Push);
|
||||
Container.set(Push, new Push());
|
||||
|
||||
runner = new WorkflowRunner();
|
||||
|
||||
hookFunctions = {
|
||||
workflowExecuteAfter: [watchers.workflowExecuteAfter],
|
||||
};
|
||||
});
|
||||
|
||||
afterAll(() => {
|
||||
jest.restoreAllMocks();
|
||||
});
|
||||
|
||||
beforeEach(async () => {
|
||||
await testDb.truncate(['Workflow', 'SharedWorkflow']);
|
||||
});
|
||||
|
||||
test('processError should return early in Bull stalled edge case', async () => {
|
||||
const workflow = await createWorkflow({}, owner);
|
||||
const execution = await createExecution(
|
||||
{
|
||||
status: 'success',
|
||||
finished: true,
|
||||
},
|
||||
workflow,
|
||||
);
|
||||
config.set('executions.mode', 'queue');
|
||||
await runner.processError(
|
||||
new Error('test') as ExecutionError,
|
||||
new Date(),
|
||||
'webhook',
|
||||
execution.id,
|
||||
new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow),
|
||||
);
|
||||
expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(0);
|
||||
});
|
||||
|
||||
test('processError should process error', async () => {
|
||||
const workflow = await createWorkflow({}, owner);
|
||||
const execution = await createExecution(
|
||||
{
|
||||
status: 'success',
|
||||
finished: true,
|
||||
},
|
||||
workflow,
|
||||
);
|
||||
config.set('executions.mode', 'regular');
|
||||
await runner.processError(
|
||||
new Error('test') as ExecutionError,
|
||||
new Date(),
|
||||
'webhook',
|
||||
execution.id,
|
||||
new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow),
|
||||
);
|
||||
expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(1);
|
||||
});
|
Loading…
Reference in a new issue