mirror of
https://github.com/n8n-io/n8n.git
synced 2025-02-02 07:01:30 -08:00
feat(core): Implement task timeouts and heartbeats for runners (no-changelog) (#11690)
Some checks are pending
Test Master / install-and-build (push) Waiting to run
Test Master / Unit tests (18.x) (push) Blocked by required conditions
Test Master / Unit tests (20.x) (push) Blocked by required conditions
Test Master / Unit tests (22.4) (push) Blocked by required conditions
Test Master / Lint (push) Blocked by required conditions
Test Master / Notify Slack on failure (push) Blocked by required conditions
Some checks are pending
Test Master / install-and-build (push) Waiting to run
Test Master / Unit tests (18.x) (push) Blocked by required conditions
Test Master / Unit tests (20.x) (push) Blocked by required conditions
Test Master / Unit tests (22.4) (push) Blocked by required conditions
Test Master / Lint (push) Blocked by required conditions
Test Master / Notify Slack on failure (push) Blocked by required conditions
Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
parent
3f9127955a
commit
124ac26e43
|
@ -53,4 +53,12 @@ export class TaskRunnersConfig {
|
|||
/** Should the output of deduplication be asserted for correctness */
|
||||
@Env('N8N_RUNNERS_ASSERT_DEDUPLICATION_OUTPUT')
|
||||
assertDeduplicationOutput: boolean = false;
|
||||
|
||||
/** How long (in seconds) a task is allowed to take for completion, else the task will be aborted and the runner restarted. Must be greater than 0. */
|
||||
@Env('N8N_RUNNERS_TASK_TIMEOUT')
|
||||
taskTimeout: number = 60;
|
||||
|
||||
/** How often (in seconds) the runner must send a heartbeat to the broker, else the task will be aborted and the runner restarted. Must be greater than 0. */
|
||||
@Env('N8N_RUNNERS_HEARTBEAT_INTERVAL')
|
||||
heartbeatInterval: number = 30;
|
||||
}
|
||||
|
|
|
@ -234,6 +234,8 @@ describe('GlobalConfig', () => {
|
|||
maxOldSpaceSize: '',
|
||||
maxConcurrency: 5,
|
||||
assertDeduplicationOutput: false,
|
||||
taskTimeout: 60,
|
||||
heartbeatInterval: 30,
|
||||
},
|
||||
sentry: {
|
||||
backendDsn: '',
|
||||
|
|
|
@ -1,4 +1,16 @@
|
|||
import { Config, Env } from '@n8n/config';
|
||||
import { Config, Env, Nested } from '@n8n/config';
|
||||
|
||||
@Config
|
||||
class HealthcheckServerConfig {
|
||||
@Env('N8N_RUNNERS_SERVER_ENABLED')
|
||||
enabled: boolean = false;
|
||||
|
||||
@Env('N8N_RUNNERS_SERVER_HOST')
|
||||
host: string = '127.0.0.1';
|
||||
|
||||
@Env('N8N_RUNNERS_SERVER_PORT')
|
||||
port: number = 5680;
|
||||
}
|
||||
|
||||
@Config
|
||||
export class BaseRunnerConfig {
|
||||
|
@ -13,4 +25,7 @@ export class BaseRunnerConfig {
|
|||
|
||||
@Env('N8N_RUNNERS_MAX_CONCURRENCY')
|
||||
maxConcurrency: number = 5;
|
||||
|
||||
@Nested
|
||||
healthcheckServer!: HealthcheckServerConfig;
|
||||
}
|
||||
|
|
38
packages/@n8n/task-runner/src/healthcheck-server.ts
Normal file
38
packages/@n8n/task-runner/src/healthcheck-server.ts
Normal file
|
@ -0,0 +1,38 @@
|
|||
import { ApplicationError } from 'n8n-workflow';
|
||||
import { createServer } from 'node:http';
|
||||
|
||||
export class HealthcheckServer {
|
||||
private server = createServer((_, res) => {
|
||||
res.writeHead(200);
|
||||
res.end('OK');
|
||||
});
|
||||
|
||||
async start(host: string, port: number) {
|
||||
return await new Promise<void>((resolve, reject) => {
|
||||
const portInUseErrorHandler = (error: NodeJS.ErrnoException) => {
|
||||
if (error.code === 'EADDRINUSE') {
|
||||
reject(new ApplicationError(`Port ${port} is already in use`));
|
||||
} else {
|
||||
reject(error);
|
||||
}
|
||||
};
|
||||
|
||||
this.server.on('error', portInUseErrorHandler);
|
||||
|
||||
this.server.listen(port, host, () => {
|
||||
this.server.removeListener('error', portInUseErrorHandler);
|
||||
console.log(`Healthcheck server listening on ${host}, port ${port}`);
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async stop() {
|
||||
return await new Promise<void>((resolve, reject) => {
|
||||
this.server.close((error) => {
|
||||
if (error) reject(error);
|
||||
else resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
|
@ -3,8 +3,10 @@ import Container from 'typedi';
|
|||
|
||||
import { MainConfig } from './config/main-config';
|
||||
import type { ErrorReporter } from './error-reporter';
|
||||
import type { HealthcheckServer } from './healthcheck-server';
|
||||
import { JsTaskRunner } from './js-task-runner/js-task-runner';
|
||||
|
||||
let healthcheckServer: HealthcheckServer | undefined;
|
||||
let runner: JsTaskRunner | undefined;
|
||||
let isShuttingDown = false;
|
||||
let errorReporter: ErrorReporter | undefined;
|
||||
|
@ -22,6 +24,7 @@ function createSignalHandler(signal: string) {
|
|||
if (runner) {
|
||||
await runner.stop();
|
||||
runner = undefined;
|
||||
void healthcheckServer?.stop();
|
||||
}
|
||||
|
||||
if (errorReporter) {
|
||||
|
@ -49,6 +52,14 @@ void (async function start() {
|
|||
|
||||
runner = new JsTaskRunner(config);
|
||||
|
||||
const { enabled, host, port } = config.baseRunnerConfig.healthcheckServer;
|
||||
|
||||
if (enabled) {
|
||||
const { HealthcheckServer } = await import('./healthcheck-server');
|
||||
healthcheckServer = new HealthcheckServer();
|
||||
await healthcheckServer.start(host, port);
|
||||
}
|
||||
|
||||
process.on('SIGINT', createSignalHandler('SIGINT'));
|
||||
process.on('SIGTERM', createSignalHandler('SIGTERM'));
|
||||
})().catch((e) => {
|
||||
|
|
|
@ -1,8 +1,12 @@
|
|||
import type { TaskRunnersConfig } from '@n8n/config';
|
||||
import type { RunnerMessage, TaskResultData } from '@n8n/task-runner';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import type { INodeTypeBaseDescription } from 'n8n-workflow';
|
||||
import { ApplicationError, type INodeTypeBaseDescription } from 'n8n-workflow';
|
||||
|
||||
import { Time } from '@/constants';
|
||||
|
||||
import { TaskRejectError } from '../errors';
|
||||
import type { RunnerLifecycleEvents } from '../runner-lifecycle-events';
|
||||
import { TaskBroker } from '../task-broker.service';
|
||||
import type { TaskOffer, TaskRequest, TaskRunner } from '../task-broker.service';
|
||||
|
||||
|
@ -12,7 +16,7 @@ describe('TaskBroker', () => {
|
|||
let taskBroker: TaskBroker;
|
||||
|
||||
beforeEach(() => {
|
||||
taskBroker = new TaskBroker(mock());
|
||||
taskBroker = new TaskBroker(mock(), mock(), mock());
|
||||
jest.restoreAllMocks();
|
||||
});
|
||||
|
||||
|
@ -618,4 +622,131 @@ describe('TaskBroker', () => {
|
|||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('task timeouts', () => {
|
||||
let taskBroker: TaskBroker;
|
||||
let config: TaskRunnersConfig;
|
||||
let runnerLifecycleEvents = mock<RunnerLifecycleEvents>();
|
||||
|
||||
beforeAll(() => {
|
||||
jest.useFakeTimers();
|
||||
config = mock<TaskRunnersConfig>({ taskTimeout: 30 });
|
||||
taskBroker = new TaskBroker(mock(), config, runnerLifecycleEvents);
|
||||
});
|
||||
|
||||
afterAll(() => {
|
||||
jest.useRealTimers();
|
||||
});
|
||||
|
||||
it('on sending task, we should set up task timeout', async () => {
|
||||
jest.spyOn(global, 'setTimeout');
|
||||
|
||||
const taskId = 'task1';
|
||||
const runnerId = 'runner1';
|
||||
const runner = mock<TaskRunner>({ id: runnerId });
|
||||
const runnerMessageCallback = jest.fn();
|
||||
|
||||
taskBroker.registerRunner(runner, runnerMessageCallback);
|
||||
taskBroker.setTasks({
|
||||
[taskId]: { id: taskId, runnerId, requesterId: 'requester1', taskType: 'test' },
|
||||
});
|
||||
|
||||
await taskBroker.sendTaskSettings(taskId, {});
|
||||
|
||||
expect(setTimeout).toHaveBeenCalledWith(
|
||||
expect.any(Function),
|
||||
config.taskTimeout * Time.seconds.toMilliseconds,
|
||||
);
|
||||
});
|
||||
|
||||
it('on task completion, we should clear timeout', async () => {
|
||||
jest.spyOn(global, 'clearTimeout');
|
||||
|
||||
const taskId = 'task1';
|
||||
const runnerId = 'runner1';
|
||||
const requesterId = 'requester1';
|
||||
const requesterCallback = jest.fn();
|
||||
|
||||
taskBroker.registerRequester(requesterId, requesterCallback);
|
||||
taskBroker.setTasks({
|
||||
[taskId]: {
|
||||
id: taskId,
|
||||
runnerId,
|
||||
requesterId,
|
||||
taskType: 'test',
|
||||
timeout: setTimeout(() => {}, config.taskTimeout * Time.seconds.toMilliseconds),
|
||||
},
|
||||
});
|
||||
|
||||
await taskBroker.taskDoneHandler(taskId, { result: [] });
|
||||
|
||||
expect(clearTimeout).toHaveBeenCalled();
|
||||
expect(taskBroker.getTasks().get(taskId)).toBeUndefined();
|
||||
});
|
||||
|
||||
it('on task error, we should clear timeout', async () => {
|
||||
jest.spyOn(global, 'clearTimeout');
|
||||
|
||||
const taskId = 'task1';
|
||||
const runnerId = 'runner1';
|
||||
const requesterId = 'requester1';
|
||||
const requesterCallback = jest.fn();
|
||||
|
||||
taskBroker.registerRequester(requesterId, requesterCallback);
|
||||
taskBroker.setTasks({
|
||||
[taskId]: {
|
||||
id: taskId,
|
||||
runnerId,
|
||||
requesterId,
|
||||
taskType: 'test',
|
||||
timeout: setTimeout(() => {}, config.taskTimeout * Time.seconds.toMilliseconds),
|
||||
},
|
||||
});
|
||||
|
||||
await taskBroker.taskErrorHandler(taskId, new Error('Test error'));
|
||||
|
||||
expect(clearTimeout).toHaveBeenCalled();
|
||||
expect(taskBroker.getTasks().get(taskId)).toBeUndefined();
|
||||
});
|
||||
|
||||
it('on timeout, we should emit `runner:timed-out-during-task` event and send error to requester', async () => {
|
||||
jest.spyOn(global, 'clearTimeout');
|
||||
|
||||
const taskId = 'task1';
|
||||
const runnerId = 'runner1';
|
||||
const requesterId = 'requester1';
|
||||
const runner = mock<TaskRunner>({ id: runnerId });
|
||||
const runnerCallback = jest.fn();
|
||||
const requesterCallback = jest.fn();
|
||||
|
||||
taskBroker.registerRunner(runner, runnerCallback);
|
||||
taskBroker.registerRequester(requesterId, requesterCallback);
|
||||
|
||||
taskBroker.setTasks({
|
||||
[taskId]: { id: taskId, runnerId, requesterId, taskType: 'test' },
|
||||
});
|
||||
|
||||
await taskBroker.sendTaskSettings(taskId, {});
|
||||
|
||||
jest.runAllTimers();
|
||||
|
||||
await Promise.resolve();
|
||||
|
||||
expect(runnerLifecycleEvents.emit).toHaveBeenCalledWith('runner:timed-out-during-task');
|
||||
|
||||
await Promise.resolve();
|
||||
|
||||
expect(clearTimeout).toHaveBeenCalled();
|
||||
|
||||
expect(requesterCallback).toHaveBeenCalledWith({
|
||||
type: 'broker:taskerror',
|
||||
taskId,
|
||||
error: new ApplicationError(`Task execution timed out after ${config.taskTimeout} seconds`),
|
||||
});
|
||||
|
||||
await Promise.resolve();
|
||||
|
||||
expect(taskBroker.getTasks().get(taskId)).toBeUndefined();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -7,6 +7,8 @@ import type { TaskRunnerAuthService } from '@/runners/auth/task-runner-auth.serv
|
|||
import { TaskRunnerProcess } from '@/runners/task-runner-process';
|
||||
import { mockInstance } from '@test/mocking';
|
||||
|
||||
import type { RunnerLifecycleEvents } from '../runner-lifecycle-events';
|
||||
|
||||
const spawnMock = jest.fn(() =>
|
||||
mock<ChildProcess>({
|
||||
stdout: {
|
||||
|
@ -25,7 +27,7 @@ describe('TaskRunnerProcess', () => {
|
|||
runnerConfig.enabled = true;
|
||||
runnerConfig.mode = 'internal_childprocess';
|
||||
const authService = mock<TaskRunnerAuthService>();
|
||||
let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService);
|
||||
let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService, mock());
|
||||
|
||||
afterEach(async () => {
|
||||
spawnMock.mockClear();
|
||||
|
@ -35,15 +37,35 @@ describe('TaskRunnerProcess', () => {
|
|||
it('should throw if runner mode is external', () => {
|
||||
runnerConfig.mode = 'external';
|
||||
|
||||
expect(() => new TaskRunnerProcess(logger, runnerConfig, authService)).toThrow();
|
||||
expect(() => new TaskRunnerProcess(logger, runnerConfig, authService, mock())).toThrow();
|
||||
|
||||
runnerConfig.mode = 'internal_childprocess';
|
||||
});
|
||||
|
||||
it('should register listener for `runner:failed-heartbeat-check` event', () => {
|
||||
const runnerLifecycleEvents = mock<RunnerLifecycleEvents>();
|
||||
new TaskRunnerProcess(logger, runnerConfig, authService, runnerLifecycleEvents);
|
||||
|
||||
expect(runnerLifecycleEvents.on).toHaveBeenCalledWith(
|
||||
'runner:failed-heartbeat-check',
|
||||
expect.any(Function),
|
||||
);
|
||||
});
|
||||
|
||||
it('should register listener for `runner:timed-out-during-task` event', () => {
|
||||
const runnerLifecycleEvents = mock<RunnerLifecycleEvents>();
|
||||
new TaskRunnerProcess(logger, runnerConfig, authService, runnerLifecycleEvents);
|
||||
|
||||
expect(runnerLifecycleEvents.on).toHaveBeenCalledWith(
|
||||
'runner:timed-out-during-task',
|
||||
expect.any(Function),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('start', () => {
|
||||
beforeEach(() => {
|
||||
taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService);
|
||||
taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService, mock());
|
||||
});
|
||||
|
||||
test.each([
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
import type { TaskRunnersConfig } from '@n8n/config';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
|
||||
import { Time } from '@/constants';
|
||||
import { TaskRunnerWsServer } from '@/runners/runner-ws-server';
|
||||
|
||||
describe('TaskRunnerWsServer', () => {
|
||||
describe('heartbeat timer', () => {
|
||||
it('should set up heartbeat timer on server start', async () => {
|
||||
const setIntervalSpy = jest.spyOn(global, 'setInterval');
|
||||
|
||||
const server = new TaskRunnerWsServer(
|
||||
mock(),
|
||||
mock(),
|
||||
mock(),
|
||||
mock<TaskRunnersConfig>({ path: '/runners', heartbeatInterval: 30 }),
|
||||
mock(),
|
||||
);
|
||||
|
||||
expect(setIntervalSpy).toHaveBeenCalledWith(
|
||||
expect.any(Function),
|
||||
30 * Time.seconds.toMilliseconds,
|
||||
);
|
||||
|
||||
await server.shutdown();
|
||||
});
|
||||
|
||||
it('should clear heartbeat timer on server stop', async () => {
|
||||
jest.spyOn(global, 'setInterval');
|
||||
const clearIntervalSpy = jest.spyOn(global, 'clearInterval');
|
||||
|
||||
const server = new TaskRunnerWsServer(
|
||||
mock(),
|
||||
mock(),
|
||||
mock(),
|
||||
mock<TaskRunnersConfig>({ path: '/runners', heartbeatInterval: 30 }),
|
||||
mock(),
|
||||
);
|
||||
|
||||
await server.shutdown();
|
||||
|
||||
expect(clearIntervalSpy).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
});
|
|
@ -1,8 +1,10 @@
|
|||
import { Service } from 'typedi';
|
||||
|
||||
import config from '@/config';
|
||||
|
||||
import { TaskRunnerDisconnectedError } from './errors/task-runner-disconnected-error';
|
||||
import type { DisconnectAnalyzer } from './runner-types';
|
||||
import type { TaskRunner } from './task-broker.service';
|
||||
import { TaskRunnerFailedHeartbeatError } from './errors/task-runner-failed-heartbeat.error';
|
||||
import type { DisconnectAnalyzer, DisconnectErrorOptions } from './runner-types';
|
||||
|
||||
/**
|
||||
* Analyzes the disconnect reason of a task runner to provide a more
|
||||
|
@ -10,7 +12,16 @@ import type { TaskRunner } from './task-broker.service';
|
|||
*/
|
||||
@Service()
|
||||
export class DefaultTaskRunnerDisconnectAnalyzer implements DisconnectAnalyzer {
|
||||
async determineDisconnectReason(runnerId: TaskRunner['id']): Promise<Error> {
|
||||
return new TaskRunnerDisconnectedError(runnerId);
|
||||
async toDisconnectError(opts: DisconnectErrorOptions): Promise<Error> {
|
||||
const { reason, heartbeatInterval } = opts;
|
||||
|
||||
if (reason === 'failed-heartbeat-check' && heartbeatInterval) {
|
||||
return new TaskRunnerFailedHeartbeatError(
|
||||
heartbeatInterval,
|
||||
config.get('deployment.type') !== 'cloud',
|
||||
);
|
||||
}
|
||||
|
||||
return new TaskRunnerDisconnectedError(opts.runnerId ?? 'Unknown runner ID');
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
import { ApplicationError } from 'n8n-workflow';
|
||||
|
||||
export class TaskRunnerFailedHeartbeatError extends ApplicationError {
|
||||
description: string;
|
||||
|
||||
constructor(heartbeatInterval: number, isSelfHosted: boolean) {
|
||||
super('Task execution aborted because runner became unresponsive');
|
||||
|
||||
const subtitle =
|
||||
'The task runner failed to respond as expected, so it was considered unresponsive, and the task was aborted. You can try the following:';
|
||||
|
||||
const fixes = {
|
||||
optimizeScript:
|
||||
'Optimize your script to prevent CPU-intensive operations, e.g. by breaking them down into smaller chunks or batch processing.',
|
||||
ensureTermination:
|
||||
'Ensure that all paths in your script are able to terminate, i.e. no infinite loops.',
|
||||
increaseInterval: `If your task can reasonably keep the task runner busy for more than ${heartbeatInterval} ${heartbeatInterval === 1 ? 'second' : 'seconds'}, increase the heartbeat interval using the N8N_RUNNERS_HEARTBEAT_INTERVAL environment variable.`,
|
||||
};
|
||||
|
||||
const suggestions = [fixes.optimizeScript, fixes.ensureTermination];
|
||||
|
||||
if (isSelfHosted) suggestions.push(fixes.increaseInterval);
|
||||
|
||||
const suggestionsText = suggestions
|
||||
.map((suggestion, index) => `${index + 1}. ${suggestion}`)
|
||||
.join('<br/>');
|
||||
|
||||
const description = `${subtitle}<br/><br/>${suggestionsText}`;
|
||||
|
||||
this.description = description;
|
||||
}
|
||||
}
|
34
packages/cli/src/runners/errors/task-runner-timeout.error.ts
Normal file
34
packages/cli/src/runners/errors/task-runner-timeout.error.ts
Normal file
|
@ -0,0 +1,34 @@
|
|||
import { ApplicationError } from 'n8n-workflow';
|
||||
|
||||
export class TaskRunnerTimeoutError extends ApplicationError {
|
||||
description: string;
|
||||
|
||||
constructor(taskTimeout: number, isSelfHosted: boolean) {
|
||||
super(
|
||||
`Task execution timed out after ${taskTimeout} ${taskTimeout === 1 ? 'second' : 'seconds'}`,
|
||||
);
|
||||
|
||||
const subtitle =
|
||||
'The task runner was taking too long on this task, so it was suspected of being unresponsive and restarted, and the task was aborted. You can try the following:';
|
||||
|
||||
const fixes = {
|
||||
optimizeScript:
|
||||
'Optimize your script to prevent long-running tasks, e.g. by processing data in smaller batches.',
|
||||
ensureTermination:
|
||||
'Ensure that all paths in your script are able to terminate, i.e. no infinite loops.',
|
||||
increaseTimeout: `If your task can reasonably take more than ${taskTimeout} ${taskTimeout === 1 ? 'second' : 'seconds'}, increase the timeout using the N8N_RUNNERS_TASK_TIMEOUT environment variable.`,
|
||||
};
|
||||
|
||||
const suggestions = [fixes.optimizeScript, fixes.ensureTermination];
|
||||
|
||||
if (isSelfHosted) suggestions.push(fixes.increaseTimeout);
|
||||
|
||||
const suggestionsText = suggestions
|
||||
.map((suggestion, index) => `${index + 1}. ${suggestion}`)
|
||||
.join('<br/>');
|
||||
|
||||
const description = `${subtitle}<br/><br/>${suggestionsText}`;
|
||||
|
||||
this.description = description;
|
||||
}
|
||||
}
|
|
@ -5,8 +5,8 @@ import config from '@/config';
|
|||
|
||||
import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer';
|
||||
import { TaskRunnerOomError } from './errors/task-runner-oom-error';
|
||||
import type { DisconnectErrorOptions } from './runner-types';
|
||||
import { SlidingWindowSignal } from './sliding-window-signal';
|
||||
import type { TaskRunner } from './task-broker.service';
|
||||
import type { ExitReason, TaskRunnerProcessEventMap } from './task-runner-process';
|
||||
import { TaskRunnerProcess } from './task-runner-process';
|
||||
|
||||
|
@ -38,13 +38,13 @@ export class InternalTaskRunnerDisconnectAnalyzer extends DefaultTaskRunnerDisco
|
|||
});
|
||||
}
|
||||
|
||||
async determineDisconnectReason(runnerId: TaskRunner['id']): Promise<Error> {
|
||||
async toDisconnectError(opts: DisconnectErrorOptions): Promise<Error> {
|
||||
const exitCode = await this.awaitExitSignal();
|
||||
if (exitCode === 'oom') {
|
||||
return new TaskRunnerOomError(runnerId, this.isCloudDeployment);
|
||||
return new TaskRunnerOomError(opts.runnerId ?? 'Unknown runner ID', this.isCloudDeployment);
|
||||
}
|
||||
|
||||
return await super.determineDisconnectReason(runnerId);
|
||||
return await super.toDisconnectError(opts);
|
||||
}
|
||||
|
||||
private async awaitExitSignal(): Promise<ExitReason> {
|
||||
|
|
11
packages/cli/src/runners/runner-lifecycle-events.ts
Normal file
11
packages/cli/src/runners/runner-lifecycle-events.ts
Normal file
|
@ -0,0 +1,11 @@
|
|||
import { Service } from 'typedi';
|
||||
|
||||
import { TypedEmitter } from '@/typed-emitter';
|
||||
|
||||
type RunnerLifecycleEventMap = {
|
||||
'runner:failed-heartbeat-check': never;
|
||||
'runner:timed-out-during-task': never;
|
||||
};
|
||||
|
||||
@Service()
|
||||
export class RunnerLifecycleEvents extends TypedEmitter<RunnerLifecycleEventMap> {}
|
|
@ -6,7 +6,7 @@ import type { TaskRunner } from './task-broker.service';
|
|||
import type { AuthlessRequest } from '../requests';
|
||||
|
||||
export interface DisconnectAnalyzer {
|
||||
determineDisconnectReason(runnerId: TaskRunner['id']): Promise<Error>;
|
||||
toDisconnectError(opts: DisconnectErrorOptions): Promise<Error>;
|
||||
}
|
||||
|
||||
export type DataRequestType = 'input' | 'node' | 'all';
|
||||
|
@ -22,3 +22,11 @@ export interface TaskRunnerServerInitRequest
|
|||
}
|
||||
|
||||
export type TaskRunnerServerInitResponse = Response & { req: TaskRunnerServerInitRequest };
|
||||
|
||||
export type DisconnectReason = 'shutting-down' | 'failed-heartbeat-check' | 'unknown';
|
||||
|
||||
export type DisconnectErrorOptions = {
|
||||
runnerId?: TaskRunner['id'];
|
||||
reason?: DisconnectReason;
|
||||
heartbeatInterval?: number;
|
||||
};
|
||||
|
|
|
@ -1,12 +1,17 @@
|
|||
import { TaskRunnersConfig } from '@n8n/config';
|
||||
import type { BrokerMessage, RunnerMessage } from '@n8n/task-runner';
|
||||
import { ApplicationError } from 'n8n-workflow';
|
||||
import { Service } from 'typedi';
|
||||
import type WebSocket from 'ws';
|
||||
|
||||
import { Time } from '@/constants';
|
||||
import { Logger } from '@/logging/logger.service';
|
||||
|
||||
import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer';
|
||||
import { RunnerLifecycleEvents } from './runner-lifecycle-events';
|
||||
import type {
|
||||
DisconnectAnalyzer,
|
||||
DisconnectReason,
|
||||
TaskRunnerServerInitRequest,
|
||||
TaskRunnerServerInitResponse,
|
||||
} from './runner-types';
|
||||
|
@ -20,11 +25,50 @@ function heartbeat(this: WebSocket) {
|
|||
export class TaskRunnerWsServer {
|
||||
runnerConnections: Map<TaskRunner['id'], WebSocket> = new Map();
|
||||
|
||||
private heartbeatTimer: NodeJS.Timer | undefined;
|
||||
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly taskBroker: TaskBroker,
|
||||
private disconnectAnalyzer: DefaultTaskRunnerDisconnectAnalyzer,
|
||||
) {}
|
||||
private readonly taskTunnersConfig: TaskRunnersConfig,
|
||||
private readonly runnerLifecycleEvents: RunnerLifecycleEvents,
|
||||
) {
|
||||
this.startHeartbeatChecks();
|
||||
}
|
||||
|
||||
private startHeartbeatChecks() {
|
||||
const { heartbeatInterval } = this.taskTunnersConfig;
|
||||
|
||||
if (heartbeatInterval <= 0) {
|
||||
throw new ApplicationError('Heartbeat interval must be greater than 0');
|
||||
}
|
||||
|
||||
this.heartbeatTimer = setInterval(() => {
|
||||
for (const [runnerId, connection] of this.runnerConnections.entries()) {
|
||||
if (!connection.isAlive) {
|
||||
void this.removeConnection(runnerId, 'failed-heartbeat-check');
|
||||
this.runnerLifecycleEvents.emit('runner:failed-heartbeat-check');
|
||||
return;
|
||||
}
|
||||
connection.isAlive = false;
|
||||
connection.ping();
|
||||
}
|
||||
}, heartbeatInterval * Time.seconds.toMilliseconds);
|
||||
}
|
||||
|
||||
async shutdown() {
|
||||
if (this.heartbeatTimer) {
|
||||
clearInterval(this.heartbeatTimer);
|
||||
this.heartbeatTimer = undefined;
|
||||
}
|
||||
|
||||
await Promise.all(
|
||||
Array.from(this.runnerConnections.keys()).map(
|
||||
async (id) => await this.removeConnection(id, 'shutting-down'),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
setDisconnectAnalyzer(disconnectAnalyzer: DisconnectAnalyzer) {
|
||||
this.disconnectAnalyzer = disconnectAnalyzer;
|
||||
|
@ -97,11 +141,15 @@ export class TaskRunnerWsServer {
|
|||
);
|
||||
}
|
||||
|
||||
async removeConnection(id: TaskRunner['id']) {
|
||||
async removeConnection(id: TaskRunner['id'], reason: DisconnectReason = 'unknown') {
|
||||
const connection = this.runnerConnections.get(id);
|
||||
if (connection) {
|
||||
const disconnectReason = await this.disconnectAnalyzer.determineDisconnectReason(id);
|
||||
this.taskBroker.deregisterRunner(id, disconnectReason);
|
||||
const disconnectError = await this.disconnectAnalyzer.toDisconnectError({
|
||||
runnerId: id,
|
||||
reason,
|
||||
heartbeatInterval: this.taskTunnersConfig.heartbeatInterval,
|
||||
});
|
||||
this.taskBroker.deregisterRunner(id, disconnectError);
|
||||
connection.close();
|
||||
this.runnerConnections.delete(id);
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import { TaskRunnersConfig } from '@n8n/config';
|
||||
import type {
|
||||
BrokerMessage,
|
||||
RequesterMessage,
|
||||
|
@ -8,9 +9,13 @@ import { ApplicationError } from 'n8n-workflow';
|
|||
import { nanoid } from 'nanoid';
|
||||
import { Service } from 'typedi';
|
||||
|
||||
import config from '@/config';
|
||||
import { Time } from '@/constants';
|
||||
import { Logger } from '@/logging/logger.service';
|
||||
|
||||
import { TaskRejectError } from './errors';
|
||||
import { TaskRunnerTimeoutError } from './errors/task-runner-timeout.error';
|
||||
import { RunnerLifecycleEvents } from './runner-lifecycle-events';
|
||||
|
||||
export interface TaskRunner {
|
||||
id: string;
|
||||
|
@ -24,6 +29,7 @@ export interface Task {
|
|||
runnerId: TaskRunner['id'];
|
||||
requesterId: string;
|
||||
taskType: string;
|
||||
timeout?: NodeJS.Timeout;
|
||||
}
|
||||
|
||||
export interface TaskOffer {
|
||||
|
@ -78,7 +84,15 @@ export class TaskBroker {
|
|||
|
||||
private pendingTaskRequests: TaskRequest[] = [];
|
||||
|
||||
constructor(private readonly logger: Logger) {}
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly taskRunnersConfig: TaskRunnersConfig,
|
||||
private readonly runnerLifecycleEvents: RunnerLifecycleEvents,
|
||||
) {
|
||||
if (this.taskRunnersConfig.taskTimeout <= 0) {
|
||||
throw new ApplicationError('Task timeout must be greater than 0');
|
||||
}
|
||||
}
|
||||
|
||||
expireTasks() {
|
||||
const now = process.hrtime.bigint();
|
||||
|
@ -408,6 +422,14 @@ export class TaskBroker {
|
|||
|
||||
async sendTaskSettings(taskId: Task['id'], settings: unknown) {
|
||||
const runner = await this.getRunnerOrFailTask(taskId);
|
||||
|
||||
const task = this.tasks.get(taskId);
|
||||
if (!task) return;
|
||||
|
||||
task.timeout = setTimeout(async () => {
|
||||
await this.handleTaskTimeout(taskId);
|
||||
}, this.taskRunnersConfig.taskTimeout * Time.seconds.toMilliseconds);
|
||||
|
||||
await this.messageRunner(runner.id, {
|
||||
type: 'broker:tasksettings',
|
||||
taskId,
|
||||
|
@ -415,11 +437,27 @@ export class TaskBroker {
|
|||
});
|
||||
}
|
||||
|
||||
private async handleTaskTimeout(taskId: Task['id']) {
|
||||
const task = this.tasks.get(taskId);
|
||||
if (!task) return;
|
||||
|
||||
this.runnerLifecycleEvents.emit('runner:timed-out-during-task');
|
||||
|
||||
await this.taskErrorHandler(
|
||||
taskId,
|
||||
new TaskRunnerTimeoutError(
|
||||
this.taskRunnersConfig.taskTimeout,
|
||||
config.getEnv('deployment.type') !== 'cloud',
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
async taskDoneHandler(taskId: Task['id'], data: TaskResultData) {
|
||||
const task = this.tasks.get(taskId);
|
||||
if (!task) {
|
||||
return;
|
||||
}
|
||||
if (!task) return;
|
||||
|
||||
clearTimeout(task.timeout);
|
||||
|
||||
await this.requesters.get(task.requesterId)?.({
|
||||
type: 'broker:taskdone',
|
||||
taskId: task.id,
|
||||
|
@ -430,9 +468,10 @@ export class TaskBroker {
|
|||
|
||||
async taskErrorHandler(taskId: Task['id'], error: unknown) {
|
||||
const task = this.tasks.get(taskId);
|
||||
if (!task) {
|
||||
return;
|
||||
}
|
||||
if (!task) return;
|
||||
|
||||
clearTimeout(task.timeout);
|
||||
|
||||
await this.requesters.get(task.requesterId)?.({
|
||||
type: 'broker:taskerror',
|
||||
taskId: task.id,
|
||||
|
|
|
@ -10,6 +10,7 @@ import { Logger } from '@/logging/logger.service';
|
|||
import { TaskRunnerAuthService } from './auth/task-runner-auth.service';
|
||||
import { forwardToLogger } from './forward-to-logger';
|
||||
import { NodeProcessOomDetector } from './node-process-oom-detector';
|
||||
import { RunnerLifecycleEvents } from './runner-lifecycle-events';
|
||||
import { TypedEmitter } from '../typed-emitter';
|
||||
|
||||
type ChildProcess = ReturnType<typeof spawn>;
|
||||
|
@ -70,6 +71,7 @@ export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
|
|||
logger: Logger,
|
||||
private readonly runnerConfig: TaskRunnersConfig,
|
||||
private readonly authService: TaskRunnerAuthService,
|
||||
private readonly runnerLifecycleEvents: RunnerLifecycleEvents,
|
||||
) {
|
||||
super();
|
||||
|
||||
|
@ -79,6 +81,16 @@ export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
|
|||
);
|
||||
|
||||
this.logger = logger.scoped('task-runner');
|
||||
|
||||
this.runnerLifecycleEvents.on('runner:failed-heartbeat-check', () => {
|
||||
this.logger.warn('Task runner failed heartbeat check, restarting...');
|
||||
void this.forceRestart();
|
||||
});
|
||||
|
||||
this.runnerLifecycleEvents.on('runner:timed-out-during-task', () => {
|
||||
this.logger.warn('Task runner timed out during task, restarting...');
|
||||
void this.forceRestart();
|
||||
});
|
||||
}
|
||||
|
||||
async start() {
|
||||
|
@ -116,9 +128,7 @@ export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
|
|||
|
||||
@OnShutdown()
|
||||
async stop() {
|
||||
if (!this.process) {
|
||||
return;
|
||||
}
|
||||
if (!this.process) return;
|
||||
|
||||
this.isShuttingDown = true;
|
||||
|
||||
|
@ -133,10 +143,22 @@ export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
|
|||
this.isShuttingDown = false;
|
||||
}
|
||||
|
||||
killNode() {
|
||||
if (!this.process) {
|
||||
return;
|
||||
/** Force-restart a runner suspected of being unresponsive. */
|
||||
async forceRestart() {
|
||||
if (!this.process) return;
|
||||
|
||||
if (this.useLauncher) {
|
||||
await this.killLauncher(); // @TODO: Implement SIGKILL in launcher
|
||||
} else {
|
||||
this.process.kill('SIGKILL');
|
||||
}
|
||||
|
||||
await this._runPromise;
|
||||
}
|
||||
|
||||
killNode() {
|
||||
if (!this.process) return;
|
||||
|
||||
this.process.kill();
|
||||
}
|
||||
|
||||
|
@ -173,7 +195,6 @@ export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
|
|||
this.emit('exit', { reason: this.oomDetector?.didProcessOom ? 'oom' : 'unknown' });
|
||||
resolveFn();
|
||||
|
||||
// If we are not shutting down, restart the process
|
||||
if (!this.isShuttingDown) {
|
||||
setImmediate(async () => await this.start());
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ export class TaskRunnerServer {
|
|||
private readonly logger: Logger,
|
||||
private readonly globalConfig: GlobalConfig,
|
||||
private readonly taskRunnerAuthController: TaskRunnerAuthController,
|
||||
private readonly taskRunnerService: TaskRunnerWsServer,
|
||||
private readonly taskRunnerWsServer: TaskRunnerWsServer,
|
||||
) {
|
||||
this.app = express();
|
||||
this.app.disable('x-powered-by');
|
||||
|
@ -148,7 +148,7 @@ export class TaskRunnerServer {
|
|||
// eslint-disable-next-line @typescript-eslint/unbound-method
|
||||
this.taskRunnerAuthController.authMiddleware,
|
||||
(req: TaskRunnerServerInitRequest, res: TaskRunnerServerInitResponse) =>
|
||||
this.taskRunnerService.handleRequest(req, res),
|
||||
this.taskRunnerWsServer.handleRequest(req, res),
|
||||
);
|
||||
|
||||
const authEndpoint = `${this.getEndpointBasePath()}/auth`;
|
||||
|
|
Loading…
Reference in a new issue