n8n/packages/cli/src/WorkflowExecuteAdditionalData.ts
Jan 5a179cd5ae
Implement Wait functionality (#1817)
* refactor saving

* refactor api layer to be stateless

* refactor header details

* set variable for menu height

* clean up scss

* clean up indentation

* clean up dropdown impl

* refactor no tags view

* split away header

* Fix tslint issues

* Refactor tag manager

* add tags to patch request

* clean up scss

*  Refactor types to entities

* fix issues

* update no workflow error

* clean up tagscontainer

* use getters instead of state

* remove imports

* use custom colors

* clean up tags container

* clean up dropdown

* clean up focusoncreate

*  Ignore mistaken ID in POST /workflows

*  Fix undefined tag ID in PATCH /workflows

*  Shorten response for POST /tags

* remove scss mixins

* clean up imports

*  Implement validation with class-validator

* address ivan's comments

* implement modals

* Fix lint issues

* fix disabling shortcuts

* fix focus issues

* fix focus issues

* fix focus issues with modal

* fix linting issues

* use dispatch

* use constants for modal keys

* fix focus

* fix lint issues

* remove unused prop

* add modal root

* fix lint issues

* remove unused methods

* fix shortcut

* remove max width

*  Fix duplicate entry error for pg and MySQL

* update rename messaging

* update order of buttons

* fix firefox overflow on windows

* fix dropdown height

* 🔨 refactor tag crud controllers

* 🧹 remove unused imports

* use variable for number of items

* fix dropdown spacing

*  Restore type to fix build

*  Fix post-refactor PATCH /workflows/:id

*  Fix PATCH /workflows/:id for zero tags

*  Fix usage count becoming stringified

* address max's comments

* fix filter spacing

* fix blur bug

* address most of ivan's comments

* address tags type concern

* remove defaults

*  return tag id as string

* 🔨 add hooks to tag CUD operations

* 🏎 simplify timestamp pruning

* remove blur event

* fix onblur bug

*  Fix fs import to fix build

* address max's comments

* implement responsive tag container

* fix lint issues

* update tag limits

* address ivan's comments

* remove rename, refactor header, implement new designs for save, remove responsive tag container

* update styling

* update styling

* implement responsive tag container

* implement header tags edit

* implement header tags edit

* fix lint issues

* implement expandable input

* minor fixes

* minor fixes

* use variable

* rename save as

* duplicate fixes

* minor edit fixes

* lint fixes

* style fixes

* hook up saving name

* hook up tags

* clean up impl

* fix dirty state bug

* update limit

* update notification messages

* on click outside

* fix minor bug with count

* lint fixes

* handle minor edge cases

* handle minor edge cases

* handle minor bugs; fix firefox dropdown issue

* Fix min width

* apply tags only after api success

* remove count fix

* clean up workflow tags impl, fix tags delete bug

* fix minor issue

* fix minor spacing issue

* disable wrap for ops

* fix viewport root; save on click in dropdown

* save button loading when saving name/tags

* implement max width on tags container

* implement cleaner create experience

* disable edit while updating

* codacy hex color

* refactor tags container

* fix clickability

* fix workflow open and count

* clean up structure

* fix up lint issues

* fix button size

* increase workflow name limit for larger screen

* tslint fixes

* disable responsiveness for workflow modal

* rename event

* change min width for tags

* clean up pr

* address max's comments on styles

* remove success toasts

* add hover mode to name

* minor fixes

* refactor name preview

* fix name input not to jiggle

* finish up name input

* Fix up add tags

* clean up param

* clean up scss

* fix resizing name

* fix resizing name

* fix resize bug

* clean up edit spacing

* ignore on esc

* fix input bug

* focus input on clear

* build

* fix up add tags clickablity

* remove scrollbars

* move into folders

* clean up multiple patch req

* remove padding top from edit

* update tags on enter

* build

* rollout blur on enter behavior

* rollout esc behavior

* fix tags bug when duplicating tags

* move key to reload tags

* update header spacing

* build

* update hex case

* refactor workflow title

* remove unusued prop

* keep focus on error, fix bug on error

* Fix bug with name / tags toggle on error

* fix connection push bug

* :spakles: Implement wait functionality

* 🐛 Do not delete waiting executions with prune

*  Improve SQLite migration to not lose execution data anymore

*  Make it possible to restart waiting execution via webhook

*  Add missing file

* 🐛 Some more merge fixes

*  Do not show error for Wait-Nodes if in time-mode

*  Make $executionId available in expressions

* 👕 Fix lint issue

* 👕 Fix lint issue

* 👕 Fix lint issue

*  Set the unlimited sleep time as a variable

*  Add also sleeping webhook path to config

*  Make it possible to retrieve restartUrl in workflow

*  Add authentication to Wait-Node in Webhook-Mode

*  Return 404 when trying to restart execution via webhook which does
not support it

*  Make it possible to set absolute time on Wait-Node

*  Remove not needed imports

*  Fix description format

*  Implement missing webhook features on Wait-Node

*  Display webhook variable in NodeWebhooks

*  Include also date in displayed sleep time

*  Make it possible to see sleep time on node

*  Make sure that no executions does get executed twice

*  Add comment

*  Further improvements

*  Make Wait-Node easier to use

*  Add support for "notice" parameter type

* Fixing wait node to work with queue, improved logging and execution view

* Added support for mysql and pg

*  Add support for webhook postfix path

*  Make it possible to stop sleeping executions

*  Fix issue with webhook paths in not webhook mode

*  Remove not needed console.log

*  Update TODOs

*  Increase min time of workflow staying active to descrease possible issue
with overlap

* 👕 Fix lint issue

* 🐛 Fix issues with webhooks

*  Make error message clearer

*  Fix issue with missing execution ID in scaling mode

* Fixed execution list to correctly display waiting executins

* Feature: enable webhook wait workflows to continue after specified time

* Fixed linting

*  Improve waiting description text

*  Fix parameter display issue and rename

*  Remove comment

*  Do not display webhooks on Wait-Node

* Changed wording from restart to resume on wait node

* Fixed wording and inconsistent screen when changing resume modes

* Removed dots from the descriptions

* Changed docs url and renaming postfix to suffix

* Changed names from sleep to wait

*  Apply suggestions from ben

Co-authored-by: Ben Hesseldieck <1849459+BHesseldieck@users.noreply.github.com>

* Some fixes by Ben

*  Remove console.logs

*  Fixes and improvements

Co-authored-by: Mutasem <mutdmour@gmail.com>
Co-authored-by: Iván Ovejero <ivov.src@gmail.com>
Co-authored-by: Ben Hesseldieck <b.hesseldieck@gmail.com>
Co-authored-by: Ben Hesseldieck <1849459+BHesseldieck@users.noreply.github.com>
Co-authored-by: Omar Ajoue <krynble@gmail.com>
2021-08-21 14:11:32 +02:00

846 lines
30 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import {
ActiveExecutions,
CredentialsHelper,
Db,
ExternalHooks,
IExecutionDb,
IExecutionFlattedDb,
IExecutionResponse,
IPushDataExecutionFinished,
IWorkflowBase,
IWorkflowExecuteProcess,
IWorkflowExecutionDataProcess,
NodeTypes,
Push,
ResponseHelper,
WebhookHelpers,
WorkflowCredentials,
WorkflowHelpers,
} from './';
import {
UserSettings,
WorkflowExecute,
} from 'n8n-core';
import {
IDataObject,
IExecuteData,
IExecuteWorkflowInfo,
INode,
INodeExecutionData,
INodeParameters,
IRun,
IRunExecutionData,
ITaskData,
IWorkflowCredentials,
IWorkflowExecuteAdditionalData,
IWorkflowExecuteHooks,
IWorkflowHooksOptionalParameters,
LoggerProxy as Logger,
Workflow,
WorkflowExecuteMode,
WorkflowHooks,
} from 'n8n-workflow';
import * as config from '../config';
import { LessThanOrEqual } from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils';
const ERROR_TRIGGER_TYPE = config.get('nodes.errorTriggerType') as string;
/**
* Checks if there was an error and if errorWorkflow or a trigger is defined. If so it collects
* all the data and executes it
*
* @param {IWorkflowBase} workflowData The workflow which got executed
* @param {IRun} fullRunData The run which produced the error
* @param {WorkflowExecuteMode} mode The mode in which the workflow got started in
* @param {string} [executionId] The id the execution got saved as
*/
function executeErrorWorkflow(workflowData: IWorkflowBase, fullRunData: IRun, mode: WorkflowExecuteMode, executionId?: string, retryOf?: string): void {
// Check if there was an error and if so if an errorWorkflow or a trigger is set
let pastExecutionUrl: string | undefined = undefined;
if (executionId !== undefined) {
pastExecutionUrl = `${WebhookHelpers.getWebhookBaseUrl()}execution/${executionId}`;
}
if (fullRunData.data.resultData.error !== undefined) {
const workflowErrorData = {
execution: {
id: executionId,
url: pastExecutionUrl,
error: fullRunData.data.resultData.error,
lastNodeExecuted: fullRunData.data.resultData.lastNodeExecuted!,
mode,
retryOf,
},
workflow: {
id: workflowData.id !== undefined ? workflowData.id.toString() as string : undefined,
name: workflowData.name,
},
};
// Run the error workflow
// To avoid an infinite loop do not run the error workflow again if the error-workflow itself failed and it is its own error-workflow.
if (workflowData.settings !== undefined && workflowData.settings.errorWorkflow && !(mode === 'error' && workflowData.id && workflowData.settings.errorWorkflow.toString() === workflowData.id.toString())) {
Logger.verbose(`Start external error workflow`, { executionId, errorWorkflowId: workflowData.settings.errorWorkflow.toString(), workflowId: workflowData.id });
// If a specific error workflow is set run only that one
WorkflowHelpers.executeErrorWorkflow(workflowData.settings.errorWorkflow as string, workflowErrorData);
} else if (mode !== 'error' && workflowData.id !== undefined && workflowData.nodes.some((node) => node.type === ERROR_TRIGGER_TYPE)) {
Logger.verbose(`Start internal error workflow`, { executionId, workflowId: workflowData.id });
// If the workflow contains
WorkflowHelpers.executeErrorWorkflow(workflowData.id.toString(), workflowErrorData);
}
}
}
/**
* Prunes Saved Execution which are older than configured.
* Throttled to be executed just once in configured timeframe.
*
*/
let throttling = false;
function pruneExecutionData(this: WorkflowHooks): void {
if (!throttling) {
Logger.verbose('Pruning execution data from database');
throttling = true;
const timeout = config.get('executions.pruneDataTimeout') as number; // in seconds
const maxAge = config.get('executions.pruneDataMaxAge') as number; // in h
const date = new Date(); // today
date.setHours(date.getHours() - maxAge);
// date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286
const utcDate = DateUtils.mixedDateToUtcDatetimeString(date);
// throttle just on success to allow for self healing on failure
Db.collections.Execution!.delete({ stoppedAt: LessThanOrEqual(utcDate) })
.then(data =>
setTimeout(() => {
throttling = false;
}, timeout * 1000)
).catch(error => {
throttling = false;
Logger.error(`Failed pruning execution data from database for execution ID ${this.executionId} (hookFunctionsSave)`, { ...error, executionId: this.executionId, sessionId: this.sessionId, workflowId: this.workflowData.id });
});
}
}
/**
* Returns hook functions to push data to Editor-UI
*
* @returns {IWorkflowExecuteHooks}
*/
function hookFunctionsPush(): IWorkflowExecuteHooks {
return {
nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
// Push data to session which started workflow before each
// node which starts rendering
if (this.sessionId === undefined) {
return;
}
Logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { executionId: this.executionId, sessionId: this.sessionId, workflowId: this.workflowData.id });
const pushInstance = Push.getInstance();
pushInstance.send('nodeExecuteBefore', {
executionId: this.executionId,
nodeName,
}, this.sessionId);
},
],
nodeExecuteAfter: [
async function (this: WorkflowHooks, nodeName: string, data: ITaskData): Promise<void> {
// Push data to session which started workflow after each rendered node
if (this.sessionId === undefined) {
return;
}
Logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { executionId: this.executionId, sessionId: this.sessionId, workflowId: this.workflowData.id });
const pushInstance = Push.getInstance();
pushInstance.send('nodeExecuteAfter', {
executionId: this.executionId,
nodeName,
data,
}, this.sessionId);
},
],
workflowExecuteBefore: [
async function (this: WorkflowHooks): Promise<void> {
Logger.debug(`Executing hook (hookFunctionsPush)`, { executionId: this.executionId, sessionId: this.sessionId, workflowId: this.workflowData.id });
// Push data to session which started the workflow
if (this.sessionId === undefined) {
return;
}
const pushInstance = Push.getInstance();
pushInstance.send('executionStarted', {
executionId: this.executionId,
mode: this.mode,
startedAt: new Date(),
retryOf: this.retryOf,
workflowId: this.workflowData.id, sessionId: this.sessionId as string,
workflowName: this.workflowData.name,
}, this.sessionId);
},
],
workflowExecuteAfter: [
async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise<void> {
Logger.debug(`Executing hook (hookFunctionsPush)`, { executionId: this.executionId, sessionId: this.sessionId, workflowId: this.workflowData.id });
// Push data to session which started the workflow
if (this.sessionId === undefined) {
return;
}
// Clone the object except the runData. That one is not supposed
// to be send. Because that data got send piece by piece after
// each node which finished executing
const pushRunData = {
...fullRunData,
data: {
...fullRunData.data,
resultData: {
...fullRunData.data.resultData,
runData: {},
},
},
};
// Push data to editor-ui once workflow finished
Logger.debug(`Save execution progress to database for execution ID ${this.executionId} `, { executionId: this.executionId, workflowId: this.workflowData.id });
// TODO: Look at this again
const sendData: IPushDataExecutionFinished = {
executionId: this.executionId,
data: pushRunData,
retryOf: this.retryOf,
};
const pushInstance = Push.getInstance();
pushInstance.send('executionFinished', sendData, this.sessionId);
},
],
};
}
export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowExecuteHooks {
const externalHooks = ExternalHooks();
return {
workflowExecuteBefore: [
async function (this: WorkflowHooks, workflow: Workflow): Promise<void> {
await externalHooks.run('workflow.preExecute', [workflow, this.mode]);
},
],
nodeExecuteAfter: [
async function (nodeName: string, data: ITaskData, executionData: IRunExecutionData): Promise<void> {
if (this.workflowData.settings !== undefined) {
if (this.workflowData.settings.saveExecutionProgress === false) {
return;
} else if (this.workflowData.settings.saveExecutionProgress !== true && !config.get('executions.saveExecutionProgress') as boolean) {
return;
}
} else if (!config.get('executions.saveExecutionProgress') as boolean) {
return;
}
try {
Logger.debug(`Save execution progress to database for execution ID ${this.executionId} `, { executionId: this.executionId, nodeName });
const execution = await Db.collections.Execution!.findOne(this.executionId);
if (execution === undefined) {
// Something went badly wrong if this happens.
// This check is here mostly to make typescript happy.
return;
}
const fullExecutionData: IExecutionResponse = ResponseHelper.unflattenExecutionData(execution);
if (fullExecutionData.finished) {
// We already received ´workflowExecuteAfter´ webhook, so this is just an async call
// that was left behind. We skip saving because the other call should have saved everything
// so this one is safe to ignore
return;
}
if (fullExecutionData.data === undefined) {
fullExecutionData.data = {
startData: {},
resultData: {
runData: {},
},
executionData: {
contextData: {},
nodeExecutionStack: [],
waitingExecution: {},
},
};
}
if (Array.isArray(fullExecutionData.data.resultData.runData[nodeName])) {
// Append data if array exists
fullExecutionData.data.resultData.runData[nodeName].push(data);
} else {
// Initialize array and save data
fullExecutionData.data.resultData.runData[nodeName] = [data];
}
fullExecutionData.data.executionData = executionData.executionData;
// Set last executed node so that it may resume on failure
fullExecutionData.data.resultData.lastNodeExecuted = nodeName;
const flattenedExecutionData = ResponseHelper.flattenExecutionData(fullExecutionData);
await Db.collections.Execution!.update(this.executionId, flattenedExecutionData as IExecutionFlattedDb);
} catch (err) {
// TODO: Improve in the future!
// Errors here might happen because of database access
// For busy machines, we may get "Database is locked" errors.
// We do this to prevent crashes and executions ending in `unknown` state.
Logger.error(`Failed saving execution progress to database for execution ID ${this.executionId} (hookFunctionsPreExecute, nodeExecuteAfter)`, { ...err, executionId: this.executionId, sessionId: this.sessionId, workflowId: this.workflowData.id });
}
},
],
};
}
/**
* Returns hook functions to save workflow execution and call error workflow
*
* @returns {IWorkflowExecuteHooks}
*/
function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
return {
nodeExecuteBefore: [],
nodeExecuteAfter: [],
workflowExecuteBefore: [],
workflowExecuteAfter: [
async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise<void> {
Logger.debug(`Executing hook (hookFunctionsSave)`, { executionId: this.executionId, workflowId: this.workflowData.id });
// Prune old execution data
if (config.get('executions.pruneData')) {
pruneExecutionData.call(this);
}
const isManualMode = [this.mode, parentProcessMode].includes('manual');
try {
if (!isManualMode && WorkflowHelpers.isWorkflowIdValid(this.workflowData.id as string) === true && newStaticData) {
// Workflow is saved so update in database
try {
await WorkflowHelpers.saveStaticDataById(this.workflowData.id as string, newStaticData);
} catch (e) {
Logger.error(`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (hookFunctionsSave)`, { executionId: this.executionId, workflowId: this.workflowData.id });
}
}
let saveManualExecutions = config.get('executions.saveDataManualExecutions') as boolean;
if (this.workflowData.settings !== undefined && this.workflowData.settings.saveManualExecutions !== undefined) {
// Apply to workflow override
saveManualExecutions = this.workflowData.settings.saveManualExecutions as boolean;
}
if (isManualMode && saveManualExecutions === false && !fullRunData.waitTill) {
// Data is always saved, so we remove from database
await Db.collections.Execution!.delete(this.executionId);
return;
}
// Check config to know if execution should be saved or not
let saveDataErrorExecution = config.get('executions.saveDataOnError') as string;
let saveDataSuccessExecution = config.get('executions.saveDataOnSuccess') as string;
if (this.workflowData.settings !== undefined) {
saveDataErrorExecution = (this.workflowData.settings.saveDataErrorExecution as string) || saveDataErrorExecution;
saveDataSuccessExecution = (this.workflowData.settings.saveDataSuccessExecution as string) || saveDataSuccessExecution;
}
const workflowDidSucceed = !fullRunData.data.resultData.error;
if (workflowDidSucceed === true && saveDataSuccessExecution === 'none' ||
workflowDidSucceed === false && saveDataErrorExecution === 'none'
) {
if (!fullRunData.waitTill) {
if (!isManualMode) {
executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf);
}
// Data is always saved, so we remove from database
await Db.collections.Execution!.delete(this.executionId);
return;
}
}
const fullExecutionData: IExecutionDb = {
data: fullRunData.data,
mode: fullRunData.mode,
finished: fullRunData.finished ? fullRunData.finished : false,
startedAt: fullRunData.startedAt,
stoppedAt: fullRunData.stoppedAt,
workflowData: this.workflowData,
waitTill: fullRunData.waitTill,
};
if (this.retryOf !== undefined) {
fullExecutionData.retryOf = this.retryOf.toString();
}
if (this.workflowData.id !== undefined && WorkflowHelpers.isWorkflowIdValid(this.workflowData.id.toString()) === true) {
fullExecutionData.workflowId = this.workflowData.id.toString();
}
// Leave log message before flatten as that operation increased memory usage a lot and the chance of a crash is highest here
Logger.debug(`Save execution data to database for execution ID ${this.executionId}`, {
executionId: this.executionId,
workflowId: this.workflowData.id,
finished: fullExecutionData.finished,
stoppedAt: fullExecutionData.stoppedAt,
});
const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);
// Save the Execution in DB
await Db.collections.Execution!.update(this.executionId, executionData as IExecutionFlattedDb);
if (fullRunData.finished === true && this.retryOf !== undefined) {
// If the retry was successful save the reference it on the original execution
// await Db.collections.Execution!.save(executionData as IExecutionFlattedDb);
await Db.collections.Execution!.update(this.retryOf, { retrySuccessId: this.executionId });
}
if (!isManualMode) {
executeErrorWorkflow(this.workflowData, fullRunData, this.mode, this.executionId, this.retryOf);
}
} catch (error) {
Logger.error(`Failed saving execution data to DB on execution ID ${this.executionId}`, {
executionId: this.executionId,
workflowId: this.workflowData.id,
error,
});
if (!isManualMode) {
executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf);
}
}
},
],
};
}
/**
* Returns hook functions to save workflow execution and call error workflow
* for running with queues. Manual executions should never run on queues as
* they are always executed in the main process.
*
* @returns {IWorkflowExecuteHooks}
*/
function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
return {
nodeExecuteBefore: [],
nodeExecuteAfter: [],
workflowExecuteBefore: [],
workflowExecuteAfter: [
async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise<void> {
try {
if (WorkflowHelpers.isWorkflowIdValid(this.workflowData.id as string) === true && newStaticData) {
// Workflow is saved so update in database
try {
await WorkflowHelpers.saveStaticDataById(this.workflowData.id as string, newStaticData);
} catch (e) {
Logger.error(`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (workflowExecuteAfter)`, { sessionId: this.sessionId, workflowId: this.workflowData.id });
}
}
const workflowDidSucceed = !fullRunData.data.resultData.error;
if (workflowDidSucceed === false) {
executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf);
}
const fullExecutionData: IExecutionDb = {
data: fullRunData.data,
mode: fullRunData.mode,
finished: fullRunData.finished ? fullRunData.finished : false,
startedAt: fullRunData.startedAt,
stoppedAt: fullRunData.stoppedAt,
workflowData: this.workflowData,
waitTill: fullRunData.data.waitTill,
};
if (this.retryOf !== undefined) {
fullExecutionData.retryOf = this.retryOf.toString();
}
if (this.workflowData.id !== undefined && WorkflowHelpers.isWorkflowIdValid(this.workflowData.id.toString()) === true) {
fullExecutionData.workflowId = this.workflowData.id.toString();
}
const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);
// Save the Execution in DB
await Db.collections.Execution!.update(this.executionId, executionData as IExecutionFlattedDb);
if (fullRunData.finished === true && this.retryOf !== undefined) {
// If the retry was successful save the reference it on the original execution
await Db.collections.Execution!.update(this.retryOf, { retrySuccessId: this.executionId });
}
} catch (error) {
executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf);
}
},
],
};
}
export async function getRunData(workflowData: IWorkflowBase, inputData?: INodeExecutionData[]): Promise<IWorkflowExecutionDataProcess> {
const mode = 'integrated';
// Find Start-Node
const requiredNodeTypes = ['n8n-nodes-base.start'];
let startNode: INode | undefined;
for (const node of workflowData!.nodes) {
if (requiredNodeTypes.includes(node.type)) {
startNode = node;
break;
}
}
if (startNode === undefined) {
// If the workflow does not contain a start-node we can not know what
// should be executed and with what data to start.
throw new Error(`The workflow does not contain a "Start" node and can so not be executed.`);
}
// Always start with empty data if no inputData got supplied
inputData = inputData || [
{
json: {},
},
];
// Initialize the incoming data
const nodeExecutionStack: IExecuteData[] = [];
nodeExecutionStack.push(
{
node: startNode,
data: {
main: [inputData],
},
}
);
const runExecutionData: IRunExecutionData = {
startData: {
},
resultData: {
runData: {},
},
executionData: {
contextData: {},
nodeExecutionStack,
waitingExecution: {},
},
};
const runData: IWorkflowExecutionDataProcess = {
executionMode: mode,
executionData: runExecutionData,
// @ts-ignore
workflowData,
};
return runData;
}
export async function getWorkflowData(workflowInfo: IExecuteWorkflowInfo): Promise<IWorkflowBase> {
if (workflowInfo.id === undefined && workflowInfo.code === undefined) {
throw new Error(`No information about the workflow to execute found. Please provide either the "id" or "code"!`);
}
if (Db.collections!.Workflow === null) {
// The first time executeWorkflow gets called the Database has
// to get initialized first
await Db.init();
}
let workflowData: IWorkflowBase | undefined;
if (workflowInfo.id !== undefined) {
workflowData = await Db.collections!.Workflow!.findOne(workflowInfo.id);
if (workflowData === undefined) {
throw new Error(`The workflow with the id "${workflowInfo.id}" does not exist.`);
}
} else {
workflowData = workflowInfo.code;
}
return workflowData!;
}
/**
* Executes the workflow with the given ID
*
* @export
* @param {string} workflowId The id of the workflow to execute
* @param {IWorkflowExecuteAdditionalData} additionalData
* @param {INodeExecutionData[]} [inputData]
* @returns {(Promise<Array<INodeExecutionData[] | null>>)}
*/
export async function executeWorkflow(workflowInfo: IExecuteWorkflowInfo, additionalData: IWorkflowExecuteAdditionalData, inputData?: INodeExecutionData[], parentExecutionId?: string, loadedWorkflowData?: IWorkflowBase, loadedRunData?: IWorkflowExecutionDataProcess): Promise<Array<INodeExecutionData[] | null> | IWorkflowExecuteProcess> {
const externalHooks = ExternalHooks();
await externalHooks.init();
const nodeTypes = NodeTypes();
const workflowData = loadedWorkflowData !== undefined ? loadedWorkflowData : await getWorkflowData(workflowInfo);
const workflowName = workflowData ? workflowData.name : undefined;
const workflow = new Workflow({ id: workflowInfo.id, name: workflowName, nodes: workflowData!.nodes, connections: workflowData!.connections, active: workflowData!.active, nodeTypes, staticData: workflowData!.staticData });
const runData = loadedRunData !== undefined ? loadedRunData : await getRunData(workflowData, inputData);
let executionId;
if (parentExecutionId !== undefined) {
executionId = parentExecutionId;
} else {
executionId = parentExecutionId !== undefined ? parentExecutionId : await ActiveExecutions.getInstance().add(runData);
}
let data;
try {
// Create new additionalData to have different workflow loaded and to call
// different webooks
const additionalDataIntegrated = await getBase();
additionalDataIntegrated.hooks = getWorkflowHooksIntegrated(runData.executionMode, executionId, workflowData!, { parentProcessMode: additionalData.hooks!.mode });
// Make sure we pass on the original executeWorkflow function we received
// This one already contains changes to talk to parent process
// and get executionID from `activeExecutions` running on main process
additionalDataIntegrated.executeWorkflow = additionalData.executeWorkflow;
let subworkflowTimeout = additionalData.executionTimeoutTimestamp;
if (workflowData.settings?.executionTimeout !== undefined && workflowData.settings.executionTimeout > 0) {
// We might have received a max timeout timestamp from the parent workflow
// If we did, then we get the minimum time between the two timeouts
// If no timeout was given from the parent, then we use our timeout.
subworkflowTimeout = Math.min(additionalData.executionTimeoutTimestamp || Number.MAX_SAFE_INTEGER, Date.now() + (workflowData.settings.executionTimeout as number * 1000));
}
additionalDataIntegrated.executionTimeoutTimestamp = subworkflowTimeout;
const runExecutionData = runData.executionData as IRunExecutionData;
// Execute the workflow
const workflowExecute = new WorkflowExecute(additionalDataIntegrated, runData.executionMode, runExecutionData);
if (parentExecutionId !== undefined) {
// Must be changed to become typed
return {
startedAt: new Date(),
workflow,
workflowExecute,
};
}
data = await workflowExecute.processRunExecutionData(workflow);
} catch (error) {
const fullRunData: IRun = {
data: {
resultData: {
error,
runData: {},
},
},
finished: false,
mode: 'integrated',
startedAt: new Date(),
stoppedAt: new Date(),
};
// When failing, we might not have finished the execution
// Therefore, database might not contain finished errors.
// Force an update to db as there should be no harm doing this
const fullExecutionData: IExecutionDb = {
data: fullRunData.data,
mode: fullRunData.mode,
finished: fullRunData.finished ? fullRunData.finished : false,
startedAt: fullRunData.startedAt,
stoppedAt: fullRunData.stoppedAt,
workflowData,
};
const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);
await Db.collections.Execution!.update(executionId, executionData as IExecutionFlattedDb);
throw {
...error,
stack: error!.stack,
};
}
await externalHooks.run('workflow.postExecute', [data, workflowData]);
if (data.finished === true) {
// Workflow did finish successfully
await ActiveExecutions.getInstance().remove(executionId, data);
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
return returnData!.data!.main;
} else {
await ActiveExecutions.getInstance().remove(executionId, data);
// Workflow did fail
const { error } = data.data.resultData;
throw {
...error,
stack: error!.stack,
};
}
}
export function sendMessageToUI(source: string, message: any) { // tslint:disable-line:no-any
if (this.sessionId === undefined) {
return;
}
// Push data to session which started workflow
try {
const pushInstance = Push.getInstance();
pushInstance.send('sendConsoleMessage', {
source: `Node: "${source}"`,
message,
}, this.sessionId);
} catch (error) {
Logger.warn(`There was a problem sending messsage to UI: ${error.message}`);
}
}
/**
* Returns the base additional data without webhooks
*
* @export
* @param {IWorkflowCredentials} credentials
* @param {INodeParameters} currentNodeParameters
* @returns {Promise<IWorkflowExecuteAdditionalData>}
*/
export async function getBase(currentNodeParameters?: INodeParameters, executionTimeoutTimestamp?: number): Promise<IWorkflowExecuteAdditionalData> {
const urlBaseWebhook = WebhookHelpers.getWebhookBaseUrl();
const timezone = config.get('generic.timezone') as string;
const webhookBaseUrl = urlBaseWebhook + config.get('endpoints.webhook') as string;
const webhookWaitingBaseUrl = urlBaseWebhook + config.get('endpoints.webhookWaiting') as string;
const webhookTestBaseUrl = urlBaseWebhook + config.get('endpoints.webhookTest') as string;
const encryptionKey = await UserSettings.getEncryptionKey();
if (encryptionKey === undefined) {
throw new Error('No encryption key got found to decrypt the credentials!');
}
return {
credentialsHelper: new CredentialsHelper(encryptionKey),
encryptionKey,
executeWorkflow,
restApiUrl: urlBaseWebhook + config.get('endpoints.rest') as string,
timezone,
webhookBaseUrl,
webhookWaitingBaseUrl,
webhookTestBaseUrl,
currentNodeParameters,
executionTimeoutTimestamp,
};
}
/**
* Returns WorkflowHooks instance for running integrated workflows
* (Workflows which get started inside of another workflow)
*/
export function getWorkflowHooksIntegrated(mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, optionalParameters?: IWorkflowHooksOptionalParameters): WorkflowHooks {
optionalParameters = optionalParameters || {};
const hookFunctions = hookFunctionsSave(optionalParameters.parentProcessMode);
const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode);
for (const key of Object.keys(preExecuteFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
}
/**
* Returns WorkflowHooks instance for running integrated workflows
* (Workflows which get started inside of another workflow)
*/
export function getWorkflowHooksWorkerExecuter(mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, optionalParameters?: IWorkflowHooksOptionalParameters): WorkflowHooks {
optionalParameters = optionalParameters || {};
const hookFunctions = hookFunctionsSaveWorker();
const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode);
for (const key of Object.keys(preExecuteFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
}
/**
* Returns WorkflowHooks instance for main process if workflow runs via worker
*/
export function getWorkflowHooksWorkerMain(mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, optionalParameters?: IWorkflowHooksOptionalParameters): WorkflowHooks {
optionalParameters = optionalParameters || {};
const hookFunctions = hookFunctionsPush();
const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode);
for (const key of Object.keys(preExecuteFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
// When running with worker mode, main process executes
// Only workflowExecuteBefore + workflowExecuteAfter
// So to avoid confusion, we are removing other hooks.
hookFunctions.nodeExecuteBefore = [];
hookFunctions.nodeExecuteAfter = [];
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
}
/**
* Returns WorkflowHooks instance for running the main workflow
*
* @export
* @param {IWorkflowExecutionDataProcess} data
* @param {string} executionId
* @returns {WorkflowHooks}
*/
export function getWorkflowHooksMain(data: IWorkflowExecutionDataProcess, executionId: string, isMainProcess = false): WorkflowHooks {
const hookFunctions = hookFunctionsSave();
const pushFunctions = hookFunctionsPush();
for (const key of Object.keys(pushFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], pushFunctions[key]);
}
if (isMainProcess) {
const preExecuteFunctions = hookFunctionsPreExecute();
for (const key of Object.keys(preExecuteFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
}
return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, { sessionId: data.sessionId, retryOf: data.retryOf as string });
}