mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
fix(core): Ensure worker stops picking up new jobs while shutting down
This commit is contained in:
parent
3cd34b5af6
commit
501777986b
|
@ -202,7 +202,7 @@ describe('ScalingService', () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('if worker', () => {
|
describe('if worker', () => {
|
||||||
it('should wait for running jobs to finish', async () => {
|
it('should pause queue and wait for running jobs to finish', async () => {
|
||||||
// @ts-expect-error readonly property
|
// @ts-expect-error readonly property
|
||||||
instanceSettings.instanceType = 'worker';
|
instanceSettings.instanceType = 'worker';
|
||||||
await scalingService.setupQueue();
|
await scalingService.setupQueue();
|
||||||
|
@ -211,7 +211,7 @@ describe('ScalingService', () => {
|
||||||
await scalingService.stop();
|
await scalingService.stop();
|
||||||
|
|
||||||
expect(getRunningJobsCountSpy).toHaveBeenCalled();
|
expect(getRunningJobsCountSpy).toHaveBeenCalled();
|
||||||
expect(queue.pause).not.toHaveBeenCalled();
|
expect(queue.pause).toHaveBeenCalled();
|
||||||
expect(stopQueueRecoverySpy).not.toHaveBeenCalled();
|
expect(stopQueueRecoverySpy).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
|
@ -139,17 +139,21 @@ export class ScalingService {
|
||||||
else if (instanceType === 'worker') await this.stopWorker();
|
else if (instanceType === 'worker') await this.stopWorker();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async pauseQueue() {
|
||||||
|
await this.queue.pause(true, true); // no more jobs will be enqueued or picked up
|
||||||
|
this.logger.debug('Paused queue');
|
||||||
|
}
|
||||||
|
|
||||||
private async stopMain() {
|
private async stopMain() {
|
||||||
if (this.instanceSettings.isSingleMain) {
|
if (this.instanceSettings.isSingleMain) await this.pauseQueue();
|
||||||
await this.queue.pause(true, true); // no more jobs will be picked up
|
|
||||||
this.logger.debug('Queue paused');
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this.queueRecoveryContext.timeout) this.stopQueueRecovery();
|
if (this.queueRecoveryContext.timeout) this.stopQueueRecovery();
|
||||||
if (this.isQueueMetricsEnabled) this.stopQueueMetrics();
|
if (this.isQueueMetricsEnabled) this.stopQueueMetrics();
|
||||||
}
|
}
|
||||||
|
|
||||||
private async stopWorker() {
|
private async stopWorker() {
|
||||||
|
await this.pauseQueue();
|
||||||
|
|
||||||
let count = 0;
|
let count = 0;
|
||||||
|
|
||||||
while (this.getRunningJobsCount() !== 0) {
|
while (this.getRunningJobsCount() !== 0) {
|
||||||
|
|
Loading…
Reference in a new issue