fix(core): Support redis cluster in queue mode (#6708)

* support redis cluster

* cleanup, fix config schema

* set default prefix to bull
This commit is contained in:
Michael Auerswald 2023-07-21 23:31:52 +02:00 committed by GitHub
parent 8ceb8322eb
commit 4029386349
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 126 additions and 37 deletions

View file

@ -10,7 +10,7 @@ import parseUrl from 'parseurl';
import type { RedisOptions } from 'ioredis'; import type { RedisOptions } from 'ioredis';
import type { WebhookHttpMethod } from 'n8n-workflow'; import type { WebhookHttpMethod } from 'n8n-workflow';
import { LoggerProxy as Logger } from 'n8n-workflow'; import { LoggerProxy } from 'n8n-workflow';
import config from '@/config'; import config from '@/config';
import { N8N_VERSION, inDevelopment } from '@/constants'; import { N8N_VERSION, inDevelopment } from '@/constants';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
@ -27,6 +27,7 @@ import { corsMiddleware } from '@/middlewares';
import { TestWebhooks } from '@/TestWebhooks'; import { TestWebhooks } from '@/TestWebhooks';
import { WaitingWebhooks } from '@/WaitingWebhooks'; import { WaitingWebhooks } from '@/WaitingWebhooks';
import { WEBHOOK_METHODS } from '@/WebhookHelpers'; import { WEBHOOK_METHODS } from '@/WebhookHelpers';
import { getRedisClusterNodes } from './GenericHelpers';
const emptyBuffer = Buffer.alloc(0); const emptyBuffer = Buffer.alloc(0);
@ -187,14 +188,36 @@ export abstract class AbstractServer {
let lastTimer = 0; let lastTimer = 0;
let cumulativeTimeout = 0; let cumulativeTimeout = 0;
const { host, port, username, password, db }: RedisOptions = config.getEnv('queue.bull.redis'); const { host, port, username, password, db }: RedisOptions = config.getEnv('queue.bull.redis');
const clusterNodes = getRedisClusterNodes();
const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold'); const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold');
const usesRedisCluster = clusterNodes.length > 0;
const redis = new Redis({ LoggerProxy.debug(
host, usesRedisCluster
port, ? `Initialising Redis cluster connection with nodes: ${clusterNodes
db, .map((e) => `${e.host}:${e.port}`)
.join(',')}`
: `Initialising Redis client connection with host: ${host ?? 'localhost'} and port: ${
port ?? '6379'
}`,
);
const sharedRedisOptions: RedisOptions = {
username, username,
password, password,
db,
enableReadyCheck: false,
maxRetriesPerRequest: null,
};
const redis = usesRedisCluster
? new Redis.Cluster(
clusterNodes.map((node) => ({ host: node.host, port: node.port })),
{
redisOptions: sharedRedisOptions,
},
)
: new Redis({
host,
port,
...sharedRedisOptions,
retryStrategy: (): number | null => { retryStrategy: (): number | null => {
const now = Date.now(); const now = Date.now();
if (now - lastTimer > 30000) { if (now - lastTimer > 30000) {
@ -205,7 +228,7 @@ export abstract class AbstractServer {
cumulativeTimeout += now - lastTimer; cumulativeTimeout += now - lastTimer;
lastTimer = now; lastTimer = now;
if (cumulativeTimeout > redisConnectionTimeoutLimit) { if (cumulativeTimeout > redisConnectionTimeoutLimit) {
Logger.error( LoggerProxy.error(
`Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`, `Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`,
); );
process.exit(1); process.exit(1);
@ -216,13 +239,13 @@ export abstract class AbstractServer {
}); });
redis.on('close', () => { redis.on('close', () => {
Logger.warn('Redis unavailable - trying to reconnect...'); LoggerProxy.warn('Redis unavailable - trying to reconnect...');
}); });
redis.on('error', (error) => { redis.on('error', (error) => {
if (!String(error).includes('ECONNREFUSED')) { if (!String(error).includes('ECONNREFUSED')) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
Logger.warn('Error with Redis: ', error); LoggerProxy.warn('Error with Redis: ', error);
} }
}); });
} }

View file

@ -199,4 +199,28 @@ export async function createErrorExecution(
await Container.get(ExecutionRepository).createNewExecution(fullExecutionData); await Container.get(ExecutionRepository).createNewExecution(fullExecutionData);
} }
export function getRedisClusterNodes(): Array<{ host: string; port: number }> {
const clusterNodePairs = config
.getEnv('queue.bull.redis.clusterNodes')
.split(',')
.filter((e) => e);
return clusterNodePairs.map((pair) => {
const [host, port] = pair.split(':');
return { host, port: parseInt(port) };
});
}
export function getRedisPrefix(): string {
let prefix = config.getEnv('queue.bull.prefix');
if (prefix && getRedisClusterNodes().length > 0) {
if (!prefix.startsWith('{')) {
prefix = '{' + prefix;
}
if (!prefix.endsWith('}')) {
prefix += '}';
}
}
return prefix;
}
export const DEFAULT_EXECUTIONS_GET_ALL_LIMIT = 20; export const DEFAULT_EXECUTIONS_GET_ALL_LIMIT = 20;

View file

@ -1,10 +1,11 @@
import type Bull from 'bull'; import type Bull from 'bull';
import type { RedisOptions } from 'ioredis'; import { type RedisOptions } from 'ioredis';
import { Service } from 'typedi'; import { Service } from 'typedi';
import type { IExecuteResponsePromiseData } from 'n8n-workflow'; import { LoggerProxy, type IExecuteResponsePromiseData } from 'n8n-workflow';
import config from '@/config'; import config from '@/config';
import { ActiveExecutions } from '@/ActiveExecutions'; import { ActiveExecutions } from '@/ActiveExecutions';
import * as WebhookHelpers from '@/WebhookHelpers'; import * as WebhookHelpers from '@/WebhookHelpers';
import { getRedisClusterNodes, getRedisPrefix } from './GenericHelpers';
export type JobId = Bull.JobId; export type JobId = Bull.JobId;
export type Job = Bull.Job<JobData>; export type Job = Bull.Job<JobData>;
@ -31,19 +32,54 @@ export class Queue {
constructor(private activeExecutions: ActiveExecutions) {} constructor(private activeExecutions: ActiveExecutions) {}
async init() { async init() {
const prefix = config.getEnv('queue.bull.prefix'); const prefix = getRedisPrefix();
const redisOptions: RedisOptions = config.getEnv('queue.bull.redis'); const clusterNodes = getRedisClusterNodes();
const usesRedisCluster = clusterNodes.length > 0;
const { host, port, username, password, db }: RedisOptions = config.getEnv('queue.bull.redis');
// eslint-disable-next-line @typescript-eslint/naming-convention // eslint-disable-next-line @typescript-eslint/naming-convention
const { default: Bull } = await import('bull'); const { default: Bull } = await import('bull');
// eslint-disable-next-line @typescript-eslint/naming-convention
const { default: Redis } = await import('ioredis');
// Disabling ready check is necessary as it allows worker to // Disabling ready check is necessary as it allows worker to
// quickly reconnect to Redis if Redis crashes or is unreachable // quickly reconnect to Redis if Redis crashes or is unreachable
// for some time. With it enabled, worker might take minutes to realize // for some time. With it enabled, worker might take minutes to realize
// redis is back up and resume working. // redis is back up and resume working.
// More here: https://github.com/OptimalBits/bull/issues/890 // More here: https://github.com/OptimalBits/bull/issues/890
// @ts-ignore
this.jobQueue = new Bull('jobs', { prefix, redis: redisOptions, enableReadyCheck: false }); LoggerProxy.debug(
usesRedisCluster
? `Initialising Redis cluster connection with nodes: ${clusterNodes
.map((e) => `${e.host}:${e.port}`)
.join(',')}`
: `Initialising Redis client connection with host: ${host ?? 'localhost'} and port: ${
port ?? '6379'
}`,
);
const sharedRedisOptions: RedisOptions = {
username,
password,
db,
enableReadyCheck: false,
maxRetriesPerRequest: null,
};
this.jobQueue = new Bull('jobs', {
prefix,
createClient: (type, clientConfig) =>
usesRedisCluster
? new Redis.Cluster(
clusterNodes.map((node) => ({ host: node.host, port: node.port })),
{
...clientConfig,
redisOptions: sharedRedisOptions,
},
)
: new Redis({
...clientConfig,
host,
port,
...sharedRedisOptions,
}),
});
this.jobQueue.on('global:progress', (jobId, progress: WebhookResponse) => { this.jobQueue.on('global:progress', (jobId, progress: WebhookResponse) => {
this.activeExecutions.resolveResponsePromise( this.activeExecutions.resolveResponsePromise(

View file

@ -353,9 +353,9 @@ export const schema = {
}, },
bull: { bull: {
prefix: { prefix: {
doc: 'Prefix for all queue keys', doc: 'Prefix for all queue keys (wrap in {} for cluster mode)',
format: String, format: String,
default: '', default: 'bull',
env: 'QUEUE_BULL_PREFIX', env: 'QUEUE_BULL_PREFIX',
}, },
redis: { redis: {
@ -395,6 +395,12 @@ export const schema = {
default: '', default: '',
env: 'QUEUE_BULL_REDIS_USERNAME', env: 'QUEUE_BULL_REDIS_USERNAME',
}, },
clusterNodes: {
doc: 'Redis Cluster startup nodes (comma separated list of host:port pairs)',
format: String,
default: '',
env: 'QUEUE_BULL_REDIS_CLUSTER_NODES',
},
}, },
queueRecoveryInterval: { queueRecoveryInterval: {
doc: 'If > 0 enables an active polling to the queue that can recover for Redis crashes. Given in seconds; 0 is disabled. May increase Redis traffic significantly.', doc: 'If > 0 enables an active polling to the queue that can recover for Redis crashes. Given in seconds; 0 is disabled. May increase Redis traffic significantly.',