import { type EnvProviderState, type IExecuteFunctions, type Workflow, type IRunExecutionData, type INodeExecutionData, type ITaskDataConnections, type INode, type WorkflowParameters, type INodeParameters, type WorkflowExecuteMode, type IExecuteData, type IDataObject, type IWorkflowExecuteAdditionalData, type Result, createResultOk, createResultError, } from 'n8n-workflow'; import { nanoid } from 'nanoid'; import { DataRequestResponseBuilder } from './data-request-response-builder'; import { RPC_ALLOW_LIST, type TaskResultData, type N8nMessage, type RequesterMessage, } from '../runner-types'; export type RequestAccept = (jobId: string) => void; export type RequestReject = (reason: string) => void; export type TaskAccept = (data: TaskResultData) => void; export type TaskReject = (error: unknown) => void; export interface TaskData { executeFunctions: IExecuteFunctions; inputData: ITaskDataConnections; node: INode; workflow: Workflow; runExecutionData: IRunExecutionData; runIndex: number; itemIndex: number; activeNodeName: string; connectionInputData: INodeExecutionData[]; siblingParameters: INodeParameters; mode: WorkflowExecuteMode; envProviderState: EnvProviderState; executeData?: IExecuteData; defaultReturnRunIndex: number; selfData: IDataObject; contextNodeName: string; additionalData: IWorkflowExecuteAdditionalData; } export interface PartialAdditionalData { executionId?: string; restartExecutionId?: string; restApiUrl: string; instanceBaseUrl: string; formWaitingBaseUrl: string; webhookBaseUrl: string; webhookWaitingBaseUrl: string; webhookTestBaseUrl: string; currentNodeParameters?: INodeParameters; executionTimeoutTimestamp?: number; userId?: string; variables: IDataObject; } export interface DataRequestResponse { workflow: Omit; inputData: ITaskDataConnections; node: INode; runExecutionData: IRunExecutionData; runIndex: number; itemIndex: number; activeNodeName: string; connectionInputData: INodeExecutionData[]; siblingParameters: INodeParameters; mode: WorkflowExecuteMode; envProviderState: EnvProviderState; executeData?: IExecuteData; defaultReturnRunIndex: number; selfData: IDataObject; contextNodeName: string; additionalData: PartialAdditionalData; } export interface TaskRequest { requestId: string; taskType: string; settings: unknown; data: TaskData; } export interface Task { taskId: string; settings: unknown; data: TaskData; } interface ExecuteFunctionObject { [name: string]: ((...args: unknown[]) => unknown) | ExecuteFunctionObject; } export class TaskManager { requestAcceptRejects: Map = new Map(); taskAcceptRejects: Map = new Map(); pendingRequests: Map = new Map(); tasks: Map = new Map(); async startTask( additionalData: IWorkflowExecuteAdditionalData, taskType: string, settings: unknown, executeFunctions: IExecuteFunctions, inputData: ITaskDataConnections, node: INode, workflow: Workflow, runExecutionData: IRunExecutionData, runIndex: number, itemIndex: number, activeNodeName: string, connectionInputData: INodeExecutionData[], siblingParameters: INodeParameters, mode: WorkflowExecuteMode, envProviderState: EnvProviderState, executeData?: IExecuteData, defaultReturnRunIndex = -1, selfData: IDataObject = {}, contextNodeName: string = activeNodeName, ): Promise> { const data: TaskData = { workflow, runExecutionData, runIndex, connectionInputData, inputData, node, executeFunctions, itemIndex, siblingParameters, mode, envProviderState, executeData, defaultReturnRunIndex, selfData, contextNodeName, activeNodeName, additionalData, }; const request: TaskRequest = { requestId: nanoid(), taskType, settings, data, }; this.pendingRequests.set(request.requestId, request); const taskIdPromise = new Promise((resolve, reject) => { this.requestAcceptRejects.set(request.requestId, { accept: resolve, reject, }); }); this.sendMessage({ type: 'requester:taskrequest', requestId: request.requestId, taskType, }); const taskId = await taskIdPromise; const task: Task = { taskId, data, settings, }; this.tasks.set(task.taskId, task); try { const dataPromise = new Promise((resolve, reject) => { this.taskAcceptRejects.set(task.taskId, { accept: resolve, reject, }); }); this.sendMessage({ type: 'requester:tasksettings', taskId, settings, }); const resultData = await dataPromise; // Set custom execution data (`$execution.customData`) if sent if (resultData.customData) { Object.entries(resultData.customData).forEach(([k, v]) => { if (!runExecutionData.resultData.metadata) { runExecutionData.resultData.metadata = {}; } runExecutionData.resultData.metadata[k] = v; }); } return createResultOk(resultData.result as TData); } catch (e: unknown) { return createResultError(e as TError); } finally { this.tasks.delete(taskId); } } sendMessage(_message: RequesterMessage.ToN8n.All) {} onMessage(message: N8nMessage.ToRequester.All) { switch (message.type) { case 'broker:taskready': this.taskReady(message.requestId, message.taskId); break; case 'broker:taskdone': this.taskDone(message.taskId, message.data); break; case 'broker:taskerror': this.taskError(message.taskId, message.error); break; case 'broker:taskdatarequest': this.sendTaskData(message.taskId, message.requestId, message.requestParams); break; case 'broker:rpc': void this.handleRpc(message.taskId, message.callId, message.name, message.params); break; } } taskReady(requestId: string, taskId: string) { const acceptReject = this.requestAcceptRejects.get(requestId); if (!acceptReject) { this.rejectTask( taskId, 'Request ID not found. In multi-main setup, it is possible for one of the mains to have reported ready state already.', ); return; } acceptReject.accept(taskId); this.requestAcceptRejects.delete(requestId); } rejectTask(jobId: string, reason: string) { this.sendMessage({ type: 'requester:taskcancel', taskId: jobId, reason, }); } taskDone(taskId: string, data: TaskResultData) { const acceptReject = this.taskAcceptRejects.get(taskId); if (acceptReject) { acceptReject.accept(data); this.taskAcceptRejects.delete(taskId); } } taskError(taskId: string, error: unknown) { const acceptReject = this.taskAcceptRejects.get(taskId); if (acceptReject) { acceptReject.reject(error); this.taskAcceptRejects.delete(taskId); } } sendTaskData( taskId: string, requestId: string, requestParams: N8nMessage.ToRequester.TaskDataRequest['requestParams'], ) { const job = this.tasks.get(taskId); if (!job) { // TODO: logging return; } const dataRequestResponseBuilder = new DataRequestResponseBuilder(job.data, requestParams); const requestedData = dataRequestResponseBuilder.build(); this.sendMessage({ type: 'requester:taskdataresponse', taskId, requestId, data: requestedData, }); } async handleRpc( taskId: string, callId: string, name: N8nMessage.ToRequester.RPC['name'], params: unknown[], ) { const job = this.tasks.get(taskId); if (!job) { // TODO: logging return; } try { if (!RPC_ALLOW_LIST.includes(name)) { this.sendMessage({ type: 'requester:rpcresponse', taskId, callId, status: 'error', data: 'Method not allowed', }); return; } const splitPath = name.split('.'); const funcs = job.data.executeFunctions; let func: ((...args: unknown[]) => Promise) | undefined = undefined; let funcObj: ExecuteFunctionObject[string] | undefined = funcs as unknown as ExecuteFunctionObject; for (const part of splitPath) { funcObj = (funcObj as ExecuteFunctionObject)[part] ?? undefined; if (!funcObj) { break; } } func = funcObj as unknown as (...args: unknown[]) => Promise; if (!func) { this.sendMessage({ type: 'requester:rpcresponse', taskId, callId, status: 'error', data: 'Could not find method', }); return; } const data = (await func.call(funcs, ...params)) as unknown; this.sendMessage({ type: 'requester:rpcresponse', taskId, callId, status: 'success', data, }); } catch (e) { this.sendMessage({ type: 'requester:rpcresponse', taskId, callId, status: 'error', data: e, }); } } }