fix: Provide a better error message when task runner disconnects (no-changelog) (#11442)

This commit is contained in:
Tomi Turtiainen 2024-10-29 12:39:31 +02:00 committed by GitHub
parent 0ab24c814a
commit 4e3681b905
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 362 additions and 25 deletions

View file

@ -0,0 +1,43 @@
import { spawn } from 'node:child_process';
import { NodeProcessOomDetector } from '../node-process-oom-detector';
describe('NodeProcessOomDetector', () => {
test('should detect an out-of-memory error in a monitored process', (done) => {
const childProcess = spawn(process.execPath, [
// set low memory limit
'--max-old-space-size=20',
'-e',
`
const data = [];
// fill memory until it crashes
while (true) data.push(Array.from({ length: 10_000 }).map(() => Math.random().toString()).join());
`,
]);
const detector = new NodeProcessOomDetector(childProcess);
childProcess.on('exit', (code) => {
expect(detector.didProcessOom).toBe(true);
expect(code).not.toBe(0);
done();
});
});
test('should not detect an out-of-memory error in a process that exits normally', (done) => {
const childProcess = spawn(process.execPath, [
'-e',
`
console.log("Hello, World!");
`,
]);
const detector = new NodeProcessOomDetector(childProcess);
childProcess.on('exit', (code) => {
expect(detector.didProcessOom).toBe(false);
expect(code).toBe(0);
done();
});
});
});

View file

@ -0,0 +1,71 @@
import { TypedEmitter } from '../../typed-emitter';
import { SlidingWindowSignal } from '../sliding-window-signal';
type TestEventMap = {
testEvent: string;
};
describe('SlidingWindowSignal', () => {
let eventEmitter: TypedEmitter<TestEventMap>;
let slidingWindowSignal: SlidingWindowSignal<TestEventMap, 'testEvent'>;
beforeEach(() => {
eventEmitter = new TypedEmitter<TestEventMap>();
slidingWindowSignal = new SlidingWindowSignal(eventEmitter, 'testEvent', {
windowSizeInMs: 500,
});
});
afterEach(() => {
jest.clearAllTimers();
jest.clearAllMocks();
});
it('should return the last signal if within window size', async () => {
const signal = 'testSignal';
eventEmitter.emit('testEvent', signal);
const receivedSignal = await slidingWindowSignal.getSignal();
expect(receivedSignal).toBe(signal);
});
it('should return null if there is no signal within the window', async () => {
jest.useFakeTimers();
const receivedSignalPromise = slidingWindowSignal.getSignal();
jest.advanceTimersByTime(600);
const receivedSignal = await receivedSignalPromise;
expect(receivedSignal).toBeNull();
jest.useRealTimers();
});
it('should return null if "exit" event is not emitted before timeout', async () => {
const signal = 'testSignal';
jest.useFakeTimers();
const receivedSignalPromise = slidingWindowSignal.getSignal();
jest.advanceTimersByTime(600);
eventEmitter.emit('testEvent', signal);
const receivedSignal = await receivedSignalPromise;
expect(receivedSignal).toBeNull();
jest.useRealTimers();
});
it('should return the signal emitted on "exit" event before timeout', async () => {
jest.useFakeTimers();
const receivedSignalPromise = slidingWindowSignal.getSignal();
// Emit 'exit' with a signal before timeout
const exitSignal = 'exitSignal';
eventEmitter.emit('testEvent', exitSignal);
// Advance timers enough to go outside the timeout window
jest.advanceTimersByTime(600);
const receivedSignal = await receivedSignalPromise;
expect(receivedSignal).toBe(exitSignal);
jest.useRealTimers();
});
});

View file

@ -110,7 +110,7 @@ describe('TaskBroker', () => {
const messageCallback = jest.fn();
taskBroker.registerRunner(runner, messageCallback);
taskBroker.deregisterRunner(runnerId);
taskBroker.deregisterRunner(runnerId, new Error());
const knownRunners = taskBroker.getKnownRunners();
const runnerIds = Object.keys(knownRunners);
@ -138,7 +138,7 @@ describe('TaskBroker', () => {
validFor: 1000,
validUntil: createValidUntil(1000),
});
taskBroker.deregisterRunner(runnerId);
taskBroker.deregisterRunner(runnerId, new Error());
const offers = taskBroker.getPendingTaskOffers();
expect(offers).toHaveLength(1);
@ -161,10 +161,14 @@ describe('TaskBroker', () => {
[taskId]: { id: taskId, requesterId: 'requester1', runnerId, taskType: 'mock' },
task2: { id: 'task2', requesterId: 'requester1', runnerId: 'runner2', taskType: 'mock' },
});
taskBroker.deregisterRunner(runnerId);
const error = new Error('error');
taskBroker.deregisterRunner(runnerId, error);
expect(failSpy).toBeCalledWith(taskId, `The Task Runner (${runnerId}) has disconnected`);
expect(rejectSpy).toBeCalledWith(taskId, `The Task Runner (${runnerId}) has disconnected`);
expect(failSpy).toBeCalledWith(taskId, error);
expect(rejectSpy).toBeCalledWith(
taskId,
`The Task Runner (${runnerId}) has disconnected: error`,
);
});
});

View file

@ -0,0 +1,7 @@
import { ApplicationError } from 'n8n-workflow';
export class TaskRunnerDisconnectedError extends ApplicationError {
constructor(runnerId: string) {
super(`Task runner (${runnerId}) disconnected`);
}
}

View file

@ -0,0 +1,31 @@
import { ApplicationError } from 'n8n-workflow';
import type { TaskRunner } from '../task-broker.service';
export class TaskRunnerOomError extends ApplicationError {
public description: string;
constructor(runnerId: TaskRunner['id'], isCloudDeployment: boolean) {
super(`Task runner (${runnerId}) ran out of memory.`, { level: 'error' });
const fixSuggestions = {
reduceItems: 'Reduce the number of items processed at a time by batching the input.',
increaseMemory:
"Increase the memory available to the task runner with 'N8N_RUNNERS_MAX_OLD_SPACE_SIZE' environment variable.",
upgradePlan: 'Upgrade your cloud plan to increase the available memory.',
};
const subtitle =
'The runner executing the code ran out of memory. This usually happens when there are too many items to process. You can try the following:';
const suggestions = isCloudDeployment
? [fixSuggestions.reduceItems, fixSuggestions.upgradePlan]
: [fixSuggestions.reduceItems, fixSuggestions.increaseMemory];
const suggestionsText = suggestions
.map((suggestion, index) => `${index + 1}. ${suggestion}`)
.join('<br/>');
const description = `${subtitle}<br/><br/>${suggestionsText}`;
this.description = description;
}
}

View file

@ -0,0 +1,34 @@
import * as a from 'node:assert/strict';
import type { ChildProcess } from 'node:child_process';
/**
* Class to monitor a nodejs process and detect if it runs out of
* memory (OOMs).
*/
export class NodeProcessOomDetector {
public get didProcessOom() {
return this._didProcessOom;
}
private _didProcessOom = false;
constructor(processToMonitor: ChildProcess) {
this.monitorProcess(processToMonitor);
}
private monitorProcess(processToMonitor: ChildProcess) {
a.ok(processToMonitor.stderr, "Can't monitor a process without stderr");
processToMonitor.stderr.on('data', this.onStderr);
processToMonitor.once('exit', () => {
processToMonitor.stderr?.off('data', this.onStderr);
});
}
private onStderr = (data: Buffer) => {
if (data.includes('JavaScript heap out of memory')) {
this._didProcessOom = true;
}
};
}

View file

@ -10,6 +10,7 @@ import type {
TaskRunnerServerInitResponse,
} from './runner-types';
import { TaskBroker, type MessageCallback, type TaskRunner } from './task-broker.service';
import { TaskRunnerDisconnectAnalyzer } from './task-runner-disconnect-analyzer';
function heartbeat(this: WebSocket) {
this.isAlive = true;
@ -22,6 +23,7 @@ export class TaskRunnerService {
constructor(
private readonly logger: Logger,
private readonly taskBroker: TaskBroker,
private readonly disconnectAnalyzer: TaskRunnerDisconnectAnalyzer,
) {}
sendMessage(id: TaskRunner['id'], message: N8nMessage.ToRunner.All) {
@ -34,7 +36,7 @@ export class TaskRunnerService {
let isConnected = false;
const onMessage = (data: WebSocket.RawData) => {
const onMessage = async (data: WebSocket.RawData) => {
try {
const buffer = Array.isArray(data) ? Buffer.concat(data) : Buffer.from(data);
@ -45,7 +47,7 @@ export class TaskRunnerService {
if (!isConnected && message.type !== 'runner:info') {
return;
} else if (!isConnected && message.type === 'runner:info') {
this.removeConnection(id);
await this.removeConnection(id);
isConnected = true;
this.runnerConnections.set(id, connection);
@ -75,10 +77,10 @@ export class TaskRunnerService {
};
// Makes sure to remove the session if the connection is closed
connection.once('close', () => {
connection.once('close', async () => {
connection.off('pong', heartbeat);
connection.off('message', onMessage);
this.removeConnection(id);
await this.removeConnection(id);
});
connection.on('message', onMessage);
@ -87,10 +89,11 @@ export class TaskRunnerService {
);
}
removeConnection(id: TaskRunner['id']) {
async removeConnection(id: TaskRunner['id']) {
const connection = this.runnerConnections.get(id);
if (connection) {
this.taskBroker.deregisterRunner(id);
const disconnectReason = await this.disconnectAnalyzer.determineDisconnectReason(id);
this.taskBroker.deregisterRunner(id, disconnectReason);
connection.close();
this.runnerConnections.delete(id);
}

View file

@ -0,0 +1,59 @@
import type { TypedEmitter } from '../typed-emitter';
export type SlidingWindowSignalOpts = {
windowSizeInMs?: number;
};
/**
* A class that listens for a specific event on an emitter (signal) and
* provides a sliding window of the last event that was emitted.
*/
export class SlidingWindowSignal<TEvents, TEventName extends keyof TEvents & string> {
private lastSignal: TEvents[TEventName] | null = null;
private lastSignalTime: number = 0;
private windowSizeInMs: number;
constructor(
private readonly eventEmitter: TypedEmitter<TEvents>,
private readonly eventName: TEventName,
opts: SlidingWindowSignalOpts = {},
) {
const { windowSizeInMs = 500 } = opts;
this.windowSizeInMs = windowSizeInMs;
eventEmitter.on(eventName, (signal: TEvents[TEventName]) => {
this.lastSignal = signal;
this.lastSignalTime = Date.now();
});
}
/**
* If an event has been emitted within the last `windowSize` milliseconds,
* that event is returned. Otherwise it will wait for up to `windowSize`
* milliseconds for the event to be emitted. `null` is returned
* if no event is emitted within the window.
*/
public async getSignal(): Promise<TEvents[TEventName] | null> {
const timeSinceLastEvent = Date.now() - this.lastSignalTime;
if (timeSinceLastEvent <= this.windowSizeInMs) return this.lastSignal;
return await new Promise<TEvents[TEventName] | null>((resolve) => {
let timeoutTimerId: NodeJS.Timeout | null = null;
const onExit = (signal: TEvents[TEventName]) => {
if (timeoutTimerId) clearTimeout(timeoutTimerId);
resolve(signal);
};
timeoutTimerId = setTimeout(() => {
this.eventEmitter.off(this.eventName, onExit);
resolve(null);
});
this.eventEmitter.once(this.eventName, onExit);
});
}
}

View file

@ -104,7 +104,7 @@ export class TaskBroker {
});
}
deregisterRunner(runnerId: string) {
deregisterRunner(runnerId: string, error: Error) {
this.knownRunners.delete(runnerId);
// Remove any pending offers
@ -117,8 +117,11 @@ export class TaskBroker {
// Fail any tasks
for (const task of this.tasks.values()) {
if (task.runnerId === runnerId) {
void this.failTask(task.id, `The Task Runner (${runnerId}) has disconnected`);
this.handleRunnerReject(task.id, `The Task Runner (${runnerId}) has disconnected`);
void this.failTask(task.id, error);
this.handleRunnerReject(
task.id,
`The Task Runner (${runnerId}) has disconnected: ${error.message}`,
);
}
}
}
@ -352,7 +355,7 @@ export class TaskBroker {
});
}
private async failTask(taskId: Task['id'], reason: string) {
private async failTask(taskId: Task['id'], error: Error) {
const task = this.tasks.get(taskId);
if (!task) {
return;
@ -362,7 +365,7 @@ export class TaskBroker {
await this.messageRequester(task.requesterId, {
type: 'broker:taskerror',
taskId,
error: reason,
error,
});
}
@ -375,11 +378,14 @@ export class TaskBroker {
}
const runner = this.knownRunners.get(task.runnerId);
if (!runner) {
const reason = `Cannot find runner, failed to find runner (${task.runnerId})`;
await this.failTask(taskId, reason);
throw new ApplicationError(reason, {
level: 'error',
});
const error = new ApplicationError(
`Cannot find runner, failed to find runner (${task.runnerId})`,
{
level: 'error',
},
);
await this.failTask(taskId, error);
throw error;
}
return runner.runner;
}

View file

@ -0,0 +1,60 @@
import { TaskRunnersConfig } from '@n8n/config';
import { Service } from 'typedi';
import config from '@/config';
import { TaskRunnerDisconnectedError } from './errors/task-runner-disconnected-error';
import { TaskRunnerOomError } from './errors/task-runner-oom-error';
import { SlidingWindowSignal } from './sliding-window-signal';
import type { TaskRunner } from './task-broker.service';
import type { ExitReason, TaskRunnerProcessEventMap } from './task-runner-process';
import { TaskRunnerProcess } from './task-runner-process';
/**
* Analyzes the disconnect reason of a task runner process to provide a more
* meaningful error message to the user.
*/
@Service()
export class TaskRunnerDisconnectAnalyzer {
private readonly exitReasonSignal: SlidingWindowSignal<TaskRunnerProcessEventMap, 'exit'>;
constructor(
private readonly runnerConfig: TaskRunnersConfig,
private readonly taskRunnerProcess: TaskRunnerProcess,
) {
// When the task runner process is running as a child process, there's
// no determinate time when it exits compared to when the runner disconnects
// (i.e. it's a race condition). Hence we use a sliding window to determine
// the exit reason. As long as we receive the exit signal from the task
// runner process within the window, we can determine the exit reason.
this.exitReasonSignal = new SlidingWindowSignal(this.taskRunnerProcess, 'exit', {
windowSizeInMs: 500,
});
}
private get isCloudDeployment() {
return config.get('deployment.type') === 'cloud';
}
async determineDisconnectReason(runnerId: TaskRunner['id']): Promise<Error> {
const exitCode = await this.awaitExitSignal();
if (exitCode === 'oom') {
return new TaskRunnerOomError(runnerId, this.isCloudDeployment);
}
return new TaskRunnerDisconnectedError(runnerId);
}
private async awaitExitSignal(): Promise<ExitReason> {
if (this.runnerConfig.mode === 'external') {
// If the task runner is running in external mode, we don't have
// control over the process and hence cannot determine the exit
// reason. We just return 'unknown' in this case.
return 'unknown';
}
const lastExitReason = await this.exitReasonSignal.getSignal();
return lastExitReason?.reason ?? 'unknown';
}
}

View file

@ -9,14 +9,24 @@ import { Logger } from '@/logging/logger.service';
import { TaskRunnerAuthService } from './auth/task-runner-auth.service';
import { forwardToLogger } from './forward-to-logger';
import { NodeProcessOomDetector } from './node-process-oom-detector';
import { TypedEmitter } from '../typed-emitter';
type ChildProcess = ReturnType<typeof spawn>;
export type ExitReason = 'unknown' | 'oom';
export type TaskRunnerProcessEventMap = {
exit: {
reason: ExitReason;
};
};
/**
* Manages the JS task runner process as a child process
*/
@Service()
export class TaskRunnerProcess {
export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
public get isRunning() {
return this.process !== null;
}
@ -39,6 +49,8 @@ export class TaskRunnerProcess {
private _runPromise: Promise<void> | null = null;
private oomDetector: NodeProcessOomDetector | null = null;
private isShuttingDown = false;
private logger: Logger;
@ -54,6 +66,8 @@ export class TaskRunnerProcess {
private readonly runnerConfig: TaskRunnersConfig,
private readonly authService: TaskRunnerAuthService,
) {
super();
a.ok(
this.runnerConfig.mode === 'internal_childprocess' ||
this.runnerConfig.mode === 'internal_launcher',
@ -141,6 +155,8 @@ export class TaskRunnerProcess {
private monitorProcess(taskRunnerProcess: ChildProcess) {
this._runPromise = new Promise((resolve) => {
this.oomDetector = new NodeProcessOomDetector(taskRunnerProcess);
taskRunnerProcess.on('exit', (code) => {
this.onProcessExit(code, resolve);
});
@ -149,6 +165,7 @@ export class TaskRunnerProcess {
private onProcessExit(_code: number | null, resolveFn: () => void) {
this.process = null;
this.emit('exit', { reason: this.oomDetector?.didProcessOom ? 'oom' : 'unknown' });
resolveFn();
// If we are not shutting down, restart the process

View file

@ -60,9 +60,11 @@ export class JsTaskRunnerSandbox {
}
private throwExecutionError(error: unknown): never {
// The error coming from task runner is not an instance of error,
// so we need to wrap it in an error instance.
if (isWrappableError(error)) {
if (error instanceof Error) {
throw error;
} else if (isWrappableError(error)) {
// The error coming from task runner is not an instance of error,
// so we need to wrap it in an error instance.
throw new WrappedExecutionError(error);
}