From 7b396e78c61d1d020a1ab377525979564069341d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Thu, 20 Jun 2024 12:55:07 +0200 Subject: [PATCH] refactor(core): Introduce `RedisClientService` (no-changelog) (#9774) --- packages/cli/src/Queue.ts | 32 +--- packages/cli/src/config/schema.ts | 2 +- packages/cli/src/config/types.ts | 3 +- .../cli/src/services/cache/cache.service.ts | 16 +- .../cli/src/services/orchestration/helpers.ts | 2 +- .../orchestration/main/MultiMainSetup.ee.ts | 9 +- .../orchestration.handler.main.service.ts | 5 +- .../orchestration.handler.webhook.service.ts | 2 +- .../worker/handleCommandMessageWorker.ts | 2 +- .../cli/src/services/redis/RedisConstants.ts | 6 + .../services/redis/RedisServiceBaseClasses.ts | 9 +- .../src/services/redis/RedisServiceHelper.ts | 151 ---------------- .../redis/RedisServicePubSubPublisher.ts | 2 +- .../redis/RedisServicePubSubSubscriber.ts | 2 +- .../services/redis/redis-client.service.ts | 161 ++++++++++++++++++ .../services/orchestration.service.test.ts | 21 +-- .../test/unit/services/redis.service.test.ts | 29 ++-- 17 files changed, 231 insertions(+), 223 deletions(-) create mode 100644 packages/cli/src/services/redis/RedisConstants.ts delete mode 100644 packages/cli/src/services/redis/RedisServiceHelper.ts create mode 100644 packages/cli/src/services/redis/redis-client.service.ts diff --git a/packages/cli/src/Queue.ts b/packages/cli/src/Queue.ts index 1618485b68..fbb53b9180 100644 --- a/packages/cli/src/Queue.ts +++ b/packages/cli/src/Queue.ts @@ -1,5 +1,5 @@ import type Bull from 'bull'; -import { Service } from 'typedi'; +import Container, { Service } from 'typedi'; import { ApplicationError, BINARY_ENCODING, @@ -8,14 +8,6 @@ import { type IExecuteResponsePromiseData, } from 'n8n-workflow'; import { ActiveExecutions } from '@/ActiveExecutions'; - -import { - getRedisClusterClient, - getRedisClusterNodes, - getRedisPrefix, - getRedisStandardClient, -} from './services/redis/RedisServiceHelper'; -import type { RedisClientType } from './services/redis/RedisServiceBaseClasses'; import config from '@/config'; export type JobId = Bull.JobId; @@ -44,26 +36,18 @@ export class Queue { constructor(private activeExecutions: ActiveExecutions) {} async init() { - const bullPrefix = config.getEnv('queue.bull.prefix'); - const prefix = getRedisPrefix(bullPrefix); - const clusterNodes = getRedisClusterNodes(); - const usesRedisCluster = clusterNodes.length > 0; - const { default: Bull } = await import('bull'); + const { RedisClientService } = await import('@/services/redis/redis-client.service'); + + const redisClientService = Container.get(RedisClientService); + + const bullPrefix = config.getEnv('queue.bull.prefix'); + const prefix = redisClientService.toValidPrefix(bullPrefix); - const { default: Redis } = await import('ioredis'); - // Disabling ready check is necessary as it allows worker to - // quickly reconnect to Redis if Redis crashes or is unreachable - // for some time. With it enabled, worker might take minutes to realize - // redis is back up and resume working. - // More here: https://github.com/OptimalBits/bull/issues/890 this.jobQueue = new Bull('jobs', { prefix, settings: config.get('queue.bull.settings'), - createClient: (type, clientConfig) => - usesRedisCluster - ? getRedisClusterClient(Redis, clientConfig, (type + '(bull)') as RedisClientType) - : getRedisStandardClient(Redis, clientConfig, (type + '(bull)') as RedisClientType), + createClient: (type) => redisClientService.createClient({ type: `${type}(bull)` }), }); this.jobQueue.on('global:progress', (_jobId, progress: WebhookResponse) => { diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 730d279212..46489dac96 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -436,7 +436,7 @@ export const schema = { env: 'QUEUE_BULL_REDIS_PORT', }, timeoutThreshold: { - doc: 'Redis timeout threshold', + doc: 'Max cumulative timeout (in milliseconds) of connection retries before process exit', format: Number, default: 10000, env: 'QUEUE_BULL_REDIS_TIMEOUT_THRESHOLD', diff --git a/packages/cli/src/config/types.ts b/packages/cli/src/config/types.ts index 220f579239..025941c205 100644 --- a/packages/cli/src/config/types.ts +++ b/packages/cli/src/config/types.ts @@ -2,6 +2,7 @@ import type { BinaryData } from 'n8n-core'; import type { schema } from './schema'; +import type { RedisOptions } from 'ioredis'; // ----------------------------------- // transformers @@ -74,7 +75,7 @@ type ToReturnType = T extends NumericPath : unknown; type ExceptionPaths = { - 'queue.bull.redis': object; + 'queue.bull.redis': RedisOptions; binaryDataManager: BinaryData.Config; 'nodes.exclude': string[] | undefined; 'nodes.include': string[] | undefined; diff --git a/packages/cli/src/services/cache/cache.service.ts b/packages/cli/src/services/cache/cache.service.ts index 226c68438f..8e9a4dc95c 100644 --- a/packages/cli/src/services/cache/cache.service.ts +++ b/packages/cli/src/services/cache/cache.service.ts @@ -1,11 +1,10 @@ import EventEmitter from 'node:events'; -import { Service } from 'typedi'; +import Container, { Service } from 'typedi'; import { caching } from 'cache-manager'; import { ApplicationError, jsonStringify } from 'n8n-workflow'; import config from '@/config'; -import { getDefaultRedisClient, getRedisPrefix } from '@/services/redis/RedisServiceHelper'; import { UncacheableValueError } from '@/errors/cache-errors/uncacheable-value.error'; import { MalformedRefreshValueError } from '@/errors/cache-errors/malformed-refresh-value.error'; import type { @@ -29,8 +28,17 @@ export class CacheService extends EventEmitter { const useRedis = backend === 'redis' || (backend === 'auto' && mode === 'queue'); if (useRedis) { - const keyPrefix = `${getRedisPrefix()}:${config.getEnv('cache.redis.prefix')}:`; - const redisClient = await getDefaultRedisClient({ keyPrefix }, 'client(cache)'); + const { RedisClientService } = await import('../redis/redis-client.service'); + const redisClientService = Container.get(RedisClientService); + + const prefixBase = config.getEnv('redis.prefix'); + const cachePrefix = config.getEnv('cache.redis.prefix'); + const prefix = redisClientService.toValidPrefix(`${prefixBase}:${cachePrefix}:`); + + const redisClient = redisClientService.createClient({ + type: 'client(cache)', + extraOptions: { keyPrefix: prefix }, + }); const { redisStoreUsingClient } = await import('@/services/cache/redis.cache-manager'); const redisStore = redisStoreUsingClient(redisClient, { ttl }); diff --git a/packages/cli/src/services/orchestration/helpers.ts b/packages/cli/src/services/orchestration/helpers.ts index 20fed52871..c5ccd43636 100644 --- a/packages/cli/src/services/orchestration/helpers.ts +++ b/packages/cli/src/services/orchestration/helpers.ts @@ -2,7 +2,7 @@ import { Container } from 'typedi'; import { jsonParse } from 'n8n-workflow'; import { Logger } from '@/Logger'; import type { RedisServiceCommandObject } from '../redis/RedisServiceCommands'; -import { COMMAND_REDIS_CHANNEL } from '../redis/RedisServiceHelper'; +import { COMMAND_REDIS_CHANNEL } from '../redis/RedisConstants'; import * as os from 'os'; export interface RedisServiceCommandLastReceived { diff --git a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts index 7498a0c03b..19cd14c4a8 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts @@ -2,16 +2,17 @@ import { EventEmitter } from 'node:events'; import config from '@/config'; import { Service } from 'typedi'; import { TIME } from '@/constants'; -import { getRedisPrefix } from '@/services/redis/RedisServiceHelper'; import { ErrorReporterProxy as EventReporter } from 'n8n-workflow'; import { Logger } from '@/Logger'; import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; +import { RedisClientService } from '@/services/redis/redis-client.service'; @Service() export class MultiMainSetup extends EventEmitter { constructor( private readonly logger: Logger, private readonly redisPublisher: RedisServicePubSubPublisher, + private readonly redisClientService: RedisClientService, ) { super(); } @@ -20,13 +21,17 @@ export class MultiMainSetup extends EventEmitter { return config.getEnv('redis.queueModeId'); } - private readonly leaderKey = getRedisPrefix() + ':main_instance_leader'; + private leaderKey: string; private readonly leaderKeyTtl = config.getEnv('multiMainSetup.ttl'); private leaderCheckInterval: NodeJS.Timer | undefined; async init() { + const prefix = config.getEnv('redis.prefix'); + const validPrefix = this.redisClientService.toValidPrefix(prefix); + this.leaderKey = validPrefix + ':main_instance_leader'; + await this.tryBecomeLeader(); // prevent initial wait this.leaderCheckInterval = setInterval( diff --git a/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts b/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts index 9206f9db15..6cc86c9f51 100644 --- a/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts +++ b/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts @@ -1,8 +1,5 @@ import { Service } from 'typedi'; -import { - COMMAND_REDIS_CHANNEL, - WORKER_RESPONSE_REDIS_CHANNEL, -} from '../../redis/RedisServiceHelper'; +import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from '../../redis/RedisConstants'; import { handleWorkerResponseMessageMain } from './handleWorkerResponseMessageMain'; import { handleCommandMessageMain } from './handleCommandMessageMain'; import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; diff --git a/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts b/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts index 548086c040..73ad86d0d4 100644 --- a/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts +++ b/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts @@ -1,5 +1,5 @@ import { Service } from 'typedi'; -import { COMMAND_REDIS_CHANNEL } from '../../redis/RedisServiceHelper'; +import { COMMAND_REDIS_CHANNEL } from '../../redis/RedisConstants'; import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; import { handleCommandMessageWebhook } from './handleCommandMessageWebhook'; diff --git a/packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts b/packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts index d825428f9e..f5d9efa4dc 100644 --- a/packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts +++ b/packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts @@ -1,7 +1,7 @@ import { jsonParse } from 'n8n-workflow'; import Container from 'typedi'; import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands'; -import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper'; +import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisConstants'; import * as os from 'os'; import { License } from '@/License'; import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; diff --git a/packages/cli/src/services/redis/RedisConstants.ts b/packages/cli/src/services/redis/RedisConstants.ts new file mode 100644 index 0000000000..281817b9c7 --- /dev/null +++ b/packages/cli/src/services/redis/RedisConstants.ts @@ -0,0 +1,6 @@ +export const EVENT_BUS_REDIS_STREAM = 'n8n:eventstream'; +export const COMMAND_REDIS_STREAM = 'n8n:commandstream'; +export const WORKER_RESPONSE_REDIS_STREAM = 'n8n:workerstream'; +export const COMMAND_REDIS_CHANNEL = 'n8n.commands'; +export const WORKER_RESPONSE_REDIS_CHANNEL = 'n8n.worker-response'; +export const WORKER_RESPONSE_REDIS_LIST = 'n8n:list:worker-response'; diff --git a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts index b185a1228f..e3bfae0411 100644 --- a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts +++ b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts @@ -3,7 +3,7 @@ import type { Cluster } from 'ioredis'; import { Service } from 'typedi'; import config from '@/config'; import { Logger } from '@/Logger'; -import { getDefaultRedisClient } from './RedisServiceHelper'; +import { RedisClientService } from './redis-client.service'; export type RedisClientType = | 'subscriber' @@ -29,13 +29,16 @@ class RedisServiceBase { isInitialized = false; - constructor(protected readonly logger: Logger) {} + constructor( + protected readonly logger: Logger, + private readonly redisClientService: RedisClientService, + ) {} async init(type: RedisClientType = 'client'): Promise { if (this.redisClient && this.isInitialized) { return; } - this.redisClient = await getDefaultRedisClient(undefined, type); + this.redisClient = this.redisClientService.createClient({ type }); this.redisClient.on('close', () => { this.logger.warn('Redis unavailable - trying to reconnect...'); diff --git a/packages/cli/src/services/redis/RedisServiceHelper.ts b/packages/cli/src/services/redis/RedisServiceHelper.ts deleted file mode 100644 index 32a72fb22f..0000000000 --- a/packages/cli/src/services/redis/RedisServiceHelper.ts +++ /dev/null @@ -1,151 +0,0 @@ -import type Redis from 'ioredis'; -import type { Cluster, RedisOptions } from 'ioredis'; -import config from '@/config'; -import type { RedisClientType } from './RedisServiceBaseClasses'; -import Container from 'typedi'; -import { Logger } from '@/Logger'; - -export const EVENT_BUS_REDIS_STREAM = 'n8n:eventstream'; -export const COMMAND_REDIS_STREAM = 'n8n:commandstream'; -export const WORKER_RESPONSE_REDIS_STREAM = 'n8n:workerstream'; -export const COMMAND_REDIS_CHANNEL = 'n8n.commands'; -export const WORKER_RESPONSE_REDIS_CHANNEL = 'n8n.worker-response'; -export const WORKER_RESPONSE_REDIS_LIST = 'n8n:list:worker-response'; - -export function getRedisClusterNodes(): Array<{ host: string; port: number }> { - const clusterNodePairs = config - .getEnv('queue.bull.redis.clusterNodes') - .split(',') - .filter((e) => e); - return clusterNodePairs.map((pair) => { - const [host, port] = pair.split(':'); - return { host, port: parseInt(port) }; - }); -} - -export function getRedisPrefix(customPrefix?: string): string { - let prefix = customPrefix ?? config.getEnv('redis.prefix'); - if (prefix && getRedisClusterNodes().length > 0) { - if (!prefix.startsWith('{')) { - prefix = '{' + prefix; - } - if (!prefix.endsWith('}')) { - prefix += '}'; - } - } - return prefix; -} - -export function getRedisStandardClient( - redis: typeof Redis, - redisOptions?: RedisOptions, - redisType?: RedisClientType, -): Redis | Cluster { - let lastTimer = 0; - let cumulativeTimeout = 0; - const { host, port, username, password, db }: RedisOptions = config.getEnv('queue.bull.redis'); - const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold'); - const sharedRedisOptions: RedisOptions = { - ...redisOptions, - host, - port, - username, - password, - db, - enableReadyCheck: false, - maxRetriesPerRequest: null, - }; - if (config.getEnv('queue.bull.redis.tls')) sharedRedisOptions.tls = {}; - - const logger = Container.get(Logger); - logger.debug( - `Initialising Redis client${redisType ? ` of type ${redisType}` : ''} connection with host: ${ - host ?? 'localhost' - } and port: ${port ?? '6379'}`, - ); - return new redis({ - ...sharedRedisOptions, - retryStrategy: (): number | null => { - const now = Date.now(); - if (now - lastTimer > 30000) { - // Means we had no timeout at all or last timeout was temporary and we recovered - lastTimer = now; - cumulativeTimeout = 0; - } else { - cumulativeTimeout += now - lastTimer; - lastTimer = now; - if (cumulativeTimeout > redisConnectionTimeoutLimit) { - logger.error( - `Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`, - ); - process.exit(1); - } - } - return 500; - }, - }); -} - -export function getRedisClusterClient( - redis: typeof Redis, - redisOptions?: RedisOptions, - redisType?: RedisClientType, -): Cluster { - let lastTimer = 0; - let cumulativeTimeout = 0; - const clusterNodes = getRedisClusterNodes(); - const { username, password, db }: RedisOptions = config.getEnv('queue.bull.redis'); - const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold'); - const sharedRedisOptions: RedisOptions = { - ...redisOptions, - username, - password, - db, - enableReadyCheck: false, - maxRetriesPerRequest: null, - }; - if (config.getEnv('queue.bull.redis.tls')) sharedRedisOptions.tls = {}; - - const logger = Container.get(Logger); - logger.debug( - `Initialising Redis cluster${ - redisType ? ` of type ${redisType}` : '' - } connection with nodes: ${clusterNodes.map((e) => `${e.host}:${e.port}`).join(',')}`, - ); - return new redis.Cluster( - clusterNodes.map((node) => ({ host: node.host, port: node.port })), - { - redisOptions: sharedRedisOptions, - clusterRetryStrategy: (): number | null => { - const now = Date.now(); - if (now - lastTimer > 30000) { - // Means we had no timeout at all or last timeout was temporary and we recovered - lastTimer = now; - cumulativeTimeout = 0; - } else { - cumulativeTimeout += now - lastTimer; - lastTimer = now; - if (cumulativeTimeout > redisConnectionTimeoutLimit) { - logger.error( - `Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`, - ); - process.exit(1); - } - } - return 500; - }, - }, - ); -} - -export async function getDefaultRedisClient( - additionalRedisOptions?: RedisOptions, - redisType?: RedisClientType, -): Promise { - const { default: Redis } = await import('ioredis'); - const clusterNodes = getRedisClusterNodes(); - const usesRedisCluster = clusterNodes.length > 0; - return usesRedisCluster - ? getRedisClusterClient(Redis, additionalRedisOptions, redisType) - : getRedisStandardClient(Redis, additionalRedisOptions, redisType); -} diff --git a/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts b/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts index b029b6546a..ff810ba7c6 100644 --- a/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts +++ b/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts @@ -1,5 +1,5 @@ import { Service } from 'typedi'; -import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './RedisServiceHelper'; +import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './RedisConstants'; import type { RedisServiceCommandObject, RedisServiceWorkerResponseObject, diff --git a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts index a3474c3149..8751826428 100644 --- a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts +++ b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts @@ -1,5 +1,5 @@ import { Service } from 'typedi'; -import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './RedisServiceHelper'; +import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './RedisConstants'; import { RedisServiceBaseReceiver } from './RedisServiceBaseClasses'; @Service() diff --git a/packages/cli/src/services/redis/redis-client.service.ts b/packages/cli/src/services/redis/redis-client.service.ts new file mode 100644 index 0000000000..9de12d30cf --- /dev/null +++ b/packages/cli/src/services/redis/redis-client.service.ts @@ -0,0 +1,161 @@ +import { Service } from 'typedi'; +import config from '@/config'; +import { Logger } from '@/Logger'; +import ioRedis from 'ioredis'; +import type { Cluster, RedisOptions } from 'ioredis'; +import type { RedisClientType } from './RedisServiceBaseClasses'; +import { OnShutdown } from '@/decorators/OnShutdown'; + +@Service() +export class RedisClientService { + private readonly clients = new Set(); + + constructor(private readonly logger: Logger) {} + + createClient(arg: { type: RedisClientType; extraOptions?: RedisOptions }) { + const client = + this.clusterNodes().length > 0 + ? this.createClusterClient(arg) + : this.createRegularClient(arg); + + this.clients.add(client); + + return client; + } + + @OnShutdown() + disconnectClients() { + for (const client of this.clients) { + client.disconnect(); + } + } + + /** + * Ensure prefix is wrapped in curly braces for Redis cluster. + * See: https://github.com/OptimalBits/bull/blob/develop/PATTERNS.md + */ + toValidPrefix(prefix: string) { + if (this.clusterNodes().length > 0) { + if (!prefix.startsWith('{')) prefix = '{' + prefix; + if (!prefix.endsWith('}')) prefix += '}'; + } + + return prefix; + } + + // ---------------------------------- + // private + // ---------------------------------- + + private createRegularClient({ + type, + extraOptions, + }: { + type: RedisClientType; + extraOptions?: RedisOptions; + }) { + const options = this.getOptions({ extraOptions }); + + const { host, port } = config.getEnv('queue.bull.redis'); + + options.host = host; + options.port = port; + + this.logger.debug('[Redis] Initializing regular client', { type, host, port }); + + return new ioRedis(options); + } + + private createClusterClient({ + type, + extraOptions, + }: { + type: string; + extraOptions?: RedisOptions; + }) { + const options = this.getOptions({ extraOptions }); + + const clusterNodes = this.clusterNodes(); + + this.logger.debug('[Redis] Initializing cluster client', { type, clusterNodes }); + + return new ioRedis.Cluster(clusterNodes, { + redisOptions: options, + clusterRetryStrategy: this.retryStrategy(), + }); + } + + private getOptions({ extraOptions }: { extraOptions?: RedisOptions }) { + const { username, password, db, tls } = config.getEnv('queue.bull.redis'); + + /** + * Disabling ready check allows quick reconnection to Redis if Redis becomes + * temporarily unreachable. With ready check enabled, the client might take + * minutes to realize Redis is back up and resume working. + * + * See: + * - https://github.com/OptimalBits/bull/issues/890 + * - https://github.com/OptimalBits/bull/issues/1873 + * - https://github.com/OptimalBits/bull/pull/2185 + */ + const options: RedisOptions = { + username, + password, + db, + enableReadyCheck: false, + maxRetriesPerRequest: null, + retryStrategy: this.retryStrategy(), + ...extraOptions, + }; + + if (tls) options.tls = {}; // enable TLS with default Node.js settings + + return options; + } + + /** + * Strategy to retry connecting to Redis on connection failure. + * + * Try to reconnect every 500ms. On every failed attempt, increment a timeout + * counter - if the cumulative timeout exceeds a limit, exit the process. + * Reset the cumulative timeout if >30s between reconnection attempts. + */ + private retryStrategy() { + const RETRY_INTERVAL = 500; // ms + const RESET_LENGTH = 30_000; // ms + const MAX_TIMEOUT = config.getEnv('queue.bull.redis.timeoutThreshold'); + + let lastAttemptTs = 0; + let cumulativeTimeout = 0; + + return () => { + const nowTs = Date.now(); + + if (nowTs - lastAttemptTs > RESET_LENGTH) { + cumulativeTimeout = 0; + lastAttemptTs = nowTs; + } else { + cumulativeTimeout += nowTs - lastAttemptTs; + lastAttemptTs = nowTs; + if (cumulativeTimeout > MAX_TIMEOUT) { + this.logger.error(`[Redis] Unable to connect after max timeout of ${MAX_TIMEOUT} ms`); + this.logger.error('Exiting process...'); + process.exit(1); + } + } + + return RETRY_INTERVAL; + }; + } + + private clusterNodes() { + return config + .getEnv('queue.bull.redis.clusterNodes') + .split(',') + .filter((pair) => pair.trim().length > 0) + .map((pair) => { + const [host, port] = pair.split(':'); + return { host, port: parseInt(port) }; + }); + } +} diff --git a/packages/cli/test/unit/services/orchestration.service.test.ts b/packages/cli/test/unit/services/orchestration.service.test.ts index b7824d9b3f..43aba8d484 100644 --- a/packages/cli/test/unit/services/orchestration.service.test.ts +++ b/packages/cli/test/unit/services/orchestration.service.test.ts @@ -14,6 +14,13 @@ import { Push } from '@/push'; import { ActiveWorkflowManager } from '@/ActiveWorkflowManager'; import { mockInstance } from '../../shared/mocking'; import type { WorkflowActivateMode } from 'n8n-workflow'; +import { RedisClientService } from '@/services/redis/redis-client.service'; +import type Redis from 'ioredis'; +import { mock } from 'jest-mock-extended'; + +const redisClientService = mockInstance(RedisClientService); +const mockRedisClient = mock(); +redisClientService.createClient.mockReturnValue(mockRedisClient); const os = Container.get(OrchestrationService); const handler = Container.get(OrchestrationHandlerMainService); @@ -43,20 +50,6 @@ describe('Orchestration Service', () => { const eventBus = mockInstance(MessageEventBus); beforeAll(async () => { - jest.mock('ioredis', () => { - const Redis = require('ioredis-mock'); - if (typeof Redis === 'object') { - // the first mock is an ioredis shim because ioredis-mock depends on it - // https://github.com/stipsan/ioredis-mock/blob/master/src/index.js#L101-L111 - return { - Command: { _transformer: { argument: {}, reply: {} } }, - }; - } - // second mock for our code - return function (...args: any) { - return new Redis(args); - }; - }); jest.mock('@/services/redis/RedisServicePubSubPublisher', () => { return jest.fn().mockImplementation(() => { return { diff --git a/packages/cli/test/unit/services/redis.service.test.ts b/packages/cli/test/unit/services/redis.service.test.ts index efa426b84a..cb963ad535 100644 --- a/packages/cli/test/unit/services/redis.service.test.ts +++ b/packages/cli/test/unit/services/redis.service.test.ts @@ -4,6 +4,21 @@ import config from '@/config'; import { RedisService } from '@/services/redis.service'; import { mockInstance } from '../../shared/mocking'; +jest.mock('ioredis', () => { + const Redis = require('ioredis-mock'); + if (typeof Redis === 'object') { + // the first mock is an ioredis shim because ioredis-mock depends on it + // https://github.com/stipsan/ioredis-mock/blob/master/src/index.js#L101-L111 + return { + Command: { _transformer: { argument: {}, reply: {} } }, + }; + } + // second mock for our code + return function (...args: any) { + return new Redis(args); + }; +}); + mockInstance(Logger); const redisService = Container.get(RedisService); @@ -15,20 +30,6 @@ const PUBSUB_CHANNEL = 'testchannel'; describe('RedisService', () => { beforeAll(async () => { - jest.mock('ioredis', () => { - const Redis = require('ioredis-mock'); - if (typeof Redis === 'object') { - // the first mock is an ioredis shim because ioredis-mock depends on it - // https://github.com/stipsan/ioredis-mock/blob/master/src/index.js#L101-L111 - return { - Command: { _transformer: { argument: {}, reply: {} } }, - }; - } - // second mock for our code - return function (...args: any) { - return new Redis(args); - }; - }); setDefaultConfig(); });