mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-16 07:17:49 -08:00
9bd7529193
* use typedi for UserManagementMailer * use typedi for SamlService * fix typos * use typedi for Queue * use typedi for License * convert some more code to use typedi
460 lines
13 KiB
TypeScript
460 lines
13 KiB
TypeScript
import { Container } from 'typedi';
|
|
import { readFile } from 'fs/promises';
|
|
import type { Server } from 'http';
|
|
import type { Url } from 'url';
|
|
import express from 'express';
|
|
import bodyParser from 'body-parser';
|
|
import bodyParserXml from 'body-parser-xml';
|
|
import compression from 'compression';
|
|
import parseUrl from 'parseurl';
|
|
import type { RedisOptions } from 'ioredis';
|
|
|
|
import type { WebhookHttpMethod } from 'n8n-workflow';
|
|
import { ErrorReporterProxy as ErrorReporter, LoggerProxy as Logger } from 'n8n-workflow';
|
|
import config from '@/config';
|
|
import { N8N_VERSION, inDevelopment } from '@/constants';
|
|
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
|
|
import * as Db from '@/Db';
|
|
import type { IExternalHooksClass } from '@/Interfaces';
|
|
import { ExternalHooks } from '@/ExternalHooks';
|
|
import {
|
|
send,
|
|
sendErrorResponse,
|
|
sendSuccessResponse,
|
|
ServiceUnavailableError,
|
|
} from '@/ResponseHelper';
|
|
import { corsMiddleware } from '@/middlewares';
|
|
import { TestWebhooks } from '@/TestWebhooks';
|
|
import { WaitingWebhooks } from '@/WaitingWebhooks';
|
|
import { WEBHOOK_METHODS } from '@/WebhookHelpers';
|
|
|
|
const emptyBuffer = Buffer.alloc(0);
|
|
|
|
export abstract class AbstractServer {
|
|
protected server: Server;
|
|
|
|
protected app: express.Application;
|
|
|
|
protected externalHooks: IExternalHooksClass;
|
|
|
|
protected activeWorkflowRunner: ActiveWorkflowRunner;
|
|
|
|
protected protocol: string;
|
|
|
|
protected sslKey: string;
|
|
|
|
protected sslCert: string;
|
|
|
|
protected timezone: string;
|
|
|
|
protected restEndpoint: string;
|
|
|
|
protected endpointWebhook: string;
|
|
|
|
protected endpointWebhookTest: string;
|
|
|
|
protected endpointWebhookWaiting: string;
|
|
|
|
abstract configure(): Promise<void>;
|
|
|
|
constructor() {
|
|
this.app = express();
|
|
this.app.disable('x-powered-by');
|
|
|
|
this.protocol = config.getEnv('protocol');
|
|
this.sslKey = config.getEnv('ssl_key');
|
|
this.sslCert = config.getEnv('ssl_cert');
|
|
|
|
this.timezone = config.getEnv('generic.timezone');
|
|
|
|
this.restEndpoint = config.getEnv('endpoints.rest');
|
|
this.endpointWebhook = config.getEnv('endpoints.webhook');
|
|
this.endpointWebhookTest = config.getEnv('endpoints.webhookTest');
|
|
this.endpointWebhookWaiting = config.getEnv('endpoints.webhookWaiting');
|
|
|
|
this.externalHooks = Container.get(ExternalHooks);
|
|
this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
|
|
}
|
|
|
|
private async setupErrorHandlers() {
|
|
const { app } = this;
|
|
|
|
// Augment errors sent to Sentry
|
|
const {
|
|
Handlers: { requestHandler, errorHandler },
|
|
} = await import('@sentry/node');
|
|
app.use(requestHandler());
|
|
app.use(errorHandler());
|
|
}
|
|
|
|
private async setupCommonMiddlewares() {
|
|
const { app } = this;
|
|
|
|
// Compress the response data
|
|
app.use(compression());
|
|
|
|
// Make sure that each request has the "parsedUrl" parameter
|
|
app.use((req, res, next) => {
|
|
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
|
req.parsedUrl = parseUrl(req)!;
|
|
req.rawBody = emptyBuffer;
|
|
next();
|
|
});
|
|
|
|
const payloadSizeMax = config.getEnv('endpoints.payloadSizeMax');
|
|
|
|
// Support application/json type post data
|
|
app.use(
|
|
bodyParser.json({
|
|
limit: `${payloadSizeMax}mb`,
|
|
verify: (req, res, buf) => {
|
|
req.rawBody = buf;
|
|
},
|
|
}),
|
|
);
|
|
|
|
// Support application/xml type post data
|
|
bodyParserXml(bodyParser);
|
|
app.use(
|
|
bodyParser.xml({
|
|
limit: `${payloadSizeMax}mb`,
|
|
xmlParseOptions: {
|
|
normalize: true, // Trim whitespace inside text nodes
|
|
normalizeTags: true, // Transform tags to lowercase
|
|
explicitArray: false, // Only put properties in array if length > 1
|
|
},
|
|
verify: (req, res, buf) => {
|
|
req.rawBody = buf;
|
|
},
|
|
}),
|
|
);
|
|
|
|
app.use(
|
|
bodyParser.text({
|
|
limit: `${payloadSizeMax}mb`,
|
|
verify: (req, res, buf) => {
|
|
req.rawBody = buf;
|
|
},
|
|
}),
|
|
);
|
|
|
|
// support application/x-www-form-urlencoded post data
|
|
app.use(
|
|
bodyParser.urlencoded({
|
|
limit: `${payloadSizeMax}mb`,
|
|
extended: false,
|
|
verify: (req, res, buf) => {
|
|
req.rawBody = buf;
|
|
},
|
|
}),
|
|
);
|
|
}
|
|
|
|
private setupDevMiddlewares() {
|
|
this.app.use(corsMiddleware);
|
|
}
|
|
|
|
protected setupPushServer() {}
|
|
|
|
private async setupHealthCheck() {
|
|
this.app.use((req, res, next) => {
|
|
if (!Db.isInitialized) {
|
|
sendErrorResponse(res, new ServiceUnavailableError('Database is not ready!'));
|
|
} else next();
|
|
});
|
|
|
|
// Does very basic health check
|
|
this.app.get('/healthz', async (req, res) => {
|
|
Logger.debug('Health check started!');
|
|
|
|
const connection = Db.getConnection();
|
|
|
|
try {
|
|
if (!connection.isInitialized) {
|
|
// Connection is not active
|
|
throw new ServiceUnavailableError('No active database connection!');
|
|
}
|
|
// DB ping
|
|
await connection.query('SELECT 1');
|
|
} catch (error) {
|
|
ErrorReporter.error(error);
|
|
Logger.error('No Database connection!');
|
|
return sendErrorResponse(res, new ServiceUnavailableError('No Database connection!'));
|
|
}
|
|
|
|
Logger.debug('Health check completed successfully!');
|
|
sendSuccessResponse(res, { status: 'ok' }, true, 200);
|
|
});
|
|
|
|
if (config.getEnv('executions.mode') === 'queue') {
|
|
await this.setupRedisChecks();
|
|
}
|
|
}
|
|
|
|
// This connection is going to be our heartbeat
|
|
// IORedis automatically pings redis and tries to reconnect
|
|
// We will be using a retryStrategy to control how and when to exit.
|
|
private async setupRedisChecks() {
|
|
// eslint-disable-next-line @typescript-eslint/naming-convention
|
|
const { default: Redis } = await import('ioredis');
|
|
|
|
let lastTimer = 0;
|
|
let cumulativeTimeout = 0;
|
|
const { host, port, username, password, db }: RedisOptions = config.getEnv('queue.bull.redis');
|
|
const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold');
|
|
|
|
const redis = new Redis({
|
|
host,
|
|
port,
|
|
db,
|
|
username,
|
|
password,
|
|
retryStrategy: (): number | null => {
|
|
const now = Date.now();
|
|
if (now - lastTimer > 30000) {
|
|
// Means we had no timeout at all or last timeout was temporary and we recovered
|
|
lastTimer = now;
|
|
cumulativeTimeout = 0;
|
|
} else {
|
|
cumulativeTimeout += now - lastTimer;
|
|
lastTimer = now;
|
|
if (cumulativeTimeout > redisConnectionTimeoutLimit) {
|
|
Logger.error(
|
|
`Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`,
|
|
);
|
|
process.exit(1);
|
|
}
|
|
}
|
|
return 500;
|
|
},
|
|
});
|
|
|
|
redis.on('close', () => {
|
|
Logger.warn('Redis unavailable - trying to reconnect...');
|
|
});
|
|
|
|
redis.on('error', (error) => {
|
|
if (!String(error).includes('ECONNREFUSED')) {
|
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
|
|
Logger.warn('Error with Redis: ', error);
|
|
}
|
|
});
|
|
}
|
|
|
|
// ----------------------------------------
|
|
// Regular Webhooks
|
|
// ----------------------------------------
|
|
protected setupWebhookEndpoint() {
|
|
const endpoint = this.endpointWebhook;
|
|
const activeWorkflowRunner = this.activeWorkflowRunner;
|
|
|
|
// Register all webhook requests
|
|
this.app.all(`/${endpoint}/*`, async (req, res) => {
|
|
// Cut away the "/webhook/" to get the registered part of the url
|
|
const requestUrl = req.parsedUrl.pathname!.slice(endpoint.length + 2);
|
|
|
|
const method = req.method.toUpperCase() as WebhookHttpMethod;
|
|
if (method === 'OPTIONS') {
|
|
let allowedMethods: string[];
|
|
try {
|
|
allowedMethods = await activeWorkflowRunner.getWebhookMethods(requestUrl);
|
|
allowedMethods.push('OPTIONS');
|
|
|
|
// Add custom "Allow" header to satisfy OPTIONS response.
|
|
res.append('Allow', allowedMethods);
|
|
} catch (error) {
|
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
|
|
sendErrorResponse(res, error);
|
|
return;
|
|
}
|
|
|
|
res.header('Access-Control-Allow-Origin', '*');
|
|
|
|
sendSuccessResponse(res, {}, true, 204);
|
|
return;
|
|
}
|
|
|
|
if (!WEBHOOK_METHODS.includes(method)) {
|
|
sendErrorResponse(res, new Error(`The method ${method} is not supported.`));
|
|
return;
|
|
}
|
|
|
|
let response;
|
|
try {
|
|
response = await activeWorkflowRunner.executeWebhook(method, requestUrl, req, res);
|
|
} catch (error) {
|
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
|
|
sendErrorResponse(res, error);
|
|
return;
|
|
}
|
|
|
|
if (response.noWebhookResponse === true) {
|
|
// Nothing else to do as the response got already sent
|
|
return;
|
|
}
|
|
|
|
sendSuccessResponse(res, response.data, true, response.responseCode, response.headers);
|
|
});
|
|
}
|
|
|
|
// ----------------------------------------
|
|
// Waiting Webhooks
|
|
// ----------------------------------------
|
|
protected setupWaitingWebhookEndpoint() {
|
|
const endpoint = this.endpointWebhookWaiting;
|
|
const waitingWebhooks = Container.get(WaitingWebhooks);
|
|
|
|
// Register all webhook-waiting requests
|
|
this.app.all(`/${endpoint}/*`, async (req, res) => {
|
|
// Cut away the "/webhook-waiting/" to get the registered part of the url
|
|
const requestUrl = req.parsedUrl.pathname!.slice(endpoint.length + 2);
|
|
|
|
const method = req.method.toUpperCase() as WebhookHttpMethod;
|
|
|
|
if (!WEBHOOK_METHODS.includes(method)) {
|
|
sendErrorResponse(res, new Error(`The method ${method} is not supported.`));
|
|
return;
|
|
}
|
|
|
|
let response;
|
|
try {
|
|
response = await waitingWebhooks.executeWebhook(method, requestUrl, req, res);
|
|
} catch (error) {
|
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
|
|
sendErrorResponse(res, error);
|
|
return;
|
|
}
|
|
|
|
if (response.noWebhookResponse === true) {
|
|
// Nothing else to do as the response got already sent
|
|
return;
|
|
}
|
|
|
|
sendSuccessResponse(res, response.data, true, response.responseCode, response.headers);
|
|
});
|
|
}
|
|
|
|
// ----------------------------------------
|
|
// Testing Webhooks
|
|
// ----------------------------------------
|
|
protected setupTestWebhookEndpoint() {
|
|
const endpoint = this.endpointWebhookTest;
|
|
const testWebhooks = Container.get(TestWebhooks);
|
|
|
|
// Register all test webhook requests (for testing via the UI)
|
|
this.app.all(`/${endpoint}/*`, async (req, res) => {
|
|
// Cut away the "/webhook-test/" to get the registered part of the url
|
|
const requestUrl = req.parsedUrl.pathname!.slice(endpoint.length + 2);
|
|
|
|
const method = req.method.toUpperCase() as WebhookHttpMethod;
|
|
|
|
if (method === 'OPTIONS') {
|
|
let allowedMethods: string[];
|
|
try {
|
|
allowedMethods = await testWebhooks.getWebhookMethods(requestUrl);
|
|
allowedMethods.push('OPTIONS');
|
|
|
|
// Add custom "Allow" header to satisfy OPTIONS response.
|
|
res.append('Allow', allowedMethods);
|
|
} catch (error) {
|
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
|
|
sendErrorResponse(res, error);
|
|
return;
|
|
}
|
|
|
|
res.header('Access-Control-Allow-Origin', '*');
|
|
|
|
sendSuccessResponse(res, {}, true, 204);
|
|
return;
|
|
}
|
|
|
|
if (!WEBHOOK_METHODS.includes(method)) {
|
|
sendErrorResponse(res, new Error(`The method ${method} is not supported.`));
|
|
return;
|
|
}
|
|
|
|
let response;
|
|
try {
|
|
response = await testWebhooks.callTestWebhook(method, requestUrl, req, res);
|
|
} catch (error) {
|
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
|
|
sendErrorResponse(res, error);
|
|
return;
|
|
}
|
|
|
|
if (response.noWebhookResponse === true) {
|
|
// Nothing else to do as the response got already sent
|
|
return;
|
|
}
|
|
|
|
sendSuccessResponse(res, response.data, true, response.responseCode, response.headers);
|
|
});
|
|
|
|
// Removes a test webhook
|
|
// TODO UM: check if this needs validation with user management.
|
|
this.app.delete(
|
|
`/${this.restEndpoint}/test-webhook/:id`,
|
|
send(async (req) => testWebhooks.cancelTestWebhook(req.params.id)),
|
|
);
|
|
}
|
|
|
|
async start(): Promise<void> {
|
|
const { app, externalHooks, protocol, sslKey, sslCert } = this;
|
|
|
|
if (protocol === 'https' && sslKey && sslCert) {
|
|
const https = await import('https');
|
|
this.server = https.createServer(
|
|
{
|
|
key: await readFile(this.sslKey, 'utf8'),
|
|
cert: await readFile(this.sslCert, 'utf8'),
|
|
},
|
|
app,
|
|
);
|
|
} else {
|
|
const http = await import('http');
|
|
this.server = http.createServer(app);
|
|
}
|
|
|
|
const PORT = config.getEnv('port');
|
|
const ADDRESS = config.getEnv('listen_address');
|
|
|
|
this.server.on('error', (error: Error & { code: string }) => {
|
|
if (error.code === 'EADDRINUSE') {
|
|
console.log(
|
|
`n8n's port ${PORT} is already in use. Do you have another instance of n8n running already?`,
|
|
);
|
|
process.exit(1);
|
|
}
|
|
});
|
|
|
|
await new Promise<void>((resolve) => this.server.listen(PORT, ADDRESS, () => resolve()));
|
|
|
|
await this.setupErrorHandlers();
|
|
this.setupPushServer();
|
|
await this.setupCommonMiddlewares();
|
|
if (inDevelopment) {
|
|
this.setupDevMiddlewares();
|
|
}
|
|
|
|
await this.setupHealthCheck();
|
|
|
|
await this.configure();
|
|
|
|
console.log(`n8n ready on ${ADDRESS}, port ${PORT}`);
|
|
console.log(`Version: ${N8N_VERSION}`);
|
|
|
|
const defaultLocale = config.getEnv('defaultLocale');
|
|
if (defaultLocale !== 'en') {
|
|
console.log(`Locale: ${defaultLocale}`);
|
|
}
|
|
|
|
await externalHooks.run('n8n.ready', [this, config]);
|
|
}
|
|
}
|
|
|
|
declare module 'http' {
|
|
export interface IncomingMessage {
|
|
parsedUrl: Url;
|
|
}
|
|
}
|