mirror of
https://github.com/n8n-io/n8n.git
synced 2024-12-24 04:04:06 -08:00
feat: Graceful termination of task runner (no-changelog) (#11009)
This commit is contained in:
parent
6a2f9e7295
commit
4434668135
|
@ -4,6 +4,9 @@ import * as a from 'node:assert/strict';
|
|||
import { authenticate } from './authenticator';
|
||||
import { JsTaskRunner } from './code';
|
||||
|
||||
let runner: JsTaskRunner | undefined;
|
||||
let isShuttingDown = false;
|
||||
|
||||
type Config = {
|
||||
n8nUri: string;
|
||||
authToken?: string;
|
||||
|
@ -26,6 +29,29 @@ function readAndParseConfig(): Config {
|
|||
};
|
||||
}
|
||||
|
||||
function createSignalHandler(signal: string) {
|
||||
return async function onSignal() {
|
||||
if (isShuttingDown) {
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`Received ${signal} signal, shutting down...`);
|
||||
|
||||
isShuttingDown = true;
|
||||
try {
|
||||
if (runner) {
|
||||
await runner.stop();
|
||||
runner = undefined;
|
||||
}
|
||||
} catch (e) {
|
||||
const error = ensureError(e);
|
||||
console.error('Error stopping task runner', { error });
|
||||
} finally {
|
||||
process.exit(0);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
void (async function start() {
|
||||
const config = readAndParseConfig();
|
||||
|
||||
|
@ -40,7 +66,10 @@ void (async function start() {
|
|||
}
|
||||
|
||||
const wsUrl = `ws://${config.n8nUri}/runners/_ws`;
|
||||
new JsTaskRunner('javascript', wsUrl, grantToken, 5);
|
||||
runner = new JsTaskRunner('javascript', wsUrl, grantToken, 5);
|
||||
|
||||
process.on('SIGINT', createSignalHandler('SIGINT'));
|
||||
process.on('SIGTERM', createSignalHandler('SIGTERM'));
|
||||
})().catch((e) => {
|
||||
const error = ensureError(e);
|
||||
console.error('Task runner failed to start', { error });
|
||||
|
|
|
@ -359,4 +359,36 @@ export abstract class TaskRunner {
|
|||
}
|
||||
return rpcObject;
|
||||
}
|
||||
|
||||
/** Close the connection gracefully and wait until has been closed */
|
||||
async stop() {
|
||||
this.stopTaskOffers();
|
||||
|
||||
await this.waitUntilAllTasksAreDone();
|
||||
|
||||
await this.closeConnection();
|
||||
}
|
||||
|
||||
private async closeConnection() {
|
||||
// 1000 is the standard close code
|
||||
// https://www.rfc-editor.org/rfc/rfc6455.html#section-7.1.5
|
||||
this.ws.close(1000, 'Shutting down');
|
||||
|
||||
await new Promise((resolve) => {
|
||||
this.ws.once('close', resolve);
|
||||
});
|
||||
}
|
||||
|
||||
private async waitUntilAllTasksAreDone(maxWaitTimeInMs = 30_000) {
|
||||
// TODO: Make maxWaitTimeInMs configurable
|
||||
const start = Date.now();
|
||||
|
||||
while (this.runningTasks.size > 0) {
|
||||
if (Date.now() - start > maxWaitTimeInMs) {
|
||||
throw new ApplicationError('Timeout while waiting for tasks to finish');
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue