diff --git a/packages/cli/src/decorators/debounce.ts b/packages/cli/src/decorators/debounce.ts new file mode 100644 index 0000000000..6096ce522a --- /dev/null +++ b/packages/cli/src/decorators/debounce.ts @@ -0,0 +1,37 @@ +import debounce from 'lodash/debounce'; + +/** + * Debounce a class method using `lodash/debounce`. + * + * @param waitMs - Number of milliseconds to debounce method by. + * + * @example + * ``` + * class MyClass { + * @Debounce(1000) + * async myMethod() { + * // debounced + * } + * } + * ``` + */ +export const Debounce = + (waitMs: number): MethodDecorator => + ( + _: object, + methodName: string, + originalDescriptor: PropertyDescriptor, + ): TypedPropertyDescriptor => ({ + configurable: true, + + get() { + const debouncedFn = debounce(originalDescriptor.value, waitMs); + + Object.defineProperty(this, methodName, { + configurable: false, + value: debouncedFn, + }); + + return debouncedFn as T; + }, + }); diff --git a/packages/cli/src/scaling/pubsub/publisher.service.ts b/packages/cli/src/scaling/pubsub/publisher.service.ts index f015890d48..09bf06e4f5 100644 --- a/packages/cli/src/scaling/pubsub/publisher.service.ts +++ b/packages/cli/src/scaling/pubsub/publisher.service.ts @@ -24,8 +24,6 @@ export class Publisher { if (config.getEnv('executions.mode') !== 'queue') return; this.client = this.redisClientService.createClient({ type: 'publisher(n8n)' }); - - this.client.on('error', (error) => this.logger.error(error.message)); } getClient() { diff --git a/packages/cli/src/scaling/pubsub/subscriber.service.ts b/packages/cli/src/scaling/pubsub/subscriber.service.ts index e1951f924e..3605e71f31 100644 --- a/packages/cli/src/scaling/pubsub/subscriber.service.ts +++ b/packages/cli/src/scaling/pubsub/subscriber.service.ts @@ -27,8 +27,6 @@ export class Subscriber { this.client = this.redisClientService.createClient({ type: 'subscriber(n8n)' }); - this.client.on('error', (error) => this.logger.error(error.message)); - this.client.on('message', (channel: PubSub.Channel, message) => { this.handlers.get(channel)?.(message); }); diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index 552802ba70..6a383fb353 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -173,39 +173,11 @@ export class ScalingService { // #region Listeners private registerListeners() { - let latestAttemptTs = 0; - let cumulativeTimeoutMs = 0; - - const MAX_TIMEOUT_MS = this.globalConfig.queue.bull.redis.timeoutThreshold; - const RESET_LENGTH_MS = 30_000; - this.queue.on('error', (error: Error) => { + if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by RedisClientService.retryStrategy + this.logger.error('[ScalingService] Queue errored', { error }); - /** - * On Redis connection failure, try to reconnect. On every failed attempt, - * increment a cumulative timeout - if this exceeds a limit, exit the - * process. Reset the cumulative timeout if >30s between retries. - */ - if (error.message.includes('ECONNREFUSED')) { - const nowTs = Date.now(); - if (nowTs - latestAttemptTs > RESET_LENGTH_MS) { - latestAttemptTs = nowTs; - cumulativeTimeoutMs = 0; - } else { - cumulativeTimeoutMs += nowTs - latestAttemptTs; - latestAttemptTs = nowTs; - if (cumulativeTimeoutMs > MAX_TIMEOUT_MS) { - this.logger.error('[ScalingService] Redis unavailable after max timeout'); - this.logger.error('[ScalingService] Exiting process...'); - process.exit(1); - } - } - - this.logger.warn('[ScalingService] Redis unavailable - retrying to connect...'); - return; - } - throw error; }); diff --git a/packages/cli/src/services/redis-client.service.ts b/packages/cli/src/services/redis-client.service.ts index dc0d3b8cde..1eb083a2d7 100644 --- a/packages/cli/src/services/redis-client.service.ts +++ b/packages/cli/src/services/redis-client.service.ts @@ -3,18 +3,42 @@ import ioRedis from 'ioredis'; import type { Cluster, RedisOptions } from 'ioredis'; import { Service } from 'typedi'; +import { Debounce } from '@/decorators/debounce'; import { Logger } from '@/logger'; +import { TypedEmitter } from '@/typed-emitter'; import type { RedisClientType } from '../scaling/redis/redis.types'; +type RedisEventMap = { + 'connection-lost': number; + 'connection-recovered': never; +}; + @Service() -export class RedisClientService { +export class RedisClientService extends TypedEmitter { private readonly clients = new Set(); + private readonly config = { + /** How long (in ms) to try to reconnect for before exiting. */ + maxTimeout: this.globalConfig.queue.bull.redis.timeoutThreshold, + + /** How long (in ms) to wait between reconnection attempts. */ + retryInterval: 1000, + + /** How long (in ms) to wait before resetting the cumulative timeout. */ + resetLength: 30_000, + }; + + /** Whether any client has lost connection to Redis. */ + private lostConnection = false; + constructor( private readonly logger: Logger, private readonly globalConfig: GlobalConfig, - ) {} + ) { + super(); + this.registerListeners(); + } createClient(arg: { type: RedisClientType; extraOptions?: RedisOptions }) { const client = @@ -22,6 +46,19 @@ export class RedisClientService { ? this.createClusterClient(arg) : this.createRegularClient(arg); + client.on('error', (error) => { + if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by retryStrategy + + this.logger.error(`[Redis client] ${error.message}`, { error }); + }); + + client.on('ready', () => { + if (this.lostConnection) { + this.emit('connection-recovered'); + this.lostConnection = false; + } + }); + this.clients.add(client); return client; @@ -118,32 +155,29 @@ export class RedisClientService { * Reset the cumulative timeout if >30s between reconnection attempts. */ private retryStrategy() { - const RETRY_INTERVAL = 500; // ms - const RESET_LENGTH = 30_000; // ms - const MAX_TIMEOUT = this.globalConfig.queue.bull.redis.timeoutThreshold; - let lastAttemptTs = 0; let cumulativeTimeout = 0; return () => { const nowTs = Date.now(); - if (nowTs - lastAttemptTs > RESET_LENGTH) { + if (nowTs - lastAttemptTs > this.config.resetLength) { cumulativeTimeout = 0; lastAttemptTs = nowTs; } else { cumulativeTimeout += nowTs - lastAttemptTs; lastAttemptTs = nowTs; - if (cumulativeTimeout > MAX_TIMEOUT) { - this.logger.error(`[Redis] Unable to connect after max timeout of ${MAX_TIMEOUT} ms`); - this.logger.error('Exiting process...'); + if (cumulativeTimeout > this.config.maxTimeout) { + const maxTimeout = Math.round(this.config.maxTimeout / 1000) + 's'; + this.logger.error(`Unable to connect to Redis after trying to connect for ${maxTimeout}`); + this.logger.error('Exiting process due to Redis connection error'); process.exit(1); } } - this.logger.warn('Redis unavailable - trying to reconnect...'); + this.emit('connection-lost', cumulativeTimeout); - return RETRY_INTERVAL; + return this.config.retryInterval; }; } @@ -156,4 +190,40 @@ export class RedisClientService { return { host, port: parseInt(port) }; }); } + + @Debounce(1000) + emit( + event: Event, + ...args: Array + ): boolean { + return super.emit(event, ...args); + } + + private registerListeners() { + const { maxTimeout: maxTimeoutMs, retryInterval: retryIntervalMs } = this.config; + + const retryInterval = this.formatTimeout(retryIntervalMs); + const maxTimeout = this.formatTimeout(maxTimeoutMs); + + this.on('connection-lost', (cumulativeTimeoutMs) => { + const cumulativeTimeout = this.formatTimeout(cumulativeTimeoutMs); + const reconnectionMsg = `Trying to reconnect in ${retryInterval}...`; + const timeoutDetails = `${cumulativeTimeout}/${maxTimeout}`; + + this.logger.warn(`Lost Redis connection. ${reconnectionMsg} (${timeoutDetails})`); + + this.lostConnection = true; + }); + + this.on('connection-recovered', () => { + this.logger.info('Recovered Redis connection'); + }); + } + + private formatTimeout(timeoutMs: number) { + const timeoutSeconds = timeoutMs / 1000; + const roundedTimeout = Math.round(timeoutSeconds * 10) / 10; + + return roundedTimeout + 's'; + } }