diff --git a/packages/cli/src/active-workflow-manager.ts b/packages/cli/src/active-workflow-manager.ts index a002bc4054..403e60f51d 100644 --- a/packages/cli/src/active-workflow-manager.ts +++ b/packages/cli/src/active-workflow-manager.ts @@ -40,6 +40,7 @@ import { import type { WorkflowEntity } from '@/databases/entities/workflow-entity'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { OnShutdown } from '@/decorators/on-shutdown'; +import { executeErrorWorkflow } from '@/execution-lifecycle/execute-error-workflow'; import { ExecutionService } from '@/executions/execution.service'; import { ExternalHooks } from '@/external-hooks'; import type { IWorkflowDb } from '@/interfaces'; @@ -400,7 +401,7 @@ export class ActiveWorkflowManager { status: 'running', }; - WorkflowExecuteAdditionalData.executeErrorWorkflow(workflowData, fullRunData, mode); + executeErrorWorkflow(workflowData, fullRunData, mode); } /** diff --git a/packages/cli/src/events/relays/telemetry.event-relay.ts b/packages/cli/src/events/relays/telemetry.event-relay.ts index 67fbacb107..8d0f050dee 100644 --- a/packages/cli/src/events/relays/telemetry.event-relay.ts +++ b/packages/cli/src/events/relays/telemetry.event-relay.ts @@ -15,7 +15,7 @@ import { SharedWorkflowRepository } from '@/databases/repositories/shared-workfl import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { EventService } from '@/events/event.service'; import type { RelayEventMap } from '@/events/maps/relay.event-map'; -import { determineFinalExecutionStatus } from '@/execution-lifecycle-hooks/shared/shared-hook-functions'; +import { determineFinalExecutionStatus } from '@/execution-lifecycle/shared/shared-hook-functions'; import type { IExecutionTrackProperties } from '@/interfaces'; import { License } from '@/license'; import { NodeTypes } from '@/node-types'; diff --git a/packages/cli/src/__tests__/execution-lifecycle-hooks.test.ts b/packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts similarity index 99% rename from packages/cli/src/__tests__/execution-lifecycle-hooks.test.ts rename to packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts index 227f89dd40..5ea8e411ad 100644 --- a/packages/cli/src/__tests__/execution-lifecycle-hooks.test.ts +++ b/packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts @@ -28,7 +28,7 @@ import { getWorkflowHooksMain, getWorkflowHooksWorkerExecuter, getWorkflowHooksWorkerMain, -} from '../workflow-execute-additional-data'; +} from '../execution-lifecycle-hooks'; describe('Execution Lifecycle Hooks', () => { mockInstance(Logger); diff --git a/packages/cli/src/execution-lifecycle-hooks/__tests__/restore-binary-data-id.test.ts b/packages/cli/src/execution-lifecycle/__tests__/restore-binary-data-id.test.ts similarity index 98% rename from packages/cli/src/execution-lifecycle-hooks/__tests__/restore-binary-data-id.test.ts rename to packages/cli/src/execution-lifecycle/__tests__/restore-binary-data-id.test.ts index f4f7a463bc..76ac0d4e21 100644 --- a/packages/cli/src/execution-lifecycle-hooks/__tests__/restore-binary-data-id.test.ts +++ b/packages/cli/src/execution-lifecycle/__tests__/restore-binary-data-id.test.ts @@ -2,7 +2,7 @@ import { BinaryDataService } from 'n8n-core'; import type { IRun } from 'n8n-workflow'; import config from '@/config'; -import { restoreBinaryDataId } from '@/execution-lifecycle-hooks/restore-binary-data-id'; +import { restoreBinaryDataId } from '@/execution-lifecycle/restore-binary-data-id'; import { mockInstance } from '@test/mocking'; function toIRun(item?: object) { diff --git a/packages/cli/src/execution-lifecycle-hooks/__tests__/save-execution-progress.test.ts b/packages/cli/src/execution-lifecycle/__tests__/save-execution-progress.test.ts similarity index 94% rename from packages/cli/src/execution-lifecycle-hooks/__tests__/save-execution-progress.test.ts rename to packages/cli/src/execution-lifecycle/__tests__/save-execution-progress.test.ts index ac52cf3920..863006d9e7 100644 --- a/packages/cli/src/execution-lifecycle-hooks/__tests__/save-execution-progress.test.ts +++ b/packages/cli/src/execution-lifecycle/__tests__/save-execution-progress.test.ts @@ -3,11 +3,12 @@ import { Logger } from 'n8n-core'; import type { IRunExecutionData, ITaskData, IWorkflowBase } from 'n8n-workflow'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; -import { saveExecutionProgress } from '@/execution-lifecycle-hooks/save-execution-progress'; -import * as fnModule from '@/execution-lifecycle-hooks/to-save-settings'; import type { IExecutionResponse } from '@/interfaces'; import { mockInstance } from '@test/mocking'; +import { saveExecutionProgress } from '../save-execution-progress'; +import * as fnModule from '../to-save-settings'; + mockInstance(Logger); const errorReporter = mockInstance(ErrorReporter); const executionRepository = mockInstance(ExecutionRepository); diff --git a/packages/cli/src/execution-lifecycle-hooks/__tests__/to-save-settings.test.ts b/packages/cli/src/execution-lifecycle/__tests__/to-save-settings.test.ts similarity index 98% rename from packages/cli/src/execution-lifecycle-hooks/__tests__/to-save-settings.test.ts rename to packages/cli/src/execution-lifecycle/__tests__/to-save-settings.test.ts index f12c209827..142b3c34ce 100644 --- a/packages/cli/src/execution-lifecycle-hooks/__tests__/to-save-settings.test.ts +++ b/packages/cli/src/execution-lifecycle/__tests__/to-save-settings.test.ts @@ -1,5 +1,6 @@ import config from '@/config'; -import { toSaveSettings } from '@/execution-lifecycle-hooks/to-save-settings'; + +import { toSaveSettings } from '../to-save-settings'; afterEach(() => { config.load(config.default); diff --git a/packages/cli/src/execution-lifecycle/execute-error-workflow.ts b/packages/cli/src/execution-lifecycle/execute-error-workflow.ts new file mode 100644 index 0000000000..fefce8a97b --- /dev/null +++ b/packages/cli/src/execution-lifecycle/execute-error-workflow.ts @@ -0,0 +1,130 @@ +import { GlobalConfig } from '@n8n/config'; +import { Container } from '@n8n/di'; +import { ErrorReporter, Logger } from 'n8n-core'; +import type { IRun, IWorkflowBase, WorkflowExecuteMode } from 'n8n-workflow'; + +import type { IWorkflowErrorData } from '@/interfaces'; +import { OwnershipService } from '@/services/ownership.service'; +import { UrlService } from '@/services/url.service'; +import { WorkflowExecutionService } from '@/workflows/workflow-execution.service'; + +/** + * Checks if there was an error and if errorWorkflow or a trigger is defined. If so it collects + * all the data and executes it + * + * @param {IWorkflowBase} workflowData The workflow which got executed + * @param {IRun} fullRunData The run which produced the error + * @param {WorkflowExecuteMode} mode The mode in which the workflow got started in + * @param {string} [executionId] The id the execution got saved as + */ +export function executeErrorWorkflow( + workflowData: IWorkflowBase, + fullRunData: IRun, + mode: WorkflowExecuteMode, + executionId?: string, + retryOf?: string, +): void { + const logger = Container.get(Logger); + + // Check if there was an error and if so if an errorWorkflow or a trigger is set + let pastExecutionUrl: string | undefined; + if (executionId !== undefined) { + pastExecutionUrl = `${Container.get(UrlService).getWebhookBaseUrl()}workflow/${ + workflowData.id + }/executions/${executionId}`; + } + + if (fullRunData.data.resultData.error !== undefined) { + let workflowErrorData: IWorkflowErrorData; + const workflowId = workflowData.id; + + if (executionId) { + // The error did happen in an execution + workflowErrorData = { + execution: { + id: executionId, + url: pastExecutionUrl, + error: fullRunData.data.resultData.error, + lastNodeExecuted: fullRunData.data.resultData.lastNodeExecuted!, + mode, + retryOf, + }, + workflow: { + id: workflowId, + name: workflowData.name, + }, + }; + } else { + // The error did happen in a trigger + workflowErrorData = { + trigger: { + error: fullRunData.data.resultData.error, + mode, + }, + workflow: { + id: workflowId, + name: workflowData.name, + }, + }; + } + + const { errorTriggerType } = Container.get(GlobalConfig).nodes; + // Run the error workflow + // To avoid an infinite loop do not run the error workflow again if the error-workflow itself failed and it is its own error-workflow. + const { errorWorkflow } = workflowData.settings ?? {}; + if (errorWorkflow && !(mode === 'error' && workflowId && errorWorkflow === workflowId)) { + logger.debug('Start external error workflow', { + executionId, + errorWorkflowId: errorWorkflow, + workflowId, + }); + // If a specific error workflow is set run only that one + + // First, do permission checks. + if (!workflowId) { + // Manual executions do not trigger error workflows + // So this if should never happen. It was added to + // make sure there are no possible security gaps + return; + } + + Container.get(OwnershipService) + .getWorkflowProjectCached(workflowId) + .then((project) => { + void Container.get(WorkflowExecutionService).executeErrorWorkflow( + errorWorkflow, + workflowErrorData, + project, + ); + }) + .catch((error: Error) => { + Container.get(ErrorReporter).error(error); + logger.error( + `Could not execute ErrorWorkflow for execution ID ${executionId} because of error querying the workflow owner`, + { + executionId, + errorWorkflowId: errorWorkflow, + workflowId, + error, + workflowErrorData, + }, + ); + }); + } else if ( + mode !== 'error' && + workflowId !== undefined && + workflowData.nodes.some((node) => node.type === errorTriggerType) + ) { + logger.debug('Start internal error workflow', { executionId, workflowId }); + void Container.get(OwnershipService) + .getWorkflowProjectCached(workflowId) + .then((project) => { + void Container.get(WorkflowExecutionService).executeErrorWorkflow( + workflowId, + workflowErrorData, + project, + ); + }); + } + } +} diff --git a/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts b/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts new file mode 100644 index 0000000000..1296f53958 --- /dev/null +++ b/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts @@ -0,0 +1,628 @@ +import { Container } from '@n8n/di'; +import { stringify } from 'flatted'; +import { ErrorReporter, Logger, InstanceSettings } from 'n8n-core'; +import { WorkflowHooks } from 'n8n-workflow'; +import type { + IDataObject, + INode, + IRun, + IRunExecutionData, + ITaskData, + IWorkflowBase, + IWorkflowExecuteHooks, + IWorkflowHooksOptionalParameters, + WorkflowExecuteMode, + IWorkflowExecutionDataProcess, + Workflow, +} from 'n8n-workflow'; + +import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import { EventService } from '@/events/event.service'; +import { ExternalHooks } from '@/external-hooks'; +import { Push } from '@/push'; +import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; +import { isWorkflowIdValid } from '@/utils'; +import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service'; + +import { executeErrorWorkflow } from './execute-error-workflow'; +import { restoreBinaryDataId } from './restore-binary-data-id'; +import { saveExecutionProgress } from './save-execution-progress'; +import { + determineFinalExecutionStatus, + prepareExecutionDataForDbUpdate, + updateExistingExecution, +} from './shared/shared-hook-functions'; +import { toSaveSettings } from './to-save-settings'; + +/** + * Returns hook functions to push data to Editor-UI + */ +function hookFunctionsPush(): IWorkflowExecuteHooks { + const logger = Container.get(Logger); + const pushInstance = Container.get(Push); + return { + nodeExecuteBefore: [ + async function (this: WorkflowHooks, nodeName: string): Promise { + const { pushRef, executionId } = this; + // Push data to session which started workflow before each + // node which starts rendering + if (pushRef === undefined) { + return; + } + + logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { + executionId, + pushRef, + workflowId: this.workflowData.id, + }); + + pushInstance.send({ type: 'nodeExecuteBefore', data: { executionId, nodeName } }, pushRef); + }, + ], + nodeExecuteAfter: [ + async function (this: WorkflowHooks, nodeName: string, data: ITaskData): Promise { + const { pushRef, executionId } = this; + // Push data to session which started workflow after each rendered node + if (pushRef === undefined) { + return; + } + + logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { + executionId, + pushRef, + workflowId: this.workflowData.id, + }); + + pushInstance.send( + { type: 'nodeExecuteAfter', data: { executionId, nodeName, data } }, + pushRef, + ); + }, + ], + workflowExecuteBefore: [ + async function (this: WorkflowHooks, _workflow, data): Promise { + const { pushRef, executionId } = this; + const { id: workflowId, name: workflowName } = this.workflowData; + logger.debug('Executing hook (hookFunctionsPush)', { + executionId, + pushRef, + workflowId, + }); + // Push data to session which started the workflow + if (pushRef === undefined) { + return; + } + pushInstance.send( + { + type: 'executionStarted', + data: { + executionId, + mode: this.mode, + startedAt: new Date(), + retryOf: this.retryOf, + workflowId, + workflowName, + flattedRunData: data?.resultData.runData + ? stringify(data.resultData.runData) + : stringify({}), + }, + }, + pushRef, + ); + }, + ], + workflowExecuteAfter: [ + async function (this: WorkflowHooks, fullRunData: IRun): Promise { + const { pushRef, executionId } = this; + if (pushRef === undefined) return; + + const { id: workflowId } = this.workflowData; + logger.debug('Executing hook (hookFunctionsPush)', { + executionId, + pushRef, + workflowId, + }); + + const { status } = fullRunData; + if (status === 'waiting') { + pushInstance.send({ type: 'executionWaiting', data: { executionId } }, pushRef); + } else { + const rawData = stringify(fullRunData.data); + pushInstance.send( + { type: 'executionFinished', data: { executionId, workflowId, status, rawData } }, + pushRef, + ); + } + }, + ], + }; +} + +function hookFunctionsPreExecute(): IWorkflowExecuteHooks { + const externalHooks = Container.get(ExternalHooks); + return { + workflowExecuteBefore: [ + async function (this: WorkflowHooks, workflow: Workflow): Promise { + await externalHooks.run('workflow.preExecute', [workflow, this.mode]); + }, + ], + nodeExecuteAfter: [ + async function ( + this: WorkflowHooks, + nodeName: string, + data: ITaskData, + executionData: IRunExecutionData, + ): Promise { + await saveExecutionProgress( + this.workflowData, + this.executionId, + nodeName, + data, + executionData, + this.pushRef, + ); + }, + ], + }; +} + +/** + * Returns hook functions to save workflow execution and call error workflow + */ +function hookFunctionsSave(): IWorkflowExecuteHooks { + const logger = Container.get(Logger); + const workflowStatisticsService = Container.get(WorkflowStatisticsService); + const eventService = Container.get(EventService); + return { + nodeExecuteBefore: [ + async function (this: WorkflowHooks, nodeName: string): Promise { + const { executionId, workflowData: workflow } = this; + + eventService.emit('node-pre-execute', { executionId, workflow, nodeName }); + }, + ], + nodeExecuteAfter: [ + async function (this: WorkflowHooks, nodeName: string): Promise { + const { executionId, workflowData: workflow } = this; + + eventService.emit('node-post-execute', { executionId, workflow, nodeName }); + }, + ], + workflowExecuteBefore: [], + workflowExecuteAfter: [ + async function ( + this: WorkflowHooks, + fullRunData: IRun, + newStaticData: IDataObject, + ): Promise { + logger.debug('Executing hook (hookFunctionsSave)', { + executionId: this.executionId, + workflowId: this.workflowData.id, + }); + + await restoreBinaryDataId(fullRunData, this.executionId, this.mode); + + const isManualMode = this.mode === 'manual'; + + try { + if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) { + // Workflow is saved so update in database + try { + await Container.get(WorkflowStaticDataService).saveStaticDataById( + this.workflowData.id, + newStaticData, + ); + } catch (e) { + Container.get(ErrorReporter).error(e); + logger.error( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + `There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (hookFunctionsSave)`, + { executionId: this.executionId, workflowId: this.workflowData.id }, + ); + } + } + + const executionStatus = determineFinalExecutionStatus(fullRunData); + fullRunData.status = executionStatus; + + const saveSettings = toSaveSettings(this.workflowData.settings); + + if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) { + /** + * When manual executions are not being saved, we only soft-delete + * the execution so that the user can access its binary data + * while building their workflow. + * + * The manual execution and its binary data will be hard-deleted + * on the next pruning cycle after the grace period set by + * `EXECUTIONS_DATA_HARD_DELETE_BUFFER`. + */ + await Container.get(ExecutionRepository).softDelete(this.executionId); + + return; + } + + const shouldNotSave = + (executionStatus === 'success' && !saveSettings.success) || + (executionStatus !== 'success' && !saveSettings.error); + + if (shouldNotSave && !fullRunData.waitTill && !isManualMode) { + executeErrorWorkflow( + this.workflowData, + fullRunData, + this.mode, + this.executionId, + this.retryOf, + ); + + await Container.get(ExecutionRepository).hardDelete({ + workflowId: this.workflowData.id, + executionId: this.executionId, + }); + + return; + } + + // Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive + // As a result, we should create an IWorkflowBase object with only the data we want to save in it. + const fullExecutionData = prepareExecutionDataForDbUpdate({ + runData: fullRunData, + workflowData: this.workflowData, + workflowStatusFinal: executionStatus, + retryOf: this.retryOf, + }); + + // When going into the waiting state, store the pushRef in the execution-data + if (fullRunData.waitTill && isManualMode) { + fullExecutionData.data.pushRef = this.pushRef; + } + + await updateExistingExecution({ + executionId: this.executionId, + workflowId: this.workflowData.id, + executionData: fullExecutionData, + }); + + if (!isManualMode) { + executeErrorWorkflow( + this.workflowData, + fullRunData, + this.mode, + this.executionId, + this.retryOf, + ); + } + } catch (error) { + Container.get(ErrorReporter).error(error); + logger.error(`Failed saving execution data to DB on execution ID ${this.executionId}`, { + executionId: this.executionId, + workflowId: this.workflowData.id, + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + error, + }); + if (!isManualMode) { + executeErrorWorkflow( + this.workflowData, + fullRunData, + this.mode, + this.executionId, + this.retryOf, + ); + } + } finally { + workflowStatisticsService.emit('workflowExecutionCompleted', { + workflowData: this.workflowData, + fullRunData, + }); + } + }, + ], + nodeFetchedData: [ + async (workflowId: string, node: INode) => { + workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); + }, + ], + }; +} + +/** + * Returns hook functions to save workflow execution and call error workflow + * for running with queues. Manual executions should never run on queues as + * they are always executed in the main process. + */ +function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { + const logger = Container.get(Logger); + const workflowStatisticsService = Container.get(WorkflowStatisticsService); + const eventService = Container.get(EventService); + return { + nodeExecuteBefore: [ + async function (this: WorkflowHooks, nodeName: string): Promise { + const { executionId, workflowData: workflow } = this; + + eventService.emit('node-pre-execute', { executionId, workflow, nodeName }); + }, + ], + nodeExecuteAfter: [ + async function (this: WorkflowHooks, nodeName: string): Promise { + const { executionId, workflowData: workflow } = this; + + eventService.emit('node-post-execute', { executionId, workflow, nodeName }); + }, + ], + workflowExecuteBefore: [ + async function (this: WorkflowHooks): Promise { + const { executionId, workflowData } = this; + + eventService.emit('workflow-pre-execute', { executionId, data: workflowData }); + }, + ], + workflowExecuteAfter: [ + async function ( + this: WorkflowHooks, + fullRunData: IRun, + newStaticData: IDataObject, + ): Promise { + logger.debug('Executing hook (hookFunctionsSaveWorker)', { + executionId: this.executionId, + workflowId: this.workflowData.id, + }); + + const isManualMode = this.mode === 'manual'; + + try { + if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) { + // Workflow is saved so update in database + try { + await Container.get(WorkflowStaticDataService).saveStaticDataById( + this.workflowData.id, + newStaticData, + ); + } catch (e) { + Container.get(ErrorReporter).error(e); + logger.error( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + `There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (workflowExecuteAfter)`, + { pushRef: this.pushRef, workflowId: this.workflowData.id }, + ); + } + } + + const workflowStatusFinal = determineFinalExecutionStatus(fullRunData); + fullRunData.status = workflowStatusFinal; + + if ( + !isManualMode && + workflowStatusFinal !== 'success' && + workflowStatusFinal !== 'waiting' + ) { + executeErrorWorkflow( + this.workflowData, + fullRunData, + this.mode, + this.executionId, + this.retryOf, + ); + } + + // Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive + // As a result, we should create an IWorkflowBase object with only the data we want to save in it. + const fullExecutionData = prepareExecutionDataForDbUpdate({ + runData: fullRunData, + workflowData: this.workflowData, + workflowStatusFinal, + retryOf: this.retryOf, + }); + + // When going into the waiting state, store the pushRef in the execution-data + if (fullRunData.waitTill && isManualMode) { + fullExecutionData.data.pushRef = this.pushRef; + } + + await updateExistingExecution({ + executionId: this.executionId, + workflowId: this.workflowData.id, + executionData: fullExecutionData, + }); + } catch (error) { + if (!isManualMode) { + executeErrorWorkflow( + this.workflowData, + fullRunData, + this.mode, + this.executionId, + this.retryOf, + ); + } + } finally { + workflowStatisticsService.emit('workflowExecutionCompleted', { + workflowData: this.workflowData, + fullRunData, + }); + } + }, + async function (this: WorkflowHooks, runData: IRun): Promise { + const { executionId, workflowData: workflow } = this; + + eventService.emit('workflow-post-execute', { + workflow, + executionId, + runData, + }); + }, + async function (this: WorkflowHooks, fullRunData: IRun) { + const externalHooks = Container.get(ExternalHooks); + if (externalHooks.exists('workflow.postExecute')) { + try { + await externalHooks.run('workflow.postExecute', [ + fullRunData, + this.workflowData, + this.executionId, + ]); + } catch (error) { + Container.get(ErrorReporter).error(error); + Container.get(Logger).error( + 'There was a problem running hook "workflow.postExecute"', + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + error, + ); + } + } + }, + ], + nodeFetchedData: [ + async (workflowId: string, node: INode) => { + workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); + }, + ], + }; +} + +/** + * Returns WorkflowHooks instance for running integrated workflows + * (Workflows which get started inside of another workflow) + */ +export function getWorkflowHooksIntegrated( + mode: WorkflowExecuteMode, + executionId: string, + workflowData: IWorkflowBase, +): WorkflowHooks { + const hookFunctions = hookFunctionsSave(); + const preExecuteFunctions = hookFunctionsPreExecute(); + for (const key of Object.keys(preExecuteFunctions)) { + const hooks = hookFunctions[key] ?? []; + hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]); + } + return new WorkflowHooks(hookFunctions, mode, executionId, workflowData); +} + +/** + * Returns WorkflowHooks instance for worker in scaling mode. + */ +export function getWorkflowHooksWorkerExecuter( + mode: WorkflowExecuteMode, + executionId: string, + workflowData: IWorkflowBase, + optionalParameters?: IWorkflowHooksOptionalParameters, +): WorkflowHooks { + optionalParameters = optionalParameters || {}; + const hookFunctions = hookFunctionsSaveWorker(); + const preExecuteFunctions = hookFunctionsPreExecute(); + for (const key of Object.keys(preExecuteFunctions)) { + const hooks = hookFunctions[key] ?? []; + hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]); + } + + if (mode === 'manual' && Container.get(InstanceSettings).isWorker) { + const pushHooks = hookFunctionsPush(); + for (const key of Object.keys(pushHooks)) { + if (hookFunctions[key] === undefined) { + hookFunctions[key] = []; + } + // eslint-disable-next-line prefer-spread + hookFunctions[key].push.apply(hookFunctions[key], pushHooks[key]); + } + } + + return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); +} + +/** + * Returns WorkflowHooks instance for main process if workflow runs via worker + */ +export function getWorkflowHooksWorkerMain( + mode: WorkflowExecuteMode, + executionId: string, + workflowData: IWorkflowBase, + optionalParameters?: IWorkflowHooksOptionalParameters, +): WorkflowHooks { + optionalParameters = optionalParameters || {}; + const hookFunctions = hookFunctionsPreExecute(); + + // TODO: why are workers pushing to frontend? + // TODO: simplifying this for now to just leave the bare minimum hooks + + // const hookFunctions = hookFunctionsPush(); + // const preExecuteFunctions = hookFunctionsPreExecute(); + // for (const key of Object.keys(preExecuteFunctions)) { + // if (hookFunctions[key] === undefined) { + // hookFunctions[key] = []; + // } + // hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]); + // } + + // When running with worker mode, main process executes + // Only workflowExecuteBefore + workflowExecuteAfter + // So to avoid confusion, we are removing other hooks. + hookFunctions.nodeExecuteBefore = []; + hookFunctions.nodeExecuteAfter = []; + hookFunctions.workflowExecuteAfter = [ + async function (this: WorkflowHooks, fullRunData: IRun): Promise { + // Don't delete executions before they are finished + if (!fullRunData.finished) return; + + const executionStatus = determineFinalExecutionStatus(fullRunData); + fullRunData.status = executionStatus; + + const saveSettings = toSaveSettings(this.workflowData.settings); + + const isManualMode = this.mode === 'manual'; + + if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) { + /** + * When manual executions are not being saved, we only soft-delete + * the execution so that the user can access its binary data + * while building their workflow. + * + * The manual execution and its binary data will be hard-deleted + * on the next pruning cycle after the grace period set by + * `EXECUTIONS_DATA_HARD_DELETE_BUFFER`. + */ + await Container.get(ExecutionRepository).softDelete(this.executionId); + + return; + } + + const shouldNotSave = + (executionStatus === 'success' && !saveSettings.success) || + (executionStatus !== 'success' && !saveSettings.error); + + if (!isManualMode && shouldNotSave && !fullRunData.waitTill) { + await Container.get(ExecutionRepository).hardDelete({ + workflowId: this.workflowData.id, + executionId: this.executionId, + }); + } + }, + ]; + + return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); +} + +/** + * Returns WorkflowHooks instance for running the main workflow + */ +export function getWorkflowHooksMain( + data: IWorkflowExecutionDataProcess, + executionId: string, +): WorkflowHooks { + const hookFunctions = hookFunctionsSave(); + const pushFunctions = hookFunctionsPush(); + for (const key of Object.keys(pushFunctions)) { + const hooks = hookFunctions[key] ?? []; + hooks.push.apply(hookFunctions[key], pushFunctions[key]); + } + + const preExecuteFunctions = hookFunctionsPreExecute(); + for (const key of Object.keys(preExecuteFunctions)) { + const hooks = hookFunctions[key] ?? []; + hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]); + } + + if (!hookFunctions.nodeExecuteBefore) hookFunctions.nodeExecuteBefore = []; + if (!hookFunctions.nodeExecuteAfter) hookFunctions.nodeExecuteAfter = []; + + return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, { + pushRef: data.pushRef, + retryOf: data.retryOf as string, + }); +} diff --git a/packages/cli/src/execution-lifecycle-hooks/restore-binary-data-id.ts b/packages/cli/src/execution-lifecycle/restore-binary-data-id.ts similarity index 100% rename from packages/cli/src/execution-lifecycle-hooks/restore-binary-data-id.ts rename to packages/cli/src/execution-lifecycle/restore-binary-data-id.ts diff --git a/packages/cli/src/execution-lifecycle-hooks/save-execution-progress.ts b/packages/cli/src/execution-lifecycle/save-execution-progress.ts similarity index 97% rename from packages/cli/src/execution-lifecycle-hooks/save-execution-progress.ts rename to packages/cli/src/execution-lifecycle/save-execution-progress.ts index 9e751c90f6..8ca33c7095 100644 --- a/packages/cli/src/execution-lifecycle-hooks/save-execution-progress.ts +++ b/packages/cli/src/execution-lifecycle/save-execution-progress.ts @@ -3,7 +3,8 @@ import { ErrorReporter, Logger } from 'n8n-core'; import type { IRunExecutionData, ITaskData, IWorkflowBase } from 'n8n-workflow'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; -import { toSaveSettings } from '@/execution-lifecycle-hooks/to-save-settings'; + +import { toSaveSettings } from './to-save-settings'; export async function saveExecutionProgress( workflowData: IWorkflowBase, diff --git a/packages/cli/src/execution-lifecycle-hooks/shared/__tests__/shared-hook-functions.test.ts b/packages/cli/src/execution-lifecycle/shared/__tests__/shared-hook-functions.test.ts similarity index 100% rename from packages/cli/src/execution-lifecycle-hooks/shared/__tests__/shared-hook-functions.test.ts rename to packages/cli/src/execution-lifecycle/shared/__tests__/shared-hook-functions.test.ts diff --git a/packages/cli/src/execution-lifecycle-hooks/shared/shared-hook-functions.ts b/packages/cli/src/execution-lifecycle/shared/shared-hook-functions.ts similarity index 100% rename from packages/cli/src/execution-lifecycle-hooks/shared/shared-hook-functions.ts rename to packages/cli/src/execution-lifecycle/shared/shared-hook-functions.ts diff --git a/packages/cli/src/execution-lifecycle-hooks/to-save-settings.ts b/packages/cli/src/execution-lifecycle/to-save-settings.ts similarity index 100% rename from packages/cli/src/execution-lifecycle-hooks/to-save-settings.ts rename to packages/cli/src/execution-lifecycle/to-save-settings.ts diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts index 503e53d023..bb759eae2c 100644 --- a/packages/cli/src/executions/execution-recovery.service.ts +++ b/packages/cli/src/executions/execution-recovery.service.ts @@ -9,9 +9,9 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito import { NodeCrashedError } from '@/errors/node-crashed.error'; import { WorkflowCrashedError } from '@/errors/workflow-crashed.error'; import { EventService } from '@/events/event.service'; +import { getWorkflowHooksMain } from '@/execution-lifecycle/execution-lifecycle-hooks'; import type { IExecutionResponse } from '@/interfaces'; import { Push } from '@/push'; -import { getWorkflowHooksMain } from '@/workflow-execute-additional-data'; // @TODO: Dependency cycle import type { EventMessageTypes } from '../eventbus/event-message-classes'; diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts index 2aff0787c4..768338d9b0 100644 --- a/packages/cli/src/scaling/job-processor.ts +++ b/packages/cli/src/scaling/job-processor.ts @@ -13,6 +13,7 @@ import type PCancelable from 'p-cancelable'; import config from '@/config'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; +import { getWorkflowHooksWorkerExecuter } from '@/execution-lifecycle/execution-lifecycle-hooks'; import { ManualExecutionService } from '@/manual-execution.service'; import { NodeTypes } from '@/node-types'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; @@ -124,7 +125,7 @@ export class JobProcessor { const { pushRef } = job.data; - additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter( + additionalData.hooks = getWorkflowHooksWorkerExecuter( execution.mode, job.data.executionId, execution.workflowData, diff --git a/packages/cli/src/__tests__/object-to-error.test.ts b/packages/cli/src/utils/__tests__/object-to-error.test.ts similarity index 94% rename from packages/cli/src/__tests__/object-to-error.test.ts rename to packages/cli/src/utils/__tests__/object-to-error.test.ts index 311f4dce55..c65676a426 100644 --- a/packages/cli/src/__tests__/object-to-error.test.ts +++ b/packages/cli/src/utils/__tests__/object-to-error.test.ts @@ -2,7 +2,7 @@ import { mock } from 'jest-mock-extended'; import type { INode } from 'n8n-workflow'; import { NodeOperationError, type Workflow } from 'n8n-workflow'; -import { objectToError } from '../workflow-execute-additional-data'; +import { objectToError } from '../object-to-error'; describe('objectToError', () => { describe('node error handling', () => { diff --git a/packages/cli/src/utils/object-to-error.ts b/packages/cli/src/utils/object-to-error.ts new file mode 100644 index 0000000000..ffb0cd8fb3 --- /dev/null +++ b/packages/cli/src/utils/object-to-error.ts @@ -0,0 +1,53 @@ +import { isObjectLiteral } from 'n8n-core'; +import { NodeOperationError } from 'n8n-workflow'; +import type { Workflow } from 'n8n-workflow'; + +export function objectToError(errorObject: unknown, workflow: Workflow): Error { + // TODO: Expand with other error types + if (errorObject instanceof Error) { + // If it's already an Error instance, return it as is. + return errorObject; + } else if ( + isObjectLiteral(errorObject) && + 'message' in errorObject && + typeof errorObject.message === 'string' + ) { + // If it's an object with a 'message' property, create a new Error instance. + let error: Error | undefined; + if ( + 'node' in errorObject && + isObjectLiteral(errorObject.node) && + typeof errorObject.node.name === 'string' + ) { + const node = workflow.getNode(errorObject.node.name); + + if (node) { + error = new NodeOperationError( + node, + errorObject as unknown as Error, + errorObject as object, + ); + } + } + + if (error === undefined) { + error = new Error(errorObject.message); + } + + if ('description' in errorObject) { + // @ts-expect-error Error descriptions are surfaced by the UI but + // not all backend errors account for this property yet. + error.description = errorObject.description as string; + } + + if ('stack' in errorObject) { + // If there's a 'stack' property, set it on the new Error instance. + error.stack = errorObject.stack as string; + } + + return error; + } else { + // If it's neither an Error nor an object with a 'message' property, create a generic Error. + return new Error('An error occurred'); + } +} diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index 7d69084357..c3b3ed8693 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -5,15 +5,8 @@ import type { PushMessage, PushType } from '@n8n/api-types'; import { GlobalConfig } from '@n8n/config'; import { Container } from '@n8n/di'; -import { stringify } from 'flatted'; -import { - ErrorReporter, - Logger, - InstanceSettings, - WorkflowExecute, - isObjectLiteral, -} from 'n8n-core'; -import { ApplicationError, NodeOperationError, Workflow, WorkflowHooks } from 'n8n-workflow'; +import { Logger, WorkflowExecute } from 'n8n-core'; +import { ApplicationError, Workflow } from 'n8n-workflow'; import type { IDataObject, IExecuteData, @@ -23,11 +16,8 @@ import type { INodeParameters, IRun, IRunExecutionData, - ITaskData, IWorkflowBase, IWorkflowExecuteAdditionalData, - IWorkflowExecuteHooks, - IWorkflowHooksOptionalParameters, IWorkflowSettings, WorkflowExecuteMode, ExecutionStatus, @@ -44,646 +34,23 @@ import type { import { ActiveExecutions } from '@/active-executions'; import { CredentialsHelper } from '@/credentials-helper'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; +import { EventService } from '@/events/event.service'; import type { AiEventMap, AiEventPayload } from '@/events/maps/ai.event-map'; +import { getWorkflowHooksIntegrated } from '@/execution-lifecycle/execution-lifecycle-hooks'; import { ExternalHooks } from '@/external-hooks'; -import type { IWorkflowErrorData, UpdateExecutionPayload } from '@/interfaces'; +import type { UpdateExecutionPayload } from '@/interfaces'; import { NodeTypes } from '@/node-types'; import { Push } from '@/push'; -import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; -import { findSubworkflowStart, isWorkflowIdValid } from '@/utils'; +import { SecretsHelper } from '@/secrets-helpers.ee'; +import { UrlService } from '@/services/url.service'; +import { SubworkflowPolicyChecker } from '@/subworkflows/subworkflow-policy-checker.service'; +import { TaskRequester } from '@/task-runners/task-managers/task-requester'; +import { PermissionChecker } from '@/user-management/permission-checker'; +import { findSubworkflowStart } from '@/utils'; +import { objectToError } from '@/utils/object-to-error'; import * as WorkflowHelpers from '@/workflow-helpers'; -import { WorkflowRepository } from './databases/repositories/workflow.repository'; -import { EventService } from './events/event.service'; -import { restoreBinaryDataId } from './execution-lifecycle-hooks/restore-binary-data-id'; -import { saveExecutionProgress } from './execution-lifecycle-hooks/save-execution-progress'; -import { - determineFinalExecutionStatus, - prepareExecutionDataForDbUpdate, - updateExistingExecution, -} from './execution-lifecycle-hooks/shared/shared-hook-functions'; -import { toSaveSettings } from './execution-lifecycle-hooks/to-save-settings'; -import { SecretsHelper } from './secrets-helpers.ee'; -import { OwnershipService } from './services/ownership.service'; -import { UrlService } from './services/url.service'; -import { SubworkflowPolicyChecker } from './subworkflows/subworkflow-policy-checker.service'; -import { TaskRequester } from './task-runners/task-managers/task-requester'; -import { PermissionChecker } from './user-management/permission-checker'; -import { WorkflowExecutionService } from './workflows/workflow-execution.service'; -import { WorkflowStaticDataService } from './workflows/workflow-static-data.service'; - -export function objectToError(errorObject: unknown, workflow: Workflow): Error { - // TODO: Expand with other error types - if (errorObject instanceof Error) { - // If it's already an Error instance, return it as is. - return errorObject; - } else if ( - isObjectLiteral(errorObject) && - 'message' in errorObject && - typeof errorObject.message === 'string' - ) { - // If it's an object with a 'message' property, create a new Error instance. - let error: Error | undefined; - if ( - 'node' in errorObject && - isObjectLiteral(errorObject.node) && - typeof errorObject.node.name === 'string' - ) { - const node = workflow.getNode(errorObject.node.name); - - if (node) { - error = new NodeOperationError( - node, - errorObject as unknown as Error, - errorObject as object, - ); - } - } - - if (error === undefined) { - error = new Error(errorObject.message); - } - - if ('description' in errorObject) { - // @ts-expect-error Error descriptions are surfaced by the UI but - // not all backend errors account for this property yet. - error.description = errorObject.description as string; - } - - if ('stack' in errorObject) { - // If there's a 'stack' property, set it on the new Error instance. - error.stack = errorObject.stack as string; - } - - return error; - } else { - // If it's neither an Error nor an object with a 'message' property, create a generic Error. - return new Error('An error occurred'); - } -} - -/** - * Checks if there was an error and if errorWorkflow or a trigger is defined. If so it collects - * all the data and executes it - * - * @param {IWorkflowBase} workflowData The workflow which got executed - * @param {IRun} fullRunData The run which produced the error - * @param {WorkflowExecuteMode} mode The mode in which the workflow got started in - * @param {string} [executionId] The id the execution got saved as - */ -export function executeErrorWorkflow( - workflowData: IWorkflowBase, - fullRunData: IRun, - mode: WorkflowExecuteMode, - executionId?: string, - retryOf?: string, -): void { - const logger = Container.get(Logger); - - // Check if there was an error and if so if an errorWorkflow or a trigger is set - let pastExecutionUrl: string | undefined; - if (executionId !== undefined) { - pastExecutionUrl = `${Container.get(UrlService).getWebhookBaseUrl()}workflow/${ - workflowData.id - }/executions/${executionId}`; - } - - if (fullRunData.data.resultData.error !== undefined) { - let workflowErrorData: IWorkflowErrorData; - const workflowId = workflowData.id; - - if (executionId) { - // The error did happen in an execution - workflowErrorData = { - execution: { - id: executionId, - url: pastExecutionUrl, - error: fullRunData.data.resultData.error, - lastNodeExecuted: fullRunData.data.resultData.lastNodeExecuted!, - mode, - retryOf, - }, - workflow: { - id: workflowId, - name: workflowData.name, - }, - }; - } else { - // The error did happen in a trigger - workflowErrorData = { - trigger: { - error: fullRunData.data.resultData.error, - mode, - }, - workflow: { - id: workflowId, - name: workflowData.name, - }, - }; - } - - const { errorTriggerType } = Container.get(GlobalConfig).nodes; - // Run the error workflow - // To avoid an infinite loop do not run the error workflow again if the error-workflow itself failed and it is its own error-workflow. - const { errorWorkflow } = workflowData.settings ?? {}; - if (errorWorkflow && !(mode === 'error' && workflowId && errorWorkflow === workflowId)) { - logger.debug('Start external error workflow', { - executionId, - errorWorkflowId: errorWorkflow, - workflowId, - }); - // If a specific error workflow is set run only that one - - // First, do permission checks. - if (!workflowId) { - // Manual executions do not trigger error workflows - // So this if should never happen. It was added to - // make sure there are no possible security gaps - return; - } - - Container.get(OwnershipService) - .getWorkflowProjectCached(workflowId) - .then((project) => { - void Container.get(WorkflowExecutionService).executeErrorWorkflow( - errorWorkflow, - workflowErrorData, - project, - ); - }) - .catch((error: Error) => { - Container.get(ErrorReporter).error(error); - logger.error( - `Could not execute ErrorWorkflow for execution ID ${this.executionId} because of error querying the workflow owner`, - { - executionId, - errorWorkflowId: errorWorkflow, - workflowId, - error, - workflowErrorData, - }, - ); - }); - } else if ( - mode !== 'error' && - workflowId !== undefined && - workflowData.nodes.some((node) => node.type === errorTriggerType) - ) { - logger.debug('Start internal error workflow', { executionId, workflowId }); - void Container.get(OwnershipService) - .getWorkflowProjectCached(workflowId) - .then((project) => { - void Container.get(WorkflowExecutionService).executeErrorWorkflow( - workflowId, - workflowErrorData, - project, - ); - }); - } - } -} - -/** - * Returns hook functions to push data to Editor-UI - * - */ -function hookFunctionsPush(): IWorkflowExecuteHooks { - const logger = Container.get(Logger); - const pushInstance = Container.get(Push); - return { - nodeExecuteBefore: [ - async function (this: WorkflowHooks, nodeName: string): Promise { - const { pushRef, executionId } = this; - // Push data to session which started workflow before each - // node which starts rendering - if (pushRef === undefined) { - return; - } - - logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { - executionId, - pushRef, - workflowId: this.workflowData.id, - }); - - pushInstance.send({ type: 'nodeExecuteBefore', data: { executionId, nodeName } }, pushRef); - }, - ], - nodeExecuteAfter: [ - async function (this: WorkflowHooks, nodeName: string, data: ITaskData): Promise { - const { pushRef, executionId } = this; - // Push data to session which started workflow after each rendered node - if (pushRef === undefined) { - return; - } - - logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { - executionId, - pushRef, - workflowId: this.workflowData.id, - }); - - pushInstance.send( - { type: 'nodeExecuteAfter', data: { executionId, nodeName, data } }, - pushRef, - ); - }, - ], - workflowExecuteBefore: [ - async function (this: WorkflowHooks, _workflow, data): Promise { - const { pushRef, executionId } = this; - const { id: workflowId, name: workflowName } = this.workflowData; - logger.debug('Executing hook (hookFunctionsPush)', { - executionId, - pushRef, - workflowId, - }); - // Push data to session which started the workflow - if (pushRef === undefined) { - return; - } - pushInstance.send( - { - type: 'executionStarted', - data: { - executionId, - mode: this.mode, - startedAt: new Date(), - retryOf: this.retryOf, - workflowId, - workflowName, - flattedRunData: data?.resultData.runData - ? stringify(data.resultData.runData) - : stringify({}), - }, - }, - pushRef, - ); - }, - ], - workflowExecuteAfter: [ - async function (this: WorkflowHooks, fullRunData: IRun): Promise { - const { pushRef, executionId } = this; - if (pushRef === undefined) return; - - const { id: workflowId } = this.workflowData; - logger.debug('Executing hook (hookFunctionsPush)', { - executionId, - pushRef, - workflowId, - }); - - const { status } = fullRunData; - if (status === 'waiting') { - pushInstance.send({ type: 'executionWaiting', data: { executionId } }, pushRef); - } else { - const rawData = stringify(fullRunData.data); - pushInstance.send( - { type: 'executionFinished', data: { executionId, workflowId, status, rawData } }, - pushRef, - ); - } - }, - ], - }; -} - -export function hookFunctionsPreExecute(): IWorkflowExecuteHooks { - const externalHooks = Container.get(ExternalHooks); - return { - workflowExecuteBefore: [ - async function (this: WorkflowHooks, workflow: Workflow): Promise { - await externalHooks.run('workflow.preExecute', [workflow, this.mode]); - }, - ], - nodeExecuteAfter: [ - async function ( - this: WorkflowHooks, - nodeName: string, - data: ITaskData, - executionData: IRunExecutionData, - ): Promise { - await saveExecutionProgress( - this.workflowData, - this.executionId, - nodeName, - data, - executionData, - this.pushRef, - ); - }, - ], - }; -} - -/** - * Returns hook functions to save workflow execution and call error workflow - * - */ -function hookFunctionsSave(): IWorkflowExecuteHooks { - const logger = Container.get(Logger); - const workflowStatisticsService = Container.get(WorkflowStatisticsService); - const eventService = Container.get(EventService); - return { - nodeExecuteBefore: [ - async function (this: WorkflowHooks, nodeName: string): Promise { - const { executionId, workflowData: workflow } = this; - - eventService.emit('node-pre-execute', { executionId, workflow, nodeName }); - }, - ], - nodeExecuteAfter: [ - async function (this: WorkflowHooks, nodeName: string): Promise { - const { executionId, workflowData: workflow } = this; - - eventService.emit('node-post-execute', { executionId, workflow, nodeName }); - }, - ], - workflowExecuteBefore: [], - workflowExecuteAfter: [ - async function ( - this: WorkflowHooks, - fullRunData: IRun, - newStaticData: IDataObject, - ): Promise { - logger.debug('Executing hook (hookFunctionsSave)', { - executionId: this.executionId, - workflowId: this.workflowData.id, - }); - - await restoreBinaryDataId(fullRunData, this.executionId, this.mode); - - const isManualMode = this.mode === 'manual'; - - try { - if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) { - // Workflow is saved so update in database - try { - await Container.get(WorkflowStaticDataService).saveStaticDataById( - this.workflowData.id, - newStaticData, - ); - } catch (e) { - Container.get(ErrorReporter).error(e); - logger.error( - `There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (hookFunctionsSave)`, - { executionId: this.executionId, workflowId: this.workflowData.id }, - ); - } - } - - const executionStatus = determineFinalExecutionStatus(fullRunData); - fullRunData.status = executionStatus; - - const saveSettings = toSaveSettings(this.workflowData.settings); - - if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) { - /** - * When manual executions are not being saved, we only soft-delete - * the execution so that the user can access its binary data - * while building their workflow. - * - * The manual execution and its binary data will be hard-deleted - * on the next pruning cycle after the grace period set by - * `EXECUTIONS_DATA_HARD_DELETE_BUFFER`. - */ - await Container.get(ExecutionRepository).softDelete(this.executionId); - - return; - } - - const shouldNotSave = - (executionStatus === 'success' && !saveSettings.success) || - (executionStatus !== 'success' && !saveSettings.error); - - if (shouldNotSave && !fullRunData.waitTill && !isManualMode) { - executeErrorWorkflow( - this.workflowData, - fullRunData, - this.mode, - this.executionId, - this.retryOf, - ); - - await Container.get(ExecutionRepository).hardDelete({ - workflowId: this.workflowData.id, - executionId: this.executionId, - }); - - return; - } - - // Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive - // As a result, we should create an IWorkflowBase object with only the data we want to save in it. - const fullExecutionData = prepareExecutionDataForDbUpdate({ - runData: fullRunData, - workflowData: this.workflowData, - workflowStatusFinal: executionStatus, - retryOf: this.retryOf, - }); - - // When going into the waiting state, store the pushRef in the execution-data - if (fullRunData.waitTill && isManualMode) { - fullExecutionData.data.pushRef = this.pushRef; - } - - await updateExistingExecution({ - executionId: this.executionId, - workflowId: this.workflowData.id, - executionData: fullExecutionData, - }); - - if (!isManualMode) { - executeErrorWorkflow( - this.workflowData, - fullRunData, - this.mode, - this.executionId, - this.retryOf, - ); - } - } catch (error) { - Container.get(ErrorReporter).error(error); - logger.error(`Failed saving execution data to DB on execution ID ${this.executionId}`, { - executionId: this.executionId, - workflowId: this.workflowData.id, - error, - }); - if (!isManualMode) { - executeErrorWorkflow( - this.workflowData, - fullRunData, - this.mode, - this.executionId, - this.retryOf, - ); - } - } finally { - workflowStatisticsService.emit('workflowExecutionCompleted', { - workflowData: this.workflowData, - fullRunData, - }); - } - }, - ], - nodeFetchedData: [ - async (workflowId: string, node: INode) => { - workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); - }, - ], - }; -} - -/** - * Returns hook functions to save workflow execution and call error workflow - * for running with queues. Manual executions should never run on queues as - * they are always executed in the main process. - * - */ -function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { - const logger = Container.get(Logger); - const workflowStatisticsService = Container.get(WorkflowStatisticsService); - const eventService = Container.get(EventService); - return { - nodeExecuteBefore: [ - async function (this: WorkflowHooks, nodeName: string): Promise { - const { executionId, workflowData: workflow } = this; - - eventService.emit('node-pre-execute', { executionId, workflow, nodeName }); - }, - ], - nodeExecuteAfter: [ - async function (this: WorkflowHooks, nodeName: string): Promise { - const { executionId, workflowData: workflow } = this; - - eventService.emit('node-post-execute', { executionId, workflow, nodeName }); - }, - ], - workflowExecuteBefore: [ - async function (): Promise { - const { executionId, workflowData } = this; - - eventService.emit('workflow-pre-execute', { executionId, data: workflowData }); - }, - ], - workflowExecuteAfter: [ - async function ( - this: WorkflowHooks, - fullRunData: IRun, - newStaticData: IDataObject, - ): Promise { - logger.debug('Executing hook (hookFunctionsSaveWorker)', { - executionId: this.executionId, - workflowId: this.workflowData.id, - }); - - const isManualMode = this.mode === 'manual'; - - try { - if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) { - // Workflow is saved so update in database - try { - await Container.get(WorkflowStaticDataService).saveStaticDataById( - this.workflowData.id, - newStaticData, - ); - } catch (e) { - Container.get(ErrorReporter).error(e); - logger.error( - `There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (workflowExecuteAfter)`, - { pushRef: this.pushRef, workflowId: this.workflowData.id }, - ); - } - } - - const workflowStatusFinal = determineFinalExecutionStatus(fullRunData); - fullRunData.status = workflowStatusFinal; - - if ( - !isManualMode && - workflowStatusFinal !== 'success' && - workflowStatusFinal !== 'waiting' - ) { - executeErrorWorkflow( - this.workflowData, - fullRunData, - this.mode, - this.executionId, - this.retryOf, - ); - } - - // Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive - // As a result, we should create an IWorkflowBase object with only the data we want to save in it. - const fullExecutionData = prepareExecutionDataForDbUpdate({ - runData: fullRunData, - workflowData: this.workflowData, - workflowStatusFinal, - retryOf: this.retryOf, - }); - - // When going into the waiting state, store the pushRef in the execution-data - if (fullRunData.waitTill && isManualMode) { - fullExecutionData.data.pushRef = this.pushRef; - } - - await updateExistingExecution({ - executionId: this.executionId, - workflowId: this.workflowData.id, - executionData: fullExecutionData, - }); - } catch (error) { - if (!isManualMode) - executeErrorWorkflow( - this.workflowData, - fullRunData, - this.mode, - this.executionId, - this.retryOf, - ); - } finally { - workflowStatisticsService.emit('workflowExecutionCompleted', { - workflowData: this.workflowData, - fullRunData, - }); - } - }, - async function (this: WorkflowHooks, runData: IRun): Promise { - const { executionId, workflowData: workflow } = this; - - eventService.emit('workflow-post-execute', { - workflow, - executionId, - runData, - }); - }, - async function (this: WorkflowHooks, fullRunData: IRun) { - const externalHooks = Container.get(ExternalHooks); - if (externalHooks.exists('workflow.postExecute')) { - try { - await externalHooks.run('workflow.postExecute', [ - fullRunData, - this.workflowData, - this.executionId, - ]); - } catch (error) { - Container.get(ErrorReporter).error(error); - Container.get(Logger).error( - 'There was a problem running hook "workflow.postExecute"', - error, - ); - } - } - }, - ], - nodeFetchedData: [ - async (workflowId: string, node: INode) => { - workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); - }, - ], - }; -} - export async function getRunData( workflowData: IWorkflowBase, inputData?: INodeExecutionData[], @@ -1074,154 +441,3 @@ export async function getBase( eventService.emit(eventName, payload), }; } - -/** - * Returns WorkflowHooks instance for running integrated workflows - * (Workflows which get started inside of another workflow) - */ -function getWorkflowHooksIntegrated( - mode: WorkflowExecuteMode, - executionId: string, - workflowData: IWorkflowBase, -): WorkflowHooks { - const hookFunctions = hookFunctionsSave(); - const preExecuteFunctions = hookFunctionsPreExecute(); - for (const key of Object.keys(preExecuteFunctions)) { - const hooks = hookFunctions[key] ?? []; - hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]); - } - return new WorkflowHooks(hookFunctions, mode, executionId, workflowData); -} - -/** - * Returns WorkflowHooks instance for worker in scaling mode. - */ -export function getWorkflowHooksWorkerExecuter( - mode: WorkflowExecuteMode, - executionId: string, - workflowData: IWorkflowBase, - optionalParameters?: IWorkflowHooksOptionalParameters, -): WorkflowHooks { - optionalParameters = optionalParameters || {}; - const hookFunctions = hookFunctionsSaveWorker(); - const preExecuteFunctions = hookFunctionsPreExecute(); - for (const key of Object.keys(preExecuteFunctions)) { - const hooks = hookFunctions[key] ?? []; - hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]); - } - - if (mode === 'manual' && Container.get(InstanceSettings).isWorker) { - const pushHooks = hookFunctionsPush(); - for (const key of Object.keys(pushHooks)) { - if (hookFunctions[key] === undefined) { - hookFunctions[key] = []; - } - // eslint-disable-next-line prefer-spread - hookFunctions[key].push.apply(hookFunctions[key], pushHooks[key]); - } - } - - return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); -} - -/** - * Returns WorkflowHooks instance for main process if workflow runs via worker - */ -export function getWorkflowHooksWorkerMain( - mode: WorkflowExecuteMode, - executionId: string, - workflowData: IWorkflowBase, - optionalParameters?: IWorkflowHooksOptionalParameters, -): WorkflowHooks { - optionalParameters = optionalParameters || {}; - const hookFunctions = hookFunctionsPreExecute(); - - // TODO: why are workers pushing to frontend? - // TODO: simplifying this for now to just leave the bare minimum hooks - - // const hookFunctions = hookFunctionsPush(); - // const preExecuteFunctions = hookFunctionsPreExecute(); - // for (const key of Object.keys(preExecuteFunctions)) { - // if (hookFunctions[key] === undefined) { - // hookFunctions[key] = []; - // } - // hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]); - // } - - // When running with worker mode, main process executes - // Only workflowExecuteBefore + workflowExecuteAfter - // So to avoid confusion, we are removing other hooks. - hookFunctions.nodeExecuteBefore = []; - hookFunctions.nodeExecuteAfter = []; - hookFunctions.workflowExecuteAfter = [ - async function (this: WorkflowHooks, fullRunData: IRun): Promise { - // Don't delete executions before they are finished - if (!fullRunData.finished) return; - - const executionStatus = determineFinalExecutionStatus(fullRunData); - fullRunData.status = executionStatus; - - const saveSettings = toSaveSettings(this.workflowData.settings); - - const isManualMode = this.mode === 'manual'; - - if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) { - /** - * When manual executions are not being saved, we only soft-delete - * the execution so that the user can access its binary data - * while building their workflow. - * - * The manual execution and its binary data will be hard-deleted - * on the next pruning cycle after the grace period set by - * `EXECUTIONS_DATA_HARD_DELETE_BUFFER`. - */ - await Container.get(ExecutionRepository).softDelete(this.executionId); - - return; - } - - const shouldNotSave = - (executionStatus === 'success' && !saveSettings.success) || - (executionStatus !== 'success' && !saveSettings.error); - - if (!isManualMode && shouldNotSave && !fullRunData.waitTill) { - await Container.get(ExecutionRepository).hardDelete({ - workflowId: this.workflowData.id, - executionId: this.executionId, - }); - } - }, - ]; - - return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); -} - -/** - * Returns WorkflowHooks instance for running the main workflow - * - */ -export function getWorkflowHooksMain( - data: IWorkflowExecutionDataProcess, - executionId: string, -): WorkflowHooks { - const hookFunctions = hookFunctionsSave(); - const pushFunctions = hookFunctionsPush(); - for (const key of Object.keys(pushFunctions)) { - const hooks = hookFunctions[key] ?? []; - hooks.push.apply(hookFunctions[key], pushFunctions[key]); - } - - const preExecuteFunctions = hookFunctionsPreExecute(); - for (const key of Object.keys(preExecuteFunctions)) { - const hooks = hookFunctions[key] ?? []; - hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]); - } - - if (!hookFunctions.nodeExecuteBefore) hookFunctions.nodeExecuteBefore = []; - if (!hookFunctions.nodeExecuteAfter) hookFunctions.nodeExecuteAfter = []; - - return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, { - pushRef: data.pushRef, - retryOf: data.retryOf as string, - }); -} diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index 598e6a8b58..148df7edcd 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -20,7 +20,15 @@ import PCancelable from 'p-cancelable'; import { ActiveExecutions } from '@/active-executions'; import config from '@/config'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import { ExecutionNotFoundError } from '@/errors/execution-not-found-error'; +import { EventService } from '@/events/event.service'; +import { + getWorkflowHooksMain, + getWorkflowHooksWorkerExecuter, + getWorkflowHooksWorkerMain, +} from '@/execution-lifecycle/execution-lifecycle-hooks'; import { ExternalHooks } from '@/external-hooks'; +import { ManualExecutionService } from '@/manual-execution.service'; import { NodeTypes } from '@/node-types'; import type { ScalingService } from '@/scaling/scaling.service'; import type { Job, JobData } from '@/scaling/scaling.types'; @@ -29,10 +37,6 @@ import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-da import { generateFailedExecutionFromError } from '@/workflow-helpers'; import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service'; -import { ExecutionNotFoundError } from './errors/execution-not-found-error'; -import { EventService } from './events/event.service'; -import { ManualExecutionService } from './manual-execution.service'; - @Service() export class WorkflowRunner { private scalingService: ScalingService; @@ -138,7 +142,7 @@ export class WorkflowRunner { } catch (error) { // Create a failed execution with the data for the node, save it and abort execution const runData = generateFailedExecutionFromError(data.executionMode, error, error.node); - const workflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId); + const workflowHooks = getWorkflowHooksMain(data, executionId); await workflowHooks.executeHookFunctions('workflowExecuteBefore', [ undefined, data.executionData, @@ -267,7 +271,7 @@ export class WorkflowRunner { await this.executionRepository.setRunning(executionId); // write try { - additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId); + additionalData.hooks = getWorkflowHooksMain(data, executionId); additionalData.hooks.hookFunctions.sendResponse = [ async (response: IExecuteResponsePromiseData): Promise => { @@ -368,12 +372,9 @@ export class WorkflowRunner { try { job = await this.scalingService.addJob(jobData, { priority: realtime ? 50 : 100 }); - hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerMain( - data.executionMode, - executionId, - data.workflowData, - { retryOf: data.retryOf ? data.retryOf.toString() : undefined }, - ); + hooks = getWorkflowHooksWorkerMain(data.executionMode, executionId, data.workflowData, { + retryOf: data.retryOf ? data.retryOf.toString() : undefined, + }); // Normally also workflow should be supplied here but as it only used for sending // data to editor-UI is not needed. @@ -381,7 +382,7 @@ export class WorkflowRunner { } catch (error) { // We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the // "workflowExecuteAfter" which we require. - const hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter( + const hooks = getWorkflowHooksWorkerExecuter( data.executionMode, executionId, data.workflowData, @@ -399,7 +400,7 @@ export class WorkflowRunner { // We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the // "workflowExecuteAfter" which we require. - const hooksWorker = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter( + const hooksWorker = getWorkflowHooksWorkerExecuter( data.executionMode, executionId, data.workflowData, @@ -417,7 +418,7 @@ export class WorkflowRunner { } catch (error) { // We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the // "workflowExecuteAfter" which we require. - const hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter( + const hooks = getWorkflowHooksWorkerExecuter( data.executionMode, executionId, data.workflowData,