fix(core): Add check that queue is defined and remove cyclic dependency (#7404)

In a rare edge case an undefined queue could be returned - this should
not happen and now an error is thrown.
Also using the opportunity to remove a cyclic dependency from the Queue.
This commit is contained in:
Michael Auerswald 2023-10-13 11:53:59 +02:00 committed by GitHub
parent 609f0837cf
commit 45f2ef373e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 49 additions and 33 deletions

View file

@ -2,7 +2,8 @@ import type Bull from 'bull';
import { Service } from 'typedi'; import { Service } from 'typedi';
import type { ExecutionError, IExecuteResponsePromiseData } from 'n8n-workflow'; import type { ExecutionError, IExecuteResponsePromiseData } from 'n8n-workflow';
import { ActiveExecutions } from '@/ActiveExecutions'; import { ActiveExecutions } from '@/ActiveExecutions';
import * as WebhookHelpers from '@/WebhookHelpers'; import { decodeWebhookResponse } from '@/helpers/decodeWebhookResponse';
import { import {
getRedisClusterClient, getRedisClusterClient,
getRedisClusterNodes, getRedisClusterNodes,
@ -62,7 +63,7 @@ export class Queue {
this.jobQueue.on('global:progress', (jobId, progress: WebhookResponse) => { this.jobQueue.on('global:progress', (jobId, progress: WebhookResponse) => {
this.activeExecutions.resolveResponsePromise( this.activeExecutions.resolveResponsePromise(
progress.executionId, progress.executionId,
WebhookHelpers.decodeWebhookResponse(progress.response), decodeWebhookResponse(progress.response),
); );
}); });
} }
@ -79,7 +80,23 @@ export class Queue {
return this.jobQueue.getJobs(jobTypes); return this.jobQueue.getJobs(jobTypes);
} }
async process(concurrency: number, fn: Bull.ProcessCallbackFunction<JobData>): Promise<void> {
return this.jobQueue.process(concurrency, fn);
}
async ping(): Promise<string> {
return this.jobQueue.client.ping();
}
async pause(isLocal?: boolean): Promise<void> {
return this.jobQueue.pause(isLocal);
}
getBullObjectInstance(): JobQueue { getBullObjectInstance(): JobQueue {
if (this.jobQueue === undefined) {
// if queue is not initialized yet throw an error, since we do not want to hand around an undefined queue
throw new Error('Queue is not initialized yet!');
}
return this.jobQueue; return this.jobQueue;
} }

View file

@ -160,23 +160,6 @@ export function getWorkflowWebhooks(
return returnData; return returnData;
} }
export function decodeWebhookResponse(
response: IExecuteResponsePromiseData,
): IExecuteResponsePromiseData {
if (
typeof response === 'object' &&
typeof response.body === 'object' &&
(response.body as IDataObject)['__@N8nEncodedBuffer@__']
) {
response.body = Buffer.from(
(response.body as IDataObject)['__@N8nEncodedBuffer@__'] as string,
BINARY_ENCODING,
);
}
return response;
}
export function encodeWebhookResponse( export function encodeWebhookResponse(
response: IExecuteResponsePromiseData, response: IExecuteResponsePromiseData,
): IExecuteResponsePromiseData { ): IExecuteResponsePromiseData {

View file

@ -37,10 +37,10 @@ import type {
IWorkflowExecutionDataProcessWithExecution, IWorkflowExecutionDataProcessWithExecution,
} from '@/Interfaces'; } from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes'; import { NodeTypes } from '@/NodeTypes';
import type { Job, JobData, JobQueue, JobResponse } from '@/Queue'; import type { Job, JobData, JobResponse } from '@/Queue';
// eslint-disable-next-line import/no-cycle // eslint-disable-next-line import/no-cycle
import { Queue } from '@/Queue'; import { Queue } from '@/Queue';
import * as WebhookHelpers from '@/WebhookHelpers'; import { decodeWebhookResponse } from '@/helpers/decodeWebhookResponse';
// eslint-disable-next-line import/no-cycle // eslint-disable-next-line import/no-cycle
import * as WorkflowHelpers from '@/WorkflowHelpers'; import * as WorkflowHelpers from '@/WorkflowHelpers';
// eslint-disable-next-line import/no-cycle // eslint-disable-next-line import/no-cycle
@ -60,7 +60,7 @@ export class WorkflowRunner {
push: Push; push: Push;
jobQueue: JobQueue; jobQueue: Queue;
constructor() { constructor() {
this.push = Container.get(Push); this.push = Container.get(Push);
@ -172,8 +172,7 @@ export class WorkflowRunner {
await initErrorHandling(); await initErrorHandling();
if (executionsMode === 'queue') { if (executionsMode === 'queue') {
const queue = Container.get(Queue); this.jobQueue = Container.get(Queue);
this.jobQueue = queue.getBullObjectInstance();
} }
if (executionsMode === 'queue' && data.executionMode !== 'manual') { if (executionsMode === 'queue' && data.executionMode !== 'manual') {
@ -733,7 +732,7 @@ export class WorkflowRunner {
this.activeExecutions.remove(executionId, message.data.runData); this.activeExecutions.remove(executionId, message.data.runData);
} else if (message.type === 'sendResponse') { } else if (message.type === 'sendResponse') {
if (responsePromise) { if (responsePromise) {
responsePromise.resolve(WebhookHelpers.decodeWebhookResponse(message.data.response)); responsePromise.resolve(decodeWebhookResponse(message.data.response));
} }
} else if (message.type === 'sendDataToUI') { } else if (message.type === 'sendDataToUI') {
// eslint-disable-next-line @typescript-eslint/no-unsafe-call // eslint-disable-next-line @typescript-eslint/no-unsafe-call

View file

@ -22,7 +22,7 @@ import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'
import { PermissionChecker } from '@/UserManagement/PermissionChecker'; import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import config from '@/config'; import config from '@/config';
import type { Job, JobId, JobQueue, JobResponse, WebhookResponse } from '@/Queue'; import type { Job, JobId, JobResponse, WebhookResponse } from '@/Queue';
import { Queue } from '@/Queue'; import { Queue } from '@/Queue';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers'; import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { N8N_VERSION } from '@/constants'; import { N8N_VERSION } from '@/constants';
@ -61,7 +61,7 @@ export class Worker extends BaseCommand {
[jobId: string]: WorkerJobStatusSummary; [jobId: string]: WorkerJobStatusSummary;
} = {}; } = {};
static jobQueue: JobQueue; static jobQueue: Queue;
redisSubscriber: RedisServicePubSubSubscriber; redisSubscriber: RedisServicePubSubSubscriber;
@ -335,15 +335,14 @@ export class Worker extends BaseCommand {
`Opening Redis connection to listen to messages with timeout ${redisConnectionTimeoutLimit}`, `Opening Redis connection to listen to messages with timeout ${redisConnectionTimeoutLimit}`,
); );
const queue = Container.get(Queue); Worker.jobQueue = Container.get(Queue);
await queue.init(); await Worker.jobQueue.init();
this.logger.debug('Queue singleton ready'); this.logger.debug('Queue singleton ready');
Worker.jobQueue = queue.getBullObjectInstance();
void Worker.jobQueue.process(flags.concurrency, async (job) => void Worker.jobQueue.process(flags.concurrency, async (job) =>
this.runJob(job, this.nodeTypes), this.runJob(job, this.nodeTypes),
); );
Worker.jobQueue.on('global:progress', (jobId: JobId, progress) => { Worker.jobQueue.getBullObjectInstance().on('global:progress', (jobId: JobId, progress) => {
// Progress of a job got updated which does get used // Progress of a job got updated which does get used
// to communicate that a job got canceled. // to communicate that a job got canceled.
@ -359,7 +358,7 @@ export class Worker extends BaseCommand {
let lastTimer = 0; let lastTimer = 0;
let cumulativeTimeout = 0; let cumulativeTimeout = 0;
Worker.jobQueue.on('error', (error: Error) => { Worker.jobQueue.getBullObjectInstance().on('error', (error: Error) => {
if (error.toString().includes('ECONNREFUSED')) { if (error.toString().includes('ECONNREFUSED')) {
const now = Date.now(); const now = Date.now();
if (now - lastTimer > 30000) { if (now - lastTimer > 30000) {
@ -423,7 +422,7 @@ export class Worker extends BaseCommand {
// if it loses the connection to redis // if it loses the connection to redis
try { try {
// Redis ping // Redis ping
await Worker.jobQueue.client.ping(); await Worker.jobQueue.ping();
} catch (e) { } catch (e) {
LoggerProxy.error('No Redis connection!', e as Error); LoggerProxy.error('No Redis connection!', e as Error);
const error = new ResponseHelper.ServiceUnavailableError('No Redis connection!'); const error = new ResponseHelper.ServiceUnavailableError('No Redis connection!');

View file

@ -0,0 +1,18 @@
import { BINARY_ENCODING, type IDataObject, type IExecuteResponsePromiseData } from 'n8n-workflow';
export function decodeWebhookResponse(
response: IExecuteResponsePromiseData,
): IExecuteResponsePromiseData {
if (
typeof response === 'object' &&
typeof response.body === 'object' &&
(response.body as IDataObject)['__@N8nEncodedBuffer@__']
) {
response.body = Buffer.from(
(response.body as IDataObject)['__@N8nEncodedBuffer@__'] as string,
BINARY_ENCODING,
);
}
return response;
}