perf(core): Launch runners on demand and shut down if idle

This commit is contained in:
Iván Ovejero 2024-11-07 11:04:08 +01:00
parent 471921dc20
commit b18a3e269d
No known key found for this signature in database
7 changed files with 163 additions and 32 deletions

View file

@ -50,4 +50,12 @@ export class TaskRunnersConfig {
/** How many concurrent tasks can a runner execute at a time */
@Env('N8N_RUNNERS_MAX_CONCURRENCY')
maxConcurrency: number = 5;
/** How long (in minutes) until shutting down an idle runner. */
@Env('N8N_RUNNERS_IDLE_TIMEOUT')
idleTimeout: number = 5;
/** How often (in minutes) to check if a runner is idle. */
@Env('N8N_RUNNERS_IDLE_CHECKS_FREQUENCY')
idleChecksFrequency: number = 1;
}

View file

@ -20,6 +20,7 @@ function createSignalHandler(signal: string) {
if (runner) {
await runner.stop();
runner = undefined;
console.log('Task runner stopped');
}
} catch (e) {
const error = ensureError(e);

View file

@ -0,0 +1,112 @@
import { TaskRunnersConfig } from '@n8n/config';
import { strict } from 'node:assert';
import { Service } from 'typedi';
// import { Time } from '@/constants';
import { OnShutdown } from '@/decorators/on-shutdown';
import { Logger } from '@/logging/logger.service';
import { TaskRunnerProcess } from '@/runners/task-runner-process';
import { TypedEmitter } from '@/typed-emitter';
export type RunnerLifecycleEventMap = {
'runner:started': never;
'runner:stopped': never;
};
@Service()
export class RunnerLifecycleEvents extends TypedEmitter<RunnerLifecycleEventMap> {}
@Service()
export class RunnerLifecycleManager {
private state: 'stopped' | 'starting' | 'running' | 'stopping' = 'stopped';
private startPromise: Promise<void> | null = null;
private lastActivityTime: number = Date.now();
private idleChecksInterval: NodeJS.Timeout | null = null;
constructor(
private readonly logger: Logger,
private readonly taskRunnerProcess: TaskRunnerProcess,
readonly runnerConfig: TaskRunnersConfig,
private readonly lifecycleEvents: RunnerLifecycleEvents,
) {
const { mode } = runnerConfig;
strict(
mode === 'internal_childprocess' || mode === 'internal_launcher',
'Runner mode must be `internal_childprocess` or `internal_launcher`',
);
this.startIdleChecks();
}
async ensureRunnerAvailable() {
if (this.state === 'running') return;
if (this.state === 'starting') return await this.startPromise;
this.state = 'starting';
this.startPromise = this.startRunnerProcess().finally(() => {
this.startPromise = null;
});
return await this.startPromise;
}
updateLastActivityTime() {
this.lastActivityTime = Date.now();
}
private async startRunnerProcess() {
try {
this.logger.debug('Starting task runner process');
await this.taskRunnerProcess.start();
this.lifecycleEvents.emit('runner:started');
this.state = 'running';
this.lastActivityTime = Date.now();
} catch (error) {
this.state = 'stopped';
throw error;
}
}
private startIdleChecks() {
// const idleTimeout = this.runnerConfig.idleTimeout * Time.minutes.toMilliseconds;
// const idleChecksFrequency = this.runnerConfig.idleChecksFrequency * Time.minutes.toMilliseconds;
const idleTimeout = 10_000;
const idleChecksFrequency = 10_000;
this.idleChecksInterval = setInterval(() => {
if (this.state === 'running' && Date.now() - this.lastActivityTime > idleTimeout) {
this.logger.info('Runner has been idle for too long, stopping it');
void this.stopRunner();
}
}, idleChecksFrequency);
}
private async stopRunner() {
if (this.state !== 'running') return;
this.state = 'stopping';
try {
await this.taskRunnerProcess.stop();
this.lifecycleEvents.emit('runner:stopped');
} finally {
this.state = 'stopped';
}
}
@OnShutdown()
async shutdown() {
if (this.idleChecksInterval) clearInterval(this.idleChecksInterval);
await this.stopRunner();
}
}

View file

@ -4,7 +4,7 @@ import type WebSocket from 'ws';
import { Logger } from '@/logging/logger.service';
import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer';
import type { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer';
import type {
DisconnectAnalyzer,
TaskRunnerServerInitRequest,
@ -23,10 +23,10 @@ export class TaskRunnerWsServer {
constructor(
private readonly logger: Logger,
private readonly taskBroker: TaskBroker,
private disconnectAnalyzer: DefaultTaskRunnerDisconnectAnalyzer,
private disconnectAnalyzer: DefaultTaskRunnerDisconnectAnalyzer | undefined,
) {}
setDisconnectAnalyzer(disconnectAnalyzer: DisconnectAnalyzer) {
setDisconnectAnalyzer(disconnectAnalyzer: DisconnectAnalyzer | undefined) {
this.disconnectAnalyzer = disconnectAnalyzer;
}
@ -99,7 +99,7 @@ export class TaskRunnerWsServer {
async removeConnection(id: TaskRunner['id']) {
const connection = this.runnerConnections.get(id);
if (connection) {
if (connection && this.disconnectAnalyzer) {
const disconnectReason = await this.disconnectAnalyzer.determineDisconnectReason(id);
this.taskBroker.deregisterRunner(id, disconnectReason);
connection.close();

View file

@ -4,13 +4,14 @@ import type {
RunnerMessage,
TaskResultData,
} from '@n8n/task-runner';
import { ApplicationError } from 'n8n-workflow';
import { ApplicationError, ensureError } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { Service } from 'typedi';
import { Logger } from '@/logging/logger.service';
import { TaskRejectError } from './errors';
import { RunnerLifecycleManager } from './runner-lifecycle-manager';
export interface TaskRunner {
id: string;
@ -78,7 +79,10 @@ export class TaskBroker {
private pendingTaskRequests: TaskRequest[] = [];
constructor(private readonly logger: Logger) {}
constructor(
private readonly logger: Logger,
private readonly lifecycleManager: RunnerLifecycleManager,
) {}
expireTasks() {
const now = process.hrtime.bigint();
@ -269,7 +273,7 @@ export class TaskBroker {
await this.cancelTask(message.taskId, message.reason);
break;
case 'requester:taskrequest':
this.taskRequested({
await this.taskRequested({
taskType: message.taskType,
requestId: message.requestId,
requesterId,
@ -553,7 +557,18 @@ export class TaskBroker {
}
}
taskRequested(request: TaskRequest) {
async taskRequested(request: TaskRequest) {
try {
await this.lifecycleManager.ensureRunnerAvailable();
} catch (e) {
const error = ensureError(e);
this.logger.error('Failed to start task runner', { error });
this.handleRunnerReject(request.requestId, `Task runner unavailable: ${error.message}`);
return;
}
this.lifecycleManager.updateLastActivityTime();
this.pendingTaskRequests.push(request);
this.settleTasks();
}

View file

@ -4,6 +4,7 @@ import Container, { Service } from 'typedi';
import type { TaskRunnerProcess } from '@/runners/task-runner-process';
import { RunnerLifecycleEvents } from './runner-lifecycle-manager';
import { TaskRunnerWsServer } from './runner-ws-server';
import type { LocalTaskManager } from './task-managers/local-task-manager';
import type { TaskRunnerServer } from './task-runner-server';
@ -23,20 +24,29 @@ export class TaskRunnerModule {
private taskRunnerProcess: TaskRunnerProcess | undefined;
constructor(private readonly runnerConfig: TaskRunnersConfig) {}
constructor(
private readonly runnerConfig: TaskRunnersConfig,
private readonly lifecycleEvents: RunnerLifecycleEvents,
) {
this.lifecycleEvents.on('runner:started', async () => {
const { InternalTaskRunnerDisconnectAnalyzer } = await import(
'@/runners/internal-task-runner-disconnect-analyzer'
);
this.taskRunnerWsServer?.setDisconnectAnalyzer(
Container.get(InternalTaskRunnerDisconnectAnalyzer),
);
});
this.lifecycleEvents.on('runner:stopped', () => {
this.taskRunnerWsServer?.setDisconnectAnalyzer(undefined);
});
}
async start() {
a.ok(!this.runnerConfig.disabled, 'Task runner is disabled');
await this.loadTaskManager();
await this.loadTaskRunnerServer();
if (
this.runnerConfig.mode === 'internal_childprocess' ||
this.runnerConfig.mode === 'internal_launcher'
) {
await this.startInternalTaskRunner();
}
}
async stop() {
@ -67,19 +77,4 @@ export class TaskRunnerModule {
await this.taskRunnerHttpServer.start();
}
private async startInternalTaskRunner() {
a.ok(this.taskRunnerWsServer, 'Task Runner WS Server not loaded');
const { TaskRunnerProcess } = await import('@/runners/task-runner-process');
this.taskRunnerProcess = Container.get(TaskRunnerProcess);
await this.taskRunnerProcess.start();
const { InternalTaskRunnerDisconnectAnalyzer } = await import(
'@/runners/internal-task-runner-disconnect-analyzer'
);
this.taskRunnerWsServer.setDisconnectAnalyzer(
Container.get(InternalTaskRunnerDisconnectAnalyzer),
);
}
}

View file

@ -77,7 +77,7 @@ export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
}
async start() {
a.ok(!this.process, 'Task Runner Process already running');
if (this.isRunning) return;
const grantToken = await this.authService.createGrantToken();