From e91721cc295802129f3f6cd7057032bf24c7ca1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Sun, 2 Feb 2025 14:00:42 +0100 Subject: [PATCH] p --- .vscode/launch.json | 11 + .../config/src/configs/scaling-mode.config.ts | 12 + packages/cli/src/commands/webhook.ts | 2 +- packages/cli/src/commands/worker.ts | 2 +- packages/cli/src/databases/dsl/column.ts | 2 +- .../scaling/__tests__/scaling.service.test.ts | 51 +- packages/cli/src/scaling/job-processor.ts | 131 ++++- packages/cli/src/scaling/job-producer.ts | 209 ++++++++ packages/cli/src/scaling/job-queues.ts | 40 ++ packages/cli/src/scaling/job-recovery.ts | 106 ++++ packages/cli/src/scaling/scaling.service.ts | 478 ++---------------- packages/cli/src/server.ts | 2 +- .../integration/commands/worker.cmd.test.ts | 2 +- 13 files changed, 557 insertions(+), 491 deletions(-) create mode 100644 packages/cli/src/scaling/job-producer.ts create mode 100644 packages/cli/src/scaling/job-queues.ts create mode 100644 packages/cli/src/scaling/job-recovery.ts diff --git a/.vscode/launch.json b/.vscode/launch.json index 5501fe4439..8360e0220f 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -36,6 +36,17 @@ "outputCapture": "std", "killBehavior": "polite" }, + { + "name": "Debug n8n worker", + "program": "${workspaceFolder}/packages/cli/bin/n8n", + "cwd": "${workspaceFolder}/packages/cli/bin", + "args": ["worker"], + "request": "launch", + "skipFiles": ["<node_internals>/**"], + "type": "node", + "outputCapture": "std", + "killBehavior": "polite" + }, { "name": "Launch n8n CLI dev with debug", "runtimeExecutable": "pnpm", diff --git a/packages/@n8n/config/src/configs/scaling-mode.config.ts b/packages/@n8n/config/src/configs/scaling-mode.config.ts index f202440a5b..32bef319df 100644 --- 
a/packages/@n8n/config/src/configs/scaling-mode.config.ts +++ b/packages/@n8n/config/src/configs/scaling-mode.config.ts @@ -90,6 +90,15 @@ class BullConfig { settings: SettingsConfig; } +class CommaSeparatedStringArray extends Array { + constructor(str: string) { + super(); + const parsed = str.split(',') as this; /* delimiter matches the class name: comma-separated */ + const filtered = parsed.filter((i) => typeof i === 'string' && i.length); + return filtered.length ? filtered : []; + } +} + @Config export class ScalingModeConfig { @Nested @@ -97,4 +106,7 @@ export class ScalingModeConfig { @Nested bull: BullConfig; + + @Env('QUEUE_DEDICATED_IDS') + dedicatedIds: CommaSeparatedStringArray = []; } diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index fd1e961b59..57acf02988 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -90,7 +90,7 @@ export class Webhook extends BaseCommand { } const { ScalingService } = await import('@/scaling/scaling.service'); - await Container.get(ScalingService).setupQueue(); + await Container.get(ScalingService).setupQueues(); await this.server.start(); this.logger.info('Webhook listener waiting for requests.'); diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index c6046a7772..d636b47019 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -159,7 +159,7 @@ export class Worker extends BaseCommand { const { ScalingService } = await import('@/scaling/scaling.service'); this.scalingService = Container.get(ScalingService); - await this.scalingService.setupQueue(); + await this.scalingService.setupQueues(); this.scalingService.setupWorker(this.concurrency); } diff --git a/packages/cli/src/databases/dsl/column.ts b/packages/cli/src/databases/dsl/column.ts index 05f3ae6eb3..0b746f5762 100644 --- a/packages/cli/src/databases/dsl/column.ts +++ b/packages/cli/src/databases/dsl/column.ts @@ -130,7 +130,7 @@ export class Column { ? 
"STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')" : 'CURRENT_TIMESTAMP(3)'; } - if (type === 'json' && isSqlite) { + if (type === 'json' && (isSqlite || isPostgres)) { if (typeof this.defaultValue !== 'string') { this.defaultValue = JSON.stringify(this.defaultValue); } diff --git a/packages/cli/src/scaling/__tests__/scaling.service.test.ts b/packages/cli/src/scaling/__tests__/scaling.service.test.ts index 2d03dac507..598b85dcf9 100644 --- a/packages/cli/src/scaling/__tests__/scaling.service.test.ts +++ b/packages/cli/src/scaling/__tests__/scaling.service.test.ts @@ -107,7 +107,7 @@ describe('ScalingService', () => { describe('setupQueue', () => { describe('if leader main', () => { it('should set up queue + listeners + queue recovery', async () => { - await scalingService.setupQueue(); + await scalingService.setupQueues(); expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs); expect(registerMainOrWebhookListenersSpy).toHaveBeenCalled(); @@ -120,7 +120,7 @@ describe('ScalingService', () => { it('should set up queue + listeners', async () => { instanceSettings.markAsFollower(); - await scalingService.setupQueue(); + await scalingService.setupQueues(); expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs); expect(registerMainOrWebhookListenersSpy).toHaveBeenCalled(); @@ -134,7 +134,7 @@ describe('ScalingService', () => { // @ts-expect-error readonly property instanceSettings.instanceType = 'worker'; - await scalingService.setupQueue(); + await scalingService.setupQueues(); expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs); expect(registerWorkerListenersSpy).toHaveBeenCalled(); @@ -147,7 +147,7 @@ describe('ScalingService', () => { // @ts-expect-error readonly property instanceSettings.instanceType = 'webhook'; - await scalingService.setupQueue(); + await scalingService.setupQueues(); expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs); expect(registerWorkerListenersSpy).not.toHaveBeenCalled(); @@ -160,7 +160,7 @@ describe('ScalingService', () => { 
it('should set up a worker with concurrency', async () => { // @ts-expect-error readonly property instanceSettings.instanceType = 'worker'; - await scalingService.setupQueue(); + await scalingService.setupQueues(); const concurrency = 5; scalingService.setupWorker(concurrency); @@ -169,7 +169,7 @@ describe('ScalingService', () => { }); it('should throw if called on a non-worker instance', async () => { - await scalingService.setupQueue(); + await scalingService.setupQueues(); expect(() => scalingService.setupWorker(5)).toThrow(); }); @@ -187,7 +187,7 @@ describe('ScalingService', () => { it('should pause queue, stop queue recovery and queue metrics', async () => { // @ts-expect-error readonly property instanceSettings.instanceType = 'main'; - await scalingService.setupQueue(); + await scalingService.setupQueues(); // @ts-expect-error readonly property scalingService.queueRecoveryContext.timeout = 1; jest.spyOn(scalingService, 'isQueueMetricsEnabled', 'get').mockReturnValue(true); @@ -205,7 +205,7 @@ describe('ScalingService', () => { it('should wait for running jobs to finish', async () => { // @ts-expect-error readonly property instanceSettings.instanceType = 'worker'; - await scalingService.setupQueue(); + await scalingService.setupQueues(); jobProcessor.getRunningJobIds.mockReturnValue([]); await scalingService.stop(); @@ -217,19 +217,9 @@ describe('ScalingService', () => { }); }); - describe('pingQueue', () => { - it('should ping the queue', async () => { - await scalingService.setupQueue(); - - await scalingService.pingQueue(); - - expect(queue.client.ping).toHaveBeenCalled(); - }); - }); - describe('addJob', () => { it('should add a job', async () => { - await scalingService.setupQueue(); + await scalingService.setupQueues(); queue.add.mockResolvedValue(mock({ id: '456' })); const jobData = mock({ executionId: '123' }); @@ -243,22 +233,9 @@ describe('ScalingService', () => { }); }); - describe('getJob', () => { - it('should get a job', async () => { - await 
scalingService.setupQueue(); - const jobId = '123'; - queue.getJob.mockResolvedValue(mock({ id: jobId })); - - const job = await scalingService.getJob(jobId); - - expect(queue.getJob).toHaveBeenCalledWith(jobId); - expect(job?.id).toBe(jobId); - }); - }); - describe('findJobsByStatus', () => { it('should find jobs by status', async () => { - await scalingService.setupQueue(); + await scalingService.setupQueues(); queue.getJobs.mockResolvedValue([mock({ id: '123' })]); const jobs = await scalingService.findJobsByStatus(['active']); @@ -269,7 +246,7 @@ describe('ScalingService', () => { }); it('should filter out `null` in Redis response', async () => { - await scalingService.setupQueue(); + await scalingService.setupQueues(); // @ts-expect-error - Untyped but possible Redis response queue.getJobs.mockResolvedValue([mock(), null]); @@ -281,7 +258,7 @@ describe('ScalingService', () => { describe('stopJob', () => { it('should stop an active job', async () => { - await scalingService.setupQueue(); + await scalingService.setupQueues(); const job = mock({ isActive: jest.fn().mockResolvedValue(true) }); const result = await scalingService.stopJob(job); @@ -293,7 +270,7 @@ describe('ScalingService', () => { }); it('should stop an inactive job', async () => { - await scalingService.setupQueue(); + await scalingService.setupQueues(); const job = mock({ isActive: jest.fn().mockResolvedValue(false) }); const result = await scalingService.stopJob(job); @@ -303,7 +280,7 @@ describe('ScalingService', () => { }); it('should report failure to stop a job', async () => { - await scalingService.setupQueue(); + await scalingService.setupQueues(); const job = mock({ isActive: jest.fn().mockImplementation(() => { throw new ApplicationError('Something went wrong'); diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts index 84657f52a0..16303fbd8c 100644 --- a/packages/cli/src/scaling/job-processor.ts +++ 
b/packages/cli/src/scaling/job-processor.ts @@ -6,6 +6,7 @@ import { WorkflowExecute, ErrorReporter, Logger, + isObjectLiteral, } from 'n8n-core'; import type { ExecutionStatus, @@ -13,7 +14,14 @@ import type { IRun, IWorkflowExecutionDataProcess, } from 'n8n-workflow'; -import { BINARY_ENCODING, ApplicationError, Workflow } from 'n8n-workflow'; +import { + BINARY_ENCODING, + ApplicationError, + Workflow, + jsonStringify, + ensureError, + sleep, +} from 'n8n-workflow'; import type PCancelable from 'p-cancelable'; import config from '@/config'; @@ -26,15 +34,22 @@ import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-da import type { Job, + JobFailedMessage, JobFinishedMessage, JobId, + JobMessage, + JobQueue, JobResult, RespondToWebhookMessage, RunningJob, } from './scaling.types'; +import { JOB_TYPE_NAME } from './constants'; +import { JobQueues } from './job-queues'; +import { OnShutdown } from '@/decorators/on-shutdown'; +import { HIGHEST_SHUTDOWN_PRIORITY } from '@/constants'; /** - * Responsible for processing jobs from the queue, i.e. running enqueued executions. + * Responsible for processing jobs from the queues, i.e. running enqueued executions. 
*/ @Service() export class JobProcessor { @@ -43,6 +58,7 @@ export class JobProcessor { constructor( private readonly logger: Logger, private readonly errorReporter: ErrorReporter, + private readonly jobQueues: JobQueues, private readonly executionRepository: ExecutionRepository, private readonly workflowRepository: WorkflowRepository, private readonly nodeTypes: NodeTypes, @@ -52,6 +68,78 @@ export class JobProcessor { this.logger = this.logger.scoped('scaling'); } + setup(concurrency: number) { + this.assertWorker(); + + const queues = this.jobQueues.getAllQueues(); + for (const queue of queues) { + this.setupQueueProcessing(queue, concurrency); + } + } + + @OnShutdown(HIGHEST_SHUTDOWN_PRIORITY) + async stop() { + let count = 0; + + while (this.getRunningJobsCount() !== 0) { + if (count++ % 4 === 0) { + this.logger.info( + `Waiting for ${this.getRunningJobsCount()} active executions to finish...`, + ); + } + + await sleep(500); + } + } + + private getRunningJobsCount() { + return this.getRunningJobIds().length; + } + + private setupQueueProcessing(queue: JobQueue, concurrency: number) { + queue.on('global:progress', (jobId: JobId, msg: unknown) => this.onProgress(jobId, msg)); + queue.on('error', (error: Error) => this.onError(error)); + void queue.process(JOB_TYPE_NAME, concurrency, async (job: Job) => this.preProcessJob(job)); + } + + private onProgress(jobId: JobId, msg: unknown) { + if (!this.isJobMessage(msg)) return; + + if (msg.kind === 'abort-job') this.stopJob(jobId); + } + + private onError(error: Error) { + if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by RedisClientService.retryStrategy + + /** + * Non-recoverable error on worker start with Redis unavailable. + * Even if Redis recovers, worker will remain unable to process jobs. 
+ */ + if (error.message.includes('Error initializing Lua scripts')) { + this.logger.error('Fatal error initializing worker', { error }); + this.logger.error('Exiting process...'); + process.exit(1); + } + + this.logger.error('Queue errored', { error }); + + throw error; + } + + private async preProcessJob(job: Job) { + try { + if (!this.hasValidJobData(job)) { + throw new ApplicationError('Worker received invalid job', { + extra: { jobData: jsonStringify(job, { replaceCircularRefs: true }) }, + }); + } + + await this.processJob(job); + } catch (error) { + await this.reportJobProcessingError(ensureError(error), job); + } + } + async processJob(job: Job): Promise { const { executionId, loadStaticData } = job.data; @@ -284,4 +372,43 @@ export class JobProcessor { return response; } + + private assertWorker() { + if (this.instanceSettings.instanceType === 'worker') return; + + throw new ApplicationError('This method must be called on a `worker` instance'); + } + + /** Whether the argument is a message sent via Bull's internal pubsub setup. */ + private isJobMessage(candidate: unknown): candidate is JobMessage { + return typeof candidate === 'object' && candidate !== null && 'kind' in candidate; + } + + private hasValidJobData(job: Job) { + return isObjectLiteral(job.data) && 'executionId' in job.data && 'loadStaticData' in job.data; + } + + private async reportJobProcessingError(error: Error, job: Job) { + const { executionId } = job.data; + + this.logger.error(`Worker errored while running execution ${executionId} (job ${job.id})`, { + error, + executionId, + jobId: job.id, + }); + + const msg: JobFailedMessage = { + kind: 'job-failed', + executionId, + workerId: this.instanceSettings.hostId, + errorMsg: error.message, + errorStack: error.stack ?? 
'', + }; + + await job.progress(msg); + + this.errorReporter.error(error, { executionId }); + + throw error; + } } diff --git a/packages/cli/src/scaling/job-producer.ts b/packages/cli/src/scaling/job-producer.ts new file mode 100644 index 0000000000..7b5baf6a48 --- /dev/null +++ b/packages/cli/src/scaling/job-producer.ts @@ -0,0 +1,209 @@ +import { Service } from '@n8n/di'; +import { GlobalConfig } from '@n8n/config'; +import { strict } from 'node:assert'; +import { BINARY_ENCODING, IExecuteResponsePromiseData } from 'n8n-workflow'; +import { InstanceSettings, Logger } from 'n8n-core'; + +import { ActiveExecutions } from '@/active-executions'; +import { EventService } from '@/events/event.service'; + +import type { Job, JobData, JobId, JobMessage, JobOptions, JobStatus } from './scaling.types'; +import { JOB_TYPE_NAME } from './constants'; +import { JobQueues } from './job-queues'; +import { assertNever } from '@/utils'; +import { HIGHEST_SHUTDOWN_PRIORITY, Time } from '@/constants'; +import { OnShutdown } from '@/decorators/on-shutdown'; + +/** Responsible for enqueuing jobs on the queues. */ +@Service() +export class JobProducer { + private readonly isQueueMetricsEnabled: boolean; + + /** Counters for completed and failed jobs, reset on each interval tick. */ + private readonly jobCounters = { completed: 0, failed: 0 }; + + /** Interval for collecting queue metrics to expose via Prometheus. 
*/ + private queueMetricsInterval: NodeJS.Timer | undefined; + + constructor( + private readonly logger: Logger, + private readonly globalConfig: GlobalConfig, + private readonly instanceSettings: InstanceSettings, + private readonly jobQueues: JobQueues, + private readonly activeExecutions: ActiveExecutions, + private readonly eventService: EventService, + ) { + this.isQueueMetricsEnabled = + globalConfig.endpoints.metrics.includeQueueMetrics && + instanceSettings.instanceType === 'main' && + instanceSettings.isSingleMain; + this.logger = this.logger.scoped('scaling'); + } + + setup() { + const queues = this.jobQueues.getAllQueues(); + for (const queue of queues) { + queue.on('error', (error: Error) => this.onError(error)); + + queue.on('global:progress', (jobId: JobId, msg: unknown) => this.onProgress(jobId, msg)); + + if (this.isQueueMetricsEnabled) { + queue.on('global:completed', () => this.jobCounters.completed++); + queue.on('global:failed', () => this.jobCounters.failed++); + } + } + + this.scheduleQueueMetrics(); + } + + @OnShutdown(HIGHEST_SHUTDOWN_PRIORITY) + async stop() { + if (this.instanceSettings.isSingleMain) { + const queues = this.jobQueues.getAllQueues(); + for (const queue of queues) { + await queue.pause(true, true); // no more jobs will be picked up + } + this.logger.debug('Queues paused'); + } + + if (this.isQueueMetricsEnabled) this.stopQueueMetrics(); + } + + private onError(error: Error) { + if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by RedisClientService.retryStrategy + + this.logger.error('Queue errored', { error }); + + throw error; + } + + private onProgress(jobId: JobId, msg: unknown) { + if (!this.isJobMessage(msg)) return; + + // completion and failure are reported via `global:progress` to convey more details + // than natively provided by Bull in `global:completed` and `global:failed` events + + switch (msg.kind) { + case 'respond-to-webhook': + const decodedResponse = 
this.decodeWebhookResponse(msg.response); + this.activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse); + break; + case 'job-finished': + this.logger.info(`Execution ${msg.executionId} (job ${jobId}) finished successfully`, { + workerId: msg.workerId, + executionId: msg.executionId, + jobId, + }); + break; + case 'job-failed': + this.logger.error( + [ + `Execution ${msg.executionId} (job ${jobId}) failed`, + msg.errorStack ? `\n${msg.errorStack}\n` : '', + ].join(''), + { + workerId: msg.workerId, + errorMsg: msg.errorMsg, + executionId: msg.executionId, + jobId, + }, + ); + break; + case 'abort-job': + break; // only for worker + default: + assertNever(msg); + } + } + + async addJob(jobData: JobData, { priority }: { priority: number }) { + strict(priority > 0 && priority <= Number.MAX_SAFE_INTEGER); + + const jobOptions: JobOptions = { + priority, + removeOnComplete: true, + removeOnFail: true, + }; + + const { executionId, projectId } = jobData; + const queue = this.jobQueues.getQueue(projectId); + const job = await queue.add(JOB_TYPE_NAME, jobData, jobOptions); + + const jobId = job.id; + this.logger.info(`Enqueued execution ${executionId} (job ${jobId})`, { + executionId, + projectId, + jobId, + }); + + return job; + } + + /** Whether the argument is a message sent via Bull's internal pubsub setup. 
*/ + private isJobMessage(candidate: unknown): candidate is JobMessage { + return typeof candidate === 'object' && candidate !== null && 'kind' in candidate; + } + + private decodeWebhookResponse( + response: IExecuteResponsePromiseData, + ): IExecuteResponsePromiseData { + if ( + typeof response === 'object' && + typeof response.body === 'object' && + response.body !== null && + '__@N8nEncodedBuffer@__' in response.body && + typeof response.body['__@N8nEncodedBuffer@__'] === 'string' + ) { + response.body = Buffer.from(response.body['__@N8nEncodedBuffer@__'], BINARY_ENCODING); + } + + return response; + } + + /** Set up an interval to collect queue metrics and emit them in an event. */ + private scheduleQueueMetrics() { + if (!this.isQueueMetricsEnabled || this.queueMetricsInterval) return; + + this.queueMetricsInterval = setInterval(async () => { + const pendingJobCounts = await this.getPendingJobCounts(); + + this.eventService.emit('job-counts-updated', { + ...pendingJobCounts, // active, waiting + ...this.jobCounters, // completed, failed + }); + + this.jobCounters.completed = 0; + this.jobCounters.failed = 0; + }, this.globalConfig.endpoints.metrics.queueMetricsInterval * Time.seconds.toMilliseconds); + } + + async getPendingJobCounts() { + const jobCounts = { active: 0, waiting: 0 }; + const queues = this.jobQueues.getAllQueues(); + for (const queue of queues) { + const counts = await queue.getJobCounts(); + jobCounts.active += counts.active; + jobCounts.waiting += counts.waiting; + } + return jobCounts; + } + + async findJobsByStatus(statuses: JobStatus[]): Promise { + const jobs = []; + const queues = this.jobQueues.getAllQueues(); + for (const queue of queues) { + jobs.push(...(await queue.getJobs(statuses))); + } + return jobs.filter((job) => job !== null); + } + + /** Stop collecting queue metrics. 
*/ + private stopQueueMetrics() { + if (this.queueMetricsInterval) { + clearInterval(this.queueMetricsInterval); + this.queueMetricsInterval = undefined; + + this.logger.debug('Queue metrics collection stopped'); + } + } +} diff --git a/packages/cli/src/scaling/job-queues.ts b/packages/cli/src/scaling/job-queues.ts new file mode 100644 index 0000000000..b1de3722a0 --- /dev/null +++ b/packages/cli/src/scaling/job-queues.ts @@ -0,0 +1,40 @@ +import { Service } from '@n8n/di'; +import { JobQueue } from './scaling.types'; +import BullQueue from 'bull'; +import { RedisClientService } from '@/services/redis-client.service'; +import { Logger } from 'n8n-core'; +import { GlobalConfig } from '@n8n/config'; + +@Service() +export class JobQueues { + private queues: { + [key: string]: JobQueue; + } = {}; + + constructor( + private readonly logger: Logger, + private readonly globalConfig: GlobalConfig, + private readonly redisClientService: RedisClientService, + ) {} + + assertQueue(key: string, name: string) { + const { + bull: { prefix: bullPrefix, settings }, + } = this.globalConfig.queue; + const prefix = this.redisClientService.toValidPrefix(bullPrefix); + this.queues[key] = new BullQueue(name, { + prefix, + settings, + createClient: (type) => this.redisClientService.createClient({ type: `${type}(bull)` }), + }); + this.logger.info(`Queue setup: ${key}`); + } + + getQueue(key?: string) { + return (key ? this.queues[key] : undefined) ?? 
this.queues.default; + } + + getAllQueues() { + return Object.values(this.queues); + } +} diff --git a/packages/cli/src/scaling/job-recovery.ts b/packages/cli/src/scaling/job-recovery.ts new file mode 100644 index 0000000000..3be6168028 --- /dev/null +++ b/packages/cli/src/scaling/job-recovery.ts @@ -0,0 +1,106 @@ +import { Service } from '@n8n/di'; + +import config from '@/config'; +import { QueueRecoveryContext } from './scaling.types'; +import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import { InstanceSettings, Logger } from 'n8n-core'; +import { HIGHEST_SHUTDOWN_PRIORITY, Time } from '@/constants'; +import { OnShutdown } from '@/decorators/on-shutdown'; +import { jsonStringify } from 'n8n-workflow'; +import { JobProducer } from './job-producer'; + +@Service() +export class JobRecovery { + constructor( + private readonly logger: Logger, + private readonly instanceSettings: InstanceSettings, + private readonly executionRepository: ExecutionRepository, + private readonly jobProducer: JobProducer, + ) { + this.logger = this.logger.scoped('scaling'); + } + + private readonly queueRecoveryContext: QueueRecoveryContext = { + batchSize: config.getEnv('executions.queueRecovery.batchSize'), + waitMs: config.getEnv('executions.queueRecovery.interval') * 60 * 1000, + }; + + @OnShutdown(HIGHEST_SHUTDOWN_PRIORITY) + async stop() { + const { instanceType } = this.instanceSettings; + const { timeout } = this.queueRecoveryContext; + + if (instanceType === 'main' && timeout) { + clearTimeout(timeout); + this.logger.debug('Queue recovery stopped'); + } + } + + schedule(waitMs = this.queueRecoveryContext.waitMs) { + this.queueRecoveryContext.timeout = setTimeout(async () => { + try { + const nextWaitMs = await this.recoverFromQueue(); + this.schedule(nextWaitMs); + } catch (error) { + this.logger.error('Failed to recover dangling executions from queue', { + msg: this.toErrorMsg(error), + }); + this.logger.error('Retrying...'); + + 
this.schedule(); + } + }, waitMs); + + const wait = [waitMs / Time.minutes.toMilliseconds, 'min'].join(' '); /* log the delay actually passed to setTimeout */ + + this.logger.debug(`Scheduled queue recovery check for next ${wait}`); + } + + /** + * Mark in-progress executions as `crashed` if stored in DB as `new` or `running` + * but absent from the queue. Return time until next recovery cycle. + */ + private async recoverFromQueue() { + const { waitMs, batchSize } = this.queueRecoveryContext; + + const storedIds = await this.executionRepository.getInProgressExecutionIds(batchSize); + + if (storedIds.length === 0) { + this.logger.debug('Completed queue recovery check, no dangling executions'); + return waitMs; + } + + const runningJobs = await this.jobProducer.findJobsByStatus(['active', 'waiting']); + + const queuedIds = new Set(runningJobs.map((job) => job.data.executionId)); + + if (queuedIds.size === 0) { + this.logger.debug('Completed queue recovery check, no dangling executions'); + return waitMs; + } + + const danglingIds = storedIds.filter((id) => !queuedIds.has(id)); + + if (danglingIds.length === 0) { + this.logger.debug('Completed queue recovery check, no dangling executions'); + return waitMs; + } + + await this.executionRepository.markAsCrashed(danglingIds); + + this.logger.info('Completed queue recovery check, recovered dangling executions', { + danglingIds, + }); + + // if this cycle used up the whole batch size, it is possible for there to be + // dangling executions outside this check, so speed up next cycle + + return storedIds.length >= this.queueRecoveryContext.batchSize ? waitMs / 2 : waitMs; + } + + private toErrorMsg(error: unknown) { + return error instanceof Error + ? 
error.message + : jsonStringify(error, { replaceCircularRefs: true }); + } +} diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index c20cb98a61..1118106962 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -1,223 +1,75 @@ import { GlobalConfig } from '@n8n/config'; -import { Container, Service } from '@n8n/di'; -import { ErrorReporter, InstanceSettings, isObjectLiteral, Logger } from 'n8n-core'; -import { - ApplicationError, - BINARY_ENCODING, - sleep, - jsonStringify, - ensureError, - ExecutionCancelledError, -} from 'n8n-workflow'; -import type { IExecuteResponsePromiseData } from 'n8n-workflow'; -import assert, { strict } from 'node:assert'; +import { Service } from '@n8n/di'; +import { InstanceSettings, Logger } from 'n8n-core'; +import { ExecutionCancelledError } from 'n8n-workflow'; +import assert from 'node:assert'; -import { ActiveExecutions } from '@/active-executions'; -import config from '@/config'; -import { HIGHEST_SHUTDOWN_PRIORITY, Time } from '@/constants'; -import { ExecutionRepository } from '@/databases/repositories/execution.repository'; -import { OnShutdown } from '@/decorators/on-shutdown'; -import { EventService } from '@/events/event.service'; import { OrchestrationService } from '@/services/orchestration.service'; -import { assertNever } from '@/utils'; -import { JOB_TYPE_NAME, QUEUE_NAME } from './constants'; +import { QUEUE_NAME } from './constants'; import { JobProcessor } from './job-processor'; -import type { - JobQueue, - Job, - JobData, - JobOptions, - JobStatus, - JobId, - QueueRecoveryContext, - JobMessage, - JobFailedMessage, -} from './scaling.types'; +import type { Job, JobData } from './scaling.types'; +import { JobProducer } from './job-producer'; +import { JobQueues } from './job-queues'; +import { JobRecovery } from './job-recovery'; @Service() export class ScalingService { - private sharedQueue: JobQueue; - - 
private readonly dedicatedQueues = new Map(); - constructor( private readonly logger: Logger, - private readonly errorReporter: ErrorReporter, - private readonly activeExecutions: ActiveExecutions, + private readonly jobProducer: JobProducer, private readonly jobProcessor: JobProcessor, + private readonly jobQueues: JobQueues, + private readonly jobRecovery: JobRecovery, private readonly globalConfig: GlobalConfig, - private readonly executionRepository: ExecutionRepository, private readonly instanceSettings: InstanceSettings, private readonly orchestrationService: OrchestrationService, - private readonly eventService: EventService, ) { this.logger = this.logger.scoped('scaling'); } // #region Lifecycle - async setupQueue() { - const { default: BullQueue } = await import('bull'); - const { RedisClientService } = await import('@/services/redis-client.service'); - const service = Container.get(RedisClientService); - - const bullPrefix = this.globalConfig.queue.bull.prefix; - const prefix = service.toValidPrefix(bullPrefix); - - this.sharedQueue = new BullQueue(QUEUE_NAME, { - prefix, - settings: this.globalConfig.queue.bull.settings, - createClient: (type) => service.createClient({ type: `${type}(bull)` }), - }); - - this.registerListeners(); - - const { isLeader, isMultiMain } = this.instanceSettings; - - if (isLeader) this.scheduleQueueRecovery(); - - if (isMultiMain) { - this.orchestrationService.multiMainSetup - .on('leader-takeover', () => this.scheduleQueueRecovery()) - .on('leader-stepdown', () => this.stopQueueRecovery()); + async setupQueues() { + this.jobQueues.assertQueue('default', QUEUE_NAME); + const { dedicatedIds } = this.globalConfig.queue; + for (const projectId of dedicatedIds) { + this.jobQueues.assertQueue(projectId, `project_${projectId}_jobs`); } - this.scheduleQueueMetrics(); + const { isLeader, isMultiMain, isWorker } = this.instanceSettings; + + if (!isWorker) { + if (isLeader) this.jobRecovery.schedule(); + + if (isMultiMain) { + 
this.orchestrationService.multiMainSetup + .on('leader-takeover', () => this.jobRecovery.schedule()) + .on('leader-stepdown', () => this.jobRecovery.stop()); + } + } this.logger.debug('Queue setup completed'); } setupWorker(concurrency: number) { - this.assertWorker(); - this.assertQueue(); - - void this.sharedQueue.process(JOB_TYPE_NAME, concurrency, async (job: Job) => { - try { - if (!this.hasValidJobData(job)) { - throw new ApplicationError('Worker received invalid job', { - extra: { jobData: jsonStringify(job, { replaceCircularRefs: true }) }, - }); - } - - await this.jobProcessor.processJob(job); - } catch (error) { - await this.reportJobProcessingError(ensureError(error), job); - } - }); + this.jobProcessor.setup(concurrency); this.logger.debug('Worker setup completed'); } - private async reportJobProcessingError(error: Error, job: Job) { - const { executionId } = job.data; - - this.logger.error(`Worker errored while running execution ${executionId} (job ${job.id})`, { - error, - executionId, - jobId: job.id, - }); - - const msg: JobFailedMessage = { - kind: 'job-failed', - executionId, - workerId: this.instanceSettings.hostId, - errorMsg: error.message, - errorStack: error.stack ?? 
'', - }; - - await job.progress(msg); - - this.errorReporter.error(error, { executionId }); - - throw error; - } - - @OnShutdown(HIGHEST_SHUTDOWN_PRIORITY) - async stop() { - const { instanceType } = this.instanceSettings; - - if (instanceType === 'main') await this.stopMain(); - else if (instanceType === 'worker') await this.stopWorker(); - } - - private async stopMain() { - if (this.instanceSettings.isSingleMain) { - await this.sharedQueue.pause(true, true); // no more jobs will be picked up - this.logger.debug('Queue paused'); - } - - if (this.queueRecoveryContext.timeout) this.stopQueueRecovery(); - if (this.isQueueMetricsEnabled) this.stopQueueMetrics(); - } - - private async stopWorker() { - let count = 0; - - while (this.getRunningJobsCount() !== 0) { - if (count++ % 4 === 0) { - this.logger.info( - `Waiting for ${this.getRunningJobsCount()} active executions to finish...`, - ); - } - - await sleep(500); - } - } - - async pingQueue() { - await this.sharedQueue.client.ping(); - } - // #endregion // #region Jobs - async getPendingJobCounts() { - const { active, waiting } = await this.sharedQueue.getJobCounts(); - - return { active, waiting }; - } - /** * Add a job to the queue. * * @param jobData Data of the job to add to the queue. * @param priority Priority of the job, from `1` (highest) to `MAX_SAFE_INTEGER` (lowest). */ - async addJob(jobData: JobData, { priority }: { priority: number }) { - strict(priority > 0 && priority <= Number.MAX_SAFE_INTEGER); - - const jobOptions: JobOptions = { - priority, - removeOnComplete: true, - removeOnFail: true, - }; - - const queue = this.dedicatedQueues.get(jobData.projectId!) ?? 
this.sharedQueue; - - const job = await queue.add(JOB_TYPE_NAME, jobData, jobOptions); - - const { executionId } = jobData; - const jobId = job.id; - - this.logger.info(`Enqueued execution ${executionId} (job ${jobId})`, { executionId, jobId }); - - return job; - } - - async getJob(jobId: JobId) { - // TODO: keep a jobId -> projectId mapping in redis - // TODO: use the project specific queue if available - return await this.sharedQueue.getJob(jobId); - } - - async findJobsByStatus(statuses: JobStatus[]) { - // TODO: keep a jobId -> projectId mapping in redis - // TODO: use the project specific queue if available - const jobs = await this.sharedQueue.getJobs(statuses); - - return jobs.filter((job) => job !== null); + async addJob(jobData: JobData, options: { priority: number }) { + return await this.jobProducer.addJob(jobData, options); } async stopJob(job: Job) { @@ -248,273 +100,5 @@ export class ScalingService { } } - getRunningJobsCount() { - return this.jobProcessor.getRunningJobIds().length; - } - - // #endregion - - // #region Listeners - - private registerListeners() { - const { instanceType } = this.instanceSettings; - if (instanceType === 'main' || instanceType === 'webhook') { - this.registerMainOrWebhookListeners(); - } else if (instanceType === 'worker') { - this.registerWorkerListeners(); - } - } - - /** - * Register listeners on a `worker` process for Bull queue events. - */ - private registerWorkerListeners() { - this.sharedQueue.on('global:progress', (jobId: JobId, msg: unknown) => { - if (!this.isJobMessage(msg)) return; - - if (msg.kind === 'abort-job') this.jobProcessor.stopJob(jobId); - }); - - this.sharedQueue.on('error', (error: Error) => { - if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by RedisClientService.retryStrategy - - /** - * Non-recoverable error on worker start with Redis unavailable. - * Even if Redis recovers, worker will remain unable to process jobs. 
- */ - if (error.message.includes('Error initializing Lua scripts')) { - this.logger.error('Fatal error initializing worker', { error }); - this.logger.error('Exiting process...'); - process.exit(1); - } - - this.logger.error('Queue errored', { error }); - - throw error; - }); - } - - /** - * Register listeners on a `main` or `webhook` process for Bull queue events. - */ - private registerMainOrWebhookListeners() { - this.sharedQueue.on('error', (error: Error) => { - if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by RedisClientService.retryStrategy - - this.logger.error('Queue errored', { error }); - - throw error; - }); - - this.sharedQueue.on('global:progress', (jobId: JobId, msg: unknown) => { - if (!this.isJobMessage(msg)) return; - - // completion and failure are reported via `global:progress` to convey more details - // than natively provided by Bull in `global:completed` and `global:failed` events - - switch (msg.kind) { - case 'respond-to-webhook': - const decodedResponse = this.decodeWebhookResponse(msg.response); - this.activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse); - break; - case 'job-finished': - this.logger.info(`Execution ${msg.executionId} (job ${jobId}) finished successfully`, { - workerId: msg.workerId, - executionId: msg.executionId, - jobId, - }); - break; - case 'job-failed': - this.logger.error( - [ - `Execution ${msg.executionId} (job ${jobId}) failed`, - msg.errorStack ? `\n${msg.errorStack}\n` : '', - ].join(''), - { - workerId: msg.workerId, - errorMsg: msg.errorMsg, - executionId: msg.executionId, - jobId, - }, - ); - break; - case 'abort-job': - break; // only for worker - default: - assertNever(msg); - } - }); - - if (this.isQueueMetricsEnabled) { - this.sharedQueue.on('global:completed', () => this.jobCounters.completed++); - this.sharedQueue.on('global:failed', () => this.jobCounters.failed++); - } - } - - /** Whether the argument is a message sent via Bull's internal pubsub setup. 
*/ - private isJobMessage(candidate: unknown): candidate is JobMessage { - return typeof candidate === 'object' && candidate !== null && 'kind' in candidate; - } - - // #endregion - - private decodeWebhookResponse( - response: IExecuteResponsePromiseData, - ): IExecuteResponsePromiseData { - if ( - typeof response === 'object' && - typeof response.body === 'object' && - response.body !== null && - '__@N8nEncodedBuffer@__' in response.body && - typeof response.body['__@N8nEncodedBuffer@__'] === 'string' - ) { - response.body = Buffer.from(response.body['__@N8nEncodedBuffer@__'], BINARY_ENCODING); - } - - return response; - } - - private assertQueue() { - if (this.sharedQueue) return; - - throw new ApplicationError('This method must be called after `setupQueue`'); - } - - private assertWorker() { - if (this.instanceSettings.instanceType === 'worker') return; - - throw new ApplicationError('This method must be called on a `worker` instance'); - } - - // #region Queue metrics - - /** Counters for completed and failed jobs, reset on each interval tick. */ - private readonly jobCounters = { completed: 0, failed: 0 }; - - /** Interval for collecting queue metrics to expose via Prometheus. */ - private queueMetricsInterval: NodeJS.Timer | undefined; - - get isQueueMetricsEnabled() { - return ( - this.globalConfig.endpoints.metrics.includeQueueMetrics && - this.instanceSettings.instanceType === 'main' && - this.instanceSettings.isSingleMain - ); - } - - /** Set up an interval to collect queue metrics and emit them in an event. 
*/ - private scheduleQueueMetrics() { - if (!this.isQueueMetricsEnabled || this.queueMetricsInterval) return; - - this.queueMetricsInterval = setInterval(async () => { - const pendingJobCounts = await this.getPendingJobCounts(); - - this.eventService.emit('job-counts-updated', { - ...pendingJobCounts, // active, waiting - ...this.jobCounters, // completed, failed - }); - - this.jobCounters.completed = 0; - this.jobCounters.failed = 0; - }, this.globalConfig.endpoints.metrics.queueMetricsInterval * Time.seconds.toMilliseconds); - } - - /** Stop collecting queue metrics. */ - private stopQueueMetrics() { - if (this.queueMetricsInterval) { - clearInterval(this.queueMetricsInterval); - this.queueMetricsInterval = undefined; - - this.logger.debug('Queue metrics collection stopped'); - } - } - - // #endregion - - // #region Queue recovery - - private readonly queueRecoveryContext: QueueRecoveryContext = { - batchSize: config.getEnv('executions.queueRecovery.batchSize'), - waitMs: config.getEnv('executions.queueRecovery.interval') * 60 * 1000, - }; - - private scheduleQueueRecovery(waitMs = this.queueRecoveryContext.waitMs) { - this.queueRecoveryContext.timeout = setTimeout(async () => { - try { - const nextWaitMs = await this.recoverFromQueue(); - this.scheduleQueueRecovery(nextWaitMs); - } catch (error) { - this.logger.error('Failed to recover dangling executions from queue', { - msg: this.toErrorMsg(error), - }); - this.logger.error('Retrying...'); - - this.scheduleQueueRecovery(); - } - }, waitMs); - - const wait = [this.queueRecoveryContext.waitMs / Time.minutes.toMilliseconds, 'min'].join(' '); - - this.logger.debug(`Scheduled queue recovery check for next ${wait}`); - } - - private stopQueueRecovery() { - clearTimeout(this.queueRecoveryContext.timeout); - - this.logger.debug('Queue recovery stopped'); - } - - /** - * Mark in-progress executions as `crashed` if stored in DB as `new` or `running` - * but absent from the queue. Return time until next recovery cycle. 
- */ - private async recoverFromQueue() { - const { waitMs, batchSize } = this.queueRecoveryContext; - - const storedIds = await this.executionRepository.getInProgressExecutionIds(batchSize); - - if (storedIds.length === 0) { - this.logger.debug('Completed queue recovery check, no dangling executions'); - return waitMs; - } - - const runningJobs = await this.findJobsByStatus(['active', 'waiting']); - - const queuedIds = new Set(runningJobs.map((job) => job.data.executionId)); - - if (queuedIds.size === 0) { - this.logger.debug('Completed queue recovery check, no dangling executions'); - return waitMs; - } - - const danglingIds = storedIds.filter((id) => !queuedIds.has(id)); - - if (danglingIds.length === 0) { - this.logger.debug('Completed queue recovery check, no dangling executions'); - return waitMs; - } - - await this.executionRepository.markAsCrashed(danglingIds); - - this.logger.info('Completed queue recovery check, recovered dangling executions', { - danglingIds, - }); - - // if this cycle used up the whole batch size, it is possible for there to be - // dangling executions outside this check, so speed up next cycle - - return storedIds.length >= this.queueRecoveryContext.batchSize ? waitMs / 2 : waitMs; - } - - private toErrorMsg(error: unknown) { - return error instanceof Error - ? 
error.message - : jsonStringify(error, { replaceCircularRefs: true }); - } - - private hasValidJobData(job: Job) { - return isObjectLiteral(job.data) && 'executionId' in job.data && 'loadStaticData' in job.data; - } - // #endregion } diff --git a/packages/cli/src/server.ts b/packages/cli/src/server.ts index 9db860b1e6..7987bc00ab 100644 --- a/packages/cli/src/server.ts +++ b/packages/cli/src/server.ts @@ -221,7 +221,7 @@ export class Server extends AbstractServer { if (config.getEnv('executions.mode') === 'queue') { const { ScalingService } = await import('@/scaling/scaling.service'); - await Container.get(ScalingService).setupQueue(); + await Container.get(ScalingService).setupQueues(); } await handleMfaDisable(); diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index 36d6322ec7..eb58034783 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -55,7 +55,7 @@ test('worker initializes all its components', async () => { expect(externalHooks.init).toHaveBeenCalledTimes(1); expect(externalSecretsManager.init).toHaveBeenCalledTimes(1); expect(messageEventBus.initialize).toHaveBeenCalledTimes(1); - expect(scalingService.setupQueue).toHaveBeenCalledTimes(1); + expect(scalingService.setupQueues).toHaveBeenCalledTimes(1); expect(scalingService.setupWorker).toHaveBeenCalledTimes(1); expect(logStreamingEventRelay.init).toHaveBeenCalledTimes(1); expect(orchestrationService.init).toHaveBeenCalledTimes(1);