Merge branch 'master' into cat-301-runner-idle-shutdown

This commit is contained in:
Iván Ovejero 2024-11-07 13:24:23 +01:00
commit 5b139e49f6
No known key found for this signature in database
53 changed files with 1678 additions and 1872 deletions

View file

@ -42,7 +42,7 @@ jobs:
uses: actions/cache/save@v4.0.0 uses: actions/cache/save@v4.0.0
with: with:
path: ./packages/**/dist path: ./packages/**/dist
key: ${{ github.sha }}-base:build key: ${{ github.sha }}-release:build
- name: Dry-run publishing - name: Dry-run publishing
run: pnpm publish -r --no-git-checks --dry-run run: pnpm publish -r --no-git-checks --dry-run
@ -141,7 +141,7 @@ jobs:
uses: actions/cache/restore@v4.0.0 uses: actions/cache/restore@v4.0.0
with: with:
path: ./packages/**/dist path: ./packages/**/dist
key: ${{ github.sha }}:db-tests key: ${{ github.sha }}-release:build
- name: Create a frontend release - name: Create a frontend release
uses: getsentry/action-release@v1.7.0 uses: getsentry/action-release@v1.7.0

View file

@ -10,9 +10,8 @@ export type TaskRunnerMode = 'internal_childprocess' | 'internal_launcher' | 'ex
@Config @Config
export class TaskRunnersConfig { export class TaskRunnersConfig {
// Defaults to true for now @Env('N8N_RUNNERS_ENABLED')
@Env('N8N_RUNNERS_DISABLED') enabled: boolean = false;
disabled: boolean = true;
// Defaults to true for now // Defaults to true for now
@Env('N8N_RUNNERS_MODE') @Env('N8N_RUNNERS_MODE')
@ -51,6 +50,10 @@ export class TaskRunnersConfig {
@Env('N8N_RUNNERS_MAX_CONCURRENCY') @Env('N8N_RUNNERS_MAX_CONCURRENCY')
maxConcurrency: number = 5; maxConcurrency: number = 5;
/** Should the output of deduplication be asserted for correctness */
@Env('N8N_RUNNERS_ASSERT_DEDUPLICATION_OUTPUT')
assertDeduplicationOutput: boolean = false;
/** How long (in minutes) until shutting down an idle runner. */ /** How long (in minutes) until shutting down an idle runner. */
@Env('N8N_RUNNERS_IDLE_TIMEOUT') @Env('N8N_RUNNERS_IDLE_TIMEOUT')
idleTimeout: number = 5; idleTimeout: number = 5;

View file

@ -222,7 +222,7 @@ describe('GlobalConfig', () => {
}, },
}, },
taskRunners: { taskRunners: {
disabled: true, enabled: false,
mode: 'internal_childprocess', mode: 'internal_childprocess',
path: '/runners', path: '/runners',
authToken: '', authToken: '',
@ -233,6 +233,7 @@ describe('GlobalConfig', () => {
launcherRunner: 'javascript', launcherRunner: 'javascript',
maxOldSpaceSize: '', maxOldSpaceSize: '',
maxConcurrency: 5, maxConcurrency: 5,
assertDeduplicationOutput: false,
}, },
sentry: { sentry: {
backendDsn: '', backendDsn: '',

View file

@ -0,0 +1,29 @@
import type { IExecuteData, INodeExecutionData } from 'n8n-workflow';
import type { DataRequestResponse } from '@/runner-types';
/**
* Reconstructs data from a DataRequestResponse to the initial
* data structures.
*/
export class DataRequestResponseReconstruct {
/**
* Reconstructs `connectionInputData` from a DataRequestResponse
*/
reconstructConnectionInputData(
inputData: DataRequestResponse['inputData'],
): INodeExecutionData[] {
return inputData?.main?.[0] ?? [];
}
/**
* Reconstruct `executeData` from a DataRequestResponse
*/
reconstructExecuteData(response: DataRequestResponse): IExecuteData {
return {
data: response.inputData,
node: response.node,
source: response.connectionInputSource,
};
}
}

View file

@ -1,3 +1,4 @@
export * from './task-runner'; export * from './task-runner';
export * from './runner-types'; export * from './runner-types';
export * from './message-types'; export * from './message-types';
export * from './data-request/data-request-response-reconstruct';

View file

@ -3,15 +3,21 @@ import type { CodeExecutionMode, IDataObject } from 'n8n-workflow';
import fs from 'node:fs'; import fs from 'node:fs';
import { builtinModules } from 'node:module'; import { builtinModules } from 'node:module';
import type { JsRunnerConfig } from '@/config/js-runner-config';
import { MainConfig } from '@/config/main-config';
import { ExecutionError } from '@/js-task-runner/errors/execution-error';
import { ValidationError } from '@/js-task-runner/errors/validation-error'; import { ValidationError } from '@/js-task-runner/errors/validation-error';
import type { DataRequestResponse, JSExecSettings } from '@/js-task-runner/js-task-runner'; import type { JSExecSettings } from '@/js-task-runner/js-task-runner';
import { JsTaskRunner } from '@/js-task-runner/js-task-runner'; import { JsTaskRunner } from '@/js-task-runner/js-task-runner';
import type { DataRequestResponse } from '@/runner-types';
import type { Task } from '@/task-runner'; import type { Task } from '@/task-runner';
import { newCodeTaskData, newTaskWithSettings, withPairedItem, wrapIntoJson } from './test-data'; import {
import type { JsRunnerConfig } from '../../config/js-runner-config'; newDataRequestResponse,
import { MainConfig } from '../../config/main-config'; newTaskWithSettings,
import { ExecutionError } from '../errors/execution-error'; withPairedItem,
wrapIntoJson,
} from './test-data';
jest.mock('ws'); jest.mock('ws');
@ -68,7 +74,7 @@ describe('JsTaskRunner', () => {
nodeMode: 'runOnceForAllItems', nodeMode: 'runOnceForAllItems',
...settings, ...settings,
}), }),
taskData: newCodeTaskData(inputItems.map(wrapIntoJson)), taskData: newDataRequestResponse(inputItems.map(wrapIntoJson)),
runner, runner,
}); });
}; };
@ -91,7 +97,7 @@ describe('JsTaskRunner', () => {
nodeMode: 'runOnceForEachItem', nodeMode: 'runOnceForEachItem',
...settings, ...settings,
}), }),
taskData: newCodeTaskData(inputItems.map(wrapIntoJson)), taskData: newDataRequestResponse(inputItems.map(wrapIntoJson)),
runner, runner,
}); });
}; };
@ -108,7 +114,7 @@ describe('JsTaskRunner', () => {
await execTaskWithParams({ await execTaskWithParams({
task, task,
taskData: newCodeTaskData([wrapIntoJson({})]), taskData: newDataRequestResponse([wrapIntoJson({})]),
}); });
expect(defaultTaskRunner.makeRpcCall).toHaveBeenCalledWith(task.taskId, 'logNodeOutput', [ expect(defaultTaskRunner.makeRpcCall).toHaveBeenCalledWith(task.taskId, 'logNodeOutput', [
@ -243,7 +249,7 @@ describe('JsTaskRunner', () => {
code: 'return { val: $env.VAR1 }', code: 'return { val: $env.VAR1 }',
nodeMode: 'runOnceForAllItems', nodeMode: 'runOnceForAllItems',
}), }),
taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), {
envProviderState: { envProviderState: {
isEnvAccessBlocked: false, isEnvAccessBlocked: false,
isProcessAvailable: true, isProcessAvailable: true,
@ -262,7 +268,7 @@ describe('JsTaskRunner', () => {
code: 'return { val: $env.VAR1 }', code: 'return { val: $env.VAR1 }',
nodeMode: 'runOnceForAllItems', nodeMode: 'runOnceForAllItems',
}), }),
taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), {
envProviderState: { envProviderState: {
isEnvAccessBlocked: true, isEnvAccessBlocked: true,
isProcessAvailable: true, isProcessAvailable: true,
@ -279,7 +285,7 @@ describe('JsTaskRunner', () => {
code: 'return Object.values($env).concat(Object.keys($env))', code: 'return Object.values($env).concat(Object.keys($env))',
nodeMode: 'runOnceForAllItems', nodeMode: 'runOnceForAllItems',
}), }),
taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), {
envProviderState: { envProviderState: {
isEnvAccessBlocked: false, isEnvAccessBlocked: false,
isProcessAvailable: true, isProcessAvailable: true,
@ -298,7 +304,7 @@ describe('JsTaskRunner', () => {
code: 'return { val: $env.N8N_RUNNERS_N8N_URI }', code: 'return { val: $env.N8N_RUNNERS_N8N_URI }',
nodeMode: 'runOnceForAllItems', nodeMode: 'runOnceForAllItems',
}), }),
taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), {
envProviderState: undefined, envProviderState: undefined,
}), }),
}); });
@ -313,7 +319,7 @@ describe('JsTaskRunner', () => {
code: 'return { val: Buffer.from("test-buffer").toString() }', code: 'return { val: Buffer.from("test-buffer").toString() }',
nodeMode: 'runOnceForAllItems', nodeMode: 'runOnceForAllItems',
}), }),
taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), {
envProviderState: undefined, envProviderState: undefined,
}), }),
}); });
@ -325,7 +331,7 @@ describe('JsTaskRunner', () => {
code: 'return { val: Buffer.from("test-buffer").toString() }', code: 'return { val: Buffer.from("test-buffer").toString() }',
nodeMode: 'runOnceForEachItem', nodeMode: 'runOnceForEachItem',
}), }),
taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), {
envProviderState: undefined, envProviderState: undefined,
}), }),
}); });
@ -771,7 +777,7 @@ describe('JsTaskRunner', () => {
code: 'unknown', code: 'unknown',
nodeMode, nodeMode,
}), }),
taskData: newCodeTaskData([wrapIntoJson({ a: 1 })]), taskData: newDataRequestResponse([wrapIntoJson({ a: 1 })]),
}), }),
).rejects.toThrow(ExecutionError); ).rejects.toThrow(ExecutionError);
}, },
@ -793,7 +799,7 @@ describe('JsTaskRunner', () => {
jest.spyOn(runner, 'sendOffers').mockImplementation(() => {}); jest.spyOn(runner, 'sendOffers').mockImplementation(() => {});
jest jest
.spyOn(runner, 'requestData') .spyOn(runner, 'requestData')
.mockResolvedValue(newCodeTaskData([wrapIntoJson({ a: 1 })])); .mockResolvedValue(newDataRequestResponse([wrapIntoJson({ a: 1 })]));
await runner.receivedSettings(taskId, task.settings); await runner.receivedSettings(taskId, task.settings);

View file

@ -2,7 +2,8 @@ import type { IDataObject, INode, INodeExecutionData, ITaskData } from 'n8n-work
import { NodeConnectionType } from 'n8n-workflow'; import { NodeConnectionType } from 'n8n-workflow';
import { nanoid } from 'nanoid'; import { nanoid } from 'nanoid';
import type { DataRequestResponse, JSExecSettings } from '@/js-task-runner/js-task-runner'; import type { JSExecSettings } from '@/js-task-runner/js-task-runner';
import type { DataRequestResponse } from '@/runner-types';
import type { Task } from '@/task-runner'; import type { Task } from '@/task-runner';
/** /**
@ -46,10 +47,10 @@ export const newTaskData = (opts: Partial<ITaskData> & Pick<ITaskData, 'source'>
}); });
/** /**
* Creates a new all code task data with the given options * Creates a new data request response with the given options
*/ */
export const newCodeTaskData = ( export const newDataRequestResponse = (
codeNodeInputData: INodeExecutionData[], inputData: INodeExecutionData[],
opts: Partial<DataRequestResponse> = {}, opts: Partial<DataRequestResponse> = {},
): DataRequestResponse => { ): DataRequestResponse => {
const codeNode = newNode({ const codeNode = newNode({
@ -83,9 +84,8 @@ export const newCodeTaskData = (
nodes: [manualTriggerNode, codeNode], nodes: [manualTriggerNode, codeNode],
}, },
inputData: { inputData: {
main: [codeNodeInputData], main: [inputData],
}, },
connectionInputData: codeNodeInputData,
node: codeNode, node: codeNode,
runExecutionData: { runExecutionData: {
startData: {}, startData: {},
@ -95,7 +95,7 @@ export const newCodeTaskData = (
newTaskData({ newTaskData({
source: [], source: [],
data: { data: {
main: [codeNodeInputData], main: [inputData],
}, },
}), }),
], ],
@ -137,14 +137,13 @@ export const newCodeTaskData = (
var: 'value', var: 'value',
}, },
}, },
executeData: { connectionInputSource: {
node: codeNode, main: [
data: { {
main: [codeNodeInputData], previousNode: 'Trigger',
}, previousNodeOutput: 0,
source: {
main: [{ previousNode: manualTriggerNode.name }],
}, },
],
}, },
...opts, ...opts,
}; };

View file

@ -1,8 +1,13 @@
import { getAdditionalKeys } from 'n8n-core'; import { getAdditionalKeys } from 'n8n-core';
import type { IDataObject, INodeType, IWorkflowExecuteAdditionalData } from 'n8n-workflow'; import type {
IDataObject,
IExecuteData,
INodeType,
IWorkflowExecuteAdditionalData,
} from 'n8n-workflow';
import { Workflow, WorkflowDataProxy } from 'n8n-workflow'; import { Workflow, WorkflowDataProxy } from 'n8n-workflow';
import { newCodeTaskData } from '../../__tests__/test-data'; import { newDataRequestResponse } from '../../__tests__/test-data';
import { BuiltInsParser } from '../built-ins-parser'; import { BuiltInsParser } from '../built-ins-parser';
import { BuiltInsParserState } from '../built-ins-parser-state'; import { BuiltInsParserState } from '../built-ins-parser-state';
@ -159,7 +164,12 @@ describe('BuiltInsParser', () => {
describe('WorkflowDataProxy built-ins', () => { describe('WorkflowDataProxy built-ins', () => {
it('should have a known list of built-ins', () => { it('should have a known list of built-ins', () => {
const data = newCodeTaskData([]); const data = newDataRequestResponse([]);
const executeData: IExecuteData = {
data: {},
node: data.node,
source: data.connectionInputSource,
};
const dataProxy = new WorkflowDataProxy( const dataProxy = new WorkflowDataProxy(
new Workflow({ new Workflow({
...data.workflow, ...data.workflow,
@ -179,7 +189,7 @@ describe('BuiltInsParser', () => {
data.runIndex, data.runIndex,
0, 0,
data.activeNodeName, data.activeNodeName,
data.connectionInputData, [],
data.siblingParameters, data.siblingParameters,
data.mode, data.mode,
getAdditionalKeys( getAdditionalKeys(
@ -187,7 +197,7 @@ describe('BuiltInsParser', () => {
data.mode, data.mode,
data.runExecutionData, data.runExecutionData,
), ),
data.executeData, executeData,
data.defaultReturnRunIndex, data.defaultReturnRunIndex,
data.selfData, data.selfData,
data.contextNodeName, data.contextNodeName,

View file

@ -1,28 +1,25 @@
import { getAdditionalKeys } from 'n8n-core'; import { getAdditionalKeys } from 'n8n-core';
import { import { WorkflowDataProxy, Workflow } from 'n8n-workflow';
WorkflowDataProxy,
// type IWorkflowDataProxyAdditionalKeys,
Workflow,
} from 'n8n-workflow';
import type { import type {
CodeExecutionMode, CodeExecutionMode,
INode,
ITaskDataConnections,
IWorkflowExecuteAdditionalData, IWorkflowExecuteAdditionalData,
WorkflowParameters,
IDataObject, IDataObject,
IExecuteData,
INodeExecutionData, INodeExecutionData,
INodeParameters, INodeParameters,
IRunExecutionData,
WorkflowExecuteMode, WorkflowExecuteMode,
WorkflowParameters,
ITaskDataConnections,
INode,
IRunExecutionData,
EnvProviderState, EnvProviderState,
IExecuteData,
INodeTypeDescription, INodeTypeDescription,
} from 'n8n-workflow'; } from 'n8n-workflow';
import * as a from 'node:assert'; import * as a from 'node:assert';
import { runInNewContext, type Context } from 'node:vm'; import { runInNewContext, type Context } from 'node:vm';
import type { TaskResultData } from '@/runner-types'; import type { MainConfig } from '@/config/main-config';
import type { DataRequestResponse, PartialAdditionalData, TaskResultData } from '@/runner-types';
import { type Task, TaskRunner } from '@/task-runner'; import { type Task, TaskRunner } from '@/task-runner';
import { BuiltInsParser } from './built-ins-parser/built-ins-parser'; import { BuiltInsParser } from './built-ins-parser/built-ins-parser';
@ -33,7 +30,7 @@ import { makeSerializable } from './errors/serializable-error';
import type { RequireResolver } from './require-resolver'; import type { RequireResolver } from './require-resolver';
import { createRequireResolver } from './require-resolver'; import { createRequireResolver } from './require-resolver';
import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation'; import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation';
import type { MainConfig } from '../config/main-config'; import { DataRequestResponseReconstruct } from '../data-request/data-request-response-reconstruct';
export interface JSExecSettings { export interface JSExecSettings {
code: string; code: string;
@ -45,34 +42,19 @@ export interface JSExecSettings {
mode: WorkflowExecuteMode; mode: WorkflowExecuteMode;
} }
export interface PartialAdditionalData { export interface JsTaskData {
executionId?: string;
restartExecutionId?: string;
restApiUrl: string;
instanceBaseUrl: string;
formWaitingBaseUrl: string;
webhookBaseUrl: string;
webhookWaitingBaseUrl: string;
webhookTestBaseUrl: string;
currentNodeParameters?: INodeParameters;
executionTimeoutTimestamp?: number;
userId?: string;
variables: IDataObject;
}
export interface DataRequestResponse {
workflow: Omit<WorkflowParameters, 'nodeTypes'>; workflow: Omit<WorkflowParameters, 'nodeTypes'>;
inputData: ITaskDataConnections; inputData: ITaskDataConnections;
connectionInputData: INodeExecutionData[];
node: INode; node: INode;
runExecutionData: IRunExecutionData; runExecutionData: IRunExecutionData;
runIndex: number; runIndex: number;
itemIndex: number; itemIndex: number;
activeNodeName: string; activeNodeName: string;
connectionInputData: INodeExecutionData[];
siblingParameters: INodeParameters; siblingParameters: INodeParameters;
mode: WorkflowExecuteMode; mode: WorkflowExecuteMode;
envProviderState?: EnvProviderState; envProviderState: EnvProviderState;
executeData?: IExecuteData; executeData?: IExecuteData;
defaultReturnRunIndex: number; defaultReturnRunIndex: number;
selfData: IDataObject; selfData: IDataObject;
@ -89,6 +71,8 @@ export class JsTaskRunner extends TaskRunner {
private readonly builtInsParser = new BuiltInsParser(); private readonly builtInsParser = new BuiltInsParser();
private readonly taskDataReconstruct = new DataRequestResponseReconstruct();
constructor(config: MainConfig, name = 'JS Task Runner') { constructor(config: MainConfig, name = 'JS Task Runner') {
super({ super({
taskType: 'javascript', taskType: 'javascript',
@ -115,33 +99,14 @@ export class JsTaskRunner extends TaskRunner {
? neededBuiltInsResult.result ? neededBuiltInsResult.result
: BuiltInsParserState.newNeedsAllDataState(); : BuiltInsParserState.newNeedsAllDataState();
const data = await this.requestData<DataRequestResponse>( const dataResponse = await this.requestData<DataRequestResponse>(
task.taskId, task.taskId,
neededBuiltIns.toDataRequestParams(), neededBuiltIns.toDataRequestParams(),
); );
/** const data = this.reconstructTaskData(dataResponse);
* We request node types only when we know a task needs all nodes, because
* needing all nodes means that the task relies on paired item functionality,
* which is the same requirement for needing node types.
*/
if (neededBuiltIns.needsAllNodes) {
const uniqueNodeTypes = new Map(
data.workflow.nodes.map((node) => [
`${node.type}|${node.typeVersion}`,
{ name: node.type, version: node.typeVersion },
]),
);
const unknownNodeTypes = this.nodeTypes.onlyUnknown([...uniqueNodeTypes.values()]); await this.requestNodeTypeIfNeeded(neededBuiltIns, data.workflow, task.taskId);
const nodeTypes = await this.requestNodeTypes<INodeTypeDescription[]>(
task.taskId,
unknownNodeTypes,
);
this.nodeTypes.addNodeTypeDescriptions(nodeTypes);
}
const workflowParams = data.workflow; const workflowParams = data.workflow;
const workflow = new Workflow({ const workflow = new Workflow({
@ -201,7 +166,7 @@ export class JsTaskRunner extends TaskRunner {
private async runForAllItems( private async runForAllItems(
taskId: string, taskId: string,
settings: JSExecSettings, settings: JSExecSettings,
data: DataRequestResponse, data: JsTaskData,
workflow: Workflow, workflow: Workflow,
customConsole: CustomConsole, customConsole: CustomConsole,
): Promise<INodeExecutionData[]> { ): Promise<INodeExecutionData[]> {
@ -248,7 +213,7 @@ export class JsTaskRunner extends TaskRunner {
private async runForEachItem( private async runForEachItem(
taskId: string, taskId: string,
settings: JSExecSettings, settings: JSExecSettings,
data: DataRequestResponse, data: JsTaskData,
workflow: Workflow, workflow: Workflow,
customConsole: CustomConsole, customConsole: CustomConsole,
): Promise<INodeExecutionData[]> { ): Promise<INodeExecutionData[]> {
@ -315,7 +280,7 @@ export class JsTaskRunner extends TaskRunner {
return returnData; return returnData;
} }
private createDataProxy(data: DataRequestResponse, workflow: Workflow, itemIndex: number) { private createDataProxy(data: JsTaskData, workflow: Workflow, itemIndex: number) {
return new WorkflowDataProxy( return new WorkflowDataProxy(
workflow, workflow,
data.runExecutionData, data.runExecutionData,
@ -359,4 +324,43 @@ export class JsTaskRunner extends TaskRunner {
return new ExecutionError({ message: JSON.stringify(error) }); return new ExecutionError({ message: JSON.stringify(error) });
} }
private reconstructTaskData(response: DataRequestResponse): JsTaskData {
return {
...response,
connectionInputData: this.taskDataReconstruct.reconstructConnectionInputData(
response.inputData,
),
executeData: this.taskDataReconstruct.reconstructExecuteData(response),
};
}
private async requestNodeTypeIfNeeded(
neededBuiltIns: BuiltInsParserState,
workflow: JsTaskData['workflow'],
taskId: string,
) {
/**
* We request node types only when we know a task needs all nodes, because
* needing all nodes means that the task relies on paired item functionality,
* which is the same requirement for needing node types.
*/
if (neededBuiltIns.needsAllNodes) {
const uniqueNodeTypes = new Map(
workflow.nodes.map((node) => [
`${node.type}|${node.typeVersion}`,
{ name: node.type, version: node.typeVersion },
]),
);
const unknownNodeTypes = this.nodeTypes.onlyUnknown([...uniqueNodeTypes.values()]);
const nodeTypes = await this.requestNodeTypes<INodeTypeDescription[]>(
taskId,
unknownNodeTypes,
);
this.nodeTypes.addNodeTypeDescriptions(nodeTypes);
}
}
} }

View file

@ -8,6 +8,7 @@ import type {
INodeParameters, INodeParameters,
IRunExecutionData, IRunExecutionData,
ITaskDataConnections, ITaskDataConnections,
ITaskDataConnectionsSource,
IWorkflowExecuteAdditionalData, IWorkflowExecuteAdditionalData,
Workflow, Workflow,
WorkflowExecuteMode, WorkflowExecuteMode,
@ -29,17 +30,16 @@ export interface TaskDataRequestParams {
export interface DataRequestResponse { export interface DataRequestResponse {
workflow: Omit<WorkflowParameters, 'nodeTypes'>; workflow: Omit<WorkflowParameters, 'nodeTypes'>;
inputData: ITaskDataConnections; inputData: ITaskDataConnections;
connectionInputSource: ITaskDataConnectionsSource | null;
node: INode; node: INode;
runExecutionData: IRunExecutionData; runExecutionData: IRunExecutionData;
runIndex: number; runIndex: number;
itemIndex: number; itemIndex: number;
activeNodeName: string; activeNodeName: string;
connectionInputData: INodeExecutionData[];
siblingParameters: INodeParameters; siblingParameters: INodeParameters;
mode: WorkflowExecuteMode; mode: WorkflowExecuteMode;
envProviderState: EnvProviderState; envProviderState: EnvProviderState;
executeData?: IExecuteData;
defaultReturnRunIndex: number; defaultReturnRunIndex: number;
selfData: IDataObject; selfData: IDataObject;
contextNodeName: string; contextNodeName: string;

View file

@ -221,7 +221,7 @@ export class Start extends BaseCommand {
} }
const { taskRunners: taskRunnerConfig } = this.globalConfig; const { taskRunners: taskRunnerConfig } = this.globalConfig;
if (!taskRunnerConfig.disabled) { if (taskRunnerConfig.enabled) {
const { TaskRunnerModule } = await import('@/runners/task-runner-module'); const { TaskRunnerModule } = await import('@/runners/task-runner-module');
const taskRunnerModule = Container.get(TaskRunnerModule); const taskRunnerModule = Container.get(TaskRunnerModule);
await taskRunnerModule.start(); await taskRunnerModule.start();

View file

@ -113,7 +113,7 @@ export class Worker extends BaseCommand {
); );
const { taskRunners: taskRunnerConfig } = this.globalConfig; const { taskRunners: taskRunnerConfig } = this.globalConfig;
if (!taskRunnerConfig.disabled) { if (taskRunnerConfig.enabled) {
const { TaskRunnerModule } = await import('@/runners/task-runner-module'); const { TaskRunnerModule } = await import('@/runners/task-runner-module');
const taskRunnerModule = Container.get(TaskRunnerModule); const taskRunnerModule = Container.get(TaskRunnerModule);
await taskRunnerModule.start(); await taskRunnerModule.start();

View file

@ -22,7 +22,7 @@ require('child_process').spawn = spawnMock;
describe('TaskRunnerProcess', () => { describe('TaskRunnerProcess', () => {
const logger = mockInstance(Logger); const logger = mockInstance(Logger);
const runnerConfig = mockInstance(TaskRunnersConfig); const runnerConfig = mockInstance(TaskRunnersConfig);
runnerConfig.disabled = false; runnerConfig.enabled = true;
runnerConfig.mode = 'internal_childprocess'; runnerConfig.mode = 'internal_childprocess';
const authService = mock<TaskRunnerAuthService>(); const authService = mock<TaskRunnerAuthService>();
let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService); let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService);

View file

@ -5,18 +5,6 @@ import type WebSocket from 'ws';
import type { TaskRunner } from './task-broker.service'; import type { TaskRunner } from './task-broker.service';
import type { AuthlessRequest } from '../requests'; import type { AuthlessRequest } from '../requests';
/**
* Specifies what data should be included for a task data request.
*/
export interface TaskDataRequestParams {
dataOfNodes: string[] | 'all';
prevNode: boolean;
/** Whether input data for the node should be included */
input: boolean;
/** Whether env provider's state should be included */
env: boolean;
}
export interface DisconnectAnalyzer { export interface DisconnectAnalyzer {
determineDisconnectReason(runnerId: TaskRunner['id']): Promise<Error>; determineDisconnectReason(runnerId: TaskRunner['id']): Promise<Error>;
} }

View file

@ -1,42 +1,10 @@
import type { TaskData } from '@n8n/task-runner'; import type { PartialAdditionalData, TaskData } from '@n8n/task-runner';
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import type { IExecuteFunctions, IWorkflowExecuteAdditionalData } from 'n8n-workflow'; import type { Workflow } from 'n8n-workflow';
import { type INode, type INodeExecutionData, type Workflow } from 'n8n-workflow';
import { DataRequestResponseBuilder } from '../data-request-response-builder'; import { DataRequestResponseBuilder } from '../data-request-response-builder';
const triggerNode: INode = mock<INode>({ const additionalData = mock<PartialAdditionalData>({
name: 'Trigger',
});
const debugHelperNode: INode = mock<INode>({
name: 'DebugHelper',
});
const codeNode: INode = mock<INode>({
name: 'Code',
});
const workflow: TaskData['workflow'] = mock<Workflow>();
const debugHelperNodeOutItems: INodeExecutionData[] = [
{
json: {
uid: 'abb74fd4-bef2-4fae-9d53-ea24e9eb3032',
email: 'Dan.Schmidt31@yahoo.com',
firstname: 'Toni',
lastname: 'Schuster',
password: 'Q!D6C2',
},
pairedItem: {
item: 0,
},
},
];
const codeNodeInputItems: INodeExecutionData[] = debugHelperNodeOutItems;
const connectionInputData: TaskData['connectionInputData'] = codeNodeInputItems;
const envProviderState: TaskData['envProviderState'] = mock<TaskData['envProviderState']>({
env: {},
isEnvAccessBlocked: false,
isProcessAvailable: true,
});
const additionalData = mock<IWorkflowExecuteAdditionalData>({
formWaitingBaseUrl: 'http://localhost:5678/form-waiting', formWaitingBaseUrl: 'http://localhost:5678/form-waiting',
instanceBaseUrl: 'http://localhost:5678/', instanceBaseUrl: 'http://localhost:5678/',
restApiUrl: 'http://localhost:5678/rest', restApiUrl: 'http://localhost:5678/rest',
@ -50,165 +18,34 @@ const additionalData = mock<IWorkflowExecuteAdditionalData>({
executionTimeoutTimestamp: undefined, executionTimeoutTimestamp: undefined,
restartExecutionId: undefined, restartExecutionId: undefined,
}); });
const executeFunctions = mock<IExecuteFunctions>();
/** const workflow: TaskData['workflow'] = mock<Workflow>({
* Drawn with https://asciiflow.com/#/ id: '1',
* Task data for an execution of the following WF: name: 'Test Workflow',
* where denotes the currently being executing node. active: true,
* connectionsBySourceNode: {},
* nodes: {},
* Trigger DebugHelper Code
*
*/
const taskData: TaskData = {
executeFunctions,
workflow,
connectionInputData,
inputData: {
main: [codeNodeInputItems],
},
itemIndex: 0,
activeNodeName: codeNode.name,
contextNodeName: codeNode.name,
defaultReturnRunIndex: -1,
mode: 'manual',
envProviderState,
node: codeNode,
runExecutionData: {
startData: {
destinationNode: codeNode.name,
runNodeFilter: [triggerNode.name, debugHelperNode.name, codeNode.name],
},
resultData: {
runData: {
[triggerNode.name]: [
{
hints: [],
startTime: 1730313407328,
executionTime: 1,
source: [],
executionStatus: 'success',
data: {
main: [[]],
},
},
],
[debugHelperNode.name]: [
{
hints: [],
startTime: 1730313407330,
executionTime: 1,
source: [
{
previousNode: triggerNode.name,
},
],
executionStatus: 'success',
data: {
main: [debugHelperNodeOutItems],
},
},
],
},
pinData: {}, pinData: {},
}, settings: {},
executionData: { staticData: {},
contextData: {}, });
nodeExecutionStack: [],
metadata: {}, const taskData = mock<TaskData>({
waitingExecution: {
[codeNode.name]: {
'0': {
main: [codeNodeInputItems],
},
},
},
waitingExecutionSource: {
[codeNode.name]: {
'0': {
main: [
{
previousNode: debugHelperNode.name,
},
],
},
},
},
},
},
runIndex: 0,
selfData: {},
siblingParameters: {},
executeData: {
node: codeNode,
data: {
main: [codeNodeInputItems],
},
source: {
main: [
{
previousNode: debugHelperNode.name,
previousNodeOutput: 0,
},
],
},
},
additionalData, additionalData,
} as const; workflow,
});
describe('DataRequestResponseBuilder', () => { describe('DataRequestResponseBuilder', () => {
const allDataParam: DataRequestResponseBuilder['requestParams'] = { const builder = new DataRequestResponseBuilder();
dataOfNodes: 'all',
env: true,
input: true,
prevNode: true,
};
const newRequestParam = (opts: Partial<DataRequestResponseBuilder['requestParams']>) => ({
...allDataParam,
...opts,
});
describe('all data', () => {
it('should build the runExecutionData as is when everything is requested', () => {
const dataRequestResponseBuilder = new DataRequestResponseBuilder(taskData, allDataParam);
const { runExecutionData } = dataRequestResponseBuilder.build();
expect(runExecutionData).toStrictEqual(taskData.runExecutionData);
});
});
describe('envProviderState', () => {
it("should filter out envProviderState when it's not requested", () => {
const dataRequestResponseBuilder = new DataRequestResponseBuilder(
taskData,
newRequestParam({
env: false,
}),
);
const result = dataRequestResponseBuilder.build();
expect(result.envProviderState).toStrictEqual({
env: {},
isEnvAccessBlocked: false,
isProcessAvailable: true,
});
});
});
describe('additionalData', () => {
it('picks only specific properties for additional data', () => { it('picks only specific properties for additional data', () => {
const dataRequestResponseBuilder = new DataRequestResponseBuilder(taskData, allDataParam); const result = builder.buildFromTaskData(taskData);
const result = dataRequestResponseBuilder.build();
expect(result.additionalData).toStrictEqual({ expect(result.additionalData).toStrictEqual({
formWaitingBaseUrl: 'http://localhost:5678/form-waiting', formWaitingBaseUrl: 'http://localhost:5678/form-waiting',
instanceBaseUrl: 'http://localhost:5678/', instanceBaseUrl: 'http://localhost:5678/',
restApiUrl: 'http://localhost:5678/rest', restApiUrl: 'http://localhost:5678/rest',
variables: additionalData.variables,
webhookBaseUrl: 'http://localhost:5678/webhook', webhookBaseUrl: 'http://localhost:5678/webhook',
webhookTestBaseUrl: 'http://localhost:5678/webhook-test', webhookTestBaseUrl: 'http://localhost:5678/webhook-test',
webhookWaitingBaseUrl: 'http://localhost:5678/webhook-waiting', webhookWaitingBaseUrl: 'http://localhost:5678/webhook-waiting',
@ -217,108 +54,21 @@ describe('DataRequestResponseBuilder', () => {
currentNodeParameters: undefined, currentNodeParameters: undefined,
executionTimeoutTimestamp: undefined, executionTimeoutTimestamp: undefined,
restartExecutionId: undefined, restartExecutionId: undefined,
variables: additionalData.variables,
});
});
});
describe('input data', () => {
const allExceptInputParam = newRequestParam({
input: false,
});
it('drops input data from executeData', () => {
const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build();
expect(result.executeData).toStrictEqual({
node: taskData.executeData!.node,
source: taskData.executeData!.source,
data: {},
}); });
}); });
it('drops input data from result', () => { it('picks only specific properties for workflow', () => {
const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build(); const result = builder.buildFromTaskData(taskData);
expect(result.inputData).toStrictEqual({}); expect(result.workflow).toStrictEqual({
}); id: '1',
name: 'Test Workflow',
it('drops input data from result', () => { active: true,
const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build(); connections: workflow.connectionsBySourceNode,
nodes: [],
expect(result.inputData).toStrictEqual({}); pinData: workflow.pinData,
}); settings: workflow.settings,
staticData: workflow.staticData,
it('drops input data from connectionInputData', () => {
const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build();
expect(result.connectionInputData).toStrictEqual([]);
});
});
describe('nodes', () => {
it('should return empty run data when only Code node is requested', () => {
const result = new DataRequestResponseBuilder(
taskData,
newRequestParam({ dataOfNodes: ['Code'], prevNode: false }),
).build();
expect(result.runExecutionData.resultData.runData).toStrictEqual({});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
});
it('should return empty run data when only Code node is requested', () => {
const result = new DataRequestResponseBuilder(
taskData,
newRequestParam({ dataOfNodes: [codeNode.name], prevNode: false }),
).build();
expect(result.runExecutionData.resultData.runData).toStrictEqual({});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
});
it("should return only DebugHelper's data when only DebugHelper node is requested", () => {
const result = new DataRequestResponseBuilder(
taskData,
newRequestParam({ dataOfNodes: [debugHelperNode.name], prevNode: false }),
).build();
expect(result.runExecutionData.resultData.runData).toStrictEqual({
[debugHelperNode.name]: taskData.runExecutionData.resultData.runData[debugHelperNode.name],
});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
});
it("should return DebugHelper's data when only prevNode node is requested", () => {
const result = new DataRequestResponseBuilder(
taskData,
newRequestParam({ dataOfNodes: [], prevNode: true }),
).build();
expect(result.runExecutionData.resultData.runData).toStrictEqual({
[debugHelperNode.name]: taskData.runExecutionData.resultData.runData[debugHelperNode.name],
});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
}); });
}); });
}); });

View file

@ -0,0 +1,300 @@
import type { DataRequestResponse, TaskDataRequestParams } from '@n8n/task-runner';
import { mock } from 'jest-mock-extended';
import type { IWorkflowExecuteAdditionalData } from 'n8n-workflow';
import { type INode, type INodeExecutionData } from 'n8n-workflow';
import { DataRequestResponseStripper } from '../data-request-response-stripper';
const triggerNode: INode = mock<INode>({
name: 'Trigger',
});
const debugHelperNode: INode = mock<INode>({
name: 'DebugHelper',
});
const codeNode: INode = mock<INode>({
name: 'Code',
});
const workflow: DataRequestResponse['workflow'] = mock<DataRequestResponse['workflow']>();
const debugHelperNodeOutItems: INodeExecutionData[] = [
{
json: {
uid: 'abb74fd4-bef2-4fae-9d53-ea24e9eb3032',
email: 'Dan.Schmidt31@yahoo.com',
firstname: 'Toni',
lastname: 'Schuster',
password: 'Q!D6C2',
},
pairedItem: {
item: 0,
},
},
];
const codeNodeInputItems: INodeExecutionData[] = debugHelperNodeOutItems;
const envProviderState: DataRequestResponse['envProviderState'] = mock<
DataRequestResponse['envProviderState']
>({
env: {},
isEnvAccessBlocked: false,
isProcessAvailable: true,
});
const additionalData = mock<IWorkflowExecuteAdditionalData>({
formWaitingBaseUrl: 'http://localhost:5678/form-waiting',
instanceBaseUrl: 'http://localhost:5678/',
restApiUrl: 'http://localhost:5678/rest',
variables: {},
webhookBaseUrl: 'http://localhost:5678/webhook',
webhookTestBaseUrl: 'http://localhost:5678/webhook-test',
webhookWaitingBaseUrl: 'http://localhost:5678/webhook-waiting',
executionId: '45844',
userId: '114984bc-44b3-4dd4-9b54-a4a8d34d51d5',
currentNodeParameters: undefined,
executionTimeoutTimestamp: undefined,
restartExecutionId: undefined,
});
/**
* Drawn with https://asciiflow.com/#/
* Task data for an execution of the following WF:
* where denotes the currently being executing node.
*
*
* Trigger DebugHelper Code
*
*/
const taskData: DataRequestResponse = {
workflow,
inputData: {
main: [codeNodeInputItems],
},
itemIndex: 0,
activeNodeName: codeNode.name,
contextNodeName: codeNode.name,
defaultReturnRunIndex: -1,
mode: 'manual',
envProviderState,
node: codeNode,
runExecutionData: {
startData: {
destinationNode: codeNode.name,
runNodeFilter: [triggerNode.name, debugHelperNode.name, codeNode.name],
},
resultData: {
runData: {
[triggerNode.name]: [
{
hints: [],
startTime: 1730313407328,
executionTime: 1,
source: [],
executionStatus: 'success',
data: {
main: [[]],
},
},
],
[debugHelperNode.name]: [
{
hints: [],
startTime: 1730313407330,
executionTime: 1,
source: [
{
previousNode: triggerNode.name,
},
],
executionStatus: 'success',
data: {
main: [debugHelperNodeOutItems],
},
},
],
},
pinData: {},
},
executionData: {
contextData: {},
nodeExecutionStack: [],
metadata: {},
waitingExecution: {
[codeNode.name]: {
'0': {
main: [codeNodeInputItems],
},
},
},
waitingExecutionSource: {
[codeNode.name]: {
'0': {
main: [
{
previousNode: debugHelperNode.name,
},
],
},
},
},
},
},
runIndex: 0,
selfData: {},
siblingParameters: {},
connectionInputSource: {
main: [
{
previousNode: debugHelperNode.name,
previousNodeOutput: 0,
},
],
},
additionalData,
} as const;
describe('DataRequestResponseStripper', () => {
const allDataParam: TaskDataRequestParams = {
dataOfNodes: 'all',
env: true,
input: true,
prevNode: true,
};
const newRequestParam = (opts: Partial<TaskDataRequestParams>) => ({
...allDataParam,
...opts,
});
describe('all data', () => {
it('should build the runExecutionData as is when everything is requested', () => {
const dataRequestResponseBuilder = new DataRequestResponseStripper(taskData, allDataParam);
const { runExecutionData } = dataRequestResponseBuilder.strip();
expect(runExecutionData).toStrictEqual(taskData.runExecutionData);
});
});
describe('envProviderState', () => {
it("should filter out envProviderState when it's not requested", () => {
const dataRequestResponseBuilder = new DataRequestResponseStripper(
taskData,
newRequestParam({
env: false,
}),
);
const result = dataRequestResponseBuilder.strip();
expect(result.envProviderState).toStrictEqual({
env: {},
isEnvAccessBlocked: false,
isProcessAvailable: true,
});
});
});
describe('input data', () => {
const allExceptInputParam = newRequestParam({
input: false,
});
it('drops input data from result', () => {
const result = new DataRequestResponseStripper(taskData, allExceptInputParam).strip();
expect(result.inputData).toStrictEqual({});
});
it('drops input data from result', () => {
const result = new DataRequestResponseStripper(taskData, allExceptInputParam).strip();
expect(result.inputData).toStrictEqual({});
});
});
describe('nodes', () => {
it('should return empty run data when only Code node is requested', () => {
const result = new DataRequestResponseStripper(
taskData,
newRequestParam({ dataOfNodes: ['Code'], prevNode: false }),
).strip();
expect(result.runExecutionData.resultData.runData).toStrictEqual({});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
});
it('should return empty run data when only Code node is requested', () => {
const result = new DataRequestResponseStripper(
taskData,
newRequestParam({ dataOfNodes: [codeNode.name], prevNode: false }),
).strip();
expect(result.runExecutionData.resultData.runData).toStrictEqual({});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
});
it("should return only DebugHelper's data when only DebugHelper node is requested", () => {
const result = new DataRequestResponseStripper(
taskData,
newRequestParam({ dataOfNodes: [debugHelperNode.name], prevNode: false }),
).strip();
expect(result.runExecutionData.resultData.runData).toStrictEqual({
[debugHelperNode.name]: taskData.runExecutionData.resultData.runData[debugHelperNode.name],
});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
});
it("should return DebugHelper's data when only prevNode node is requested", () => {
const result = new DataRequestResponseStripper(
taskData,
newRequestParam({ dataOfNodes: [], prevNode: true }),
).strip();
expect(result.runExecutionData.resultData.runData).toStrictEqual({
[debugHelperNode.name]: taskData.runExecutionData.resultData.runData[debugHelperNode.name],
});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
});
});
describe('passthrough properties', () => {
test.each<Array<keyof DataRequestResponse>>([
['workflow'],
['connectionInputSource'],
['node'],
['runIndex'],
['itemIndex'],
['activeNodeName'],
['siblingParameters'],
['mode'],
['defaultReturnRunIndex'],
['selfData'],
['contextNodeName'],
['additionalData'],
])("it doesn't change %s", (propertyName) => {
const dataRequestResponseBuilder = new DataRequestResponseStripper(taskData, allDataParam);
const result = dataRequestResponseBuilder.strip();
expect(result[propertyName]).toBe(taskData[propertyName]);
});
});
});

View file

@ -1,63 +1,30 @@
import type { import type { DataRequestResponse, PartialAdditionalData, TaskData } from '@n8n/task-runner';
DataRequestResponse, import type { IWorkflowExecuteAdditionalData, Workflow, WorkflowParameters } from 'n8n-workflow';
BrokerMessage,
PartialAdditionalData,
TaskData,
} from '@n8n/task-runner';
import type {
EnvProviderState,
IExecuteData,
INodeExecutionData,
IPinData,
IRunData,
IRunExecutionData,
ITaskDataConnections,
IWorkflowExecuteAdditionalData,
Workflow,
WorkflowParameters,
} from 'n8n-workflow';
/** /**
* Builds the response to a data request coming from a Task Runner. Tries to minimize * Transforms TaskData to DataRequestResponse. The main purpose of the
* the amount of data that is sent to the runner by only providing what is requested. * transformation is to make sure there is no duplication in the data
* (e.g. connectionInputData and executeData.data can be derived from
* inputData).
*/ */
export class DataRequestResponseBuilder { export class DataRequestResponseBuilder {
private requestedNodeNames = new Set<string>(); buildFromTaskData(taskData: TaskData): DataRequestResponse {
constructor(
private readonly taskData: TaskData,
private readonly requestParams: BrokerMessage.ToRequester.TaskDataRequest['requestParams'],
) {
this.requestedNodeNames = new Set(requestParams.dataOfNodes);
if (this.requestParams.prevNode && this.requestParams.dataOfNodes !== 'all') {
this.requestedNodeNames.add(this.determinePrevNodeName());
}
}
/**
* Builds a response to the data request
*/
build(): DataRequestResponse {
const { taskData: td } = this;
return { return {
workflow: this.buildWorkflow(td.workflow), workflow: this.buildWorkflow(taskData.workflow),
connectionInputData: this.buildConnectionInputData(td.connectionInputData), inputData: taskData.inputData,
inputData: this.buildInputData(td.inputData), connectionInputSource: taskData.executeData?.source ?? null,
itemIndex: td.itemIndex, itemIndex: taskData.itemIndex,
activeNodeName: td.activeNodeName, activeNodeName: taskData.activeNodeName,
contextNodeName: td.contextNodeName, contextNodeName: taskData.contextNodeName,
defaultReturnRunIndex: td.defaultReturnRunIndex, defaultReturnRunIndex: taskData.defaultReturnRunIndex,
mode: td.mode, mode: taskData.mode,
envProviderState: this.buildEnvProviderState(td.envProviderState), envProviderState: taskData.envProviderState,
node: td.node, // The current node being executed node: taskData.node,
runExecutionData: this.buildRunExecutionData(td.runExecutionData), runExecutionData: taskData.runExecutionData,
runIndex: td.runIndex, runIndex: taskData.runIndex,
selfData: td.selfData, selfData: taskData.selfData,
siblingParameters: td.siblingParameters, siblingParameters: taskData.siblingParameters,
executeData: this.buildExecuteData(td.executeData), additionalData: this.buildAdditionalData(taskData.additionalData),
additionalData: this.buildAdditionalData(td.additionalData),
}; };
} }
@ -80,86 +47,6 @@ export class DataRequestResponseBuilder {
}; };
} }
private buildExecuteData(executeData: IExecuteData | undefined): IExecuteData | undefined {
if (executeData === undefined) {
return undefined;
}
return {
node: executeData.node, // The current node being executed
data: this.requestParams.input ? executeData.data : {},
source: executeData.source,
};
}
private buildRunExecutionData(runExecutionData: IRunExecutionData): IRunExecutionData {
if (this.requestParams.dataOfNodes === 'all') {
return runExecutionData;
}
return {
startData: runExecutionData.startData,
resultData: {
error: runExecutionData.resultData.error,
lastNodeExecuted: runExecutionData.resultData.lastNodeExecuted,
metadata: runExecutionData.resultData.metadata,
runData: this.buildRunData(runExecutionData.resultData.runData),
pinData: this.buildPinData(runExecutionData.resultData.pinData),
},
executionData: runExecutionData.executionData
? {
// TODO: Figure out what these two are and can they be filtered
contextData: runExecutionData.executionData?.contextData,
nodeExecutionStack: runExecutionData.executionData.nodeExecutionStack,
metadata: runExecutionData.executionData.metadata,
waitingExecution: runExecutionData.executionData.waitingExecution,
waitingExecutionSource: runExecutionData.executionData.waitingExecutionSource,
}
: undefined,
};
}
private buildRunData(runData: IRunData): IRunData {
return this.filterObjectByNodeNames(runData);
}
private buildPinData(pinData: IPinData | undefined): IPinData | undefined {
return pinData ? this.filterObjectByNodeNames(pinData) : undefined;
}
private buildEnvProviderState(envProviderState: EnvProviderState): EnvProviderState {
if (this.requestParams.env) {
// In case `isEnvAccessBlocked` = true, the provider state has already sanitized
// the environment variables and we can return it as is.
return envProviderState;
}
return {
env: {},
isEnvAccessBlocked: envProviderState.isEnvAccessBlocked,
isProcessAvailable: envProviderState.isProcessAvailable,
};
}
private buildInputData(inputData: ITaskDataConnections): ITaskDataConnections {
if (this.requestParams.input) {
return inputData;
}
return {};
}
private buildConnectionInputData(
connectionInputData: INodeExecutionData[],
): INodeExecutionData[] {
if (this.requestParams.input) {
return connectionInputData;
}
return [];
}
private buildWorkflow(workflow: Workflow): Omit<WorkflowParameters, 'nodeTypes'> { private buildWorkflow(workflow: Workflow): Omit<WorkflowParameters, 'nodeTypes'> {
return { return {
id: workflow.id, id: workflow.id,
@ -172,37 +59,4 @@ export class DataRequestResponseBuilder {
staticData: workflow.staticData, staticData: workflow.staticData,
}; };
} }
/**
* Assuming the given `obj` is an object where the keys are node names,
* filters the object to only include the node names that are requested.
*/
private filterObjectByNodeNames<T extends Record<string, unknown>>(obj: T): T {
if (this.requestParams.dataOfNodes === 'all') {
return obj;
}
const filteredObj: T = {} as T;
for (const nodeName in obj) {
if (!Object.prototype.hasOwnProperty.call(obj, nodeName)) {
continue;
}
if (this.requestedNodeNames.has(nodeName)) {
filteredObj[nodeName] = obj[nodeName];
}
}
return filteredObj;
}
private determinePrevNodeName(): string {
const sourceData = this.taskData.executeData?.source?.main?.[0];
if (!sourceData) {
return '';
}
return sourceData.previousNode;
}
} }

View file

@ -0,0 +1,131 @@
import type { DataRequestResponse, BrokerMessage } from '@n8n/task-runner';
import type {
EnvProviderState,
IPinData,
IRunData,
IRunExecutionData,
ITaskDataConnections,
} from 'n8n-workflow';
/**
* Strips data from data request response based on the specified parameters
*/
export class DataRequestResponseStripper {
private requestedNodeNames = new Set<string>();
constructor(
private readonly dataResponse: DataRequestResponse,
private readonly stripParams: BrokerMessage.ToRequester.TaskDataRequest['requestParams'],
) {
this.requestedNodeNames = new Set(stripParams.dataOfNodes);
if (this.stripParams.prevNode && this.stripParams.dataOfNodes !== 'all') {
this.requestedNodeNames.add(this.determinePrevNodeName());
}
}
/**
* Builds a response to the data request
*/
strip(): DataRequestResponse {
const { dataResponse: dr } = this;
return {
...dr,
inputData: this.stripInputData(dr.inputData),
envProviderState: this.stripEnvProviderState(dr.envProviderState),
runExecutionData: this.stripRunExecutionData(dr.runExecutionData),
};
}
private stripRunExecutionData(runExecutionData: IRunExecutionData): IRunExecutionData {
if (this.stripParams.dataOfNodes === 'all') {
return runExecutionData;
}
return {
startData: runExecutionData.startData,
resultData: {
error: runExecutionData.resultData.error,
lastNodeExecuted: runExecutionData.resultData.lastNodeExecuted,
metadata: runExecutionData.resultData.metadata,
runData: this.stripRunData(runExecutionData.resultData.runData),
pinData: this.stripPinData(runExecutionData.resultData.pinData),
},
executionData: runExecutionData.executionData
? {
// TODO: Figure out what these two are and can they be stripped
contextData: runExecutionData.executionData?.contextData,
nodeExecutionStack: runExecutionData.executionData.nodeExecutionStack,
metadata: runExecutionData.executionData.metadata,
waitingExecution: runExecutionData.executionData.waitingExecution,
waitingExecutionSource: runExecutionData.executionData.waitingExecutionSource,
}
: undefined,
};
}
private stripRunData(runData: IRunData): IRunData {
return this.filterObjectByNodeNames(runData);
}
private stripPinData(pinData: IPinData | undefined): IPinData | undefined {
return pinData ? this.filterObjectByNodeNames(pinData) : undefined;
}
private stripEnvProviderState(envProviderState: EnvProviderState): EnvProviderState {
if (this.stripParams.env) {
// In case `isEnvAccessBlocked` = true, the provider state has already sanitized
// the environment variables and we can return it as is.
return envProviderState;
}
return {
env: {},
isEnvAccessBlocked: envProviderState.isEnvAccessBlocked,
isProcessAvailable: envProviderState.isProcessAvailable,
};
}
private stripInputData(inputData: ITaskDataConnections): ITaskDataConnections {
if (this.stripParams.input) {
return inputData;
}
return {};
}
/**
* Assuming the given `obj` is an object where the keys are node names,
* filters the object to only include the node names that are requested.
*/
private filterObjectByNodeNames<T extends Record<string, unknown>>(obj: T): T {
if (this.stripParams.dataOfNodes === 'all') {
return obj;
}
const filteredObj: T = {} as T;
for (const nodeName in obj) {
if (!Object.prototype.hasOwnProperty.call(obj, nodeName)) {
continue;
}
if (this.requestedNodeNames.has(nodeName)) {
filteredObj[nodeName] = obj[nodeName];
}
}
return filteredObj;
}
private determinePrevNodeName(): string {
const sourceData = this.dataResponse.connectionInputSource?.main?.[0];
if (!sourceData) {
return '';
}
return sourceData.previousNode;
}
}

View file

@ -1,5 +1,6 @@
import { TaskRunnersConfig } from '@n8n/config';
import type { TaskResultData, RequesterMessage, BrokerMessage, TaskData } from '@n8n/task-runner'; import type { TaskResultData, RequesterMessage, BrokerMessage, TaskData } from '@n8n/task-runner';
import { RPC_ALLOW_LIST } from '@n8n/task-runner'; import { DataRequestResponseReconstruct, RPC_ALLOW_LIST } from '@n8n/task-runner';
import type { import type {
EnvProviderState, EnvProviderState,
IExecuteFunctions, IExecuteFunctions,
@ -17,11 +18,13 @@ import type {
} from 'n8n-workflow'; } from 'n8n-workflow';
import { createResultOk, createResultError } from 'n8n-workflow'; import { createResultOk, createResultError } from 'n8n-workflow';
import { nanoid } from 'nanoid'; import { nanoid } from 'nanoid';
import { Service } from 'typedi'; import * as a from 'node:assert/strict';
import Container, { Service } from 'typedi';
import { NodeTypes } from '@/node-types'; import { NodeTypes } from '@/node-types';
import { DataRequestResponseBuilder } from './data-request-response-builder'; import { DataRequestResponseBuilder } from './data-request-response-builder';
import { DataRequestResponseStripper } from './data-request-response-stripper';
export type RequestAccept = (jobId: string) => void; export type RequestAccept = (jobId: string) => void;
export type RequestReject = (reason: string) => void; export type RequestReject = (reason: string) => void;
@ -56,6 +59,10 @@ export abstract class TaskManager {
tasks: Map<string, Task> = new Map(); tasks: Map<string, Task> = new Map();
private readonly runnerConfig = Container.get(TaskRunnersConfig);
private readonly dataResponseBuilder = new DataRequestResponseBuilder();
constructor(private readonly nodeTypes: NodeTypes) {} constructor(private readonly nodeTypes: NodeTypes) {}
async startTask<TData, TError>( async startTask<TData, TError>(
@ -237,14 +244,30 @@ export abstract class TaskManager {
return; return;
} }
const dataRequestResponseBuilder = new DataRequestResponseBuilder(job.data, requestParams); const dataRequestResponse = this.dataResponseBuilder.buildFromTaskData(job.data);
const requestedData = dataRequestResponseBuilder.build();
if (this.runnerConfig.assertDeduplicationOutput) {
const reconstruct = new DataRequestResponseReconstruct();
a.deepStrictEqual(
reconstruct.reconstructConnectionInputData(dataRequestResponse.inputData),
job.data.connectionInputData,
);
a.deepStrictEqual(
reconstruct.reconstructExecuteData(dataRequestResponse),
job.data.executeData,
);
}
const strippedData = new DataRequestResponseStripper(
dataRequestResponse,
requestParams,
).strip();
this.sendMessage({ this.sendMessage({
type: 'requester:taskdataresponse', type: 'requester:taskdataresponse',
taskId, taskId,
requestId, requestId,
data: requestedData, data: strippedData,
}); });
} }

View file

@ -43,7 +43,7 @@ export class TaskRunnerModule {
} }
async start() { async start() {
a.ok(!this.runnerConfig.disabled, 'Task runner is disabled'); a.ok(this.runnerConfig.enabled, 'Task runner is disabled');
await this.loadTaskManager(); await this.loadTaskManager();
await this.loadTaskRunnerServer(); await this.loadTaskRunnerServer();

View file

@ -26,7 +26,7 @@ import { mockInstance } from '../../shared/mocking';
config.set('executions.mode', 'queue'); config.set('executions.mode', 'queue');
config.set('binaryDataManager.availableModes', 'filesystem'); config.set('binaryDataManager.availableModes', 'filesystem');
Container.get(TaskRunnersConfig).disabled = false; Container.get(TaskRunnersConfig).enabled = true;
mockInstance(LoadNodesAndCredentials); mockInstance(LoadNodesAndCredentials);
const binaryDataService = mockInstance(BinaryDataService); const binaryDataService = mockInstance(BinaryDataService);
const externalHooks = mockInstance(ExternalHooks); const externalHooks = mockInstance(ExternalHooks);

View file

@ -18,14 +18,14 @@ describe('TaskRunnerModule in external mode', () => {
describe('start', () => { describe('start', () => {
it('should throw if the task runner is disabled', async () => { it('should throw if the task runner is disabled', async () => {
runnerConfig.disabled = true; runnerConfig.enabled = false;
// Act // Act
await expect(module.start()).rejects.toThrow('Task runner is disabled'); await expect(module.start()).rejects.toThrow('Task runner is disabled');
}); });
it('should start the task runner', async () => { it('should start the task runner', async () => {
runnerConfig.disabled = false; runnerConfig.enabled = true;
// Act // Act
await module.start(); await module.start();

View file

@ -18,14 +18,14 @@ describe('TaskRunnerModule in internal_childprocess mode', () => {
describe('start', () => { describe('start', () => {
it('should throw if the task runner is disabled', async () => { it('should throw if the task runner is disabled', async () => {
runnerConfig.disabled = true; runnerConfig.enabled = false;
// Act // Act
await expect(module.start()).rejects.toThrow('Task runner is disabled'); await expect(module.start()).rejects.toThrow('Task runner is disabled');
}); });
it('should start the task runner', async () => { it('should start the task runner', async () => {
runnerConfig.disabled = false; runnerConfig.enabled = true;
// Act // Act
await module.start(); await module.start();

View file

@ -10,7 +10,7 @@ import { retryUntil } from '@test-integration/retry-until';
describe('TaskRunnerProcess', () => { describe('TaskRunnerProcess', () => {
const authToken = 'token'; const authToken = 'token';
const runnerConfig = Container.get(TaskRunnersConfig); const runnerConfig = Container.get(TaskRunnersConfig);
runnerConfig.disabled = false; runnerConfig.enabled = true;
runnerConfig.mode = 'internal_childprocess'; runnerConfig.mode = 'internal_childprocess';
runnerConfig.authToken = authToken; runnerConfig.authToken = authToken;
runnerConfig.port = 0; // Use any port runnerConfig.port = 0; // Use any port

View file

@ -108,6 +108,7 @@ import type {
AiEvent, AiEvent,
ISupplyDataFunctions, ISupplyDataFunctions,
WebhookType, WebhookType,
SchedulingFunctions,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { import {
NodeConnectionType, NodeConnectionType,
@ -172,6 +173,7 @@ import {
TriggerContext, TriggerContext,
WebhookContext, WebhookContext,
} from './node-execution-context'; } from './node-execution-context';
import { ScheduledTaskManager } from './ScheduledTaskManager';
import { getSecretsProxy } from './Secrets'; import { getSecretsProxy } from './Secrets';
import { SSHClientsManager } from './SSHClientsManager'; import { SSHClientsManager } from './SSHClientsManager';
@ -3023,7 +3025,7 @@ const executionCancellationFunctions = (
}, },
}); });
const getRequestHelperFunctions = ( export const getRequestHelperFunctions = (
workflow: Workflow, workflow: Workflow,
node: INode, node: INode,
additionalData: IWorkflowExecuteAdditionalData, additionalData: IWorkflowExecuteAdditionalData,
@ -3343,11 +3345,19 @@ const getRequestHelperFunctions = (
}; };
}; };
const getSSHTunnelFunctions = (): SSHTunnelFunctions => ({ export const getSSHTunnelFunctions = (): SSHTunnelFunctions => ({
getSSHClient: async (credentials) => getSSHClient: async (credentials) =>
await Container.get(SSHClientsManager).getClient(credentials), await Container.get(SSHClientsManager).getClient(credentials),
}); });
export const getSchedulingFunctions = (workflow: Workflow): SchedulingFunctions => {
const scheduledTaskManager = Container.get(ScheduledTaskManager);
return {
registerCron: (cronExpression, onTick) =>
scheduledTaskManager.registerCron(workflow, cronExpression, onTick),
};
};
const getAllowedPaths = () => { const getAllowedPaths = () => {
const restrictFileAccessTo = process.env[RESTRICT_FILE_ACCESS_TO]; const restrictFileAccessTo = process.env[RESTRICT_FILE_ACCESS_TO];
if (!restrictFileAccessTo) { if (!restrictFileAccessTo) {
@ -3414,7 +3424,7 @@ export function isFilePathBlocked(filePath: string): boolean {
return false; return false;
} }
const getFileSystemHelperFunctions = (node: INode): FileSystemHelperFunctions => ({ export const getFileSystemHelperFunctions = (node: INode): FileSystemHelperFunctions => ({
async createReadStream(filePath) { async createReadStream(filePath) {
try { try {
await fsAccess(filePath); await fsAccess(filePath);
@ -3450,7 +3460,7 @@ const getFileSystemHelperFunctions = (node: INode): FileSystemHelperFunctions =>
}, },
}); });
const getNodeHelperFunctions = ( export const getNodeHelperFunctions = (
{ executionId }: IWorkflowExecuteAdditionalData, { executionId }: IWorkflowExecuteAdditionalData,
workflowId: string, workflowId: string,
): NodeHelperFunctions => ({ ): NodeHelperFunctions => ({
@ -3458,7 +3468,7 @@ const getNodeHelperFunctions = (
await copyBinaryFile(workflowId, executionId!, filePath, fileName, mimeType), await copyBinaryFile(workflowId, executionId!, filePath, fileName, mimeType),
}); });
const getBinaryHelperFunctions = ( export const getBinaryHelperFunctions = (
{ executionId }: IWorkflowExecuteAdditionalData, { executionId }: IWorkflowExecuteAdditionalData,
workflowId: string, workflowId: string,
): BinaryHelperFunctions => ({ ): BinaryHelperFunctions => ({
@ -3476,7 +3486,7 @@ const getBinaryHelperFunctions = (
}, },
}); });
const getCheckProcessedHelperFunctions = ( export const getCheckProcessedHelperFunctions = (
workflow: Workflow, workflow: Workflow,
node: INode, node: INode,
): DeduplicationHelperFunctions => ({ ): DeduplicationHelperFunctions => ({

View file

@ -27,13 +27,13 @@ import {
continueOnFail, continueOnFail,
getAdditionalKeys, getAdditionalKeys,
getBinaryDataBuffer, getBinaryDataBuffer,
getBinaryHelperFunctions,
getCredentials, getCredentials,
getNodeParameter, getNodeParameter,
getRequestHelperFunctions,
returnJsonArray, returnJsonArray,
} from '@/NodeExecuteFunctions'; } from '@/NodeExecuteFunctions';
import { BinaryHelpers } from './helpers/binary-helpers';
import { RequestHelpers } from './helpers/request-helpers';
import { NodeExecutionContext } from './node-execution-context'; import { NodeExecutionContext } from './node-execution-context';
export class ExecuteSingleContext extends NodeExecutionContext implements IExecuteSingleFunctions { export class ExecuteSingleContext extends NodeExecutionContext implements IExecuteSingleFunctions {
@ -57,8 +57,14 @@ export class ExecuteSingleContext extends NodeExecutionContext implements IExecu
this.helpers = { this.helpers = {
createDeferredPromise, createDeferredPromise,
returnJsonArray, returnJsonArray,
...new BinaryHelpers(workflow, additionalData).exported, ...getRequestHelperFunctions(
...new RequestHelpers(this, workflow, node, additionalData).exported, workflow,
node,
additionalData,
runExecutionData,
connectionInputData,
),
...getBinaryHelperFunctions(additionalData, workflow.id),
assertBinaryData: (propertyName, inputIndex = 0) => assertBinaryData: (propertyName, inputIndex = 0) =>
assertBinaryData(inputData, node, itemIndex, propertyName, inputIndex), assertBinaryData(inputData, node, itemIndex, propertyName, inputIndex),

View file

@ -1,136 +0,0 @@
import FileType from 'file-type';
import { IncomingMessage, type ClientRequest } from 'http';
import { mock } from 'jest-mock-extended';
import type { Workflow, IWorkflowExecuteAdditionalData, IBinaryData } from 'n8n-workflow';
import type { Socket } from 'net';
import { Container } from 'typedi';
import { BinaryDataService } from '@/BinaryData/BinaryData.service';
import { BinaryHelpers } from '../binary-helpers';
jest.mock('file-type');
describe('BinaryHelpers', () => {
let binaryDataService = mock<BinaryDataService>();
Container.set(BinaryDataService, binaryDataService);
const workflow = mock<Workflow>({ id: '123' });
const additionalData = mock<IWorkflowExecuteAdditionalData>({ executionId: '456' });
const binaryHelpers = new BinaryHelpers(workflow, additionalData);
beforeEach(() => {
jest.clearAllMocks();
binaryDataService.store.mockImplementation(
async (_workflowId, _executionId, _buffer, value) => value,
);
});
describe('getBinaryPath', () => {
it('should call getPath method of BinaryDataService', () => {
binaryHelpers.getBinaryPath('mock-binary-data-id');
expect(binaryDataService.getPath).toHaveBeenCalledWith('mock-binary-data-id');
});
});
describe('getBinaryMetadata', () => {
it('should call getMetadata method of BinaryDataService', async () => {
await binaryHelpers.getBinaryMetadata('mock-binary-data-id');
expect(binaryDataService.getMetadata).toHaveBeenCalledWith('mock-binary-data-id');
});
});
describe('getBinaryStream', () => {
it('should call getStream method of BinaryDataService', async () => {
await binaryHelpers.getBinaryStream('mock-binary-data-id');
expect(binaryDataService.getAsStream).toHaveBeenCalledWith('mock-binary-data-id', undefined);
});
});
describe('prepareBinaryData', () => {
it('should guess the mime type and file extension if not provided', async () => {
const buffer = Buffer.from('test');
const fileTypeData = { mime: 'application/pdf', ext: 'pdf' };
(FileType.fromBuffer as jest.Mock).mockResolvedValue(fileTypeData);
const binaryData = await binaryHelpers.prepareBinaryData(buffer);
expect(binaryData.mimeType).toEqual('application/pdf');
expect(binaryData.fileExtension).toEqual('pdf');
expect(binaryData.fileType).toEqual('pdf');
expect(binaryData.fileName).toBeUndefined();
expect(binaryData.directory).toBeUndefined();
expect(binaryDataService.store).toHaveBeenCalledWith(
workflow.id,
additionalData.executionId!,
buffer,
binaryData,
);
});
it('should use the provided mime type and file extension if provided', async () => {
const buffer = Buffer.from('test');
const mimeType = 'application/octet-stream';
const binaryData = await binaryHelpers.prepareBinaryData(buffer, undefined, mimeType);
expect(binaryData.mimeType).toEqual(mimeType);
expect(binaryData.fileExtension).toEqual('bin');
expect(binaryData.fileType).toBeUndefined();
expect(binaryData.fileName).toBeUndefined();
expect(binaryData.directory).toBeUndefined();
expect(binaryDataService.store).toHaveBeenCalledWith(
workflow.id,
additionalData.executionId!,
buffer,
binaryData,
);
});
const mockSocket = mock<Socket>({ readableHighWaterMark: 0 });
it('should use the contentDisposition.filename, responseUrl, and contentType properties to set the fileName, directory, and mimeType properties of the binaryData object', async () => {
const incomingMessage = new IncomingMessage(mockSocket);
incomingMessage.contentDisposition = { filename: 'test.txt', type: 'attachment' };
incomingMessage.contentType = 'text/plain';
incomingMessage.responseUrl = 'https://example.com/test.txt';
const binaryData = await binaryHelpers.prepareBinaryData(incomingMessage);
expect(binaryData.fileName).toEqual('test.txt');
expect(binaryData.fileType).toEqual('text');
expect(binaryData.directory).toBeUndefined();
expect(binaryData.mimeType).toEqual('text/plain');
expect(binaryData.fileExtension).toEqual('txt');
});
it('should use the req.path property to set the fileName property of the binaryData object if contentDisposition.filename and responseUrl are not provided', async () => {
const incomingMessage = new IncomingMessage(mockSocket);
incomingMessage.contentType = 'text/plain';
incomingMessage.req = mock<ClientRequest>({ path: '/test.txt' });
const binaryData = await binaryHelpers.prepareBinaryData(incomingMessage);
expect(binaryData.fileName).toEqual('test.txt');
expect(binaryData.directory).toBeUndefined();
expect(binaryData.mimeType).toEqual('text/plain');
expect(binaryData.fileExtension).toEqual('txt');
});
});
describe('setBinaryDataBuffer', () => {
it('should call store method of BinaryDataService', async () => {
const binaryData = mock<IBinaryData>();
const bufferOrStream = mock<Buffer>();
await binaryHelpers.setBinaryDataBuffer(binaryData, bufferOrStream);
expect(binaryDataService.store).toHaveBeenCalledWith(
workflow.id,
additionalData.executionId,
bufferOrStream,
binaryData,
);
});
});
});

View file

@ -1,33 +0,0 @@
import { mock } from 'jest-mock-extended';
import type { Workflow } from 'n8n-workflow';
import { Container } from 'typedi';
import { ScheduledTaskManager } from '@/ScheduledTaskManager';
import { SchedulingHelpers } from '../scheduling-helpers';
describe('SchedulingHelpers', () => {
const scheduledTaskManager = mock<ScheduledTaskManager>();
Container.set(ScheduledTaskManager, scheduledTaskManager);
const workflow = mock<Workflow>();
const schedulingHelpers = new SchedulingHelpers(workflow);
beforeEach(() => {
jest.clearAllMocks();
});
describe('registerCron', () => {
it('should call registerCron method of ScheduledTaskManager', () => {
const cronExpression = '* * * * * *';
const onTick = jest.fn();
schedulingHelpers.registerCron(cronExpression, onTick);
expect(scheduledTaskManager.registerCron).toHaveBeenCalledWith(
workflow,
cronExpression,
onTick,
);
});
});
});

View file

@ -1,32 +0,0 @@
import { mock } from 'jest-mock-extended';
import type { SSHCredentials } from 'n8n-workflow';
import type { Client } from 'ssh2';
import { Container } from 'typedi';
import { SSHClientsManager } from '@/SSHClientsManager';
import { SSHTunnelHelpers } from '../ssh-tunnel-helpers';
describe('SSHTunnelHelpers', () => {
const sshClientsManager = mock<SSHClientsManager>();
Container.set(SSHClientsManager, sshClientsManager);
const sshTunnelHelpers = new SSHTunnelHelpers();
beforeEach(() => {
jest.clearAllMocks();
});
describe('getSSHClient', () => {
const credentials = mock<SSHCredentials>();
it('should call SSHClientsManager.getClient with the given credentials', async () => {
const mockClient = mock<Client>();
sshClientsManager.getClient.mockResolvedValue(mockClient);
const client = await sshTunnelHelpers.getSSHClient(credentials);
expect(sshClientsManager.getClient).toHaveBeenCalledWith(credentials);
expect(client).toBe(mockClient);
});
});
});

View file

@ -1,148 +0,0 @@
import FileType from 'file-type';
import { IncomingMessage } from 'http';
import MimeTypes from 'mime-types';
import { ApplicationError, fileTypeFromMimeType } from 'n8n-workflow';
import type {
BinaryHelperFunctions,
IWorkflowExecuteAdditionalData,
Workflow,
IBinaryData,
} from 'n8n-workflow';
import path from 'path';
import type { Readable } from 'stream';
import Container from 'typedi';
import { BinaryDataService } from '@/BinaryData/BinaryData.service';
import { binaryToBuffer } from '@/BinaryData/utils';
// eslint-disable-next-line import/no-cycle
import { binaryToString } from '@/NodeExecuteFunctions';
export class BinaryHelpers {
private readonly binaryDataService = Container.get(BinaryDataService);
constructor(
private readonly workflow: Workflow,
private readonly additionalData: IWorkflowExecuteAdditionalData,
) {}
get exported(): BinaryHelperFunctions {
return {
getBinaryPath: this.getBinaryPath.bind(this),
getBinaryMetadata: this.getBinaryMetadata.bind(this),
getBinaryStream: this.getBinaryStream.bind(this),
binaryToBuffer,
binaryToString,
prepareBinaryData: this.prepareBinaryData.bind(this),
setBinaryDataBuffer: this.setBinaryDataBuffer.bind(this),
copyBinaryFile: this.copyBinaryFile.bind(this),
};
}
getBinaryPath(binaryDataId: string) {
return this.binaryDataService.getPath(binaryDataId);
}
async getBinaryMetadata(binaryDataId: string) {
return await this.binaryDataService.getMetadata(binaryDataId);
}
async getBinaryStream(binaryDataId: string, chunkSize?: number) {
return await this.binaryDataService.getAsStream(binaryDataId, chunkSize);
}
// eslint-disable-next-line complexity
async prepareBinaryData(binaryData: Buffer | Readable, filePath?: string, mimeType?: string) {
let fileExtension: string | undefined;
if (binaryData instanceof IncomingMessage) {
if (!filePath) {
try {
const { responseUrl } = binaryData;
filePath =
binaryData.contentDisposition?.filename ??
((responseUrl && new URL(responseUrl).pathname) ?? binaryData.req?.path)?.slice(1);
} catch {}
}
if (!mimeType) {
mimeType = binaryData.contentType;
}
}
if (!mimeType) {
// If no mime type is given figure it out
if (filePath) {
// Use file path to guess mime type
const mimeTypeLookup = MimeTypes.lookup(filePath);
if (mimeTypeLookup) {
mimeType = mimeTypeLookup;
}
}
if (!mimeType) {
if (Buffer.isBuffer(binaryData)) {
// Use buffer to guess mime type
const fileTypeData = await FileType.fromBuffer(binaryData);
if (fileTypeData) {
mimeType = fileTypeData.mime;
fileExtension = fileTypeData.ext;
}
} else if (binaryData instanceof IncomingMessage) {
mimeType = binaryData.headers['content-type'];
} else {
// TODO: detect filetype from other kind of streams
}
}
}
if (!fileExtension && mimeType) {
fileExtension = MimeTypes.extension(mimeType) || undefined;
}
if (!mimeType) {
// Fall back to text
mimeType = 'text/plain';
}
const returnData: IBinaryData = {
mimeType,
fileType: fileTypeFromMimeType(mimeType),
fileExtension,
data: '',
};
if (filePath) {
if (filePath.includes('?')) {
// Remove maybe present query parameters
filePath = filePath.split('?').shift();
}
const filePathParts = path.parse(filePath as string);
if (filePathParts.dir !== '') {
returnData.directory = filePathParts.dir;
}
returnData.fileName = filePathParts.base;
// Remove the dot
const extractedFileExtension = filePathParts.ext.slice(1);
if (extractedFileExtension) {
returnData.fileExtension = extractedFileExtension;
}
}
return await this.setBinaryDataBuffer(returnData, binaryData);
}
async setBinaryDataBuffer(binaryData: IBinaryData, bufferOrStream: Buffer | Readable) {
return await this.binaryDataService.store(
this.workflow.id,
this.additionalData.executionId!,
bufferOrStream,
binaryData,
);
}
async copyBinaryFile(): Promise<never> {
throw new ApplicationError('`copyBinaryFile` has been removed. Please upgrade this node.');
}
}

View file

@ -1,381 +0,0 @@
import { createHash } from 'crypto';
import { pick } from 'lodash';
import { jsonParse, NodeOperationError, sleep } from 'n8n-workflow';
import type {
RequestHelperFunctions,
IAdditionalCredentialOptions,
IAllExecuteFunctions,
IExecuteData,
IHttpRequestOptions,
IN8nHttpFullResponse,
IN8nHttpResponse,
INode,
INodeExecutionData,
IOAuth2Options,
IRequestOptions,
IRunExecutionData,
IWorkflowDataProxyAdditionalKeys,
IWorkflowExecuteAdditionalData,
NodeParameterValueType,
PaginationOptions,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { Readable } from 'stream';
// eslint-disable-next-line import/no-cycle
import {
applyPaginationRequestData,
binaryToString,
httpRequest,
httpRequestWithAuthentication,
proxyRequestToAxios,
requestOAuth1,
requestOAuth2,
requestWithAuthentication,
validateUrl,
} from '@/NodeExecuteFunctions';
export class RequestHelpers {
constructor(
private readonly context: IAllExecuteFunctions,
private readonly workflow: Workflow,
private readonly node: INode,
private readonly additionalData: IWorkflowExecuteAdditionalData,
private readonly runExecutionData: IRunExecutionData | null = null,
private readonly connectionInputData: INodeExecutionData[] = [],
) {}
get exported(): RequestHelperFunctions {
return {
httpRequest,
httpRequestWithAuthentication: this.httpRequestWithAuthentication.bind(this),
requestWithAuthenticationPaginated: this.requestWithAuthenticationPaginated.bind(this),
request: this.request.bind(this),
requestWithAuthentication: this.requestWithAuthentication.bind(this),
requestOAuth1: this.requestOAuth1.bind(this),
requestOAuth2: this.requestOAuth2.bind(this),
};
}
get httpRequest() {
return httpRequest;
}
async httpRequestWithAuthentication(
credentialsType: string,
requestOptions: IHttpRequestOptions,
additionalCredentialOptions?: IAdditionalCredentialOptions,
) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return await httpRequestWithAuthentication.call(
this.context,
credentialsType,
requestOptions,
this.workflow,
this.node,
this.additionalData,
additionalCredentialOptions,
);
}
// eslint-disable-next-line complexity
async requestWithAuthenticationPaginated(
requestOptions: IRequestOptions,
itemIndex: number,
paginationOptions: PaginationOptions,
credentialsType?: string,
additionalCredentialOptions?: IAdditionalCredentialOptions,
): Promise<unknown[]> {
const responseData = [];
if (!requestOptions.qs) {
requestOptions.qs = {};
}
requestOptions.resolveWithFullResponse = true;
requestOptions.simple = false;
let tempResponseData: IN8nHttpFullResponse;
let makeAdditionalRequest: boolean;
let paginateRequestData: PaginationOptions['request'];
const runIndex = 0;
const additionalKeys = {
$request: requestOptions,
$response: {} as IN8nHttpFullResponse,
$version: this.node.typeVersion,
$pageCount: 0,
};
const executeData: IExecuteData = {
data: {},
node: this.node,
source: null,
};
const hashData = {
identicalCount: 0,
previousLength: 0,
previousHash: '',
};
do {
paginateRequestData = this.getResolvedValue(
paginationOptions.request as unknown as NodeParameterValueType,
itemIndex,
runIndex,
executeData,
additionalKeys,
false,
) as object as PaginationOptions['request'];
const tempRequestOptions = applyPaginationRequestData(requestOptions, paginateRequestData);
if (!validateUrl(tempRequestOptions.uri as string)) {
throw new NodeOperationError(
this.node,
`'${paginateRequestData.url}' is not a valid URL.`,
{
itemIndex,
runIndex,
type: 'invalid_url',
},
);
}
if (credentialsType) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
tempResponseData = await this.requestWithAuthentication(
credentialsType,
tempRequestOptions,
additionalCredentialOptions,
);
} else {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
tempResponseData = await this.request(tempRequestOptions);
}
const newResponse: IN8nHttpFullResponse = Object.assign(
{
body: {},
headers: {},
statusCode: 0,
},
pick(tempResponseData, ['body', 'headers', 'statusCode']),
);
let contentBody: Exclude<IN8nHttpResponse, Buffer>;
if (newResponse.body instanceof Readable && paginationOptions.binaryResult !== true) {
// Keep the original string version that we can use it to hash if needed
contentBody = await binaryToString(newResponse.body as Buffer | Readable);
const responseContentType = newResponse.headers['content-type']?.toString() ?? '';
if (responseContentType.includes('application/json')) {
newResponse.body = jsonParse(contentBody, { fallbackValue: {} });
} else {
newResponse.body = contentBody;
}
tempResponseData.__bodyResolved = true;
tempResponseData.body = newResponse.body;
} else {
contentBody = newResponse.body;
}
if (paginationOptions.binaryResult !== true || tempResponseData.headers.etag) {
// If the data is not binary (and so not a stream), or an etag is present,
// we check via etag or hash if identical data is received
let contentLength = 0;
if ('content-length' in tempResponseData.headers) {
contentLength = parseInt(tempResponseData.headers['content-length'] as string) || 0;
}
if (hashData.previousLength === contentLength) {
let hash: string;
if (tempResponseData.headers.etag) {
// If an etag is provided, we use it as "hash"
hash = tempResponseData.headers.etag as string;
} else {
// If there is no etag, we calculate a hash from the data in the body
if (typeof contentBody !== 'string') {
contentBody = JSON.stringify(contentBody);
}
hash = createHash('md5').update(contentBody).digest('base64');
}
if (hashData.previousHash === hash) {
hashData.identicalCount += 1;
if (hashData.identicalCount > 2) {
// Length was identical 5x and hash 3x
throw new NodeOperationError(
this.node,
'The returned response was identical 5x, so requests got stopped',
{
itemIndex,
description:
'Check if "Pagination Completed When" has been configured correctly.',
},
);
}
} else {
hashData.identicalCount = 0;
}
hashData.previousHash = hash;
} else {
hashData.identicalCount = 0;
}
hashData.previousLength = contentLength;
}
responseData.push(tempResponseData);
additionalKeys.$response = newResponse;
additionalKeys.$pageCount = additionalKeys.$pageCount + 1;
const maxRequests = this.getResolvedValue(
paginationOptions.maxRequests,
itemIndex,
runIndex,
executeData,
additionalKeys,
false,
) as number;
if (maxRequests && additionalKeys.$pageCount >= maxRequests) {
break;
}
makeAdditionalRequest = this.getResolvedValue(
paginationOptions.continue,
itemIndex,
runIndex,
executeData,
additionalKeys,
false,
) as boolean;
if (makeAdditionalRequest) {
if (paginationOptions.requestInterval) {
const requestInterval = this.getResolvedValue(
paginationOptions.requestInterval,
itemIndex,
runIndex,
executeData,
additionalKeys,
false,
) as number;
await sleep(requestInterval);
}
if (tempResponseData.statusCode < 200 || tempResponseData.statusCode >= 300) {
// We have it configured to let all requests pass no matter the response code
// via "requestOptions.simple = false" to not by default fail if it is for example
// configured to stop on 404 response codes. For that reason we have to throw here
// now an error manually if the response code is not a success one.
let data = tempResponseData.body;
if (data instanceof Readable && paginationOptions.binaryResult !== true) {
data = await binaryToString(data as Buffer | Readable);
} else if (typeof data === 'object') {
data = JSON.stringify(data);
}
throw Object.assign(new Error(`${tempResponseData.statusCode} - "${data?.toString()}"`), {
statusCode: tempResponseData.statusCode,
error: data,
isAxiosError: true,
response: {
headers: tempResponseData.headers,
status: tempResponseData.statusCode,
statusText: tempResponseData.statusMessage,
},
});
}
}
} while (makeAdditionalRequest);
return responseData;
}
async request(uriOrObject: string | IRequestOptions, options?: IRequestOptions) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return await proxyRequestToAxios(
this.workflow,
this.additionalData,
this.node,
uriOrObject,
options,
);
}
async requestWithAuthentication(
credentialsType: string,
requestOptions: IRequestOptions,
additionalCredentialOptions?: IAdditionalCredentialOptions,
itemIndex?: number,
) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return await requestWithAuthentication.call(
this.context,
credentialsType,
requestOptions,
this.workflow,
this.node,
this.additionalData,
additionalCredentialOptions,
itemIndex,
);
}
async requestOAuth1(credentialsType: string, requestOptions: IRequestOptions) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return await requestOAuth1.call(this.context, credentialsType, requestOptions);
}
async requestOAuth2(
credentialsType: string,
requestOptions: IRequestOptions,
oAuth2Options?: IOAuth2Options,
) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return await requestOAuth2.call(
this.context,
credentialsType,
requestOptions,
this.node,
this.additionalData,
oAuth2Options,
);
}
private getResolvedValue(
parameterValue: NodeParameterValueType,
itemIndex: number,
runIndex: number,
executeData: IExecuteData,
additionalKeys?: IWorkflowDataProxyAdditionalKeys,
returnObjectAsString = false,
): NodeParameterValueType {
const mode: WorkflowExecuteMode = 'internal';
if (
typeof parameterValue === 'object' ||
(typeof parameterValue === 'string' && parameterValue.charAt(0) === '=')
) {
return this.workflow.expression.getParameterValue(
parameterValue,
this.runExecutionData,
runIndex,
itemIndex,
this.node.name,
this.connectionInputData,
mode,
additionalKeys ?? {},
executeData,
returnObjectAsString,
);
}
return parameterValue;
}
}

View file

@ -1,20 +0,0 @@
import type { CronExpression, Workflow, SchedulingFunctions } from 'n8n-workflow';
import { Container } from 'typedi';
import { ScheduledTaskManager } from '@/ScheduledTaskManager';
export class SchedulingHelpers {
private readonly scheduledTaskManager = Container.get(ScheduledTaskManager);
constructor(private readonly workflow: Workflow) {}
get exported(): SchedulingFunctions {
return {
registerCron: this.registerCron.bind(this),
};
}
registerCron(cronExpression: CronExpression, onTick: () => void) {
this.scheduledTaskManager.registerCron(this.workflow, cronExpression, onTick);
}
}

View file

@ -1,18 +0,0 @@
import type { SSHCredentials, SSHTunnelFunctions } from 'n8n-workflow';
import { Container } from 'typedi';
import { SSHClientsManager } from '@/SSHClientsManager';
export class SSHTunnelHelpers {
private readonly sshClientsManager = Container.get(SSHClientsManager);
get exported(): SSHTunnelFunctions {
return {
getSSHClient: this.getSSHClient.bind(this),
};
}
async getSSHClient(credentials: SSHCredentials) {
return await this.sshClientsManager.getClient(credentials);
}
}

View file

@ -21,10 +21,10 @@ import {
getCredentials, getCredentials,
getNodeParameter, getNodeParameter,
getNodeWebhookUrl, getNodeWebhookUrl,
getRequestHelperFunctions,
getWebhookDescription, getWebhookDescription,
} from '@/NodeExecuteFunctions'; } from '@/NodeExecuteFunctions';
import { RequestHelpers } from './helpers/request-helpers';
import { NodeExecutionContext } from './node-execution-context'; import { NodeExecutionContext } from './node-execution-context';
export class HookContext extends NodeExecutionContext implements IHookFunctions { export class HookContext extends NodeExecutionContext implements IHookFunctions {
@ -40,7 +40,7 @@ export class HookContext extends NodeExecutionContext implements IHookFunctions
) { ) {
super(workflow, node, additionalData, mode); super(workflow, node, additionalData, mode);
this.helpers = new RequestHelpers(this, workflow, node, additionalData); this.helpers = getRequestHelperFunctions(workflow, node, additionalData);
} }
getActivationMode() { getActivationMode() {

View file

@ -13,10 +13,14 @@ import type {
import { extractValue } from '@/ExtractValue'; import { extractValue } from '@/ExtractValue';
// eslint-disable-next-line import/no-cycle // eslint-disable-next-line import/no-cycle
import { getAdditionalKeys, getCredentials, getNodeParameter } from '@/NodeExecuteFunctions'; import {
getAdditionalKeys,
getCredentials,
getNodeParameter,
getRequestHelperFunctions,
getSSHTunnelFunctions,
} from '@/NodeExecuteFunctions';
import { RequestHelpers } from './helpers/request-helpers';
import { SSHTunnelHelpers } from './helpers/ssh-tunnel-helpers';
import { NodeExecutionContext } from './node-execution-context'; import { NodeExecutionContext } from './node-execution-context';
export class LoadOptionsContext extends NodeExecutionContext implements ILoadOptionsFunctions { export class LoadOptionsContext extends NodeExecutionContext implements ILoadOptionsFunctions {
@ -31,8 +35,8 @@ export class LoadOptionsContext extends NodeExecutionContext implements ILoadOpt
super(workflow, node, additionalData, 'internal'); super(workflow, node, additionalData, 'internal');
this.helpers = { this.helpers = {
...new RequestHelpers(this, workflow, node, additionalData).exported, ...getSSHTunnelFunctions(),
...new SSHTunnelHelpers().exported, ...getRequestHelperFunctions(workflow, node, additionalData),
}; };
} }

View file

@ -16,14 +16,14 @@ import { ApplicationError, createDeferredPromise } from 'n8n-workflow';
// eslint-disable-next-line import/no-cycle // eslint-disable-next-line import/no-cycle
import { import {
getAdditionalKeys, getAdditionalKeys,
getBinaryHelperFunctions,
getCredentials, getCredentials,
getNodeParameter, getNodeParameter,
getRequestHelperFunctions,
getSchedulingFunctions,
returnJsonArray, returnJsonArray,
} from '@/NodeExecuteFunctions'; } from '@/NodeExecuteFunctions';
import { BinaryHelpers } from './helpers/binary-helpers';
import { RequestHelpers } from './helpers/request-helpers';
import { SchedulingHelpers } from './helpers/scheduling-helpers';
import { NodeExecutionContext } from './node-execution-context'; import { NodeExecutionContext } from './node-execution-context';
const throwOnEmit = () => { const throwOnEmit = () => {
@ -51,9 +51,9 @@ export class PollContext extends NodeExecutionContext implements IPollFunctions
this.helpers = { this.helpers = {
createDeferredPromise, createDeferredPromise,
returnJsonArray, returnJsonArray,
...new BinaryHelpers(workflow, additionalData).exported, ...getRequestHelperFunctions(workflow, node, additionalData),
...new RequestHelpers(this, workflow, node, additionalData).exported, ...getBinaryHelperFunctions(additionalData, workflow.id),
...new SchedulingHelpers(workflow).exported, ...getSchedulingFunctions(workflow),
}; };
} }

View file

@ -16,15 +16,15 @@ import { ApplicationError, createDeferredPromise } from 'n8n-workflow';
// eslint-disable-next-line import/no-cycle // eslint-disable-next-line import/no-cycle
import { import {
getAdditionalKeys, getAdditionalKeys,
getBinaryHelperFunctions,
getCredentials, getCredentials,
getNodeParameter, getNodeParameter,
getRequestHelperFunctions,
getSchedulingFunctions,
getSSHTunnelFunctions,
returnJsonArray, returnJsonArray,
} from '@/NodeExecuteFunctions'; } from '@/NodeExecuteFunctions';
import { BinaryHelpers } from './helpers/binary-helpers';
import { RequestHelpers } from './helpers/request-helpers';
import { SchedulingHelpers } from './helpers/scheduling-helpers';
import { SSHTunnelHelpers } from './helpers/ssh-tunnel-helpers';
import { NodeExecutionContext } from './node-execution-context'; import { NodeExecutionContext } from './node-execution-context';
const throwOnEmit = () => { const throwOnEmit = () => {
@ -52,10 +52,10 @@ export class TriggerContext extends NodeExecutionContext implements ITriggerFunc
this.helpers = { this.helpers = {
createDeferredPromise, createDeferredPromise,
returnJsonArray, returnJsonArray,
...new BinaryHelpers(workflow, additionalData).exported, ...getSSHTunnelFunctions(),
...new RequestHelpers(this, workflow, node, additionalData).exported, ...getRequestHelperFunctions(workflow, node, additionalData),
...new SchedulingHelpers(workflow).exported, ...getBinaryHelperFunctions(additionalData, workflow.id),
...new SSHTunnelHelpers().exported, ...getSchedulingFunctions(workflow),
}; };
} }

View file

@ -24,15 +24,15 @@ import { ApplicationError, createDeferredPromise } from 'n8n-workflow';
import { import {
copyBinaryFile, copyBinaryFile,
getAdditionalKeys, getAdditionalKeys,
getBinaryHelperFunctions,
getCredentials, getCredentials,
getInputConnectionData, getInputConnectionData,
getNodeParameter, getNodeParameter,
getNodeWebhookUrl, getNodeWebhookUrl,
getRequestHelperFunctions,
returnJsonArray, returnJsonArray,
} from '@/NodeExecuteFunctions'; } from '@/NodeExecuteFunctions';
import { BinaryHelpers } from './helpers/binary-helpers';
import { RequestHelpers } from './helpers/request-helpers';
import { NodeExecutionContext } from './node-execution-context'; import { NodeExecutionContext } from './node-execution-context';
export class WebhookContext extends NodeExecutionContext implements IWebhookFunctions { export class WebhookContext extends NodeExecutionContext implements IWebhookFunctions {
@ -54,8 +54,8 @@ export class WebhookContext extends NodeExecutionContext implements IWebhookFunc
this.helpers = { this.helpers = {
createDeferredPromise, createDeferredPromise,
returnJsonArray, returnJsonArray,
...new BinaryHelpers(workflow, additionalData).exported, ...getRequestHelperFunctions(workflow, node, additionalData),
...new RequestHelpers(this, workflow, node, additionalData).exported, ...getBinaryHelperFunctions(additionalData, workflow.id),
}; };
this.nodeHelpers = { this.nodeHelpers = {

View file

@ -1,26 +1,45 @@
import { createComponentRenderer } from '@/__tests__/render'; import { createComponentRenderer } from '@/__tests__/render';
import { SETTINGS_STORE_DEFAULT_STATE } from '@/__tests__/utils';
import NodeErrorView from '@/components/Error/NodeErrorView.vue'; import NodeErrorView from '@/components/Error/NodeErrorView.vue';
import { STORES } from '@/constants';
import { createTestingPinia } from '@pinia/testing'; import { createTestingPinia } from '@pinia/testing';
import { type INode } from 'n8n-workflow'; import type { NodeError } from 'n8n-workflow';
import { useAssistantStore } from '@/stores/assistant.store'; import { useAssistantStore } from '@/stores/assistant.store';
import { useNodeTypesStore } from '@/stores/nodeTypes.store'; import { useNodeTypesStore } from '@/stores/nodeTypes.store';
import { mockedStore } from '@/__tests__/utils';
import userEvent from '@testing-library/user-event';
import { useNDVStore } from '@/stores/ndv.store';
const DEFAULT_SETUP = { const renderComponent = createComponentRenderer(NodeErrorView);
pinia: createTestingPinia({
initialState: {
[STORES.SETTINGS]: SETTINGS_STORE_DEFAULT_STATE,
},
}),
};
const renderComponent = createComponentRenderer(NodeErrorView, DEFAULT_SETUP); let mockAiAssistantStore: ReturnType<typeof mockedStore<typeof useAssistantStore>>;
let mockNodeTypeStore: ReturnType<typeof mockedStore<typeof useNodeTypesStore>>;
let mockNdvStore: ReturnType<typeof mockedStore<typeof useNDVStore>>;
describe('NodeErrorView.vue', () => { describe('NodeErrorView.vue', () => {
let mockNode: INode; let error: NodeError;
afterEach(() => {
mockNode = { beforeEach(() => {
createTestingPinia();
mockAiAssistantStore = mockedStore(useAssistantStore);
mockNodeTypeStore = mockedStore(useNodeTypesStore);
mockNdvStore = mockedStore(useNDVStore);
//@ts-expect-error
error = {
name: 'NodeOperationError',
message: 'Test error message',
description: 'Test error description',
context: {
descriptionKey: 'noInputConnection',
nodeCause: 'Test node cause',
runIndex: '1',
itemIndex: '2',
parameter: 'testParameter',
data: { key: 'value' },
causeDetailed: 'Detailed cause',
},
node: {
parameters: { parameters: {
mode: 'runOnceForAllItems', mode: 'runOnceForAllItems',
language: 'javaScript', language: 'javaScript',
@ -28,11 +47,15 @@ describe('NodeErrorView.vue', () => {
notice: '', notice: '',
}, },
id: 'd1ce5dc9-f9ae-4ac6-84e5-0696ba175dd9', id: 'd1ce5dc9-f9ae-4ac6-84e5-0696ba175dd9',
name: 'Code', name: 'ErrorCode',
type: 'n8n-nodes-base.code', type: 'n8n-nodes-base.code',
typeVersion: 2, typeVersion: 2,
position: [940, 240], position: [940, 240],
},
stack: 'Test stack trace',
}; };
});
afterEach(() => {
vi.clearAllMocks(); vi.clearAllMocks();
}); });
@ -40,7 +63,7 @@ describe('NodeErrorView.vue', () => {
const { getByTestId } = renderComponent({ const { getByTestId } = renderComponent({
props: { props: {
error: { error: {
node: mockNode, node: error.node,
messages: ['Unexpected identifier [line 1]'], messages: ['Unexpected identifier [line 1]'],
}, },
}, },
@ -55,7 +78,7 @@ describe('NodeErrorView.vue', () => {
const { getByTestId } = renderComponent({ const { getByTestId } = renderComponent({
props: { props: {
error: { error: {
node: mockNode, node: error.node,
message: 'Unexpected identifier [line 1]', message: 'Unexpected identifier [line 1]',
}, },
}, },
@ -67,24 +90,20 @@ describe('NodeErrorView.vue', () => {
}); });
it('should not render AI assistant button when error happens in deprecated function node', async () => { it('should not render AI assistant button when error happens in deprecated function node', async () => {
const aiAssistantStore = useAssistantStore(DEFAULT_SETUP.pinia);
const nodeTypeStore = useNodeTypesStore(DEFAULT_SETUP.pinia);
//@ts-expect-error //@ts-expect-error
nodeTypeStore.getNodeType = vi.fn(() => ({ mockNodeTypeStore.getNodeType = vi.fn(() => ({
type: 'n8n-nodes-base.function', type: 'n8n-nodes-base.function',
typeVersion: 1, typeVersion: 1,
hidden: true, hidden: true,
})); }));
//@ts-expect-error mockAiAssistantStore.canShowAssistantButtonsOnCanvas = true;
aiAssistantStore.canShowAssistantButtonsOnCanvas = true;
const { queryByTestId } = renderComponent({ const { queryByTestId } = renderComponent({
props: { props: {
error: { error: {
node: { node: {
...mockNode, ...error.node,
type: 'n8n-nodes-base.function', type: 'n8n-nodes-base.function',
typeVersion: 1, typeVersion: 1,
}, },
@ -96,4 +115,73 @@ describe('NodeErrorView.vue', () => {
expect(aiAssistantButton).toBeNull(); expect(aiAssistantButton).toBeNull();
}); });
it('renders error message', () => {
const { getByTestId } = renderComponent({
props: { error },
});
expect(getByTestId('node-error-message').textContent).toContain('Test error message');
});
it('renders error description', () => {
const { getByTestId } = renderComponent({
props: { error },
});
expect(getByTestId('node-error-description').innerHTML).toContain(
'This node has no input data. Please make sure this node is connected to another node.',
);
});
it('renders stack trace', () => {
const { getByText } = renderComponent({
props: { error },
});
expect(getByText('Test stack trace')).toBeTruthy();
});
it('renders open node button when the error is in sub node', () => {
const { getByTestId, queryByTestId } = renderComponent({
props: {
error: {
...error,
name: 'NodeOperationError',
functionality: 'configuration-node',
},
},
});
expect(getByTestId('node-error-view-open-node-button')).toHaveTextContent('Open errored node');
expect(queryByTestId('ask-assistant-button')).not.toBeInTheDocument();
});
it('does not renders open node button when the error is in sub node', () => {
mockAiAssistantStore.canShowAssistantButtonsOnCanvas = true;
const { getByTestId, queryByTestId } = renderComponent({
props: {
error,
},
});
expect(queryByTestId('node-error-view-open-node-button')).not.toBeInTheDocument();
expect(getByTestId('ask-assistant-button')).toBeInTheDocument();
});
it('open error node details when open error node is clicked', async () => {
const { getByTestId, emitted } = renderComponent({
props: {
error: {
...error,
name: 'NodeOperationError',
functionality: 'configuration-node',
},
},
});
await userEvent.click(getByTestId('node-error-view-open-node-button'));
expect(emitted().click).toHaveLength(1);
expect(mockNdvStore.activeNodeName).toBe(error.node.name);
});
}); });

View file

@ -117,7 +117,7 @@ const prepareRawMessages = computed(() => {
}); });
const isAskAssistantAvailable = computed(() => { const isAskAssistantAvailable = computed(() => {
if (!node.value) { if (!node.value || isSubNodeError.value) {
return false; return false;
} }
const isCustomNode = node.value.type === undefined || isCommunityPackageName(node.value.type); const isCustomNode = node.value.type === undefined || isCommunityPackageName(node.value.type);
@ -132,6 +132,13 @@ const assistantAlreadyAsked = computed(() => {
}); });
}); });
const isSubNodeError = computed(() => {
return (
props.error.name === 'NodeOperationError' &&
(props.error as NodeOperationError).functionality === 'configuration-node'
);
});
function nodeVersionTag(nodeType: NodeError['node']): string { function nodeVersionTag(nodeType: NodeError['node']): string {
if (!nodeType || ('hidden' in nodeType && nodeType.hidden)) { if (!nodeType || ('hidden' in nodeType && nodeType.hidden)) {
return i18n.baseText('nodeSettings.deprecated'); return i18n.baseText('nodeSettings.deprecated');
@ -153,19 +160,6 @@ function prepareDescription(description: string): string {
} }
function getErrorDescription(): string { function getErrorDescription(): string {
const isSubNodeError =
props.error.name === 'NodeOperationError' &&
(props.error as NodeOperationError).functionality === 'configuration-node';
if (isSubNodeError) {
return prepareDescription(
props.error.description +
i18n.baseText('pushConnection.executionError.openNode', {
interpolate: { node: props.error.node.name },
}),
);
}
if (props.error.context?.descriptionKey) { if (props.error.context?.descriptionKey) {
const interpolate = { const interpolate = {
nodeCause: props.error.context.nodeCause as string, nodeCause: props.error.context.nodeCause as string,
@ -205,13 +199,10 @@ function addItemIndexSuffix(message: string): string {
function getErrorMessage(): string { function getErrorMessage(): string {
let message = ''; let message = '';
const isSubNodeError =
props.error.name === 'NodeOperationError' &&
(props.error as NodeOperationError).functionality === 'configuration-node';
const isNonEmptyString = (value?: unknown): value is string => const isNonEmptyString = (value?: unknown): value is string =>
!!value && typeof value === 'string'; !!value && typeof value === 'string';
if (isSubNodeError) { if (isSubNodeError.value) {
message = i18n.baseText('nodeErrorView.errorSubNode', { message = i18n.baseText('nodeErrorView.errorSubNode', {
interpolate: { node: props.error.node.name }, interpolate: { node: props.error.node.name },
}); });
@ -390,6 +381,10 @@ function nodeIsHidden() {
return nodeType?.hidden ?? false; return nodeType?.hidden ?? false;
} }
const onOpenErrorNodeDetailClick = () => {
ndvStore.activeNodeName = props.error.node.name;
};
async function onAskAssistantClick() { async function onAskAssistantClick() {
const { message, lineNumber, description } = props.error; const { message, lineNumber, description } = props.error;
const sessionInProgress = !assistantStore.isSessionEnded; const sessionInProgress = !assistantStore.isSessionEnded;
@ -428,14 +423,25 @@ async function onAskAssistantClick() {
</div> </div>
</div> </div>
<div <div
v-if="error.description || error.context?.descriptionKey" v-if="(error.description || error.context?.descriptionKey) && !isSubNodeError"
data-test-id="node-error-description" data-test-id="node-error-description"
class="node-error-view__header-description" class="node-error-view__header-description"
v-n8n-html="getErrorDescription()" v-n8n-html="getErrorDescription()"
></div> ></div>
<div v-if="isSubNodeError">
<n8n-button
icon="arrow-right"
type="secondary"
:label="i18n.baseText('pushConnection.executionError.openNode')"
class="node-error-view__button"
data-test-id="node-error-view-open-node-button"
@click="onOpenErrorNodeDetailClick"
/>
</div>
<div <div
v-if="isAskAssistantAvailable" v-if="isAskAssistantAvailable"
class="node-error-view__assistant-button" class="node-error-view__button"
data-test-id="node-error-view-ask-assistant-button" data-test-id="node-error-view-ask-assistant-button"
> >
<InlineAskAssistantButton :asked="assistantAlreadyAsked" @click="onAskAssistantClick" /> <InlineAskAssistantButton :asked="assistantAlreadyAsked" @click="onAskAssistantClick" />
@ -696,9 +702,14 @@ async function onAskAssistantClick() {
} }
} }
&__assistant-button { &__button {
margin-left: var(--spacing-s); margin-left: var(--spacing-s);
margin-bottom: var(--spacing-xs); margin-bottom: var(--spacing-xs);
flex-direction: row-reverse;
span {
margin-right: var(--spacing-5xs);
margin-left: var(--spacing-5xs);
}
} }
&__debugging { &__debugging {
@ -831,7 +842,7 @@ async function onAskAssistantClick() {
} }
} }
.node-error-view__assistant-button { .node-error-view__button {
margin-top: var(--spacing-xs); margin-top: var(--spacing-xs);
} }
</style> </style>

View file

@ -289,7 +289,7 @@ export default defineComponent({
return false; return false;
} }
const canPinNode = usePinnedData(this.node).canPinNode(false); const canPinNode = usePinnedData(this.node).canPinNode(false, this.currentOutputIndex);
return ( return (
canPinNode && canPinNode &&
@ -1214,9 +1214,7 @@ export default defineComponent({
<template> <template>
<div :class="['run-data', $style.container]" @mouseover="activatePane"> <div :class="['run-data', $style.container]" @mouseover="activatePane">
<n8n-callout <n8n-callout
v-if=" v-if="pinnedData.hasData.value && !editMode.enabled && !isProductionExecutionPreview"
canPinData && pinnedData.hasData.value && !editMode.enabled && !isProductionExecutionPreview
"
theme="secondary" theme="secondary"
icon="thumbtack" icon="thumbtack"
:class="$style.pinnedDataCallout" :class="$style.pinnedDataCallout"

View file

@ -1,17 +1,15 @@
<script lang="ts"> <script setup lang="ts">
import { computed, onMounted, ref } from 'vue';
import { get, set, unset } from 'lodash-es'; import { get, set, unset } from 'lodash-es';
import { mapStores } from 'pinia';
import { useLogStreamingStore } from '@/stores/logStreaming.store';
import { useNDVStore } from '@/stores/ndv.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
import ParameterInputList from '@/components/ParameterInputList.vue';
import type { IMenuItem, INodeUi, IUpdateInformation, ModalKey } from '@/Interface';
import type { import type {
IDataObject, IDataObject,
NodeParameterValue, NodeParameterValue,
MessageEventBusDestinationOptions, MessageEventBusDestinationOptions,
INodeParameters, INodeParameters,
NodeParameterValueType, NodeParameterValueType,
MessageEventBusDestinationSentryOptions,
MessageEventBusDestinationSyslogOptions,
MessageEventBusDestinationWebhookOptions,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { import {
deepCopy, deepCopy,
@ -22,81 +20,76 @@ import {
defaultMessageEventBusDestinationSyslogOptions, defaultMessageEventBusDestinationSyslogOptions,
defaultMessageEventBusDestinationSentryOptions, defaultMessageEventBusDestinationSentryOptions,
} from 'n8n-workflow'; } from 'n8n-workflow';
import type { PropType } from 'vue'; import type { EventBus } from 'n8n-design-system';
import { defineComponent } from 'vue'; import { createEventBus } from 'n8n-design-system/utils';
import { useLogStreamingStore } from '@/stores/logStreaming.store';
import { useNDVStore } from '@/stores/ndv.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
import ParameterInputList from '@/components/ParameterInputList.vue';
import type { IMenuItem, IUpdateInformation, ModalKey } from '@/Interface';
import { LOG_STREAM_MODAL_KEY, MODAL_CONFIRM } from '@/constants'; import { LOG_STREAM_MODAL_KEY, MODAL_CONFIRM } from '@/constants';
import Modal from '@/components/Modal.vue'; import Modal from '@/components/Modal.vue';
import { useI18n } from '@/composables/useI18n';
import { useMessage } from '@/composables/useMessage'; import { useMessage } from '@/composables/useMessage';
import { useUIStore } from '@/stores/ui.store'; import { useUIStore } from '@/stores/ui.store';
import { hasPermission } from '@/utils/rbac/permissions'; import { hasPermission } from '@/utils/rbac/permissions';
import { destinationToFakeINodeUi } from '@/components/SettingsLogStreaming/Helpers.ee'; import { destinationToFakeINodeUi } from '@/components/SettingsLogStreaming/Helpers.ee';
import type { BaseTextKey } from '@/plugins/i18n';
import InlineNameEdit from '@/components/InlineNameEdit.vue';
import SaveButton from '@/components/SaveButton.vue';
import EventSelection from '@/components/SettingsLogStreaming/EventSelection.ee.vue';
import { useTelemetry } from '@/composables/useTelemetry';
import { useRootStore } from '@/stores/root.store';
import { import {
webhookModalDescription, webhookModalDescription,
sentryModalDescription, sentryModalDescription,
syslogModalDescription, syslogModalDescription,
} from './descriptions.ee'; } from './descriptions.ee';
import type { BaseTextKey } from '@/plugins/i18n';
import InlineNameEdit from '@/components/InlineNameEdit.vue';
import SaveButton from '@/components/SaveButton.vue';
import EventSelection from '@/components/SettingsLogStreaming/EventSelection.ee.vue';
import type { EventBus } from 'n8n-design-system';
import { createEventBus } from 'n8n-design-system/utils';
import { useTelemetry } from '@/composables/useTelemetry';
import { useRootStore } from '@/stores/root.store';
export default defineComponent({ defineOptions({ name: 'EventDestinationSettingsModal' });
name: 'EventDestinationSettingsModal',
components: { const props = withDefaults(
Modal, defineProps<{
ParameterInputList, modalName: ModalKey;
InlineNameEdit, destination?: MessageEventBusDestinationOptions;
SaveButton, isNew?: boolean;
EventSelection, eventBus?: EventBus;
}>(),
{
destination: () => deepCopy(defaultMessageEventBusDestinationOptions),
isNew: false,
}, },
props: { );
modalName: { const { modalName, destination, isNew, eventBus } = props;
type: String as PropType<ModalKey>,
required: true, const i18n = useI18n();
}, const { confirm } = useMessage();
destination: { const telemetry = useTelemetry();
type: Object, const logStreamingStore = useLogStreamingStore();
default: () => deepCopy(defaultMessageEventBusDestinationOptions), const ndvStore = useNDVStore();
}, const workflowsStore = useWorkflowsStore();
isNew: Boolean, const uiStore = useUIStore();
eventBus: {
type: Object as PropType<EventBus>, const unchanged = ref(!isNew);
}, const activeTab = ref('settings');
}, const hasOnceBeenSaved = ref(!isNew);
setup() { const isSaving = ref(false);
return { const isDeleting = ref(false);
...useMessage(), const loading = ref(false);
}; const typeSelectValue = ref('');
}, const typeSelectPlaceholder = ref('Destination Type');
data() { const nodeParameters = ref(deepCopy(defaultMessageEventBusDestinationOptions) as INodeParameters);
return { const webhookDescription = ref(webhookModalDescription);
unchanged: !this.isNew, const sentryDescription = ref(sentryModalDescription);
activeTab: 'settings', const syslogDescription = ref(syslogModalDescription);
hasOnceBeenSaved: !this.isNew, const modalBus = ref(createEventBus());
isSaving: false, const headerLabel = ref(destination.label!);
isDeleting: false, const testMessageSent = ref(false);
loading: false, const testMessageResult = ref(false);
showRemoveConfirm: false,
typeSelectValue: '', const typeSelectOptions = computed(() => {
typeSelectPlaceholder: 'Destination Type',
nodeParameters: deepCopy(defaultMessageEventBusDestinationOptions) as INodeParameters,
webhookDescription: webhookModalDescription,
sentryDescription: sentryModalDescription,
syslogDescription: syslogModalDescription,
modalBus: createEventBus(),
headerLabel: this.destination.label,
testMessageSent: false,
testMessageResult: false,
LOG_STREAM_MODAL_KEY,
};
},
computed: {
...mapStores(useUIStore, useLogStreamingStore, useNDVStore, useWorkflowsStore),
typeSelectOptions(): Array<{ value: string; label: BaseTextKey }> {
const options: Array<{ value: string; label: BaseTextKey }> = []; const options: Array<{ value: string; label: BaseTextKey }> = [];
for (const t of messageEventBusDestinationTypeNames) { for (const t of messageEventBusDestinationTypeNames) {
if (t === MessageEventBusDestinationTypeNames.abstract) { if (t === MessageEventBusDestinationTypeNames.abstract) {
@ -108,252 +101,256 @@ export default defineComponent({
}); });
} }
return options; return options;
}, });
isTypeAbstract(): boolean {
return this.nodeParameters.__type === MessageEventBusDestinationTypeNames.abstract; const isTypeAbstract = computed(
}, () => nodeParameters.value.__type === MessageEventBusDestinationTypeNames.abstract,
isTypeWebhook(): boolean { );
return this.nodeParameters.__type === MessageEventBusDestinationTypeNames.webhook;
}, const isTypeWebhook = computed(
isTypeSyslog(): boolean { () => nodeParameters.value.__type === MessageEventBusDestinationTypeNames.webhook,
return this.nodeParameters.__type === MessageEventBusDestinationTypeNames.syslog; );
},
isTypeSentry(): boolean { const isTypeSyslog = computed(
return this.nodeParameters.__type === MessageEventBusDestinationTypeNames.sentry; () => nodeParameters.value.__type === MessageEventBusDestinationTypeNames.syslog,
}, );
node(): INodeUi {
return destinationToFakeINodeUi(this.nodeParameters); const isTypeSentry = computed(
}, () => nodeParameters.value.__type === MessageEventBusDestinationTypeNames.sentry,
typeLabelName(): BaseTextKey { );
return `settings.log-streaming.${this.nodeParameters.__type}` as BaseTextKey;
}, const node = computed(() => destinationToFakeINodeUi(nodeParameters.value));
sidebarItems(): IMenuItem[] {
const typeLabelName = computed(
() => `settings.log-streaming.${nodeParameters.value.__type}` as BaseTextKey,
);
const sidebarItems = computed(() => {
const items: IMenuItem[] = [ const items: IMenuItem[] = [
{ {
id: 'settings', id: 'settings',
label: this.$locale.baseText('settings.log-streaming.tab.settings'), label: i18n.baseText('settings.log-streaming.tab.settings'),
position: 'top', position: 'top',
}, },
]; ];
if (!this.isTypeAbstract) { if (!isTypeAbstract.value) {
items.push({ items.push({
id: 'events', id: 'events',
label: this.$locale.baseText('settings.log-streaming.tab.events'), label: i18n.baseText('settings.log-streaming.tab.events'),
position: 'top', position: 'top',
}); });
} }
return items; return items;
}, });
canManageLogStreaming(): boolean {
return hasPermission(['rbac'], { rbac: { scope: 'logStreaming:manage' } }); const canManageLogStreaming = computed(() =>
}, hasPermission(['rbac'], { rbac: { scope: 'logStreaming:manage' } }),
},
mounted() {
this.setupNode(
Object.assign(deepCopy(defaultMessageEventBusDestinationOptions), this.destination),
); );
this.workflowsStore.$onAction(
({ onMounted(() => {
name, // name of the action setupNode(Object.assign(deepCopy(defaultMessageEventBusDestinationOptions), destination));
args, // array of parameters passed to the action workflowsStore.$onAction(({ name, args }) => {
}) => {
if (name === 'updateNodeProperties') { if (name === 'updateNodeProperties') {
for (const arg of args) { for (const arg of args) {
if (arg.name === this.destination.id) { if (arg.name === destination.id) {
if ('credentials' in arg.properties) { if ('credentials' in arg.properties) {
this.unchanged = false; unchanged.value = false;
this.nodeParameters.credentials = arg.properties nodeParameters.value.credentials = arg.properties.credentials as NodeParameterValueType;
.credentials as NodeParameterValueType;
} }
} }
} }
} }
}, });
); });
},
methods: { function onInput() {
onInput() { unchanged.value = false;
this.unchanged = false; testMessageSent.value = false;
this.testMessageSent = false; }
},
onTabSelect(tab: string) { function onTabSelect(tab: string) {
this.activeTab = tab; activeTab.value = tab;
}, }
onLabelChange(value: string) {
this.onInput(); function onLabelChange(value: string) {
this.headerLabel = value; onInput();
this.nodeParameters.label = value; headerLabel.value = value;
}, nodeParameters.value.label = value;
setupNode(options: MessageEventBusDestinationOptions) { }
this.workflowsStore.removeNode(this.node);
this.ndvStore.activeNodeName = options.id ?? 'thisshouldnothappen'; function setupNode(options: MessageEventBusDestinationOptions) {
this.workflowsStore.addNode(destinationToFakeINodeUi(options)); workflowsStore.removeNode(node.value);
this.nodeParameters = options as INodeParameters; ndvStore.activeNodeName = options.id ?? 'thisshouldnothappen';
this.logStreamingStore.items[this.destination.id].destination = options; workflowsStore.addNode(destinationToFakeINodeUi(options));
}, nodeParameters.value = options as INodeParameters;
onTypeSelectInput(destinationType: MessageEventBusDestinationTypeNames) { logStreamingStore.items[destination.id!].destination = options;
this.typeSelectValue = destinationType; }
},
async onContinueAddClicked() { function onTypeSelectInput(destinationType: MessageEventBusDestinationTypeNames) {
typeSelectValue.value = destinationType;
}
async function onContinueAddClicked() {
let newDestination; let newDestination;
switch (this.typeSelectValue) { switch (typeSelectValue.value) {
case MessageEventBusDestinationTypeNames.syslog: case MessageEventBusDestinationTypeNames.syslog:
newDestination = Object.assign(deepCopy(defaultMessageEventBusDestinationSyslogOptions), { newDestination = Object.assign(deepCopy(defaultMessageEventBusDestinationSyslogOptions), {
id: this.destination.id, id: destination.id,
}); });
break; break;
case MessageEventBusDestinationTypeNames.sentry: case MessageEventBusDestinationTypeNames.sentry:
newDestination = Object.assign(deepCopy(defaultMessageEventBusDestinationSentryOptions), { newDestination = Object.assign(deepCopy(defaultMessageEventBusDestinationSentryOptions), {
id: this.destination.id, id: destination.id,
}); });
break; break;
case MessageEventBusDestinationTypeNames.webhook: case MessageEventBusDestinationTypeNames.webhook:
newDestination = Object.assign( newDestination = Object.assign(deepCopy(defaultMessageEventBusDestinationWebhookOptions), {
deepCopy(defaultMessageEventBusDestinationWebhookOptions), id: destination.id,
{ id: this.destination.id }, });
);
break; break;
} }
if (newDestination) { if (newDestination) {
this.headerLabel = newDestination?.label ?? this.headerLabel; headerLabel.value = newDestination?.label ?? headerLabel.value;
this.setupNode(newDestination); setupNode(newDestination);
} }
}, }
valueChanged(parameterData: IUpdateInformation) {
this.unchanged = false; function valueChanged(parameterData: IUpdateInformation) {
this.testMessageSent = false; unchanged.value = false;
testMessageSent.value = false;
const newValue: NodeParameterValue = parameterData.value as string | number; const newValue: NodeParameterValue = parameterData.value as string | number;
const parameterPath = parameterData.name?.startsWith('parameters.') const parameterPath = parameterData.name?.startsWith('parameters.')
? parameterData.name.split('.').slice(1).join('.') ? parameterData.name.split('.').slice(1).join('.')
: parameterData.name || ''; : parameterData.name || '';
const nodeParameters = deepCopy(this.nodeParameters); const nodeParametersCopy = deepCopy(nodeParameters.value);
// Check if the path is supposed to change an array and if so get if (parameterData.value === undefined && parameterPath.match(/(.*)\[(\d+)\]$/)) {
// the needed data like path and index const path = parameterPath.match(
const parameterPathArray = parameterPath.match(/(.*)\[(\d+)\]$/); /(.*)\[(\d+)\]$/,
)?.[1] as keyof MessageEventBusDestinationOptions;
// Apply the new value const index = parseInt(parameterPath.match(/(.*)\[(\d+)\]$/)?.[2] ?? '0', 10);
if (parameterData.value === undefined && parameterPathArray !== null) { const data = get(nodeParametersCopy, path);
// Delete array item
const path = parameterPathArray[1] as keyof MessageEventBusDestinationOptions;
const index = parameterPathArray[2];
const data = get(nodeParameters, path);
if (Array.isArray(data)) { if (Array.isArray(data)) {
data.splice(parseInt(index, 10), 1); data.splice(index, 1);
nodeParameters[path] = data as never; nodeParametersCopy[path] = data as never;
} }
} else { } else {
if (newValue === undefined) { if (newValue === undefined) {
unset(nodeParameters, parameterPath); unset(nodeParametersCopy, parameterPath);
} else { } else {
set(nodeParameters, parameterPath, newValue); set(nodeParametersCopy, parameterPath, newValue);
} }
} }
this.nodeParameters = deepCopy(nodeParameters); nodeParameters.value = deepCopy(nodeParametersCopy);
this.workflowsStore.updateNodeProperties({ workflowsStore.updateNodeProperties({
name: this.node.name, name: node.value.name,
properties: { parameters: this.nodeParameters as unknown as IDataObject, position: [0, 0] }, properties: { parameters: nodeParameters.value as unknown as IDataObject, position: [0, 0] },
}); });
if (this.hasOnceBeenSaved) { if (hasOnceBeenSaved.value) {
this.logStreamingStore.updateDestination(this.nodeParameters); logStreamingStore.updateDestination(nodeParameters.value);
} }
}, }
async sendTestEvent() {
this.testMessageResult = await this.logStreamingStore.sendTestMessage(this.nodeParameters); async function sendTestEvent() {
this.testMessageSent = true; testMessageResult.value = await logStreamingStore.sendTestMessage(nodeParameters.value);
}, testMessageSent.value = true;
async removeThis() { }
const deleteConfirmed = await this.confirm(
this.$locale.baseText('settings.log-streaming.destinationDelete.message', { async function removeThis() {
interpolate: { destinationName: this.destination.label }, const deleteConfirmed = await confirm(
i18n.baseText('settings.log-streaming.destinationDelete.message', {
interpolate: { destinationName: destination.label! },
}), }),
this.$locale.baseText('settings.log-streaming.destinationDelete.headline'), i18n.baseText('settings.log-streaming.destinationDelete.headline'),
{ {
type: 'warning', type: 'warning',
confirmButtonText: this.$locale.baseText( confirmButtonText: i18n.baseText(
'settings.log-streaming.destinationDelete.confirmButtonText', 'settings.log-streaming.destinationDelete.confirmButtonText',
), ),
cancelButtonText: this.$locale.baseText( cancelButtonText: i18n.baseText('settings.log-streaming.destinationDelete.cancelButtonText'),
'settings.log-streaming.destinationDelete.cancelButtonText',
),
}, },
); );
if (deleteConfirmed !== MODAL_CONFIRM) { if (deleteConfirmed !== MODAL_CONFIRM) {
return; return;
} else { } else {
this.callEventBus('remove', this.destination.id); callEventBus('remove', destination.id);
this.uiStore.closeModal(LOG_STREAM_MODAL_KEY); uiStore.closeModal(LOG_STREAM_MODAL_KEY);
this.uiStore.stateIsDirty = false; uiStore.stateIsDirty = false;
}
},
onModalClose() {
if (!this.hasOnceBeenSaved) {
this.workflowsStore.removeNode(this.node);
if (this.nodeParameters.id && typeof this.nodeParameters.id !== 'object') {
this.logStreamingStore.removeDestination(this.nodeParameters.id.toString());
} }
} }
this.ndvStore.activeNodeName = null;
this.callEventBus('closing', this.destination.id); function onModalClose() {
this.uiStore.stateIsDirty = false; if (!hasOnceBeenSaved.value) {
}, workflowsStore.removeNode(node.value);
async saveDestination() { if (nodeParameters.value.id && typeof nodeParameters.value.id !== 'object') {
if (this.unchanged || !this.destination.id) { logStreamingStore.removeDestination(nodeParameters.value.id.toString());
}
}
ndvStore.activeNodeName = null;
callEventBus('closing', destination.id);
uiStore.stateIsDirty = false;
}
async function saveDestination() {
if (unchanged.value || !destination.id) {
return; return;
} }
const saveResult = await this.logStreamingStore.saveDestination(this.nodeParameters); const saveResult = await logStreamingStore.saveDestination(nodeParameters.value);
if (saveResult) { if (saveResult) {
this.hasOnceBeenSaved = true; hasOnceBeenSaved.value = true;
this.testMessageSent = false; testMessageSent.value = false;
this.unchanged = true; unchanged.value = true;
this.callEventBus('destinationWasSaved', this.destination.id); callEventBus('destinationWasSaved', destination.id);
this.uiStore.stateIsDirty = false; uiStore.stateIsDirty = false;
const destinationType = ( const destinationType = (
this.nodeParameters.__type && typeof this.nodeParameters.__type !== 'object' nodeParameters.value.__type && typeof nodeParameters.value.__type !== 'object'
? `${this.nodeParameters.__type}` ? `${nodeParameters.value.__type}`
: 'unknown' : 'unknown'
) )
.replace('$$MessageEventBusDestination', '') .replace('$$MessageEventBusDestination', '')
.toLowerCase(); .toLowerCase();
const isComplete = () => { const isComplete = () => {
if (this.isTypeWebhook) { if (isTypeWebhook.value) {
return this.destination.host !== ''; const webhookDestination = destination as MessageEventBusDestinationWebhookOptions;
} else if (this.isTypeSentry) { return webhookDestination.url !== '';
return this.destination.dsn !== ''; } else if (isTypeSentry.value) {
} else if (this.isTypeSyslog) { const sentryDestination = destination as MessageEventBusDestinationSentryOptions;
return sentryDestination.dsn !== '';
} else if (isTypeSyslog.value) {
const syslogDestination = destination as MessageEventBusDestinationSyslogOptions;
return ( return (
this.destination.host !== '' && syslogDestination.host !== '' &&
this.destination.port !== undefined && syslogDestination.port !== undefined &&
this.destination.protocol !== '' && // @ts-expect-error TODO: fix this typing
this.destination.facility !== undefined && syslogDestination.protocol !== '' &&
this.destination.app_name !== '' syslogDestination.facility !== undefined &&
syslogDestination.app_name !== ''
); );
} }
return false; return false;
}; };
useTelemetry().track('User updated log streaming destination', { telemetry.track('User updated log streaming destination', {
instance_id: useRootStore().instanceId, instance_id: useRootStore().instanceId,
destination_type: destinationType, destination_type: destinationType,
is_complete: isComplete(), is_complete: isComplete(),
is_active: this.destination.enabled, is_active: destination.enabled,
}); });
} }
},
callEventBus(event: string, data: unknown) {
if (this.eventBus) {
this.eventBus.emit(event, data);
} }
},
}, function callEventBus(event: string, data: unknown) {
}); if (eventBus) {
eventBus.emit(event, data);
}
}
</script> </script>
<template> <template>
@ -381,7 +378,7 @@ export default defineComponent({
<div :class="$style.destinationInfo"> <div :class="$style.destinationInfo">
<InlineNameEdit <InlineNameEdit
:model-value="headerLabel" :model-value="headerLabel"
:subtitle="!isTypeAbstract ? $locale.baseText(typeLabelName) : 'Select type'" :subtitle="!isTypeAbstract ? i18n.baseText(typeLabelName) : 'Select type'"
:readonly="isTypeAbstract" :readonly="isTypeAbstract"
type="Credential" type="Credential"
data-test-id="subtitle-showing-type" data-test-id="subtitle-showing-type"
@ -406,7 +403,7 @@ export default defineComponent({
<template v-if="canManageLogStreaming"> <template v-if="canManageLogStreaming">
<n8n-icon-button <n8n-icon-button
v-if="nodeParameters && hasOnceBeenSaved" v-if="nodeParameters && hasOnceBeenSaved"
:title="$locale.baseText('settings.log-streaming.delete')" :title="i18n.baseText('settings.log-streaming.delete')"
icon="trash" icon="trash"
type="tertiary" type="tertiary"
:disabled="isSaving" :disabled="isSaving"
@ -417,7 +414,7 @@ export default defineComponent({
<SaveButton <SaveButton
:saved="unchanged && hasOnceBeenSaved" :saved="unchanged && hasOnceBeenSaved"
:disabled="isTypeAbstract || unchanged" :disabled="isTypeAbstract || unchanged"
:saving-label="$locale.baseText('settings.log-streaming.saving')" :saving-label="i18n.baseText('settings.log-streaming.saving')"
data-test-id="destination-save-button" data-test-id="destination-save-button"
@click="saveDestination" @click="saveDestination"
/> />
@ -432,8 +429,8 @@ export default defineComponent({
<template v-if="isTypeAbstract"> <template v-if="isTypeAbstract">
<n8n-input-label <n8n-input-label
:class="$style.typeSelector" :class="$style.typeSelector"
:label="$locale.baseText('settings.log-streaming.selecttype')" :label="i18n.baseText('settings.log-streaming.selecttype')"
:tooltip-text="$locale.baseText('settings.log-streaming.selecttypehint')" :tooltip-text="i18n.baseText('settings.log-streaming.selecttypehint')"
:bold="false" :bold="false"
size="medium" size="medium"
:underline="false" :underline="false"
@ -450,7 +447,7 @@ export default defineComponent({
v-for="option in typeSelectOptions || []" v-for="option in typeSelectOptions || []"
:key="option.value" :key="option.value"
:value="option.value" :value="option.value"
:label="$locale.baseText(option.label)" :label="i18n.baseText(option.label)"
/> />
</n8n-select> </n8n-select>
<div class="mt-m text-right"> <div class="mt-m text-right">
@ -460,7 +457,7 @@ export default defineComponent({
:disabled="!typeSelectValue" :disabled="!typeSelectValue"
@click="onContinueAddClicked" @click="onContinueAddClicked"
> >
{{ $locale.baseText(`settings.log-streaming.continue`) }} {{ i18n.baseText(`settings.log-streaming.continue`) }}
</n8n-button> </n8n-button>
</div> </div>
</n8n-input-label> </n8n-input-label>
@ -505,7 +502,7 @@ export default defineComponent({
<div class=""> <div class="">
<n8n-input-label <n8n-input-label
class="mb-m mt-m" class="mb-m mt-m"
:label="$locale.baseText('settings.log-streaming.tab.events.title')" :label="i18n.baseText('settings.log-streaming.tab.events.title')"
:bold="true" :bold="true"
size="medium" size="medium"
:underline="false" :underline="false"

View file

@ -3,9 +3,10 @@ import { setActivePinia, createPinia } from 'pinia';
import { ref } from 'vue'; import { ref } from 'vue';
import { usePinnedData } from '@/composables/usePinnedData'; import { usePinnedData } from '@/composables/usePinnedData';
import type { INodeUi } from '@/Interface'; import type { INodeUi } from '@/Interface';
import { MAX_PINNED_DATA_SIZE } from '@/constants'; import { HTTP_REQUEST_NODE_TYPE, IF_NODE_TYPE, MAX_PINNED_DATA_SIZE } from '@/constants';
import { useWorkflowsStore } from '@/stores/workflows.store'; import { useWorkflowsStore } from '@/stores/workflows.store';
import { useTelemetry } from '@/composables/useTelemetry'; import { useTelemetry } from '@/composables/useTelemetry';
import { NodeConnectionType, STICKY_NODE_TYPE, type INodeTypeDescription } from 'n8n-workflow';
vi.mock('@/composables/useToast', () => ({ useToast: vi.fn(() => ({ showError: vi.fn() })) })); vi.mock('@/composables/useToast', () => ({ useToast: vi.fn(() => ({ showError: vi.fn() })) }));
vi.mock('@/composables/useI18n', () => ({ vi.mock('@/composables/useI18n', () => ({
@ -17,6 +18,13 @@ vi.mock('@/composables/useExternalHooks', () => ({
})), })),
})); }));
const getNodeType = vi.fn();
vi.mock('@/stores/nodeTypes.store', () => ({
useNodeTypesStore: vi.fn(() => ({
getNodeType,
})),
}));
describe('usePinnedData', () => { describe('usePinnedData', () => {
beforeEach(() => { beforeEach(() => {
setActivePinia(createPinia()); setActivePinia(createPinia());
@ -133,4 +141,127 @@ describe('usePinnedData', () => {
expect(spy).toHaveBeenCalled(); expect(spy).toHaveBeenCalled();
}); });
}); });
describe('canPinData()', () => {
afterEach(() => {
vi.clearAllMocks();
}); });
it('allows pin on single output', async () => {
const node = ref({
name: 'single output node',
typeVersion: 1,
type: HTTP_REQUEST_NODE_TYPE,
parameters: {},
onError: 'stopWorkflow',
} as INodeUi);
getNodeType.mockReturnValue(makeNodeType([NodeConnectionType.Main], HTTP_REQUEST_NODE_TYPE));
const { canPinNode } = usePinnedData(node);
expect(canPinNode()).toBe(true);
expect(canPinNode(false, 0)).toBe(true);
// validate out of range index
expect(canPinNode(false, 1)).toBe(false);
expect(canPinNode(false, -1)).toBe(false);
});
it('allows pin on one main and one error output', async () => {
const node = ref({
name: 'single output node',
typeVersion: 1,
type: HTTP_REQUEST_NODE_TYPE,
parameters: {},
onError: 'continueErrorOutput',
} as INodeUi);
getNodeType.mockReturnValue(makeNodeType([NodeConnectionType.Main], HTTP_REQUEST_NODE_TYPE));
const { canPinNode } = usePinnedData(node);
expect(canPinNode()).toBe(true);
expect(canPinNode(false, 0)).toBe(true);
expect(canPinNode(false, 1)).toBe(false);
// validate out of range index
expect(canPinNode(false, 2)).toBe(false);
expect(canPinNode(false, -1)).toBe(false);
});
it('does not allow pin on two main outputs', async () => {
const node = ref({
name: 'single output node',
typeVersion: 1,
type: IF_NODE_TYPE,
parameters: {},
onError: 'stopWorkflow',
} as INodeUi);
getNodeType.mockReturnValue(
makeNodeType([NodeConnectionType.Main, NodeConnectionType.Main], IF_NODE_TYPE),
);
const { canPinNode } = usePinnedData(node);
expect(canPinNode()).toBe(false);
expect(canPinNode(false, 0)).toBe(false);
expect(canPinNode(false, 1)).toBe(false);
// validate out of range index
expect(canPinNode(false, 2)).toBe(false);
expect(canPinNode(false, -1)).toBe(false);
});
it('does not allow pin on denylisted node', async () => {
const node = ref({
name: 'single output node',
typeVersion: 1,
type: STICKY_NODE_TYPE,
} as INodeUi);
const { canPinNode } = usePinnedData(node);
expect(canPinNode()).toBe(false);
expect(canPinNode(false, 0)).toBe(false);
});
it('does not allow pin with checkDataEmpty and no pin', async () => {
const node = ref({
name: 'single output node',
typeVersion: 1,
type: HTTP_REQUEST_NODE_TYPE,
} as INodeUi);
getNodeType.mockReturnValue(makeNodeType([NodeConnectionType.Main], HTTP_REQUEST_NODE_TYPE));
const { canPinNode } = usePinnedData(node);
expect(canPinNode(true)).toBe(false);
expect(canPinNode(true, 0)).toBe(false);
});
it('does not allow pin without output', async () => {
const node = ref({
name: 'zero output node',
typeVersion: 1,
type: 'n8n-nodes-base.stopAndError',
} as INodeUi);
getNodeType.mockReturnValue(makeNodeType([], 'n8n-nodes-base.stopAndError'));
const { canPinNode } = usePinnedData(node);
expect(canPinNode()).toBe(false);
expect(canPinNode(false, 0)).toBe(false);
expect(canPinNode(false, -1)).toBe(false);
expect(canPinNode(false, 1)).toBe(false);
});
});
});
const makeNodeType = (outputs: NodeConnectionType[], name: string) =>
({
displayName: name,
name,
version: [1],
inputs: [],
outputs,
properties: [],
defaults: { color: '', name: '' },
group: [],
description: '',
}) as INodeTypeDescription;

View file

@ -75,9 +75,9 @@ export function usePinnedData(
); );
}); });
function canPinNode(checkDataEmpty = false) { function canPinNode(checkDataEmpty = false, outputIndex?: number) {
const targetNode = unref(node); const targetNode = unref(node);
if (targetNode === null) return false; if (targetNode === null || PIN_DATA_NODE_TYPES_DENYLIST.includes(targetNode.type)) return false;
const nodeType = useNodeTypesStore().getNodeType(targetNode.type, targetNode.typeVersion); const nodeType = useNodeTypesStore().getNodeType(targetNode.type, targetNode.typeVersion);
const dataToPin = getInputDataWithPinned(targetNode); const dataToPin = getInputDataWithPinned(targetNode);
@ -85,14 +85,25 @@ export function usePinnedData(
if (!nodeType || (checkDataEmpty && dataToPin.length === 0)) return false; if (!nodeType || (checkDataEmpty && dataToPin.length === 0)) return false;
const workflow = workflowsStore.getCurrentWorkflow(); const workflow = workflowsStore.getCurrentWorkflow();
const outputs = NodeHelpers.getNodeOutputs(workflow, targetNode, nodeType); const outputs = NodeHelpers.getNodeOutputs(workflow, targetNode, nodeType).map((output) =>
const mainOutputs = outputs.filter((output) => typeof output === 'string' ? { type: output } : output,
typeof output === 'string'
? output === NodeConnectionType.Main
: output.type === NodeConnectionType.Main,
); );
return mainOutputs.length === 1 && !PIN_DATA_NODE_TYPES_DENYLIST.includes(targetNode.type); const mainOutputs = outputs.filter(
(output) => output.type === NodeConnectionType.Main && output.category !== 'error',
);
let indexAcceptable = true;
if (outputIndex !== undefined) {
const output = outputs[outputIndex];
if (outputs[outputIndex] === undefined) return false;
indexAcceptable = output.type === NodeConnectionType.Main && output.category !== 'error';
}
return mainOutputs.length === 1 && indexAcceptable;
} }
function isValidJSON(data: string): boolean { function isValidJSON(data: string): boolean {

View file

@ -1498,7 +1498,7 @@
"pushConnection.executionFailed": "Execution failed", "pushConnection.executionFailed": "Execution failed",
"pushConnection.executionFailed.message": "There might not be enough memory to finish the execution. Tips for avoiding this <a target=\"_blank\" href=\"https://docs.n8n.io/flow-logic/error-handling/memory-errors/\">here</a>", "pushConnection.executionFailed.message": "There might not be enough memory to finish the execution. Tips for avoiding this <a target=\"_blank\" href=\"https://docs.n8n.io/flow-logic/error-handling/memory-errors/\">here</a>",
"pushConnection.executionError": "There was a problem executing the workflow{error}", "pushConnection.executionError": "There was a problem executing the workflow{error}",
"pushConnection.executionError.openNode": " <a data-action='openNodeDetail' data-action-parameter-node='{node}'>Open node</a>", "pushConnection.executionError.openNode": "Open errored node",
"pushConnection.executionError.details": "<br /><strong>{details}</strong>", "pushConnection.executionError.details": "<br /><strong>{details}</strong>",
"prompts.productTeamMessage": "Our product team will get in touch personally", "prompts.productTeamMessage": "Our product team will get in touch personally",
"prompts.npsSurvey.recommendationQuestion": "How likely are you to recommend n8n to a friend or colleague?", "prompts.npsSurvey.recommendationQuestion": "How likely are you to recommend n8n to a friend or colleague?",

View file

@ -1,4 +1,9 @@
import type { ICredentialType, INodeProperties } from 'n8n-workflow'; import type {
IAuthenticateGeneric,
ICredentialTestRequest,
ICredentialType,
INodeProperties,
} from 'n8n-workflow';
export class OuraApi implements ICredentialType { export class OuraApi implements ICredentialType {
name = 'ouraApi'; name = 'ouraApi';
@ -16,4 +21,20 @@ export class OuraApi implements ICredentialType {
default: '', default: '',
}, },
]; ];
authenticate: IAuthenticateGeneric = {
type: 'generic',
properties: {
headers: {
Authorization: '=Bearer {{$credentials.accessToken}}',
},
},
};
test: ICredentialTestRequest = {
request: {
baseURL: 'https://api.ouraring.com',
url: '/v2/usercollection/personal_info',
},
};
} }

View file

@ -108,7 +108,7 @@ export class Code implements INodeType {
: 'javaScript'; : 'javaScript';
const codeParameterName = language === 'python' ? 'pythonCode' : 'jsCode'; const codeParameterName = language === 'python' ? 'pythonCode' : 'jsCode';
if (!runnersConfig.disabled && language === 'javaScript') { if (runnersConfig.enabled && language === 'javaScript') {
const code = this.getNodeParameter(codeParameterName, 0) as string; const code = this.getNodeParameter(codeParameterName, 0) as string;
const sandbox = new JsTaskRunnerSandbox(code, nodeMode, workflowMode, this); const sandbox = new JsTaskRunnerSandbox(code, nodeMode, workflowMode, this);

View file

@ -235,7 +235,7 @@ export const bucketOperations: INodeProperties[] = [
preSend: [parseJSONBody], preSend: [parseJSONBody],
}, },
}, },
action: 'Create a new Bucket', action: 'Update the metadata of a Bucket',
}, },
], ],
default: 'getAll', default: 'getAll',

View file

@ -4,7 +4,7 @@ import type {
IHookFunctions, IHookFunctions,
ILoadOptionsFunctions, ILoadOptionsFunctions,
JsonObject, JsonObject,
IRequestOptions, IHttpRequestOptions,
IHttpRequestMethods, IHttpRequestMethods,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { NodeApiError } from 'n8n-workflow'; import { NodeApiError } from 'n8n-workflow';
@ -18,15 +18,11 @@ export async function ouraApiRequest(
uri?: string, uri?: string,
option: IDataObject = {}, option: IDataObject = {},
) { ) {
const credentials = await this.getCredentials('ouraApi'); let options: IHttpRequestOptions = {
let options: IRequestOptions = {
headers: {
Authorization: `Bearer ${credentials.accessToken}`,
},
method, method,
qs, qs,
body, body,
uri: uri || `https://api.ouraring.com/v1${resource}`, url: uri ?? `https://api.ouraring.com/v2${resource}`,
json: true, json: true,
}; };
@ -41,7 +37,7 @@ export async function ouraApiRequest(
options = Object.assign({}, options, option); options = Object.assign({}, options, option);
try { try {
return await this.helpers.request(options); return await this.helpers.httpRequestWithAuthentication.call(this, 'ouraApi', options);
} catch (error) { } catch (error) {
throw new NodeApiError(this.getNode(), error as JsonObject); throw new NodeApiError(this.getNode(), error as JsonObject);
} }

View file

@ -63,12 +63,13 @@ export class Oura implements INodeType {
const length = items.length; const length = items.length;
let responseData; let responseData;
const returnData: IDataObject[] = []; const returnData: INodeExecutionData[] = [];
const resource = this.getNodeParameter('resource', 0); const resource = this.getNodeParameter('resource', 0);
const operation = this.getNodeParameter('operation', 0); const operation = this.getNodeParameter('operation', 0);
for (let i = 0; i < length; i++) { for (let i = 0; i < length; i++) {
try {
if (resource === 'profile') { if (resource === 'profile') {
// ********************************************************************* // *********************************************************************
// profile // profile
@ -81,7 +82,7 @@ export class Oura implements INodeType {
// profile: get // profile: get
// ---------------------------------- // ----------------------------------
responseData = await ouraApiRequest.call(this, 'GET', '/userinfo'); responseData = await ouraApiRequest.call(this, 'GET', '/usercollection/personal_info');
} }
} else if (resource === 'summary') { } else if (resource === 'summary') {
// ********************************************************************* // *********************************************************************
@ -100,11 +101,11 @@ export class Oura implements INodeType {
const returnAll = this.getNodeParameter('returnAll', 0); const returnAll = this.getNodeParameter('returnAll', 0);
if (start) { if (start) {
qs.start = moment(start).format('YYYY-MM-DD'); qs.start_date = moment(start).format('YYYY-MM-DD');
} }
if (end) { if (end) {
qs.end = moment(end).format('YYYY-MM-DD'); qs.end_date = moment(end).format('YYYY-MM-DD');
} }
if (operation === 'getActivity') { if (operation === 'getActivity') {
@ -112,8 +113,14 @@ export class Oura implements INodeType {
// profile: getActivity // profile: getActivity
// ---------------------------------- // ----------------------------------
responseData = await ouraApiRequest.call(this, 'GET', '/activity', {}, qs); responseData = await ouraApiRequest.call(
responseData = responseData.activity; this,
'GET',
'/usercollection/daily_activity',
{},
qs,
);
responseData = responseData.data;
if (!returnAll) { if (!returnAll) {
const limit = this.getNodeParameter('limit', 0); const limit = this.getNodeParameter('limit', 0);
@ -124,8 +131,14 @@ export class Oura implements INodeType {
// profile: getReadiness // profile: getReadiness
// ---------------------------------- // ----------------------------------
responseData = await ouraApiRequest.call(this, 'GET', '/readiness', {}, qs); responseData = await ouraApiRequest.call(
responseData = responseData.readiness; this,
'GET',
'/usercollection/daily_readiness',
{},
qs,
);
responseData = responseData.data;
if (!returnAll) { if (!returnAll) {
const limit = this.getNodeParameter('limit', 0); const limit = this.getNodeParameter('limit', 0);
@ -136,8 +149,14 @@ export class Oura implements INodeType {
// profile: getSleep // profile: getSleep
// ---------------------------------- // ----------------------------------
responseData = await ouraApiRequest.call(this, 'GET', '/sleep', {}, qs); responseData = await ouraApiRequest.call(
responseData = responseData.sleep; this,
'GET',
'/usercollection/daily_sleep',
{},
qs,
);
responseData = responseData.data;
if (!returnAll) { if (!returnAll) {
const limit = this.getNodeParameter('limit', 0); const limit = this.getNodeParameter('limit', 0);
@ -146,11 +165,24 @@ export class Oura implements INodeType {
} }
} }
Array.isArray(responseData) const executionData = this.helpers.constructExecutionMetaData(
? returnData.push(...(responseData as IDataObject[])) this.helpers.returnJsonArray(responseData as IDataObject[]),
: returnData.push(responseData as IDataObject); { itemData: { item: i } },
} );
return [this.helpers.returnJsonArray(returnData)]; returnData.push(...executionData);
} catch (error) {
if (this.continueOnFail()) {
const executionErrorData = this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray({ error: error.message }),
{ itemData: { item: i } },
);
returnData.push(...executionErrorData);
continue;
}
throw error;
}
}
return [returnData];
} }
} }

View file

@ -0,0 +1,8 @@
export const profileResponse = {
id: 'some-id',
age: 30,
weight: 168,
height: 80,
biological_sex: 'male',
email: 'nathan@n8n.io',
};

View file

@ -0,0 +1,76 @@
import type {
IExecuteFunctions,
IHookFunctions,
ILoadOptionsFunctions,
IHttpRequestMethods,
INode,
} from 'n8n-workflow';
import nock from 'nock';
import { setup, equalityTest, workflowToTests, getWorkflowFilenames } from '@test/nodes/Helpers';
import { profileResponse } from './apiResponses';
import { ouraApiRequest } from '../GenericFunctions';
const node: INode = {
id: '2cdb46cf-b561-4537-a982-b8d26dd7718b',
name: 'Oura',
type: 'n8n-nodes-base.oura',
typeVersion: 1,
position: [0, 0],
parameters: {
resource: 'profile',
operation: 'get',
},
};
const mockThis = {
helpers: {
httpRequestWithAuthentication: jest
.fn()
.mockResolvedValue({ statusCode: 200, data: profileResponse }),
},
getNode() {
return node;
},
getNodeParameter: jest.fn(),
} as unknown as IHookFunctions | IExecuteFunctions | ILoadOptionsFunctions;
describe('Oura', () => {
describe('ouraApiRequest', () => {
it('should make an authenticated API request to Oura', async () => {
const method: IHttpRequestMethods = 'GET';
const resource = '/usercollection/personal_info';
await ouraApiRequest.call(mockThis, method, resource);
expect(mockThis.helpers.httpRequestWithAuthentication).toHaveBeenCalledWith('ouraApi', {
method: 'GET',
url: 'https://api.ouraring.com/v2/usercollection/personal_info',
json: true,
});
});
});
describe('Run Oura workflow', () => {
const workflows = getWorkflowFilenames(__dirname);
const tests = workflowToTests(workflows);
beforeAll(() => {
nock.disableNetConnect();
nock('https://api.ouraring.com/v2')
.get('/usercollection/personal_info')
.reply(200, profileResponse);
});
afterAll(() => {
nock.restore();
});
const nodeTypes = setup(tests);
for (const testData of tests) {
test(testData.description, async () => await equalityTest(testData, nodeTypes));
}
});
});

View file

@ -0,0 +1,86 @@
{
"name": "Oura Test Workflow",
"nodes": [
{
"parameters": {},
"id": "c1e3b825-a9a8-4def-986b-9108d9441992",
"name": "When clicking Test workflow",
"type": "n8n-nodes-base.manualTrigger",
"position": [720, 400],
"typeVersion": 1
},
{
"parameters": {
"resource": "profile"
},
"id": "7969bf78-9343-4f81-8f79-dc415a60e168",
"name": "Oura",
"type": "n8n-nodes-base.oura",
"typeVersion": 1,
"position": [940, 400],
"credentials": {
"ouraApi": {
"id": "r083EOdhFatkVvFy",
"name": "Oura account"
}
}
},
{
"parameters": {},
"id": "9b97fa0e-51a6-41d3-8a7d-cff0531e5527",
"name": "No Operation, do nothing",
"type": "n8n-nodes-base.noOp",
"typeVersion": 1,
"position": [1140, 400]
}
],
"pinData": {
"No Operation, do nothing": [
{
"json": {
"id": "some-id",
"age": 30,
"weight": 168,
"height": 80,
"biological_sex": "male",
"email": "nathan@n8n.io"
}
}
]
},
"connections": {
"When clicking Test workflow": {
"main": [
[
{
"node": "Oura",
"type": "main",
"index": 0
}
]
]
},
"Oura": {
"main": [
[
{
"node": "No Operation, do nothing",
"type": "main",
"index": 0
}
]
]
}
},
"active": false,
"settings": {
"executionOrder": "v1"
},
"versionId": "bd108f46-f6fc-4c22-8655-ade2f51c4b33",
"meta": {
"templateCredsSetupCompleted": true,
"instanceId": "0fa937d34dcabeff4bd6480d3b42cc95edf3bc20e6810819086ef1ce2623639d"
},
"id": "SrUileWU90mQeo02",
"tags": []
}