n8n/packages/cli/src/WorkflowExecuteAdditionalData.ts

657 lines
22 KiB
TypeScript
Raw Normal View History

2019-06-23 03:35:23 -07:00
import {
ActiveExecutions,
CredentialsHelper,
2019-06-23 03:35:23 -07:00
Db,
ExternalHooks,
2019-06-23 03:35:23 -07:00
IExecutionDb,
IExecutionFlattedDb,
:sparkles: Unify execution id + Queue system (#1340) * Unify execution ID across executions * Fix indentation and improved comments * WIP: saving data after each node execution * Added on/off to save data after each step, saving initial data and retries working * Fixing lint issues * Fixing more lint issues * :sparkles: Add bull to execute workflows * :shirt: Fix lint issue * :zap: Add graceful shutdown to worker * :zap: Add loading staticData to worker * :shirt: Fix lint issue * :zap: Fix import * Changed tables metadata to add nullable to stoppedAt * Reload database on migration run * Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration * Added checks to Redis and exiting process if connection is unavailable * Fixing error with new installations * Fix issue with data not being sent back to browser on manual executions with defined destination * Merging bull and unify execution id branch fixes * Main process will now get execution success from database instead of redis * Omit execution duration if execution did not stop * Fix issue with execution list displaying inconsistant information information while a workflow is running * Remove unused hooks to clarify for developers that these wont run in queue mode * Added active pooling to help recover from Redis crashes * Lint issues * Changing default polling interval to 60 seconds * Removed unnecessary attributes from bull job * :zap: Improved output on worker job start Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-02-08 23:59:32 -08:00
IExecutionResponse,
2019-06-23 03:35:23 -07:00
IPushDataExecutionFinished,
IWorkflowBase,
IWorkflowExecutionDataProcess,
2019-12-19 14:07:55 -08:00
NodeTypes,
2019-06-23 03:35:23 -07:00
Push,
ResponseHelper,
WebhookHelpers,
WorkflowCredentials,
2019-06-23 03:35:23 -07:00
WorkflowHelpers,
} from './';
import {
UserSettings,
2019-12-19 14:07:55 -08:00
WorkflowExecute,
} from 'n8n-core';
2019-06-23 03:35:23 -07:00
import {
IDataObject,
2019-12-19 14:07:55 -08:00
IExecuteData,
IExecuteWorkflowInfo,
2019-12-19 14:07:55 -08:00
INode,
INodeExecutionData,
2020-10-22 06:46:03 -07:00
INodeParameters,
2019-06-23 03:35:23 -07:00
IRun,
2019-12-19 14:07:55 -08:00
IRunExecutionData,
2019-06-23 03:35:23 -07:00
ITaskData,
IWorkflowCredentials,
2019-06-23 03:35:23 -07:00
IWorkflowExecuteAdditionalData,
IWorkflowExecuteHooks,
2019-12-19 14:07:55 -08:00
IWorkflowHooksOptionalParameters,
Workflow,
2019-06-23 03:35:23 -07:00
WorkflowExecuteMode,
2019-12-19 14:07:55 -08:00
WorkflowHooks,
2019-06-23 03:35:23 -07:00
} from 'n8n-workflow';
import * as config from '../config';
2019-06-23 03:35:23 -07:00
import { LessThanOrEqual } from "typeorm";
const ERROR_TRIGGER_TYPE = config.get('nodes.errorTriggerType') as string;
2019-06-23 03:35:23 -07:00
/**
* Checks if there was an error and if errorWorkflow or a trigger is defined. If so it collects
2019-06-23 03:35:23 -07:00
* all the data and executes it
*
* @param {IWorkflowBase} workflowData The workflow which got executed
* @param {IRun} fullRunData The run which produced the error
2019-12-19 14:07:55 -08:00
* @param {WorkflowExecuteMode} mode The mode in which the workflow got started in
2019-06-23 03:35:23 -07:00
* @param {string} [executionId] The id the execution got saved as
*/
function executeErrorWorkflow(workflowData: IWorkflowBase, fullRunData: IRun, mode: WorkflowExecuteMode, executionId?: string, retryOf?: string): void {
// Check if there was an error and if so if an errorWorkflow or a trigger is set
let pastExecutionUrl: string | undefined = undefined;
if (executionId !== undefined) {
pastExecutionUrl = `${WebhookHelpers.getWebhookBaseUrl()}execution/${executionId}`;
}
if (fullRunData.data.resultData.error !== undefined) {
2019-06-23 03:35:23 -07:00
const workflowErrorData = {
execution: {
id: executionId,
url: pastExecutionUrl,
2019-06-23 03:35:23 -07:00
error: fullRunData.data.resultData.error,
lastNodeExecuted: fullRunData.data.resultData.lastNodeExecuted!,
mode,
retryOf,
2019-06-23 03:35:23 -07:00
},
workflow: {
id: workflowData.id !== undefined ? workflowData.id.toString() as string : undefined,
name: workflowData.name,
2020-10-22 06:46:03 -07:00
},
2019-06-23 03:35:23 -07:00
};
2019-06-23 03:35:23 -07:00
// Run the error workflow
// To avoid an infinite loop do not run the error workflow again if the error-workflow itself failed and it is its own error-workflow.
2020-12-12 11:59:17 -08:00
if (workflowData.settings !== undefined && workflowData.settings.errorWorkflow && !(mode === 'error' && workflowData.id && workflowData.settings.errorWorkflow.toString() === workflowData.id.toString())) {
// If a specific error workflow is set run only that one
WorkflowHelpers.executeErrorWorkflow(workflowData.settings.errorWorkflow as string, workflowErrorData);
} else if (mode !== 'error' && workflowData.id !== undefined && workflowData.nodes.some((node) => node.type === ERROR_TRIGGER_TYPE)) {
// If the workflow contains
WorkflowHelpers.executeErrorWorkflow(workflowData.id.toString(), workflowErrorData);
}
2019-06-23 03:35:23 -07:00
}
}
/**
* Prunes Saved Execution which are older than configured.
* Throttled to be executed just once in configured timeframe.
*
*/
let throttling = false;
function pruneExecutionData(): void {
if (!throttling) {
throttling = true;
const timeout = config.get('executions.pruneDataTimeout') as number; // in seconds
const maxAge = config.get('executions.pruneDataMaxAge') as number; // in h
const date = new Date(); // today
date.setHours(date.getHours() - maxAge);
// throttle just on success to allow for self healing on failure
Db.collections.Execution!.delete({ stoppedAt: LessThanOrEqual(date.toISOString()) })
.then(data =>
setTimeout(() => {
throttling = false;
}, timeout * 1000)
).catch(err => throttling = false);
}
}
2019-06-23 03:35:23 -07:00
/**
2019-12-19 14:07:55 -08:00
* Returns hook functions to push data to Editor-UI
*
* @returns {IWorkflowExecuteHooks}
*/
2019-12-19 14:07:55 -08:00
function hookFunctionsPush(): IWorkflowExecuteHooks {
2019-06-23 03:35:23 -07:00
return {
nodeExecuteBefore: [
2019-12-19 14:07:55 -08:00
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
// Push data to session which started workflow before each
// node which starts rendering
2019-12-19 14:07:55 -08:00
if (this.sessionId === undefined) {
2019-06-23 03:35:23 -07:00
return;
}
2019-08-28 06:28:47 -07:00
const pushInstance = Push.getInstance();
pushInstance.send('nodeExecuteBefore', {
2019-12-19 14:07:55 -08:00
executionId: this.executionId,
2019-06-23 03:35:23 -07:00
nodeName,
2019-12-19 14:07:55 -08:00
}, this.sessionId);
2019-06-23 03:35:23 -07:00
},
],
nodeExecuteAfter: [
2019-12-19 14:07:55 -08:00
async function (this: WorkflowHooks, nodeName: string, data: ITaskData): Promise<void> {
// Push data to session which started workflow after each rendered node
2019-12-19 14:07:55 -08:00
if (this.sessionId === undefined) {
2019-06-23 03:35:23 -07:00
return;
}
2019-08-28 06:28:47 -07:00
const pushInstance = Push.getInstance();
pushInstance.send('nodeExecuteAfter', {
2019-12-19 14:07:55 -08:00
executionId: this.executionId,
2019-06-23 03:35:23 -07:00
nodeName,
data,
2019-12-19 14:07:55 -08:00
}, this.sessionId);
2019-06-23 03:35:23 -07:00
},
],
workflowExecuteBefore: [
2019-12-19 14:07:55 -08:00
async function (this: WorkflowHooks): Promise<void> {
// Push data to session which started the workflow
if (this.sessionId === undefined) {
return;
:sparkles: Separate webhooks from core (#1408) * Unify execution ID across executions * Fix indentation and improved comments * WIP: saving data after each node execution * Added on/off to save data after each step, saving initial data and retries working * Fixing lint issues * Fixing more lint issues * :sparkles: Add bull to execute workflows * :shirt: Fix lint issue * :zap: Add graceful shutdown to worker * :zap: Add loading staticData to worker * :shirt: Fix lint issue * :zap: Fix import * Changed tables metadata to add nullable to stoppedAt * Reload database on migration run * Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration * Added checks to Redis and exiting process if connection is unavailable * Fixing error with new installations * Fix issue with data not being sent back to browser on manual executions with defined destination * Merging bull and unify execution id branch fixes * Main process will now get execution success from database instead of redis * Omit execution duration if execution did not stop * Fix issue with execution list displaying inconsistant information information while a workflow is running * Remove unused hooks to clarify for developers that these wont run in queue mode * Added active pooling to help recover from Redis crashes * Lint issues * Changing default polling interval to 60 seconds * Removed unnecessary attributes from bull job * Added webhooks service and setting to disable webhooks from main process * Fixed executions list when running with queues. Now we get the list of actively running workflows from bull. * Add option to disable deregistration of webhooks on shutdown * Rename WEBHOOK_TUNNEL_URL to WEBHOOK_URL keeping backwards compat. * Added auto refresh to executions list * Improvements to workflow stop process when running with queues * Refactor queue system to use a singleton and avoid code duplication * Improve comments and remove unnecessary commits * Remove console.log from vue file * Blocking webhook process to run without queues * Handling execution stop graciously when possible * Removing initialization of all workflows from webhook process * Refactoring code to remove code duplication for job stop * Improved execution list to be more fluid and less intrusive * Fixing workflow name for current executions when auto updating * :zap: Right align autorefresh checkbox Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-02-09 14:32:40 -08:00
}
const pushInstance = Push.getInstance();
pushInstance.send('executionStarted', {
executionId: this.executionId,
mode: this.mode,
startedAt: new Date(),
retryOf: this.retryOf,
workflowId: this.workflowData.id as string,
workflowName: this.workflowData.name,
}, this.sessionId);
2020-10-22 06:46:03 -07:00
},
],
2019-06-23 03:35:23 -07:00
workflowExecuteAfter: [
2019-12-19 14:07:55 -08:00
async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise<void> {
// Push data to session which started the workflow
if (this.sessionId === undefined) {
return;
:sparkles: Separate webhooks from core (#1408) * Unify execution ID across executions * Fix indentation and improved comments * WIP: saving data after each node execution * Added on/off to save data after each step, saving initial data and retries working * Fixing lint issues * Fixing more lint issues * :sparkles: Add bull to execute workflows * :shirt: Fix lint issue * :zap: Add graceful shutdown to worker * :zap: Add loading staticData to worker * :shirt: Fix lint issue * :zap: Fix import * Changed tables metadata to add nullable to stoppedAt * Reload database on migration run * Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration * Added checks to Redis and exiting process if connection is unavailable * Fixing error with new installations * Fix issue with data not being sent back to browser on manual executions with defined destination * Merging bull and unify execution id branch fixes * Main process will now get execution success from database instead of redis * Omit execution duration if execution did not stop * Fix issue with execution list displaying inconsistant information information while a workflow is running * Remove unused hooks to clarify for developers that these wont run in queue mode * Added active pooling to help recover from Redis crashes * Lint issues * Changing default polling interval to 60 seconds * Removed unnecessary attributes from bull job * Added webhooks service and setting to disable webhooks from main process * Fixed executions list when running with queues. Now we get the list of actively running workflows from bull. * Add option to disable deregistration of webhooks on shutdown * Rename WEBHOOK_TUNNEL_URL to WEBHOOK_URL keeping backwards compat. * Added auto refresh to executions list * Improvements to workflow stop process when running with queues * Refactor queue system to use a singleton and avoid code duplication * Improve comments and remove unnecessary commits * Remove console.log from vue file * Blocking webhook process to run without queues * Handling execution stop graciously when possible * Removing initialization of all workflows from webhook process * Refactoring code to remove code duplication for job stop * Improved execution list to be more fluid and less intrusive * Fixing workflow name for current executions when auto updating * :zap: Right align autorefresh checkbox Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-02-09 14:32:40 -08:00
}
// Clone the object except the runData. That one is not supposed
// to be send. Because that data got send piece by piece after
// each node which finished executing
const pushRunData = {
...fullRunData,
data: {
...fullRunData.data,
resultData: {
...fullRunData.data.resultData,
runData: {},
},
},
};
// Push data to editor-ui once workflow finished
// TODO: Look at this again
const sendData: IPushDataExecutionFinished = {
executionId: this.executionId,
data: pushRunData,
retryOf: this.retryOf,
};
const pushInstance = Push.getInstance();
pushInstance.send('executionFinished', sendData, this.sessionId);
2019-12-19 14:07:55 -08:00
},
2020-10-22 06:46:03 -07:00
],
2019-12-19 14:07:55 -08:00
};
}
export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowExecuteHooks {
const externalHooks = ExternalHooks();
return {
workflowExecuteBefore: [
async function (this: WorkflowHooks, workflow: Workflow): Promise<void> {
await externalHooks.run('workflow.preExecute', [workflow, this.mode]);
},
],
:sparkles: Unify execution id + Queue system (#1340) * Unify execution ID across executions * Fix indentation and improved comments * WIP: saving data after each node execution * Added on/off to save data after each step, saving initial data and retries working * Fixing lint issues * Fixing more lint issues * :sparkles: Add bull to execute workflows * :shirt: Fix lint issue * :zap: Add graceful shutdown to worker * :zap: Add loading staticData to worker * :shirt: Fix lint issue * :zap: Fix import * Changed tables metadata to add nullable to stoppedAt * Reload database on migration run * Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration * Added checks to Redis and exiting process if connection is unavailable * Fixing error with new installations * Fix issue with data not being sent back to browser on manual executions with defined destination * Merging bull and unify execution id branch fixes * Main process will now get execution success from database instead of redis * Omit execution duration if execution did not stop * Fix issue with execution list displaying inconsistant information information while a workflow is running * Remove unused hooks to clarify for developers that these wont run in queue mode * Added active pooling to help recover from Redis crashes * Lint issues * Changing default polling interval to 60 seconds * Removed unnecessary attributes from bull job * :zap: Improved output on worker job start Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-02-08 23:59:32 -08:00
nodeExecuteAfter: [
async function (nodeName: string, data: ITaskData, executionData: IRunExecutionData): Promise<void> {
if (this.workflowData.settings !== undefined) {
if (this.workflowData.settings.saveExecutionProgress === false) {
return;
} else if (this.workflowData.settings.saveExecutionProgress !== true && !config.get('executions.saveExecutionProgress') as boolean) {
return;
}
} else if (!config.get('executions.saveExecutionProgress') as boolean) {
return;
}
const execution = await Db.collections.Execution!.findOne(this.executionId);
if (execution === undefined) {
// Something went badly wrong if this happens.
// This check is here mostly to make typescript happy.
return undefined;
:sparkles: Unify execution id + Queue system (#1340) * Unify execution ID across executions * Fix indentation and improved comments * WIP: saving data after each node execution * Added on/off to save data after each step, saving initial data and retries working * Fixing lint issues * Fixing more lint issues * :sparkles: Add bull to execute workflows * :shirt: Fix lint issue * :zap: Add graceful shutdown to worker * :zap: Add loading staticData to worker * :shirt: Fix lint issue * :zap: Fix import * Changed tables metadata to add nullable to stoppedAt * Reload database on migration run * Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration * Added checks to Redis and exiting process if connection is unavailable * Fixing error with new installations * Fix issue with data not being sent back to browser on manual executions with defined destination * Merging bull and unify execution id branch fixes * Main process will now get execution success from database instead of redis * Omit execution duration if execution did not stop * Fix issue with execution list displaying inconsistant information information while a workflow is running * Remove unused hooks to clarify for developers that these wont run in queue mode * Added active pooling to help recover from Redis crashes * Lint issues * Changing default polling interval to 60 seconds * Removed unnecessary attributes from bull job * :zap: Improved output on worker job start Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-02-08 23:59:32 -08:00
}
const fullExecutionData: IExecutionResponse = ResponseHelper.unflattenExecutionData(execution);
if (fullExecutionData.finished) {
// We already received ´workflowExecuteAfter´ webhook, so this is just an async call
// that was left behind. We skip saving because the other call should have saved everything
// so this one is safe to ignore
return;
}
if (fullExecutionData.data === undefined) {
fullExecutionData.data = {
startData: {
},
resultData: {
runData: {},
},
executionData: {
contextData: {},
nodeExecutionStack: [],
waitingExecution: {},
},
};
}
if (Array.isArray(fullExecutionData.data.resultData.runData[nodeName])) {
// Append data if array exists
fullExecutionData.data.resultData.runData[nodeName].push(data);
} else {
// Initialize array and save data
fullExecutionData.data.resultData.runData[nodeName] = [data];
}
fullExecutionData.data.executionData = executionData.executionData;
// Set last executed node so that it may resume on failure
fullExecutionData.data.resultData.lastNodeExecuted = nodeName;
:sparkles: Unify execution id + Queue system (#1340) * Unify execution ID across executions * Fix indentation and improved comments * WIP: saving data after each node execution * Added on/off to save data after each step, saving initial data and retries working * Fixing lint issues * Fixing more lint issues * :sparkles: Add bull to execute workflows * :shirt: Fix lint issue * :zap: Add graceful shutdown to worker * :zap: Add loading staticData to worker * :shirt: Fix lint issue * :zap: Fix import * Changed tables metadata to add nullable to stoppedAt * Reload database on migration run * Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration * Added checks to Redis and exiting process if connection is unavailable * Fixing error with new installations * Fix issue with data not being sent back to browser on manual executions with defined destination * Merging bull and unify execution id branch fixes * Main process will now get execution success from database instead of redis * Omit execution duration if execution did not stop * Fix issue with execution list displaying inconsistant information information while a workflow is running * Remove unused hooks to clarify for developers that these wont run in queue mode * Added active pooling to help recover from Redis crashes * Lint issues * Changing default polling interval to 60 seconds * Removed unnecessary attributes from bull job * :zap: Improved output on worker job start Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-02-08 23:59:32 -08:00
const flattenedExecutionData = ResponseHelper.flattenExecutionData(fullExecutionData);
await Db.collections.Execution!.update(this.executionId, flattenedExecutionData as IExecutionFlattedDb);
},
],
};
}
2019-12-19 14:07:55 -08:00
/**
* Returns hook functions to save workflow execution and call error workflow
*
* @returns {IWorkflowExecuteHooks}
*/
function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
return {
nodeExecuteBefore: [],
nodeExecuteAfter: [],
workflowExecuteBefore: [],
workflowExecuteAfter: [
async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise<void> {
// Prune old execution data
if (config.get('executions.pruneData')) {
pruneExecutionData();
}
2019-12-19 14:07:55 -08:00
const isManualMode = [this.mode, parentProcessMode].includes('manual');
2019-06-23 03:35:23 -07:00
try {
2019-12-19 14:07:55 -08:00
if (!isManualMode && WorkflowHelpers.isWorkflowIdValid(this.workflowData.id as string) === true && newStaticData) {
// Workflow is saved so update in database
try {
2019-12-19 14:07:55 -08:00
await WorkflowHelpers.saveStaticDataById(this.workflowData.id as string, newStaticData);
} catch (e) {
// TODO: Add proper logging!
2019-12-19 14:07:55 -08:00
console.error(`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: ${e.message}`);
}
}
2019-06-23 03:35:23 -07:00
let saveManualExecutions = config.get('executions.saveDataManualExecutions') as boolean;
2019-12-19 14:07:55 -08:00
if (this.workflowData.settings !== undefined && this.workflowData.settings.saveManualExecutions !== undefined) {
2019-06-23 03:35:23 -07:00
// Apply to workflow override
2019-12-19 14:07:55 -08:00
saveManualExecutions = this.workflowData.settings.saveManualExecutions as boolean;
2019-06-23 03:35:23 -07:00
}
2019-12-19 14:07:55 -08:00
if (isManualMode && saveManualExecutions === false) {
:sparkles: Unify execution id + Queue system (#1340) * Unify execution ID across executions * Fix indentation and improved comments * WIP: saving data after each node execution * Added on/off to save data after each step, saving initial data and retries working * Fixing lint issues * Fixing more lint issues * :sparkles: Add bull to execute workflows * :shirt: Fix lint issue * :zap: Add graceful shutdown to worker * :zap: Add loading staticData to worker * :shirt: Fix lint issue * :zap: Fix import * Changed tables metadata to add nullable to stoppedAt * Reload database on migration run * Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration * Added checks to Redis and exiting process if connection is unavailable * Fixing error with new installations * Fix issue with data not being sent back to browser on manual executions with defined destination * Merging bull and unify execution id branch fixes * Main process will now get execution success from database instead of redis * Omit execution duration if execution did not stop * Fix issue with execution list displaying inconsistant information information while a workflow is running * Remove unused hooks to clarify for developers that these wont run in queue mode * Added active pooling to help recover from Redis crashes * Lint issues * Changing default polling interval to 60 seconds * Removed unnecessary attributes from bull job * :zap: Improved output on worker job start Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-02-08 23:59:32 -08:00
// Data is always saved, so we remove from database
await Db.collections.Execution!.delete(this.executionId);
return;
}
// Check config to know if execution should be saved or not
let saveDataErrorExecution = config.get('executions.saveDataOnError') as string;
let saveDataSuccessExecution = config.get('executions.saveDataOnSuccess') as string;
2019-12-19 14:07:55 -08:00
if (this.workflowData.settings !== undefined) {
saveDataErrorExecution = (this.workflowData.settings.saveDataErrorExecution as string) || saveDataErrorExecution;
saveDataSuccessExecution = (this.workflowData.settings.saveDataSuccessExecution as string) || saveDataSuccessExecution;
}
2019-06-23 03:35:23 -07:00
const workflowDidSucceed = !fullRunData.data.resultData.error;
if (workflowDidSucceed === true && saveDataSuccessExecution === 'none' ||
workflowDidSucceed === false && saveDataErrorExecution === 'none'
) {
2019-12-19 14:07:55 -08:00
if (!isManualMode) {
executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf);
}
:sparkles: Unify execution id + Queue system (#1340) * Unify execution ID across executions * Fix indentation and improved comments * WIP: saving data after each node execution * Added on/off to save data after each step, saving initial data and retries working * Fixing lint issues * Fixing more lint issues * :sparkles: Add bull to execute workflows * :shirt: Fix lint issue * :zap: Add graceful shutdown to worker * :zap: Add loading staticData to worker * :shirt: Fix lint issue * :zap: Fix import * Changed tables metadata to add nullable to stoppedAt * Reload database on migration run * Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration * Added checks to Redis and exiting process if connection is unavailable * Fixing error with new installations * Fix issue with data not being sent back to browser on manual executions with defined destination * Merging bull and unify execution id branch fixes * Main process will now get execution success from database instead of redis * Omit execution duration if execution did not stop * Fix issue with execution list displaying inconsistant information information while a workflow is running * Remove unused hooks to clarify for developers that these wont run in queue mode * Added active pooling to help recover from Redis crashes * Lint issues * Changing default polling interval to 60 seconds * Removed unnecessary attributes from bull job * :zap: Improved output on worker job start Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-02-08 23:59:32 -08:00
// Data is always saved, so we remove from database
Db.collections.Execution!.delete(this.executionId);
2019-06-23 03:35:23 -07:00
return;
}
const fullExecutionData: IExecutionDb = {
data: fullRunData.data,
mode: fullRunData.mode,
finished: fullRunData.finished ? fullRunData.finished : false,
startedAt: fullRunData.startedAt,
stoppedAt: fullRunData.stoppedAt,
2019-12-19 14:07:55 -08:00
workflowData: this.workflowData,
2019-06-23 03:35:23 -07:00
};
2019-12-19 14:07:55 -08:00
if (this.retryOf !== undefined) {
fullExecutionData.retryOf = this.retryOf.toString();
2019-06-23 03:35:23 -07:00
}
2019-12-19 14:07:55 -08:00
if (this.workflowData.id !== undefined && WorkflowHelpers.isWorkflowIdValid(this.workflowData.id.toString()) === true) {
fullExecutionData.workflowId = this.workflowData.id.toString();
2019-06-23 03:35:23 -07:00
}
const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);
// Save the Execution in DB
:sparkles: Unify execution id + Queue system (#1340) * Unify execution ID across executions * Fix indentation and improved comments * WIP: saving data after each node execution * Added on/off to save data after each step, saving initial data and retries working * Fixing lint issues * Fixing more lint issues * :sparkles: Add bull to execute workflows * :shirt: Fix lint issue * :zap: Add graceful shutdown to worker * :zap: Add loading staticData to worker * :shirt: Fix lint issue * :zap: Fix import * Changed tables metadata to add nullable to stoppedAt * Reload database on migration run * Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration * Added checks to Redis and exiting process if connection is unavailable * Fixing error with new installations * Fix issue with data not being sent back to browser on manual executions with defined destination * Merging bull and unify execution id branch fixes * Main process will now get execution success from database instead of redis * Omit execution duration if execution did not stop * Fix issue with execution list displaying inconsistant information information while a workflow is running * Remove unused hooks to clarify for developers that these wont run in queue mode * Added active pooling to help recover from Redis crashes * Lint issues * Changing default polling interval to 60 seconds * Removed unnecessary attributes from bull job * :zap: Improved output on worker job start Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-02-08 23:59:32 -08:00
await Db.collections.Execution!.update(this.executionId, executionData as IExecutionFlattedDb);
2019-06-23 03:35:23 -07:00
2019-12-19 14:07:55 -08:00
if (fullRunData.finished === true && this.retryOf !== undefined) {
2019-06-23 03:35:23 -07:00
// If the retry was successful save the reference it on the original execution
// await Db.collections.Execution!.save(executionData as IExecutionFlattedDb);
:sparkles: Unify execution id + Queue system (#1340) * Unify execution ID across executions * Fix indentation and improved comments * WIP: saving data after each node execution * Added on/off to save data after each step, saving initial data and retries working * Fixing lint issues * Fixing more lint issues * :sparkles: Add bull to execute workflows * :shirt: Fix lint issue * :zap: Add graceful shutdown to worker * :zap: Add loading staticData to worker * :shirt: Fix lint issue * :zap: Fix import * Changed tables metadata to add nullable to stoppedAt * Reload database on migration run * Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration * Added checks to Redis and exiting process if connection is unavailable * Fixing error with new installations * Fix issue with data not being sent back to browser on manual executions with defined destination * Merging bull and unify execution id branch fixes * Main process will now get execution success from database instead of redis * Omit execution duration if execution did not stop * Fix issue with execution list displaying inconsistant information information while a workflow is running * Remove unused hooks to clarify for developers that these wont run in queue mode * Added active pooling to help recover from Redis crashes * Lint issues * Changing default polling interval to 60 seconds * Removed unnecessary attributes from bull job * :zap: Improved output on worker job start Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-02-08 23:59:32 -08:00
await Db.collections.Execution!.update(this.retryOf, { retrySuccessId: this.executionId });
2019-06-23 03:35:23 -07:00
}
2019-12-19 14:07:55 -08:00
if (!isManualMode) {
:sparkles: Unify execution id + Queue system (#1340) * Unify execution ID across executions * Fix indentation and improved comments * WIP: saving data after each node execution * Added on/off to save data after each step, saving initial data and retries working * Fixing lint issues * Fixing more lint issues * :sparkles: Add bull to execute workflows * :shirt: Fix lint issue * :zap: Add graceful shutdown to worker * :zap: Add loading staticData to worker * :shirt: Fix lint issue * :zap: Fix import * Changed tables metadata to add nullable to stoppedAt * Reload database on migration run * Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration * Added checks to Redis and exiting process if connection is unavailable * Fixing error with new installations * Fix issue with data not being sent back to browser on manual executions with defined destination * Merging bull and unify execution id branch fixes * Main process will now get execution success from database instead of redis * Omit execution duration if execution did not stop * Fix issue with execution list displaying inconsistant information information while a workflow is running * Remove unused hooks to clarify for developers that these wont run in queue mode * Added active pooling to help recover from Redis crashes * Lint issues * Changing default polling interval to 60 seconds * Removed unnecessary attributes from bull job * :zap: Improved output on worker job start Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-02-08 23:59:32 -08:00
executeErrorWorkflow(this.workflowData, fullRunData, this.mode, this.executionId, this.retryOf);
2019-12-19 14:07:55 -08:00
}
2019-06-23 03:35:23 -07:00
} catch (error) {
2019-12-19 14:07:55 -08:00
if (!isManualMode) {
executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf);
}
2019-06-23 03:35:23 -07:00
}
},
2020-10-22 06:46:03 -07:00
],
2019-06-23 03:35:23 -07:00
};
2019-12-19 14:07:55 -08:00
}
export async function getRunData(workflowData: IWorkflowBase, inputData?: INodeExecutionData[]): Promise<IWorkflowExecutionDataProcess> {
2019-12-19 14:07:55 -08:00
const mode = 'integrated';
// Find Start-Node
const requiredNodeTypes = ['n8n-nodes-base.start'];
let startNode: INode | undefined;
for (const node of workflowData!.nodes) {
if (requiredNodeTypes.includes(node.type)) {
startNode = node;
break;
}
}
if (startNode === undefined) {
// If the workflow does not contain a start-node we can not know what
// should be executed and with what data to start.
throw new Error(`The workflow does not contain a "Start" node and can so not be executed.`);
}
// Always start with empty data if no inputData got supplied
inputData = inputData || [
{
2020-10-22 06:46:03 -07:00
json: {},
},
2019-12-19 14:07:55 -08:00
];
// Initialize the incoming data
const nodeExecutionStack: IExecuteData[] = [];
nodeExecutionStack.push(
{
node: startNode,
data: {
main: [inputData],
},
2020-10-22 06:46:03 -07:00
}
2019-12-19 14:07:55 -08:00
);
const runExecutionData: IRunExecutionData = {
startData: {
},
resultData: {
runData: {},
},
executionData: {
contextData: {},
nodeExecutionStack,
waitingExecution: {},
},
};
// Get the needed credentials for the current workflow as they will differ to the ones of the
// calling workflow.
const credentials = await WorkflowCredentials(workflowData!.nodes);
const runData: IWorkflowExecutionDataProcess = {
credentials,
executionMode: mode,
executionData: runExecutionData,
// @ts-ignore
workflowData,
};
return runData;
}
export async function getWorkflowData(workflowInfo: IExecuteWorkflowInfo): Promise<IWorkflowBase> {
if (workflowInfo.id === undefined && workflowInfo.code === undefined) {
throw new Error(`No information about the workflow to execute found. Please provide either the "id" or "code"!`);
}
if (Db.collections!.Workflow === null) {
// The first time executeWorkflow gets called the Database has
// to get initialized first
await Db.init();
}
let workflowData: IWorkflowBase | undefined;
if (workflowInfo.id !== undefined) {
workflowData = await Db.collections!.Workflow!.findOne(workflowInfo.id);
if (workflowData === undefined) {
throw new Error(`The workflow with the id "${workflowInfo.id}" does not exist.`);
}
} else {
workflowData = workflowInfo.code;
}
return workflowData!;
}
/**
* Executes the workflow with the given ID
*
* @export
* @param {string} workflowId The id of the workflow to execute
* @param {IWorkflowExecuteAdditionalData} additionalData
* @param {INodeExecutionData[]} [inputData]
* @returns {(Promise<Array<INodeExecutionData[] | null>>)}
*/
export async function executeWorkflow(workflowInfo: IExecuteWorkflowInfo, additionalData: IWorkflowExecuteAdditionalData, inputData?: INodeExecutionData[], parentExecutionId?: string, loadedWorkflowData?: IWorkflowBase, loadedRunData?: IWorkflowExecutionDataProcess): Promise<Array<INodeExecutionData[] | null> | IRun> {
const externalHooks = ExternalHooks();
await externalHooks.init();
const nodeTypes = NodeTypes();
const workflowData = loadedWorkflowData !== undefined ? loadedWorkflowData : await getWorkflowData(workflowInfo);
const workflowName = workflowData ? workflowData.name : undefined;
const workflow = new Workflow({ id: workflowInfo.id, name: workflowName, nodes: workflowData!.nodes, connections: workflowData!.connections, active: workflowData!.active, nodeTypes, staticData: workflowData!.staticData });
const runData = loadedRunData !== undefined ? loadedRunData : await getRunData(workflowData, inputData);
let executionId;
if (parentExecutionId !== undefined) {
executionId = parentExecutionId;
} else {
executionId = parentExecutionId !== undefined ? parentExecutionId : await ActiveExecutions.getInstance().add(runData);
}
const runExecutionData = runData.executionData as IRunExecutionData;
// Get the needed credentials for the current workflow as they will differ to the ones of the
// calling workflow.
const credentials = await WorkflowCredentials(workflowData!.nodes);
// Create new additionalData to have different workflow loaded and to call
// different webooks
const additionalDataIntegrated = await getBase(credentials);
additionalDataIntegrated.hooks = getWorkflowHooksIntegrated(runData.executionMode, executionId, workflowData!, { parentProcessMode: additionalData.hooks!.mode });
2019-12-19 14:07:55 -08:00
// Execute the workflow
const workflowExecute = new WorkflowExecute(additionalDataIntegrated, runData.executionMode, runExecutionData);
2019-12-19 14:07:55 -08:00
const data = await workflowExecute.processRunExecutionData(workflow);
await externalHooks.run('workflow.postExecute', [data, workflowData]);
2019-12-19 14:07:55 -08:00
if (data.finished === true) {
// Workflow did finish successfully
if (parentExecutionId !== undefined) {
return data;
} else {
await ActiveExecutions.getInstance().remove(executionId, data);
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
return returnData!.data!.main;
}
2019-12-19 14:07:55 -08:00
} else {
// Workflow did fail
const error = new Error(data.data.resultData.error!.message);
error.stack = data.data.resultData.error!.stack;
throw error;
}
}
2019-06-23 03:35:23 -07:00
/**
* Returns the base additional data without webhooks
*
* @export
* @param {IWorkflowCredentials} credentials
* @param {INodeParameters} currentNodeParameters
* @returns {Promise<IWorkflowExecuteAdditionalData>}
*/
export async function getBase(credentials: IWorkflowCredentials, currentNodeParameters?: INodeParameters): Promise<IWorkflowExecuteAdditionalData> {
2019-06-23 03:35:23 -07:00
const urlBaseWebhook = WebhookHelpers.getWebhookBaseUrl();
const timezone = config.get('generic.timezone') as string;
const webhookBaseUrl = urlBaseWebhook + config.get('endpoints.webhook') as string;
const webhookTestBaseUrl = urlBaseWebhook + config.get('endpoints.webhookTest') as string;
2019-06-23 03:35:23 -07:00
const encryptionKey = await UserSettings.getEncryptionKey();
if (encryptionKey === undefined) {
throw new Error('No encryption key got found to decrypt the credentials!');
}
return {
credentials,
credentialsHelper: new CredentialsHelper(credentials, encryptionKey),
2019-06-23 03:35:23 -07:00
encryptionKey,
2019-12-19 14:07:55 -08:00
executeWorkflow,
restApiUrl: urlBaseWebhook + config.get('endpoints.rest') as string,
2019-06-23 03:35:23 -07:00
timezone,
webhookBaseUrl,
webhookTestBaseUrl,
2019-10-20 12:42:34 -07:00
currentNodeParameters,
2019-06-23 03:35:23 -07:00
};
}
/**
2019-12-19 14:07:55 -08:00
* Returns WorkflowHooks instance for running integrated workflows
* (Workflows which get started inside of another workflow)
*/
export function getWorkflowHooksIntegrated(mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, optionalParameters?: IWorkflowHooksOptionalParameters): WorkflowHooks {
optionalParameters = optionalParameters || {};
const hookFunctions = hookFunctionsSave(optionalParameters.parentProcessMode);
const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode);
for (const key of Object.keys(preExecuteFunctions)) {
:sparkles: Unify execution id + Queue system (#1340) * Unify execution ID across executions * Fix indentation and improved comments * WIP: saving data after each node execution * Added on/off to save data after each step, saving initial data and retries working * Fixing lint issues * Fixing more lint issues * :sparkles: Add bull to execute workflows * :shirt: Fix lint issue * :zap: Add graceful shutdown to worker * :zap: Add loading staticData to worker * :shirt: Fix lint issue * :zap: Fix import * Changed tables metadata to add nullable to stoppedAt * Reload database on migration run * Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration * Added checks to Redis and exiting process if connection is unavailable * Fixing error with new installations * Fix issue with data not being sent back to browser on manual executions with defined destination * Merging bull and unify execution id branch fixes * Main process will now get execution success from database instead of redis * Omit execution duration if execution did not stop * Fix issue with execution list displaying inconsistant information information while a workflow is running * Remove unused hooks to clarify for developers that these wont run in queue mode * Added active pooling to help recover from Redis crashes * Lint issues * Changing default polling interval to 60 seconds * Removed unnecessary attributes from bull job * :zap: Improved output on worker job start Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-02-08 23:59:32 -08:00
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
2019-12-19 14:07:55 -08:00
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
}
:sparkles: Unify execution id + Queue system (#1340) * Unify execution ID across executions * Fix indentation and improved comments * WIP: saving data after each node execution * Added on/off to save data after each step, saving initial data and retries working * Fixing lint issues * Fixing more lint issues * :sparkles: Add bull to execute workflows * :shirt: Fix lint issue * :zap: Add graceful shutdown to worker * :zap: Add loading staticData to worker * :shirt: Fix lint issue * :zap: Fix import * Changed tables metadata to add nullable to stoppedAt * Reload database on migration run * Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration * Added checks to Redis and exiting process if connection is unavailable * Fixing error with new installations * Fix issue with data not being sent back to browser on manual executions with defined destination * Merging bull and unify execution id branch fixes * Main process will now get execution success from database instead of redis * Omit execution duration if execution did not stop * Fix issue with execution list displaying inconsistant information information while a workflow is running * Remove unused hooks to clarify for developers that these wont run in queue mode * Added active pooling to help recover from Redis crashes * Lint issues * Changing default polling interval to 60 seconds * Removed unnecessary attributes from bull job * :zap: Improved output on worker job start Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-02-08 23:59:32 -08:00
/**
* Returns WorkflowHooks instance for main process if workflow runs via worker
*/
export function getWorkflowHooksWorkerMain(mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, optionalParameters?: IWorkflowHooksOptionalParameters): WorkflowHooks {
optionalParameters = optionalParameters || {};
const hookFunctions = hookFunctionsPush();
const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode);
for (const key of Object.keys(preExecuteFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
// When running with worker mode, main process executes
// Only workflowExecuteBefore + workflowExecuteAfter
// So to avoid confusion, we are removing other hooks.
hookFunctions.nodeExecuteBefore = [];
hookFunctions.nodeExecuteAfter = [];
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
}
2019-12-19 14:07:55 -08:00
/**
* Returns WorkflowHooks instance for running the main workflow
*
* @export
* @param {IWorkflowExecutionDataProcess} data
* @param {string} executionId
2019-12-19 14:07:55 -08:00
* @returns {WorkflowHooks}
*/
export function getWorkflowHooksMain(data: IWorkflowExecutionDataProcess, executionId: string, isMainProcess = false): WorkflowHooks {
2019-12-19 14:07:55 -08:00
const hookFunctions = hookFunctionsSave();
const pushFunctions = hookFunctionsPush();
for (const key of Object.keys(pushFunctions)) {
:sparkles: Unify execution id + Queue system (#1340) * Unify execution ID across executions * Fix indentation and improved comments * WIP: saving data after each node execution * Added on/off to save data after each step, saving initial data and retries working * Fixing lint issues * Fixing more lint issues * :sparkles: Add bull to execute workflows * :shirt: Fix lint issue * :zap: Add graceful shutdown to worker * :zap: Add loading staticData to worker * :shirt: Fix lint issue * :zap: Fix import * Changed tables metadata to add nullable to stoppedAt * Reload database on migration run * Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration * Added checks to Redis and exiting process if connection is unavailable * Fixing error with new installations * Fix issue with data not being sent back to browser on manual executions with defined destination * Merging bull and unify execution id branch fixes * Main process will now get execution success from database instead of redis * Omit execution duration if execution did not stop * Fix issue with execution list displaying inconsistant information information while a workflow is running * Remove unused hooks to clarify for developers that these wont run in queue mode * Added active pooling to help recover from Redis crashes * Lint issues * Changing default polling interval to 60 seconds * Removed unnecessary attributes from bull job * :zap: Improved output on worker job start Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-02-08 23:59:32 -08:00
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
2019-12-19 14:07:55 -08:00
hookFunctions[key]!.push.apply(hookFunctions[key], pushFunctions[key]);
}
if (isMainProcess) {
const preExecuteFunctions = hookFunctionsPreExecute();
for (const key of Object.keys(preExecuteFunctions)) {
:sparkles: Unify execution id + Queue system (#1340) * Unify execution ID across executions * Fix indentation and improved comments * WIP: saving data after each node execution * Added on/off to save data after each step, saving initial data and retries working * Fixing lint issues * Fixing more lint issues * :sparkles: Add bull to execute workflows * :shirt: Fix lint issue * :zap: Add graceful shutdown to worker * :zap: Add loading staticData to worker * :shirt: Fix lint issue * :zap: Fix import * Changed tables metadata to add nullable to stoppedAt * Reload database on migration run * Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration * Added checks to Redis and exiting process if connection is unavailable * Fixing error with new installations * Fix issue with data not being sent back to browser on manual executions with defined destination * Merging bull and unify execution id branch fixes * Main process will now get execution success from database instead of redis * Omit execution duration if execution did not stop * Fix issue with execution list displaying inconsistant information information while a workflow is running * Remove unused hooks to clarify for developers that these wont run in queue mode * Added active pooling to help recover from Redis crashes * Lint issues * Changing default polling interval to 60 seconds * Removed unnecessary attributes from bull job * :zap: Improved output on worker job start Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-02-08 23:59:32 -08:00
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
}
return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, { sessionId: data.sessionId, retryOf: data.retryOf as string });
}
:sparkles: Unify execution id + Queue system (#1340) * Unify execution ID across executions * Fix indentation and improved comments * WIP: saving data after each node execution * Added on/off to save data after each step, saving initial data and retries working * Fixing lint issues * Fixing more lint issues * :sparkles: Add bull to execute workflows * :shirt: Fix lint issue * :zap: Add graceful shutdown to worker * :zap: Add loading staticData to worker * :shirt: Fix lint issue * :zap: Fix import * Changed tables metadata to add nullable to stoppedAt * Reload database on migration run * Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration * Added checks to Redis and exiting process if connection is unavailable * Fixing error with new installations * Fix issue with data not being sent back to browser on manual executions with defined destination * Merging bull and unify execution id branch fixes * Main process will now get execution success from database instead of redis * Omit execution duration if execution did not stop * Fix issue with execution list displaying inconsistant information information while a workflow is running * Remove unused hooks to clarify for developers that these wont run in queue mode * Added active pooling to help recover from Redis crashes * Lint issues * Changing default polling interval to 60 seconds * Removed unnecessary attributes from bull job * :zap: Improved output on worker job start Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-02-08 23:59:32 -08:00