From c82579bf760cc4b5a2670b14e4e48fc37e2e2263 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Thu, 4 Jul 2024 18:07:47 +0200 Subject: [PATCH] fix(core): Disconnect Redis after pausing queue during worker shutdown (#9928) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ --- packages/cli/src/Queue.ts | 10 +++++----- packages/cli/src/commands/worker.ts | 3 --- packages/cli/src/decorators/OnShutdown.ts | 6 +++++- .../cli/src/services/redis/RedisServiceBaseClasses.ts | 4 ---- .../cli/src/services/redis/redis-client.service.ts | 6 ++++-- .../cli/test/integration/shared/utils/testCommand.ts | 1 + 6 files changed, 15 insertions(+), 15 deletions(-) diff --git a/packages/cli/src/Queue.ts b/packages/cli/src/Queue.ts index fce59867b5..c7aacaab6d 100644 --- a/packages/cli/src/Queue.ts +++ b/packages/cli/src/Queue.ts @@ -9,6 +9,7 @@ import { } from 'n8n-workflow'; import { ActiveExecutions } from '@/ActiveExecutions'; import config from '@/config'; +import { HIGHEST_PRIORITY, OnShutdown } from './decorators/OnShutdown'; export type JobId = Bull.JobId; export type Job = Bull.Job; @@ -108,11 +109,10 @@ export class Queue { return await this.jobQueue.client.ping(); } - async pause({ - isLocal, - doNotWaitActive, - }: { isLocal?: boolean; doNotWaitActive?: boolean } = {}): Promise { - return await this.jobQueue.pause(isLocal, doNotWaitActive); + @OnShutdown(HIGHEST_PRIORITY) + // Stop accepting new jobs, `doNotWaitActive` allows reporting progress + async pause(): Promise { + return await this.jobQueue?.pause(true, true); } getBullObjectInstance(): JobQueue { diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 5545378f66..2d75ce98ed 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -64,9 +64,6 @@ export class Worker extends BaseCommand { async stopProcess() { this.logger.info('Stopping n8n...'); - // Stop accepting new jobs, `doNotWaitActive` allows reporting progress - await Worker.jobQueue.pause({ isLocal: true, doNotWaitActive: true }); - try { await this.externalHooks?.run('n8n.stop', []); diff --git a/packages/cli/src/decorators/OnShutdown.ts b/packages/cli/src/decorators/OnShutdown.ts index 87e8a6a457..c5177a295b 100644 --- a/packages/cli/src/decorators/OnShutdown.ts +++ b/packages/cli/src/decorators/OnShutdown.ts @@ -2,6 +2,10 @@ import { Container } from 'typedi'; import { ApplicationError } from 'n8n-workflow'; import { type ServiceClass, ShutdownService } from '@/shutdown/Shutdown.service'; +export const LOWEST_PRIORITY = 0; +export const DEFAULT_PRIORITY = 100; +export const HIGHEST_PRIORITY = 200; + /** * Decorator that registers a method as a shutdown hook. The method will * be called when the application is shutting down. @@ -22,7 +26,7 @@ import { type ServiceClass, ShutdownService } from '@/shutdown/Shutdown.service' * ``` */ export const OnShutdown = - (priority = 100): MethodDecorator => + (priority = DEFAULT_PRIORITY): MethodDecorator => (prototype, propertyKey, descriptor) => { const serviceClass = prototype.constructor as ServiceClass; const methodName = String(propertyKey); diff --git a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts index 514e511ae5..ba2cc41a91 100644 --- a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts +++ b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts @@ -40,10 +40,6 @@ class RedisServiceBase { } this.redisClient = this.redisClientService.createClient({ type }); - this.redisClient.on('close', () => { - this.logger.warn('Redis unavailable - trying to reconnect...'); - }); - this.redisClient.on('error', (error) => { if (!String(error).includes('ECONNREFUSED')) { this.logger.warn('Error with Redis: ', error); diff --git a/packages/cli/src/services/redis/redis-client.service.ts b/packages/cli/src/services/redis/redis-client.service.ts index 9de12d30cf..3a282c02bc 100644 --- a/packages/cli/src/services/redis/redis-client.service.ts +++ b/packages/cli/src/services/redis/redis-client.service.ts @@ -4,7 +4,7 @@ import { Logger } from '@/Logger'; import ioRedis from 'ioredis'; import type { Cluster, RedisOptions } from 'ioredis'; import type { RedisClientType } from './RedisServiceBaseClasses'; -import { OnShutdown } from '@/decorators/OnShutdown'; +import { LOWEST_PRIORITY, OnShutdown } from '@/decorators/OnShutdown'; @Service() export class RedisClientService { @@ -23,7 +23,7 @@ export class RedisClientService { return client; } - @OnShutdown() + @OnShutdown(LOWEST_PRIORITY) disconnectClients() { for (const client of this.clients) { client.disconnect(); @@ -144,6 +144,8 @@ export class RedisClientService { } } + this.logger.warn('Redis unavailable - trying to reconnect...'); + return RETRY_INTERVAL; }; } diff --git a/packages/cli/test/integration/shared/utils/testCommand.ts b/packages/cli/test/integration/shared/utils/testCommand.ts index 3ec203408a..ff36791ad5 100644 --- a/packages/cli/test/integration/shared/utils/testCommand.ts +++ b/packages/cli/test/integration/shared/utils/testCommand.ts @@ -11,6 +11,7 @@ export const setupTestCommand = (Command: Class) => { // mock SIGINT/SIGTERM registration process.once = jest.fn(); + process.exit = jest.fn() as never; beforeAll(async () => { await testDb.init();