mirror of
https://github.com/n8n-io/n8n.git
synced 2024-12-24 04:04:06 -08:00
✨ Add support for webhook route parameters (#1343)
* 🚧 add webhookId to URL
* 🚧 add webhookId to webhook entity, 🔧 refactor migrations
* 🚧 🐘 postgres migration
* 🚧 add mySQL migration
* 🚧 refactor mongoDB
* 🚧 add webhookId to IWebhookDb
* 🚧 starting workflow with dynamic route works
* ⚡ production dynamic webhooks complete
* 🎨 fix lint issues
* 🔧 dynamic path for webhook-test complete
* 🎨 fix lint issues
* 🎨 fix typescript issue
* ⚡ add error message for dynamic webhook-test
* 🔨 improve handling of leading `/`
* 🚧 add webhookId to URL
* 🚧 add webhookId to webhook entity, 🔧 refactor migrations
* 🚧 🐘 postgres migration
* 🚧 add mySQL migration
* 🚧 refactor mongoDB
* 🚧 add webhookId to IWebhookDb
* 🚧 starting workflow with dynamic route works
* ⚡ production dynamic webhooks complete
* 🎨 fix lint issues
* 🔧 dynamic path for webhook-test complete
* 🎨 fix lint issues
* 🎨 fix typescript issue
* ⚡ add error message for dynamic webhook-test
* 🔨 improve handling of leading `/`
* ⚡ Fix issue that tab-title did not get reset on new workflow
* Revert "⚡ Fix issue that tab-title did not get reset on new workflow"
This reverts commit 699d0a8946
.
* 🔧 reset params before extraction
* 🐘 removing unique constraint for webhookId
* 🚧 handle multiple webhooks per id
* 🔧 enable webhook-test for multiple WH with same id
* 🐘 add migration for postgres
* ⚡ add mysql migration
* 🎨 fix lint issue
Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
This commit is contained in:
parent
1a68303319
commit
d395498882
|
@ -110,7 +110,7 @@ By default n8n uses SQLite to save credentials, past executions and workflows.
|
|||
n8n however also supports MongoDB, PostgresDB and MySQL. To use them simply a few
|
||||
environment variables have to be set.
|
||||
|
||||
It is important to still persist the data in the `/root/.n8` folder. The reason
|
||||
It is important to still persist the data in the `/root/.n8n` folder. The reason
|
||||
is that it contains n8n user data. That is the name of the webhook
|
||||
(in case) the n8n tunnel gets used and even more important the encryption key
|
||||
for the credentials. If none gets found n8n creates automatically one on
|
||||
|
|
|
@ -114,7 +114,7 @@
|
|||
"sqlite3": "^4.2.0",
|
||||
"sse-channel": "^3.1.1",
|
||||
"tslib": "1.11.2",
|
||||
"typeorm": "^0.2.24"
|
||||
"typeorm": "^0.2.30"
|
||||
},
|
||||
"jest": {
|
||||
"transform": {
|
||||
|
|
|
@ -116,13 +116,50 @@ export class ActiveWorkflowRunner {
|
|||
throw new ResponseHelper.ResponseError('The "activeWorkflows" instance did not get initialized yet.', 404, 404);
|
||||
}
|
||||
|
||||
const webhook = await Db.collections.Webhook?.findOne({ webhookPath: path, method: httpMethod }) as IWebhookDb;
|
||||
let webhook = await Db.collections.Webhook?.findOne({ webhookPath: path, method: httpMethod }) as IWebhookDb;
|
||||
let webhookId: string | undefined;
|
||||
|
||||
// check if something exist
|
||||
// check if path is dynamic
|
||||
if (webhook === undefined) {
|
||||
// check if a dynamic webhook path exists
|
||||
const pathElements = path.split('/');
|
||||
webhookId = pathElements.shift();
|
||||
const dynamicWebhooks = await Db.collections.Webhook?.find({ webhookId, method: httpMethod, pathLength: pathElements.length });
|
||||
if (dynamicWebhooks === undefined) {
|
||||
// The requested webhook is not registered
|
||||
throw new ResponseHelper.ResponseError(`The requested webhook "${httpMethod} ${path}" is not registered.`, 404, 404);
|
||||
}
|
||||
// set webhook to the first webhook result
|
||||
// if more results have been returned choose the one with the most route-matches
|
||||
webhook = dynamicWebhooks[0];
|
||||
if (dynamicWebhooks.length > 1) {
|
||||
let maxMatches = 0;
|
||||
const pathElementsSet = new Set(pathElements);
|
||||
dynamicWebhooks.forEach(dynamicWebhook => {
|
||||
const intersection =
|
||||
dynamicWebhook.webhookPath
|
||||
.split('/')
|
||||
.reduce((acc, element) => pathElementsSet.has(element) ? acc += 1 : acc, 0);
|
||||
|
||||
if (intersection > maxMatches) {
|
||||
maxMatches = intersection;
|
||||
webhook = dynamicWebhook;
|
||||
}
|
||||
});
|
||||
if (maxMatches === 0) {
|
||||
throw new ResponseHelper.ResponseError(`The requested webhook "${httpMethod} ${path}" is not registered.`, 404, 404);
|
||||
}
|
||||
}
|
||||
|
||||
path = webhook.webhookPath;
|
||||
// extracting params from path
|
||||
webhook.webhookPath.split('/').forEach((ele, index) => {
|
||||
if (ele.startsWith(':')) {
|
||||
// write params to req.params
|
||||
req.params[ele.slice(1)] = pathElements[index];
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
const workflowData = await Db.collections.Workflow!.findOne(webhook.workflowId);
|
||||
if (workflowData === undefined) {
|
||||
|
@ -253,6 +290,15 @@ export class ActiveWorkflowRunner {
|
|||
method: webhookData.httpMethod,
|
||||
} as IWebhookDb;
|
||||
|
||||
if (webhook.webhookPath.startsWith('/')) {
|
||||
webhook.webhookPath = webhook.webhookPath.slice(1);
|
||||
}
|
||||
|
||||
if ((path.startsWith(':') || path.includes('/:')) && node.webhookId) {
|
||||
webhook.webhookId = node.webhookId;
|
||||
webhook.pathLength = webhook.webhookPath.split('/').length;
|
||||
}
|
||||
|
||||
try {
|
||||
|
||||
await Db.collections.Webhook?.insert(webhook);
|
||||
|
@ -273,10 +319,9 @@ export class ActiveWorkflowRunner {
|
|||
let errorMessage = '';
|
||||
|
||||
// if it's a workflow from the the insert
|
||||
// TODO check if there is standard error code for deplicate key violation that works
|
||||
// TODO check if there is standard error code for duplicate key violation that works
|
||||
// with all databases
|
||||
if (error.name === 'MongoError' || error.name === 'QueryFailedError') {
|
||||
|
||||
errorMessage = `The webhook path [${webhook.webhookPath}] and method [${webhook.method}] already exist.`;
|
||||
|
||||
} else if (error.detail) {
|
||||
|
|
|
@ -32,29 +32,10 @@ export let collections: IDatabaseCollections = {
|
|||
Webhook: null,
|
||||
};
|
||||
|
||||
import {
|
||||
CreateIndexStoppedAt1594828256133,
|
||||
InitialMigration1587669153312,
|
||||
WebhookModel1589476000887,
|
||||
} from './databases/postgresdb/migrations';
|
||||
|
||||
import {
|
||||
CreateIndexStoppedAt1594910478695,
|
||||
InitialMigration1587563438936,
|
||||
WebhookModel1592679094242,
|
||||
} from './databases/mongodb/migrations';
|
||||
|
||||
import {
|
||||
CreateIndexStoppedAt1594902918301,
|
||||
InitialMigration1588157391238,
|
||||
WebhookModel1592447867632,
|
||||
} from './databases/mysqldb/migrations';
|
||||
|
||||
import {
|
||||
CreateIndexStoppedAt1594825041918,
|
||||
InitialMigration1588102412422,
|
||||
WebhookModel1592445003908,
|
||||
} from './databases/sqlite/migrations';
|
||||
import { postgresMigrations } from './databases/postgresdb/migrations';
|
||||
import { mongodbMigrations } from './databases/mongodb/migrations';
|
||||
import { mysqlMigrations } from './databases/mysqldb/migrations';
|
||||
import { sqliteMigrations } from './databases/sqlite/migrations';
|
||||
|
||||
import * as path from 'path';
|
||||
|
||||
|
@ -75,11 +56,7 @@ export async function init(): Promise<IDatabaseCollections> {
|
|||
entityPrefix,
|
||||
url: await GenericHelpers.getConfigValue('database.mongodb.connectionUrl') as string,
|
||||
useNewUrlParser: true,
|
||||
migrations: [
|
||||
InitialMigration1587563438936,
|
||||
WebhookModel1592679094242,
|
||||
CreateIndexStoppedAt1594910478695,
|
||||
],
|
||||
migrations: mongodbMigrations,
|
||||
migrationsRun: true,
|
||||
migrationsTableName: `${entityPrefix}migrations`,
|
||||
};
|
||||
|
@ -112,11 +89,7 @@ export async function init(): Promise<IDatabaseCollections> {
|
|||
port: await GenericHelpers.getConfigValue('database.postgresdb.port') as number,
|
||||
username: await GenericHelpers.getConfigValue('database.postgresdb.user') as string,
|
||||
schema: config.get('database.postgresdb.schema'),
|
||||
migrations: [
|
||||
InitialMigration1587669153312,
|
||||
WebhookModel1589476000887,
|
||||
CreateIndexStoppedAt1594828256133,
|
||||
],
|
||||
migrations: postgresMigrations,
|
||||
migrationsRun: true,
|
||||
migrationsTableName: `${entityPrefix}migrations`,
|
||||
ssl,
|
||||
|
@ -135,11 +108,7 @@ export async function init(): Promise<IDatabaseCollections> {
|
|||
password: await GenericHelpers.getConfigValue('database.mysqldb.password') as string,
|
||||
port: await GenericHelpers.getConfigValue('database.mysqldb.port') as number,
|
||||
username: await GenericHelpers.getConfigValue('database.mysqldb.user') as string,
|
||||
migrations: [
|
||||
InitialMigration1588157391238,
|
||||
WebhookModel1592447867632,
|
||||
CreateIndexStoppedAt1594902918301,
|
||||
],
|
||||
migrations: mysqlMigrations,
|
||||
migrationsRun: true,
|
||||
migrationsTableName: `${entityPrefix}migrations`,
|
||||
};
|
||||
|
@ -151,11 +120,7 @@ export async function init(): Promise<IDatabaseCollections> {
|
|||
type: 'sqlite',
|
||||
database: path.join(n8nFolder, 'database.sqlite'),
|
||||
entityPrefix,
|
||||
migrations: [
|
||||
InitialMigration1588102412422,
|
||||
WebhookModel1592445003908,
|
||||
CreateIndexStoppedAt1594825041918,
|
||||
],
|
||||
migrations: sqliteMigrations,
|
||||
migrationsRun: true,
|
||||
migrationsTableName: `${entityPrefix}migrations`,
|
||||
};
|
||||
|
|
|
@ -57,6 +57,8 @@ export interface IWebhookDb {
|
|||
webhookPath: string;
|
||||
method: string;
|
||||
node: string;
|
||||
webhookId?: string;
|
||||
pathLength?: number;
|
||||
}
|
||||
|
||||
export interface IWorkflowBase extends IWorkflowBaseWorkflow {
|
||||
|
|
|
@ -1693,6 +1693,7 @@ class App {
|
|||
|
||||
let response;
|
||||
try {
|
||||
delete req.params[0];
|
||||
response = await this.activeWorkflowRunner.executeWebhook('HEAD', requestUrl, req, res);
|
||||
} catch (error) {
|
||||
ResponseHelper.sendErrorResponse(res, error);
|
||||
|
@ -1734,6 +1735,7 @@ class App {
|
|||
|
||||
let response;
|
||||
try {
|
||||
delete req.params[0];
|
||||
response = await this.activeWorkflowRunner.executeWebhook('GET', requestUrl, req, res);
|
||||
} catch (error) {
|
||||
ResponseHelper.sendErrorResponse(res, error);
|
||||
|
@ -1755,6 +1757,7 @@ class App {
|
|||
|
||||
let response;
|
||||
try {
|
||||
delete req.params[0];
|
||||
response = await this.activeWorkflowRunner.executeWebhook('POST', requestUrl, req, res);
|
||||
} catch (error) {
|
||||
ResponseHelper.sendErrorResponse(res, error);
|
||||
|
@ -1776,6 +1779,7 @@ class App {
|
|||
|
||||
let response;
|
||||
try {
|
||||
delete req.params[0];
|
||||
response = await this.testWebhooks.callTestWebhook('HEAD', requestUrl, req, res);
|
||||
} catch (error) {
|
||||
ResponseHelper.sendErrorResponse(res, error);
|
||||
|
@ -1817,6 +1821,7 @@ class App {
|
|||
|
||||
let response;
|
||||
try {
|
||||
delete req.params[0];
|
||||
response = await this.testWebhooks.callTestWebhook('GET', requestUrl, req, res);
|
||||
} catch (error) {
|
||||
ResponseHelper.sendErrorResponse(res, error);
|
||||
|
@ -1838,6 +1843,7 @@ class App {
|
|||
|
||||
let response;
|
||||
try {
|
||||
delete req.params[0];
|
||||
response = await this.testWebhooks.callTestWebhook('POST', requestUrl, req, res);
|
||||
} catch (error) {
|
||||
ResponseHelper.sendErrorResponse(res, error);
|
||||
|
|
|
@ -54,14 +54,28 @@ export class TestWebhooks {
|
|||
* @memberof TestWebhooks
|
||||
*/
|
||||
async callTestWebhook(httpMethod: WebhookHttpMethod, path: string, request: express.Request, response: express.Response): Promise<IResponseCallbackData> {
|
||||
const webhookData: IWebhookData | undefined = this.activeWebhooks!.get(httpMethod, path);
|
||||
let webhookData: IWebhookData | undefined = this.activeWebhooks!.get(httpMethod, path);
|
||||
|
||||
// check if path is dynamic
|
||||
if (webhookData === undefined) {
|
||||
const pathElements = path.split('/');
|
||||
const webhookId = pathElements.shift();
|
||||
webhookData = this.activeWebhooks!.get(httpMethod, pathElements.join('/'), webhookId);
|
||||
if (webhookData === undefined) {
|
||||
// The requested webhook is not registered
|
||||
throw new ResponseHelper.ResponseError(`The requested webhook "${httpMethod} ${path}" is not registered.`, 404, 404);
|
||||
}
|
||||
path = webhookData.path;
|
||||
// extracting params from path
|
||||
path.split('/').forEach((ele, index) => {
|
||||
if (ele.startsWith(':')) {
|
||||
// write params to req.params
|
||||
request.params[ele.slice(1)] = pathElements[index];
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
const webhookKey = this.activeWebhooks!.getWebhookKey(webhookData.httpMethod, webhookData.path);
|
||||
const webhookKey = this.activeWebhooks!.getWebhookKey(webhookData.httpMethod, webhookData.path, webhookData.webhookId) + `|${webhookData.workflowId}`;
|
||||
|
||||
// TODO: Clean that duplication up one day and improve code generally
|
||||
if (this.testWebhookData[webhookKey] === undefined) {
|
||||
|
@ -81,7 +95,7 @@ export class TestWebhooks {
|
|||
return new Promise(async (resolve, reject) => {
|
||||
try {
|
||||
const executionMode = 'manual';
|
||||
const executionId = await WebhookHelpers.executeWebhook(workflow, webhookData, this.testWebhookData[webhookKey].workflowData, workflowStartNode, executionMode, this.testWebhookData[webhookKey].sessionId, request, response, (error: Error | null, data: IResponseCallbackData) => {
|
||||
const executionId = await WebhookHelpers.executeWebhook(workflow, webhookData!, this.testWebhookData[webhookKey].workflowData, workflowStartNode, executionMode, this.testWebhookData[webhookKey].sessionId, request, response, (error: Error | null, data: IResponseCallbackData) => {
|
||||
if (error !== null) {
|
||||
return reject(error);
|
||||
}
|
||||
|
@ -98,7 +112,7 @@ export class TestWebhooks {
|
|||
// Inform editor-ui that webhook got received
|
||||
if (this.testWebhookData[webhookKey].sessionId !== undefined) {
|
||||
const pushInstance = Push.getInstance();
|
||||
pushInstance.send('testWebhookReceived', { workflowId: webhookData.workflowId, executionId }, this.testWebhookData[webhookKey].sessionId!);
|
||||
pushInstance.send('testWebhookReceived', { workflowId: webhookData!.workflowId, executionId }, this.testWebhookData[webhookKey].sessionId!);
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
|
@ -158,7 +172,7 @@ export class TestWebhooks {
|
|||
let key: string;
|
||||
const activatedKey: string[] = [];
|
||||
for (const webhookData of webhooks) {
|
||||
key = this.activeWebhooks!.getWebhookKey(webhookData.httpMethod, webhookData.path);
|
||||
key = this.activeWebhooks!.getWebhookKey(webhookData.httpMethod, webhookData.path, webhookData.webhookId) + `|${workflowData.id}`;
|
||||
|
||||
activatedKey.push(key);
|
||||
|
||||
|
|
|
@ -11,6 +11,8 @@ import {
|
|||
} from '../../Interfaces';
|
||||
|
||||
@Entity()
|
||||
@Index(["webhookPath", "method"], { unique: true })
|
||||
@Index(["webhookId", "method"], { unique: true })
|
||||
export class WebhookEntity implements IWebhookDb {
|
||||
|
||||
@ObjectIdColumn()
|
||||
|
@ -27,4 +29,10 @@ export class WebhookEntity implements IWebhookDb {
|
|||
|
||||
@Column()
|
||||
node: string;
|
||||
|
||||
@Column()
|
||||
webhookId: string;
|
||||
|
||||
@Column({ nullable: true })
|
||||
pathLength: number;
|
||||
}
|
||||
|
|
|
@ -1,3 +1,9 @@
|
|||
export * from './1587563438936-InitialMigration';
|
||||
export * from './1592679094242-WebhookModel';
|
||||
export * from './151594910478695-CreateIndexStoppedAt';
|
||||
import { InitialMigration1587563438936 } from './1587563438936-InitialMigration';
|
||||
import { WebhookModel1592679094242 } from './1592679094242-WebhookModel';
|
||||
import { CreateIndexStoppedAt1594910478695 } from './151594910478695-CreateIndexStoppedAt';
|
||||
|
||||
export const mongodbMigrations = [
|
||||
InitialMigration1587563438936,
|
||||
WebhookModel1592679094242,
|
||||
CreateIndexStoppedAt1594910478695,
|
||||
];
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import {
|
||||
Column,
|
||||
Entity,
|
||||
Index,
|
||||
PrimaryColumn,
|
||||
} from 'typeorm';
|
||||
|
||||
|
@ -9,6 +10,7 @@ import {
|
|||
} from '../../Interfaces';
|
||||
|
||||
@Entity()
|
||||
@Index(['webhookId', 'method', 'pathLength'])
|
||||
export class WebhookEntity implements IWebhookDb {
|
||||
|
||||
@Column()
|
||||
|
@ -22,4 +24,10 @@ export class WebhookEntity implements IWebhookDb {
|
|||
|
||||
@Column()
|
||||
node: string;
|
||||
|
||||
@Column({ nullable: true })
|
||||
webhookId: string;
|
||||
|
||||
@Column({ nullable: true })
|
||||
pathLength: number;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
import {MigrationInterface, QueryRunner} from "typeorm";
|
||||
import * as config from '../../../../config';
|
||||
|
||||
export class AddWebhookId1611149998770 implements MigrationInterface {
|
||||
name = 'AddWebhookId1611149998770';
|
||||
|
||||
async up(queryRunner: QueryRunner): Promise<void> {
|
||||
const tablePrefix = config.get('database.tablePrefix');
|
||||
|
||||
await queryRunner.query('ALTER TABLE `' + tablePrefix + 'webhook_entity` ADD `webhookId` varchar(255) NULL');
|
||||
await queryRunner.query('ALTER TABLE `' + tablePrefix + 'webhook_entity` ADD `pathLength` int NULL');
|
||||
await queryRunner.query('CREATE INDEX `IDX_' + tablePrefix + '742496f199721a057051acf4c2` ON `' + tablePrefix + 'webhook_entity` (`webhookId`, `method`, `pathLength`)');
|
||||
}
|
||||
|
||||
async down(queryRunner: QueryRunner): Promise<void> {
|
||||
const tablePrefix = config.get('database.tablePrefix');
|
||||
|
||||
await queryRunner.query(
|
||||
'DROP INDEX `IDX_' + tablePrefix + '742496f199721a057051acf4c2` ON `' + tablePrefix + 'webhook_entity`'
|
||||
);
|
||||
await queryRunner.query('ALTER TABLE `' + tablePrefix + 'webhook_entity` DROP COLUMN `pathLength`');
|
||||
await queryRunner.query('ALTER TABLE `' + tablePrefix + 'webhook_entity` DROP COLUMN `webhookId`');
|
||||
}
|
||||
}
|
|
@ -1,3 +1,11 @@
|
|||
export * from './1588157391238-InitialMigration';
|
||||
export * from './1592447867632-WebhookModel';
|
||||
export * from './1594902918301-CreateIndexStoppedAt';
|
||||
import { InitialMigration1588157391238 } from './1588157391238-InitialMigration';
|
||||
import { WebhookModel1592447867632 } from './1592447867632-WebhookModel';
|
||||
import { CreateIndexStoppedAt1594902918301 } from './1594902918301-CreateIndexStoppedAt';
|
||||
import { AddWebhookId1611149998770 } from './1611149998770-AddWebhookId';
|
||||
|
||||
export const mysqlMigrations = [
|
||||
InitialMigration1588157391238,
|
||||
WebhookModel1592447867632,
|
||||
CreateIndexStoppedAt1594902918301,
|
||||
AddWebhookId1611149998770,
|
||||
];
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import {
|
||||
Column,
|
||||
Entity,
|
||||
Index,
|
||||
PrimaryColumn,
|
||||
} from 'typeorm';
|
||||
|
||||
|
@ -9,6 +10,7 @@ import {
|
|||
} from '../../';
|
||||
|
||||
@Entity()
|
||||
@Index(['webhookId', 'method', 'pathLength'])
|
||||
export class WebhookEntity implements IWebhookDb {
|
||||
|
||||
@Column()
|
||||
|
@ -22,4 +24,10 @@ export class WebhookEntity implements IWebhookDb {
|
|||
|
||||
@Column()
|
||||
node: string;
|
||||
|
||||
@Column({ nullable: true })
|
||||
webhookId: string;
|
||||
|
||||
@Column({ nullable: true })
|
||||
pathLength: number;
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@ export class WebhookModel1589476000887 implements MigrationInterface {
|
|||
tablePrefix = schema + '.' + tablePrefix;
|
||||
}
|
||||
|
||||
await queryRunner.query(`CREATE TABLE ${tablePrefix}webhook_entity ("workflowId" integer NOT NULL, "webhookPath" character varying NOT NULL, "method" character varying NOT NULL, "node" character varying NOT NULL, CONSTRAINT "PK_${tablePrefixIndex}b21ace2e13596ccd87dc9bf4ea6" PRIMARY KEY ("webhookPath", "method"))`, undefined);
|
||||
await queryRunner.query(`CREATE TABLE IF NOT EXISTS ${tablePrefix}webhook_entity ("workflowId" integer NOT NULL, "webhookPath" character varying NOT NULL, "method" character varying NOT NULL, "node" character varying NOT NULL, CONSTRAINT "PK_${tablePrefixIndex}b21ace2e13596ccd87dc9bf4ea6" PRIMARY KEY ("webhookPath", "method"))`, undefined);
|
||||
}
|
||||
|
||||
async down(queryRunner: QueryRunner): Promise<void> {
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
import {MigrationInterface, QueryRunner} from "typeorm";
|
||||
import * as config from '../../../../config';
|
||||
|
||||
export class AddWebhookId1611144599516 implements MigrationInterface {
|
||||
name = 'AddWebhookId1611144599516';
|
||||
|
||||
async up(queryRunner: QueryRunner): Promise<void> {
|
||||
let tablePrefix = config.get('database.tablePrefix');
|
||||
const tablePrefixPure = tablePrefix;
|
||||
const schema = config.get('database.postgresdb.schema');
|
||||
if (schema) {
|
||||
tablePrefix = schema + '.' + tablePrefix;
|
||||
}
|
||||
|
||||
await queryRunner.query(`ALTER TABLE ${tablePrefix}webhook_entity ADD "webhookId" character varying`);
|
||||
await queryRunner.query(`ALTER TABLE ${tablePrefix}webhook_entity ADD "pathLength" integer`);
|
||||
await queryRunner.query(`CREATE INDEX IF NOT EXISTS IDX_${tablePrefixPure}16f4436789e804e3e1c9eeb240 ON ${tablePrefix}webhook_entity ("webhookId", "method", "pathLength") `);
|
||||
}
|
||||
|
||||
async down(queryRunner: QueryRunner): Promise<void> {
|
||||
let tablePrefix = config.get('database.tablePrefix');
|
||||
const tablePrefixPure = tablePrefix;
|
||||
const schema = config.get('database.postgresdb.schema');
|
||||
if (schema) {
|
||||
tablePrefix = schema + '.' + tablePrefix;
|
||||
}
|
||||
|
||||
await queryRunner.query(`DROP INDEX IDX_${tablePrefixPure}16f4436789e804e3e1c9eeb240`);
|
||||
await queryRunner.query(`ALTER TABLE ${tablePrefix}webhook_entity DROP COLUMN "pathLength"`);
|
||||
await queryRunner.query(`ALTER TABLE ${tablePrefix}webhook_entity DROP COLUMN "webhookId"`);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,4 +1,11 @@
|
|||
export * from './1587669153312-InitialMigration';
|
||||
export * from './1589476000887-WebhookModel';
|
||||
export * from './1594828256133-CreateIndexStoppedAt';
|
||||
import { InitialMigration1587669153312 } from './1587669153312-InitialMigration';
|
||||
import { WebhookModel1589476000887 } from './1589476000887-WebhookModel';
|
||||
import { CreateIndexStoppedAt1594828256133 } from './1594828256133-CreateIndexStoppedAt';
|
||||
import { AddWebhookId1611144599516 } from './1611144599516-AddWebhookId';
|
||||
|
||||
export const postgresMigrations = [
|
||||
InitialMigration1587669153312,
|
||||
WebhookModel1589476000887,
|
||||
CreateIndexStoppedAt1594828256133,
|
||||
AddWebhookId1611144599516,
|
||||
];
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import {
|
||||
Column,
|
||||
Entity,
|
||||
Index,
|
||||
PrimaryColumn,
|
||||
} from 'typeorm';
|
||||
|
||||
|
@ -9,6 +10,7 @@ import {
|
|||
} from '../../Interfaces';
|
||||
|
||||
@Entity()
|
||||
@Index(['webhookId', 'method', 'pathLength'])
|
||||
export class WebhookEntity implements IWebhookDb {
|
||||
|
||||
@Column()
|
||||
|
@ -22,4 +24,10 @@ export class WebhookEntity implements IWebhookDb {
|
|||
|
||||
@Column()
|
||||
node: string;
|
||||
|
||||
@Column({ nullable: true })
|
||||
webhookId: string;
|
||||
|
||||
@Column({ nullable: true })
|
||||
pathLength: number;
|
||||
}
|
||||
|
|
|
@ -8,7 +8,7 @@ export class CreateIndexStoppedAt1594825041918 implements MigrationInterface {
|
|||
async up(queryRunner: QueryRunner): Promise<void> {
|
||||
const tablePrefix = config.get('database.tablePrefix');
|
||||
|
||||
await queryRunner.query(`CREATE INDEX "IDX_${tablePrefix}cefb067df2402f6aed0638a6c1" ON "execution_entity" ("stoppedAt") `);
|
||||
await queryRunner.query(`CREATE INDEX "IDX_${tablePrefix}cefb067df2402f6aed0638a6c1" ON "${tablePrefix}execution_entity" ("stoppedAt") `);
|
||||
}
|
||||
|
||||
async down(queryRunner: QueryRunner): Promise<void> {
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
import {MigrationInterface, QueryRunner} from "typeorm";
|
||||
import * as config from '../../../../config';
|
||||
|
||||
export class AddWebhookId1611071044839 implements MigrationInterface {
|
||||
name = 'AddWebhookId1611071044839';
|
||||
|
||||
async up(queryRunner: QueryRunner): Promise<void> {
|
||||
const tablePrefix = config.get('database.tablePrefix');
|
||||
|
||||
await queryRunner.query(`CREATE TABLE "temporary_webhook_entity" ("workflowId" integer NOT NULL, "webhookPath" varchar NOT NULL, "method" varchar NOT NULL, "node" varchar NOT NULL, "webhookId" varchar, "pathLength" integer, PRIMARY KEY ("webhookPath", "method"))`);
|
||||
await queryRunner.query(`INSERT INTO "temporary_webhook_entity"("workflowId", "webhookPath", "method", "node") SELECT "workflowId", "webhookPath", "method", "node" FROM "${tablePrefix}webhook_entity"`);
|
||||
await queryRunner.query(`DROP TABLE "${tablePrefix}webhook_entity"`);
|
||||
await queryRunner.query(`ALTER TABLE "temporary_webhook_entity" RENAME TO "${tablePrefix}webhook_entity"`);
|
||||
await queryRunner.query(`CREATE INDEX "IDX_${tablePrefix}742496f199721a057051acf4c2" ON "${tablePrefix}webhook_entity" ("webhookId", "method", "pathLength") `);
|
||||
}
|
||||
|
||||
async down(queryRunner: QueryRunner): Promise<void> {
|
||||
const tablePrefix = config.get('database.tablePrefix');
|
||||
|
||||
await queryRunner.query(`DROP INDEX "IDX_${tablePrefix}742496f199721a057051acf4c2"`);
|
||||
await queryRunner.query(`ALTER TABLE "${tablePrefix}webhook_entity" RENAME TO "temporary_webhook_entity"`);
|
||||
await queryRunner.query(`CREATE TABLE "${tablePrefix}webhook_entity" ("workflowId" integer NOT NULL, "webhookPath" varchar NOT NULL, "method" varchar NOT NULL, "node" varchar NOT NULL, PRIMARY KEY ("webhookPath", "method"))`);
|
||||
await queryRunner.query(`INSERT INTO "${tablePrefix}webhook_entity"("workflowId", "webhookPath", "method", "node") SELECT "workflowId", "webhookPath", "method", "node" FROM "temporary_webhook_entity"`);
|
||||
await queryRunner.query(`DROP TABLE "temporary_webhook_entity"`);
|
||||
}
|
||||
}
|
|
@ -1,3 +1,11 @@
|
|||
export * from './1588102412422-InitialMigration';
|
||||
export * from './1592445003908-WebhookModel';
|
||||
export * from './1594825041918-CreateIndexStoppedAt';
|
||||
import { InitialMigration1588102412422 } from './1588102412422-InitialMigration';
|
||||
import { WebhookModel1592445003908 } from './1592445003908-WebhookModel';
|
||||
import { CreateIndexStoppedAt1594825041918 } from './1594825041918-CreateIndexStoppedAt';
|
||||
import { AddWebhookId1611071044839 } from './1611071044839-AddWebhookId';
|
||||
|
||||
export const sqliteMigrations = [
|
||||
InitialMigration1588102412422,
|
||||
WebhookModel1592445003908,
|
||||
CreateIndexStoppedAt1594825041918,
|
||||
AddWebhookId1611071044839,
|
||||
];
|
||||
|
|
|
@ -16,7 +16,7 @@ export class ActiveWebhooks {
|
|||
} = {};
|
||||
|
||||
private webhookUrls: {
|
||||
[key: string]: IWebhookData;
|
||||
[key: string]: IWebhookData[];
|
||||
} = {};
|
||||
|
||||
testWebhooks = false;
|
||||
|
@ -35,10 +35,10 @@ export class ActiveWebhooks {
|
|||
throw new Error('Webhooks can only be added for saved workflows as an id is needed!');
|
||||
}
|
||||
|
||||
const webhookKey = this.getWebhookKey(webhookData.httpMethod, webhookData.path);
|
||||
const webhookKey = this.getWebhookKey(webhookData.httpMethod, webhookData.path, webhookData.webhookId);
|
||||
|
||||
//check that there is not a webhook already registed with that path/method
|
||||
if (this.webhookUrls[webhookKey] !== undefined) {
|
||||
if (this.webhookUrls[webhookKey] && !webhookData.webhookId) {
|
||||
throw new Error(`Test-Webhook can not be activated because another one with the same method "${webhookData.httpMethod}" and path "${webhookData.path}" is already active!`);
|
||||
}
|
||||
|
||||
|
@ -48,7 +48,10 @@ export class ActiveWebhooks {
|
|||
|
||||
// Make the webhook available directly because sometimes to create it successfully
|
||||
// it gets called
|
||||
this.webhookUrls[webhookKey] = webhookData;
|
||||
if (!this.webhookUrls[webhookKey]) {
|
||||
this.webhookUrls[webhookKey] = [];
|
||||
}
|
||||
this.webhookUrls[webhookKey].push(webhookData);
|
||||
|
||||
try {
|
||||
const webhookExists = await workflow.runWebhookMethod('checkExists', webhookData, NodeExecuteFunctions, mode, this.testWebhooks);
|
||||
|
@ -59,7 +62,11 @@ export class ActiveWebhooks {
|
|||
}
|
||||
} catch (error) {
|
||||
// If there was a problem unregister the webhook again
|
||||
if (this.webhookUrls[webhookKey].length <= 1) {
|
||||
delete this.webhookUrls[webhookKey];
|
||||
} else {
|
||||
this.webhookUrls[webhookKey] = this.webhookUrls[webhookKey].filter(webhook => webhook.path !== webhookData.path);
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
|
@ -72,16 +79,39 @@ export class ActiveWebhooks {
|
|||
*
|
||||
* @param {WebhookHttpMethod} httpMethod
|
||||
* @param {string} path
|
||||
* @param {(string | undefined)} webhookId
|
||||
* @returns {(IWebhookData | undefined)}
|
||||
* @memberof ActiveWebhooks
|
||||
*/
|
||||
get(httpMethod: WebhookHttpMethod, path: string): IWebhookData | undefined {
|
||||
const webhookKey = this.getWebhookKey(httpMethod, path);
|
||||
get(httpMethod: WebhookHttpMethod, path: string, webhookId?: string): IWebhookData | undefined {
|
||||
const webhookKey = this.getWebhookKey(httpMethod, path, webhookId);
|
||||
if (this.webhookUrls[webhookKey] === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return this.webhookUrls[webhookKey];
|
||||
// set webhook to the first webhook result
|
||||
// if more results have been returned choose the one with the most route-matches
|
||||
let webhook = this.webhookUrls[webhookKey][0];
|
||||
if (this.webhookUrls[webhookKey].length > 1) {
|
||||
let maxMatches = 0;
|
||||
const pathElementsSet = new Set(path.split('/'));
|
||||
this.webhookUrls[webhookKey].forEach(dynamicWebhook => {
|
||||
const intersection =
|
||||
dynamicWebhook.path
|
||||
.split('/')
|
||||
.reduce((acc, element) => pathElementsSet.has(element) ? acc += 1 : acc, 0);
|
||||
|
||||
if (intersection > maxMatches) {
|
||||
maxMatches = intersection;
|
||||
webhook = dynamicWebhook;
|
||||
}
|
||||
});
|
||||
if (maxMatches === 0) {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
return webhook;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -116,10 +146,18 @@ export class ActiveWebhooks {
|
|||
*
|
||||
* @param {WebhookHttpMethod} httpMethod
|
||||
* @param {string} path
|
||||
* @param {(string | undefined)} webhookId
|
||||
* @returns {string}
|
||||
* @memberof ActiveWebhooks
|
||||
*/
|
||||
getWebhookKey(httpMethod: WebhookHttpMethod, path: string): string {
|
||||
getWebhookKey(httpMethod: WebhookHttpMethod, path: string, webhookId?: string): string {
|
||||
if (webhookId) {
|
||||
if (path.startsWith(webhookId)) {
|
||||
const cutFromIndex = path.indexOf('/') + 1;
|
||||
path = path.slice(cutFromIndex);
|
||||
}
|
||||
return `${httpMethod}|${webhookId}|${path.split('/').length}`;
|
||||
}
|
||||
return `${httpMethod}|${path}`;
|
||||
}
|
||||
|
||||
|
@ -147,7 +185,7 @@ export class ActiveWebhooks {
|
|||
for (const webhookData of webhooks) {
|
||||
await workflow.runWebhookMethod('delete', webhookData, NodeExecuteFunctions, mode, this.testWebhooks);
|
||||
|
||||
delete this.webhookUrls[this.getWebhookKey(webhookData.httpMethod, webhookData.path)];
|
||||
delete this.webhookUrls[this.getWebhookKey(webhookData.httpMethod, webhookData.path, webhookData.webhookId)];
|
||||
}
|
||||
|
||||
// Remove also the workflow-webhook entry
|
||||
|
|
|
@ -993,6 +993,12 @@ export function getExecuteWebhookFunctions(workflow: Workflow, node: INode, addi
|
|||
|
||||
return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, fallbackValue);
|
||||
},
|
||||
getParamsData(): object {
|
||||
if (additionalData.httpRequest === undefined) {
|
||||
throw new Error('Request is missing!');
|
||||
}
|
||||
return additionalData.httpRequest.params;
|
||||
},
|
||||
getQueryData(): object {
|
||||
if (additionalData.httpRequest === undefined) {
|
||||
throw new Error('Request is missing!');
|
||||
|
|
|
@ -54,9 +54,10 @@ export class ClassNameReplace implements INodeType {
|
|||
const returnData: IDataObject[] = [];
|
||||
returnData.push(
|
||||
{
|
||||
body: this.getBodyData(),
|
||||
headers: this.getHeaderData(),
|
||||
params: this.getParamsData(),
|
||||
query: this.getQueryData(),
|
||||
body: this.getBodyData(),
|
||||
}
|
||||
);
|
||||
|
||||
|
|
|
@ -412,9 +412,10 @@ export class Webhook implements INodeType {
|
|||
const returnItem: INodeExecutionData = {
|
||||
binary: {},
|
||||
json: {
|
||||
body: data,
|
||||
headers,
|
||||
params: this.getParamsData(),
|
||||
query: this.getQueryData(),
|
||||
body: data,
|
||||
},
|
||||
};
|
||||
|
||||
|
@ -458,9 +459,10 @@ export class Webhook implements INodeType {
|
|||
const returnItem: INodeExecutionData = {
|
||||
binary: {},
|
||||
json: {
|
||||
body: this.getBodyData(),
|
||||
headers,
|
||||
params: this.getParamsData(),
|
||||
query: this.getQueryData(),
|
||||
body: this.getBodyData(),
|
||||
},
|
||||
};
|
||||
|
||||
|
@ -483,9 +485,10 @@ export class Webhook implements INodeType {
|
|||
|
||||
const response: INodeExecutionData = {
|
||||
json: {
|
||||
body: this.getBodyData(),
|
||||
headers,
|
||||
params: this.getParamsData(),
|
||||
query: this.getQueryData(),
|
||||
body: this.getBodyData(),
|
||||
},
|
||||
};
|
||||
|
||||
|
|
|
@ -311,6 +311,7 @@ export interface IWebhookFunctions {
|
|||
getNode(): INode;
|
||||
getNodeParameter(parameterName: string, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object; //tslint:disable-line:no-any
|
||||
getNodeWebhookUrl: (name: string) => string | undefined;
|
||||
getParamsData(): object;
|
||||
getQueryData(): object;
|
||||
getRequestObject(): express.Request;
|
||||
getResponseObject(): express.Response;
|
||||
|
@ -566,6 +567,7 @@ export interface IWebhookData {
|
|||
webhookDescription: IWebhookDescription;
|
||||
workflowId: string;
|
||||
workflowExecuteAdditionalData: IWorkflowExecuteAdditionalData;
|
||||
webhookId?: string;
|
||||
}
|
||||
|
||||
export interface IWebhookDescription {
|
||||
|
|
|
@ -641,13 +641,13 @@ export function getNodeParameters(nodePropertiesArray: INodeProperties[], nodeVa
|
|||
}
|
||||
}
|
||||
|
||||
// Itterate over all collections
|
||||
// Iterate over all collections
|
||||
for (const itemName of Object.keys(propertyValues || {})) {
|
||||
if (nodeProperties.typeOptions !== undefined && nodeProperties.typeOptions.multipleValues === true) {
|
||||
// Multiple can be set so will be an array
|
||||
|
||||
const tempArrayValue: INodeParameters[] = [];
|
||||
// Itterate over all items as it contains multiple ones
|
||||
// Iterate over all items as it contains multiple ones
|
||||
for (const nodeValue of (propertyValues as INodeParameters)[itemName] as INodeParameters[]) {
|
||||
nodePropertyOptions = nodeProperties!.options!.find((nodePropertyOptions) => nodePropertyOptions.name === itemName) as INodePropertyCollection;
|
||||
|
||||
|
@ -779,6 +779,11 @@ export function getNodeWebhooks(workflow: Workflow, node: INode, additionalData:
|
|||
continue;
|
||||
}
|
||||
|
||||
let webhookId: string | undefined;
|
||||
if ((path.startsWith(':') || path.includes('/:')) && node.webhookId) {
|
||||
webhookId = node.webhookId;
|
||||
}
|
||||
|
||||
returnData.push({
|
||||
httpMethod: httpMethod.toString() as WebhookHttpMethod,
|
||||
node: node.name,
|
||||
|
@ -786,6 +791,7 @@ export function getNodeWebhooks(workflow: Workflow, node: INode, additionalData:
|
|||
webhookDescription,
|
||||
workflowId,
|
||||
workflowExecuteAdditionalData: additionalData,
|
||||
webhookId,
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -883,6 +889,13 @@ export function getNodeWebhookPath(workflowId: string, node: INode, path: string
|
|||
* @returns {string}
|
||||
*/
|
||||
export function getNodeWebhookUrl(baseUrl: string, workflowId: string, node: INode, path: string, isFullPath?: boolean): string {
|
||||
if ((path.startsWith(':') || path.includes('/:')) && node.webhookId) {
|
||||
// setting this to false to prefix the webhookId
|
||||
isFullPath = false;
|
||||
}
|
||||
if (path.startsWith('/')) {
|
||||
path = path.slice(1);
|
||||
}
|
||||
return `${baseUrl}/${getNodeWebhookPath(workflowId, node, path, isFullPath)}`;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue