diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index 9fef6ed243..c7f7ea1d69 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -11,11 +11,11 @@ import { WorkflowHelpers, WorkflowRunner, WorkflowExecuteAdditionalData, + IWebhookDb, } from './'; import { ActiveWorkflows, - ActiveWebhooks, NodeExecuteFunctions, } from 'n8n-core'; @@ -26,7 +26,7 @@ import { INode, INodeExecutionData, IRunExecutionData, - IWebhookData, + NodeHelpers, IWorkflowExecuteAdditionalData as IWorkflowExecuteAdditionalDataWorkflow, WebhookHttpMethod, Workflow, @@ -35,22 +35,23 @@ import { import * as express from 'express'; - export class ActiveWorkflowRunner { private activeWorkflows: ActiveWorkflows | null = null; - private activeWebhooks: ActiveWebhooks | null = null; + private activationErrors: { [key: string]: IActivationError; } = {}; async init() { + // Get the active workflows from database + + // NOTE + // Here I guess we can have a flag on the workflow table like hasTrigger + // so intead of pulling all the active wehhooks just pull the actives that have a trigger const workflowsData: IWorkflowDb[] = await Db.collections.Workflow!.find({ active: true }) as IWorkflowDb[]; - this.activeWebhooks = new ActiveWebhooks(); - - // Add them as active workflows this.activeWorkflows = new ActiveWorkflows(); if (workflowsData.length !== 0) { @@ -58,20 +59,27 @@ export class ActiveWorkflowRunner { console.log(' Start Active Workflows:'); console.log(' ================================'); + const nodeTypes = NodeTypes(); + for (const workflowData of workflowsData) { - console.log(` - ${workflowData.name}`); - try { - await this.add(workflowData.id.toString(), workflowData); - console.log(` => Started`); - } catch (error) { - console.log(` => ERROR: Workflow could not be activated:`); - console.log(` ${error.message}`); + + const workflow = new Workflow({ id: workflowData.id.toString(), name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings}); + + if (workflow.getTriggerNodes().length !== 0 + || workflow.getPollNodes().length !== 0) { + console.log(` - ${workflowData.name}`); + try { + await this.add(workflowData.id.toString(), workflowData); + console.log(` => Started`); + } catch (error) { + console.log(` => ERROR: Workflow could not be activated:`); + console.log(` ${error.message}`); + } } } } } - /** * Removes all the currently active workflows * @@ -94,7 +102,6 @@ export class ActiveWorkflowRunner { return; } - /** * Checks if a webhook for the given method and path exists and executes the workflow. * @@ -110,30 +117,41 @@ export class ActiveWorkflowRunner { throw new ResponseHelper.ResponseError('The "activeWorkflows" instance did not get initialized yet.', 404, 404); } - const webhookData: IWebhookData | undefined = this.activeWebhooks!.get(httpMethod, path); + const webhook = await Db.collections.Webhook?.findOne({ webhookPath: path, method: httpMethod }) as IWebhookDb; - if (webhookData === undefined) { + // check if something exist + if (webhook === undefined) { // The requested webhook is not registered throw new ResponseHelper.ResponseError(`The requested webhook "${httpMethod} ${path}" is not registered.`, 404, 404); } - const workflowData = await Db.collections.Workflow!.findOne(webhookData.workflowId); + const workflowData = await Db.collections.Workflow!.findOne(webhook.workflowId); if (workflowData === undefined) { - throw new ResponseHelper.ResponseError(`Could not find workflow with id "${webhookData.workflowId}"`, 404, 404); + throw new ResponseHelper.ResponseError(`Could not find workflow with id "${webhook.workflowId}"`, 404, 404); } const nodeTypes = NodeTypes(); - const workflow = new Workflow({ id: webhookData.workflowId, name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings}); + const workflow = new Workflow({ id: webhook.workflowId.toString(), name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings}); + + const credentials = await WorkflowCredentials([workflow.getNode(webhook.node as string) as INode]); + + const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials); + + const webhookData = NodeHelpers.getNodeWebhooks(workflow, workflow.getNode(webhook.node as string) as INode, additionalData).filter((webhook) => { + return (webhook.httpMethod === httpMethod && webhook.path === path); + })[0]; // Get the node which has the webhook defined to know where to start from and to // get additional data const workflowStartNode = workflow.getNode(webhookData.node); + if (workflowStartNode === null) { throw new ResponseHelper.ResponseError('Could not find node to process webhook.', 404, 404); } return new Promise((resolve, reject) => { const executionMode = 'webhook'; + //@ts-ignore WebhookHelpers.executeWebhook(workflow, webhookData, workflowData, workflowStartNode, executionMode, undefined, req, res, (error: Error | null, data: object) => { if (error !== null) { return reject(error); @@ -143,19 +161,14 @@ export class ActiveWorkflowRunner { }); } - /** * Returns the ids of the currently active workflows * * @returns {string[]} * @memberof ActiveWorkflowRunner */ - getActiveWorkflows(): string[] { - if (this.activeWorkflows === null) { - return []; - } - - return this.activeWorkflows.allActiveWorkflows(); + getActiveWorkflows(): Promise { + return Db.collections.Workflow?.find({ select: ['id'] }) as Promise; } @@ -166,15 +179,11 @@ export class ActiveWorkflowRunner { * @returns {boolean} * @memberof ActiveWorkflowRunner */ - isActive(id: string): boolean { - if (this.activeWorkflows !== null) { - return this.activeWorkflows.isActive(id); - } - - return false; + async isActive(id: string): Promise { + const workflow = await Db.collections.Workflow?.findOne({ id }) as IWorkflowDb; + return workflow?.active as boolean; } - /** * Return error if there was a problem activating the workflow * @@ -190,7 +199,6 @@ export class ActiveWorkflowRunner { return this.activationErrors[id]; } - /** * Adds all the webhooks of the workflow * @@ -202,12 +210,69 @@ export class ActiveWorkflowRunner { */ async addWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode): Promise { const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData); + let path = '' as string | undefined; for (const webhookData of webhooks) { - await this.activeWebhooks!.add(workflow, webhookData, mode); - // Save static data! - await WorkflowHelpers.saveStaticData(workflow); + + const node = workflow.getNode(webhookData.node) as INode; + node.name = webhookData.node; + + path = node.parameters.path as string; + + if (node.parameters.path === undefined) { + path = workflow.getSimpleParameterValue(node, webhookData.webhookDescription['path']) as string | undefined; + + if (path === undefined) { + // TODO: Use a proper logger + console.error(`No webhook path could be found for node "${node.name}" in workflow "${workflow.id}".`); + continue; + } + } + + const isFullPath: boolean = workflow.getSimpleParameterValue(node, webhookData.webhookDescription['isFullPath'], false) as boolean; + + const webhook = { + workflowId: webhookData.workflowId, + webhookPath: NodeHelpers.getNodeWebhookPath(workflow.id as string, node, path, isFullPath), + node: node.name, + method: webhookData.httpMethod, + } as IWebhookDb; + + try { + + await Db.collections.Webhook?.insert(webhook); + + const webhookExists = await workflow.runWebhookMethod('checkExists', webhookData, NodeExecuteFunctions, mode, false); + if (webhookExists === false) { + // If webhook does not exist yet create it + await workflow.runWebhookMethod('create', webhookData, NodeExecuteFunctions, mode, false); + } + + } catch (error) { + + let errorMessage = ''; + + await Db.collections.Webhook?.delete({ workflowId: workflow.id }); + + // if it's a workflow from the the insert + // TODO check if there is standard error code for deplicate key violation that works + // with all databases + if (error.name === 'MongoError' || error.name === 'QueryFailedError') { + + errorMessage = `The webhook path [${webhook.webhookPath}] and method [${webhook.method}] already exist.`; + + } else if (error.detail) { + // it's a error runnig the webhook methods (checkExists, create) + errorMessage = error.detail; + } else { + errorMessage = error.message; + } + + throw new Error(errorMessage); + } } + // Save static data! + await WorkflowHelpers.saveStaticData(workflow); } @@ -227,13 +292,29 @@ export class ActiveWorkflowRunner { const nodeTypes = NodeTypes(); const workflow = new Workflow({ id: workflowId, name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings }); - await this.activeWebhooks!.removeWorkflow(workflow); + const mode = 'internal'; - // Save the static workflow data if needed - await WorkflowHelpers.saveStaticData(workflow); + const credentials = await WorkflowCredentials(workflowData.nodes); + const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials); + + const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData); + + for (const webhookData of webhooks) { + await workflow.runWebhookMethod('delete', webhookData, NodeExecuteFunctions, mode, false); + } + + // if it's a mongo objectId convert it to string + if (typeof workflowData.id === 'object') { + workflowData.id = workflowData.id.toString(); + } + + const webhook = { + workflowId: workflowData.id, + } as IWebhookDb; + + await Db.collections.Webhook?.delete(webhook); } - /** * Runs the given workflow * @@ -322,7 +403,6 @@ export class ActiveWorkflowRunner { }); } - /** * Makes a workflow active * @@ -361,7 +441,11 @@ export class ActiveWorkflowRunner { // Add the workflows which have webhooks defined await this.addWorkflowWebhooks(workflowInstance, additionalData, mode); - await this.activeWorkflows.add(workflowId, workflowInstance, additionalData, getTriggerFunctions, getPollFunctions); + + if (workflowInstance.getTriggerNodes().length !== 0 + || workflowInstance.getPollNodes().length !== 0) { + await this.activeWorkflows.add(workflowId, workflowInstance, additionalData, getTriggerFunctions, getPollFunctions); + } if (this.activationErrors[workflowId] !== undefined) { // If there were activation errors delete them @@ -386,7 +470,6 @@ export class ActiveWorkflowRunner { await WorkflowHelpers.saveStaticData(workflowInstance!); } - /** * Makes a workflow inactive * @@ -395,6 +478,7 @@ export class ActiveWorkflowRunner { * @memberof ActiveWorkflowRunner */ async remove(workflowId: string): Promise { + if (this.activeWorkflows !== null) { // Remove all the webhooks of the workflow await this.removeWorkflowWebhooks(workflowId); @@ -404,8 +488,13 @@ export class ActiveWorkflowRunner { delete this.activationErrors[workflowId]; } - // Remove the workflow from the "list" of active workflows - return this.activeWorkflows.remove(workflowId); + // if it's active in memory then it's a trigger + // so remove from list of actives workflows + if (this.activeWorkflows.isActive(workflowId)) { + this.activeWorkflows.remove(workflowId); + } + + return; } throw new Error(`The "activeWorkflows" instance did not get initialized yet.`); diff --git a/packages/cli/src/Db.ts b/packages/cli/src/Db.ts index efda1f6366..3739b779ef 100644 --- a/packages/cli/src/Db.ts +++ b/packages/cli/src/Db.ts @@ -29,22 +29,27 @@ export let collections: IDatabaseCollections = { Credentials: null, Execution: null, Workflow: null, + Webhook: null, }; import { - InitialMigration1587669153312 + InitialMigration1587669153312, + WebhookModel1589476000887, } from './databases/postgresdb/migrations'; import { - InitialMigration1587563438936 + InitialMigration1587563438936, + WebhookModel1592679094242, } from './databases/mongodb/migrations'; import { - InitialMigration1588157391238 + InitialMigration1588157391238, + WebhookModel1592447867632, } from './databases/mysqldb/migrations'; import { - InitialMigration1588102412422 + InitialMigration1588102412422, + WebhookModel1592445003908, } from './databases/sqlite/migrations'; import * as path from 'path'; @@ -66,7 +71,7 @@ export async function init(): Promise { entityPrefix, url: await GenericHelpers.getConfigValue('database.mongodb.connectionUrl') as string, useNewUrlParser: true, - migrations: [InitialMigration1587563438936], + migrations: [InitialMigration1587563438936, WebhookModel1592679094242], migrationsRun: true, migrationsTableName: `${entityPrefix}migrations`, }; @@ -99,7 +104,7 @@ export async function init(): Promise { port: await GenericHelpers.getConfigValue('database.postgresdb.port') as number, username: await GenericHelpers.getConfigValue('database.postgresdb.user') as string, schema: config.get('database.postgresdb.schema'), - migrations: [InitialMigration1587669153312], + migrations: [InitialMigration1587669153312, WebhookModel1589476000887], migrationsRun: true, migrationsTableName: `${entityPrefix}migrations`, ssl, @@ -118,7 +123,7 @@ export async function init(): Promise { password: await GenericHelpers.getConfigValue('database.mysqldb.password') as string, port: await GenericHelpers.getConfigValue('database.mysqldb.port') as number, username: await GenericHelpers.getConfigValue('database.mysqldb.user') as string, - migrations: [InitialMigration1588157391238], + migrations: [InitialMigration1588157391238, WebhookModel1592447867632], migrationsRun: true, migrationsTableName: `${entityPrefix}migrations`, }; @@ -130,7 +135,7 @@ export async function init(): Promise { type: 'sqlite', database: path.join(n8nFolder, 'database.sqlite'), entityPrefix, - migrations: [InitialMigration1588102412422], + migrations: [InitialMigration1588102412422, WebhookModel1592445003908], migrationsRun: true, migrationsTableName: `${entityPrefix}migrations`, }; @@ -155,6 +160,7 @@ export async function init(): Promise { collections.Credentials = getRepository(entities.CredentialsEntity); collections.Execution = getRepository(entities.ExecutionEntity); collections.Workflow = getRepository(entities.WorkflowEntity); + collections.Webhook = getRepository(entities.WebhookEntity); return collections; } diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index b9f7144bf4..2aecd27466 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -49,8 +49,15 @@ export interface IDatabaseCollections { Credentials: Repository | null; Execution: Repository | null; Workflow: Repository | null; + Webhook: Repository | null; } +export interface IWebhookDb { + workflowId: number | string | ObjectID; + webhookPath: string; + method: string; + node: string; +} export interface IWorkflowBase extends IWorkflowBaseWorkflow { id?: number | string | ObjectID; diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 1a0b75842a..e4093e5a15 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -457,7 +457,9 @@ class App { await this.externalHooks.run('workflow.update', [newWorkflowData]); - if (this.activeWorkflowRunner.isActive(id)) { + const isActive = await this.activeWorkflowRunner.isActive(id); + + if (isActive) { // When workflow gets saved always remove it as the triggers could have been // changed and so the changes would not take effect await this.activeWorkflowRunner.remove(id); @@ -526,7 +528,9 @@ class App { await this.externalHooks.run('workflow.delete', [id]); - if (this.activeWorkflowRunner.isActive(id)) { + const isActive = await this.activeWorkflowRunner.isActive(id); + + if (isActive) { // Before deleting a workflow deactivate it await this.activeWorkflowRunner.remove(id); } @@ -666,7 +670,8 @@ class App { // Returns the active workflow ids this.app.get(`/${this.restEndpoint}/active`, ResponseHelper.send(async (req: express.Request, res: express.Response): Promise => { - return this.activeWorkflowRunner.getActiveWorkflows(); + const activeWorkflows = await this.activeWorkflowRunner.getActiveWorkflows(); + return activeWorkflows.map(workflow => workflow.id.toString()) as string[]; })); diff --git a/packages/cli/src/TestWebhooks.ts b/packages/cli/src/TestWebhooks.ts index 45ae624e2b..a20fa76c88 100644 --- a/packages/cli/src/TestWebhooks.ts +++ b/packages/cli/src/TestWebhooks.ts @@ -141,12 +141,14 @@ export class TestWebhooks { let key: string; for (const webhookData of webhooks) { key = this.activeWebhooks!.getWebhookKey(webhookData.httpMethod, webhookData.path); + + await this.activeWebhooks!.add(workflow, webhookData, mode); + this.testWebhookData[key] = { sessionId, timeout, workflowData, }; - await this.activeWebhooks!.add(workflow, webhookData, mode); // Save static data! this.testWebhookData[key].workflowData.staticData = workflow.staticData; diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index 64e4c101ff..cbfb7be95c 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -69,6 +69,33 @@ export function getWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflo return returnData; } +/** + * Returns all the webhooks which should be created for the give workflow + * + * @export + * @param {string} workflowId + * @param {Workflow} workflow + * @returns {IWebhookData[]} + */ +export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] { + // Check all the nodes in the workflow if they have webhooks + + const returnData: IWebhookData[] = []; + + let parentNodes: string[] | undefined; + + for (const node of Object.values(workflow.nodes)) { + if (parentNodes !== undefined && !parentNodes.includes(node.name)) { + // If parentNodes are given check only them if they have webhooks + // and no other ones + continue; + } + returnData.push.apply(returnData, NodeHelpers.getNodeWebhooksBasic(workflow, node)); + } + + return returnData; +} + /** * Executes a webhook diff --git a/packages/cli/src/databases/mongodb/WebhookEntity.ts b/packages/cli/src/databases/mongodb/WebhookEntity.ts new file mode 100644 index 0000000000..dbf90f3da1 --- /dev/null +++ b/packages/cli/src/databases/mongodb/WebhookEntity.ts @@ -0,0 +1,30 @@ +import { + Column, + Entity, + Index, + ObjectID, + ObjectIdColumn, +} from 'typeorm'; + +import { + IWebhookDb, + } from '../../Interfaces'; + +@Entity() +export class WebhookEntity implements IWebhookDb { + + @ObjectIdColumn() + id: ObjectID; + + @Column() + workflowId: number; + + @Column() + webhookPath: string; + + @Column() + method: string; + + @Column() + node: string; +} diff --git a/packages/cli/src/databases/mongodb/index.ts b/packages/cli/src/databases/mongodb/index.ts index 164d67fd0c..bd6b9abd60 100644 --- a/packages/cli/src/databases/mongodb/index.ts +++ b/packages/cli/src/databases/mongodb/index.ts @@ -1,3 +1,5 @@ export * from './CredentialsEntity'; export * from './ExecutionEntity'; export * from './WorkflowEntity'; +export * from './WebhookEntity'; + diff --git a/packages/cli/src/databases/mongodb/migrations/1592679094242-WebhookModel.ts b/packages/cli/src/databases/mongodb/migrations/1592679094242-WebhookModel.ts new file mode 100644 index 0000000000..c05a44f765 --- /dev/null +++ b/packages/cli/src/databases/mongodb/migrations/1592679094242-WebhookModel.ts @@ -0,0 +1,57 @@ +import { + MigrationInterface, +} from 'typeorm'; + +import { + IWorkflowDb, + NodeTypes, + WebhookHelpers, +} from '../../..'; + +import { + Workflow, +} from 'n8n-workflow/dist/src/Workflow'; + +import { + IWebhookDb, +} from '../../../Interfaces'; + +import * as config from '../../../../config'; + +import { + MongoQueryRunner, +} from 'typeorm/driver/mongodb/MongoQueryRunner'; + +export class WebhookModel1592679094242 implements MigrationInterface { + name = 'WebhookModel1592679094242'; + + async up(queryRunner: MongoQueryRunner): Promise { + const tablePrefix = config.get('database.tablePrefix'); + const workflows = await queryRunner.cursor( `${tablePrefix}workflow_entity`, { active: true }).toArray() as IWorkflowDb[]; + const data: IWebhookDb[] = []; + const nodeTypes = NodeTypes(); + for (const workflow of workflows) { + const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings }); + const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance); + for (const webhook of webhooks) { + data.push({ + workflowId: workflowInstance.id as string, + webhookPath: webhook.path, + method: webhook.httpMethod, + node: webhook.node, + }); + } + } + + if (data.length !== 0) { + await queryRunner.manager.insertMany(`${tablePrefix}webhook_entity`, data); + } + + await queryRunner.manager.createCollectionIndex(`${tablePrefix}webhook_entity`, ['webhookPath', 'method'], { unique: true, background: false }); + } + + async down(queryRunner: MongoQueryRunner): Promise { + const tablePrefix = config.get('database.tablePrefix'); + await queryRunner.dropTable(`${tablePrefix}webhook_entity`); + } +} diff --git a/packages/cli/src/databases/mongodb/migrations/index.ts b/packages/cli/src/databases/mongodb/migrations/index.ts index a60bdc7cf8..4072ab582d 100644 --- a/packages/cli/src/databases/mongodb/migrations/index.ts +++ b/packages/cli/src/databases/mongodb/migrations/index.ts @@ -1 +1,2 @@ export * from './1587563438936-InitialMigration'; +export * from './1592679094242-WebhookModel'; diff --git a/packages/cli/src/databases/mysqldb/WebhookEntity.ts b/packages/cli/src/databases/mysqldb/WebhookEntity.ts new file mode 100644 index 0000000000..a78fd34ae9 --- /dev/null +++ b/packages/cli/src/databases/mysqldb/WebhookEntity.ts @@ -0,0 +1,25 @@ +import { + Column, + Entity, + PrimaryColumn, +} from 'typeorm'; + +import { + IWebhookDb, + } from '../../Interfaces'; + +@Entity() +export class WebhookEntity implements IWebhookDb { + + @Column() + workflowId: number; + + @PrimaryColumn() + webhookPath: string; + + @PrimaryColumn() + method: string; + + @Column() + node: string; +} diff --git a/packages/cli/src/databases/mysqldb/index.ts b/packages/cli/src/databases/mysqldb/index.ts index 164d67fd0c..a3494531db 100644 --- a/packages/cli/src/databases/mysqldb/index.ts +++ b/packages/cli/src/databases/mysqldb/index.ts @@ -1,3 +1,4 @@ export * from './CredentialsEntity'; export * from './ExecutionEntity'; export * from './WorkflowEntity'; +export * from './WebhookEntity'; diff --git a/packages/cli/src/databases/mysqldb/migrations/1592447867632-WebhookModel.ts b/packages/cli/src/databases/mysqldb/migrations/1592447867632-WebhookModel.ts new file mode 100644 index 0000000000..8a49080462 --- /dev/null +++ b/packages/cli/src/databases/mysqldb/migrations/1592447867632-WebhookModel.ts @@ -0,0 +1,59 @@ +import { + MigrationInterface, + QueryRunner, +} from 'typeorm'; + +import * as config from '../../../../config'; + +import { + IWorkflowDb, + NodeTypes, + WebhookHelpers, +} from '../../..'; + +import { + Workflow, +} from 'n8n-workflow'; + +import { + IWebhookDb, +} from '../../../Interfaces'; + +export class WebhookModel1592447867632 implements MigrationInterface { + name = 'WebhookModel1592447867632'; + + async up(queryRunner: QueryRunner): Promise { + const tablePrefix = config.get('database.tablePrefix'); + + await queryRunner.query(`CREATE TABLE IF NOT EXISTS ${tablePrefix}webhook_entity (workflowId int NOT NULL, webhookPath varchar(255) NOT NULL, method varchar(255) NOT NULL, node varchar(255) NOT NULL, PRIMARY KEY (webhookPath, method)) ENGINE=InnoDB`); + + const workflows = await queryRunner.query(`SELECT * FROM ${tablePrefix}workflow_entity WHERE active=true`) as IWorkflowDb[]; + const data: IWebhookDb[] = []; + const nodeTypes = NodeTypes(); + for (const workflow of workflows) { + const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings }); + const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance); + for (const webhook of webhooks) { + data.push({ + workflowId: workflowInstance.id as string, + webhookPath: webhook.path, + method: webhook.httpMethod, + node: webhook.node, + }); + } + } + + if (data.length !== 0) { + await queryRunner.manager.createQueryBuilder() + .insert() + .into(`${tablePrefix}webhook_entity`) + .values(data) + .execute(); + } + } + + async down(queryRunner: QueryRunner): Promise { + const tablePrefix = config.get('database.tablePrefix'); + await queryRunner.query(`DROP TABLE ${tablePrefix}webhook_entity`); + } +} diff --git a/packages/cli/src/databases/mysqldb/migrations/index.ts b/packages/cli/src/databases/mysqldb/migrations/index.ts index ac2dcab467..9e5abd3b8c 100644 --- a/packages/cli/src/databases/mysqldb/migrations/index.ts +++ b/packages/cli/src/databases/mysqldb/migrations/index.ts @@ -1 +1,2 @@ -export * from './1588157391238-InitialMigration'; \ No newline at end of file +export * from './1588157391238-InitialMigration'; +export * from './1592447867632-WebhookModel'; diff --git a/packages/cli/src/databases/postgresdb/WebhookEntity.ts b/packages/cli/src/databases/postgresdb/WebhookEntity.ts new file mode 100644 index 0000000000..6e511cde74 --- /dev/null +++ b/packages/cli/src/databases/postgresdb/WebhookEntity.ts @@ -0,0 +1,25 @@ +import { + Column, + Entity, + PrimaryColumn, +} from 'typeorm'; + +import { + IWebhookDb, + } from '../../'; + +@Entity() +export class WebhookEntity implements IWebhookDb { + + @Column() + workflowId: number; + + @PrimaryColumn() + webhookPath: string; + + @PrimaryColumn() + method: string; + + @Column() + node: string; +} diff --git a/packages/cli/src/databases/postgresdb/index.ts b/packages/cli/src/databases/postgresdb/index.ts index 164d67fd0c..bd6b9abd60 100644 --- a/packages/cli/src/databases/postgresdb/index.ts +++ b/packages/cli/src/databases/postgresdb/index.ts @@ -1,3 +1,5 @@ export * from './CredentialsEntity'; export * from './ExecutionEntity'; export * from './WorkflowEntity'; +export * from './WebhookEntity'; + diff --git a/packages/cli/src/databases/postgresdb/migrations/1587669153312-InitialMigration.ts b/packages/cli/src/databases/postgresdb/migrations/1587669153312-InitialMigration.ts index 29a80e434f..eace7a92fb 100644 --- a/packages/cli/src/databases/postgresdb/migrations/1587669153312-InitialMigration.ts +++ b/packages/cli/src/databases/postgresdb/migrations/1587669153312-InitialMigration.ts @@ -1,4 +1,5 @@ -import { MigrationInterface, QueryRunner } from 'typeorm'; +import { + MigrationInterface, QueryRunner } from 'typeorm'; import * as config from '../../../../config'; diff --git a/packages/cli/src/databases/postgresdb/migrations/1589476000887-WebhookModel.ts b/packages/cli/src/databases/postgresdb/migrations/1589476000887-WebhookModel.ts new file mode 100644 index 0000000000..e53fc28915 --- /dev/null +++ b/packages/cli/src/databases/postgresdb/migrations/1589476000887-WebhookModel.ts @@ -0,0 +1,69 @@ +import { + MigrationInterface, + QueryRunner, +} from 'typeorm'; + +import { + IWorkflowDb, + NodeTypes, + WebhookHelpers, +} from '../../..'; + +import { + Workflow, +} from 'n8n-workflow'; + +import { + IWebhookDb, +} from '../../../Interfaces'; + +import * as config from '../../../../config'; + +export class WebhookModel1589476000887 implements MigrationInterface { + name = 'WebhookModel1589476000887'; + + async up(queryRunner: QueryRunner): Promise { + let tablePrefix = config.get('database.tablePrefix'); + const tablePrefixIndex = tablePrefix; + const schema = config.get('database.postgresdb.schema'); + if (schema) { + tablePrefix = schema + '.' + tablePrefix; + } + + await queryRunner.query(`CREATE TABLE ${tablePrefix}webhook_entity ("workflowId" integer NOT NULL, "webhookPath" character varying NOT NULL, "method" character varying NOT NULL, "node" character varying NOT NULL, CONSTRAINT "PK_${tablePrefixIndex}b21ace2e13596ccd87dc9bf4ea6" PRIMARY KEY ("webhookPath", "method"))`, undefined); + + const workflows = await queryRunner.query(`SELECT * FROM ${tablePrefix}workflow_entity WHERE active=true`) as IWorkflowDb[]; + const data: IWebhookDb[] = []; + const nodeTypes = NodeTypes(); + for (const workflow of workflows) { + const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings }); + const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance); + for (const webhook of webhooks) { + data.push({ + workflowId: workflowInstance.id as string, + webhookPath: webhook.path, + method: webhook.httpMethod, + node: webhook.node, + }); + } + } + + if (data.length !== 0) { + await queryRunner.manager.createQueryBuilder() + .insert() + .into(`${tablePrefix}webhook_entity`) + .values(data) + .execute(); + } + } + + async down(queryRunner: QueryRunner): Promise { + let tablePrefix = config.get('database.tablePrefix'); + const schema = config.get('database.postgresdb.schema'); + if (schema) { + tablePrefix = schema + '.' + tablePrefix; + } + await queryRunner.query(`DROP TABLE ${tablePrefix}webhook_entity`, undefined); + } + +} diff --git a/packages/cli/src/databases/postgresdb/migrations/index.ts b/packages/cli/src/databases/postgresdb/migrations/index.ts index 5bb6551492..827f01796c 100644 --- a/packages/cli/src/databases/postgresdb/migrations/index.ts +++ b/packages/cli/src/databases/postgresdb/migrations/index.ts @@ -1 +1,3 @@ export * from './1587669153312-InitialMigration'; +export * from './1589476000887-WebhookModel'; + diff --git a/packages/cli/src/databases/sqlite/WebhookEntity.ts b/packages/cli/src/databases/sqlite/WebhookEntity.ts new file mode 100644 index 0000000000..a78fd34ae9 --- /dev/null +++ b/packages/cli/src/databases/sqlite/WebhookEntity.ts @@ -0,0 +1,25 @@ +import { + Column, + Entity, + PrimaryColumn, +} from 'typeorm'; + +import { + IWebhookDb, + } from '../../Interfaces'; + +@Entity() +export class WebhookEntity implements IWebhookDb { + + @Column() + workflowId: number; + + @PrimaryColumn() + webhookPath: string; + + @PrimaryColumn() + method: string; + + @Column() + node: string; +} diff --git a/packages/cli/src/databases/sqlite/index.ts b/packages/cli/src/databases/sqlite/index.ts index 2c7d6e25e9..a3494531db 100644 --- a/packages/cli/src/databases/sqlite/index.ts +++ b/packages/cli/src/databases/sqlite/index.ts @@ -1,4 +1,4 @@ export * from './CredentialsEntity'; export * from './ExecutionEntity'; export * from './WorkflowEntity'; - +export * from './WebhookEntity'; diff --git a/packages/cli/src/databases/sqlite/migrations/1588102412422-InitialMigration.ts b/packages/cli/src/databases/sqlite/migrations/1588102412422-InitialMigration.ts index c2bb55040e..09a0da911a 100644 --- a/packages/cli/src/databases/sqlite/migrations/1588102412422-InitialMigration.ts +++ b/packages/cli/src/databases/sqlite/migrations/1588102412422-InitialMigration.ts @@ -1,4 +1,7 @@ -import { MigrationInterface, QueryRunner } from "typeorm"; +import { + MigrationInterface, + QueryRunner, +} from 'typeorm'; import * as config from '../../../../config'; diff --git a/packages/cli/src/databases/sqlite/migrations/1592445003908-WebhookModel.ts b/packages/cli/src/databases/sqlite/migrations/1592445003908-WebhookModel.ts new file mode 100644 index 0000000000..92704482b2 --- /dev/null +++ b/packages/cli/src/databases/sqlite/migrations/1592445003908-WebhookModel.ts @@ -0,0 +1,63 @@ +import { + MigrationInterface, + QueryRunner, +} from 'typeorm'; + +import * as config from '../../../../config'; + +import { + IWorkflowDb, + NodeTypes, + WebhookHelpers, +} from '../../..'; + +import { + Workflow, +} from 'n8n-workflow'; + +import { + IWebhookDb, +} from '../../../Interfaces'; + +export class WebhookModel1592445003908 implements MigrationInterface { + name = 'WebhookModel1592445003908'; + + async up(queryRunner: QueryRunner): Promise { + const tablePrefix = config.get('database.tablePrefix'); + + await queryRunner.query(`CREATE TABLE IF NOT EXISTS ${tablePrefix}webhook_entity ("workflowId" integer NOT NULL, "webhookPath" varchar NOT NULL, "method" varchar NOT NULL, "node" varchar NOT NULL, PRIMARY KEY ("webhookPath", "method"))`); + + const workflows = await queryRunner.query(`SELECT * FROM ${tablePrefix}workflow_entity WHERE active=true`) as IWorkflowDb[]; + const data: IWebhookDb[] = []; + const nodeTypes = NodeTypes(); + for (const workflow of workflows) { + workflow.nodes = JSON.parse(workflow.nodes as unknown as string); + workflow.connections = JSON.parse(workflow.connections as unknown as string); + workflow.staticData = JSON.parse(workflow.staticData as unknown as string); + workflow.settings = JSON.parse(workflow.settings as unknown as string); + const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings }); + const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance); + for (const webhook of webhooks) { + data.push({ + workflowId: workflowInstance.id as string, + webhookPath: webhook.path, + method: webhook.httpMethod, + node: webhook.node, + }); + } + } + + if (data.length !== 0) { + await queryRunner.manager.createQueryBuilder() + .insert() + .into(`${tablePrefix}webhook_entity`) + .values(data) + .execute(); + } + } + + async down(queryRunner: QueryRunner): Promise { + const tablePrefix = config.get('database.tablePrefix'); + await queryRunner.query(`DROP TABLE ${tablePrefix}webhook_entity`); + } +} diff --git a/packages/cli/src/databases/sqlite/migrations/index.ts b/packages/cli/src/databases/sqlite/migrations/index.ts index 8d9a0a0b16..a830a007c7 100644 --- a/packages/cli/src/databases/sqlite/migrations/index.ts +++ b/packages/cli/src/databases/sqlite/migrations/index.ts @@ -1 +1,2 @@ -export * from './1588102412422-InitialMigration'; \ No newline at end of file +export * from './1588102412422-InitialMigration'; +export * from './1592445003908-WebhookModel'; diff --git a/packages/core/src/ActiveWebhooks.ts b/packages/core/src/ActiveWebhooks.ts index 17cf753830..529fe01af3 100644 --- a/packages/core/src/ActiveWebhooks.ts +++ b/packages/core/src/ActiveWebhooks.ts @@ -35,13 +35,20 @@ export class ActiveWebhooks { throw new Error('Webhooks can only be added for saved workflows as an id is needed!'); } + const webhookKey = this.getWebhookKey(webhookData.httpMethod, webhookData.path); + + //check that there is not a webhook already registed with that path/method + if (this.webhookUrls[webhookKey] !== undefined) { + throw new Error(`Test-Webhook can not be activated because another one with the same method "${webhookData.httpMethod}" and path "${webhookData.path}" is already active!`); + } + if (this.workflowWebhooks[webhookData.workflowId] === undefined) { this.workflowWebhooks[webhookData.workflowId] = []; } // Make the webhook available directly because sometimes to create it successfully // it gets called - this.webhookUrls[this.getWebhookKey(webhookData.httpMethod, webhookData.path)] = webhookData; + this.webhookUrls[webhookKey] = webhookData; const webhookExists = await workflow.runWebhookMethod('checkExists', webhookData, NodeExecuteFunctions, mode, this.testWebhooks); if (webhookExists === false) { diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 5efbe32914..f9811099ec 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -418,7 +418,8 @@ export function getNodeWebhookUrl(name: string, workflow: Workflow, node: INode, return undefined; } - return NodeHelpers.getNodeWebhookUrl(baseUrl, workflow.id!, node, path.toString()); + const isFullPath: boolean = workflow.getSimpleParameterValue(node, webhookDescription['isFullPath'], false) as boolean; + return NodeHelpers.getNodeWebhookUrl(baseUrl, workflow.id!, node, path.toString(), isFullPath); } diff --git a/packages/editor-ui/package.json b/packages/editor-ui/package.json index 6298fdb18a..552fe79330 100644 --- a/packages/editor-ui/package.json +++ b/packages/editor-ui/package.json @@ -23,7 +23,9 @@ "test:e2e": "vue-cli-service test:e2e", "test:unit": "vue-cli-service test:unit" }, - "dependencies": {}, + "dependencies": { + "uuid": "^8.1.0" + }, "devDependencies": { "@beyonk/google-fonts-webpack-plugin": "^1.2.3", "@fortawesome/fontawesome-svg-core": "^1.2.19", diff --git a/packages/editor-ui/src/components/NodeWebhooks.vue b/packages/editor-ui/src/components/NodeWebhooks.vue index b649575a0b..e2875a49fd 100644 --- a/packages/editor-ui/src/components/NodeWebhooks.vue +++ b/packages/editor-ui/src/components/NodeWebhooks.vue @@ -110,8 +110,9 @@ export default mixins( const workflowId = this.$store.getters.workflowId; const path = this.getValue(webhookData, 'path'); + const isFullPath = this.getValue(webhookData, 'isFullPath') as unknown as boolean || false; - return NodeHelpers.getNodeWebhookUrl(baseUrl, workflowId, this.node, path); + return NodeHelpers.getNodeWebhookUrl(baseUrl, workflowId, this.node, path, isFullPath); }, }, watch: { diff --git a/packages/editor-ui/src/views/NodeView.vue b/packages/editor-ui/src/views/NodeView.vue index de89534e09..e22b98f595 100644 --- a/packages/editor-ui/src/views/NodeView.vue +++ b/packages/editor-ui/src/views/NodeView.vue @@ -126,6 +126,8 @@ import RunData from '@/components/RunData.vue'; import mixins from 'vue-typed-mixins'; +import { v4 as uuidv4 } from 'uuid'; + import { debounce } from 'lodash'; import axios from 'axios'; import { @@ -946,6 +948,10 @@ export default mixins( // Check if node-name is unique else find one that is newNodeData.name = this.getUniqueNodeName(newNodeData.name); + if (nodeTypeData.webhooks && nodeTypeData.webhooks.length) { + newNodeData.webhookId = uuidv4(); + } + await this.addNodes([newNodeData]); // Automatically deselect all nodes and select the current one and also active @@ -1579,6 +1585,11 @@ export default mixins( console.error(e); // eslint-disable-line no-console } node.parameters = nodeParameters !== null ? nodeParameters : {}; + + // if it's a webhook and the path is empty set the UUID as the default path + if (node.type === 'n8n-nodes-base.webhook' && node.parameters.path === '') { + node.parameters.path = node.webhookId as string; + } } foundNodeIssues = this.getNodeIssues(nodeType, node); diff --git a/packages/node-dev/package.json b/packages/node-dev/package.json index 66394fec29..8df65a2792 100644 --- a/packages/node-dev/package.json +++ b/packages/node-dev/package.json @@ -58,8 +58,8 @@ "change-case": "^4.1.1", "copyfiles": "^2.1.1", "inquirer": "^7.0.0", - "n8n-core": "^0.31.0", - "n8n-workflow": "^0.28.0", + "n8n-core": "^0.36.0", + "n8n-workflow": "^0.33.0", "replace-in-file": "^6.0.0", "request": "^2.88.2", "tmp-promise": "^2.0.2", diff --git a/packages/nodes-base/nodes/Webhook.node.ts b/packages/nodes-base/nodes/Webhook.node.ts index 135960ea4e..1778d44290 100644 --- a/packages/nodes-base/nodes/Webhook.node.ts +++ b/packages/nodes-base/nodes/Webhook.node.ts @@ -77,6 +77,7 @@ export class Webhook implements INodeType { { name: 'default', httpMethod: '={{$parameter["httpMethod"]}}', + isFullPath: true, responseCode: '={{$parameter["responseCode"]}}', responseMode: '={{$parameter["responseMode"]}}', responseData: '={{$parameter["responseData"]}}', @@ -133,7 +134,7 @@ export class Webhook implements INodeType { default: '', placeholder: 'webhook', required: true, - description: 'The path to listen to. Slashes("/") in the path are not allowed.', + description: 'The path to listen to.', }, { displayName: 'Response Code', diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index f8a6cc5e82..fa3628e379 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -336,6 +336,7 @@ export interface INode { continueOnFail?: boolean; parameters: INodeParameters; credentials?: INodeCredentials; + webhookId?: string; } @@ -558,8 +559,9 @@ export interface IWebhookData { } export interface IWebhookDescription { - [key: string]: WebhookHttpMethod | WebhookResponseMode | string | undefined; + [key: string]: WebhookHttpMethod | WebhookResponseMode | boolean | string | undefined; httpMethod: WebhookHttpMethod | string; + isFullPath?: boolean; name: string; path: string; responseBinaryPropertyName?: string; diff --git a/packages/workflow/src/NodeHelpers.ts b/packages/workflow/src/NodeHelpers.ts index fcaca55a0a..3e38931ae8 100644 --- a/packages/workflow/src/NodeHelpers.ts +++ b/packages/workflow/src/NodeHelpers.ts @@ -755,7 +755,7 @@ export function getNodeWebhooks(workflow: Workflow, node: INode, additionalData: const returnData: IWebhookData[] = []; for (const webhookDescription of nodeType.description.webhooks) { - let nodeWebhookPath = workflow.getSimpleParameterValue(node, webhookDescription['path'], 'GET'); + let nodeWebhookPath = workflow.getSimpleParameterValue(node, webhookDescription['path']); if (nodeWebhookPath === undefined) { // TODO: Use a proper logger console.error(`No webhook path could be found for node "${node.name}" in workflow "${workflowId}".`); @@ -768,7 +768,8 @@ export function getNodeWebhooks(workflow: Workflow, node: INode, additionalData: nodeWebhookPath = nodeWebhookPath.slice(1); } - const path = getNodeWebhookPath(workflowId, node, nodeWebhookPath); + const isFullPath: boolean = workflow.getSimpleParameterValue(node, webhookDescription['isFullPath'], false) as boolean; + const path = getNodeWebhookPath(workflowId, node, nodeWebhookPath, isFullPath); const httpMethod = workflow.getSimpleParameterValue(node, webhookDescription['httpMethod'], 'GET'); @@ -791,6 +792,61 @@ export function getNodeWebhooks(workflow: Workflow, node: INode, additionalData: return returnData; } +export function getNodeWebhooksBasic(workflow: Workflow, node: INode): IWebhookData[] { + if (node.disabled === true) { + // Node is disabled so webhooks will also not be enabled + return []; + } + + const nodeType = workflow.nodeTypes.getByName(node.type) as INodeType; + + if (nodeType.description.webhooks === undefined) { + // Node does not have any webhooks so return + return []; + } + + const workflowId = workflow.id || '__UNSAVED__'; + + const returnData: IWebhookData[] = []; + for (const webhookDescription of nodeType.description.webhooks) { + let nodeWebhookPath = workflow.getSimpleParameterValue(node, webhookDescription['path']); + if (nodeWebhookPath === undefined) { + // TODO: Use a proper logger + console.error(`No webhook path could be found for node "${node.name}" in workflow "${workflowId}".`); + continue; + } + + nodeWebhookPath = nodeWebhookPath.toString(); + + if (nodeWebhookPath.charAt(0) === '/') { + nodeWebhookPath = nodeWebhookPath.slice(1); + } + + const isFullPath: boolean = workflow.getSimpleParameterValue(node, webhookDescription['isFullPath'], false) as boolean; + + const path = getNodeWebhookPath(workflowId, node, nodeWebhookPath, isFullPath); + + const httpMethod = workflow.getSimpleParameterValue(node, webhookDescription['httpMethod']); + + if (httpMethod === undefined) { + // TODO: Use a proper logger + console.error(`The webhook "${path}" for node "${node.name}" in workflow "${workflowId}" could not be added because the httpMethod is not defined.`); + continue; + } + + //@ts-ignore + returnData.push({ + httpMethod: httpMethod.toString() as WebhookHttpMethod, + node: node.name, + path, + webhookDescription, + workflowId, + }); + } + + return returnData; +} + /** * Returns the webhook path @@ -801,8 +857,17 @@ export function getNodeWebhooks(workflow: Workflow, node: INode, additionalData: * @param {string} path * @returns {string} */ -export function getNodeWebhookPath(workflowId: string, node: INode, path: string): string { - return `${workflowId}/${encodeURIComponent(node.name.toLowerCase())}/${path}`; +export function getNodeWebhookPath(workflowId: string, node: INode, path: string, isFullPath?: boolean): string { + let webhookPath = ''; + if (node.webhookId === undefined) { + webhookPath = `${workflowId}/${encodeURIComponent(node.name.toLowerCase())}/${path}`; + } else { + if (isFullPath === true) { + return path; + } + webhookPath = `${node.webhookId}/${path}`; + } + return webhookPath; } @@ -814,11 +879,11 @@ export function getNodeWebhookPath(workflowId: string, node: INode, path: string * @param {string} workflowId * @param {string} nodeTypeName * @param {string} path + * @param {boolean} isFullPath * @returns {string} */ -export function getNodeWebhookUrl(baseUrl: string, workflowId: string, node: INode, path: string): string { - // return `${baseUrl}/${workflowId}/${nodeTypeName}/${path}`; - return `${baseUrl}/${getNodeWebhookPath(workflowId, node, path)}`; +export function getNodeWebhookUrl(baseUrl: string, workflowId: string, node: INode, path: string, isFullPath?: boolean): string { + return `${baseUrl}/${getNodeWebhookPath(workflowId, node, path, isFullPath)}`; } diff --git a/packages/workflow/src/Workflow.ts b/packages/workflow/src/Workflow.ts index 5fbd780524..9306b68492 100644 --- a/packages/workflow/src/Workflow.ts +++ b/packages/workflow/src/Workflow.ts @@ -715,7 +715,7 @@ export class Workflow { * @returns {(string | undefined)} * @memberof Workflow */ - getSimpleParameterValue(node: INode, parameterValue: string | undefined, defaultValue?: boolean | number | string): boolean | number | string | undefined { + getSimpleParameterValue(node: INode, parameterValue: string | boolean | undefined, defaultValue?: boolean | number | string): boolean | number | string | undefined { if (parameterValue === undefined) { // Value is not set so return the default return defaultValue;