feat(core): Parent workflows should wait for sub-workflows to finish

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-11-29 14:41:29 +01:00
parent 43dd2a06c9
commit 1aabca8319
No known key found for this signature in database
9 changed files with 223 additions and 41 deletions

View file

@ -1,7 +1,9 @@
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core'; import type { InstanceSettings } from 'n8n-core';
import type { IWorkflowBase } from 'n8n-workflow'; import type { IRun, IWorkflowBase } from 'n8n-workflow';
import { createDeferredPromise } from 'n8n-workflow';
import type { ActiveExecutions } from '@/active-executions';
import type { Project } from '@/databases/entities/project'; import type { Project } from '@/databases/entities/project';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutionResponse } from '@/interfaces'; import type { IExecutionResponse } from '@/interfaces';
@ -15,6 +17,7 @@ import { mockLogger } from '@test/mocking';
jest.useFakeTimers(); jest.useFakeTimers();
describe('WaitTracker', () => { describe('WaitTracker', () => {
const activeExecutions = mock<ActiveExecutions>();
const ownershipService = mock<OwnershipService>(); const ownershipService = mock<OwnershipService>();
const workflowRunner = mock<WorkflowRunner>(); const workflowRunner = mock<WorkflowRunner>();
const executionRepository = mock<ExecutionRepository>(); const executionRepository = mock<ExecutionRepository>();
@ -30,6 +33,7 @@ describe('WaitTracker', () => {
mode: 'manual', mode: 'manual',
data: mock({ data: mock({
pushRef: 'push_ref', pushRef: 'push_ref',
parentExecution: undefined,
}), }),
}); });
execution.workflowData = mock<IWorkflowBase>({ id: 'abcd' }); execution.workflowData = mock<IWorkflowBase>({ id: 'abcd' });
@ -40,6 +44,7 @@ describe('WaitTracker', () => {
mockLogger(), mockLogger(),
executionRepository, executionRepository,
ownershipService, ownershipService,
activeExecutions,
workflowRunner, workflowRunner,
orchestrationService, orchestrationService,
instanceSettings, instanceSettings,
@ -80,7 +85,9 @@ describe('WaitTracker', () => {
let startExecutionSpy: jest.SpyInstance<Promise<void>, [executionId: string]>; let startExecutionSpy: jest.SpyInstance<Promise<void>, [executionId: string]>;
beforeEach(() => { beforeEach(() => {
executionRepository.findSingleExecution.mockResolvedValue(execution); executionRepository.findSingleExecution
.calledWith(execution.id)
.mockResolvedValue(execution);
executionRepository.getWaitingExecutions.mockResolvedValue([execution]); executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
ownershipService.getWorkflowProjectCached.mockResolvedValue(project); ownershipService.getWorkflowProjectCached.mockResolvedValue(project);
@ -110,13 +117,17 @@ describe('WaitTracker', () => {
}); });
describe('startExecution()', () => { describe('startExecution()', () => {
it('should query for execution to start', async () => { beforeEach(() => {
executionRepository.getWaitingExecutions.mockResolvedValue([]); executionRepository.getWaitingExecutions.mockResolvedValue([]);
waitTracker.init(); waitTracker.init();
executionRepository.findSingleExecution.mockResolvedValue(execution); executionRepository.findSingleExecution.calledWith(execution.id).mockResolvedValue(execution);
ownershipService.getWorkflowProjectCached.mockResolvedValue(project); ownershipService.getWorkflowProjectCached.mockResolvedValue(project);
execution.data.parentExecution = undefined;
});
it('should query for execution to start', async () => {
await waitTracker.startExecution(execution.id); await waitTracker.startExecution(execution.id);
expect(executionRepository.findSingleExecution).toHaveBeenCalledWith(execution.id, { expect(executionRepository.findSingleExecution).toHaveBeenCalledWith(execution.id, {
@ -137,6 +148,65 @@ describe('WaitTracker', () => {
execution.id, execution.id,
); );
}); });
it('should also resume parent execution once sub-workflow finishes', async () => {
const parentExecution = mock<IExecutionResponse>({
id: 'parent_execution_id',
finished: false,
});
parentExecution.workflowData = mock<IWorkflowBase>({ id: 'parent_workflow_id' });
execution.data.parentExecution = {
executionId: parentExecution.id,
workflowId: parentExecution.workflowData.id,
};
executionRepository.findSingleExecution
.calledWith(parentExecution.id)
.mockResolvedValue(parentExecution);
const postExecutePromise = createDeferredPromise<IRun | undefined>();
activeExecutions.getPostExecutePromise
.calledWith(execution.id)
.mockReturnValue(postExecutePromise.promise);
await waitTracker.startExecution(execution.id);
expect(executionRepository.findSingleExecution).toHaveBeenNthCalledWith(1, execution.id, {
includeData: true,
unflattenData: true,
});
expect(workflowRunner.run).toHaveBeenCalledTimes(1);
expect(workflowRunner.run).toHaveBeenNthCalledWith(
1,
{
executionMode: execution.mode,
executionData: execution.data,
workflowData: execution.workflowData,
projectId: project.id,
pushRef: execution.data.pushRef,
},
false,
false,
execution.id,
);
postExecutePromise.resolve(mock<IRun>());
jest.runAllTicks();
expect(workflowRunner.run).toHaveBeenCalledTimes(2);
expect(workflowRunner.run).toHaveBeenNthCalledWith(
2,
{
executionMode: execution.mode,
executionData: execution.data,
workflowData: execution.workflowData,
projectId: project.id,
pushRef: execution.data.pushRef,
},
false,
false,
execution.id,
);
});
}); });
describe('single-main setup', () => { describe('single-main setup', () => {
@ -165,6 +235,7 @@ describe('WaitTracker', () => {
mockLogger(), mockLogger(),
executionRepository, executionRepository,
ownershipService, ownershipService,
activeExecutions,
workflowRunner, workflowRunner,
orchestrationService, orchestrationService,
mock<InstanceSettings>({ isLeader: false }), mock<InstanceSettings>({ isLeader: false }),

View file

@ -1,11 +1,11 @@
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import type { IWorkflowBase } from 'n8n-workflow'; import type { IWorkflowBase } from 'n8n-workflow';
import { import type {
type IExecuteWorkflowInfo, IExecuteWorkflowInfo,
type IWorkflowExecuteAdditionalData, IWorkflowExecuteAdditionalData,
type ExecuteWorkflowOptions, ExecuteWorkflowOptions,
type IRun, IRun,
type INodeExecutionData, INodeExecutionData,
} from 'n8n-workflow'; } from 'n8n-workflow';
import type PCancelable from 'p-cancelable'; import type PCancelable from 'p-cancelable';
import Container from 'typedi'; import Container from 'typedi';
@ -50,6 +50,7 @@ const getMockRun = ({ lastNodeOutput }: { lastNodeOutput: Array<INodeExecutionDa
mode: 'manual', mode: 'manual',
startedAt: new Date(), startedAt: new Date(),
status: 'new', status: 'new',
waitTill: undefined,
}); });
const getCancelablePromise = async (run: IRun) => const getCancelablePromise = async (run: IRun) =>
@ -114,7 +115,9 @@ describe('WorkflowExecuteAdditionalData', () => {
}); });
describe('executeWorkflow', () => { describe('executeWorkflow', () => {
const runWithData = getMockRun({ lastNodeOutput: [[{ json: { test: 1 } }]] }); const runWithData = getMockRun({
lastNodeOutput: [[{ json: { test: 1 } }]],
});
beforeEach(() => { beforeEach(() => {
workflowRepository.get.mockResolvedValue( workflowRepository.get.mockResolvedValue(
@ -159,6 +162,23 @@ describe('WorkflowExecuteAdditionalData', () => {
expect(executionRepository.setRunning).toHaveBeenCalledWith(EXECUTION_ID); expect(executionRepository.setRunning).toHaveBeenCalledWith(EXECUTION_ID);
}); });
it('should return waitTill property when workflow execution is waiting', async () => {
const waitTill = new Date();
runWithData.waitTill = waitTill;
const response = await executeWorkflow(
mock<IExecuteWorkflowInfo>(),
mock<IWorkflowExecuteAdditionalData>(),
mock<ExecuteWorkflowOptions>({ loadedWorkflowData: undefined, doNotWaitToFinish: false }),
);
expect(response).toEqual({
data: runWithData.data.resultData.runData[LAST_NODE_EXECUTED][0].data!.main,
executionId: EXECUTION_ID,
waitTill,
});
});
}); });
describe('getRunData', () => { describe('getRunData', () => {
@ -230,6 +250,10 @@ describe('WorkflowExecuteAdditionalData', () => {
waitingExecution: {}, waitingExecution: {},
waitingExecutionSource: {}, waitingExecutionSource: {},
}, },
parentExecution: {
executionId: '123',
workflowId: '567',
},
resultData: { runData: {} }, resultData: { runData: {} },
startData: {}, startData: {},
}, },

View file

@ -2,6 +2,7 @@ import { InstanceSettings } from 'n8n-core';
import { ApplicationError, type IWorkflowExecutionDataProcess } from 'n8n-workflow'; import { ApplicationError, type IWorkflowExecutionDataProcess } from 'n8n-workflow';
import { Service } from 'typedi'; import { Service } from 'typedi';
import { ActiveExecutions } from '@/active-executions';
import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { Logger } from '@/logging/logger.service'; import { Logger } from '@/logging/logger.service';
import { OrchestrationService } from '@/services/orchestration.service'; import { OrchestrationService } from '@/services/orchestration.service';
@ -23,6 +24,7 @@ export class WaitTracker {
private readonly logger: Logger, private readonly logger: Logger,
private readonly executionRepository: ExecutionRepository, private readonly executionRepository: ExecutionRepository,
private readonly ownershipService: OwnershipService, private readonly ownershipService: OwnershipService,
private readonly activeExecutions: ActiveExecutions,
private readonly workflowRunner: WorkflowRunner, private readonly workflowRunner: WorkflowRunner,
private readonly orchestrationService: OrchestrationService, private readonly orchestrationService: OrchestrationService,
private readonly instanceSettings: InstanceSettings, private readonly instanceSettings: InstanceSettings,
@ -133,6 +135,13 @@ export class WaitTracker {
// Start the execution again // Start the execution again
await this.workflowRunner.run(data, false, false, executionId); await this.workflowRunner.run(data, false, false, executionId);
const { parentExecution } = fullExecutionData.data;
if (parentExecution) {
void this.activeExecutions.getPostExecutePromise(executionId).then(() => {
void this.startExecution(parentExecution.executionId);
});
}
} }
stopTracking() { stopTracking() {

View file

@ -53,6 +53,7 @@ import { Logger } from '@/logging/logger.service';
import { parseBody } from '@/middlewares'; import { parseBody } from '@/middlewares';
import { OwnershipService } from '@/services/ownership.service'; import { OwnershipService } from '@/services/ownership.service';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
import { WaitTracker } from '@/wait-tracker';
import { createMultiFormDataParser } from '@/webhooks/webhook-form-data'; import { createMultiFormDataParser } from '@/webhooks/webhook-form-data';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
import * as WorkflowHelpers from '@/workflow-helpers'; import * as WorkflowHelpers from '@/workflow-helpers';
@ -548,11 +549,20 @@ export async function executeWebhook(
{ executionId }, { executionId },
); );
if (!didSendResponse) {
// Get a promise which resolves when the workflow did execute and send then response // Get a promise which resolves when the workflow did execute and send then response
const executePromise = Container.get(ActiveExecutions).getPostExecutePromise( const executePromise = Container.get(ActiveExecutions).getPostExecutePromise(
executionId, executionId,
) as Promise<IExecutionDb | undefined>; ) as Promise<IExecutionDb | undefined>;
const { parentExecution } = runExecutionData;
if (parentExecution) {
void executePromise.then(() => {
const waitTracker = Container.get(WaitTracker);
void waitTracker.startExecution(parentExecution.executionId);
});
}
if (!didSendResponse) {
executePromise executePromise
// eslint-disable-next-line complexity // eslint-disable-next-line complexity
.then(async (data) => { .then(async (data) => {

View file

@ -709,6 +709,7 @@ export async function getRunData(
waitingExecution: {}, waitingExecution: {},
waitingExecutionSource: {}, waitingExecutionSource: {},
}, },
parentExecution,
}; };
return { return {
@ -944,6 +945,7 @@ async function startExecution(
return { return {
executionId, executionId,
data: returnData!.data!.main, data: returnData!.data!.main,
waitTill: data.waitTill,
}; };
} }
activeExecutions.finalizeExecution(executionId, data); activeExecutions.finalizeExecution(executionId, data);

View file

@ -1,4 +1,4 @@
import { captor, mock } from 'jest-mock-extended'; import { captor, mock, type MockProxy } from 'jest-mock-extended';
import type { import type {
IRunExecutionData, IRunExecutionData,
ContextType, ContextType,
@ -9,11 +9,21 @@ import type {
ITaskMetadata, ITaskMetadata,
ISourceData, ISourceData,
IExecuteData, IExecuteData,
IWorkflowExecuteAdditionalData,
ExecuteWorkflowData,
RelatedExecution,
IExecuteWorkflowInfo,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { ApplicationError, NodeHelpers } from 'n8n-workflow'; import { ApplicationError, NodeHelpers } from 'n8n-workflow';
import Container from 'typedi';
import { BinaryDataService } from '@/BinaryData/BinaryData.service';
import type { BaseExecuteContext } from '../base-execute-context'; import type { BaseExecuteContext } from '../base-execute-context';
const binaryDataService = mock<BinaryDataService>();
Container.set(BinaryDataService, binaryDataService);
export const describeCommonTests = ( export const describeCommonTests = (
context: BaseExecuteContext, context: BaseExecuteContext,
{ {
@ -31,7 +41,7 @@ export const describeCommonTests = (
}, },
) => { ) => {
// @ts-expect-error `additionalData` is private // @ts-expect-error `additionalData` is private
const { additionalData } = context; const additionalData = context.additionalData as MockProxy<IWorkflowExecuteAdditionalData>;
describe('getExecutionCancelSignal', () => { describe('getExecutionCancelSignal', () => {
it('should return the abort signal', () => { it('should return the abort signal', () => {
@ -178,4 +188,55 @@ export const describeCommonTests = (
resolveSimpleParameterValueSpy.mockRestore(); resolveSimpleParameterValueSpy.mockRestore();
}); });
}); });
describe('putExecutionToWait', () => {
it('should set waitTill and execution status', async () => {
const waitTill = new Date();
await context.putExecutionToWait(waitTill);
expect(runExecutionData.waitTill).toEqual(waitTill);
expect(additionalData.setExecutionStatus).toHaveBeenCalledWith('waiting');
});
});
describe('executeWorkflow', () => {
const data = [[{ json: { test: true } }]];
const executeWorkflowData = mock<ExecuteWorkflowData>();
const workflowInfo = mock<IExecuteWorkflowInfo>();
const parentExecution: RelatedExecution = {
executionId: 'parent_execution_id',
workflowId: 'parent_workflow_id',
};
it('should execute workflow and return data', async () => {
additionalData.executeWorkflow.mockResolvedValue(executeWorkflowData);
binaryDataService.duplicateBinaryData.mockResolvedValue(data);
const result = await context.executeWorkflow(workflowInfo, undefined, undefined, {
parentExecution,
});
expect(result.data).toEqual(data);
expect(binaryDataService.duplicateBinaryData).toHaveBeenCalledWith(
workflow.id,
additionalData.executionId,
executeWorkflowData.data,
);
});
it('should put execution to wait if waitTill is returned', async () => {
const waitTill = new Date();
additionalData.executeWorkflow.mockResolvedValue({ ...executeWorkflowData, waitTill });
binaryDataService.duplicateBinaryData.mockResolvedValue(data);
const result = await context.executeWorkflow(workflowInfo, undefined, undefined, {
parentExecution,
});
expect(additionalData.setExecutionStatus).toHaveBeenCalledWith('waiting');
expect(runExecutionData.waitTill).toBe(waitTill);
expect(result.waitTill).toBe(waitTill);
});
});
}; };

View file

@ -97,6 +97,13 @@ export class BaseExecuteContext extends NodeExecutionContext {
); );
} }
async putExecutionToWait(waitTill: Date): Promise<void> {
this.runExecutionData.waitTill = waitTill;
if (this.additionalData.setExecutionStatus) {
this.additionalData.setExecutionStatus('waiting');
}
}
async executeWorkflow( async executeWorkflow(
workflowInfo: IExecuteWorkflowInfo, workflowInfo: IExecuteWorkflowInfo,
inputData?: INodeExecutionData[], inputData?: INodeExecutionData[],
@ -106,23 +113,26 @@ export class BaseExecuteContext extends NodeExecutionContext {
parentExecution?: RelatedExecution; parentExecution?: RelatedExecution;
}, },
): Promise<ExecuteWorkflowData> { ): Promise<ExecuteWorkflowData> {
return await this.additionalData const result = await this.additionalData.executeWorkflow(workflowInfo, this.additionalData, {
.executeWorkflow(workflowInfo, this.additionalData, {
...options, ...options,
parentWorkflowId: this.workflow.id?.toString(), parentWorkflowId: this.workflow.id?.toString(),
inputData, inputData,
parentWorkflowSettings: this.workflow.settings, parentWorkflowSettings: this.workflow.settings,
node: this.node, node: this.node,
parentCallbackManager, parentCallbackManager,
}) });
.then(async (result) => {
// If a subworkflow goes into the waiting state, then put the parent workflow also into the waiting state
if (result.waitTill) {
await this.putExecutionToWait(result.waitTill);
}
const data = await this.binaryDataService.duplicateBinaryData( const data = await this.binaryDataService.duplicateBinaryData(
this.workflow.id, this.workflow.id,
this.additionalData.executionId!, this.additionalData.executionId!,
result.data, result.data,
); );
return { ...result, data }; return { ...result, data };
});
} }
getNodeInputs(): INodeInputConfiguration[] { getNodeInputs(): INodeInputConfiguration[] {

View file

@ -179,13 +179,6 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti
return inputData[inputIndex]; return inputData[inputIndex];
} }
async putExecutionToWait(waitTill: Date): Promise<void> {
this.runExecutionData.waitTill = waitTill;
if (this.additionalData.setExecutionStatus) {
this.additionalData.setExecutionStatus('waiting');
}
}
logNodeOutput(...args: unknown[]): void { logNodeOutput(...args: unknown[]): void {
if (this.mode === 'manual') { if (this.mode === 'manual') {
this.sendMessageToUI(...args); this.sendMessageToUI(...args);

View file

@ -1558,6 +1558,7 @@ export interface ITriggerResponse {
export interface ExecuteWorkflowData { export interface ExecuteWorkflowData {
executionId: string; executionId: string;
data: Array<INodeExecutionData[] | null>; data: Array<INodeExecutionData[] | null>;
waitTill?: Date | null;
} }
export type WebhookSetupMethodNames = 'checkExists' | 'create' | 'delete'; export type WebhookSetupMethodNames = 'checkExists' | 'create' | 'delete';
@ -2134,6 +2135,7 @@ export interface IRunExecutionData {
waitingExecution: IWaitingForExecution; waitingExecution: IWaitingForExecution;
waitingExecutionSource: IWaitingForExecutionSource | null; waitingExecutionSource: IWaitingForExecutionSource | null;
}; };
parentExecution?: RelatedExecution;
waitTill?: Date; waitTill?: Date;
pushRef?: string; pushRef?: string;
} }