mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 12:57:29 -08:00
fix(core): Restore queue listeners for webhook
process (#10781)
This commit is contained in:
parent
b2b6190cc0
commit
86f4877bab
|
@ -51,7 +51,7 @@ describe('ScalingService', () => {
|
|||
|
||||
let scalingService: ScalingService;
|
||||
|
||||
let registerMainListenersSpy: jest.SpyInstance;
|
||||
let registerMainOrWebhookListenersSpy: jest.SpyInstance;
|
||||
let registerWorkerListenersSpy: jest.SpyInstance;
|
||||
let scheduleQueueRecoverySpy: jest.SpyInstance;
|
||||
let stopQueueRecoverySpy: jest.SpyInstance;
|
||||
|
@ -86,8 +86,11 @@ describe('ScalingService', () => {
|
|||
|
||||
// @ts-expect-error Private method
|
||||
ScalingService.prototype.scheduleQueueRecovery = jest.fn();
|
||||
registerMainOrWebhookListenersSpy = jest.spyOn(
|
||||
scalingService,
|
||||
// @ts-expect-error Private method
|
||||
registerMainListenersSpy = jest.spyOn(scalingService, 'registerMainListeners');
|
||||
'registerMainOrWebhookListeners',
|
||||
);
|
||||
// @ts-expect-error Private method
|
||||
registerWorkerListenersSpy = jest.spyOn(scalingService, 'registerWorkerListeners');
|
||||
// @ts-expect-error Private method
|
||||
|
@ -102,7 +105,7 @@ describe('ScalingService', () => {
|
|||
await scalingService.setupQueue();
|
||||
|
||||
expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs);
|
||||
expect(registerMainListenersSpy).toHaveBeenCalled();
|
||||
expect(registerMainOrWebhookListenersSpy).toHaveBeenCalled();
|
||||
expect(registerWorkerListenersSpy).not.toHaveBeenCalled();
|
||||
expect(scheduleQueueRecoverySpy).toHaveBeenCalled();
|
||||
});
|
||||
|
@ -115,7 +118,7 @@ describe('ScalingService', () => {
|
|||
await scalingService.setupQueue();
|
||||
|
||||
expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs);
|
||||
expect(registerMainListenersSpy).toHaveBeenCalled();
|
||||
expect(registerMainOrWebhookListenersSpy).toHaveBeenCalled();
|
||||
expect(registerWorkerListenersSpy).not.toHaveBeenCalled();
|
||||
expect(scheduleQueueRecoverySpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
@ -130,7 +133,20 @@ describe('ScalingService', () => {
|
|||
|
||||
expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs);
|
||||
expect(registerWorkerListenersSpy).toHaveBeenCalled();
|
||||
expect(registerMainListenersSpy).not.toHaveBeenCalled();
|
||||
expect(registerMainOrWebhookListenersSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('webhook', () => {
|
||||
it('should set up a queue + listeners', async () => {
|
||||
// @ts-expect-error Private field
|
||||
scalingService.instanceType = 'webhook';
|
||||
|
||||
await scalingService.setupQueue();
|
||||
|
||||
expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs);
|
||||
expect(registerWorkerListenersSpy).not.toHaveBeenCalled();
|
||||
expect(registerMainOrWebhookListenersSpy).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -209,8 +209,8 @@ export class ScalingService {
|
|||
throw error;
|
||||
});
|
||||
|
||||
if (this.instanceType === 'main') {
|
||||
this.registerMainListeners();
|
||||
if (this.instanceType === 'main' || this.instanceType === 'webhook') {
|
||||
this.registerMainOrWebhookListeners();
|
||||
} else if (this.instanceType === 'worker') {
|
||||
this.registerWorkerListeners();
|
||||
}
|
||||
|
@ -246,9 +246,9 @@ export class ScalingService {
|
|||
}
|
||||
|
||||
/**
|
||||
* Register listeners on a `main` process for Bull queue events.
|
||||
* Register listeners on a `main` or `webhook` process for Bull queue events.
|
||||
*/
|
||||
private registerMainListeners() {
|
||||
private registerMainOrWebhookListeners() {
|
||||
this.queue.on('global:progress', (_jobId: JobId, msg: unknown) => {
|
||||
if (!this.isPubSubMessage(msg)) return;
|
||||
|
||||
|
|
Loading…
Reference in a new issue