From d6d57c2df6c2b6e1a48273030666192b09e5f509 Mon Sep 17 00:00:00 2001 From: Omar Ajoue Date: Wed, 10 Mar 2021 15:51:18 +0100 Subject: [PATCH] :bug: Fixed an issue with queue mode for executions that should not be saved (#1509) * Fixed an issue with queue mode for executions that should not be saved * :zap: Minimal change Co-authored-by: Jan Oberhauser --- packages/cli/commands/worker.ts | 2 +- .../cli/src/WorkflowExecuteAdditionalData.ts | 89 ++++++++++++++++++- packages/cli/src/WorkflowRunner.ts | 20 +++++ packages/cli/src/WorkflowRunnerProcess.ts | 10 +++ .../src/components/ExecutionsList.vue | 21 +++++ 5 files changed, 140 insertions(+), 2 deletions(-) diff --git a/packages/cli/commands/worker.ts b/packages/cli/commands/worker.ts index aeb3b2ba08..b2b78c4c7a 100644 --- a/packages/cli/commands/worker.ts +++ b/packages/cli/commands/worker.ts @@ -132,7 +132,7 @@ export class Worker extends Command { const credentials = await WorkflowCredentials(currentExecutionDb.workflowData.nodes); const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials); - additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksIntegrated(currentExecutionDb.mode, job.data.executionId, currentExecutionDb.workflowData, { retryOf: currentExecutionDb.retryOf as string }); + additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter(currentExecutionDb.mode, job.data.executionId, currentExecutionDb.workflowData, { retryOf: currentExecutionDb.retryOf as string }); let workflowExecute: WorkflowExecute; let workflowRun: PCancelable; diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index b40491cea4..562169980b 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -353,7 +353,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf); } // Data is always saved, so we remove from database - Db.collections.Execution!.delete(this.executionId); + await Db.collections.Execution!.delete(this.executionId); return; } @@ -399,6 +399,77 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { } +/** + * Returns hook functions to save workflow execution and call error workflow + * for running with queues. Manual executions should never run on queues as + * they are always executed in the main process. + * + * @returns {IWorkflowExecuteHooks} + */ +function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { + return { + nodeExecuteBefore: [], + nodeExecuteAfter: [], + workflowExecuteBefore: [], + workflowExecuteAfter: [ + async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise { + try { + if (WorkflowHelpers.isWorkflowIdValid(this.workflowData.id as string) === true && newStaticData) { + // Workflow is saved so update in database + try { + await WorkflowHelpers.saveStaticDataById(this.workflowData.id as string, newStaticData); + } catch (e) { + // TODO: Add proper logging! + console.error(`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: ${e.message}`); + } + } + + // Check config to know if execution should be saved or not + let saveDataErrorExecution = config.get('executions.saveDataOnError') as string; + if (this.workflowData.settings !== undefined) { + saveDataErrorExecution = (this.workflowData.settings.saveDataErrorExecution as string) || saveDataErrorExecution; + } + + const workflowDidSucceed = !fullRunData.data.resultData.error; + if (workflowDidSucceed === false && saveDataErrorExecution === 'none') { + executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf); + } + + const fullExecutionData: IExecutionDb = { + data: fullRunData.data, + mode: fullRunData.mode, + finished: fullRunData.finished ? fullRunData.finished : false, + startedAt: fullRunData.startedAt, + stoppedAt: fullRunData.stoppedAt, + workflowData: this.workflowData, + }; + + if (this.retryOf !== undefined) { + fullExecutionData.retryOf = this.retryOf.toString(); + } + + if (this.workflowData.id !== undefined && WorkflowHelpers.isWorkflowIdValid(this.workflowData.id.toString()) === true) { + fullExecutionData.workflowId = this.workflowData.id.toString(); + } + + const executionData = ResponseHelper.flattenExecutionData(fullExecutionData); + + // Save the Execution in DB + await Db.collections.Execution!.update(this.executionId, executionData as IExecutionFlattedDb); + + if (fullRunData.finished === true && this.retryOf !== undefined) { + // If the retry was successful save the reference it on the original execution + // await Db.collections.Execution!.save(executionData as IExecutionFlattedDb); + await Db.collections.Execution!.update(this.retryOf, { retrySuccessId: this.executionId }); + } + } catch (error) { + executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf); + } + }, + ], + }; +} + export async function getRunData(workflowData: IWorkflowBase, inputData?: INodeExecutionData[]): Promise { const mode = 'integrated'; @@ -613,6 +684,22 @@ export function getWorkflowHooksIntegrated(mode: WorkflowExecuteMode, executionI return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); } +/** + * Returns WorkflowHooks instance for running integrated workflows + * (Workflows which get started inside of another workflow) + */ +export function getWorkflowHooksWorkerExecuter(mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, optionalParameters?: IWorkflowHooksOptionalParameters): WorkflowHooks { + optionalParameters = optionalParameters || {}; + const hookFunctions = hookFunctionsSaveWorker(); + const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode); + for (const key of Object.keys(preExecuteFunctions)) { + if (hookFunctions[key] === undefined) { + hookFunctions[key] = []; + } + hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]); + } + return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); +} /** * Returns WorkflowHooks instance for main process if workflow runs via worker diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index 0cf3a560f6..f884ac5dcc 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -346,7 +346,27 @@ export class WorkflowRunner { // Normally also static data should be supplied here but as it only used for sending // data to editor-UI is not needed. hooks.executeHookFunctions('workflowExecuteAfter', [runData]); + try { + // Check if this execution data has to be removed from database + // based on workflow settings. + let saveDataErrorExecution = config.get('executions.saveDataOnError') as string; + let saveDataSuccessExecution = config.get('executions.saveDataOnSuccess') as string; + if (data.workflowData.settings !== undefined) { + saveDataErrorExecution = (data.workflowData.settings.saveDataErrorExecution as string) || saveDataErrorExecution; + saveDataSuccessExecution = (data.workflowData.settings.saveDataSuccessExecution as string) || saveDataSuccessExecution; + } + const workflowDidSucceed = !runData.data.resultData.error; + if (workflowDidSucceed === true && saveDataSuccessExecution === 'none' || + workflowDidSucceed === false && saveDataErrorExecution === 'none' + ) { + await Db.collections.Execution!.delete(executionId); + } + } catch (err) { + // We don't want errors here to crash n8n. Just log and proceed. + console.log('Error removing saved execution from database. More details: ', err); + } + resolve(runData); }); diff --git a/packages/cli/src/WorkflowRunnerProcess.ts b/packages/cli/src/WorkflowRunnerProcess.ts index 50c623beef..f5305e29e8 100644 --- a/packages/cli/src/WorkflowRunnerProcess.ts +++ b/packages/cli/src/WorkflowRunnerProcess.ts @@ -41,8 +41,18 @@ export class WorkflowRunnerProcess { workflowExecute: WorkflowExecute | undefined; executionIdCallback: (executionId: string) => void | undefined; + static async stopProcess() { + setTimeout(() => { + // Attempt a graceful shutdown, giving executions 30 seconds to finish + process.exit(0); + }, 30000); + } + async runWorkflow(inputData: IWorkflowExecutionDataProcessWithExecution): Promise { + process.on('SIGTERM', WorkflowRunnerProcess.stopProcess); + process.on('SIGINT', WorkflowRunnerProcess.stopProcess); + this.data = inputData; let className: string; let tempNode: INodeType; diff --git a/packages/editor-ui/src/components/ExecutionsList.vue b/packages/editor-ui/src/components/ExecutionsList.vue index 051f85513c..1c0dfd80e3 100644 --- a/packages/editor-ui/src/components/ExecutionsList.vue +++ b/packages/editor-ui/src/components/ExecutionsList.vue @@ -175,6 +175,10 @@ import { IDataObject, } from 'n8n-workflow'; +import { + range as _range, +} from 'lodash'; + import mixins from 'vue-typed-mixins'; export default mixins( @@ -433,8 +437,24 @@ export default mixins( this.$store.commit('setActiveExecutions', results[1]); const alreadyPresentExecutionIds = this.finishedExecutions.map(exec => exec.id); + let lastId = 0; + const gaps = [] as number[]; for(let i = results[0].results.length - 1; i >= 0; i--) { const currentItem = results[0].results[i]; + const currentId = parseInt(currentItem.id, 10); + if (lastId !== 0 && isNaN(currentId) === false) { + // We are doing this iteration to detect possible gaps. + // The gaps are used to remove executions that finished + // and were deleted from database but were displaying + // in this list while running. + if (currentId - lastId > 1) { + // We have some gaps. + const range = _range(lastId + 1, currentId); + gaps.push(...range); + } + } + lastId = parseInt(currentItem.id, 10) || 0; + // Check new results from end to start // Add new items accordingly. const executionIndex = alreadyPresentExecutionIds.indexOf(currentItem.id); @@ -464,6 +484,7 @@ export default mixins( this.finishedExecutions.unshift(currentItem); } } + this.finishedExecutions = this.finishedExecutions.filter(execution => !gaps.includes(parseInt(execution.id, 10)) && lastId >= parseInt(execution.id, 10)); this.finishedExecutionsCount = results[0].count; }, async loadFinishedExecutions (): Promise {