Separate webhooks from core (#1408)

* Unify execution ID across executions

* Fix indentation and improved comments

* WIP: saving data after each node execution

* Added on/off to save data after each step, saving initial data and retries working

* Fixing lint issues

* Fixing more lint issues

*  Add bull to execute workflows

* 👕 Fix lint issue

*  Add graceful shutdown to worker

*  Add loading staticData to worker

* 👕 Fix lint issue

*  Fix import

* Changed tables metadata to add nullable to stoppedAt

* Reload database on migration run

* Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration

* Added checks to Redis and exiting process if connection is unavailable

* Fixing error with new installations

* Fix issue with data not being sent back to browser on manual executions with defined destination

* Merging bull and unify execution id branch fixes

* Main process will now get execution success from database instead of redis

* Omit execution duration if execution did not stop

* Fix issue with execution list displaying inconsistant information information while a workflow is running

* Remove unused hooks to clarify for developers that these wont run in queue mode

* Added active pooling to help recover from Redis crashes

* Lint issues

* Changing default polling interval to 60 seconds

* Removed unnecessary attributes from bull job

* Added webhooks service and setting to disable webhooks from main process

* Fixed executions list when running with queues. Now we get the list of actively running workflows from bull.

* Add option to disable deregistration of webhooks on shutdown

* Rename WEBHOOK_TUNNEL_URL to WEBHOOK_URL keeping backwards compat.

* Added auto refresh to executions list

* Improvements to workflow stop process when running with queues

* Refactor queue system to use a singleton and avoid code duplication

* Improve comments and remove unnecessary commits

* Remove console.log from vue file

* Blocking webhook process to run without queues

* Handling execution stop graciously when possible

* Removing initialization of all workflows from webhook process

* Refactoring code to remove code duplication for job stop

* Improved execution list to be more fluid and less intrusive

* Fixing workflow name for current executions when auto updating

*  Right align autorefresh checkbox

Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
This commit is contained in:
Omar Ajoue 2021-02-09 23:32:40 +01:00 committed by GitHub
parent 98fa529e51
commit e53efdd337
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 869 additions and 177 deletions

View file

@ -49,7 +49,7 @@ services:
- N8N_PROTOCOL=https - N8N_PROTOCOL=https
- NODE_ENV=production - NODE_ENV=production
- N8N_PATH - N8N_PATH
- WEBHOOK_TUNNEL_URL=https://${DOMAIN_NAME}${N8N_PATH} - WEBHOOK_URL=https://${DOMAIN_NAME}${N8N_PATH}
volumes: volumes:
- /var/run/docker.sock:/var/run/docker.sock - /var/run/docker.sock:/var/run/docker.sock
- ${DATA_FOLDER}/.n8n:/home/node/.n8n - ${DATA_FOLDER}/.n8n:/home/node/.n8n

View file

@ -82,8 +82,10 @@ export class Start extends Command {
process.exit(processExistCode); process.exit(processExistCode);
}, 30000); }, 30000);
const skipWebhookDeregistration = config.get('endpoints.skipWebhoooksDeregistrationOnShutdown') as boolean;
const removePromises = []; const removePromises = [];
if (activeWorkflowRunner !== undefined) { if (activeWorkflowRunner !== undefined && skipWebhookDeregistration !== true) {
removePromises.push(activeWorkflowRunner.removeAll()); removePromises.push(activeWorkflowRunner.removeAll());
} }
@ -253,8 +255,8 @@ export class Start extends Command {
// @ts-ignore // @ts-ignore
const webhookTunnel = await localtunnel(port, tunnelSettings); const webhookTunnel = await localtunnel(port, tunnelSettings);
process.env.WEBHOOK_TUNNEL_URL = webhookTunnel.url + '/'; process.env.WEBHOOK_URL = webhookTunnel.url + '/';
this.log(`Tunnel URL: ${process.env.WEBHOOK_TUNNEL_URL}\n`); this.log(`Tunnel URL: ${process.env.WEBHOOK_URL}\n`);
this.log('IMPORTANT! Do not share with anybody as it would give people access to your n8n instance!'); this.log('IMPORTANT! Do not share with anybody as it would give people access to your n8n instance!');
} }

View file

@ -0,0 +1,223 @@
import {
UserSettings,
} from 'n8n-core';
import { Command, flags } from '@oclif/command';
import * as Redis from 'ioredis';
import * as config from '../config';
import {
ActiveExecutions,
ActiveWorkflowRunner,
CredentialsOverwrites,
CredentialTypes,
Db,
ExternalHooks,
GenericHelpers,
LoadNodesAndCredentials,
NodeTypes,
TestWebhooks,
WebhookServer,
} from "../src";
import { IDataObject } from 'n8n-workflow';
let activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner | undefined;
let processExistCode = 0;
export class Webhook extends Command {
static description = 'Starts n8n webhook process. Intercepts only production URLs.';
static examples = [
`$ n8n webhook`,
];
static flags = {
help: flags.help({ char: 'h' }),
};
/**
* Stops the n8n in a graceful way.
* Make for example sure that all the webhooks from third party services
* get removed.
*/
static async stopProcess() {
console.log(`\nStopping n8n...`);
try {
const externalHooks = ExternalHooks();
await externalHooks.run('n8n.stop', []);
setTimeout(() => {
// In case that something goes wrong with shutdown we
// kill after max. 30 seconds no matter what
process.exit(processExistCode);
}, 30000);
const removePromises = [];
if (activeWorkflowRunner !== undefined) {
removePromises.push(activeWorkflowRunner.removeAll());
}
// Remove all test webhooks
const testWebhooks = TestWebhooks.getInstance();
removePromises.push(testWebhooks.removeAll());
await Promise.all(removePromises);
// Wait for active workflow executions to finish
const activeExecutionsInstance = ActiveExecutions.getInstance();
let executingWorkflows = activeExecutionsInstance.getActiveExecutions();
let count = 0;
while (executingWorkflows.length !== 0) {
if (count++ % 4 === 0) {
console.log(`Waiting for ${executingWorkflows.length} active executions to finish...`);
}
await new Promise((resolve) => {
setTimeout(resolve, 500);
});
executingWorkflows = activeExecutionsInstance.getActiveExecutions();
}
} catch (error) {
console.error('There was an error shutting down n8n.', error);
}
process.exit(processExistCode);
}
async run() {
// Make sure that n8n shuts down gracefully if possible
process.on('SIGTERM', Webhook.stopProcess);
process.on('SIGINT', Webhook.stopProcess);
const { flags } = this.parse(Webhook);
// Wrap that the process does not close but we can still use async
await (async () => {
if (config.get('executions.mode') !== 'queue') {
/**
* It is technically possible to run without queues but
* there are 2 known bugs when running in this mode:
* - Executions list will be problematic as the main process
* is not aware of current executions in the webhook processes
* and therefore will display all current executions as error
* as it is unable to determine if it is still running or crashed
* - You cannot stop currently executing jobs from webhook processes
* when running without queues as the main process cannot talk to
* the wehbook processes to communicate workflow execution interruption.
*/
this.error('Webhook processes can only run with execution mode as queue.');
}
try {
// Start directly with the init of the database to improve startup time
const startDbInitPromise = Db.init().catch(error => {
console.error(`There was an error initializing DB: ${error.message}`);
processExistCode = 1;
// @ts-ignore
process.emit('SIGINT');
});
// Make sure the settings exist
const userSettings = await UserSettings.prepareUserSettings();
// Load all node and credential types
const loadNodesAndCredentials = LoadNodesAndCredentials();
await loadNodesAndCredentials.init();
// Load the credentials overwrites if any exist
const credentialsOverwrites = CredentialsOverwrites();
await credentialsOverwrites.init();
// Load all external hooks
const externalHooks = ExternalHooks();
await externalHooks.init();
// Add the found types to an instance other parts of the application can use
const nodeTypes = NodeTypes();
await nodeTypes.init(loadNodesAndCredentials.nodeTypes);
const credentialTypes = CredentialTypes();
await credentialTypes.init(loadNodesAndCredentials.credentialTypes);
// Wait till the database is ready
await startDbInitPromise;
if (config.get('executions.mode') === 'queue') {
const redisHost = config.get('queue.bull.redis.host');
const redisPassword = config.get('queue.bull.redis.password');
const redisPort = config.get('queue.bull.redis.port');
const redisDB = config.get('queue.bull.redis.db');
const redisConnectionTimeoutLimit = config.get('queue.bull.redis.timeoutThreshold');
let lastTimer = 0, cumulativeTimeout = 0;
const settings = {
retryStrategy: (times: number): number | null => {
const now = Date.now();
if (now - lastTimer > 30000) {
// Means we had no timeout at all or last timeout was temporary and we recovered
lastTimer = now;
cumulativeTimeout = 0;
} else {
cumulativeTimeout += now - lastTimer;
lastTimer = now;
if (cumulativeTimeout > redisConnectionTimeoutLimit) {
console.error('Unable to connect to Redis after ' + redisConnectionTimeoutLimit + ". Exiting process.");
process.exit(1);
}
}
return 500;
},
} as IDataObject;
if (redisHost) {
settings.host = redisHost;
}
if (redisPassword) {
settings.password = redisPassword;
}
if (redisPort) {
settings.port = redisPort;
}
if (redisDB) {
settings.db = redisDB;
}
// This connection is going to be our heartbeat
// IORedis automatically pings redis and tries to reconnect
// We will be using the retryStrategy above
// to control how and when to exit.
const redis = new Redis(settings);
redis.on('error', (error) => {
if (error.toString().includes('ECONNREFUSED') === true) {
console.warn('Redis unavailable - trying to reconnect...');
} else {
console.warn('Error with Redis: ', error);
}
});
}
await WebhookServer.start();
// Start to get active workflows and run their triggers
activeWorkflowRunner = ActiveWorkflowRunner.getInstance();
await activeWorkflowRunner.initWebhooks();
const editorUrl = GenericHelpers.getBaseUrl();
this.log('Webhook listener waiting for requests.');
} catch (error) {
this.error(`There was an error: ${error.message}`);
processExistCode = 1;
// @ts-ignore
process.emit('SIGINT');
}
})();
}
}

View file

@ -39,6 +39,7 @@ import {
import * as config from '../config'; import * as config from '../config';
import * as Bull from 'bull'; import * as Bull from 'bull';
import * as Queue from '../src/Queue';
export class Worker extends Command { export class Worker extends Command {
static description = '\nStarts a n8n worker'; static description = '\nStarts a n8n worker';
@ -112,7 +113,6 @@ export class Worker extends Command {
const jobData = job.data as IBullJobData; const jobData = job.data as IBullJobData;
const executionDb = await Db.collections.Execution!.findOne(jobData.executionId) as IExecutionFlattedDb; const executionDb = await Db.collections.Execution!.findOne(jobData.executionId) as IExecutionFlattedDb;
const currentExecutionDb = ResponseHelper.unflattenExecutionData(executionDb) as IExecutionResponse; const currentExecutionDb = ResponseHelper.unflattenExecutionData(executionDb) as IExecutionResponse;
console.log(`Start job: ${job.id} (Workflow ID: ${currentExecutionDb.workflowData.id} | Execution: ${jobData.executionId})`); console.log(`Start job: ${job.id} (Workflow ID: ${currentExecutionDb.workflowData.id} | Execution: ${jobData.executionId})`);
let staticData = currentExecutionDb.workflowData!.staticData; let staticData = currentExecutionDb.workflowData!.staticData;
@ -203,16 +203,9 @@ export class Worker extends Command {
// Wait till the database is ready // Wait till the database is ready
await startDbInitPromise; await startDbInitPromise;
// Connect to bull-queue
const prefix = config.get('queue.bull.prefix') as string;
const redisOptions = config.get('queue.bull.redis') as IDataObject;
const redisConnectionTimeoutLimit = config.get('queue.bull.redis.timeoutThreshold'); const redisConnectionTimeoutLimit = config.get('queue.bull.redis.timeoutThreshold');
// Disabling ready check is necessary as it allows worker to
// quickly reconnect to Redis if Redis crashes or is unreachable Worker.jobQueue = Queue.getInstance().getBullObjectInstance();
// for some time. With it enabled, worker might take minutes to realize
// redis is back up and resume working.
redisOptions.enableReadyCheck = false;
Worker.jobQueue = new Bull('jobs', { prefix, redis: redisOptions });
Worker.jobQueue.process(flags.concurrency, (job) => this.runJob(job, nodeTypes)); Worker.jobQueue.process(flags.concurrency, (job) => this.runJob(job, nodeTypes));
const versions = await GenericHelpers.getVersions(); const versions = await GenericHelpers.getVersions();

View file

@ -464,6 +464,30 @@ const config = convict({
env: 'N8N_ENDPOINT_WEBHOOK_TEST', env: 'N8N_ENDPOINT_WEBHOOK_TEST',
doc: 'Path for test-webhook endpoint', doc: 'Path for test-webhook endpoint',
}, },
disableProductionWebhooksOnMainProcess: {
format: Boolean,
default: false,
env: 'N8N_DISABLE_PRODUCTION_MAIN_PROCESS',
doc: 'Disable production webhooks from main process. This helps ensures no http traffic load to main process when using webhook-specific processes.',
},
skipWebhoooksDeregistrationOnShutdown: {
/**
* Longer explanation: n8n deregisters webhooks on shutdown / deactivation
* and registers on startup / activation. If we skip
* deactivation on shutdown, webhooks will remain active on 3rd party services.
* We don't have to worry about startup as it always
* checks if webhooks already exist.
* If users want to upgrade n8n, it is possible to run
* two instances simultaneously without downtime, similar
* to blue/green deployment.
* WARNING: Trigger nodes (like Cron) will cause duplication
* of work, so be aware when using.
*/
doc: 'Deregister webhooks on external services only when workflows are deactivated. Useful for blue/green deployments.',
format: Boolean,
default: false,
env: 'N8N_SKIP_WEBHOOK_DEREGISTRATION_STARTUP_SHUTDOWN',
},
}, },
externalHookFiles: { externalHookFiles: {

View file

@ -76,6 +76,10 @@ export class ActiveWorkflowRunner {
} }
} }
async initWebhooks() {
this.activeWorkflows = new ActiveWorkflows();
}
/** /**
* Removes all the currently active workflows * Removes all the currently active workflows
* *

67
packages/cli/src/Queue.ts Normal file
View file

@ -0,0 +1,67 @@
import * as Bull from 'bull';
import * as config from '../config';
import { IBullJobData } from './Interfaces';
export class Queue {
private jobQueue: Bull.Queue;
constructor() {
const prefix = config.get('queue.bull.prefix') as string;
const redisOptions = config.get('queue.bull.redis') as object;
// Disabling ready check is necessary as it allows worker to
// quickly reconnect to Redis if Redis crashes or is unreachable
// for some time. With it enabled, worker might take minutes to realize
// redis is back up and resume working.
// More here: https://github.com/OptimalBits/bull/issues/890
// @ts-ignore
this.jobQueue = new Bull('jobs', { prefix, redis: redisOptions, enableReadyCheck: false });
}
async add(jobData: IBullJobData, jobOptions: object): Promise<Bull.Job> {
return await this.jobQueue.add(jobData,jobOptions);
}
async getJob(jobId: Bull.JobId): Promise<Bull.Job | null> {
return await this.jobQueue.getJob(jobId);
}
async getJobs(jobTypes: Bull.JobStatus[]): Promise<Bull.Job[]> {
return await this.jobQueue.getJobs(jobTypes);
}
getBullObjectInstance(): Bull.Queue {
return this.jobQueue;
}
/**
*
* @param job A Bull.Job instance
* @returns boolean true if we were able to securely stop the job
*/
async stopJob(job: Bull.Job): Promise<boolean> {
if (await job.isActive()) {
// Job is already running so tell it to stop
await job.progress(-1);
return true;
} else {
// Job did not get started yet so remove from queue
try {
await job.remove();
return true;
} catch (e) {
await job.progress(-1);
}
}
return false;
}
}
let activeQueueInstance: Queue | undefined;
export function getInstance(): Queue {
if (activeQueueInstance === undefined) {
activeQueueInstance = new Queue();
}
return activeQueueInstance;
}

View file

@ -188,6 +188,7 @@ export function unflattenExecutionData(fullExecutionData: IExecutionFlattedDb):
startedAt: fullExecutionData.startedAt, startedAt: fullExecutionData.startedAt,
stoppedAt: fullExecutionData.stoppedAt, stoppedAt: fullExecutionData.stoppedAt,
finished: fullExecutionData.finished ? fullExecutionData.finished : false, finished: fullExecutionData.finished ? fullExecutionData.finished : false,
workflowId: fullExecutionData.workflowId,
}); });
return returnData; return returnData;

View file

@ -62,6 +62,7 @@ import {
ResponseHelper, ResponseHelper,
TestWebhooks, TestWebhooks,
WebhookHelpers, WebhookHelpers,
WebhookServer,
WorkflowCredentials, WorkflowCredentials,
WorkflowExecuteAdditionalData, WorkflowExecuteAdditionalData,
WorkflowRunner, WorkflowRunner,
@ -105,6 +106,7 @@ import * as jwks from 'jwks-rsa';
import * as timezones from 'google-timezones-json'; import * as timezones from 'google-timezones-json';
import * as parseUrl from 'parseurl'; import * as parseUrl from 'parseurl';
import * as querystring from 'querystring'; import * as querystring from 'querystring';
import * as Queue from '../src/Queue';
import { OptionsWithUrl } from 'request-promise-native'; import { OptionsWithUrl } from 'request-promise-native';
class App { class App {
@ -1428,7 +1430,14 @@ class App {
limit = parseInt(req.query.limit as string, 10); limit = parseInt(req.query.limit as string, 10);
} }
const executingWorkflowIds = this.activeExecutionsInstance.getActiveExecutions().map(execution => execution.id.toString()) as string[]; let executingWorkflowIds;
if (config.get('executions.mode') === 'queue') {
const currentJobs = await Queue.getInstance().getJobs(['active', 'waiting']);
executingWorkflowIds = currentJobs.map(job => job.data.executionId) as string[];
} else {
executingWorkflowIds = this.activeExecutionsInstance.getActiveExecutions().map(execution => execution.id.toString()) as string[];
}
const countFilter = JSON.parse(JSON.stringify(filter)); const countFilter = JSON.parse(JSON.stringify(filter));
countFilter.select = ['id']; countFilter.select = ['id'];
@ -1453,10 +1462,10 @@ class App {
resultsQuery.andWhere(`execution.${filterField} = :${filterField}`, {[filterField]: filter[filterField]}); resultsQuery.andWhere(`execution.${filterField} = :${filterField}`, {[filterField]: filter[filterField]});
}); });
if (req.query.lastId) { if (req.query.lastId) {
resultsQuery.andWhere(`execution.id <= :lastId`, {lastId: req.query.lastId}); resultsQuery.andWhere(`execution.id < :lastId`, {lastId: req.query.lastId});
} }
if (req.query.firstId) { if (req.query.firstId) {
resultsQuery.andWhere(`execution.id >= :firstId`, {firstId: req.query.firstId}); resultsQuery.andWhere(`execution.id > :firstId`, {firstId: req.query.firstId});
} }
if (executingWorkflowIds.length > 0) { if (executingWorkflowIds.length > 0) {
resultsQuery.andWhere(`execution.id NOT IN (:...ids)`, {ids: executingWorkflowIds}); resultsQuery.andWhere(`execution.id NOT IN (:...ids)`, {ids: executingWorkflowIds});
@ -1626,6 +1635,42 @@ class App {
// Returns all the currently working executions // Returns all the currently working executions
this.app.get(`/${this.restEndpoint}/executions-current`, ResponseHelper.send(async (req: express.Request, res: express.Response): Promise<IExecutionsSummary[]> => { this.app.get(`/${this.restEndpoint}/executions-current`, ResponseHelper.send(async (req: express.Request, res: express.Response): Promise<IExecutionsSummary[]> => {
if (config.get('executions.mode') === 'queue') {
const currentJobs = await Queue.getInstance().getJobs(['active', 'waiting']);
const currentlyRunningExecutionIds = currentJobs.map(job => job.data.executionId);
const resultsQuery = await Db.collections.Execution!
.createQueryBuilder("execution")
.select([
'execution.id',
'execution.workflowId',
'execution.mode',
'execution.retryOf',
'execution.startedAt',
])
.orderBy('execution.id', 'DESC')
.andWhere(`execution.id IN (:...ids)`, {ids: currentlyRunningExecutionIds});
if (req.query.filter) {
const filter = JSON.parse(req.query.filter as string);
if (filter.workflowId !== undefined) {
resultsQuery.andWhere('execution.workflowId = :workflowId', {workflowId: filter.workflowId});
}
}
const results = await resultsQuery.getMany();
return results.map(result => {
return {
idActive: result.id,
workflowId: result.workflowId,
mode: result.mode,
retryOf: result.retryOf !== null ? result.retryOf : undefined,
startedAt: new Date(result.startedAt),
} as IExecutionsSummary;
});
} else {
const executingWorkflows = this.activeExecutionsInstance.getActiveExecutions(); const executingWorkflows = this.activeExecutionsInstance.getActiveExecutions();
const returnData: IExecutionsSummary[] = []; const returnData: IExecutionsSummary[] = [];
@ -1651,10 +1696,35 @@ class App {
} }
return returnData; return returnData;
}
})); }));
// Forces the execution to stop // Forces the execution to stop
this.app.post(`/${this.restEndpoint}/executions-current/:id/stop`, ResponseHelper.send(async (req: express.Request, res: express.Response): Promise<IExecutionsStopData> => { this.app.post(`/${this.restEndpoint}/executions-current/:id/stop`, ResponseHelper.send(async (req: express.Request, res: express.Response): Promise<IExecutionsStopData> => {
if (config.get('executions.mode') === 'queue') {
const currentJobs = await Queue.getInstance().getJobs(['active', 'waiting']);
const job = currentJobs.find(job => job.data.executionId.toString() === req.params.id);
if (!job) {
throw new Error(`Could not stop "${req.params.id}" as it is no longer in queue.`);
} else {
await Queue.getInstance().stopJob(job);
}
const executionDb = await Db.collections.Execution?.findOne(req.params.id) as IExecutionFlattedDb;
const fullExecutionData = ResponseHelper.unflattenExecutionData(executionDb) as IExecutionResponse;
const returnData: IExecutionsStopData = {
mode: fullExecutionData.mode,
startedAt: new Date(fullExecutionData.startedAt),
stoppedAt: fullExecutionData.stoppedAt ? new Date(fullExecutionData.stoppedAt) : undefined,
finished: fullExecutionData.finished,
};
return returnData;
} else {
const executionId = req.params.id; const executionId = req.params.id;
// Stopt he execution and wait till it is done and we got the data // Stopt he execution and wait till it is done and we got the data
@ -1672,6 +1742,7 @@ class App {
}; };
return returnData; return returnData;
}
})); }));
@ -1711,89 +1782,10 @@ class App {
// Webhooks // Webhooks
// ---------------------------------------- // ----------------------------------------
// HEAD webhook requests if (config.get('endpoints.disableProductionWebhooksOnMainProcess') !== true) {
this.app.head(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => { WebhookServer.registerProductionWebhooks.apply(this);
// Cut away the "/webhook/" to get the registred part of the url
const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhook.length + 2);
let response;
try {
response = await this.activeWorkflowRunner.executeWebhook('HEAD', requestUrl, req, res);
} catch (error) {
ResponseHelper.sendErrorResponse(res, error);
return;
} }
if (response.noWebhookResponse === true) {
// Nothing else to do as the response got already sent
return;
}
ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode);
});
// OPTIONS webhook requests
this.app.options(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => {
// Cut away the "/webhook/" to get the registred part of the url
const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhook.length + 2);
let allowedMethods: string[];
try {
allowedMethods = await this.activeWorkflowRunner.getWebhookMethods(requestUrl);
allowedMethods.push('OPTIONS');
// Add custom "Allow" header to satisfy OPTIONS response.
res.append('Allow', allowedMethods);
} catch (error) {
ResponseHelper.sendErrorResponse(res, error);
return;
}
ResponseHelper.sendSuccessResponse(res, {}, true, 204);
});
// GET webhook requests
this.app.get(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => {
// Cut away the "/webhook/" to get the registred part of the url
const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhook.length + 2);
let response;
try {
response = await this.activeWorkflowRunner.executeWebhook('GET', requestUrl, req, res);
} catch (error) {
ResponseHelper.sendErrorResponse(res, error);
return;
}
if (response.noWebhookResponse === true) {
// Nothing else to do as the response got already sent
return;
}
ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode);
});
// POST webhook requests
this.app.post(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => {
// Cut away the "/webhook/" to get the registred part of the url
const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhook.length + 2);
let response;
try {
response = await this.activeWorkflowRunner.executeWebhook('POST', requestUrl, req, res);
} catch (error) {
ResponseHelper.sendErrorResponse(res, error);
return;
}
if (response.noWebhookResponse === true) {
// Nothing else to do as the response got already sent
return;
}
ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode);
});
// HEAD webhook requests (test for UI) // HEAD webhook requests (test for UI)
this.app.head(`/${this.endpointWebhookTest}/*`, async (req: express.Request, res: express.Response) => { this.app.head(`/${this.endpointWebhookTest}/*`, async (req: express.Request, res: express.Response) => {
// Cut away the "/webhook-test/" to get the registred part of the url // Cut away the "/webhook-test/" to get the registred part of the url

View file

@ -453,8 +453,11 @@ export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] {
export function getWebhookBaseUrl() { export function getWebhookBaseUrl() {
let urlBaseWebhook = GenericHelpers.getBaseUrl(); let urlBaseWebhook = GenericHelpers.getBaseUrl();
if (process.env.WEBHOOK_TUNNEL_URL !== undefined) { // We renamed WEBHOOK_TUNNEL_URL to WEBHOOK_URL. This is here to maintain
urlBaseWebhook = process.env.WEBHOOK_TUNNEL_URL; // backward compatibility. Will be deprecated and removed in the future.
if (process.env.WEBHOOK_TUNNEL_URL !== undefined || process.env.WEBHOOK_URL !== undefined) {
// @ts-ignore
urlBaseWebhook = process.env.WEBHOOK_TUNNEL_URL || process.env.WEBHOOK_URL;
} }
return urlBaseWebhook; return urlBaseWebhook;

View file

@ -0,0 +1,306 @@
import * as express from 'express';
import {
readFileSync,
} from 'fs';
import {
getConnectionManager,
} from 'typeorm';
import * as bodyParser from 'body-parser';
require('body-parser-xml')(bodyParser);
import * as _ from 'lodash';
import {
ActiveExecutions,
ActiveWorkflowRunner,
Db,
ExternalHooks,
GenericHelpers,
ICustomRequest,
IExternalHooksClass,
IPackageVersions,
ResponseHelper,
} from './';
import * as compression from 'compression';
import * as config from '../config';
import * as parseUrl from 'parseurl';
export function registerProductionWebhooks() {
// HEAD webhook requests
this.app.head(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => {
// Cut away the "/webhook/" to get the registred part of the url
const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhook.length + 2);
let response;
try {
response = await this.activeWorkflowRunner.executeWebhook('HEAD', requestUrl, req, res);
} catch (error) {
ResponseHelper.sendErrorResponse(res, error);
return;
}
if (response.noWebhookResponse === true) {
// Nothing else to do as the response got already sent
return;
}
ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode);
});
// OPTIONS webhook requests
this.app.options(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => {
// Cut away the "/webhook/" to get the registred part of the url
const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhook.length + 2);
let allowedMethods: string[];
try {
allowedMethods = await this.activeWorkflowRunner.getWebhookMethods(requestUrl);
allowedMethods.push('OPTIONS');
// Add custom "Allow" header to satisfy OPTIONS response.
res.append('Allow', allowedMethods);
} catch (error) {
ResponseHelper.sendErrorResponse(res, error);
return;
}
ResponseHelper.sendSuccessResponse(res, {}, true, 204);
});
// GET webhook requests
this.app.get(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => {
// Cut away the "/webhook/" to get the registred part of the url
const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhook.length + 2);
let response;
try {
response = await this.activeWorkflowRunner.executeWebhook('GET', requestUrl, req, res);
} catch (error) {
ResponseHelper.sendErrorResponse(res, error);
return;
}
if (response.noWebhookResponse === true) {
// Nothing else to do as the response got already sent
return;
}
ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode);
});
// POST webhook requests
this.app.post(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => {
// Cut away the "/webhook/" to get the registred part of the url
const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhook.length + 2);
let response;
try {
response = await this.activeWorkflowRunner.executeWebhook('POST', requestUrl, req, res);
} catch (error) {
ResponseHelper.sendErrorResponse(res, error);
return;
}
if (response.noWebhookResponse === true) {
// Nothing else to do as the response got already sent
return;
}
ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode);
});
}
class App {
app: express.Application;
activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner;
endpointWebhook: string;
endpointPresetCredentials: string;
externalHooks: IExternalHooksClass;
saveDataErrorExecution: string;
saveDataSuccessExecution: string;
saveManualExecutions: boolean;
executionTimeout: number;
maxExecutionTimeout: number;
timezone: string;
activeExecutionsInstance: ActiveExecutions.ActiveExecutions;
versions: IPackageVersions | undefined;
restEndpoint: string;
protocol: string;
sslKey: string;
sslCert: string;
presetCredentialsLoaded: boolean;
constructor() {
this.app = express();
this.endpointWebhook = config.get('endpoints.webhook') as string;
this.saveDataErrorExecution = config.get('executions.saveDataOnError') as string;
this.saveDataSuccessExecution = config.get('executions.saveDataOnSuccess') as string;
this.saveManualExecutions = config.get('executions.saveDataManualExecutions') as boolean;
this.executionTimeout = config.get('executions.timeout') as number;
this.maxExecutionTimeout = config.get('executions.maxTimeout') as number;
this.timezone = config.get('generic.timezone') as string;
this.restEndpoint = config.get('endpoints.rest') as string;
this.activeWorkflowRunner = ActiveWorkflowRunner.getInstance();
this.activeExecutionsInstance = ActiveExecutions.getInstance();
this.protocol = config.get('protocol');
this.sslKey = config.get('ssl_key');
this.sslCert = config.get('ssl_cert');
this.externalHooks = ExternalHooks();
this.presetCredentialsLoaded = false;
this.endpointPresetCredentials = config.get('credentials.overwrite.endpoint') as string;
}
/**
* Returns the current epoch time
*
* @returns {number}
* @memberof App
*/
getCurrentDate(): Date {
return new Date();
}
async config(): Promise<void> {
this.versions = await GenericHelpers.getVersions();
// Compress the response data
this.app.use(compression());
// Make sure that each request has the "parsedUrl" parameter
this.app.use((req: express.Request, res: express.Response, next: express.NextFunction) => {
(req as ICustomRequest).parsedUrl = parseUrl(req);
// @ts-ignore
req.rawBody = Buffer.from('', 'base64');
next();
});
// Support application/json type post data
this.app.use(bodyParser.json({
limit: '16mb', verify: (req, res, buf) => {
// @ts-ignore
req.rawBody = buf;
},
}));
// Support application/xml type post data
// @ts-ignore
this.app.use(bodyParser.xml({
limit: '16mb', xmlParseOptions: {
normalize: true, // Trim whitespace inside text nodes
normalizeTags: true, // Transform tags to lowercase
explicitArray: false, // Only put properties in array if length > 1
},
}));
this.app.use(bodyParser.text({
limit: '16mb', verify: (req, res, buf) => {
// @ts-ignore
req.rawBody = buf;
},
}));
//support application/x-www-form-urlencoded post data
this.app.use(bodyParser.urlencoded({ extended: false,
verify: (req, res, buf) => {
// @ts-ignore
req.rawBody = buf;
},
}));
if (process.env['NODE_ENV'] !== 'production') {
this.app.use((req: express.Request, res: express.Response, next: express.NextFunction) => {
// Allow access also from frontend when developing
res.header('Access-Control-Allow-Origin', 'http://localhost:8080');
res.header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS, PUT, PATCH, DELETE');
res.header('Access-Control-Allow-Headers', 'Origin, X-Requested-With, Content-Type, Accept, sessionid');
next();
});
}
this.app.use((req: express.Request, res: express.Response, next: express.NextFunction) => {
if (Db.collections.Workflow === null) {
const error = new ResponseHelper.ResponseError('Database is not ready!', undefined, 503);
return ResponseHelper.sendErrorResponse(res, error);
}
next();
});
// ----------------------------------------
// Healthcheck
// ----------------------------------------
// Does very basic health check
this.app.get('/healthz', async (req: express.Request, res: express.Response) => {
const connectionManager = getConnectionManager();
if (connectionManager.connections.length === 0) {
const error = new ResponseHelper.ResponseError('No Database connection found!', undefined, 503);
return ResponseHelper.sendErrorResponse(res, error);
}
if (connectionManager.connections[0].isConnected === false) {
// Connection is not active
const error = new ResponseHelper.ResponseError('Database connection not active!', undefined, 503);
return ResponseHelper.sendErrorResponse(res, error);
}
// Everything fine
const responseData = {
status: 'ok',
};
ResponseHelper.sendSuccessResponse(res, responseData, true, 200);
});
registerProductionWebhooks.apply(this);
}
}
export async function start(): Promise<void> {
const PORT = config.get('port');
const ADDRESS = config.get('listen_address');
const app = new App();
await app.config();
let server;
if (app.protocol === 'https' && app.sslKey && app.sslCert) {
const https = require('https');
const privateKey = readFileSync(app.sslKey, 'utf8');
const cert = readFileSync(app.sslCert, 'utf8');
const credentials = { key: privateKey, cert };
server = https.createServer(credentials, app.app);
} else {
const http = require('http');
server = http.createServer(app.app);
}
server.listen(PORT, ADDRESS, async () => {
const versions = await GenericHelpers.getVersions();
console.log(`n8n ready on ${ADDRESS}, port ${PORT}`);
console.log(`Version: ${versions.cli}`);
await app.externalHooks.run('n8n.ready', [app]);
});
}

View file

@ -193,6 +193,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
workflowExecuteBefore: [ workflowExecuteBefore: [
async function (this: WorkflowHooks): Promise<void> { async function (this: WorkflowHooks): Promise<void> {
// Push data to editor-ui once workflow finished // Push data to editor-ui once workflow finished
if (this.mode === 'manual') {
const pushInstance = Push.getInstance(); const pushInstance = Push.getInstance();
pushInstance.send('executionStarted', { pushInstance.send('executionStarted', {
executionId: this.executionId, executionId: this.executionId,
@ -202,11 +203,14 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
workflowId: this.workflowData.id as string, workflowId: this.workflowData.id as string,
workflowName: this.workflowData.name, workflowName: this.workflowData.name,
}); });
}
}, },
], ],
workflowExecuteAfter: [ workflowExecuteAfter: [
async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise<void> { async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise<void> {
if (this.mode === 'manual') {
pushExecutionFinished(this.mode, fullRunData, this.executionId, undefined, this.retryOf); pushExecutionFinished(this.mode, fullRunData, this.executionId, undefined, this.retryOf);
}
}, },
], ],
}; };

View file

@ -41,6 +41,7 @@ import { join as pathJoin } from 'path';
import { fork } from 'child_process'; import { fork } from 'child_process';
import * as Bull from 'bull'; import * as Bull from 'bull';
import * as Queue from './Queue';
export class WorkflowRunner { export class WorkflowRunner {
activeExecutions: ActiveExecutions.ActiveExecutions; activeExecutions: ActiveExecutions.ActiveExecutions;
@ -57,11 +58,7 @@ export class WorkflowRunner {
const executionsMode = config.get('executions.mode') as string; const executionsMode = config.get('executions.mode') as string;
if (executionsMode === 'queue') { if (executionsMode === 'queue') {
// Connect to bull-queue this.jobQueue = Queue.getInstance().getBullObjectInstance();
const prefix = config.get('queue.bull.prefix') as string;
const redisOptions = config.get('queue.bull.redis') as object;
// @ts-ignore
this.jobQueue = new Bull('jobs', { prefix, redis: redisOptions, enableReadyCheck: false });
} }
} }
@ -251,14 +248,9 @@ export class WorkflowRunner {
const workflowExecution: PCancelable<IRun> = new PCancelable(async (resolve, reject, onCancel) => { const workflowExecution: PCancelable<IRun> = new PCancelable(async (resolve, reject, onCancel) => {
onCancel.shouldReject = false; onCancel.shouldReject = false;
onCancel(async () => { onCancel(async () => {
if (await job.isActive()) { await Queue.getInstance().stopJob(job);
// Job is already running so tell it to stop
await job.progress(-1);
} else {
// Job did not get started yet so remove from queue
await job.remove();
const fullRunData: IRun = { const fullRunData :IRun = {
data: { data: {
resultData: { resultData: {
error: { error: {
@ -271,10 +263,8 @@ export class WorkflowRunner {
startedAt: new Date(), startedAt: new Date(),
stoppedAt: new Date(), stoppedAt: new Date(),
}; };
this.activeExecutions.remove(executionId, fullRunData); this.activeExecutions.remove(executionId, fullRunData);
resolve(fullRunData); resolve(fullRunData);
}
}); });
const jobData: Promise<IBullJobResponse> = job.finished(); const jobData: Promise<IBullJobResponse> = job.finished();

View file

@ -17,6 +17,7 @@ import * as ResponseHelper from './ResponseHelper';
import * as Server from './Server'; import * as Server from './Server';
import * as TestWebhooks from './TestWebhooks'; import * as TestWebhooks from './TestWebhooks';
import * as WebhookHelpers from './WebhookHelpers'; import * as WebhookHelpers from './WebhookHelpers';
import * as WebhookServer from './WebhookServer';
import * as WorkflowExecuteAdditionalData from './WorkflowExecuteAdditionalData'; import * as WorkflowExecuteAdditionalData from './WorkflowExecuteAdditionalData';
import * as WorkflowHelpers from './WorkflowHelpers'; import * as WorkflowHelpers from './WorkflowHelpers';
export { export {
@ -29,6 +30,7 @@ export {
Server, Server,
TestWebhooks, TestWebhooks,
WebhookHelpers, WebhookHelpers,
WebhookServer,
WorkflowExecuteAdditionalData, WorkflowExecuteAdditionalData,
WorkflowHelpers, WorkflowHelpers,
}; };

View file

@ -125,7 +125,7 @@ export interface IRestApi {
getActiveWorkflows(): Promise<string[]>; getActiveWorkflows(): Promise<string[]>;
getActivationError(id: string): Promise<IActivationError | undefined >; getActivationError(id: string): Promise<IActivationError | undefined >;
getCurrentExecutions(filter: object): Promise<IExecutionsCurrentSummaryExtended[]>; getCurrentExecutions(filter: object): Promise<IExecutionsCurrentSummaryExtended[]>;
getPastExecutions(filter: object, limit: number, lastId?: string | number): Promise<IExecutionsListResponse>; getPastExecutions(filter: object, limit: number, lastId?: string | number, firstId?: string | number): Promise<IExecutionsListResponse>;
stopCurrentExecution(executionId: string): Promise<IExecutionsStopData>; stopCurrentExecution(executionId: string): Promise<IExecutionsStopData>;
makeRestApiRequest(method: string, endpoint: string, data?: any): Promise<any>; // tslint:disable-line:no-any makeRestApiRequest(method: string, endpoint: string, data?: any): Promise<any>; // tslint:disable-line:no-any
getSettings(): Promise<IN8nUISettings>; getSettings(): Promise<IN8nUISettings>;

View file

@ -28,7 +28,10 @@
</el-option> </el-option>
</el-select> </el-select>
</el-col> </el-col>
<el-col :span="8">&nbsp; <el-col :span="4">&nbsp;
</el-col>
<el-col :span="4" class="autorefresh">
<el-checkbox v-model="autoRefresh" @change="handleAutoRefreshToggle">Auto refresh</el-checkbox>
</el-col> </el-col>
</el-row> </el-row>
</div> </div>
@ -191,6 +194,8 @@ export default mixins(
finishedExecutionsCount: 0, finishedExecutionsCount: 0,
checkAll: false, checkAll: false,
autoRefresh: true,
autoRefreshInterval: undefined as undefined | NodeJS.Timer,
filter: { filter: {
status: 'ALL', status: 'ALL',
@ -292,6 +297,10 @@ export default mixins(
// Handle the close externally as the visible parameter is an external prop // Handle the close externally as the visible parameter is an external prop
// and is so not allowed to be changed here. // and is so not allowed to be changed here.
this.$emit('closeDialog'); this.$emit('closeDialog');
if (this.autoRefreshInterval) {
clearInterval(this.autoRefreshInterval);
this.autoRefreshInterval = undefined;
}
return false; return false;
}, },
displayExecution (execution: IExecutionShortResponse) { displayExecution (execution: IExecutionShortResponse) {
@ -301,6 +310,18 @@ export default mixins(
}); });
this.closeDialog(); this.closeDialog();
}, },
handleAutoRefreshToggle () {
if (this.autoRefreshInterval) {
// Clear any previously existing intervals (if any - there shouldn't)
clearInterval(this.autoRefreshInterval);
this.autoRefreshInterval = undefined;
}
if (this.autoRefresh) {
this.autoRefreshInterval = setInterval(this.loadAutoRefresh, 4 * 1000); // refresh data every 4 secs
}
},
handleCheckAllChange () { handleCheckAllChange () {
if (this.checkAll === false) { if (this.checkAll === false) {
Vue.set(this, 'selectedItems', {}); Vue.set(this, 'selectedItems', {});
@ -389,6 +410,27 @@ export default mixins(
this.$store.commit('setActiveExecutions', activeExecutions); this.$store.commit('setActiveExecutions', activeExecutions);
}, },
async loadAutoRefresh () : Promise<void> {
let firstId: string | number | undefined = 0;
if (this.finishedExecutions.length !== 0) {
firstId = this.finishedExecutions[0].id;
}
const activeExecutionsPromise: Promise<IExecutionsListResponse> = this.restApi().getPastExecutions({}, 100, undefined, firstId);
const currentExecutionsPromise: Promise<IExecutionsCurrentSummaryExtended[]> = this.restApi().getCurrentExecutions({});
const results = await Promise.all([activeExecutionsPromise, currentExecutionsPromise]);
for (const activeExecution of results[1]) {
if (activeExecution.workflowId !== undefined && activeExecution.workflowName === undefined) {
activeExecution.workflowName = this.getWorkflowName(activeExecution.workflowId);
}
}
this.$store.commit('setActiveExecutions', results[1]);
this.finishedExecutions.unshift.apply(this.finishedExecutions, results[0].results);
this.finishedExecutionsCount = results[0].count;
},
async loadFinishedExecutions (): Promise<void> { async loadFinishedExecutions (): Promise<void> {
if (this.filter.status === 'running') { if (this.filter.status === 'running') {
this.finishedExecutions = []; this.finishedExecutions = [];
@ -459,6 +501,7 @@ export default mixins(
await this.loadWorkflows(); await this.loadWorkflows();
await this.refreshData(); await this.refreshData();
this.handleAutoRefreshToggle();
}, },
async retryExecution (execution: IExecutionShortResponse, loadWorkflow?: boolean) { async retryExecution (execution: IExecutionShortResponse, loadWorkflow?: boolean) {
this.isDataLoading = true; this.isDataLoading = true;
@ -544,6 +587,11 @@ export default mixins(
<style scoped lang="scss"> <style scoped lang="scss">
.autorefresh {
padding-right: 0.5em;
text-align: right;
}
.filters { .filters {
line-height: 2em; line-height: 2em;
.refresh-button { .refresh-button {

View file

@ -300,11 +300,12 @@ export const restApi = Vue.extend({
// Returns all saved executions // Returns all saved executions
// TODO: For sure needs some kind of default filter like last day, with max 10 results, ... // TODO: For sure needs some kind of default filter like last day, with max 10 results, ...
getPastExecutions: (filter: object, limit: number, lastId?: string | number): Promise<IExecutionsListResponse> => { getPastExecutions: (filter: object, limit: number, lastId?: string | number, firstId?: string | number): Promise<IExecutionsListResponse> => {
let sendData = {}; let sendData = {};
if (filter) { if (filter) {
sendData = { sendData = {
filter, filter,
firstId,
lastId, lastId,
limit, limit,
}; };

View file

@ -147,6 +147,7 @@ import {
NodeInputConnections, NodeInputConnections,
NodeHelpers, NodeHelpers,
Workflow, Workflow,
IRun,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { import {
IConnectionsUi, IConnectionsUi,
@ -161,6 +162,7 @@ import {
IUpdateInformation, IUpdateInformation,
IWorkflowDataUpdate, IWorkflowDataUpdate,
XYPositon, XYPositon,
IPushDataExecutionFinished,
} from '../Interface'; } from '../Interface';
export default mixins( export default mixins(
@ -728,8 +730,38 @@ export default mixins(
type: 'success', type: 'success',
}); });
} catch (error) { } catch (error) {
// Execution stop might fail when the execution has already finished. Let's treat this here.
const execution = await this.restApi().getExecution(executionId) as IExecutionResponse;
if (execution.finished) {
const executedData = {
data: execution.data,
finished: execution.finished,
mode: execution.mode,
startedAt: execution.startedAt,
stoppedAt: execution.stoppedAt,
} as IRun;
const pushData = {
data: executedData,
executionIdActive: executionId,
executionIdDb: executionId,
retryOf: execution.retryOf,
} as IPushDataExecutionFinished;
this.$store.commit('finishActiveExecution', pushData);
this.$titleSet(execution.workflowData.name, 'IDLE');
this.$store.commit('setExecutingNode', null);
this.$store.commit('setWorkflowExecutionData', executedData);
this.$store.commit('removeActiveAction', 'workflowRunning');
this.$showMessage({
title: 'Workflow finished executing',
message: 'Unable to stop operation in time. Workflow finished executing already.',
type: 'success',
});
} else {
this.$showError(error, 'Problem stopping execution', 'There was a problem stopping the execuction:'); this.$showError(error, 'Problem stopping execution', 'There was a problem stopping the execuction:');
} }
}
this.stopExecutionInProgress = false; this.stopExecutionInProgress = false;
}, },