n8n/packages/cli/src/ActiveWorkflowRunner.ts

583 lines
19 KiB
TypeScript
Raw Normal View History

2019-06-23 03:35:23 -07:00
import {
Db,
2020-10-22 06:46:03 -07:00
IActivationError,
2019-06-23 03:35:23 -07:00
IResponseCallbackData,
2020-10-22 06:46:03 -07:00
IWebhookDb,
2019-06-23 03:35:23 -07:00
IWorkflowDb,
IWorkflowExecutionDataProcess,
2020-10-22 06:46:03 -07:00
NodeTypes,
2019-06-23 03:35:23 -07:00
ResponseHelper,
WebhookHelpers,
WorkflowCredentials,
2020-10-22 06:46:03 -07:00
WorkflowExecuteAdditionalData,
2019-06-23 03:35:23 -07:00
WorkflowHelpers,
WorkflowRunner,
2019-06-23 03:35:23 -07:00
} from './';
import {
ActiveWorkflows,
NodeExecuteFunctions,
2019-06-23 03:35:23 -07:00
} from 'n8n-core';
import {
2021-01-23 11:35:38 -08:00
IDataObject,
IExecuteData,
IGetExecutePollFunctions,
IGetExecuteTriggerFunctions,
INode,
INodeExecutionData,
IRunExecutionData,
2019-06-23 03:35:23 -07:00
IWorkflowExecuteAdditionalData as IWorkflowExecuteAdditionalDataWorkflow,
2020-10-22 06:46:03 -07:00
NodeHelpers,
2019-06-23 03:35:23 -07:00
WebhookHttpMethod,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
import * as express from 'express';
export class ActiveWorkflowRunner {
/** In-memory manager of activated workflows; remains `null` until `init()` assigns it. */
private activeWorkflows: ActiveWorkflows | null = null;

/** Activation failure recorded by `add()`, keyed by workflow id. */
private activationErrors: Record<string, IActivationError> = {};
/**
 * Loads all workflows flagged as active from the database, clears the
 * webhook table and activates each workflow again. A workflow that fails
 * to activate is logged and skipped; it does not abort the others.
 *
 * @memberof ActiveWorkflowRunner
 */
async init() {
	// NOTE: a flag like "hasTrigger" on the workflow table would make it
	// possible to load only the active workflows that contain a trigger
	// instead of loading all active ones.
	const activeWorkflowsData = await Db.collections.Workflow!.find({ active: true }) as IWorkflowDb[];

	// Empty the webhook table; entries get inserted again while the workflows are added below
	await Db.collections.Webhook?.clear();

	this.activeWorkflows = new ActiveWorkflows();

	if (activeWorkflowsData.length === 0) {
		return;
	}

	console.log('\n ================================');
	console.log(' Start Active Workflows:');
	console.log(' ================================');

	for (const workflowData of activeWorkflowsData) {
		console.log(` - ${workflowData.name}`);
		try {
			await this.add(workflowData.id.toString(), workflowData);
			console.log(` => Started`);
		} catch (error) {
			console.log(` => ERROR: Workflow could not be activated:`);
			console.log(` ${error.message}`);
		}
	}
}
/**
 * Removes all the currently active workflows.
 *
 * The ids are collected from the in-memory `activeWorkflows` instance (if
 * initialized) and from the workflows flagged as active in the database.
 * Every id is removed exactly once, even when it shows up in both sources.
 *
 * @returns {Promise<void>}
 * @memberof ActiveWorkflowRunner
 */
async removeAll(): Promise<void> {
	// A Set de-duplicates ids which are present in memory AND in the database.
	// Without it `remove()` would run twice concurrently for such a workflow.
	const workflowIds = new Set<string>();

	if (this.activeWorkflows !== null) {
		// TODO: This should be renamed!
		for (const workflowId of this.activeWorkflows.allActiveWorkflows()) {
			workflowIds.add(workflowId.toString());
		}
	}

	const activeWorkflows = await this.getActiveWorkflows();
	for (const workflow of activeWorkflows) {
		// Normalize to string so that database ids compare equal to the in-memory ones
		workflowIds.add(workflow.id.toString());
	}

	await Promise.all(Array.from(workflowIds, (workflowId) => this.remove(workflowId)));
}
/**
 * Checks if a webhook for the given method and path exists and executes the workflow.
 *
 * A lookup by the exact path is tried first. If nothing is found the first
 * path segment is treated as webhook id and the remaining segments are
 * matched against the registered dynamic webhooks (paths with ":param"
 * segments). The values of those segments are written to `req.params`.
 *
 * @param {WebhookHttpMethod} httpMethod
 * @param {string} path
 * @param {express.Request} req
 * @param {express.Response} res
 * @returns {Promise<IResponseCallbackData>}
 * @throws {ResponseHelper.ResponseError} With status 404 if the runner is not
 *         initialized or the webhook, its workflow or its node cannot be found
 * @memberof ActiveWorkflowRunner
 */
async executeWebhook(httpMethod: WebhookHttpMethod, path: string, req: express.Request, res: express.Response): Promise<IResponseCallbackData> {
	if (this.activeWorkflows === null) {
		throw new ResponseHelper.ResponseError('The "activeWorkflows" instance did not get initialized yet.', 404, 404);
	}

	// Reset request parameters
	req.params = {};

	let webhook = await Db.collections.Webhook?.findOne({ webhookPath: path, method: httpMethod }) as IWebhookDb;

	// No webhook registered under the exact path, so check if the path is dynamic
	if (webhook === undefined) {
		const pathElements = path.split('/');
		// The first segment is the webhook id, the rest is the actual route
		const webhookId = pathElements.shift();
		const dynamicWebhooks = await Db.collections.Webhook?.find({ webhookId, method: httpMethod, pathLength: pathElements.length });
		if (dynamicWebhooks === undefined || dynamicWebhooks.length === 0) {
			// The requested webhook is not registered
			throw new ResponseHelper.ResponseError(`The requested webhook "${httpMethod} ${path}" is not registered.`, 404, 404);
		}

		// Set webhook to the first result. If more results have been
		// returned choose the one with the most route-matches.
		webhook = dynamicWebhooks[0];
		if (dynamicWebhooks.length > 1) {
			let maxMatches = 0;
			const pathElementsSet = new Set(pathElements);
			dynamicWebhooks.forEach(dynamicWebhook => {
				const intersection =
					dynamicWebhook.webhookPath
						.split('/')
						.reduce((acc, element) => pathElementsSet.has(element) ? acc + 1 : acc, 0);

				if (intersection > maxMatches) {
					maxMatches = intersection;
					webhook = dynamicWebhook;
				}
			});
			if (maxMatches === 0) {
				throw new ResponseHelper.ResponseError(`The requested webhook "${httpMethod} ${path}" is not registered.`, 404, 404);
			}
		}

		// From here on work with the registered path (the one containing the ":param" segments)
		path = webhook.webhookPath;
		// Extract the parameter values from the requested path
		webhook.webhookPath.split('/').forEach((ele, index) => {
			if (ele.startsWith(':')) {
				// write params to req.params
				req.params[ele.slice(1)] = pathElements[index];
			}
		});
	}

	const workflowData = await Db.collections.Workflow!.findOne(webhook.workflowId);
	if (workflowData === undefined) {
		throw new ResponseHelper.ResponseError(`Could not find workflow with id "${webhook.workflowId}"`, 404, 404);
	}

	const nodeTypes = NodeTypes();
	const workflow = new Workflow({ id: webhook.workflowId.toString(), name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings});

	const credentials = await WorkflowCredentials([workflow.getNode(webhook.node as string) as INode]);

	const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials);

	const webhookData = NodeHelpers.getNodeWebhooks(workflow, workflow.getNode(webhook.node as string) as INode, additionalData).filter((nodeWebhook) => {
		return (nodeWebhook.httpMethod === httpMethod && nodeWebhook.path === path);
	})[0];

	// The database entry may not match any webhook the node currently defines.
	// Answer with a 404 instead of failing with a TypeError further down.
	if (webhookData === undefined) {
		throw new ResponseHelper.ResponseError(`The requested webhook "${httpMethod} ${path}" is not registered.`, 404, 404);
	}

	// Get the node which has the webhook defined to know where to start from and to
	// get additional data
	const workflowStartNode = workflow.getNode(webhookData.node);

	if (workflowStartNode === null) {
		throw new ResponseHelper.ResponseError('Could not find node to process webhook.', 404, 404);
	}

	return new Promise((resolve, reject) => {
		const executionMode = 'webhook';
		//@ts-ignore
		WebhookHelpers.executeWebhook(workflow, webhookData, workflowData, workflowStartNode, executionMode, undefined, req, res, (error: Error | null, data: object) => {
			if (error !== null) {
				return reject(error);
			}
			resolve(data);
		});
	});
}
/**
 * Collects the request methods of every webhook registered under the given path
 *
 * @param {string} path The webhook path to look up
 * @returns {Promise<string[]>} One method per registered webhook entry
 * @memberof ActiveWorkflowRunner
 */
async getWebhookMethods(path: string) : Promise<string[]> {
	const registeredWebhooks = await Db.collections.Webhook?.find({ webhookPath: path}) as IWebhookDb[];

	return registeredWebhooks.map(({ method }) => method);
}
2019-06-23 03:35:23 -07:00
/**
 * Returns the workflows which are flagged as active in the database
 * (only the "id" column gets selected) and for which no activation
 * error is currently recorded.
 *
 * @returns {Promise<IWorkflowDb[]>}
 * @memberof ActiveWorkflowRunner
 */
async getActiveWorkflows(): Promise<IWorkflowDb[]> {
	const flaggedActive = await Db.collections.Workflow?.find({ where: { active: true }, select: ['id'] }) as IWorkflowDb[];

	const hasNoActivationError = (workflow: IWorkflowDb): boolean => {
		return this.activationErrors[workflow.id.toString()] === undefined;
	};

	return flaggedActive.filter(hasNoActivationError);
}
/**
 * Returns if the workflow is flagged as active in the database
 *
 * @param {string} id The id of the workflow to check
 * @returns {Promise<boolean>} `false` if the workflow is inactive or could not be found
 * @memberof ActiveWorkflowRunner
 */
async isActive(id: string): Promise<boolean> {
	const workflow = await Db.collections.Workflow?.findOne({ id }) as IWorkflowDb | undefined;

	// Coerce to a real boolean. Without it an unknown id resolves to
	// `undefined` even though the signature promises a boolean.
	return Boolean(workflow?.active);
}
/**
 * Looks up the error recorded during the activation of the given workflow
 *
 * @param {string} id The id of the workflow to return the error of
 * @returns {(IActivationError | undefined)} `undefined` if no error is recorded for the id
 * @memberof ActiveWorkflowRunner
 */
getActivationError(id: string): IActivationError | undefined {
	// A missing key already evaluates to `undefined`, so no separate check is needed
	return this.activationErrors[id];
}
/**
 * Registers every webhook of the workflow: inserts it into the webhook
 * table and, if "checkExists" does not report it as existing, runs the
 * webhook method "create". Webhooks without a resolvable path are skipped.
 * If registering one webhook fails, all webhooks of the workflow get
 * removed again and an error is thrown.
 *
 * @param {Workflow} workflow
 * @param {IWorkflowExecuteAdditionalDataWorkflow} additionalData
 * @param {WorkflowExecuteMode} mode
 * @returns {Promise<void>}
 * @memberof ActiveWorkflowRunner
 */
async addWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode): Promise<void> {
	const workflowWebhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData);

	for (const webhookData of workflowWebhooks) {
		const node = workflow.getNode(webhookData.node) as INode;
		node.name = webhookData.node;

		// Prefer the "path" parameter of the node and fall back to the
		// path given in the webhook description.
		let path = node.parameters.path as string | undefined;
		if (path === undefined) {
			path = workflow.expression.getSimpleParameterValue(node, webhookData.webhookDescription['path']) as string | undefined;

			if (path === undefined) {
				// TODO: Use a proper logger
				console.error(`No webhook path could be found for node "${node.name}" in workflow "${workflow.id}".`);
				continue;
			}
		}

		const isFullPath: boolean = workflow.expression.getSimpleParameterValue(node, webhookData.webhookDescription['isFullPath'], false) as boolean;

		// The path gets stored without a leading slash
		const nodeWebhookPath = NodeHelpers.getNodeWebhookPath(workflow.id as string, node, path, isFullPath);
		const webhook = {
			workflowId: webhookData.workflowId,
			webhookPath: nodeWebhookPath.startsWith('/') ? nodeWebhookPath.slice(1) : nodeWebhookPath,
			node: node.name,
			method: webhookData.httpMethod,
		} as IWebhookDb;

		// Paths with ":param" segments additionally store the webhook id and
		// the number of path segments so they can be found by a dynamic lookup.
		const hasRouteParameters = path.startsWith(':') || path.includes('/:');
		if (hasRouteParameters && node.webhookId) {
			webhook.webhookId = node.webhookId;
			webhook.pathLength = webhook.webhookPath.split('/').length;
		}

		try {
			await Db.collections.Webhook?.insert(webhook);

			const alreadyExists = await workflow.runWebhookMethod('checkExists', webhookData, NodeExecuteFunctions, mode, false);
			if (alreadyExists !== true) {
				// If webhook does not exist yet create it
				await workflow.runWebhookMethod('create', webhookData, NodeExecuteFunctions, mode, false);
			}
		} catch (error) {
			// Roll back everything registered for this workflow; a failure of
			// the rollback only gets logged so the original problem is reported.
			try {
				await this.removeWorkflowWebhooks(workflow.id as string);
			} catch (cleanupError) {
				console.error(`Could not remove webhooks of workflow "${workflow.id}" because of error: "${cleanupError.message}"`);
			}

			// TODO check if there is standard error code for duplicate key violation that works
			// with all databases
			if (error.name === 'QueryFailedError') {
				// The error came from the insert
				throw new Error(`The webhook path [${webhook.webhookPath}] and method [${webhook.method}] already exist.`);
			}

			// Otherwise it is an error from running the webhook methods (checkExists, create)
			throw new Error(error.detail || error.message);
		}
	}

	// Save static data!
	await WorkflowHelpers.saveStaticData(workflow);
}
/**
 * Runs the webhook method "delete" for every webhook of the workflow,
 * saves the static data of the workflow and deletes the entries of the
 * workflow from the webhook table.
 *
 * @param {string} workflowId
 * @returns {Promise<void>}
 * @throws {Error} If no workflow with the given id exists
 * @memberof ActiveWorkflowRunner
 */
async removeWorkflowWebhooks(workflowId: string): Promise<void> {
	const workflowData = await Db.collections.Workflow!.findOne(workflowId);
	if (workflowData === undefined) {
		throw new Error(`Could not find workflow with id "${workflowId}"`);
	}

	const { name, nodes, connections, active, staticData, settings } = workflowData;
	const workflow = new Workflow({ id: workflowId, name, nodes, connections, active, nodeTypes: NodeTypes(), staticData, settings });

	const credentials = await WorkflowCredentials(nodes);
	const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials);

	// Run the delete methods one after the other
	for (const webhookData of WebhookHelpers.getWorkflowWebhooks(workflow, additionalData)) {
		await workflow.runWebhookMethod('delete', webhookData, NodeExecuteFunctions, 'internal', false);
	}

	await WorkflowHelpers.saveStaticData(workflow);

	// Delete all webhook entries which belong to the workflow
	await Db.collections.Webhook?.delete({ workflowId: workflowData.id } as IWebhookDb);
}
/**
 * Starts an execution of the given workflow which begins at the given
 * node with the given data as its "main" input.
 *
 * @param {IWorkflowDb} workflowData The workflow to run
 * @param {INode} node The node the execution starts from
 * @param {INodeExecutionData[][]} data The data handed to the start node
 * @param {IWorkflowExecuteAdditionalDataWorkflow} additionalData Provides the credentials for the run
 * @param {WorkflowExecuteMode} mode
 * @returns The result of `WorkflowRunner.run()`
 * @memberof ActiveWorkflowRunner
 */
runWorkflow(workflowData: IWorkflowDb, node: INode, data: INodeExecutionData[][], additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode) {
	const runData: IWorkflowExecutionDataProcess = {
		credentials: additionalData.credentials,
		executionMode: mode,
		executionData: {
			startData: {},
			resultData: {
				runData: {},
			},
			executionData: {
				contextData: {},
				// The only entry on the stack is the start node with its input data
				nodeExecutionStack: [
					{
						node,
						data: {
							main: data,
						},
					},
				],
				waitingExecution: {},
			},
		},
		workflowData,
	};

	// Start the workflow
	return new WorkflowRunner().run(runData, true);
}
/**
 * Return poll function which gets the global functions from n8n-core
 * and overwrites the __emit so that emitted data starts a run of the
 * workflow via `runWorkflow`.
 *
 * @param {IWorkflowDb} workflowData
 * @param {IWorkflowExecuteAdditionalDataWorkflow} additionalData
 * @param {WorkflowExecuteMode} mode
 * @returns {IGetExecutePollFunctions}
 * @memberof ActiveWorkflowRunner
 */
getExecutePollFunctions(workflowData: IWorkflowDb, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode): IGetExecutePollFunctions {
	return ((workflow: Workflow, node: INode) => {
		const returnFunctions = NodeExecuteFunctions.getExecutePollFunctions(workflow, node, additionalData, mode);
		returnFunctions.__emit = (data: INodeExecutionData[][]): void => {
			// __emit is synchronous, so the run cannot be awaited here. Log a
			// failure (same as the trigger functions do) instead of leaving
			// the rejection unhandled.
			this.runWorkflow(workflowData, node, data, additionalData, mode).catch((err) => console.error(err));
		};
		return returnFunctions;
	});
}
/**
 * Return trigger function which gets the global functions from n8n-core
 * and overwrites the emit so that emitted data saves the static data of
 * the workflow and starts a run of it via `runWorkflow`.
 *
 * @param {IWorkflowDb} workflowData
 * @param {IWorkflowExecuteAdditionalDataWorkflow} additionalData
 * @param {WorkflowExecuteMode} mode
 * @returns {IGetExecuteTriggerFunctions}
 * @memberof ActiveWorkflowRunner
 */
getExecuteTriggerFunctions(workflowData: IWorkflowDb, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode): IGetExecuteTriggerFunctions{
	return ((workflow: Workflow, node: INode) => {
		const returnFunctions = NodeExecuteFunctions.getExecuteTriggerFunctions(workflow, node, additionalData, mode);
		returnFunctions.emit = (data: INodeExecutionData[][]): void => {
			// emit is synchronous, so neither call can be awaited. Both get a
			// handler so that a failure is logged instead of ending up as an
			// unhandled rejection.
			WorkflowHelpers.saveStaticData(workflow).catch((err) => console.error(err));
			this.runWorkflow(workflowData, node, data, additionalData, mode).catch((err) => console.error(err));
		};
		return returnFunctions;
	});
}
2019-06-23 03:35:23 -07:00
/**
 * Makes a workflow active: registers its webhooks and, if it contains
 * trigger or poll nodes, adds it to the in-memory `activeWorkflows`.
 * If the activation fails the error gets recorded for the workflow
 * and is thrown again.
 *
 * @param {string} workflowId The id of the workflow to activate
 * @param {IWorkflowDb} [workflowData] If workflowData is given it saves the DB query
 * @returns {Promise<void>}
 * @memberof ActiveWorkflowRunner
 */
async add(workflowId: string, workflowData?: IWorkflowDb): Promise<void> {
	if (this.activeWorkflows === null) {
		throw new Error(`The "activeWorkflows" instance did not get initialized yet.`);
	}

	let workflowInstance: Workflow;
	try {
		// Only query the database if the caller did not supply the data
		const resolvedWorkflowData = workflowData !== undefined
			? workflowData
			: await Db.collections.Workflow!.findOne(workflowId) as IWorkflowDb;

		if (!resolvedWorkflowData) {
			throw new Error(`Could not find workflow with id "${workflowId}".`);
		}

		const { name, nodes, connections, active, staticData, settings } = resolvedWorkflowData;
		workflowInstance = new Workflow({ id: workflowId, name, nodes, connections, active, nodeTypes: NodeTypes(), staticData, settings });

		if (workflowInstance.checkIfWorkflowCanBeActivated(['n8n-nodes-base.start']) === false) {
			throw new Error(`The workflow can not be activated because it does not contain any nodes which could start the workflow. Only workflows which have trigger or webhook nodes can be activated.`);
		}

		const mode = 'trigger';
		const credentials = await WorkflowCredentials(nodes);
		const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials);

		// Add the workflows which have webhooks defined
		await this.addWorkflowWebhooks(workflowInstance, additionalData, mode);

		// Only workflows with trigger or poll nodes have to be kept active in memory
		const hasTriggerOrPollNodes = workflowInstance.getTriggerNodes().length !== 0
			|| workflowInstance.getPollNodes().length !== 0;
		if (hasTriggerOrPollNodes) {
			await this.activeWorkflows.add(
				workflowId,
				workflowInstance,
				additionalData,
				this.getExecuteTriggerFunctions(resolvedWorkflowData, additionalData, mode),
				this.getExecutePollFunctions(resolvedWorkflowData, additionalData, mode),
			);
		}

		// The activation worked, so an error of an earlier attempt is obsolete
		if (this.activationErrors[workflowId] !== undefined) {
			delete this.activationErrors[workflowId];
		}
	} catch (error) {
		// There was a problem activating the workflow.
		// Remember it for the workflow and hand the error on to the caller.
		this.activationErrors[workflowId] = {
			time: new Date().getTime(),
			error: {
				message: error.message,
			},
		};

		throw error;
	}

	// If for example webhooks get created it sometimes has to save the
	// id of them in the static data. So make sure that data gets persisted.
	await WorkflowHelpers.saveStaticData(workflowInstance!);
}
/**
 * Makes a workflow inactive: removes its webhooks, forgets a recorded
 * activation error and, if it is active in memory, removes it from
 * `activeWorkflows`.
 *
 * @param {string} workflowId The id of the workflow to deactivate
 * @returns {Promise<void>} Resolves once the workflow got removed from memory as well
 * @throws {Error} If the "activeWorkflows" instance is not initialized yet
 * @memberof ActiveWorkflowRunner
 */
async remove(workflowId: string): Promise<void> {
	if (this.activeWorkflows === null) {
		throw new Error(`The "activeWorkflows" instance did not get initialized yet.`);
	}

	// Remove all the webhooks of the workflow. This is best effort: a
	// failure gets logged and the deactivation continues.
	try {
		await this.removeWorkflowWebhooks(workflowId);
	} catch (error) {
		console.error(`Could not remove webhooks of workflow "${workflowId}" because of error: "${error.message}"`);
	}

	if (this.activationErrors[workflowId] !== undefined) {
		// If there were any activation errors delete them
		delete this.activationErrors[workflowId];
	}

	// If it's active in memory then it's a trigger
	// so remove it from the list of active workflows.
	if (this.activeWorkflows.isActive(workflowId)) {
		// Await the removal so that the caller only continues after it
		// finished and a failure is reported instead of left unhandled.
		await this.activeWorkflows.remove(workflowId);
	}
}
}
// Module-wide instance which gets created on the first call of getInstance()
let workflowRunnerInstance: ActiveWorkflowRunner | undefined;

/**
 * Returns the shared ActiveWorkflowRunner instance of this module.
 * It gets created the first time the function is called.
 *
 * @returns {ActiveWorkflowRunner}
 */
export function getInstance(): ActiveWorkflowRunner {
	if (workflowRunnerInstance !== undefined) {
		return workflowRunnerInstance;
	}

	workflowRunnerInstance = new ActiveWorkflowRunner();
	return workflowRunnerInstance;
}