fix(core): Fix worker shutdown errors when active executions (#10353)

Iván Ovejero 2024-08-13 09:14:52 +02:00 committed by GitHub
parent cdd0ab4031
commit e071b73bab
4 changed files with 31 additions and 25 deletions

@@ -333,7 +333,9 @@ export abstract class BaseCommand extends Command {
       this.logger.info(`Received ${signal}. Shutting down...`);
       this.shutdownService.shutdown();
-      await Promise.all([this.stopProcess(), this.shutdownService.waitForShutdown()]);
+      await this.shutdownService.waitForShutdown();
+
+      await this.stopProcess();
       clearTimeout(forceShutdownTimer);
     };
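
For context on the ordering change above: with Promise.all, stopProcess() runs concurrently with the shutdown wait, so process teardown can race executions that are still draining. Below is a minimal standalone sketch of that race; it is not n8n code, and the names, timings, and resource flag are illustrative stand-ins only.

// Illustrative stand-ins for waitForShutdown() and stopProcess().
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

let resourcesOpen = true;

// Stand-in for ShutdownService.waitForShutdown(): executions still draining.
async function waitForShutdown() {
  await sleep(300);
  if (!resourcesOpen) throw new Error('resources closed while executions were still running');
}

// Stand-in for stopProcess(): closes DB connections and other shared resources.
async function stopProcess() {
  resourcesOpen = false;
}

async function main() {
  // Old ordering: teardown races the drain, so the drain can observe closed resources.
  await Promise.all([stopProcess(), waitForShutdown()]).catch((error) =>
    console.error('old ordering:', (error as Error).message),
  );

  // New ordering: drain first, then tear down.
  resourcesOpen = true;
  await waitForShutdown();
  await stopProcess();
  console.log('new ordering: clean shutdown');
}

void main();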

@@ -2,7 +2,7 @@ import { Container } from 'typedi';
 import { Flags, type Config } from '@oclif/core';
 import express from 'express';
 import http from 'http';
-import { sleep, ApplicationError } from 'n8n-workflow';
+import { ApplicationError } from 'n8n-workflow';

 import * as Db from '@/Db';
 import * as ResponseHelper from '@/ResponseHelper';
@@ -61,23 +61,6 @@ export class Worker extends BaseCommand {
     try {
       await this.externalHooks?.run('n8n.stop', []);
-
-      const hardStopTimeMs = Date.now() + this.gracefulShutdownTimeoutInS * 1000;
-
-      // Wait for active workflow executions to finish
-      let count = 0;
-      while (this.jobProcessor.getRunningJobIds().length !== 0) {
-        if (count++ % 4 === 0) {
-          const waitLeft = Math.ceil((hardStopTimeMs - Date.now()) / 1000);
-          this.logger.info(
-            `Waiting for ${
-              Object.keys(this.jobProcessor.getRunningJobIds()).length
-            } active executions to finish... (max wait ${waitLeft} more seconds)`,
-          );
-        }
-
-        await sleep(500);
-      }
     } catch (error) {
       await this.exitWithCrash('There was an error shutting down n8n.', error);
     }

@@ -7,6 +7,7 @@ import type { Job, JobData, JobOptions, JobQueue } from '../types';
 import { ApplicationError } from 'n8n-workflow';
 import { mockInstance } from '@test/mocking';
 import { GlobalConfig } from '@n8n/config';
+import type { JobProcessor } from '../job-processor';

 const queue = mock<JobQueue>({
   client: { ping: jest.fn() },
@@ -100,23 +101,27 @@ describe('ScalingService', () => {
     });
   });

-  describe('pauseQueue', () => {
-    it('should pause the queue', async () => {
+  describe('stop', () => {
+    it('should pause the queue and check for running jobs', async () => {
       /**
        * Arrange
        */
-      const scalingService = new ScalingService(mock(), mock(), mock(), globalConfig);
+      const jobProcessor = mock<JobProcessor>();
+      const scalingService = new ScalingService(mock(), mock(), jobProcessor, globalConfig);
       await scalingService.setupQueue();
+      jobProcessor.getRunningJobIds.mockReturnValue([]);
+      const getRunningJobsCountSpy = jest.spyOn(scalingService, 'getRunningJobsCount');

       /**
        * Act
        */
-      await scalingService.pauseQueue();
+      await scalingService.stop();

       /**
        * Assert
        */
       expect(queue.pause).toHaveBeenCalledWith(true, true);
+      expect(getRunningJobsCountSpy).toHaveBeenCalled();
     });
   });

@@ -1,5 +1,5 @@
 import Container, { Service } from 'typedi';
-import { ApplicationError, BINARY_ENCODING } from 'n8n-workflow';
+import { ApplicationError, BINARY_ENCODING, sleep } from 'n8n-workflow';
 import { ActiveExecutions } from '@/ActiveExecutions';
 import config from '@/config';
 import { Logger } from '@/Logger';
@@ -59,10 +59,22 @@ export class ScalingService {
   }

   @OnShutdown(HIGHEST_SHUTDOWN_PRIORITY)
-  async pauseQueue() {
+  async stop() {
     await this.queue.pause(true, true);
     this.logger.debug('[ScalingService] Queue paused');
+
+    let count = 0;
+
+    while (this.getRunningJobsCount() !== 0) {
+      if (count++ % 4 === 0) {
+        this.logger.info(
+          `Waiting for ${this.getRunningJobsCount()} active executions to finish...`,
+        );
+      }
+
+      await sleep(500);
+    }
   }

   async pingQueue() {
@@ -113,6 +125,10 @@ export class ScalingService {
     }
   }

+  getRunningJobsCount() {
+    return this.jobProcessor.getRunningJobIds().length;
+  }
+
   // #endregion

   // #region Listeners
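
The polling loop added to ScalingService.stop() above is a generic wait-until-idle pattern. Below is a standalone sketch of the same pattern for reference; the hard-timeout parameter and the fake job counter are illustrative additions and are not part of the change above.

// Standalone sketch of the poll-until-idle loop used during shutdown (illustrative only).
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function waitForRunningJobs(
  getRunningJobsCount: () => number,
  log: (msg: string) => void,
  hardStopTimeoutMs = 30_000,
) {
  const deadline = Date.now() + hardStopTimeoutMs;
  let count = 0;

  while (getRunningJobsCount() !== 0) {
    if (Date.now() > deadline) {
      log(`Hard timeout reached with ${getRunningJobsCount()} executions still active`);
      return;
    }

    // Log roughly every two seconds (every 4th iteration of a 500 ms poll), like the loop above.
    if (count++ % 4 === 0) {
      log(`Waiting for ${getRunningJobsCount()} active executions to finish...`);
    }

    await sleep(500);
  }
}

// Example usage with a fake counter that drains over time.
let running = 3;
const drain = setInterval(() => {
  running = Math.max(0, running - 1);
  if (running === 0) clearInterval(drain);
}, 1500);

void waitForRunningJobs(() => running, console.log);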