From 1aabca8319df5c01975d6ddc4d64964e89c7585b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Fri, 29 Nov 2024 14:41:29 +0100 Subject: [PATCH] feat(core): Parent workflows should wait for sub-workflows to finish --- .../cli/src/__tests__/wait-tracker.test.ts | 79 ++++++++++++++++++- .../workflow-execute-additional-data.test.ts | 38 +++++++-- packages/cli/src/wait-tracker.ts | 9 +++ packages/cli/src/webhooks/webhook-helpers.ts | 18 ++++- .../src/workflow-execute-additional-data.ts | 2 + .../__tests__/shared-tests.ts | 65 ++++++++++++++- .../base-execute-context.ts | 44 +++++++---- .../node-execution-context/execute-context.ts | 7 -- packages/workflow/src/Interfaces.ts | 2 + 9 files changed, 223 insertions(+), 41 deletions(-) diff --git a/packages/cli/src/__tests__/wait-tracker.test.ts b/packages/cli/src/__tests__/wait-tracker.test.ts index 49e8517272..e0661a511b 100644 --- a/packages/cli/src/__tests__/wait-tracker.test.ts +++ b/packages/cli/src/__tests__/wait-tracker.test.ts @@ -1,7 +1,9 @@ import { mock } from 'jest-mock-extended'; import type { InstanceSettings } from 'n8n-core'; -import type { IWorkflowBase } from 'n8n-workflow'; +import type { IRun, IWorkflowBase } from 'n8n-workflow'; +import { createDeferredPromise } from 'n8n-workflow'; +import type { ActiveExecutions } from '@/active-executions'; import type { Project } from '@/databases/entities/project'; import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; import type { IExecutionResponse } from '@/interfaces'; @@ -15,6 +17,7 @@ import { mockLogger } from '@test/mocking'; jest.useFakeTimers(); describe('WaitTracker', () => { + const activeExecutions = mock(); const ownershipService = mock(); const workflowRunner = mock(); const executionRepository = mock(); @@ -30,6 +33,7 @@ describe('WaitTracker', () => { mode: 'manual', data: mock({ pushRef: 'push_ref', + parentExecution: undefined, }), }); execution.workflowData = mock({ id: 'abcd' }); @@ -40,6 +44,7 @@ describe('WaitTracker', () => { mockLogger(), executionRepository, ownershipService, + activeExecutions, workflowRunner, orchestrationService, instanceSettings, @@ -80,7 +85,9 @@ describe('WaitTracker', () => { let startExecutionSpy: jest.SpyInstance, [executionId: string]>; beforeEach(() => { - executionRepository.findSingleExecution.mockResolvedValue(execution); + executionRepository.findSingleExecution + .calledWith(execution.id) + .mockResolvedValue(execution); executionRepository.getWaitingExecutions.mockResolvedValue([execution]); ownershipService.getWorkflowProjectCached.mockResolvedValue(project); @@ -110,13 +117,17 @@ describe('WaitTracker', () => { }); describe('startExecution()', () => { - it('should query for execution to start', async () => { + beforeEach(() => { executionRepository.getWaitingExecutions.mockResolvedValue([]); waitTracker.init(); - executionRepository.findSingleExecution.mockResolvedValue(execution); + executionRepository.findSingleExecution.calledWith(execution.id).mockResolvedValue(execution); ownershipService.getWorkflowProjectCached.mockResolvedValue(project); + execution.data.parentExecution = undefined; + }); + + it('should query for execution to start', async () => { await waitTracker.startExecution(execution.id); expect(executionRepository.findSingleExecution).toHaveBeenCalledWith(execution.id, { @@ -137,6 +148,65 @@ describe('WaitTracker', () => { execution.id, ); }); + + it('should also resume parent execution once sub-workflow finishes', async () => { + const parentExecution = mock({ + id: 'parent_execution_id', + finished: false, + }); + parentExecution.workflowData = mock({ id: 'parent_workflow_id' }); + execution.data.parentExecution = { + executionId: parentExecution.id, + workflowId: parentExecution.workflowData.id, + }; + executionRepository.findSingleExecution + .calledWith(parentExecution.id) + .mockResolvedValue(parentExecution); + const postExecutePromise = createDeferredPromise(); + activeExecutions.getPostExecutePromise + .calledWith(execution.id) + .mockReturnValue(postExecutePromise.promise); + + await waitTracker.startExecution(execution.id); + + expect(executionRepository.findSingleExecution).toHaveBeenNthCalledWith(1, execution.id, { + includeData: true, + unflattenData: true, + }); + + expect(workflowRunner.run).toHaveBeenCalledTimes(1); + expect(workflowRunner.run).toHaveBeenNthCalledWith( + 1, + { + executionMode: execution.mode, + executionData: execution.data, + workflowData: execution.workflowData, + projectId: project.id, + pushRef: execution.data.pushRef, + }, + false, + false, + execution.id, + ); + + postExecutePromise.resolve(mock()); + jest.runAllTicks(); + + expect(workflowRunner.run).toHaveBeenCalledTimes(2); + expect(workflowRunner.run).toHaveBeenNthCalledWith( + 2, + { + executionMode: execution.mode, + executionData: execution.data, + workflowData: execution.workflowData, + projectId: project.id, + pushRef: execution.data.pushRef, + }, + false, + false, + execution.id, + ); + }); }); describe('single-main setup', () => { @@ -165,6 +235,7 @@ describe('WaitTracker', () => { mockLogger(), executionRepository, ownershipService, + activeExecutions, workflowRunner, orchestrationService, mock({ isLeader: false }), diff --git a/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts b/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts index d0aeb3111f..e7d94d3e34 100644 --- a/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts +++ b/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts @@ -1,11 +1,11 @@ import { mock } from 'jest-mock-extended'; import type { IWorkflowBase } from 'n8n-workflow'; -import { - type IExecuteWorkflowInfo, - type IWorkflowExecuteAdditionalData, - type ExecuteWorkflowOptions, - type IRun, - type INodeExecutionData, +import type { + IExecuteWorkflowInfo, + IWorkflowExecuteAdditionalData, + ExecuteWorkflowOptions, + IRun, + INodeExecutionData, } from 'n8n-workflow'; import type PCancelable from 'p-cancelable'; import Container from 'typedi'; @@ -50,6 +50,7 @@ const getMockRun = ({ lastNodeOutput }: { lastNodeOutput: Array @@ -114,7 +115,9 @@ describe('WorkflowExecuteAdditionalData', () => { }); describe('executeWorkflow', () => { - const runWithData = getMockRun({ lastNodeOutput: [[{ json: { test: 1 } }]] }); + const runWithData = getMockRun({ + lastNodeOutput: [[{ json: { test: 1 } }]], + }); beforeEach(() => { workflowRepository.get.mockResolvedValue( @@ -159,6 +162,23 @@ describe('WorkflowExecuteAdditionalData', () => { expect(executionRepository.setRunning).toHaveBeenCalledWith(EXECUTION_ID); }); + + it('should return waitTill property when workflow execution is waiting', async () => { + const waitTill = new Date(); + runWithData.waitTill = waitTill; + + const response = await executeWorkflow( + mock(), + mock(), + mock({ loadedWorkflowData: undefined, doNotWaitToFinish: false }), + ); + + expect(response).toEqual({ + data: runWithData.data.resultData.runData[LAST_NODE_EXECUTED][0].data!.main, + executionId: EXECUTION_ID, + waitTill, + }); + }); }); describe('getRunData', () => { @@ -230,6 +250,10 @@ describe('WorkflowExecuteAdditionalData', () => { waitingExecution: {}, waitingExecutionSource: {}, }, + parentExecution: { + executionId: '123', + workflowId: '567', + }, resultData: { runData: {} }, startData: {}, }, diff --git a/packages/cli/src/wait-tracker.ts b/packages/cli/src/wait-tracker.ts index 7035db3cbe..2cb67d2027 100644 --- a/packages/cli/src/wait-tracker.ts +++ b/packages/cli/src/wait-tracker.ts @@ -2,6 +2,7 @@ import { InstanceSettings } from 'n8n-core'; import { ApplicationError, type IWorkflowExecutionDataProcess } from 'n8n-workflow'; import { Service } from 'typedi'; +import { ActiveExecutions } from '@/active-executions'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { Logger } from '@/logging/logger.service'; import { OrchestrationService } from '@/services/orchestration.service'; @@ -23,6 +24,7 @@ export class WaitTracker { private readonly logger: Logger, private readonly executionRepository: ExecutionRepository, private readonly ownershipService: OwnershipService, + private readonly activeExecutions: ActiveExecutions, private readonly workflowRunner: WorkflowRunner, private readonly orchestrationService: OrchestrationService, private readonly instanceSettings: InstanceSettings, @@ -133,6 +135,13 @@ export class WaitTracker { // Start the execution again await this.workflowRunner.run(data, false, false, executionId); + + const { parentExecution } = fullExecutionData.data; + if (parentExecution) { + void this.activeExecutions.getPostExecutePromise(executionId).then(() => { + void this.startExecution(parentExecution.executionId); + }); + } } stopTracking() { diff --git a/packages/cli/src/webhooks/webhook-helpers.ts b/packages/cli/src/webhooks/webhook-helpers.ts index 6110584f7e..7985cbeab1 100644 --- a/packages/cli/src/webhooks/webhook-helpers.ts +++ b/packages/cli/src/webhooks/webhook-helpers.ts @@ -53,6 +53,7 @@ import { Logger } from '@/logging/logger.service'; import { parseBody } from '@/middlewares'; import { OwnershipService } from '@/services/ownership.service'; import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; +import { WaitTracker } from '@/wait-tracker'; import { createMultiFormDataParser } from '@/webhooks/webhook-form-data'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; import * as WorkflowHelpers from '@/workflow-helpers'; @@ -548,11 +549,20 @@ export async function executeWebhook( { executionId }, ); + // Get a promise which resolves when the workflow did execute and send then response + const executePromise = Container.get(ActiveExecutions).getPostExecutePromise( + executionId, + ) as Promise; + + const { parentExecution } = runExecutionData; + if (parentExecution) { + void executePromise.then(() => { + const waitTracker = Container.get(WaitTracker); + void waitTracker.startExecution(parentExecution.executionId); + }); + } + if (!didSendResponse) { - // Get a promise which resolves when the workflow did execute and send then response - const executePromise = Container.get(ActiveExecutions).getPostExecutePromise( - executionId, - ) as Promise; executePromise // eslint-disable-next-line complexity .then(async (data) => { diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index 2588a442d9..e3e65bcb4d 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -709,6 +709,7 @@ export async function getRunData( waitingExecution: {}, waitingExecutionSource: {}, }, + parentExecution, }; return { @@ -944,6 +945,7 @@ async function startExecution( return { executionId, data: returnData!.data!.main, + waitTill: data.waitTill, }; } activeExecutions.finalizeExecution(executionId, data); diff --git a/packages/core/src/node-execution-context/__tests__/shared-tests.ts b/packages/core/src/node-execution-context/__tests__/shared-tests.ts index c7262554d0..80cf3900d7 100644 --- a/packages/core/src/node-execution-context/__tests__/shared-tests.ts +++ b/packages/core/src/node-execution-context/__tests__/shared-tests.ts @@ -1,4 +1,4 @@ -import { captor, mock } from 'jest-mock-extended'; +import { captor, mock, type MockProxy } from 'jest-mock-extended'; import type { IRunExecutionData, ContextType, @@ -9,11 +9,21 @@ import type { ITaskMetadata, ISourceData, IExecuteData, + IWorkflowExecuteAdditionalData, + ExecuteWorkflowData, + RelatedExecution, + IExecuteWorkflowInfo, } from 'n8n-workflow'; import { ApplicationError, NodeHelpers } from 'n8n-workflow'; +import Container from 'typedi'; + +import { BinaryDataService } from '@/BinaryData/BinaryData.service'; import type { BaseExecuteContext } from '../base-execute-context'; +const binaryDataService = mock(); +Container.set(BinaryDataService, binaryDataService); + export const describeCommonTests = ( context: BaseExecuteContext, { @@ -31,7 +41,7 @@ export const describeCommonTests = ( }, ) => { // @ts-expect-error `additionalData` is private - const { additionalData } = context; + const additionalData = context.additionalData as MockProxy; describe('getExecutionCancelSignal', () => { it('should return the abort signal', () => { @@ -178,4 +188,55 @@ export const describeCommonTests = ( resolveSimpleParameterValueSpy.mockRestore(); }); }); + + describe('putExecutionToWait', () => { + it('should set waitTill and execution status', async () => { + const waitTill = new Date(); + + await context.putExecutionToWait(waitTill); + + expect(runExecutionData.waitTill).toEqual(waitTill); + expect(additionalData.setExecutionStatus).toHaveBeenCalledWith('waiting'); + }); + }); + + describe('executeWorkflow', () => { + const data = [[{ json: { test: true } }]]; + const executeWorkflowData = mock(); + const workflowInfo = mock(); + const parentExecution: RelatedExecution = { + executionId: 'parent_execution_id', + workflowId: 'parent_workflow_id', + }; + + it('should execute workflow and return data', async () => { + additionalData.executeWorkflow.mockResolvedValue(executeWorkflowData); + binaryDataService.duplicateBinaryData.mockResolvedValue(data); + + const result = await context.executeWorkflow(workflowInfo, undefined, undefined, { + parentExecution, + }); + + expect(result.data).toEqual(data); + expect(binaryDataService.duplicateBinaryData).toHaveBeenCalledWith( + workflow.id, + additionalData.executionId, + executeWorkflowData.data, + ); + }); + + it('should put execution to wait if waitTill is returned', async () => { + const waitTill = new Date(); + additionalData.executeWorkflow.mockResolvedValue({ ...executeWorkflowData, waitTill }); + binaryDataService.duplicateBinaryData.mockResolvedValue(data); + + const result = await context.executeWorkflow(workflowInfo, undefined, undefined, { + parentExecution, + }); + + expect(additionalData.setExecutionStatus).toHaveBeenCalledWith('waiting'); + expect(runExecutionData.waitTill).toBe(waitTill); + expect(result.waitTill).toBe(waitTill); + }); + }); }; diff --git a/packages/core/src/node-execution-context/base-execute-context.ts b/packages/core/src/node-execution-context/base-execute-context.ts index c13ba66dc2..ccda7dc531 100644 --- a/packages/core/src/node-execution-context/base-execute-context.ts +++ b/packages/core/src/node-execution-context/base-execute-context.ts @@ -97,6 +97,13 @@ export class BaseExecuteContext extends NodeExecutionContext { ); } + async putExecutionToWait(waitTill: Date): Promise { + this.runExecutionData.waitTill = waitTill; + if (this.additionalData.setExecutionStatus) { + this.additionalData.setExecutionStatus('waiting'); + } + } + async executeWorkflow( workflowInfo: IExecuteWorkflowInfo, inputData?: INodeExecutionData[], @@ -106,23 +113,26 @@ export class BaseExecuteContext extends NodeExecutionContext { parentExecution?: RelatedExecution; }, ): Promise { - return await this.additionalData - .executeWorkflow(workflowInfo, this.additionalData, { - ...options, - parentWorkflowId: this.workflow.id?.toString(), - inputData, - parentWorkflowSettings: this.workflow.settings, - node: this.node, - parentCallbackManager, - }) - .then(async (result) => { - const data = await this.binaryDataService.duplicateBinaryData( - this.workflow.id, - this.additionalData.executionId!, - result.data, - ); - return { ...result, data }; - }); + const result = await this.additionalData.executeWorkflow(workflowInfo, this.additionalData, { + ...options, + parentWorkflowId: this.workflow.id?.toString(), + inputData, + parentWorkflowSettings: this.workflow.settings, + node: this.node, + parentCallbackManager, + }); + + // If a subworkflow goes into the waiting state, then put the parent workflow also into the waiting state + if (result.waitTill) { + await this.putExecutionToWait(result.waitTill); + } + + const data = await this.binaryDataService.duplicateBinaryData( + this.workflow.id, + this.additionalData.executionId!, + result.data, + ); + return { ...result, data }; } getNodeInputs(): INodeInputConfiguration[] { diff --git a/packages/core/src/node-execution-context/execute-context.ts b/packages/core/src/node-execution-context/execute-context.ts index 514c9cf27f..c587fb2168 100644 --- a/packages/core/src/node-execution-context/execute-context.ts +++ b/packages/core/src/node-execution-context/execute-context.ts @@ -179,13 +179,6 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti return inputData[inputIndex]; } - async putExecutionToWait(waitTill: Date): Promise { - this.runExecutionData.waitTill = waitTill; - if (this.additionalData.setExecutionStatus) { - this.additionalData.setExecutionStatus('waiting'); - } - } - logNodeOutput(...args: unknown[]): void { if (this.mode === 'manual') { this.sendMessageToUI(...args); diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 0a7a5d0379..bb4a7c8949 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -1558,6 +1558,7 @@ export interface ITriggerResponse { export interface ExecuteWorkflowData { executionId: string; data: Array; + waitTill?: Date | null; } export type WebhookSetupMethodNames = 'checkExists' | 'create' | 'delete'; @@ -2134,6 +2135,7 @@ export interface IRunExecutionData { waitingExecution: IWaitingForExecution; waitingExecutionSource: IWaitingForExecutionSource | null; }; + parentExecution?: RelatedExecution; waitTill?: Date; pushRef?: string; }