mirror of
https://github.com/n8n-io/n8n.git
synced 2025-02-21 02:56:40 -08:00
p
This commit is contained in:
parent
57f1a3fc59
commit
e91721cc29
11
.vscode/launch.json
vendored
11
.vscode/launch.json
vendored
|
@ -36,6 +36,17 @@
|
|||
"outputCapture": "std",
|
||||
"killBehavior": "polite"
|
||||
},
|
||||
{
|
||||
"name": "Debug n8n worker",
|
||||
"program": "${workspaceFolder}/packages/cli/bin/n8n",
|
||||
"cwd": "${workspaceFolder}/packages/cli/bin",
|
||||
"args": ["worker"],
|
||||
"request": "launch",
|
||||
"skipFiles": ["<node_internals>/**"],
|
||||
"type": "node",
|
||||
"outputCapture": "std",
|
||||
"killBehavior": "polite"
|
||||
},
|
||||
{
|
||||
"name": "Launch n8n CLI dev with debug",
|
||||
"runtimeExecutable": "pnpm",
|
||||
|
|
|
@ -90,6 +90,15 @@ class BullConfig {
|
|||
settings: SettingsConfig;
|
||||
}
|
||||
|
||||
class CommaSeparatedStringArray<T extends string = string> extends Array<T> {
|
||||
constructor(str: string) {
|
||||
super();
|
||||
const parsed = str.split(':') as this;
|
||||
const filtered = parsed.filter((i) => typeof i === 'string' && i.length);
|
||||
return filtered.length ? filtered : [];
|
||||
}
|
||||
}
|
||||
|
||||
@Config
|
||||
export class ScalingModeConfig {
|
||||
@Nested
|
||||
|
@ -97,4 +106,7 @@ export class ScalingModeConfig {
|
|||
|
||||
@Nested
|
||||
bull: BullConfig;
|
||||
|
||||
@Env('QUEUE_DEDICATED_IDS')
|
||||
dedicatedIds: CommaSeparatedStringArray = [];
|
||||
}
|
||||
|
|
|
@ -90,7 +90,7 @@ export class Webhook extends BaseCommand {
|
|||
}
|
||||
|
||||
const { ScalingService } = await import('@/scaling/scaling.service');
|
||||
await Container.get(ScalingService).setupQueue();
|
||||
await Container.get(ScalingService).setupQueues();
|
||||
await this.server.start();
|
||||
this.logger.info('Webhook listener waiting for requests.');
|
||||
|
||||
|
|
|
@ -159,7 +159,7 @@ export class Worker extends BaseCommand {
|
|||
const { ScalingService } = await import('@/scaling/scaling.service');
|
||||
this.scalingService = Container.get(ScalingService);
|
||||
|
||||
await this.scalingService.setupQueue();
|
||||
await this.scalingService.setupQueues();
|
||||
|
||||
this.scalingService.setupWorker(this.concurrency);
|
||||
}
|
||||
|
|
|
@ -130,7 +130,7 @@ export class Column {
|
|||
? "STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')"
|
||||
: 'CURRENT_TIMESTAMP(3)';
|
||||
}
|
||||
if (type === 'json' && isSqlite) {
|
||||
if (type === 'json' && (isSqlite || isPostgres)) {
|
||||
if (typeof this.defaultValue !== 'string') {
|
||||
this.defaultValue = JSON.stringify(this.defaultValue);
|
||||
}
|
||||
|
|
|
@ -107,7 +107,7 @@ describe('ScalingService', () => {
|
|||
describe('setupQueue', () => {
|
||||
describe('if leader main', () => {
|
||||
it('should set up queue + listeners + queue recovery', async () => {
|
||||
await scalingService.setupQueue();
|
||||
await scalingService.setupQueues();
|
||||
|
||||
expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs);
|
||||
expect(registerMainOrWebhookListenersSpy).toHaveBeenCalled();
|
||||
|
@ -120,7 +120,7 @@ describe('ScalingService', () => {
|
|||
it('should set up queue + listeners', async () => {
|
||||
instanceSettings.markAsFollower();
|
||||
|
||||
await scalingService.setupQueue();
|
||||
await scalingService.setupQueues();
|
||||
|
||||
expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs);
|
||||
expect(registerMainOrWebhookListenersSpy).toHaveBeenCalled();
|
||||
|
@ -134,7 +134,7 @@ describe('ScalingService', () => {
|
|||
// @ts-expect-error readonly property
|
||||
instanceSettings.instanceType = 'worker';
|
||||
|
||||
await scalingService.setupQueue();
|
||||
await scalingService.setupQueues();
|
||||
|
||||
expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs);
|
||||
expect(registerWorkerListenersSpy).toHaveBeenCalled();
|
||||
|
@ -147,7 +147,7 @@ describe('ScalingService', () => {
|
|||
// @ts-expect-error readonly property
|
||||
instanceSettings.instanceType = 'webhook';
|
||||
|
||||
await scalingService.setupQueue();
|
||||
await scalingService.setupQueues();
|
||||
|
||||
expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs);
|
||||
expect(registerWorkerListenersSpy).not.toHaveBeenCalled();
|
||||
|
@ -160,7 +160,7 @@ describe('ScalingService', () => {
|
|||
it('should set up a worker with concurrency', async () => {
|
||||
// @ts-expect-error readonly property
|
||||
instanceSettings.instanceType = 'worker';
|
||||
await scalingService.setupQueue();
|
||||
await scalingService.setupQueues();
|
||||
const concurrency = 5;
|
||||
|
||||
scalingService.setupWorker(concurrency);
|
||||
|
@ -169,7 +169,7 @@ describe('ScalingService', () => {
|
|||
});
|
||||
|
||||
it('should throw if called on a non-worker instance', async () => {
|
||||
await scalingService.setupQueue();
|
||||
await scalingService.setupQueues();
|
||||
|
||||
expect(() => scalingService.setupWorker(5)).toThrow();
|
||||
});
|
||||
|
@ -187,7 +187,7 @@ describe('ScalingService', () => {
|
|||
it('should pause queue, stop queue recovery and queue metrics', async () => {
|
||||
// @ts-expect-error readonly property
|
||||
instanceSettings.instanceType = 'main';
|
||||
await scalingService.setupQueue();
|
||||
await scalingService.setupQueues();
|
||||
// @ts-expect-error readonly property
|
||||
scalingService.queueRecoveryContext.timeout = 1;
|
||||
jest.spyOn(scalingService, 'isQueueMetricsEnabled', 'get').mockReturnValue(true);
|
||||
|
@ -205,7 +205,7 @@ describe('ScalingService', () => {
|
|||
it('should wait for running jobs to finish', async () => {
|
||||
// @ts-expect-error readonly property
|
||||
instanceSettings.instanceType = 'worker';
|
||||
await scalingService.setupQueue();
|
||||
await scalingService.setupQueues();
|
||||
jobProcessor.getRunningJobIds.mockReturnValue([]);
|
||||
|
||||
await scalingService.stop();
|
||||
|
@ -217,19 +217,9 @@ describe('ScalingService', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('pingQueue', () => {
|
||||
it('should ping the queue', async () => {
|
||||
await scalingService.setupQueue();
|
||||
|
||||
await scalingService.pingQueue();
|
||||
|
||||
expect(queue.client.ping).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('addJob', () => {
|
||||
it('should add a job', async () => {
|
||||
await scalingService.setupQueue();
|
||||
await scalingService.setupQueues();
|
||||
queue.add.mockResolvedValue(mock<Job>({ id: '456' }));
|
||||
|
||||
const jobData = mock<JobData>({ executionId: '123' });
|
||||
|
@ -243,22 +233,9 @@ describe('ScalingService', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('getJob', () => {
|
||||
it('should get a job', async () => {
|
||||
await scalingService.setupQueue();
|
||||
const jobId = '123';
|
||||
queue.getJob.mockResolvedValue(mock<Job>({ id: jobId }));
|
||||
|
||||
const job = await scalingService.getJob(jobId);
|
||||
|
||||
expect(queue.getJob).toHaveBeenCalledWith(jobId);
|
||||
expect(job?.id).toBe(jobId);
|
||||
});
|
||||
});
|
||||
|
||||
describe('findJobsByStatus', () => {
|
||||
it('should find jobs by status', async () => {
|
||||
await scalingService.setupQueue();
|
||||
await scalingService.setupQueues();
|
||||
queue.getJobs.mockResolvedValue([mock<Job>({ id: '123' })]);
|
||||
|
||||
const jobs = await scalingService.findJobsByStatus(['active']);
|
||||
|
@ -269,7 +246,7 @@ describe('ScalingService', () => {
|
|||
});
|
||||
|
||||
it('should filter out `null` in Redis response', async () => {
|
||||
await scalingService.setupQueue();
|
||||
await scalingService.setupQueues();
|
||||
// @ts-expect-error - Untyped but possible Redis response
|
||||
queue.getJobs.mockResolvedValue([mock<Job>(), null]);
|
||||
|
||||
|
@ -281,7 +258,7 @@ describe('ScalingService', () => {
|
|||
|
||||
describe('stopJob', () => {
|
||||
it('should stop an active job', async () => {
|
||||
await scalingService.setupQueue();
|
||||
await scalingService.setupQueues();
|
||||
const job = mock<Job>({ isActive: jest.fn().mockResolvedValue(true) });
|
||||
|
||||
const result = await scalingService.stopJob(job);
|
||||
|
@ -293,7 +270,7 @@ describe('ScalingService', () => {
|
|||
});
|
||||
|
||||
it('should stop an inactive job', async () => {
|
||||
await scalingService.setupQueue();
|
||||
await scalingService.setupQueues();
|
||||
const job = mock<Job>({ isActive: jest.fn().mockResolvedValue(false) });
|
||||
|
||||
const result = await scalingService.stopJob(job);
|
||||
|
@ -303,7 +280,7 @@ describe('ScalingService', () => {
|
|||
});
|
||||
|
||||
it('should report failure to stop a job', async () => {
|
||||
await scalingService.setupQueue();
|
||||
await scalingService.setupQueues();
|
||||
const job = mock<Job>({
|
||||
isActive: jest.fn().mockImplementation(() => {
|
||||
throw new ApplicationError('Something went wrong');
|
||||
|
|
|
@ -6,6 +6,7 @@ import {
|
|||
WorkflowExecute,
|
||||
ErrorReporter,
|
||||
Logger,
|
||||
isObjectLiteral,
|
||||
} from 'n8n-core';
|
||||
import type {
|
||||
ExecutionStatus,
|
||||
|
@ -13,7 +14,14 @@ import type {
|
|||
IRun,
|
||||
IWorkflowExecutionDataProcess,
|
||||
} from 'n8n-workflow';
|
||||
import { BINARY_ENCODING, ApplicationError, Workflow } from 'n8n-workflow';
|
||||
import {
|
||||
BINARY_ENCODING,
|
||||
ApplicationError,
|
||||
Workflow,
|
||||
jsonStringify,
|
||||
ensureError,
|
||||
sleep,
|
||||
} from 'n8n-workflow';
|
||||
import type PCancelable from 'p-cancelable';
|
||||
|
||||
import config from '@/config';
|
||||
|
@ -26,15 +34,22 @@ import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-da
|
|||
|
||||
import type {
|
||||
Job,
|
||||
JobFailedMessage,
|
||||
JobFinishedMessage,
|
||||
JobId,
|
||||
JobMessage,
|
||||
JobQueue,
|
||||
JobResult,
|
||||
RespondToWebhookMessage,
|
||||
RunningJob,
|
||||
} from './scaling.types';
|
||||
import { JOB_TYPE_NAME } from './constants';
|
||||
import { JobQueues } from './job-queues';
|
||||
import { OnShutdown } from '@/decorators/on-shutdown';
|
||||
import { HIGHEST_SHUTDOWN_PRIORITY } from '@/constants';
|
||||
|
||||
/**
|
||||
* Responsible for processing jobs from the queue, i.e. running enqueued executions.
|
||||
* Responsible for processing jobs from the queues, i.e. running enqueued executions.
|
||||
*/
|
||||
@Service()
|
||||
export class JobProcessor {
|
||||
|
@ -43,6 +58,7 @@ export class JobProcessor {
|
|||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly errorReporter: ErrorReporter,
|
||||
private readonly jobQueues: JobQueues,
|
||||
private readonly executionRepository: ExecutionRepository,
|
||||
private readonly workflowRepository: WorkflowRepository,
|
||||
private readonly nodeTypes: NodeTypes,
|
||||
|
@ -52,6 +68,78 @@ export class JobProcessor {
|
|||
this.logger = this.logger.scoped('scaling');
|
||||
}
|
||||
|
||||
setup(concurrency: number) {
|
||||
this.assertWorker();
|
||||
|
||||
const queues = this.jobQueues.getAllQueues();
|
||||
for (const queue of queues) {
|
||||
this.setupQueueProcessing(queue, concurrency);
|
||||
}
|
||||
}
|
||||
|
||||
@OnShutdown(HIGHEST_SHUTDOWN_PRIORITY)
|
||||
async stop() {
|
||||
let count = 0;
|
||||
|
||||
while (this.getRunningJobsCount() !== 0) {
|
||||
if (count++ % 4 === 0) {
|
||||
this.logger.info(
|
||||
`Waiting for ${this.getRunningJobsCount()} active executions to finish...`,
|
||||
);
|
||||
}
|
||||
|
||||
await sleep(500);
|
||||
}
|
||||
}
|
||||
|
||||
private getRunningJobsCount() {
|
||||
return this.getRunningJobIds().length;
|
||||
}
|
||||
|
||||
private setupQueueProcessing(queue: JobQueue, concurrency: number) {
|
||||
queue.on('global:progress', (jobId: JobId, msg: unknown) => this.onProgress(jobId, msg));
|
||||
queue.on('error', (error: Error) => this.onError(error));
|
||||
void queue.process(JOB_TYPE_NAME, concurrency, async (job: Job) => this.preProcessJob(job));
|
||||
}
|
||||
|
||||
private onProgress(jobId: JobId, msg: unknown) {
|
||||
if (!this.isJobMessage(msg)) return;
|
||||
|
||||
if (msg.kind === 'abort-job') this.stopJob(jobId);
|
||||
}
|
||||
|
||||
private onError(error: Error) {
|
||||
if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by RedisClientService.retryStrategy
|
||||
|
||||
/**
|
||||
* Non-recoverable error on worker start with Redis unavailable.
|
||||
* Even if Redis recovers, worker will remain unable to process jobs.
|
||||
*/
|
||||
if (error.message.includes('Error initializing Lua scripts')) {
|
||||
this.logger.error('Fatal error initializing worker', { error });
|
||||
this.logger.error('Exiting process...');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
this.logger.error('Queue errored', { error });
|
||||
|
||||
throw error;
|
||||
}
|
||||
|
||||
private async preProcessJob(job: Job) {
|
||||
try {
|
||||
if (!this.hasValidJobData(job)) {
|
||||
throw new ApplicationError('Worker received invalid job', {
|
||||
extra: { jobData: jsonStringify(job, { replaceCircularRefs: true }) },
|
||||
});
|
||||
}
|
||||
|
||||
await this.processJob(job);
|
||||
} catch (error) {
|
||||
await this.reportJobProcessingError(ensureError(error), job);
|
||||
}
|
||||
}
|
||||
|
||||
async processJob(job: Job): Promise<JobResult> {
|
||||
const { executionId, loadStaticData } = job.data;
|
||||
|
||||
|
@ -284,4 +372,43 @@ export class JobProcessor {
|
|||
|
||||
return response;
|
||||
}
|
||||
|
||||
private assertWorker() {
|
||||
if (this.instanceSettings.instanceType === 'worker') return;
|
||||
|
||||
throw new ApplicationError('This method must be called on a `worker` instance');
|
||||
}
|
||||
|
||||
/** Whether the argument is a message sent via Bull's internal pubsub setup. */
|
||||
private isJobMessage(candidate: unknown): candidate is JobMessage {
|
||||
return typeof candidate === 'object' && candidate !== null && 'kind' in candidate;
|
||||
}
|
||||
|
||||
private hasValidJobData(job: Job) {
|
||||
return isObjectLiteral(job.data) && 'executionId' in job.data && 'loadStaticData' in job.data;
|
||||
}
|
||||
|
||||
private async reportJobProcessingError(error: Error, job: Job) {
|
||||
const { executionId } = job.data;
|
||||
|
||||
this.logger.error(`Worker errored while running execution ${executionId} (job ${job.id})`, {
|
||||
error,
|
||||
executionId,
|
||||
jobId: job.id,
|
||||
});
|
||||
|
||||
const msg: JobFailedMessage = {
|
||||
kind: 'job-failed',
|
||||
executionId,
|
||||
workerId: this.instanceSettings.hostId,
|
||||
errorMsg: error.message,
|
||||
errorStack: error.stack ?? '',
|
||||
};
|
||||
|
||||
await job.progress(msg);
|
||||
|
||||
this.errorReporter.error(error, { executionId });
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
|
209
packages/cli/src/scaling/job-producer.ts
Normal file
209
packages/cli/src/scaling/job-producer.ts
Normal file
|
@ -0,0 +1,209 @@
|
|||
import { Service } from '@n8n/di';
|
||||
import { GlobalConfig } from '@n8n/config';
|
||||
import { strict } from 'node:assert';
|
||||
import { BINARY_ENCODING, IExecuteResponsePromiseData } from 'n8n-workflow';
|
||||
import { InstanceSettings, Logger } from 'n8n-core';
|
||||
|
||||
import { ActiveExecutions } from '@/active-executions';
|
||||
import { EventService } from '@/events/event.service';
|
||||
|
||||
import type { Job, JobData, JobId, JobMessage, JobOptions, JobStatus } from './scaling.types';
|
||||
import { JOB_TYPE_NAME } from './constants';
|
||||
import { JobQueues } from './job-queues';
|
||||
import { assertNever } from '@/utils';
|
||||
import { HIGHEST_SHUTDOWN_PRIORITY, Time } from '@/constants';
|
||||
import { OnShutdown } from '@/decorators/on-shutdown';
|
||||
|
||||
/** Responsible for enqueuing jobs on the queues. */
|
||||
@Service()
|
||||
export class JobProducer {
|
||||
private readonly isQueueMetricsEnabled: boolean;
|
||||
|
||||
/** Counters for completed and failed jobs, reset on each interval tick. */
|
||||
private readonly jobCounters = { completed: 0, failed: 0 };
|
||||
|
||||
/** Interval for collecting queue metrics to expose via Prometheus. */
|
||||
private queueMetricsInterval: NodeJS.Timer | undefined;
|
||||
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly globalConfig: GlobalConfig,
|
||||
private readonly instanceSettings: InstanceSettings,
|
||||
private readonly jobQueues: JobQueues,
|
||||
private readonly activeExecutions: ActiveExecutions,
|
||||
private readonly eventService: EventService,
|
||||
) {
|
||||
this.isQueueMetricsEnabled =
|
||||
globalConfig.endpoints.metrics.includeQueueMetrics &&
|
||||
instanceSettings.instanceType === 'main' &&
|
||||
instanceSettings.isSingleMain;
|
||||
this.logger = this.logger.scoped('scaling');
|
||||
}
|
||||
|
||||
setup() {
|
||||
const queues = this.jobQueues.getAllQueues();
|
||||
for (const queue of queues) {
|
||||
queue.on('error', (error: Error) => this.onError(error));
|
||||
|
||||
queue.on('global:progress', (jobId: JobId, msg: unknown) => this.onProgress(jobId, msg));
|
||||
|
||||
if (this.isQueueMetricsEnabled) {
|
||||
queue.on('global:completed', () => this.jobCounters.completed++);
|
||||
queue.on('global:failed', () => this.jobCounters.failed++);
|
||||
}
|
||||
}
|
||||
|
||||
this.scheduleQueueMetrics();
|
||||
}
|
||||
|
||||
@OnShutdown(HIGHEST_SHUTDOWN_PRIORITY)
|
||||
async stop() {
|
||||
if (this.instanceSettings.isSingleMain) {
|
||||
const queues = this.jobQueues.getAllQueues();
|
||||
for (const queue of queues) {
|
||||
await queue.pause(true, true); // no more jobs will be picked up
|
||||
}
|
||||
this.logger.debug('Queues paused');
|
||||
}
|
||||
|
||||
if (this.isQueueMetricsEnabled) this.stopQueueMetrics();
|
||||
}
|
||||
|
||||
private onError(error: Error) {
|
||||
if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by RedisClientService.retryStrategy
|
||||
|
||||
this.logger.error('Queue errored', { error });
|
||||
|
||||
throw error;
|
||||
}
|
||||
|
||||
private onProgress(jobId: JobId, msg: unknown) {
|
||||
if (!this.isJobMessage(msg)) return;
|
||||
|
||||
// completion and failure are reported via `global:progress` to convey more details
|
||||
// than natively provided by Bull in `global:completed` and `global:failed` events
|
||||
|
||||
switch (msg.kind) {
|
||||
case 'respond-to-webhook':
|
||||
const decodedResponse = this.decodeWebhookResponse(msg.response);
|
||||
this.activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse);
|
||||
break;
|
||||
case 'job-finished':
|
||||
this.logger.info(`Execution ${msg.executionId} (job ${jobId}) finished successfully`, {
|
||||
workerId: msg.workerId,
|
||||
executionId: msg.executionId,
|
||||
jobId,
|
||||
});
|
||||
break;
|
||||
case 'job-failed':
|
||||
this.logger.error(
|
||||
[
|
||||
`Execution ${msg.executionId} (job ${jobId}) failed`,
|
||||
msg.errorStack ? `\n${msg.errorStack}\n` : '',
|
||||
].join(''),
|
||||
{
|
||||
workerId: msg.workerId,
|
||||
errorMsg: msg.errorMsg,
|
||||
executionId: msg.executionId,
|
||||
jobId,
|
||||
},
|
||||
);
|
||||
break;
|
||||
case 'abort-job':
|
||||
break; // only for worker
|
||||
default:
|
||||
assertNever(msg);
|
||||
}
|
||||
}
|
||||
|
||||
async addJob(jobData: JobData, { priority }: { priority: number }) {
|
||||
strict(priority > 0 && priority <= Number.MAX_SAFE_INTEGER);
|
||||
|
||||
const jobOptions: JobOptions = {
|
||||
priority,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
};
|
||||
|
||||
const { executionId, projectId } = jobData;
|
||||
const queue = this.jobQueues.getQueue(projectId);
|
||||
const job = await queue.add(JOB_TYPE_NAME, jobData, jobOptions);
|
||||
|
||||
const jobId = job.id;
|
||||
this.logger.info(`Enqueued execution ${executionId} (job ${jobId})`, {
|
||||
executionId,
|
||||
projectId,
|
||||
jobId,
|
||||
});
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
/** Whether the argument is a message sent via Bull's internal pubsub setup. */
|
||||
private isJobMessage(candidate: unknown): candidate is JobMessage {
|
||||
return typeof candidate === 'object' && candidate !== null && 'kind' in candidate;
|
||||
}
|
||||
|
||||
private decodeWebhookResponse(
|
||||
response: IExecuteResponsePromiseData,
|
||||
): IExecuteResponsePromiseData {
|
||||
if (
|
||||
typeof response === 'object' &&
|
||||
typeof response.body === 'object' &&
|
||||
response.body !== null &&
|
||||
'__@N8nEncodedBuffer@__' in response.body &&
|
||||
typeof response.body['__@N8nEncodedBuffer@__'] === 'string'
|
||||
) {
|
||||
response.body = Buffer.from(response.body['__@N8nEncodedBuffer@__'], BINARY_ENCODING);
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
/** Set up an interval to collect queue metrics and emit them in an event. */
|
||||
private scheduleQueueMetrics() {
|
||||
if (!this.isQueueMetricsEnabled || this.queueMetricsInterval) return;
|
||||
|
||||
this.queueMetricsInterval = setInterval(async () => {
|
||||
const pendingJobCounts = await this.getPendingJobCounts();
|
||||
|
||||
this.eventService.emit('job-counts-updated', {
|
||||
...pendingJobCounts, // active, waiting
|
||||
...this.jobCounters, // completed, failed
|
||||
});
|
||||
|
||||
this.jobCounters.completed = 0;
|
||||
this.jobCounters.failed = 0;
|
||||
}, this.globalConfig.endpoints.metrics.queueMetricsInterval * Time.seconds.toMilliseconds);
|
||||
}
|
||||
|
||||
async getPendingJobCounts() {
|
||||
const jobCounts = { active: 0, waiting: 0 };
|
||||
const queues = this.jobQueues.getAllQueues();
|
||||
for (const queue of queues) {
|
||||
const counts = await queue.getJobCounts();
|
||||
jobCounts.active += counts.active;
|
||||
jobCounts.waiting += counts.waiting;
|
||||
}
|
||||
return jobCounts;
|
||||
}
|
||||
|
||||
async findJobsByStatus(statuses: JobStatus[]): Promise<Job[]> {
|
||||
const jobs = [];
|
||||
const queues = this.jobQueues.getAllQueues();
|
||||
for (const queue of queues) {
|
||||
jobs.push(...(await queue.getJobs(statuses)));
|
||||
}
|
||||
return jobs.filter((job) => job !== null);
|
||||
}
|
||||
|
||||
/** Stop collecting queue metrics. */
|
||||
private stopQueueMetrics() {
|
||||
if (this.queueMetricsInterval) {
|
||||
clearInterval(this.queueMetricsInterval);
|
||||
this.queueMetricsInterval = undefined;
|
||||
|
||||
this.logger.debug('Queue metrics collection stopped');
|
||||
}
|
||||
}
|
||||
}
|
40
packages/cli/src/scaling/job-queues.ts
Normal file
40
packages/cli/src/scaling/job-queues.ts
Normal file
|
@ -0,0 +1,40 @@
|
|||
import { Service } from '@n8n/di';
|
||||
import { JobQueue } from './scaling.types';
|
||||
import BullQueue from 'bull';
|
||||
import { RedisClientService } from '@/services/redis-client.service';
|
||||
import { Logger } from 'n8n-core';
|
||||
import { GlobalConfig } from '@n8n/config';
|
||||
|
||||
@Service()
|
||||
export class JobQueues {
|
||||
private queues: {
|
||||
[key: string]: JobQueue;
|
||||
} = {};
|
||||
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly globalConfig: GlobalConfig,
|
||||
private readonly redisClientService: RedisClientService,
|
||||
) {}
|
||||
|
||||
assertQueue(key: string, name: string) {
|
||||
const {
|
||||
bull: { prefix: bullPrefix, settings },
|
||||
} = this.globalConfig.queue;
|
||||
const prefix = this.redisClientService.toValidPrefix(bullPrefix);
|
||||
this.queues[key] = new BullQueue(name, {
|
||||
prefix,
|
||||
settings,
|
||||
createClient: (type) => this.redisClientService.createClient({ type: `${type}(bull)` }),
|
||||
});
|
||||
this.logger.info(`Queue setup: ${key}`);
|
||||
}
|
||||
|
||||
getQueue(key?: string) {
|
||||
return (key ? this.queues[key] : undefined) ?? this.queues.default;
|
||||
}
|
||||
|
||||
getAllQueues() {
|
||||
return Object.values(this.queues);
|
||||
}
|
||||
}
|
106
packages/cli/src/scaling/job-recovery.ts
Normal file
106
packages/cli/src/scaling/job-recovery.ts
Normal file
|
@ -0,0 +1,106 @@
|
|||
import { Service } from '@n8n/di';
|
||||
|
||||
import config from '@/config';
|
||||
import { QueueRecoveryContext } from './scaling.types';
|
||||
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
||||
import { InstanceSettings, Logger } from 'n8n-core';
|
||||
import { HIGHEST_SHUTDOWN_PRIORITY, Time } from '@/constants';
|
||||
import { OnShutdown } from '@/decorators/on-shutdown';
|
||||
import { jsonStringify } from 'n8n-workflow';
|
||||
import { JobProducer } from './job-producer';
|
||||
|
||||
@Service()
|
||||
export class JobRecovery {
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly instanceSettings: InstanceSettings,
|
||||
private readonly executionRepository: ExecutionRepository,
|
||||
private readonly jobProducer: JobProducer,
|
||||
) {
|
||||
this.logger = this.logger.scoped('scaling');
|
||||
}
|
||||
|
||||
private readonly queueRecoveryContext: QueueRecoveryContext = {
|
||||
batchSize: config.getEnv('executions.queueRecovery.batchSize'),
|
||||
waitMs: config.getEnv('executions.queueRecovery.interval') * 60 * 1000,
|
||||
};
|
||||
|
||||
@OnShutdown(HIGHEST_SHUTDOWN_PRIORITY)
|
||||
async stop() {
|
||||
const { instanceType } = this.instanceSettings;
|
||||
const { timeout } = this.queueRecoveryContext;
|
||||
|
||||
if (instanceType === 'main' && timeout) {
|
||||
clearTimeout(timeout);
|
||||
this.logger.debug('Queue recovery stopped');
|
||||
}
|
||||
}
|
||||
|
||||
schedule(waitMs = this.queueRecoveryContext.waitMs) {
|
||||
this.queueRecoveryContext.timeout = setTimeout(async () => {
|
||||
try {
|
||||
const nextWaitMs = await this.recoverFromQueue();
|
||||
this.schedule(nextWaitMs);
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to recover dangling executions from queue', {
|
||||
msg: this.toErrorMsg(error),
|
||||
});
|
||||
this.logger.error('Retrying...');
|
||||
|
||||
this.schedule();
|
||||
}
|
||||
}, waitMs);
|
||||
|
||||
const wait = [this.queueRecoveryContext.waitMs / Time.minutes.toMilliseconds, 'min'].join(' ');
|
||||
|
||||
this.logger.debug(`Scheduled queue recovery check for next ${wait}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark in-progress executions as `crashed` if stored in DB as `new` or `running`
|
||||
* but absent from the queue. Return time until next recovery cycle.
|
||||
*/
|
||||
private async recoverFromQueue() {
|
||||
const { waitMs, batchSize } = this.queueRecoveryContext;
|
||||
|
||||
const storedIds = await this.executionRepository.getInProgressExecutionIds(batchSize);
|
||||
|
||||
if (storedIds.length === 0) {
|
||||
this.logger.debug('Completed queue recovery check, no dangling executions');
|
||||
return waitMs;
|
||||
}
|
||||
|
||||
const runningJobs = await this.jobProducer.findJobsByStatus(['active', 'waiting']);
|
||||
|
||||
const queuedIds = new Set(runningJobs.map((job) => job.data.executionId));
|
||||
|
||||
if (queuedIds.size === 0) {
|
||||
this.logger.debug('Completed queue recovery check, no dangling executions');
|
||||
return waitMs;
|
||||
}
|
||||
|
||||
const danglingIds = storedIds.filter((id) => !queuedIds.has(id));
|
||||
|
||||
if (danglingIds.length === 0) {
|
||||
this.logger.debug('Completed queue recovery check, no dangling executions');
|
||||
return waitMs;
|
||||
}
|
||||
|
||||
await this.executionRepository.markAsCrashed(danglingIds);
|
||||
|
||||
this.logger.info('Completed queue recovery check, recovered dangling executions', {
|
||||
danglingIds,
|
||||
});
|
||||
|
||||
// if this cycle used up the whole batch size, it is possible for there to be
|
||||
// dangling executions outside this check, so speed up next cycle
|
||||
|
||||
return storedIds.length >= this.queueRecoveryContext.batchSize ? waitMs / 2 : waitMs;
|
||||
}
|
||||
|
||||
private toErrorMsg(error: unknown) {
|
||||
return error instanceof Error
|
||||
? error.message
|
||||
: jsonStringify(error, { replaceCircularRefs: true });
|
||||
}
|
||||
}
|
|
@ -1,223 +1,75 @@
|
|||
import { GlobalConfig } from '@n8n/config';
|
||||
import { Container, Service } from '@n8n/di';
|
||||
import { ErrorReporter, InstanceSettings, isObjectLiteral, Logger } from 'n8n-core';
|
||||
import {
|
||||
ApplicationError,
|
||||
BINARY_ENCODING,
|
||||
sleep,
|
||||
jsonStringify,
|
||||
ensureError,
|
||||
ExecutionCancelledError,
|
||||
} from 'n8n-workflow';
|
||||
import type { IExecuteResponsePromiseData } from 'n8n-workflow';
|
||||
import assert, { strict } from 'node:assert';
|
||||
import { Service } from '@n8n/di';
|
||||
import { InstanceSettings, Logger } from 'n8n-core';
|
||||
import { ExecutionCancelledError } from 'n8n-workflow';
|
||||
import assert from 'node:assert';
|
||||
|
||||
import { ActiveExecutions } from '@/active-executions';
|
||||
import config from '@/config';
|
||||
import { HIGHEST_SHUTDOWN_PRIORITY, Time } from '@/constants';
|
||||
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
||||
import { OnShutdown } from '@/decorators/on-shutdown';
|
||||
import { EventService } from '@/events/event.service';
|
||||
import { OrchestrationService } from '@/services/orchestration.service';
|
||||
import { assertNever } from '@/utils';
|
||||
|
||||
import { JOB_TYPE_NAME, QUEUE_NAME } from './constants';
|
||||
import { QUEUE_NAME } from './constants';
|
||||
import { JobProcessor } from './job-processor';
|
||||
import type {
|
||||
JobQueue,
|
||||
Job,
|
||||
JobData,
|
||||
JobOptions,
|
||||
JobStatus,
|
||||
JobId,
|
||||
QueueRecoveryContext,
|
||||
JobMessage,
|
||||
JobFailedMessage,
|
||||
} from './scaling.types';
|
||||
import type { Job, JobData } from './scaling.types';
|
||||
import { JobProducer } from './job-producer';
|
||||
import { JobQueues } from './job-queues';
|
||||
import { JobRecovery } from './job-recovery';
|
||||
|
||||
@Service()
|
||||
export class ScalingService {
|
||||
private sharedQueue: JobQueue;
|
||||
|
||||
private readonly dedicatedQueues = new Map<string, JobQueue>();
|
||||
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly errorReporter: ErrorReporter,
|
||||
private readonly activeExecutions: ActiveExecutions,
|
||||
private readonly jobProducer: JobProducer,
|
||||
private readonly jobProcessor: JobProcessor,
|
||||
private readonly jobQueues: JobQueues,
|
||||
private readonly jobRecovery: JobRecovery,
|
||||
private readonly globalConfig: GlobalConfig,
|
||||
private readonly executionRepository: ExecutionRepository,
|
||||
private readonly instanceSettings: InstanceSettings,
|
||||
private readonly orchestrationService: OrchestrationService,
|
||||
private readonly eventService: EventService,
|
||||
) {
|
||||
this.logger = this.logger.scoped('scaling');
|
||||
}
|
||||
|
||||
// #region Lifecycle
|
||||
|
||||
async setupQueue() {
|
||||
const { default: BullQueue } = await import('bull');
|
||||
const { RedisClientService } = await import('@/services/redis-client.service');
|
||||
const service = Container.get(RedisClientService);
|
||||
|
||||
const bullPrefix = this.globalConfig.queue.bull.prefix;
|
||||
const prefix = service.toValidPrefix(bullPrefix);
|
||||
|
||||
this.sharedQueue = new BullQueue(QUEUE_NAME, {
|
||||
prefix,
|
||||
settings: this.globalConfig.queue.bull.settings,
|
||||
createClient: (type) => service.createClient({ type: `${type}(bull)` }),
|
||||
});
|
||||
|
||||
this.registerListeners();
|
||||
|
||||
const { isLeader, isMultiMain } = this.instanceSettings;
|
||||
|
||||
if (isLeader) this.scheduleQueueRecovery();
|
||||
|
||||
if (isMultiMain) {
|
||||
this.orchestrationService.multiMainSetup
|
||||
.on('leader-takeover', () => this.scheduleQueueRecovery())
|
||||
.on('leader-stepdown', () => this.stopQueueRecovery());
|
||||
async setupQueues() {
|
||||
this.jobQueues.assertQueue('default', QUEUE_NAME);
|
||||
const { dedicatedIds } = this.globalConfig.queue;
|
||||
for (const projectId of dedicatedIds) {
|
||||
this.jobQueues.assertQueue(projectId, `project_${projectId}_jobs`);
|
||||
}
|
||||
|
||||
this.scheduleQueueMetrics();
|
||||
const { isLeader, isMultiMain, isWorker } = this.instanceSettings;
|
||||
|
||||
if (!isWorker) {
|
||||
if (isLeader) this.jobRecovery.schedule();
|
||||
|
||||
if (isMultiMain) {
|
||||
this.orchestrationService.multiMainSetup
|
||||
.on('leader-takeover', () => this.jobRecovery.schedule())
|
||||
.on('leader-stepdown', () => this.jobRecovery.stop());
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.debug('Queue setup completed');
|
||||
}
|
||||
|
||||
setupWorker(concurrency: number) {
|
||||
this.assertWorker();
|
||||
this.assertQueue();
|
||||
|
||||
void this.sharedQueue.process(JOB_TYPE_NAME, concurrency, async (job: Job) => {
|
||||
try {
|
||||
if (!this.hasValidJobData(job)) {
|
||||
throw new ApplicationError('Worker received invalid job', {
|
||||
extra: { jobData: jsonStringify(job, { replaceCircularRefs: true }) },
|
||||
});
|
||||
}
|
||||
|
||||
await this.jobProcessor.processJob(job);
|
||||
} catch (error) {
|
||||
await this.reportJobProcessingError(ensureError(error), job);
|
||||
}
|
||||
});
|
||||
this.jobProcessor.setup(concurrency);
|
||||
|
||||
this.logger.debug('Worker setup completed');
|
||||
}
|
||||
|
||||
private async reportJobProcessingError(error: Error, job: Job) {
|
||||
const { executionId } = job.data;
|
||||
|
||||
this.logger.error(`Worker errored while running execution ${executionId} (job ${job.id})`, {
|
||||
error,
|
||||
executionId,
|
||||
jobId: job.id,
|
||||
});
|
||||
|
||||
const msg: JobFailedMessage = {
|
||||
kind: 'job-failed',
|
||||
executionId,
|
||||
workerId: this.instanceSettings.hostId,
|
||||
errorMsg: error.message,
|
||||
errorStack: error.stack ?? '',
|
||||
};
|
||||
|
||||
await job.progress(msg);
|
||||
|
||||
this.errorReporter.error(error, { executionId });
|
||||
|
||||
throw error;
|
||||
}
|
||||
|
||||
@OnShutdown(HIGHEST_SHUTDOWN_PRIORITY)
|
||||
async stop() {
|
||||
const { instanceType } = this.instanceSettings;
|
||||
|
||||
if (instanceType === 'main') await this.stopMain();
|
||||
else if (instanceType === 'worker') await this.stopWorker();
|
||||
}
|
||||
|
||||
private async stopMain() {
|
||||
if (this.instanceSettings.isSingleMain) {
|
||||
await this.sharedQueue.pause(true, true); // no more jobs will be picked up
|
||||
this.logger.debug('Queue paused');
|
||||
}
|
||||
|
||||
if (this.queueRecoveryContext.timeout) this.stopQueueRecovery();
|
||||
if (this.isQueueMetricsEnabled) this.stopQueueMetrics();
|
||||
}
|
||||
|
||||
private async stopWorker() {
|
||||
let count = 0;
|
||||
|
||||
while (this.getRunningJobsCount() !== 0) {
|
||||
if (count++ % 4 === 0) {
|
||||
this.logger.info(
|
||||
`Waiting for ${this.getRunningJobsCount()} active executions to finish...`,
|
||||
);
|
||||
}
|
||||
|
||||
await sleep(500);
|
||||
}
|
||||
}
|
||||
|
||||
async pingQueue() {
|
||||
await this.sharedQueue.client.ping();
|
||||
}
|
||||
|
||||
// #endregion
|
||||
|
||||
// #region Jobs
|
||||
|
||||
async getPendingJobCounts() {
|
||||
const { active, waiting } = await this.sharedQueue.getJobCounts();
|
||||
|
||||
return { active, waiting };
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a job to the queue.
|
||||
*
|
||||
* @param jobData Data of the job to add to the queue.
|
||||
* @param priority Priority of the job, from `1` (highest) to `MAX_SAFE_INTEGER` (lowest).
|
||||
*/
|
||||
async addJob(jobData: JobData, { priority }: { priority: number }) {
|
||||
strict(priority > 0 && priority <= Number.MAX_SAFE_INTEGER);
|
||||
|
||||
const jobOptions: JobOptions = {
|
||||
priority,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
};
|
||||
|
||||
const queue = this.dedicatedQueues.get(jobData.projectId!) ?? this.sharedQueue;
|
||||
|
||||
const job = await queue.add(JOB_TYPE_NAME, jobData, jobOptions);
|
||||
|
||||
const { executionId } = jobData;
|
||||
const jobId = job.id;
|
||||
|
||||
this.logger.info(`Enqueued execution ${executionId} (job ${jobId})`, { executionId, jobId });
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
async getJob(jobId: JobId) {
|
||||
// TODO: keep a jobId -> projectId mapping in redis
|
||||
// TODO: use the project specific queue if available
|
||||
return await this.sharedQueue.getJob(jobId);
|
||||
}
|
||||
|
||||
async findJobsByStatus(statuses: JobStatus[]) {
|
||||
// TODO: keep a jobId -> projectId mapping in redis
|
||||
// TODO: use the project specific queue if available
|
||||
const jobs = await this.sharedQueue.getJobs(statuses);
|
||||
|
||||
return jobs.filter((job) => job !== null);
|
||||
async addJob(jobData: JobData, options: { priority: number }) {
|
||||
return await this.jobProducer.addJob(jobData, options);
|
||||
}
|
||||
|
||||
async stopJob(job: Job) {
|
||||
|
@ -248,273 +100,5 @@ export class ScalingService {
|
|||
}
|
||||
}
|
||||
|
||||
getRunningJobsCount() {
|
||||
return this.jobProcessor.getRunningJobIds().length;
|
||||
}
|
||||
|
||||
// #endregion
|
||||
|
||||
// #region Listeners
|
||||
|
||||
private registerListeners() {
|
||||
const { instanceType } = this.instanceSettings;
|
||||
if (instanceType === 'main' || instanceType === 'webhook') {
|
||||
this.registerMainOrWebhookListeners();
|
||||
} else if (instanceType === 'worker') {
|
||||
this.registerWorkerListeners();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register listeners on a `worker` process for Bull queue events.
|
||||
*/
|
||||
private registerWorkerListeners() {
|
||||
this.sharedQueue.on('global:progress', (jobId: JobId, msg: unknown) => {
|
||||
if (!this.isJobMessage(msg)) return;
|
||||
|
||||
if (msg.kind === 'abort-job') this.jobProcessor.stopJob(jobId);
|
||||
});
|
||||
|
||||
this.sharedQueue.on('error', (error: Error) => {
|
||||
if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by RedisClientService.retryStrategy
|
||||
|
||||
/**
|
||||
* Non-recoverable error on worker start with Redis unavailable.
|
||||
* Even if Redis recovers, worker will remain unable to process jobs.
|
||||
*/
|
||||
if (error.message.includes('Error initializing Lua scripts')) {
|
||||
this.logger.error('Fatal error initializing worker', { error });
|
||||
this.logger.error('Exiting process...');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
this.logger.error('Queue errored', { error });
|
||||
|
||||
throw error;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Register listeners on a `main` or `webhook` process for Bull queue events.
|
||||
*/
|
||||
private registerMainOrWebhookListeners() {
|
||||
this.sharedQueue.on('error', (error: Error) => {
|
||||
if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by RedisClientService.retryStrategy
|
||||
|
||||
this.logger.error('Queue errored', { error });
|
||||
|
||||
throw error;
|
||||
});
|
||||
|
||||
this.sharedQueue.on('global:progress', (jobId: JobId, msg: unknown) => {
|
||||
if (!this.isJobMessage(msg)) return;
|
||||
|
||||
// completion and failure are reported via `global:progress` to convey more details
|
||||
// than natively provided by Bull in `global:completed` and `global:failed` events
|
||||
|
||||
switch (msg.kind) {
|
||||
case 'respond-to-webhook':
|
||||
const decodedResponse = this.decodeWebhookResponse(msg.response);
|
||||
this.activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse);
|
||||
break;
|
||||
case 'job-finished':
|
||||
this.logger.info(`Execution ${msg.executionId} (job ${jobId}) finished successfully`, {
|
||||
workerId: msg.workerId,
|
||||
executionId: msg.executionId,
|
||||
jobId,
|
||||
});
|
||||
break;
|
||||
case 'job-failed':
|
||||
this.logger.error(
|
||||
[
|
||||
`Execution ${msg.executionId} (job ${jobId}) failed`,
|
||||
msg.errorStack ? `\n${msg.errorStack}\n` : '',
|
||||
].join(''),
|
||||
{
|
||||
workerId: msg.workerId,
|
||||
errorMsg: msg.errorMsg,
|
||||
executionId: msg.executionId,
|
||||
jobId,
|
||||
},
|
||||
);
|
||||
break;
|
||||
case 'abort-job':
|
||||
break; // only for worker
|
||||
default:
|
||||
assertNever(msg);
|
||||
}
|
||||
});
|
||||
|
||||
if (this.isQueueMetricsEnabled) {
|
||||
this.sharedQueue.on('global:completed', () => this.jobCounters.completed++);
|
||||
this.sharedQueue.on('global:failed', () => this.jobCounters.failed++);
|
||||
}
|
||||
}
|
||||
|
||||
/** Whether the argument is a message sent via Bull's internal pubsub setup. */
|
||||
private isJobMessage(candidate: unknown): candidate is JobMessage {
|
||||
return typeof candidate === 'object' && candidate !== null && 'kind' in candidate;
|
||||
}
|
||||
|
||||
// #endregion
|
||||
|
||||
private decodeWebhookResponse(
|
||||
response: IExecuteResponsePromiseData,
|
||||
): IExecuteResponsePromiseData {
|
||||
if (
|
||||
typeof response === 'object' &&
|
||||
typeof response.body === 'object' &&
|
||||
response.body !== null &&
|
||||
'__@N8nEncodedBuffer@__' in response.body &&
|
||||
typeof response.body['__@N8nEncodedBuffer@__'] === 'string'
|
||||
) {
|
||||
response.body = Buffer.from(response.body['__@N8nEncodedBuffer@__'], BINARY_ENCODING);
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
private assertQueue() {
|
||||
if (this.sharedQueue) return;
|
||||
|
||||
throw new ApplicationError('This method must be called after `setupQueue`');
|
||||
}
|
||||
|
||||
private assertWorker() {
|
||||
if (this.instanceSettings.instanceType === 'worker') return;
|
||||
|
||||
throw new ApplicationError('This method must be called on a `worker` instance');
|
||||
}
|
||||
|
||||
// #region Queue metrics
|
||||
|
||||
/** Counters for completed and failed jobs, reset on each interval tick. */
|
||||
private readonly jobCounters = { completed: 0, failed: 0 };
|
||||
|
||||
/** Interval for collecting queue metrics to expose via Prometheus. */
|
||||
private queueMetricsInterval: NodeJS.Timer | undefined;
|
||||
|
||||
get isQueueMetricsEnabled() {
|
||||
return (
|
||||
this.globalConfig.endpoints.metrics.includeQueueMetrics &&
|
||||
this.instanceSettings.instanceType === 'main' &&
|
||||
this.instanceSettings.isSingleMain
|
||||
);
|
||||
}
|
||||
|
||||
/** Set up an interval to collect queue metrics and emit them in an event. */
|
||||
private scheduleQueueMetrics() {
|
||||
if (!this.isQueueMetricsEnabled || this.queueMetricsInterval) return;
|
||||
|
||||
this.queueMetricsInterval = setInterval(async () => {
|
||||
const pendingJobCounts = await this.getPendingJobCounts();
|
||||
|
||||
this.eventService.emit('job-counts-updated', {
|
||||
...pendingJobCounts, // active, waiting
|
||||
...this.jobCounters, // completed, failed
|
||||
});
|
||||
|
||||
this.jobCounters.completed = 0;
|
||||
this.jobCounters.failed = 0;
|
||||
}, this.globalConfig.endpoints.metrics.queueMetricsInterval * Time.seconds.toMilliseconds);
|
||||
}
|
||||
|
||||
/** Stop collecting queue metrics. */
|
||||
private stopQueueMetrics() {
|
||||
if (this.queueMetricsInterval) {
|
||||
clearInterval(this.queueMetricsInterval);
|
||||
this.queueMetricsInterval = undefined;
|
||||
|
||||
this.logger.debug('Queue metrics collection stopped');
|
||||
}
|
||||
}
|
||||
|
||||
// #endregion
|
||||
|
||||
// #region Queue recovery
|
||||
|
||||
private readonly queueRecoveryContext: QueueRecoveryContext = {
|
||||
batchSize: config.getEnv('executions.queueRecovery.batchSize'),
|
||||
waitMs: config.getEnv('executions.queueRecovery.interval') * 60 * 1000,
|
||||
};
|
||||
|
||||
private scheduleQueueRecovery(waitMs = this.queueRecoveryContext.waitMs) {
|
||||
this.queueRecoveryContext.timeout = setTimeout(async () => {
|
||||
try {
|
||||
const nextWaitMs = await this.recoverFromQueue();
|
||||
this.scheduleQueueRecovery(nextWaitMs);
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to recover dangling executions from queue', {
|
||||
msg: this.toErrorMsg(error),
|
||||
});
|
||||
this.logger.error('Retrying...');
|
||||
|
||||
this.scheduleQueueRecovery();
|
||||
}
|
||||
}, waitMs);
|
||||
|
||||
const wait = [this.queueRecoveryContext.waitMs / Time.minutes.toMilliseconds, 'min'].join(' ');
|
||||
|
||||
this.logger.debug(`Scheduled queue recovery check for next ${wait}`);
|
||||
}
|
||||
|
||||
private stopQueueRecovery() {
|
||||
clearTimeout(this.queueRecoveryContext.timeout);
|
||||
|
||||
this.logger.debug('Queue recovery stopped');
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark in-progress executions as `crashed` if stored in DB as `new` or `running`
|
||||
* but absent from the queue. Return time until next recovery cycle.
|
||||
*/
|
||||
private async recoverFromQueue() {
|
||||
const { waitMs, batchSize } = this.queueRecoveryContext;
|
||||
|
||||
const storedIds = await this.executionRepository.getInProgressExecutionIds(batchSize);
|
||||
|
||||
if (storedIds.length === 0) {
|
||||
this.logger.debug('Completed queue recovery check, no dangling executions');
|
||||
return waitMs;
|
||||
}
|
||||
|
||||
const runningJobs = await this.findJobsByStatus(['active', 'waiting']);
|
||||
|
||||
const queuedIds = new Set(runningJobs.map((job) => job.data.executionId));
|
||||
|
||||
if (queuedIds.size === 0) {
|
||||
this.logger.debug('Completed queue recovery check, no dangling executions');
|
||||
return waitMs;
|
||||
}
|
||||
|
||||
const danglingIds = storedIds.filter((id) => !queuedIds.has(id));
|
||||
|
||||
if (danglingIds.length === 0) {
|
||||
this.logger.debug('Completed queue recovery check, no dangling executions');
|
||||
return waitMs;
|
||||
}
|
||||
|
||||
await this.executionRepository.markAsCrashed(danglingIds);
|
||||
|
||||
this.logger.info('Completed queue recovery check, recovered dangling executions', {
|
||||
danglingIds,
|
||||
});
|
||||
|
||||
// if this cycle used up the whole batch size, it is possible for there to be
|
||||
// dangling executions outside this check, so speed up next cycle
|
||||
|
||||
return storedIds.length >= this.queueRecoveryContext.batchSize ? waitMs / 2 : waitMs;
|
||||
}
|
||||
|
||||
private toErrorMsg(error: unknown) {
|
||||
return error instanceof Error
|
||||
? error.message
|
||||
: jsonStringify(error, { replaceCircularRefs: true });
|
||||
}
|
||||
|
||||
private hasValidJobData(job: Job) {
|
||||
return isObjectLiteral(job.data) && 'executionId' in job.data && 'loadStaticData' in job.data;
|
||||
}
|
||||
|
||||
// #endregion
|
||||
}
|
||||
|
|
|
@ -221,7 +221,7 @@ export class Server extends AbstractServer {
|
|||
|
||||
if (config.getEnv('executions.mode') === 'queue') {
|
||||
const { ScalingService } = await import('@/scaling/scaling.service');
|
||||
await Container.get(ScalingService).setupQueue();
|
||||
await Container.get(ScalingService).setupQueues();
|
||||
}
|
||||
|
||||
await handleMfaDisable();
|
||||
|
|
|
@ -55,7 +55,7 @@ test('worker initializes all its components', async () => {
|
|||
expect(externalHooks.init).toHaveBeenCalledTimes(1);
|
||||
expect(externalSecretsManager.init).toHaveBeenCalledTimes(1);
|
||||
expect(messageEventBus.initialize).toHaveBeenCalledTimes(1);
|
||||
expect(scalingService.setupQueue).toHaveBeenCalledTimes(1);
|
||||
expect(scalingService.setupQueues).toHaveBeenCalledTimes(1);
|
||||
expect(scalingService.setupWorker).toHaveBeenCalledTimes(1);
|
||||
expect(logStreamingEventRelay.init).toHaveBeenCalledTimes(1);
|
||||
expect(orchestrationService.init).toHaveBeenCalledTimes(1);
|
||||
|
|
Loading…
Reference in a new issue