mirror of
https://github.com/n8n-io/n8n.git
synced 2025-02-02 07:01:30 -08:00
refactor(core): Suppress MaxListenersExceededWarning
in the logs (#10077)
This commit is contained in:
parent
48f047ee2e
commit
3bbeae47f3
|
@ -35,6 +35,24 @@ export interface WebhookResponse {
|
||||||
export class Queue {
|
export class Queue {
|
||||||
private jobQueue: JobQueue;
|
private jobQueue: JobQueue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The number of jobs a single server can process concurrently
|
||||||
|
* Any worker that wants to process executions must first set this to a non-zero value
|
||||||
|
*/
|
||||||
|
private concurrency = 0;
|
||||||
|
|
||||||
|
setConcurrency(concurrency: number) {
|
||||||
|
this.concurrency = concurrency;
|
||||||
|
// This sets the max event listeners on the jobQueue EventEmitter to prevent the logs getting flooded with MaxListenersExceededWarning
|
||||||
|
// see: https://github.com/OptimalBits/bull/blob/develop/lib/job.js#L497-L521
|
||||||
|
this.jobQueue.setMaxListeners(
|
||||||
|
4 + // `close`
|
||||||
|
2 + // `error`
|
||||||
|
2 + // `global:progress`
|
||||||
|
concurrency * 2, // 2 global events for every call to `job.finished()`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
constructor(private activeExecutions: ActiveExecutions) {}
|
constructor(private activeExecutions: ActiveExecutions) {}
|
||||||
|
|
||||||
async init() {
|
async init() {
|
||||||
|
@ -102,8 +120,8 @@ export class Queue {
|
||||||
return new Set(inProgressJobs.map((job) => job.data.executionId));
|
return new Set(inProgressJobs.map((job) => job.data.executionId));
|
||||||
}
|
}
|
||||||
|
|
||||||
async process(concurrency: number, fn: Bull.ProcessCallbackFunction<JobData>): Promise<void> {
|
async process(fn: Bull.ProcessCallbackFunction<JobData>): Promise<void> {
|
||||||
return await this.jobQueue.process(concurrency, fn);
|
return await this.jobQueue.process(this.concurrency, fn);
|
||||||
}
|
}
|
||||||
|
|
||||||
async ping(): Promise<string> {
|
async ping(): Promise<string> {
|
||||||
|
|
|
@ -320,11 +320,9 @@ export class Worker extends BaseCommand {
|
||||||
|
|
||||||
const envConcurrency = config.getEnv('executions.concurrency.productionLimit');
|
const envConcurrency = config.getEnv('executions.concurrency.productionLimit');
|
||||||
const concurrency = envConcurrency !== -1 ? envConcurrency : flags.concurrency;
|
const concurrency = envConcurrency !== -1 ? envConcurrency : flags.concurrency;
|
||||||
|
Worker.jobQueue.setConcurrency(concurrency);
|
||||||
|
|
||||||
void Worker.jobQueue.process(
|
void Worker.jobQueue.process(async (job) => await this.runJob(job, this.nodeTypes));
|
||||||
concurrency,
|
|
||||||
async (job) => await this.runJob(job, this.nodeTypes),
|
|
||||||
);
|
|
||||||
|
|
||||||
Worker.jobQueue.getBullObjectInstance().on('global:progress', (jobId: JobId, progress) => {
|
Worker.jobQueue.getBullObjectInstance().on('global:progress', (jobId: JobId, progress) => {
|
||||||
// Progress of a job got updated which does get used
|
// Progress of a job got updated which does get used
|
||||||
|
|
Loading…
Reference in a new issue