feat(core): Parent workflows should wait for sub-workflows to finish (#11985)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-12-06 12:20:34 +01:00 committed by GitHub
parent 956b11a560
commit 60b3dccf93
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 257 additions and 70 deletions

View file

@ -1,7 +1,9 @@
import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core';
import type { IWorkflowBase } from 'n8n-workflow';
import type { IRun, IWorkflowBase } from 'n8n-workflow';
import { createDeferredPromise } from 'n8n-workflow';
import type { ActiveExecutions } from '@/active-executions';
import type { Project } from '@/databases/entities/project';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutionResponse } from '@/interfaces';
@ -12,9 +14,10 @@ import { WaitTracker } from '@/wait-tracker';
import type { WorkflowRunner } from '@/workflow-runner';
import { mockLogger } from '@test/mocking';
jest.useFakeTimers();
jest.useFakeTimers({ advanceTimers: true });
describe('WaitTracker', () => {
const activeExecutions = mock<ActiveExecutions>();
const ownershipService = mock<OwnershipService>();
const workflowRunner = mock<WorkflowRunner>();
const executionRepository = mock<ExecutionRepository>();
@ -30,6 +33,7 @@ describe('WaitTracker', () => {
mode: 'manual',
data: mock({
pushRef: 'push_ref',
parentExecution: undefined,
}),
});
execution.workflowData = mock<IWorkflowBase>({ id: 'abcd' });
@ -40,6 +44,7 @@ describe('WaitTracker', () => {
mockLogger(),
executionRepository,
ownershipService,
activeExecutions,
workflowRunner,
orchestrationService,
instanceSettings,
@ -80,7 +85,9 @@ describe('WaitTracker', () => {
let startExecutionSpy: jest.SpyInstance<Promise<void>, [executionId: string]>;
beforeEach(() => {
executionRepository.findSingleExecution.mockResolvedValue(execution);
executionRepository.findSingleExecution
.calledWith(execution.id)
.mockResolvedValue(execution);
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
ownershipService.getWorkflowProjectCached.mockResolvedValue(project);
@ -110,13 +117,17 @@ describe('WaitTracker', () => {
});
describe('startExecution()', () => {
it('should query for execution to start', async () => {
beforeEach(() => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);
waitTracker.init();
executionRepository.findSingleExecution.mockResolvedValue(execution);
executionRepository.findSingleExecution.calledWith(execution.id).mockResolvedValue(execution);
ownershipService.getWorkflowProjectCached.mockResolvedValue(project);
execution.data.parentExecution = undefined;
});
it('should query for execution to start', async () => {
await waitTracker.startExecution(execution.id);
expect(executionRepository.findSingleExecution).toHaveBeenCalledWith(execution.id, {
@ -137,6 +148,65 @@ describe('WaitTracker', () => {
execution.id,
);
});
it('should also resume parent execution once sub-workflow finishes', async () => {
const parentExecution = mock<IExecutionResponse>({
id: 'parent_execution_id',
finished: false,
});
parentExecution.workflowData = mock<IWorkflowBase>({ id: 'parent_workflow_id' });
execution.data.parentExecution = {
executionId: parentExecution.id,
workflowId: parentExecution.workflowData.id,
};
executionRepository.findSingleExecution
.calledWith(parentExecution.id)
.mockResolvedValue(parentExecution);
const postExecutePromise = createDeferredPromise<IRun | undefined>();
activeExecutions.getPostExecutePromise
.calledWith(execution.id)
.mockReturnValue(postExecutePromise.promise);
await waitTracker.startExecution(execution.id);
expect(executionRepository.findSingleExecution).toHaveBeenNthCalledWith(1, execution.id, {
includeData: true,
unflattenData: true,
});
expect(workflowRunner.run).toHaveBeenCalledTimes(1);
expect(workflowRunner.run).toHaveBeenNthCalledWith(
1,
{
executionMode: execution.mode,
executionData: execution.data,
workflowData: execution.workflowData,
projectId: project.id,
pushRef: execution.data.pushRef,
},
false,
false,
execution.id,
);
postExecutePromise.resolve(mock<IRun>());
await jest.advanceTimersByTimeAsync(100);
expect(workflowRunner.run).toHaveBeenCalledTimes(2);
expect(workflowRunner.run).toHaveBeenNthCalledWith(
2,
{
executionMode: parentExecution.mode,
executionData: parentExecution.data,
workflowData: parentExecution.workflowData,
projectId: project.id,
pushRef: parentExecution.data.pushRef,
},
false,
false,
parentExecution.id,
);
});
});
describe('single-main setup', () => {
@ -165,6 +235,7 @@ describe('WaitTracker', () => {
mockLogger(),
executionRepository,
ownershipService,
activeExecutions,
workflowRunner,
orchestrationService,
mock<InstanceSettings>({ isLeader: false }),

View file

@ -1,11 +1,11 @@
import { mock } from 'jest-mock-extended';
import type { IWorkflowBase } from 'n8n-workflow';
import {
type IExecuteWorkflowInfo,
type IWorkflowExecuteAdditionalData,
type ExecuteWorkflowOptions,
type IRun,
type INodeExecutionData,
import type {
IExecuteWorkflowInfo,
IWorkflowExecuteAdditionalData,
ExecuteWorkflowOptions,
IRun,
INodeExecutionData,
} from 'n8n-workflow';
import type PCancelable from 'p-cancelable';
import Container from 'typedi';
@ -50,6 +50,7 @@ const getMockRun = ({ lastNodeOutput }: { lastNodeOutput: Array<INodeExecutionDa
mode: 'manual',
startedAt: new Date(),
status: 'new',
waitTill: undefined,
});
const getCancelablePromise = async (run: IRun) =>
@ -114,7 +115,9 @@ describe('WorkflowExecuteAdditionalData', () => {
});
describe('executeWorkflow', () => {
const runWithData = getMockRun({ lastNodeOutput: [[{ json: { test: 1 } }]] });
const runWithData = getMockRun({
lastNodeOutput: [[{ json: { test: 1 } }]],
});
beforeEach(() => {
workflowRepository.get.mockResolvedValue(
@ -159,6 +162,23 @@ describe('WorkflowExecuteAdditionalData', () => {
expect(executionRepository.setRunning).toHaveBeenCalledWith(EXECUTION_ID);
});
it('should return waitTill property when workflow execution is waiting', async () => {
const waitTill = new Date();
runWithData.waitTill = waitTill;
const response = await executeWorkflow(
mock<IExecuteWorkflowInfo>(),
mock<IWorkflowExecuteAdditionalData>(),
mock<ExecuteWorkflowOptions>({ loadedWorkflowData: undefined, doNotWaitToFinish: false }),
);
expect(response).toEqual({
data: runWithData.data.resultData.runData[LAST_NODE_EXECUTED][0].data!.main,
executionId: EXECUTION_ID,
waitTill,
});
});
});
describe('getRunData', () => {
@ -230,6 +250,10 @@ describe('WorkflowExecuteAdditionalData', () => {
waitingExecution: {},
waitingExecutionSource: {},
},
parentExecution: {
executionId: '123',
workflowId: '567',
},
resultData: { runData: {} },
startData: {},
},

View file

@ -2,6 +2,7 @@ import { InstanceSettings } from 'n8n-core';
import { ApplicationError, type IWorkflowExecutionDataProcess } from 'n8n-workflow';
import { Service } from 'typedi';
import { ActiveExecutions } from '@/active-executions';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { Logger } from '@/logging/logger.service';
import { OrchestrationService } from '@/services/orchestration.service';
@ -23,6 +24,7 @@ export class WaitTracker {
private readonly logger: Logger,
private readonly executionRepository: ExecutionRepository,
private readonly ownershipService: OwnershipService,
private readonly activeExecutions: ActiveExecutions,
private readonly workflowRunner: WorkflowRunner,
private readonly orchestrationService: OrchestrationService,
private readonly instanceSettings: InstanceSettings,
@ -133,6 +135,14 @@ export class WaitTracker {
// Start the execution again
await this.workflowRunner.run(data, false, false, executionId);
const { parentExecution } = fullExecutionData.data;
if (parentExecution) {
// on child execution completion, resume parent execution
void this.activeExecutions.getPostExecutePromise(executionId).then(() => {
void this.startExecution(parentExecution.executionId);
});
}
}
stopTracking() {

View file

@ -48,11 +48,12 @@ import type { Project } from '@/databases/entities/project';
import { InternalServerError } from '@/errors/response-errors/internal-server.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import { UnprocessableRequestError } from '@/errors/response-errors/unprocessable.error';
import type { IExecutionDb, IWorkflowDb } from '@/interfaces';
import type { IWorkflowDb } from '@/interfaces';
import { Logger } from '@/logging/logger.service';
import { parseBody } from '@/middlewares';
import { OwnershipService } from '@/services/ownership.service';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
import { WaitTracker } from '@/wait-tracker';
import { createMultiFormDataParser } from '@/webhooks/webhook-form-data';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
import * as WorkflowHelpers from '@/workflow-helpers';
@ -548,11 +549,21 @@ export async function executeWebhook(
{ executionId },
);
const activeExecutions = Container.get(ActiveExecutions);
// Get a promise which resolves when the workflow did execute and send then response
const executePromise = activeExecutions.getPostExecutePromise(executionId);
const { parentExecution } = runExecutionData;
if (parentExecution) {
// on child execution completion, resume parent execution
void executePromise.then(() => {
const waitTracker = Container.get(WaitTracker);
void waitTracker.startExecution(parentExecution.executionId);
});
}
if (!didSendResponse) {
// Get a promise which resolves when the workflow did execute and send then response
const executePromise = Container.get(ActiveExecutions).getPostExecutePromise(
executionId,
) as Promise<IExecutionDb | undefined>;
executePromise
// eslint-disable-next-line complexity
.then(async (data) => {

View file

@ -709,6 +709,7 @@ export async function getRunData(
waitingExecution: {},
waitingExecutionSource: {},
},
parentExecution,
};
return {
@ -944,6 +945,7 @@ async function startExecution(
return {
executionId,
data: returnData!.data!.main,
waitTill: data.waitTill,
};
}
activeExecutions.finalizeExecution(executionId, data);

View file

@ -1,4 +1,4 @@
import { captor, mock } from 'jest-mock-extended';
import { captor, mock, type MockProxy } from 'jest-mock-extended';
import type {
IRunExecutionData,
ContextType,
@ -9,11 +9,21 @@ import type {
ITaskMetadata,
ISourceData,
IExecuteData,
IWorkflowExecuteAdditionalData,
ExecuteWorkflowData,
RelatedExecution,
IExecuteWorkflowInfo,
} from 'n8n-workflow';
import { ApplicationError, NodeHelpers } from 'n8n-workflow';
import { ApplicationError, NodeHelpers, WAIT_INDEFINITELY } from 'n8n-workflow';
import Container from 'typedi';
import { BinaryDataService } from '@/BinaryData/BinaryData.service';
import type { BaseExecuteContext } from '../base-execute-context';
const binaryDataService = mock<BinaryDataService>();
Container.set(BinaryDataService, binaryDataService);
export const describeCommonTests = (
context: BaseExecuteContext,
{
@ -31,7 +41,7 @@ export const describeCommonTests = (
},
) => {
// @ts-expect-error `additionalData` is private
const { additionalData } = context;
const additionalData = context.additionalData as MockProxy<IWorkflowExecuteAdditionalData>;
describe('getExecutionCancelSignal', () => {
it('should return the abort signal', () => {
@ -178,4 +188,55 @@ export const describeCommonTests = (
resolveSimpleParameterValueSpy.mockRestore();
});
});
describe('putExecutionToWait', () => {
it('should set waitTill and execution status', async () => {
const waitTill = new Date();
await context.putExecutionToWait(waitTill);
expect(runExecutionData.waitTill).toEqual(waitTill);
expect(additionalData.setExecutionStatus).toHaveBeenCalledWith('waiting');
});
});
describe('executeWorkflow', () => {
const data = [[{ json: { test: true } }]];
const executeWorkflowData = mock<ExecuteWorkflowData>();
const workflowInfo = mock<IExecuteWorkflowInfo>();
const parentExecution: RelatedExecution = {
executionId: 'parent_execution_id',
workflowId: 'parent_workflow_id',
};
it('should execute workflow and return data', async () => {
additionalData.executeWorkflow.mockResolvedValue(executeWorkflowData);
binaryDataService.duplicateBinaryData.mockResolvedValue(data);
const result = await context.executeWorkflow(workflowInfo, undefined, undefined, {
parentExecution,
});
expect(result.data).toEqual(data);
expect(binaryDataService.duplicateBinaryData).toHaveBeenCalledWith(
workflow.id,
additionalData.executionId,
executeWorkflowData.data,
);
});
it('should put execution to wait if waitTill is returned', async () => {
const waitTill = new Date();
additionalData.executeWorkflow.mockResolvedValue({ ...executeWorkflowData, waitTill });
binaryDataService.duplicateBinaryData.mockResolvedValue(data);
const result = await context.executeWorkflow(workflowInfo, undefined, undefined, {
parentExecution,
});
expect(additionalData.setExecutionStatus).toHaveBeenCalledWith('waiting');
expect(runExecutionData.waitTill).toEqual(WAIT_INDEFINITELY);
expect(result.waitTill).toBe(waitTill);
});
});
};

View file

@ -22,7 +22,7 @@ import type {
ISourceData,
AiEvent,
} from 'n8n-workflow';
import { ApplicationError, NodeHelpers, WorkflowDataProxy } from 'n8n-workflow';
import { ApplicationError, NodeHelpers, WAIT_INDEFINITELY, WorkflowDataProxy } from 'n8n-workflow';
import { Container } from 'typedi';
import { BinaryDataService } from '@/BinaryData/BinaryData.service';
@ -97,6 +97,13 @@ export class BaseExecuteContext extends NodeExecutionContext {
);
}
async putExecutionToWait(waitTill: Date): Promise<void> {
this.runExecutionData.waitTill = waitTill;
if (this.additionalData.setExecutionStatus) {
this.additionalData.setExecutionStatus('waiting');
}
}
async executeWorkflow(
workflowInfo: IExecuteWorkflowInfo,
inputData?: INodeExecutionData[],
@ -106,23 +113,28 @@ export class BaseExecuteContext extends NodeExecutionContext {
parentExecution?: RelatedExecution;
},
): Promise<ExecuteWorkflowData> {
return await this.additionalData
.executeWorkflow(workflowInfo, this.additionalData, {
...options,
parentWorkflowId: this.workflow.id?.toString(),
inputData,
parentWorkflowSettings: this.workflow.settings,
node: this.node,
parentCallbackManager,
})
.then(async (result) => {
const data = await this.binaryDataService.duplicateBinaryData(
this.workflow.id,
this.additionalData.executionId!,
result.data,
);
return { ...result, data };
});
const result = await this.additionalData.executeWorkflow(workflowInfo, this.additionalData, {
...options,
parentWorkflowId: this.workflow.id,
inputData,
parentWorkflowSettings: this.workflow.settings,
node: this.node,
parentCallbackManager,
});
// If a sub-workflow execution goes into the waiting state
if (result.waitTill) {
// then put the parent workflow execution also into the waiting state,
// but do not use the sub-workflow `waitTill` to avoid WaitTracker resuming the parent execution at the same time as the sub-workflow
await this.putExecutionToWait(WAIT_INDEFINITELY);
}
const data = await this.binaryDataService.duplicateBinaryData(
this.workflow.id,
this.additionalData.executionId!,
result.data,
);
return { ...result, data };
}
getNodeInputs(): INodeInputConfiguration[] {

View file

@ -179,13 +179,6 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti
return inputData[inputIndex];
}
async putExecutionToWait(waitTill: Date): Promise<void> {
this.runExecutionData.waitTill = waitTill;
if (this.additionalData.setExecutionStatus) {
this.additionalData.setExecutionStatus('waiting');
}
}
logNodeOutput(...args: unknown[]): void {
if (this.mode === 'manual') {
this.sendMessageToUI(...args);

View file

@ -9,7 +9,6 @@ import {
SIMULATE_NODE_TYPE,
SIMULATE_TRIGGER_NODE_TYPE,
WAIT_NODE_TYPE,
WAIT_TIME_UNLIMITED,
} from '@/constants';
import type {
ExecutionSummary,
@ -18,7 +17,12 @@ import type {
NodeOperationError,
Workflow,
} from 'n8n-workflow';
import { NodeConnectionType, NodeHelpers, SEND_AND_WAIT_OPERATION } from 'n8n-workflow';
import {
NodeConnectionType,
NodeHelpers,
SEND_AND_WAIT_OPERATION,
WAIT_INDEFINITELY,
} from 'n8n-workflow';
import type { StyleValue } from 'vue';
import { computed, onMounted, ref, watch } from 'vue';
import xss from 'xss';
@ -345,7 +349,7 @@ const waiting = computed(() => {
return i18n.baseText('node.theNodeIsWaitingFormCall');
}
const waitDate = new Date(workflowExecution.waitTill);
if (waitDate.toISOString() === WAIT_TIME_UNLIMITED) {
if (waitDate.getTime() === WAIT_INDEFINITELY.getTime()) {
return i18n.baseText('node.theNodeIsWaitingIndefinitelyForAnIncomingWebhookCall');
}
return i18n.baseText('node.nodeIsWaitingTill', {

View file

@ -1,8 +1,8 @@
<script lang="ts" setup>
import { ref, computed, useCssModule } from 'vue';
import type { ExecutionSummary } from 'n8n-workflow';
import { WAIT_INDEFINITELY } from 'n8n-workflow';
import { useI18n } from '@/composables/useI18n';
import { WAIT_TIME_UNLIMITED } from '@/constants';
import { convertToDisplayDate } from '@/utils/formatters/dateFormatter';
import { i18n as locale } from '@/plugins/i18n';
import ExecutionsTime from '@/components/executions/ExecutionsTime.vue';
@ -52,7 +52,7 @@ const isWaitTillIndefinite = computed(() => {
return false;
}
return new Date(props.execution.waitTill).toISOString() === WAIT_TIME_UNLIMITED;
return new Date(props.execution.waitTill).getTime() === WAIT_INDEFINITELY.getTime();
});
const isRetriable = computed(() => executionHelpers.isExecutionRetriable(props.execution));

View file

@ -37,15 +37,14 @@ import type {
ITaskData,
Workflow,
} from 'n8n-workflow';
import { NodeConnectionType, NodeHelpers, SEND_AND_WAIT_OPERATION } from 'n8n-workflow';
import type { INodeUi } from '@/Interface';
import {
CUSTOM_API_CALL_KEY,
FORM_NODE_TYPE,
STICKY_NODE_TYPE,
WAIT_NODE_TYPE,
WAIT_TIME_UNLIMITED,
} from '@/constants';
NodeConnectionType,
NodeHelpers,
SEND_AND_WAIT_OPERATION,
WAIT_INDEFINITELY,
} from 'n8n-workflow';
import type { INodeUi } from '@/Interface';
import { CUSTOM_API_CALL_KEY, FORM_NODE_TYPE, STICKY_NODE_TYPE, WAIT_NODE_TYPE } from '@/constants';
import { sanitizeHtml } from '@/utils/htmlUtils';
import { MarkerType } from '@vue-flow/core';
import { useNodeHelpers } from './useNodeHelpers';
@ -419,7 +418,7 @@ export function useCanvasMapping({
const waitDate = new Date(workflowExecution.waitTill);
if (waitDate.toISOString() === WAIT_TIME_UNLIMITED) {
if (waitDate.getTime() === WAIT_INDEFINITELY.getTime()) {
acc[node.id] = i18n.baseText(
'node.theNodeIsWaitingIndefinitelyForAnIncomingWebhookCall',
);

View file

@ -300,7 +300,6 @@ export const NODE_CONNECTION_TYPE_ALLOW_MULTIPLE: NodeConnectionType[] = [
// General
export const INSTANCE_ID_HEADER = 'n8n-instance-id';
export const WAIT_TIME_UNLIMITED = '3000-01-01T00:00:00.000Z';
/** PERSONALIZATION SURVEY */
export const EMAIL_KEY = 'email';

View file

@ -7,7 +7,6 @@ import type {
NodeTypeAndVersion,
} from 'n8n-workflow';
import {
WAIT_TIME_UNLIMITED,
Node,
updateDisplayOptions,
NodeOperationError,
@ -16,6 +15,7 @@ import {
tryToParseJsonToFormFields,
NodeConnectionType,
WAIT_NODE_TYPE,
WAIT_INDEFINITELY,
} from 'n8n-workflow';
import { formDescription, formFields, formTitle } from '../Form/common.descriptions';
@ -409,8 +409,7 @@ export class Form extends Node {
}
if (operation !== 'completion') {
const waitTill = new Date(WAIT_TIME_UNLIMITED);
await context.putExecutionToWait(waitTill);
await context.putExecutionToWait(WAIT_INDEFINITELY);
} else {
const staticData = context.getWorkflowStaticData('node');
const completionTitle = context.getNodeParameter('completionTitle', 0, '') as string;

View file

@ -12,7 +12,7 @@ import {
NodeConnectionType,
NodeOperationError,
SEND_AND_WAIT_OPERATION,
WAIT_TIME_UNLIMITED,
WAIT_INDEFINITELY,
} from 'n8n-workflow';
import type { IEmail } from '../../../../utils/sendAndWait/interfaces';
@ -270,7 +270,7 @@ export class GmailV2 implements INodeType {
raw: await encodeEmail(email),
});
await this.putExecutionToWait(new Date(WAIT_TIME_UNLIMITED));
await this.putExecutionToWait(WAIT_INDEFINITELY);
return [this.getInputData()];
}

View file

@ -20,7 +20,7 @@ import {
NodeConnectionType,
NodeOperationError,
SEND_AND_WAIT_OPERATION,
WAIT_TIME_UNLIMITED,
WAIT_INDEFINITELY,
} from 'n8n-workflow';
import moment from 'moment-timezone';
@ -379,7 +379,7 @@ export class SlackV2 implements INodeType {
createSendAndWaitMessageBody(this),
);
await this.putExecutionToWait(new Date(WAIT_TIME_UNLIMITED));
await this.putExecutionToWait(WAIT_INDEFINITELY);
return [this.getInputData()];
}

View file

@ -7,7 +7,7 @@ import type {
IDisplayOptions,
IWebhookFunctions,
} from 'n8n-workflow';
import { WAIT_TIME_UNLIMITED, NodeOperationError, NodeConnectionType } from 'n8n-workflow';
import { NodeOperationError, NodeConnectionType, WAIT_INDEFINITELY } from 'n8n-workflow';
import {
authenticationProperty,
@ -516,7 +516,7 @@ export class Wait extends Webhook {
}
private async configureAndPutToWait(context: IExecuteFunctions) {
let waitTill = new Date(WAIT_TIME_UNLIMITED);
let waitTill = WAIT_INDEFINITELY;
const limitWaitTime = context.getNodeParameter('limitWaitTime', 0);
if (limitWaitTime === true) {

View file

@ -6,7 +6,7 @@ export const LOWERCASE_LETTERS = UPPERCASE_LETTERS.toLowerCase();
export const ALPHABET = [DIGITS, UPPERCASE_LETTERS, LOWERCASE_LETTERS].join('');
export const BINARY_ENCODING = 'base64';
export const WAIT_TIME_UNLIMITED = '3000-01-01T00:00:00.000Z';
export const WAIT_INDEFINITELY = new Date('3000-01-01T00:00:00.000Z');
export const LOG_LEVELS = ['silent', 'error', 'warn', 'info', 'debug'] as const;

View file

@ -1559,6 +1559,7 @@ export interface ITriggerResponse {
export interface ExecuteWorkflowData {
executionId: string;
data: Array<INodeExecutionData[] | null>;
waitTill?: Date | null;
}
export type WebhookSetupMethodNames = 'checkExists' | 'create' | 'delete';
@ -2143,6 +2144,7 @@ export interface IRunExecutionData {
waitingExecution: IWaitingForExecution;
waitingExecutionSource: IWaitingForExecutionSource | null;
};
parentExecution?: RelatedExecution;
waitTill?: Date;
pushRef?: string;
}