mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
fix(core): Ensure graceful shutdown for workers (#9547)
This commit is contained in:
parent
c5e0cf0137
commit
7fc00d8d10
|
@ -109,8 +109,11 @@ export class Queue {
|
||||||
return await this.jobQueue.client.ping();
|
return await this.jobQueue.client.ping();
|
||||||
}
|
}
|
||||||
|
|
||||||
async pause(isLocal?: boolean): Promise<void> {
|
async pause({
|
||||||
return await this.jobQueue.pause(isLocal);
|
isLocal,
|
||||||
|
doNotWaitActive,
|
||||||
|
}: { isLocal?: boolean; doNotWaitActive?: boolean } = {}): Promise<void> {
|
||||||
|
return await this.jobQueue.pause(isLocal, doNotWaitActive);
|
||||||
}
|
}
|
||||||
|
|
||||||
getBullObjectInstance(): JobQueue {
|
getBullObjectInstance(): JobQueue {
|
||||||
|
|
|
@ -63,23 +63,23 @@ export class Worker extends BaseCommand {
|
||||||
async stopProcess() {
|
async stopProcess() {
|
||||||
this.logger.info('Stopping n8n...');
|
this.logger.info('Stopping n8n...');
|
||||||
|
|
||||||
// Stop accepting new jobs
|
// Stop accepting new jobs, `doNotWaitActive` allows reporting progress
|
||||||
await Worker.jobQueue.pause(true);
|
await Worker.jobQueue.pause({ isLocal: true, doNotWaitActive: true });
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await this.externalHooks?.run('n8n.stop', []);
|
await this.externalHooks?.run('n8n.stop', []);
|
||||||
|
|
||||||
const hardStopTime = Date.now() + this.gracefulShutdownTimeoutInS;
|
const hardStopTimeMs = Date.now() + this.gracefulShutdownTimeoutInS * 1000;
|
||||||
|
|
||||||
// Wait for active workflow executions to finish
|
// Wait for active workflow executions to finish
|
||||||
let count = 0;
|
let count = 0;
|
||||||
while (Object.keys(Worker.runningJobs).length !== 0) {
|
while (Object.keys(Worker.runningJobs).length !== 0) {
|
||||||
if (count++ % 4 === 0) {
|
if (count++ % 4 === 0) {
|
||||||
const waitLeft = Math.ceil((hardStopTime - Date.now()) / 1000);
|
const waitLeft = Math.ceil((hardStopTimeMs - Date.now()) / 1000);
|
||||||
this.logger.info(
|
this.logger.info(
|
||||||
`Waiting for ${
|
`Waiting for ${
|
||||||
Object.keys(Worker.runningJobs).length
|
Object.keys(Worker.runningJobs).length
|
||||||
} active executions to finish... (wait ${waitLeft} more seconds)`,
|
} active executions to finish... (max wait ${waitLeft} more seconds)`,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -483,6 +483,16 @@ export class Worker extends BaseCommand {
|
||||||
await this.setupHealthMonitor();
|
await this.setupHealthMonitor();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (process.stdout.isTTY) {
|
||||||
|
process.stdin.setRawMode(true);
|
||||||
|
process.stdin.resume();
|
||||||
|
process.stdin.setEncoding('utf8');
|
||||||
|
|
||||||
|
process.stdin.on('data', (key: string) => {
|
||||||
|
if (key.charCodeAt(0) === 3) process.kill(process.pid, 'SIGINT'); // ctrl+c
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// Make sure that the process does not close
|
// Make sure that the process does not close
|
||||||
await new Promise(() => {});
|
await new Promise(() => {});
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue