refactor: Delete a lot of unused and duplicate code in Server and WebhookServer (#5080)

* store n8n version string in a const and use that everywhere

* reduce code duplication between Server and WebhookServer

* unify redis checks

* fix linting
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2023-01-04 11:38:48 +01:00 committed by GitHub
parent b67f803cbe
commit 8b19fdd5f0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 882 additions and 1324 deletions

View file

@ -65,6 +65,7 @@
"@oclif/dev-cli": "^1.22.2",
"@types/basic-auth": "^1.1.2",
"@types/bcryptjs": "^2.4.2",
"@types/body-parser-xml": "^2.0.2",
"@types/compression": "1.0.1",
"@types/connect-history-api-fallback": "^1.3.1",
"@types/convict": "^4.2.1",

View file

@ -0,0 +1,453 @@
import { readFile } from 'fs/promises';
import type { Server } from 'http';
import type { Url } from 'url';
import express from 'express';
import bodyParser from 'body-parser';
import bodyParserXml from 'body-parser-xml';
import compression from 'compression';
import parseUrl from 'parseurl';
import { getConnectionManager } from 'typeorm';
import type { RedisOptions } from 'ioredis';
import {
ErrorReporterProxy as ErrorReporter,
LoggerProxy as Logger,
WebhookHttpMethod,
} from 'n8n-workflow';
import config from '@/config';
import { N8N_VERSION, inDevelopment } from '@/constants';
import * as ActiveWorkflowRunner from '@/ActiveWorkflowRunner';
import * as Db from '@/Db';
import type { IExternalHooksClass } from '@/Interfaces';
import { ExternalHooks } from '@/ExternalHooks';
import {
send,
sendErrorResponse,
sendSuccessResponse,
ServiceUnavailableError,
} from '@/ResponseHelper';
import { corsMiddleware } from '@/middlewares/cors';
import * as TestWebhooks from '@/TestWebhooks';
import { WaitingWebhooks } from '@/WaitingWebhooks';
import { WEBHOOK_METHODS } from '@/WebhookHelpers';
const emptyBuffer = Buffer.alloc(0);
export abstract class AbstractServer {
protected app: express.Application;
protected externalHooks: IExternalHooksClass;
protected activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner;
protected protocol: string;
protected sslKey: string;
protected sslCert: string;
protected timezone: string;
protected restEndpoint: string;
protected endpointWebhook: string;
protected endpointWebhookTest: string;
protected endpointWebhookWaiting: string;
abstract configure(): Promise<void>;
constructor() {
this.app = express();
this.app.disable('x-powered-by');
this.protocol = config.getEnv('protocol');
this.sslKey = config.getEnv('ssl_key');
this.sslCert = config.getEnv('ssl_cert');
this.timezone = config.getEnv('generic.timezone');
this.restEndpoint = config.getEnv('endpoints.rest');
this.endpointWebhook = config.getEnv('endpoints.webhook');
this.endpointWebhookTest = config.getEnv('endpoints.webhookTest');
this.endpointWebhookWaiting = config.getEnv('endpoints.webhookWaiting');
this.externalHooks = ExternalHooks();
this.activeWorkflowRunner = ActiveWorkflowRunner.getInstance();
}
private async setupCommonMiddlewares() {
const { app } = this;
// Augment errors sent to Sentry
const {
Handlers: { requestHandler, errorHandler },
} = await import('@sentry/node');
app.use(requestHandler());
app.use(errorHandler());
// Compress the response data
app.use(compression());
// Make sure that each request has the "parsedUrl" parameter
app.use((req, res, next) => {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
req.parsedUrl = parseUrl(req)!;
req.rawBody = emptyBuffer;
next();
});
const payloadSizeMax = config.getEnv('endpoints.payloadSizeMax');
// Support application/json type post data
app.use(
bodyParser.json({
limit: `${payloadSizeMax}mb`,
verify: (req, res, buf) => {
req.rawBody = buf;
},
}),
);
// Support application/xml type post data
bodyParserXml(bodyParser);
app.use(
bodyParser.xml({
limit: `${payloadSizeMax}mb`,
xmlParseOptions: {
normalize: true, // Trim whitespace inside text nodes
normalizeTags: true, // Transform tags to lowercase
explicitArray: false, // Only put properties in array if length > 1
},
verify: (req, res, buf) => {
req.rawBody = buf;
},
}),
);
app.use(
bodyParser.text({
limit: `${payloadSizeMax}mb`,
verify: (req, res, buf) => {
req.rawBody = buf;
},
}),
);
// support application/x-www-form-urlencoded post data
app.use(
bodyParser.urlencoded({
limit: `${payloadSizeMax}mb`,
extended: false,
verify: (req, res, buf) => {
req.rawBody = buf;
},
}),
);
}
private setupDevMiddlewares() {
this.app.use(corsMiddleware);
}
private async setupHealthCheck() {
this.app.use((req, res, next) => {
if (!Db.isInitialized) {
sendErrorResponse(res, new ServiceUnavailableError('Database is not ready!'));
} else next();
});
// Does very basic health check
this.app.get('/healthz', async (req, res) => {
Logger.debug('Health check started!');
const connection = getConnectionManager().get();
try {
if (!connection.isConnected) {
// Connection is not active
throw new ServiceUnavailableError('No active database connection!');
}
// DB ping
await connection.query('SELECT 1');
} catch (error) {
ErrorReporter.error(error);
Logger.error('No Database connection!');
return sendErrorResponse(res, new ServiceUnavailableError('No Database connection!'));
}
Logger.debug('Health check completed successfully!');
sendSuccessResponse(res, { status: 'ok' }, true, 200);
});
if (config.getEnv('executions.mode') === 'queue') {
await this.setupRedisChecks();
}
}
// This connection is going to be our heartbeat
// IORedis automatically pings redis and tries to reconnect
// We will be using a retryStrategy to control how and when to exit.
private async setupRedisChecks() {
// eslint-disable-next-line @typescript-eslint/naming-convention
const { default: Redis } = await import('ioredis');
let lastTimer = 0;
let cumulativeTimeout = 0;
const { host, port, username, password, db }: RedisOptions = config.getEnv('queue.bull.redis');
const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold');
const redis = new Redis({
host,
port,
db,
username,
password,
retryStrategy: (): number | null => {
const now = Date.now();
if (now - lastTimer > 30000) {
// Means we had no timeout at all or last timeout was temporary and we recovered
lastTimer = now;
cumulativeTimeout = 0;
} else {
cumulativeTimeout += now - lastTimer;
lastTimer = now;
if (cumulativeTimeout > redisConnectionTimeoutLimit) {
Logger.error(
`Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`,
);
process.exit(1);
}
}
return 500;
},
});
redis.on('close', () => {
Logger.warn('Redis unavailable - trying to reconnect...');
});
redis.on('error', (error) => {
if (!String(error).includes('ECONNREFUSED')) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
Logger.warn('Error with Redis: ', error);
}
});
}
// ----------------------------------------
// Regular Webhooks
// ----------------------------------------
protected setupWebhookEndpoint() {
const endpoint = this.endpointWebhook;
const activeWorkflowRunner = this.activeWorkflowRunner;
// Register all webhook requests
this.app.all(`/${endpoint}/*`, async (req, res) => {
// Cut away the "/webhook/" to get the registered part of the url
const requestUrl = req.parsedUrl.pathname!.slice(endpoint.length + 2);
const method = req.method.toUpperCase() as WebhookHttpMethod;
if (method === 'OPTIONS') {
let allowedMethods: string[];
try {
allowedMethods = await activeWorkflowRunner.getWebhookMethods(requestUrl);
allowedMethods.push('OPTIONS');
// Add custom "Allow" header to satisfy OPTIONS response.
res.append('Allow', allowedMethods);
} catch (error) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
sendErrorResponse(res, error);
return;
}
res.header('Access-Control-Allow-Origin', '*');
sendSuccessResponse(res, {}, true, 204);
return;
}
if (!WEBHOOK_METHODS.includes(method)) {
sendErrorResponse(res, new Error(`The method ${method} is not supported.`));
return;
}
let response;
try {
response = await activeWorkflowRunner.executeWebhook(method, requestUrl, req, res);
} catch (error) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
sendErrorResponse(res, error);
return;
}
if (response.noWebhookResponse === true) {
// Nothing else to do as the response got already sent
return;
}
sendSuccessResponse(res, response.data, true, response.responseCode, response.headers);
});
}
// ----------------------------------------
// Waiting Webhooks
// ----------------------------------------
protected setupWaitingWebhookEndpoint() {
const endpoint = this.endpointWebhookWaiting;
const waitingWebhooks = new WaitingWebhooks();
// Register all webhook-waiting requests
this.app.all(`/${endpoint}/*`, async (req, res) => {
// Cut away the "/webhook-waiting/" to get the registered part of the url
const requestUrl = req.parsedUrl.pathname!.slice(endpoint.length + 2);
const method = req.method.toUpperCase() as WebhookHttpMethod;
if (!WEBHOOK_METHODS.includes(method)) {
sendErrorResponse(res, new Error(`The method ${method} is not supported.`));
return;
}
let response;
try {
response = await waitingWebhooks.executeWebhook(method, requestUrl, req, res);
} catch (error) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
sendErrorResponse(res, error);
return;
}
if (response.noWebhookResponse === true) {
// Nothing else to do as the response got already sent
return;
}
sendSuccessResponse(res, response.data, true, response.responseCode, response.headers);
});
}
// ----------------------------------------
// Testing Webhooks
// ----------------------------------------
protected setupTestWebhookEndpoint() {
const endpoint = this.endpointWebhookTest;
const testWebhooks = TestWebhooks.getInstance();
// Register all test webhook requests (for testing via the UI)
this.app.all(`/${endpoint}/*`, async (req, res) => {
// Cut away the "/webhook-test/" to get the registered part of the url
const requestUrl = req.parsedUrl.pathname!.slice(endpoint.length + 2);
const method = req.method.toUpperCase() as WebhookHttpMethod;
if (method === 'OPTIONS') {
let allowedMethods: string[];
try {
allowedMethods = await testWebhooks.getWebhookMethods(requestUrl);
allowedMethods.push('OPTIONS');
// Add custom "Allow" header to satisfy OPTIONS response.
res.append('Allow', allowedMethods);
} catch (error) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
sendErrorResponse(res, error);
return;
}
res.header('Access-Control-Allow-Origin', '*');
sendSuccessResponse(res, {}, true, 204);
return;
}
if (!WEBHOOK_METHODS.includes(method)) {
sendErrorResponse(res, new Error(`The method ${method} is not supported.`));
return;
}
let response;
try {
response = await testWebhooks.callTestWebhook(method, requestUrl, req, res);
} catch (error) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
sendErrorResponse(res, error);
return;
}
if (response.noWebhookResponse === true) {
// Nothing else to do as the response got already sent
return;
}
sendSuccessResponse(res, response.data, true, response.responseCode, response.headers);
});
// Removes a test webhook
// TODO UM: check if this needs validation with user management.
this.app.delete(
`/${this.restEndpoint}/test-webhook/:id`,
send(async (req) => testWebhooks.cancelTestWebhook(req.params.id)),
);
}
async start(): Promise<void> {
const { app, externalHooks, protocol, sslKey, sslCert } = this;
let server: Server;
if (protocol === 'https' && sslKey && sslCert) {
const https = await import('https');
server = https.createServer(
{
key: await readFile(this.sslKey, 'utf8'),
cert: await readFile(this.sslCert, 'utf8'),
},
app,
);
} else {
const http = await import('http');
server = http.createServer(app);
}
const PORT = config.getEnv('port');
const ADDRESS = config.getEnv('listen_address');
server.on('error', (error: Error & { code: string }) => {
if (error.code === 'EADDRINUSE') {
console.log(
`n8n's port ${PORT} is already in use. Do you have another instance of n8n running already?`,
);
process.exit(1);
}
});
await new Promise<void>((resolve) => server.listen(PORT, ADDRESS, () => resolve()));
await this.setupCommonMiddlewares();
if (inDevelopment) {
this.setupDevMiddlewares();
}
await this.setupHealthCheck();
await this.configure();
console.log(`n8n ready on ${ADDRESS}, port ${PORT}`);
console.log(`Version: ${N8N_VERSION}`);
const defaultLocale = config.getEnv('defaultLocale');
if (defaultLocale !== 'en') {
console.log(`Locale: ${defaultLocale}`);
}
await externalHooks.run('n8n.ready', [app, config]);
}
}
declare module 'http' {
export interface IncomingMessage {
parsedUrl: Url;
}
}

View file

@ -76,6 +76,10 @@ export class ActiveWorkflowRunner {
[key: string]: IQueuedWorkflowActivations;
} = {};
constructor() {
this.activeWorkflows = new ActiveWorkflows();
}
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
async init() {
// Get the active workflows from database
@ -100,8 +104,6 @@ export class ActiveWorkflowRunner {
await Db.collections.Webhook.clear();
}
this.activeWorkflows = new ActiveWorkflows();
if (workflowsData.length !== 0) {
console.info(' ================================');
console.info(' Start Active Workflows:');
@ -147,11 +149,6 @@ export class ActiveWorkflowRunner {
await externalHooks.run('activeWorkflows.initialized', []);
}
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
async initWebhooks() {
this.activeWorkflows = new ActiveWorkflows();
}
/**
* Removes all the currently active workflows
*

View file

@ -1,4 +1,3 @@
import type { Application } from 'express';
import config from '@/config';
import { ErrorReporterProxy } from 'n8n-workflow';
@ -44,11 +43,3 @@ export const initErrorHandling = async () => {
initialized = true;
};
export const setupErrorMiddleware = async (app: Application) => {
const {
Handlers: { requestHandler, errorHandler },
} = await import('@sentry/node');
app.use(requestHandler());
app.use(errorHandler());
};

View file

@ -5,28 +5,19 @@
/* eslint-disable no-underscore-dangle */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import express from 'express';
import { join as pathJoin } from 'path';
import { readFile as fsReadFile } from 'fs/promises';
import type { n8n } from 'n8n-core';
import {
ExecutionError,
IDataObject,
INode,
IRunExecutionData,
jsonParse,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { validate } from 'class-validator';
import config from '@/config';
import * as Db from '@/Db';
import {
ICredentialsDb,
IExecutionDb,
IExecutionFlattedDb,
IPackageVersions,
IWorkflowDb,
} from '@/Interfaces';
import { ICredentialsDb, IExecutionDb, IExecutionFlattedDb, IWorkflowDb } from '@/Interfaces';
import * as ResponseHelper from '@/ResponseHelper';
// eslint-disable-next-line import/order
import { Like } from 'typeorm';
@ -34,9 +25,6 @@ import { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { CredentialsEntity } from '@db/entities/CredentialsEntity';
import { TagEntity } from '@db/entities/TagEntity';
import { User } from '@db/entities/User';
import { CLI_DIR } from '@/constants';
let versionCache: IPackageVersions | undefined;
/**
* Returns the base URL n8n is reachable from
@ -62,27 +50,8 @@ export function getSessionId(req: express.Request): string | undefined {
return req.headers.sessionid as string | undefined;
}
/**
* Returns information which version of the packages are installed
*/
export async function getVersions(): Promise<IPackageVersions> {
if (versionCache !== undefined) {
return versionCache;
}
const packageFile = await fsReadFile(pathJoin(CLI_DIR, 'package.json'), 'utf8');
const packageData = jsonParse<n8n.PackageJson>(packageFile);
versionCache = {
cli: packageData.version,
};
return versionCache;
}
/**
* Extracts configuration schema for key
*
*/
function extractSchemaForKey(configKey: string, configSchema: IDataObject): IDataObject {
const configKeyParts = configKey.split('.');

View file

@ -27,9 +27,7 @@ import PCancelable from 'p-cancelable';
import type { FindOperator, Repository } from 'typeorm';
import type { ChildProcess } from 'child_process';
import { Url } from 'url';
import type { Request } from 'express';
import type { InstalledNodes } from '@db/entities/InstalledNodes';
import type { InstalledPackages } from '@db/entities/InstalledPackages';
import type { Role } from '@db/entities/Role';
@ -57,10 +55,6 @@ export interface IQueuedWorkflowActivations {
workflowData: IWorkflowDb;
}
export interface ICustomRequest extends Request {
parsedUrl: Url | undefined;
}
export interface ICredentialsTypeData {
[key: string]: CredentialLoadingDetails;
}
@ -498,8 +492,8 @@ export interface IVersionNotificationSettings {
export interface IN8nUISettings {
endpointWebhook: string;
endpointWebhookTest: string;
saveDataErrorExecution: string;
saveDataSuccessExecution: string;
saveDataErrorExecution: 'all' | 'none';
saveDataSuccessExecution: 'all' | 'none';
saveManualExecutions: boolean;
executionTimeout: number;
maxExecutionTimeout: number;

View file

@ -23,7 +23,8 @@ import {
import { Telemetry } from '@/telemetry';
import { RoleService } from './role/role.service';
import { eventBus } from './eventbus';
import { User } from './databases/entities/User';
import type { User } from '@db/entities/User';
import { N8N_VERSION } from '@/constants';
function userToPayload(user: User): {
userId: string;
@ -42,19 +43,11 @@ function userToPayload(user: User): {
}
export class InternalHooksClass implements IInternalHooksClass {
private versionCli: string;
private nodeTypes: INodeTypes;
constructor(
private telemetry: Telemetry,
private instanceId: string,
versionCli: string,
nodeTypes: INodeTypes,
) {
this.versionCli = versionCli;
this.nodeTypes = nodeTypes;
}
private nodeTypes: INodeTypes,
) {}
async onServerStarted(
diagnosticInfo: IDiagnosticInfo,
@ -174,7 +167,7 @@ export class InternalHooksClass implements IInternalHooksClass {
node_graph_string: JSON.stringify(nodeGraph),
notes_count_overlapping: overlappingCount,
notes_count_non_overlapping: notesCount - overlappingCount,
version_cli: this.versionCli,
version_cli: N8N_VERSION,
num_tags: workflow.tags?.length ?? 0,
public_api: publicApi,
sharing_role: userRole,
@ -249,7 +242,7 @@ export class InternalHooksClass implements IInternalHooksClass {
const properties: IExecutionTrackProperties = {
workflow_id: workflow.id,
is_manual: false,
version_cli: this.versionCli,
version_cli: N8N_VERSION,
success: false,
};

View file

@ -13,20 +13,11 @@ export class InternalHooksManager {
throw new Error('InternalHooks not initialized');
}
static async init(
instanceId: string,
versionCli: string,
nodeTypes: INodeTypes,
): Promise<InternalHooksClass> {
static async init(instanceId: string, nodeTypes: INodeTypes): Promise<InternalHooksClass> {
if (!this.internalHooksInstance) {
const telemetry = new Telemetry(instanceId, versionCli);
const telemetry = new Telemetry(instanceId);
await telemetry.init();
this.internalHooksInstance = new InternalHooksClass(
telemetry,
instanceId,
versionCli,
nodeTypes,
);
this.internalHooksInstance = new InternalHooksClass(telemetry, instanceId, nodeTypes);
}
return this.internalHooksInstance;

View file

@ -3,7 +3,7 @@ import { ILogger } from 'n8n-workflow';
import { getLogger } from './Logger';
import config from '@/config';
import * as Db from '@/Db';
import { LICENSE_FEATURES, SETTINGS_LICENSE_CERT_KEY } from './constants';
import { LICENSE_FEATURES, N8N_VERSION, SETTINGS_LICENSE_CERT_KEY } from './constants';
async function loadCertStr(): Promise<TLicenseContainerStr> {
const databaseSettings = await Db.collections.Settings.findOne({
@ -35,7 +35,7 @@ export class License {
this.logger = getLogger();
}
async init(instanceId: string, version: string) {
async init(instanceId: string) {
if (this.manager) {
return;
}
@ -48,7 +48,7 @@ export class License {
this.manager = new LicenseManager({
server,
tenantId: config.getEnv('license.tenantId'),
productIdentifier: `n8n-${version}`,
productIdentifier: `n8n-${N8N_VERSION}`,
autoRenewEnabled,
autoRenewOffset,
logger: this.logger,

View file

@ -28,7 +28,6 @@
/* eslint-disable no-await-in-loop */
import { exec as callbackExec } from 'child_process';
import { readFileSync } from 'fs';
import { access as fsAccess } from 'fs/promises';
import os from 'os';
import { join as pathJoin, resolve as pathResolve } from 'path';
@ -36,7 +35,7 @@ import { createHmac } from 'crypto';
import { promisify } from 'util';
import cookieParser from 'cookie-parser';
import express from 'express';
import { FindManyOptions, getConnectionManager, In } from 'typeorm';
import { FindManyOptions, In } from 'typeorm';
import axios, { AxiosRequestConfig } from 'axios';
import clientOAuth1, { RequestOptions } from 'oauth-1.0a';
// IMPORTANT! Do not switch to anther bcrypt library unless really necessary and
@ -61,9 +60,7 @@ import {
ITelemetrySettings,
LoggerProxy,
jsonParse,
WebhookHttpMethod,
WorkflowExecuteMode,
ErrorReporterProxy as ErrorReporter,
INodeTypes,
ICredentialTypes,
INode,
@ -72,21 +69,17 @@ import {
} from 'n8n-workflow';
import basicAuth from 'basic-auth';
import compression from 'compression';
import jwt from 'jsonwebtoken';
import jwks from 'jwks-rsa';
// @ts-ignore
import timezones from 'google-timezones-json';
import parseUrl from 'parseurl';
import promClient, { Registry } from 'prom-client';
import history from 'connect-history-api-fallback';
import bodyParser from 'body-parser';
import config from '@/config';
import * as Queue from '@/Queue';
import { InternalHooksManager } from '@/InternalHooksManager';
import { getCredentialTranslationPath } from '@/TranslationHelpers';
import { WEBHOOK_METHODS } from '@/WebhookHelpers';
import { getSharedWorkflowIds } from '@/WorkflowHelpers';
import { nodesController } from '@/api/nodes.api';
@ -95,6 +88,7 @@ import {
AUTH_COOKIE_NAME,
EDITOR_UI_DIST_DIR,
GENERATED_STATIC_DIR,
N8N_VERSION,
NODES_BASE_DIR,
RESPONSE_ERROR_MESSAGES,
TEMPLATES_DIR,
@ -129,17 +123,13 @@ import {
DatabaseType,
ICredentialsDb,
ICredentialsOverwrite,
ICustomRequest,
IDiagnosticInfo,
IExecutionFlattedDb,
IExecutionsStopData,
IExecutionsSummary,
IExternalHooksClass,
IN8nUISettings,
IPackageVersions,
} from '@/Interfaces';
import * as ActiveExecutions from '@/ActiveExecutions';
import * as ActiveWorkflowRunner from '@/ActiveWorkflowRunner';
import {
CredentialsHelper,
getCredentialForUser,
@ -147,119 +137,42 @@ import {
} from '@/CredentialsHelper';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { CredentialTypes } from '@/CredentialTypes';
import { ExternalHooks } from '@/ExternalHooks';
import * as GenericHelpers from '@/GenericHelpers';
import { NodeTypes } from '@/NodeTypes';
import * as Push from '@/Push';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import * as ResponseHelper from '@/ResponseHelper';
import * as TestWebhooks from '@/TestWebhooks';
import { WaitTracker, WaitTrackerClass } from '@/WaitTracker';
import * as WebhookHelpers from '@/WebhookHelpers';
import * as WebhookServer from '@/WebhookServer';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { toHttpNodeParameters } from '@/CurlConverterHelper';
import { setupErrorMiddleware } from '@/ErrorReporting';
import { eventBus } from '@/eventbus';
import { eventBusRouter } from '@/eventbus/eventBusRoutes';
import { isLogStreamingEnabled } from '@/eventbus/MessageEventBus/MessageEventBusHelper';
import { getLicense } from '@/License';
import { licenseController } from '@/license/license.controller';
import { corsMiddleware } from '@/middlewares/cors';
require('body-parser-xml')(bodyParser);
import { licenseController } from './license/license.controller';
import { corsMiddleware } from './middlewares/cors';
import { AbstractServer } from './AbstractServer';
const exec = promisify(callbackExec);
const externalHooks: IExternalHooksClass = ExternalHooks();
class App {
app: express.Application;
activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner;
testWebhooks: TestWebhooks.TestWebhooks;
endpointWebhook: string;
endpointWebhookWaiting: string;
endpointWebhookTest: string;
class Server extends AbstractServer {
endpointPresetCredentials: string;
externalHooks: IExternalHooksClass;
waitTracker: WaitTrackerClass;
defaultWorkflowName: string;
defaultCredentialsName: string;
saveDataErrorExecution: 'all' | 'none';
saveDataSuccessExecution: 'all' | 'none';
saveManualExecutions: boolean;
executionTimeout: number;
maxExecutionTimeout: number;
timezone: string;
activeExecutionsInstance: ActiveExecutions.ActiveExecutions;
push: Push.Push;
versions: IPackageVersions | undefined;
restEndpoint: string;
publicApiEndpoint: string;
frontendSettings: IN8nUISettings;
protocol: string;
sslKey: string;
sslCert: string;
payloadSizeMax: number;
presetCredentialsLoaded: boolean;
webhookMethods: WebhookHttpMethod[];
nodeTypes: INodeTypes;
credentialTypes: ICredentialTypes;
constructor() {
this.app = express();
this.app.disable('x-powered-by');
this.endpointWebhook = config.getEnv('endpoints.webhook');
this.endpointWebhookWaiting = config.getEnv('endpoints.webhookWaiting');
this.endpointWebhookTest = config.getEnv('endpoints.webhookTest');
this.defaultWorkflowName = config.getEnv('workflows.defaultName');
this.defaultCredentialsName = config.getEnv('credentials.defaultName');
this.saveDataErrorExecution = config.get('executions.saveDataOnError');
this.saveDataSuccessExecution = config.get('executions.saveDataOnSuccess');
this.saveManualExecutions = config.get('executions.saveDataManualExecutions');
this.executionTimeout = config.get('executions.timeout');
this.maxExecutionTimeout = config.get('executions.maxTimeout');
this.payloadSizeMax = config.get('endpoints.payloadSizeMax');
this.timezone = config.get('generic.timezone');
this.restEndpoint = config.get('endpoints.rest');
this.publicApiEndpoint = config.get('publicApi.path');
this.activeWorkflowRunner = ActiveWorkflowRunner.getInstance();
this.testWebhooks = TestWebhooks.getInstance();
this.push = Push.getInstance();
super();
this.nodeTypes = NodeTypes();
this.credentialTypes = CredentialTypes();
@ -267,17 +180,9 @@ class App {
this.activeExecutionsInstance = ActiveExecutions.getInstance();
this.waitTracker = WaitTracker();
this.protocol = config.getEnv('protocol');
this.sslKey = config.getEnv('ssl_key');
this.sslCert = config.getEnv('ssl_cert');
this.externalHooks = externalHooks;
this.presetCredentialsLoaded = false;
this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint');
void setupErrorMiddleware(this.app);
if (process.env.E2E_TESTS === 'true') {
this.app.use('/e2e', require('./api/e2e.api').e2eController);
}
@ -305,11 +210,11 @@ class App {
this.frontendSettings = {
endpointWebhook: this.endpointWebhook,
endpointWebhookTest: this.endpointWebhookTest,
saveDataErrorExecution: this.saveDataErrorExecution,
saveDataSuccessExecution: this.saveDataSuccessExecution,
saveManualExecutions: this.saveManualExecutions,
executionTimeout: this.executionTimeout,
maxExecutionTimeout: this.maxExecutionTimeout,
saveDataErrorExecution: config.getEnv('executions.saveDataOnError'),
saveDataSuccessExecution: config.getEnv('executions.saveDataOnSuccess'),
saveManualExecutions: config.getEnv('executions.saveDataManualExecutions'),
executionTimeout: config.getEnv('executions.timeout'),
maxExecutionTimeout: config.getEnv('executions.maxTimeout'),
workflowCallerPolicyDefaultOption: config.getEnv('workflows.callerPolicyDefaultOption'),
timezone: this.timezone,
urlBaseWebhook,
@ -374,14 +279,6 @@ class App {
};
}
/**
* Returns the current epoch time
*
*/
getCurrentDate(): Date {
return new Date();
}
/**
* Returns the current settings for the frontend
*/
@ -410,7 +307,7 @@ class App {
async initLicense(): Promise<void> {
const license = getLicense();
await license.init(this.frontendSettings.instanceId, this.frontendSettings.versionCli);
await license.init(this.frontendSettings.instanceId);
const activationKey = config.getEnv('license.activationKey');
if (activationKey) {
@ -422,7 +319,7 @@ class App {
}
}
async config(): Promise<void> {
async configure(): Promise<void> {
const enableMetrics = config.getEnv('endpoints.metrics.enable');
let register: Registry;
@ -437,8 +334,7 @@ class App {
.then(() => true)
.catch(() => false);
this.versions = await GenericHelpers.getVersions();
this.frontendSettings.versionCli = this.versions.cli;
this.frontendSettings.versionCli = N8N_VERSION;
this.frontendSettings.instanceId = await UserSettings.getInstanceId();
@ -446,6 +342,7 @@ class App {
await this.initLicense();
const publicApiEndpoint = config.getEnv('publicApi.path');
const excludeEndpoints = config.getEnv('security.excludeEndpoints');
const ignoredEndpoints = [
@ -458,7 +355,7 @@ class App {
this.endpointWebhook,
this.endpointWebhookTest,
this.endpointPresetCredentials,
config.getEnv('publicApi.disabled') ? this.publicApiEndpoint : '',
config.getEnv('publicApi.disabled') ? publicApiEndpoint : '',
...excludeEndpoints.split(':'),
].filter((u) => !!u);
@ -635,7 +532,7 @@ class App {
// ----------------------------------------
if (!config.getEnv('publicApi.disabled')) {
const { apiRouters, apiLatestVersion } = await loadPublicApiVersions(this.publicApiEndpoint);
const { apiRouters, apiLatestVersion } = await loadPublicApiVersions(publicApiEndpoint);
this.app.use(...apiRouters);
this.frontendSettings.publicApi.latestVersion = apiLatestVersion;
}
@ -643,6 +540,7 @@ class App {
this.app.use(cookieParser());
// Get push connections
const push = Push.getInstance();
this.app.use(`/${this.restEndpoint}/push`, corsMiddleware, async (req, res, next) => {
const { sessionId } = req.query;
if (sessionId === undefined) {
@ -660,54 +558,9 @@ class App {
}
}
this.push.add(sessionId as string, req, res);
push.add(sessionId as string, req, res);
});
// Compress the response data
this.app.use(compression());
// Make sure that each request has the "parsedUrl" parameter
this.app.use((req: express.Request, res: express.Response, next: express.NextFunction) => {
(req as ICustomRequest).parsedUrl = parseUrl(req);
req.rawBody = Buffer.from('', 'base64');
next();
});
// Support application/json type post data
this.app.use(
bodyParser.json({
limit: `${this.payloadSizeMax}mb`,
verify: (req, res, buf) => {
req.rawBody = buf;
},
}),
);
// Support application/xml type post data
this.app.use(
// @ts-ignore
bodyParser.xml({
limit: `${this.payloadSizeMax}mb`,
xmlParseOptions: {
normalize: true, // Trim whitespace inside text nodes
normalizeTags: true, // Transform tags to lowercase
explicitArray: false, // Only put properties in array if length > 1
},
verify: (req: express.Request, res: any, buf: any) => {
req.rawBody = buf;
},
}),
);
this.app.use(
bodyParser.text({
limit: `${this.payloadSizeMax}mb`,
verify: (req, res, buf) => {
req.rawBody = buf;
},
}),
);
// Make sure that Vue history mode works properly
this.app.use(
history({
@ -722,29 +575,6 @@ class App {
}),
);
// support application/x-www-form-urlencoded post data
this.app.use(
bodyParser.urlencoded({
limit: `${this.payloadSizeMax}mb`,
extended: false,
verify: (req, res, buf) => {
req.rawBody = buf;
},
}),
);
this.app.use(corsMiddleware);
// eslint-disable-next-line consistent-return
this.app.use((req: express.Request, res: express.Response, next: express.NextFunction) => {
if (!Db.isInitialized) {
const error = new ResponseHelper.ServiceUnavailableError('Database is not ready!');
return ResponseHelper.sendErrorResponse(res, error);
}
next();
});
// ----------------------------------------
// User Management
// ----------------------------------------
@ -759,40 +589,6 @@ class App {
this.app.use(`/${this.restEndpoint}/nodes`, nodesController);
}
// ----------------------------------------
// Healthcheck
// ----------------------------------------
// Does very basic health check
this.app.get('/healthz', async (req: express.Request, res: express.Response) => {
LoggerProxy.debug('Health check started!');
const connection = getConnectionManager().get();
try {
if (!connection.isConnected) {
// Connection is not active
throw new Error('No active database connection!');
}
// DB ping
await connection.query('SELECT 1');
} catch (err) {
ErrorReporter.error(err);
LoggerProxy.error('No Database connection!', err);
const error = new ResponseHelper.ServiceUnavailableError('No Database connection!');
return ResponseHelper.sendErrorResponse(res, error);
}
// Everything fine
const responseData = {
status: 'ok',
};
LoggerProxy.debug('Health check completed successfully!');
ResponseHelper.sendSuccessResponse(res, responseData, true, 200);
});
// ----------------------------------------
// Metrics
// ----------------------------------------
@ -1155,7 +951,7 @@ class App {
const newCredentialsData = credentials.getDataToSave() as unknown as ICredentialsDb;
// Add special database related data
newCredentialsData.updatedAt = this.getCurrentDate();
newCredentialsData.updatedAt = new Date();
// Update the credentials in DB
await Db.collections.Credentials.update(credentialId, newCredentialsData);
@ -1267,7 +1063,7 @@ class App {
credentials.setData(decryptedDataOriginal, encryptionKey);
const newCredentialsData = credentials.getDataToSave() as unknown as ICredentialsDb;
// Add special database related data
newCredentialsData.updatedAt = this.getCurrentDate();
newCredentialsData.updatedAt = new Date();
// Save the credentials in DB
await Db.collections.Credentials.update(credentialId, newCredentialsData);
@ -1486,16 +1282,6 @@ class App {
}),
);
// Removes a test webhook
this.app.delete(
`/${this.restEndpoint}/test-webhook/:id`,
ResponseHelper.send(async (req: express.Request, res: express.Response): Promise<boolean> => {
// TODO UM: check if this needs validation with user management.
const workflowId = req.params.id;
return this.testWebhooks.cancelTestWebhook(workflowId);
}),
);
// ----------------------------------------
// Options
// ----------------------------------------
@ -1565,69 +1351,11 @@ class App {
// ----------------------------------------
if (!config.getEnv('endpoints.disableProductionWebhooksOnMainProcess')) {
WebhookServer.registerProductionWebhooks.apply(this);
this.setupWebhookEndpoint();
this.setupWaitingWebhookEndpoint();
}
// Register all webhook requests (test for UI)
this.app.all(
`/${this.endpointWebhookTest}/*`,
async (req: express.Request, res: express.Response) => {
// Cut away the "/webhook-test/" to get the registered part of the url
const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(
this.endpointWebhookTest.length + 2,
);
const method = req.method.toUpperCase() as WebhookHttpMethod;
if (method === 'OPTIONS') {
let allowedMethods: string[];
try {
allowedMethods = await this.testWebhooks.getWebhookMethods(requestUrl);
allowedMethods.push('OPTIONS');
// Add custom "Allow" header to satisfy OPTIONS response.
res.append('Allow', allowedMethods);
} catch (error) {
ResponseHelper.sendErrorResponse(res, error);
return;
}
res.header('Access-Control-Allow-Origin', '*');
ResponseHelper.sendSuccessResponse(res, {}, true, 204);
return;
}
if (!WEBHOOK_METHODS.includes(method)) {
ResponseHelper.sendErrorResponse(
res,
new Error(`The method ${method} is not supported.`),
);
return;
}
let response;
try {
response = await this.testWebhooks.callTestWebhook(method, requestUrl, req, res);
} catch (error) {
ResponseHelper.sendErrorResponse(res, error);
return;
}
if (response.noWebhookResponse === true) {
// Nothing else to do as the response got already sent
return;
}
ResponseHelper.sendSuccessResponse(
res,
response.data,
true,
response.responseCode,
response.headers,
);
},
);
this.setupTestWebhookEndpoint();
if (this.endpointPresetCredentials !== '') {
// POST endpoint to set preset credentials
@ -1676,97 +1404,53 @@ class App {
}
export async function start(): Promise<void> {
const PORT = config.getEnv('port');
const ADDRESS = config.getEnv('listen_address');
const app = new Server();
await app.start();
const app = new App();
await app.config();
let server;
if (app.protocol === 'https' && app.sslKey && app.sslCert) {
const https = require('https');
const privateKey = readFileSync(app.sslKey, 'utf8');
const cert = readFileSync(app.sslCert, 'utf8');
const credentials = { key: privateKey, cert };
server = https.createServer(credentials, app.app);
} else {
const http = require('http');
server = http.createServer(app.app);
}
server.listen(PORT, ADDRESS, async () => {
const versions = await GenericHelpers.getVersions();
console.log(`n8n ready on ${ADDRESS}, port ${PORT}`);
console.log(`Version: ${versions.cli}`);
const defaultLocale = config.getEnv('defaultLocale');
if (defaultLocale !== 'en') {
console.log(`Locale: ${defaultLocale}`);
}
await app.externalHooks.run('n8n.ready', [app, config]);
const cpus = os.cpus();
const binaryDataConfig = config.getEnv('binaryDataManager');
const diagnosticInfo: IDiagnosticInfo = {
basicAuthActive: config.getEnv('security.basicAuth.active'),
databaseType: (await GenericHelpers.getConfigValue('database.type')) as DatabaseType,
disableProductionWebhooksOnMainProcess: config.getEnv(
'endpoints.disableProductionWebhooksOnMainProcess',
),
notificationsEnabled: config.getEnv('versionNotifications.enabled'),
versionCli: versions.cli,
systemInfo: {
os: {
type: os.type(),
version: os.version(),
},
memory: os.totalmem() / 1024,
cpus: {
count: cpus.length,
model: cpus[0].model,
speed: cpus[0].speed,
},
const cpus = os.cpus();
const binaryDataConfig = config.getEnv('binaryDataManager');
const diagnosticInfo: IDiagnosticInfo = {
basicAuthActive: config.getEnv('security.basicAuth.active'),
databaseType: (await GenericHelpers.getConfigValue('database.type')) as DatabaseType,
disableProductionWebhooksOnMainProcess: config.getEnv(
'endpoints.disableProductionWebhooksOnMainProcess',
),
notificationsEnabled: config.getEnv('versionNotifications.enabled'),
versionCli: N8N_VERSION,
systemInfo: {
os: {
type: os.type(),
version: os.version(),
},
executionVariables: {
executions_process: config.getEnv('executions.process'),
executions_mode: config.getEnv('executions.mode'),
executions_timeout: config.getEnv('executions.timeout'),
executions_timeout_max: config.getEnv('executions.maxTimeout'),
executions_data_save_on_error: config.getEnv('executions.saveDataOnError'),
executions_data_save_on_success: config.getEnv('executions.saveDataOnSuccess'),
executions_data_save_on_progress: config.getEnv('executions.saveExecutionProgress'),
executions_data_save_manual_executions: config.getEnv(
'executions.saveDataManualExecutions',
),
executions_data_prune: config.getEnv('executions.pruneData'),
executions_data_max_age: config.getEnv('executions.pruneDataMaxAge'),
executions_data_prune_timeout: config.getEnv('executions.pruneDataTimeout'),
memory: os.totalmem() / 1024,
cpus: {
count: cpus.length,
model: cpus[0].model,
speed: cpus[0].speed,
},
deploymentType: config.getEnv('deployment.type'),
binaryDataMode: binaryDataConfig.mode,
n8n_multi_user_allowed: isUserManagementEnabled(),
smtp_set_up: config.getEnv('userManagement.emails.mode') === 'smtp',
};
},
executionVariables: {
executions_process: config.getEnv('executions.process'),
executions_mode: config.getEnv('executions.mode'),
executions_timeout: config.getEnv('executions.timeout'),
executions_timeout_max: config.getEnv('executions.maxTimeout'),
executions_data_save_on_error: config.getEnv('executions.saveDataOnError'),
executions_data_save_on_success: config.getEnv('executions.saveDataOnSuccess'),
executions_data_save_on_progress: config.getEnv('executions.saveExecutionProgress'),
executions_data_save_manual_executions: config.getEnv('executions.saveDataManualExecutions'),
executions_data_prune: config.getEnv('executions.pruneData'),
executions_data_max_age: config.getEnv('executions.pruneDataMaxAge'),
executions_data_prune_timeout: config.getEnv('executions.pruneDataTimeout'),
},
deploymentType: config.getEnv('deployment.type'),
binaryDataMode: binaryDataConfig.mode,
n8n_multi_user_allowed: isUserManagementEnabled(),
smtp_set_up: config.getEnv('userManagement.emails.mode') === 'smtp',
};
void Db.collections
.Workflow!.findOne({
select: ['createdAt'],
order: { createdAt: 'ASC' },
})
.then(async (workflow) =>
InternalHooksManager.getInstance().onServerStarted(diagnosticInfo, workflow?.createdAt),
);
});
server.on('error', (error: Error & { code: string }) => {
if (error.code === 'EADDRINUSE') {
console.log(
`n8n's port ${PORT} is already in use. Do you have another instance of n8n running already?`,
);
process.exit(1);
}
const workflow = await Db.collections.Workflow!.findOne({
select: ['createdAt'],
order: { createdAt: 'ASC' },
});
await InternalHooksManager.getInstance().onServerStarted(diagnosticInfo, workflow?.createdAt);
}

View file

@ -29,6 +29,5 @@ export interface N8nApp {
app: Application;
restEndpoint: string;
externalHooks: IExternalHooksClass;
defaultCredentialsName: string;
activeWorkflowRunner: ActiveWorkflowRunner;
}

View file

@ -1,356 +1,8 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable no-console */
/* eslint-disable consistent-return */
/* eslint-disable @typescript-eslint/restrict-plus-operands */
/* eslint-disable @typescript-eslint/no-non-null-assertion */
/* eslint-disable @typescript-eslint/restrict-template-expressions */
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import express from 'express';
import { readFileSync } from 'fs';
import { getConnectionManager } from 'typeorm';
import bodyParser from 'body-parser';
import { AbstractServer } from '@/AbstractServer';
import compression from 'compression';
import parseUrl from 'parseurl';
import { WebhookHttpMethod } from 'n8n-workflow';
import * as Db from '@/Db';
import * as ActiveExecutions from '@/ActiveExecutions';
import * as ActiveWorkflowRunner from '@/ActiveWorkflowRunner';
import { ExternalHooks } from '@/ExternalHooks';
import * as GenericHelpers from '@/GenericHelpers';
import * as ResponseHelper from '@/ResponseHelper';
import { WaitingWebhooks } from '@/WaitingWebhooks';
import type { ICustomRequest, IExternalHooksClass, IPackageVersions } from '@/Interfaces';
import config from '@/config';
import { WEBHOOK_METHODS } from '@/WebhookHelpers';
import { setupErrorMiddleware } from '@/ErrorReporting';
import { corsMiddleware } from './middlewares/cors';
// eslint-disable-next-line @typescript-eslint/no-var-requires, @typescript-eslint/no-unsafe-call
require('body-parser-xml')(bodyParser);
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
export function registerProductionWebhooks() {
// ----------------------------------------
// Regular Webhooks
// ----------------------------------------
// Register all webhook requests
this.app.all(
`/${this.endpointWebhook}/*`,
async (req: express.Request, res: express.Response) => {
// Cut away the "/webhook/" to get the registered part of the url
const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(
this.endpointWebhook.length + 2,
);
const method = req.method.toUpperCase() as WebhookHttpMethod;
if (method === 'OPTIONS') {
let allowedMethods: string[];
try {
allowedMethods = await this.activeWorkflowRunner.getWebhookMethods(requestUrl);
allowedMethods.push('OPTIONS');
// Add custom "Allow" header to satisfy OPTIONS response.
res.append('Allow', allowedMethods);
} catch (error) {
ResponseHelper.sendErrorResponse(res, error);
return;
}
res.header('Access-Control-Allow-Origin', '*');
ResponseHelper.sendSuccessResponse(res, {}, true, 204);
return;
}
if (!WEBHOOK_METHODS.includes(method)) {
ResponseHelper.sendErrorResponse(res, new Error(`The method ${method} is not supported.`));
return;
}
let response;
try {
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
response = await this.activeWorkflowRunner.executeWebhook(method, requestUrl, req, res);
} catch (error) {
ResponseHelper.sendErrorResponse(res, error);
return;
}
if (response.noWebhookResponse === true) {
// Nothing else to do as the response got already sent
return;
}
ResponseHelper.sendSuccessResponse(
res,
response.data,
true,
response.responseCode,
response.headers,
);
},
);
// ----------------------------------------
// Waiting Webhooks
// ----------------------------------------
const waitingWebhooks = new WaitingWebhooks();
// Register all webhook-waiting requests
this.app.all(
`/${this.endpointWebhookWaiting}/*`,
async (req: express.Request, res: express.Response) => {
// Cut away the "/webhook-waiting/" to get the registered part of the url
const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(
this.endpointWebhookWaiting.length + 2,
);
const method = req.method.toUpperCase() as WebhookHttpMethod;
// TODO: Add support for OPTIONS in the future
// if (method === 'OPTIONS') {
// }
if (!WEBHOOK_METHODS.includes(method)) {
ResponseHelper.sendErrorResponse(res, new Error(`The method ${method} is not supported.`));
return;
}
let response;
try {
response = await waitingWebhooks.executeWebhook(method, requestUrl, req, res);
} catch (error) {
ResponseHelper.sendErrorResponse(res, error);
return;
}
if (response.noWebhookResponse === true) {
// Nothing else to do as the response got already sent
return;
}
ResponseHelper.sendSuccessResponse(
res,
response.data,
true,
response.responseCode,
response.headers,
);
},
);
}
class App {
app: express.Application;
activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner;
endpointWebhook: string;
endpointWebhookWaiting: string;
endpointPresetCredentials: string;
externalHooks: IExternalHooksClass;
saveDataErrorExecution: string;
saveDataSuccessExecution: string;
saveManualExecutions: boolean;
executionTimeout: number;
maxExecutionTimeout: number;
timezone: string;
activeExecutionsInstance: ActiveExecutions.ActiveExecutions;
versions: IPackageVersions | undefined;
restEndpoint: string;
protocol: string;
sslKey: string;
sslCert: string;
presetCredentialsLoaded: boolean;
constructor() {
this.app = express();
this.app.disable('x-powered-by');
this.endpointWebhook = config.getEnv('endpoints.webhook');
this.endpointWebhookWaiting = config.getEnv('endpoints.webhookWaiting');
this.saveDataErrorExecution = config.getEnv('executions.saveDataOnError');
this.saveDataSuccessExecution = config.getEnv('executions.saveDataOnSuccess');
this.saveManualExecutions = config.getEnv('executions.saveDataManualExecutions');
this.executionTimeout = config.getEnv('executions.timeout');
this.maxExecutionTimeout = config.getEnv('executions.maxTimeout');
this.timezone = config.getEnv('generic.timezone');
this.restEndpoint = config.getEnv('endpoints.rest');
this.activeWorkflowRunner = ActiveWorkflowRunner.getInstance();
this.activeExecutionsInstance = ActiveExecutions.getInstance();
this.protocol = config.getEnv('protocol');
this.sslKey = config.getEnv('ssl_key');
this.sslCert = config.getEnv('ssl_cert');
this.externalHooks = ExternalHooks();
this.presetCredentialsLoaded = false;
this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint');
void setupErrorMiddleware(this.app);
}
/**
* Returns the current epoch time
*
*/
getCurrentDate(): Date {
return new Date();
}
async config(): Promise<void> {
this.versions = await GenericHelpers.getVersions();
// Compress the response data
this.app.use(compression());
// Make sure that each request has the "parsedUrl" parameter
this.app.use((req: express.Request, res: express.Response, next: express.NextFunction) => {
(req as ICustomRequest).parsedUrl = parseUrl(req);
req.rawBody = Buffer.from('', 'base64');
next();
});
// Support application/json type post data
this.app.use(
bodyParser.json({
limit: '16mb',
verify: (req, res, buf) => {
req.rawBody = buf;
},
}),
);
// Support application/xml type post data
this.app.use(
// @ts-ignore
bodyParser.xml({
limit: '16mb',
xmlParseOptions: {
normalize: true, // Trim whitespace inside text nodes
normalizeTags: true, // Transform tags to lowercase
explicitArray: false, // Only put properties in array if length > 1
},
}),
);
this.app.use(
bodyParser.text({
limit: '16mb',
verify: (req, res, buf) => {
req.rawBody = buf;
},
}),
);
// support application/x-www-form-urlencoded post data
this.app.use(
bodyParser.urlencoded({
extended: false,
verify: (req, res, buf) => {
req.rawBody = buf;
},
}),
);
this.app.use(corsMiddleware);
this.app.use((req: express.Request, res: express.Response, next: express.NextFunction) => {
if (!Db.isInitialized) {
const error = new ResponseHelper.ServiceUnavailableError('Database is not ready!');
return ResponseHelper.sendErrorResponse(res, error);
}
next();
});
// ----------------------------------------
// Healthcheck
// ----------------------------------------
// Does very basic health check
this.app.get('/healthz', async (req: express.Request, res: express.Response) => {
const connection = getConnectionManager().get();
try {
if (!connection.isConnected) {
// Connection is not active
throw new Error('No active database connection!');
}
// DB ping
await connection.query('SELECT 1');
// eslint-disable-next-line id-denylist
} catch (err) {
const error = new ResponseHelper.ServiceUnavailableError('No Database connection!');
return ResponseHelper.sendErrorResponse(res, error);
}
// Everything fine
const responseData = {
status: 'ok',
};
ResponseHelper.sendSuccessResponse(res, responseData, true, 200);
});
registerProductionWebhooks.apply(this);
export class WebhookServer extends AbstractServer {
async configure() {
this.setupWebhookEndpoint();
this.setupWaitingWebhookEndpoint();
}
}
export async function start(): Promise<void> {
const PORT = config.getEnv('port');
const ADDRESS = config.getEnv('listen_address');
const app = new App();
await app.config();
let server;
if (app.protocol === 'https' && app.sslKey && app.sslCert) {
// eslint-disable-next-line global-require, @typescript-eslint/no-var-requires
const https = require('https');
const privateKey = readFileSync(app.sslKey, 'utf8');
const cert = readFileSync(app.sslCert, 'utf8');
const credentials = { key: privateKey, cert };
server = https.createServer(credentials, app.app);
} else {
// eslint-disable-next-line global-require, @typescript-eslint/no-var-requires
const http = require('http');
server = http.createServer(app.app);
}
server.listen(PORT, ADDRESS, async () => {
const versions = await GenericHelpers.getVersions();
console.log(`n8n ready on ${ADDRESS}, port ${PORT}`);
console.log(`Version: ${versions.cli}`);
await app.externalHooks.run('n8n.ready', [app, config]);
});
}

View file

@ -34,7 +34,6 @@ import { CredentialTypes } from '@/CredentialTypes';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import * as Db from '@/Db';
import { ExternalHooks } from '@/ExternalHooks';
import * as GenericHelpers from '@/GenericHelpers';
import { IWorkflowExecuteProcess, IWorkflowExecutionDataProcessWithExecution } from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
@ -111,8 +110,7 @@ class WorkflowRunnerProcess {
await externalHooks.init();
const instanceId = (await UserSettings.prepareUserSettings()).instanceId ?? '';
const { cli } = await GenericHelpers.getVersions();
await InternalHooksManager.init(instanceId, cli, nodeTypes);
await InternalHooksManager.init(instanceId, nodeTypes);
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig);
@ -121,7 +119,7 @@ class WorkflowRunnerProcess {
await Db.init();
const license = getLicense();
await license.init(instanceId, cli);
await license.init(instanceId);
// Start timeout for the execution
let workflowTimeout = config.getEnv('executions.timeout'); // initialize with default

View file

@ -11,7 +11,6 @@ import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { CredentialTypes } from '@/CredentialTypes';
import * as Db from '@/Db';
import { ExternalHooks } from '@/ExternalHooks';
import * as GenericHelpers from '@/GenericHelpers';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { NodeTypes } from '@/NodeTypes';
import { InternalHooksManager } from '@/InternalHooksManager';
@ -137,8 +136,7 @@ export class Execute extends Command {
CredentialTypes(loadNodesAndCredentials);
const instanceId = await UserSettings.getInstanceId();
const { cli } = await GenericHelpers.getVersions();
await InternalHooksManager.init(instanceId, cli, nodeTypes);
await InternalHooksManager.init(instanceId, nodeTypes);
if (!WorkflowHelpers.isWorkflowIdValid(workflowId)) {
workflowId = undefined;

View file

@ -25,7 +25,6 @@ import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { CredentialTypes } from '@/CredentialTypes';
import * as Db from '@/Db';
import { ExternalHooks } from '@/ExternalHooks';
import * as GenericHelpers from '@/GenericHelpers';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { NodeTypes } from '@/NodeTypes';
import { InternalHooksManager } from '@/InternalHooksManager';
@ -325,8 +324,7 @@ export class ExecuteBatch extends Command {
CredentialTypes(loadNodesAndCredentials);
const instanceId = await UserSettings.getInstanceId();
const { cli } = await GenericHelpers.getVersions();
await InternalHooksManager.init(instanceId, cli, nodeTypes);
await InternalHooksManager.init(instanceId, nodeTypes);
// Send a shallow copy of allWorkflows so we still have all workflow data.
const results = await this.runTests([...allWorkflows]);

View file

@ -17,7 +17,7 @@ import replaceStream from 'replacestream';
import { promisify } from 'util';
import glob from 'fast-glob';
import { IDataObject, LoggerProxy, sleep } from 'n8n-workflow';
import { LoggerProxy, sleep } from 'n8n-workflow';
import { createHash } from 'crypto';
import config from '@/config';
@ -234,314 +234,231 @@ export class Start extends Command {
// eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(Start);
// Wrap that the process does not close but we can still use async
await (async () => {
try {
// Start directly with the init of the database to improve startup time
const startDbInitPromise = Db.init().catch((error: Error) => {
logger.error(`There was an error initializing DB: "${error.message}"`);
processExitCode = 1;
// @ts-ignore
process.emit('SIGINT');
process.exit(1);
});
// Make sure the settings exist
const userSettings = await UserSettings.prepareUserSettings();
if (!config.getEnv('userManagement.jwtSecret')) {
// If we don't have a JWT secret set, generate
// one based and save to config.
const encryptionKey = await UserSettings.getEncryptionKey();
// For a key off every other letter from encryption key
// CAREFUL: do not change this or it breaks all existing tokens.
let baseKey = '';
for (let i = 0; i < encryptionKey.length; i += 2) {
baseKey += encryptionKey[i];
}
config.set(
'userManagement.jwtSecret',
createHash('sha256').update(baseKey).digest('hex'),
);
}
if (!config.getEnv('endpoints.disableUi')) {
await Start.generateStaticAssets();
}
// Load all node and credential types
const loadNodesAndCredentials = LoadNodesAndCredentials();
await loadNodesAndCredentials.init();
// Load all external hooks
const externalHooks = ExternalHooks();
await externalHooks.init();
// Add the found types to an instance other parts of the application can use
const nodeTypes = NodeTypes(loadNodesAndCredentials);
const credentialTypes = CredentialTypes(loadNodesAndCredentials);
// Load the credentials overwrites if any exist
await CredentialsOverwrites(credentialTypes).init();
await loadNodesAndCredentials.generateTypesForFrontend();
// Wait till the database is ready
await startDbInitPromise;
const installedPackages = await getAllInstalledPackages();
const missingPackages = new Set<{
packageName: string;
version: string;
}>();
installedPackages.forEach((installedPackage) => {
installedPackage.installedNodes.forEach((installedNode) => {
if (!loadNodesAndCredentials.known.nodes[installedNode.type]) {
// Leave the list ready for installing in case we need.
missingPackages.add({
packageName: installedPackage.packageName,
version: installedPackage.installedVersion,
});
}
});
});
await UserSettings.getEncryptionKey();
// Load settings from database and set them to config.
const databaseSettings = await Db.collections.Settings.find({ loadOnStartup: true });
databaseSettings.forEach((setting) => {
config.set(setting.key, JSON.parse(setting.value));
});
config.set('nodes.packagesMissing', '');
if (missingPackages.size) {
LoggerProxy.error(
'n8n detected that some packages are missing. For more information, visit https://docs.n8n.io/integrations/community-nodes/troubleshooting/',
);
if (flags.reinstallMissingPackages || process.env.N8N_REINSTALL_MISSING_PACKAGES) {
LoggerProxy.info('Attempting to reinstall missing packages', { missingPackages });
try {
// Optimistic approach - stop if any installation fails
// eslint-disable-next-line no-restricted-syntax
for (const missingPackage of missingPackages) {
// eslint-disable-next-line no-await-in-loop
void (await loadNodesAndCredentials.loadNpmModule(
missingPackage.packageName,
missingPackage.version,
));
missingPackages.delete(missingPackage);
}
LoggerProxy.info(
'Packages reinstalled successfully. Resuming regular initialization.',
);
} catch (error) {
LoggerProxy.error('n8n was unable to install the missing packages.');
}
}
}
if (missingPackages.size) {
config.set(
'nodes.packagesMissing',
Array.from(missingPackages)
.map((missingPackage) => `${missingPackage.packageName}@${missingPackage.version}`)
.join(' '),
);
}
if (config.getEnv('executions.mode') === 'queue') {
const redisHost = config.getEnv('queue.bull.redis.host');
const redisUsername = config.getEnv('queue.bull.redis.username');
const redisPassword = config.getEnv('queue.bull.redis.password');
const redisPort = config.getEnv('queue.bull.redis.port');
const redisDB = config.getEnv('queue.bull.redis.db');
const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold');
let lastTimer = 0;
let cumulativeTimeout = 0;
const settings = {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
retryStrategy: (times: number): number | null => {
const now = Date.now();
if (now - lastTimer > 30000) {
// Means we had no timeout at all or last timeout was temporary and we recovered
lastTimer = now;
cumulativeTimeout = 0;
} else {
cumulativeTimeout += now - lastTimer;
lastTimer = now;
if (cumulativeTimeout > redisConnectionTimeoutLimit) {
logger.error(
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
`Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`,
);
process.exit(1);
}
}
return 500;
},
} as IDataObject;
if (redisHost) {
settings.host = redisHost;
}
if (redisUsername) {
settings.username = redisUsername;
}
if (redisPassword) {
settings.password = redisPassword;
}
if (redisPort) {
settings.port = redisPort;
}
if (redisDB) {
settings.db = redisDB;
}
// eslint-disable-next-line @typescript-eslint/naming-convention
const { default: Redis } = await import('ioredis');
// This connection is going to be our heartbeat
// IORedis automatically pings redis and tries to reconnect
// We will be using the retryStrategy above
// to control how and when to exit.
const redis = new Redis(settings);
redis.on('error', (error) => {
if (error.toString().includes('ECONNREFUSED') === true) {
logger.warn('Redis unavailable - trying to reconnect...');
} else {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
logger.warn('Error with Redis: ', error);
}
});
}
const dbType = (await GenericHelpers.getConfigValue('database.type')) as DatabaseType;
if (dbType === 'sqlite') {
const shouldRunVacuum = config.getEnv('database.sqlite.executeVacuumOnStartup');
if (shouldRunVacuum) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
await Db.collections.Execution.query('VACUUM;');
}
}
if (flags.tunnel) {
this.log('\nWaiting for tunnel ...');
let tunnelSubdomain;
if (
process.env[TUNNEL_SUBDOMAIN_ENV] !== undefined &&
process.env[TUNNEL_SUBDOMAIN_ENV] !== ''
) {
tunnelSubdomain = process.env[TUNNEL_SUBDOMAIN_ENV];
} else if (userSettings.tunnelSubdomain !== undefined) {
tunnelSubdomain = userSettings.tunnelSubdomain;
}
if (tunnelSubdomain === undefined) {
// When no tunnel subdomain did exist yet create a new random one
const availableCharacters = 'abcdefghijklmnopqrstuvwxyz0123456789';
userSettings.tunnelSubdomain = Array.from({ length: 24 })
.map(() => {
return availableCharacters.charAt(
Math.floor(Math.random() * availableCharacters.length),
);
})
.join('');
await UserSettings.writeUserSettings(userSettings);
}
const tunnelSettings: localtunnel.TunnelConfig = {
host: 'https://hooks.n8n.cloud',
subdomain: tunnelSubdomain,
};
const port = config.getEnv('port');
// @ts-ignore
const webhookTunnel = await localtunnel(port, tunnelSettings);
process.env.WEBHOOK_URL = `${webhookTunnel.url}/`;
this.log(`Tunnel URL: ${process.env.WEBHOOK_URL}\n`);
this.log(
'IMPORTANT! Do not share with anybody as it would give people access to your n8n instance!',
);
}
const instanceId = await UserSettings.getInstanceId();
const { cli } = await GenericHelpers.getVersions();
await InternalHooksManager.init(instanceId, cli, nodeTypes);
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig, true);
await Server.start();
// Start to get active workflows and run their triggers
activeWorkflowRunner = ActiveWorkflowRunner.getInstance();
await activeWorkflowRunner.init();
WaitTracker();
const editorUrl = GenericHelpers.getBaseUrl();
this.log(`\nEditor is now accessible via:\n${editorUrl}`);
const saveManualExecutions = config.getEnv('executions.saveDataManualExecutions');
if (saveManualExecutions) {
this.log('\nManual executions will be visible only for the owner');
}
// Allow to open n8n editor by pressing "o"
if (Boolean(process.stdout.isTTY) && process.stdin.setRawMode) {
process.stdin.setRawMode(true);
process.stdin.resume();
process.stdin.setEncoding('utf8');
// eslint-disable-next-line @typescript-eslint/no-unused-vars
let inputText = '';
if (flags.open) {
Start.openBrowser();
}
this.log('\nPress "o" to open in Browser.');
process.stdin.on('data', (key: string) => {
if (key === 'o') {
Start.openBrowser();
inputText = '';
} else if (key.charCodeAt(0) === 3) {
// Ctrl + c got pressed
// eslint-disable-next-line @typescript-eslint/no-floating-promises
Start.stopProcess();
} else {
// When anything else got pressed, record it and send it on enter into the child process
// eslint-disable-next-line no-lonely-if
if (key.charCodeAt(0) === 13) {
// send to child process and print in terminal
process.stdout.write('\n');
inputText = '';
} else {
// record it and write into terminal
// eslint-disable-next-line @typescript-eslint/no-unused-vars
inputText += key;
process.stdout.write(key);
}
}
});
}
} catch (error) {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
this.error(`There was an error: ${error.message}`);
try {
// Start directly with the init of the database to improve startup time
const startDbInitPromise = Db.init().catch((error: Error) => {
logger.error(`There was an error initializing DB: "${error.message}"`);
processExitCode = 1;
// @ts-ignore
process.emit('SIGINT');
process.emit('exit', processExitCode);
});
// Make sure the settings exist
const userSettings = await UserSettings.prepareUserSettings();
if (!config.getEnv('userManagement.jwtSecret')) {
// If we don't have a JWT secret set, generate
// one based and save to config.
const encryptionKey = await UserSettings.getEncryptionKey();
// For a key off every other letter from encryption key
// CAREFUL: do not change this or it breaks all existing tokens.
let baseKey = '';
for (let i = 0; i < encryptionKey.length; i += 2) {
baseKey += encryptionKey[i];
}
config.set('userManagement.jwtSecret', createHash('sha256').update(baseKey).digest('hex'));
}
})();
if (!config.getEnv('endpoints.disableUi')) {
await Start.generateStaticAssets();
}
// Load all node and credential types
const loadNodesAndCredentials = LoadNodesAndCredentials();
await loadNodesAndCredentials.init();
// Load all external hooks
const externalHooks = ExternalHooks();
await externalHooks.init();
// Add the found types to an instance other parts of the application can use
const nodeTypes = NodeTypes(loadNodesAndCredentials);
const credentialTypes = CredentialTypes(loadNodesAndCredentials);
// Load the credentials overwrites if any exist
await CredentialsOverwrites(credentialTypes).init();
await loadNodesAndCredentials.generateTypesForFrontend();
// Wait till the database is ready
await startDbInitPromise;
const installedPackages = await getAllInstalledPackages();
const missingPackages = new Set<{
packageName: string;
version: string;
}>();
installedPackages.forEach((installedPackage) => {
installedPackage.installedNodes.forEach((installedNode) => {
if (!loadNodesAndCredentials.known.nodes[installedNode.type]) {
// Leave the list ready for installing in case we need.
missingPackages.add({
packageName: installedPackage.packageName,
version: installedPackage.installedVersion,
});
}
});
});
await UserSettings.getEncryptionKey();
// Load settings from database and set them to config.
const databaseSettings = await Db.collections.Settings.find({ loadOnStartup: true });
databaseSettings.forEach((setting) => {
config.set(setting.key, JSON.parse(setting.value));
});
config.set('nodes.packagesMissing', '');
if (missingPackages.size) {
LoggerProxy.error(
'n8n detected that some packages are missing. For more information, visit https://docs.n8n.io/integrations/community-nodes/troubleshooting/',
);
if (flags.reinstallMissingPackages || process.env.N8N_REINSTALL_MISSING_PACKAGES) {
LoggerProxy.info('Attempting to reinstall missing packages', { missingPackages });
try {
// Optimistic approach - stop if any installation fails
// eslint-disable-next-line no-restricted-syntax
for (const missingPackage of missingPackages) {
// eslint-disable-next-line no-await-in-loop
void (await loadNodesAndCredentials.loadNpmModule(
missingPackage.packageName,
missingPackage.version,
));
missingPackages.delete(missingPackage);
}
LoggerProxy.info('Packages reinstalled successfully. Resuming regular initialization.');
} catch (error) {
LoggerProxy.error('n8n was unable to install the missing packages.');
}
}
config.set(
'nodes.packagesMissing',
Array.from(missingPackages)
.map((missingPackage) => `${missingPackage.packageName}@${missingPackage.version}`)
.join(' '),
);
}
const dbType = (await GenericHelpers.getConfigValue('database.type')) as DatabaseType;
if (dbType === 'sqlite') {
const shouldRunVacuum = config.getEnv('database.sqlite.executeVacuumOnStartup');
if (shouldRunVacuum) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
await Db.collections.Execution.query('VACUUM;');
}
}
if (flags.tunnel) {
this.log('\nWaiting for tunnel ...');
let tunnelSubdomain;
if (
process.env[TUNNEL_SUBDOMAIN_ENV] !== undefined &&
process.env[TUNNEL_SUBDOMAIN_ENV] !== ''
) {
tunnelSubdomain = process.env[TUNNEL_SUBDOMAIN_ENV];
} else if (userSettings.tunnelSubdomain !== undefined) {
tunnelSubdomain = userSettings.tunnelSubdomain;
}
if (tunnelSubdomain === undefined) {
// When no tunnel subdomain did exist yet create a new random one
const availableCharacters = 'abcdefghijklmnopqrstuvwxyz0123456789';
userSettings.tunnelSubdomain = Array.from({ length: 24 })
.map(() => {
return availableCharacters.charAt(
Math.floor(Math.random() * availableCharacters.length),
);
})
.join('');
await UserSettings.writeUserSettings(userSettings);
}
const tunnelSettings: localtunnel.TunnelConfig = {
host: 'https://hooks.n8n.cloud',
subdomain: tunnelSubdomain,
};
const port = config.getEnv('port');
// @ts-ignore
const webhookTunnel = await localtunnel(port, tunnelSettings);
process.env.WEBHOOK_URL = `${webhookTunnel.url}/`;
this.log(`Tunnel URL: ${process.env.WEBHOOK_URL}\n`);
this.log(
'IMPORTANT! Do not share with anybody as it would give people access to your n8n instance!',
);
}
const instanceId = await UserSettings.getInstanceId();
await InternalHooksManager.init(instanceId, nodeTypes);
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig, true);
await Server.start();
// Start to get active workflows and run their triggers
activeWorkflowRunner = ActiveWorkflowRunner.getInstance();
await activeWorkflowRunner.init();
WaitTracker();
const editorUrl = GenericHelpers.getBaseUrl();
this.log(`\nEditor is now accessible via:\n${editorUrl}`);
const saveManualExecutions = config.getEnv('executions.saveDataManualExecutions');
if (saveManualExecutions) {
this.log('\nManual executions will be visible only for the owner');
}
// Allow to open n8n editor by pressing "o"
if (Boolean(process.stdout.isTTY) && process.stdin.setRawMode) {
process.stdin.setRawMode(true);
process.stdin.resume();
process.stdin.setEncoding('utf8');
// eslint-disable-next-line @typescript-eslint/no-unused-vars
let inputText = '';
if (flags.open) {
Start.openBrowser();
}
this.log('\nPress "o" to open in Browser.');
process.stdin.on('data', (key: string) => {
if (key === 'o') {
Start.openBrowser();
inputText = '';
} else if (key.charCodeAt(0) === 3) {
// Ctrl + c got pressed
// eslint-disable-next-line @typescript-eslint/no-floating-promises
Start.stopProcess();
} else {
// When anything else got pressed, record it and send it on enter into the child process
// eslint-disable-next-line no-lonely-if
if (key.charCodeAt(0) === 13) {
// send to child process and print in terminal
process.stdout.write('\n');
inputText = '';
} else {
// record it and write into terminal
// eslint-disable-next-line @typescript-eslint/no-unused-vars
inputText += key;
process.stdout.write(key);
}
}
});
}
} catch (error) {
console.error('There was an error', error);
processExitCode = 1;
process.emit('exit', processExitCode);
}
}
}

View file

@ -7,24 +7,21 @@
import { BinaryDataManager, UserSettings } from 'n8n-core';
import { Command, flags } from '@oclif/command';
import { IDataObject, LoggerProxy, sleep } from 'n8n-workflow';
import { LoggerProxy, sleep } from 'n8n-workflow';
import config from '@/config';
import * as ActiveExecutions from '@/ActiveExecutions';
import * as ActiveWorkflowRunner from '@/ActiveWorkflowRunner';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { CredentialTypes } from '@/CredentialTypes';
import * as Db from '@/Db';
import { ExternalHooks } from '@/ExternalHooks';
import * as GenericHelpers from '@/GenericHelpers';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { NodeTypes } from '@/NodeTypes';
import { InternalHooksManager } from '@/InternalHooksManager';
import * as WebhookServer from '@/WebhookServer';
import { WebhookServer } from '@/WebhookServer';
import { getLogger } from '@/Logger';
import { initErrorHandling } from '@/ErrorReporting';
import * as CrashJournal from '@/CrashJournal';
let activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner | undefined;
let processExitCode = 0;
export class Webhook extends Command {
@ -85,6 +82,22 @@ export class Webhook extends Command {
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
async run() {
if (config.getEnv('executions.mode') !== 'queue') {
/**
* It is technically possible to run without queues but
* there are 2 known bugs when running in this mode:
* - Executions list will be problematic as the main process
* is not aware of current executions in the webhook processes
* and therefore will display all current executions as error
* as it is unable to determine if it is still running or crashed
* - You cannot stop currently executing jobs from webhook processes
* when running without queues as the main process cannot talk to
* the webhook processes to communicate workflow execution interruption.
*/
this.error('Webhook processes can only run with execution mode as queue.');
}
const logger = getLogger();
LoggerProxy.init(logger);
@ -95,154 +108,52 @@ export class Webhook extends Command {
await initErrorHandling();
await CrashJournal.init();
// eslint-disable-next-line @typescript-eslint/no-unused-vars, @typescript-eslint/no-shadow
const { flags } = this.parse(Webhook);
// Wrap that the process does not close but we can still use async
await (async () => {
if (config.getEnv('executions.mode') !== 'queue') {
/**
* It is technically possible to run without queues but
* there are 2 known bugs when running in this mode:
* - Executions list will be problematic as the main process
* is not aware of current executions in the webhook processes
* and therefore will display all current executions as error
* as it is unable to determine if it is still running or crashed
* - You cannot stop currently executing jobs from webhook processes
* when running without queues as the main process cannot talk to
* the webhook processes to communicate workflow execution interruption.
*/
this.error('Webhook processes can only run with execution mode as queue.');
}
try {
// Start directly with the init of the database to improve startup time
const startDbInitPromise = Db.init().catch((error) => {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions, @typescript-eslint/no-unsafe-member-access
logger.error(`There was an error initializing DB: "${error.message}"`);
processExitCode = 1;
// @ts-ignore
process.emit('SIGINT');
process.exit(1);
});
// Make sure the settings exist
// eslint-disable-next-line @typescript-eslint/no-unused-vars
await UserSettings.prepareUserSettings();
// Load all node and credential types
const loadNodesAndCredentials = LoadNodesAndCredentials();
await loadNodesAndCredentials.init();
// Add the found types to an instance other parts of the application can use
const nodeTypes = NodeTypes(loadNodesAndCredentials);
const credentialTypes = CredentialTypes(loadNodesAndCredentials);
// Load the credentials overwrites if any exist
await CredentialsOverwrites(credentialTypes).init();
// Load all external hooks
const externalHooks = ExternalHooks();
await externalHooks.init();
// Wait till the database is ready
await startDbInitPromise;
const instanceId = await UserSettings.getInstanceId();
const { cli } = await GenericHelpers.getVersions();
await InternalHooksManager.init(instanceId, cli, nodeTypes);
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig);
if (config.getEnv('executions.mode') === 'queue') {
const redisHost = config.getEnv('queue.bull.redis.host');
const redisUsername = config.getEnv('queue.bull.redis.username');
const redisPassword = config.getEnv('queue.bull.redis.password');
const redisPort = config.getEnv('queue.bull.redis.port');
const redisDB = config.getEnv('queue.bull.redis.db');
const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold');
let lastTimer = 0;
let cumulativeTimeout = 0;
const settings = {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
retryStrategy: (times: number): number | null => {
const now = Date.now();
if (now - lastTimer > 30000) {
// Means we had no timeout at all or last timeout was temporary and we recovered
lastTimer = now;
cumulativeTimeout = 0;
} else {
cumulativeTimeout += now - lastTimer;
lastTimer = now;
if (cumulativeTimeout > redisConnectionTimeoutLimit) {
logger.error(
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
`Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`,
);
process.exit(1);
}
}
return 500;
},
} as IDataObject;
if (redisHost) {
settings.host = redisHost;
}
if (redisUsername) {
settings.username = redisUsername;
}
if (redisPassword) {
settings.password = redisPassword;
}
if (redisPort) {
settings.port = redisPort;
}
if (redisDB) {
settings.db = redisDB;
}
// eslint-disable-next-line @typescript-eslint/naming-convention
const { default: Redis } = await import('ioredis');
// This connection is going to be our heartbeat
// IORedis automatically pings redis and tries to reconnect
// We will be using the retryStrategy above
// to control how and when to exit.
const redis = new Redis(settings);
redis.on('error', (error) => {
if (error.toString().includes('ECONNREFUSED') === true) {
logger.warn('Redis unavailable - trying to reconnect...');
} else {
logger.warn('Error with Redis: ', error);
}
});
}
await WebhookServer.start();
// Start to get active workflows and run their triggers
activeWorkflowRunner = ActiveWorkflowRunner.getInstance();
await activeWorkflowRunner.initWebhooks();
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const editorUrl = GenericHelpers.getBaseUrl();
console.info('Webhook listener waiting for requests.');
} catch (error) {
console.error('Exiting due to error. See log message for details.');
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
logger.error(`Webhook process cannot continue. "${error.message}"`);
try {
// Start directly with the init of the database to improve startup time
const startDbInitPromise = Db.init().catch((error) => {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions, @typescript-eslint/no-unsafe-member-access
logger.error(`There was an error initializing DB: "${error.message}"`);
processExitCode = 1;
// @ts-ignore
process.emit('SIGINT');
process.exit(1);
}
})();
process.emit('exit', processExitCode);
});
// Make sure the settings exist
// eslint-disable-next-line @typescript-eslint/no-unused-vars
await UserSettings.prepareUserSettings();
// Load all node and credential types
const loadNodesAndCredentials = LoadNodesAndCredentials();
await loadNodesAndCredentials.init();
// Add the found types to an instance other parts of the application can use
const nodeTypes = NodeTypes(loadNodesAndCredentials);
const credentialTypes = CredentialTypes(loadNodesAndCredentials);
// Load the credentials overwrites if any exist
await CredentialsOverwrites(credentialTypes).init();
// Load all external hooks
const externalHooks = ExternalHooks();
await externalHooks.init();
// Wait till the database is ready
await startDbInitPromise;
const instanceId = await UserSettings.getInstanceId();
await InternalHooksManager.init(instanceId, nodeTypes);
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig);
const server = new WebhookServer();
await server.start();
console.info('Webhook listener waiting for requests.');
} catch (error) {
console.error('Exiting due to error.', error);
processExitCode = 1;
process.emit('exit', processExitCode);
}
}
}

View file

@ -43,6 +43,7 @@ import config from '@/config';
import * as Queue from '@/Queue';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { N8N_VERSION } from '@/constants';
export class Worker extends Command {
static description = '\nStarts a n8n worker';
@ -304,16 +305,15 @@ export class Worker extends Command {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
Worker.jobQueue.process(flags.concurrency, async (job) => this.runJob(job, nodeTypes));
const versions = await GenericHelpers.getVersions();
const instanceId = await UserSettings.getInstanceId();
await InternalHooksManager.init(instanceId, versions.cli, nodeTypes);
await InternalHooksManager.init(instanceId, nodeTypes);
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig);
console.info('\nn8n worker is now ready');
console.info(` * Version: ${versions.cli}`);
console.info(` * Version: ${N8N_VERSION}`);
console.info(` * Concurrency: ${flags.concurrency}`);
console.info('');

View file

@ -1,8 +1,14 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/naming-convention */
import { readFileSync } from 'fs';
import { resolve, join, dirname } from 'path';
import { RESPONSE_ERROR_MESSAGES as CORE_RESPONSE_ERROR_MESSAGES, UserSettings } from 'n8n-core';
import {
n8n,
RESPONSE_ERROR_MESSAGES as CORE_RESPONSE_ERROR_MESSAGES,
UserSettings,
} from 'n8n-core';
import { jsonParse } from 'n8n-workflow';
const { NODE_ENV, E2E_TESTS } = process.env;
export const inProduction = NODE_ENV === 'production';
@ -16,6 +22,10 @@ export const NODES_BASE_DIR = join(CLI_DIR, '..', 'nodes-base');
export const GENERATED_STATIC_DIR = join(UserSettings.getUserHome(), '.cache/n8n/public');
export const EDITOR_UI_DIST_DIR = join(dirname(require.resolve('n8n-editor-ui')), 'dist');
export const N8N_VERSION = jsonParse<n8n.PackageJson>(
readFileSync(join(CLI_DIR, 'package.json'), 'utf8'),
).version;
export const NODE_PACKAGE_PREFIX = 'n8n-nodes-';
export const STARTER_TEMPLATE_NAME = `${NODE_PACKAGE_PREFIX}starter`;

View file

@ -5,15 +5,14 @@ import { MessageEventBusDestination } from './MessageEventBusDestination.ee';
import * as Sentry from '@sentry/node';
import { eventBus } from '../MessageEventBus/MessageEventBus';
import {
LoggerProxy,
MessageEventBusDestinationOptions,
MessageEventBusDestinationSentryOptions,
MessageEventBusDestinationTypeNames,
} from 'n8n-workflow';
import { GenericHelpers } from '../..';
import { isLogStreamingEnabled } from '../MessageEventBus/MessageEventBusHelper';
import { EventMessageTypes } from '../EventMessageClasses';
import { eventMessageGenericDestinationTestEvent } from '../EventMessageClasses/EventMessageGeneric';
import { N8N_VERSION } from '@/constants';
export const isMessageEventBusDestinationSentryOptions = (
candidate: unknown,
@ -45,22 +44,15 @@ export class MessageEventBusDestinationSentry
if (options.tracesSampleRate) this.tracesSampleRate = options.tracesSampleRate;
const { ENVIRONMENT: environment } = process.env;
GenericHelpers.getVersions()
.then((versions) => {
this.sentryClient = new Sentry.NodeClient({
dsn: this.dsn,
tracesSampleRate: this.tracesSampleRate,
environment,
release: versions.cli,
transport: Sentry.makeNodeTransport,
integrations: Sentry.defaultIntegrations,
stackParser: Sentry.defaultStackParser,
});
LoggerProxy.debug(`MessageEventBusDestinationSentry with id ${this.getId()} initialized`);
})
.catch((error) => {
console.error(error);
});
this.sentryClient = new Sentry.NodeClient({
dsn: this.dsn,
tracesSampleRate: this.tracesSampleRate,
environment,
release: N8N_VERSION,
transport: Sentry.makeNodeTransport,
integrations: Sentry.defaultIntegrations,
stackParser: Sentry.defaultStackParser,
});
}
async receiveFromEventBus(msg: EventMessageTypes): Promise<boolean> {

View file

@ -1,8 +1,7 @@
import { inDevelopment } from '@/constants';
import type { RequestHandler } from 'express';
export const corsMiddleware: RequestHandler = (req, res, next) => {
if (inDevelopment && 'origin' in req.headers) {
if ('origin' in req.headers) {
// Allow access also from frontend when developing
res.header('Access-Control-Allow-Origin', req.headers.origin);
res.header('Access-Control-Allow-Credentials', 'true');

View file

@ -8,6 +8,7 @@ import { IExecutionTrackProperties } from '@/Interfaces';
import { getLogger } from '@/Logger';
import { getLicense } from '@/License';
import { LicenseService } from '@/license/License.service';
import { N8N_VERSION } from '@/constants';
type ExecutionTrackDataKey = 'manual_error' | 'manual_success' | 'prod_error' | 'prod_success';
@ -35,7 +36,7 @@ export class Telemetry {
private executionCountsBuffer: IExecutionsBuffer = {};
constructor(private instanceId: string, private versionCli: string) {}
constructor(private instanceId: string) {}
async init() {
const enabled = config.getEnv('diagnostics.enabled');
@ -179,7 +180,7 @@ export class Telemetry {
const updatedProperties: ITelemetryTrackProperties = {
...properties,
instance_id: this.instanceId,
version_cli: this.versionCli,
version_cli: N8N_VERSION,
};
const payload = {

View file

@ -6,13 +6,11 @@ import * as testDb from './shared/testDb';
import type { AuthAgent } from './shared/types';
import * as utils from './shared/utils';
import { ILicensePostResponse, ILicenseReadResponse } from '@/Interfaces';
import { LicenseManager } from '@n8n_io/license-sdk';
import { License } from '@/License';
const MOCK_SERVER_URL = 'https://server.com/v1';
const MOCK_RENEW_OFFSET = 259200;
const MOCK_INSTANCE_ID = 'instance-id';
const MOCK_N8N_VERSION = '0.27.0';
let app: express.Application;
let testDbName = '';
@ -41,7 +39,7 @@ beforeAll(async () => {
beforeEach(async () => {
license = new License();
await license.init(MOCK_INSTANCE_ID, MOCK_N8N_VERSION);
await license.init(MOCK_INSTANCE_ID);
});
afterEach(async () => {

View file

@ -1,13 +1,13 @@
import { LicenseManager } from '@n8n_io/license-sdk';
import config from '@/config';
import { License } from '@/License';
import { N8N_VERSION } from '@/constants';
jest.mock('@n8n_io/license-sdk');
const MOCK_SERVER_URL = 'https://server.com/v1';
const MOCK_RENEW_OFFSET = 259200;
const MOCK_INSTANCE_ID = 'instance-id';
const MOCK_N8N_VERSION = '0.27.0';
const MOCK_ACTIVATION_KEY = 'activation-key';
const MOCK_FEATURE_FLAG = 'feat:mock';
const MOCK_MAIN_PLAN_ID = 1234;
@ -23,7 +23,7 @@ describe('License', () => {
beforeEach(async () => {
license = new License();
await license.init(MOCK_INSTANCE_ID, MOCK_N8N_VERSION);
await license.init(MOCK_INSTANCE_ID);
});
test('initializes license manager', async () => {
@ -31,7 +31,7 @@ describe('License', () => {
autoRenewEnabled: true,
autoRenewOffset: MOCK_RENEW_OFFSET,
deviceFingerprint: expect.any(Function),
productIdentifier: `n8n-${MOCK_N8N_VERSION}`,
productIdentifier: `n8n-${N8N_VERSION}`,
logger: expect.anything(),
loadCertStr: expect.any(Function),
saveCertStr: expect.any(Function),

View file

@ -103,6 +103,7 @@ importers:
'@sentry/node': ^7.28.1
'@types/basic-auth': ^1.1.2
'@types/bcryptjs': ^2.4.2
'@types/body-parser-xml': ^2.0.2
'@types/compression': 1.0.1
'@types/connect-history-api-fallback': ^1.3.1
'@types/convict': ^4.2.1
@ -312,6 +313,7 @@ importers:
'@oclif/dev-cli': 1.26.10
'@types/basic-auth': 1.1.3
'@types/bcryptjs': 2.4.2
'@types/body-parser-xml': 2.0.2
'@types/compression': 1.0.1
'@types/connect-history-api-fallback': 1.3.5
'@types/convict': 4.2.1
@ -5550,6 +5552,16 @@ packages:
resolution: {integrity: sha512-g2qEd+zkfkTEudA2SrMAeAvY7CrFqtbsLILm2dT2VIeKTqMqVzcdfURlvu6FU3srRgbmXN1Srm94pg34EIehww==}
dev: true
/@types/body-parser-xml/2.0.2:
resolution: {integrity: sha512-LlmFkP3BTfacofFevInpM8iZ6+hALZ9URUt5JpSw76irhHCdbqbcBtbxbu2MO8HUGoIROQ5wuB55rLS99xNgCg==}
dependencies:
'@types/body-parser': 1.19.2
'@types/connect': 3.4.35
'@types/express-serve-static-core': 4.17.31
'@types/node': 16.11.65
'@types/xml2js': 0.4.11
dev: true
/@types/body-parser/1.19.2:
resolution: {integrity: sha512-ALYone6pm6QmwZoAgeyNksccT9Q4AWZQ6PvfwR37GT6r6FWUPguq6sUmNGSMV2Wr761oQoBxwGGa6DR5o1DC9g==}
dependencies: