From e8635f257433748f4d7d2c4b0ae794de6bff5b28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Mon, 3 Feb 2025 12:24:11 +0100 Subject: [PATCH] fix(core): "Respond to Webhook" should work with workflows with waiting nodes (#12806) --- .../src/__tests__/active-executions.test.ts | 40 ++++++++++++++++--- packages/cli/src/active-executions.ts | 12 ++++-- 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/packages/cli/src/__tests__/active-executions.test.ts b/packages/cli/src/__tests__/active-executions.test.ts index 5432bf5c4a..8e4fd1a142 100644 --- a/packages/cli/src/__tests__/active-executions.test.ts +++ b/packages/cli/src/__tests__/active-executions.test.ts @@ -41,7 +41,7 @@ describe('ActiveExecutions', () => { }); test('Should initialize activeExecutions with empty list', () => { - expect(activeExecutions.getActiveExecutions().length).toBe(0); + expect(activeExecutions.getActiveExecutions()).toHaveLength(0); }); test('Should add execution to active execution list', async () => { @@ -49,7 +49,7 @@ describe('ActiveExecutions', () => { const executionId = await activeExecutions.add(newExecution); expect(executionId).toBe(FAKE_EXECUTION_ID); - expect(activeExecutions.getActiveExecutions().length).toBe(1); + expect(activeExecutions.getActiveExecutions()).toHaveLength(1); expect(createNewExecution).toHaveBeenCalledTimes(1); expect(updateExistingExecution).toHaveBeenCalledTimes(0); }); @@ -59,7 +59,7 @@ describe('ActiveExecutions', () => { const executionId = await activeExecutions.add(newExecution, FAKE_SECOND_EXECUTION_ID); expect(executionId).toBe(FAKE_SECOND_EXECUTION_ID); - expect(activeExecutions.getActiveExecutions().length).toBe(1); + expect(activeExecutions.getActiveExecutions()).toHaveLength(1); expect(createNewExecution).toHaveBeenCalledTimes(0); expect(updateExistingExecution).toHaveBeenCalledTimes(1); }); @@ -93,6 +93,37 @@ describe('ActiveExecutions', () => { await expect(deferredPromise.promise).resolves.toEqual(fakeResponse); }); + test('Should copy over startedAt and responsePromise when resuming a waiting execution', async () => { + const newExecution = mockExecutionData(); + const executionId = await activeExecutions.add(newExecution); + activeExecutions.setStatus(executionId, 'waiting'); + activeExecutions.attachResponsePromise(executionId, mockDeferredPromise()); + + const waitingExecution = activeExecutions.getExecution(executionId); + expect(waitingExecution.responsePromise).toBeDefined(); + + // Resume the execution + await activeExecutions.add(newExecution, executionId); + + const resumedExecution = activeExecutions.getExecution(executionId); + expect(resumedExecution.startedAt).toBe(waitingExecution.startedAt); + expect(resumedExecution.responsePromise).toBe(waitingExecution.responsePromise); + }); + + test('Should not remove a waiting execution', async () => { + const newExecution = mockExecutionData(); + const executionId = await activeExecutions.add(newExecution); + activeExecutions.setStatus(executionId, 'waiting'); + activeExecutions.finalizeExecution(executionId); + + // Wait until the next tick to ensure that the post-execution promise has settled + await new Promise(setImmediate); + + // Execution should still be in activeExecutions + expect(activeExecutions.getActiveExecutions()).toHaveLength(1); + expect(activeExecutions.getStatus(executionId)).toBe('waiting'); + }); + test('Should remove an existing execution', async () => { // ARRANGE const newExecution = mockExecutionData(); @@ -105,11 +136,10 @@ describe('ActiveExecutions', () => { await new Promise(setImmediate); // ASSERT - expect(activeExecutions.getActiveExecutions().length).toBe(0); + expect(activeExecutions.getActiveExecutions()).toHaveLength(0); }); test('Should not try to resolve a post-execute promise for an inactive execution', async () => { - // @ts-expect-error Private method const getExecutionSpy = jest.spyOn(activeExecutions, 'getExecution'); activeExecutions.finalizeExecution('inactive-execution-id', mockFullRunData()); diff --git a/packages/cli/src/active-executions.ts b/packages/cli/src/active-executions.ts index 9056271f64..fb9cd02706 100644 --- a/packages/cli/src/active-executions.ts +++ b/packages/cli/src/active-executions.ts @@ -94,13 +94,15 @@ export class ActiveExecutions { await this.executionRepository.updateExistingExecution(executionId, execution); } + const resumingExecution = this.activeExecutions[executionId]; const postExecutePromise = createDeferredPromise(); this.activeExecutions[executionId] = { executionData, - startedAt: new Date(), + startedAt: resumingExecution?.startedAt ?? new Date(), postExecutePromise, status: executionStatus, + responsePromise: resumingExecution?.responsePromise, }; // Automatically remove execution once the postExecutePromise settles @@ -111,8 +113,10 @@ export class ActiveExecutions { }) .finally(() => { this.concurrencyControl.release({ mode: executionData.executionMode }); - delete this.activeExecutions[executionId]; - this.logger.debug('Execution removed', { executionId }); + if (this.activeExecutions[executionId]?.status !== 'waiting') { + delete this.activeExecutions[executionId]; + this.logger.debug('Execution removed', { executionId }); + } }); this.logger.debug('Execution added', { executionId }); @@ -227,7 +231,7 @@ export class ActiveExecutions { } } - private getExecution(executionId: string): IExecutingWorkflowData { + getExecution(executionId: string): IExecutingWorkflowData { const execution = this.activeExecutions[executionId]; if (!execution) { throw new ExecutionNotFoundError(executionId);