diff --git a/packages/cli/src/__tests__/wait-tracker.test.ts b/packages/cli/src/__tests__/wait-tracker.test.ts index 49e8517272..ef420e7d78 100644 --- a/packages/cli/src/__tests__/wait-tracker.test.ts +++ b/packages/cli/src/__tests__/wait-tracker.test.ts @@ -1,7 +1,9 @@ import { mock } from 'jest-mock-extended'; import type { InstanceSettings } from 'n8n-core'; -import type { IWorkflowBase } from 'n8n-workflow'; +import type { IRun, IWorkflowBase } from 'n8n-workflow'; +import { createDeferredPromise } from 'n8n-workflow'; +import type { ActiveExecutions } from '@/active-executions'; import type { Project } from '@/databases/entities/project'; import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; import type { IExecutionResponse } from '@/interfaces'; @@ -12,9 +14,10 @@ import { WaitTracker } from '@/wait-tracker'; import type { WorkflowRunner } from '@/workflow-runner'; import { mockLogger } from '@test/mocking'; -jest.useFakeTimers(); +jest.useFakeTimers({ advanceTimers: true }); describe('WaitTracker', () => { + const activeExecutions = mock(); const ownershipService = mock(); const workflowRunner = mock(); const executionRepository = mock(); @@ -30,6 +33,7 @@ describe('WaitTracker', () => { mode: 'manual', data: mock({ pushRef: 'push_ref', + parentExecution: undefined, }), }); execution.workflowData = mock({ id: 'abcd' }); @@ -40,6 +44,7 @@ describe('WaitTracker', () => { mockLogger(), executionRepository, ownershipService, + activeExecutions, workflowRunner, orchestrationService, instanceSettings, @@ -80,7 +85,9 @@ describe('WaitTracker', () => { let startExecutionSpy: jest.SpyInstance, [executionId: string]>; beforeEach(() => { - executionRepository.findSingleExecution.mockResolvedValue(execution); + executionRepository.findSingleExecution + .calledWith(execution.id) + .mockResolvedValue(execution); executionRepository.getWaitingExecutions.mockResolvedValue([execution]); ownershipService.getWorkflowProjectCached.mockResolvedValue(project); @@ -110,13 +117,17 @@ describe('WaitTracker', () => { }); describe('startExecution()', () => { - it('should query for execution to start', async () => { + beforeEach(() => { executionRepository.getWaitingExecutions.mockResolvedValue([]); waitTracker.init(); - executionRepository.findSingleExecution.mockResolvedValue(execution); + executionRepository.findSingleExecution.calledWith(execution.id).mockResolvedValue(execution); ownershipService.getWorkflowProjectCached.mockResolvedValue(project); + execution.data.parentExecution = undefined; + }); + + it('should query for execution to start', async () => { await waitTracker.startExecution(execution.id); expect(executionRepository.findSingleExecution).toHaveBeenCalledWith(execution.id, { @@ -137,6 +148,65 @@ describe('WaitTracker', () => { execution.id, ); }); + + it('should also resume parent execution once sub-workflow finishes', async () => { + const parentExecution = mock({ + id: 'parent_execution_id', + finished: false, + }); + parentExecution.workflowData = mock({ id: 'parent_workflow_id' }); + execution.data.parentExecution = { + executionId: parentExecution.id, + workflowId: parentExecution.workflowData.id, + }; + executionRepository.findSingleExecution + .calledWith(parentExecution.id) + .mockResolvedValue(parentExecution); + const postExecutePromise = createDeferredPromise(); + activeExecutions.getPostExecutePromise + .calledWith(execution.id) + .mockReturnValue(postExecutePromise.promise); + + await waitTracker.startExecution(execution.id); + + expect(executionRepository.findSingleExecution).toHaveBeenNthCalledWith(1, execution.id, { + includeData: true, + unflattenData: true, + }); + + expect(workflowRunner.run).toHaveBeenCalledTimes(1); + expect(workflowRunner.run).toHaveBeenNthCalledWith( + 1, + { + executionMode: execution.mode, + executionData: execution.data, + workflowData: execution.workflowData, + projectId: project.id, + pushRef: execution.data.pushRef, + }, + false, + false, + execution.id, + ); + + postExecutePromise.resolve(mock()); + await jest.advanceTimersByTimeAsync(100); + + expect(workflowRunner.run).toHaveBeenCalledTimes(2); + expect(workflowRunner.run).toHaveBeenNthCalledWith( + 2, + { + executionMode: parentExecution.mode, + executionData: parentExecution.data, + workflowData: parentExecution.workflowData, + projectId: project.id, + pushRef: parentExecution.data.pushRef, + }, + false, + false, + parentExecution.id, + ); + }); }); describe('single-main setup', () => { @@ -165,6 +235,7 @@ describe('WaitTracker', () => { mockLogger(), executionRepository, ownershipService, + activeExecutions, workflowRunner, orchestrationService, mock({ isLeader: false }), diff --git a/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts b/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts index d0aeb3111f..e7d94d3e34 100644 --- a/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts +++ b/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts @@ -1,11 +1,11 @@ import { mock } from 'jest-mock-extended'; import type { IWorkflowBase } from 'n8n-workflow'; -import { - type IExecuteWorkflowInfo, - type IWorkflowExecuteAdditionalData, - type ExecuteWorkflowOptions, - type IRun, - type INodeExecutionData, +import type { + IExecuteWorkflowInfo, + IWorkflowExecuteAdditionalData, + ExecuteWorkflowOptions, + IRun, + INodeExecutionData, } from 'n8n-workflow'; import type PCancelable from 'p-cancelable'; import Container from 'typedi'; @@ -50,6 +50,7 @@ const getMockRun = ({ lastNodeOutput }: { lastNodeOutput: Array @@ -114,7 +115,9 @@ describe('WorkflowExecuteAdditionalData', () => { }); describe('executeWorkflow', () => { - const runWithData = getMockRun({ lastNodeOutput: [[{ json: { test: 1 } }]] }); + const runWithData = getMockRun({ + lastNodeOutput: [[{ json: { test: 1 } }]], + }); beforeEach(() => { workflowRepository.get.mockResolvedValue( @@ -159,6 +162,23 @@ describe('WorkflowExecuteAdditionalData', () => { expect(executionRepository.setRunning).toHaveBeenCalledWith(EXECUTION_ID); }); + + it('should return waitTill property when workflow execution is waiting', async () => { + const waitTill = new Date(); + runWithData.waitTill = waitTill; + + const response = await executeWorkflow( + mock(), + mock(), + mock({ loadedWorkflowData: undefined, doNotWaitToFinish: false }), + ); + + expect(response).toEqual({ + data: runWithData.data.resultData.runData[LAST_NODE_EXECUTED][0].data!.main, + executionId: EXECUTION_ID, + waitTill, + }); + }); }); describe('getRunData', () => { @@ -230,6 +250,10 @@ describe('WorkflowExecuteAdditionalData', () => { waitingExecution: {}, waitingExecutionSource: {}, }, + parentExecution: { + executionId: '123', + workflowId: '567', + }, resultData: { runData: {} }, startData: {}, }, diff --git a/packages/cli/src/wait-tracker.ts b/packages/cli/src/wait-tracker.ts index 7035db3cbe..a80ae8f259 100644 --- a/packages/cli/src/wait-tracker.ts +++ b/packages/cli/src/wait-tracker.ts @@ -2,6 +2,7 @@ import { InstanceSettings } from 'n8n-core'; import { ApplicationError, type IWorkflowExecutionDataProcess } from 'n8n-workflow'; import { Service } from 'typedi'; +import { ActiveExecutions } from '@/active-executions'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { Logger } from '@/logging/logger.service'; import { OrchestrationService } from '@/services/orchestration.service'; @@ -23,6 +24,7 @@ export class WaitTracker { private readonly logger: Logger, private readonly executionRepository: ExecutionRepository, private readonly ownershipService: OwnershipService, + private readonly activeExecutions: ActiveExecutions, private readonly workflowRunner: WorkflowRunner, private readonly orchestrationService: OrchestrationService, private readonly instanceSettings: InstanceSettings, @@ -133,6 +135,14 @@ export class WaitTracker { // Start the execution again await this.workflowRunner.run(data, false, false, executionId); + + const { parentExecution } = fullExecutionData.data; + if (parentExecution) { + // on child execution completion, resume parent execution + void this.activeExecutions.getPostExecutePromise(executionId).then(() => { + void this.startExecution(parentExecution.executionId); + }); + } } stopTracking() { diff --git a/packages/cli/src/webhooks/webhook-helpers.ts b/packages/cli/src/webhooks/webhook-helpers.ts index 0dd8f576a4..259142561a 100644 --- a/packages/cli/src/webhooks/webhook-helpers.ts +++ b/packages/cli/src/webhooks/webhook-helpers.ts @@ -48,11 +48,12 @@ import type { Project } from '@/databases/entities/project'; import { InternalServerError } from '@/errors/response-errors/internal-server.error'; import { NotFoundError } from '@/errors/response-errors/not-found.error'; import { UnprocessableRequestError } from '@/errors/response-errors/unprocessable.error'; -import type { IExecutionDb, IWorkflowDb } from '@/interfaces'; +import type { IWorkflowDb } from '@/interfaces'; import { Logger } from '@/logging/logger.service'; import { parseBody } from '@/middlewares'; import { OwnershipService } from '@/services/ownership.service'; import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; +import { WaitTracker } from '@/wait-tracker'; import { createMultiFormDataParser } from '@/webhooks/webhook-form-data'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; import * as WorkflowHelpers from '@/workflow-helpers'; @@ -548,11 +549,21 @@ export async function executeWebhook( { executionId }, ); + const activeExecutions = Container.get(ActiveExecutions); + + // Get a promise which resolves when the workflow did execute and send then response + const executePromise = activeExecutions.getPostExecutePromise(executionId); + + const { parentExecution } = runExecutionData; + if (parentExecution) { + // on child execution completion, resume parent execution + void executePromise.then(() => { + const waitTracker = Container.get(WaitTracker); + void waitTracker.startExecution(parentExecution.executionId); + }); + } + if (!didSendResponse) { - // Get a promise which resolves when the workflow did execute and send then response - const executePromise = Container.get(ActiveExecutions).getPostExecutePromise( - executionId, - ) as Promise; executePromise // eslint-disable-next-line complexity .then(async (data) => { diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index 2588a442d9..e3e65bcb4d 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -709,6 +709,7 @@ export async function getRunData( waitingExecution: {}, waitingExecutionSource: {}, }, + parentExecution, }; return { @@ -944,6 +945,7 @@ async function startExecution( return { executionId, data: returnData!.data!.main, + waitTill: data.waitTill, }; } activeExecutions.finalizeExecution(executionId, data); diff --git a/packages/core/src/node-execution-context/__tests__/shared-tests.ts b/packages/core/src/node-execution-context/__tests__/shared-tests.ts index c7262554d0..9992507bdd 100644 --- a/packages/core/src/node-execution-context/__tests__/shared-tests.ts +++ b/packages/core/src/node-execution-context/__tests__/shared-tests.ts @@ -1,4 +1,4 @@ -import { captor, mock } from 'jest-mock-extended'; +import { captor, mock, type MockProxy } from 'jest-mock-extended'; import type { IRunExecutionData, ContextType, @@ -9,11 +9,21 @@ import type { ITaskMetadata, ISourceData, IExecuteData, + IWorkflowExecuteAdditionalData, + ExecuteWorkflowData, + RelatedExecution, + IExecuteWorkflowInfo, } from 'n8n-workflow'; -import { ApplicationError, NodeHelpers } from 'n8n-workflow'; +import { ApplicationError, NodeHelpers, WAIT_INDEFINITELY } from 'n8n-workflow'; +import Container from 'typedi'; + +import { BinaryDataService } from '@/BinaryData/BinaryData.service'; import type { BaseExecuteContext } from '../base-execute-context'; +const binaryDataService = mock(); +Container.set(BinaryDataService, binaryDataService); + export const describeCommonTests = ( context: BaseExecuteContext, { @@ -31,7 +41,7 @@ export const describeCommonTests = ( }, ) => { // @ts-expect-error `additionalData` is private - const { additionalData } = context; + const additionalData = context.additionalData as MockProxy; describe('getExecutionCancelSignal', () => { it('should return the abort signal', () => { @@ -178,4 +188,55 @@ export const describeCommonTests = ( resolveSimpleParameterValueSpy.mockRestore(); }); }); + + describe('putExecutionToWait', () => { + it('should set waitTill and execution status', async () => { + const waitTill = new Date(); + + await context.putExecutionToWait(waitTill); + + expect(runExecutionData.waitTill).toEqual(waitTill); + expect(additionalData.setExecutionStatus).toHaveBeenCalledWith('waiting'); + }); + }); + + describe('executeWorkflow', () => { + const data = [[{ json: { test: true } }]]; + const executeWorkflowData = mock(); + const workflowInfo = mock(); + const parentExecution: RelatedExecution = { + executionId: 'parent_execution_id', + workflowId: 'parent_workflow_id', + }; + + it('should execute workflow and return data', async () => { + additionalData.executeWorkflow.mockResolvedValue(executeWorkflowData); + binaryDataService.duplicateBinaryData.mockResolvedValue(data); + + const result = await context.executeWorkflow(workflowInfo, undefined, undefined, { + parentExecution, + }); + + expect(result.data).toEqual(data); + expect(binaryDataService.duplicateBinaryData).toHaveBeenCalledWith( + workflow.id, + additionalData.executionId, + executeWorkflowData.data, + ); + }); + + it('should put execution to wait if waitTill is returned', async () => { + const waitTill = new Date(); + additionalData.executeWorkflow.mockResolvedValue({ ...executeWorkflowData, waitTill }); + binaryDataService.duplicateBinaryData.mockResolvedValue(data); + + const result = await context.executeWorkflow(workflowInfo, undefined, undefined, { + parentExecution, + }); + + expect(additionalData.setExecutionStatus).toHaveBeenCalledWith('waiting'); + expect(runExecutionData.waitTill).toEqual(WAIT_INDEFINITELY); + expect(result.waitTill).toBe(waitTill); + }); + }); }; diff --git a/packages/core/src/node-execution-context/base-execute-context.ts b/packages/core/src/node-execution-context/base-execute-context.ts index c13ba66dc2..0794a263b0 100644 --- a/packages/core/src/node-execution-context/base-execute-context.ts +++ b/packages/core/src/node-execution-context/base-execute-context.ts @@ -22,7 +22,7 @@ import type { ISourceData, AiEvent, } from 'n8n-workflow'; -import { ApplicationError, NodeHelpers, WorkflowDataProxy } from 'n8n-workflow'; +import { ApplicationError, NodeHelpers, WAIT_INDEFINITELY, WorkflowDataProxy } from 'n8n-workflow'; import { Container } from 'typedi'; import { BinaryDataService } from '@/BinaryData/BinaryData.service'; @@ -97,6 +97,13 @@ export class BaseExecuteContext extends NodeExecutionContext { ); } + async putExecutionToWait(waitTill: Date): Promise { + this.runExecutionData.waitTill = waitTill; + if (this.additionalData.setExecutionStatus) { + this.additionalData.setExecutionStatus('waiting'); + } + } + async executeWorkflow( workflowInfo: IExecuteWorkflowInfo, inputData?: INodeExecutionData[], @@ -106,23 +113,28 @@ export class BaseExecuteContext extends NodeExecutionContext { parentExecution?: RelatedExecution; }, ): Promise { - return await this.additionalData - .executeWorkflow(workflowInfo, this.additionalData, { - ...options, - parentWorkflowId: this.workflow.id?.toString(), - inputData, - parentWorkflowSettings: this.workflow.settings, - node: this.node, - parentCallbackManager, - }) - .then(async (result) => { - const data = await this.binaryDataService.duplicateBinaryData( - this.workflow.id, - this.additionalData.executionId!, - result.data, - ); - return { ...result, data }; - }); + const result = await this.additionalData.executeWorkflow(workflowInfo, this.additionalData, { + ...options, + parentWorkflowId: this.workflow.id, + inputData, + parentWorkflowSettings: this.workflow.settings, + node: this.node, + parentCallbackManager, + }); + + // If a sub-workflow execution goes into the waiting state + if (result.waitTill) { + // then put the parent workflow execution also into the waiting state, + // but do not use the sub-workflow `waitTill` to avoid WaitTracker resuming the parent execution at the same time as the sub-workflow + await this.putExecutionToWait(WAIT_INDEFINITELY); + } + + const data = await this.binaryDataService.duplicateBinaryData( + this.workflow.id, + this.additionalData.executionId!, + result.data, + ); + return { ...result, data }; } getNodeInputs(): INodeInputConfiguration[] { diff --git a/packages/core/src/node-execution-context/execute-context.ts b/packages/core/src/node-execution-context/execute-context.ts index 514c9cf27f..c587fb2168 100644 --- a/packages/core/src/node-execution-context/execute-context.ts +++ b/packages/core/src/node-execution-context/execute-context.ts @@ -179,13 +179,6 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti return inputData[inputIndex]; } - async putExecutionToWait(waitTill: Date): Promise { - this.runExecutionData.waitTill = waitTill; - if (this.additionalData.setExecutionStatus) { - this.additionalData.setExecutionStatus('waiting'); - } - } - logNodeOutput(...args: unknown[]): void { if (this.mode === 'manual') { this.sendMessageToUI(...args); diff --git a/packages/editor-ui/src/components/Node.vue b/packages/editor-ui/src/components/Node.vue index 1f1ea90ab5..f20724bd5f 100644 --- a/packages/editor-ui/src/components/Node.vue +++ b/packages/editor-ui/src/components/Node.vue @@ -9,7 +9,6 @@ import { SIMULATE_NODE_TYPE, SIMULATE_TRIGGER_NODE_TYPE, WAIT_NODE_TYPE, - WAIT_TIME_UNLIMITED, } from '@/constants'; import type { ExecutionSummary, @@ -18,7 +17,12 @@ import type { NodeOperationError, Workflow, } from 'n8n-workflow'; -import { NodeConnectionType, NodeHelpers, SEND_AND_WAIT_OPERATION } from 'n8n-workflow'; +import { + NodeConnectionType, + NodeHelpers, + SEND_AND_WAIT_OPERATION, + WAIT_INDEFINITELY, +} from 'n8n-workflow'; import type { StyleValue } from 'vue'; import { computed, onMounted, ref, watch } from 'vue'; import xss from 'xss'; @@ -345,7 +349,7 @@ const waiting = computed(() => { return i18n.baseText('node.theNodeIsWaitingFormCall'); } const waitDate = new Date(workflowExecution.waitTill); - if (waitDate.toISOString() === WAIT_TIME_UNLIMITED) { + if (waitDate.getTime() === WAIT_INDEFINITELY.getTime()) { return i18n.baseText('node.theNodeIsWaitingIndefinitelyForAnIncomingWebhookCall'); } return i18n.baseText('node.nodeIsWaitingTill', { diff --git a/packages/editor-ui/src/components/executions/global/GlobalExecutionsListItem.vue b/packages/editor-ui/src/components/executions/global/GlobalExecutionsListItem.vue index 1fbbfd542a..d179476531 100644 --- a/packages/editor-ui/src/components/executions/global/GlobalExecutionsListItem.vue +++ b/packages/editor-ui/src/components/executions/global/GlobalExecutionsListItem.vue @@ -1,8 +1,8 @@