fix(core): Ensure tasks timeout even if they don't receive settings (#12431)

This commit is contained in:
Tomi Turtiainen 2025-01-03 12:27:47 +02:00 committed by GitHub
parent 552cff1860
commit b1940268e6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 472 additions and 163 deletions

View file

@ -1,4 +1,3 @@
import { mock } from 'jest-mock-extended';
import { DateTime } from 'luxon';
import type { IBinaryData } from 'n8n-workflow';
import { setGlobalState, type CodeExecutionMode, type IDataObject } from 'n8n-workflow';
@ -18,11 +17,12 @@ import {
type DataRequestResponse,
type InputDataChunkDefinition,
} from '@/runner-types';
import type { Task } from '@/task-runner';
import type { TaskParams } from '@/task-runner';
import {
newDataRequestResponse,
newTaskWithSettings,
newTaskParamsWithSettings,
newTaskState,
withPairedItem,
wrapIntoJson,
} from './test-data';
@ -64,12 +64,12 @@ describe('JsTaskRunner', () => {
taskData,
runner = defaultTaskRunner,
}: {
task: Task<JSExecSettings>;
task: TaskParams<JSExecSettings>;
taskData: DataRequestResponse;
runner?: JsTaskRunner;
}) => {
jest.spyOn(runner, 'requestData').mockResolvedValue(taskData);
return await runner.executeTask(task, mock<AbortSignal>());
return await runner.executeTask(task, new AbortController().signal);
};
afterEach(() => {
@ -88,7 +88,7 @@ describe('JsTaskRunner', () => {
runner?: JsTaskRunner;
}) => {
return await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code,
nodeMode: 'runOnceForAllItems',
...settings,
@ -112,7 +112,7 @@ describe('JsTaskRunner', () => {
chunk?: InputDataChunkDefinition;
}) => {
return await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code,
nodeMode: 'runOnceForEachItem',
chunk,
@ -128,7 +128,7 @@ describe('JsTaskRunner', () => {
'should make an rpc call for console log in %s mode',
async (nodeMode) => {
jest.spyOn(defaultTaskRunner, 'makeRpcCall').mockResolvedValue(undefined);
const task = newTaskWithSettings({
const task = newTaskParamsWithSettings({
code: "console.log('Hello', 'world!'); return {}",
nodeMode,
});
@ -146,7 +146,7 @@ describe('JsTaskRunner', () => {
);
it('should not throw when using unsupported console methods', async () => {
const task = newTaskWithSettings({
const task = newTaskParamsWithSettings({
code: `
console.warn('test');
console.error('test');
@ -176,7 +176,7 @@ describe('JsTaskRunner', () => {
});
it('should not throw when trying to log the context object', async () => {
const task = newTaskWithSettings({
const task = newTaskParamsWithSettings({
code: `
console.log(this);
return {json: {}}
@ -195,7 +195,7 @@ describe('JsTaskRunner', () => {
it('should log the context object as [[ExecutionContext]]', async () => {
const rpcCallSpy = jest.spyOn(defaultTaskRunner, 'makeRpcCall').mockResolvedValue(undefined);
const task = newTaskWithSettings({
const task = newTaskParamsWithSettings({
code: `
console.log(this);
return {json: {}}
@ -336,7 +336,7 @@ describe('JsTaskRunner', () => {
describe('$env', () => {
it('should have the env available in context when access has not been blocked', async () => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: $env.VAR1 }',
nodeMode: 'runOnceForAllItems',
}),
@ -355,7 +355,7 @@ describe('JsTaskRunner', () => {
it('should be possible to access env if it has been blocked', async () => {
await expect(
execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: $env.VAR1 }',
nodeMode: 'runOnceForAllItems',
}),
@ -372,7 +372,7 @@ describe('JsTaskRunner', () => {
it('should not be possible to iterate $env', async () => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return Object.values($env).concat(Object.keys($env))',
nodeMode: 'runOnceForAllItems',
}),
@ -391,7 +391,7 @@ describe('JsTaskRunner', () => {
it("should not expose task runner's env variables even if no env state is received", async () => {
process.env.N8N_RUNNERS_TASK_BROKER_URI = 'http://127.0.0.1:5679';
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: $env.N8N_RUNNERS_TASK_BROKER_URI }',
nodeMode: 'runOnceForAllItems',
}),
@ -412,7 +412,7 @@ describe('JsTaskRunner', () => {
};
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: $now.toSeconds() }',
nodeMode: 'runOnceForAllItems',
}),
@ -429,7 +429,7 @@ describe('JsTaskRunner', () => {
});
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: $now.toSeconds() }',
nodeMode: 'runOnceForAllItems',
}),
@ -444,7 +444,7 @@ describe('JsTaskRunner', () => {
describe("$getWorkflowStaticData('global')", () => {
it('should have the global workflow static data available in runOnceForAllItems', async () => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: $getWorkflowStaticData("global") }',
nodeMode: 'runOnceForAllItems',
}),
@ -460,7 +460,7 @@ describe('JsTaskRunner', () => {
it('should have the global workflow static data available in runOnceForEachItem', async () => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: $getWorkflowStaticData("global") }',
nodeMode: 'runOnceForEachItem',
}),
@ -480,7 +480,7 @@ describe('JsTaskRunner', () => {
"does not return static data if it hasn't been modified in %s",
async (mode) => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: `
const staticData = $getWorkflowStaticData("global");
return { val: staticData };
@ -502,7 +502,7 @@ describe('JsTaskRunner', () => {
'returns the updated static data in %s',
async (mode) => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: `
const staticData = $getWorkflowStaticData("global");
staticData.newKey = 'newValue';
@ -541,7 +541,7 @@ describe('JsTaskRunner', () => {
it('should have the node workflow static data available in runOnceForAllItems', async () => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: $getWorkflowStaticData("node") }',
nodeMode: 'runOnceForAllItems',
}),
@ -553,7 +553,7 @@ describe('JsTaskRunner', () => {
it('should have the node workflow static data available in runOnceForEachItem', async () => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: $getWorkflowStaticData("node") }',
nodeMode: 'runOnceForEachItem',
}),
@ -569,7 +569,7 @@ describe('JsTaskRunner', () => {
"does not return static data if it hasn't been modified in %s",
async (mode) => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: `
const staticData = $getWorkflowStaticData("node");
return { val: staticData };
@ -587,7 +587,7 @@ describe('JsTaskRunner', () => {
'returns the updated static data in %s',
async (mode) => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: `
const staticData = $getWorkflowStaticData("node");
staticData.newKey = 'newValue';
@ -662,7 +662,7 @@ describe('JsTaskRunner', () => {
// Act
await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: `await ${group.invocation}; return []`,
nodeMode: 'runOnceForAllItems',
}),
@ -684,7 +684,7 @@ describe('JsTaskRunner', () => {
// Act
await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: `await ${group.invocation}; return {}`,
nodeMode: 'runOnceForEachItem',
}),
@ -725,7 +725,7 @@ describe('JsTaskRunner', () => {
it('should allow access to Node.js Buffers', async () => {
const outcomeAll = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: Buffer.from("test-buffer").toString() }',
nodeMode: 'runOnceForAllItems',
}),
@ -737,7 +737,7 @@ describe('JsTaskRunner', () => {
expect(outcomeAll.result).toEqual([wrapIntoJson({ val: 'test-buffer' })]);
const outcomePer = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: Buffer.from("test-buffer").toString() }',
nodeMode: 'runOnceForEachItem',
}),
@ -1205,7 +1205,7 @@ describe('JsTaskRunner', () => {
async (nodeMode) => {
await expect(
execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'unknown',
nodeMode,
}),
@ -1218,12 +1218,13 @@ describe('JsTaskRunner', () => {
it('sends serializes an error correctly', async () => {
const runner = createRunnerWithOpts({});
const taskId = '1';
const task = newTaskWithSettings({
const task = newTaskState(taskId);
const taskSettings: JSExecSettings = {
code: 'unknown; return []',
nodeMode: 'runOnceForAllItems',
continueOnFail: false,
workflowMode: 'manual',
});
};
runner.runningTasks.set(taskId, task);
const sendSpy = jest.spyOn(runner.ws, 'send').mockImplementation(() => {});
@ -1232,7 +1233,7 @@ describe('JsTaskRunner', () => {
.spyOn(runner, 'requestData')
.mockResolvedValue(newDataRequestResponse([wrapIntoJson({ a: 1 })]));
await runner.receivedSettings(taskId, task.settings);
await runner.receivedSettings(taskId, taskSettings);
expect(sendSpy).toHaveBeenCalled();
const calledWith = sendSpy.mock.calls[0][0] as string;
@ -1304,11 +1305,7 @@ describe('JsTaskRunner', () => {
const emitSpy = jest.spyOn(runner, 'emit');
jest.spyOn(runner, 'executeTask').mockResolvedValue({ result: [] });
runner.runningTasks.set(taskId, {
taskId,
active: true,
cancelled: false,
});
runner.runningTasks.set(taskId, newTaskState(taskId));
jest.advanceTimersByTime(idleTimeout * 1000 - 100);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');
@ -1335,15 +1332,13 @@ describe('JsTaskRunner', () => {
const runner = createRunnerWithOpts({}, { idleTimeout });
const taskId = '123';
const emitSpy = jest.spyOn(runner, 'emit');
const task = newTaskState(taskId);
runner.runningTasks.set(taskId, {
taskId,
active: true,
cancelled: false,
});
runner.runningTasks.set(taskId, task);
jest.advanceTimersByTime(idleTimeout * 1000);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');
task.cleanup();
});
});
});

View file

@ -1,6 +1,9 @@
import { WebSocket } from 'ws';
import { newTaskState } from '@/js-task-runner/__tests__/test-data';
import { TimeoutError } from '@/js-task-runner/errors/timeout-error';
import { TaskRunner, type TaskRunnerOpts } from '@/task-runner';
import type { TaskStatus } from '@/task-state';
class TestRunner extends TaskRunner {}
@ -154,11 +157,8 @@ describe('TestRunner', () => {
runner.onMessage({
type: 'broker:runnerregistered',
});
runner.runningTasks.set('test-task', {
taskId: 'test-task',
active: true,
cancelled: false,
});
const taskState = newTaskState('test-task');
runner.runningTasks.set('test-task', taskState);
const sendSpy = jest.spyOn(runner, 'send');
runner.sendOffers();
@ -174,6 +174,7 @@ describe('TestRunner', () => {
},
],
]);
taskState.cleanup();
});
it('should delete stale offers and send new ones', () => {
@ -198,15 +199,45 @@ describe('TestRunner', () => {
});
describe('taskCancelled', () => {
it('should reject pending requests when task is cancelled', () => {
const runner = newTestRunner();
test.each<[TaskStatus, string]>([
['aborting:cancelled', 'cancelled'],
['aborting:timeout', 'timeout'],
])('should not do anything if task status is %s', async (status, reason) => {
runner = newTestRunner();
const taskId = 'test-task';
runner.runningTasks.set(taskId, {
taskId,
active: false,
cancelled: false,
});
const task = newTaskState(taskId);
task.status = status;
runner.runningTasks.set(taskId, task);
await runner.taskCancelled(taskId, reason);
expect(runner.runningTasks.size).toBe(1);
expect(task.status).toBe(status);
});
it('should delete task if task is waiting for settings when task is cancelled', async () => {
runner = newTestRunner();
const taskId = 'test-task';
const task = newTaskState(taskId);
const taskCleanupSpy = jest.spyOn(task, 'cleanup');
runner.runningTasks.set(taskId, task);
await runner.taskCancelled(taskId, 'test-reason');
expect(runner.runningTasks.size).toBe(0);
expect(taskCleanupSpy).toHaveBeenCalled();
});
it('should reject pending requests when task is cancelled', async () => {
runner = newTestRunner();
const taskId = 'test-task';
const task = newTaskState(taskId);
task.status = 'running';
runner.runningTasks.set(taskId, task);
const dataRequestReject = jest.fn();
const nodeTypesRequestReject = jest.fn();
@ -225,7 +256,71 @@ describe('TestRunner', () => {
reject: nodeTypesRequestReject,
});
runner.taskCancelled(taskId, 'test-reason');
await runner.taskCancelled(taskId, 'test-reason');
expect(dataRequestReject).toHaveBeenCalledWith(
expect.objectContaining({
message: 'Task cancelled: test-reason',
}),
);
expect(nodeTypesRequestReject).toHaveBeenCalledWith(
expect.objectContaining({
message: 'Task cancelled: test-reason',
}),
);
expect(runner.dataRequests.size).toBe(0);
expect(runner.nodeTypesRequests.size).toBe(0);
});
});
describe('taskTimedOut', () => {
it('should error task if task is waiting for settings', async () => {
runner = newTestRunner();
const taskId = 'test-task';
const task = newTaskState(taskId);
task.status = 'waitingForSettings';
runner.runningTasks.set(taskId, task);
const sendSpy = jest.spyOn(runner, 'send');
await runner.taskTimedOut(taskId);
expect(runner.runningTasks.size).toBe(0);
expect(sendSpy).toHaveBeenCalledWith({
type: 'runner:taskerror',
taskId,
error: expect.any(TimeoutError),
});
});
it('should reject pending requests when task is running', async () => {
runner = newTestRunner();
const taskId = 'test-task';
const task = newTaskState(taskId);
task.status = 'running';
runner.runningTasks.set(taskId, task);
const dataRequestReject = jest.fn();
const nodeTypesRequestReject = jest.fn();
runner.dataRequests.set('data-req', {
taskId,
requestId: 'data-req',
resolve: jest.fn(),
reject: dataRequestReject,
});
runner.nodeTypesRequests.set('node-req', {
taskId,
requestId: 'node-req',
resolve: jest.fn(),
reject: nodeTypesRequestReject,
});
await runner.taskCancelled(taskId, 'test-reason');
expect(dataRequestReject).toHaveBeenCalledWith(
expect.objectContaining({

View file

@ -4,22 +4,21 @@ import { nanoid } from 'nanoid';
import type { JSExecSettings } from '@/js-task-runner/js-task-runner';
import type { DataRequestResponse } from '@/runner-types';
import type { Task } from '@/task-runner';
import type { TaskParams } from '@/task-runner';
import { TaskState } from '@/task-state';
/**
* Creates a new task with the given settings
*/
export const newTaskWithSettings = (
export const newTaskParamsWithSettings = (
settings: Partial<JSExecSettings> & Pick<JSExecSettings, 'code' | 'nodeMode'>,
): Task<JSExecSettings> => ({
): TaskParams<JSExecSettings> => ({
taskId: '1',
settings: {
workflowMode: 'manual',
continueOnFail: false,
...settings,
},
active: true,
cancelled: false,
});
/**
@ -167,3 +166,13 @@ export const withPairedItem = (index: number, data: INodeExecutionData): INodeEx
item: index,
},
});
/**
* Creates a new task state with the given taskId
*/
export const newTaskState = (taskId: string) =>
new TaskState({
taskId,
timeoutInS: 60,
onTimeout: () => {},
});

View file

@ -23,15 +23,15 @@ import { runInNewContext, type Context } from 'node:vm';
import type { MainConfig } from '@/config/main-config';
import { UnsupportedFunctionError } from '@/js-task-runner/errors/unsupported-function.error';
import {
EXPOSED_RPC_METHODS,
UNSUPPORTED_HELPER_FUNCTIONS,
type DataRequestResponse,
type InputDataChunkDefinition,
type PartialAdditionalData,
type TaskResultData,
import { EXPOSED_RPC_METHODS, UNSUPPORTED_HELPER_FUNCTIONS } from '@/runner-types';
import type {
DataRequestResponse,
InputDataChunkDefinition,
PartialAdditionalData,
TaskResultData,
} from '@/runner-types';
import { type Task, TaskRunner } from '@/task-runner';
import type { TaskParams } from '@/task-runner';
import { noOp, TaskRunner } from '@/task-runner';
import { BuiltInsParser } from './built-ins-parser/built-ins-parser';
import { BuiltInsParserState } from './built-ins-parser/built-ins-parser-state';
@ -81,8 +81,6 @@ type CustomConsole = {
log: (...args: unknown[]) => void;
};
const noOp = () => {};
export class JsTaskRunner extends TaskRunner {
private readonly requireResolver: RequireResolver;
@ -107,8 +105,11 @@ export class JsTaskRunner extends TaskRunner {
});
}
async executeTask(task: Task<JSExecSettings>, signal: AbortSignal): Promise<TaskResultData> {
const settings = task.settings;
async executeTask(
taskParams: TaskParams<JSExecSettings>,
abortSignal: AbortSignal,
): Promise<TaskResultData> {
const { taskId, settings } = taskParams;
a.ok(settings, 'JS Code not sent to runner');
this.validateTaskSettings(settings);
@ -119,13 +120,13 @@ export class JsTaskRunner extends TaskRunner {
: BuiltInsParserState.newNeedsAllDataState();
const dataResponse = await this.requestData<DataRequestResponse>(
task.taskId,
taskId,
neededBuiltIns.toDataRequestParams(settings.chunk),
);
const data = this.reconstructTaskData(dataResponse, settings.chunk);
await this.requestNodeTypeIfNeeded(neededBuiltIns, data.workflow, task.taskId);
await this.requestNodeTypeIfNeeded(neededBuiltIns, data.workflow, taskId);
const workflowParams = data.workflow;
const workflow = new Workflow({
@ -137,8 +138,8 @@ export class JsTaskRunner extends TaskRunner {
const result =
settings.nodeMode === 'runOnceForAllItems'
? await this.runForAllItems(task.taskId, settings, data, workflow, signal)
: await this.runForEachItem(task.taskId, settings, data, workflow, signal);
? await this.runForAllItems(taskId, settings, data, workflow, abortSignal)
: await this.runForEachItem(taskId, settings, data, workflow, abortSignal);
return {
result,

View file

@ -5,19 +5,14 @@ import { EventEmitter } from 'node:events';
import { type MessageEvent, WebSocket } from 'ws';
import type { BaseRunnerConfig } from '@/config/base-runner-config';
import { TimeoutError } from '@/js-task-runner/errors/timeout-error';
import type { BrokerMessage, RunnerMessage } from '@/message-types';
import { TaskRunnerNodeTypes } from '@/node-types';
import type { TaskResultData } from '@/runner-types';
import { TaskState } from '@/task-state';
import { TaskCancelledError } from './js-task-runner/errors/task-cancelled-error';
export interface Task<T = unknown> {
taskId: string;
settings?: T;
active: boolean;
cancelled: boolean;
}
export interface TaskOffer {
offerId: string;
validUntil: bigint;
@ -49,6 +44,14 @@ const OFFER_VALID_EXTRA_MS = 100;
/** Converts milliseconds to nanoseconds */
const msToNs = (ms: number) => BigInt(ms * 1_000_000);
export const noOp = () => {};
/** Params the task receives when it is executed */
export interface TaskParams<T = unknown> {
taskId: string;
settings: T;
}
export interface TaskRunnerOpts extends BaseRunnerConfig {
taskType: string;
name?: string;
@ -61,7 +64,7 @@ export abstract class TaskRunner extends EventEmitter {
canSendOffers = false;
runningTasks: Map<Task['taskId'], Task> = new Map();
runningTasks: Map<TaskState['taskId'], TaskState> = new Map();
offerInterval: NodeJS.Timeout | undefined;
@ -89,10 +92,9 @@ export abstract class TaskRunner extends EventEmitter {
/** How long (in seconds) a runner may be idle for before exit. */
private readonly idleTimeout: number;
protected taskCancellations = new Map<Task['taskId'], AbortController>();
constructor(opts: TaskRunnerOpts) {
super();
this.taskType = opts.taskType;
this.name = opts.name ?? 'Node.js Task Runner SDK';
this.maxConcurrency = opts.maxConcurrency;
@ -219,7 +221,7 @@ export abstract class TaskRunner extends EventEmitter {
this.offerAccepted(message.offerId, message.taskId);
break;
case 'broker:taskcancel':
this.taskCancelled(message.taskId, message.reason);
void this.taskCancelled(message.taskId, message.reason);
break;
case 'broker:tasksettings':
void this.receivedSettings(message.taskId, message.settings);
@ -284,11 +286,14 @@ export abstract class TaskRunner extends EventEmitter {
}
this.resetIdleTimer();
this.runningTasks.set(taskId, {
const taskState = new TaskState({
taskId,
active: false,
cancelled: false,
timeoutInS: this.taskTimeout,
onTimeout: () => {
void this.taskTimedOut(taskId);
},
});
this.runningTasks.set(taskId, taskState);
this.send({
type: 'runner:taskaccepted',
@ -296,99 +301,103 @@ export abstract class TaskRunner extends EventEmitter {
});
}
taskCancelled(taskId: string, reason: string) {
const task = this.runningTasks.get(taskId);
if (!task) {
async taskCancelled(taskId: string, reason: string) {
const taskState = this.runningTasks.get(taskId);
if (!taskState) {
return;
}
task.cancelled = true;
for (const [requestId, request] of this.dataRequests.entries()) {
if (request.taskId === taskId) {
request.reject(new TaskCancelledError(reason));
this.dataRequests.delete(requestId);
}
}
await taskState.caseOf({
// If the cancelled task hasn't received settings yet, we can finish it
waitingForSettings: () => this.finishTask(taskState),
for (const [requestId, request] of this.nodeTypesRequests.entries()) {
if (request.taskId === taskId) {
request.reject(new TaskCancelledError(reason));
this.nodeTypesRequests.delete(requestId);
}
}
// If the task has already timed out or is already cancelled, we can
// ignore the cancellation
'aborting:timeout': noOp,
'aborting:cancelled': noOp,
const controller = this.taskCancellations.get(taskId);
if (controller) {
controller.abort();
this.taskCancellations.delete(taskId);
}
if (!task.active) this.runningTasks.delete(taskId);
this.sendOffers();
running: () => {
taskState.status = 'aborting:cancelled';
taskState.abortController.abort('cancelled');
this.cancelTaskRequests(taskId, reason);
},
});
}
taskErrored(taskId: string, error: unknown) {
this.send({
type: 'runner:taskerror',
taskId,
error,
});
this.runningTasks.delete(taskId);
this.sendOffers();
}
async taskTimedOut(taskId: string) {
const taskState = this.runningTasks.get(taskId);
if (!taskState) {
return;
}
taskDone(taskId: string, data: RunnerMessage.ToBroker.TaskDone['data']) {
this.send({
type: 'runner:taskdone',
taskId,
data,
await taskState.caseOf({
// If we are still waiting for settings for the task, we can error the
// task immediately
waitingForSettings: () => {
try {
this.send({
type: 'runner:taskerror',
taskId,
error: new TimeoutError(this.taskTimeout),
});
} finally {
this.finishTask(taskState);
}
},
// This should never happen, the timeout timer should only fire once
'aborting:timeout': TaskState.throwUnexpectedTaskStatus,
// If we are currently executing the task, abort the execution and
// mark the task as timed out
running: () => {
taskState.status = 'aborting:timeout';
taskState.abortController.abort('timeout');
this.cancelTaskRequests(taskId, 'timeout');
},
// If the task is already cancelling, we can ignore the timeout
'aborting:cancelled': noOp,
});
this.runningTasks.delete(taskId);
this.sendOffers();
}
async receivedSettings(taskId: string, settings: unknown) {
const task = this.runningTasks.get(taskId);
if (!task) {
return;
}
if (task.cancelled) {
this.runningTasks.delete(taskId);
const taskState = this.runningTasks.get(taskId);
if (!taskState) {
return;
}
const controller = new AbortController();
this.taskCancellations.set(taskId, controller);
await taskState.caseOf({
// These states should never happen, as they are handled already in
// the other lifecycle methods and the task should be removed from the
// running tasks
'aborting:cancelled': TaskState.throwUnexpectedTaskStatus,
'aborting:timeout': TaskState.throwUnexpectedTaskStatus,
running: TaskState.throwUnexpectedTaskStatus,
const taskTimeout = setTimeout(() => {
if (!task.cancelled) {
controller.abort();
this.taskCancellations.delete(taskId);
}
}, this.taskTimeout * 1_000);
waitingForSettings: async () => {
taskState.status = 'running';
task.settings = settings;
task.active = true;
try {
const data = await this.executeTask(task, controller.signal);
this.taskDone(taskId, data);
} catch (error) {
if (!task.cancelled) this.taskErrored(taskId, error);
} finally {
clearTimeout(taskTimeout);
this.taskCancellations.delete(taskId);
this.resetIdleTimer();
}
await this.executeTask(
{
taskId,
settings,
},
taskState.abortController.signal,
)
.then(async (data) => await this.taskExecutionSucceeded(taskState, data))
.catch(async (error) => await this.taskExecutionFailed(taskState, error));
},
});
}
// eslint-disable-next-line @typescript-eslint/naming-convention
async executeTask(_task: Task, _signal: AbortSignal): Promise<TaskResultData> {
async executeTask(_taskParams: TaskParams, _signal: AbortSignal): Promise<TaskResultData> {
throw new ApplicationError('Unimplemented');
}
async requestNodeTypes<T = unknown>(
taskId: Task['taskId'],
taskId: TaskState['taskId'],
requestParams: RunnerMessage.ToBroker.NodeTypesRequest['requestParams'],
) {
const requestId = nanoid();
@ -417,12 +426,12 @@ export abstract class TaskRunner extends EventEmitter {
}
async requestData<T = unknown>(
taskId: Task['taskId'],
taskId: TaskState['taskId'],
requestParams: RunnerMessage.ToBroker.TaskDataRequest['requestParams'],
): Promise<T> {
const requestId = nanoid();
const p = new Promise<T>((resolve, reject) => {
const dataRequestPromise = new Promise<T>((resolve, reject) => {
this.dataRequests.set(requestId, {
requestId,
taskId,
@ -439,7 +448,7 @@ export abstract class TaskRunner extends EventEmitter {
});
try {
return await p;
return await dataRequestPromise;
} finally {
this.dataRequests.delete(requestId);
}
@ -527,4 +536,86 @@ export abstract class TaskRunner extends EventEmitter {
await new Promise((resolve) => setTimeout(resolve, 100));
}
}
private async taskExecutionSucceeded(taskState: TaskState, data: TaskResultData) {
try {
const sendData = () => {
this.send({
type: 'runner:taskdone',
taskId: taskState.taskId,
data,
});
};
await taskState.caseOf({
waitingForSettings: TaskState.throwUnexpectedTaskStatus,
'aborting:cancelled': noOp,
// If the task timed out but we ended up reaching this point, we
// might as well send the data
'aborting:timeout': sendData,
running: sendData,
});
} finally {
this.finishTask(taskState);
}
}
private async taskExecutionFailed(taskState: TaskState, error: unknown) {
try {
const sendError = () => {
this.send({
type: 'runner:taskerror',
taskId: taskState.taskId,
error,
});
};
await taskState.caseOf({
waitingForSettings: TaskState.throwUnexpectedTaskStatus,
'aborting:cancelled': noOp,
'aborting:timeout': () => {
console.warn(`Task ${taskState.taskId} timed out`);
sendError();
},
running: sendError,
});
} finally {
this.finishTask(taskState);
}
}
/**
* Cancels all node type and data requests made by the given task
*/
private cancelTaskRequests(taskId: string, reason: string) {
for (const [requestId, request] of this.dataRequests.entries()) {
if (request.taskId === taskId) {
request.reject(new TaskCancelledError(reason));
this.dataRequests.delete(requestId);
}
}
for (const [requestId, request] of this.nodeTypesRequests.entries()) {
if (request.taskId === taskId) {
request.reject(new TaskCancelledError(reason));
this.nodeTypesRequests.delete(requestId);
}
}
}
/**
* Finishes task by removing it from the running tasks and sending new offers
*/
private finishTask(taskState: TaskState) {
taskState.cleanup();
this.runningTasks.delete(taskState.taskId);
this.sendOffers();
this.resetIdleTimer();
}
}

View file

@ -0,0 +1,118 @@
import * as a from 'node:assert';
export type TaskStatus =
| 'waitingForSettings'
| 'running'
| 'aborting:cancelled'
| 'aborting:timeout';
export type TaskStateOpts = {
taskId: string;
timeoutInS: number;
onTimeout: () => void;
};
/**
* The state of a task. The task can be in one of the following states:
* - waitingForSettings: The task is waiting for settings from the broker
* - running: The task is currently running
* - aborting:cancelled: The task was canceled by the broker and is being aborted
* - aborting:timeout: The task took too long to complete and is being aborted
*
* The task is discarded once it reaches an end state.
*
* The class only holds the state, and does not have any logic.
*
* The task has the following lifecycle:
*
*
*
*
* broker:taskofferaccept : create task state
*
*
* broker:taskcancel / timeout
* waitingForSettings
*
*
* broker:tasksettings
*
*
*
* running aborting:timeout
* timeout
* - execute task - fire abort signal
*
*
* broker:taskcancel
* Task execution Task execution
* resolves / rejects resolves / rejects
*
*
* aborting:cancelled
*
* - fire abort signal
*
* Task execution
* resolves / rejects
*
*
*
*
*
*/
export class TaskState {
status: TaskStatus = 'waitingForSettings';
readonly taskId: string;
/** Controller for aborting the execution of the task */
readonly abortController = new AbortController();
/** Timeout timer for the task */
private timeoutTimer: NodeJS.Timeout | undefined;
constructor(opts: TaskStateOpts) {
this.taskId = opts.taskId;
this.timeoutTimer = setTimeout(opts.onTimeout, opts.timeoutInS * 1000);
}
/** Cleans up any resources before the task can be removed */
cleanup() {
clearTimeout(this.timeoutTimer);
this.timeoutTimer = undefined;
}
/** Custom JSON serialization for the task state for logging purposes */
toJSON() {
return `[Task ${this.taskId} (${this.status})]`;
}
/**
* Executes the function matching the current task status
*
* @example
* ```ts
* taskState.caseOf({
* waitingForSettings: () => {...},
* running: () => {...},
* aborting:cancelled: () => {...},
* aborting:timeout: () => {...},
* });
* ```
*/
async caseOf(
conditions: Record<TaskStatus, (taskState: TaskState) => void | Promise<void> | never>,
) {
if (!conditions[this.status]) {
TaskState.throwUnexpectedTaskStatus(this);
}
return await conditions[this.status](this);
}
/** Throws an error that the task status is unexpected */
static throwUnexpectedTaskStatus = (taskState: TaskState) => {
a.fail(`Unexpected task status: ${JSON.stringify(taskState)}`);
};
}