feat(core): Shut down runner on idle timeout (no-changelog) (#11820)

This commit is contained in:
Iván Ovejero 2024-11-22 11:17:00 +01:00 committed by GitHub
parent cd3598aaab
commit 0cbb46c8b9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 147 additions and 5 deletions

View file

@ -26,6 +26,14 @@ export class BaseRunnerConfig {
@Env('N8N_RUNNERS_MAX_CONCURRENCY')
maxConcurrency: number = 5;
/**
* How long (in seconds) a runner may be idle for before exit. Intended
* for use in `external` mode - launcher must pass the env var when launching
* the runner. Disabled with `0` on `internal` mode.
*/
@Env('N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT')
idleTimeout: number = 0;
@Nested
healthcheckServer!: HealthcheckServerConfig;
}

View file

@ -3,6 +3,7 @@ import type { CodeExecutionMode, IDataObject } from 'n8n-workflow';
import fs from 'node:fs';
import { builtinModules } from 'node:module';
import type { BaseRunnerConfig } from '@/config/base-runner-config';
import type { JsRunnerConfig } from '@/config/js-runner-config';
import { MainConfig } from '@/config/main-config';
import { ExecutionError } from '@/js-task-runner/errors/execution-error';
@ -24,17 +25,21 @@ jest.mock('ws');
const defaultConfig = new MainConfig();
describe('JsTaskRunner', () => {
const createRunnerWithOpts = (opts: Partial<JsRunnerConfig> = {}) =>
const createRunnerWithOpts = (
jsRunnerOpts: Partial<JsRunnerConfig> = {},
baseRunnerOpts: Partial<BaseRunnerConfig> = {},
) =>
new JsTaskRunner({
baseRunnerConfig: {
...defaultConfig.baseRunnerConfig,
grantToken: 'grantToken',
maxConcurrency: 1,
n8nUri: 'localhost',
...baseRunnerOpts,
},
jsRunnerConfig: {
...defaultConfig.jsRunnerConfig,
...opts,
...jsRunnerOpts,
},
sentryConfig: {
sentryDsn: '',
@ -825,4 +830,100 @@ describe('JsTaskRunner', () => {
});
});
});
describe('idle timeout', () => {
beforeEach(() => {
jest.useFakeTimers();
});
afterEach(() => {
jest.useRealTimers();
});
it('should set idle timer when instantiated', () => {
const idleTimeout = 5;
const runner = createRunnerWithOpts({}, { idleTimeout });
const emitSpy = jest.spyOn(runner, 'emit');
jest.advanceTimersByTime(idleTimeout * 1000 - 100);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');
jest.advanceTimersByTime(idleTimeout * 1000);
expect(emitSpy).toHaveBeenCalledWith('runner:reached-idle-timeout');
});
it('should reset idle timer when accepting a task', () => {
const idleTimeout = 5;
const runner = createRunnerWithOpts({}, { idleTimeout });
const taskId = '123';
const offerId = 'offer123';
const emitSpy = jest.spyOn(runner, 'emit');
jest.advanceTimersByTime(idleTimeout * 1000 - 100);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');
runner.openOffers.set(offerId, {
offerId,
validUntil: process.hrtime.bigint() + BigInt(idleTimeout * 1000 * 1_000_000),
});
runner.offerAccepted(offerId, taskId);
jest.advanceTimersByTime(200);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout'); // because timer was reset
runner.runningTasks.clear();
jest.advanceTimersByTime(idleTimeout * 1000);
expect(emitSpy).toHaveBeenCalledWith('runner:reached-idle-timeout');
});
it('should reset idle timer when finishing a task', async () => {
const idleTimeout = 5;
const runner = createRunnerWithOpts({}, { idleTimeout });
const taskId = '123';
const emitSpy = jest.spyOn(runner, 'emit');
jest.spyOn(runner, 'executeTask').mockResolvedValue({ result: [] });
runner.runningTasks.set(taskId, {
taskId,
active: true,
cancelled: false,
});
jest.advanceTimersByTime(idleTimeout * 1000 - 100);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');
await runner.receivedSettings(taskId, {});
jest.advanceTimersByTime(200);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout'); // because timer was reset
jest.advanceTimersByTime(idleTimeout * 1000);
expect(emitSpy).toHaveBeenCalledWith('runner:reached-idle-timeout');
});
it('should never reach idle timeout if idle timeout is set to 0', () => {
const runner = createRunnerWithOpts({}, { idleTimeout: 0 });
const emitSpy = jest.spyOn(runner, 'emit');
jest.advanceTimersByTime(999999);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');
});
it('should not reach idle timeout if there are running tasks', () => {
const idleTimeout = 5;
const runner = createRunnerWithOpts({}, { idleTimeout });
const taskId = '123';
const emitSpy = jest.spyOn(runner, 'emit');
runner.runningTasks.set(taskId, {
taskId,
active: true,
cancelled: false,
});
jest.advanceTimersByTime(idleTimeout * 1000);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');
});
});
});

View file

@ -51,6 +51,9 @@ void (async function start() {
}
runner = new JsTaskRunner(config);
runner.on('runner:reached-idle-timeout', () => {
void createSignalHandler('IDLE_TIMEOUT')();
});
const { enabled, host, port } = config.baseRunnerConfig.healthcheckServer;

View file

@ -1,5 +1,6 @@
import { ApplicationError, ensureError } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { EventEmitter } from 'node:events';
import { type MessageEvent, WebSocket } from 'ws';
import type { BaseRunnerConfig } from '@/config/base-runner-config';
@ -49,7 +50,7 @@ export interface TaskRunnerOpts extends BaseRunnerConfig {
name?: string;
}
export abstract class TaskRunner {
export abstract class TaskRunner extends EventEmitter {
id: string = nanoid();
ws: WebSocket;
@ -76,10 +77,17 @@ export abstract class TaskRunner {
name: string;
private idleTimer: NodeJS.Timeout | undefined;
/** How long (in seconds) a runner may be idle for before exit. */
private readonly idleTimeout: number;
constructor(opts: TaskRunnerOpts) {
super();
this.taskType = opts.taskType;
this.name = opts.name ?? 'Node.js Task Runner SDK';
this.maxConcurrency = opts.maxConcurrency;
this.idleTimeout = opts.idleTimeout;
const wsUrl = `ws://${opts.n8nUri}/runners/_ws?id=${this.id}`;
this.ws = new WebSocket(wsUrl, {
@ -108,6 +116,17 @@ export abstract class TaskRunner {
});
this.ws.addEventListener('message', this.receiveMessage);
this.ws.addEventListener('close', this.stopTaskOffers);
this.resetIdleTimer();
}
private resetIdleTimer() {
if (this.idleTimeout === 0) return;
this.clearIdleTimer();
this.idleTimer = setTimeout(() => {
if (this.runningTasks.size === 0) this.emit('runner:reached-idle-timeout');
}, this.idleTimeout * 1000);
}
private receiveMessage = (message: MessageEvent) => {
@ -244,6 +263,7 @@ export abstract class TaskRunner {
this.openOffers.delete(offerId);
}
this.resetIdleTimer();
this.runningTasks.set(taskId, {
taskId,
active: false,
@ -306,6 +326,8 @@ export abstract class TaskRunner {
this.taskDone(taskId, data);
} catch (error) {
this.taskErrored(taskId, error);
} finally {
this.resetIdleTimer();
}
}
@ -432,6 +454,8 @@ export abstract class TaskRunner {
/** Close the connection gracefully and wait until has been closed */
async stop() {
this.clearIdleTimer();
this.stopTaskOffers();
await this.waitUntilAllTasksAreDone();
@ -439,6 +463,11 @@ export abstract class TaskRunner {
await this.closeConnection();
}
clearIdleTimer() {
if (this.idleTimer) clearTimeout(this.idleTimer);
this.idleTimer = undefined;
}
private async closeConnection() {
// 1000 is the standard close code
// https://www.rfc-editor.org/rfc/rfc6455.html#section-7.1.5

View file

@ -126,7 +126,7 @@ export class TaskRunnerWsServer {
this.sendMessage.bind(this, id) as MessageCallback,
);
this.logger.info(`Runner "${message.name}" (${id}) has been registered`);
this.logger.info(`Registered runner "${message.name}" (${id}) `);
return;
}
@ -166,6 +166,7 @@ export class TaskRunnerWsServer {
heartbeatInterval: this.taskTunnersConfig.heartbeatInterval,
});
this.taskBroker.deregisterRunner(id, disconnectError);
this.logger.debug(`Deregistered runner "${id}"`);
connection.close(code);
this.runnerConnections.delete(id);
}

View file

@ -525,7 +525,7 @@ export class TaskBroker {
return;
}
if (e instanceof TaskDeferredError) {
this.logger.info(`Task (${taskId}) deferred until runner is ready`);
this.logger.debug(`Task (${taskId}) deferred until runner is ready`);
this.pendingTaskRequests.push(request); // will settle on receiving task offer from runner
return;
}