mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 12:57:29 -08:00
Merge branch 'master' of https://github.com/n8n-io/n8n into node-1912-internal-instance-waiting-functionality-is-broken
This commit is contained in:
commit
bdb44e8aac
|
@ -1061,6 +1061,7 @@ describe('TelemetryEventRelay', () => {
|
|||
describe('Community+ registered', () => {
|
||||
it('should track `license-community-plus-registered` event', () => {
|
||||
const event: RelayEventMap['license-community-plus-registered'] = {
|
||||
userId: 'user123',
|
||||
email: 'user@example.com',
|
||||
licenseKey: 'license123',
|
||||
};
|
||||
|
@ -1068,6 +1069,7 @@ describe('TelemetryEventRelay', () => {
|
|||
eventService.emit('license-community-plus-registered', event);
|
||||
|
||||
expect(telemetry.track).toHaveBeenCalledWith('User registered for license community plus', {
|
||||
user_id: 'user123',
|
||||
email: 'user@example.com',
|
||||
licenseKey: 'license123',
|
||||
});
|
||||
|
|
|
@ -8,7 +8,7 @@ import type {
|
|||
|
||||
import type { AuthProviderType } from '@/databases/entities/auth-identity';
|
||||
import type { ProjectRole } from '@/databases/entities/project-relation';
|
||||
import type { GlobalRole } from '@/databases/entities/user';
|
||||
import type { GlobalRole, User } from '@/databases/entities/user';
|
||||
import type { IWorkflowDb } from '@/interfaces';
|
||||
|
||||
import type { AiEventMap } from './ai.event-map';
|
||||
|
@ -421,6 +421,7 @@ export type RelayEventMap = {
|
|||
};
|
||||
|
||||
'license-community-plus-registered': {
|
||||
userId: User['id'];
|
||||
email: string;
|
||||
licenseKey: string;
|
||||
};
|
||||
|
|
|
@ -236,10 +236,12 @@ export class TelemetryEventRelay extends EventRelay {
|
|||
}
|
||||
|
||||
private licenseCommunityPlusRegistered({
|
||||
userId,
|
||||
email,
|
||||
licenseKey,
|
||||
}: RelayEventMap['license-community-plus-registered']) {
|
||||
this.telemetry.track('User registered for license community plus', {
|
||||
user_id: userId,
|
||||
email,
|
||||
licenseKey,
|
||||
});
|
||||
|
|
|
@ -94,6 +94,7 @@ describe('LicenseService', () => {
|
|||
.spyOn(axios, 'post')
|
||||
.mockResolvedValueOnce({ data: { title: 'Title', text: 'Text', licenseKey: 'abc-123' } });
|
||||
const data = await licenseService.registerCommunityEdition({
|
||||
userId: '123',
|
||||
email: 'test@ema.il',
|
||||
instanceId: '123',
|
||||
instanceUrl: 'http://localhost',
|
||||
|
@ -102,6 +103,7 @@ describe('LicenseService', () => {
|
|||
|
||||
expect(data).toEqual({ title: 'Title', text: 'Text' });
|
||||
expect(eventService.emit).toHaveBeenCalledWith('license-community-plus-registered', {
|
||||
userId: '123',
|
||||
email: 'test@ema.il',
|
||||
licenseKey: 'abc-123',
|
||||
});
|
||||
|
@ -111,6 +113,7 @@ describe('LicenseService', () => {
|
|||
jest.spyOn(axios, 'post').mockRejectedValueOnce(new AxiosError('Failed'));
|
||||
await expect(
|
||||
licenseService.registerCommunityEdition({
|
||||
userId: '123',
|
||||
email: 'test@ema.il',
|
||||
instanceId: '123',
|
||||
instanceUrl: 'http://localhost',
|
||||
|
|
|
@ -4,7 +4,7 @@ import { InstanceSettings } from 'n8n-core';
|
|||
|
||||
import { Get, Post, RestController, GlobalScope, Body } from '@/decorators';
|
||||
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
|
||||
import { AuthenticatedRequest, AuthlessRequest, LicenseRequest } from '@/requests';
|
||||
import { AuthenticatedRequest, LicenseRequest } from '@/requests';
|
||||
import { UrlService } from '@/services/url.service';
|
||||
|
||||
import { LicenseService } from './license.service';
|
||||
|
@ -41,11 +41,12 @@ export class LicenseController {
|
|||
|
||||
@Post('/enterprise/community-registered')
|
||||
async registerCommunityEdition(
|
||||
_req: AuthlessRequest,
|
||||
req: AuthenticatedRequest,
|
||||
_res: Response,
|
||||
@Body payload: CommunityRegisteredRequestDto,
|
||||
) {
|
||||
return await this.licenseService.registerCommunityEdition({
|
||||
userId: req.user.id,
|
||||
email: payload.email,
|
||||
instanceId: this.instanceSettings.instanceId,
|
||||
instanceUrl: this.urlService.getInstanceBaseUrl(),
|
||||
|
|
|
@ -61,11 +61,13 @@ export class LicenseService {
|
|||
}
|
||||
|
||||
async registerCommunityEdition({
|
||||
userId,
|
||||
email,
|
||||
instanceId,
|
||||
instanceUrl,
|
||||
licenseType,
|
||||
}: {
|
||||
userId: User['id'];
|
||||
email: string;
|
||||
instanceId: string;
|
||||
instanceUrl: string;
|
||||
|
@ -83,7 +85,7 @@ export class LicenseService {
|
|||
licenseType,
|
||||
},
|
||||
);
|
||||
this.eventService.emit('license-community-plus-registered', { email, licenseKey });
|
||||
this.eventService.emit('license-community-plus-registered', { userId, email, licenseKey });
|
||||
return rest;
|
||||
} catch (e: unknown) {
|
||||
if (e instanceof AxiosError) {
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
import { spawn } from 'node:child_process';
|
||||
|
||||
import { NodeProcessOomDetector } from '../node-process-oom-detector';
|
||||
|
||||
/**
 * Integration tests for `NodeProcessOomDetector`: real Node.js child
 * processes are spawned and the detector's flag is checked once they exit.
 *
 * The assertions run inside an `exit` event callback, outside of Jest's own
 * call stack. A failing `expect` is therefore caught and forwarded to `done`
 * so the test fails with the assertion message instead of timing out.
 */
describe('NodeProcessOomDetector', () => {
  test('should detect an out-of-memory error in a monitored process', (done) => {
    const childProcess = spawn(process.execPath, [
      // set low memory limit
      '--max-old-space-size=20',
      '-e',
      `
        const data = [];
        // fill memory until it crashes
        while (true) data.push(Array.from({ length: 10_000 }).map(() => Math.random().toString()).join());
      `,
    ]);

    const detector = new NodeProcessOomDetector(childProcess);

    childProcess.on('exit', (code) => {
      try {
        expect(detector.didProcessOom).toBe(true);
        expect(code).not.toBe(0);
        done();
      } catch (error) {
        // Report the failed expectation through Jest instead of throwing
        // from the event handler.
        done(error instanceof Error ? error : new Error(String(error)));
      }
    });
  });

  test('should not detect an out-of-memory error in a process that exits normally', (done) => {
    const childProcess = spawn(process.execPath, [
      '-e',
      `
        console.log("Hello, World!");
      `,
    ]);

    const detector = new NodeProcessOomDetector(childProcess);

    childProcess.on('exit', (code) => {
      try {
        expect(detector.didProcessOom).toBe(false);
        expect(code).toBe(0);
        done();
      } catch (error) {
        done(error instanceof Error ? error : new Error(String(error)));
      }
    });
  });
});
|
|
@ -0,0 +1,71 @@
|
|||
import { TypedEmitter } from '../../typed-emitter';
|
||||
import { SlidingWindowSignal } from '../sliding-window-signal';
|
||||
|
||||
type TestEventMap = {
|
||||
testEvent: string;
|
||||
};
|
||||
|
||||
/**
 * Unit tests for `SlidingWindowSignal`, using a 500 ms window on a
 * `TypedEmitter` that emits a single `testEvent`.
 */
describe('SlidingWindowSignal', () => {
  let eventEmitter: TypedEmitter<TestEventMap>;
  let slidingWindowSignal: SlidingWindowSignal<TestEventMap, 'testEvent'>;

  beforeEach(() => {
    eventEmitter = new TypedEmitter<TestEventMap>();
    slidingWindowSignal = new SlidingWindowSignal(eventEmitter, 'testEvent', {
      windowSizeInMs: 500,
    });
  });

  afterEach(() => {
    jest.clearAllTimers();
    // Restore real timers here rather than at the end of each test, so a
    // failing assertion cannot leave fake timers installed for later tests.
    jest.useRealTimers();
    jest.clearAllMocks();
  });

  it('should return the last signal if within window size', async () => {
    const signal = 'testSignal';
    eventEmitter.emit('testEvent', signal);

    const receivedSignal = await slidingWindowSignal.getSignal();

    expect(receivedSignal).toBe(signal);
  });

  it('should return null if there is no signal within the window', async () => {
    jest.useFakeTimers();
    const receivedSignalPromise = slidingWindowSignal.getSignal();
    jest.advanceTimersByTime(600);
    const receivedSignal = await receivedSignalPromise;

    expect(receivedSignal).toBeNull();
  });

  it('should return null if "exit" event is not emitted before timeout', async () => {
    const signal = 'testSignal';
    jest.useFakeTimers();
    const receivedSignalPromise = slidingWindowSignal.getSignal();
    jest.advanceTimersByTime(600);
    // Emitted only after the window has elapsed, so it must be ignored
    eventEmitter.emit('testEvent', signal);

    const receivedSignal = await receivedSignalPromise;
    expect(receivedSignal).toBeNull();
  });

  it('should return the signal emitted on "exit" event before timeout', async () => {
    jest.useFakeTimers();
    const receivedSignalPromise = slidingWindowSignal.getSignal();

    // Emit 'exit' with a signal before timeout
    const exitSignal = 'exitSignal';
    eventEmitter.emit('testEvent', exitSignal);

    // Advance timers enough to go outside the timeout window
    jest.advanceTimersByTime(600);

    const receivedSignal = await receivedSignalPromise;
    expect(receivedSignal).toBe(exitSignal);
  });
});
|
|
@ -110,7 +110,7 @@ describe('TaskBroker', () => {
|
|||
const messageCallback = jest.fn();
|
||||
|
||||
taskBroker.registerRunner(runner, messageCallback);
|
||||
taskBroker.deregisterRunner(runnerId);
|
||||
taskBroker.deregisterRunner(runnerId, new Error());
|
||||
|
||||
const knownRunners = taskBroker.getKnownRunners();
|
||||
const runnerIds = Object.keys(knownRunners);
|
||||
|
@ -138,7 +138,7 @@ describe('TaskBroker', () => {
|
|||
validFor: 1000,
|
||||
validUntil: createValidUntil(1000),
|
||||
});
|
||||
taskBroker.deregisterRunner(runnerId);
|
||||
taskBroker.deregisterRunner(runnerId, new Error());
|
||||
|
||||
const offers = taskBroker.getPendingTaskOffers();
|
||||
expect(offers).toHaveLength(1);
|
||||
|
@ -161,10 +161,14 @@ describe('TaskBroker', () => {
|
|||
[taskId]: { id: taskId, requesterId: 'requester1', runnerId, taskType: 'mock' },
|
||||
task2: { id: 'task2', requesterId: 'requester1', runnerId: 'runner2', taskType: 'mock' },
|
||||
});
|
||||
taskBroker.deregisterRunner(runnerId);
|
||||
const error = new Error('error');
|
||||
taskBroker.deregisterRunner(runnerId, error);
|
||||
|
||||
expect(failSpy).toBeCalledWith(taskId, `The Task Runner (${runnerId}) has disconnected`);
|
||||
expect(rejectSpy).toBeCalledWith(taskId, `The Task Runner (${runnerId}) has disconnected`);
|
||||
expect(failSpy).toBeCalledWith(taskId, error);
|
||||
expect(rejectSpy).toBeCalledWith(
|
||||
taskId,
|
||||
`The Task Runner (${runnerId}) has disconnected: error`,
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
import { ApplicationError } from 'n8n-workflow';
|
||||
|
||||
/**
 * Error describing a task runner that disconnected; the message embeds the
 * id of the runner.
 */
export class TaskRunnerDisconnectedError extends ApplicationError {
  constructor(runnerId: string) {
    const message = `Task runner (${runnerId}) disconnected`;
    super(message);
  }
}
|
31
packages/cli/src/runners/errors/task-runner-oom-error.ts
Normal file
31
packages/cli/src/runners/errors/task-runner-oom-error.ts
Normal file
|
@ -0,0 +1,31 @@
|
|||
import { ApplicationError } from 'n8n-workflow';
|
||||
|
||||
import type { TaskRunner } from '../task-broker.service';
|
||||
|
||||
/**
 * Error describing a task runner that ran out of memory.
 *
 * Besides the message, it carries an HTML `description` (lines separated by
 * `<br/>`) with a numbered list of suggested fixes. The second suggestion
 * depends on whether the instance is a cloud deployment.
 */
export class TaskRunnerOomError extends ApplicationError {
  public description: string;

  /**
   * @param runnerId Id of the runner that ran out of memory; used in the message.
   * @param isCloudDeployment When `true`, suggests upgrading the cloud plan
   *   instead of raising the runner's memory limit via environment variable.
   */
  constructor(runnerId: TaskRunner['id'], isCloudDeployment: boolean) {
    super(`Task runner (${runnerId}) ran out of memory.`, { level: 'error' });

    const reduceItems = 'Reduce the number of items processed at a time by batching the input.';
    const increaseMemory =
      "Increase the memory available to the task runner with 'N8N_RUNNERS_MAX_OLD_SPACE_SIZE' environment variable.";
    const upgradePlan = 'Upgrade your cloud plan to increase the available memory.';

    const subtitle =
      'The runner executing the code ran out of memory. This usually happens when there are too many items to process. You can try the following:';

    // Batching is always suggested; the second fix depends on the deployment type.
    const numberedFixes: string[] = [];
    let position = 1;
    for (const fix of [reduceItems, isCloudDeployment ? upgradePlan : increaseMemory]) {
      numberedFixes.push(`${position}. ${fix}`);
      position += 1;
    }

    this.description = `${subtitle}<br/><br/>${numberedFixes.join('<br/>')}`;
  }
}
|
34
packages/cli/src/runners/node-process-oom-detector.ts
Normal file
34
packages/cli/src/runners/node-process-oom-detector.ts
Normal file
|
@ -0,0 +1,34 @@
|
|||
import * as a from 'node:assert/strict';
|
||||
import type { ChildProcess } from 'node:child_process';
|
||||
|
||||
/**
 * Watches the stderr stream of a child process and records whether it
 * printed the "JavaScript heap out of memory" marker, i.e. whether the
 * process ran out of memory (OOM).
 */
export class NodeProcessOomDetector {
  private _didProcessOom = false;

  /** `true` once the OOM marker has been seen on the monitored stderr. */
  public get didProcessOom() {
    return this._didProcessOom;
  }

  /**
   * @param processToMonitor Child process to watch. It must expose a
   *   `stderr` stream, otherwise an assertion error is thrown.
   */
  constructor(processToMonitor: ChildProcess) {
    this.monitorProcess(processToMonitor);
  }

  // Arrow-function property so the same function reference can be passed to
  // both `on` and `off`.
  private onStderr = (data: Buffer) => {
    if (!data.includes('JavaScript heap out of memory')) return;

    this._didProcessOom = true;
  };

  /** Subscribes to stderr output and unsubscribes again when the process exits. */
  private monitorProcess(target: ChildProcess) {
    const { stderr } = target;
    a.ok(stderr, "Can't monitor a process without stderr");

    stderr.on('data', this.onStderr);

    target.once('exit', () => {
      target.stderr?.off('data', this.onStderr);
    });
  }
}
|
|
@ -10,6 +10,7 @@ import type {
|
|||
TaskRunnerServerInitResponse,
|
||||
} from './runner-types';
|
||||
import { TaskBroker, type MessageCallback, type TaskRunner } from './task-broker.service';
|
||||
import { TaskRunnerDisconnectAnalyzer } from './task-runner-disconnect-analyzer';
|
||||
|
||||
function heartbeat(this: WebSocket) {
|
||||
this.isAlive = true;
|
||||
|
@ -22,6 +23,7 @@ export class TaskRunnerService {
|
|||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly taskBroker: TaskBroker,
|
||||
private readonly disconnectAnalyzer: TaskRunnerDisconnectAnalyzer,
|
||||
) {}
|
||||
|
||||
sendMessage(id: TaskRunner['id'], message: N8nMessage.ToRunner.All) {
|
||||
|
@ -34,7 +36,7 @@ export class TaskRunnerService {
|
|||
|
||||
let isConnected = false;
|
||||
|
||||
const onMessage = (data: WebSocket.RawData) => {
|
||||
const onMessage = async (data: WebSocket.RawData) => {
|
||||
try {
|
||||
const buffer = Array.isArray(data) ? Buffer.concat(data) : Buffer.from(data);
|
||||
|
||||
|
@ -45,7 +47,7 @@ export class TaskRunnerService {
|
|||
if (!isConnected && message.type !== 'runner:info') {
|
||||
return;
|
||||
} else if (!isConnected && message.type === 'runner:info') {
|
||||
this.removeConnection(id);
|
||||
await this.removeConnection(id);
|
||||
isConnected = true;
|
||||
|
||||
this.runnerConnections.set(id, connection);
|
||||
|
@ -75,10 +77,10 @@ export class TaskRunnerService {
|
|||
};
|
||||
|
||||
// Makes sure to remove the session if the connection is closed
|
||||
connection.once('close', () => {
|
||||
connection.once('close', async () => {
|
||||
connection.off('pong', heartbeat);
|
||||
connection.off('message', onMessage);
|
||||
this.removeConnection(id);
|
||||
await this.removeConnection(id);
|
||||
});
|
||||
|
||||
connection.on('message', onMessage);
|
||||
|
@ -87,10 +89,11 @@ export class TaskRunnerService {
|
|||
);
|
||||
}
|
||||
|
||||
removeConnection(id: TaskRunner['id']) {
|
||||
async removeConnection(id: TaskRunner['id']) {
|
||||
const connection = this.runnerConnections.get(id);
|
||||
if (connection) {
|
||||
this.taskBroker.deregisterRunner(id);
|
||||
const disconnectReason = await this.disconnectAnalyzer.determineDisconnectReason(id);
|
||||
this.taskBroker.deregisterRunner(id, disconnectReason);
|
||||
connection.close();
|
||||
this.runnerConnections.delete(id);
|
||||
}
|
||||
|
|
59
packages/cli/src/runners/sliding-window-signal.ts
Normal file
59
packages/cli/src/runners/sliding-window-signal.ts
Normal file
|
@ -0,0 +1,59 @@
|
|||
import type { TypedEmitter } from '../typed-emitter';
|
||||
|
||||
export type SlidingWindowSignalOpts = {
  /** Size of the sliding window in milliseconds. Defaults to 500. */
  windowSizeInMs?: number;
};

/**
 * A class that listens for a specific event on an emitter (signal) and
 * provides a sliding window of the last event that was emitted.
 *
 * The payload and timestamp of the most recent event are recorded by a
 * listener registered in the constructor, which stays attached for the
 * lifetime of the emitter.
 */
export class SlidingWindowSignal<TEvents, TEventName extends keyof TEvents & string> {
  private lastSignal: TEvents[TEventName] | null = null;

  // 0 means "no event seen yet"; it is always outside the window
  private lastSignalTime: number = 0;

  private windowSizeInMs: number;

  /**
   * @param eventEmitter Emitter to listen on.
   * @param eventName Name of the event that acts as the signal.
   * @param opts Optional settings; `windowSizeInMs` defaults to 500.
   */
  constructor(
    private readonly eventEmitter: TypedEmitter<TEvents>,
    private readonly eventName: TEventName,
    opts: SlidingWindowSignalOpts = {},
  ) {
    const { windowSizeInMs = 500 } = opts;

    this.windowSizeInMs = windowSizeInMs;

    eventEmitter.on(eventName, (signal: TEvents[TEventName]) => {
      this.lastSignal = signal;
      this.lastSignalTime = Date.now();
    });
  }

  /**
   * If an event has been emitted within the last `windowSize` milliseconds,
   * that event is returned. Otherwise it will wait for up to `windowSize`
   * milliseconds for the event to be emitted. `null` is returned
   * if no event is emitted within the window.
   */
  public async getSignal(): Promise<TEvents[TEventName] | null> {
    const timeSinceLastEvent = Date.now() - this.lastSignalTime;
    if (timeSinceLastEvent <= this.windowSizeInMs) return this.lastSignal;

    return await new Promise<TEvents[TEventName] | null>((resolve) => {
      let timeoutTimerId: NodeJS.Timeout | null = null;

      const onExit = (signal: TEvents[TEventName]) => {
        if (timeoutTimerId) clearTimeout(timeoutTimerId);
        resolve(signal);
      };

      // The delay must be the window size. Without it the timer fires on
      // the next tick and the wait is effectively skipped.
      timeoutTimerId = setTimeout(() => {
        this.eventEmitter.off(this.eventName, onExit);
        resolve(null);
      }, this.windowSizeInMs);

      this.eventEmitter.once(this.eventName, onExit);
    });
  }
}
|
|
@ -104,7 +104,7 @@ export class TaskBroker {
|
|||
});
|
||||
}
|
||||
|
||||
deregisterRunner(runnerId: string) {
|
||||
deregisterRunner(runnerId: string, error: Error) {
|
||||
this.knownRunners.delete(runnerId);
|
||||
|
||||
// Remove any pending offers
|
||||
|
@ -117,8 +117,11 @@ export class TaskBroker {
|
|||
// Fail any tasks
|
||||
for (const task of this.tasks.values()) {
|
||||
if (task.runnerId === runnerId) {
|
||||
void this.failTask(task.id, `The Task Runner (${runnerId}) has disconnected`);
|
||||
this.handleRunnerReject(task.id, `The Task Runner (${runnerId}) has disconnected`);
|
||||
void this.failTask(task.id, error);
|
||||
this.handleRunnerReject(
|
||||
task.id,
|
||||
`The Task Runner (${runnerId}) has disconnected: ${error.message}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -352,7 +355,7 @@ export class TaskBroker {
|
|||
});
|
||||
}
|
||||
|
||||
private async failTask(taskId: Task['id'], reason: string) {
|
||||
private async failTask(taskId: Task['id'], error: Error) {
|
||||
const task = this.tasks.get(taskId);
|
||||
if (!task) {
|
||||
return;
|
||||
|
@ -362,7 +365,7 @@ export class TaskBroker {
|
|||
await this.messageRequester(task.requesterId, {
|
||||
type: 'broker:taskerror',
|
||||
taskId,
|
||||
error: reason,
|
||||
error,
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -375,11 +378,14 @@ export class TaskBroker {
|
|||
}
|
||||
const runner = this.knownRunners.get(task.runnerId);
|
||||
if (!runner) {
|
||||
const reason = `Cannot find runner, failed to find runner (${task.runnerId})`;
|
||||
await this.failTask(taskId, reason);
|
||||
throw new ApplicationError(reason, {
|
||||
level: 'error',
|
||||
});
|
||||
const error = new ApplicationError(
|
||||
`Cannot find runner, failed to find runner (${task.runnerId})`,
|
||||
{
|
||||
level: 'error',
|
||||
},
|
||||
);
|
||||
await this.failTask(taskId, error);
|
||||
throw error;
|
||||
}
|
||||
return runner.runner;
|
||||
}
|
||||
|
|
60
packages/cli/src/runners/task-runner-disconnect-analyzer.ts
Normal file
60
packages/cli/src/runners/task-runner-disconnect-analyzer.ts
Normal file
|
@ -0,0 +1,60 @@
|
|||
import { TaskRunnersConfig } from '@n8n/config';
|
||||
import { Service } from 'typedi';
|
||||
|
||||
import config from '@/config';
|
||||
|
||||
import { TaskRunnerDisconnectedError } from './errors/task-runner-disconnected-error';
|
||||
import { TaskRunnerOomError } from './errors/task-runner-oom-error';
|
||||
import { SlidingWindowSignal } from './sliding-window-signal';
|
||||
import type { TaskRunner } from './task-broker.service';
|
||||
import type { ExitReason, TaskRunnerProcessEventMap } from './task-runner-process';
|
||||
import { TaskRunnerProcess } from './task-runner-process';
|
||||
|
||||
/**
 * Works out why a task runner disconnected, so that a more meaningful error
 * than a generic "disconnected" can be surfaced to the user.
 */
@Service()
export class TaskRunnerDisconnectAnalyzer {
  private readonly exitReasonSignal: SlidingWindowSignal<TaskRunnerProcessEventMap, 'exit'>;

  // NOTE(review): the TaskRunnerProcess constructor appears to assert an
  // internal runner mode — confirm this service can still be constructed
  // when the runner mode is 'external'.
  constructor(
    private readonly runnerConfig: TaskRunnersConfig,
    private readonly taskRunnerProcess: TaskRunnerProcess,
  ) {
    // The child process' exit and the runner's disconnect are not ordered
    // relative to each other (a race). The exit event is therefore observed
    // through a sliding window: as long as it arrives within the window
    // around the disconnect, its reason can be attributed to the disconnect.
    this.exitReasonSignal = new SlidingWindowSignal(this.taskRunnerProcess, 'exit', {
      windowSizeInMs: 500,
    });
  }

  /** Whether the configured deployment type is `'cloud'`. */
  private get isCloudDeployment() {
    const deploymentType = config.get('deployment.type');
    return deploymentType === 'cloud';
  }

  /**
   * Builds the error that describes why the given runner disconnected.
   *
   * @param runnerId Id of the disconnected runner; embedded in the error.
   * @returns A `TaskRunnerOomError` when the exit reason is `'oom'`,
   *   otherwise a `TaskRunnerDisconnectedError`.
   */
  async determineDisconnectReason(runnerId: TaskRunner['id']): Promise<Error> {
    const exitReason = await this.awaitExitSignal();

    return exitReason === 'oom'
      ? new TaskRunnerOomError(runnerId, this.isCloudDeployment)
      : new TaskRunnerDisconnectedError(runnerId);
  }

  /**
   * Resolves the exit reason of the runner process, or `'unknown'` when it
   * cannot be determined.
   */
  private async awaitExitSignal(): Promise<ExitReason> {
    if (this.runnerConfig.mode !== 'external') {
      const lastExit = await this.exitReasonSignal.getSignal();

      return lastExit?.reason ?? 'unknown';
    }

    // In external mode the process is not under our control, so its exit
    // reason cannot be observed.
    return 'unknown';
  }
}
|
|
@ -9,14 +9,24 @@ import { Logger } from '@/logging/logger.service';
|
|||
|
||||
import { TaskRunnerAuthService } from './auth/task-runner-auth.service';
|
||||
import { forwardToLogger } from './forward-to-logger';
|
||||
import { NodeProcessOomDetector } from './node-process-oom-detector';
|
||||
import { TypedEmitter } from '../typed-emitter';
|
||||
|
||||
type ChildProcess = ReturnType<typeof spawn>;
|
||||
|
||||
export type ExitReason = 'unknown' | 'oom';
|
||||
|
||||
export type TaskRunnerProcessEventMap = {
|
||||
exit: {
|
||||
reason: ExitReason;
|
||||
};
|
||||
};
|
||||
|
||||
/**
|
||||
* Manages the JS task runner process as a child process
|
||||
*/
|
||||
@Service()
|
||||
export class TaskRunnerProcess {
|
||||
export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
|
||||
public get isRunning() {
|
||||
return this.process !== null;
|
||||
}
|
||||
|
@ -39,6 +49,8 @@ export class TaskRunnerProcess {
|
|||
|
||||
private _runPromise: Promise<void> | null = null;
|
||||
|
||||
private oomDetector: NodeProcessOomDetector | null = null;
|
||||
|
||||
private isShuttingDown = false;
|
||||
|
||||
private logger: Logger;
|
||||
|
@ -54,6 +66,8 @@ export class TaskRunnerProcess {
|
|||
private readonly runnerConfig: TaskRunnersConfig,
|
||||
private readonly authService: TaskRunnerAuthService,
|
||||
) {
|
||||
super();
|
||||
|
||||
a.ok(
|
||||
this.runnerConfig.mode === 'internal_childprocess' ||
|
||||
this.runnerConfig.mode === 'internal_launcher',
|
||||
|
@ -141,6 +155,8 @@ export class TaskRunnerProcess {
|
|||
|
||||
private monitorProcess(taskRunnerProcess: ChildProcess) {
|
||||
this._runPromise = new Promise((resolve) => {
|
||||
this.oomDetector = new NodeProcessOomDetector(taskRunnerProcess);
|
||||
|
||||
taskRunnerProcess.on('exit', (code) => {
|
||||
this.onProcessExit(code, resolve);
|
||||
});
|
||||
|
@ -149,6 +165,7 @@ export class TaskRunnerProcess {
|
|||
|
||||
private onProcessExit(_code: number | null, resolveFn: () => void) {
|
||||
this.process = null;
|
||||
this.emit('exit', { reason: this.oomDetector?.didProcessOom ? 'oom' : 'unknown' });
|
||||
resolveFn();
|
||||
|
||||
// If we are not shutting down, restart the process
|
||||
|
|
|
@ -8,7 +8,6 @@ import Modals from '@/components/Modals.vue';
|
|||
import Telemetry from '@/components/Telemetry.vue';
|
||||
import AskAssistantFloatingButton from '@/components/AskAssistant/AskAssistantFloatingButton.vue';
|
||||
import { loadLanguage } from '@/plugins/i18n';
|
||||
import { useExternalHooks } from '@/composables/useExternalHooks';
|
||||
import { APP_MODALS_ELEMENT_ID, HIRING_BANNER, VIEWS } from '@/constants';
|
||||
import { useRootStore } from '@/stores/root.store';
|
||||
import { useAssistantStore } from '@/stores/assistant.store';
|
||||
|
@ -46,7 +45,6 @@ watch(defaultLocale, (newLocale) => {
|
|||
onMounted(async () => {
|
||||
setAppZIndexes();
|
||||
logHiringBanner();
|
||||
void useExternalHooks().run('app.mount');
|
||||
loading.value = false;
|
||||
window.addEventListener('resize', updateGridWidth);
|
||||
await updateGridWidth();
|
||||
|
|
|
@ -4,6 +4,7 @@ import { useRootStore } from '@/stores/root.store';
|
|||
import { useSettingsStore } from '@/stores/settings.store';
|
||||
import { useSourceControlStore } from '@/stores/sourceControl.store';
|
||||
import { useUsersStore } from '@/stores/users.store';
|
||||
import { useExternalHooks } from '@/composables/useExternalHooks';
|
||||
import { initializeCloudHooks } from '@/hooks/register';
|
||||
import { useVersionsStore } from '@/stores/versions.store';
|
||||
import { useProjectsStore } from '@/stores/projects.store';
|
||||
|
@ -26,6 +27,9 @@ export async function initializeCore() {
|
|||
const versionsStore = useVersionsStore();
|
||||
|
||||
await settingsStore.initialize();
|
||||
|
||||
void useExternalHooks().run('app.mount');
|
||||
|
||||
if (!settingsStore.isPreviewMode) {
|
||||
await usersStore.initialize();
|
||||
|
||||
|
|
|
@ -60,9 +60,11 @@ export class JsTaskRunnerSandbox {
|
|||
}
|
||||
|
||||
private throwExecutionError(error: unknown): never {
|
||||
// The error coming from task runner is not an instance of error,
|
||||
// so we need to wrap it in an error instance.
|
||||
if (isWrappableError(error)) {
|
||||
if (error instanceof Error) {
|
||||
throw error;
|
||||
} else if (isWrappableError(error)) {
|
||||
// The error coming from task runner is not an instance of error,
|
||||
// so we need to wrap it in an error instance.
|
||||
throw new WrappedExecutionError(error);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue