From b1940268e6110ed3d8949318a5252ac6563d624f Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Fri, 3 Jan 2025 12:27:47 +0200 Subject: [PATCH] fix(core): Ensure tasks timeout even if they don't receive settings (#12431) --- .../__tests__/js-task-runner.test.ts | 81 +++--- .../__tests__/task-runner.test.ts | 121 +++++++- .../src/js-task-runner/__tests__/test-data.ts | 19 +- .../src/js-task-runner/js-task-runner.ts | 33 +-- packages/@n8n/task-runner/src/task-runner.ts | 263 ++++++++++++------ packages/@n8n/task-runner/src/task-state.ts | 118 ++++++++ 6 files changed, 472 insertions(+), 163 deletions(-) create mode 100644 packages/@n8n/task-runner/src/task-state.ts diff --git a/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts b/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts index 00715621b2..5ebd965e87 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts @@ -1,4 +1,3 @@ -import { mock } from 'jest-mock-extended'; import { DateTime } from 'luxon'; import type { IBinaryData } from 'n8n-workflow'; import { setGlobalState, type CodeExecutionMode, type IDataObject } from 'n8n-workflow'; @@ -18,11 +17,12 @@ import { type DataRequestResponse, type InputDataChunkDefinition, } from '@/runner-types'; -import type { Task } from '@/task-runner'; +import type { TaskParams } from '@/task-runner'; import { newDataRequestResponse, - newTaskWithSettings, + newTaskParamsWithSettings, + newTaskState, withPairedItem, wrapIntoJson, } from './test-data'; @@ -64,12 +64,12 @@ describe('JsTaskRunner', () => { taskData, runner = defaultTaskRunner, }: { - task: Task; + task: TaskParams; taskData: DataRequestResponse; runner?: JsTaskRunner; }) => { jest.spyOn(runner, 'requestData').mockResolvedValue(taskData); - return await runner.executeTask(task, mock()); + return await runner.executeTask(task, new AbortController().signal); }; afterEach(() => { @@ -88,7 +88,7 @@ describe('JsTaskRunner', () => { runner?: JsTaskRunner; }) => { return await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code, nodeMode: 'runOnceForAllItems', ...settings, @@ -112,7 +112,7 @@ describe('JsTaskRunner', () => { chunk?: InputDataChunkDefinition; }) => { return await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code, nodeMode: 'runOnceForEachItem', chunk, @@ -128,7 +128,7 @@ describe('JsTaskRunner', () => { 'should make an rpc call for console log in %s mode', async (nodeMode) => { jest.spyOn(defaultTaskRunner, 'makeRpcCall').mockResolvedValue(undefined); - const task = newTaskWithSettings({ + const task = newTaskParamsWithSettings({ code: "console.log('Hello', 'world!'); return {}", nodeMode, }); @@ -146,7 +146,7 @@ describe('JsTaskRunner', () => { ); it('should not throw when using unsupported console methods', async () => { - const task = newTaskWithSettings({ + const task = newTaskParamsWithSettings({ code: ` console.warn('test'); console.error('test'); @@ -176,7 +176,7 @@ describe('JsTaskRunner', () => { }); it('should not throw when trying to log the context object', async () => { - const task = newTaskWithSettings({ + const task = newTaskParamsWithSettings({ code: ` console.log(this); return {json: {}} @@ -195,7 +195,7 @@ describe('JsTaskRunner', () => { it('should log the context object as [[ExecutionContext]]', async () => { const rpcCallSpy = jest.spyOn(defaultTaskRunner, 'makeRpcCall').mockResolvedValue(undefined); - const task = newTaskWithSettings({ + const task = newTaskParamsWithSettings({ code: ` console.log(this); return {json: {}} @@ -336,7 +336,7 @@ describe('JsTaskRunner', () => { describe('$env', () => { it('should have the env available in context when access has not been blocked', async () => { const outcome = await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: 'return { val: $env.VAR1 }', nodeMode: 'runOnceForAllItems', }), @@ -355,7 +355,7 @@ describe('JsTaskRunner', () => { it('should be possible to access env if it has been blocked', async () => { await expect( execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: 'return { val: $env.VAR1 }', nodeMode: 'runOnceForAllItems', }), @@ -372,7 +372,7 @@ describe('JsTaskRunner', () => { it('should not be possible to iterate $env', async () => { const outcome = await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: 'return Object.values($env).concat(Object.keys($env))', nodeMode: 'runOnceForAllItems', }), @@ -391,7 +391,7 @@ describe('JsTaskRunner', () => { it("should not expose task runner's env variables even if no env state is received", async () => { process.env.N8N_RUNNERS_TASK_BROKER_URI = 'http://127.0.0.1:5679'; const outcome = await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: 'return { val: $env.N8N_RUNNERS_TASK_BROKER_URI }', nodeMode: 'runOnceForAllItems', }), @@ -412,7 +412,7 @@ describe('JsTaskRunner', () => { }; const outcome = await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: 'return { val: $now.toSeconds() }', nodeMode: 'runOnceForAllItems', }), @@ -429,7 +429,7 @@ describe('JsTaskRunner', () => { }); const outcome = await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: 'return { val: $now.toSeconds() }', nodeMode: 'runOnceForAllItems', }), @@ -444,7 +444,7 @@ describe('JsTaskRunner', () => { describe("$getWorkflowStaticData('global')", () => { it('should have the global workflow static data available in runOnceForAllItems', async () => { const outcome = await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: 'return { val: $getWorkflowStaticData("global") }', nodeMode: 'runOnceForAllItems', }), @@ -460,7 +460,7 @@ describe('JsTaskRunner', () => { it('should have the global workflow static data available in runOnceForEachItem', async () => { const outcome = await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: 'return { val: $getWorkflowStaticData("global") }', nodeMode: 'runOnceForEachItem', }), @@ -480,7 +480,7 @@ describe('JsTaskRunner', () => { "does not return static data if it hasn't been modified in %s", async (mode) => { const outcome = await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: ` const staticData = $getWorkflowStaticData("global"); return { val: staticData }; @@ -502,7 +502,7 @@ describe('JsTaskRunner', () => { 'returns the updated static data in %s', async (mode) => { const outcome = await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: ` const staticData = $getWorkflowStaticData("global"); staticData.newKey = 'newValue'; @@ -541,7 +541,7 @@ describe('JsTaskRunner', () => { it('should have the node workflow static data available in runOnceForAllItems', async () => { const outcome = await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: 'return { val: $getWorkflowStaticData("node") }', nodeMode: 'runOnceForAllItems', }), @@ -553,7 +553,7 @@ describe('JsTaskRunner', () => { it('should have the node workflow static data available in runOnceForEachItem', async () => { const outcome = await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: 'return { val: $getWorkflowStaticData("node") }', nodeMode: 'runOnceForEachItem', }), @@ -569,7 +569,7 @@ describe('JsTaskRunner', () => { "does not return static data if it hasn't been modified in %s", async (mode) => { const outcome = await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: ` const staticData = $getWorkflowStaticData("node"); return { val: staticData }; @@ -587,7 +587,7 @@ describe('JsTaskRunner', () => { 'returns the updated static data in %s', async (mode) => { const outcome = await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: ` const staticData = $getWorkflowStaticData("node"); staticData.newKey = 'newValue'; @@ -662,7 +662,7 @@ describe('JsTaskRunner', () => { // Act await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: `await ${group.invocation}; return []`, nodeMode: 'runOnceForAllItems', }), @@ -684,7 +684,7 @@ describe('JsTaskRunner', () => { // Act await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: `await ${group.invocation}; return {}`, nodeMode: 'runOnceForEachItem', }), @@ -725,7 +725,7 @@ describe('JsTaskRunner', () => { it('should allow access to Node.js Buffers', async () => { const outcomeAll = await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: 'return { val: Buffer.from("test-buffer").toString() }', nodeMode: 'runOnceForAllItems', }), @@ -737,7 +737,7 @@ describe('JsTaskRunner', () => { expect(outcomeAll.result).toEqual([wrapIntoJson({ val: 'test-buffer' })]); const outcomePer = await execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: 'return { val: Buffer.from("test-buffer").toString() }', nodeMode: 'runOnceForEachItem', }), @@ -1205,7 +1205,7 @@ describe('JsTaskRunner', () => { async (nodeMode) => { await expect( execTaskWithParams({ - task: newTaskWithSettings({ + task: newTaskParamsWithSettings({ code: 'unknown', nodeMode, }), @@ -1218,12 +1218,13 @@ describe('JsTaskRunner', () => { it('sends serializes an error correctly', async () => { const runner = createRunnerWithOpts({}); const taskId = '1'; - const task = newTaskWithSettings({ + const task = newTaskState(taskId); + const taskSettings: JSExecSettings = { code: 'unknown; return []', nodeMode: 'runOnceForAllItems', continueOnFail: false, workflowMode: 'manual', - }); + }; runner.runningTasks.set(taskId, task); const sendSpy = jest.spyOn(runner.ws, 'send').mockImplementation(() => {}); @@ -1232,7 +1233,7 @@ describe('JsTaskRunner', () => { .spyOn(runner, 'requestData') .mockResolvedValue(newDataRequestResponse([wrapIntoJson({ a: 1 })])); - await runner.receivedSettings(taskId, task.settings); + await runner.receivedSettings(taskId, taskSettings); expect(sendSpy).toHaveBeenCalled(); const calledWith = sendSpy.mock.calls[0][0] as string; @@ -1304,11 +1305,7 @@ describe('JsTaskRunner', () => { const emitSpy = jest.spyOn(runner, 'emit'); jest.spyOn(runner, 'executeTask').mockResolvedValue({ result: [] }); - runner.runningTasks.set(taskId, { - taskId, - active: true, - cancelled: false, - }); + runner.runningTasks.set(taskId, newTaskState(taskId)); jest.advanceTimersByTime(idleTimeout * 1000 - 100); expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout'); @@ -1335,15 +1332,13 @@ describe('JsTaskRunner', () => { const runner = createRunnerWithOpts({}, { idleTimeout }); const taskId = '123'; const emitSpy = jest.spyOn(runner, 'emit'); + const task = newTaskState(taskId); - runner.runningTasks.set(taskId, { - taskId, - active: true, - cancelled: false, - }); + runner.runningTasks.set(taskId, task); jest.advanceTimersByTime(idleTimeout * 1000); expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout'); + task.cleanup(); }); }); }); diff --git a/packages/@n8n/task-runner/src/js-task-runner/__tests__/task-runner.test.ts b/packages/@n8n/task-runner/src/js-task-runner/__tests__/task-runner.test.ts index ef1d65a737..9db385eb7f 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/__tests__/task-runner.test.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/__tests__/task-runner.test.ts @@ -1,6 +1,9 @@ import { WebSocket } from 'ws'; +import { newTaskState } from '@/js-task-runner/__tests__/test-data'; +import { TimeoutError } from '@/js-task-runner/errors/timeout-error'; import { TaskRunner, type TaskRunnerOpts } from '@/task-runner'; +import type { TaskStatus } from '@/task-state'; class TestRunner extends TaskRunner {} @@ -154,11 +157,8 @@ describe('TestRunner', () => { runner.onMessage({ type: 'broker:runnerregistered', }); - runner.runningTasks.set('test-task', { - taskId: 'test-task', - active: true, - cancelled: false, - }); + const taskState = newTaskState('test-task'); + runner.runningTasks.set('test-task', taskState); const sendSpy = jest.spyOn(runner, 'send'); runner.sendOffers(); @@ -174,6 +174,7 @@ describe('TestRunner', () => { }, ], ]); + taskState.cleanup(); }); it('should delete stale offers and send new ones', () => { @@ -198,15 +199,45 @@ describe('TestRunner', () => { }); describe('taskCancelled', () => { - it('should reject pending requests when task is cancelled', () => { - const runner = newTestRunner(); + test.each<[TaskStatus, string]>([ + ['aborting:cancelled', 'cancelled'], + ['aborting:timeout', 'timeout'], + ])('should not do anything if task status is %s', async (status, reason) => { + runner = newTestRunner(); const taskId = 'test-task'; - runner.runningTasks.set(taskId, { - taskId, - active: false, - cancelled: false, - }); + const task = newTaskState(taskId); + task.status = status; + + runner.runningTasks.set(taskId, task); + + await runner.taskCancelled(taskId, reason); + + expect(runner.runningTasks.size).toBe(1); + expect(task.status).toBe(status); + }); + + it('should delete task if task is waiting for settings when task is cancelled', async () => { + runner = newTestRunner(); + + const taskId = 'test-task'; + const task = newTaskState(taskId); + const taskCleanupSpy = jest.spyOn(task, 'cleanup'); + runner.runningTasks.set(taskId, task); + + await runner.taskCancelled(taskId, 'test-reason'); + + expect(runner.runningTasks.size).toBe(0); + expect(taskCleanupSpy).toHaveBeenCalled(); + }); + + it('should reject pending requests when task is cancelled', async () => { + runner = newTestRunner(); + + const taskId = 'test-task'; + const task = newTaskState(taskId); + task.status = 'running'; + runner.runningTasks.set(taskId, task); const dataRequestReject = jest.fn(); const nodeTypesRequestReject = jest.fn(); @@ -225,7 +256,71 @@ describe('TestRunner', () => { reject: nodeTypesRequestReject, }); - runner.taskCancelled(taskId, 'test-reason'); + await runner.taskCancelled(taskId, 'test-reason'); + + expect(dataRequestReject).toHaveBeenCalledWith( + expect.objectContaining({ + message: 'Task cancelled: test-reason', + }), + ); + + expect(nodeTypesRequestReject).toHaveBeenCalledWith( + expect.objectContaining({ + message: 'Task cancelled: test-reason', + }), + ); + + expect(runner.dataRequests.size).toBe(0); + expect(runner.nodeTypesRequests.size).toBe(0); + }); + }); + + describe('taskTimedOut', () => { + it('should error task if task is waiting for settings', async () => { + runner = newTestRunner(); + + const taskId = 'test-task'; + const task = newTaskState(taskId); + task.status = 'waitingForSettings'; + runner.runningTasks.set(taskId, task); + const sendSpy = jest.spyOn(runner, 'send'); + + await runner.taskTimedOut(taskId); + + expect(runner.runningTasks.size).toBe(0); + expect(sendSpy).toHaveBeenCalledWith({ + type: 'runner:taskerror', + taskId, + error: expect.any(TimeoutError), + }); + }); + + it('should reject pending requests when task is running', async () => { + runner = newTestRunner(); + + const taskId = 'test-task'; + const task = newTaskState(taskId); + task.status = 'running'; + runner.runningTasks.set(taskId, task); + + const dataRequestReject = jest.fn(); + const nodeTypesRequestReject = jest.fn(); + + runner.dataRequests.set('data-req', { + taskId, + requestId: 'data-req', + resolve: jest.fn(), + reject: dataRequestReject, + }); + + runner.nodeTypesRequests.set('node-req', { + taskId, + requestId: 'node-req', + resolve: jest.fn(), + reject: nodeTypesRequestReject, + }); + + await runner.taskCancelled(taskId, 'test-reason'); expect(dataRequestReject).toHaveBeenCalledWith( expect.objectContaining({ diff --git a/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts b/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts index f13939e51e..85a1235dc6 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts @@ -4,22 +4,21 @@ import { nanoid } from 'nanoid'; import type { JSExecSettings } from '@/js-task-runner/js-task-runner'; import type { DataRequestResponse } from '@/runner-types'; -import type { Task } from '@/task-runner'; +import type { TaskParams } from '@/task-runner'; +import { TaskState } from '@/task-state'; /** * Creates a new task with the given settings */ -export const newTaskWithSettings = ( +export const newTaskParamsWithSettings = ( settings: Partial & Pick, -): Task => ({ +): TaskParams => ({ taskId: '1', settings: { workflowMode: 'manual', continueOnFail: false, ...settings, }, - active: true, - cancelled: false, }); /** @@ -167,3 +166,13 @@ export const withPairedItem = (index: number, data: INodeExecutionData): INodeEx item: index, }, }); + +/** + * Creates a new task state with the given taskId + */ +export const newTaskState = (taskId: string) => + new TaskState({ + taskId, + timeoutInS: 60, + onTimeout: () => {}, + }); diff --git a/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts b/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts index db93f3c338..ab2cc3a304 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts @@ -23,15 +23,15 @@ import { runInNewContext, type Context } from 'node:vm'; import type { MainConfig } from '@/config/main-config'; import { UnsupportedFunctionError } from '@/js-task-runner/errors/unsupported-function.error'; -import { - EXPOSED_RPC_METHODS, - UNSUPPORTED_HELPER_FUNCTIONS, - type DataRequestResponse, - type InputDataChunkDefinition, - type PartialAdditionalData, - type TaskResultData, +import { EXPOSED_RPC_METHODS, UNSUPPORTED_HELPER_FUNCTIONS } from '@/runner-types'; +import type { + DataRequestResponse, + InputDataChunkDefinition, + PartialAdditionalData, + TaskResultData, } from '@/runner-types'; -import { type Task, TaskRunner } from '@/task-runner'; +import type { TaskParams } from '@/task-runner'; +import { noOp, TaskRunner } from '@/task-runner'; import { BuiltInsParser } from './built-ins-parser/built-ins-parser'; import { BuiltInsParserState } from './built-ins-parser/built-ins-parser-state'; @@ -81,8 +81,6 @@ type CustomConsole = { log: (...args: unknown[]) => void; }; -const noOp = () => {}; - export class JsTaskRunner extends TaskRunner { private readonly requireResolver: RequireResolver; @@ -107,8 +105,11 @@ export class JsTaskRunner extends TaskRunner { }); } - async executeTask(task: Task, signal: AbortSignal): Promise { - const settings = task.settings; + async executeTask( + taskParams: TaskParams, + abortSignal: AbortSignal, + ): Promise { + const { taskId, settings } = taskParams; a.ok(settings, 'JS Code not sent to runner'); this.validateTaskSettings(settings); @@ -119,13 +120,13 @@ export class JsTaskRunner extends TaskRunner { : BuiltInsParserState.newNeedsAllDataState(); const dataResponse = await this.requestData( - task.taskId, + taskId, neededBuiltIns.toDataRequestParams(settings.chunk), ); const data = this.reconstructTaskData(dataResponse, settings.chunk); - await this.requestNodeTypeIfNeeded(neededBuiltIns, data.workflow, task.taskId); + await this.requestNodeTypeIfNeeded(neededBuiltIns, data.workflow, taskId); const workflowParams = data.workflow; const workflow = new Workflow({ @@ -137,8 +138,8 @@ export class JsTaskRunner extends TaskRunner { const result = settings.nodeMode === 'runOnceForAllItems' - ? await this.runForAllItems(task.taskId, settings, data, workflow, signal) - : await this.runForEachItem(task.taskId, settings, data, workflow, signal); + ? await this.runForAllItems(taskId, settings, data, workflow, abortSignal) + : await this.runForEachItem(taskId, settings, data, workflow, abortSignal); return { result, diff --git a/packages/@n8n/task-runner/src/task-runner.ts b/packages/@n8n/task-runner/src/task-runner.ts index 78ec83961c..dd048bcf7e 100644 --- a/packages/@n8n/task-runner/src/task-runner.ts +++ b/packages/@n8n/task-runner/src/task-runner.ts @@ -5,19 +5,14 @@ import { EventEmitter } from 'node:events'; import { type MessageEvent, WebSocket } from 'ws'; import type { BaseRunnerConfig } from '@/config/base-runner-config'; +import { TimeoutError } from '@/js-task-runner/errors/timeout-error'; import type { BrokerMessage, RunnerMessage } from '@/message-types'; import { TaskRunnerNodeTypes } from '@/node-types'; import type { TaskResultData } from '@/runner-types'; +import { TaskState } from '@/task-state'; import { TaskCancelledError } from './js-task-runner/errors/task-cancelled-error'; -export interface Task { - taskId: string; - settings?: T; - active: boolean; - cancelled: boolean; -} - export interface TaskOffer { offerId: string; validUntil: bigint; @@ -49,6 +44,14 @@ const OFFER_VALID_EXTRA_MS = 100; /** Converts milliseconds to nanoseconds */ const msToNs = (ms: number) => BigInt(ms * 1_000_000); +export const noOp = () => {}; + +/** Params the task receives when it is executed */ +export interface TaskParams { + taskId: string; + settings: T; +} + export interface TaskRunnerOpts extends BaseRunnerConfig { taskType: string; name?: string; @@ -61,7 +64,7 @@ export abstract class TaskRunner extends EventEmitter { canSendOffers = false; - runningTasks: Map = new Map(); + runningTasks: Map = new Map(); offerInterval: NodeJS.Timeout | undefined; @@ -89,10 +92,9 @@ export abstract class TaskRunner extends EventEmitter { /** How long (in seconds) a runner may be idle for before exit. */ private readonly idleTimeout: number; - protected taskCancellations = new Map(); - constructor(opts: TaskRunnerOpts) { super(); + this.taskType = opts.taskType; this.name = opts.name ?? 'Node.js Task Runner SDK'; this.maxConcurrency = opts.maxConcurrency; @@ -219,7 +221,7 @@ export abstract class TaskRunner extends EventEmitter { this.offerAccepted(message.offerId, message.taskId); break; case 'broker:taskcancel': - this.taskCancelled(message.taskId, message.reason); + void this.taskCancelled(message.taskId, message.reason); break; case 'broker:tasksettings': void this.receivedSettings(message.taskId, message.settings); @@ -284,11 +286,14 @@ export abstract class TaskRunner extends EventEmitter { } this.resetIdleTimer(); - this.runningTasks.set(taskId, { + const taskState = new TaskState({ taskId, - active: false, - cancelled: false, + timeoutInS: this.taskTimeout, + onTimeout: () => { + void this.taskTimedOut(taskId); + }, }); + this.runningTasks.set(taskId, taskState); this.send({ type: 'runner:taskaccepted', @@ -296,99 +301,103 @@ export abstract class TaskRunner extends EventEmitter { }); } - taskCancelled(taskId: string, reason: string) { - const task = this.runningTasks.get(taskId); - if (!task) { + async taskCancelled(taskId: string, reason: string) { + const taskState = this.runningTasks.get(taskId); + if (!taskState) { return; } - task.cancelled = true; - for (const [requestId, request] of this.dataRequests.entries()) { - if (request.taskId === taskId) { - request.reject(new TaskCancelledError(reason)); - this.dataRequests.delete(requestId); - } - } + await taskState.caseOf({ + // If the cancelled task hasn't received settings yet, we can finish it + waitingForSettings: () => this.finishTask(taskState), - for (const [requestId, request] of this.nodeTypesRequests.entries()) { - if (request.taskId === taskId) { - request.reject(new TaskCancelledError(reason)); - this.nodeTypesRequests.delete(requestId); - } - } + // If the task has already timed out or is already cancelled, we can + // ignore the cancellation + 'aborting:timeout': noOp, + 'aborting:cancelled': noOp, - const controller = this.taskCancellations.get(taskId); - if (controller) { - controller.abort(); - this.taskCancellations.delete(taskId); - } - - if (!task.active) this.runningTasks.delete(taskId); - - this.sendOffers(); + running: () => { + taskState.status = 'aborting:cancelled'; + taskState.abortController.abort('cancelled'); + this.cancelTaskRequests(taskId, reason); + }, + }); } - taskErrored(taskId: string, error: unknown) { - this.send({ - type: 'runner:taskerror', - taskId, - error, - }); - this.runningTasks.delete(taskId); - this.sendOffers(); - } + async taskTimedOut(taskId: string) { + const taskState = this.runningTasks.get(taskId); + if (!taskState) { + return; + } - taskDone(taskId: string, data: RunnerMessage.ToBroker.TaskDone['data']) { - this.send({ - type: 'runner:taskdone', - taskId, - data, + await taskState.caseOf({ + // If we are still waiting for settings for the task, we can error the + // task immediately + waitingForSettings: () => { + try { + this.send({ + type: 'runner:taskerror', + taskId, + error: new TimeoutError(this.taskTimeout), + }); + } finally { + this.finishTask(taskState); + } + }, + + // This should never happen, the timeout timer should only fire once + 'aborting:timeout': TaskState.throwUnexpectedTaskStatus, + + // If we are currently executing the task, abort the execution and + // mark the task as timed out + running: () => { + taskState.status = 'aborting:timeout'; + taskState.abortController.abort('timeout'); + this.cancelTaskRequests(taskId, 'timeout'); + }, + + // If the task is already cancelling, we can ignore the timeout + 'aborting:cancelled': noOp, }); - this.runningTasks.delete(taskId); - this.sendOffers(); } async receivedSettings(taskId: string, settings: unknown) { - const task = this.runningTasks.get(taskId); - if (!task) { - return; - } - if (task.cancelled) { - this.runningTasks.delete(taskId); + const taskState = this.runningTasks.get(taskId); + if (!taskState) { return; } - const controller = new AbortController(); - this.taskCancellations.set(taskId, controller); + await taskState.caseOf({ + // These states should never happen, as they are handled already in + // the other lifecycle methods and the task should be removed from the + // running tasks + 'aborting:cancelled': TaskState.throwUnexpectedTaskStatus, + 'aborting:timeout': TaskState.throwUnexpectedTaskStatus, + running: TaskState.throwUnexpectedTaskStatus, - const taskTimeout = setTimeout(() => { - if (!task.cancelled) { - controller.abort(); - this.taskCancellations.delete(taskId); - } - }, this.taskTimeout * 1_000); + waitingForSettings: async () => { + taskState.status = 'running'; - task.settings = settings; - task.active = true; - try { - const data = await this.executeTask(task, controller.signal); - this.taskDone(taskId, data); - } catch (error) { - if (!task.cancelled) this.taskErrored(taskId, error); - } finally { - clearTimeout(taskTimeout); - this.taskCancellations.delete(taskId); - this.resetIdleTimer(); - } + await this.executeTask( + { + taskId, + settings, + }, + taskState.abortController.signal, + ) + .then(async (data) => await this.taskExecutionSucceeded(taskState, data)) + .catch(async (error) => await this.taskExecutionFailed(taskState, error)); + }, + }); } // eslint-disable-next-line @typescript-eslint/naming-convention - async executeTask(_task: Task, _signal: AbortSignal): Promise { + async executeTask(_taskParams: TaskParams, _signal: AbortSignal): Promise { throw new ApplicationError('Unimplemented'); } async requestNodeTypes( - taskId: Task['taskId'], + taskId: TaskState['taskId'], requestParams: RunnerMessage.ToBroker.NodeTypesRequest['requestParams'], ) { const requestId = nanoid(); @@ -417,12 +426,12 @@ export abstract class TaskRunner extends EventEmitter { } async requestData( - taskId: Task['taskId'], + taskId: TaskState['taskId'], requestParams: RunnerMessage.ToBroker.TaskDataRequest['requestParams'], ): Promise { const requestId = nanoid(); - const p = new Promise((resolve, reject) => { + const dataRequestPromise = new Promise((resolve, reject) => { this.dataRequests.set(requestId, { requestId, taskId, @@ -439,7 +448,7 @@ export abstract class TaskRunner extends EventEmitter { }); try { - return await p; + return await dataRequestPromise; } finally { this.dataRequests.delete(requestId); } @@ -527,4 +536,86 @@ export abstract class TaskRunner extends EventEmitter { await new Promise((resolve) => setTimeout(resolve, 100)); } } + + private async taskExecutionSucceeded(taskState: TaskState, data: TaskResultData) { + try { + const sendData = () => { + this.send({ + type: 'runner:taskdone', + taskId: taskState.taskId, + data, + }); + }; + + await taskState.caseOf({ + waitingForSettings: TaskState.throwUnexpectedTaskStatus, + + 'aborting:cancelled': noOp, + + // If the task timed out but we ended up reaching this point, we + // might as well send the data + 'aborting:timeout': sendData, + running: sendData, + }); + } finally { + this.finishTask(taskState); + } + } + + private async taskExecutionFailed(taskState: TaskState, error: unknown) { + try { + const sendError = () => { + this.send({ + type: 'runner:taskerror', + taskId: taskState.taskId, + error, + }); + }; + + await taskState.caseOf({ + waitingForSettings: TaskState.throwUnexpectedTaskStatus, + + 'aborting:cancelled': noOp, + + 'aborting:timeout': () => { + console.warn(`Task ${taskState.taskId} timed out`); + + sendError(); + }, + + running: sendError, + }); + } finally { + this.finishTask(taskState); + } + } + + /** + * Cancels all node type and data requests made by the given task + */ + private cancelTaskRequests(taskId: string, reason: string) { + for (const [requestId, request] of this.dataRequests.entries()) { + if (request.taskId === taskId) { + request.reject(new TaskCancelledError(reason)); + this.dataRequests.delete(requestId); + } + } + + for (const [requestId, request] of this.nodeTypesRequests.entries()) { + if (request.taskId === taskId) { + request.reject(new TaskCancelledError(reason)); + this.nodeTypesRequests.delete(requestId); + } + } + } + + /** + * Finishes task by removing it from the running tasks and sending new offers + */ + private finishTask(taskState: TaskState) { + taskState.cleanup(); + this.runningTasks.delete(taskState.taskId); + this.sendOffers(); + this.resetIdleTimer(); + } } diff --git a/packages/@n8n/task-runner/src/task-state.ts b/packages/@n8n/task-runner/src/task-state.ts new file mode 100644 index 0000000000..4c2c0e44a8 --- /dev/null +++ b/packages/@n8n/task-runner/src/task-state.ts @@ -0,0 +1,118 @@ +import * as a from 'node:assert'; + +export type TaskStatus = + | 'waitingForSettings' + | 'running' + | 'aborting:cancelled' + | 'aborting:timeout'; + +export type TaskStateOpts = { + taskId: string; + timeoutInS: number; + onTimeout: () => void; +}; + +/** + * The state of a task. The task can be in one of the following states: + * - waitingForSettings: The task is waiting for settings from the broker + * - running: The task is currently running + * - aborting:cancelled: The task was canceled by the broker and is being aborted + * - aborting:timeout: The task took too long to complete and is being aborted + * + * The task is discarded once it reaches an end state. + * + * The class only holds the state, and does not have any logic. + * + * The task has the following lifecycle: + * + * ┌───┐ + * └───┘ + * │ + * broker:taskofferaccept : create task state + * │ + * ▼ + * ┌────────────────────┐ broker:taskcancel / timeout + * │ waitingForSettings ├──────────────────────────────────┐ + * └────────┬───────────┘ │ + * │ │ + * broker:tasksettings │ + * │ │ + * ▼ │ + * ┌───────────────┐ ┌────────────────────┐ │ + * │ running │ │ aborting:timeout │ │ + * │ │ timeout │ │ │ + * ┌───────┤- execute task ├───────────►│- fire abort signal │ │ + * │ └──────┬────────┘ └──────────┬─────────┘ │ + * │ │ │ │ + * │ broker:taskcancel │ │ + * Task execution │ Task execution │ + * resolves / rejects │ resolves / rejects │ + * │ ▼ │ │ + * │ ┌─────────────────────┐ │ │ + * │ │ aborting:cancelled │ │ │ + * │ │ │ │ │ + * │ │- fire abort signal │ │ │ + * │ └──────────┬──────────┘ │ │ + * │ Task execution │ │ + * │ resolves / rejects │ │ + * │ │ │ │ + * │ ▼ │ │ + * │ ┌──┐ │ │ + * └─────────────►│ │◄────────────────────────────┴─────────────┘ + * └──┘ + */ +export class TaskState { + status: TaskStatus = 'waitingForSettings'; + + readonly taskId: string; + + /** Controller for aborting the execution of the task */ + readonly abortController = new AbortController(); + + /** Timeout timer for the task */ + private timeoutTimer: NodeJS.Timeout | undefined; + + constructor(opts: TaskStateOpts) { + this.taskId = opts.taskId; + this.timeoutTimer = setTimeout(opts.onTimeout, opts.timeoutInS * 1000); + } + + /** Cleans up any resources before the task can be removed */ + cleanup() { + clearTimeout(this.timeoutTimer); + this.timeoutTimer = undefined; + } + + /** Custom JSON serialization for the task state for logging purposes */ + toJSON() { + return `[Task ${this.taskId} (${this.status})]`; + } + + /** + * Executes the function matching the current task status + * + * @example + * ```ts + * taskState.caseOf({ + * waitingForSettings: () => {...}, + * running: () => {...}, + * aborting:cancelled: () => {...}, + * aborting:timeout: () => {...}, + * }); + * ``` + */ + async caseOf( + conditions: Record void | Promise | never>, + ) { + if (!conditions[this.status]) { + TaskState.throwUnexpectedTaskStatus(this); + } + + return await conditions[this.status](this); + } + + /** Throws an error that the task status is unexpected */ + static throwUnexpectedTaskStatus = (taskState: TaskState) => { + a.fail(`Unexpected task status: ${JSON.stringify(taskState)}`); + }; +}