mirror of
https://github.com/n8n-io/n8n.git
synced 2024-11-14 08:34:07 -08:00
fix(core): Handle Redis disconnects gracefully (#11007)
Some checks are pending
Test Master / install-and-build (push) Waiting to run
Test Master / Unit tests (18.x) (push) Blocked by required conditions
Test Master / Unit tests (20.x) (push) Blocked by required conditions
Test Master / Unit tests (22.4) (push) Blocked by required conditions
Test Master / Lint (push) Blocked by required conditions
Test Master / Notify Slack on failure (push) Blocked by required conditions
Benchmark Docker Image CI / build (push) Waiting to run
Some checks are pending
Test Master / install-and-build (push) Waiting to run
Test Master / Unit tests (18.x) (push) Blocked by required conditions
Test Master / Unit tests (20.x) (push) Blocked by required conditions
Test Master / Unit tests (22.4) (push) Blocked by required conditions
Test Master / Lint (push) Blocked by required conditions
Test Master / Notify Slack on failure (push) Blocked by required conditions
Benchmark Docker Image CI / build (push) Waiting to run
This commit is contained in:
parent
805a1140c9
commit
cd916480c2
37
packages/cli/src/decorators/debounce.ts
Normal file
37
packages/cli/src/decorators/debounce.ts
Normal file
|
@ -0,0 +1,37 @@
|
|||
import debounce from 'lodash/debounce';
|
||||
|
||||
/**
|
||||
* Debounce a class method using `lodash/debounce`.
|
||||
*
|
||||
* @param waitMs - Number of milliseconds to debounce method by.
|
||||
*
|
||||
* @example
|
||||
* ```
|
||||
* class MyClass {
|
||||
* @Debounce(1000)
|
||||
* async myMethod() {
|
||||
* // debounced
|
||||
* }
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
export const Debounce =
|
||||
(waitMs: number): MethodDecorator =>
|
||||
<T>(
|
||||
_: object,
|
||||
methodName: string,
|
||||
originalDescriptor: PropertyDescriptor,
|
||||
): TypedPropertyDescriptor<T> => ({
|
||||
configurable: true,
|
||||
|
||||
get() {
|
||||
const debouncedFn = debounce(originalDescriptor.value, waitMs);
|
||||
|
||||
Object.defineProperty(this, methodName, {
|
||||
configurable: false,
|
||||
value: debouncedFn,
|
||||
});
|
||||
|
||||
return debouncedFn as T;
|
||||
},
|
||||
});
|
|
@ -24,8 +24,6 @@ export class Publisher {
|
|||
if (config.getEnv('executions.mode') !== 'queue') return;
|
||||
|
||||
this.client = this.redisClientService.createClient({ type: 'publisher(n8n)' });
|
||||
|
||||
this.client.on('error', (error) => this.logger.error(error.message));
|
||||
}
|
||||
|
||||
getClient() {
|
||||
|
|
|
@ -27,8 +27,6 @@ export class Subscriber {
|
|||
|
||||
this.client = this.redisClientService.createClient({ type: 'subscriber(n8n)' });
|
||||
|
||||
this.client.on('error', (error) => this.logger.error(error.message));
|
||||
|
||||
this.client.on('message', (channel: PubSub.Channel, message) => {
|
||||
this.handlers.get(channel)?.(message);
|
||||
});
|
||||
|
|
|
@ -173,39 +173,11 @@ export class ScalingService {
|
|||
// #region Listeners
|
||||
|
||||
private registerListeners() {
|
||||
let latestAttemptTs = 0;
|
||||
let cumulativeTimeoutMs = 0;
|
||||
|
||||
const MAX_TIMEOUT_MS = this.globalConfig.queue.bull.redis.timeoutThreshold;
|
||||
const RESET_LENGTH_MS = 30_000;
|
||||
|
||||
this.queue.on('error', (error: Error) => {
|
||||
if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by RedisClientService.retryStrategy
|
||||
|
||||
this.logger.error('[ScalingService] Queue errored', { error });
|
||||
|
||||
/**
|
||||
* On Redis connection failure, try to reconnect. On every failed attempt,
|
||||
* increment a cumulative timeout - if this exceeds a limit, exit the
|
||||
* process. Reset the cumulative timeout if >30s between retries.
|
||||
*/
|
||||
if (error.message.includes('ECONNREFUSED')) {
|
||||
const nowTs = Date.now();
|
||||
if (nowTs - latestAttemptTs > RESET_LENGTH_MS) {
|
||||
latestAttemptTs = nowTs;
|
||||
cumulativeTimeoutMs = 0;
|
||||
} else {
|
||||
cumulativeTimeoutMs += nowTs - latestAttemptTs;
|
||||
latestAttemptTs = nowTs;
|
||||
if (cumulativeTimeoutMs > MAX_TIMEOUT_MS) {
|
||||
this.logger.error('[ScalingService] Redis unavailable after max timeout');
|
||||
this.logger.error('[ScalingService] Exiting process...');
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.warn('[ScalingService] Redis unavailable - retrying to connect...');
|
||||
return;
|
||||
}
|
||||
|
||||
throw error;
|
||||
});
|
||||
|
||||
|
|
|
@ -3,18 +3,42 @@ import ioRedis from 'ioredis';
|
|||
import type { Cluster, RedisOptions } from 'ioredis';
|
||||
import { Service } from 'typedi';
|
||||
|
||||
import { Debounce } from '@/decorators/debounce';
|
||||
import { Logger } from '@/logger';
|
||||
import { TypedEmitter } from '@/typed-emitter';
|
||||
|
||||
import type { RedisClientType } from '../scaling/redis/redis.types';
|
||||
|
||||
type RedisEventMap = {
|
||||
'connection-lost': number;
|
||||
'connection-recovered': never;
|
||||
};
|
||||
|
||||
@Service()
|
||||
export class RedisClientService {
|
||||
export class RedisClientService extends TypedEmitter<RedisEventMap> {
|
||||
private readonly clients = new Set<ioRedis | Cluster>();
|
||||
|
||||
private readonly config = {
|
||||
/** How long (in ms) to try to reconnect for before exiting. */
|
||||
maxTimeout: this.globalConfig.queue.bull.redis.timeoutThreshold,
|
||||
|
||||
/** How long (in ms) to wait between reconnection attempts. */
|
||||
retryInterval: 1000,
|
||||
|
||||
/** How long (in ms) to wait before resetting the cumulative timeout. */
|
||||
resetLength: 30_000,
|
||||
};
|
||||
|
||||
/** Whether any client has lost connection to Redis. */
|
||||
private lostConnection = false;
|
||||
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly globalConfig: GlobalConfig,
|
||||
) {}
|
||||
) {
|
||||
super();
|
||||
this.registerListeners();
|
||||
}
|
||||
|
||||
createClient(arg: { type: RedisClientType; extraOptions?: RedisOptions }) {
|
||||
const client =
|
||||
|
@ -22,6 +46,19 @@ export class RedisClientService {
|
|||
? this.createClusterClient(arg)
|
||||
: this.createRegularClient(arg);
|
||||
|
||||
client.on('error', (error) => {
|
||||
if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by retryStrategy
|
||||
|
||||
this.logger.error(`[Redis client] ${error.message}`, { error });
|
||||
});
|
||||
|
||||
client.on('ready', () => {
|
||||
if (this.lostConnection) {
|
||||
this.emit('connection-recovered');
|
||||
this.lostConnection = false;
|
||||
}
|
||||
});
|
||||
|
||||
this.clients.add(client);
|
||||
|
||||
return client;
|
||||
|
@ -118,32 +155,29 @@ export class RedisClientService {
|
|||
* Reset the cumulative timeout if >30s between reconnection attempts.
|
||||
*/
|
||||
private retryStrategy() {
|
||||
const RETRY_INTERVAL = 500; // ms
|
||||
const RESET_LENGTH = 30_000; // ms
|
||||
const MAX_TIMEOUT = this.globalConfig.queue.bull.redis.timeoutThreshold;
|
||||
|
||||
let lastAttemptTs = 0;
|
||||
let cumulativeTimeout = 0;
|
||||
|
||||
return () => {
|
||||
const nowTs = Date.now();
|
||||
|
||||
if (nowTs - lastAttemptTs > RESET_LENGTH) {
|
||||
if (nowTs - lastAttemptTs > this.config.resetLength) {
|
||||
cumulativeTimeout = 0;
|
||||
lastAttemptTs = nowTs;
|
||||
} else {
|
||||
cumulativeTimeout += nowTs - lastAttemptTs;
|
||||
lastAttemptTs = nowTs;
|
||||
if (cumulativeTimeout > MAX_TIMEOUT) {
|
||||
this.logger.error(`[Redis] Unable to connect after max timeout of ${MAX_TIMEOUT} ms`);
|
||||
this.logger.error('Exiting process...');
|
||||
if (cumulativeTimeout > this.config.maxTimeout) {
|
||||
const maxTimeout = Math.round(this.config.maxTimeout / 1000) + 's';
|
||||
this.logger.error(`Unable to connect to Redis after trying to connect for ${maxTimeout}`);
|
||||
this.logger.error('Exiting process due to Redis connection error');
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.warn('Redis unavailable - trying to reconnect...');
|
||||
this.emit('connection-lost', cumulativeTimeout);
|
||||
|
||||
return RETRY_INTERVAL;
|
||||
return this.config.retryInterval;
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -156,4 +190,40 @@ export class RedisClientService {
|
|||
return { host, port: parseInt(port) };
|
||||
});
|
||||
}
|
||||
|
||||
@Debounce(1000)
|
||||
emit<Event extends keyof RedisEventMap>(
|
||||
event: Event,
|
||||
...args: Array<RedisEventMap[Event]>
|
||||
): boolean {
|
||||
return super.emit(event, ...args);
|
||||
}
|
||||
|
||||
private registerListeners() {
|
||||
const { maxTimeout: maxTimeoutMs, retryInterval: retryIntervalMs } = this.config;
|
||||
|
||||
const retryInterval = this.formatTimeout(retryIntervalMs);
|
||||
const maxTimeout = this.formatTimeout(maxTimeoutMs);
|
||||
|
||||
this.on('connection-lost', (cumulativeTimeoutMs) => {
|
||||
const cumulativeTimeout = this.formatTimeout(cumulativeTimeoutMs);
|
||||
const reconnectionMsg = `Trying to reconnect in ${retryInterval}...`;
|
||||
const timeoutDetails = `${cumulativeTimeout}/${maxTimeout}`;
|
||||
|
||||
this.logger.warn(`Lost Redis connection. ${reconnectionMsg} (${timeoutDetails})`);
|
||||
|
||||
this.lostConnection = true;
|
||||
});
|
||||
|
||||
this.on('connection-recovered', () => {
|
||||
this.logger.info('Recovered Redis connection');
|
||||
});
|
||||
}
|
||||
|
||||
private formatTimeout(timeoutMs: number) {
|
||||
const timeoutSeconds = timeoutMs / 1000;
|
||||
const roundedTimeout = Math.round(timeoutSeconds * 10) / 10;
|
||||
|
||||
return roundedTimeout + 's';
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue