mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
fix(core): Handle cancellation of waiting executions correctly (#13051)
This commit is contained in:
parent
be39d0a0f1
commit
fc440eb68b
|
@ -1,28 +1,29 @@
|
||||||
import { mock } from 'jest-mock-extended';
|
import { captor, mock } from 'jest-mock-extended';
|
||||||
import type {
|
import type {
|
||||||
|
IDeferredPromise,
|
||||||
IExecuteResponsePromiseData,
|
IExecuteResponsePromiseData,
|
||||||
IRun,
|
IRun,
|
||||||
IWorkflowExecutionDataProcess,
|
IWorkflowExecutionDataProcess,
|
||||||
} from 'n8n-workflow';
|
} from 'n8n-workflow';
|
||||||
import { createDeferredPromise } from 'n8n-workflow';
|
import { ExecutionCancelledError, randomInt, sleep } from 'n8n-workflow';
|
||||||
import PCancelable from 'p-cancelable';
|
import PCancelable from 'p-cancelable';
|
||||||
import { v4 as uuid } from 'uuid';
|
import { v4 as uuid } from 'uuid';
|
||||||
|
|
||||||
import { ActiveExecutions } from '@/active-executions';
|
import { ActiveExecutions } from '@/active-executions';
|
||||||
import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
|
import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
|
||||||
|
import config from '@/config';
|
||||||
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
||||||
import { mockInstance } from '@test/mocking';
|
import { mockInstance } from '@test/mocking';
|
||||||
|
|
||||||
|
jest.mock('n8n-workflow', () => ({
|
||||||
|
...jest.requireActual('n8n-workflow'),
|
||||||
|
sleep: jest.fn(),
|
||||||
|
}));
|
||||||
|
|
||||||
const FAKE_EXECUTION_ID = '15';
|
const FAKE_EXECUTION_ID = '15';
|
||||||
const FAKE_SECOND_EXECUTION_ID = '20';
|
const FAKE_SECOND_EXECUTION_ID = '20';
|
||||||
|
|
||||||
const updateExistingExecution = jest.fn();
|
const executionRepository = mock<ExecutionRepository>();
|
||||||
const createNewExecution = jest.fn(async () => FAKE_EXECUTION_ID);
|
|
||||||
|
|
||||||
const executionRepository = mock<ExecutionRepository>({
|
|
||||||
updateExistingExecution,
|
|
||||||
createNewExecution,
|
|
||||||
});
|
|
||||||
|
|
||||||
const concurrencyControl = mockInstance(ConcurrencyControlService, {
|
const concurrencyControl = mockInstance(ConcurrencyControlService, {
|
||||||
// @ts-expect-error Private property
|
// @ts-expect-error Private property
|
||||||
|
@ -31,155 +32,22 @@ const concurrencyControl = mockInstance(ConcurrencyControlService, {
|
||||||
|
|
||||||
describe('ActiveExecutions', () => {
|
describe('ActiveExecutions', () => {
|
||||||
let activeExecutions: ActiveExecutions;
|
let activeExecutions: ActiveExecutions;
|
||||||
|
let responsePromise: IDeferredPromise<IExecuteResponsePromiseData>;
|
||||||
|
let workflowExecution: PCancelable<IRun>;
|
||||||
|
let postExecutePromise: Promise<IRun | undefined>;
|
||||||
|
|
||||||
beforeEach(() => {
|
const fullRunData: IRun = {
|
||||||
activeExecutions = new ActiveExecutions(mock(), executionRepository, concurrencyControl);
|
data: {
|
||||||
});
|
resultData: {
|
||||||
|
runData: {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
mode: 'manual',
|
||||||
|
startedAt: new Date(),
|
||||||
|
status: 'new',
|
||||||
|
};
|
||||||
|
|
||||||
afterEach(() => {
|
const executionData: IWorkflowExecutionDataProcess = {
|
||||||
jest.clearAllMocks();
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Should initialize activeExecutions with empty list', () => {
|
|
||||||
expect(activeExecutions.getActiveExecutions()).toHaveLength(0);
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Should add execution to active execution list', async () => {
|
|
||||||
const newExecution = mockExecutionData();
|
|
||||||
const executionId = await activeExecutions.add(newExecution);
|
|
||||||
|
|
||||||
expect(executionId).toBe(FAKE_EXECUTION_ID);
|
|
||||||
expect(activeExecutions.getActiveExecutions()).toHaveLength(1);
|
|
||||||
expect(createNewExecution).toHaveBeenCalledTimes(1);
|
|
||||||
expect(updateExistingExecution).toHaveBeenCalledTimes(0);
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Should update execution if add is called with execution ID', async () => {
|
|
||||||
const newExecution = mockExecutionData();
|
|
||||||
const executionId = await activeExecutions.add(newExecution, FAKE_SECOND_EXECUTION_ID);
|
|
||||||
|
|
||||||
expect(executionId).toBe(FAKE_SECOND_EXECUTION_ID);
|
|
||||||
expect(activeExecutions.getActiveExecutions()).toHaveLength(1);
|
|
||||||
expect(createNewExecution).toHaveBeenCalledTimes(0);
|
|
||||||
expect(updateExistingExecution).toHaveBeenCalledTimes(1);
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Should fail attaching execution to invalid executionId', async () => {
|
|
||||||
const deferredPromise = mockCancelablePromise();
|
|
||||||
|
|
||||||
expect(() => {
|
|
||||||
activeExecutions.attachWorkflowExecution(FAKE_EXECUTION_ID, deferredPromise);
|
|
||||||
}).toThrow();
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Should successfully attach execution to valid executionId', async () => {
|
|
||||||
const newExecution = mockExecutionData();
|
|
||||||
await activeExecutions.add(newExecution, FAKE_EXECUTION_ID);
|
|
||||||
const deferredPromise = mockCancelablePromise();
|
|
||||||
|
|
||||||
expect(() =>
|
|
||||||
activeExecutions.attachWorkflowExecution(FAKE_EXECUTION_ID, deferredPromise),
|
|
||||||
).not.toThrow();
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Should attach and resolve response promise to existing execution', async () => {
|
|
||||||
const newExecution = mockExecutionData();
|
|
||||||
await activeExecutions.add(newExecution, FAKE_EXECUTION_ID);
|
|
||||||
const deferredPromise = mockDeferredPromise();
|
|
||||||
activeExecutions.attachResponsePromise(FAKE_EXECUTION_ID, deferredPromise);
|
|
||||||
const fakeResponse = { data: { resultData: { runData: {} } } };
|
|
||||||
activeExecutions.resolveResponsePromise(FAKE_EXECUTION_ID, fakeResponse);
|
|
||||||
|
|
||||||
await expect(deferredPromise.promise).resolves.toEqual(fakeResponse);
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Should copy over startedAt and responsePromise when resuming a waiting execution', async () => {
|
|
||||||
const newExecution = mockExecutionData();
|
|
||||||
const executionId = await activeExecutions.add(newExecution);
|
|
||||||
activeExecutions.setStatus(executionId, 'waiting');
|
|
||||||
activeExecutions.attachResponsePromise(executionId, mockDeferredPromise());
|
|
||||||
|
|
||||||
const waitingExecution = activeExecutions.getExecution(executionId);
|
|
||||||
expect(waitingExecution.responsePromise).toBeDefined();
|
|
||||||
|
|
||||||
// Resume the execution
|
|
||||||
await activeExecutions.add(newExecution, executionId);
|
|
||||||
|
|
||||||
const resumedExecution = activeExecutions.getExecution(executionId);
|
|
||||||
expect(resumedExecution.startedAt).toBe(waitingExecution.startedAt);
|
|
||||||
expect(resumedExecution.responsePromise).toBe(waitingExecution.responsePromise);
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Should not remove a waiting execution', async () => {
|
|
||||||
const newExecution = mockExecutionData();
|
|
||||||
const executionId = await activeExecutions.add(newExecution);
|
|
||||||
activeExecutions.setStatus(executionId, 'waiting');
|
|
||||||
activeExecutions.finalizeExecution(executionId);
|
|
||||||
|
|
||||||
// Wait until the next tick to ensure that the post-execution promise has settled
|
|
||||||
await new Promise(setImmediate);
|
|
||||||
|
|
||||||
// Execution should still be in activeExecutions
|
|
||||||
expect(activeExecutions.getActiveExecutions()).toHaveLength(1);
|
|
||||||
expect(activeExecutions.getStatus(executionId)).toBe('waiting');
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Should remove an existing execution', async () => {
|
|
||||||
// ARRANGE
|
|
||||||
const newExecution = mockExecutionData();
|
|
||||||
const executionId = await activeExecutions.add(newExecution);
|
|
||||||
|
|
||||||
// ACT
|
|
||||||
activeExecutions.finalizeExecution(executionId);
|
|
||||||
|
|
||||||
// Wait until the next tick to ensure that the post-execution promise has settled
|
|
||||||
await new Promise(setImmediate);
|
|
||||||
|
|
||||||
// ASSERT
|
|
||||||
expect(activeExecutions.getActiveExecutions()).toHaveLength(0);
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Should not try to resolve a post-execute promise for an inactive execution', async () => {
|
|
||||||
const getExecutionSpy = jest.spyOn(activeExecutions, 'getExecution');
|
|
||||||
|
|
||||||
activeExecutions.finalizeExecution('inactive-execution-id', mockFullRunData());
|
|
||||||
|
|
||||||
expect(getExecutionSpy).not.toHaveBeenCalled();
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Should resolve post execute promise on removal', async () => {
|
|
||||||
const newExecution = mockExecutionData();
|
|
||||||
const executionId = await activeExecutions.add(newExecution);
|
|
||||||
const postExecutePromise = activeExecutions.getPostExecutePromise(executionId);
|
|
||||||
// Force the above to be executed since we cannot await it
|
|
||||||
await new Promise((res) => {
|
|
||||||
setTimeout(res, 100);
|
|
||||||
});
|
|
||||||
const fakeOutput = mockFullRunData();
|
|
||||||
activeExecutions.finalizeExecution(executionId, fakeOutput);
|
|
||||||
|
|
||||||
await expect(postExecutePromise).resolves.toEqual(fakeOutput);
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Should throw error when trying to create a promise with invalid execution', async () => {
|
|
||||||
await expect(activeExecutions.getPostExecutePromise(FAKE_EXECUTION_ID)).rejects.toThrow();
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Should call function to cancel execution when asked to stop', async () => {
|
|
||||||
const newExecution = mockExecutionData();
|
|
||||||
const executionId = await activeExecutions.add(newExecution);
|
|
||||||
const cancelExecution = jest.fn();
|
|
||||||
const cancellablePromise = mockCancelablePromise();
|
|
||||||
cancellablePromise.cancel = cancelExecution;
|
|
||||||
activeExecutions.attachWorkflowExecution(executionId, cancellablePromise);
|
|
||||||
activeExecutions.stopExecution(executionId);
|
|
||||||
|
|
||||||
expect(cancelExecution).toHaveBeenCalledTimes(1);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
function mockExecutionData(): IWorkflowExecutionDataProcess {
|
|
||||||
return {
|
|
||||||
executionMode: 'manual',
|
executionMode: 'manual',
|
||||||
workflowData: {
|
workflowData: {
|
||||||
id: '123',
|
id: '123',
|
||||||
|
@ -192,22 +60,235 @@ function mockExecutionData(): IWorkflowExecutionDataProcess {
|
||||||
},
|
},
|
||||||
userId: uuid(),
|
userId: uuid(),
|
||||||
};
|
};
|
||||||
}
|
|
||||||
|
|
||||||
function mockFullRunData(): IRun {
|
beforeEach(() => {
|
||||||
return {
|
activeExecutions = new ActiveExecutions(mock(), executionRepository, concurrencyControl);
|
||||||
data: {
|
|
||||||
resultData: {
|
|
||||||
runData: {},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
mode: 'manual',
|
|
||||||
startedAt: new Date(),
|
|
||||||
status: 'new',
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
// eslint-disable-next-line @typescript-eslint/promise-function-async
|
executionRepository.createNewExecution.mockResolvedValue(FAKE_EXECUTION_ID);
|
||||||
const mockCancelablePromise = () => new PCancelable<IRun>((resolve) => resolve());
|
|
||||||
|
|
||||||
const mockDeferredPromise = () => createDeferredPromise<IExecuteResponsePromiseData>();
|
workflowExecution = new PCancelable<IRun>((resolve) => resolve());
|
||||||
|
workflowExecution.cancel = jest.fn();
|
||||||
|
responsePromise = mock<IDeferredPromise<IExecuteResponsePromiseData>>();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
jest.clearAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Should initialize activeExecutions with empty list', () => {
|
||||||
|
expect(activeExecutions.getActiveExecutions()).toHaveLength(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Should add execution to active execution list', async () => {
|
||||||
|
const executionId = await activeExecutions.add(executionData);
|
||||||
|
|
||||||
|
expect(executionId).toBe(FAKE_EXECUTION_ID);
|
||||||
|
expect(activeExecutions.getActiveExecutions()).toHaveLength(1);
|
||||||
|
expect(executionRepository.createNewExecution).toHaveBeenCalledTimes(1);
|
||||||
|
expect(executionRepository.updateExistingExecution).toHaveBeenCalledTimes(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Should update execution if add is called with execution ID', async () => {
|
||||||
|
const executionId = await activeExecutions.add(executionData, FAKE_SECOND_EXECUTION_ID);
|
||||||
|
|
||||||
|
expect(executionId).toBe(FAKE_SECOND_EXECUTION_ID);
|
||||||
|
expect(activeExecutions.getActiveExecutions()).toHaveLength(1);
|
||||||
|
expect(executionRepository.createNewExecution).toHaveBeenCalledTimes(0);
|
||||||
|
expect(executionRepository.updateExistingExecution).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('attachWorkflowExecution', () => {
|
||||||
|
test('Should fail attaching execution to invalid executionId', async () => {
|
||||||
|
expect(() => {
|
||||||
|
activeExecutions.attachWorkflowExecution(FAKE_EXECUTION_ID, workflowExecution);
|
||||||
|
}).toThrow();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Should successfully attach execution to valid executionId', async () => {
|
||||||
|
await activeExecutions.add(executionData, FAKE_EXECUTION_ID);
|
||||||
|
|
||||||
|
expect(() =>
|
||||||
|
activeExecutions.attachWorkflowExecution(FAKE_EXECUTION_ID, workflowExecution),
|
||||||
|
).not.toThrow();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Should attach and resolve response promise to existing execution', async () => {
|
||||||
|
await activeExecutions.add(executionData, FAKE_EXECUTION_ID);
|
||||||
|
activeExecutions.attachResponsePromise(FAKE_EXECUTION_ID, responsePromise);
|
||||||
|
const fakeResponse = { data: { resultData: { runData: {} } } };
|
||||||
|
activeExecutions.resolveResponsePromise(FAKE_EXECUTION_ID, fakeResponse);
|
||||||
|
|
||||||
|
expect(responsePromise.resolve).toHaveBeenCalledWith(fakeResponse);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Should copy over startedAt and responsePromise when resuming a waiting execution', async () => {
|
||||||
|
const executionId = await activeExecutions.add(executionData);
|
||||||
|
activeExecutions.setStatus(executionId, 'waiting');
|
||||||
|
activeExecutions.attachResponsePromise(executionId, responsePromise);
|
||||||
|
|
||||||
|
const waitingExecution = activeExecutions.getExecutionOrFail(executionId);
|
||||||
|
expect(waitingExecution.responsePromise).toBeDefined();
|
||||||
|
|
||||||
|
// Resume the execution
|
||||||
|
await activeExecutions.add(executionData, executionId);
|
||||||
|
|
||||||
|
const resumedExecution = activeExecutions.getExecutionOrFail(executionId);
|
||||||
|
expect(resumedExecution.startedAt).toBe(waitingExecution.startedAt);
|
||||||
|
expect(resumedExecution.responsePromise).toBe(responsePromise);
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('finalizeExecution', () => {
|
||||||
|
test('Should not remove a waiting execution', async () => {
|
||||||
|
const executionId = await activeExecutions.add(executionData);
|
||||||
|
activeExecutions.setStatus(executionId, 'waiting');
|
||||||
|
activeExecutions.finalizeExecution(executionId);
|
||||||
|
|
||||||
|
// Wait until the next tick to ensure that the post-execution promise has settled
|
||||||
|
await new Promise(setImmediate);
|
||||||
|
|
||||||
|
// Execution should still be in activeExecutions
|
||||||
|
expect(activeExecutions.getActiveExecutions()).toHaveLength(1);
|
||||||
|
expect(activeExecutions.getStatus(executionId)).toBe('waiting');
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Should remove an existing execution', async () => {
|
||||||
|
const executionId = await activeExecutions.add(executionData);
|
||||||
|
|
||||||
|
activeExecutions.finalizeExecution(executionId);
|
||||||
|
|
||||||
|
await new Promise(setImmediate);
|
||||||
|
expect(activeExecutions.getActiveExecutions()).toHaveLength(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Should not try to resolve a post-execute promise for an inactive execution', async () => {
|
||||||
|
const getExecutionSpy = jest.spyOn(activeExecutions, 'getExecutionOrFail');
|
||||||
|
|
||||||
|
activeExecutions.finalizeExecution('inactive-execution-id', fullRunData);
|
||||||
|
|
||||||
|
expect(getExecutionSpy).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Should resolve post execute promise on removal', async () => {
|
||||||
|
const executionId = await activeExecutions.add(executionData);
|
||||||
|
const postExecutePromise = activeExecutions.getPostExecutePromise(executionId);
|
||||||
|
|
||||||
|
await new Promise(setImmediate);
|
||||||
|
activeExecutions.finalizeExecution(executionId, fullRunData);
|
||||||
|
|
||||||
|
await expect(postExecutePromise).resolves.toEqual(fullRunData);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getPostExecutePromise', () => {
|
||||||
|
test('Should throw error when trying to create a promise with invalid execution', async () => {
|
||||||
|
await expect(activeExecutions.getPostExecutePromise(FAKE_EXECUTION_ID)).rejects.toThrow();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('stopExecution', () => {
|
||||||
|
let executionId: string;
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
executionId = await activeExecutions.add(executionData);
|
||||||
|
postExecutePromise = activeExecutions.getPostExecutePromise(executionId);
|
||||||
|
|
||||||
|
activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
|
||||||
|
activeExecutions.attachResponsePromise(executionId, responsePromise);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Should cancel ongoing executions', async () => {
|
||||||
|
activeExecutions.stopExecution(executionId);
|
||||||
|
|
||||||
|
expect(responsePromise.reject).toHaveBeenCalledWith(expect.any(ExecutionCancelledError));
|
||||||
|
expect(workflowExecution.cancel).toHaveBeenCalledTimes(1);
|
||||||
|
await expect(postExecutePromise).rejects.toThrow(ExecutionCancelledError);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Should cancel waiting executions', async () => {
|
||||||
|
activeExecutions.setStatus(executionId, 'waiting');
|
||||||
|
activeExecutions.stopExecution(executionId);
|
||||||
|
|
||||||
|
expect(responsePromise.reject).toHaveBeenCalledWith(expect.any(ExecutionCancelledError));
|
||||||
|
expect(workflowExecution.cancel).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('shutdown', () => {
|
||||||
|
let newExecutionId1: string, newExecutionId2: string;
|
||||||
|
let waitingExecutionId1: string, waitingExecutionId2: string;
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
config.set('executions.mode', 'regular');
|
||||||
|
|
||||||
|
executionRepository.createNewExecution.mockImplementation(async () =>
|
||||||
|
randomInt(1000, 2000).toString(),
|
||||||
|
);
|
||||||
|
|
||||||
|
(sleep as jest.Mock).mockImplementation(() => {
|
||||||
|
// @ts-expect-error private property
|
||||||
|
activeExecutions.activeExecutions = {};
|
||||||
|
});
|
||||||
|
|
||||||
|
newExecutionId1 = await activeExecutions.add(executionData);
|
||||||
|
activeExecutions.setStatus(newExecutionId1, 'new');
|
||||||
|
activeExecutions.attachResponsePromise(newExecutionId1, responsePromise);
|
||||||
|
|
||||||
|
newExecutionId2 = await activeExecutions.add(executionData);
|
||||||
|
activeExecutions.setStatus(newExecutionId2, 'new');
|
||||||
|
|
||||||
|
waitingExecutionId1 = await activeExecutions.add(executionData);
|
||||||
|
activeExecutions.setStatus(waitingExecutionId1, 'waiting');
|
||||||
|
activeExecutions.attachResponsePromise(waitingExecutionId1, responsePromise);
|
||||||
|
|
||||||
|
waitingExecutionId2 = await activeExecutions.add(executionData);
|
||||||
|
activeExecutions.setStatus(waitingExecutionId2, 'waiting');
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Should cancel only executions with response-promises by default', async () => {
|
||||||
|
const stopExecutionSpy = jest.spyOn(activeExecutions, 'stopExecution');
|
||||||
|
|
||||||
|
expect(activeExecutions.getActiveExecutions()).toHaveLength(4);
|
||||||
|
|
||||||
|
await activeExecutions.shutdown();
|
||||||
|
|
||||||
|
expect(concurrencyControl.disable).toHaveBeenCalled();
|
||||||
|
|
||||||
|
const removeAllCaptor = captor<string[]>();
|
||||||
|
expect(concurrencyControl.removeAll).toHaveBeenCalledWith(removeAllCaptor);
|
||||||
|
expect(removeAllCaptor.value.sort()).toEqual([newExecutionId1, waitingExecutionId1].sort());
|
||||||
|
|
||||||
|
expect(stopExecutionSpy).toHaveBeenCalledTimes(2);
|
||||||
|
expect(stopExecutionSpy).toHaveBeenCalledWith(newExecutionId1);
|
||||||
|
expect(stopExecutionSpy).toHaveBeenCalledWith(waitingExecutionId1);
|
||||||
|
expect(stopExecutionSpy).not.toHaveBeenCalledWith(newExecutionId2);
|
||||||
|
expect(stopExecutionSpy).not.toHaveBeenCalledWith(waitingExecutionId2);
|
||||||
|
|
||||||
|
await new Promise(setImmediate);
|
||||||
|
// the other two executions aren't cancelled, but still removed from memory
|
||||||
|
expect(activeExecutions.getActiveExecutions()).toHaveLength(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Should cancel all executions when cancelAll is true', async () => {
|
||||||
|
const stopExecutionSpy = jest.spyOn(activeExecutions, 'stopExecution');
|
||||||
|
|
||||||
|
expect(activeExecutions.getActiveExecutions()).toHaveLength(4);
|
||||||
|
|
||||||
|
await activeExecutions.shutdown(true);
|
||||||
|
|
||||||
|
expect(concurrencyControl.disable).toHaveBeenCalled();
|
||||||
|
|
||||||
|
const removeAllCaptor = captor<string[]>();
|
||||||
|
expect(concurrencyControl.removeAll).toHaveBeenCalledWith(removeAllCaptor);
|
||||||
|
expect(removeAllCaptor.value.sort()).toEqual(
|
||||||
|
[newExecutionId1, newExecutionId2, waitingExecutionId1, waitingExecutionId2].sort(),
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(stopExecutionSpy).toHaveBeenCalledTimes(4);
|
||||||
|
expect(stopExecutionSpy).toHaveBeenCalledWith(newExecutionId1);
|
||||||
|
expect(stopExecutionSpy).toHaveBeenCalledWith(waitingExecutionId1);
|
||||||
|
expect(stopExecutionSpy).toHaveBeenCalledWith(newExecutionId2);
|
||||||
|
expect(stopExecutionSpy).toHaveBeenCalledWith(waitingExecutionId2);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
|
@ -95,13 +95,14 @@ export class ActiveExecutions {
|
||||||
const resumingExecution = this.activeExecutions[executionId];
|
const resumingExecution = this.activeExecutions[executionId];
|
||||||
const postExecutePromise = createDeferredPromise<IRun | undefined>();
|
const postExecutePromise = createDeferredPromise<IRun | undefined>();
|
||||||
|
|
||||||
this.activeExecutions[executionId] = {
|
const execution: IExecutingWorkflowData = {
|
||||||
executionData,
|
executionData,
|
||||||
startedAt: resumingExecution?.startedAt ?? new Date(),
|
startedAt: resumingExecution?.startedAt ?? new Date(),
|
||||||
postExecutePromise,
|
postExecutePromise,
|
||||||
status: executionStatus,
|
status: executionStatus,
|
||||||
responsePromise: resumingExecution?.responsePromise,
|
responsePromise: resumingExecution?.responsePromise,
|
||||||
};
|
};
|
||||||
|
this.activeExecutions[executionId] = execution;
|
||||||
|
|
||||||
// Automatically remove execution once the postExecutePromise settles
|
// Automatically remove execution once the postExecutePromise settles
|
||||||
void postExecutePromise.promise
|
void postExecutePromise.promise
|
||||||
|
@ -111,7 +112,10 @@ export class ActiveExecutions {
|
||||||
})
|
})
|
||||||
.finally(() => {
|
.finally(() => {
|
||||||
this.concurrencyControl.release({ mode: executionData.executionMode });
|
this.concurrencyControl.release({ mode: executionData.executionMode });
|
||||||
if (this.activeExecutions[executionId]?.status !== 'waiting') {
|
if (execution.status === 'waiting') {
|
||||||
|
// Do not hold on a reference to the previous WorkflowExecute instance, since a resuming execution will use a new instance
|
||||||
|
delete execution.workflowExecution;
|
||||||
|
} else {
|
||||||
delete this.activeExecutions[executionId];
|
delete this.activeExecutions[executionId];
|
||||||
this.logger.debug('Execution removed', { executionId });
|
this.logger.debug('Execution removed', { executionId });
|
||||||
}
|
}
|
||||||
|
@ -127,14 +131,14 @@ export class ActiveExecutions {
|
||||||
*/
|
*/
|
||||||
|
|
||||||
attachWorkflowExecution(executionId: string, workflowExecution: PCancelable<IRun>) {
|
attachWorkflowExecution(executionId: string, workflowExecution: PCancelable<IRun>) {
|
||||||
this.getExecution(executionId).workflowExecution = workflowExecution;
|
this.getExecutionOrFail(executionId).workflowExecution = workflowExecution;
|
||||||
}
|
}
|
||||||
|
|
||||||
attachResponsePromise(
|
attachResponsePromise(
|
||||||
executionId: string,
|
executionId: string,
|
||||||
responsePromise: IDeferredPromise<IExecuteResponsePromiseData>,
|
responsePromise: IDeferredPromise<IExecuteResponsePromiseData>,
|
||||||
): void {
|
): void {
|
||||||
this.getExecution(executionId).responsePromise = responsePromise;
|
this.getExecutionOrFail(executionId).responsePromise = responsePromise;
|
||||||
}
|
}
|
||||||
|
|
||||||
resolveResponsePromise(executionId: string, response: IExecuteResponsePromiseData): void {
|
resolveResponsePromise(executionId: string, response: IExecuteResponsePromiseData): void {
|
||||||
|
@ -149,15 +153,23 @@ export class ActiveExecutions {
|
||||||
// There is no execution running with that id
|
// There is no execution running with that id
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
execution.workflowExecution?.cancel();
|
const error = new ExecutionCancelledError(executionId);
|
||||||
execution.postExecutePromise.reject(new ExecutionCancelledError(executionId));
|
execution.responsePromise?.reject(error);
|
||||||
|
if (execution.status === 'waiting') {
|
||||||
|
// A waiting execution will not have a valid workflowExecution or postExecutePromise
|
||||||
|
// So we can't rely on the `.finally` on the postExecutePromise for the execution removal
|
||||||
|
delete this.activeExecutions[executionId];
|
||||||
|
} else {
|
||||||
|
execution.workflowExecution?.cancel();
|
||||||
|
execution.postExecutePromise.reject(error);
|
||||||
|
}
|
||||||
this.logger.debug('Execution cancelled', { executionId });
|
this.logger.debug('Execution cancelled', { executionId });
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Resolve the post-execution promise in an execution. */
|
/** Resolve the post-execution promise in an execution. */
|
||||||
finalizeExecution(executionId: string, fullRunData?: IRun) {
|
finalizeExecution(executionId: string, fullRunData?: IRun) {
|
||||||
if (!this.has(executionId)) return;
|
if (!this.has(executionId)) return;
|
||||||
const execution = this.getExecution(executionId);
|
const execution = this.getExecutionOrFail(executionId);
|
||||||
execution.postExecutePromise.resolve(fullRunData);
|
execution.postExecutePromise.resolve(fullRunData);
|
||||||
this.logger.debug('Execution finalized', { executionId });
|
this.logger.debug('Execution finalized', { executionId });
|
||||||
}
|
}
|
||||||
|
@ -166,7 +178,7 @@ export class ActiveExecutions {
|
||||||
* Returns a promise which will resolve with the data of the execution with the given id
|
* Returns a promise which will resolve with the data of the execution with the given id
|
||||||
*/
|
*/
|
||||||
async getPostExecutePromise(executionId: string): Promise<IRun | undefined> {
|
async getPostExecutePromise(executionId: string): Promise<IRun | undefined> {
|
||||||
return await this.getExecution(executionId).postExecutePromise.promise;
|
return await this.getExecutionOrFail(executionId).postExecutePromise.promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -193,32 +205,40 @@ export class ActiveExecutions {
|
||||||
}
|
}
|
||||||
|
|
||||||
setStatus(executionId: string, status: ExecutionStatus) {
|
setStatus(executionId: string, status: ExecutionStatus) {
|
||||||
this.getExecution(executionId).status = status;
|
this.getExecutionOrFail(executionId).status = status;
|
||||||
}
|
}
|
||||||
|
|
||||||
getStatus(executionId: string): ExecutionStatus {
|
getStatus(executionId: string): ExecutionStatus {
|
||||||
return this.getExecution(executionId).status;
|
return this.getExecutionOrFail(executionId).status;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Wait for all active executions to finish */
|
/** Wait for all active executions to finish */
|
||||||
async shutdown(cancelAll = false) {
|
async shutdown(cancelAll = false) {
|
||||||
let executionIds = Object.keys(this.activeExecutions);
|
const isRegularMode = config.getEnv('executions.mode') === 'regular';
|
||||||
|
if (isRegularMode) {
|
||||||
if (config.getEnv('executions.mode') === 'regular') {
|
|
||||||
// removal of active executions will no longer release capacity back,
|
// removal of active executions will no longer release capacity back,
|
||||||
// so that throttled executions cannot resume during shutdown
|
// so that throttled executions cannot resume during shutdown
|
||||||
this.concurrencyControl.disable();
|
this.concurrencyControl.disable();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (cancelAll) {
|
let executionIds = Object.keys(this.activeExecutions);
|
||||||
if (config.getEnv('executions.mode') === 'regular') {
|
const toCancel: string[] = [];
|
||||||
await this.concurrencyControl.removeAll(this.activeExecutions);
|
for (const executionId of executionIds) {
|
||||||
|
const { responsePromise, status } = this.activeExecutions[executionId];
|
||||||
|
if (!!responsePromise || (isRegularMode && cancelAll)) {
|
||||||
|
// Cancel all exectutions that have a response promise, because these promises can't be retained between restarts
|
||||||
|
this.stopExecution(executionId);
|
||||||
|
toCancel.push(executionId);
|
||||||
|
} else if (status === 'waiting' || status === 'new') {
|
||||||
|
// Remove waiting and new executions to not block shutdown
|
||||||
|
delete this.activeExecutions[executionId];
|
||||||
}
|
}
|
||||||
|
|
||||||
executionIds.forEach((executionId) => this.stopExecution(executionId));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await this.concurrencyControl.removeAll(toCancel);
|
||||||
|
|
||||||
let count = 0;
|
let count = 0;
|
||||||
|
executionIds = Object.keys(this.activeExecutions);
|
||||||
while (executionIds.length !== 0) {
|
while (executionIds.length !== 0) {
|
||||||
if (count++ % 4 === 0) {
|
if (count++ % 4 === 0) {
|
||||||
this.logger.info(`Waiting for ${executionIds.length} active executions to finish...`);
|
this.logger.info(`Waiting for ${executionIds.length} active executions to finish...`);
|
||||||
|
@ -229,7 +249,7 @@ export class ActiveExecutions {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
getExecution(executionId: string): IExecutingWorkflowData {
|
getExecutionOrFail(executionId: string): IExecutingWorkflowData {
|
||||||
const execution = this.activeExecutions[executionId];
|
const execution = this.activeExecutions[executionId];
|
||||||
if (!execution) {
|
if (!execution) {
|
||||||
throw new ExecutionNotFoundError(executionId);
|
throw new ExecutionNotFoundError(executionId);
|
||||||
|
|
|
@ -11,7 +11,6 @@ import config from '@/config';
|
||||||
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
||||||
import { InvalidConcurrencyLimitError } from '@/errors/invalid-concurrency-limit.error';
|
import { InvalidConcurrencyLimitError } from '@/errors/invalid-concurrency-limit.error';
|
||||||
import type { EventService } from '@/events/event.service';
|
import type { EventService } from '@/events/event.service';
|
||||||
import type { IExecutingWorkflowData } from '@/interfaces';
|
|
||||||
import type { Telemetry } from '@/telemetry';
|
import type { Telemetry } from '@/telemetry';
|
||||||
import { mockLogger } from '@test/mocking';
|
import { mockLogger } from '@test/mocking';
|
||||||
|
|
||||||
|
@ -432,11 +431,7 @@ describe('ConcurrencyControlService', () => {
|
||||||
/**
|
/**
|
||||||
* Act
|
* Act
|
||||||
*/
|
*/
|
||||||
await service.removeAll({
|
await service.removeAll(['1', '2', '3']);
|
||||||
'1': mock<IExecutingWorkflowData>(),
|
|
||||||
'2': mock<IExecutingWorkflowData>(),
|
|
||||||
'3': mock<IExecutingWorkflowData>(),
|
|
||||||
});
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Assert
|
* Assert
|
||||||
|
|
|
@ -8,7 +8,6 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito
|
||||||
import { InvalidConcurrencyLimitError } from '@/errors/invalid-concurrency-limit.error';
|
import { InvalidConcurrencyLimitError } from '@/errors/invalid-concurrency-limit.error';
|
||||||
import { UnknownExecutionModeError } from '@/errors/unknown-execution-mode.error';
|
import { UnknownExecutionModeError } from '@/errors/unknown-execution-mode.error';
|
||||||
import { EventService } from '@/events/event.service';
|
import { EventService } from '@/events/event.service';
|
||||||
import type { IExecutingWorkflowData } from '@/interfaces';
|
|
||||||
import { Telemetry } from '@/telemetry';
|
import { Telemetry } from '@/telemetry';
|
||||||
|
|
||||||
import { ConcurrencyQueue } from './concurrency-queue';
|
import { ConcurrencyQueue } from './concurrency-queue';
|
||||||
|
@ -140,7 +139,7 @@ export class ConcurrencyControlService {
|
||||||
* enqueued executions that have response promises, as these cannot
|
* enqueued executions that have response promises, as these cannot
|
||||||
* be re-run via `Start.runEnqueuedExecutions` during startup.
|
* be re-run via `Start.runEnqueuedExecutions` during startup.
|
||||||
*/
|
*/
|
||||||
async removeAll(activeExecutions: { [executionId: string]: IExecutingWorkflowData }) {
|
async removeAll(executionIdsToCancel: string[]) {
|
||||||
if (!this.isEnabled) return;
|
if (!this.isEnabled) return;
|
||||||
|
|
||||||
this.queues.forEach((queue) => {
|
this.queues.forEach((queue) => {
|
||||||
|
@ -151,15 +150,13 @@ export class ConcurrencyControlService {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
const executionIds = Object.entries(activeExecutions)
|
if (executionIdsToCancel.length === 0) return;
|
||||||
.filter(([_, execution]) => execution.status === 'new' && execution.responsePromise)
|
|
||||||
.map(([executionId, _]) => executionId);
|
|
||||||
|
|
||||||
if (executionIds.length === 0) return;
|
await this.executionRepository.cancelMany(executionIdsToCancel);
|
||||||
|
|
||||||
await this.executionRepository.cancelMany(executionIds);
|
this.logger.info('Canceled enqueued executions with response promises', {
|
||||||
|
executionIds: executionIdsToCancel,
|
||||||
this.logger.info('Canceled enqueued executions with response promises', { executionIds });
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
disable() {
|
disable() {
|
||||||
|
|
|
@ -530,6 +530,7 @@ export async function executeWebhook(
|
||||||
`Error with Webhook-Response for execution "${executionId}": "${error.message}"`,
|
`Error with Webhook-Response for execution "${executionId}": "${error.message}"`,
|
||||||
{ executionId, workflowId: workflow.id },
|
{ executionId, workflowId: workflow.id },
|
||||||
);
|
);
|
||||||
|
responseCallback(error, {});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue