diff --git a/packages/cli/src/AbstractServer.ts b/packages/cli/src/AbstractServer.ts index e97e4ce0a6..679e22142a 100644 --- a/packages/cli/src/AbstractServer.ts +++ b/packages/cli/src/AbstractServer.ts @@ -1,4 +1,4 @@ -import { Container } from 'typedi'; +import { Container, Service } from 'typedi'; import { readFile } from 'fs/promises'; import type { Server } from 'http'; import express from 'express'; @@ -9,7 +9,8 @@ import config from '@/config'; import { N8N_VERSION, inDevelopment, inTest } from '@/constants'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import * as Db from '@/Db'; -import type { N8nInstanceType, IExternalHooksClass } from '@/Interfaces'; +import { N8nInstanceType } from '@/Interfaces'; +import type { IExternalHooksClass } from '@/Interfaces'; import { ExternalHooks } from '@/ExternalHooks'; import { send, sendErrorResponse } from '@/ResponseHelper'; import { rawBodyReader, bodyParser, corsMiddleware } from '@/middlewares'; @@ -20,7 +21,9 @@ import { webhookRequestHandler } from '@/WebhookHelpers'; import { generateHostInstanceId } from './databases/utils/generators'; import { Logger } from '@/Logger'; import { ServiceUnavailableError } from './errors/response-errors/service-unavailable.error'; +import { OnShutdown } from '@/decorators/OnShutdown'; +@Service() export abstract class AbstractServer { protected logger: Logger; @@ -246,4 +249,26 @@ export abstract class AbstractServer { await this.externalHooks.run('n8n.ready', [this, config]); } } + + /** + * Stops the HTTP(S) server from accepting new connections. Gives all + * connections configured amount of time to finish their work and + * then closes them forcefully. + */ + @OnShutdown() + async onShutdown(): Promise { + if (!this.server) { + return; + } + + this.logger.debug(`Shutting down ${this.protocol} server`); + + this.server.close((error) => { + if (error) { + this.logger.error(`Error while shutting down ${this.protocol} server`, { error }); + } + + this.logger.debug(`${this.protocol} server shut down`); + }); + } } diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index 4542aa1603..b45d0602d4 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -65,6 +65,7 @@ import { ActivationErrorsService } from '@/ActivationErrors.service'; import { NotFoundError } from './errors/response-errors/not-found.error'; import { ActiveWorkflowsService } from '@/services/activeWorkflows.service'; import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service'; +import { OnShutdown } from '@/decorators/OnShutdown'; interface QueuedActivation { activationMode: WorkflowActivateMode; @@ -664,6 +665,7 @@ export class ActiveWorkflowRunner implements IWebhookManager { await this.addActiveWorkflows('leadershipChange'); } + @OnShutdown() async removeAllTriggerAndPollerBasedWorkflows() { await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows(); } diff --git a/packages/cli/src/License.ts b/packages/cli/src/License.ts index fec5f6cd8c..e10bacd77e 100644 --- a/packages/cli/src/License.ts +++ b/packages/cli/src/License.ts @@ -17,6 +17,7 @@ import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } fr import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher'; import { RedisService } from './services/redis.service'; import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; +import { OnShutdown } from '@/decorators/OnShutdown'; type FeatureReturnType = Partial< { @@ -30,6 +31,8 @@ export class License { private redisPublisher: RedisServicePubSubPublisher; + private isShuttingDown = false; + constructor( private readonly logger: Logger, private readonly instanceSettings: InstanceSettings, @@ -40,6 +43,11 @@ export class License { async init(instanceType: N8nInstanceType = 'main') { if (this.manager) { + this.logger.warn('License manager already initialized or shutting down'); + return; + } + if (this.isShuttingDown) { + this.logger.warn('License manager already shutting down'); return; } @@ -191,7 +199,12 @@ export class License { await this.manager.renew(); } + @OnShutdown() async shutdown() { + // Shut down License manager to unclaim any floating entitlements + // Note: While this saves a new license cert to DB, the previous entitlements are still kept in memory so that the shutdown process can complete + this.isShuttingDown = true; + if (!this.manager) { return; } diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 5a7afc2a5d..8d1dc08149 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -6,6 +6,7 @@ /* eslint-disable @typescript-eslint/no-unused-vars */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ +import { Container, Service } from 'typedi'; import assert from 'assert'; import { exec as callbackExec } from 'child_process'; import { access as fsAccess } from 'fs/promises'; @@ -84,7 +85,6 @@ import { handleLdapInit, isLdapEnabled } from './Ldap/helpers'; import { AbstractServer } from './AbstractServer'; import { PostHogClient } from './posthog'; import { eventBus } from './eventbus'; -import { Container } from 'typedi'; import { InternalHooks } from './InternalHooks'; import { License } from './License'; import { getStatusUsingPreviousExecutionStatusMethod } from './executions/executionHelpers'; @@ -124,6 +124,7 @@ import { PasswordUtility } from './services/password.utility'; const exec = promisify(callbackExec); +@Service() export class Server extends AbstractServer { private endpointPresetCredentials: string; diff --git a/packages/cli/src/WebhookServer.ts b/packages/cli/src/WebhookServer.ts index a8daadc51a..60f59f606d 100644 --- a/packages/cli/src/WebhookServer.ts +++ b/packages/cli/src/WebhookServer.ts @@ -1,5 +1,7 @@ +import { Service } from 'typedi'; import { AbstractServer } from '@/AbstractServer'; +@Service() export class WebhookServer extends AbstractServer { constructor() { super('webhook'); diff --git a/packages/cli/src/commands/BaseCommand.ts b/packages/cli/src/commands/BaseCommand.ts index 1949bc6f69..91a356b4e1 100644 --- a/packages/cli/src/commands/BaseCommand.ts +++ b/packages/cli/src/commands/BaseCommand.ts @@ -22,6 +22,7 @@ import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager import { initExpressionEvaluator } from '@/ExpressionEvaluator'; import { generateHostInstanceId } from '@db/utils/generators'; import { WorkflowHistoryManager } from '@/workflows/workflowHistory/workflowHistoryManager.ee'; +import { ShutdownService } from '@/shutdown/Shutdown.service'; export abstract class BaseCommand extends Command { protected logger = Container.get(Logger); @@ -38,7 +39,7 @@ export abstract class BaseCommand extends Command { protected server?: AbstractServer; - protected isShuttingDown = false; + protected shutdownService: ShutdownService = Container.get(ShutdownService); /** * How long to wait for graceful shutdown before force killing the process. @@ -309,7 +310,7 @@ export abstract class BaseCommand extends Command { private onTerminationSignal(signal: string) { return async () => { - if (this.isShuttingDown) { + if (this.shutdownService.isShuttingDown()) { this.logger.info(`Received ${signal}. Already shutting down...`); return; } @@ -323,9 +324,9 @@ export abstract class BaseCommand extends Command { }, this.gracefulShutdownTimeoutInS * 1000); this.logger.info(`Received ${signal}. Shutting down...`); - this.isShuttingDown = true; + this.shutdownService.shutdown(); - await this.stopProcess(); + await Promise.all([this.stopProcess(), this.shutdownService.waitForShutdown()]); clearTimeout(forceShutdownTimer); }; diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index d48ea8567f..be9039fd6b 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -63,7 +63,7 @@ export class Start extends BaseCommand { protected activeWorkflowRunner: ActiveWorkflowRunner; - protected server = new Server(); + protected server = Container.get(Server); private pruningService: PruningService; @@ -101,14 +101,6 @@ export class Start extends BaseCommand { await this.externalHooks?.run('n8n.stop', []); - // Shut down License manager to unclaim any floating entitlements - // Note: While this saves a new license cert to DB, the previous entitlements are still kept in memory so that the shutdown process can complete - await Container.get(License).shutdown(); - - if (this.pruningService.isPruningEnabled()) { - this.pruningService.stopPruning(); - } - if (Container.get(MultiMainSetup).isEnabled) { await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows(); diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index 07374ecf03..1ff1dd8bb8 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -19,7 +19,7 @@ export class Webhook extends BaseCommand { help: flags.help({ char: 'h' }), }; - protected server = new WebhookServer(); + protected server = Container.get(WebhookServer); constructor(argv: string[], cmdConfig: IConfig) { super(argv, cmdConfig); diff --git a/packages/cli/src/decorators/OnShutdown.ts b/packages/cli/src/decorators/OnShutdown.ts new file mode 100644 index 0000000000..87e8a6a457 --- /dev/null +++ b/packages/cli/src/decorators/OnShutdown.ts @@ -0,0 +1,38 @@ +import { Container } from 'typedi'; +import { ApplicationError } from 'n8n-workflow'; +import { type ServiceClass, ShutdownService } from '@/shutdown/Shutdown.service'; + +/** + * Decorator that registers a method as a shutdown hook. The method will + * be called when the application is shutting down. + * + * Priority is used to determine the order in which the hooks are called. + * + * NOTE: Requires also @Service() decorator to be used on the class. + * + * @example + * ```ts + * @Service() + * class MyClass { + * @OnShutdown() + * async shutdown() { + * // Will be called when the app is shutting down + * } + * } + * ``` + */ +export const OnShutdown = + (priority = 100): MethodDecorator => + (prototype, propertyKey, descriptor) => { + const serviceClass = prototype.constructor as ServiceClass; + const methodName = String(propertyKey); + // TODO: assert that serviceClass is decorated with @Service + if (typeof descriptor?.value === 'function') { + Container.get(ShutdownService).register(priority, { serviceClass, methodName }); + } else { + const name = `${serviceClass.name}.${methodName}()`; + throw new ApplicationError( + `${name} must be a method on ${serviceClass.name} to use "OnShutdown"`, + ); + } + }; diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index f990e212f6..42adadaa7b 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -94,4 +94,17 @@ export abstract class AbstractPush extends EventEmitter { this.sendToSessions(type, data, userSessionIds); } + + /** + * Closes all push existing connections + */ + closeAllConnections() { + for (const sessionId in this.connections) { + // Signal the connection that we want to close it. + // We are not removing the sessions here because it should be + // the implementation's responsibility to do so once the connection + // has actually closed. + this.close(this.connections[sessionId]); + } + } } diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index e89c5a7a5b..d8705a475c 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -14,6 +14,7 @@ import { WebSocketPush } from './websocket.push'; import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types'; import type { IPushDataType } from '@/Interfaces'; import type { User } from '@db/entities/User'; +import { OnShutdown } from '@/decorators/OnShutdown'; const useWebSockets = config.getEnv('push.backend') === 'websocket'; @@ -70,6 +71,11 @@ export class Push extends EventEmitter { sendToUsers(type: IPushDataType, data: D, userIds: Array) { this.backend.sendToUsers(type, data, userIds); } + + @OnShutdown() + onShutdown(): void { + this.backend.closeAllConnections(); + } } export const setupPushServer = (restEndpoint: string, server: Server, app: Application) => { diff --git a/packages/cli/src/services/pruning.service.ts b/packages/cli/src/services/pruning.service.ts index 0727881a08..10b7d589c5 100644 --- a/packages/cli/src/services/pruning.service.ts +++ b/packages/cli/src/services/pruning.service.ts @@ -10,6 +10,7 @@ import { ExecutionRepository } from '@db/repositories/execution.repository'; import { Logger } from '@/Logger'; import { ExecutionEntity } from '@db/entities/ExecutionEntity'; import { jsonStringify } from 'n8n-workflow'; +import { OnShutdown } from '@/decorators/OnShutdown'; @Service() export class PruningService { @@ -24,6 +25,8 @@ export class PruningService { public hardDeletionTimeout: NodeJS.Timeout | undefined; + private isShuttingDown = false; + constructor( private readonly logger: Logger, private readonly executionRepository: ExecutionRepository, @@ -54,6 +57,11 @@ export class PruningService { * @important Call this method only after DB migrations have completed. */ startPruning() { + if (this.isShuttingDown) { + this.logger.warn('[Pruning] Cannot start pruning while shutting down'); + return; + } + this.logger.debug('[Pruning] Starting soft-deletion and hard-deletion timers'); this.setSoftDeletionInterval(); @@ -158,6 +166,12 @@ export class PruningService { this.logger.debug('[Pruning] Soft-deleted executions', { count: result.affected }); } + @OnShutdown() + shutdown(): void { + this.isShuttingDown = true; + this.stopPruning(); + } + /** * Permanently remove all soft-deleted executions and their binary data, in a pruning cycle. * @return Delay in ms after which the next cycle should be started diff --git a/packages/cli/src/shutdown/Shutdown.service.ts b/packages/cli/src/shutdown/Shutdown.service.ts new file mode 100644 index 0000000000..b52d8ab11a --- /dev/null +++ b/packages/cli/src/shutdown/Shutdown.service.ts @@ -0,0 +1,85 @@ +import { Container, Service } from 'typedi'; +import { ApplicationError, ErrorReporterProxy, assert } from 'n8n-workflow'; +import { Logger } from '@/Logger'; + +export interface ServiceClass { + new (): Record Promise | void>; +} + +export interface ShutdownHandler { + serviceClass: ServiceClass; + methodName: string; +} + +/** Error reported when a listener fails to shutdown gracefully */ +export class ComponentShutdownError extends ApplicationError { + constructor(componentName: string, cause: Error) { + super('Failed to shutdown gracefully', { + level: 'error', + cause, + extra: { component: componentName }, + }); + } +} + +/** Service responsible for orchestrating a graceful shutdown of the application */ +@Service() +export class ShutdownService { + private readonly handlersByPriority: ShutdownHandler[][] = []; + + private shutdownPromise: Promise | undefined; + + constructor(private readonly logger: Logger) {} + + /** Registers given listener to be notified when the application is shutting down */ + register(priority: number, handler: ShutdownHandler) { + if (!this.handlersByPriority[priority]) { + this.handlersByPriority[priority] = []; + } + this.handlersByPriority[priority].push(handler); + } + + /** Signals all registered listeners that the application is shutting down */ + shutdown() { + if (this.shutdownPromise) { + throw new ApplicationError('App is already shutting down'); + } + + this.shutdownPromise = this.startShutdown(); + } + + /** Returns a promise that resolves when all the registered listeners have shut down */ + async waitForShutdown(): Promise { + if (!this.shutdownPromise) { + throw new ApplicationError('App is not shutting down'); + } + + await this.shutdownPromise; + } + + isShuttingDown() { + return !!this.shutdownPromise; + } + + private async startShutdown() { + const handlers = Object.values(this.handlersByPriority).reverse(); + for (const handlerGroup of handlers) { + await Promise.allSettled( + handlerGroup.map(async (handler) => this.shutdownComponent(handler)), + ); + } + } + + private async shutdownComponent({ serviceClass, methodName }: ShutdownHandler) { + const name = `${serviceClass.name}.${methodName}()`; + try { + this.logger.debug(`Shutting down component "${name}"`); + const service = Container.get(serviceClass); + const method = service[methodName]; + await method.call(service); + } catch (error) { + assert(error instanceof Error); + ErrorReporterProxy.error(new ComponentShutdownError(name, error)); + } + } +} diff --git a/packages/cli/test/unit/decorators/OnShutdown.test.ts b/packages/cli/test/unit/decorators/OnShutdown.test.ts new file mode 100644 index 0000000000..1870d95122 --- /dev/null +++ b/packages/cli/test/unit/decorators/OnShutdown.test.ts @@ -0,0 +1,76 @@ +import Container, { Service } from 'typedi'; +import { OnShutdown } from '@/decorators/OnShutdown'; +import { ShutdownService } from '@/shutdown/Shutdown.service'; +import { mock } from 'jest-mock-extended'; + +describe('OnShutdown', () => { + let shutdownService: ShutdownService; + let registerSpy: jest.SpyInstance; + + beforeEach(() => { + shutdownService = new ShutdownService(mock()); + Container.set(ShutdownService, shutdownService); + registerSpy = jest.spyOn(shutdownService, 'register'); + }); + + it('should register a methods that is decorated with OnShutdown', () => { + @Service() + class TestClass { + @OnShutdown() + async onShutdown() {} + } + + expect(shutdownService.register).toHaveBeenCalledTimes(1); + expect(shutdownService.register).toHaveBeenCalledWith(100, { + methodName: 'onShutdown', + serviceClass: TestClass, + }); + }); + + it('should register multiple methods in the same class', () => { + @Service() + class TestClass { + @OnShutdown() + async one() {} + + @OnShutdown() + async two() {} + } + + expect(shutdownService.register).toHaveBeenCalledTimes(2); + expect(shutdownService.register).toHaveBeenCalledWith(100, { + methodName: 'one', + serviceClass: TestClass, + }); + expect(shutdownService.register).toHaveBeenCalledWith(100, { + methodName: 'two', + serviceClass: TestClass, + }); + }); + + it('should use the given priority', () => { + class TestClass { + @OnShutdown(10) + async onShutdown() { + // Will be called when the app is shutting down + } + } + + expect(shutdownService.register).toHaveBeenCalledTimes(1); + // @ts-expect-error We are checking internal parts of the shutdown service + expect(shutdownService.handlersByPriority[10].length).toEqual(1); + }); + + it('should throw an error if the decorated member is not a function', () => { + expect(() => { + @Service() + class TestClass { + // @ts-expect-error Testing invalid code + @OnShutdown() + onShutdown = 'not a function'; + } + + new TestClass(); + }).toThrow('TestClass.onShutdown() must be a method on TestClass to use "OnShutdown"'); + }); +}); diff --git a/packages/cli/test/unit/shutdown/Shutdown.service.test.ts b/packages/cli/test/unit/shutdown/Shutdown.service.test.ts new file mode 100644 index 0000000000..d0f761524e --- /dev/null +++ b/packages/cli/test/unit/shutdown/Shutdown.service.test.ts @@ -0,0 +1,127 @@ +import { ApplicationError, ErrorReporterProxy } from 'n8n-workflow'; +import { mock } from 'jest-mock-extended'; +import type { ServiceClass } from '@/shutdown/Shutdown.service'; +import { ShutdownService } from '@/shutdown/Shutdown.service'; +import Container from 'typedi'; + +class MockComponent { + onShutdown() {} +} + +describe('ShutdownService', () => { + let shutdownService: ShutdownService; + let mockComponent: MockComponent; + let onShutdownSpy: jest.SpyInstance; + let mockErrorReporterProxy: jest.SpyInstance; + + beforeEach(() => { + shutdownService = new ShutdownService(mock()); + mockComponent = new MockComponent(); + Container.set(MockComponent, mockComponent); + onShutdownSpy = jest.spyOn(mockComponent, 'onShutdown'); + mockErrorReporterProxy = jest.spyOn(ErrorReporterProxy, 'error').mockImplementation(() => {}); + }); + + describe('shutdown', () => { + it('should signal shutdown', () => { + shutdownService.register(10, { + serviceClass: MockComponent as unknown as ServiceClass, + methodName: 'onShutdown', + }); + shutdownService.shutdown(); + expect(onShutdownSpy).toBeCalledTimes(1); + }); + + it('should signal shutdown in the priority order', async () => { + class MockService { + onShutdownHighPrio() {} + + onShutdownLowPrio() {} + } + + const order: string[] = []; + const mockService = new MockService(); + Container.set(MockService, mockService); + + jest.spyOn(mockService, 'onShutdownHighPrio').mockImplementation(() => order.push('high')); + jest.spyOn(mockService, 'onShutdownLowPrio').mockImplementation(() => order.push('low')); + + shutdownService.register(100, { + serviceClass: MockService as unknown as ServiceClass, + methodName: 'onShutdownHighPrio', + }); + + shutdownService.register(10, { + serviceClass: MockService as unknown as ServiceClass, + methodName: 'onShutdownLowPrio', + }); + + shutdownService.shutdown(); + await shutdownService.waitForShutdown(); + expect(order).toEqual(['high', 'low']); + }); + + it('should throw error if shutdown is already in progress', () => { + shutdownService.register(10, { + methodName: 'onShutdown', + serviceClass: MockComponent as unknown as ServiceClass, + }); + shutdownService.shutdown(); + expect(() => shutdownService.shutdown()).toThrow('App is already shutting down'); + }); + + it('should report error if component shutdown fails', async () => { + const componentError = new Error('Something went wrong'); + onShutdownSpy.mockImplementation(() => { + throw componentError; + }); + shutdownService.register(10, { + serviceClass: MockComponent as unknown as ServiceClass, + methodName: 'onShutdown', + }); + shutdownService.shutdown(); + await shutdownService.waitForShutdown(); + + expect(mockErrorReporterProxy).toHaveBeenCalledTimes(1); + const error = mockErrorReporterProxy.mock.calls[0][0]; + expect(error).toBeInstanceOf(ApplicationError); + expect(error.message).toBe('Failed to shutdown gracefully'); + expect(error.extra).toEqual({ + component: 'MockComponent.onShutdown()', + }); + expect(error.cause).toBe(componentError); + }); + }); + + describe('waitForShutdown', () => { + it('should wait for shutdown', async () => { + shutdownService.register(10, { + serviceClass: MockComponent as unknown as ServiceClass, + methodName: 'onShutdown', + }); + shutdownService.shutdown(); + await expect(shutdownService.waitForShutdown()).resolves.toBeUndefined(); + }); + + it('should throw error if app is not shutting down', async () => { + await expect(async () => shutdownService.waitForShutdown()).rejects.toThrow( + 'App is not shutting down', + ); + }); + }); + + describe('isShuttingDown', () => { + it('should return true if app is shutting down', () => { + shutdownService.register(10, { + serviceClass: MockComponent as unknown as ServiceClass, + methodName: 'onShutdown', + }); + shutdownService.shutdown(); + expect(shutdownService.isShuttingDown()).toBe(true); + }); + + it('should return false if app is not shutting down', () => { + expect(shutdownService.isShuttingDown()).toBe(false); + }); + }); +});