diff --git a/packages/cli/src/__tests__/active-executions.test.ts b/packages/cli/src/__tests__/active-executions.test.ts index 8e4fd1a142..0ae648a098 100644 --- a/packages/cli/src/__tests__/active-executions.test.ts +++ b/packages/cli/src/__tests__/active-executions.test.ts @@ -1,28 +1,29 @@ -import { mock } from 'jest-mock-extended'; +import { captor, mock } from 'jest-mock-extended'; import type { + IDeferredPromise, IExecuteResponsePromiseData, IRun, IWorkflowExecutionDataProcess, } from 'n8n-workflow'; -import { createDeferredPromise } from 'n8n-workflow'; +import { ExecutionCancelledError, randomInt, sleep } from 'n8n-workflow'; import PCancelable from 'p-cancelable'; import { v4 as uuid } from 'uuid'; import { ActiveExecutions } from '@/active-executions'; import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; +import config from '@/config'; import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { mockInstance } from '@test/mocking'; +jest.mock('n8n-workflow', () => ({ + ...jest.requireActual('n8n-workflow'), + sleep: jest.fn(), +})); + const FAKE_EXECUTION_ID = '15'; const FAKE_SECOND_EXECUTION_ID = '20'; -const updateExistingExecution = jest.fn(); -const createNewExecution = jest.fn(async () => FAKE_EXECUTION_ID); - -const executionRepository = mock({ - updateExistingExecution, - createNewExecution, -}); +const executionRepository = mock(); const concurrencyControl = mockInstance(ConcurrencyControlService, { // @ts-expect-error Private property @@ -31,155 +32,22 @@ const concurrencyControl = mockInstance(ConcurrencyControlService, { describe('ActiveExecutions', () => { let activeExecutions: ActiveExecutions; + let responsePromise: IDeferredPromise; + let workflowExecution: PCancelable; + let postExecutePromise: Promise; - beforeEach(() => { - activeExecutions = new ActiveExecutions(mock(), executionRepository, concurrencyControl); - }); + const fullRunData: IRun = { + data: { + resultData: { + runData: {}, + }, + }, + mode: 'manual', + startedAt: new Date(), + status: 'new', + }; - afterEach(() => { - jest.clearAllMocks(); - }); - - test('Should initialize activeExecutions with empty list', () => { - expect(activeExecutions.getActiveExecutions()).toHaveLength(0); - }); - - test('Should add execution to active execution list', async () => { - const newExecution = mockExecutionData(); - const executionId = await activeExecutions.add(newExecution); - - expect(executionId).toBe(FAKE_EXECUTION_ID); - expect(activeExecutions.getActiveExecutions()).toHaveLength(1); - expect(createNewExecution).toHaveBeenCalledTimes(1); - expect(updateExistingExecution).toHaveBeenCalledTimes(0); - }); - - test('Should update execution if add is called with execution ID', async () => { - const newExecution = mockExecutionData(); - const executionId = await activeExecutions.add(newExecution, FAKE_SECOND_EXECUTION_ID); - - expect(executionId).toBe(FAKE_SECOND_EXECUTION_ID); - expect(activeExecutions.getActiveExecutions()).toHaveLength(1); - expect(createNewExecution).toHaveBeenCalledTimes(0); - expect(updateExistingExecution).toHaveBeenCalledTimes(1); - }); - - test('Should fail attaching execution to invalid executionId', async () => { - const deferredPromise = mockCancelablePromise(); - - expect(() => { - activeExecutions.attachWorkflowExecution(FAKE_EXECUTION_ID, deferredPromise); - }).toThrow(); - }); - - test('Should successfully attach execution to valid executionId', async () => { - const newExecution = mockExecutionData(); - await activeExecutions.add(newExecution, FAKE_EXECUTION_ID); - const deferredPromise = mockCancelablePromise(); - - expect(() => - activeExecutions.attachWorkflowExecution(FAKE_EXECUTION_ID, deferredPromise), - ).not.toThrow(); - }); - - test('Should attach and resolve response promise to existing execution', async () => { - const newExecution = mockExecutionData(); - await activeExecutions.add(newExecution, FAKE_EXECUTION_ID); - const deferredPromise = mockDeferredPromise(); - activeExecutions.attachResponsePromise(FAKE_EXECUTION_ID, deferredPromise); - const fakeResponse = { data: { resultData: { runData: {} } } }; - activeExecutions.resolveResponsePromise(FAKE_EXECUTION_ID, fakeResponse); - - await expect(deferredPromise.promise).resolves.toEqual(fakeResponse); - }); - - test('Should copy over startedAt and responsePromise when resuming a waiting execution', async () => { - const newExecution = mockExecutionData(); - const executionId = await activeExecutions.add(newExecution); - activeExecutions.setStatus(executionId, 'waiting'); - activeExecutions.attachResponsePromise(executionId, mockDeferredPromise()); - - const waitingExecution = activeExecutions.getExecution(executionId); - expect(waitingExecution.responsePromise).toBeDefined(); - - // Resume the execution - await activeExecutions.add(newExecution, executionId); - - const resumedExecution = activeExecutions.getExecution(executionId); - expect(resumedExecution.startedAt).toBe(waitingExecution.startedAt); - expect(resumedExecution.responsePromise).toBe(waitingExecution.responsePromise); - }); - - test('Should not remove a waiting execution', async () => { - const newExecution = mockExecutionData(); - const executionId = await activeExecutions.add(newExecution); - activeExecutions.setStatus(executionId, 'waiting'); - activeExecutions.finalizeExecution(executionId); - - // Wait until the next tick to ensure that the post-execution promise has settled - await new Promise(setImmediate); - - // Execution should still be in activeExecutions - expect(activeExecutions.getActiveExecutions()).toHaveLength(1); - expect(activeExecutions.getStatus(executionId)).toBe('waiting'); - }); - - test('Should remove an existing execution', async () => { - // ARRANGE - const newExecution = mockExecutionData(); - const executionId = await activeExecutions.add(newExecution); - - // ACT - activeExecutions.finalizeExecution(executionId); - - // Wait until the next tick to ensure that the post-execution promise has settled - await new Promise(setImmediate); - - // ASSERT - expect(activeExecutions.getActiveExecutions()).toHaveLength(0); - }); - - test('Should not try to resolve a post-execute promise for an inactive execution', async () => { - const getExecutionSpy = jest.spyOn(activeExecutions, 'getExecution'); - - activeExecutions.finalizeExecution('inactive-execution-id', mockFullRunData()); - - expect(getExecutionSpy).not.toHaveBeenCalled(); - }); - - test('Should resolve post execute promise on removal', async () => { - const newExecution = mockExecutionData(); - const executionId = await activeExecutions.add(newExecution); - const postExecutePromise = activeExecutions.getPostExecutePromise(executionId); - // Force the above to be executed since we cannot await it - await new Promise((res) => { - setTimeout(res, 100); - }); - const fakeOutput = mockFullRunData(); - activeExecutions.finalizeExecution(executionId, fakeOutput); - - await expect(postExecutePromise).resolves.toEqual(fakeOutput); - }); - - test('Should throw error when trying to create a promise with invalid execution', async () => { - await expect(activeExecutions.getPostExecutePromise(FAKE_EXECUTION_ID)).rejects.toThrow(); - }); - - test('Should call function to cancel execution when asked to stop', async () => { - const newExecution = mockExecutionData(); - const executionId = await activeExecutions.add(newExecution); - const cancelExecution = jest.fn(); - const cancellablePromise = mockCancelablePromise(); - cancellablePromise.cancel = cancelExecution; - activeExecutions.attachWorkflowExecution(executionId, cancellablePromise); - activeExecutions.stopExecution(executionId); - - expect(cancelExecution).toHaveBeenCalledTimes(1); - }); -}); - -function mockExecutionData(): IWorkflowExecutionDataProcess { - return { + const executionData: IWorkflowExecutionDataProcess = { executionMode: 'manual', workflowData: { id: '123', @@ -192,22 +60,235 @@ function mockExecutionData(): IWorkflowExecutionDataProcess { }, userId: uuid(), }; -} -function mockFullRunData(): IRun { - return { - data: { - resultData: { - runData: {}, - }, - }, - mode: 'manual', - startedAt: new Date(), - status: 'new', - }; -} + beforeEach(() => { + activeExecutions = new ActiveExecutions(mock(), executionRepository, concurrencyControl); -// eslint-disable-next-line @typescript-eslint/promise-function-async -const mockCancelablePromise = () => new PCancelable((resolve) => resolve()); + executionRepository.createNewExecution.mockResolvedValue(FAKE_EXECUTION_ID); -const mockDeferredPromise = () => createDeferredPromise(); + workflowExecution = new PCancelable((resolve) => resolve()); + workflowExecution.cancel = jest.fn(); + responsePromise = mock>(); + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + test('Should initialize activeExecutions with empty list', () => { + expect(activeExecutions.getActiveExecutions()).toHaveLength(0); + }); + + test('Should add execution to active execution list', async () => { + const executionId = await activeExecutions.add(executionData); + + expect(executionId).toBe(FAKE_EXECUTION_ID); + expect(activeExecutions.getActiveExecutions()).toHaveLength(1); + expect(executionRepository.createNewExecution).toHaveBeenCalledTimes(1); + expect(executionRepository.updateExistingExecution).toHaveBeenCalledTimes(0); + }); + + test('Should update execution if add is called with execution ID', async () => { + const executionId = await activeExecutions.add(executionData, FAKE_SECOND_EXECUTION_ID); + + expect(executionId).toBe(FAKE_SECOND_EXECUTION_ID); + expect(activeExecutions.getActiveExecutions()).toHaveLength(1); + expect(executionRepository.createNewExecution).toHaveBeenCalledTimes(0); + expect(executionRepository.updateExistingExecution).toHaveBeenCalledTimes(1); + }); + + describe('attachWorkflowExecution', () => { + test('Should fail attaching execution to invalid executionId', async () => { + expect(() => { + activeExecutions.attachWorkflowExecution(FAKE_EXECUTION_ID, workflowExecution); + }).toThrow(); + }); + + test('Should successfully attach execution to valid executionId', async () => { + await activeExecutions.add(executionData, FAKE_EXECUTION_ID); + + expect(() => + activeExecutions.attachWorkflowExecution(FAKE_EXECUTION_ID, workflowExecution), + ).not.toThrow(); + }); + }); + + test('Should attach and resolve response promise to existing execution', async () => { + await activeExecutions.add(executionData, FAKE_EXECUTION_ID); + activeExecutions.attachResponsePromise(FAKE_EXECUTION_ID, responsePromise); + const fakeResponse = { data: { resultData: { runData: {} } } }; + activeExecutions.resolveResponsePromise(FAKE_EXECUTION_ID, fakeResponse); + + expect(responsePromise.resolve).toHaveBeenCalledWith(fakeResponse); + }); + + test('Should copy over startedAt and responsePromise when resuming a waiting execution', async () => { + const executionId = await activeExecutions.add(executionData); + activeExecutions.setStatus(executionId, 'waiting'); + activeExecutions.attachResponsePromise(executionId, responsePromise); + + const waitingExecution = activeExecutions.getExecutionOrFail(executionId); + expect(waitingExecution.responsePromise).toBeDefined(); + + // Resume the execution + await activeExecutions.add(executionData, executionId); + + const resumedExecution = activeExecutions.getExecutionOrFail(executionId); + expect(resumedExecution.startedAt).toBe(waitingExecution.startedAt); + expect(resumedExecution.responsePromise).toBe(responsePromise); + }); + + describe('finalizeExecution', () => { + test('Should not remove a waiting execution', async () => { + const executionId = await activeExecutions.add(executionData); + activeExecutions.setStatus(executionId, 'waiting'); + activeExecutions.finalizeExecution(executionId); + + // Wait until the next tick to ensure that the post-execution promise has settled + await new Promise(setImmediate); + + // Execution should still be in activeExecutions + expect(activeExecutions.getActiveExecutions()).toHaveLength(1); + expect(activeExecutions.getStatus(executionId)).toBe('waiting'); + }); + + test('Should remove an existing execution', async () => { + const executionId = await activeExecutions.add(executionData); + + activeExecutions.finalizeExecution(executionId); + + await new Promise(setImmediate); + expect(activeExecutions.getActiveExecutions()).toHaveLength(0); + }); + + test('Should not try to resolve a post-execute promise for an inactive execution', async () => { + const getExecutionSpy = jest.spyOn(activeExecutions, 'getExecutionOrFail'); + + activeExecutions.finalizeExecution('inactive-execution-id', fullRunData); + + expect(getExecutionSpy).not.toHaveBeenCalled(); + }); + + test('Should resolve post execute promise on removal', async () => { + const executionId = await activeExecutions.add(executionData); + const postExecutePromise = activeExecutions.getPostExecutePromise(executionId); + + await new Promise(setImmediate); + activeExecutions.finalizeExecution(executionId, fullRunData); + + await expect(postExecutePromise).resolves.toEqual(fullRunData); + }); + }); + + describe('getPostExecutePromise', () => { + test('Should throw error when trying to create a promise with invalid execution', async () => { + await expect(activeExecutions.getPostExecutePromise(FAKE_EXECUTION_ID)).rejects.toThrow(); + }); + }); + + describe('stopExecution', () => { + let executionId: string; + + beforeEach(async () => { + executionId = await activeExecutions.add(executionData); + postExecutePromise = activeExecutions.getPostExecutePromise(executionId); + + activeExecutions.attachWorkflowExecution(executionId, workflowExecution); + activeExecutions.attachResponsePromise(executionId, responsePromise); + }); + + test('Should cancel ongoing executions', async () => { + activeExecutions.stopExecution(executionId); + + expect(responsePromise.reject).toHaveBeenCalledWith(expect.any(ExecutionCancelledError)); + expect(workflowExecution.cancel).toHaveBeenCalledTimes(1); + await expect(postExecutePromise).rejects.toThrow(ExecutionCancelledError); + }); + + test('Should cancel waiting executions', async () => { + activeExecutions.setStatus(executionId, 'waiting'); + activeExecutions.stopExecution(executionId); + + expect(responsePromise.reject).toHaveBeenCalledWith(expect.any(ExecutionCancelledError)); + expect(workflowExecution.cancel).not.toHaveBeenCalled(); + }); + }); + + describe('shutdown', () => { + let newExecutionId1: string, newExecutionId2: string; + let waitingExecutionId1: string, waitingExecutionId2: string; + + beforeEach(async () => { + config.set('executions.mode', 'regular'); + + executionRepository.createNewExecution.mockImplementation(async () => + randomInt(1000, 2000).toString(), + ); + + (sleep as jest.Mock).mockImplementation(() => { + // @ts-expect-error private property + activeExecutions.activeExecutions = {}; + }); + + newExecutionId1 = await activeExecutions.add(executionData); + activeExecutions.setStatus(newExecutionId1, 'new'); + activeExecutions.attachResponsePromise(newExecutionId1, responsePromise); + + newExecutionId2 = await activeExecutions.add(executionData); + activeExecutions.setStatus(newExecutionId2, 'new'); + + waitingExecutionId1 = await activeExecutions.add(executionData); + activeExecutions.setStatus(waitingExecutionId1, 'waiting'); + activeExecutions.attachResponsePromise(waitingExecutionId1, responsePromise); + + waitingExecutionId2 = await activeExecutions.add(executionData); + activeExecutions.setStatus(waitingExecutionId2, 'waiting'); + }); + + test('Should cancel only executions with response-promises by default', async () => { + const stopExecutionSpy = jest.spyOn(activeExecutions, 'stopExecution'); + + expect(activeExecutions.getActiveExecutions()).toHaveLength(4); + + await activeExecutions.shutdown(); + + expect(concurrencyControl.disable).toHaveBeenCalled(); + + const removeAllCaptor = captor(); + expect(concurrencyControl.removeAll).toHaveBeenCalledWith(removeAllCaptor); + expect(removeAllCaptor.value.sort()).toEqual([newExecutionId1, waitingExecutionId1].sort()); + + expect(stopExecutionSpy).toHaveBeenCalledTimes(2); + expect(stopExecutionSpy).toHaveBeenCalledWith(newExecutionId1); + expect(stopExecutionSpy).toHaveBeenCalledWith(waitingExecutionId1); + expect(stopExecutionSpy).not.toHaveBeenCalledWith(newExecutionId2); + expect(stopExecutionSpy).not.toHaveBeenCalledWith(waitingExecutionId2); + + await new Promise(setImmediate); + // the other two executions aren't cancelled, but still removed from memory + expect(activeExecutions.getActiveExecutions()).toHaveLength(0); + }); + + test('Should cancel all executions when cancelAll is true', async () => { + const stopExecutionSpy = jest.spyOn(activeExecutions, 'stopExecution'); + + expect(activeExecutions.getActiveExecutions()).toHaveLength(4); + + await activeExecutions.shutdown(true); + + expect(concurrencyControl.disable).toHaveBeenCalled(); + + const removeAllCaptor = captor(); + expect(concurrencyControl.removeAll).toHaveBeenCalledWith(removeAllCaptor); + expect(removeAllCaptor.value.sort()).toEqual( + [newExecutionId1, newExecutionId2, waitingExecutionId1, waitingExecutionId2].sort(), + ); + + expect(stopExecutionSpy).toHaveBeenCalledTimes(4); + expect(stopExecutionSpy).toHaveBeenCalledWith(newExecutionId1); + expect(stopExecutionSpy).toHaveBeenCalledWith(waitingExecutionId1); + expect(stopExecutionSpy).toHaveBeenCalledWith(newExecutionId2); + expect(stopExecutionSpy).toHaveBeenCalledWith(waitingExecutionId2); + }); + }); +}); diff --git a/packages/cli/src/active-executions.ts b/packages/cli/src/active-executions.ts index e00b693be6..e0dde622eb 100644 --- a/packages/cli/src/active-executions.ts +++ b/packages/cli/src/active-executions.ts @@ -95,13 +95,14 @@ export class ActiveExecutions { const resumingExecution = this.activeExecutions[executionId]; const postExecutePromise = createDeferredPromise(); - this.activeExecutions[executionId] = { + const execution: IExecutingWorkflowData = { executionData, startedAt: resumingExecution?.startedAt ?? new Date(), postExecutePromise, status: executionStatus, responsePromise: resumingExecution?.responsePromise, }; + this.activeExecutions[executionId] = execution; // Automatically remove execution once the postExecutePromise settles void postExecutePromise.promise @@ -111,7 +112,10 @@ export class ActiveExecutions { }) .finally(() => { this.concurrencyControl.release({ mode: executionData.executionMode }); - if (this.activeExecutions[executionId]?.status !== 'waiting') { + if (execution.status === 'waiting') { + // Do not hold on a reference to the previous WorkflowExecute instance, since a resuming execution will use a new instance + delete execution.workflowExecution; + } else { delete this.activeExecutions[executionId]; this.logger.debug('Execution removed', { executionId }); } @@ -127,14 +131,14 @@ export class ActiveExecutions { */ attachWorkflowExecution(executionId: string, workflowExecution: PCancelable) { - this.getExecution(executionId).workflowExecution = workflowExecution; + this.getExecutionOrFail(executionId).workflowExecution = workflowExecution; } attachResponsePromise( executionId: string, responsePromise: IDeferredPromise, ): void { - this.getExecution(executionId).responsePromise = responsePromise; + this.getExecutionOrFail(executionId).responsePromise = responsePromise; } resolveResponsePromise(executionId: string, response: IExecuteResponsePromiseData): void { @@ -149,15 +153,23 @@ export class ActiveExecutions { // There is no execution running with that id return; } - execution.workflowExecution?.cancel(); - execution.postExecutePromise.reject(new ExecutionCancelledError(executionId)); + const error = new ExecutionCancelledError(executionId); + execution.responsePromise?.reject(error); + if (execution.status === 'waiting') { + // A waiting execution will not have a valid workflowExecution or postExecutePromise + // So we can't rely on the `.finally` on the postExecutePromise for the execution removal + delete this.activeExecutions[executionId]; + } else { + execution.workflowExecution?.cancel(); + execution.postExecutePromise.reject(error); + } this.logger.debug('Execution cancelled', { executionId }); } /** Resolve the post-execution promise in an execution. */ finalizeExecution(executionId: string, fullRunData?: IRun) { if (!this.has(executionId)) return; - const execution = this.getExecution(executionId); + const execution = this.getExecutionOrFail(executionId); execution.postExecutePromise.resolve(fullRunData); this.logger.debug('Execution finalized', { executionId }); } @@ -166,7 +178,7 @@ export class ActiveExecutions { * Returns a promise which will resolve with the data of the execution with the given id */ async getPostExecutePromise(executionId: string): Promise { - return await this.getExecution(executionId).postExecutePromise.promise; + return await this.getExecutionOrFail(executionId).postExecutePromise.promise; } /** @@ -193,32 +205,40 @@ export class ActiveExecutions { } setStatus(executionId: string, status: ExecutionStatus) { - this.getExecution(executionId).status = status; + this.getExecutionOrFail(executionId).status = status; } getStatus(executionId: string): ExecutionStatus { - return this.getExecution(executionId).status; + return this.getExecutionOrFail(executionId).status; } /** Wait for all active executions to finish */ async shutdown(cancelAll = false) { - let executionIds = Object.keys(this.activeExecutions); - - if (config.getEnv('executions.mode') === 'regular') { + const isRegularMode = config.getEnv('executions.mode') === 'regular'; + if (isRegularMode) { // removal of active executions will no longer release capacity back, // so that throttled executions cannot resume during shutdown this.concurrencyControl.disable(); } - if (cancelAll) { - if (config.getEnv('executions.mode') === 'regular') { - await this.concurrencyControl.removeAll(this.activeExecutions); + let executionIds = Object.keys(this.activeExecutions); + const toCancel: string[] = []; + for (const executionId of executionIds) { + const { responsePromise, status } = this.activeExecutions[executionId]; + if (!!responsePromise || (isRegularMode && cancelAll)) { + // Cancel all exectutions that have a response promise, because these promises can't be retained between restarts + this.stopExecution(executionId); + toCancel.push(executionId); + } else if (status === 'waiting' || status === 'new') { + // Remove waiting and new executions to not block shutdown + delete this.activeExecutions[executionId]; } - - executionIds.forEach((executionId) => this.stopExecution(executionId)); } + await this.concurrencyControl.removeAll(toCancel); + let count = 0; + executionIds = Object.keys(this.activeExecutions); while (executionIds.length !== 0) { if (count++ % 4 === 0) { this.logger.info(`Waiting for ${executionIds.length} active executions to finish...`); @@ -229,7 +249,7 @@ export class ActiveExecutions { } } - getExecution(executionId: string): IExecutingWorkflowData { + getExecutionOrFail(executionId: string): IExecutingWorkflowData { const execution = this.activeExecutions[executionId]; if (!execution) { throw new ExecutionNotFoundError(executionId); diff --git a/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts b/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts index a20c1769da..6a0ffcac18 100644 --- a/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts +++ b/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts @@ -11,7 +11,6 @@ import config from '@/config'; import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { InvalidConcurrencyLimitError } from '@/errors/invalid-concurrency-limit.error'; import type { EventService } from '@/events/event.service'; -import type { IExecutingWorkflowData } from '@/interfaces'; import type { Telemetry } from '@/telemetry'; import { mockLogger } from '@test/mocking'; @@ -432,11 +431,7 @@ describe('ConcurrencyControlService', () => { /** * Act */ - await service.removeAll({ - '1': mock(), - '2': mock(), - '3': mock(), - }); + await service.removeAll(['1', '2', '3']); /** * Assert diff --git a/packages/cli/src/concurrency/concurrency-control.service.ts b/packages/cli/src/concurrency/concurrency-control.service.ts index 1984148c29..f41a919680 100644 --- a/packages/cli/src/concurrency/concurrency-control.service.ts +++ b/packages/cli/src/concurrency/concurrency-control.service.ts @@ -8,7 +8,6 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito import { InvalidConcurrencyLimitError } from '@/errors/invalid-concurrency-limit.error'; import { UnknownExecutionModeError } from '@/errors/unknown-execution-mode.error'; import { EventService } from '@/events/event.service'; -import type { IExecutingWorkflowData } from '@/interfaces'; import { Telemetry } from '@/telemetry'; import { ConcurrencyQueue } from './concurrency-queue'; @@ -140,7 +139,7 @@ export class ConcurrencyControlService { * enqueued executions that have response promises, as these cannot * be re-run via `Start.runEnqueuedExecutions` during startup. */ - async removeAll(activeExecutions: { [executionId: string]: IExecutingWorkflowData }) { + async removeAll(executionIdsToCancel: string[]) { if (!this.isEnabled) return; this.queues.forEach((queue) => { @@ -151,15 +150,13 @@ export class ConcurrencyControlService { } }); - const executionIds = Object.entries(activeExecutions) - .filter(([_, execution]) => execution.status === 'new' && execution.responsePromise) - .map(([executionId, _]) => executionId); + if (executionIdsToCancel.length === 0) return; - if (executionIds.length === 0) return; + await this.executionRepository.cancelMany(executionIdsToCancel); - await this.executionRepository.cancelMany(executionIds); - - this.logger.info('Canceled enqueued executions with response promises', { executionIds }); + this.logger.info('Canceled enqueued executions with response promises', { + executionIds: executionIdsToCancel, + }); } disable() { diff --git a/packages/cli/src/webhooks/webhook-helpers.ts b/packages/cli/src/webhooks/webhook-helpers.ts index dd6a775606..ca54a14b52 100644 --- a/packages/cli/src/webhooks/webhook-helpers.ts +++ b/packages/cli/src/webhooks/webhook-helpers.ts @@ -530,6 +530,7 @@ export async function executeWebhook( `Error with Webhook-Response for execution "${executionId}": "${error.message}"`, { executionId, workflowId: workflow.id }, ); + responseCallback(error, {}); }); }