🐛 Fixed an issue with queue mode for executions that should not be saved (#1509)

* Fixed an issue with queue mode for executions that should not be saved

*  Minimal change

Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
This commit is contained in:
Omar Ajoue 2021-03-10 15:51:18 +01:00 committed by GitHub
parent 3d80129a28
commit d6d57c2df6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 140 additions and 2 deletions

View file

@ -132,7 +132,7 @@ export class Worker extends Command {
const credentials = await WorkflowCredentials(currentExecutionDb.workflowData.nodes);
const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials);
additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksIntegrated(currentExecutionDb.mode, job.data.executionId, currentExecutionDb.workflowData, { retryOf: currentExecutionDb.retryOf as string });
additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter(currentExecutionDb.mode, job.data.executionId, currentExecutionDb.workflowData, { retryOf: currentExecutionDb.retryOf as string });
let workflowExecute: WorkflowExecute;
let workflowRun: PCancelable<IRun>;

View file

@ -353,7 +353,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf);
}
// Data is always saved, so we remove from database
Db.collections.Execution!.delete(this.executionId);
await Db.collections.Execution!.delete(this.executionId);
return;
}
@ -399,6 +399,77 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
}
/**
* Returns hook functions to save workflow execution and call error workflow
* for running with queues. Manual executions should never run on queues as
* they are always executed in the main process.
*
* @returns {IWorkflowExecuteHooks}
*/
function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
return {
nodeExecuteBefore: [],
nodeExecuteAfter: [],
workflowExecuteBefore: [],
workflowExecuteAfter: [
async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise<void> {
try {
if (WorkflowHelpers.isWorkflowIdValid(this.workflowData.id as string) === true && newStaticData) {
// Workflow is saved so update in database
try {
await WorkflowHelpers.saveStaticDataById(this.workflowData.id as string, newStaticData);
} catch (e) {
// TODO: Add proper logging!
console.error(`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: ${e.message}`);
}
}
// Check config to know if execution should be saved or not
let saveDataErrorExecution = config.get('executions.saveDataOnError') as string;
if (this.workflowData.settings !== undefined) {
saveDataErrorExecution = (this.workflowData.settings.saveDataErrorExecution as string) || saveDataErrorExecution;
}
const workflowDidSucceed = !fullRunData.data.resultData.error;
if (workflowDidSucceed === false && saveDataErrorExecution === 'none') {
executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf);
}
const fullExecutionData: IExecutionDb = {
data: fullRunData.data,
mode: fullRunData.mode,
finished: fullRunData.finished ? fullRunData.finished : false,
startedAt: fullRunData.startedAt,
stoppedAt: fullRunData.stoppedAt,
workflowData: this.workflowData,
};
if (this.retryOf !== undefined) {
fullExecutionData.retryOf = this.retryOf.toString();
}
if (this.workflowData.id !== undefined && WorkflowHelpers.isWorkflowIdValid(this.workflowData.id.toString()) === true) {
fullExecutionData.workflowId = this.workflowData.id.toString();
}
const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);
// Save the Execution in DB
await Db.collections.Execution!.update(this.executionId, executionData as IExecutionFlattedDb);
if (fullRunData.finished === true && this.retryOf !== undefined) {
// If the retry was successful save the reference it on the original execution
// await Db.collections.Execution!.save(executionData as IExecutionFlattedDb);
await Db.collections.Execution!.update(this.retryOf, { retrySuccessId: this.executionId });
}
} catch (error) {
executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf);
}
},
],
};
}
export async function getRunData(workflowData: IWorkflowBase, inputData?: INodeExecutionData[]): Promise<IWorkflowExecutionDataProcess> {
const mode = 'integrated';
@ -613,6 +684,22 @@ export function getWorkflowHooksIntegrated(mode: WorkflowExecuteMode, executionI
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
}
/**
* Returns WorkflowHooks instance for running integrated workflows
* (Workflows which get started inside of another workflow)
*/
export function getWorkflowHooksWorkerExecuter(mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, optionalParameters?: IWorkflowHooksOptionalParameters): WorkflowHooks {
optionalParameters = optionalParameters || {};
const hookFunctions = hookFunctionsSaveWorker();
const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode);
for (const key of Object.keys(preExecuteFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
}
/**
* Returns WorkflowHooks instance for main process if workflow runs via worker

View file

@ -346,7 +346,27 @@ export class WorkflowRunner {
// Normally also static data should be supplied here but as it only used for sending
// data to editor-UI is not needed.
hooks.executeHookFunctions('workflowExecuteAfter', [runData]);
try {
// Check if this execution data has to be removed from database
// based on workflow settings.
let saveDataErrorExecution = config.get('executions.saveDataOnError') as string;
let saveDataSuccessExecution = config.get('executions.saveDataOnSuccess') as string;
if (data.workflowData.settings !== undefined) {
saveDataErrorExecution = (data.workflowData.settings.saveDataErrorExecution as string) || saveDataErrorExecution;
saveDataSuccessExecution = (data.workflowData.settings.saveDataSuccessExecution as string) || saveDataSuccessExecution;
}
const workflowDidSucceed = !runData.data.resultData.error;
if (workflowDidSucceed === true && saveDataSuccessExecution === 'none' ||
workflowDidSucceed === false && saveDataErrorExecution === 'none'
) {
await Db.collections.Execution!.delete(executionId);
}
} catch (err) {
// We don't want errors here to crash n8n. Just log and proceed.
console.log('Error removing saved execution from database. More details: ', err);
}
resolve(runData);
});

View file

@ -41,8 +41,18 @@ export class WorkflowRunnerProcess {
workflowExecute: WorkflowExecute | undefined;
executionIdCallback: (executionId: string) => void | undefined;
static async stopProcess() {
setTimeout(() => {
// Attempt a graceful shutdown, giving executions 30 seconds to finish
process.exit(0);
}, 30000);
}
async runWorkflow(inputData: IWorkflowExecutionDataProcessWithExecution): Promise<IRun> {
process.on('SIGTERM', WorkflowRunnerProcess.stopProcess);
process.on('SIGINT', WorkflowRunnerProcess.stopProcess);
this.data = inputData;
let className: string;
let tempNode: INodeType;

View file

@ -175,6 +175,10 @@ import {
IDataObject,
} from 'n8n-workflow';
import {
range as _range,
} from 'lodash';
import mixins from 'vue-typed-mixins';
export default mixins(
@ -433,8 +437,24 @@ export default mixins(
this.$store.commit('setActiveExecutions', results[1]);
const alreadyPresentExecutionIds = this.finishedExecutions.map(exec => exec.id);
let lastId = 0;
const gaps = [] as number[];
for(let i = results[0].results.length - 1; i >= 0; i--) {
const currentItem = results[0].results[i];
const currentId = parseInt(currentItem.id, 10);
if (lastId !== 0 && isNaN(currentId) === false) {
// We are doing this iteration to detect possible gaps.
// The gaps are used to remove executions that finished
// and were deleted from database but were displaying
// in this list while running.
if (currentId - lastId > 1) {
// We have some gaps.
const range = _range(lastId + 1, currentId);
gaps.push(...range);
}
}
lastId = parseInt(currentItem.id, 10) || 0;
// Check new results from end to start
// Add new items accordingly.
const executionIndex = alreadyPresentExecutionIds.indexOf(currentItem.id);
@ -464,6 +484,7 @@ export default mixins(
this.finishedExecutions.unshift(currentItem);
}
}
this.finishedExecutions = this.finishedExecutions.filter(execution => !gaps.includes(parseInt(execution.id, 10)) && lastId >= parseInt(execution.id, 10));
this.finishedExecutionsCount = results[0].count;
},
async loadFinishedExecutions (): Promise<void> {