refactor(core): Port scaling mode config (no-changelog) (#10321)

This commit is contained in:
Iván Ovejero 2024-08-12 11:03:37 +02:00 committed by GitHub
parent 6b52bebf52
commit 8728b63aeb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 146 additions and 127 deletions

View file

@ -0,0 +1,96 @@
import { Config, Env, Nested } from '../decorators';
/**
 * Settings for the worker health check endpoint.
 *
 * Each field is bound to the environment variable named in its `@Env`
 * decorator; the initializer is the default value.
 */
@Config
class HealthConfig {
	/** Whether to enable the worker health check endpoint `/healthz`. */
	@Env('QUEUE_HEALTH_CHECK_ACTIVE')
	active = false;

	/** Port for worker to respond to health checks requests on, if enabled. */
	@Env('QUEUE_HEALTH_CHECK_PORT')
	port = 5678;
}
/**
 * Redis connection settings for the Bull queue.
 *
 * Each field is bound to the environment variable named in its `@Env`
 * decorator; the initializer is the default value.
 */
@Config
class RedisConfig {
	/** Redis database for Bull queue. */
	@Env('QUEUE_BULL_REDIS_DB')
	db = 0;

	/** Redis host for Bull queue. */
	@Env('QUEUE_BULL_REDIS_HOST')
	host = 'localhost';

	/** Password to authenticate with Redis. */
	@Env('QUEUE_BULL_REDIS_PASSWORD')
	password = '';

	/** Port for Redis to listen on. */
	@Env('QUEUE_BULL_REDIS_PORT')
	port = 6379;

	/** Max cumulative timeout (in milliseconds) of connection retries before process exit. */
	@Env('QUEUE_BULL_REDIS_TIMEOUT_THRESHOLD')
	timeoutThreshold = 10_000;

	/** Redis username. Redis 6.0 or higher required. */
	@Env('QUEUE_BULL_REDIS_USERNAME')
	username = '';

	/**
	 * Redis cluster startup nodes, as comma-separated list of `{host}:{port}` pairs.
	 * Kept as a raw string here; the Redis client service splits it on `,` and
	 * drops blank entries.
	 *
	 * @example 'redis-1:6379,redis-2:6379'
	 */
	@Env('QUEUE_BULL_REDIS_CLUSTER_NODES')
	clusterNodes = '';

	/** Whether to enable TLS on Redis connections. */
	@Env('QUEUE_BULL_REDIS_TLS')
	tls = false;
}
/**
 * Job locking and stalled-job settings for queue workers. This object is
 * handed to the Bull queue constructor as its `settings` option.
 *
 * Each field is bound to the environment variable named in its `@Env`
 * decorator; the initializer is the default value.
 */
@Config
class SettingsConfig {
	/** How long (in milliseconds) is the lease period for a worker processing a job. */
	@Env('QUEUE_WORKER_LOCK_DURATION')
	lockDuration = 30_000;

	/** How often (in milliseconds) a worker must renew the lease. */
	@Env('QUEUE_WORKER_LOCK_RENEW_TIME')
	lockRenewTime = 15_000;

	/** How often (in milliseconds) Bull must check for stalled jobs. `0` to disable. */
	@Env('QUEUE_WORKER_STALLED_INTERVAL')
	stalledInterval = 30_000;

	/**
	 * Max number of times a stalled job will be re-processed. See Bull's [documentation](https://docs.bullmq.io/guide/workers/stalled-jobs).
	 *
	 * NOTE(review): the link above targets the BullMQ docs site - confirm it
	 * describes the queue library actually in use.
	 */
	@Env('QUEUE_WORKER_MAX_STALLED_COUNT')
	maxStalledCount = 1;
}
/**
 * Settings for the Bull job queue: key prefix, Redis connection, recovery
 * polling, and worker lock settings.
 *
 * `@Env` fields are bound to the named environment variable, with the
 * initializer as default; `@Nested` fields hold a nested config class.
 */
@Config
class BullConfig {
	/**
	 * Prefix for Bull keys on Redis. The scaling service passes this value
	 * through `toValidPrefix` before giving it to the queue.
	 *
	 * @example 'bull:jobs:23'
	 */
	@Env('QUEUE_BULL_PREFIX')
	prefix = 'bull';

	/** Redis connection settings for the queue. */
	@Nested
	redis: RedisConfig;

	/** How often (in seconds) to poll the Bull queue to identify executions finished during a Redis crash. `0` to disable. May increase Redis traffic significantly. */
	@Env('QUEUE_RECOVERY_INTERVAL')
	queueRecoveryInterval = 60; // watchdog interval

	/** @deprecated How long (in seconds) a worker must wait for active executions to finish before exiting. Use `N8N_GRACEFUL_SHUTDOWN_TIMEOUT` instead */
	@Env('QUEUE_WORKER_TIMEOUT')
	gracefulShutdownTimeout = 30;

	/** Lock and stalled-job settings forwarded to the queue. */
	@Nested
	settings: SettingsConfig;
}
/**
 * Root config for scaling (queue) mode, grouping the worker health check
 * settings and the Bull queue settings. Exposed on `GlobalConfig` as `queue`.
 */
@Config
export class ScalingModeConfig {
	/** Worker health check endpoint settings. */
	@Nested
	health: HealthConfig;

	/** Bull queue and Redis settings. */
	@Nested
	bull: BullConfig;
}

View file

@ -12,6 +12,7 @@ import { ExternalStorageConfig } from './configs/external-storage';
import { WorkflowsConfig } from './configs/workflows';
import { EndpointsConfig } from './configs/endpoints';
import { CacheConfig } from './configs/cache';
import { ScalingModeConfig } from './configs/scaling-mode.config';
@Config
class UserManagementConfig {
@ -79,4 +80,7 @@ export class GlobalConfig {
@Nested
readonly cache: CacheConfig;
@Nested
queue: ScalingModeConfig;
}

View file

@ -184,6 +184,33 @@ describe('GlobalConfig', () => {
ttl: 3600000,
},
},
queue: {
health: {
active: false,
port: 5678,
},
bull: {
redis: {
db: 0,
host: 'localhost',
password: '',
port: 6379,
timeoutThreshold: 10_000,
username: '',
clusterNodes: '',
tls: false,
},
queueRecoveryInterval: 60,
gracefulShutdownTimeout: 30,
prefix: 'bull',
settings: {
lockDuration: 30_000,
lockRenewTime: 15_000,
stalledInterval: 30_000,
maxStalledCount: 1,
},
},
},
};
it('should use all default values when no env variables are defined', () => {

View file

@ -37,6 +37,7 @@ import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import { Logger } from '@/Logger';
import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service';
import { EventService } from './events/event.service';
import { GlobalConfig } from '@n8n/config';
@Service()
export class WorkflowRunner {
@ -424,7 +425,7 @@ export class WorkflowRunner {
const jobData: Promise<JobResult> = job.finished();
const queueRecoveryInterval = config.getEnv('queue.bull.queueRecoveryInterval');
const { queueRecoveryInterval } = Container.get(GlobalConfig).queue.bull;
const racingPromises: Array<Promise<JobResult>> = [jobData];

View file

@ -102,7 +102,7 @@ export class Worker extends BaseCommand {
const { QUEUE_WORKER_TIMEOUT } = process.env;
if (QUEUE_WORKER_TIMEOUT) {
this.gracefulShutdownTimeoutInS =
parseInt(QUEUE_WORKER_TIMEOUT, 10) || config.default('queue.bull.gracefulShutdownTimeout');
parseInt(QUEUE_WORKER_TIMEOUT, 10) || this.globalConfig.queue.bull.gracefulShutdownTimeout;
this.logger.warn(
'QUEUE_WORKER_TIMEOUT has been deprecated. Rename it to N8N_GRACEFUL_SHUTDOWN_TIMEOUT.',
);
@ -182,7 +182,7 @@ export class Worker extends BaseCommand {
}
async setupHealthMonitor() {
const port = config.getEnv('queue.health.port');
const { port } = this.globalConfig.queue.health;
const app = express();
app.disable('x-powered-by');
@ -285,7 +285,7 @@ export class Worker extends BaseCommand {
this.logger.info(` * Concurrency: ${this.concurrency}`);
this.logger.info('');
if (config.getEnv('queue.health.active')) {
if (this.globalConfig.queue.health.active) {
await this.setupHealthMonitor();
}

View file

@ -162,119 +162,6 @@ export const schema = {
},
},
queue: {
health: {
active: {
doc: 'If health checks should be enabled',
format: Boolean,
default: false,
env: 'QUEUE_HEALTH_CHECK_ACTIVE',
},
port: {
doc: 'Port to serve health check on if activated',
format: Number,
default: 5678,
env: 'QUEUE_HEALTH_CHECK_PORT',
},
},
bull: {
prefix: {
doc: 'Prefix for all bull queue keys',
format: String,
default: 'bull',
env: 'QUEUE_BULL_PREFIX',
},
redis: {
db: {
doc: 'Redis DB',
format: Number,
default: 0,
env: 'QUEUE_BULL_REDIS_DB',
},
host: {
doc: 'Redis Host',
format: String,
default: 'localhost',
env: 'QUEUE_BULL_REDIS_HOST',
},
password: {
doc: 'Redis Password',
format: String,
default: '',
env: 'QUEUE_BULL_REDIS_PASSWORD',
},
port: {
doc: 'Redis Port',
format: Number,
default: 6379,
env: 'QUEUE_BULL_REDIS_PORT',
},
timeoutThreshold: {
doc: 'Max cumulative timeout (in milliseconds) of connection retries before process exit',
format: Number,
default: 10000,
env: 'QUEUE_BULL_REDIS_TIMEOUT_THRESHOLD',
},
username: {
doc: 'Redis Username (needs Redis >= 6)',
format: String,
default: '',
env: 'QUEUE_BULL_REDIS_USERNAME',
},
clusterNodes: {
doc: 'Redis Cluster startup nodes (comma separated list of host:port pairs)',
format: String,
default: '',
env: 'QUEUE_BULL_REDIS_CLUSTER_NODES',
},
tls: {
format: Boolean,
default: false,
env: 'QUEUE_BULL_REDIS_TLS',
doc: 'Enable TLS on Redis connections. Default: false',
},
},
queueRecoveryInterval: {
doc: 'If > 0 enables an active polling to the queue that can recover for Redis crashes. Given in seconds; 0 is disabled. May increase Redis traffic significantly.',
format: Number,
default: 60,
env: 'QUEUE_RECOVERY_INTERVAL',
},
gracefulShutdownTimeout: {
doc: '[DEPRECATED] (Use N8N_GRACEFUL_SHUTDOWN_TIMEOUT instead) How long should n8n wait for running executions before exiting worker process (seconds)',
format: Number,
default: 30,
env: 'QUEUE_WORKER_TIMEOUT',
},
settings: {
lockDuration: {
doc: 'How long (ms) is the lease period for a worker to work on a message',
format: Number,
default: 30000,
env: 'QUEUE_WORKER_LOCK_DURATION',
},
lockRenewTime: {
doc: 'How frequently (ms) should a worker renew the lease time',
format: Number,
default: 15000,
env: 'QUEUE_WORKER_LOCK_RENEW_TIME',
},
stalledInterval: {
doc: 'How often check for stalled jobs (use 0 for never checking)',
format: Number,
default: 30000,
env: 'QUEUE_WORKER_STALLED_INTERVAL',
},
maxStalledCount: {
doc: 'Max amount of times a stalled job will be re-processed',
format: Number,
default: 1,
env: 'QUEUE_WORKER_MAX_STALLED_COUNT',
},
},
},
},
generic: {
// The timezone to use. Is important for nodes like "Cron" which start the
// workflow automatically at a specified time. This setting can also be

View file

@ -10,6 +10,7 @@ import { JOB_TYPE_NAME, QUEUE_NAME } from './constants';
import { JobProcessor } from './job-processor';
import type { JobQueue, Job, JobData, JobOptions, JobMessage, JobStatus, JobId } from './types';
import type { IExecuteResponsePromiseData } from 'n8n-workflow';
import { GlobalConfig } from '@n8n/config';
@Service()
export class ScalingService {
@ -21,6 +22,7 @@ export class ScalingService {
private readonly logger: Logger,
private readonly activeExecutions: ActiveExecutions,
private readonly jobProcessor: JobProcessor,
private readonly globalConfig: GlobalConfig,
) {}
// #region Lifecycle
@ -30,12 +32,12 @@ export class ScalingService {
const { RedisClientService } = await import('@/services/redis/redis-client.service');
const service = Container.get(RedisClientService);
const bullPrefix = config.getEnv('queue.bull.prefix');
const bullPrefix = this.globalConfig.queue.bull.prefix;
const prefix = service.toValidPrefix(bullPrefix);
this.queue = new BullQueue(QUEUE_NAME, {
prefix,
settings: config.get('queue.bull.settings'),
settings: this.globalConfig.queue.bull.settings,
createClient: (type) => service.createClient({ type: `${type}(bull)` }),
});
@ -133,7 +135,7 @@ export class ScalingService {
let latestAttemptTs = 0;
let cumulativeTimeoutMs = 0;
const MAX_TIMEOUT_MS = config.getEnv('queue.bull.redis.timeoutThreshold');
const MAX_TIMEOUT_MS = this.globalConfig.queue.bull.redis.timeoutThreshold;
const RESET_LENGTH_MS = 30_000;
this.queue.on('error', (error: Error) => {

View file

@ -1,17 +1,20 @@
import { Service } from 'typedi';
import config from '@/config';
import { Logger } from '@/Logger';
import ioRedis from 'ioredis';
import type { Cluster, RedisOptions } from 'ioredis';
import type { RedisClientType } from './RedisServiceBaseClasses';
import { OnShutdown } from '@/decorators/OnShutdown';
import { LOWEST_SHUTDOWN_PRIORITY } from '@/constants';
import { GlobalConfig } from '@n8n/config';
@Service()
export class RedisClientService {
private readonly clients = new Set<ioRedis | Cluster>();
constructor(private readonly logger: Logger) {}
constructor(
private readonly logger: Logger,
private readonly globalConfig: GlobalConfig,
) {}
createClient(arg: { type: RedisClientType; extraOptions?: RedisOptions }) {
const client =
@ -57,7 +60,7 @@ export class RedisClientService {
}) {
const options = this.getOptions({ extraOptions });
const { host, port } = config.getEnv('queue.bull.redis');
const { host, port } = this.globalConfig.queue.bull.redis;
options.host = host;
options.port = port;
@ -87,7 +90,7 @@ export class RedisClientService {
}
private getOptions({ extraOptions }: { extraOptions?: RedisOptions }) {
const { username, password, db, tls } = config.getEnv('queue.bull.redis');
const { username, password, db, tls } = this.globalConfig.queue.bull.redis;
/**
* Disabling ready check allows quick reconnection to Redis if Redis becomes
@ -124,7 +127,7 @@ export class RedisClientService {
private retryStrategy() {
const RETRY_INTERVAL = 500; // ms
const RESET_LENGTH = 30_000; // ms
const MAX_TIMEOUT = config.getEnv('queue.bull.redis.timeoutThreshold');
const MAX_TIMEOUT = this.globalConfig.queue.bull.redis.timeoutThreshold;
let lastAttemptTs = 0;
let cumulativeTimeout = 0;
@ -152,8 +155,7 @@ export class RedisClientService {
}
private clusterNodes() {
return config
.getEnv('queue.bull.redis.clusterNodes')
return this.globalConfig.queue.bull.redis.clusterNodes
.split(',')
.filter((pair) => pair.trim().length > 0)
.map((pair) => {