Prepare webhooks for multitenancy

This commit is contained in:
Jan Oberhauser 2020-01-22 15:06:43 -08:00
parent 01a2dd13d5
commit aa1899f81d
10 changed files with 121 additions and 144 deletions

View file

@ -117,22 +117,24 @@ export class ActiveWorkflowRunner {
throw new ResponseHelper.ResponseError('The requested webhook is not registred.', 404, 404);
}
const workflowData = await Db.collections.Workflow!.findOne(webhookData.workflowId);
if (workflowData === undefined) {
throw new ResponseHelper.ResponseError(`Could not find workflow with id "${webhookData.workflowId}"`, 404, 404);
}
const nodeTypes = NodeTypes();
const workflow = new Workflow(webhookData.workflowId, workflowData.nodes, workflowData.connections, workflowData.active, nodeTypes, workflowData.staticData, workflowData.settings);
// Get the node which has the webhook defined to know where to start from and to
// get additional data
const workflowStartNode = webhookData.workflow.getNode(webhookData.node);
const workflowStartNode = workflow.getNode(webhookData.node);
if (workflowStartNode === null) {
throw new ResponseHelper.ResponseError('Could not find node to process webhook.', 404, 404);
}
const executionMode = 'webhook';
const workflowData = await Db.collections.Workflow!.findOne(webhookData.workflow.id!);
if (workflowData === undefined) {
throw new ResponseHelper.ResponseError(`Could not find workflow with id "${webhookData.workflow.id}"`, 404, 404);
}
return new Promise((resolve, reject) => {
WebhookHelpers.executeWebhook(webhookData, workflowData, workflowStartNode, executionMode, undefined, req, res, (error: Error | null, data: object) => {
const executionMode = 'webhook';
WebhookHelpers.executeWebhook(workflow, webhookData, workflowData, workflowStartNode, executionMode, undefined, req, res, (error: Error | null, data: object) => {
if (error !== null) {
return reject(error);
}
@ -202,7 +204,9 @@ export class ActiveWorkflowRunner {
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData);
for (const webhookData of webhooks) {
await this.activeWebhooks!.add(webhookData, mode);
await this.activeWebhooks!.add(workflow, webhookData, mode);
// Save static data!
await WorkflowHelpers.saveStaticData(workflow);
}
}
@ -214,8 +218,19 @@ export class ActiveWorkflowRunner {
* @returns
* @memberof ActiveWorkflowRunner
*/
removeWorkflowWebhooks(workflowId: string): Promise<boolean> {
return this.activeWebhooks!.removeByWorkflowId(workflowId);
async removeWorkflowWebhooks(workflowId: string): Promise<void> {
const workflowData = await Db.collections.Workflow!.findOne(workflowId);
if (workflowData === undefined) {
throw new Error(`Could not find workflow with id "${workflowId}"`);
}
const nodeTypes = NodeTypes();
const workflow = new Workflow(workflowId, workflowData.nodes, workflowData.connections, workflowData.active, nodeTypes, workflowData.staticData, workflowData.settings);
await this.activeWebhooks!.removeWorkflow(workflow);
// Save the static workflow data if needed
await WorkflowHelpers.saveStaticData(workflow);
}
@ -348,7 +363,7 @@ export class ActiveWorkflowRunner {
await this.activeWorkflows.add(workflowId, workflowInstance, additionalData, getTriggerFunctions, getPollFunctions);
if (this.activationErrors[workflowId] !== undefined) {
// If there were any activation errors delete them
// If there were activation errors delete them
delete this.activationErrors[workflowId];
}
} catch (error) {
@ -380,16 +395,9 @@ export class ActiveWorkflowRunner {
*/
async remove(workflowId: string): Promise<void> {
if (this.activeWorkflows !== null) {
const workflowData = this.activeWorkflows.get(workflowId);
// Remove all the webhooks of the workflow
await this.removeWorkflowWebhooks(workflowId);
if (workflowData) {
// Save the static workflow data if needed
await WorkflowHelpers.saveStaticData(workflowData.workflow);
}
if (this.activationErrors[workflowId] !== undefined) {
// If there were any activation errors delete them
delete this.activationErrors[workflowId];

View file

@ -132,7 +132,7 @@ class App {
const authIgnoreRegex = new RegExp(`^\/(rest|healthz|${this.endpointWebhook}|${this.endpointWebhookTest})\/?.*$`);
// Check for basic auth credentials if activated
const basicAuthActive = config.get('security.basicAuth.active') as boolean;
const basicAuthActive = config.get('security.basicAuth.active') as boolean;
if (basicAuthActive === true) {
const basicAuthUser = await GenericHelpers.getConfigValue('security.basicAuth.user') as string;
if (basicAuthUser === '') {
@ -1072,7 +1072,16 @@ class App {
// Removes a test webhook
this.app.delete('/rest/test-webhook/:id', ResponseHelper.send(async (req: express.Request, res: express.Response): Promise<boolean> => {
const workflowId = req.params.id;
return this.testWebhooks.cancelTestWebhook(workflowId);
const workflowData = await Db.collections.Workflow!.findOne(workflowId);
if (workflowData === undefined) {
throw new ResponseHelper.ResponseError(`Could not find workflow with id "${workflowId}" so webhook could not be deleted!`);
}
const nodeTypes = NodeTypes();
const workflow = new Workflow(workflowId.toString(), workflowData.nodes, workflowData.connections, workflowData.active, nodeTypes, workflowData.staticData, workflowData.settings);
return this.testWebhooks.cancelTestWebhook(workflowId, workflow);
}));

View file

@ -1,11 +1,18 @@
import * as express from 'express';
import {
In as findIn,
FindManyOptions,
} from 'typeorm';
import {
Db,
IResponseCallbackData,
IWorkflowDb,
NodeTypes,
Push,
ResponseHelper,
WebhookHelpers,
IWorkflowDb,
WorkflowHelpers,
} from './';
import {
@ -60,9 +67,17 @@ export class TestWebhooks {
throw new ResponseHelper.ResponseError('The requested webhook is not registred.', 404, 404);
}
const workflowData = await Db.collections.Workflow!.findOne(webhookData.workflowId);
if (workflowData === undefined) {
throw new ResponseHelper.ResponseError(`Could not find workflow with id "${webhookData.workflowId}"`, 404, 404);
}
const nodeTypes = NodeTypes();
const workflow = new Workflow(webhookData.workflowId, workflowData.nodes, workflowData.connections, workflowData.active, nodeTypes, workflowData.staticData, workflowData.settings);
// Get the node which has the webhook defined to know where to start from and to
// get additional data
const workflowStartNode = webhookData.workflow.getNode(webhookData.node);
const workflowStartNode = workflow.getNode(webhookData.node);
if (workflowStartNode === null) {
throw new ResponseHelper.ResponseError('Could not find node to process webhook.', 404, 404);
}
@ -72,8 +87,7 @@ export class TestWebhooks {
return new Promise(async (resolve, reject) => {
try {
const executionMode = 'manual';
const executionId = await WebhookHelpers.executeWebhook(webhookData, this.testWebhookData[webhookKey].workflowData, workflowStartNode, executionMode, this.testWebhookData[webhookKey].sessionId, request, response, (error: Error | null, data: IResponseCallbackData) => {
const executionId = await WebhookHelpers.executeWebhook(workflow, webhookData, this.testWebhookData[webhookKey].workflowData, workflowStartNode, executionMode, this.testWebhookData[webhookKey].sessionId, request, response, (error: Error | null, data: IResponseCallbackData) => {
if (error !== null) {
return reject(error);
}
@ -90,7 +104,7 @@ export class TestWebhooks {
// Inform editor-ui that webhook got received
if (this.testWebhookData[webhookKey].sessionId !== undefined) {
const pushInstance = Push.getInstance();
pushInstance.send('testWebhookReceived', { workflowId: webhookData.workflow.id, executionId }, this.testWebhookData[webhookKey].sessionId!);
pushInstance.send('testWebhookReceived', { workflowId: webhookData.workflowId, executionId }, this.testWebhookData[webhookKey].sessionId!);
}
} catch (error) {
@ -100,7 +114,7 @@ export class TestWebhooks {
// Remove the webhook
clearTimeout(this.testWebhookData[webhookKey].timeout);
delete this.testWebhookData[webhookKey];
this.activeWebhooks!.removeByWorkflowId(webhookData.workflow.id!.toString());
this.activeWebhooks!.removeWorkflow(workflow);
});
}
@ -125,7 +139,7 @@ export class TestWebhooks {
// Remove test-webhooks automatically if they do not get called (after 120 seconds)
const timeout = setTimeout(() => {
this.cancelTestWebhook(workflowData.id.toString());
this.cancelTestWebhook(workflowData.id.toString(), workflow);
}, 120000);
let key: string;
@ -136,9 +150,12 @@ export class TestWebhooks {
timeout,
workflowData,
};
await this.activeWebhooks!.add(webhookData, mode);
await this.activeWebhooks!.add(workflow, webhookData, mode);
}
// Save static data!
await WorkflowHelpers.saveStaticData(workflow);
return true;
}
@ -150,7 +167,7 @@ export class TestWebhooks {
* @returns {boolean}
* @memberof TestWebhooks
*/
cancelTestWebhook(workflowId: string): boolean {
cancelTestWebhook(workflowId: string, workflow: Workflow): boolean {
let foundWebhook = false;
for (const webhookKey of Object.keys(this.testWebhookData)) {
const webhookData = this.testWebhookData[webhookKey];
@ -175,7 +192,7 @@ export class TestWebhooks {
// Remove the webhook
delete this.testWebhookData[webhookKey];
this.activeWebhooks!.removeByWorkflowId(workflowId);
this.activeWebhooks!.removeWorkflow(workflow);
}
return foundWebhook;
@ -189,8 +206,22 @@ export class TestWebhooks {
if (this.activeWebhooks === null) {
return;
}
const nodeTypes = NodeTypes();
return this.activeWebhooks.removeAll();
const findQuery = {
where: {
id: findIn(this.activeWebhooks.getWorkflowIds())
},
} as FindManyOptions;
const workflowsDb = await Db.collections.Workflow!.find(findQuery);
const workflows: Workflow[] = [];
for (const workflowData of workflowsDb) {
const workflow = new Workflow(workflowData.id.toString(), workflowData.nodes, workflowData.connections, workflowData.active, nodeTypes, workflowData.staticData, workflowData.settings);
workflows.push(workflow);
}
return this.activeWebhooks.removeAll(workflows);
}
}

View file

@ -84,9 +84,9 @@ export function getWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflo
* @param {((error: Error | null, data: IResponseCallbackData) => void)} responseCallback
* @returns {(Promise<string | undefined>)}
*/
export async function executeWebhook(webhookData: IWebhookData, workflowData: IWorkflowDb, workflowStartNode: INode, executionMode: WorkflowExecuteMode, sessionId: string | undefined, req: express.Request, res: express.Response, responseCallback: (error: Error | null, data: IResponseCallbackData) => void): Promise<string | undefined> {
export async function executeWebhook(workflow: Workflow, webhookData: IWebhookData, workflowData: IWorkflowDb, workflowStartNode: INode, executionMode: WorkflowExecuteMode, sessionId: string | undefined, req: express.Request, res: express.Response, responseCallback: (error: Error | null, data: IResponseCallbackData) => void): Promise<string | undefined> {
// Get the nodeType to know which responseMode is set
const nodeType = webhookData.workflow.nodeTypes.getByName(workflowStartNode.type);
const nodeType = workflow.nodeTypes.getByName(workflowStartNode.type);
if (nodeType === undefined) {
const errorMessage = `The type of the webhook node "${workflowStartNode.name}" is not known.`;
responseCallback(new Error(errorMessage), {});
@ -94,8 +94,8 @@ export function getWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflo
}
// Get the responseMode
const responseMode = webhookData.workflow.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseMode'], 'onReceived');
const responseCode = webhookData.workflow.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseCode'], 200) as number;
const responseMode = workflow.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseMode'], 'onReceived');
const responseCode = workflow.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseCode'], 200) as number;
if (!['onReceived', 'lastNode'].includes(responseMode as string)) {
// If the mode is not known we error. Is probably best like that instead of using
@ -122,7 +122,7 @@ export function getWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflo
let webhookResultData: IWebhookResponseData;
try {
webhookResultData = await webhookData.workflow.runWebhook(webhookData, workflowStartNode, additionalData, NodeExecuteFunctions, executionMode);
webhookResultData = await workflow.runWebhook(webhookData, workflowStartNode, additionalData, NodeExecuteFunctions, executionMode);
} catch (e) {
// Send error response to webhook caller
const errorMessage = 'Workflow Webhook Error: Workflow could not be started!';
@ -287,7 +287,7 @@ export function getWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflo
return data;
}
const responseData = webhookData.workflow.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseData'], 'firstEntryJson');
const responseData = workflow.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseData'], 'firstEntryJson');
if (didSendResponse === false) {
let data: IDataObject | IDataObject[];
@ -296,13 +296,13 @@ export function getWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflo
// Return the JSON data of the first entry
data = returnData.data!.main[0]![0].json;
const responsePropertyName = webhookData.workflow.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responsePropertyName'], undefined);
const responsePropertyName = workflow.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responsePropertyName'], undefined);
if (responsePropertyName !== undefined) {
data = get(data, responsePropertyName as string) as IDataObject;
}
const responseContentType = webhookData.workflow.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseContentType'], undefined);
const responseContentType = workflow.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseContentType'], undefined);
if (responseContentType !== undefined) {
// Send the webhook response manually to be able to set the content-type
@ -329,7 +329,7 @@ export function getWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflo
didSendResponse = true;
}
const responseBinaryPropertyName = webhookData.workflow.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseBinaryPropertyName'], 'data');
const responseBinaryPropertyName = workflow.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseBinaryPropertyName'], 'data');
if (responseBinaryPropertyName === undefined && didSendResponse === false) {
responseCallback(new Error('No "responseBinaryPropertyName" is set.'), {});

View file

@ -1,6 +1,7 @@
import {
IWebhookData,
WebhookHttpMethod,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
@ -29,29 +30,26 @@ export class ActiveWebhooks {
* @returns {Promise<void>}
* @memberof ActiveWebhooks
*/
async add(webhookData: IWebhookData, mode: WorkflowExecuteMode): Promise<void> {
if (webhookData.workflow.id === undefined) {
async add(workflow: Workflow, webhookData: IWebhookData, mode: WorkflowExecuteMode): Promise<void> {
if (workflow.id === undefined) {
throw new Error('Webhooks can only be added for saved workflows as an id is needed!');
}
if (this.workflowWebhooks[webhookData.workflow.id] === undefined) {
this.workflowWebhooks[webhookData.workflow.id] = [];
if (this.workflowWebhooks[webhookData.workflowId] === undefined) {
this.workflowWebhooks[webhookData.workflowId] = [];
}
// Make the webhook available directly because sometimes to create it successfully
// it gets called
this.webhookUrls[this.getWebhookKey(webhookData.httpMethod, webhookData.path)] = webhookData;
const webhookExists = await webhookData.workflow.runWebhookMethod('checkExists', webhookData, NodeExecuteFunctions, mode, this.testWebhooks);
const webhookExists = await workflow.runWebhookMethod('checkExists', webhookData, NodeExecuteFunctions, mode, this.testWebhooks);
if (webhookExists === false) {
// If webhook does not exist yet create it
await webhookData.workflow.runWebhookMethod('create', webhookData, NodeExecuteFunctions, mode, this.testWebhooks);
await workflow.runWebhookMethod('create', webhookData, NodeExecuteFunctions, mode, this.testWebhooks);
}
// Run the "activate" hooks on the nodes
await webhookData.workflow.runNodeHooks('activate', webhookData, NodeExecuteFunctions, mode);
this.workflowWebhooks[webhookData.workflow.id].push(webhookData);
this.workflowWebhooks[webhookData.workflowId].push(webhookData);
}
@ -73,6 +71,17 @@ export class ActiveWebhooks {
}
/**
* Returns the ids of all the workflows which have active webhooks
*
* @returns {string[]}
* @memberof ActiveWebhooks
*/
getWorkflowIds(): string[] {
return Object.keys(this.workflowWebhooks);
}
/**
* Returns key to uniquely identify a webhook
*
@ -89,11 +98,13 @@ export class ActiveWebhooks {
/**
* Removes all webhooks of a workflow
*
* @param {string} workflowId
* @param {Workflow} workflow
* @returns {boolean}
* @memberof ActiveWebhooks
*/
async removeByWorkflowId(workflowId: string): Promise<boolean> {
async removeWorkflow(workflow: Workflow): Promise<boolean> {
const workflowId = workflow.id!.toString();
if (this.workflowWebhooks[workflowId] === undefined) {
// If it did not exist then there is nothing to remove
return false;
@ -105,10 +116,7 @@ export class ActiveWebhooks {
// Go through all the registered webhooks of the workflow and remove them
for (const webhookData of webhooks) {
await webhookData.workflow.runWebhookMethod('delete', webhookData, NodeExecuteFunctions, mode, this.testWebhooks);
// Run the "deactivate" hooks on the nodes
await webhookData.workflow.runNodeHooks('deactivate', webhookData, NodeExecuteFunctions, mode);
await workflow.runWebhookMethod('delete', webhookData, NodeExecuteFunctions, mode, this.testWebhooks);
delete this.webhookUrls[this.getWebhookKey(webhookData.httpMethod, webhookData.path)];
}
@ -121,55 +129,16 @@ export class ActiveWebhooks {
/**
* Removes all the currently active webhooks
* Removes all the webhooks of the given workflow
*/
async removeAll(): Promise<void> {
const workflowIds = Object.keys(this.workflowWebhooks);
async removeAll(workflows: Workflow[]): Promise<void> {
const removePromises = [];
for (const workflowId of workflowIds) {
removePromises.push(this.removeByWorkflowId(workflowId));
for (const workflow of workflows) {
removePromises.push(this.removeWorkflow(workflow));
}
await Promise.all(removePromises);
return;
}
// /**
// * Removes a single webhook by its key.
// * Currently not used, runNodeHooks for "deactivate" is missing
// *
// * @param {string} webhookKey
// * @returns {boolean}
// * @memberof ActiveWebhooks
// */
// removeByWebhookKey(webhookKey: string): boolean {
// if (this.webhookUrls[webhookKey] === undefined) {
// // If it did not exist then there is nothing to remove
// return false;
// }
// const webhookData = this.webhookUrls[webhookKey];
// // Remove from workflow-webhooks
// const workflowWebhooks = this.workflowWebhooks[webhookData.workflowId];
// for (let index = 0; index < workflowWebhooks.length; index++) {
// if (workflowWebhooks[index].path === webhookData.path) {
// workflowWebhooks.splice(index, 1);
// break;
// }
// }
// if (workflowWebhooks.length === 0) {
// // When there are no webhooks left for any workflow remove it totally
// delete this.workflowWebhooks[webhookData.workflowId];
// }
// // Remove from webhook urls
// delete this.webhookUrls[webhookKey];
// return true;
// }
}

View file

@ -69,9 +69,7 @@ export class ActiveWorkflows {
async add(id: string, workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData, getTriggerFunctions: IGetExecuteTriggerFunctions, getPollFunctions: IGetExecutePollFunctions): Promise<void> {
console.log('ADD ID (active): ' + id);
this.workflowData[id] = {
workflow
};
this.workflowData[id] = {};
const triggerNodes = workflow.getTriggerNodes();
let triggerResponse: ITriggerResponse | undefined;
@ -170,7 +168,6 @@ export class ActiveWorkflows {
const pollResponse = await workflow.runPoll(node, pollFunctions);
if (pollResponse !== null) {
// TODO: Run workflow
pollFunctions.__emit(pollResponse);
}
};

View file

@ -14,7 +14,6 @@ import {
ITriggerResponse,
IWebhookFunctions as IWebhookFunctionsBase,
IWorkflowSettings as IWorkflowSettingsWorkflow,
Workflow,
} from 'n8n-workflow';
@ -125,5 +124,4 @@ export interface INodeInputDataConnections {
export interface IWorkflowData {
pollResponses?: IPollResponse[];
triggerResponses?: ITriggerResponse[];
workflow: Workflow;
}

View file

@ -495,7 +495,7 @@ export interface IWebhookData {
node: string;
path: string;
webhookDescription: IWebhookDescription;
workflow: Workflow;
workflowId: string;
workflowExecuteAdditionalData: IWorkflowExecuteAdditionalData;
}

View file

@ -771,7 +771,7 @@ export function getNodeWebhooks(workflow: Workflow, node: INode, additionalData:
node: node.name,
path,
webhookDescription,
workflow,
workflowId: workflow.id,
workflowExecuteAdditionalData: additionalData,
});
}

View file

@ -907,41 +907,6 @@ export class Workflow {
}
/**
* Executes the hooks of the node
*
* @param {string} hookName The name of the hook to execute
* @param {IWebhookData} webhookData
* @param {INodeExecuteFunctions} nodeExecuteFunctions
* @param {WorkflowExecuteMode} mode
* @returns {Promise<void>}
* @memberof Workflow
*/
async runNodeHooks(hookName: string, webhookData: IWebhookData, nodeExecuteFunctions: INodeExecuteFunctions, mode: WorkflowExecuteMode): Promise<void> {
const node = this.getNode(webhookData.node) as INode;
const nodeType = this.nodeTypes.getByName(node.type) as INodeType;
if (nodeType.description.hooks === undefined) {
return;
}
if (nodeType.description.hooks[hookName] === undefined) {
return;
}
if (nodeType.hooks === undefined && nodeType.description.hooks[hookName]!.length !== 0) {
// There should be hook functions but they do not exist
throw new Error('There are hooks defined to run but are not implemented.');
}
for (const hookDescription of nodeType.description.hooks[hookName]!) {
const thisArgs = nodeExecuteFunctions.getExecuteHookFunctions(this, node, webhookData.workflowExecuteAdditionalData, mode);
await nodeType.hooks![hookDescription.method].call(thisArgs);
}
}
/**
* Executes the Webhooks method of the node