mirror of
https://github.com/n8n-io/n8n.git
synced 2025-02-21 02:56:40 -08:00
feat(core): Detect restart loop in a task runner process (no-changelog) (#12003)
Co-authored-by: Iván Ovejero <ivov.src@gmail.com>
This commit is contained in:
parent
00897f6634
commit
516f3b7b4b
|
@ -0,0 +1,57 @@
|
||||||
|
import { TaskRunnersConfig } from '@n8n/config';
|
||||||
|
import { mock } from 'jest-mock-extended';
|
||||||
|
|
||||||
|
import type { Logger } from '@/logging/logger.service';
|
||||||
|
import type { TaskRunnerAuthService } from '@/runners/auth/task-runner-auth.service';
|
||||||
|
import { TaskRunnerRestartLoopError } from '@/runners/errors/task-runner-restart-loop-error';
|
||||||
|
import { RunnerLifecycleEvents } from '@/runners/runner-lifecycle-events';
|
||||||
|
import { TaskRunnerProcess } from '@/runners/task-runner-process';
|
||||||
|
import { TaskRunnerProcessRestartLoopDetector } from '@/runners/task-runner-process-restart-loop-detector';
|
||||||
|
|
||||||
|
describe('TaskRunnerProcessRestartLoopDetector', () => {
|
||||||
|
const mockLogger = mock<Logger>();
|
||||||
|
const mockAuthService = mock<TaskRunnerAuthService>();
|
||||||
|
const runnerConfig = new TaskRunnersConfig();
|
||||||
|
const taskRunnerProcess = new TaskRunnerProcess(
|
||||||
|
mockLogger,
|
||||||
|
runnerConfig,
|
||||||
|
mockAuthService,
|
||||||
|
new RunnerLifecycleEvents(),
|
||||||
|
);
|
||||||
|
|
||||||
|
it('should detect a restart loop if process exits 5 times within 5s', () => {
|
||||||
|
const restartLoopDetector = new TaskRunnerProcessRestartLoopDetector(taskRunnerProcess);
|
||||||
|
let emittedError: TaskRunnerRestartLoopError | undefined = undefined;
|
||||||
|
restartLoopDetector.on('restart-loop-detected', (error) => {
|
||||||
|
emittedError = error;
|
||||||
|
});
|
||||||
|
|
||||||
|
taskRunnerProcess.emit('exit');
|
||||||
|
taskRunnerProcess.emit('exit');
|
||||||
|
taskRunnerProcess.emit('exit');
|
||||||
|
taskRunnerProcess.emit('exit');
|
||||||
|
taskRunnerProcess.emit('exit');
|
||||||
|
|
||||||
|
expect(emittedError).toBeInstanceOf(TaskRunnerRestartLoopError);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not detect a restart loop if process exits less than 5 times within 5s', () => {
|
||||||
|
jest.useFakeTimers();
|
||||||
|
const restartLoopDetector = new TaskRunnerProcessRestartLoopDetector(taskRunnerProcess);
|
||||||
|
let emittedError: TaskRunnerRestartLoopError | undefined = undefined;
|
||||||
|
restartLoopDetector.on('restart-loop-detected', (error) => {
|
||||||
|
emittedError = error;
|
||||||
|
});
|
||||||
|
|
||||||
|
taskRunnerProcess.emit('exit');
|
||||||
|
taskRunnerProcess.emit('exit');
|
||||||
|
taskRunnerProcess.emit('exit');
|
||||||
|
taskRunnerProcess.emit('exit');
|
||||||
|
|
||||||
|
jest.advanceTimersByTime(5010);
|
||||||
|
|
||||||
|
taskRunnerProcess.emit('exit');
|
||||||
|
|
||||||
|
expect(emittedError).toBeUndefined();
|
||||||
|
});
|
||||||
|
});
|
|
@ -0,0 +1,14 @@
|
||||||
|
import { ApplicationError } from 'n8n-workflow';
|
||||||
|
|
||||||
|
export class TaskRunnerRestartLoopError extends ApplicationError {
|
||||||
|
constructor(
|
||||||
|
public readonly howManyTimes: number,
|
||||||
|
public readonly timePeriodMs: number,
|
||||||
|
) {
|
||||||
|
const message = `Task runner has restarted ${howManyTimes} times within ${timePeriodMs / 1000} seconds. This is an abnormally high restart rate that suggests a bug or other issue is preventing your runner process from starting up. If this issues persists, please file a report at: https://github.com/n8n-io/n8n/issues`;
|
||||||
|
|
||||||
|
super(message, {
|
||||||
|
level: 'fatal',
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,9 +1,13 @@
|
||||||
import { TaskRunnersConfig } from '@n8n/config';
|
import { TaskRunnersConfig } from '@n8n/config';
|
||||||
|
import { ErrorReporterProxy, sleep } from 'n8n-workflow';
|
||||||
import * as a from 'node:assert/strict';
|
import * as a from 'node:assert/strict';
|
||||||
import Container, { Service } from 'typedi';
|
import Container, { Service } from 'typedi';
|
||||||
|
|
||||||
import { OnShutdown } from '@/decorators/on-shutdown';
|
import { OnShutdown } from '@/decorators/on-shutdown';
|
||||||
|
import { Logger } from '@/logging/logger.service';
|
||||||
|
import type { TaskRunnerRestartLoopError } from '@/runners/errors/task-runner-restart-loop-error';
|
||||||
import type { TaskRunnerProcess } from '@/runners/task-runner-process';
|
import type { TaskRunnerProcess } from '@/runners/task-runner-process';
|
||||||
|
import { TaskRunnerProcessRestartLoopDetector } from '@/runners/task-runner-process-restart-loop-detector';
|
||||||
|
|
||||||
import { MissingAuthTokenError } from './errors/missing-auth-token.error';
|
import { MissingAuthTokenError } from './errors/missing-auth-token.error';
|
||||||
import { TaskRunnerWsServer } from './runner-ws-server';
|
import { TaskRunnerWsServer } from './runner-ws-server';
|
||||||
|
@ -25,7 +29,14 @@ export class TaskRunnerModule {
|
||||||
|
|
||||||
private taskRunnerProcess: TaskRunnerProcess | undefined;
|
private taskRunnerProcess: TaskRunnerProcess | undefined;
|
||||||
|
|
||||||
constructor(private readonly runnerConfig: TaskRunnersConfig) {}
|
private taskRunnerProcessRestartLoopDetector: TaskRunnerProcessRestartLoopDetector | undefined;
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
private readonly logger: Logger,
|
||||||
|
private readonly runnerConfig: TaskRunnersConfig,
|
||||||
|
) {
|
||||||
|
this.logger = this.logger.scoped('task-runner');
|
||||||
|
}
|
||||||
|
|
||||||
async start() {
|
async start() {
|
||||||
a.ok(this.runnerConfig.enabled, 'Task runner is disabled');
|
a.ok(this.runnerConfig.enabled, 'Task runner is disabled');
|
||||||
|
@ -83,6 +94,14 @@ export class TaskRunnerModule {
|
||||||
|
|
||||||
const { TaskRunnerProcess } = await import('@/runners/task-runner-process');
|
const { TaskRunnerProcess } = await import('@/runners/task-runner-process');
|
||||||
this.taskRunnerProcess = Container.get(TaskRunnerProcess);
|
this.taskRunnerProcess = Container.get(TaskRunnerProcess);
|
||||||
|
this.taskRunnerProcessRestartLoopDetector = new TaskRunnerProcessRestartLoopDetector(
|
||||||
|
this.taskRunnerProcess,
|
||||||
|
);
|
||||||
|
this.taskRunnerProcessRestartLoopDetector.on(
|
||||||
|
'restart-loop-detected',
|
||||||
|
this.onRunnerRestartLoopDetected,
|
||||||
|
);
|
||||||
|
|
||||||
await this.taskRunnerProcess.start();
|
await this.taskRunnerProcess.start();
|
||||||
|
|
||||||
const { InternalTaskRunnerDisconnectAnalyzer } = await import(
|
const { InternalTaskRunnerDisconnectAnalyzer } = await import(
|
||||||
|
@ -92,4 +111,13 @@ export class TaskRunnerModule {
|
||||||
Container.get(InternalTaskRunnerDisconnectAnalyzer),
|
Container.get(InternalTaskRunnerDisconnectAnalyzer),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private onRunnerRestartLoopDetected = async (error: TaskRunnerRestartLoopError) => {
|
||||||
|
this.logger.error(error.message);
|
||||||
|
ErrorReporterProxy.error(error);
|
||||||
|
|
||||||
|
// Allow some time for the error to be flushed
|
||||||
|
await sleep(1000);
|
||||||
|
process.exit(1);
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,73 @@
|
||||||
|
import { Time } from '@/constants';
|
||||||
|
import { TaskRunnerRestartLoopError } from '@/runners/errors/task-runner-restart-loop-error';
|
||||||
|
import type { TaskRunnerProcess } from '@/runners/task-runner-process';
|
||||||
|
import { TypedEmitter } from '@/typed-emitter';
|
||||||
|
|
||||||
|
const MAX_RESTARTS = 5;
|
||||||
|
const RESTARTS_WINDOW = 2 * Time.seconds.toMilliseconds;
|
||||||
|
|
||||||
|
type TaskRunnerProcessRestartLoopDetectorEventMap = {
|
||||||
|
'restart-loop-detected': TaskRunnerRestartLoopError;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A class to monitor the task runner process for restart loops
|
||||||
|
*/
|
||||||
|
export class TaskRunnerProcessRestartLoopDetector extends TypedEmitter<TaskRunnerProcessRestartLoopDetectorEventMap> {
|
||||||
|
/**
|
||||||
|
* How many times the process needs to restart for it to be detected
|
||||||
|
* being in a loop.
|
||||||
|
*/
|
||||||
|
private readonly maxCount = MAX_RESTARTS;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The time interval in which the process needs to restart `maxCount` times
|
||||||
|
* to be detected as being in a loop.
|
||||||
|
*/
|
||||||
|
private readonly restartsWindow = RESTARTS_WINDOW;
|
||||||
|
|
||||||
|
private numRestarts = 0;
|
||||||
|
|
||||||
|
/** Time when the first restart of a loop happened within a time window */
|
||||||
|
private firstRestartedAt = Date.now();
|
||||||
|
|
||||||
|
constructor(private readonly taskRunnerProcess: TaskRunnerProcess) {
|
||||||
|
super();
|
||||||
|
|
||||||
|
this.taskRunnerProcess.on('exit', () => {
|
||||||
|
this.increment();
|
||||||
|
|
||||||
|
if (this.isMaxCountExceeded()) {
|
||||||
|
this.emit(
|
||||||
|
'restart-loop-detected',
|
||||||
|
new TaskRunnerRestartLoopError(this.numRestarts, this.msSinceFirstIncrement()),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increments the counter
|
||||||
|
*/
|
||||||
|
private increment() {
|
||||||
|
const now = Date.now();
|
||||||
|
if (now > this.firstRestartedAt + this.restartsWindow) {
|
||||||
|
this.reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
this.numRestarts++;
|
||||||
|
}
|
||||||
|
|
||||||
|
private reset() {
|
||||||
|
this.numRestarts = 0;
|
||||||
|
this.firstRestartedAt = Date.now();
|
||||||
|
}
|
||||||
|
|
||||||
|
private isMaxCountExceeded() {
|
||||||
|
return this.numRestarts >= this.maxCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
private msSinceFirstIncrement() {
|
||||||
|
return Date.now() - this.firstRestartedAt;
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,5 @@
|
||||||
import { TaskRunnersConfig } from '@n8n/config';
|
import { TaskRunnersConfig } from '@n8n/config';
|
||||||
|
import { mock } from 'jest-mock-extended';
|
||||||
import Container from 'typedi';
|
import Container from 'typedi';
|
||||||
|
|
||||||
import { MissingAuthTokenError } from '@/runners/errors/missing-auth-token.error';
|
import { MissingAuthTokenError } from '@/runners/errors/missing-auth-token.error';
|
||||||
|
@ -32,7 +33,7 @@ describe('TaskRunnerModule in external mode', () => {
|
||||||
runnerConfig.enabled = true;
|
runnerConfig.enabled = true;
|
||||||
runnerConfig.authToken = '';
|
runnerConfig.authToken = '';
|
||||||
|
|
||||||
const module = new TaskRunnerModule(runnerConfig);
|
const module = new TaskRunnerModule(mock(), runnerConfig);
|
||||||
|
|
||||||
await expect(module.start()).rejects.toThrowError(MissingAuthTokenError);
|
await expect(module.start()).rejects.toThrowError(MissingAuthTokenError);
|
||||||
});
|
});
|
||||||
|
|
|
@ -3,6 +3,7 @@ import Container from 'typedi';
|
||||||
import { TaskRunnerWsServer } from '@/runners/runner-ws-server';
|
import { TaskRunnerWsServer } from '@/runners/runner-ws-server';
|
||||||
import { TaskBroker } from '@/runners/task-broker.service';
|
import { TaskBroker } from '@/runners/task-broker.service';
|
||||||
import { TaskRunnerProcess } from '@/runners/task-runner-process';
|
import { TaskRunnerProcess } from '@/runners/task-runner-process';
|
||||||
|
import { TaskRunnerProcessRestartLoopDetector } from '@/runners/task-runner-process-restart-loop-detector';
|
||||||
import { retryUntil } from '@test-integration/retry-until';
|
import { retryUntil } from '@test-integration/retry-until';
|
||||||
import { setupBrokerTestServer } from '@test-integration/utils/task-broker-test-server';
|
import { setupBrokerTestServer } from '@test-integration/utils/task-broker-test-server';
|
||||||
|
|
||||||
|
@ -84,4 +85,33 @@ describe('TaskRunnerProcess', () => {
|
||||||
expect(getNumRegisteredRunners()).toBe(1);
|
expect(getNumRegisteredRunners()).toBe(1);
|
||||||
expect(runnerProcess.pid).not.toBe(processId);
|
expect(runnerProcess.pid).not.toBe(processId);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should work together with restart loop detector', async () => {
|
||||||
|
// Arrange
|
||||||
|
const restartLoopDetector = new TaskRunnerProcessRestartLoopDetector(runnerProcess);
|
||||||
|
let restartLoopDetectedEventEmitted = false;
|
||||||
|
restartLoopDetector.once('restart-loop-detected', () => {
|
||||||
|
restartLoopDetectedEventEmitted = true;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Act
|
||||||
|
await runnerProcess.start();
|
||||||
|
|
||||||
|
// Simulate a restart loop
|
||||||
|
for (let i = 0; i < 5; i++) {
|
||||||
|
await retryUntil(() => {
|
||||||
|
expect(runnerProcess.pid).toBeDefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
// @ts-expect-error private property
|
||||||
|
runnerProcess.process?.kill();
|
||||||
|
|
||||||
|
await new Promise((resolve) => {
|
||||||
|
runnerProcess.once('exit', resolve);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
expect(restartLoopDetectedEventEmitted).toBe(true);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in a new issue