fix(core): Add check that queue is defined and remove cyclic dependency (#7404)

In a rare edge case an undefined queue could be returned - this should
not happen and now an error is thrown.
Also using the opportunity to remove a cyclic dependency from the Queue.
This commit is contained in:
Michael Auerswald 2023-10-13 11:53:59 +02:00 committed by GitHub
parent 609f0837cf
commit 45f2ef373e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 49 additions and 33 deletions

View file

@ -2,7 +2,8 @@ import type Bull from 'bull';
import { Service } from 'typedi';
import type { ExecutionError, IExecuteResponsePromiseData } from 'n8n-workflow';
import { ActiveExecutions } from '@/ActiveExecutions';
import * as WebhookHelpers from '@/WebhookHelpers';
import { decodeWebhookResponse } from '@/helpers/decodeWebhookResponse';
import {
getRedisClusterClient,
getRedisClusterNodes,
@ -62,7 +63,7 @@ export class Queue {
this.jobQueue.on('global:progress', (jobId, progress: WebhookResponse) => {
this.activeExecutions.resolveResponsePromise(
progress.executionId,
WebhookHelpers.decodeWebhookResponse(progress.response),
decodeWebhookResponse(progress.response),
);
});
}
@ -79,7 +80,23 @@ export class Queue {
return this.jobQueue.getJobs(jobTypes);
}
async process(concurrency: number, fn: Bull.ProcessCallbackFunction<JobData>): Promise<void> {
return this.jobQueue.process(concurrency, fn);
}
async ping(): Promise<string> {
return this.jobQueue.client.ping();
}
async pause(isLocal?: boolean): Promise<void> {
return this.jobQueue.pause(isLocal);
}
getBullObjectInstance(): JobQueue {
if (this.jobQueue === undefined) {
// if queue is not initialized yet throw an error, since we do not want to hand around an undefined queue
throw new Error('Queue is not initialized yet!');
}
return this.jobQueue;
}

View file

@ -160,23 +160,6 @@ export function getWorkflowWebhooks(
return returnData;
}
export function decodeWebhookResponse(
response: IExecuteResponsePromiseData,
): IExecuteResponsePromiseData {
if (
typeof response === 'object' &&
typeof response.body === 'object' &&
(response.body as IDataObject)['__@N8nEncodedBuffer@__']
) {
response.body = Buffer.from(
(response.body as IDataObject)['__@N8nEncodedBuffer@__'] as string,
BINARY_ENCODING,
);
}
return response;
}
export function encodeWebhookResponse(
response: IExecuteResponsePromiseData,
): IExecuteResponsePromiseData {

View file

@ -37,10 +37,10 @@ import type {
IWorkflowExecutionDataProcessWithExecution,
} from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import type { Job, JobData, JobQueue, JobResponse } from '@/Queue';
import type { Job, JobData, JobResponse } from '@/Queue';
// eslint-disable-next-line import/no-cycle
import { Queue } from '@/Queue';
import * as WebhookHelpers from '@/WebhookHelpers';
import { decodeWebhookResponse } from '@/helpers/decodeWebhookResponse';
// eslint-disable-next-line import/no-cycle
import * as WorkflowHelpers from '@/WorkflowHelpers';
// eslint-disable-next-line import/no-cycle
@ -60,7 +60,7 @@ export class WorkflowRunner {
push: Push;
jobQueue: JobQueue;
jobQueue: Queue;
constructor() {
this.push = Container.get(Push);
@ -172,8 +172,7 @@ export class WorkflowRunner {
await initErrorHandling();
if (executionsMode === 'queue') {
const queue = Container.get(Queue);
this.jobQueue = queue.getBullObjectInstance();
this.jobQueue = Container.get(Queue);
}
if (executionsMode === 'queue' && data.executionMode !== 'manual') {
@ -733,7 +732,7 @@ export class WorkflowRunner {
this.activeExecutions.remove(executionId, message.data.runData);
} else if (message.type === 'sendResponse') {
if (responsePromise) {
responsePromise.resolve(WebhookHelpers.decodeWebhookResponse(message.data.response));
responsePromise.resolve(decodeWebhookResponse(message.data.response));
}
} else if (message.type === 'sendDataToUI') {
// eslint-disable-next-line @typescript-eslint/no-unsafe-call

View file

@ -22,7 +22,7 @@ import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'
import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import config from '@/config';
import type { Job, JobId, JobQueue, JobResponse, WebhookResponse } from '@/Queue';
import type { Job, JobId, JobResponse, WebhookResponse } from '@/Queue';
import { Queue } from '@/Queue';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { N8N_VERSION } from '@/constants';
@ -61,7 +61,7 @@ export class Worker extends BaseCommand {
[jobId: string]: WorkerJobStatusSummary;
} = {};
static jobQueue: JobQueue;
static jobQueue: Queue;
redisSubscriber: RedisServicePubSubSubscriber;
@ -335,15 +335,14 @@ export class Worker extends BaseCommand {
`Opening Redis connection to listen to messages with timeout ${redisConnectionTimeoutLimit}`,
);
const queue = Container.get(Queue);
await queue.init();
Worker.jobQueue = Container.get(Queue);
await Worker.jobQueue.init();
this.logger.debug('Queue singleton ready');
Worker.jobQueue = queue.getBullObjectInstance();
void Worker.jobQueue.process(flags.concurrency, async (job) =>
this.runJob(job, this.nodeTypes),
);
Worker.jobQueue.on('global:progress', (jobId: JobId, progress) => {
Worker.jobQueue.getBullObjectInstance().on('global:progress', (jobId: JobId, progress) => {
// Progress of a job got updated which does get used
// to communicate that a job got canceled.
@ -359,7 +358,7 @@ export class Worker extends BaseCommand {
let lastTimer = 0;
let cumulativeTimeout = 0;
Worker.jobQueue.on('error', (error: Error) => {
Worker.jobQueue.getBullObjectInstance().on('error', (error: Error) => {
if (error.toString().includes('ECONNREFUSED')) {
const now = Date.now();
if (now - lastTimer > 30000) {
@ -423,7 +422,7 @@ export class Worker extends BaseCommand {
// if it loses the connection to redis
try {
// Redis ping
await Worker.jobQueue.client.ping();
await Worker.jobQueue.ping();
} catch (e) {
LoggerProxy.error('No Redis connection!', e as Error);
const error = new ResponseHelper.ServiceUnavailableError('No Redis connection!');

View file

@ -0,0 +1,18 @@
import { BINARY_ENCODING, type IDataObject, type IExecuteResponsePromiseData } from 'n8n-workflow';
export function decodeWebhookResponse(
response: IExecuteResponsePromiseData,
): IExecuteResponsePromiseData {
if (
typeof response === 'object' &&
typeof response.body === 'object' &&
(response.body as IDataObject)['__@N8nEncodedBuffer@__']
) {
response.body = Buffer.from(
(response.body as IDataObject)['__@N8nEncodedBuffer@__'] as string,
BINARY_ENCODING,
);
}
return response;
}