feat(core): Unify application components shutdown (#8097)

## Summary

Add `ShutdownService` and `OnShutdown` decorator for more unified way to
shutdown different components. Use this new way in the following
components:

- HTTP(S) server
- Pruning service
- Push connection
- License

---------

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
Tomi Turtiainen 2023-12-22 12:39:58 +02:00 committed by GitHub
parent c158ca2471
commit 3a881be6c2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 412 additions and 17 deletions

View file

@ -1,4 +1,4 @@
import { Container } from 'typedi';
import { Container, Service } from 'typedi';
import { readFile } from 'fs/promises';
import type { Server } from 'http';
import express from 'express';
@ -9,7 +9,8 @@ import config from '@/config';
import { N8N_VERSION, inDevelopment, inTest } from '@/constants';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import * as Db from '@/Db';
import type { N8nInstanceType, IExternalHooksClass } from '@/Interfaces';
import { N8nInstanceType } from '@/Interfaces';
import type { IExternalHooksClass } from '@/Interfaces';
import { ExternalHooks } from '@/ExternalHooks';
import { send, sendErrorResponse } from '@/ResponseHelper';
import { rawBodyReader, bodyParser, corsMiddleware } from '@/middlewares';
@ -20,7 +21,9 @@ import { webhookRequestHandler } from '@/WebhookHelpers';
import { generateHostInstanceId } from './databases/utils/generators';
import { Logger } from '@/Logger';
import { ServiceUnavailableError } from './errors/response-errors/service-unavailable.error';
import { OnShutdown } from '@/decorators/OnShutdown';
@Service()
export abstract class AbstractServer {
protected logger: Logger;
@ -246,4 +249,26 @@ export abstract class AbstractServer {
await this.externalHooks.run('n8n.ready', [this, config]);
}
}
/**
* Stops the HTTP(S) server from accepting new connections. Gives all
* connections configured amount of time to finish their work and
* then closes them forcefully.
*/
@OnShutdown()
async onShutdown(): Promise<void> {
if (!this.server) {
return;
}
this.logger.debug(`Shutting down ${this.protocol} server`);
this.server.close((error) => {
if (error) {
this.logger.error(`Error while shutting down ${this.protocol} server`, { error });
}
this.logger.debug(`${this.protocol} server shut down`);
});
}
}

View file

@ -65,6 +65,7 @@ import { ActivationErrorsService } from '@/ActivationErrors.service';
import { NotFoundError } from './errors/response-errors/not-found.error';
import { ActiveWorkflowsService } from '@/services/activeWorkflows.service';
import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service';
import { OnShutdown } from '@/decorators/OnShutdown';
interface QueuedActivation {
activationMode: WorkflowActivateMode;
@ -664,6 +665,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
await this.addActiveWorkflows('leadershipChange');
}
@OnShutdown()
async removeAllTriggerAndPollerBasedWorkflows() {
await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows();
}

View file

@ -17,6 +17,7 @@ import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } fr
import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher';
import { RedisService } from './services/redis.service';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { OnShutdown } from '@/decorators/OnShutdown';
type FeatureReturnType = Partial<
{
@ -30,6 +31,8 @@ export class License {
private redisPublisher: RedisServicePubSubPublisher;
private isShuttingDown = false;
constructor(
private readonly logger: Logger,
private readonly instanceSettings: InstanceSettings,
@ -40,6 +43,11 @@ export class License {
async init(instanceType: N8nInstanceType = 'main') {
if (this.manager) {
this.logger.warn('License manager already initialized or shutting down');
return;
}
if (this.isShuttingDown) {
this.logger.warn('License manager already shutting down');
return;
}
@ -191,7 +199,12 @@ export class License {
await this.manager.renew();
}
@OnShutdown()
async shutdown() {
// Shut down License manager to unclaim any floating entitlements
// Note: While this saves a new license cert to DB, the previous entitlements are still kept in memory so that the shutdown process can complete
this.isShuttingDown = true;
if (!this.manager) {
return;
}

View file

@ -6,6 +6,7 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { Container, Service } from 'typedi';
import assert from 'assert';
import { exec as callbackExec } from 'child_process';
import { access as fsAccess } from 'fs/promises';
@ -84,7 +85,6 @@ import { handleLdapInit, isLdapEnabled } from './Ldap/helpers';
import { AbstractServer } from './AbstractServer';
import { PostHogClient } from './posthog';
import { eventBus } from './eventbus';
import { Container } from 'typedi';
import { InternalHooks } from './InternalHooks';
import { License } from './License';
import { getStatusUsingPreviousExecutionStatusMethod } from './executions/executionHelpers';
@ -124,6 +124,7 @@ import { PasswordUtility } from './services/password.utility';
const exec = promisify(callbackExec);
@Service()
export class Server extends AbstractServer {
private endpointPresetCredentials: string;

View file

@ -1,5 +1,7 @@
import { Service } from 'typedi';
import { AbstractServer } from '@/AbstractServer';
@Service()
export class WebhookServer extends AbstractServer {
constructor() {
super('webhook');

View file

@ -22,6 +22,7 @@ import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager
import { initExpressionEvaluator } from '@/ExpressionEvaluator';
import { generateHostInstanceId } from '@db/utils/generators';
import { WorkflowHistoryManager } from '@/workflows/workflowHistory/workflowHistoryManager.ee';
import { ShutdownService } from '@/shutdown/Shutdown.service';
export abstract class BaseCommand extends Command {
protected logger = Container.get(Logger);
@ -38,7 +39,7 @@ export abstract class BaseCommand extends Command {
protected server?: AbstractServer;
protected isShuttingDown = false;
protected shutdownService: ShutdownService = Container.get(ShutdownService);
/**
* How long to wait for graceful shutdown before force killing the process.
@ -309,7 +310,7 @@ export abstract class BaseCommand extends Command {
private onTerminationSignal(signal: string) {
return async () => {
if (this.isShuttingDown) {
if (this.shutdownService.isShuttingDown()) {
this.logger.info(`Received ${signal}. Already shutting down...`);
return;
}
@ -323,9 +324,9 @@ export abstract class BaseCommand extends Command {
}, this.gracefulShutdownTimeoutInS * 1000);
this.logger.info(`Received ${signal}. Shutting down...`);
this.isShuttingDown = true;
this.shutdownService.shutdown();
await this.stopProcess();
await Promise.all([this.stopProcess(), this.shutdownService.waitForShutdown()]);
clearTimeout(forceShutdownTimer);
};

View file

@ -63,7 +63,7 @@ export class Start extends BaseCommand {
protected activeWorkflowRunner: ActiveWorkflowRunner;
protected server = new Server();
protected server = Container.get(Server);
private pruningService: PruningService;
@ -101,14 +101,6 @@ export class Start extends BaseCommand {
await this.externalHooks?.run('n8n.stop', []);
// Shut down License manager to unclaim any floating entitlements
// Note: While this saves a new license cert to DB, the previous entitlements are still kept in memory so that the shutdown process can complete
await Container.get(License).shutdown();
if (this.pruningService.isPruningEnabled()) {
this.pruningService.stopPruning();
}
if (Container.get(MultiMainSetup).isEnabled) {
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();

View file

@ -19,7 +19,7 @@ export class Webhook extends BaseCommand {
help: flags.help({ char: 'h' }),
};
protected server = new WebhookServer();
protected server = Container.get(WebhookServer);
constructor(argv: string[], cmdConfig: IConfig) {
super(argv, cmdConfig);

View file

@ -0,0 +1,38 @@
import { Container } from 'typedi';
import { ApplicationError } from 'n8n-workflow';
import { type ServiceClass, ShutdownService } from '@/shutdown/Shutdown.service';
/**
* Decorator that registers a method as a shutdown hook. The method will
* be called when the application is shutting down.
*
* Priority is used to determine the order in which the hooks are called.
*
* NOTE: Requires also @Service() decorator to be used on the class.
*
* @example
* ```ts
* @Service()
* class MyClass {
* @OnShutdown()
* async shutdown() {
* // Will be called when the app is shutting down
* }
* }
* ```
*/
export const OnShutdown =
(priority = 100): MethodDecorator =>
(prototype, propertyKey, descriptor) => {
const serviceClass = prototype.constructor as ServiceClass;
const methodName = String(propertyKey);
// TODO: assert that serviceClass is decorated with @Service
if (typeof descriptor?.value === 'function') {
Container.get(ShutdownService).register(priority, { serviceClass, methodName });
} else {
const name = `${serviceClass.name}.${methodName}()`;
throw new ApplicationError(
`${name} must be a method on ${serviceClass.name} to use "OnShutdown"`,
);
}
};

View file

@ -94,4 +94,17 @@ export abstract class AbstractPush<T> extends EventEmitter {
this.sendToSessions(type, data, userSessionIds);
}
/**
* Closes all push existing connections
*/
closeAllConnections() {
for (const sessionId in this.connections) {
// Signal the connection that we want to close it.
// We are not removing the sessions here because it should be
// the implementation's responsibility to do so once the connection
// has actually closed.
this.close(this.connections[sessionId]);
}
}
}

View file

@ -14,6 +14,7 @@ import { WebSocketPush } from './websocket.push';
import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types';
import type { IPushDataType } from '@/Interfaces';
import type { User } from '@db/entities/User';
import { OnShutdown } from '@/decorators/OnShutdown';
const useWebSockets = config.getEnv('push.backend') === 'websocket';
@ -70,6 +71,11 @@ export class Push extends EventEmitter {
sendToUsers<D>(type: IPushDataType, data: D, userIds: Array<User['id']>) {
this.backend.sendToUsers(type, data, userIds);
}
@OnShutdown()
onShutdown(): void {
this.backend.closeAllConnections();
}
}
export const setupPushServer = (restEndpoint: string, server: Server, app: Application) => {

View file

@ -10,6 +10,7 @@ import { ExecutionRepository } from '@db/repositories/execution.repository';
import { Logger } from '@/Logger';
import { ExecutionEntity } from '@db/entities/ExecutionEntity';
import { jsonStringify } from 'n8n-workflow';
import { OnShutdown } from '@/decorators/OnShutdown';
@Service()
export class PruningService {
@ -24,6 +25,8 @@ export class PruningService {
public hardDeletionTimeout: NodeJS.Timeout | undefined;
private isShuttingDown = false;
constructor(
private readonly logger: Logger,
private readonly executionRepository: ExecutionRepository,
@ -54,6 +57,11 @@ export class PruningService {
* @important Call this method only after DB migrations have completed.
*/
startPruning() {
if (this.isShuttingDown) {
this.logger.warn('[Pruning] Cannot start pruning while shutting down');
return;
}
this.logger.debug('[Pruning] Starting soft-deletion and hard-deletion timers');
this.setSoftDeletionInterval();
@ -158,6 +166,12 @@ export class PruningService {
this.logger.debug('[Pruning] Soft-deleted executions', { count: result.affected });
}
@OnShutdown()
shutdown(): void {
this.isShuttingDown = true;
this.stopPruning();
}
/**
* Permanently remove all soft-deleted executions and their binary data, in a pruning cycle.
* @return Delay in ms after which the next cycle should be started

View file

@ -0,0 +1,85 @@
import { Container, Service } from 'typedi';
import { ApplicationError, ErrorReporterProxy, assert } from 'n8n-workflow';
import { Logger } from '@/Logger';
export interface ServiceClass {
new (): Record<string, () => Promise<void> | void>;
}
export interface ShutdownHandler {
serviceClass: ServiceClass;
methodName: string;
}
/** Error reported when a listener fails to shutdown gracefully */
export class ComponentShutdownError extends ApplicationError {
constructor(componentName: string, cause: Error) {
super('Failed to shutdown gracefully', {
level: 'error',
cause,
extra: { component: componentName },
});
}
}
/** Service responsible for orchestrating a graceful shutdown of the application */
@Service()
export class ShutdownService {
private readonly handlersByPriority: ShutdownHandler[][] = [];
private shutdownPromise: Promise<void> | undefined;
constructor(private readonly logger: Logger) {}
/** Registers given listener to be notified when the application is shutting down */
register(priority: number, handler: ShutdownHandler) {
if (!this.handlersByPriority[priority]) {
this.handlersByPriority[priority] = [];
}
this.handlersByPriority[priority].push(handler);
}
/** Signals all registered listeners that the application is shutting down */
shutdown() {
if (this.shutdownPromise) {
throw new ApplicationError('App is already shutting down');
}
this.shutdownPromise = this.startShutdown();
}
/** Returns a promise that resolves when all the registered listeners have shut down */
async waitForShutdown(): Promise<void> {
if (!this.shutdownPromise) {
throw new ApplicationError('App is not shutting down');
}
await this.shutdownPromise;
}
isShuttingDown() {
return !!this.shutdownPromise;
}
private async startShutdown() {
const handlers = Object.values(this.handlersByPriority).reverse();
for (const handlerGroup of handlers) {
await Promise.allSettled(
handlerGroup.map(async (handler) => this.shutdownComponent(handler)),
);
}
}
private async shutdownComponent({ serviceClass, methodName }: ShutdownHandler) {
const name = `${serviceClass.name}.${methodName}()`;
try {
this.logger.debug(`Shutting down component "${name}"`);
const service = Container.get(serviceClass);
const method = service[methodName];
await method.call(service);
} catch (error) {
assert(error instanceof Error);
ErrorReporterProxy.error(new ComponentShutdownError(name, error));
}
}
}

View file

@ -0,0 +1,76 @@
import Container, { Service } from 'typedi';
import { OnShutdown } from '@/decorators/OnShutdown';
import { ShutdownService } from '@/shutdown/Shutdown.service';
import { mock } from 'jest-mock-extended';
describe('OnShutdown', () => {
let shutdownService: ShutdownService;
let registerSpy: jest.SpyInstance;
beforeEach(() => {
shutdownService = new ShutdownService(mock());
Container.set(ShutdownService, shutdownService);
registerSpy = jest.spyOn(shutdownService, 'register');
});
it('should register a methods that is decorated with OnShutdown', () => {
@Service()
class TestClass {
@OnShutdown()
async onShutdown() {}
}
expect(shutdownService.register).toHaveBeenCalledTimes(1);
expect(shutdownService.register).toHaveBeenCalledWith(100, {
methodName: 'onShutdown',
serviceClass: TestClass,
});
});
it('should register multiple methods in the same class', () => {
@Service()
class TestClass {
@OnShutdown()
async one() {}
@OnShutdown()
async two() {}
}
expect(shutdownService.register).toHaveBeenCalledTimes(2);
expect(shutdownService.register).toHaveBeenCalledWith(100, {
methodName: 'one',
serviceClass: TestClass,
});
expect(shutdownService.register).toHaveBeenCalledWith(100, {
methodName: 'two',
serviceClass: TestClass,
});
});
it('should use the given priority', () => {
class TestClass {
@OnShutdown(10)
async onShutdown() {
// Will be called when the app is shutting down
}
}
expect(shutdownService.register).toHaveBeenCalledTimes(1);
// @ts-expect-error We are checking internal parts of the shutdown service
expect(shutdownService.handlersByPriority[10].length).toEqual(1);
});
it('should throw an error if the decorated member is not a function', () => {
expect(() => {
@Service()
class TestClass {
// @ts-expect-error Testing invalid code
@OnShutdown()
onShutdown = 'not a function';
}
new TestClass();
}).toThrow('TestClass.onShutdown() must be a method on TestClass to use "OnShutdown"');
});
});

View file

@ -0,0 +1,127 @@
import { ApplicationError, ErrorReporterProxy } from 'n8n-workflow';
import { mock } from 'jest-mock-extended';
import type { ServiceClass } from '@/shutdown/Shutdown.service';
import { ShutdownService } from '@/shutdown/Shutdown.service';
import Container from 'typedi';
class MockComponent {
onShutdown() {}
}
describe('ShutdownService', () => {
let shutdownService: ShutdownService;
let mockComponent: MockComponent;
let onShutdownSpy: jest.SpyInstance;
let mockErrorReporterProxy: jest.SpyInstance;
beforeEach(() => {
shutdownService = new ShutdownService(mock());
mockComponent = new MockComponent();
Container.set(MockComponent, mockComponent);
onShutdownSpy = jest.spyOn(mockComponent, 'onShutdown');
mockErrorReporterProxy = jest.spyOn(ErrorReporterProxy, 'error').mockImplementation(() => {});
});
describe('shutdown', () => {
it('should signal shutdown', () => {
shutdownService.register(10, {
serviceClass: MockComponent as unknown as ServiceClass,
methodName: 'onShutdown',
});
shutdownService.shutdown();
expect(onShutdownSpy).toBeCalledTimes(1);
});
it('should signal shutdown in the priority order', async () => {
class MockService {
onShutdownHighPrio() {}
onShutdownLowPrio() {}
}
const order: string[] = [];
const mockService = new MockService();
Container.set(MockService, mockService);
jest.spyOn(mockService, 'onShutdownHighPrio').mockImplementation(() => order.push('high'));
jest.spyOn(mockService, 'onShutdownLowPrio').mockImplementation(() => order.push('low'));
shutdownService.register(100, {
serviceClass: MockService as unknown as ServiceClass,
methodName: 'onShutdownHighPrio',
});
shutdownService.register(10, {
serviceClass: MockService as unknown as ServiceClass,
methodName: 'onShutdownLowPrio',
});
shutdownService.shutdown();
await shutdownService.waitForShutdown();
expect(order).toEqual(['high', 'low']);
});
it('should throw error if shutdown is already in progress', () => {
shutdownService.register(10, {
methodName: 'onShutdown',
serviceClass: MockComponent as unknown as ServiceClass,
});
shutdownService.shutdown();
expect(() => shutdownService.shutdown()).toThrow('App is already shutting down');
});
it('should report error if component shutdown fails', async () => {
const componentError = new Error('Something went wrong');
onShutdownSpy.mockImplementation(() => {
throw componentError;
});
shutdownService.register(10, {
serviceClass: MockComponent as unknown as ServiceClass,
methodName: 'onShutdown',
});
shutdownService.shutdown();
await shutdownService.waitForShutdown();
expect(mockErrorReporterProxy).toHaveBeenCalledTimes(1);
const error = mockErrorReporterProxy.mock.calls[0][0];
expect(error).toBeInstanceOf(ApplicationError);
expect(error.message).toBe('Failed to shutdown gracefully');
expect(error.extra).toEqual({
component: 'MockComponent.onShutdown()',
});
expect(error.cause).toBe(componentError);
});
});
describe('waitForShutdown', () => {
it('should wait for shutdown', async () => {
shutdownService.register(10, {
serviceClass: MockComponent as unknown as ServiceClass,
methodName: 'onShutdown',
});
shutdownService.shutdown();
await expect(shutdownService.waitForShutdown()).resolves.toBeUndefined();
});
it('should throw error if app is not shutting down', async () => {
await expect(async () => shutdownService.waitForShutdown()).rejects.toThrow(
'App is not shutting down',
);
});
});
describe('isShuttingDown', () => {
it('should return true if app is shutting down', () => {
shutdownService.register(10, {
serviceClass: MockComponent as unknown as ServiceClass,
methodName: 'onShutdown',
});
shutdownService.shutdown();
expect(shutdownService.isShuttingDown()).toBe(true);
});
it('should return false if app is not shutting down', () => {
expect(shutdownService.isShuttingDown()).toBe(false);
});
});
});