diff --git a/packages/@n8n/task-runner/src/start.ts b/packages/@n8n/task-runner/src/start.ts index 8838ea5b95..f65b9dd7b8 100644 --- a/packages/@n8n/task-runner/src/start.ts +++ b/packages/@n8n/task-runner/src/start.ts @@ -4,6 +4,9 @@ import * as a from 'node:assert/strict'; import { authenticate } from './authenticator'; import { JsTaskRunner } from './code'; +let runner: JsTaskRunner | undefined; +let isShuttingDown = false; + type Config = { n8nUri: string; authToken?: string; @@ -26,6 +29,29 @@ function readAndParseConfig(): Config { }; } +function createSignalHandler(signal: string) { + return async function onSignal() { + if (isShuttingDown) { + return; + } + + console.log(`Received ${signal} signal, shutting down...`); + + isShuttingDown = true; + try { + if (runner) { + await runner.stop(); + runner = undefined; + } + } catch (e) { + const error = ensureError(e); + console.error('Error stopping task runner', { error }); + } finally { + process.exit(0); + } + }; +} + void (async function start() { const config = readAndParseConfig(); @@ -40,7 +66,10 @@ void (async function start() { } const wsUrl = `ws://${config.n8nUri}/runners/_ws`; - new JsTaskRunner('javascript', wsUrl, grantToken, 5); + runner = new JsTaskRunner('javascript', wsUrl, grantToken, 5); + + process.on('SIGINT', createSignalHandler('SIGINT')); + process.on('SIGTERM', createSignalHandler('SIGTERM')); })().catch((e) => { const error = ensureError(e); console.error('Task runner failed to start', { error }); diff --git a/packages/@n8n/task-runner/src/task-runner.ts b/packages/@n8n/task-runner/src/task-runner.ts index 6971df6bb4..6c723ea657 100644 --- a/packages/@n8n/task-runner/src/task-runner.ts +++ b/packages/@n8n/task-runner/src/task-runner.ts @@ -359,4 +359,36 @@ export abstract class TaskRunner { } return rpcObject; } + + /** Close the connection gracefully and wait until has been closed */ + async stop() { + this.stopTaskOffers(); + + await this.waitUntilAllTasksAreDone(); + + await this.closeConnection(); + } + + private async closeConnection() { + // 1000 is the standard close code + // https://www.rfc-editor.org/rfc/rfc6455.html#section-7.1.5 + this.ws.close(1000, 'Shutting down'); + + await new Promise((resolve) => { + this.ws.once('close', resolve); + }); + } + + private async waitUntilAllTasksAreDone(maxWaitTimeInMs = 30_000) { + // TODO: Make maxWaitTimeInMs configurable + const start = Date.now(); + + while (this.runningTasks.size > 0) { + if (Date.now() - start > maxWaitTimeInMs) { + throw new ApplicationError('Timeout while waiting for tasks to finish'); + } + + await new Promise((resolve) => setTimeout(resolve, 100)); + } + } }