n8n/packages/cli/src/WorkflowExecuteAdditionalData.ts
Iván Ovejero 56c4c6991f
🎨 Set up linting and formatting (#2120)
* ⬆️ Upgrade TS to 4.3.5

* 👕 Add ESLint configs

* 🎨 Add Prettier config

* 📦 Add deps and commands

*  Adjust global .editorconfig to new ruleset

* 🔥 Remove unneeded local .editorconfig

* 📦 Update deps in editor-ui

* 🔨 Limit Prettier to only TS files

*  Add recommended VSCode extensions

* 👕 Fix build

* 🔥 Remove Vue setting from global config

*  Disable prefer-default-export per feedback

* ✏️ Add forgotten divider

* 👕 Disable no-plusplus

* 👕 Disable class-methods-use-this

* ✏️ Alphabetize overrides

* 👕 Add one-var consecutive override

*  Revert one-var consecutive override

This reverts commit b9252cf935.

* 🎨 👕 Lint and format workflow package (#2121)

* 🎨 Format /workflow package

* 👕 Lint /workflow package

* 🎨 Re-format /workflow package

* 👕 Re-lint /workflow package

* ✏️ Fix typo

*  Consolidate if-checks

* 🔥 Remove prefer-default-export exceptions

* 🔥 Remove no-plusplus exceptions

* 🔥 Remove class-methods-use-this exceptions

* 🎨 👕 Lint and format node-dev package (#2122)

* 🎨 Format /node-dev package

*  Exclude templates from ESLint config

This keeps the templates consistent with the codebase while preventing lint exceptions from being made part of the templates.

* 👕 Lint /node-dev package

* 🔥 Remove prefer-default-export exceptions

* 🔥 Remove no-plusplus exceptions

* 🎨 👕 Lint and format core package (#2123)

* 🎨 Format /core package

* 👕 Lint /core package

* 🎨 Re-format /core package

* 👕 Re-lint /core package

* 🔥 Remove prefer-default-export exceptions

* 🔥 Remove no-plusplus exceptions

* 🔥 Remove class-methods-use-this exceptions

* 🎨 👕 Lint and format cli package (#2124)

* 🎨 Format /cli package

* 👕 Exclude migrations from linting

* 👕 Lint /cli package

* 🎨 Re-format /cli package

* 👕 Re-lint /cli package

* 👕 Fix build

* 🔥 Remove prefer-default-export exceptions

*  Update exceptions in ActiveExecutions

* 🔥 Remove no-plusplus exceptions

* 🔥 Remove class-methods-use-this exceptions

* 👕 fix lint issues

* 🔧 use package specific linter, remove tslint command

* 🔨 resolve build issue, sync dependencies

* 🔧 change lint command

Co-authored-by: Ben Hesseldieck <b.hesseldieck@gmail.com>
2021-08-29 20:58:11 +02:00

1092 lines
33 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/* eslint-disable no-restricted-syntax */
/* eslint-disable @typescript-eslint/restrict-plus-operands */
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
/* eslint-disable @typescript-eslint/await-thenable */
/* eslint-disable @typescript-eslint/no-non-null-assertion */
/* eslint-disable @typescript-eslint/no-use-before-define */
/* eslint-disable @typescript-eslint/prefer-nullish-coalescing */
/* eslint-disable no-param-reassign */
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/prefer-optional-chain */
/* eslint-disable id-denylist */
/* eslint-disable @typescript-eslint/restrict-template-expressions */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unused-vars */
/* eslint-disable func-names */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { UserSettings, WorkflowExecute } from 'n8n-core';
import {
IDataObject,
IExecuteData,
IExecuteWorkflowInfo,
INode,
INodeExecutionData,
INodeParameters,
IRun,
IRunExecutionData,
ITaskData,
IWorkflowCredentials,
IWorkflowExecuteAdditionalData,
IWorkflowExecuteHooks,
IWorkflowHooksOptionalParameters,
LoggerProxy as Logger,
Workflow,
WorkflowExecuteMode,
WorkflowHooks,
} from 'n8n-workflow';
import { LessThanOrEqual } from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils';
import * as config from '../config';
// eslint-disable-next-line import/no-cycle
import {
ActiveExecutions,
CredentialsHelper,
Db,
ExternalHooks,
IExecutionDb,
IExecutionFlattedDb,
IExecutionResponse,
IPushDataExecutionFinished,
IWorkflowBase,
IWorkflowExecuteProcess,
IWorkflowExecutionDataProcess,
NodeTypes,
Push,
ResponseHelper,
WebhookHelpers,
WorkflowCredentials,
WorkflowHelpers,
} from '.';
// Node type which marks a workflow as containing an error trigger, read from the `nodes.errorTriggerType` config entry
const ERROR_TRIGGER_TYPE = config.get('nodes.errorTriggerType') as string;
/**
 * Starts the matching error workflow for a run which ended with an error.
 *
 * Does nothing if the run result carries no error. Otherwise the error workflow
 * configured in the workflow settings gets started. If none is configured (or it
 * would call itself in a loop) the workflow itself gets started as error workflow,
 * as long as it contains a node of the error trigger type.
 *
 * @param {IWorkflowBase} workflowData The workflow which got executed
 * @param {IRun} fullRunData The run which produced the error
 * @param {WorkflowExecuteMode} mode The mode in which the workflow got started in
 * @param {string} [executionId] The id the execution got saved as
 * @param {string} [retryOf] Passed on as "retryOf" in the data handed to the error workflow
 */
function executeErrorWorkflow(
	workflowData: IWorkflowBase,
	fullRunData: IRun,
	mode: WorkflowExecuteMode,
	executionId?: string,
	retryOf?: string,
): void {
	// A link to the execution can only be built if the execution got saved
	const pastExecutionUrl =
		executionId === undefined
			? undefined
			: `${WebhookHelpers.getWebhookBaseUrl()}execution/${executionId}`;

	const { resultData } = fullRunData.data;
	if (resultData.error === undefined) {
		// No error, so there is nothing to report
		return;
	}

	const ownId = workflowData.id;
	const workflowErrorData = {
		execution: {
			id: executionId,
			url: pastExecutionUrl,
			error: resultData.error,
			lastNodeExecuted: resultData.lastNodeExecuted!,
			mode,
			retryOf,
		},
		workflow: {
			id: ownId === undefined ? undefined : ownId.toString(),
			name: workflowData.name,
		},
	};

	const { settings } = workflowData;
	const configuredErrorWorkflow = settings === undefined ? undefined : settings.errorWorkflow;

	// To avoid an infinite loop the error workflow does not get started again if the
	// error workflow itself failed and it is its own error workflow.
	const wouldCallItself = (): boolean =>
		mode === 'error' && !!ownId && configuredErrorWorkflow!.toString() === ownId.toString();

	if (configuredErrorWorkflow && !wouldCallItself()) {
		Logger.verbose(`Start external error workflow`, {
			executionId,
			errorWorkflowId: configuredErrorWorkflow.toString(),
			workflowId: ownId,
		});
		// A specific error workflow is set, so run only that one
		// eslint-disable-next-line @typescript-eslint/no-floating-promises
		WorkflowHelpers.executeErrorWorkflow(configuredErrorWorkflow as string, workflowErrorData);
		return;
	}

	if (
		mode !== 'error' &&
		ownId !== undefined &&
		workflowData.nodes.some((node) => node.type === ERROR_TRIGGER_TYPE)
	) {
		Logger.verbose(`Start internal error workflow`, { executionId, workflowId: ownId });
		// The workflow contains an error trigger, so it handles its own errors
		// eslint-disable-next-line @typescript-eslint/no-floating-promises
		WorkflowHelpers.executeErrorWorkflow(ownId.toString(), workflowErrorData);
	}
}
/**
 * Deletes saved executions which stopped longer ago than the configured maximum age.
 * Throttled: after a successful prune no new prune gets started until the
 * configured timeout has passed. After a failed prune the next call prunes again.
 */
let throttling = false;
function pruneExecutionData(this: WorkflowHooks): void {
	if (throttling) {
		return;
	}

	Logger.verbose('Pruning execution data from database');
	throttling = true;

	const timeout = config.get('executions.pruneDataTimeout') as number; // in seconds
	const maxAge = config.get('executions.pruneDataMaxAge') as number; // in h

	// Everything which stopped at or before this point in time gets deleted
	const cutoff = new Date();
	cutoff.setHours(cutoff.getHours() - maxAge);

	// date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286
	// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
	const utcCutoff = DateUtils.mixedDateToUtcDatetimeString(cutoff);

	const releaseThrottle = (): void => {
		throttling = false;
	};

	// throttle just on success to allow for self healing on failure
	// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
	Db.collections
		.Execution!.delete({ stoppedAt: LessThanOrEqual(utcCutoff) })
		.then(() => setTimeout(releaseThrottle, timeout * 1000))
		.catch((error) => {
			throttling = false;
			Logger.error(
				`Failed pruning execution data from database for execution ID ${this.executionId} (hookFunctionsSave)`,
				{
					...error,
					executionId: this.executionId,
					sessionId: this.sessionId,
					workflowId: this.workflowData.id,
				},
			);
		});
}
/**
 * Returns hook functions to push data to Editor-UI
 *
 * Every hook sends its data only to the session which started the workflow and
 * sends nothing if the execution has no session.
 *
 * @returns {IWorkflowExecuteHooks}
 */
function hookFunctionsPush(): IWorkflowExecuteHooks {
	return {
		nodeExecuteBefore: [
			async function (this: WorkflowHooks, nodeName: string): Promise<void> {
				// Push data to session which started workflow before each
				// node which starts rendering
				if (this.sessionId === undefined) {
					return;
				}
				Logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, {
					executionId: this.executionId,
					sessionId: this.sessionId,
					workflowId: this.workflowData.id,
				});

				const pushInstance = Push.getInstance();
				pushInstance.send(
					'nodeExecuteBefore',
					{
						executionId: this.executionId,
						nodeName,
					},
					this.sessionId,
				);
			},
		],
		nodeExecuteAfter: [
			async function (this: WorkflowHooks, nodeName: string, data: ITaskData): Promise<void> {
				// Push data to session which started workflow after each rendered node
				if (this.sessionId === undefined) {
					return;
				}
				Logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, {
					executionId: this.executionId,
					sessionId: this.sessionId,
					workflowId: this.workflowData.id,
				});

				const pushInstance = Push.getInstance();
				pushInstance.send(
					'nodeExecuteAfter',
					{
						executionId: this.executionId,
						nodeName,
						data,
					},
					this.sessionId,
				);
			},
		],
		workflowExecuteBefore: [
			async function (this: WorkflowHooks): Promise<void> {
				Logger.debug(`Executing hook (hookFunctionsPush)`, {
					executionId: this.executionId,
					sessionId: this.sessionId,
					workflowId: this.workflowData.id,
				});
				// Push data to session which started the workflow
				if (this.sessionId === undefined) {
					return;
				}
				const pushInstance = Push.getInstance();
				pushInstance.send(
					'executionStarted',
					{
						executionId: this.executionId,
						mode: this.mode,
						startedAt: new Date(),
						retryOf: this.retryOf,
						workflowId: this.workflowData.id,
						sessionId: this.sessionId,
						workflowName: this.workflowData.name,
					},
					this.sessionId,
				);
			},
		],
		workflowExecuteAfter: [
			async function (
				this: WorkflowHooks,
				fullRunData: IRun,
				newStaticData: IDataObject,
			): Promise<void> {
				Logger.debug(`Executing hook (hookFunctionsPush)`, {
					executionId: this.executionId,
					sessionId: this.sessionId,
					workflowId: this.workflowData.id,
				});
				// Push data to session which started the workflow
				if (this.sessionId === undefined) {
					return;
				}

				// Clone the object except the runData. That one is not supposed
				// to be sent, because that data got sent piece by piece after
				// each node which finished executing
				const pushRunData = {
					...fullRunData,
					data: {
						...fullRunData.data,
						resultData: {
							...fullRunData.data.resultData,
							runData: {},
						},
					},
				};

				// Push data to editor-ui once workflow finished.
				// This hook does not write to the database, so the log message
				// must not claim that execution progress gets saved.
				Logger.debug(
					`Push execution finished data to Editor-UI for execution ID ${this.executionId}`,
					{
						executionId: this.executionId,
						workflowId: this.workflowData.id,
					},
				);
				// TODO: Look at this again
				const sendData: IPushDataExecutionFinished = {
					executionId: this.executionId,
					data: pushRunData,
					retryOf: this.retryOf,
				};

				const pushInstance = Push.getInstance();
				pushInstance.send('executionFinished', sendData, this.sessionId);
			},
		],
	};
}
/**
 * Returns hook functions which run the external "workflow.preExecute" hook before
 * a workflow starts and which save the execution progress to the database after
 * each node, if saving the progress is enabled.
 *
 * @param {string} [parentProcessMode] Not used by any of the returned hooks
 * @returns {IWorkflowExecuteHooks}
 */
export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowExecuteHooks {
	const externalHooks = ExternalHooks();

	return {
		workflowExecuteBefore: [
			async function (this: WorkflowHooks, workflow: Workflow): Promise<void> {
				await externalHooks.run('workflow.preExecute', [workflow, this.mode]);
			},
		],
		nodeExecuteAfter: [
			async function (
				this: WorkflowHooks,
				nodeName: string,
				data: ITaskData,
				executionData: IRunExecutionData,
			): Promise<void> {
				// An explicit true/false in the workflow settings decides, anything
				// else falls back to the global configuration
				const { settings } = this.workflowData;
				const workflowSetting =
					settings !== undefined ? settings.saveExecutionProgress : undefined;
				if (workflowSetting === false) {
					return;
				}
				if (workflowSetting !== true && !config.get('executions.saveExecutionProgress')) {
					return;
				}

				try {
					Logger.debug(
						`Save execution progress to database for execution ID ${this.executionId} `,
						{ executionId: this.executionId, nodeName },
					);

					// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
					const execution = await Db.collections.Execution!.findOne(this.executionId);
					if (execution === undefined) {
						// Something went badly wrong if this happens.
						// This check is here mostly to make typescript happy.
						return;
					}

					const fullExecutionData: IExecutionResponse =
						ResponseHelper.unflattenExecutionData(execution);
					if (fullExecutionData.finished) {
						// "workflowExecuteAfter" ran already, so this is just an async call
						// that was left behind. The other call should have saved everything,
						// so this one is safe to ignore
						return;
					}

					if (fullExecutionData.data === undefined) {
						// Nothing saved yet, start with empty execution data
						fullExecutionData.data = {
							startData: {},
							resultData: {
								runData: {},
							},
							executionData: {
								contextData: {},
								nodeExecutionStack: [],
								waitingExecution: {},
							},
						};
					}

					// Append to the runs of the node or start a new list for it
					const { runData } = fullExecutionData.data.resultData;
					const previousRuns = runData[nodeName];
					if (Array.isArray(previousRuns)) {
						previousRuns.push(data);
					} else {
						runData[nodeName] = [data];
					}

					fullExecutionData.data.executionData = executionData.executionData;
					// Set last executed node so that it may resume on failure
					fullExecutionData.data.resultData.lastNodeExecuted = nodeName;

					// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
					await Db.collections.Execution!.update(
						this.executionId,
						ResponseHelper.flattenExecutionData(fullExecutionData) as IExecutionFlattedDb,
					);
				} catch (err) {
					// TODO: Improve in the future!
					// Errors here might happen because of database access
					// For busy machines, we may get "Database is locked" errors.
					// We do this to prevent crashes and executions ending in `unknown` state.
					Logger.error(
						`Failed saving execution progress to database for execution ID ${this.executionId} (hookFunctionsPreExecute, nodeExecuteAfter)`,
						{
							...err,
							executionId: this.executionId,
							sessionId: this.sessionId,
							workflowId: this.workflowData.id,
						},
					);
				}
			},
		],
	};
}
/**
* Returns hook functions to save workflow execution and call error workflow
*
* Only "workflowExecuteAfter" contains a hook, all other hook lists are empty.
*
* @param {string} [parentProcessMode] Mode of the parent process; the execution counts as manual if either it or the mode of the hooks is "manual"
* @returns {IWorkflowExecuteHooks}
*/
function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
return {
nodeExecuteBefore: [],
nodeExecuteAfter: [],
workflowExecuteBefore: [],
workflowExecuteAfter: [
async function (
this: WorkflowHooks,
fullRunData: IRun,
newStaticData: IDataObject,
): Promise<void> {
Logger.debug(`Executing hook (hookFunctionsSave)`, {
executionId: this.executionId,
workflowId: this.workflowData.id,
});
// Prune old execution data
if (config.get('executions.pruneData')) {
pruneExecutionData.call(this);
}
// Manual if either this execution or the parent process runs in manual mode
const isManualMode = [this.mode, parentProcessMode].includes('manual');
try {
// Static data only gets saved for non-manual executions of workflows with a valid id
if (
!isManualMode &&
WorkflowHelpers.isWorkflowIdValid(this.workflowData.id as string) &&
newStaticData
) {
// Workflow is saved so update in database
try {
await WorkflowHelpers.saveStaticDataById(
this.workflowData.id as string,
newStaticData,
);
} catch (e) {
// A failure to save the static data only gets logged, the execution still gets saved below
Logger.error(
`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (hookFunctionsSave)`,
{ executionId: this.executionId, workflowId: this.workflowData.id },
);
}
}
let saveManualExecutions = config.get('executions.saveDataManualExecutions') as boolean;
if (
this.workflowData.settings !== undefined &&
this.workflowData.settings.saveManualExecutions !== undefined
) {
// Apply to workflow override
saveManualExecutions = this.workflowData.settings.saveManualExecutions as boolean;
}
// Manual executions which should not be saved get removed again, unless the execution is waiting
if (isManualMode && !saveManualExecutions && !fullRunData.waitTill) {
// Data is always saved, so we remove from database
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
await Db.collections.Execution!.delete(this.executionId);
return;
}
// Check config to know if execution should be saved or not
let saveDataErrorExecution = config.get('executions.saveDataOnError') as string;
let saveDataSuccessExecution = config.get('executions.saveDataOnSuccess') as string;
if (this.workflowData.settings !== undefined) {
// Non-empty workflow settings override the configured defaults
saveDataErrorExecution =
(this.workflowData.settings.saveDataErrorExecution as string) ||
saveDataErrorExecution;
saveDataSuccessExecution =
(this.workflowData.settings.saveDataSuccessExecution as string) ||
saveDataSuccessExecution;
}
const workflowDidSucceed = !fullRunData.data.resultData.error;
if (
(workflowDidSucceed && saveDataSuccessExecution === 'none') ||
(!workflowDidSucceed && saveDataErrorExecution === 'none')
) {
// Waiting executions fall through and get saved even with "none"
if (!fullRunData.waitTill) {
if (!isManualMode) {
// The execution gets deleted below, so no execution id is handed over
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
undefined,
this.retryOf,
);
}
// Data is always saved, so we remove from database
// NOTE(review): if this delete rejects, the catch below calls executeErrorWorkflow a second time - confirm this is intended
await Db.collections.Execution!.delete(this.executionId);
return;
}
}
const fullExecutionData: IExecutionDb = {
data: fullRunData.data,
mode: fullRunData.mode,
finished: fullRunData.finished ? fullRunData.finished : false,
startedAt: fullRunData.startedAt,
stoppedAt: fullRunData.stoppedAt,
workflowData: this.workflowData,
waitTill: fullRunData.waitTill,
};
if (this.retryOf !== undefined) {
fullExecutionData.retryOf = this.retryOf.toString();
}
if (
this.workflowData.id !== undefined &&
WorkflowHelpers.isWorkflowIdValid(this.workflowData.id.toString())
) {
fullExecutionData.workflowId = this.workflowData.id.toString();
}
// Leave log message before flatten as that operation increased memory usage a lot and the chance of a crash is highest here
Logger.debug(`Save execution data to database for execution ID ${this.executionId}`, {
executionId: this.executionId,
workflowId: this.workflowData.id,
finished: fullExecutionData.finished,
stoppedAt: fullExecutionData.stoppedAt,
});
const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);
// Save the Execution in DB
await Db.collections.Execution!.update(
this.executionId,
executionData as IExecutionFlattedDb,
);
if (fullRunData.finished === true && this.retryOf !== undefined) {
// If the retry was successful save the reference to it on the original execution
// await Db.collections.Execution!.save(executionData as IExecutionFlattedDb);
await Db.collections.Execution!.update(this.retryOf, {
retrySuccessId: this.executionId,
});
}
if (!isManualMode) {
// The execution got saved, so its id is handed over
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
}
} catch (error) {
Logger.error(`Failed saving execution data to DB on execution ID ${this.executionId}`, {
executionId: this.executionId,
workflowId: this.workflowData.id,
error,
});
// Saving failed, so the error workflow gets called without an execution id
if (!isManualMode) {
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
undefined,
this.retryOf,
);
}
}
},
],
};
}
/**
 * Returns hook functions to save workflow execution and call error workflow
 * for running with queues. Manual executions should never run on queues as
 * they are always executed in the main process.
 *
 * Only "workflowExecuteAfter" contains a hook. It saves changed static data,
 * starts the error workflow for failed runs and writes the execution to the
 * database. A failure while saving gets logged and the error workflow is called
 * from the error handler only if it was not called before.
 *
 * @returns {IWorkflowExecuteHooks}
 */
function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
	return {
		nodeExecuteBefore: [],
		nodeExecuteAfter: [],
		workflowExecuteBefore: [],
		workflowExecuteAfter: [
			async function (
				this: WorkflowHooks,
				fullRunData: IRun,
				newStaticData: IDataObject,
			): Promise<void> {
				// Remembers that the error workflow got called already, so that a
				// failing save below does not start it a second time
				let errorWorkflowCalled = false;

				try {
					if (WorkflowHelpers.isWorkflowIdValid(this.workflowData.id as string) && newStaticData) {
						// Workflow is saved so update in database
						try {
							await WorkflowHelpers.saveStaticDataById(
								this.workflowData.id as string,
								newStaticData,
							);
						} catch (e) {
							// Only gets logged, the execution still gets saved below
							Logger.error(
								`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (workflowExecuteAfter)`,
								{ sessionId: this.sessionId, workflowId: this.workflowData.id },
							);
						}
					}

					const workflowDidSucceed = !fullRunData.data.resultData.error;
					if (!workflowDidSucceed) {
						executeErrorWorkflow(
							this.workflowData,
							fullRunData,
							this.mode,
							undefined,
							this.retryOf,
						);
						errorWorkflowCalled = true;
					}

					const fullExecutionData: IExecutionDb = {
						data: fullRunData.data,
						mode: fullRunData.mode,
						finished: fullRunData.finished ? fullRunData.finished : false,
						startedAt: fullRunData.startedAt,
						stoppedAt: fullRunData.stoppedAt,
						workflowData: this.workflowData,
						// NOTE(review): hookFunctionsSave reads "fullRunData.waitTill" instead - confirm both hold the same value
						waitTill: fullRunData.data.waitTill,
					};

					if (this.retryOf !== undefined) {
						fullExecutionData.retryOf = this.retryOf.toString();
					}

					if (
						this.workflowData.id !== undefined &&
						WorkflowHelpers.isWorkflowIdValid(this.workflowData.id.toString())
					) {
						fullExecutionData.workflowId = this.workflowData.id.toString();
					}

					const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);

					// Save the Execution in DB
					await Db.collections.Execution!.update(
						this.executionId,
						executionData as IExecutionFlattedDb,
					);

					if (fullRunData.finished === true && this.retryOf !== undefined) {
						// If the retry was successful save the reference to it on the original execution
						await Db.collections.Execution!.update(this.retryOf, {
							retrySuccessId: this.executionId,
						});
					}
				} catch (error) {
					// Do not swallow the reason silently, log it like hookFunctionsSave does
					Logger.error(`Failed saving execution data to DB on execution ID ${this.executionId}`, {
						executionId: this.executionId,
						workflowId: this.workflowData.id,
						error,
					});

					if (!errorWorkflowCalled) {
						executeErrorWorkflow(
							this.workflowData,
							fullRunData,
							this.mode,
							undefined,
							this.retryOf,
						);
					}
				}
			},
		],
	};
}
/**
 * Builds the run data to execute the given workflow in "integrated" mode,
 * starting at the first "Start" node found in the workflow.
 *
 * @param {IWorkflowBase} workflowData The workflow to build the run data for
 * @param {INodeExecutionData[]} [inputData] Items for the start node, a single empty item if not supplied
 * @returns {Promise<IWorkflowExecutionDataProcess>}
 * @throws {Error} If the workflow does not contain a "Start" node
 */
export async function getRunData(
	workflowData: IWorkflowBase,
	inputData?: INodeExecutionData[],
): Promise<IWorkflowExecutionDataProcess> {
	// Find Start-Node
	const requiredNodeTypes = ['n8n-nodes-base.start'];
	const startNode: INode | undefined = workflowData.nodes.find((node) =>
		requiredNodeTypes.includes(node.type),
	);

	if (startNode === undefined) {
		// If the workflow does not contain a start-node we can not know what
		// should be executed and with what data to start.
		throw new Error(`The workflow does not contain a "Start" node and can so not be executed.`);
	}

	// Always start with empty data if no inputData got supplied
	const startItems: INodeExecutionData[] = inputData || [{ json: {} }];

	// Initialize the incoming data
	const nodeExecutionStack: IExecuteData[] = [
		{
			node: startNode,
			data: {
				main: [startItems],
			},
		},
	];

	const runData: IWorkflowExecutionDataProcess = {
		executionMode: 'integrated',
		executionData: {
			startData: {},
			resultData: {
				runData: {},
			},
			executionData: {
				contextData: {},
				nodeExecutionStack,
				waitingExecution: {},
			},
		},
		// @ts-ignore
		workflowData,
	};

	return runData;
}
/**
 * Returns the data of the workflow to execute. It gets loaded from the database
 * if an "id" is given, else the supplied "code" gets returned.
 *
 * @param {IExecuteWorkflowInfo} workflowInfo Has to contain either "id" or "code"
 * @returns {Promise<IWorkflowBase>}
 * @throws {Error} If neither "id" nor "code" is given or no workflow with the id exists
 */
export async function getWorkflowData(workflowInfo: IExecuteWorkflowInfo): Promise<IWorkflowBase> {
	const { id, code } = workflowInfo;

	if (id === undefined && code === undefined) {
		throw new Error(
			`No information about the workflow to execute found. Please provide either the "id" or "code"!`,
		);
	}

	if (Db.collections.Workflow === null) {
		// The first time executeWorkflow gets called the Database has
		// to get initialized first
		await Db.init();
	}

	if (id === undefined) {
		// No id, so the workflow got supplied directly
		return code!;
	}

	const storedWorkflow: IWorkflowBase | undefined = await Db.collections.Workflow!.findOne(id);
	if (storedWorkflow === undefined) {
		throw new Error(`The workflow with the id "${id}" does not exist.`);
	}
	return storedWorkflow;
}
/**
 * Executes a workflow from inside of another workflow
 *
 * If the workflow fails, a plain object gets thrown which contains the own
 * enumerable properties of the error plus its "stack" and "message".
 *
 * @export
 * @param {IExecuteWorkflowInfo} workflowInfo Identifies the workflow to execute via "id" or "code"
 * @param {IWorkflowExecuteAdditionalData} additionalData Additional data of the calling workflow
 * @param {INodeExecutionData[]} [inputData] Items to start the workflow with
 * @param {string} [parentExecutionId] If set it is used as execution id and the prepared process gets returned instead of the workflow being run
 * @param {IWorkflowBase} [loadedWorkflowData] Workflow data to use instead of loading it via workflowInfo
 * @param {IWorkflowExecutionDataProcess} [loadedRunData] Run data to use instead of building it
 * @returns {(Promise<Array<INodeExecutionData[] | null> | IWorkflowExecuteProcess>)}
 */
export async function executeWorkflow(
	workflowInfo: IExecuteWorkflowInfo,
	additionalData: IWorkflowExecuteAdditionalData,
	inputData?: INodeExecutionData[],
	parentExecutionId?: string,
	loadedWorkflowData?: IWorkflowBase,
	loadedRunData?: IWorkflowExecutionDataProcess,
): Promise<Array<INodeExecutionData[] | null> | IWorkflowExecuteProcess> {
	const externalHooks = ExternalHooks();
	await externalHooks.init();

	const nodeTypes = NodeTypes();

	const workflowData =
		loadedWorkflowData !== undefined ? loadedWorkflowData : await getWorkflowData(workflowInfo);

	const workflowName = workflowData ? workflowData.name : undefined;
	const workflow = new Workflow({
		id: workflowInfo.id,
		name: workflowName,
		nodes: workflowData.nodes,
		connections: workflowData.connections,
		active: workflowData.active,
		nodeTypes,
		staticData: workflowData.staticData,
	});

	const runData =
		loadedRunData !== undefined ? loadedRunData : await getRunData(workflowData, inputData);

	// Reuse the execution id of the parent if there is one, else register a new execution
	const executionId =
		parentExecutionId !== undefined
			? parentExecutionId
			: await ActiveExecutions.getInstance().add(runData);

	let data;
	try {
		// Create new additionalData to have different workflow loaded and to call
		// different webhooks
		const additionalDataIntegrated = await getBase();
		additionalDataIntegrated.hooks = getWorkflowHooksIntegrated(
			runData.executionMode,
			executionId,
			workflowData,
			{ parentProcessMode: additionalData.hooks!.mode },
		);
		// Make sure we pass on the original executeWorkflow function we received
		// This one already contains changes to talk to parent process
		// and get executionID from `activeExecutions` running on main process
		additionalDataIntegrated.executeWorkflow = additionalData.executeWorkflow;

		let subworkflowTimeout = additionalData.executionTimeoutTimestamp;
		if (
			workflowData.settings?.executionTimeout !== undefined &&
			workflowData.settings.executionTimeout > 0
		) {
			// We might have received a max timeout timestamp from the parent workflow
			// If we did, then we get the minimum time between the two timeouts
			// If no timeout was given from the parent, then we use our timeout.
			subworkflowTimeout = Math.min(
				additionalData.executionTimeoutTimestamp || Number.MAX_SAFE_INTEGER,
				Date.now() + (workflowData.settings.executionTimeout as number) * 1000,
			);
		}

		additionalDataIntegrated.executionTimeoutTimestamp = subworkflowTimeout;

		const runExecutionData = runData.executionData as IRunExecutionData;

		// Execute the workflow
		const workflowExecute = new WorkflowExecute(
			additionalDataIntegrated,
			runData.executionMode,
			runExecutionData,
		);
		if (parentExecutionId !== undefined) {
			// Must be changed to become typed
			return {
				startedAt: new Date(),
				workflow,
				workflowExecute,
			};
		}
		data = await workflowExecute.processRunExecutionData(workflow);
	} catch (error) {
		const fullRunData: IRun = {
			data: {
				resultData: {
					error,
					runData: {},
				},
			},
			finished: false,
			mode: 'integrated',
			startedAt: new Date(),
			stoppedAt: new Date(),
		};
		// When failing, we might not have finished the execution
		// Therefore, database might not contain finished errors.
		// Force an update to db as there should be no harm doing this

		const fullExecutionData: IExecutionDb = {
			data: fullRunData.data,
			mode: fullRunData.mode,
			finished: fullRunData.finished ? fullRunData.finished : false,
			startedAt: fullRunData.startedAt,
			stoppedAt: fullRunData.stoppedAt,
			workflowData,
		};

		const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);

		await Db.collections.Execution!.update(executionId, executionData as IExecutionFlattedDb);
		// "stack" and "message" are not enumerable on Error instances, so the
		// spread alone would lose them
		// eslint-disable-next-line @typescript-eslint/no-throw-literal
		throw {
			...error,
			stack: error.stack,
			message: error.message,
		};
	}

	await externalHooks.run('workflow.postExecute', [data, workflowData]);

	if (data.finished === true) {
		// Workflow did finish successfully

		await ActiveExecutions.getInstance().remove(executionId, data);
		const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
		return returnData!.data!.main;
	}
	await ActiveExecutions.getInstance().remove(executionId, data);
	// Workflow did fail
	const { error } = data.data.resultData;
	// "stack" and "message" are not enumerable on Error instances, so the
	// spread alone would lose them
	// eslint-disable-next-line @typescript-eslint/no-throw-literal
	throw {
		...error,
		stack: error!.stack,
		message: error!.message,
	};
}
/**
 * Sends a console message to the Editor-UI session of the execution.
 * Has to be called with "this" set to an object which has a "sessionId".
 * Does nothing without a session and only logs a warning if sending fails.
 *
 * @param {string} source Name of the node the message comes from
 * @param {any} message The message to send
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function sendMessageToUI(source: string, message: any) {
	if (this.sessionId === undefined) {
		return;
	}

	const consoleMessage = {
		source: `Node: "${source}"`,
		message,
	};

	// Push data to session which started workflow
	try {
		Push.getInstance().send('sendConsoleMessage', consoleMessage, this.sessionId);
	} catch (error) {
		Logger.warn(`There was a problem sending messsage to UI: ${error.message}`);
	}
}
/**
 * Returns the base additional data without webhooks
 *
 * All URLs get built from the webhook base URL and the configured endpoints.
 *
 * @export
 * @param {INodeParameters} [currentNodeParameters] Passed through unchanged
 * @param {number} [executionTimeoutTimestamp] Passed through unchanged
 * @returns {Promise<IWorkflowExecuteAdditionalData>}
 * @throws {Error} If no encryption key is available
 */
export async function getBase(
	currentNodeParameters?: INodeParameters,
	executionTimeoutTimestamp?: number,
): Promise<IWorkflowExecuteAdditionalData> {
	const urlBaseWebhook = WebhookHelpers.getWebhookBaseUrl();
	// Appends the endpoint configured under the given key to the base URL
	const endpointUrl = (configKey: string) => urlBaseWebhook + config.get(configKey);

	const timezone = config.get('generic.timezone') as string;
	const webhookBaseUrl = endpointUrl('endpoints.webhook');
	const webhookWaitingBaseUrl = endpointUrl('endpoints.webhookWaiting');
	const webhookTestBaseUrl = endpointUrl('endpoints.webhookTest');

	const encryptionKey = await UserSettings.getEncryptionKey();
	if (encryptionKey === undefined) {
		throw new Error('No encryption key got found to decrypt the credentials!');
	}

	const credentialsHelper = new CredentialsHelper(encryptionKey);
	const restApiUrl = endpointUrl('endpoints.rest');

	return {
		credentialsHelper,
		encryptionKey,
		executeWorkflow,
		restApiUrl,
		timezone,
		webhookBaseUrl,
		webhookWaitingBaseUrl,
		webhookTestBaseUrl,
		currentNodeParameters,
		executionTimeoutTimestamp,
	};
}
/**
 * Returns WorkflowHooks instance for running integrated workflows
 * (Workflows which get started inside of another workflow)
 *
 * The hooks of hookFunctionsSave get combined with the ones of
 * hookFunctionsPreExecute, which are appended after them.
 */
export function getWorkflowHooksIntegrated(
	mode: WorkflowExecuteMode,
	executionId: string,
	workflowData: IWorkflowBase,
	optionalParameters?: IWorkflowHooksOptionalParameters,
): WorkflowHooks {
	const parameters = optionalParameters || {};

	const hookFunctions = hookFunctionsSave(parameters.parentProcessMode);
	const preExecuteFunctions = hookFunctionsPreExecute(parameters.parentProcessMode);

	Object.keys(preExecuteFunctions).forEach((hookName) => {
		const additionalHooks = preExecuteFunctions[hookName]!;
		const registeredHooks = hookFunctions[hookName];
		if (registeredHooks === undefined) {
			hookFunctions[hookName] = [...additionalHooks];
		} else {
			registeredHooks.push(...additionalHooks);
		}
	});

	return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, parameters);
}
/**
 * Returns WorkflowHooks instance which combines the hooks of
 * hookFunctionsSaveWorker (saving when running with queues) with the ones
 * of hookFunctionsPreExecute, which are appended after them.
 */
export function getWorkflowHooksWorkerExecuter(
	mode: WorkflowExecuteMode,
	executionId: string,
	workflowData: IWorkflowBase,
	optionalParameters?: IWorkflowHooksOptionalParameters,
): WorkflowHooks {
	const parameters = optionalParameters || {};

	const hookFunctions = hookFunctionsSaveWorker();
	const preExecuteFunctions = hookFunctionsPreExecute(parameters.parentProcessMode);

	for (const hookName of Object.keys(preExecuteFunctions)) {
		const merged = [...(hookFunctions[hookName] === undefined ? [] : hookFunctions[hookName]!)];
		merged.push(...preExecuteFunctions[hookName]!);
		if (hookFunctions[hookName] === undefined) {
			hookFunctions[hookName] = merged;
		} else {
			// Keep the existing array instance and only add the new hooks to it
			hookFunctions[hookName]!.push(...preExecuteFunctions[hookName]!);
		}
	}

	return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, parameters);
}
/**
 * Returns WorkflowHooks instance for main process if workflow runs via worker
 *
 * Combines the push hooks with the pre-execute hooks and then drops all
 * node level hooks again.
 */
export function getWorkflowHooksWorkerMain(
	mode: WorkflowExecuteMode,
	executionId: string,
	workflowData: IWorkflowBase,
	optionalParameters?: IWorkflowHooksOptionalParameters,
): WorkflowHooks {
	const parameters = optionalParameters || {};

	const hookFunctions = hookFunctionsPush();
	const preExecuteFunctions = hookFunctionsPreExecute(parameters.parentProcessMode);

	Object.keys(preExecuteFunctions).forEach((hookName) => {
		if (hookFunctions[hookName] === undefined) {
			hookFunctions[hookName] = [];
		}
		hookFunctions[hookName]!.push(...preExecuteFunctions[hookName]!);
	});

	// When running with worker mode, main process executes
	// Only workflowExecuteBefore + workflowExecuteAfter
	// So to avoid confusion, we are removing other hooks.
	hookFunctions.nodeExecuteBefore = [];
	hookFunctions.nodeExecuteAfter = [];

	return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, parameters);
}
/**
 * Returns WorkflowHooks instance for running the main workflow
 *
 * Combines the save hooks with the push hooks and, if it is the main
 * process, additionally with the pre-execute hooks.
 *
 * @export
 * @param {IWorkflowExecutionDataProcess} data
 * @param {string} executionId
 * @param {boolean} [isMainProcess=false] If set the pre-execute hooks get added
 * @returns {WorkflowHooks}
 */
export function getWorkflowHooksMain(
	data: IWorkflowExecutionDataProcess,
	executionId: string,
	isMainProcess = false,
): WorkflowHooks {
	const hookFunctions = hookFunctionsSave();

	// Hook collections which get appended, in this order, to the save hooks
	const collectionsToAppend: IWorkflowExecuteHooks[] = [hookFunctionsPush()];
	if (isMainProcess) {
		collectionsToAppend.push(hookFunctionsPreExecute());
	}

	collectionsToAppend.forEach((collection) => {
		Object.keys(collection).forEach((hookName) => {
			if (hookFunctions[hookName] === undefined) {
				hookFunctions[hookName] = [];
			}
			hookFunctions[hookName]!.push(...collection[hookName]!);
		});
	});

	return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, {
		sessionId: data.sessionId,
		retryOf: data.retryOf as string,
	});
}