mirror of
https://github.com/n8n-io/n8n.git
synced 2024-12-25 12:44:07 -08:00
fix(core): Add check that queue is defined and remove cyclic dependency (#7404)
In a rare edge case an undefined queue could be returned - this should not happen and now an error is thrown. Also using the opportunity to remove a cyclic dependency from the Queue.
This commit is contained in:
parent
609f0837cf
commit
45f2ef373e
|
@ -2,7 +2,8 @@ import type Bull from 'bull';
|
||||||
import { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
import type { ExecutionError, IExecuteResponsePromiseData } from 'n8n-workflow';
|
import type { ExecutionError, IExecuteResponsePromiseData } from 'n8n-workflow';
|
||||||
import { ActiveExecutions } from '@/ActiveExecutions';
|
import { ActiveExecutions } from '@/ActiveExecutions';
|
||||||
import * as WebhookHelpers from '@/WebhookHelpers';
|
import { decodeWebhookResponse } from '@/helpers/decodeWebhookResponse';
|
||||||
|
|
||||||
import {
|
import {
|
||||||
getRedisClusterClient,
|
getRedisClusterClient,
|
||||||
getRedisClusterNodes,
|
getRedisClusterNodes,
|
||||||
|
@ -62,7 +63,7 @@ export class Queue {
|
||||||
this.jobQueue.on('global:progress', (jobId, progress: WebhookResponse) => {
|
this.jobQueue.on('global:progress', (jobId, progress: WebhookResponse) => {
|
||||||
this.activeExecutions.resolveResponsePromise(
|
this.activeExecutions.resolveResponsePromise(
|
||||||
progress.executionId,
|
progress.executionId,
|
||||||
WebhookHelpers.decodeWebhookResponse(progress.response),
|
decodeWebhookResponse(progress.response),
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -79,7 +80,23 @@ export class Queue {
|
||||||
return this.jobQueue.getJobs(jobTypes);
|
return this.jobQueue.getJobs(jobTypes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async process(concurrency: number, fn: Bull.ProcessCallbackFunction<JobData>): Promise<void> {
|
||||||
|
return this.jobQueue.process(concurrency, fn);
|
||||||
|
}
|
||||||
|
|
||||||
|
async ping(): Promise<string> {
|
||||||
|
return this.jobQueue.client.ping();
|
||||||
|
}
|
||||||
|
|
||||||
|
async pause(isLocal?: boolean): Promise<void> {
|
||||||
|
return this.jobQueue.pause(isLocal);
|
||||||
|
}
|
||||||
|
|
||||||
getBullObjectInstance(): JobQueue {
|
getBullObjectInstance(): JobQueue {
|
||||||
|
if (this.jobQueue === undefined) {
|
||||||
|
// if queue is not initialized yet throw an error, since we do not want to hand around an undefined queue
|
||||||
|
throw new Error('Queue is not initialized yet!');
|
||||||
|
}
|
||||||
return this.jobQueue;
|
return this.jobQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -160,23 +160,6 @@ export function getWorkflowWebhooks(
|
||||||
return returnData;
|
return returnData;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function decodeWebhookResponse(
|
|
||||||
response: IExecuteResponsePromiseData,
|
|
||||||
): IExecuteResponsePromiseData {
|
|
||||||
if (
|
|
||||||
typeof response === 'object' &&
|
|
||||||
typeof response.body === 'object' &&
|
|
||||||
(response.body as IDataObject)['__@N8nEncodedBuffer@__']
|
|
||||||
) {
|
|
||||||
response.body = Buffer.from(
|
|
||||||
(response.body as IDataObject)['__@N8nEncodedBuffer@__'] as string,
|
|
||||||
BINARY_ENCODING,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
return response;
|
|
||||||
}
|
|
||||||
|
|
||||||
export function encodeWebhookResponse(
|
export function encodeWebhookResponse(
|
||||||
response: IExecuteResponsePromiseData,
|
response: IExecuteResponsePromiseData,
|
||||||
): IExecuteResponsePromiseData {
|
): IExecuteResponsePromiseData {
|
||||||
|
|
|
@ -37,10 +37,10 @@ import type {
|
||||||
IWorkflowExecutionDataProcessWithExecution,
|
IWorkflowExecutionDataProcessWithExecution,
|
||||||
} from '@/Interfaces';
|
} from '@/Interfaces';
|
||||||
import { NodeTypes } from '@/NodeTypes';
|
import { NodeTypes } from '@/NodeTypes';
|
||||||
import type { Job, JobData, JobQueue, JobResponse } from '@/Queue';
|
import type { Job, JobData, JobResponse } from '@/Queue';
|
||||||
// eslint-disable-next-line import/no-cycle
|
// eslint-disable-next-line import/no-cycle
|
||||||
import { Queue } from '@/Queue';
|
import { Queue } from '@/Queue';
|
||||||
import * as WebhookHelpers from '@/WebhookHelpers';
|
import { decodeWebhookResponse } from '@/helpers/decodeWebhookResponse';
|
||||||
// eslint-disable-next-line import/no-cycle
|
// eslint-disable-next-line import/no-cycle
|
||||||
import * as WorkflowHelpers from '@/WorkflowHelpers';
|
import * as WorkflowHelpers from '@/WorkflowHelpers';
|
||||||
// eslint-disable-next-line import/no-cycle
|
// eslint-disable-next-line import/no-cycle
|
||||||
|
@ -60,7 +60,7 @@ export class WorkflowRunner {
|
||||||
|
|
||||||
push: Push;
|
push: Push;
|
||||||
|
|
||||||
jobQueue: JobQueue;
|
jobQueue: Queue;
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.push = Container.get(Push);
|
this.push = Container.get(Push);
|
||||||
|
@ -172,8 +172,7 @@ export class WorkflowRunner {
|
||||||
await initErrorHandling();
|
await initErrorHandling();
|
||||||
|
|
||||||
if (executionsMode === 'queue') {
|
if (executionsMode === 'queue') {
|
||||||
const queue = Container.get(Queue);
|
this.jobQueue = Container.get(Queue);
|
||||||
this.jobQueue = queue.getBullObjectInstance();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (executionsMode === 'queue' && data.executionMode !== 'manual') {
|
if (executionsMode === 'queue' && data.executionMode !== 'manual') {
|
||||||
|
@ -733,7 +732,7 @@ export class WorkflowRunner {
|
||||||
this.activeExecutions.remove(executionId, message.data.runData);
|
this.activeExecutions.remove(executionId, message.data.runData);
|
||||||
} else if (message.type === 'sendResponse') {
|
} else if (message.type === 'sendResponse') {
|
||||||
if (responsePromise) {
|
if (responsePromise) {
|
||||||
responsePromise.resolve(WebhookHelpers.decodeWebhookResponse(message.data.response));
|
responsePromise.resolve(decodeWebhookResponse(message.data.response));
|
||||||
}
|
}
|
||||||
} else if (message.type === 'sendDataToUI') {
|
} else if (message.type === 'sendDataToUI') {
|
||||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
|
||||||
|
|
|
@ -22,7 +22,7 @@ import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'
|
||||||
import { PermissionChecker } from '@/UserManagement/PermissionChecker';
|
import { PermissionChecker } from '@/UserManagement/PermissionChecker';
|
||||||
|
|
||||||
import config from '@/config';
|
import config from '@/config';
|
||||||
import type { Job, JobId, JobQueue, JobResponse, WebhookResponse } from '@/Queue';
|
import type { Job, JobId, JobResponse, WebhookResponse } from '@/Queue';
|
||||||
import { Queue } from '@/Queue';
|
import { Queue } from '@/Queue';
|
||||||
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
|
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
|
||||||
import { N8N_VERSION } from '@/constants';
|
import { N8N_VERSION } from '@/constants';
|
||||||
|
@ -61,7 +61,7 @@ export class Worker extends BaseCommand {
|
||||||
[jobId: string]: WorkerJobStatusSummary;
|
[jobId: string]: WorkerJobStatusSummary;
|
||||||
} = {};
|
} = {};
|
||||||
|
|
||||||
static jobQueue: JobQueue;
|
static jobQueue: Queue;
|
||||||
|
|
||||||
redisSubscriber: RedisServicePubSubSubscriber;
|
redisSubscriber: RedisServicePubSubSubscriber;
|
||||||
|
|
||||||
|
@ -335,15 +335,14 @@ export class Worker extends BaseCommand {
|
||||||
`Opening Redis connection to listen to messages with timeout ${redisConnectionTimeoutLimit}`,
|
`Opening Redis connection to listen to messages with timeout ${redisConnectionTimeoutLimit}`,
|
||||||
);
|
);
|
||||||
|
|
||||||
const queue = Container.get(Queue);
|
Worker.jobQueue = Container.get(Queue);
|
||||||
await queue.init();
|
await Worker.jobQueue.init();
|
||||||
this.logger.debug('Queue singleton ready');
|
this.logger.debug('Queue singleton ready');
|
||||||
Worker.jobQueue = queue.getBullObjectInstance();
|
|
||||||
void Worker.jobQueue.process(flags.concurrency, async (job) =>
|
void Worker.jobQueue.process(flags.concurrency, async (job) =>
|
||||||
this.runJob(job, this.nodeTypes),
|
this.runJob(job, this.nodeTypes),
|
||||||
);
|
);
|
||||||
|
|
||||||
Worker.jobQueue.on('global:progress', (jobId: JobId, progress) => {
|
Worker.jobQueue.getBullObjectInstance().on('global:progress', (jobId: JobId, progress) => {
|
||||||
// Progress of a job got updated which does get used
|
// Progress of a job got updated which does get used
|
||||||
// to communicate that a job got canceled.
|
// to communicate that a job got canceled.
|
||||||
|
|
||||||
|
@ -359,7 +358,7 @@ export class Worker extends BaseCommand {
|
||||||
|
|
||||||
let lastTimer = 0;
|
let lastTimer = 0;
|
||||||
let cumulativeTimeout = 0;
|
let cumulativeTimeout = 0;
|
||||||
Worker.jobQueue.on('error', (error: Error) => {
|
Worker.jobQueue.getBullObjectInstance().on('error', (error: Error) => {
|
||||||
if (error.toString().includes('ECONNREFUSED')) {
|
if (error.toString().includes('ECONNREFUSED')) {
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
if (now - lastTimer > 30000) {
|
if (now - lastTimer > 30000) {
|
||||||
|
@ -423,7 +422,7 @@ export class Worker extends BaseCommand {
|
||||||
// if it loses the connection to redis
|
// if it loses the connection to redis
|
||||||
try {
|
try {
|
||||||
// Redis ping
|
// Redis ping
|
||||||
await Worker.jobQueue.client.ping();
|
await Worker.jobQueue.ping();
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
LoggerProxy.error('No Redis connection!', e as Error);
|
LoggerProxy.error('No Redis connection!', e as Error);
|
||||||
const error = new ResponseHelper.ServiceUnavailableError('No Redis connection!');
|
const error = new ResponseHelper.ServiceUnavailableError('No Redis connection!');
|
||||||
|
|
18
packages/cli/src/helpers/decodeWebhookResponse.ts
Normal file
18
packages/cli/src/helpers/decodeWebhookResponse.ts
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
import { BINARY_ENCODING, type IDataObject, type IExecuteResponsePromiseData } from 'n8n-workflow';
|
||||||
|
|
||||||
|
export function decodeWebhookResponse(
|
||||||
|
response: IExecuteResponsePromiseData,
|
||||||
|
): IExecuteResponsePromiseData {
|
||||||
|
if (
|
||||||
|
typeof response === 'object' &&
|
||||||
|
typeof response.body === 'object' &&
|
||||||
|
(response.body as IDataObject)['__@N8nEncodedBuffer@__']
|
||||||
|
) {
|
||||||
|
response.body = Buffer.from(
|
||||||
|
(response.body as IDataObject)['__@N8nEncodedBuffer@__'] as string,
|
||||||
|
BINARY_ENCODING,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return response;
|
||||||
|
}
|
Loading…
Reference in a new issue