fix(core): Update subworkflow execution status correctly (#10764)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-09-11 10:39:18 +02:00 committed by GitHub
parent b9d157db40
commit 4f94319cd9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 51 additions and 78 deletions

View file

@ -1,9 +1,12 @@
import { ActiveExecutions } from '@/active-executions';
import PCancelable from 'p-cancelable';
import { v4 as uuid } from 'uuid';
import type { IExecuteResponsePromiseData, IRun } from 'n8n-workflow';
import type {
IExecuteResponsePromiseData,
IRun,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import { createDeferredPromise } from 'n8n-workflow';
import type { IWorkflowExecutionDataProcess } from '@/interfaces';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { mock } from 'jest-mock-extended';
import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';

View file

@ -1,6 +1,5 @@
import { type Workflow } from 'n8n-workflow';
import type { Workflow, IWorkflowExecutionDataProcess } from 'n8n-workflow';
import { getExecutionStartNode } from '@/workflow-helpers';
import type { IWorkflowExecutionDataProcess } from '@/interfaces';
describe('WorkflowHelpers', () => {
describe('getExecutionStartNode', () => {

View file

@ -5,6 +5,7 @@ import type {
IExecuteResponsePromiseData,
IRun,
ExecutionStatus,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import {
ApplicationError,
@ -19,7 +20,6 @@ import type {
IExecutingWorkflowData,
IExecutionDb,
IExecutionsCurrentSummary,
IWorkflowExecutionDataProcess,
} from '@/interfaces';
import { isWorkflowIdValid } from '@/utils';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';

View file

@ -3,7 +3,7 @@ import { Container } from 'typedi';
import { Flags } from '@oclif/core';
import fs from 'fs';
import os from 'os';
import type { IRun, ITaskData } from 'n8n-workflow';
import type { IRun, ITaskData, IWorkflowExecutionDataProcess } from 'n8n-workflow';
import { ApplicationError, jsonParse } from 'n8n-workflow';
import { sep } from 'path';
import { diff } from 'json-diff';
@ -11,7 +11,7 @@ import pick from 'lodash/pick';
import { ActiveExecutions } from '@/active-executions';
import { WorkflowRunner } from '@/workflow-runner';
import type { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/interfaces';
import type { IWorkflowDb } from '@/interfaces';
import type { User } from '@/databases/entities/user';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { OwnershipService } from '@/services/ownership.service';

View file

@ -1,11 +1,10 @@
import { Container } from 'typedi';
import { Flags } from '@oclif/core';
import type { IWorkflowBase } from 'n8n-workflow';
import type { IWorkflowBase, IWorkflowExecutionDataProcess } from 'n8n-workflow';
import { ApplicationError, ExecutionBaseError } from 'n8n-workflow';
import { ActiveExecutions } from '@/active-executions';
import { WorkflowRunner } from '@/workflow-runner';
import type { IWorkflowExecutionDataProcess } from '@/interfaces';
import { findCliWorkflowStart, isWorkflowIdValid } from '@/utils';
import { BaseCommand } from './base-command';

View file

@ -8,7 +8,7 @@ import { createReadStream, createWriteStream, existsSync } from 'fs';
import { pipeline } from 'stream/promises';
import replaceStream from 'replacestream';
import glob from 'fast-glob';
import { jsonParse, randomString } from 'n8n-workflow';
import { jsonParse, randomString, type IWorkflowExecutionDataProcess } from 'n8n-workflow';
import config from '@/config';
import { ActiveExecutions } from '@/active-executions';
@ -26,7 +26,6 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito
import { FeatureNotLicensedError } from '@/errors/feature-not-licensed.error';
import { WaitTracker } from '@/wait-tracker';
import { BaseCommand } from './base-command';
import type { IWorkflowExecutionDataProcess } from '@/interfaces';
import { ExecutionService } from '@/executions/execution.service';
import { OwnershipService } from '@/services/ownership.service';
import { WorkflowRunner } from '@/workflow-runner';

View file

@ -3,8 +3,9 @@ import type {
IPersonalizationSurveyAnswersV4,
IRun,
IWorkflowBase,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import type { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/interfaces';
import type { IWorkflowDb } from '@/interfaces';
import type { ProjectRole } from '@/databases/entities/project-relation';
import type { GlobalRole } from '@/databases/entities/user';
import type { AuthProviderType } from '@/databases/entities/auth-identity';

View file

@ -8,6 +8,7 @@ import type {
IRunExecutionData,
IWorkflowBase,
WorkflowExecuteMode,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import {
ApplicationError,
@ -21,7 +22,6 @@ import type {
IExecutionFlattedResponse,
IExecutionResponse,
IWorkflowDb,
IWorkflowExecutionDataProcess,
} from '@/interfaces';
import { NodeTypes } from '@/node-types';
import type { ExecutionRequest, ExecutionSummaries, StopResult } from './execution.types';

View file

@ -7,9 +7,7 @@ import type {
IDataObject,
IDeferredPromise,
IExecuteResponsePromiseData,
IPinData,
IRun,
IRunData,
IRunExecutionData,
ITaskData,
ITelemetryTrackProperties,
@ -22,7 +20,7 @@ import type {
FeatureFlags,
INodeProperties,
IUserSettings,
StartNodeData,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import type { ActiveWorkflowManager } from '@/active-workflow-manager';
@ -495,21 +493,6 @@ export interface IWorkflowErrorData {
};
}
export interface IWorkflowExecutionDataProcess {
destinationNode?: string;
restartExecutionId?: string;
executionMode: WorkflowExecuteMode;
executionData?: IRunExecutionData;
runData?: IRunData;
pinData?: IPinData;
retryOf?: string;
pushRef?: string;
startNodes?: StartNodeData[];
workflowData: IWorkflowBase;
userId?: string;
projectId?: string;
}
export interface IWorkflowExecuteProcess {
startedAt: Date;
workflow: Workflow;

View file

@ -1,6 +1,9 @@
import { ApplicationError, ErrorReporterProxy as ErrorReporter } from 'n8n-workflow';
import {
ApplicationError,
ErrorReporterProxy as ErrorReporter,
type IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import { Service } from 'typedi';
import type { IWorkflowExecutionDataProcess } from '@/interfaces';
import { WorkflowRunner } from '@/workflow-runner';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { OwnershipService } from '@/services/ownership.service';

View file

@ -31,6 +31,7 @@ import type {
WebhookResponseMode,
Workflow,
WorkflowExecuteMode,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import {
ApplicationError,
@ -53,7 +54,7 @@ import { NotFoundError } from '@/errors/response-errors/not-found.error';
import { InternalServerError } from '@/errors/response-errors/internal-server.error';
import { UnprocessableRequestError } from '@/errors/response-errors/unprocessable.error';
import type { Project } from '@/databases/entities/project';
import type { IExecutionDb, IWorkflowDb, IWorkflowExecutionDataProcess } from '@/interfaces';
import type { IExecutionDb, IWorkflowDb } from '@/interfaces';
/**
* Returns all the webhooks which should be created for the given workflow

View file

@ -24,7 +24,8 @@ import type {
ExecutionStatus,
ExecutionError,
EventNamesAiNodesType,
CallbackManager,
ExecuteWorkflowOptions,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import {
ApplicationError,
@ -42,7 +43,6 @@ import { ExternalHooks } from '@/external-hooks';
import type {
IPushDataExecutionFinished,
IWorkflowExecuteProcess,
IWorkflowExecutionDataProcess,
IWorkflowErrorData,
IPushDataType,
ExecutionPayload,
@ -714,13 +714,11 @@ export async function getRunData(
},
};
const runData: IWorkflowExecutionDataProcess = {
return {
executionMode: mode,
executionData: runExecutionData,
workflowData,
};
return runData;
}
export async function getWorkflowData(
@ -769,16 +767,7 @@ export async function getWorkflowData(
async function executeWorkflow(
workflowInfo: IExecuteWorkflowInfo,
additionalData: IWorkflowExecuteAdditionalData,
options: {
node?: INode;
parentWorkflowId: string;
inputData?: INodeExecutionData[];
parentExecutionId?: string;
loadedWorkflowData?: IWorkflowBase;
loadedRunData?: IWorkflowExecutionDataProcess;
parentWorkflowSettings?: IWorkflowSettings;
parentCallbackManager?: CallbackManager;
},
options: ExecuteWorkflowOptions,
): Promise<Array<INodeExecutionData[] | null> | IWorkflowExecuteProcess> {
const externalHooks = Container.get(ExternalHooks);
await externalHooks.init();
@ -786,6 +775,7 @@ async function executeWorkflow(
const nodeTypes = Container.get(NodeTypes);
const activeExecutions = Container.get(ActiveExecutions);
const eventService = Container.get(EventService);
const executionRepository = Container.get(ExecutionRepository);
const workflowData =
options.loadedWorkflowData ??
@ -805,13 +795,8 @@ async function executeWorkflow(
const runData = options.loadedRunData ?? (await getRunData(workflowData, options.inputData));
let executionId;
if (options.parentExecutionId !== undefined) {
executionId = options.parentExecutionId;
} else {
executionId = options.parentExecutionId ?? (await activeExecutions.add(runData));
}
const executionId = await activeExecutions.add(runData);
await executionRepository.updateStatus(executionId, 'running');
Container.get(EventService).emit('workflow-pre-execute', { executionId, data: runData });
@ -862,14 +847,6 @@ async function executeWorkflow(
runData.executionMode,
runExecutionData,
);
if (options.parentExecutionId !== undefined) {
// Must be changed to become typed
return {
startedAt: new Date(),
workflow,
workflowExecute,
};
}
const execution = workflowExecute.processRunExecutionData(workflow);
activeExecutions.attachWorkflowExecution(executionId, execution);
data = await execution;
@ -909,10 +886,7 @@ async function executeWorkflow(
// remove execution from active executions
activeExecutions.remove(executionId, fullRunData);
await Container.get(ExecutionRepository).updateExistingExecution(
executionId,
fullExecutionData,
);
await executionRepository.updateExistingExecution(executionId, fullExecutionData);
throw objectToError(
{
...executionError,

View file

@ -11,9 +11,9 @@ import type {
WorkflowOperationError,
Workflow,
NodeOperationError,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import type { IWorkflowExecutionDataProcess } from '@/interfaces';
import type { WorkflowEntity } from '@/databases/entities/workflow-entity';
import { CredentialsRepository } from '@/databases/repositories/credentials.repository';
import { VariablesService } from '@/environments/variables/variables.service.ee';

View file

@ -13,6 +13,7 @@ import type {
IRun,
WorkflowExecuteMode,
WorkflowHooks,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import {
ErrorReporterProxy as ErrorReporter,
@ -26,7 +27,7 @@ import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { ExternalHooks } from '@/external-hooks';
import type { IExecutionResponse, IWorkflowExecutionDataProcess } from '@/interfaces';
import type { IExecutionResponse } from '@/interfaces';
import { NodeTypes } from '@/node-types';
import type { Job, JobData, JobResult } from '@/scaling/scaling.types';
import type { ScalingService } from '@/scaling/scaling.service';

View file

@ -9,6 +9,7 @@ import type {
IRunExecutionData,
IWorkflowExecuteAdditionalData,
WorkflowExecuteMode,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import {
SubworkflowOperationError,
@ -21,12 +22,7 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import * as WorkflowHelpers from '@/workflow-helpers';
import type { WorkflowRequest } from '@/workflows/workflow.request';
import type {
ExecutionPayload,
IWorkflowDb,
IWorkflowErrorData,
IWorkflowExecutionDataProcess,
} from '@/interfaces';
import type { ExecutionPayload, IWorkflowDb, IWorkflowErrorData } from '@/interfaces';
import { NodeTypes } from '@/node-types';
import { WorkflowRunner } from '@/workflow-runner';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';

View file

@ -2167,13 +2167,27 @@ export const eventNamesAiNodes = [
export type EventNamesAiNodesType = (typeof eventNamesAiNodes)[number];
export interface IWorkflowExecutionDataProcess {
destinationNode?: string;
restartExecutionId?: string;
executionMode: WorkflowExecuteMode;
executionData?: IRunExecutionData;
runData?: IRunData;
pinData?: IPinData;
retryOf?: string;
pushRef?: string;
startNodes?: StartNodeData[];
workflowData: IWorkflowBase;
userId?: string;
projectId?: string;
}
export interface ExecuteWorkflowOptions {
node?: INode;
parentWorkflowId: string;
inputData?: INodeExecutionData[];
parentExecutionId?: string;
loadedWorkflowData?: IWorkflowBase;
loadedRunData?: any;
loadedRunData?: IWorkflowExecutionDataProcess;
parentWorkflowSettings?: IWorkflowSettings;
parentCallbackManager?: CallbackManager;
}