From ca73dc924337b29a4a2cd4530ab059a1820fab82 Mon Sep 17 00:00:00 2001 From: Omar Ajoue Date: Tue, 29 Dec 2020 18:45:07 +0100 Subject: [PATCH] Added on/off to save data after each step, saving initial data and retries working --- packages/cli/config/index.ts | 6 ++ packages/cli/src/ActiveExecutions.ts | 4 +- packages/cli/src/Server.ts | 12 ++-- .../cli/src/WorkflowExecuteAdditionalData.ts | 64 +++++++++++-------- packages/cli/src/WorkflowRunner.ts | 2 +- packages/cli/src/WorkflowRunnerProcess.ts | 23 ++++++- packages/core/src/WorkflowExecute.ts | 11 ++-- packages/workflow/src/Interfaces.ts | 2 +- 8 files changed, 81 insertions(+), 43 deletions(-) diff --git a/packages/cli/config/index.ts b/packages/cli/config/index.ts index 5cbacddf85..e0c28c13a7 100644 --- a/packages/cli/config/index.ts +++ b/packages/cli/config/index.ts @@ -201,6 +201,12 @@ const config = convict({ default: 'all', env: 'EXECUTIONS_DATA_SAVE_ON_SUCCESS', }, + saveExecutionProgress: { + doc: 'Wether or not to save progress for each node executed', + format: 'Boolean', + default: false, + env: 'EXECUTIONS_DATA_SAVE_ON_PROGRESS', + }, // If the executions of workflows which got started via the editor // should be saved. By default they will not be saved as this runs diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index f2970e9e41..c802b3afd0 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -37,9 +37,9 @@ export class ActiveExecutions { * @memberof ActiveExecutions */ async add(executionData: IWorkflowExecutionDataProcess, process?: ChildProcess): Promise { - + const fullExecutionData: IExecutionDb = { - data: executionData.executionData!, // this is only empty for CLI executions but works fine. + data: executionData.executionData!, mode: executionData.executionMode, finished: false, startedAt: new Date(), diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 9ea91ca4e1..f56d86f892 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -1524,12 +1524,14 @@ class App { workflowData: fullExecutionData.workflowData, }; - const lastNodeExecuted = data!.executionData!.resultData.lastNodeExecuted as string; - - // Remove the old error and the data of the last run of the node that it can be replaced - delete data!.executionData!.resultData.error; - data!.executionData!.resultData.runData[lastNodeExecuted].pop(); + const lastNodeExecuted = data!.executionData!.resultData.lastNodeExecuted as string | undefined; + if (lastNodeExecuted) { + // Remove the old error and the data of the last run of the node that it can be replaced + delete data!.executionData!.resultData.error; + data!.executionData!.resultData.runData[lastNodeExecuted].pop(); + } + if (req.body.loadWorkflow === true) { // Loads the currently saved workflow to execute instead of the // one saved at the time of the execution. diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index 6f60520fd6..38916e89dc 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -212,19 +212,18 @@ export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowEx await externalHooks.run('workflow.preExecute', [workflow, this.mode]); }, ], - }; -} - -/** - * Returns hook functions to save workflow execution and call error workflow - * - * @returns {IWorkflowExecuteHooks} - */ -function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { - return { - nodeExecuteBefore: [], nodeExecuteAfter: [ - async function (nodeName: string, data: ITaskData): Promise { + async function (nodeName: string, data: ITaskData, executionStack: IExecuteData[]): Promise { + if (this.workflowData.settings !== undefined) { + if (this.workflowData.settings.saveExecutionProgress === false) { + return; + } else if (this.workflowData.settings.saveExecutionProgress !== true && !config.get('executions.saveExecutionProgress') as boolean) { + return; + } + } else if (!config.get('executions.saveExecutionProgress') as boolean) { + return; + } + const execution = await Db.collections.Execution!.findOne(this.executionId); if (execution === undefined) { @@ -234,6 +233,13 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { } const fullExecutionData: IExecutionResponse = ResponseHelper.unflattenExecutionData(execution); + if (fullExecutionData.finished) { + // We already received ´workflowExecuteAfter´ webhook, so this is just an async call + // that was left behind. We skip saving because the other call should have saved everything + // so this one is safe to ignore + return; + } + if (fullExecutionData.data === undefined) { fullExecutionData.data = { @@ -258,20 +264,29 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { // Initialize array and save data fullExecutionData.data.resultData.runData[nodeName] = [data]; } - - console.log("===================================="); - console.log("Full exec data:"); - console.log(fullExecutionData); + fullExecutionData.data.executionData!.nodeExecutionStack = executionStack; + + // Set last executed node so that it may resume on failure + fullExecutionData.data.resultData.lastNodeExecuted = nodeName; + const executionData = ResponseHelper.flattenExecutionData(fullExecutionData); - console.log("Would try to save:"); - console.log(executionData); - console.log("===================================="); + await Db.collections.Execution!.update(this.executionId, executionData as IExecutionFlattedDb); + }, + ] + }; +} - // await Db.collections.Execution!.update(this.executionId, executionData as IExecutionFlattedDb); - } - ], +/** + * Returns hook functions to save workflow execution and call error workflow + * + * @returns {IWorkflowExecuteHooks} + */ +function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { + return { + nodeExecuteBefore: [], + nodeExecuteAfter: [], workflowExecuteBefore: [], workflowExecuteAfter: [ async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise { @@ -345,11 +360,6 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { const executionData = ResponseHelper.flattenExecutionData(fullExecutionData); - console.log("===================================="); - console.log("Saving final execution data:"); - console.log(fullExecutionData); - console.log("===================================="); - // Save the Execution in DB await Db.collections.Execution!.update(this.executionId, executionData as IExecutionFlattedDb); diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index f2476c97b0..7e97d0365a 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -154,7 +154,7 @@ export class WorkflowRunner { workflowExecution = workflowExecute.processRunExecutionData(workflow); } else if (data.runData === undefined || data.startNodes === undefined || data.startNodes.length === 0 || data.destinationNode === undefined) { // Execute all nodes - + // Can execute without webhook so go on const workflowExecute = new WorkflowExecute(additionalData, data.executionMode); workflowExecution = workflowExecute.run(workflow, undefined, data.destinationNode); diff --git a/packages/cli/src/WorkflowRunnerProcess.ts b/packages/cli/src/WorkflowRunnerProcess.ts index 8fc749b4b8..3f3fb4c1af 100644 --- a/packages/cli/src/WorkflowRunnerProcess.ts +++ b/packages/cli/src/WorkflowRunnerProcess.ts @@ -2,6 +2,7 @@ import { CredentialsOverwrites, CredentialTypes, + Db, ExternalHooks, IWorkflowExecutionDataProcessWithExecution, NodeTypes, @@ -15,6 +16,7 @@ import { import { IDataObject, + IExecuteData, IExecutionError, INodeType, INodeTypeData, @@ -25,6 +27,8 @@ import { WorkflowHooks, } from 'n8n-workflow'; +import * as config from '../config'; + export class WorkflowRunnerProcess { data: IWorkflowExecutionDataProcessWithExecution | undefined; startedAt = new Date(); @@ -74,6 +78,19 @@ export class WorkflowRunnerProcess { const externalHooks = ExternalHooks(); await externalHooks.init(); + // This code has been split into 3 ifs just to make it easier to understand + // Can be made smaller but in the end it will make it impossible to read. + if (inputData.workflowData.settings !== undefined && inputData.workflowData.settings.saveExecutionProgress === true) { + // Workflow settings specifying it should save + await Db.init(); + } else if (inputData.workflowData.settings !== undefined && inputData.workflowData.settings.saveExecutionProgress !== false && config.get('executions.saveExecutionProgress') as boolean) { + // Workflow settings not saying anything about saving but default settings says so + await Db.init(); + } else if (inputData.workflowData.settings === undefined && config.get('executions.saveExecutionProgress') as boolean) { + // Workflow settings not saying anything about saving but default settings says so + await Db.init(); + } + this.workflow = new Workflow({ id: this.data.workflowData.id as string | undefined, name: this.data.workflowData.name, nodes: this.data.workflowData!.nodes, connections: this.data.workflowData!.connections, active: this.data.workflowData!.active, nodeTypes, staticData: this.data.workflowData!.staticData, settings: this.data.workflowData!.settings}); const additionalData = await WorkflowExecuteAdditionalData.getBase(this.data.credentials); additionalData.hooks = this.getProcessForwardHooks(); @@ -83,7 +100,7 @@ export class WorkflowRunnerProcess { return this.workflowExecute.processRunExecutionData(this.workflow); } else if (this.data.runData === undefined || this.data.startNodes === undefined || this.data.startNodes.length === 0 || this.data.destinationNode === undefined) { // Execute all nodes - + // Can execute without webhook so go on this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode); return this.workflowExecute.run(this.workflow, undefined, this.data.destinationNode); @@ -134,8 +151,8 @@ export class WorkflowRunnerProcess { }, ], nodeExecuteAfter: [ - async (nodeName: string, data: ITaskData): Promise => { - this.sendHookToParentProcess('nodeExecuteAfter', [nodeName, data]); + async (nodeName: string, data: ITaskData, executionStack: IExecuteData[]): Promise => { + this.sendHookToParentProcess('nodeExecuteAfter', [nodeName, data, executionStack]); }, ], workflowExecuteBefore: [ diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index c08ea2a020..410e489b4e 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -689,7 +689,7 @@ export class WorkflowExecute { // Add the execution data again so that it can get restarted this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionData); - this.executeHook('nodeExecuteAfter', [executionNode.name, taskData]); + this.executeHook('nodeExecuteAfter', [executionNode.name, taskData, this.runExecutionData.executionData!.nodeExecutionStack]); break; } @@ -700,8 +700,6 @@ export class WorkflowExecute { 'main': nodeSuccessData, } as ITaskDataConnections); - this.executeHook('nodeExecuteAfter', [executionNode.name, taskData]); - this.runExecutionData.resultData.runData[executionNode.name].push(taskData); if (this.runExecutionData.startData && this.runExecutionData.startData.destinationNode && this.runExecutionData.startData.destinationNode === executionNode.name) { @@ -736,6 +734,12 @@ export class WorkflowExecute { } } } + + + // Await is needed to make sure that we don't fall into concurrency problems + // When saving node execution data + await this.executeHook('nodeExecuteAfter', [executionNode.name, taskData, this.runExecutionData.executionData!.nodeExecutionStack]); + } return Promise.resolve(); @@ -760,7 +764,6 @@ export class WorkflowExecute { // Static data of workflow changed newStaticData = workflow.staticData; } - await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]).catch(error => { console.error('There was a problem running hook "workflowExecuteAfter"', error); }); diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index d0f79f8e00..bee3fedaa2 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -713,7 +713,7 @@ export interface IWorkflowCredentials { export interface IWorkflowExecuteHooks { [key: string]: Array<((...args: any[]) => Promise)> | undefined; // tslint:disable-line:no-any - nodeExecuteAfter?: Array<((nodeName: string, data: ITaskData) => Promise)>; + nodeExecuteAfter?: Array<((nodeName: string, data: ITaskData, executionStack: IExecuteData[]) => Promise)>; nodeExecuteBefore?: Array<((nodeName: string) => Promise)>; workflowExecuteAfter?: Array<((data: IRun, newStaticData: IDataObject) => Promise)>; workflowExecuteBefore?: Array<((workflow: Workflow, data: IRunExecutionData) => Promise)>;