From 4e3681b9052f202502cf562043dc27ea9bdb4afa Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Tue, 29 Oct 2024 12:39:31 +0200 Subject: [PATCH] fix: Provide a better error message when task runner disconnects (no-changelog) (#11442) --- .../node-process-oom-detector.test.ts | 43 +++++++++++ .../__tests__/sliding-window-signal.test.ts | 71 +++++++++++++++++++ .../src/runners/__tests__/task-broker.test.ts | 14 ++-- .../errors/task-runner-disconnected-error.ts | 7 ++ .../runners/errors/task-runner-oom-error.ts | 31 ++++++++ .../src/runners/node-process-oom-detector.ts | 34 +++++++++ packages/cli/src/runners/runner-ws-server.ts | 15 ++-- .../cli/src/runners/sliding-window-signal.ts | 59 +++++++++++++++ .../cli/src/runners/task-broker.service.ts | 26 ++++--- .../task-runner-disconnect-analyzer.ts | 60 ++++++++++++++++ .../cli/src/runners/task-runner-process.ts | 19 ++++- .../nodes/Code/JsTaskRunnerSandbox.ts | 8 ++- 12 files changed, 362 insertions(+), 25 deletions(-) create mode 100644 packages/cli/src/runners/__tests__/node-process-oom-detector.test.ts create mode 100644 packages/cli/src/runners/__tests__/sliding-window-signal.test.ts create mode 100644 packages/cli/src/runners/errors/task-runner-disconnected-error.ts create mode 100644 packages/cli/src/runners/errors/task-runner-oom-error.ts create mode 100644 packages/cli/src/runners/node-process-oom-detector.ts create mode 100644 packages/cli/src/runners/sliding-window-signal.ts create mode 100644 packages/cli/src/runners/task-runner-disconnect-analyzer.ts diff --git a/packages/cli/src/runners/__tests__/node-process-oom-detector.test.ts b/packages/cli/src/runners/__tests__/node-process-oom-detector.test.ts new file mode 100644 index 0000000000..5b619e0e08 --- /dev/null +++ b/packages/cli/src/runners/__tests__/node-process-oom-detector.test.ts @@ -0,0 +1,43 @@ +import { spawn } from 'node:child_process'; + +import { NodeProcessOomDetector } from '../node-process-oom-detector'; + +describe('NodeProcessOomDetector', () => { + test('should detect an out-of-memory error in a monitored process', (done) => { + const childProcess = spawn(process.execPath, [ + // set low memory limit + '--max-old-space-size=20', + '-e', + ` + const data = []; + // fill memory until it crashes + while (true) data.push(Array.from({ length: 10_000 }).map(() => Math.random().toString()).join()); + `, + ]); + + const detector = new NodeProcessOomDetector(childProcess); + + childProcess.on('exit', (code) => { + expect(detector.didProcessOom).toBe(true); + expect(code).not.toBe(0); + done(); + }); + }); + + test('should not detect an out-of-memory error in a process that exits normally', (done) => { + const childProcess = spawn(process.execPath, [ + '-e', + ` + console.log("Hello, World!"); + `, + ]); + + const detector = new NodeProcessOomDetector(childProcess); + + childProcess.on('exit', (code) => { + expect(detector.didProcessOom).toBe(false); + expect(code).toBe(0); + done(); + }); + }); +}); diff --git a/packages/cli/src/runners/__tests__/sliding-window-signal.test.ts b/packages/cli/src/runners/__tests__/sliding-window-signal.test.ts new file mode 100644 index 0000000000..56462d186a --- /dev/null +++ b/packages/cli/src/runners/__tests__/sliding-window-signal.test.ts @@ -0,0 +1,71 @@ +import { TypedEmitter } from '../../typed-emitter'; +import { SlidingWindowSignal } from '../sliding-window-signal'; + +type TestEventMap = { + testEvent: string; +}; + +describe('SlidingWindowSignal', () => { + let eventEmitter: TypedEmitter; + let slidingWindowSignal: SlidingWindowSignal; + + beforeEach(() => { + eventEmitter = new TypedEmitter(); + slidingWindowSignal = new SlidingWindowSignal(eventEmitter, 'testEvent', { + windowSizeInMs: 500, + }); + }); + + afterEach(() => { + jest.clearAllTimers(); + jest.clearAllMocks(); + }); + + it('should return the last signal if within window size', async () => { + const signal = 'testSignal'; + eventEmitter.emit('testEvent', signal); + + const receivedSignal = await slidingWindowSignal.getSignal(); + + expect(receivedSignal).toBe(signal); + }); + + it('should return null if there is no signal within the window', async () => { + jest.useFakeTimers(); + const receivedSignalPromise = slidingWindowSignal.getSignal(); + jest.advanceTimersByTime(600); + const receivedSignal = await receivedSignalPromise; + + expect(receivedSignal).toBeNull(); + jest.useRealTimers(); + }); + + it('should return null if "exit" event is not emitted before timeout', async () => { + const signal = 'testSignal'; + jest.useFakeTimers(); + const receivedSignalPromise = slidingWindowSignal.getSignal(); + jest.advanceTimersByTime(600); + eventEmitter.emit('testEvent', signal); + + const receivedSignal = await receivedSignalPromise; + expect(receivedSignal).toBeNull(); + jest.useRealTimers(); + }); + + it('should return the signal emitted on "exit" event before timeout', async () => { + jest.useFakeTimers(); + const receivedSignalPromise = slidingWindowSignal.getSignal(); + + // Emit 'exit' with a signal before timeout + const exitSignal = 'exitSignal'; + eventEmitter.emit('testEvent', exitSignal); + + // Advance timers enough to go outside the timeout window + jest.advanceTimersByTime(600); + + const receivedSignal = await receivedSignalPromise; + expect(receivedSignal).toBe(exitSignal); + + jest.useRealTimers(); + }); +}); diff --git a/packages/cli/src/runners/__tests__/task-broker.test.ts b/packages/cli/src/runners/__tests__/task-broker.test.ts index 4a226c5b98..a90bf7662c 100644 --- a/packages/cli/src/runners/__tests__/task-broker.test.ts +++ b/packages/cli/src/runners/__tests__/task-broker.test.ts @@ -110,7 +110,7 @@ describe('TaskBroker', () => { const messageCallback = jest.fn(); taskBroker.registerRunner(runner, messageCallback); - taskBroker.deregisterRunner(runnerId); + taskBroker.deregisterRunner(runnerId, new Error()); const knownRunners = taskBroker.getKnownRunners(); const runnerIds = Object.keys(knownRunners); @@ -138,7 +138,7 @@ describe('TaskBroker', () => { validFor: 1000, validUntil: createValidUntil(1000), }); - taskBroker.deregisterRunner(runnerId); + taskBroker.deregisterRunner(runnerId, new Error()); const offers = taskBroker.getPendingTaskOffers(); expect(offers).toHaveLength(1); @@ -161,10 +161,14 @@ describe('TaskBroker', () => { [taskId]: { id: taskId, requesterId: 'requester1', runnerId, taskType: 'mock' }, task2: { id: 'task2', requesterId: 'requester1', runnerId: 'runner2', taskType: 'mock' }, }); - taskBroker.deregisterRunner(runnerId); + const error = new Error('error'); + taskBroker.deregisterRunner(runnerId, error); - expect(failSpy).toBeCalledWith(taskId, `The Task Runner (${runnerId}) has disconnected`); - expect(rejectSpy).toBeCalledWith(taskId, `The Task Runner (${runnerId}) has disconnected`); + expect(failSpy).toBeCalledWith(taskId, error); + expect(rejectSpy).toBeCalledWith( + taskId, + `The Task Runner (${runnerId}) has disconnected: error`, + ); }); }); diff --git a/packages/cli/src/runners/errors/task-runner-disconnected-error.ts b/packages/cli/src/runners/errors/task-runner-disconnected-error.ts new file mode 100644 index 0000000000..6c7c49450a --- /dev/null +++ b/packages/cli/src/runners/errors/task-runner-disconnected-error.ts @@ -0,0 +1,7 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class TaskRunnerDisconnectedError extends ApplicationError { + constructor(runnerId: string) { + super(`Task runner (${runnerId}) disconnected`); + } +} diff --git a/packages/cli/src/runners/errors/task-runner-oom-error.ts b/packages/cli/src/runners/errors/task-runner-oom-error.ts new file mode 100644 index 0000000000..e52b8b4bea --- /dev/null +++ b/packages/cli/src/runners/errors/task-runner-oom-error.ts @@ -0,0 +1,31 @@ +import { ApplicationError } from 'n8n-workflow'; + +import type { TaskRunner } from '../task-broker.service'; + +export class TaskRunnerOomError extends ApplicationError { + public description: string; + + constructor(runnerId: TaskRunner['id'], isCloudDeployment: boolean) { + super(`Task runner (${runnerId}) ran out of memory.`, { level: 'error' }); + + const fixSuggestions = { + reduceItems: 'Reduce the number of items processed at a time by batching the input.', + increaseMemory: + "Increase the memory available to the task runner with 'N8N_RUNNERS_MAX_OLD_SPACE_SIZE' environment variable.", + upgradePlan: 'Upgrade your cloud plan to increase the available memory.', + }; + + const subtitle = + 'The runner executing the code ran out of memory. This usually happens when there are too many items to process. You can try the following:'; + const suggestions = isCloudDeployment + ? [fixSuggestions.reduceItems, fixSuggestions.upgradePlan] + : [fixSuggestions.reduceItems, fixSuggestions.increaseMemory]; + const suggestionsText = suggestions + .map((suggestion, index) => `${index + 1}. ${suggestion}`) + .join('
'); + + const description = `${subtitle}

${suggestionsText}`; + + this.description = description; + } +} diff --git a/packages/cli/src/runners/node-process-oom-detector.ts b/packages/cli/src/runners/node-process-oom-detector.ts new file mode 100644 index 0000000000..e6debb8551 --- /dev/null +++ b/packages/cli/src/runners/node-process-oom-detector.ts @@ -0,0 +1,34 @@ +import * as a from 'node:assert/strict'; +import type { ChildProcess } from 'node:child_process'; + +/** + * Class to monitor a nodejs process and detect if it runs out of + * memory (OOMs). + */ +export class NodeProcessOomDetector { + public get didProcessOom() { + return this._didProcessOom; + } + + private _didProcessOom = false; + + constructor(processToMonitor: ChildProcess) { + this.monitorProcess(processToMonitor); + } + + private monitorProcess(processToMonitor: ChildProcess) { + a.ok(processToMonitor.stderr, "Can't monitor a process without stderr"); + + processToMonitor.stderr.on('data', this.onStderr); + + processToMonitor.once('exit', () => { + processToMonitor.stderr?.off('data', this.onStderr); + }); + } + + private onStderr = (data: Buffer) => { + if (data.includes('JavaScript heap out of memory')) { + this._didProcessOom = true; + } + }; +} diff --git a/packages/cli/src/runners/runner-ws-server.ts b/packages/cli/src/runners/runner-ws-server.ts index 38b70c97dc..59bb92ff76 100644 --- a/packages/cli/src/runners/runner-ws-server.ts +++ b/packages/cli/src/runners/runner-ws-server.ts @@ -10,6 +10,7 @@ import type { TaskRunnerServerInitResponse, } from './runner-types'; import { TaskBroker, type MessageCallback, type TaskRunner } from './task-broker.service'; +import { TaskRunnerDisconnectAnalyzer } from './task-runner-disconnect-analyzer'; function heartbeat(this: WebSocket) { this.isAlive = true; @@ -22,6 +23,7 @@ export class TaskRunnerService { constructor( private readonly logger: Logger, private readonly taskBroker: TaskBroker, + private readonly disconnectAnalyzer: TaskRunnerDisconnectAnalyzer, ) {} sendMessage(id: TaskRunner['id'], message: N8nMessage.ToRunner.All) { @@ -34,7 +36,7 @@ export class TaskRunnerService { let isConnected = false; - const onMessage = (data: WebSocket.RawData) => { + const onMessage = async (data: WebSocket.RawData) => { try { const buffer = Array.isArray(data) ? Buffer.concat(data) : Buffer.from(data); @@ -45,7 +47,7 @@ export class TaskRunnerService { if (!isConnected && message.type !== 'runner:info') { return; } else if (!isConnected && message.type === 'runner:info') { - this.removeConnection(id); + await this.removeConnection(id); isConnected = true; this.runnerConnections.set(id, connection); @@ -75,10 +77,10 @@ export class TaskRunnerService { }; // Makes sure to remove the session if the connection is closed - connection.once('close', () => { + connection.once('close', async () => { connection.off('pong', heartbeat); connection.off('message', onMessage); - this.removeConnection(id); + await this.removeConnection(id); }); connection.on('message', onMessage); @@ -87,10 +89,11 @@ export class TaskRunnerService { ); } - removeConnection(id: TaskRunner['id']) { + async removeConnection(id: TaskRunner['id']) { const connection = this.runnerConnections.get(id); if (connection) { - this.taskBroker.deregisterRunner(id); + const disconnectReason = await this.disconnectAnalyzer.determineDisconnectReason(id); + this.taskBroker.deregisterRunner(id, disconnectReason); connection.close(); this.runnerConnections.delete(id); } diff --git a/packages/cli/src/runners/sliding-window-signal.ts b/packages/cli/src/runners/sliding-window-signal.ts new file mode 100644 index 0000000000..5954f7bade --- /dev/null +++ b/packages/cli/src/runners/sliding-window-signal.ts @@ -0,0 +1,59 @@ +import type { TypedEmitter } from '../typed-emitter'; + +export type SlidingWindowSignalOpts = { + windowSizeInMs?: number; +}; + +/** + * A class that listens for a specific event on an emitter (signal) and + * provides a sliding window of the last event that was emitted. + */ +export class SlidingWindowSignal { + private lastSignal: TEvents[TEventName] | null = null; + + private lastSignalTime: number = 0; + + private windowSizeInMs: number; + + constructor( + private readonly eventEmitter: TypedEmitter, + private readonly eventName: TEventName, + opts: SlidingWindowSignalOpts = {}, + ) { + const { windowSizeInMs = 500 } = opts; + + this.windowSizeInMs = windowSizeInMs; + + eventEmitter.on(eventName, (signal: TEvents[TEventName]) => { + this.lastSignal = signal; + this.lastSignalTime = Date.now(); + }); + } + + /** + * If an event has been emitted within the last `windowSize` milliseconds, + * that event is returned. Otherwise it will wait for up to `windowSize` + * milliseconds for the event to be emitted. `null` is returned + * if no event is emitted within the window. + */ + public async getSignal(): Promise { + const timeSinceLastEvent = Date.now() - this.lastSignalTime; + if (timeSinceLastEvent <= this.windowSizeInMs) return this.lastSignal; + + return await new Promise((resolve) => { + let timeoutTimerId: NodeJS.Timeout | null = null; + + const onExit = (signal: TEvents[TEventName]) => { + if (timeoutTimerId) clearTimeout(timeoutTimerId); + resolve(signal); + }; + + timeoutTimerId = setTimeout(() => { + this.eventEmitter.off(this.eventName, onExit); + resolve(null); + }); + + this.eventEmitter.once(this.eventName, onExit); + }); + } +} diff --git a/packages/cli/src/runners/task-broker.service.ts b/packages/cli/src/runners/task-broker.service.ts index 40ad6d6e90..d88d677725 100644 --- a/packages/cli/src/runners/task-broker.service.ts +++ b/packages/cli/src/runners/task-broker.service.ts @@ -104,7 +104,7 @@ export class TaskBroker { }); } - deregisterRunner(runnerId: string) { + deregisterRunner(runnerId: string, error: Error) { this.knownRunners.delete(runnerId); // Remove any pending offers @@ -117,8 +117,11 @@ export class TaskBroker { // Fail any tasks for (const task of this.tasks.values()) { if (task.runnerId === runnerId) { - void this.failTask(task.id, `The Task Runner (${runnerId}) has disconnected`); - this.handleRunnerReject(task.id, `The Task Runner (${runnerId}) has disconnected`); + void this.failTask(task.id, error); + this.handleRunnerReject( + task.id, + `The Task Runner (${runnerId}) has disconnected: ${error.message}`, + ); } } } @@ -352,7 +355,7 @@ export class TaskBroker { }); } - private async failTask(taskId: Task['id'], reason: string) { + private async failTask(taskId: Task['id'], error: Error) { const task = this.tasks.get(taskId); if (!task) { return; @@ -362,7 +365,7 @@ export class TaskBroker { await this.messageRequester(task.requesterId, { type: 'broker:taskerror', taskId, - error: reason, + error, }); } @@ -375,11 +378,14 @@ export class TaskBroker { } const runner = this.knownRunners.get(task.runnerId); if (!runner) { - const reason = `Cannot find runner, failed to find runner (${task.runnerId})`; - await this.failTask(taskId, reason); - throw new ApplicationError(reason, { - level: 'error', - }); + const error = new ApplicationError( + `Cannot find runner, failed to find runner (${task.runnerId})`, + { + level: 'error', + }, + ); + await this.failTask(taskId, error); + throw error; } return runner.runner; } diff --git a/packages/cli/src/runners/task-runner-disconnect-analyzer.ts b/packages/cli/src/runners/task-runner-disconnect-analyzer.ts new file mode 100644 index 0000000000..d75a1b9aad --- /dev/null +++ b/packages/cli/src/runners/task-runner-disconnect-analyzer.ts @@ -0,0 +1,60 @@ +import { TaskRunnersConfig } from '@n8n/config'; +import { Service } from 'typedi'; + +import config from '@/config'; + +import { TaskRunnerDisconnectedError } from './errors/task-runner-disconnected-error'; +import { TaskRunnerOomError } from './errors/task-runner-oom-error'; +import { SlidingWindowSignal } from './sliding-window-signal'; +import type { TaskRunner } from './task-broker.service'; +import type { ExitReason, TaskRunnerProcessEventMap } from './task-runner-process'; +import { TaskRunnerProcess } from './task-runner-process'; + +/** + * Analyzes the disconnect reason of a task runner process to provide a more + * meaningful error message to the user. + */ +@Service() +export class TaskRunnerDisconnectAnalyzer { + private readonly exitReasonSignal: SlidingWindowSignal; + + constructor( + private readonly runnerConfig: TaskRunnersConfig, + private readonly taskRunnerProcess: TaskRunnerProcess, + ) { + // When the task runner process is running as a child process, there's + // no determinate time when it exits compared to when the runner disconnects + // (i.e. it's a race condition). Hence we use a sliding window to determine + // the exit reason. As long as we receive the exit signal from the task + // runner process within the window, we can determine the exit reason. + this.exitReasonSignal = new SlidingWindowSignal(this.taskRunnerProcess, 'exit', { + windowSizeInMs: 500, + }); + } + + private get isCloudDeployment() { + return config.get('deployment.type') === 'cloud'; + } + + async determineDisconnectReason(runnerId: TaskRunner['id']): Promise { + const exitCode = await this.awaitExitSignal(); + if (exitCode === 'oom') { + return new TaskRunnerOomError(runnerId, this.isCloudDeployment); + } + + return new TaskRunnerDisconnectedError(runnerId); + } + + private async awaitExitSignal(): Promise { + if (this.runnerConfig.mode === 'external') { + // If the task runner is running in external mode, we don't have + // control over the process and hence cannot determine the exit + // reason. We just return 'unknown' in this case. + return 'unknown'; + } + + const lastExitReason = await this.exitReasonSignal.getSignal(); + + return lastExitReason?.reason ?? 'unknown'; + } +} diff --git a/packages/cli/src/runners/task-runner-process.ts b/packages/cli/src/runners/task-runner-process.ts index f4c219ead7..d453f1d134 100644 --- a/packages/cli/src/runners/task-runner-process.ts +++ b/packages/cli/src/runners/task-runner-process.ts @@ -9,14 +9,24 @@ import { Logger } from '@/logging/logger.service'; import { TaskRunnerAuthService } from './auth/task-runner-auth.service'; import { forwardToLogger } from './forward-to-logger'; +import { NodeProcessOomDetector } from './node-process-oom-detector'; +import { TypedEmitter } from '../typed-emitter'; type ChildProcess = ReturnType; +export type ExitReason = 'unknown' | 'oom'; + +export type TaskRunnerProcessEventMap = { + exit: { + reason: ExitReason; + }; +}; + /** * Manages the JS task runner process as a child process */ @Service() -export class TaskRunnerProcess { +export class TaskRunnerProcess extends TypedEmitter { public get isRunning() { return this.process !== null; } @@ -39,6 +49,8 @@ export class TaskRunnerProcess { private _runPromise: Promise | null = null; + private oomDetector: NodeProcessOomDetector | null = null; + private isShuttingDown = false; private logger: Logger; @@ -54,6 +66,8 @@ export class TaskRunnerProcess { private readonly runnerConfig: TaskRunnersConfig, private readonly authService: TaskRunnerAuthService, ) { + super(); + a.ok( this.runnerConfig.mode === 'internal_childprocess' || this.runnerConfig.mode === 'internal_launcher', @@ -141,6 +155,8 @@ export class TaskRunnerProcess { private monitorProcess(taskRunnerProcess: ChildProcess) { this._runPromise = new Promise((resolve) => { + this.oomDetector = new NodeProcessOomDetector(taskRunnerProcess); + taskRunnerProcess.on('exit', (code) => { this.onProcessExit(code, resolve); }); @@ -149,6 +165,7 @@ export class TaskRunnerProcess { private onProcessExit(_code: number | null, resolveFn: () => void) { this.process = null; + this.emit('exit', { reason: this.oomDetector?.didProcessOom ? 'oom' : 'unknown' }); resolveFn(); // If we are not shutting down, restart the process diff --git a/packages/nodes-base/nodes/Code/JsTaskRunnerSandbox.ts b/packages/nodes-base/nodes/Code/JsTaskRunnerSandbox.ts index 80d4b32445..98d98a28e3 100644 --- a/packages/nodes-base/nodes/Code/JsTaskRunnerSandbox.ts +++ b/packages/nodes-base/nodes/Code/JsTaskRunnerSandbox.ts @@ -60,9 +60,11 @@ export class JsTaskRunnerSandbox { } private throwExecutionError(error: unknown): never { - // The error coming from task runner is not an instance of error, - // so we need to wrap it in an error instance. - if (isWrappableError(error)) { + if (error instanceof Error) { + throw error; + } else if (isWrappableError(error)) { + // The error coming from task runner is not an instance of error, + // so we need to wrap it in an error instance. throw new WrappedExecutionError(error); }