🔀 Merge branch 'static-stateless-webhooks'

This commit is contained in:
Jan Oberhauser 2020-07-14 12:34:34 +02:00
commit d17161cf40
34 changed files with 674 additions and 81 deletions

View file

@ -11,11 +11,11 @@ import {
WorkflowHelpers,
WorkflowRunner,
WorkflowExecuteAdditionalData,
IWebhookDb,
} from './';
import {
ActiveWorkflows,
ActiveWebhooks,
NodeExecuteFunctions,
} from 'n8n-core';
@ -26,7 +26,7 @@ import {
INode,
INodeExecutionData,
IRunExecutionData,
IWebhookData,
NodeHelpers,
IWorkflowExecuteAdditionalData as IWorkflowExecuteAdditionalDataWorkflow,
WebhookHttpMethod,
Workflow,
@ -35,22 +35,23 @@ import {
import * as express from 'express';
export class ActiveWorkflowRunner {
private activeWorkflows: ActiveWorkflows | null = null;
private activeWebhooks: ActiveWebhooks | null = null;
private activationErrors: {
[key: string]: IActivationError;
} = {};
async init() {
// Get the active workflows from database
// NOTE
// Here I guess we can have a flag on the workflow table like hasTrigger
// so intead of pulling all the active wehhooks just pull the actives that have a trigger
const workflowsData: IWorkflowDb[] = await Db.collections.Workflow!.find({ active: true }) as IWorkflowDb[];
this.activeWebhooks = new ActiveWebhooks();
// Add them as active workflows
this.activeWorkflows = new ActiveWorkflows();
if (workflowsData.length !== 0) {
@ -58,7 +59,14 @@ export class ActiveWorkflowRunner {
console.log(' Start Active Workflows:');
console.log(' ================================');
const nodeTypes = NodeTypes();
for (const workflowData of workflowsData) {
const workflow = new Workflow({ id: workflowData.id.toString(), name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings});
if (workflow.getTriggerNodes().length !== 0
|| workflow.getPollNodes().length !== 0) {
console.log(` - ${workflowData.name}`);
try {
await this.add(workflowData.id.toString(), workflowData);
@ -70,7 +78,7 @@ export class ActiveWorkflowRunner {
}
}
}
}
/**
* Removes all the currently active workflows
@ -94,7 +102,6 @@ export class ActiveWorkflowRunner {
return;
}
/**
* Checks if a webhook for the given method and path exists and executes the workflow.
*
@ -110,30 +117,41 @@ export class ActiveWorkflowRunner {
throw new ResponseHelper.ResponseError('The "activeWorkflows" instance did not get initialized yet.', 404, 404);
}
const webhookData: IWebhookData | undefined = this.activeWebhooks!.get(httpMethod, path);
const webhook = await Db.collections.Webhook?.findOne({ webhookPath: path, method: httpMethod }) as IWebhookDb;
if (webhookData === undefined) {
// check if something exist
if (webhook === undefined) {
// The requested webhook is not registered
throw new ResponseHelper.ResponseError(`The requested webhook "${httpMethod} ${path}" is not registered.`, 404, 404);
}
const workflowData = await Db.collections.Workflow!.findOne(webhookData.workflowId);
const workflowData = await Db.collections.Workflow!.findOne(webhook.workflowId);
if (workflowData === undefined) {
throw new ResponseHelper.ResponseError(`Could not find workflow with id "${webhookData.workflowId}"`, 404, 404);
throw new ResponseHelper.ResponseError(`Could not find workflow with id "${webhook.workflowId}"`, 404, 404);
}
const nodeTypes = NodeTypes();
const workflow = new Workflow({ id: webhookData.workflowId, name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings});
const workflow = new Workflow({ id: webhook.workflowId.toString(), name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings});
const credentials = await WorkflowCredentials([workflow.getNode(webhook.node as string) as INode]);
const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials);
const webhookData = NodeHelpers.getNodeWebhooks(workflow, workflow.getNode(webhook.node as string) as INode, additionalData).filter((webhook) => {
return (webhook.httpMethod === httpMethod && webhook.path === path);
})[0];
// Get the node which has the webhook defined to know where to start from and to
// get additional data
const workflowStartNode = workflow.getNode(webhookData.node);
if (workflowStartNode === null) {
throw new ResponseHelper.ResponseError('Could not find node to process webhook.', 404, 404);
}
return new Promise((resolve, reject) => {
const executionMode = 'webhook';
//@ts-ignore
WebhookHelpers.executeWebhook(workflow, webhookData, workflowData, workflowStartNode, executionMode, undefined, req, res, (error: Error | null, data: object) => {
if (error !== null) {
return reject(error);
@ -143,19 +161,14 @@ export class ActiveWorkflowRunner {
});
}
/**
* Returns the ids of the currently active workflows
*
* @returns {string[]}
* @memberof ActiveWorkflowRunner
*/
getActiveWorkflows(): string[] {
if (this.activeWorkflows === null) {
return [];
}
return this.activeWorkflows.allActiveWorkflows();
getActiveWorkflows(): Promise<IWorkflowDb[]> {
return Db.collections.Workflow?.find({ select: ['id'] }) as Promise<IWorkflowDb[]>;
}
@ -166,15 +179,11 @@ export class ActiveWorkflowRunner {
* @returns {boolean}
* @memberof ActiveWorkflowRunner
*/
isActive(id: string): boolean {
if (this.activeWorkflows !== null) {
return this.activeWorkflows.isActive(id);
async isActive(id: string): Promise<boolean> {
const workflow = await Db.collections.Workflow?.findOne({ id }) as IWorkflowDb;
return workflow?.active as boolean;
}
return false;
}
/**
* Return error if there was a problem activating the workflow
*
@ -190,7 +199,6 @@ export class ActiveWorkflowRunner {
return this.activationErrors[id];
}
/**
* Adds all the webhooks of the workflow
*
@ -202,13 +210,70 @@ export class ActiveWorkflowRunner {
*/
async addWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode): Promise<void> {
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData);
let path = '' as string | undefined;
for (const webhookData of webhooks) {
await this.activeWebhooks!.add(workflow, webhookData, mode);
const node = workflow.getNode(webhookData.node) as INode;
node.name = webhookData.node;
path = node.parameters.path as string;
if (node.parameters.path === undefined) {
path = workflow.getSimpleParameterValue(node, webhookData.webhookDescription['path']) as string | undefined;
if (path === undefined) {
// TODO: Use a proper logger
console.error(`No webhook path could be found for node "${node.name}" in workflow "${workflow.id}".`);
continue;
}
}
const isFullPath: boolean = workflow.getSimpleParameterValue(node, webhookData.webhookDescription['isFullPath'], false) as boolean;
const webhook = {
workflowId: webhookData.workflowId,
webhookPath: NodeHelpers.getNodeWebhookPath(workflow.id as string, node, path, isFullPath),
node: node.name,
method: webhookData.httpMethod,
} as IWebhookDb;
try {
await Db.collections.Webhook?.insert(webhook);
const webhookExists = await workflow.runWebhookMethod('checkExists', webhookData, NodeExecuteFunctions, mode, false);
if (webhookExists === false) {
// If webhook does not exist yet create it
await workflow.runWebhookMethod('create', webhookData, NodeExecuteFunctions, mode, false);
}
} catch (error) {
let errorMessage = '';
await Db.collections.Webhook?.delete({ workflowId: workflow.id });
// if it's a workflow from the the insert
// TODO check if there is standard error code for deplicate key violation that works
// with all databases
if (error.name === 'MongoError' || error.name === 'QueryFailedError') {
errorMessage = `The webhook path [${webhook.webhookPath}] and method [${webhook.method}] already exist.`;
} else if (error.detail) {
// it's a error runnig the webhook methods (checkExists, create)
errorMessage = error.detail;
} else {
errorMessage = error.message;
}
throw new Error(errorMessage);
}
}
// Save static data!
await WorkflowHelpers.saveStaticData(workflow);
}
}
/**
@ -227,12 +292,28 @@ export class ActiveWorkflowRunner {
const nodeTypes = NodeTypes();
const workflow = new Workflow({ id: workflowId, name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings });
await this.activeWebhooks!.removeWorkflow(workflow);
const mode = 'internal';
// Save the static workflow data if needed
await WorkflowHelpers.saveStaticData(workflow);
const credentials = await WorkflowCredentials(workflowData.nodes);
const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials);
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData);
for (const webhookData of webhooks) {
await workflow.runWebhookMethod('delete', webhookData, NodeExecuteFunctions, mode, false);
}
// if it's a mongo objectId convert it to string
if (typeof workflowData.id === 'object') {
workflowData.id = workflowData.id.toString();
}
const webhook = {
workflowId: workflowData.id,
} as IWebhookDb;
await Db.collections.Webhook?.delete(webhook);
}
/**
* Runs the given workflow
@ -322,7 +403,6 @@ export class ActiveWorkflowRunner {
});
}
/**
* Makes a workflow active
*
@ -361,7 +441,11 @@ export class ActiveWorkflowRunner {
// Add the workflows which have webhooks defined
await this.addWorkflowWebhooks(workflowInstance, additionalData, mode);
if (workflowInstance.getTriggerNodes().length !== 0
|| workflowInstance.getPollNodes().length !== 0) {
await this.activeWorkflows.add(workflowId, workflowInstance, additionalData, getTriggerFunctions, getPollFunctions);
}
if (this.activationErrors[workflowId] !== undefined) {
// If there were activation errors delete them
@ -386,7 +470,6 @@ export class ActiveWorkflowRunner {
await WorkflowHelpers.saveStaticData(workflowInstance!);
}
/**
* Makes a workflow inactive
*
@ -395,6 +478,7 @@ export class ActiveWorkflowRunner {
* @memberof ActiveWorkflowRunner
*/
async remove(workflowId: string): Promise<void> {
if (this.activeWorkflows !== null) {
// Remove all the webhooks of the workflow
await this.removeWorkflowWebhooks(workflowId);
@ -404,8 +488,13 @@ export class ActiveWorkflowRunner {
delete this.activationErrors[workflowId];
}
// Remove the workflow from the "list" of active workflows
return this.activeWorkflows.remove(workflowId);
// if it's active in memory then it's a trigger
// so remove from list of actives workflows
if (this.activeWorkflows.isActive(workflowId)) {
this.activeWorkflows.remove(workflowId);
}
return;
}
throw new Error(`The "activeWorkflows" instance did not get initialized yet.`);

View file

@ -29,22 +29,27 @@ export let collections: IDatabaseCollections = {
Credentials: null,
Execution: null,
Workflow: null,
Webhook: null,
};
import {
InitialMigration1587669153312
InitialMigration1587669153312,
WebhookModel1589476000887,
} from './databases/postgresdb/migrations';
import {
InitialMigration1587563438936
InitialMigration1587563438936,
WebhookModel1592679094242,
} from './databases/mongodb/migrations';
import {
InitialMigration1588157391238
InitialMigration1588157391238,
WebhookModel1592447867632,
} from './databases/mysqldb/migrations';
import {
InitialMigration1588102412422
InitialMigration1588102412422,
WebhookModel1592445003908,
} from './databases/sqlite/migrations';
import * as path from 'path';
@ -66,7 +71,7 @@ export async function init(): Promise<IDatabaseCollections> {
entityPrefix,
url: await GenericHelpers.getConfigValue('database.mongodb.connectionUrl') as string,
useNewUrlParser: true,
migrations: [InitialMigration1587563438936],
migrations: [InitialMigration1587563438936, WebhookModel1592679094242],
migrationsRun: true,
migrationsTableName: `${entityPrefix}migrations`,
};
@ -99,7 +104,7 @@ export async function init(): Promise<IDatabaseCollections> {
port: await GenericHelpers.getConfigValue('database.postgresdb.port') as number,
username: await GenericHelpers.getConfigValue('database.postgresdb.user') as string,
schema: config.get('database.postgresdb.schema'),
migrations: [InitialMigration1587669153312],
migrations: [InitialMigration1587669153312, WebhookModel1589476000887],
migrationsRun: true,
migrationsTableName: `${entityPrefix}migrations`,
ssl,
@ -118,7 +123,7 @@ export async function init(): Promise<IDatabaseCollections> {
password: await GenericHelpers.getConfigValue('database.mysqldb.password') as string,
port: await GenericHelpers.getConfigValue('database.mysqldb.port') as number,
username: await GenericHelpers.getConfigValue('database.mysqldb.user') as string,
migrations: [InitialMigration1588157391238],
migrations: [InitialMigration1588157391238, WebhookModel1592447867632],
migrationsRun: true,
migrationsTableName: `${entityPrefix}migrations`,
};
@ -130,7 +135,7 @@ export async function init(): Promise<IDatabaseCollections> {
type: 'sqlite',
database: path.join(n8nFolder, 'database.sqlite'),
entityPrefix,
migrations: [InitialMigration1588102412422],
migrations: [InitialMigration1588102412422, WebhookModel1592445003908],
migrationsRun: true,
migrationsTableName: `${entityPrefix}migrations`,
};
@ -155,6 +160,7 @@ export async function init(): Promise<IDatabaseCollections> {
collections.Credentials = getRepository(entities.CredentialsEntity);
collections.Execution = getRepository(entities.ExecutionEntity);
collections.Workflow = getRepository(entities.WorkflowEntity);
collections.Webhook = getRepository(entities.WebhookEntity);
return collections;
}

View file

@ -49,8 +49,15 @@ export interface IDatabaseCollections {
Credentials: Repository<ICredentialsDb> | null;
Execution: Repository<IExecutionFlattedDb> | null;
Workflow: Repository<IWorkflowDb> | null;
Webhook: Repository<IWebhookDb> | null;
}
export interface IWebhookDb {
workflowId: number | string | ObjectID;
webhookPath: string;
method: string;
node: string;
}
export interface IWorkflowBase extends IWorkflowBaseWorkflow {
id?: number | string | ObjectID;

View file

@ -457,7 +457,9 @@ class App {
await this.externalHooks.run('workflow.update', [newWorkflowData]);
if (this.activeWorkflowRunner.isActive(id)) {
const isActive = await this.activeWorkflowRunner.isActive(id);
if (isActive) {
// When workflow gets saved always remove it as the triggers could have been
// changed and so the changes would not take effect
await this.activeWorkflowRunner.remove(id);
@ -526,7 +528,9 @@ class App {
await this.externalHooks.run('workflow.delete', [id]);
if (this.activeWorkflowRunner.isActive(id)) {
const isActive = await this.activeWorkflowRunner.isActive(id);
if (isActive) {
// Before deleting a workflow deactivate it
await this.activeWorkflowRunner.remove(id);
}
@ -666,7 +670,8 @@ class App {
// Returns the active workflow ids
this.app.get(`/${this.restEndpoint}/active`, ResponseHelper.send(async (req: express.Request, res: express.Response): Promise<string[]> => {
return this.activeWorkflowRunner.getActiveWorkflows();
const activeWorkflows = await this.activeWorkflowRunner.getActiveWorkflows();
return activeWorkflows.map(workflow => workflow.id.toString()) as string[];
}));

View file

@ -141,12 +141,14 @@ export class TestWebhooks {
let key: string;
for (const webhookData of webhooks) {
key = this.activeWebhooks!.getWebhookKey(webhookData.httpMethod, webhookData.path);
await this.activeWebhooks!.add(workflow, webhookData, mode);
this.testWebhookData[key] = {
sessionId,
timeout,
workflowData,
};
await this.activeWebhooks!.add(workflow, webhookData, mode);
// Save static data!
this.testWebhookData[key].workflowData.staticData = workflow.staticData;

View file

@ -69,6 +69,33 @@ export function getWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflo
return returnData;
}
/**
* Returns all the webhooks which should be created for the give workflow
*
* @export
* @param {string} workflowId
* @param {Workflow} workflow
* @returns {IWebhookData[]}
*/
export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] {
// Check all the nodes in the workflow if they have webhooks
const returnData: IWebhookData[] = [];
let parentNodes: string[] | undefined;
for (const node of Object.values(workflow.nodes)) {
if (parentNodes !== undefined && !parentNodes.includes(node.name)) {
// If parentNodes are given check only them if they have webhooks
// and no other ones
continue;
}
returnData.push.apply(returnData, NodeHelpers.getNodeWebhooksBasic(workflow, node));
}
return returnData;
}
/**
* Executes a webhook

View file

@ -0,0 +1,30 @@
import {
Column,
Entity,
Index,
ObjectID,
ObjectIdColumn,
} from 'typeorm';
import {
IWebhookDb,
} from '../../Interfaces';
@Entity()
export class WebhookEntity implements IWebhookDb {
@ObjectIdColumn()
id: ObjectID;
@Column()
workflowId: number;
@Column()
webhookPath: string;
@Column()
method: string;
@Column()
node: string;
}

View file

@ -1,3 +1,5 @@
export * from './CredentialsEntity';
export * from './ExecutionEntity';
export * from './WorkflowEntity';
export * from './WebhookEntity';

View file

@ -0,0 +1,57 @@
import {
MigrationInterface,
} from 'typeorm';
import {
IWorkflowDb,
NodeTypes,
WebhookHelpers,
} from '../../..';
import {
Workflow,
} from 'n8n-workflow/dist/src/Workflow';
import {
IWebhookDb,
} from '../../../Interfaces';
import * as config from '../../../../config';
import {
MongoQueryRunner,
} from 'typeorm/driver/mongodb/MongoQueryRunner';
export class WebhookModel1592679094242 implements MigrationInterface {
name = 'WebhookModel1592679094242';
async up(queryRunner: MongoQueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
const workflows = await queryRunner.cursor( `${tablePrefix}workflow_entity`, { active: true }).toArray() as IWorkflowDb[];
const data: IWebhookDb[] = [];
const nodeTypes = NodeTypes();
for (const workflow of workflows) {
const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings });
const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance);
for (const webhook of webhooks) {
data.push({
workflowId: workflowInstance.id as string,
webhookPath: webhook.path,
method: webhook.httpMethod,
node: webhook.node,
});
}
}
if (data.length !== 0) {
await queryRunner.manager.insertMany(`${tablePrefix}webhook_entity`, data);
}
await queryRunner.manager.createCollectionIndex(`${tablePrefix}webhook_entity`, ['webhookPath', 'method'], { unique: true, background: false });
}
async down(queryRunner: MongoQueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.dropTable(`${tablePrefix}webhook_entity`);
}
}

View file

@ -1 +1,2 @@
export * from './1587563438936-InitialMigration';
export * from './1592679094242-WebhookModel';

View file

@ -0,0 +1,25 @@
import {
Column,
Entity,
PrimaryColumn,
} from 'typeorm';
import {
IWebhookDb,
} from '../../Interfaces';
@Entity()
export class WebhookEntity implements IWebhookDb {
@Column()
workflowId: number;
@PrimaryColumn()
webhookPath: string;
@PrimaryColumn()
method: string;
@Column()
node: string;
}

View file

@ -1,3 +1,4 @@
export * from './CredentialsEntity';
export * from './ExecutionEntity';
export * from './WorkflowEntity';
export * from './WebhookEntity';

View file

@ -0,0 +1,59 @@
import {
MigrationInterface,
QueryRunner,
} from 'typeorm';
import * as config from '../../../../config';
import {
IWorkflowDb,
NodeTypes,
WebhookHelpers,
} from '../../..';
import {
Workflow,
} from 'n8n-workflow';
import {
IWebhookDb,
} from '../../../Interfaces';
export class WebhookModel1592447867632 implements MigrationInterface {
name = 'WebhookModel1592447867632';
async up(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.query(`CREATE TABLE IF NOT EXISTS ${tablePrefix}webhook_entity (workflowId int NOT NULL, webhookPath varchar(255) NOT NULL, method varchar(255) NOT NULL, node varchar(255) NOT NULL, PRIMARY KEY (webhookPath, method)) ENGINE=InnoDB`);
const workflows = await queryRunner.query(`SELECT * FROM ${tablePrefix}workflow_entity WHERE active=true`) as IWorkflowDb[];
const data: IWebhookDb[] = [];
const nodeTypes = NodeTypes();
for (const workflow of workflows) {
const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings });
const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance);
for (const webhook of webhooks) {
data.push({
workflowId: workflowInstance.id as string,
webhookPath: webhook.path,
method: webhook.httpMethod,
node: webhook.node,
});
}
}
if (data.length !== 0) {
await queryRunner.manager.createQueryBuilder()
.insert()
.into(`${tablePrefix}webhook_entity`)
.values(data)
.execute();
}
}
async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.query(`DROP TABLE ${tablePrefix}webhook_entity`);
}
}

View file

@ -1 +1,2 @@
export * from './1588157391238-InitialMigration';
export * from './1592447867632-WebhookModel';

View file

@ -0,0 +1,25 @@
import {
Column,
Entity,
PrimaryColumn,
} from 'typeorm';
import {
IWebhookDb,
} from '../../';
@Entity()
export class WebhookEntity implements IWebhookDb {
@Column()
workflowId: number;
@PrimaryColumn()
webhookPath: string;
@PrimaryColumn()
method: string;
@Column()
node: string;
}

View file

@ -1,3 +1,5 @@
export * from './CredentialsEntity';
export * from './ExecutionEntity';
export * from './WorkflowEntity';
export * from './WebhookEntity';

View file

@ -1,4 +1,5 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import {
MigrationInterface, QueryRunner } from 'typeorm';
import * as config from '../../../../config';

View file

@ -0,0 +1,69 @@
import {
MigrationInterface,
QueryRunner,
} from 'typeorm';
import {
IWorkflowDb,
NodeTypes,
WebhookHelpers,
} from '../../..';
import {
Workflow,
} from 'n8n-workflow';
import {
IWebhookDb,
} from '../../../Interfaces';
import * as config from '../../../../config';
export class WebhookModel1589476000887 implements MigrationInterface {
name = 'WebhookModel1589476000887';
async up(queryRunner: QueryRunner): Promise<void> {
let tablePrefix = config.get('database.tablePrefix');
const tablePrefixIndex = tablePrefix;
const schema = config.get('database.postgresdb.schema');
if (schema) {
tablePrefix = schema + '.' + tablePrefix;
}
await queryRunner.query(`CREATE TABLE ${tablePrefix}webhook_entity ("workflowId" integer NOT NULL, "webhookPath" character varying NOT NULL, "method" character varying NOT NULL, "node" character varying NOT NULL, CONSTRAINT "PK_${tablePrefixIndex}b21ace2e13596ccd87dc9bf4ea6" PRIMARY KEY ("webhookPath", "method"))`, undefined);
const workflows = await queryRunner.query(`SELECT * FROM ${tablePrefix}workflow_entity WHERE active=true`) as IWorkflowDb[];
const data: IWebhookDb[] = [];
const nodeTypes = NodeTypes();
for (const workflow of workflows) {
const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings });
const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance);
for (const webhook of webhooks) {
data.push({
workflowId: workflowInstance.id as string,
webhookPath: webhook.path,
method: webhook.httpMethod,
node: webhook.node,
});
}
}
if (data.length !== 0) {
await queryRunner.manager.createQueryBuilder()
.insert()
.into(`${tablePrefix}webhook_entity`)
.values(data)
.execute();
}
}
async down(queryRunner: QueryRunner): Promise<void> {
let tablePrefix = config.get('database.tablePrefix');
const schema = config.get('database.postgresdb.schema');
if (schema) {
tablePrefix = schema + '.' + tablePrefix;
}
await queryRunner.query(`DROP TABLE ${tablePrefix}webhook_entity`, undefined);
}
}

View file

@ -1 +1,3 @@
export * from './1587669153312-InitialMigration';
export * from './1589476000887-WebhookModel';

View file

@ -0,0 +1,25 @@
import {
Column,
Entity,
PrimaryColumn,
} from 'typeorm';
import {
IWebhookDb,
} from '../../Interfaces';
@Entity()
export class WebhookEntity implements IWebhookDb {
@Column()
workflowId: number;
@PrimaryColumn()
webhookPath: string;
@PrimaryColumn()
method: string;
@Column()
node: string;
}

View file

@ -1,4 +1,4 @@
export * from './CredentialsEntity';
export * from './ExecutionEntity';
export * from './WorkflowEntity';
export * from './WebhookEntity';

View file

@ -1,4 +1,7 @@
import { MigrationInterface, QueryRunner } from "typeorm";
import {
MigrationInterface,
QueryRunner,
} from 'typeorm';
import * as config from '../../../../config';

View file

@ -0,0 +1,63 @@
import {
MigrationInterface,
QueryRunner,
} from 'typeorm';
import * as config from '../../../../config';
import {
IWorkflowDb,
NodeTypes,
WebhookHelpers,
} from '../../..';
import {
Workflow,
} from 'n8n-workflow';
import {
IWebhookDb,
} from '../../../Interfaces';
export class WebhookModel1592445003908 implements MigrationInterface {
name = 'WebhookModel1592445003908';
async up(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.query(`CREATE TABLE IF NOT EXISTS ${tablePrefix}webhook_entity ("workflowId" integer NOT NULL, "webhookPath" varchar NOT NULL, "method" varchar NOT NULL, "node" varchar NOT NULL, PRIMARY KEY ("webhookPath", "method"))`);
const workflows = await queryRunner.query(`SELECT * FROM ${tablePrefix}workflow_entity WHERE active=true`) as IWorkflowDb[];
const data: IWebhookDb[] = [];
const nodeTypes = NodeTypes();
for (const workflow of workflows) {
workflow.nodes = JSON.parse(workflow.nodes as unknown as string);
workflow.connections = JSON.parse(workflow.connections as unknown as string);
workflow.staticData = JSON.parse(workflow.staticData as unknown as string);
workflow.settings = JSON.parse(workflow.settings as unknown as string);
const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings });
const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance);
for (const webhook of webhooks) {
data.push({
workflowId: workflowInstance.id as string,
webhookPath: webhook.path,
method: webhook.httpMethod,
node: webhook.node,
});
}
}
if (data.length !== 0) {
await queryRunner.manager.createQueryBuilder()
.insert()
.into(`${tablePrefix}webhook_entity`)
.values(data)
.execute();
}
}
async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.query(`DROP TABLE ${tablePrefix}webhook_entity`);
}
}

View file

@ -1 +1,2 @@
export * from './1588102412422-InitialMigration';
export * from './1592445003908-WebhookModel';

View file

@ -35,13 +35,20 @@ export class ActiveWebhooks {
throw new Error('Webhooks can only be added for saved workflows as an id is needed!');
}
const webhookKey = this.getWebhookKey(webhookData.httpMethod, webhookData.path);
//check that there is not a webhook already registed with that path/method
if (this.webhookUrls[webhookKey] !== undefined) {
throw new Error(`Test-Webhook can not be activated because another one with the same method "${webhookData.httpMethod}" and path "${webhookData.path}" is already active!`);
}
if (this.workflowWebhooks[webhookData.workflowId] === undefined) {
this.workflowWebhooks[webhookData.workflowId] = [];
}
// Make the webhook available directly because sometimes to create it successfully
// it gets called
this.webhookUrls[this.getWebhookKey(webhookData.httpMethod, webhookData.path)] = webhookData;
this.webhookUrls[webhookKey] = webhookData;
const webhookExists = await workflow.runWebhookMethod('checkExists', webhookData, NodeExecuteFunctions, mode, this.testWebhooks);
if (webhookExists === false) {

View file

@ -418,7 +418,8 @@ export function getNodeWebhookUrl(name: string, workflow: Workflow, node: INode,
return undefined;
}
return NodeHelpers.getNodeWebhookUrl(baseUrl, workflow.id!, node, path.toString());
const isFullPath: boolean = workflow.getSimpleParameterValue(node, webhookDescription['isFullPath'], false) as boolean;
return NodeHelpers.getNodeWebhookUrl(baseUrl, workflow.id!, node, path.toString(), isFullPath);
}

View file

@ -23,7 +23,9 @@
"test:e2e": "vue-cli-service test:e2e",
"test:unit": "vue-cli-service test:unit"
},
"dependencies": {},
"dependencies": {
"uuid": "^8.1.0"
},
"devDependencies": {
"@beyonk/google-fonts-webpack-plugin": "^1.2.3",
"@fortawesome/fontawesome-svg-core": "^1.2.19",

View file

@ -110,8 +110,9 @@ export default mixins(
const workflowId = this.$store.getters.workflowId;
const path = this.getValue(webhookData, 'path');
const isFullPath = this.getValue(webhookData, 'isFullPath') as unknown as boolean || false;
return NodeHelpers.getNodeWebhookUrl(baseUrl, workflowId, this.node, path);
return NodeHelpers.getNodeWebhookUrl(baseUrl, workflowId, this.node, path, isFullPath);
},
},
watch: {

View file

@ -126,6 +126,8 @@ import RunData from '@/components/RunData.vue';
import mixins from 'vue-typed-mixins';
import { v4 as uuidv4 } from 'uuid';
import { debounce } from 'lodash';
import axios from 'axios';
import {
@ -946,6 +948,10 @@ export default mixins(
// Check if node-name is unique else find one that is
newNodeData.name = this.getUniqueNodeName(newNodeData.name);
if (nodeTypeData.webhooks && nodeTypeData.webhooks.length) {
newNodeData.webhookId = uuidv4();
}
await this.addNodes([newNodeData]);
// Automatically deselect all nodes and select the current one and also active
@ -1579,6 +1585,11 @@ export default mixins(
console.error(e); // eslint-disable-line no-console
}
node.parameters = nodeParameters !== null ? nodeParameters : {};
// if it's a webhook and the path is empty set the UUID as the default path
if (node.type === 'n8n-nodes-base.webhook' && node.parameters.path === '') {
node.parameters.path = node.webhookId as string;
}
}
foundNodeIssues = this.getNodeIssues(nodeType, node);

View file

@ -58,8 +58,8 @@
"change-case": "^4.1.1",
"copyfiles": "^2.1.1",
"inquirer": "^7.0.0",
"n8n-core": "^0.31.0",
"n8n-workflow": "^0.28.0",
"n8n-core": "^0.36.0",
"n8n-workflow": "^0.33.0",
"replace-in-file": "^6.0.0",
"request": "^2.88.2",
"tmp-promise": "^2.0.2",

View file

@ -77,6 +77,7 @@ export class Webhook implements INodeType {
{
name: 'default',
httpMethod: '={{$parameter["httpMethod"]}}',
isFullPath: true,
responseCode: '={{$parameter["responseCode"]}}',
responseMode: '={{$parameter["responseMode"]}}',
responseData: '={{$parameter["responseData"]}}',
@ -133,7 +134,7 @@ export class Webhook implements INodeType {
default: '',
placeholder: 'webhook',
required: true,
description: 'The path to listen to. Slashes("/") in the path are not allowed.',
description: 'The path to listen to.',
},
{
displayName: 'Response Code',

View file

@ -336,6 +336,7 @@ export interface INode {
continueOnFail?: boolean;
parameters: INodeParameters;
credentials?: INodeCredentials;
webhookId?: string;
}
@ -558,8 +559,9 @@ export interface IWebhookData {
}
export interface IWebhookDescription {
[key: string]: WebhookHttpMethod | WebhookResponseMode | string | undefined;
[key: string]: WebhookHttpMethod | WebhookResponseMode | boolean | string | undefined;
httpMethod: WebhookHttpMethod | string;
isFullPath?: boolean;
name: string;
path: string;
responseBinaryPropertyName?: string;

View file

@ -755,7 +755,7 @@ export function getNodeWebhooks(workflow: Workflow, node: INode, additionalData:
const returnData: IWebhookData[] = [];
for (const webhookDescription of nodeType.description.webhooks) {
let nodeWebhookPath = workflow.getSimpleParameterValue(node, webhookDescription['path'], 'GET');
let nodeWebhookPath = workflow.getSimpleParameterValue(node, webhookDescription['path']);
if (nodeWebhookPath === undefined) {
// TODO: Use a proper logger
console.error(`No webhook path could be found for node "${node.name}" in workflow "${workflowId}".`);
@ -768,7 +768,8 @@ export function getNodeWebhooks(workflow: Workflow, node: INode, additionalData:
nodeWebhookPath = nodeWebhookPath.slice(1);
}
const path = getNodeWebhookPath(workflowId, node, nodeWebhookPath);
const isFullPath: boolean = workflow.getSimpleParameterValue(node, webhookDescription['isFullPath'], false) as boolean;
const path = getNodeWebhookPath(workflowId, node, nodeWebhookPath, isFullPath);
const httpMethod = workflow.getSimpleParameterValue(node, webhookDescription['httpMethod'], 'GET');
@ -791,6 +792,61 @@ export function getNodeWebhooks(workflow: Workflow, node: INode, additionalData:
return returnData;
}
export function getNodeWebhooksBasic(workflow: Workflow, node: INode): IWebhookData[] {
if (node.disabled === true) {
// Node is disabled so webhooks will also not be enabled
return [];
}
const nodeType = workflow.nodeTypes.getByName(node.type) as INodeType;
if (nodeType.description.webhooks === undefined) {
// Node does not have any webhooks so return
return [];
}
const workflowId = workflow.id || '__UNSAVED__';
const returnData: IWebhookData[] = [];
for (const webhookDescription of nodeType.description.webhooks) {
let nodeWebhookPath = workflow.getSimpleParameterValue(node, webhookDescription['path']);
if (nodeWebhookPath === undefined) {
// TODO: Use a proper logger
console.error(`No webhook path could be found for node "${node.name}" in workflow "${workflowId}".`);
continue;
}
nodeWebhookPath = nodeWebhookPath.toString();
if (nodeWebhookPath.charAt(0) === '/') {
nodeWebhookPath = nodeWebhookPath.slice(1);
}
const isFullPath: boolean = workflow.getSimpleParameterValue(node, webhookDescription['isFullPath'], false) as boolean;
const path = getNodeWebhookPath(workflowId, node, nodeWebhookPath, isFullPath);
const httpMethod = workflow.getSimpleParameterValue(node, webhookDescription['httpMethod']);
if (httpMethod === undefined) {
// TODO: Use a proper logger
console.error(`The webhook "${path}" for node "${node.name}" in workflow "${workflowId}" could not be added because the httpMethod is not defined.`);
continue;
}
//@ts-ignore
returnData.push({
httpMethod: httpMethod.toString() as WebhookHttpMethod,
node: node.name,
path,
webhookDescription,
workflowId,
});
}
return returnData;
}
/**
* Returns the webhook path
@ -801,8 +857,17 @@ export function getNodeWebhooks(workflow: Workflow, node: INode, additionalData:
* @param {string} path
* @returns {string}
*/
export function getNodeWebhookPath(workflowId: string, node: INode, path: string): string {
return `${workflowId}/${encodeURIComponent(node.name.toLowerCase())}/${path}`;
export function getNodeWebhookPath(workflowId: string, node: INode, path: string, isFullPath?: boolean): string {
let webhookPath = '';
if (node.webhookId === undefined) {
webhookPath = `${workflowId}/${encodeURIComponent(node.name.toLowerCase())}/${path}`;
} else {
if (isFullPath === true) {
return path;
}
webhookPath = `${node.webhookId}/${path}`;
}
return webhookPath;
}
@ -814,11 +879,11 @@ export function getNodeWebhookPath(workflowId: string, node: INode, path: string
* @param {string} workflowId
* @param {string} nodeTypeName
* @param {string} path
* @param {boolean} isFullPath
* @returns {string}
*/
export function getNodeWebhookUrl(baseUrl: string, workflowId: string, node: INode, path: string): string {
// return `${baseUrl}/${workflowId}/${nodeTypeName}/${path}`;
return `${baseUrl}/${getNodeWebhookPath(workflowId, node, path)}`;
export function getNodeWebhookUrl(baseUrl: string, workflowId: string, node: INode, path: string, isFullPath?: boolean): string {
return `${baseUrl}/${getNodeWebhookPath(workflowId, node, path, isFullPath)}`;
}

View file

@ -715,7 +715,7 @@ export class Workflow {
* @returns {(string | undefined)}
* @memberof Workflow
*/
getSimpleParameterValue(node: INode, parameterValue: string | undefined, defaultValue?: boolean | number | string): boolean | number | string | undefined {
getSimpleParameterValue(node: INode, parameterValue: string | boolean | undefined, defaultValue?: boolean | number | string): boolean | number | string | undefined {
if (parameterValue === undefined) {
// Value is not set so return the default
return defaultValue;