diff --git a/packages/cli/src/Queue.ts b/packages/cli/src/Queue.ts index a3fe4b82dd..dda0d64f02 100644 --- a/packages/cli/src/Queue.ts +++ b/packages/cli/src/Queue.ts @@ -2,7 +2,8 @@ import type Bull from 'bull'; import { Service } from 'typedi'; import type { ExecutionError, IExecuteResponsePromiseData } from 'n8n-workflow'; import { ActiveExecutions } from '@/ActiveExecutions'; -import * as WebhookHelpers from '@/WebhookHelpers'; +import { decodeWebhookResponse } from '@/helpers/decodeWebhookResponse'; + import { getRedisClusterClient, getRedisClusterNodes, @@ -62,7 +63,7 @@ export class Queue { this.jobQueue.on('global:progress', (jobId, progress: WebhookResponse) => { this.activeExecutions.resolveResponsePromise( progress.executionId, - WebhookHelpers.decodeWebhookResponse(progress.response), + decodeWebhookResponse(progress.response), ); }); } @@ -79,7 +80,23 @@ export class Queue { return this.jobQueue.getJobs(jobTypes); } + async process(concurrency: number, fn: Bull.ProcessCallbackFunction): Promise { + return this.jobQueue.process(concurrency, fn); + } + + async ping(): Promise { + return this.jobQueue.client.ping(); + } + + async pause(isLocal?: boolean): Promise { + return this.jobQueue.pause(isLocal); + } + getBullObjectInstance(): JobQueue { + if (this.jobQueue === undefined) { + // if queue is not initialized yet throw an error, since we do not want to hand around an undefined queue + throw new Error('Queue is not initialized yet!'); + } return this.jobQueue; } diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index cea4ae21af..2c8d75edaf 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -160,23 +160,6 @@ export function getWorkflowWebhooks( return returnData; } -export function decodeWebhookResponse( - response: IExecuteResponsePromiseData, -): IExecuteResponsePromiseData { - if ( - typeof response === 'object' && - typeof response.body === 'object' && - (response.body as IDataObject)['__@N8nEncodedBuffer@__'] - ) { - response.body = Buffer.from( - (response.body as IDataObject)['__@N8nEncodedBuffer@__'] as string, - BINARY_ENCODING, - ); - } - - return response; -} - export function encodeWebhookResponse( response: IExecuteResponsePromiseData, ): IExecuteResponsePromiseData { diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index cbfc8fb356..448f7c7f5e 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -37,10 +37,10 @@ import type { IWorkflowExecutionDataProcessWithExecution, } from '@/Interfaces'; import { NodeTypes } from '@/NodeTypes'; -import type { Job, JobData, JobQueue, JobResponse } from '@/Queue'; +import type { Job, JobData, JobResponse } from '@/Queue'; // eslint-disable-next-line import/no-cycle import { Queue } from '@/Queue'; -import * as WebhookHelpers from '@/WebhookHelpers'; +import { decodeWebhookResponse } from '@/helpers/decodeWebhookResponse'; // eslint-disable-next-line import/no-cycle import * as WorkflowHelpers from '@/WorkflowHelpers'; // eslint-disable-next-line import/no-cycle @@ -60,7 +60,7 @@ export class WorkflowRunner { push: Push; - jobQueue: JobQueue; + jobQueue: Queue; constructor() { this.push = Container.get(Push); @@ -172,8 +172,7 @@ export class WorkflowRunner { await initErrorHandling(); if (executionsMode === 'queue') { - const queue = Container.get(Queue); - this.jobQueue = queue.getBullObjectInstance(); + this.jobQueue = Container.get(Queue); } if (executionsMode === 'queue' && data.executionMode !== 'manual') { @@ -733,7 +732,7 @@ export class WorkflowRunner { this.activeExecutions.remove(executionId, message.data.runData); } else if (message.type === 'sendResponse') { if (responsePromise) { - responsePromise.resolve(WebhookHelpers.decodeWebhookResponse(message.data.response)); + responsePromise.resolve(decodeWebhookResponse(message.data.response)); } } else if (message.type === 'sendDataToUI') { // eslint-disable-next-line @typescript-eslint/no-unsafe-call diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 9fc960d02f..1f84a50ef6 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -22,7 +22,7 @@ import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData' import { PermissionChecker } from '@/UserManagement/PermissionChecker'; import config from '@/config'; -import type { Job, JobId, JobQueue, JobResponse, WebhookResponse } from '@/Queue'; +import type { Job, JobId, JobResponse, WebhookResponse } from '@/Queue'; import { Queue } from '@/Queue'; import { generateFailedExecutionFromError } from '@/WorkflowHelpers'; import { N8N_VERSION } from '@/constants'; @@ -61,7 +61,7 @@ export class Worker extends BaseCommand { [jobId: string]: WorkerJobStatusSummary; } = {}; - static jobQueue: JobQueue; + static jobQueue: Queue; redisSubscriber: RedisServicePubSubSubscriber; @@ -335,15 +335,14 @@ export class Worker extends BaseCommand { `Opening Redis connection to listen to messages with timeout ${redisConnectionTimeoutLimit}`, ); - const queue = Container.get(Queue); - await queue.init(); + Worker.jobQueue = Container.get(Queue); + await Worker.jobQueue.init(); this.logger.debug('Queue singleton ready'); - Worker.jobQueue = queue.getBullObjectInstance(); void Worker.jobQueue.process(flags.concurrency, async (job) => this.runJob(job, this.nodeTypes), ); - Worker.jobQueue.on('global:progress', (jobId: JobId, progress) => { + Worker.jobQueue.getBullObjectInstance().on('global:progress', (jobId: JobId, progress) => { // Progress of a job got updated which does get used // to communicate that a job got canceled. @@ -359,7 +358,7 @@ export class Worker extends BaseCommand { let lastTimer = 0; let cumulativeTimeout = 0; - Worker.jobQueue.on('error', (error: Error) => { + Worker.jobQueue.getBullObjectInstance().on('error', (error: Error) => { if (error.toString().includes('ECONNREFUSED')) { const now = Date.now(); if (now - lastTimer > 30000) { @@ -423,7 +422,7 @@ export class Worker extends BaseCommand { // if it loses the connection to redis try { // Redis ping - await Worker.jobQueue.client.ping(); + await Worker.jobQueue.ping(); } catch (e) { LoggerProxy.error('No Redis connection!', e as Error); const error = new ResponseHelper.ServiceUnavailableError('No Redis connection!'); diff --git a/packages/cli/src/helpers/decodeWebhookResponse.ts b/packages/cli/src/helpers/decodeWebhookResponse.ts new file mode 100644 index 0000000000..dd0e464b5e --- /dev/null +++ b/packages/cli/src/helpers/decodeWebhookResponse.ts @@ -0,0 +1,18 @@ +import { BINARY_ENCODING, type IDataObject, type IExecuteResponsePromiseData } from 'n8n-workflow'; + +export function decodeWebhookResponse( + response: IExecuteResponsePromiseData, +): IExecuteResponsePromiseData { + if ( + typeof response === 'object' && + typeof response.body === 'object' && + (response.body as IDataObject)['__@N8nEncodedBuffer@__'] + ) { + response.body = Buffer.from( + (response.body as IDataObject)['__@N8nEncodedBuffer@__'] as string, + BINARY_ENCODING, + ); + } + + return response; +}