diff --git a/.github/workflows/release-publish.yml b/.github/workflows/release-publish.yml index 21e33bbcff..9196a7fccb 100644 --- a/.github/workflows/release-publish.yml +++ b/.github/workflows/release-publish.yml @@ -42,7 +42,7 @@ jobs: uses: actions/cache/save@v4.0.0 with: path: ./packages/**/dist - key: ${{ github.sha }}-base:build + key: ${{ github.sha }}-release:build - name: Dry-run publishing run: pnpm publish -r --no-git-checks --dry-run @@ -141,7 +141,7 @@ jobs: uses: actions/cache/restore@v4.0.0 with: path: ./packages/**/dist - key: ${{ github.sha }}:db-tests + key: ${{ github.sha }}-release:build - name: Create a frontend release uses: getsentry/action-release@v1.7.0 diff --git a/packages/@n8n/config/src/configs/runners.config.ts b/packages/@n8n/config/src/configs/runners.config.ts index 001a8cb27d..bd7b744ea2 100644 --- a/packages/@n8n/config/src/configs/runners.config.ts +++ b/packages/@n8n/config/src/configs/runners.config.ts @@ -10,9 +10,8 @@ export type TaskRunnerMode = 'internal_childprocess' | 'internal_launcher' | 'ex @Config export class TaskRunnersConfig { - // Defaults to true for now - @Env('N8N_RUNNERS_DISABLED') - disabled: boolean = true; + @Env('N8N_RUNNERS_ENABLED') + enabled: boolean = false; // Defaults to true for now @Env('N8N_RUNNERS_MODE') @@ -51,6 +50,10 @@ export class TaskRunnersConfig { @Env('N8N_RUNNERS_MAX_CONCURRENCY') maxConcurrency: number = 5; + /** Should the output of deduplication be asserted for correctness */ + @Env('N8N_RUNNERS_ASSERT_DEDUPLICATION_OUTPUT') + assertDeduplicationOutput: boolean = false; + /** How long (in minutes) until shutting down an idle runner. */ @Env('N8N_RUNNERS_IDLE_TIMEOUT') idleTimeout: number = 5; diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index bc10028f36..eeb98269de 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -222,7 +222,7 @@ describe('GlobalConfig', () => { }, }, taskRunners: { - disabled: true, + enabled: false, mode: 'internal_childprocess', path: '/runners', authToken: '', @@ -233,6 +233,7 @@ describe('GlobalConfig', () => { launcherRunner: 'javascript', maxOldSpaceSize: '', maxConcurrency: 5, + assertDeduplicationOutput: false, }, sentry: { backendDsn: '', diff --git a/packages/@n8n/task-runner/src/data-request/data-request-response-reconstruct.ts b/packages/@n8n/task-runner/src/data-request/data-request-response-reconstruct.ts new file mode 100644 index 0000000000..83a291a491 --- /dev/null +++ b/packages/@n8n/task-runner/src/data-request/data-request-response-reconstruct.ts @@ -0,0 +1,29 @@ +import type { IExecuteData, INodeExecutionData } from 'n8n-workflow'; + +import type { DataRequestResponse } from '@/runner-types'; + +/** + * Reconstructs data from a DataRequestResponse to the initial + * data structures. + */ +export class DataRequestResponseReconstruct { + /** + * Reconstructs `connectionInputData` from a DataRequestResponse + */ + reconstructConnectionInputData( + inputData: DataRequestResponse['inputData'], + ): INodeExecutionData[] { + return inputData?.main?.[0] ?? []; + } + + /** + * Reconstruct `executeData` from a DataRequestResponse + */ + reconstructExecuteData(response: DataRequestResponse): IExecuteData { + return { + data: response.inputData, + node: response.node, + source: response.connectionInputSource, + }; + } +} diff --git a/packages/@n8n/task-runner/src/index.ts b/packages/@n8n/task-runner/src/index.ts index bc770ea08e..5fcc6e078b 100644 --- a/packages/@n8n/task-runner/src/index.ts +++ b/packages/@n8n/task-runner/src/index.ts @@ -1,3 +1,4 @@ export * from './task-runner'; export * from './runner-types'; export * from './message-types'; +export * from './data-request/data-request-response-reconstruct'; diff --git a/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts b/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts index cd0863b13e..621a9c81a7 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts @@ -3,15 +3,21 @@ import type { CodeExecutionMode, IDataObject } from 'n8n-workflow'; import fs from 'node:fs'; import { builtinModules } from 'node:module'; +import type { JsRunnerConfig } from '@/config/js-runner-config'; +import { MainConfig } from '@/config/main-config'; +import { ExecutionError } from '@/js-task-runner/errors/execution-error'; import { ValidationError } from '@/js-task-runner/errors/validation-error'; -import type { DataRequestResponse, JSExecSettings } from '@/js-task-runner/js-task-runner'; +import type { JSExecSettings } from '@/js-task-runner/js-task-runner'; import { JsTaskRunner } from '@/js-task-runner/js-task-runner'; +import type { DataRequestResponse } from '@/runner-types'; import type { Task } from '@/task-runner'; -import { newCodeTaskData, newTaskWithSettings, withPairedItem, wrapIntoJson } from './test-data'; -import type { JsRunnerConfig } from '../../config/js-runner-config'; -import { MainConfig } from '../../config/main-config'; -import { ExecutionError } from '../errors/execution-error'; +import { + newDataRequestResponse, + newTaskWithSettings, + withPairedItem, + wrapIntoJson, +} from './test-data'; jest.mock('ws'); @@ -68,7 +74,7 @@ describe('JsTaskRunner', () => { nodeMode: 'runOnceForAllItems', ...settings, }), - taskData: newCodeTaskData(inputItems.map(wrapIntoJson)), + taskData: newDataRequestResponse(inputItems.map(wrapIntoJson)), runner, }); }; @@ -91,7 +97,7 @@ describe('JsTaskRunner', () => { nodeMode: 'runOnceForEachItem', ...settings, }), - taskData: newCodeTaskData(inputItems.map(wrapIntoJson)), + taskData: newDataRequestResponse(inputItems.map(wrapIntoJson)), runner, }); }; @@ -108,7 +114,7 @@ describe('JsTaskRunner', () => { await execTaskWithParams({ task, - taskData: newCodeTaskData([wrapIntoJson({})]), + taskData: newDataRequestResponse([wrapIntoJson({})]), }); expect(defaultTaskRunner.makeRpcCall).toHaveBeenCalledWith(task.taskId, 'logNodeOutput', [ @@ -243,7 +249,7 @@ describe('JsTaskRunner', () => { code: 'return { val: $env.VAR1 }', nodeMode: 'runOnceForAllItems', }), - taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { + taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), { envProviderState: { isEnvAccessBlocked: false, isProcessAvailable: true, @@ -262,7 +268,7 @@ describe('JsTaskRunner', () => { code: 'return { val: $env.VAR1 }', nodeMode: 'runOnceForAllItems', }), - taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { + taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), { envProviderState: { isEnvAccessBlocked: true, isProcessAvailable: true, @@ -279,7 +285,7 @@ describe('JsTaskRunner', () => { code: 'return Object.values($env).concat(Object.keys($env))', nodeMode: 'runOnceForAllItems', }), - taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { + taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), { envProviderState: { isEnvAccessBlocked: false, isProcessAvailable: true, @@ -298,7 +304,7 @@ describe('JsTaskRunner', () => { code: 'return { val: $env.N8N_RUNNERS_N8N_URI }', nodeMode: 'runOnceForAllItems', }), - taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { + taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), { envProviderState: undefined, }), }); @@ -313,7 +319,7 @@ describe('JsTaskRunner', () => { code: 'return { val: Buffer.from("test-buffer").toString() }', nodeMode: 'runOnceForAllItems', }), - taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { + taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), { envProviderState: undefined, }), }); @@ -325,7 +331,7 @@ describe('JsTaskRunner', () => { code: 'return { val: Buffer.from("test-buffer").toString() }', nodeMode: 'runOnceForEachItem', }), - taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { + taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), { envProviderState: undefined, }), }); @@ -771,7 +777,7 @@ describe('JsTaskRunner', () => { code: 'unknown', nodeMode, }), - taskData: newCodeTaskData([wrapIntoJson({ a: 1 })]), + taskData: newDataRequestResponse([wrapIntoJson({ a: 1 })]), }), ).rejects.toThrow(ExecutionError); }, @@ -793,7 +799,7 @@ describe('JsTaskRunner', () => { jest.spyOn(runner, 'sendOffers').mockImplementation(() => {}); jest .spyOn(runner, 'requestData') - .mockResolvedValue(newCodeTaskData([wrapIntoJson({ a: 1 })])); + .mockResolvedValue(newDataRequestResponse([wrapIntoJson({ a: 1 })])); await runner.receivedSettings(taskId, task.settings); diff --git a/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts b/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts index 6de3e6d2b1..224f630807 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts @@ -2,7 +2,8 @@ import type { IDataObject, INode, INodeExecutionData, ITaskData } from 'n8n-work import { NodeConnectionType } from 'n8n-workflow'; import { nanoid } from 'nanoid'; -import type { DataRequestResponse, JSExecSettings } from '@/js-task-runner/js-task-runner'; +import type { JSExecSettings } from '@/js-task-runner/js-task-runner'; +import type { DataRequestResponse } from '@/runner-types'; import type { Task } from '@/task-runner'; /** @@ -46,10 +47,10 @@ export const newTaskData = (opts: Partial & Pick }); /** - * Creates a new all code task data with the given options + * Creates a new data request response with the given options */ -export const newCodeTaskData = ( - codeNodeInputData: INodeExecutionData[], +export const newDataRequestResponse = ( + inputData: INodeExecutionData[], opts: Partial = {}, ): DataRequestResponse => { const codeNode = newNode({ @@ -83,9 +84,8 @@ export const newCodeTaskData = ( nodes: [manualTriggerNode, codeNode], }, inputData: { - main: [codeNodeInputData], + main: [inputData], }, - connectionInputData: codeNodeInputData, node: codeNode, runExecutionData: { startData: {}, @@ -95,7 +95,7 @@ export const newCodeTaskData = ( newTaskData({ source: [], data: { - main: [codeNodeInputData], + main: [inputData], }, }), ], @@ -137,14 +137,13 @@ export const newCodeTaskData = ( var: 'value', }, }, - executeData: { - node: codeNode, - data: { - main: [codeNodeInputData], - }, - source: { - main: [{ previousNode: manualTriggerNode.name }], - }, + connectionInputSource: { + main: [ + { + previousNode: 'Trigger', + previousNodeOutput: 0, + }, + ], }, ...opts, }; diff --git a/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/__tests__/built-ins-parser.test.ts b/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/__tests__/built-ins-parser.test.ts index 399d9e6e2b..366a9188de 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/__tests__/built-ins-parser.test.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/__tests__/built-ins-parser.test.ts @@ -1,8 +1,13 @@ import { getAdditionalKeys } from 'n8n-core'; -import type { IDataObject, INodeType, IWorkflowExecuteAdditionalData } from 'n8n-workflow'; +import type { + IDataObject, + IExecuteData, + INodeType, + IWorkflowExecuteAdditionalData, +} from 'n8n-workflow'; import { Workflow, WorkflowDataProxy } from 'n8n-workflow'; -import { newCodeTaskData } from '../../__tests__/test-data'; +import { newDataRequestResponse } from '../../__tests__/test-data'; import { BuiltInsParser } from '../built-ins-parser'; import { BuiltInsParserState } from '../built-ins-parser-state'; @@ -159,7 +164,12 @@ describe('BuiltInsParser', () => { describe('WorkflowDataProxy built-ins', () => { it('should have a known list of built-ins', () => { - const data = newCodeTaskData([]); + const data = newDataRequestResponse([]); + const executeData: IExecuteData = { + data: {}, + node: data.node, + source: data.connectionInputSource, + }; const dataProxy = new WorkflowDataProxy( new Workflow({ ...data.workflow, @@ -179,7 +189,7 @@ describe('BuiltInsParser', () => { data.runIndex, 0, data.activeNodeName, - data.connectionInputData, + [], data.siblingParameters, data.mode, getAdditionalKeys( @@ -187,7 +197,7 @@ describe('BuiltInsParser', () => { data.mode, data.runExecutionData, ), - data.executeData, + executeData, data.defaultReturnRunIndex, data.selfData, data.contextNodeName, diff --git a/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts b/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts index 7e79bba73b..c64d58636b 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts @@ -1,28 +1,25 @@ import { getAdditionalKeys } from 'n8n-core'; -import { - WorkflowDataProxy, - // type IWorkflowDataProxyAdditionalKeys, - Workflow, -} from 'n8n-workflow'; +import { WorkflowDataProxy, Workflow } from 'n8n-workflow'; import type { CodeExecutionMode, - INode, - ITaskDataConnections, IWorkflowExecuteAdditionalData, - WorkflowParameters, IDataObject, - IExecuteData, INodeExecutionData, INodeParameters, - IRunExecutionData, WorkflowExecuteMode, + WorkflowParameters, + ITaskDataConnections, + INode, + IRunExecutionData, EnvProviderState, + IExecuteData, INodeTypeDescription, } from 'n8n-workflow'; import * as a from 'node:assert'; import { runInNewContext, type Context } from 'node:vm'; -import type { TaskResultData } from '@/runner-types'; +import type { MainConfig } from '@/config/main-config'; +import type { DataRequestResponse, PartialAdditionalData, TaskResultData } from '@/runner-types'; import { type Task, TaskRunner } from '@/task-runner'; import { BuiltInsParser } from './built-ins-parser/built-ins-parser'; @@ -33,7 +30,7 @@ import { makeSerializable } from './errors/serializable-error'; import type { RequireResolver } from './require-resolver'; import { createRequireResolver } from './require-resolver'; import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation'; -import type { MainConfig } from '../config/main-config'; +import { DataRequestResponseReconstruct } from '../data-request/data-request-response-reconstruct'; export interface JSExecSettings { code: string; @@ -45,34 +42,19 @@ export interface JSExecSettings { mode: WorkflowExecuteMode; } -export interface PartialAdditionalData { - executionId?: string; - restartExecutionId?: string; - restApiUrl: string; - instanceBaseUrl: string; - formWaitingBaseUrl: string; - webhookBaseUrl: string; - webhookWaitingBaseUrl: string; - webhookTestBaseUrl: string; - currentNodeParameters?: INodeParameters; - executionTimeoutTimestamp?: number; - userId?: string; - variables: IDataObject; -} - -export interface DataRequestResponse { +export interface JsTaskData { workflow: Omit; inputData: ITaskDataConnections; + connectionInputData: INodeExecutionData[]; node: INode; runExecutionData: IRunExecutionData; runIndex: number; itemIndex: number; activeNodeName: string; - connectionInputData: INodeExecutionData[]; siblingParameters: INodeParameters; mode: WorkflowExecuteMode; - envProviderState?: EnvProviderState; + envProviderState: EnvProviderState; executeData?: IExecuteData; defaultReturnRunIndex: number; selfData: IDataObject; @@ -89,6 +71,8 @@ export class JsTaskRunner extends TaskRunner { private readonly builtInsParser = new BuiltInsParser(); + private readonly taskDataReconstruct = new DataRequestResponseReconstruct(); + constructor(config: MainConfig, name = 'JS Task Runner') { super({ taskType: 'javascript', @@ -115,33 +99,14 @@ export class JsTaskRunner extends TaskRunner { ? neededBuiltInsResult.result : BuiltInsParserState.newNeedsAllDataState(); - const data = await this.requestData( + const dataResponse = await this.requestData( task.taskId, neededBuiltIns.toDataRequestParams(), ); - /** - * We request node types only when we know a task needs all nodes, because - * needing all nodes means that the task relies on paired item functionality, - * which is the same requirement for needing node types. - */ - if (neededBuiltIns.needsAllNodes) { - const uniqueNodeTypes = new Map( - data.workflow.nodes.map((node) => [ - `${node.type}|${node.typeVersion}`, - { name: node.type, version: node.typeVersion }, - ]), - ); + const data = this.reconstructTaskData(dataResponse); - const unknownNodeTypes = this.nodeTypes.onlyUnknown([...uniqueNodeTypes.values()]); - - const nodeTypes = await this.requestNodeTypes( - task.taskId, - unknownNodeTypes, - ); - - this.nodeTypes.addNodeTypeDescriptions(nodeTypes); - } + await this.requestNodeTypeIfNeeded(neededBuiltIns, data.workflow, task.taskId); const workflowParams = data.workflow; const workflow = new Workflow({ @@ -201,7 +166,7 @@ export class JsTaskRunner extends TaskRunner { private async runForAllItems( taskId: string, settings: JSExecSettings, - data: DataRequestResponse, + data: JsTaskData, workflow: Workflow, customConsole: CustomConsole, ): Promise { @@ -248,7 +213,7 @@ export class JsTaskRunner extends TaskRunner { private async runForEachItem( taskId: string, settings: JSExecSettings, - data: DataRequestResponse, + data: JsTaskData, workflow: Workflow, customConsole: CustomConsole, ): Promise { @@ -315,7 +280,7 @@ export class JsTaskRunner extends TaskRunner { return returnData; } - private createDataProxy(data: DataRequestResponse, workflow: Workflow, itemIndex: number) { + private createDataProxy(data: JsTaskData, workflow: Workflow, itemIndex: number) { return new WorkflowDataProxy( workflow, data.runExecutionData, @@ -359,4 +324,43 @@ export class JsTaskRunner extends TaskRunner { return new ExecutionError({ message: JSON.stringify(error) }); } + + private reconstructTaskData(response: DataRequestResponse): JsTaskData { + return { + ...response, + connectionInputData: this.taskDataReconstruct.reconstructConnectionInputData( + response.inputData, + ), + executeData: this.taskDataReconstruct.reconstructExecuteData(response), + }; + } + + private async requestNodeTypeIfNeeded( + neededBuiltIns: BuiltInsParserState, + workflow: JsTaskData['workflow'], + taskId: string, + ) { + /** + * We request node types only when we know a task needs all nodes, because + * needing all nodes means that the task relies on paired item functionality, + * which is the same requirement for needing node types. + */ + if (neededBuiltIns.needsAllNodes) { + const uniqueNodeTypes = new Map( + workflow.nodes.map((node) => [ + `${node.type}|${node.typeVersion}`, + { name: node.type, version: node.typeVersion }, + ]), + ); + + const unknownNodeTypes = this.nodeTypes.onlyUnknown([...uniqueNodeTypes.values()]); + + const nodeTypes = await this.requestNodeTypes( + taskId, + unknownNodeTypes, + ); + + this.nodeTypes.addNodeTypeDescriptions(nodeTypes); + } + } } diff --git a/packages/@n8n/task-runner/src/runner-types.ts b/packages/@n8n/task-runner/src/runner-types.ts index 836c42ed49..4649c2cc2f 100644 --- a/packages/@n8n/task-runner/src/runner-types.ts +++ b/packages/@n8n/task-runner/src/runner-types.ts @@ -8,6 +8,7 @@ import type { INodeParameters, IRunExecutionData, ITaskDataConnections, + ITaskDataConnectionsSource, IWorkflowExecuteAdditionalData, Workflow, WorkflowExecuteMode, @@ -29,17 +30,16 @@ export interface TaskDataRequestParams { export interface DataRequestResponse { workflow: Omit; inputData: ITaskDataConnections; + connectionInputSource: ITaskDataConnectionsSource | null; node: INode; runExecutionData: IRunExecutionData; runIndex: number; itemIndex: number; activeNodeName: string; - connectionInputData: INodeExecutionData[]; siblingParameters: INodeParameters; mode: WorkflowExecuteMode; envProviderState: EnvProviderState; - executeData?: IExecuteData; defaultReturnRunIndex: number; selfData: IDataObject; contextNodeName: string; diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index bb8b56d32b..42b5df13e6 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -221,7 +221,7 @@ export class Start extends BaseCommand { } const { taskRunners: taskRunnerConfig } = this.globalConfig; - if (!taskRunnerConfig.disabled) { + if (taskRunnerConfig.enabled) { const { TaskRunnerModule } = await import('@/runners/task-runner-module'); const taskRunnerModule = Container.get(TaskRunnerModule); await taskRunnerModule.start(); diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 730c6f6e80..0291a9e416 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -113,7 +113,7 @@ export class Worker extends BaseCommand { ); const { taskRunners: taskRunnerConfig } = this.globalConfig; - if (!taskRunnerConfig.disabled) { + if (taskRunnerConfig.enabled) { const { TaskRunnerModule } = await import('@/runners/task-runner-module'); const taskRunnerModule = Container.get(TaskRunnerModule); await taskRunnerModule.start(); diff --git a/packages/cli/src/runners/__tests__/task-runner-process.test.ts b/packages/cli/src/runners/__tests__/task-runner-process.test.ts index eb04e3ab8e..fbab9ee1e3 100644 --- a/packages/cli/src/runners/__tests__/task-runner-process.test.ts +++ b/packages/cli/src/runners/__tests__/task-runner-process.test.ts @@ -22,7 +22,7 @@ require('child_process').spawn = spawnMock; describe('TaskRunnerProcess', () => { const logger = mockInstance(Logger); const runnerConfig = mockInstance(TaskRunnersConfig); - runnerConfig.disabled = false; + runnerConfig.enabled = true; runnerConfig.mode = 'internal_childprocess'; const authService = mock(); let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService); diff --git a/packages/cli/src/runners/runner-types.ts b/packages/cli/src/runners/runner-types.ts index 8fcfe968d3..b373d3051e 100644 --- a/packages/cli/src/runners/runner-types.ts +++ b/packages/cli/src/runners/runner-types.ts @@ -5,18 +5,6 @@ import type WebSocket from 'ws'; import type { TaskRunner } from './task-broker.service'; import type { AuthlessRequest } from '../requests'; -/** - * Specifies what data should be included for a task data request. - */ -export interface TaskDataRequestParams { - dataOfNodes: string[] | 'all'; - prevNode: boolean; - /** Whether input data for the node should be included */ - input: boolean; - /** Whether env provider's state should be included */ - env: boolean; -} - export interface DisconnectAnalyzer { determineDisconnectReason(runnerId: TaskRunner['id']): Promise; } diff --git a/packages/cli/src/runners/task-managers/__tests__/data-request-response-builder.test.ts b/packages/cli/src/runners/task-managers/__tests__/data-request-response-builder.test.ts index ea1492f64e..b8868983ed 100644 --- a/packages/cli/src/runners/task-managers/__tests__/data-request-response-builder.test.ts +++ b/packages/cli/src/runners/task-managers/__tests__/data-request-response-builder.test.ts @@ -1,42 +1,10 @@ -import type { TaskData } from '@n8n/task-runner'; +import type { PartialAdditionalData, TaskData } from '@n8n/task-runner'; import { mock } from 'jest-mock-extended'; -import type { IExecuteFunctions, IWorkflowExecuteAdditionalData } from 'n8n-workflow'; -import { type INode, type INodeExecutionData, type Workflow } from 'n8n-workflow'; +import type { Workflow } from 'n8n-workflow'; import { DataRequestResponseBuilder } from '../data-request-response-builder'; -const triggerNode: INode = mock({ - name: 'Trigger', -}); -const debugHelperNode: INode = mock({ - name: 'DebugHelper', -}); -const codeNode: INode = mock({ - name: 'Code', -}); -const workflow: TaskData['workflow'] = mock(); -const debugHelperNodeOutItems: INodeExecutionData[] = [ - { - json: { - uid: 'abb74fd4-bef2-4fae-9d53-ea24e9eb3032', - email: 'Dan.Schmidt31@yahoo.com', - firstname: 'Toni', - lastname: 'Schuster', - password: 'Q!D6C2', - }, - pairedItem: { - item: 0, - }, - }, -]; -const codeNodeInputItems: INodeExecutionData[] = debugHelperNodeOutItems; -const connectionInputData: TaskData['connectionInputData'] = codeNodeInputItems; -const envProviderState: TaskData['envProviderState'] = mock({ - env: {}, - isEnvAccessBlocked: false, - isProcessAvailable: true, -}); -const additionalData = mock({ +const additionalData = mock({ formWaitingBaseUrl: 'http://localhost:5678/form-waiting', instanceBaseUrl: 'http://localhost:5678/', restApiUrl: 'http://localhost:5678/rest', @@ -50,275 +18,57 @@ const additionalData = mock({ executionTimeoutTimestamp: undefined, restartExecutionId: undefined, }); -const executeFunctions = mock(); -/** - * Drawn with https://asciiflow.com/#/ - * Task data for an execution of the following WF: - * where ►► denotes the currently being executing node. - * ►► - * ┌───────────┐ ┌─────────────┐ ┌────────┐ - * │ Trigger ├──►│ DebugHelper ├───►│ Code │ - * └───────────┘ └─────────────┘ └────────┘ - */ -const taskData: TaskData = { - executeFunctions, - workflow, - connectionInputData, - inputData: { - main: [codeNodeInputItems], - }, - itemIndex: 0, - activeNodeName: codeNode.name, - contextNodeName: codeNode.name, - defaultReturnRunIndex: -1, - mode: 'manual', - envProviderState, - node: codeNode, - runExecutionData: { - startData: { - destinationNode: codeNode.name, - runNodeFilter: [triggerNode.name, debugHelperNode.name, codeNode.name], - }, - resultData: { - runData: { - [triggerNode.name]: [ - { - hints: [], - startTime: 1730313407328, - executionTime: 1, - source: [], - executionStatus: 'success', - data: { - main: [[]], - }, - }, - ], - [debugHelperNode.name]: [ - { - hints: [], - startTime: 1730313407330, - executionTime: 1, - source: [ - { - previousNode: triggerNode.name, - }, - ], - executionStatus: 'success', - data: { - main: [debugHelperNodeOutItems], - }, - }, - ], - }, - pinData: {}, - }, - executionData: { - contextData: {}, - nodeExecutionStack: [], - metadata: {}, - waitingExecution: { - [codeNode.name]: { - '0': { - main: [codeNodeInputItems], - }, - }, - }, - waitingExecutionSource: { - [codeNode.name]: { - '0': { - main: [ - { - previousNode: debugHelperNode.name, - }, - ], - }, - }, - }, - }, - }, - runIndex: 0, - selfData: {}, - siblingParameters: {}, - executeData: { - node: codeNode, - data: { - main: [codeNodeInputItems], - }, - source: { - main: [ - { - previousNode: debugHelperNode.name, - previousNodeOutput: 0, - }, - ], - }, - }, +const workflow: TaskData['workflow'] = mock({ + id: '1', + name: 'Test Workflow', + active: true, + connectionsBySourceNode: {}, + nodes: {}, + pinData: {}, + settings: {}, + staticData: {}, +}); + +const taskData = mock({ additionalData, -} as const; + workflow, +}); describe('DataRequestResponseBuilder', () => { - const allDataParam: DataRequestResponseBuilder['requestParams'] = { - dataOfNodes: 'all', - env: true, - input: true, - prevNode: true, - }; + const builder = new DataRequestResponseBuilder(); - const newRequestParam = (opts: Partial) => ({ - ...allDataParam, - ...opts, - }); + it('picks only specific properties for additional data', () => { + const result = builder.buildFromTaskData(taskData); - describe('all data', () => { - it('should build the runExecutionData as is when everything is requested', () => { - const dataRequestResponseBuilder = new DataRequestResponseBuilder(taskData, allDataParam); - - const { runExecutionData } = dataRequestResponseBuilder.build(); - - expect(runExecutionData).toStrictEqual(taskData.runExecutionData); + expect(result.additionalData).toStrictEqual({ + formWaitingBaseUrl: 'http://localhost:5678/form-waiting', + instanceBaseUrl: 'http://localhost:5678/', + restApiUrl: 'http://localhost:5678/rest', + variables: additionalData.variables, + webhookBaseUrl: 'http://localhost:5678/webhook', + webhookTestBaseUrl: 'http://localhost:5678/webhook-test', + webhookWaitingBaseUrl: 'http://localhost:5678/webhook-waiting', + executionId: '45844', + userId: '114984bc-44b3-4dd4-9b54-a4a8d34d51d5', + currentNodeParameters: undefined, + executionTimeoutTimestamp: undefined, + restartExecutionId: undefined, }); }); - describe('envProviderState', () => { - it("should filter out envProviderState when it's not requested", () => { - const dataRequestResponseBuilder = new DataRequestResponseBuilder( - taskData, - newRequestParam({ - env: false, - }), - ); + it('picks only specific properties for workflow', () => { + const result = builder.buildFromTaskData(taskData); - const result = dataRequestResponseBuilder.build(); - - expect(result.envProviderState).toStrictEqual({ - env: {}, - isEnvAccessBlocked: false, - isProcessAvailable: true, - }); - }); - }); - - describe('additionalData', () => { - it('picks only specific properties for additional data', () => { - const dataRequestResponseBuilder = new DataRequestResponseBuilder(taskData, allDataParam); - - const result = dataRequestResponseBuilder.build(); - - expect(result.additionalData).toStrictEqual({ - formWaitingBaseUrl: 'http://localhost:5678/form-waiting', - instanceBaseUrl: 'http://localhost:5678/', - restApiUrl: 'http://localhost:5678/rest', - webhookBaseUrl: 'http://localhost:5678/webhook', - webhookTestBaseUrl: 'http://localhost:5678/webhook-test', - webhookWaitingBaseUrl: 'http://localhost:5678/webhook-waiting', - executionId: '45844', - userId: '114984bc-44b3-4dd4-9b54-a4a8d34d51d5', - currentNodeParameters: undefined, - executionTimeoutTimestamp: undefined, - restartExecutionId: undefined, - variables: additionalData.variables, - }); - }); - }); - - describe('input data', () => { - const allExceptInputParam = newRequestParam({ - input: false, - }); - - it('drops input data from executeData', () => { - const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build(); - - expect(result.executeData).toStrictEqual({ - node: taskData.executeData!.node, - source: taskData.executeData!.source, - data: {}, - }); - }); - - it('drops input data from result', () => { - const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build(); - - expect(result.inputData).toStrictEqual({}); - }); - - it('drops input data from result', () => { - const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build(); - - expect(result.inputData).toStrictEqual({}); - }); - - it('drops input data from connectionInputData', () => { - const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build(); - - expect(result.connectionInputData).toStrictEqual([]); - }); - }); - - describe('nodes', () => { - it('should return empty run data when only Code node is requested', () => { - const result = new DataRequestResponseBuilder( - taskData, - newRequestParam({ dataOfNodes: ['Code'], prevNode: false }), - ).build(); - - expect(result.runExecutionData.resultData.runData).toStrictEqual({}); - expect(result.runExecutionData.resultData.pinData).toStrictEqual({}); - // executionData & startData contain only metadata --> returned as is - expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData); - expect(result.runExecutionData.executionData).toStrictEqual( - taskData.runExecutionData.executionData, - ); - }); - - it('should return empty run data when only Code node is requested', () => { - const result = new DataRequestResponseBuilder( - taskData, - newRequestParam({ dataOfNodes: [codeNode.name], prevNode: false }), - ).build(); - - expect(result.runExecutionData.resultData.runData).toStrictEqual({}); - expect(result.runExecutionData.resultData.pinData).toStrictEqual({}); - // executionData & startData contain only metadata --> returned as is - expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData); - expect(result.runExecutionData.executionData).toStrictEqual( - taskData.runExecutionData.executionData, - ); - }); - - it("should return only DebugHelper's data when only DebugHelper node is requested", () => { - const result = new DataRequestResponseBuilder( - taskData, - newRequestParam({ dataOfNodes: [debugHelperNode.name], prevNode: false }), - ).build(); - - expect(result.runExecutionData.resultData.runData).toStrictEqual({ - [debugHelperNode.name]: taskData.runExecutionData.resultData.runData[debugHelperNode.name], - }); - expect(result.runExecutionData.resultData.pinData).toStrictEqual({}); - // executionData & startData contain only metadata --> returned as is - expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData); - expect(result.runExecutionData.executionData).toStrictEqual( - taskData.runExecutionData.executionData, - ); - }); - - it("should return DebugHelper's data when only prevNode node is requested", () => { - const result = new DataRequestResponseBuilder( - taskData, - newRequestParam({ dataOfNodes: [], prevNode: true }), - ).build(); - - expect(result.runExecutionData.resultData.runData).toStrictEqual({ - [debugHelperNode.name]: taskData.runExecutionData.resultData.runData[debugHelperNode.name], - }); - expect(result.runExecutionData.resultData.pinData).toStrictEqual({}); - // executionData & startData contain only metadata --> returned as is - expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData); - expect(result.runExecutionData.executionData).toStrictEqual( - taskData.runExecutionData.executionData, - ); + expect(result.workflow).toStrictEqual({ + id: '1', + name: 'Test Workflow', + active: true, + connections: workflow.connectionsBySourceNode, + nodes: [], + pinData: workflow.pinData, + settings: workflow.settings, + staticData: workflow.staticData, }); }); }); diff --git a/packages/cli/src/runners/task-managers/__tests__/data-request-response-stripper.test.ts b/packages/cli/src/runners/task-managers/__tests__/data-request-response-stripper.test.ts new file mode 100644 index 0000000000..a37b9bdc7a --- /dev/null +++ b/packages/cli/src/runners/task-managers/__tests__/data-request-response-stripper.test.ts @@ -0,0 +1,300 @@ +import type { DataRequestResponse, TaskDataRequestParams } from '@n8n/task-runner'; +import { mock } from 'jest-mock-extended'; +import type { IWorkflowExecuteAdditionalData } from 'n8n-workflow'; +import { type INode, type INodeExecutionData } from 'n8n-workflow'; + +import { DataRequestResponseStripper } from '../data-request-response-stripper'; + +const triggerNode: INode = mock({ + name: 'Trigger', +}); +const debugHelperNode: INode = mock({ + name: 'DebugHelper', +}); +const codeNode: INode = mock({ + name: 'Code', +}); +const workflow: DataRequestResponse['workflow'] = mock(); +const debugHelperNodeOutItems: INodeExecutionData[] = [ + { + json: { + uid: 'abb74fd4-bef2-4fae-9d53-ea24e9eb3032', + email: 'Dan.Schmidt31@yahoo.com', + firstname: 'Toni', + lastname: 'Schuster', + password: 'Q!D6C2', + }, + pairedItem: { + item: 0, + }, + }, +]; +const codeNodeInputItems: INodeExecutionData[] = debugHelperNodeOutItems; +const envProviderState: DataRequestResponse['envProviderState'] = mock< + DataRequestResponse['envProviderState'] +>({ + env: {}, + isEnvAccessBlocked: false, + isProcessAvailable: true, +}); +const additionalData = mock({ + formWaitingBaseUrl: 'http://localhost:5678/form-waiting', + instanceBaseUrl: 'http://localhost:5678/', + restApiUrl: 'http://localhost:5678/rest', + variables: {}, + webhookBaseUrl: 'http://localhost:5678/webhook', + webhookTestBaseUrl: 'http://localhost:5678/webhook-test', + webhookWaitingBaseUrl: 'http://localhost:5678/webhook-waiting', + executionId: '45844', + userId: '114984bc-44b3-4dd4-9b54-a4a8d34d51d5', + currentNodeParameters: undefined, + executionTimeoutTimestamp: undefined, + restartExecutionId: undefined, +}); + +/** + * Drawn with https://asciiflow.com/#/ + * Task data for an execution of the following WF: + * where ►► denotes the currently being executing node. + * ►► + * ┌───────────┐ ┌─────────────┐ ┌────────┐ + * │ Trigger ├──►│ DebugHelper ├───►│ Code │ + * └───────────┘ └─────────────┘ └────────┘ + */ +const taskData: DataRequestResponse = { + workflow, + inputData: { + main: [codeNodeInputItems], + }, + itemIndex: 0, + activeNodeName: codeNode.name, + contextNodeName: codeNode.name, + defaultReturnRunIndex: -1, + mode: 'manual', + envProviderState, + node: codeNode, + runExecutionData: { + startData: { + destinationNode: codeNode.name, + runNodeFilter: [triggerNode.name, debugHelperNode.name, codeNode.name], + }, + resultData: { + runData: { + [triggerNode.name]: [ + { + hints: [], + startTime: 1730313407328, + executionTime: 1, + source: [], + executionStatus: 'success', + data: { + main: [[]], + }, + }, + ], + [debugHelperNode.name]: [ + { + hints: [], + startTime: 1730313407330, + executionTime: 1, + source: [ + { + previousNode: triggerNode.name, + }, + ], + executionStatus: 'success', + data: { + main: [debugHelperNodeOutItems], + }, + }, + ], + }, + pinData: {}, + }, + executionData: { + contextData: {}, + nodeExecutionStack: [], + metadata: {}, + waitingExecution: { + [codeNode.name]: { + '0': { + main: [codeNodeInputItems], + }, + }, + }, + waitingExecutionSource: { + [codeNode.name]: { + '0': { + main: [ + { + previousNode: debugHelperNode.name, + }, + ], + }, + }, + }, + }, + }, + runIndex: 0, + selfData: {}, + siblingParameters: {}, + connectionInputSource: { + main: [ + { + previousNode: debugHelperNode.name, + previousNodeOutput: 0, + }, + ], + }, + additionalData, +} as const; + +describe('DataRequestResponseStripper', () => { + const allDataParam: TaskDataRequestParams = { + dataOfNodes: 'all', + env: true, + input: true, + prevNode: true, + }; + + const newRequestParam = (opts: Partial) => ({ + ...allDataParam, + ...opts, + }); + + describe('all data', () => { + it('should build the runExecutionData as is when everything is requested', () => { + const dataRequestResponseBuilder = new DataRequestResponseStripper(taskData, allDataParam); + + const { runExecutionData } = dataRequestResponseBuilder.strip(); + + expect(runExecutionData).toStrictEqual(taskData.runExecutionData); + }); + }); + + describe('envProviderState', () => { + it("should filter out envProviderState when it's not requested", () => { + const dataRequestResponseBuilder = new DataRequestResponseStripper( + taskData, + newRequestParam({ + env: false, + }), + ); + + const result = dataRequestResponseBuilder.strip(); + + expect(result.envProviderState).toStrictEqual({ + env: {}, + isEnvAccessBlocked: false, + isProcessAvailable: true, + }); + }); + }); + + describe('input data', () => { + const allExceptInputParam = newRequestParam({ + input: false, + }); + + it('drops input data from result', () => { + const result = new DataRequestResponseStripper(taskData, allExceptInputParam).strip(); + + expect(result.inputData).toStrictEqual({}); + }); + + it('drops input data from result', () => { + const result = new DataRequestResponseStripper(taskData, allExceptInputParam).strip(); + + expect(result.inputData).toStrictEqual({}); + }); + }); + + describe('nodes', () => { + it('should return empty run data when only Code node is requested', () => { + const result = new DataRequestResponseStripper( + taskData, + newRequestParam({ dataOfNodes: ['Code'], prevNode: false }), + ).strip(); + + expect(result.runExecutionData.resultData.runData).toStrictEqual({}); + expect(result.runExecutionData.resultData.pinData).toStrictEqual({}); + // executionData & startData contain only metadata --> returned as is + expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData); + expect(result.runExecutionData.executionData).toStrictEqual( + taskData.runExecutionData.executionData, + ); + }); + + it('should return empty run data when only Code node is requested', () => { + const result = new DataRequestResponseStripper( + taskData, + newRequestParam({ dataOfNodes: [codeNode.name], prevNode: false }), + ).strip(); + + expect(result.runExecutionData.resultData.runData).toStrictEqual({}); + expect(result.runExecutionData.resultData.pinData).toStrictEqual({}); + // executionData & startData contain only metadata --> returned as is + expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData); + expect(result.runExecutionData.executionData).toStrictEqual( + taskData.runExecutionData.executionData, + ); + }); + + it("should return only DebugHelper's data when only DebugHelper node is requested", () => { + const result = new DataRequestResponseStripper( + taskData, + newRequestParam({ dataOfNodes: [debugHelperNode.name], prevNode: false }), + ).strip(); + + expect(result.runExecutionData.resultData.runData).toStrictEqual({ + [debugHelperNode.name]: taskData.runExecutionData.resultData.runData[debugHelperNode.name], + }); + expect(result.runExecutionData.resultData.pinData).toStrictEqual({}); + // executionData & startData contain only metadata --> returned as is + expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData); + expect(result.runExecutionData.executionData).toStrictEqual( + taskData.runExecutionData.executionData, + ); + }); + + it("should return DebugHelper's data when only prevNode node is requested", () => { + const result = new DataRequestResponseStripper( + taskData, + newRequestParam({ dataOfNodes: [], prevNode: true }), + ).strip(); + + expect(result.runExecutionData.resultData.runData).toStrictEqual({ + [debugHelperNode.name]: taskData.runExecutionData.resultData.runData[debugHelperNode.name], + }); + expect(result.runExecutionData.resultData.pinData).toStrictEqual({}); + // executionData & startData contain only metadata --> returned as is + expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData); + expect(result.runExecutionData.executionData).toStrictEqual( + taskData.runExecutionData.executionData, + ); + }); + }); + + describe('passthrough properties', () => { + test.each>([ + ['workflow'], + ['connectionInputSource'], + ['node'], + ['runIndex'], + ['itemIndex'], + ['activeNodeName'], + ['siblingParameters'], + ['mode'], + ['defaultReturnRunIndex'], + ['selfData'], + ['contextNodeName'], + ['additionalData'], + ])("it doesn't change %s", (propertyName) => { + const dataRequestResponseBuilder = new DataRequestResponseStripper(taskData, allDataParam); + + const result = dataRequestResponseBuilder.strip(); + + expect(result[propertyName]).toBe(taskData[propertyName]); + }); + }); +}); diff --git a/packages/cli/src/runners/task-managers/data-request-response-builder.ts b/packages/cli/src/runners/task-managers/data-request-response-builder.ts index bc498c33a7..7df3b9e012 100644 --- a/packages/cli/src/runners/task-managers/data-request-response-builder.ts +++ b/packages/cli/src/runners/task-managers/data-request-response-builder.ts @@ -1,63 +1,30 @@ -import type { - DataRequestResponse, - BrokerMessage, - PartialAdditionalData, - TaskData, -} from '@n8n/task-runner'; -import type { - EnvProviderState, - IExecuteData, - INodeExecutionData, - IPinData, - IRunData, - IRunExecutionData, - ITaskDataConnections, - IWorkflowExecuteAdditionalData, - Workflow, - WorkflowParameters, -} from 'n8n-workflow'; +import type { DataRequestResponse, PartialAdditionalData, TaskData } from '@n8n/task-runner'; +import type { IWorkflowExecuteAdditionalData, Workflow, WorkflowParameters } from 'n8n-workflow'; /** - * Builds the response to a data request coming from a Task Runner. Tries to minimize - * the amount of data that is sent to the runner by only providing what is requested. + * Transforms TaskData to DataRequestResponse. The main purpose of the + * transformation is to make sure there is no duplication in the data + * (e.g. connectionInputData and executeData.data can be derived from + * inputData). */ export class DataRequestResponseBuilder { - private requestedNodeNames = new Set(); - - constructor( - private readonly taskData: TaskData, - private readonly requestParams: BrokerMessage.ToRequester.TaskDataRequest['requestParams'], - ) { - this.requestedNodeNames = new Set(requestParams.dataOfNodes); - - if (this.requestParams.prevNode && this.requestParams.dataOfNodes !== 'all') { - this.requestedNodeNames.add(this.determinePrevNodeName()); - } - } - - /** - * Builds a response to the data request - */ - build(): DataRequestResponse { - const { taskData: td } = this; - + buildFromTaskData(taskData: TaskData): DataRequestResponse { return { - workflow: this.buildWorkflow(td.workflow), - connectionInputData: this.buildConnectionInputData(td.connectionInputData), - inputData: this.buildInputData(td.inputData), - itemIndex: td.itemIndex, - activeNodeName: td.activeNodeName, - contextNodeName: td.contextNodeName, - defaultReturnRunIndex: td.defaultReturnRunIndex, - mode: td.mode, - envProviderState: this.buildEnvProviderState(td.envProviderState), - node: td.node, // The current node being executed - runExecutionData: this.buildRunExecutionData(td.runExecutionData), - runIndex: td.runIndex, - selfData: td.selfData, - siblingParameters: td.siblingParameters, - executeData: this.buildExecuteData(td.executeData), - additionalData: this.buildAdditionalData(td.additionalData), + workflow: this.buildWorkflow(taskData.workflow), + inputData: taskData.inputData, + connectionInputSource: taskData.executeData?.source ?? null, + itemIndex: taskData.itemIndex, + activeNodeName: taskData.activeNodeName, + contextNodeName: taskData.contextNodeName, + defaultReturnRunIndex: taskData.defaultReturnRunIndex, + mode: taskData.mode, + envProviderState: taskData.envProviderState, + node: taskData.node, + runExecutionData: taskData.runExecutionData, + runIndex: taskData.runIndex, + selfData: taskData.selfData, + siblingParameters: taskData.siblingParameters, + additionalData: this.buildAdditionalData(taskData.additionalData), }; } @@ -80,86 +47,6 @@ export class DataRequestResponseBuilder { }; } - private buildExecuteData(executeData: IExecuteData | undefined): IExecuteData | undefined { - if (executeData === undefined) { - return undefined; - } - - return { - node: executeData.node, // The current node being executed - data: this.requestParams.input ? executeData.data : {}, - source: executeData.source, - }; - } - - private buildRunExecutionData(runExecutionData: IRunExecutionData): IRunExecutionData { - if (this.requestParams.dataOfNodes === 'all') { - return runExecutionData; - } - - return { - startData: runExecutionData.startData, - resultData: { - error: runExecutionData.resultData.error, - lastNodeExecuted: runExecutionData.resultData.lastNodeExecuted, - metadata: runExecutionData.resultData.metadata, - runData: this.buildRunData(runExecutionData.resultData.runData), - pinData: this.buildPinData(runExecutionData.resultData.pinData), - }, - executionData: runExecutionData.executionData - ? { - // TODO: Figure out what these two are and can they be filtered - contextData: runExecutionData.executionData?.contextData, - nodeExecutionStack: runExecutionData.executionData.nodeExecutionStack, - - metadata: runExecutionData.executionData.metadata, - waitingExecution: runExecutionData.executionData.waitingExecution, - waitingExecutionSource: runExecutionData.executionData.waitingExecutionSource, - } - : undefined, - }; - } - - private buildRunData(runData: IRunData): IRunData { - return this.filterObjectByNodeNames(runData); - } - - private buildPinData(pinData: IPinData | undefined): IPinData | undefined { - return pinData ? this.filterObjectByNodeNames(pinData) : undefined; - } - - private buildEnvProviderState(envProviderState: EnvProviderState): EnvProviderState { - if (this.requestParams.env) { - // In case `isEnvAccessBlocked` = true, the provider state has already sanitized - // the environment variables and we can return it as is. - return envProviderState; - } - - return { - env: {}, - isEnvAccessBlocked: envProviderState.isEnvAccessBlocked, - isProcessAvailable: envProviderState.isProcessAvailable, - }; - } - - private buildInputData(inputData: ITaskDataConnections): ITaskDataConnections { - if (this.requestParams.input) { - return inputData; - } - - return {}; - } - - private buildConnectionInputData( - connectionInputData: INodeExecutionData[], - ): INodeExecutionData[] { - if (this.requestParams.input) { - return connectionInputData; - } - - return []; - } - private buildWorkflow(workflow: Workflow): Omit { return { id: workflow.id, @@ -172,37 +59,4 @@ export class DataRequestResponseBuilder { staticData: workflow.staticData, }; } - - /** - * Assuming the given `obj` is an object where the keys are node names, - * filters the object to only include the node names that are requested. - */ - private filterObjectByNodeNames>(obj: T): T { - if (this.requestParams.dataOfNodes === 'all') { - return obj; - } - - const filteredObj: T = {} as T; - - for (const nodeName in obj) { - if (!Object.prototype.hasOwnProperty.call(obj, nodeName)) { - continue; - } - - if (this.requestedNodeNames.has(nodeName)) { - filteredObj[nodeName] = obj[nodeName]; - } - } - - return filteredObj; - } - - private determinePrevNodeName(): string { - const sourceData = this.taskData.executeData?.source?.main?.[0]; - if (!sourceData) { - return ''; - } - - return sourceData.previousNode; - } } diff --git a/packages/cli/src/runners/task-managers/data-request-response-stripper.ts b/packages/cli/src/runners/task-managers/data-request-response-stripper.ts new file mode 100644 index 0000000000..b924a87c5f --- /dev/null +++ b/packages/cli/src/runners/task-managers/data-request-response-stripper.ts @@ -0,0 +1,131 @@ +import type { DataRequestResponse, BrokerMessage } from '@n8n/task-runner'; +import type { + EnvProviderState, + IPinData, + IRunData, + IRunExecutionData, + ITaskDataConnections, +} from 'n8n-workflow'; + +/** + * Strips data from data request response based on the specified parameters + */ +export class DataRequestResponseStripper { + private requestedNodeNames = new Set(); + + constructor( + private readonly dataResponse: DataRequestResponse, + private readonly stripParams: BrokerMessage.ToRequester.TaskDataRequest['requestParams'], + ) { + this.requestedNodeNames = new Set(stripParams.dataOfNodes); + + if (this.stripParams.prevNode && this.stripParams.dataOfNodes !== 'all') { + this.requestedNodeNames.add(this.determinePrevNodeName()); + } + } + + /** + * Builds a response to the data request + */ + strip(): DataRequestResponse { + const { dataResponse: dr } = this; + + return { + ...dr, + inputData: this.stripInputData(dr.inputData), + envProviderState: this.stripEnvProviderState(dr.envProviderState), + runExecutionData: this.stripRunExecutionData(dr.runExecutionData), + }; + } + + private stripRunExecutionData(runExecutionData: IRunExecutionData): IRunExecutionData { + if (this.stripParams.dataOfNodes === 'all') { + return runExecutionData; + } + + return { + startData: runExecutionData.startData, + resultData: { + error: runExecutionData.resultData.error, + lastNodeExecuted: runExecutionData.resultData.lastNodeExecuted, + metadata: runExecutionData.resultData.metadata, + runData: this.stripRunData(runExecutionData.resultData.runData), + pinData: this.stripPinData(runExecutionData.resultData.pinData), + }, + executionData: runExecutionData.executionData + ? { + // TODO: Figure out what these two are and can they be stripped + contextData: runExecutionData.executionData?.contextData, + nodeExecutionStack: runExecutionData.executionData.nodeExecutionStack, + + metadata: runExecutionData.executionData.metadata, + waitingExecution: runExecutionData.executionData.waitingExecution, + waitingExecutionSource: runExecutionData.executionData.waitingExecutionSource, + } + : undefined, + }; + } + + private stripRunData(runData: IRunData): IRunData { + return this.filterObjectByNodeNames(runData); + } + + private stripPinData(pinData: IPinData | undefined): IPinData | undefined { + return pinData ? this.filterObjectByNodeNames(pinData) : undefined; + } + + private stripEnvProviderState(envProviderState: EnvProviderState): EnvProviderState { + if (this.stripParams.env) { + // In case `isEnvAccessBlocked` = true, the provider state has already sanitized + // the environment variables and we can return it as is. + return envProviderState; + } + + return { + env: {}, + isEnvAccessBlocked: envProviderState.isEnvAccessBlocked, + isProcessAvailable: envProviderState.isProcessAvailable, + }; + } + + private stripInputData(inputData: ITaskDataConnections): ITaskDataConnections { + if (this.stripParams.input) { + return inputData; + } + + return {}; + } + + /** + * Assuming the given `obj` is an object where the keys are node names, + * filters the object to only include the node names that are requested. + */ + private filterObjectByNodeNames>(obj: T): T { + if (this.stripParams.dataOfNodes === 'all') { + return obj; + } + + const filteredObj: T = {} as T; + + for (const nodeName in obj) { + if (!Object.prototype.hasOwnProperty.call(obj, nodeName)) { + continue; + } + + if (this.requestedNodeNames.has(nodeName)) { + filteredObj[nodeName] = obj[nodeName]; + } + } + + return filteredObj; + } + + private determinePrevNodeName(): string { + const sourceData = this.dataResponse.connectionInputSource?.main?.[0]; + if (!sourceData) { + return ''; + } + + return sourceData.previousNode; + } +} diff --git a/packages/cli/src/runners/task-managers/task-manager.ts b/packages/cli/src/runners/task-managers/task-manager.ts index 21d4e8eb9d..ffc86cdf62 100644 --- a/packages/cli/src/runners/task-managers/task-manager.ts +++ b/packages/cli/src/runners/task-managers/task-manager.ts @@ -1,5 +1,6 @@ +import { TaskRunnersConfig } from '@n8n/config'; import type { TaskResultData, RequesterMessage, BrokerMessage, TaskData } from '@n8n/task-runner'; -import { RPC_ALLOW_LIST } from '@n8n/task-runner'; +import { DataRequestResponseReconstruct, RPC_ALLOW_LIST } from '@n8n/task-runner'; import type { EnvProviderState, IExecuteFunctions, @@ -17,11 +18,13 @@ import type { } from 'n8n-workflow'; import { createResultOk, createResultError } from 'n8n-workflow'; import { nanoid } from 'nanoid'; -import { Service } from 'typedi'; +import * as a from 'node:assert/strict'; +import Container, { Service } from 'typedi'; import { NodeTypes } from '@/node-types'; import { DataRequestResponseBuilder } from './data-request-response-builder'; +import { DataRequestResponseStripper } from './data-request-response-stripper'; export type RequestAccept = (jobId: string) => void; export type RequestReject = (reason: string) => void; @@ -56,6 +59,10 @@ export abstract class TaskManager { tasks: Map = new Map(); + private readonly runnerConfig = Container.get(TaskRunnersConfig); + + private readonly dataResponseBuilder = new DataRequestResponseBuilder(); + constructor(private readonly nodeTypes: NodeTypes) {} async startTask( @@ -237,14 +244,30 @@ export abstract class TaskManager { return; } - const dataRequestResponseBuilder = new DataRequestResponseBuilder(job.data, requestParams); - const requestedData = dataRequestResponseBuilder.build(); + const dataRequestResponse = this.dataResponseBuilder.buildFromTaskData(job.data); + + if (this.runnerConfig.assertDeduplicationOutput) { + const reconstruct = new DataRequestResponseReconstruct(); + a.deepStrictEqual( + reconstruct.reconstructConnectionInputData(dataRequestResponse.inputData), + job.data.connectionInputData, + ); + a.deepStrictEqual( + reconstruct.reconstructExecuteData(dataRequestResponse), + job.data.executeData, + ); + } + + const strippedData = new DataRequestResponseStripper( + dataRequestResponse, + requestParams, + ).strip(); this.sendMessage({ type: 'requester:taskdataresponse', taskId, requestId, - data: requestedData, + data: strippedData, }); } diff --git a/packages/cli/src/runners/task-runner-module.ts b/packages/cli/src/runners/task-runner-module.ts index 175335f06f..10086a521c 100644 --- a/packages/cli/src/runners/task-runner-module.ts +++ b/packages/cli/src/runners/task-runner-module.ts @@ -43,7 +43,7 @@ export class TaskRunnerModule { } async start() { - a.ok(!this.runnerConfig.disabled, 'Task runner is disabled'); + a.ok(this.runnerConfig.enabled, 'Task runner is disabled'); await this.loadTaskManager(); await this.loadTaskRunnerServer(); diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index ce3280aa48..e17a8d2279 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -26,7 +26,7 @@ import { mockInstance } from '../../shared/mocking'; config.set('executions.mode', 'queue'); config.set('binaryDataManager.availableModes', 'filesystem'); -Container.get(TaskRunnersConfig).disabled = false; +Container.get(TaskRunnersConfig).enabled = true; mockInstance(LoadNodesAndCredentials); const binaryDataService = mockInstance(BinaryDataService); const externalHooks = mockInstance(ExternalHooks); diff --git a/packages/cli/test/integration/runners/task-runner-module.external.test.ts b/packages/cli/test/integration/runners/task-runner-module.external.test.ts index e8a7e54f1a..4974abfb39 100644 --- a/packages/cli/test/integration/runners/task-runner-module.external.test.ts +++ b/packages/cli/test/integration/runners/task-runner-module.external.test.ts @@ -18,14 +18,14 @@ describe('TaskRunnerModule in external mode', () => { describe('start', () => { it('should throw if the task runner is disabled', async () => { - runnerConfig.disabled = true; + runnerConfig.enabled = false; // Act await expect(module.start()).rejects.toThrow('Task runner is disabled'); }); it('should start the task runner', async () => { - runnerConfig.disabled = false; + runnerConfig.enabled = true; // Act await module.start(); diff --git a/packages/cli/test/integration/runners/task-runner-module.internal.test.ts b/packages/cli/test/integration/runners/task-runner-module.internal.test.ts index 922f7fee4b..444d576e87 100644 --- a/packages/cli/test/integration/runners/task-runner-module.internal.test.ts +++ b/packages/cli/test/integration/runners/task-runner-module.internal.test.ts @@ -18,14 +18,14 @@ describe('TaskRunnerModule in internal_childprocess mode', () => { describe('start', () => { it('should throw if the task runner is disabled', async () => { - runnerConfig.disabled = true; + runnerConfig.enabled = false; // Act await expect(module.start()).rejects.toThrow('Task runner is disabled'); }); it('should start the task runner', async () => { - runnerConfig.disabled = false; + runnerConfig.enabled = true; // Act await module.start(); diff --git a/packages/cli/test/integration/runners/task-runner-process.test.ts b/packages/cli/test/integration/runners/task-runner-process.test.ts index 219fbc8813..8c5289a5c7 100644 --- a/packages/cli/test/integration/runners/task-runner-process.test.ts +++ b/packages/cli/test/integration/runners/task-runner-process.test.ts @@ -10,7 +10,7 @@ import { retryUntil } from '@test-integration/retry-until'; describe('TaskRunnerProcess', () => { const authToken = 'token'; const runnerConfig = Container.get(TaskRunnersConfig); - runnerConfig.disabled = false; + runnerConfig.enabled = true; runnerConfig.mode = 'internal_childprocess'; runnerConfig.authToken = authToken; runnerConfig.port = 0; // Use any port diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 383624b569..e645309644 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -108,6 +108,7 @@ import type { AiEvent, ISupplyDataFunctions, WebhookType, + SchedulingFunctions, } from 'n8n-workflow'; import { NodeConnectionType, @@ -172,6 +173,7 @@ import { TriggerContext, WebhookContext, } from './node-execution-context'; +import { ScheduledTaskManager } from './ScheduledTaskManager'; import { getSecretsProxy } from './Secrets'; import { SSHClientsManager } from './SSHClientsManager'; @@ -3023,7 +3025,7 @@ const executionCancellationFunctions = ( }, }); -const getRequestHelperFunctions = ( +export const getRequestHelperFunctions = ( workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, @@ -3343,11 +3345,19 @@ const getRequestHelperFunctions = ( }; }; -const getSSHTunnelFunctions = (): SSHTunnelFunctions => ({ +export const getSSHTunnelFunctions = (): SSHTunnelFunctions => ({ getSSHClient: async (credentials) => await Container.get(SSHClientsManager).getClient(credentials), }); +export const getSchedulingFunctions = (workflow: Workflow): SchedulingFunctions => { + const scheduledTaskManager = Container.get(ScheduledTaskManager); + return { + registerCron: (cronExpression, onTick) => + scheduledTaskManager.registerCron(workflow, cronExpression, onTick), + }; +}; + const getAllowedPaths = () => { const restrictFileAccessTo = process.env[RESTRICT_FILE_ACCESS_TO]; if (!restrictFileAccessTo) { @@ -3414,7 +3424,7 @@ export function isFilePathBlocked(filePath: string): boolean { return false; } -const getFileSystemHelperFunctions = (node: INode): FileSystemHelperFunctions => ({ +export const getFileSystemHelperFunctions = (node: INode): FileSystemHelperFunctions => ({ async createReadStream(filePath) { try { await fsAccess(filePath); @@ -3450,7 +3460,7 @@ const getFileSystemHelperFunctions = (node: INode): FileSystemHelperFunctions => }, }); -const getNodeHelperFunctions = ( +export const getNodeHelperFunctions = ( { executionId }: IWorkflowExecuteAdditionalData, workflowId: string, ): NodeHelperFunctions => ({ @@ -3458,7 +3468,7 @@ const getNodeHelperFunctions = ( await copyBinaryFile(workflowId, executionId!, filePath, fileName, mimeType), }); -const getBinaryHelperFunctions = ( +export const getBinaryHelperFunctions = ( { executionId }: IWorkflowExecuteAdditionalData, workflowId: string, ): BinaryHelperFunctions => ({ @@ -3476,7 +3486,7 @@ const getBinaryHelperFunctions = ( }, }); -const getCheckProcessedHelperFunctions = ( +export const getCheckProcessedHelperFunctions = ( workflow: Workflow, node: INode, ): DeduplicationHelperFunctions => ({ diff --git a/packages/core/src/node-execution-context/execute-single-context.ts b/packages/core/src/node-execution-context/execute-single-context.ts index 6d8ef2a083..2b03a81974 100644 --- a/packages/core/src/node-execution-context/execute-single-context.ts +++ b/packages/core/src/node-execution-context/execute-single-context.ts @@ -27,13 +27,13 @@ import { continueOnFail, getAdditionalKeys, getBinaryDataBuffer, + getBinaryHelperFunctions, getCredentials, getNodeParameter, + getRequestHelperFunctions, returnJsonArray, } from '@/NodeExecuteFunctions'; -import { BinaryHelpers } from './helpers/binary-helpers'; -import { RequestHelpers } from './helpers/request-helpers'; import { NodeExecutionContext } from './node-execution-context'; export class ExecuteSingleContext extends NodeExecutionContext implements IExecuteSingleFunctions { @@ -57,8 +57,14 @@ export class ExecuteSingleContext extends NodeExecutionContext implements IExecu this.helpers = { createDeferredPromise, returnJsonArray, - ...new BinaryHelpers(workflow, additionalData).exported, - ...new RequestHelpers(this, workflow, node, additionalData).exported, + ...getRequestHelperFunctions( + workflow, + node, + additionalData, + runExecutionData, + connectionInputData, + ), + ...getBinaryHelperFunctions(additionalData, workflow.id), assertBinaryData: (propertyName, inputIndex = 0) => assertBinaryData(inputData, node, itemIndex, propertyName, inputIndex), diff --git a/packages/core/src/node-execution-context/helpers/__tests__/binary-helpers.test.ts b/packages/core/src/node-execution-context/helpers/__tests__/binary-helpers.test.ts deleted file mode 100644 index 302713954f..0000000000 --- a/packages/core/src/node-execution-context/helpers/__tests__/binary-helpers.test.ts +++ /dev/null @@ -1,136 +0,0 @@ -import FileType from 'file-type'; -import { IncomingMessage, type ClientRequest } from 'http'; -import { mock } from 'jest-mock-extended'; -import type { Workflow, IWorkflowExecuteAdditionalData, IBinaryData } from 'n8n-workflow'; -import type { Socket } from 'net'; -import { Container } from 'typedi'; - -import { BinaryDataService } from '@/BinaryData/BinaryData.service'; - -import { BinaryHelpers } from '../binary-helpers'; - -jest.mock('file-type'); - -describe('BinaryHelpers', () => { - let binaryDataService = mock(); - Container.set(BinaryDataService, binaryDataService); - const workflow = mock({ id: '123' }); - const additionalData = mock({ executionId: '456' }); - const binaryHelpers = new BinaryHelpers(workflow, additionalData); - - beforeEach(() => { - jest.clearAllMocks(); - - binaryDataService.store.mockImplementation( - async (_workflowId, _executionId, _buffer, value) => value, - ); - }); - - describe('getBinaryPath', () => { - it('should call getPath method of BinaryDataService', () => { - binaryHelpers.getBinaryPath('mock-binary-data-id'); - expect(binaryDataService.getPath).toHaveBeenCalledWith('mock-binary-data-id'); - }); - }); - - describe('getBinaryMetadata', () => { - it('should call getMetadata method of BinaryDataService', async () => { - await binaryHelpers.getBinaryMetadata('mock-binary-data-id'); - expect(binaryDataService.getMetadata).toHaveBeenCalledWith('mock-binary-data-id'); - }); - }); - - describe('getBinaryStream', () => { - it('should call getStream method of BinaryDataService', async () => { - await binaryHelpers.getBinaryStream('mock-binary-data-id'); - expect(binaryDataService.getAsStream).toHaveBeenCalledWith('mock-binary-data-id', undefined); - }); - }); - - describe('prepareBinaryData', () => { - it('should guess the mime type and file extension if not provided', async () => { - const buffer = Buffer.from('test'); - const fileTypeData = { mime: 'application/pdf', ext: 'pdf' }; - (FileType.fromBuffer as jest.Mock).mockResolvedValue(fileTypeData); - - const binaryData = await binaryHelpers.prepareBinaryData(buffer); - - expect(binaryData.mimeType).toEqual('application/pdf'); - expect(binaryData.fileExtension).toEqual('pdf'); - expect(binaryData.fileType).toEqual('pdf'); - expect(binaryData.fileName).toBeUndefined(); - expect(binaryData.directory).toBeUndefined(); - expect(binaryDataService.store).toHaveBeenCalledWith( - workflow.id, - additionalData.executionId!, - buffer, - binaryData, - ); - }); - - it('should use the provided mime type and file extension if provided', async () => { - const buffer = Buffer.from('test'); - const mimeType = 'application/octet-stream'; - - const binaryData = await binaryHelpers.prepareBinaryData(buffer, undefined, mimeType); - - expect(binaryData.mimeType).toEqual(mimeType); - expect(binaryData.fileExtension).toEqual('bin'); - expect(binaryData.fileType).toBeUndefined(); - expect(binaryData.fileName).toBeUndefined(); - expect(binaryData.directory).toBeUndefined(); - expect(binaryDataService.store).toHaveBeenCalledWith( - workflow.id, - additionalData.executionId!, - buffer, - binaryData, - ); - }); - - const mockSocket = mock({ readableHighWaterMark: 0 }); - - it('should use the contentDisposition.filename, responseUrl, and contentType properties to set the fileName, directory, and mimeType properties of the binaryData object', async () => { - const incomingMessage = new IncomingMessage(mockSocket); - incomingMessage.contentDisposition = { filename: 'test.txt', type: 'attachment' }; - incomingMessage.contentType = 'text/plain'; - incomingMessage.responseUrl = 'https://example.com/test.txt'; - - const binaryData = await binaryHelpers.prepareBinaryData(incomingMessage); - - expect(binaryData.fileName).toEqual('test.txt'); - expect(binaryData.fileType).toEqual('text'); - expect(binaryData.directory).toBeUndefined(); - expect(binaryData.mimeType).toEqual('text/plain'); - expect(binaryData.fileExtension).toEqual('txt'); - }); - - it('should use the req.path property to set the fileName property of the binaryData object if contentDisposition.filename and responseUrl are not provided', async () => { - const incomingMessage = new IncomingMessage(mockSocket); - incomingMessage.contentType = 'text/plain'; - incomingMessage.req = mock({ path: '/test.txt' }); - - const binaryData = await binaryHelpers.prepareBinaryData(incomingMessage); - - expect(binaryData.fileName).toEqual('test.txt'); - expect(binaryData.directory).toBeUndefined(); - expect(binaryData.mimeType).toEqual('text/plain'); - expect(binaryData.fileExtension).toEqual('txt'); - }); - }); - - describe('setBinaryDataBuffer', () => { - it('should call store method of BinaryDataService', async () => { - const binaryData = mock(); - const bufferOrStream = mock(); - - await binaryHelpers.setBinaryDataBuffer(binaryData, bufferOrStream); - - expect(binaryDataService.store).toHaveBeenCalledWith( - workflow.id, - additionalData.executionId, - bufferOrStream, - binaryData, - ); - }); - }); -}); diff --git a/packages/core/src/node-execution-context/helpers/__tests__/scheduling-helpers.test.ts b/packages/core/src/node-execution-context/helpers/__tests__/scheduling-helpers.test.ts deleted file mode 100644 index 06abae8204..0000000000 --- a/packages/core/src/node-execution-context/helpers/__tests__/scheduling-helpers.test.ts +++ /dev/null @@ -1,33 +0,0 @@ -import { mock } from 'jest-mock-extended'; -import type { Workflow } from 'n8n-workflow'; -import { Container } from 'typedi'; - -import { ScheduledTaskManager } from '@/ScheduledTaskManager'; - -import { SchedulingHelpers } from '../scheduling-helpers'; - -describe('SchedulingHelpers', () => { - const scheduledTaskManager = mock(); - Container.set(ScheduledTaskManager, scheduledTaskManager); - const workflow = mock(); - const schedulingHelpers = new SchedulingHelpers(workflow); - - beforeEach(() => { - jest.clearAllMocks(); - }); - - describe('registerCron', () => { - it('should call registerCron method of ScheduledTaskManager', () => { - const cronExpression = '* * * * * *'; - const onTick = jest.fn(); - - schedulingHelpers.registerCron(cronExpression, onTick); - - expect(scheduledTaskManager.registerCron).toHaveBeenCalledWith( - workflow, - cronExpression, - onTick, - ); - }); - }); -}); diff --git a/packages/core/src/node-execution-context/helpers/__tests__/ssh-tunnel-helpers.test.ts b/packages/core/src/node-execution-context/helpers/__tests__/ssh-tunnel-helpers.test.ts deleted file mode 100644 index cbe6916eea..0000000000 --- a/packages/core/src/node-execution-context/helpers/__tests__/ssh-tunnel-helpers.test.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { mock } from 'jest-mock-extended'; -import type { SSHCredentials } from 'n8n-workflow'; -import type { Client } from 'ssh2'; -import { Container } from 'typedi'; - -import { SSHClientsManager } from '@/SSHClientsManager'; - -import { SSHTunnelHelpers } from '../ssh-tunnel-helpers'; - -describe('SSHTunnelHelpers', () => { - const sshClientsManager = mock(); - Container.set(SSHClientsManager, sshClientsManager); - const sshTunnelHelpers = new SSHTunnelHelpers(); - - beforeEach(() => { - jest.clearAllMocks(); - }); - - describe('getSSHClient', () => { - const credentials = mock(); - - it('should call SSHClientsManager.getClient with the given credentials', async () => { - const mockClient = mock(); - sshClientsManager.getClient.mockResolvedValue(mockClient); - - const client = await sshTunnelHelpers.getSSHClient(credentials); - - expect(sshClientsManager.getClient).toHaveBeenCalledWith(credentials); - expect(client).toBe(mockClient); - }); - }); -}); diff --git a/packages/core/src/node-execution-context/helpers/binary-helpers.ts b/packages/core/src/node-execution-context/helpers/binary-helpers.ts deleted file mode 100644 index a15c59139b..0000000000 --- a/packages/core/src/node-execution-context/helpers/binary-helpers.ts +++ /dev/null @@ -1,148 +0,0 @@ -import FileType from 'file-type'; -import { IncomingMessage } from 'http'; -import MimeTypes from 'mime-types'; -import { ApplicationError, fileTypeFromMimeType } from 'n8n-workflow'; -import type { - BinaryHelperFunctions, - IWorkflowExecuteAdditionalData, - Workflow, - IBinaryData, -} from 'n8n-workflow'; -import path from 'path'; -import type { Readable } from 'stream'; -import Container from 'typedi'; - -import { BinaryDataService } from '@/BinaryData/BinaryData.service'; -import { binaryToBuffer } from '@/BinaryData/utils'; -// eslint-disable-next-line import/no-cycle -import { binaryToString } from '@/NodeExecuteFunctions'; - -export class BinaryHelpers { - private readonly binaryDataService = Container.get(BinaryDataService); - - constructor( - private readonly workflow: Workflow, - private readonly additionalData: IWorkflowExecuteAdditionalData, - ) {} - - get exported(): BinaryHelperFunctions { - return { - getBinaryPath: this.getBinaryPath.bind(this), - getBinaryMetadata: this.getBinaryMetadata.bind(this), - getBinaryStream: this.getBinaryStream.bind(this), - binaryToBuffer, - binaryToString, - prepareBinaryData: this.prepareBinaryData.bind(this), - setBinaryDataBuffer: this.setBinaryDataBuffer.bind(this), - copyBinaryFile: this.copyBinaryFile.bind(this), - }; - } - - getBinaryPath(binaryDataId: string) { - return this.binaryDataService.getPath(binaryDataId); - } - - async getBinaryMetadata(binaryDataId: string) { - return await this.binaryDataService.getMetadata(binaryDataId); - } - - async getBinaryStream(binaryDataId: string, chunkSize?: number) { - return await this.binaryDataService.getAsStream(binaryDataId, chunkSize); - } - - // eslint-disable-next-line complexity - async prepareBinaryData(binaryData: Buffer | Readable, filePath?: string, mimeType?: string) { - let fileExtension: string | undefined; - if (binaryData instanceof IncomingMessage) { - if (!filePath) { - try { - const { responseUrl } = binaryData; - filePath = - binaryData.contentDisposition?.filename ?? - ((responseUrl && new URL(responseUrl).pathname) ?? binaryData.req?.path)?.slice(1); - } catch {} - } - if (!mimeType) { - mimeType = binaryData.contentType; - } - } - - if (!mimeType) { - // If no mime type is given figure it out - - if (filePath) { - // Use file path to guess mime type - const mimeTypeLookup = MimeTypes.lookup(filePath); - if (mimeTypeLookup) { - mimeType = mimeTypeLookup; - } - } - - if (!mimeType) { - if (Buffer.isBuffer(binaryData)) { - // Use buffer to guess mime type - const fileTypeData = await FileType.fromBuffer(binaryData); - if (fileTypeData) { - mimeType = fileTypeData.mime; - fileExtension = fileTypeData.ext; - } - } else if (binaryData instanceof IncomingMessage) { - mimeType = binaryData.headers['content-type']; - } else { - // TODO: detect filetype from other kind of streams - } - } - } - - if (!fileExtension && mimeType) { - fileExtension = MimeTypes.extension(mimeType) || undefined; - } - - if (!mimeType) { - // Fall back to text - mimeType = 'text/plain'; - } - - const returnData: IBinaryData = { - mimeType, - fileType: fileTypeFromMimeType(mimeType), - fileExtension, - data: '', - }; - - if (filePath) { - if (filePath.includes('?')) { - // Remove maybe present query parameters - filePath = filePath.split('?').shift(); - } - - const filePathParts = path.parse(filePath as string); - - if (filePathParts.dir !== '') { - returnData.directory = filePathParts.dir; - } - returnData.fileName = filePathParts.base; - - // Remove the dot - const extractedFileExtension = filePathParts.ext.slice(1); - if (extractedFileExtension) { - returnData.fileExtension = extractedFileExtension; - } - } - - return await this.setBinaryDataBuffer(returnData, binaryData); - } - - async setBinaryDataBuffer(binaryData: IBinaryData, bufferOrStream: Buffer | Readable) { - return await this.binaryDataService.store( - this.workflow.id, - this.additionalData.executionId!, - bufferOrStream, - binaryData, - ); - } - - async copyBinaryFile(): Promise { - throw new ApplicationError('`copyBinaryFile` has been removed. Please upgrade this node.'); - } -} diff --git a/packages/core/src/node-execution-context/helpers/request-helpers.ts b/packages/core/src/node-execution-context/helpers/request-helpers.ts deleted file mode 100644 index 2c5eb19290..0000000000 --- a/packages/core/src/node-execution-context/helpers/request-helpers.ts +++ /dev/null @@ -1,381 +0,0 @@ -import { createHash } from 'crypto'; -import { pick } from 'lodash'; -import { jsonParse, NodeOperationError, sleep } from 'n8n-workflow'; -import type { - RequestHelperFunctions, - IAdditionalCredentialOptions, - IAllExecuteFunctions, - IExecuteData, - IHttpRequestOptions, - IN8nHttpFullResponse, - IN8nHttpResponse, - INode, - INodeExecutionData, - IOAuth2Options, - IRequestOptions, - IRunExecutionData, - IWorkflowDataProxyAdditionalKeys, - IWorkflowExecuteAdditionalData, - NodeParameterValueType, - PaginationOptions, - Workflow, - WorkflowExecuteMode, -} from 'n8n-workflow'; -import { Readable } from 'stream'; - -// eslint-disable-next-line import/no-cycle -import { - applyPaginationRequestData, - binaryToString, - httpRequest, - httpRequestWithAuthentication, - proxyRequestToAxios, - requestOAuth1, - requestOAuth2, - requestWithAuthentication, - validateUrl, -} from '@/NodeExecuteFunctions'; - -export class RequestHelpers { - constructor( - private readonly context: IAllExecuteFunctions, - private readonly workflow: Workflow, - private readonly node: INode, - private readonly additionalData: IWorkflowExecuteAdditionalData, - private readonly runExecutionData: IRunExecutionData | null = null, - private readonly connectionInputData: INodeExecutionData[] = [], - ) {} - - get exported(): RequestHelperFunctions { - return { - httpRequest, - httpRequestWithAuthentication: this.httpRequestWithAuthentication.bind(this), - requestWithAuthenticationPaginated: this.requestWithAuthenticationPaginated.bind(this), - request: this.request.bind(this), - requestWithAuthentication: this.requestWithAuthentication.bind(this), - requestOAuth1: this.requestOAuth1.bind(this), - requestOAuth2: this.requestOAuth2.bind(this), - }; - } - - get httpRequest() { - return httpRequest; - } - - async httpRequestWithAuthentication( - credentialsType: string, - requestOptions: IHttpRequestOptions, - additionalCredentialOptions?: IAdditionalCredentialOptions, - ) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return await httpRequestWithAuthentication.call( - this.context, - credentialsType, - requestOptions, - this.workflow, - this.node, - this.additionalData, - additionalCredentialOptions, - ); - } - - // eslint-disable-next-line complexity - async requestWithAuthenticationPaginated( - requestOptions: IRequestOptions, - itemIndex: number, - paginationOptions: PaginationOptions, - credentialsType?: string, - additionalCredentialOptions?: IAdditionalCredentialOptions, - ): Promise { - const responseData = []; - if (!requestOptions.qs) { - requestOptions.qs = {}; - } - requestOptions.resolveWithFullResponse = true; - requestOptions.simple = false; - - let tempResponseData: IN8nHttpFullResponse; - let makeAdditionalRequest: boolean; - let paginateRequestData: PaginationOptions['request']; - - const runIndex = 0; - - const additionalKeys = { - $request: requestOptions, - $response: {} as IN8nHttpFullResponse, - $version: this.node.typeVersion, - $pageCount: 0, - }; - - const executeData: IExecuteData = { - data: {}, - node: this.node, - source: null, - }; - - const hashData = { - identicalCount: 0, - previousLength: 0, - previousHash: '', - }; - - do { - paginateRequestData = this.getResolvedValue( - paginationOptions.request as unknown as NodeParameterValueType, - itemIndex, - runIndex, - executeData, - additionalKeys, - false, - ) as object as PaginationOptions['request']; - - const tempRequestOptions = applyPaginationRequestData(requestOptions, paginateRequestData); - - if (!validateUrl(tempRequestOptions.uri as string)) { - throw new NodeOperationError( - this.node, - `'${paginateRequestData.url}' is not a valid URL.`, - { - itemIndex, - runIndex, - type: 'invalid_url', - }, - ); - } - - if (credentialsType) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - tempResponseData = await this.requestWithAuthentication( - credentialsType, - tempRequestOptions, - additionalCredentialOptions, - ); - } else { - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - tempResponseData = await this.request(tempRequestOptions); - } - - const newResponse: IN8nHttpFullResponse = Object.assign( - { - body: {}, - headers: {}, - statusCode: 0, - }, - pick(tempResponseData, ['body', 'headers', 'statusCode']), - ); - - let contentBody: Exclude; - - if (newResponse.body instanceof Readable && paginationOptions.binaryResult !== true) { - // Keep the original string version that we can use it to hash if needed - contentBody = await binaryToString(newResponse.body as Buffer | Readable); - - const responseContentType = newResponse.headers['content-type']?.toString() ?? ''; - if (responseContentType.includes('application/json')) { - newResponse.body = jsonParse(contentBody, { fallbackValue: {} }); - } else { - newResponse.body = contentBody; - } - tempResponseData.__bodyResolved = true; - tempResponseData.body = newResponse.body; - } else { - contentBody = newResponse.body; - } - - if (paginationOptions.binaryResult !== true || tempResponseData.headers.etag) { - // If the data is not binary (and so not a stream), or an etag is present, - // we check via etag or hash if identical data is received - - let contentLength = 0; - if ('content-length' in tempResponseData.headers) { - contentLength = parseInt(tempResponseData.headers['content-length'] as string) || 0; - } - - if (hashData.previousLength === contentLength) { - let hash: string; - if (tempResponseData.headers.etag) { - // If an etag is provided, we use it as "hash" - hash = tempResponseData.headers.etag as string; - } else { - // If there is no etag, we calculate a hash from the data in the body - if (typeof contentBody !== 'string') { - contentBody = JSON.stringify(contentBody); - } - hash = createHash('md5').update(contentBody).digest('base64'); - } - - if (hashData.previousHash === hash) { - hashData.identicalCount += 1; - if (hashData.identicalCount > 2) { - // Length was identical 5x and hash 3x - throw new NodeOperationError( - this.node, - 'The returned response was identical 5x, so requests got stopped', - { - itemIndex, - description: - 'Check if "Pagination Completed When" has been configured correctly.', - }, - ); - } - } else { - hashData.identicalCount = 0; - } - hashData.previousHash = hash; - } else { - hashData.identicalCount = 0; - } - hashData.previousLength = contentLength; - } - - responseData.push(tempResponseData); - - additionalKeys.$response = newResponse; - additionalKeys.$pageCount = additionalKeys.$pageCount + 1; - - const maxRequests = this.getResolvedValue( - paginationOptions.maxRequests, - itemIndex, - runIndex, - executeData, - additionalKeys, - false, - ) as number; - - if (maxRequests && additionalKeys.$pageCount >= maxRequests) { - break; - } - - makeAdditionalRequest = this.getResolvedValue( - paginationOptions.continue, - itemIndex, - runIndex, - executeData, - additionalKeys, - false, - ) as boolean; - - if (makeAdditionalRequest) { - if (paginationOptions.requestInterval) { - const requestInterval = this.getResolvedValue( - paginationOptions.requestInterval, - itemIndex, - runIndex, - executeData, - additionalKeys, - false, - ) as number; - - await sleep(requestInterval); - } - if (tempResponseData.statusCode < 200 || tempResponseData.statusCode >= 300) { - // We have it configured to let all requests pass no matter the response code - // via "requestOptions.simple = false" to not by default fail if it is for example - // configured to stop on 404 response codes. For that reason we have to throw here - // now an error manually if the response code is not a success one. - let data = tempResponseData.body; - if (data instanceof Readable && paginationOptions.binaryResult !== true) { - data = await binaryToString(data as Buffer | Readable); - } else if (typeof data === 'object') { - data = JSON.stringify(data); - } - - throw Object.assign(new Error(`${tempResponseData.statusCode} - "${data?.toString()}"`), { - statusCode: tempResponseData.statusCode, - error: data, - isAxiosError: true, - response: { - headers: tempResponseData.headers, - status: tempResponseData.statusCode, - statusText: tempResponseData.statusMessage, - }, - }); - } - } - } while (makeAdditionalRequest); - - return responseData; - } - - async request(uriOrObject: string | IRequestOptions, options?: IRequestOptions) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return await proxyRequestToAxios( - this.workflow, - this.additionalData, - this.node, - uriOrObject, - options, - ); - } - - async requestWithAuthentication( - credentialsType: string, - requestOptions: IRequestOptions, - additionalCredentialOptions?: IAdditionalCredentialOptions, - itemIndex?: number, - ) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return await requestWithAuthentication.call( - this.context, - credentialsType, - requestOptions, - this.workflow, - this.node, - this.additionalData, - additionalCredentialOptions, - itemIndex, - ); - } - - async requestOAuth1(credentialsType: string, requestOptions: IRequestOptions) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return await requestOAuth1.call(this.context, credentialsType, requestOptions); - } - - async requestOAuth2( - credentialsType: string, - requestOptions: IRequestOptions, - oAuth2Options?: IOAuth2Options, - ) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return await requestOAuth2.call( - this.context, - credentialsType, - requestOptions, - this.node, - this.additionalData, - oAuth2Options, - ); - } - - private getResolvedValue( - parameterValue: NodeParameterValueType, - itemIndex: number, - runIndex: number, - executeData: IExecuteData, - additionalKeys?: IWorkflowDataProxyAdditionalKeys, - returnObjectAsString = false, - ): NodeParameterValueType { - const mode: WorkflowExecuteMode = 'internal'; - - if ( - typeof parameterValue === 'object' || - (typeof parameterValue === 'string' && parameterValue.charAt(0) === '=') - ) { - return this.workflow.expression.getParameterValue( - parameterValue, - this.runExecutionData, - runIndex, - itemIndex, - this.node.name, - this.connectionInputData, - mode, - additionalKeys ?? {}, - executeData, - returnObjectAsString, - ); - } - - return parameterValue; - } -} diff --git a/packages/core/src/node-execution-context/helpers/scheduling-helpers.ts b/packages/core/src/node-execution-context/helpers/scheduling-helpers.ts deleted file mode 100644 index e193f2beaf..0000000000 --- a/packages/core/src/node-execution-context/helpers/scheduling-helpers.ts +++ /dev/null @@ -1,20 +0,0 @@ -import type { CronExpression, Workflow, SchedulingFunctions } from 'n8n-workflow'; -import { Container } from 'typedi'; - -import { ScheduledTaskManager } from '@/ScheduledTaskManager'; - -export class SchedulingHelpers { - private readonly scheduledTaskManager = Container.get(ScheduledTaskManager); - - constructor(private readonly workflow: Workflow) {} - - get exported(): SchedulingFunctions { - return { - registerCron: this.registerCron.bind(this), - }; - } - - registerCron(cronExpression: CronExpression, onTick: () => void) { - this.scheduledTaskManager.registerCron(this.workflow, cronExpression, onTick); - } -} diff --git a/packages/core/src/node-execution-context/helpers/ssh-tunnel-helpers.ts b/packages/core/src/node-execution-context/helpers/ssh-tunnel-helpers.ts deleted file mode 100644 index f44df0e166..0000000000 --- a/packages/core/src/node-execution-context/helpers/ssh-tunnel-helpers.ts +++ /dev/null @@ -1,18 +0,0 @@ -import type { SSHCredentials, SSHTunnelFunctions } from 'n8n-workflow'; -import { Container } from 'typedi'; - -import { SSHClientsManager } from '@/SSHClientsManager'; - -export class SSHTunnelHelpers { - private readonly sshClientsManager = Container.get(SSHClientsManager); - - get exported(): SSHTunnelFunctions { - return { - getSSHClient: this.getSSHClient.bind(this), - }; - } - - async getSSHClient(credentials: SSHCredentials) { - return await this.sshClientsManager.getClient(credentials); - } -} diff --git a/packages/core/src/node-execution-context/hook-context.ts b/packages/core/src/node-execution-context/hook-context.ts index 7cc6567779..5585d6b8f3 100644 --- a/packages/core/src/node-execution-context/hook-context.ts +++ b/packages/core/src/node-execution-context/hook-context.ts @@ -21,10 +21,10 @@ import { getCredentials, getNodeParameter, getNodeWebhookUrl, + getRequestHelperFunctions, getWebhookDescription, } from '@/NodeExecuteFunctions'; -import { RequestHelpers } from './helpers/request-helpers'; import { NodeExecutionContext } from './node-execution-context'; export class HookContext extends NodeExecutionContext implements IHookFunctions { @@ -40,7 +40,7 @@ export class HookContext extends NodeExecutionContext implements IHookFunctions ) { super(workflow, node, additionalData, mode); - this.helpers = new RequestHelpers(this, workflow, node, additionalData); + this.helpers = getRequestHelperFunctions(workflow, node, additionalData); } getActivationMode() { diff --git a/packages/core/src/node-execution-context/load-options-context.ts b/packages/core/src/node-execution-context/load-options-context.ts index 98dd58210b..bb43d9c2e2 100644 --- a/packages/core/src/node-execution-context/load-options-context.ts +++ b/packages/core/src/node-execution-context/load-options-context.ts @@ -13,10 +13,14 @@ import type { import { extractValue } from '@/ExtractValue'; // eslint-disable-next-line import/no-cycle -import { getAdditionalKeys, getCredentials, getNodeParameter } from '@/NodeExecuteFunctions'; +import { + getAdditionalKeys, + getCredentials, + getNodeParameter, + getRequestHelperFunctions, + getSSHTunnelFunctions, +} from '@/NodeExecuteFunctions'; -import { RequestHelpers } from './helpers/request-helpers'; -import { SSHTunnelHelpers } from './helpers/ssh-tunnel-helpers'; import { NodeExecutionContext } from './node-execution-context'; export class LoadOptionsContext extends NodeExecutionContext implements ILoadOptionsFunctions { @@ -31,8 +35,8 @@ export class LoadOptionsContext extends NodeExecutionContext implements ILoadOpt super(workflow, node, additionalData, 'internal'); this.helpers = { - ...new RequestHelpers(this, workflow, node, additionalData).exported, - ...new SSHTunnelHelpers().exported, + ...getSSHTunnelFunctions(), + ...getRequestHelperFunctions(workflow, node, additionalData), }; } diff --git a/packages/core/src/node-execution-context/poll-context.ts b/packages/core/src/node-execution-context/poll-context.ts index 88e8caafc8..e3c0dd0cc8 100644 --- a/packages/core/src/node-execution-context/poll-context.ts +++ b/packages/core/src/node-execution-context/poll-context.ts @@ -16,14 +16,14 @@ import { ApplicationError, createDeferredPromise } from 'n8n-workflow'; // eslint-disable-next-line import/no-cycle import { getAdditionalKeys, + getBinaryHelperFunctions, getCredentials, getNodeParameter, + getRequestHelperFunctions, + getSchedulingFunctions, returnJsonArray, } from '@/NodeExecuteFunctions'; -import { BinaryHelpers } from './helpers/binary-helpers'; -import { RequestHelpers } from './helpers/request-helpers'; -import { SchedulingHelpers } from './helpers/scheduling-helpers'; import { NodeExecutionContext } from './node-execution-context'; const throwOnEmit = () => { @@ -51,9 +51,9 @@ export class PollContext extends NodeExecutionContext implements IPollFunctions this.helpers = { createDeferredPromise, returnJsonArray, - ...new BinaryHelpers(workflow, additionalData).exported, - ...new RequestHelpers(this, workflow, node, additionalData).exported, - ...new SchedulingHelpers(workflow).exported, + ...getRequestHelperFunctions(workflow, node, additionalData), + ...getBinaryHelperFunctions(additionalData, workflow.id), + ...getSchedulingFunctions(workflow), }; } diff --git a/packages/core/src/node-execution-context/trigger-context.ts b/packages/core/src/node-execution-context/trigger-context.ts index 8535ccfe6c..5ae6ce47df 100644 --- a/packages/core/src/node-execution-context/trigger-context.ts +++ b/packages/core/src/node-execution-context/trigger-context.ts @@ -16,15 +16,15 @@ import { ApplicationError, createDeferredPromise } from 'n8n-workflow'; // eslint-disable-next-line import/no-cycle import { getAdditionalKeys, + getBinaryHelperFunctions, getCredentials, getNodeParameter, + getRequestHelperFunctions, + getSchedulingFunctions, + getSSHTunnelFunctions, returnJsonArray, } from '@/NodeExecuteFunctions'; -import { BinaryHelpers } from './helpers/binary-helpers'; -import { RequestHelpers } from './helpers/request-helpers'; -import { SchedulingHelpers } from './helpers/scheduling-helpers'; -import { SSHTunnelHelpers } from './helpers/ssh-tunnel-helpers'; import { NodeExecutionContext } from './node-execution-context'; const throwOnEmit = () => { @@ -52,10 +52,10 @@ export class TriggerContext extends NodeExecutionContext implements ITriggerFunc this.helpers = { createDeferredPromise, returnJsonArray, - ...new BinaryHelpers(workflow, additionalData).exported, - ...new RequestHelpers(this, workflow, node, additionalData).exported, - ...new SchedulingHelpers(workflow).exported, - ...new SSHTunnelHelpers().exported, + ...getSSHTunnelFunctions(), + ...getRequestHelperFunctions(workflow, node, additionalData), + ...getBinaryHelperFunctions(additionalData, workflow.id), + ...getSchedulingFunctions(workflow), }; } diff --git a/packages/core/src/node-execution-context/webhook-context.ts b/packages/core/src/node-execution-context/webhook-context.ts index a7fa7203c8..4d3eef53e2 100644 --- a/packages/core/src/node-execution-context/webhook-context.ts +++ b/packages/core/src/node-execution-context/webhook-context.ts @@ -24,15 +24,15 @@ import { ApplicationError, createDeferredPromise } from 'n8n-workflow'; import { copyBinaryFile, getAdditionalKeys, + getBinaryHelperFunctions, getCredentials, getInputConnectionData, getNodeParameter, getNodeWebhookUrl, + getRequestHelperFunctions, returnJsonArray, } from '@/NodeExecuteFunctions'; -import { BinaryHelpers } from './helpers/binary-helpers'; -import { RequestHelpers } from './helpers/request-helpers'; import { NodeExecutionContext } from './node-execution-context'; export class WebhookContext extends NodeExecutionContext implements IWebhookFunctions { @@ -54,8 +54,8 @@ export class WebhookContext extends NodeExecutionContext implements IWebhookFunc this.helpers = { createDeferredPromise, returnJsonArray, - ...new BinaryHelpers(workflow, additionalData).exported, - ...new RequestHelpers(this, workflow, node, additionalData).exported, + ...getRequestHelperFunctions(workflow, node, additionalData), + ...getBinaryHelperFunctions(additionalData, workflow.id), }; this.nodeHelpers = { diff --git a/packages/editor-ui/src/components/Error/NodeErrorView.test.ts b/packages/editor-ui/src/components/Error/NodeErrorView.test.ts index 0813b2a781..af9960bc62 100644 --- a/packages/editor-ui/src/components/Error/NodeErrorView.test.ts +++ b/packages/editor-ui/src/components/Error/NodeErrorView.test.ts @@ -1,38 +1,61 @@ import { createComponentRenderer } from '@/__tests__/render'; -import { SETTINGS_STORE_DEFAULT_STATE } from '@/__tests__/utils'; + import NodeErrorView from '@/components/Error/NodeErrorView.vue'; -import { STORES } from '@/constants'; + import { createTestingPinia } from '@pinia/testing'; -import { type INode } from 'n8n-workflow'; +import type { NodeError } from 'n8n-workflow'; import { useAssistantStore } from '@/stores/assistant.store'; import { useNodeTypesStore } from '@/stores/nodeTypes.store'; +import { mockedStore } from '@/__tests__/utils'; +import userEvent from '@testing-library/user-event'; +import { useNDVStore } from '@/stores/ndv.store'; -const DEFAULT_SETUP = { - pinia: createTestingPinia({ - initialState: { - [STORES.SETTINGS]: SETTINGS_STORE_DEFAULT_STATE, - }, - }), -}; +const renderComponent = createComponentRenderer(NodeErrorView); -const renderComponent = createComponentRenderer(NodeErrorView, DEFAULT_SETUP); +let mockAiAssistantStore: ReturnType>; +let mockNodeTypeStore: ReturnType>; +let mockNdvStore: ReturnType>; describe('NodeErrorView.vue', () => { - let mockNode: INode; - afterEach(() => { - mockNode = { - parameters: { - mode: 'runOnceForAllItems', - language: 'javaScript', - jsCode: 'cons error = 9;', - notice: '', + let error: NodeError; + + beforeEach(() => { + createTestingPinia(); + + mockAiAssistantStore = mockedStore(useAssistantStore); + mockNodeTypeStore = mockedStore(useNodeTypesStore); + mockNdvStore = mockedStore(useNDVStore); + //@ts-expect-error + error = { + name: 'NodeOperationError', + message: 'Test error message', + description: 'Test error description', + context: { + descriptionKey: 'noInputConnection', + nodeCause: 'Test node cause', + runIndex: '1', + itemIndex: '2', + parameter: 'testParameter', + data: { key: 'value' }, + causeDetailed: 'Detailed cause', }, - id: 'd1ce5dc9-f9ae-4ac6-84e5-0696ba175dd9', - name: 'Code', - type: 'n8n-nodes-base.code', - typeVersion: 2, - position: [940, 240], + node: { + parameters: { + mode: 'runOnceForAllItems', + language: 'javaScript', + jsCode: 'cons error = 9;', + notice: '', + }, + id: 'd1ce5dc9-f9ae-4ac6-84e5-0696ba175dd9', + name: 'ErrorCode', + type: 'n8n-nodes-base.code', + typeVersion: 2, + position: [940, 240], + }, + stack: 'Test stack trace', }; + }); + afterEach(() => { vi.clearAllMocks(); }); @@ -40,7 +63,7 @@ describe('NodeErrorView.vue', () => { const { getByTestId } = renderComponent({ props: { error: { - node: mockNode, + node: error.node, messages: ['Unexpected identifier [line 1]'], }, }, @@ -55,7 +78,7 @@ describe('NodeErrorView.vue', () => { const { getByTestId } = renderComponent({ props: { error: { - node: mockNode, + node: error.node, message: 'Unexpected identifier [line 1]', }, }, @@ -67,24 +90,20 @@ describe('NodeErrorView.vue', () => { }); it('should not render AI assistant button when error happens in deprecated function node', async () => { - const aiAssistantStore = useAssistantStore(DEFAULT_SETUP.pinia); - const nodeTypeStore = useNodeTypesStore(DEFAULT_SETUP.pinia); - //@ts-expect-error - nodeTypeStore.getNodeType = vi.fn(() => ({ + mockNodeTypeStore.getNodeType = vi.fn(() => ({ type: 'n8n-nodes-base.function', typeVersion: 1, hidden: true, })); - //@ts-expect-error - aiAssistantStore.canShowAssistantButtonsOnCanvas = true; + mockAiAssistantStore.canShowAssistantButtonsOnCanvas = true; const { queryByTestId } = renderComponent({ props: { error: { node: { - ...mockNode, + ...error.node, type: 'n8n-nodes-base.function', typeVersion: 1, }, @@ -96,4 +115,73 @@ describe('NodeErrorView.vue', () => { expect(aiAssistantButton).toBeNull(); }); + + it('renders error message', () => { + const { getByTestId } = renderComponent({ + props: { error }, + }); + expect(getByTestId('node-error-message').textContent).toContain('Test error message'); + }); + + it('renders error description', () => { + const { getByTestId } = renderComponent({ + props: { error }, + }); + expect(getByTestId('node-error-description').innerHTML).toContain( + 'This node has no input data. Please make sure this node is connected to another node.', + ); + }); + + it('renders stack trace', () => { + const { getByText } = renderComponent({ + props: { error }, + }); + expect(getByText('Test stack trace')).toBeTruthy(); + }); + + it('renders open node button when the error is in sub node', () => { + const { getByTestId, queryByTestId } = renderComponent({ + props: { + error: { + ...error, + name: 'NodeOperationError', + functionality: 'configuration-node', + }, + }, + }); + + expect(getByTestId('node-error-view-open-node-button')).toHaveTextContent('Open errored node'); + + expect(queryByTestId('ask-assistant-button')).not.toBeInTheDocument(); + }); + + it('does not renders open node button when the error is in sub node', () => { + mockAiAssistantStore.canShowAssistantButtonsOnCanvas = true; + const { getByTestId, queryByTestId } = renderComponent({ + props: { + error, + }, + }); + + expect(queryByTestId('node-error-view-open-node-button')).not.toBeInTheDocument(); + + expect(getByTestId('ask-assistant-button')).toBeInTheDocument(); + }); + + it('open error node details when open error node is clicked', async () => { + const { getByTestId, emitted } = renderComponent({ + props: { + error: { + ...error, + name: 'NodeOperationError', + functionality: 'configuration-node', + }, + }, + }); + + await userEvent.click(getByTestId('node-error-view-open-node-button')); + + expect(emitted().click).toHaveLength(1); + expect(mockNdvStore.activeNodeName).toBe(error.node.name); + }); }); diff --git a/packages/editor-ui/src/components/Error/NodeErrorView.vue b/packages/editor-ui/src/components/Error/NodeErrorView.vue index 76965f0632..f1b8a1f7c6 100644 --- a/packages/editor-ui/src/components/Error/NodeErrorView.vue +++ b/packages/editor-ui/src/components/Error/NodeErrorView.vue @@ -117,7 +117,7 @@ const prepareRawMessages = computed(() => { }); const isAskAssistantAvailable = computed(() => { - if (!node.value) { + if (!node.value || isSubNodeError.value) { return false; } const isCustomNode = node.value.type === undefined || isCommunityPackageName(node.value.type); @@ -132,6 +132,13 @@ const assistantAlreadyAsked = computed(() => { }); }); +const isSubNodeError = computed(() => { + return ( + props.error.name === 'NodeOperationError' && + (props.error as NodeOperationError).functionality === 'configuration-node' + ); +}); + function nodeVersionTag(nodeType: NodeError['node']): string { if (!nodeType || ('hidden' in nodeType && nodeType.hidden)) { return i18n.baseText('nodeSettings.deprecated'); @@ -153,19 +160,6 @@ function prepareDescription(description: string): string { } function getErrorDescription(): string { - const isSubNodeError = - props.error.name === 'NodeOperationError' && - (props.error as NodeOperationError).functionality === 'configuration-node'; - - if (isSubNodeError) { - return prepareDescription( - props.error.description + - i18n.baseText('pushConnection.executionError.openNode', { - interpolate: { node: props.error.node.name }, - }), - ); - } - if (props.error.context?.descriptionKey) { const interpolate = { nodeCause: props.error.context.nodeCause as string, @@ -205,13 +199,10 @@ function addItemIndexSuffix(message: string): string { function getErrorMessage(): string { let message = ''; - const isSubNodeError = - props.error.name === 'NodeOperationError' && - (props.error as NodeOperationError).functionality === 'configuration-node'; const isNonEmptyString = (value?: unknown): value is string => !!value && typeof value === 'string'; - if (isSubNodeError) { + if (isSubNodeError.value) { message = i18n.baseText('nodeErrorView.errorSubNode', { interpolate: { node: props.error.node.name }, }); @@ -390,6 +381,10 @@ function nodeIsHidden() { return nodeType?.hidden ?? false; } +const onOpenErrorNodeDetailClick = () => { + ndvStore.activeNodeName = props.error.node.name; +}; + async function onAskAssistantClick() { const { message, lineNumber, description } = props.error; const sessionInProgress = !assistantStore.isSessionEnded; @@ -428,14 +423,25 @@ async function onAskAssistantClick() {
+ +
+ +
@@ -696,9 +702,14 @@ async function onAskAssistantClick() { } } - &__assistant-button { + &__button { margin-left: var(--spacing-s); margin-bottom: var(--spacing-xs); + flex-direction: row-reverse; + span { + margin-right: var(--spacing-5xs); + margin-left: var(--spacing-5xs); + } } &__debugging { @@ -831,7 +842,7 @@ async function onAskAssistantClick() { } } -.node-error-view__assistant-button { +.node-error-view__button { margin-top: var(--spacing-xs); } diff --git a/packages/editor-ui/src/components/RunData.vue b/packages/editor-ui/src/components/RunData.vue index 7343ad1f2f..a3d354d7e7 100644 --- a/packages/editor-ui/src/components/RunData.vue +++ b/packages/editor-ui/src/components/RunData.vue @@ -289,7 +289,7 @@ export default defineComponent({ return false; } - const canPinNode = usePinnedData(this.node).canPinNode(false); + const canPinNode = usePinnedData(this.node).canPinNode(false, this.currentOutputIndex); return ( canPinNode && @@ -1214,9 +1214,7 @@ export default defineComponent({