2022-04-08 14:32:08 -07:00
|
|
|
import Bull from 'bull';
|
2022-09-09 06:14:49 -07:00
|
|
|
import { IExecuteResponsePromiseData } from 'n8n-workflow';
|
2022-11-09 06:25:00 -08:00
|
|
|
import config from '@/config';
|
|
|
|
import * as ActiveExecutions from '@/ActiveExecutions';
|
|
|
|
import * as WebhookHelpers from '@/WebhookHelpers';
|
2021-02-09 14:32:40 -08:00
|
|
|
|
2022-09-09 06:14:49 -07:00
|
|
|
export type Job = Bull.Job<JobData>;
|
|
|
|
export type JobQueue = Bull.Queue<JobData>;
|
|
|
|
|
|
|
|
export interface JobData {
|
|
|
|
executionId: string;
|
|
|
|
loadStaticData: boolean;
|
|
|
|
}
|
|
|
|
|
|
|
|
export interface JobResponse {
|
|
|
|
success: boolean;
|
|
|
|
}
|
|
|
|
|
|
|
|
export interface WebhookResponse {
|
|
|
|
executionId: string;
|
|
|
|
response: IExecuteResponsePromiseData;
|
|
|
|
}
|
|
|
|
|
2021-02-09 14:32:40 -08:00
|
|
|
export class Queue {
|
2021-11-05 09:45:51 -07:00
|
|
|
private activeExecutions: ActiveExecutions.ActiveExecutions;
|
|
|
|
|
2022-09-09 06:14:49 -07:00
|
|
|
private jobQueue: JobQueue;
|
2021-08-29 11:58:11 -07:00
|
|
|
|
2021-02-09 14:32:40 -08:00
|
|
|
constructor() {
|
2021-11-05 09:45:51 -07:00
|
|
|
this.activeExecutions = ActiveExecutions.getInstance();
|
|
|
|
|
2022-04-08 10:37:27 -07:00
|
|
|
const prefix = config.getEnv('queue.bull.prefix');
|
|
|
|
const redisOptions = config.getEnv('queue.bull.redis');
|
2021-08-29 11:58:11 -07:00
|
|
|
// Disabling ready check is necessary as it allows worker to
|
2021-02-09 14:32:40 -08:00
|
|
|
// quickly reconnect to Redis if Redis crashes or is unreachable
|
|
|
|
// for some time. With it enabled, worker might take minutes to realize
|
|
|
|
// redis is back up and resume working.
|
|
|
|
// More here: https://github.com/OptimalBits/bull/issues/890
|
|
|
|
// @ts-ignore
|
|
|
|
this.jobQueue = new Bull('jobs', { prefix, redis: redisOptions, enableReadyCheck: false });
|
2021-11-05 09:45:51 -07:00
|
|
|
|
2022-09-09 06:14:49 -07:00
|
|
|
this.jobQueue.on('global:progress', (jobId, progress: WebhookResponse) => {
|
2021-11-05 09:45:51 -07:00
|
|
|
this.activeExecutions.resolveResponsePromise(
|
|
|
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
|
|
|
|
progress.executionId,
|
|
|
|
WebhookHelpers.decodeWebhookResponse(progress.response),
|
|
|
|
);
|
|
|
|
});
|
2021-02-09 14:32:40 -08:00
|
|
|
}
|
2021-08-29 11:58:11 -07:00
|
|
|
|
2022-09-09 06:14:49 -07:00
|
|
|
async add(jobData: JobData, jobOptions: object): Promise<Job> {
|
2021-08-29 11:58:11 -07:00
|
|
|
return this.jobQueue.add(jobData, jobOptions);
|
2021-02-09 14:32:40 -08:00
|
|
|
}
|
2021-08-29 11:58:11 -07:00
|
|
|
|
2022-09-09 06:14:49 -07:00
|
|
|
async getJob(jobId: Bull.JobId): Promise<Job | null> {
|
2021-08-29 11:58:11 -07:00
|
|
|
return this.jobQueue.getJob(jobId);
|
2021-02-09 14:32:40 -08:00
|
|
|
}
|
2021-08-29 11:58:11 -07:00
|
|
|
|
2022-09-09 06:14:49 -07:00
|
|
|
async getJobs(jobTypes: Bull.JobStatus[]): Promise<Job[]> {
|
2021-08-29 11:58:11 -07:00
|
|
|
return this.jobQueue.getJobs(jobTypes);
|
2021-02-09 14:32:40 -08:00
|
|
|
}
|
2021-08-29 11:58:11 -07:00
|
|
|
|
2022-09-09 06:14:49 -07:00
|
|
|
getBullObjectInstance(): JobQueue {
|
2021-02-09 14:32:40 -08:00
|
|
|
return this.jobQueue;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
2021-08-29 11:58:11 -07:00
|
|
|
*
|
2022-09-09 06:14:49 -07:00
|
|
|
* @param job A Job instance
|
2021-02-09 14:32:40 -08:00
|
|
|
* @returns boolean true if we were able to securely stop the job
|
|
|
|
*/
|
2022-09-09 06:14:49 -07:00
|
|
|
async stopJob(job: Job): Promise<boolean> {
|
2021-02-09 14:32:40 -08:00
|
|
|
if (await job.isActive()) {
|
|
|
|
// Job is already running so tell it to stop
|
|
|
|
await job.progress(-1);
|
|
|
|
return true;
|
|
|
|
}
|
2021-08-29 11:58:11 -07:00
|
|
|
// Job did not get started yet so remove from queue
|
|
|
|
try {
|
|
|
|
await job.remove();
|
|
|
|
return true;
|
|
|
|
} catch (e) {
|
|
|
|
await job.progress(-1);
|
|
|
|
}
|
|
|
|
|
2021-02-09 14:32:40 -08:00
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
let activeQueueInstance: Queue | undefined;
|
|
|
|
|
|
|
|
export function getInstance(): Queue {
|
|
|
|
if (activeQueueInstance === undefined) {
|
|
|
|
activeQueueInstance = new Queue();
|
|
|
|
}
|
2021-08-29 11:58:11 -07:00
|
|
|
|
2021-02-09 14:32:40 -08:00
|
|
|
return activeQueueInstance;
|
|
|
|
}
|