Added on/off to save data after each step, saving initial data and retries working

This commit is contained in:
Omar Ajoue 2020-12-29 18:45:07 +01:00
parent 826a72d82a
commit ca73dc9243
8 changed files with 81 additions and 43 deletions

View file

@ -201,6 +201,12 @@ const config = convict({
default: 'all', default: 'all',
env: 'EXECUTIONS_DATA_SAVE_ON_SUCCESS', env: 'EXECUTIONS_DATA_SAVE_ON_SUCCESS',
}, },
saveExecutionProgress: {
doc: 'Wether or not to save progress for each node executed',
format: 'Boolean',
default: false,
env: 'EXECUTIONS_DATA_SAVE_ON_PROGRESS',
},
// If the executions of workflows which got started via the editor // If the executions of workflows which got started via the editor
// should be saved. By default they will not be saved as this runs // should be saved. By default they will not be saved as this runs

View file

@ -37,9 +37,9 @@ export class ActiveExecutions {
* @memberof ActiveExecutions * @memberof ActiveExecutions
*/ */
async add(executionData: IWorkflowExecutionDataProcess, process?: ChildProcess): Promise<string> { async add(executionData: IWorkflowExecutionDataProcess, process?: ChildProcess): Promise<string> {
const fullExecutionData: IExecutionDb = { const fullExecutionData: IExecutionDb = {
data: executionData.executionData!, // this is only empty for CLI executions but works fine. data: executionData.executionData!,
mode: executionData.executionMode, mode: executionData.executionMode,
finished: false, finished: false,
startedAt: new Date(), startedAt: new Date(),

View file

@ -1524,12 +1524,14 @@ class App {
workflowData: fullExecutionData.workflowData, workflowData: fullExecutionData.workflowData,
}; };
const lastNodeExecuted = data!.executionData!.resultData.lastNodeExecuted as string; const lastNodeExecuted = data!.executionData!.resultData.lastNodeExecuted as string | undefined;
// Remove the old error and the data of the last run of the node that it can be replaced
delete data!.executionData!.resultData.error;
data!.executionData!.resultData.runData[lastNodeExecuted].pop();
if (lastNodeExecuted) {
// Remove the old error and the data of the last run of the node that it can be replaced
delete data!.executionData!.resultData.error;
data!.executionData!.resultData.runData[lastNodeExecuted].pop();
}
if (req.body.loadWorkflow === true) { if (req.body.loadWorkflow === true) {
// Loads the currently saved workflow to execute instead of the // Loads the currently saved workflow to execute instead of the
// one saved at the time of the execution. // one saved at the time of the execution.

View file

@ -212,19 +212,18 @@ export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowEx
await externalHooks.run('workflow.preExecute', [workflow, this.mode]); await externalHooks.run('workflow.preExecute', [workflow, this.mode]);
}, },
], ],
};
}
/**
* Returns hook functions to save workflow execution and call error workflow
*
* @returns {IWorkflowExecuteHooks}
*/
function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
return {
nodeExecuteBefore: [],
nodeExecuteAfter: [ nodeExecuteAfter: [
async function (nodeName: string, data: ITaskData): Promise<void> { async function (nodeName: string, data: ITaskData, executionStack: IExecuteData[]): Promise<void> {
if (this.workflowData.settings !== undefined) {
if (this.workflowData.settings.saveExecutionProgress === false) {
return;
} else if (this.workflowData.settings.saveExecutionProgress !== true && !config.get('executions.saveExecutionProgress') as boolean) {
return;
}
} else if (!config.get('executions.saveExecutionProgress') as boolean) {
return;
}
const execution = await Db.collections.Execution!.findOne(this.executionId); const execution = await Db.collections.Execution!.findOne(this.executionId);
if (execution === undefined) { if (execution === undefined) {
@ -234,6 +233,13 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
} }
const fullExecutionData: IExecutionResponse = ResponseHelper.unflattenExecutionData(execution); const fullExecutionData: IExecutionResponse = ResponseHelper.unflattenExecutionData(execution);
if (fullExecutionData.finished) {
// We already received ´workflowExecuteAfter´ webhook, so this is just an async call
// that was left behind. We skip saving because the other call should have saved everything
// so this one is safe to ignore
return;
}
if (fullExecutionData.data === undefined) { if (fullExecutionData.data === undefined) {
fullExecutionData.data = { fullExecutionData.data = {
@ -258,20 +264,29 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
// Initialize array and save data // Initialize array and save data
fullExecutionData.data.resultData.runData[nodeName] = [data]; fullExecutionData.data.resultData.runData[nodeName] = [data];
} }
console.log("====================================");
console.log("Full exec data:");
console.log(fullExecutionData);
fullExecutionData.data.executionData!.nodeExecutionStack = executionStack;
// Set last executed node so that it may resume on failure
fullExecutionData.data.resultData.lastNodeExecuted = nodeName;
const executionData = ResponseHelper.flattenExecutionData(fullExecutionData); const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);
console.log("Would try to save:"); await Db.collections.Execution!.update(this.executionId, executionData as IExecutionFlattedDb);
console.log(executionData); },
console.log("===================================="); ]
};
}
// await Db.collections.Execution!.update(this.executionId, executionData as IExecutionFlattedDb); /**
} * Returns hook functions to save workflow execution and call error workflow
], *
* @returns {IWorkflowExecuteHooks}
*/
function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
return {
nodeExecuteBefore: [],
nodeExecuteAfter: [],
workflowExecuteBefore: [], workflowExecuteBefore: [],
workflowExecuteAfter: [ workflowExecuteAfter: [
async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise<void> { async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise<void> {
@ -345,11 +360,6 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
const executionData = ResponseHelper.flattenExecutionData(fullExecutionData); const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);
console.log("====================================");
console.log("Saving final execution data:");
console.log(fullExecutionData);
console.log("====================================");
// Save the Execution in DB // Save the Execution in DB
await Db.collections.Execution!.update(this.executionId, executionData as IExecutionFlattedDb); await Db.collections.Execution!.update(this.executionId, executionData as IExecutionFlattedDb);

View file

@ -154,7 +154,7 @@ export class WorkflowRunner {
workflowExecution = workflowExecute.processRunExecutionData(workflow); workflowExecution = workflowExecute.processRunExecutionData(workflow);
} else if (data.runData === undefined || data.startNodes === undefined || data.startNodes.length === 0 || data.destinationNode === undefined) { } else if (data.runData === undefined || data.startNodes === undefined || data.startNodes.length === 0 || data.destinationNode === undefined) {
// Execute all nodes // Execute all nodes
// Can execute without webhook so go on // Can execute without webhook so go on
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode); const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
workflowExecution = workflowExecute.run(workflow, undefined, data.destinationNode); workflowExecution = workflowExecute.run(workflow, undefined, data.destinationNode);

View file

@ -2,6 +2,7 @@
import { import {
CredentialsOverwrites, CredentialsOverwrites,
CredentialTypes, CredentialTypes,
Db,
ExternalHooks, ExternalHooks,
IWorkflowExecutionDataProcessWithExecution, IWorkflowExecutionDataProcessWithExecution,
NodeTypes, NodeTypes,
@ -15,6 +16,7 @@ import {
import { import {
IDataObject, IDataObject,
IExecuteData,
IExecutionError, IExecutionError,
INodeType, INodeType,
INodeTypeData, INodeTypeData,
@ -25,6 +27,8 @@ import {
WorkflowHooks, WorkflowHooks,
} from 'n8n-workflow'; } from 'n8n-workflow';
import * as config from '../config';
export class WorkflowRunnerProcess { export class WorkflowRunnerProcess {
data: IWorkflowExecutionDataProcessWithExecution | undefined; data: IWorkflowExecutionDataProcessWithExecution | undefined;
startedAt = new Date(); startedAt = new Date();
@ -74,6 +78,19 @@ export class WorkflowRunnerProcess {
const externalHooks = ExternalHooks(); const externalHooks = ExternalHooks();
await externalHooks.init(); await externalHooks.init();
// This code has been split into 3 ifs just to make it easier to understand
// Can be made smaller but in the end it will make it impossible to read.
if (inputData.workflowData.settings !== undefined && inputData.workflowData.settings.saveExecutionProgress === true) {
// Workflow settings specifying it should save
await Db.init();
} else if (inputData.workflowData.settings !== undefined && inputData.workflowData.settings.saveExecutionProgress !== false && config.get('executions.saveExecutionProgress') as boolean) {
// Workflow settings not saying anything about saving but default settings says so
await Db.init();
} else if (inputData.workflowData.settings === undefined && config.get('executions.saveExecutionProgress') as boolean) {
// Workflow settings not saying anything about saving but default settings says so
await Db.init();
}
this.workflow = new Workflow({ id: this.data.workflowData.id as string | undefined, name: this.data.workflowData.name, nodes: this.data.workflowData!.nodes, connections: this.data.workflowData!.connections, active: this.data.workflowData!.active, nodeTypes, staticData: this.data.workflowData!.staticData, settings: this.data.workflowData!.settings}); this.workflow = new Workflow({ id: this.data.workflowData.id as string | undefined, name: this.data.workflowData.name, nodes: this.data.workflowData!.nodes, connections: this.data.workflowData!.connections, active: this.data.workflowData!.active, nodeTypes, staticData: this.data.workflowData!.staticData, settings: this.data.workflowData!.settings});
const additionalData = await WorkflowExecuteAdditionalData.getBase(this.data.credentials); const additionalData = await WorkflowExecuteAdditionalData.getBase(this.data.credentials);
additionalData.hooks = this.getProcessForwardHooks(); additionalData.hooks = this.getProcessForwardHooks();
@ -83,7 +100,7 @@ export class WorkflowRunnerProcess {
return this.workflowExecute.processRunExecutionData(this.workflow); return this.workflowExecute.processRunExecutionData(this.workflow);
} else if (this.data.runData === undefined || this.data.startNodes === undefined || this.data.startNodes.length === 0 || this.data.destinationNode === undefined) { } else if (this.data.runData === undefined || this.data.startNodes === undefined || this.data.startNodes.length === 0 || this.data.destinationNode === undefined) {
// Execute all nodes // Execute all nodes
// Can execute without webhook so go on // Can execute without webhook so go on
this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode); this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode);
return this.workflowExecute.run(this.workflow, undefined, this.data.destinationNode); return this.workflowExecute.run(this.workflow, undefined, this.data.destinationNode);
@ -134,8 +151,8 @@ export class WorkflowRunnerProcess {
}, },
], ],
nodeExecuteAfter: [ nodeExecuteAfter: [
async (nodeName: string, data: ITaskData): Promise<void> => { async (nodeName: string, data: ITaskData, executionStack: IExecuteData[]): Promise<void> => {
this.sendHookToParentProcess('nodeExecuteAfter', [nodeName, data]); this.sendHookToParentProcess('nodeExecuteAfter', [nodeName, data, executionStack]);
}, },
], ],
workflowExecuteBefore: [ workflowExecuteBefore: [

View file

@ -689,7 +689,7 @@ export class WorkflowExecute {
// Add the execution data again so that it can get restarted // Add the execution data again so that it can get restarted
this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionData); this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionData);
this.executeHook('nodeExecuteAfter', [executionNode.name, taskData]); this.executeHook('nodeExecuteAfter', [executionNode.name, taskData, this.runExecutionData.executionData!.nodeExecutionStack]);
break; break;
} }
@ -700,8 +700,6 @@ export class WorkflowExecute {
'main': nodeSuccessData, 'main': nodeSuccessData,
} as ITaskDataConnections); } as ITaskDataConnections);
this.executeHook('nodeExecuteAfter', [executionNode.name, taskData]);
this.runExecutionData.resultData.runData[executionNode.name].push(taskData); this.runExecutionData.resultData.runData[executionNode.name].push(taskData);
if (this.runExecutionData.startData && this.runExecutionData.startData.destinationNode && this.runExecutionData.startData.destinationNode === executionNode.name) { if (this.runExecutionData.startData && this.runExecutionData.startData.destinationNode && this.runExecutionData.startData.destinationNode === executionNode.name) {
@ -736,6 +734,12 @@ export class WorkflowExecute {
} }
} }
} }
// Await is needed to make sure that we don't fall into concurrency problems
// When saving node execution data
await this.executeHook('nodeExecuteAfter', [executionNode.name, taskData, this.runExecutionData.executionData!.nodeExecutionStack]);
} }
return Promise.resolve(); return Promise.resolve();
@ -760,7 +764,6 @@ export class WorkflowExecute {
// Static data of workflow changed // Static data of workflow changed
newStaticData = workflow.staticData; newStaticData = workflow.staticData;
} }
await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]).catch(error => { await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]).catch(error => {
console.error('There was a problem running hook "workflowExecuteAfter"', error); console.error('There was a problem running hook "workflowExecuteAfter"', error);
}); });

View file

@ -713,7 +713,7 @@ export interface IWorkflowCredentials {
export interface IWorkflowExecuteHooks { export interface IWorkflowExecuteHooks {
[key: string]: Array<((...args: any[]) => Promise<void>)> | undefined; // tslint:disable-line:no-any [key: string]: Array<((...args: any[]) => Promise<void>)> | undefined; // tslint:disable-line:no-any
nodeExecuteAfter?: Array<((nodeName: string, data: ITaskData) => Promise<void>)>; nodeExecuteAfter?: Array<((nodeName: string, data: ITaskData, executionStack: IExecuteData[]) => Promise<void>)>;
nodeExecuteBefore?: Array<((nodeName: string) => Promise<void>)>; nodeExecuteBefore?: Array<((nodeName: string) => Promise<void>)>;
workflowExecuteAfter?: Array<((data: IRun, newStaticData: IDataObject) => Promise<void>)>; workflowExecuteAfter?: Array<((data: IRun, newStaticData: IDataObject) => Promise<void>)>;
workflowExecuteBefore?: Array<((workflow: Workflow, data: IRunExecutionData) => Promise<void>)>; workflowExecuteBefore?: Array<((workflow: Workflow, data: IRunExecutionData) => Promise<void>)>;