Merge remote-tracking branch 'origin/master' into pay-2003-add-executions-tab-to-home-and-projects-view

This commit is contained in:
Csaba Tuncsik 2024-11-06 14:23:02 +01:00
commit 19de76ca5d
No known key found for this signature in database
30 changed files with 1265 additions and 501 deletions

View file

@ -26,6 +26,22 @@ const nodeDetailsView = new NDV();
const NEW_CREDENTIAL_NAME = 'Something else'; const NEW_CREDENTIAL_NAME = 'Something else';
const NEW_CREDENTIAL_NAME2 = 'Something else entirely'; const NEW_CREDENTIAL_NAME2 = 'Something else entirely';
function createNotionCredential() {
workflowPage.actions.addNodeToCanvas(NOTION_NODE_NAME);
workflowPage.actions.openNode(NOTION_NODE_NAME);
workflowPage.getters.nodeCredentialsSelect().click();
getVisibleSelect().find('li').last().click();
credentialsModal.actions.fillCredentialsForm();
cy.get('body').type('{esc}');
workflowPage.actions.deleteNode(NOTION_NODE_NAME);
}
function deleteSelectedCredential() {
workflowPage.getters.nodeCredentialsEditButton().click();
credentialsModal.getters.deleteButton().click();
cy.get('.el-message-box').find('button').contains('Yes').click();
}
describe('Credentials', () => { describe('Credentials', () => {
beforeEach(() => { beforeEach(() => {
cy.visit(credentialsPage.url); cy.visit(credentialsPage.url);
@ -229,6 +245,40 @@ describe('Credentials', () => {
.should('have.value', NEW_CREDENTIAL_NAME); .should('have.value', NEW_CREDENTIAL_NAME);
}); });
it('should set a default credential when adding nodes', () => {
workflowPage.actions.visit();
createNotionCredential();
workflowPage.actions.addNodeToCanvas(NOTION_NODE_NAME, true, true);
workflowPage.getters
.nodeCredentialsSelect()
.find('input')
.should('have.value', NEW_NOTION_ACCOUNT_NAME);
deleteSelectedCredential();
});
it('should set a default credential when editing a node', () => {
workflowPage.actions.visit();
createNotionCredential();
workflowPage.actions.addNodeToCanvas(HTTP_REQUEST_NODE_NAME, true, true);
nodeDetailsView.getters.parameterInput('authentication').click();
getVisibleSelect().find('li').contains('Predefined').click();
nodeDetailsView.getters.parameterInput('nodeCredentialType').click();
getVisibleSelect().find('li').contains('Notion API').click();
workflowPage.getters
.nodeCredentialsSelect()
.find('input')
.should('have.value', NEW_NOTION_ACCOUNT_NAME);
deleteSelectedCredential();
});
it('should setup generic authentication for HTTP node', () => { it('should setup generic authentication for HTTP node', () => {
workflowPage.actions.visit(); workflowPage.actions.visit();
workflowPage.actions.addNodeToCanvas(SCHEDULE_TRIGGER_NODE_NAME); workflowPage.actions.addNodeToCanvas(SCHEDULE_TRIGGER_NODE_NAME);

View file

@ -63,4 +63,35 @@ describe('TaskRunnerNodeTypes', () => {
}); });
}); });
}); });
describe('addNodeTypeDescriptions', () => {
it('should add new node types', () => {
const nodeTypes = new TaskRunnerNodeTypes(TYPES);
const nodeTypeDescriptions = [
{ name: 'new-type', version: 1 },
{ name: 'new-type', version: 2 },
] as INodeTypeDescription[];
nodeTypes.addNodeTypeDescriptions(nodeTypeDescriptions);
expect(nodeTypes.getByNameAndVersion('new-type', 1)).toEqual({
description: { name: 'new-type', version: 1 },
});
expect(nodeTypes.getByNameAndVersion('new-type', 2)).toEqual({
description: { name: 'new-type', version: 2 },
});
});
});
describe('onlyUnknown', () => {
it('should return only unknown node types', () => {
const nodeTypes = new TaskRunnerNodeTypes(TYPES);
const candidate = { name: 'unknown', version: 1 };
expect(nodeTypes.onlyUnknown([candidate])).toEqual([candidate]);
expect(nodeTypes.onlyUnknown([SINGLE_VERSIONED])).toEqual([]);
});
});
}); });

View file

@ -17,6 +17,7 @@ import type {
IRunExecutionData, IRunExecutionData,
WorkflowExecuteMode, WorkflowExecuteMode,
EnvProviderState, EnvProviderState,
INodeTypeDescription,
} from 'n8n-workflow'; } from 'n8n-workflow';
import * as a from 'node:assert'; import * as a from 'node:assert';
import { runInNewContext, type Context } from 'node:vm'; import { runInNewContext, type Context } from 'node:vm';
@ -119,6 +120,29 @@ export class JsTaskRunner extends TaskRunner {
neededBuiltIns.toDataRequestParams(), neededBuiltIns.toDataRequestParams(),
); );
/**
* We request node types only when we know a task needs all nodes, because
* needing all nodes means that the task relies on paired item functionality,
* which is the same requirement for needing node types.
*/
if (neededBuiltIns.needsAllNodes) {
const uniqueNodeTypes = new Map(
data.workflow.nodes.map((node) => [
`${node.type}|${node.typeVersion}`,
{ name: node.type, version: node.typeVersion },
]),
);
const unknownNodeTypes = this.nodeTypes.onlyUnknown([...uniqueNodeTypes.values()]);
const nodeTypes = await this.requestNodeTypes<INodeTypeDescription[]>(
task.taskId,
unknownNodeTypes,
);
this.nodeTypes.addNodeTypeDescriptions(nodeTypes);
}
const workflowParams = data.workflow; const workflowParams = data.workflow;
const workflow = new Workflow({ const workflow = new Workflow({
...workflowParams, ...workflowParams,

View file

@ -1,6 +1,11 @@
import type { INodeTypeBaseDescription } from 'n8n-workflow'; import type { INodeTypeBaseDescription } from 'n8n-workflow';
import type { RPC_ALLOW_LIST, TaskDataRequestParams, TaskResultData } from './runner-types'; import type {
NeededNodeType,
RPC_ALLOW_LIST,
TaskDataRequestParams,
TaskResultData,
} from './runner-types';
export namespace BrokerMessage { export namespace BrokerMessage {
export namespace ToRunner { export namespace ToRunner {
@ -47,6 +52,8 @@ export namespace BrokerMessage {
export interface NodeTypes { export interface NodeTypes {
type: 'broker:nodetypes'; type: 'broker:nodetypes';
taskId: string;
requestId: string;
nodeTypes: INodeTypeBaseDescription[]; nodeTypes: INodeTypeBaseDescription[];
} }
@ -87,6 +94,13 @@ export namespace BrokerMessage {
requestParams: TaskDataRequestParams; requestParams: TaskDataRequestParams;
} }
export interface NodeTypesRequest {
type: 'broker:nodetypesrequest';
taskId: string;
requestId: string;
requestParams: NeededNodeType[];
}
export interface RPC { export interface RPC {
type: 'broker:rpc'; type: 'broker:rpc';
callId: string; callId: string;
@ -95,7 +109,7 @@ export namespace BrokerMessage {
params: unknown[]; params: unknown[];
} }
export type All = TaskReady | TaskDone | TaskError | TaskDataRequest | RPC; export type All = TaskReady | TaskDone | TaskError | TaskDataRequest | NodeTypesRequest | RPC;
} }
} }
@ -120,6 +134,13 @@ export namespace RequesterMessage {
data: unknown; data: unknown;
} }
export interface NodeTypesResponse {
type: 'requester:nodetypesresponse';
taskId: string;
requestId: string;
nodeTypes: INodeTypeBaseDescription[];
}
export interface RPCResponse { export interface RPCResponse {
type: 'requester:rpcresponse'; type: 'requester:rpcresponse';
taskId: string; taskId: string;
@ -134,7 +155,13 @@ export namespace RequesterMessage {
taskType: string; taskType: string;
} }
export type All = TaskSettings | TaskCancel | RPCResponse | TaskDataResponse | TaskRequest; export type All =
| TaskSettings
| TaskCancel
| RPCResponse
| TaskDataResponse
| NodeTypesResponse
| TaskRequest;
} }
} }
@ -183,6 +210,25 @@ export namespace RunnerMessage {
requestParams: TaskDataRequestParams; requestParams: TaskDataRequestParams;
} }
export interface NodeTypesRequest {
type: 'runner:nodetypesrequest';
taskId: string;
requestId: string;
/**
* Which node types should be included in the runner's node types request.
*
* Node types are needed only when the script relies on paired item functionality.
* If so, we need only the node types not already cached in the runner.
*
* TODO: In future we can trim this down to only node types in the paired item chain,
* rather than assuming we need all node types in the workflow.
*
* @example [{ name: 'n8n-nodes-base.httpRequest', version: 1 }]
*/
requestParams: NeededNodeType[];
}
export interface RPC { export interface RPC {
type: 'runner:rpc'; type: 'runner:rpc';
callId: string; callId: string;
@ -199,6 +245,7 @@ export namespace RunnerMessage {
| TaskRejected | TaskRejected
| TaskOffer | TaskOffer
| RPC | RPC
| TaskDataRequest; | TaskDataRequest
| NodeTypesRequest;
} }
} }

View file

@ -7,6 +7,8 @@ import {
type IVersionedNodeType, type IVersionedNodeType,
} from 'n8n-workflow'; } from 'n8n-workflow';
import type { NeededNodeType } from './runner-types';
type VersionedTypes = Map<number, INodeTypeDescription>; type VersionedTypes = Map<number, INodeTypeDescription>;
export const DEFAULT_NODETYPE_VERSION = 1; export const DEFAULT_NODETYPE_VERSION = 1;
@ -61,4 +63,30 @@ export class TaskRunnerNodeTypes implements INodeTypes {
getKnownTypes(): IDataObject { getKnownTypes(): IDataObject {
throw new ApplicationError('Unimplemented `getKnownTypes`', { level: 'error' }); throw new ApplicationError('Unimplemented `getKnownTypes`', { level: 'error' });
} }
addNodeTypeDescriptions(nodeTypeDescriptions: INodeTypeDescription[]) {
const newNodeTypes = this.parseNodeTypes(nodeTypeDescriptions);
for (const [name, newVersions] of newNodeTypes.entries()) {
if (!this.nodeTypesByVersion.has(name)) {
this.nodeTypesByVersion.set(name, newVersions);
} else {
const existingVersions = this.nodeTypesByVersion.get(name)!;
for (const [version, nodeType] of newVersions.entries()) {
existingVersions.set(version, nodeType);
}
}
}
}
/** Filter out node type versions that are already registered. */
onlyUnknown(nodeTypes: NeededNodeType[]) {
return nodeTypes.filter(({ name, version }) => {
const existingVersions = this.nodeTypesByVersion.get(name);
if (!existingVersions) return true;
return !existingVersions.has(version);
});
}
} }

View file

@ -112,3 +112,6 @@ export const RPC_ALLOW_LIST = [
'helpers.httpRequest', 'helpers.httpRequest',
'logNodeOutput', 'logNodeOutput',
] as const; ] as const;
/** Node types needed for the runner to execute a task. */
export type NeededNodeType = { name: string; version: number };

View file

@ -1,4 +1,4 @@
import { ApplicationError, type INodeTypeDescription } from 'n8n-workflow'; import { ApplicationError } from 'n8n-workflow';
import { nanoid } from 'nanoid'; import { nanoid } from 'nanoid';
import { type MessageEvent, WebSocket } from 'ws'; import { type MessageEvent, WebSocket } from 'ws';
@ -25,6 +25,12 @@ interface DataRequest {
reject: (error: unknown) => void; reject: (error: unknown) => void;
} }
interface NodeTypesRequest {
requestId: string;
resolve: (data: unknown) => void;
reject: (error: unknown) => void;
}
interface RPCCall { interface RPCCall {
callId: string; callId: string;
resolve: (data: unknown) => void; resolve: (data: unknown) => void;
@ -58,6 +64,8 @@ export abstract class TaskRunner {
dataRequests: Map<DataRequest['requestId'], DataRequest> = new Map(); dataRequests: Map<DataRequest['requestId'], DataRequest> = new Map();
nodeTypesRequests: Map<NodeTypesRequest['requestId'], NodeTypesRequest> = new Map();
rpcCalls: Map<RPCCall['callId'], RPCCall> = new Map(); rpcCalls: Map<RPCCall['callId'], RPCCall> = new Map();
nodeTypes: TaskRunnerNodeTypes = new TaskRunnerNodeTypes([]); nodeTypes: TaskRunnerNodeTypes = new TaskRunnerNodeTypes([]);
@ -168,15 +176,11 @@ export abstract class TaskRunner {
this.handleRpcResponse(message.callId, message.status, message.data); this.handleRpcResponse(message.callId, message.status, message.data);
break; break;
case 'broker:nodetypes': case 'broker:nodetypes':
this.setNodeTypes(message.nodeTypes as unknown as INodeTypeDescription[]); this.processNodeTypesResponse(message.requestId, message.nodeTypes);
break; break;
} }
} }
setNodeTypes(nodeTypes: INodeTypeDescription[]) {
this.nodeTypes = new TaskRunnerNodeTypes(nodeTypes);
}
processDataResponse(requestId: string, data: unknown) { processDataResponse(requestId: string, data: unknown) {
const request = this.dataRequests.get(requestId); const request = this.dataRequests.get(requestId);
if (!request) { if (!request) {
@ -187,6 +191,16 @@ export abstract class TaskRunner {
request.resolve(data); request.resolve(data);
} }
processNodeTypesResponse(requestId: string, nodeTypes: unknown) {
const request = this.nodeTypesRequests.get(requestId);
if (!request) return;
// Deleting of the request is handled in `requestNodeTypes`, using a
// `finally` wrapped around the return
request.resolve(nodeTypes);
}
hasOpenTasks() { hasOpenTasks() {
return Object.values(this.runningTasks).length < this.maxConcurrency; return Object.values(this.runningTasks).length < this.maxConcurrency;
} }
@ -282,6 +296,34 @@ export abstract class TaskRunner {
throw new ApplicationError('Unimplemented'); throw new ApplicationError('Unimplemented');
} }
async requestNodeTypes<T = unknown>(
taskId: Task['taskId'],
requestParams: RunnerMessage.ToBroker.NodeTypesRequest['requestParams'],
) {
const requestId = nanoid();
const nodeTypesPromise = new Promise<T>((resolve, reject) => {
this.nodeTypesRequests.set(requestId, {
requestId,
resolve: resolve as (data: unknown) => void,
reject,
});
});
this.send({
type: 'runner:nodetypesrequest',
taskId,
requestId,
requestParams,
});
try {
return await nodeTypesPromise;
} finally {
this.nodeTypesRequests.delete(requestId);
}
}
async requestData<T = unknown>( async requestData<T = unknown>(
taskId: Task['taskId'], taskId: Task['taskId'],
requestParams: RunnerMessage.ToBroker.TaskDataRequest['requestParams'], requestParams: RunnerMessage.ToBroker.TaskDataRequest['requestParams'],

View file

@ -127,6 +127,9 @@ export const TIME = {
* Eventually this will superseed `TIME` above * Eventually this will superseed `TIME` above
*/ */
export const Time = { export const Time = {
milliseconds: {
toMinutes: 1 / (60 * 1000),
},
seconds: { seconds: {
toMilliseconds: 1000, toMilliseconds: 1000,
}, },

View file

@ -108,6 +108,22 @@ describe('`parseRangeQuery` middleware', () => {
expect(nextFn).toBeCalledTimes(1); expect(nextFn).toBeCalledTimes(1);
}); });
test('should parse `projectId` field', () => {
const req = mock<ExecutionRequest.GetMany>({
query: {
filter: '{ "projectId": "123" }',
limit: undefined,
firstId: undefined,
lastId: undefined,
},
});
parseRangeQuery(req, res, nextFn);
expect(req.rangeQuery.projectId).toBe('123');
expect(nextFn).toBeCalledTimes(1);
});
test('should delete invalid fields', () => { test('should delete invalid fields', () => {
const req = mock<ExecutionRequest.GetMany>({ const req = mock<ExecutionRequest.GetMany>({
query: { query: {

View file

@ -66,6 +66,7 @@ export const schemaGetExecutionsQueryFilter = {
startedBefore: { type: 'date-time' }, startedBefore: { type: 'date-time' },
annotationTags: { type: 'array', items: { type: 'string' } }, annotationTags: { type: 'array', items: { type: 'string' } },
vote: { type: 'string' }, vote: { type: 'string' },
projectId: { type: 'string' },
}, },
$defs: { $defs: {
metadata: { metadata: {

View file

@ -1,3 +1,4 @@
import type { NeededNodeType } from '@n8n/task-runner';
import type { Dirent } from 'fs'; import type { Dirent } from 'fs';
import { readdir } from 'fs/promises'; import { readdir } from 'fs/promises';
import { loadClassInIsolation } from 'n8n-core'; import { loadClassInIsolation } from 'n8n-core';
@ -149,4 +150,22 @@ export class NodeTypes implements INodeTypes {
dirent.name.toLowerCase().startsWith('v') dirent.name.toLowerCase().startsWith('v')
); );
} }
getNodeTypeDescriptions(nodeTypes: NeededNodeType[]): INodeTypeDescription[] {
return nodeTypes.map(({ name: nodeTypeName, version: nodeTypeVersion }) => {
const nodeType = this.getNode(nodeTypeName);
if (!nodeType) throw new ApplicationError(`Unknown node type: ${nodeTypeName}`);
const { description } = NodeHelpers.getVersionedNodeType(nodeType.type, nodeTypeVersion);
const descriptionCopy = { ...description };
descriptionCopy.name = descriptionCopy.name.startsWith('n8n-nodes')
? descriptionCopy.name
: `n8n-nodes-base.${descriptionCopy.name}`; // nodes-base nodes are unprefixed
return descriptionCopy;
});
}
} }

View file

@ -1,5 +1,6 @@
import type { RunnerMessage, TaskResultData } from '@n8n/task-runner'; import type { RunnerMessage, TaskResultData } from '@n8n/task-runner';
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import type { INodeTypeBaseDescription } from 'n8n-workflow';
import { TaskRejectError } from '../errors'; import { TaskRejectError } from '../errors';
import { TaskBroker } from '../task-broker.service'; import { TaskBroker } from '../task-broker.service';
@ -11,7 +12,7 @@ describe('TaskBroker', () => {
let taskBroker: TaskBroker; let taskBroker: TaskBroker;
beforeEach(() => { beforeEach(() => {
taskBroker = new TaskBroker(mock(), mock()); taskBroker = new TaskBroker(mock());
jest.restoreAllMocks(); jest.restoreAllMocks();
}); });
@ -76,13 +77,6 @@ describe('TaskBroker', () => {
const messageCallback = jest.fn(); const messageCallback = jest.fn();
taskBroker.registerRunner(runner, messageCallback); taskBroker.registerRunner(runner, messageCallback);
expect(messageCallback).toBeCalledWith({
type: 'broker:nodetypes',
// We're mocking the node types service, so this will
// be undefined.
nodeType: undefined,
});
}); });
}); });
@ -560,5 +554,68 @@ describe('TaskBroker', () => {
params: rpcParams, params: rpcParams,
}); });
}); });
it('should handle `runner:nodetypesrequest` message', async () => {
const runnerId = 'runner1';
const taskId = 'task1';
const requesterId = 'requester1';
const requestId = 'request1';
const requestParams = [
{
name: 'n8n-nodes-base.someNode',
version: 1,
},
];
const message: RunnerMessage.ToBroker.NodeTypesRequest = {
type: 'runner:nodetypesrequest',
taskId,
requestId,
requestParams,
};
const requesterMessageCallback = jest.fn();
taskBroker.registerRunner(mock<TaskRunner>({ id: runnerId }), jest.fn());
taskBroker.setTasks({
[taskId]: { id: taskId, runnerId, requesterId, taskType: 'test' },
});
taskBroker.registerRequester(requesterId, requesterMessageCallback);
await taskBroker.onRunnerMessage(runnerId, message);
expect(requesterMessageCallback).toHaveBeenCalledWith({
type: 'broker:nodetypesrequest',
taskId,
requestId,
requestParams,
});
});
});
describe('onRequesterMessage', () => {
it('should handle `requester:nodetypesresponse` message', async () => {
const runnerId = 'runner1';
const taskId = 'task1';
const requesterId = 'requester1';
const requestId = 'request1';
const nodeTypes = [mock<INodeTypeBaseDescription>(), mock<INodeTypeBaseDescription>()];
const runnerMessageCallback = jest.fn();
taskBroker.registerRunner(mock<TaskRunner>({ id: runnerId }), runnerMessageCallback);
taskBroker.setTasks({
[taskId]: { id: taskId, runnerId, requesterId, taskType: 'test' },
});
await taskBroker.handleRequesterNodeTypesResponse(taskId, requestId, nodeTypes);
expect(runnerMessageCallback).toHaveBeenCalledWith({
type: 'broker:nodetypes',
taskId,
requestId,
nodeTypes,
});
});
}); });
}); });

View file

@ -8,7 +8,6 @@ import { ApplicationError } from 'n8n-workflow';
import { nanoid } from 'nanoid'; import { nanoid } from 'nanoid';
import { Service } from 'typedi'; import { Service } from 'typedi';
import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
import { Logger } from '@/logging/logger.service'; import { Logger } from '@/logging/logger.service';
import { TaskRejectError } from './errors'; import { TaskRejectError } from './errors';
@ -79,19 +78,7 @@ export class TaskBroker {
private pendingTaskRequests: TaskRequest[] = []; private pendingTaskRequests: TaskRequest[] = [];
constructor( constructor(private readonly logger: Logger) {}
private readonly logger: Logger,
private readonly loadNodesAndCredentials: LoadNodesAndCredentials,
) {
this.loadNodesAndCredentials.addPostProcessor(this.updateNodeTypes);
}
updateNodeTypes = async () => {
await this.messageAllRunners({
type: 'broker:nodetypes',
nodeTypes: this.loadNodesAndCredentials.types.nodes,
});
};
expireTasks() { expireTasks() {
const now = process.hrtime.bigint(); const now = process.hrtime.bigint();
@ -105,10 +92,6 @@ export class TaskBroker {
registerRunner(runner: TaskRunner, messageCallback: MessageCallback) { registerRunner(runner: TaskRunner, messageCallback: MessageCallback) {
this.knownRunners.set(runner.id, { runner, messageCallback }); this.knownRunners.set(runner.id, { runner, messageCallback });
void this.knownRunners.get(runner.id)!.messageCallback({ type: 'broker:runnerregistered' }); void this.knownRunners.get(runner.id)!.messageCallback({ type: 'broker:runnerregistered' });
void this.knownRunners.get(runner.id)!.messageCallback({
type: 'broker:nodetypes',
nodeTypes: this.loadNodesAndCredentials.types.nodes,
});
} }
deregisterRunner(runnerId: string, error: Error) { deregisterRunner(runnerId: string, error: Error) {
@ -145,14 +128,6 @@ export class TaskBroker {
await this.knownRunners.get(runnerId)?.messageCallback(message); await this.knownRunners.get(runnerId)?.messageCallback(message);
} }
private async messageAllRunners(message: BrokerMessage.ToRunner.All) {
await Promise.allSettled(
[...this.knownRunners.values()].map(async (runner) => {
await runner.messageCallback(message);
}),
);
}
private async messageRequester(requesterId: string, message: BrokerMessage.ToRequester.All) { private async messageRequester(requesterId: string, message: BrokerMessage.ToRequester.All) {
await this.requesters.get(requesterId)?.(message); await this.requesters.get(requesterId)?.(message);
} }
@ -187,7 +162,9 @@ export class TaskBroker {
case 'runner:taskdatarequest': case 'runner:taskdatarequest':
await this.handleDataRequest(message.taskId, message.requestId, message.requestParams); await this.handleDataRequest(message.taskId, message.requestId, message.requestParams);
break; break;
case 'runner:nodetypesrequest':
await this.handleNodeTypesRequest(message.taskId, message.requestId, message.requestParams);
break;
case 'runner:rpc': case 'runner:rpc':
await this.handleRpcRequest(message.taskId, message.callId, message.name, message.params); await this.handleRpcRequest(message.taskId, message.callId, message.name, message.params);
break; break;
@ -249,6 +226,23 @@ export class TaskBroker {
}); });
} }
async handleNodeTypesRequest(
taskId: Task['id'],
requestId: RunnerMessage.ToBroker.NodeTypesRequest['requestId'],
requestParams: RunnerMessage.ToBroker.NodeTypesRequest['requestParams'],
) {
const task = this.tasks.get(taskId);
if (!task) {
return;
}
await this.messageRequester(task.requesterId, {
type: 'broker:nodetypesrequest',
taskId,
requestId,
requestParams,
});
}
async handleResponse( async handleResponse(
taskId: Task['id'], taskId: Task['id'],
requestId: RunnerMessage.ToBroker.TaskDataRequest['requestId'], requestId: RunnerMessage.ToBroker.TaskDataRequest['requestId'],
@ -284,6 +278,13 @@ export class TaskBroker {
case 'requester:taskdataresponse': case 'requester:taskdataresponse':
await this.handleRequesterDataResponse(message.taskId, message.requestId, message.data); await this.handleRequesterDataResponse(message.taskId, message.requestId, message.data);
break; break;
case 'requester:nodetypesresponse':
await this.handleRequesterNodeTypesResponse(
message.taskId,
message.requestId,
message.nodeTypes,
);
break;
case 'requester:rpcresponse': case 'requester:rpcresponse':
await this.handleRequesterRpcResponse( await this.handleRequesterRpcResponse(
message.taskId, message.taskId,
@ -322,6 +323,21 @@ export class TaskBroker {
}); });
} }
async handleRequesterNodeTypesResponse(
taskId: Task['id'],
requestId: RequesterMessage.ToBroker.NodeTypesResponse['requestId'],
nodeTypes: RequesterMessage.ToBroker.NodeTypesResponse['nodeTypes'],
) {
const runner = await this.getRunnerOrFailTask(taskId);
await this.messageRunner(runner.id, {
type: 'broker:nodetypes',
taskId,
requestId,
nodeTypes,
});
}
handleRequesterAccept( handleRequesterAccept(
taskId: Task['id'], taskId: Task['id'],
settings: RequesterMessage.ToBroker.TaskSettings['settings'], settings: RequesterMessage.ToBroker.TaskSettings['settings'],

View file

@ -1,17 +1,20 @@
import type { RequesterMessage } from '@n8n/task-runner'; import type { RequesterMessage } from '@n8n/task-runner';
import Container from 'typedi'; import Container, { Service } from 'typedi';
import { NodeTypes } from '@/node-types';
import { TaskManager } from './task-manager'; import { TaskManager } from './task-manager';
import type { RequesterMessageCallback } from '../task-broker.service'; import type { RequesterMessageCallback } from '../task-broker.service';
import { TaskBroker } from '../task-broker.service'; import { TaskBroker } from '../task-broker.service';
@Service()
export class LocalTaskManager extends TaskManager { export class LocalTaskManager extends TaskManager {
taskBroker: TaskBroker; taskBroker: TaskBroker;
id: string = 'single-main'; id: string = 'single-main';
constructor() { constructor(nodeTypes: NodeTypes) {
super(); super(nodeTypes);
this.registerRequester(); this.registerRequester();
} }

View file

@ -17,6 +17,9 @@ import type {
} from 'n8n-workflow'; } from 'n8n-workflow';
import { createResultOk, createResultError } from 'n8n-workflow'; import { createResultOk, createResultError } from 'n8n-workflow';
import { nanoid } from 'nanoid'; import { nanoid } from 'nanoid';
import { Service } from 'typedi';
import { NodeTypes } from '@/node-types';
import { DataRequestResponseBuilder } from './data-request-response-builder'; import { DataRequestResponseBuilder } from './data-request-response-builder';
@ -43,7 +46,8 @@ interface ExecuteFunctionObject {
[name: string]: ((...args: unknown[]) => unknown) | ExecuteFunctionObject; [name: string]: ((...args: unknown[]) => unknown) | ExecuteFunctionObject;
} }
export class TaskManager { @Service()
export abstract class TaskManager {
requestAcceptRejects: Map<string, { accept: RequestAccept; reject: RequestReject }> = new Map(); requestAcceptRejects: Map<string, { accept: RequestAccept; reject: RequestReject }> = new Map();
taskAcceptRejects: Map<string, { accept: TaskAccept; reject: TaskReject }> = new Map(); taskAcceptRejects: Map<string, { accept: TaskAccept; reject: TaskReject }> = new Map();
@ -52,6 +56,8 @@ export class TaskManager {
tasks: Map<string, Task> = new Map(); tasks: Map<string, Task> = new Map();
constructor(private readonly nodeTypes: NodeTypes) {}
async startTask<TData, TError>( async startTask<TData, TError>(
additionalData: IWorkflowExecuteAdditionalData, additionalData: IWorkflowExecuteAdditionalData,
taskType: string, taskType: string,
@ -173,6 +179,9 @@ export class TaskManager {
case 'broker:taskdatarequest': case 'broker:taskdatarequest':
this.sendTaskData(message.taskId, message.requestId, message.requestParams); this.sendTaskData(message.taskId, message.requestId, message.requestParams);
break; break;
case 'broker:nodetypesrequest':
this.sendNodeTypes(message.taskId, message.requestId, message.requestParams);
break;
case 'broker:rpc': case 'broker:rpc':
void this.handleRpc(message.taskId, message.callId, message.name, message.params); void this.handleRpc(message.taskId, message.callId, message.name, message.params);
break; break;
@ -239,6 +248,21 @@ export class TaskManager {
}); });
} }
sendNodeTypes(
taskId: string,
requestId: string,
neededNodeTypes: BrokerMessage.ToRequester.NodeTypesRequest['requestParams'],
) {
const nodeTypes = this.nodeTypes.getNodeTypeDescriptions(neededNodeTypes);
this.sendMessage({
type: 'requester:nodetypesresponse',
taskId,
requestId,
nodeTypes,
});
}
async handleRpc( async handleRpc(
taskId: string, taskId: string,
callId: string, callId: string,

View file

@ -54,7 +54,7 @@ export class TaskRunnerModule {
private async loadTaskManager() { private async loadTaskManager() {
const { TaskManager } = await import('@/runners/task-managers/task-manager'); const { TaskManager } = await import('@/runners/task-managers/task-manager');
const { LocalTaskManager } = await import('@/runners/task-managers/local-task-manager'); const { LocalTaskManager } = await import('@/runners/task-managers/local-task-manager');
this.taskManager = new LocalTaskManager(); this.taskManager = Container.get(LocalTaskManager);
Container.set(TaskManager, this.taskManager); Container.set(TaskManager, this.taskManager);
} }

View file

@ -20,7 +20,7 @@ export class OrchestrationService {
private subscriber: Subscriber; private subscriber: Subscriber;
protected isInitialized = false; isInitialized = false;
private isMultiMainSetupLicensed = false; private isMultiMainSetupLicensed = false;

View file

@ -1,4 +1,4 @@
import type { GlobalConfig } from '@n8n/config'; import type { PruningConfig } from '@n8n/config';
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core'; import type { InstanceSettings } from 'n8n-core';
@ -8,9 +8,13 @@ import { mockLogger } from '@test/mocking';
import { PruningService } from '../pruning.service'; import { PruningService } from '../pruning.service';
jest.mock('@/db', () => ({
connectionState: { migrated: true },
}));
describe('PruningService', () => { describe('PruningService', () => {
describe('init', () => { describe('init', () => {
it('should start pruning if leader', () => { it('should start pruning on main instance that is the leader', () => {
const pruningService = new PruningService( const pruningService = new PruningService(
mockLogger(), mockLogger(),
mock<InstanceSettings>({ isLeader: true }), mock<InstanceSettings>({ isLeader: true }),
@ -29,7 +33,7 @@ describe('PruningService', () => {
expect(startPruningSpy).toHaveBeenCalled(); expect(startPruningSpy).toHaveBeenCalled();
}); });
it('should not start pruning if follower', () => { it('should not start pruning on main instance that is a follower', () => {
const pruningService = new PruningService( const pruningService = new PruningService(
mockLogger(), mockLogger(),
mock<InstanceSettings>({ isLeader: false }), mock<InstanceSettings>({ isLeader: false }),
@ -48,7 +52,7 @@ describe('PruningService', () => {
expect(startPruningSpy).not.toHaveBeenCalled(); expect(startPruningSpy).not.toHaveBeenCalled();
}); });
it('should register leadership events if multi-main setup is enabled', () => { it('should register leadership events if main on multi-main setup', () => {
const pruningService = new PruningService( const pruningService = new PruningService(
mockLogger(), mockLogger(),
mock<InstanceSettings>({ isLeader: true }), mock<InstanceSettings>({ isLeader: true }),
@ -88,13 +92,10 @@ describe('PruningService', () => {
isMultiMainSetupEnabled: true, isMultiMainSetupEnabled: true,
multiMainSetup: mock<MultiMainSetup>(), multiMainSetup: mock<MultiMainSetup>(),
}), }),
mock<GlobalConfig>({ pruning: { isEnabled: true } }), mock<PruningConfig>({ isEnabled: true }),
); );
// @ts-expect-error Private method expect(pruningService.isEnabled).toBe(true);
const isEnabled = pruningService.isEnabled();
expect(isEnabled).toBe(true);
}); });
it('should return `false` based on config if leader main', () => { it('should return `false` based on config if leader main', () => {
@ -107,16 +108,13 @@ describe('PruningService', () => {
isMultiMainSetupEnabled: true, isMultiMainSetupEnabled: true,
multiMainSetup: mock<MultiMainSetup>(), multiMainSetup: mock<MultiMainSetup>(),
}), }),
mock<GlobalConfig>({ pruning: { isEnabled: false } }), mock<PruningConfig>({ isEnabled: false }),
); );
// @ts-expect-error Private method expect(pruningService.isEnabled).toBe(false);
const isEnabled = pruningService.isEnabled();
expect(isEnabled).toBe(false);
}); });
it('should return `false` if non-main even if enabled', () => { it('should return `false` if non-main even if config is enabled', () => {
const pruningService = new PruningService( const pruningService = new PruningService(
mockLogger(), mockLogger(),
mock<InstanceSettings>({ isLeader: false, instanceType: 'worker' }), mock<InstanceSettings>({ isLeader: false, instanceType: 'worker' }),
@ -126,16 +124,13 @@ describe('PruningService', () => {
isMultiMainSetupEnabled: true, isMultiMainSetupEnabled: true,
multiMainSetup: mock<MultiMainSetup>(), multiMainSetup: mock<MultiMainSetup>(),
}), }),
mock<GlobalConfig>({ pruning: { isEnabled: true } }), mock<PruningConfig>({ isEnabled: true }),
); );
// @ts-expect-error Private method expect(pruningService.isEnabled).toBe(false);
const isEnabled = pruningService.isEnabled();
expect(isEnabled).toBe(false);
}); });
it('should return `false` if follower main even if enabled', () => { it('should return `false` if follower main even if config is enabled', () => {
const pruningService = new PruningService( const pruningService = new PruningService(
mockLogger(), mockLogger(),
mock<InstanceSettings>({ isLeader: false, isFollower: true, instanceType: 'main' }), mock<InstanceSettings>({ isLeader: false, isFollower: true, instanceType: 'main' }),
@ -145,13 +140,10 @@ describe('PruningService', () => {
isMultiMainSetupEnabled: true, isMultiMainSetupEnabled: true,
multiMainSetup: mock<MultiMainSetup>(), multiMainSetup: mock<MultiMainSetup>(),
}), }),
mock<GlobalConfig>({ pruning: { isEnabled: true }, multiMainSetup: { enabled: true } }), mock<PruningConfig>({ isEnabled: true }),
); );
// @ts-expect-error Private method expect(pruningService.isEnabled).toBe(false);
const isEnabled = pruningService.isEnabled();
expect(isEnabled).toBe(false);
}); });
}); });
@ -166,22 +158,25 @@ describe('PruningService', () => {
isMultiMainSetupEnabled: true, isMultiMainSetupEnabled: true,
multiMainSetup: mock<MultiMainSetup>(), multiMainSetup: mock<MultiMainSetup>(),
}), }),
mock<GlobalConfig>({ pruning: { isEnabled: false } }), mock<PruningConfig>({ isEnabled: false }),
);
const scheduleRollingSoftDeletionsSpy = jest.spyOn(
pruningService,
// @ts-expect-error Private method
'scheduleRollingSoftDeletions',
); );
// @ts-expect-error Private method // @ts-expect-error Private method
const setSoftDeletionInterval = jest.spyOn(pruningService, 'setSoftDeletionInterval'); const scheduleNextHardDeletionSpy = jest.spyOn(pruningService, 'scheduleNextHardDeletion');
// @ts-expect-error Private method
const scheduleHardDeletion = jest.spyOn(pruningService, 'scheduleHardDeletion');
pruningService.startPruning(); pruningService.startPruning();
expect(setSoftDeletionInterval).not.toHaveBeenCalled(); expect(scheduleRollingSoftDeletionsSpy).not.toHaveBeenCalled();
expect(scheduleHardDeletion).not.toHaveBeenCalled(); expect(scheduleNextHardDeletionSpy).not.toHaveBeenCalled();
}); });
it('should start pruning if service is enabled', () => { it('should start pruning if service is enabled and DB is migrated', () => {
const pruningService = new PruningService( const pruningService = new PruningService(
mockLogger(), mockLogger(),
mock<InstanceSettings>({ isLeader: true, instanceType: 'main' }), mock<InstanceSettings>({ isLeader: true, instanceType: 'main' }),
@ -191,23 +186,23 @@ describe('PruningService', () => {
isMultiMainSetupEnabled: true, isMultiMainSetupEnabled: true,
multiMainSetup: mock<MultiMainSetup>(), multiMainSetup: mock<MultiMainSetup>(),
}), }),
mock<GlobalConfig>({ pruning: { isEnabled: true } }), mock<PruningConfig>({ isEnabled: true }),
); );
const setSoftDeletionInterval = jest const scheduleRollingSoftDeletionsSpy = jest
// @ts-expect-error Private method // @ts-expect-error Private method
.spyOn(pruningService, 'setSoftDeletionInterval') .spyOn(pruningService, 'scheduleRollingSoftDeletions')
.mockImplementation(); .mockImplementation();
const scheduleHardDeletion = jest const scheduleNextHardDeletionSpy = jest
// @ts-expect-error Private method // @ts-expect-error Private method
.spyOn(pruningService, 'scheduleHardDeletion') .spyOn(pruningService, 'scheduleNextHardDeletion')
.mockImplementation(); .mockImplementation();
pruningService.startPruning(); pruningService.startPruning();
expect(setSoftDeletionInterval).toHaveBeenCalled(); expect(scheduleRollingSoftDeletionsSpy).toHaveBeenCalled();
expect(scheduleHardDeletion).toHaveBeenCalled(); expect(scheduleNextHardDeletionSpy).toHaveBeenCalled();
}); });
}); });
}); });

View file

@ -1,27 +1,37 @@
import { GlobalConfig } from '@n8n/config'; import { PruningConfig } from '@n8n/config';
import { BinaryDataService, InstanceSettings } from 'n8n-core'; import { BinaryDataService, InstanceSettings } from 'n8n-core';
import { jsonStringify } from 'n8n-workflow'; import { ensureError } from 'n8n-workflow';
import { strict } from 'node:assert';
import { Service } from 'typedi'; import { Service } from 'typedi';
import { TIME } from '@/constants'; import { Time } from '@/constants';
import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { connectionState as dbConnectionState } from '@/db';
import { OnShutdown } from '@/decorators/on-shutdown'; import { OnShutdown } from '@/decorators/on-shutdown';
import { Logger } from '@/logging/logger.service'; import { Logger } from '@/logging/logger.service';
import { OrchestrationService } from '../orchestration.service'; import { OrchestrationService } from '../orchestration.service';
/**
* Responsible for pruning executions from the database and their associated binary data
* from the filesystem, on a rolling basis. By default we soft-delete execution rows
* every cycle and hard-delete them and their binary data every 4th cycle.
*/
@Service() @Service()
export class PruningService { export class PruningService {
private hardDeletionBatchSize = 100; /** Timer for soft-deleting executions on a rolling basis. */
private softDeletionInterval: NodeJS.Timer | undefined;
private rates: Record<string, number> = { /** Timeout for next hard-deletion of soft-deleted executions. */
softDeletion: this.globalConfig.pruning.softDeleteInterval * TIME.MINUTE, private hardDeletionTimeout: NodeJS.Timeout | undefined;
hardDeletion: this.globalConfig.pruning.hardDeleteInterval * TIME.MINUTE,
private readonly rates = {
softDeletion: this.pruningConfig.softDeleteInterval * Time.minutes.toMilliseconds,
hardDeletion: this.pruningConfig.hardDeleteInterval * Time.minutes.toMilliseconds,
}; };
public softDeletionInterval: NodeJS.Timer | undefined; /** Max number of executions to hard-delete in a cycle. */
private readonly batchSize = 100;
public hardDeletionTimeout: NodeJS.Timeout | undefined;
private isShuttingDown = false; private isShuttingDown = false;
@ -31,103 +41,68 @@ export class PruningService {
private readonly executionRepository: ExecutionRepository, private readonly executionRepository: ExecutionRepository,
private readonly binaryDataService: BinaryDataService, private readonly binaryDataService: BinaryDataService,
private readonly orchestrationService: OrchestrationService, private readonly orchestrationService: OrchestrationService,
private readonly globalConfig: GlobalConfig, private readonly pruningConfig: PruningConfig,
) { ) {
this.logger = this.logger.scoped('pruning'); this.logger = this.logger.scoped('pruning');
} }
/**
* @important Requires `OrchestrationService` to be initialized.
*/
init() { init() {
const { isLeader } = this.instanceSettings; strict(this.instanceSettings.instanceRole !== 'unset', 'Instance role is not set');
const { isMultiMainSetupEnabled } = this.orchestrationService;
if (isLeader) this.startPruning(); if (this.instanceSettings.isLeader) this.startPruning();
if (isMultiMainSetupEnabled) { if (this.orchestrationService.isMultiMainSetupEnabled) {
this.orchestrationService.multiMainSetup.on('leader-takeover', () => this.startPruning()); this.orchestrationService.multiMainSetup.on('leader-takeover', () => this.startPruning());
this.orchestrationService.multiMainSetup.on('leader-stepdown', () => this.stopPruning()); this.orchestrationService.multiMainSetup.on('leader-stepdown', () => this.stopPruning());
} }
} }
private isEnabled() { get isEnabled() {
const { instanceType, isFollower } = this.instanceSettings; return (
if (!this.globalConfig.pruning.isEnabled || instanceType !== 'main') { this.pruningConfig.isEnabled &&
return false; this.instanceSettings.instanceType === 'main' &&
this.instanceSettings.isLeader
);
} }
if (this.globalConfig.multiMainSetup.enabled && instanceType === 'main' && isFollower) {
return false;
}
return true;
}
/**
* @important Call this method only after DB migrations have completed.
*/
startPruning() { startPruning() {
if (!this.isEnabled()) return; if (!this.isEnabled || !dbConnectionState.migrated || this.isShuttingDown) return;
if (this.isShuttingDown) { this.scheduleRollingSoftDeletions();
this.logger.warn('Cannot start pruning while shutting down'); this.scheduleNextHardDeletion();
return;
}
this.logger.debug('Starting soft-deletion and hard-deletion timers');
this.setSoftDeletionInterval();
this.scheduleHardDeletion();
} }
stopPruning() { stopPruning() {
if (!this.isEnabled()) return; if (!this.isEnabled) return;
this.logger.debug('Removing soft-deletion and hard-deletion timers');
clearInterval(this.softDeletionInterval); clearInterval(this.softDeletionInterval);
clearTimeout(this.hardDeletionTimeout); clearTimeout(this.hardDeletionTimeout);
} }
private setSoftDeletionInterval(rateMs = this.rates.softDeletion) { private scheduleRollingSoftDeletions(rateMs = this.rates.softDeletion) {
const when = [rateMs / TIME.MINUTE, 'min'].join(' ');
this.softDeletionInterval = setInterval( this.softDeletionInterval = setInterval(
async () => await this.softDeleteOnPruningCycle(), async () => await this.softDelete(),
this.rates.softDeletion, this.rates.softDeletion,
); );
this.logger.debug(`Soft-deletion scheduled every ${when}`); this.logger.debug(`Soft-deletion every ${rateMs * Time.milliseconds.toMinutes} minutes`);
} }
private scheduleHardDeletion(rateMs = this.rates.hardDeletion) { private scheduleNextHardDeletion(rateMs = this.rates.hardDeletion) {
const when = [rateMs / TIME.MINUTE, 'min'].join(' ');
this.hardDeletionTimeout = setTimeout(() => { this.hardDeletionTimeout = setTimeout(() => {
this.hardDeleteOnPruningCycle() this.hardDelete()
.then((rate) => this.scheduleHardDeletion(rate)) .then((rate) => this.scheduleNextHardDeletion(rate))
.catch((error) => { .catch((error) => {
this.scheduleHardDeletion(1 * TIME.SECOND); this.scheduleNextHardDeletion(1_000);
this.logger.error('Failed to hard-delete executions', { error: ensureError(error) });
const errorMessage =
error instanceof Error
? error.message
: jsonStringify(error, { replaceCircularRefs: true });
this.logger.error('Failed to hard-delete executions', { errorMessage });
}); });
}, rateMs); }, rateMs);
this.logger.debug(`Hard-deletion scheduled for next ${when}`); this.logger.debug(`Hard-deletion in next ${rateMs * Time.milliseconds.toMinutes} minutes`);
} }
/** /** Soft-delete executions based on max age and/or max count. */
* Mark executions as deleted based on age and count, in a pruning cycle. async softDelete() {
*/
async softDeleteOnPruningCycle() {
this.logger.debug('Starting soft-deletion of executions');
const result = await this.executionRepository.softDeletePrunableExecutions(); const result = await this.executionRepository.softDeletePrunableExecutions();
if (result.affected === 0) { if (result.affected === 0) {
@ -145,10 +120,11 @@ export class PruningService {
} }
/** /**
* Permanently remove all soft-deleted executions and their binary data, in a pruning cycle. * Delete all soft-deleted executions and their binary data.
* @return Delay in ms after which the next cycle should be started *
* @returns Delay in milliseconds until next hard-deletion
*/ */
private async hardDeleteOnPruningCycle() { private async hardDelete(): Promise<number> {
const ids = await this.executionRepository.findSoftDeletedExecutions(); const ids = await this.executionRepository.findSoftDeletedExecutions();
const executionIds = ids.map((o) => o.executionId); const executionIds = ids.map((o) => o.executionId);
@ -160,8 +136,6 @@ export class PruningService {
} }
try { try {
this.logger.debug('Starting hard-deletion of executions', { executionIds });
await this.binaryDataService.deleteMany(ids); await this.binaryDataService.deleteMany(ids);
await this.executionRepository.deleteByIds(executionIds); await this.executionRepository.deleteByIds(executionIds);
@ -170,16 +144,13 @@ export class PruningService {
} catch (error) { } catch (error) {
this.logger.error('Failed to hard-delete executions', { this.logger.error('Failed to hard-delete executions', {
executionIds, executionIds,
error: error instanceof Error ? error.message : `${error}`, error: ensureError(error),
}); });
} }
/** // if high volume, speed up next hard-deletion
* For next batch, speed up hard-deletion cycle in high-volume case if (executionIds.length >= this.batchSize) return 1 * Time.seconds.toMilliseconds;
* to prevent high concurrency from causing duplicate deletions.
*/
const isHighVolume = executionIds.length >= this.hardDeletionBatchSize;
return isHighVolume ? 1 * TIME.SECOND : this.rates.hardDeletion; return this.rates.hardDeletion;
} }
} }

View file

@ -1,4 +1,4 @@
import { GlobalConfig } from '@n8n/config'; import { PruningConfig } from '@n8n/config';
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import { BinaryDataService, InstanceSettings } from 'n8n-core'; import { BinaryDataService, InstanceSettings } from 'n8n-core';
import type { ExecutionStatus } from 'n8n-workflow'; import type { ExecutionStatus } from 'n8n-workflow';
@ -27,19 +27,19 @@ describe('softDeleteOnPruningCycle()', () => {
const now = new Date(); const now = new Date();
const yesterday = new Date(Date.now() - TIME.DAY); const yesterday = new Date(Date.now() - TIME.DAY);
let workflow: WorkflowEntity; let workflow: WorkflowEntity;
let globalConfig: GlobalConfig; let pruningConfig: PruningConfig;
beforeAll(async () => { beforeAll(async () => {
await testDb.init(); await testDb.init();
globalConfig = Container.get(GlobalConfig); pruningConfig = Container.get(PruningConfig);
pruningService = new PruningService( pruningService = new PruningService(
mockLogger(), mockLogger(),
instanceSettings, instanceSettings,
Container.get(ExecutionRepository), Container.get(ExecutionRepository),
mockInstance(BinaryDataService), mockInstance(BinaryDataService),
mock(), mock(),
globalConfig, pruningConfig,
); );
workflow = await createWorkflow(); workflow = await createWorkflow();
@ -62,8 +62,8 @@ describe('softDeleteOnPruningCycle()', () => {
describe('when EXECUTIONS_DATA_PRUNE_MAX_COUNT is set', () => { describe('when EXECUTIONS_DATA_PRUNE_MAX_COUNT is set', () => {
beforeAll(() => { beforeAll(() => {
globalConfig.pruning.maxAge = 336; pruningConfig.maxAge = 336;
globalConfig.pruning.maxCount = 1; pruningConfig.maxCount = 1;
}); });
test('should mark as deleted based on EXECUTIONS_DATA_PRUNE_MAX_COUNT', async () => { test('should mark as deleted based on EXECUTIONS_DATA_PRUNE_MAX_COUNT', async () => {
@ -73,7 +73,7 @@ describe('softDeleteOnPruningCycle()', () => {
await createSuccessfulExecution(workflow), await createSuccessfulExecution(workflow),
]; ];
await pruningService.softDeleteOnPruningCycle(); await pruningService.softDelete();
const result = await findAllExecutions(); const result = await findAllExecutions();
expect(result).toEqual([ expect(result).toEqual([
@ -92,7 +92,7 @@ describe('softDeleteOnPruningCycle()', () => {
await createSuccessfulExecution(workflow), await createSuccessfulExecution(workflow),
]; ];
await pruningService.softDeleteOnPruningCycle(); await pruningService.softDelete();
const result = await findAllExecutions(); const result = await findAllExecutions();
expect(result).toEqual([ expect(result).toEqual([
@ -113,7 +113,7 @@ describe('softDeleteOnPruningCycle()', () => {
await createSuccessfulExecution(workflow), await createSuccessfulExecution(workflow),
]; ];
await pruningService.softDeleteOnPruningCycle(); await pruningService.softDelete();
const result = await findAllExecutions(); const result = await findAllExecutions();
expect(result).toEqual([ expect(result).toEqual([
@ -132,7 +132,7 @@ describe('softDeleteOnPruningCycle()', () => {
await createSuccessfulExecution(workflow), await createSuccessfulExecution(workflow),
]; ];
await pruningService.softDeleteOnPruningCycle(); await pruningService.softDelete();
const result = await findAllExecutions(); const result = await findAllExecutions();
expect(result).toEqual([ expect(result).toEqual([
@ -150,7 +150,7 @@ describe('softDeleteOnPruningCycle()', () => {
await annotateExecution(executions[0].id, { vote: 'up' }, [workflow.id]); await annotateExecution(executions[0].id, { vote: 'up' }, [workflow.id]);
await pruningService.softDeleteOnPruningCycle(); await pruningService.softDelete();
const result = await findAllExecutions(); const result = await findAllExecutions();
expect(result).toEqual([ expect(result).toEqual([
@ -163,8 +163,8 @@ describe('softDeleteOnPruningCycle()', () => {
describe('when EXECUTIONS_DATA_MAX_AGE is set', () => { describe('when EXECUTIONS_DATA_MAX_AGE is set', () => {
beforeAll(() => { beforeAll(() => {
globalConfig.pruning.maxAge = 1; pruningConfig.maxAge = 1;
globalConfig.pruning.maxCount = 0; pruningConfig.maxCount = 0;
}); });
test('should mark as deleted based on EXECUTIONS_DATA_MAX_AGE', async () => { test('should mark as deleted based on EXECUTIONS_DATA_MAX_AGE', async () => {
@ -179,7 +179,7 @@ describe('softDeleteOnPruningCycle()', () => {
), ),
]; ];
await pruningService.softDeleteOnPruningCycle(); await pruningService.softDelete();
const result = await findAllExecutions(); const result = await findAllExecutions();
expect(result).toEqual([ expect(result).toEqual([
@ -203,7 +203,7 @@ describe('softDeleteOnPruningCycle()', () => {
await createSuccessfulExecution(workflow), await createSuccessfulExecution(workflow),
]; ];
await pruningService.softDeleteOnPruningCycle(); await pruningService.softDelete();
const result = await findAllExecutions(); const result = await findAllExecutions();
expect(result).toEqual([ expect(result).toEqual([
@ -221,7 +221,7 @@ describe('softDeleteOnPruningCycle()', () => {
])('should prune %s executions', async (status, attributes) => { ])('should prune %s executions', async (status, attributes) => {
const execution = await createExecution({ status, ...attributes }, workflow); const execution = await createExecution({ status, ...attributes }, workflow);
await pruningService.softDeleteOnPruningCycle(); await pruningService.softDelete();
const result = await findAllExecutions(); const result = await findAllExecutions();
expect(result).toEqual([ expect(result).toEqual([
@ -239,7 +239,7 @@ describe('softDeleteOnPruningCycle()', () => {
await createSuccessfulExecution(workflow), await createSuccessfulExecution(workflow),
]; ];
await pruningService.softDeleteOnPruningCycle(); await pruningService.softDelete();
const result = await findAllExecutions(); const result = await findAllExecutions();
expect(result).toEqual([ expect(result).toEqual([
@ -266,7 +266,7 @@ describe('softDeleteOnPruningCycle()', () => {
await annotateExecution(executions[0].id, { vote: 'up' }, [workflow.id]); await annotateExecution(executions[0].id, { vote: 'up' }, [workflow.id]);
await pruningService.softDeleteOnPruningCycle(); await pruningService.softDelete();
const result = await findAllExecutions(); const result = await findAllExecutions();
expect(result).toEqual([ expect(result).toEqual([

View file

@ -76,7 +76,6 @@ import type {
IPollFunctions, IPollFunctions,
IRequestOptions, IRequestOptions,
IRunExecutionData, IRunExecutionData,
ISourceData,
ITaskData, ITaskData,
ITaskDataConnections, ITaskDataConnections,
ITriggerFunctions, ITriggerFunctions,
@ -166,7 +165,13 @@ import { extractValue } from './ExtractValue';
import { InstanceSettings } from './InstanceSettings'; import { InstanceSettings } from './InstanceSettings';
import type { ExtendedValidationResult, IResponseError } from './Interfaces'; import type { ExtendedValidationResult, IResponseError } from './Interfaces';
// eslint-disable-next-line import/no-cycle // eslint-disable-next-line import/no-cycle
import { HookContext, PollContext, TriggerContext, WebhookContext } from './node-execution-context'; import {
ExecuteSingleContext,
HookContext,
PollContext,
TriggerContext,
WebhookContext,
} from './node-execution-context';
import { getSecretsProxy } from './Secrets'; import { getSecretsProxy } from './Secrets';
import { SSHClientsManager } from './SSHClientsManager'; import { SSHClientsManager } from './SSHClientsManager';
@ -4180,145 +4185,19 @@ export function getExecuteSingleFunctions(
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
abortSignal?: AbortSignal, abortSignal?: AbortSignal,
): IExecuteSingleFunctions { ): IExecuteSingleFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node, itemIndex) => { return new ExecuteSingleContext(
return {
...getCommonWorkflowFunctions(workflow, node, additionalData),
...executionCancellationFunctions(abortSignal),
continueOnFail: () => continueOnFail(node),
evaluateExpression: (expression: string, evaluateItemIndex: number | undefined) => {
evaluateItemIndex = evaluateItemIndex === undefined ? itemIndex : evaluateItemIndex;
return workflow.expression.resolveSimpleParameterValue(
`=${expression}`,
{},
runExecutionData,
runIndex,
evaluateItemIndex,
node.name,
connectionInputData,
mode,
getAdditionalKeys(additionalData, mode, runExecutionData),
executeData,
);
},
getContext(type: ContextType): IContextObject {
return NodeHelpers.getContext(runExecutionData, type, node);
},
getCredentials: async (type) =>
await getCredentials(
workflow,
node,
type,
additionalData,
mode,
executeData,
runExecutionData,
runIndex,
connectionInputData,
itemIndex,
),
getInputData: (inputIndex = 0, inputName = 'main') => {
if (!inputData.hasOwnProperty(inputName)) {
// Return empty array because else it would throw error when nothing is connected to input
return { json: {} };
}
// TODO: Check if nodeType has input with that index defined
if (inputData[inputName].length < inputIndex) {
throw new ApplicationError('Could not get input index', {
extra: { inputIndex, inputName },
});
}
const allItems = inputData[inputName][inputIndex];
if (allItems === null) {
throw new ApplicationError('Input index was not set', {
extra: { inputIndex, inputName },
});
}
if (allItems[itemIndex] === null) {
throw new ApplicationError('Value of input with given index was not set', {
extra: { inputIndex, inputName, itemIndex },
});
}
return allItems[itemIndex];
},
getInputSourceData: (inputIndex = 0, inputName = 'main') => {
if (executeData?.source === null) {
// Should never happen as n8n sets it automatically
throw new ApplicationError('Source data is missing');
}
return executeData.source[inputName][inputIndex] as ISourceData;
},
getItemIndex: () => itemIndex,
getMode: () => mode,
getExecuteData: () => executeData,
getNodeParameter: (
parameterName: string,
fallbackValue?: any,
options?: IGetNodeParameterOptions,
): NodeParameterValueType | object => {
return getNodeParameter(
workflow,
runExecutionData,
runIndex,
connectionInputData,
node,
parameterName,
itemIndex,
mode,
getAdditionalKeys(additionalData, mode, runExecutionData),
executeData,
fallbackValue,
options,
);
},
getWorkflowDataProxy: (): IWorkflowDataProxyData => {
const dataProxy = new WorkflowDataProxy(
workflow,
runExecutionData,
runIndex,
itemIndex,
node.name,
connectionInputData,
{},
mode,
getAdditionalKeys(additionalData, mode, runExecutionData),
executeData,
);
return dataProxy.getDataProxy();
},
helpers: {
createDeferredPromise,
returnJsonArray,
...getRequestHelperFunctions(
workflow, workflow,
node, node,
additionalData, additionalData,
mode,
runExecutionData, runExecutionData,
runIndex,
connectionInputData, connectionInputData,
), inputData,
...getBinaryHelperFunctions(additionalData, workflow.id), itemIndex,
executeData,
assertBinaryData: (propertyName, inputIndex = 0) => abortSignal,
assertBinaryData(inputData, node, itemIndex, propertyName, inputIndex), );
getBinaryDataBuffer: async (propertyName, inputIndex = 0) =>
await getBinaryDataBuffer(inputData, itemIndex, propertyName, inputIndex),
},
logAiEvent: (eventName: AiEvent, msg: string) => {
return additionalData.logAiEvent(eventName, {
executionId: additionalData.executionId ?? 'unsaved-execution',
nodeName: node.name,
workflowName: workflow.name ?? 'Unnamed workflow',
nodeType: node.type,
workflowId: workflow.id ?? 'unsaved-workflow',
msg,
});
},
};
})(workflow, runExecutionData, connectionInputData, inputData, node, itemIndex);
} }
export function getCredentialTestFunctions(): ICredentialTestFunctions { export function getCredentialTestFunctions(): ICredentialTestFunctions {

View file

@ -0,0 +1,301 @@
import { mock } from 'jest-mock-extended';
import type {
INode,
IWorkflowExecuteAdditionalData,
IRunExecutionData,
INodeExecutionData,
ITaskDataConnections,
IExecuteData,
Workflow,
WorkflowExecuteMode,
ICredentialsHelper,
Expression,
INodeType,
INodeTypes,
OnError,
ContextType,
IContextObject,
ICredentialDataDecryptedObject,
ISourceData,
} from 'n8n-workflow';
import { ApplicationError, NodeHelpers } from 'n8n-workflow';
import { ExecuteSingleContext } from '../execute-single-context';
describe('ExecuteSingleContext', () => {
const testCredentialType = 'testCredential';
const nodeType = mock<INodeType>({
description: {
credentials: [
{
name: testCredentialType,
required: true,
},
],
properties: [
{
name: 'testParameter',
required: true,
},
],
},
});
const nodeTypes = mock<INodeTypes>();
const expression = mock<Expression>();
const workflow = mock<Workflow>({ expression, nodeTypes });
const node = mock<INode>({
credentials: {
[testCredentialType]: {
id: 'testCredentialId',
},
},
});
node.parameters = {
testParameter: 'testValue',
};
const credentialsHelper = mock<ICredentialsHelper>();
const additionalData = mock<IWorkflowExecuteAdditionalData>({ credentialsHelper });
const mode: WorkflowExecuteMode = 'manual';
const runExecutionData = mock<IRunExecutionData>();
const connectionInputData = mock<INodeExecutionData[]>();
const inputData: ITaskDataConnections = { main: [[{ json: { test: 'data' } }]] };
const executeData = mock<IExecuteData>();
const runIndex = 0;
const itemIndex = 0;
const abortSignal = mock<AbortSignal>();
const executeSingleContext = new ExecuteSingleContext(
workflow,
node,
additionalData,
mode,
runExecutionData,
runIndex,
connectionInputData,
inputData,
itemIndex,
executeData,
abortSignal,
);
beforeEach(() => {
nodeTypes.getByNameAndVersion.mockReturnValue(nodeType);
expression.getParameterValue.mockImplementation((value) => value);
});
describe('getExecutionCancelSignal', () => {
it('should return the abort signal', () => {
expect(executeSingleContext.getExecutionCancelSignal()).toBe(abortSignal);
});
});
describe('continueOnFail', () => {
afterEach(() => {
node.onError = undefined;
node.continueOnFail = false;
});
it('should return false for nodes by default', () => {
expect(executeSingleContext.continueOnFail()).toEqual(false);
});
it('should return true if node has continueOnFail set to true', () => {
node.continueOnFail = true;
expect(executeSingleContext.continueOnFail()).toEqual(true);
});
test.each([
['continueRegularOutput', true],
['continueErrorOutput', true],
['stopWorkflow', false],
])('if node has onError set to %s, it should return %s', (onError, expected) => {
node.onError = onError as OnError;
expect(executeSingleContext.continueOnFail()).toEqual(expected);
});
});
describe('evaluateExpression', () => {
it('should evaluate the expression correctly', () => {
const expression = '$json.test';
const expectedResult = 'data';
const resolveSimpleParameterValueSpy = jest.spyOn(
workflow.expression,
'resolveSimpleParameterValue',
);
resolveSimpleParameterValueSpy.mockReturnValue(expectedResult);
expect(executeSingleContext.evaluateExpression(expression, itemIndex)).toEqual(
expectedResult,
);
expect(resolveSimpleParameterValueSpy).toHaveBeenCalledWith(
`=${expression}`,
{},
runExecutionData,
runIndex,
itemIndex,
node.name,
connectionInputData,
mode,
expect.objectContaining({}),
executeData,
);
resolveSimpleParameterValueSpy.mockRestore();
});
});
describe('getContext', () => {
it('should return the context object', () => {
const contextType: ContextType = 'node';
const expectedContext = mock<IContextObject>();
const getContextSpy = jest.spyOn(NodeHelpers, 'getContext');
getContextSpy.mockReturnValue(expectedContext);
expect(executeSingleContext.getContext(contextType)).toEqual(expectedContext);
expect(getContextSpy).toHaveBeenCalledWith(runExecutionData, contextType, node);
getContextSpy.mockRestore();
});
});
describe('getInputData', () => {
const inputIndex = 0;
const inputName = 'main';
afterEach(() => {
inputData[inputName] = [[{ json: { test: 'data' } }]];
});
it('should return the input data correctly', () => {
const expectedData = { json: { test: 'data' } };
expect(executeSingleContext.getInputData(inputIndex, inputName)).toEqual(expectedData);
});
it('should return an empty object if the input name does not exist', () => {
const inputName = 'nonExistent';
const expectedData = { json: {} };
expect(executeSingleContext.getInputData(inputIndex, inputName)).toEqual(expectedData);
});
it('should throw an error if the input index is out of range', () => {
const inputIndex = 1;
expect(() => executeSingleContext.getInputData(inputIndex, inputName)).toThrow(
ApplicationError,
);
});
it('should throw an error if the input index was not set', () => {
inputData.main[inputIndex] = null;
expect(() => executeSingleContext.getInputData(inputIndex, inputName)).toThrow(
ApplicationError,
);
});
it('should throw an error if the value of input with given index was not set', () => {
delete inputData.main[inputIndex]![itemIndex];
expect(() => executeSingleContext.getInputData(inputIndex, inputName)).toThrow(
ApplicationError,
);
});
});
describe('getItemIndex', () => {
it('should return the item index correctly', () => {
expect(executeSingleContext.getItemIndex()).toEqual(itemIndex);
});
});
describe('getNodeParameter', () => {
beforeEach(() => {
nodeTypes.getByNameAndVersion.mockReturnValue(nodeType);
expression.getParameterValue.mockImplementation((value) => value);
});
it('should return parameter value when it exists', () => {
const parameter = executeSingleContext.getNodeParameter('testParameter');
expect(parameter).toBe('testValue');
});
it('should return the fallback value when the parameter does not exist', () => {
const parameter = executeSingleContext.getNodeParameter('otherParameter', 'fallback');
expect(parameter).toBe('fallback');
});
});
describe('getCredentials', () => {
it('should get decrypted credentials', async () => {
nodeTypes.getByNameAndVersion.mockReturnValue(nodeType);
credentialsHelper.getDecrypted.mockResolvedValue({ secret: 'token' });
const credentials =
await executeSingleContext.getCredentials<ICredentialDataDecryptedObject>(
testCredentialType,
);
expect(credentials).toEqual({ secret: 'token' });
});
});
describe('getExecuteData', () => {
it('should return the execute data correctly', () => {
expect(executeSingleContext.getExecuteData()).toEqual(executeData);
});
});
describe('getWorkflowDataProxy', () => {
it('should return the workflow data proxy correctly', () => {
const workflowDataProxy = executeSingleContext.getWorkflowDataProxy();
expect(workflowDataProxy.isProxy).toBe(true);
expect(Object.keys(workflowDataProxy.$input)).toEqual([
'all',
'context',
'first',
'item',
'last',
'params',
]);
});
});
describe('getInputSourceData', () => {
it('should return the input source data correctly', () => {
const inputSourceData = mock<ISourceData>();
executeData.source = { main: [inputSourceData] };
expect(executeSingleContext.getInputSourceData()).toEqual(inputSourceData);
});
it('should throw an error if the source data is missing', () => {
executeData.source = null;
expect(() => executeSingleContext.getInputSourceData()).toThrow(ApplicationError);
});
});
describe('logAiEvent', () => {
it('should log the AI event correctly', () => {
const eventName = 'ai-tool-called';
const msg = 'test message';
executeSingleContext.logAiEvent(eventName, msg);
expect(additionalData.logAiEvent).toHaveBeenCalledWith(eventName, {
executionId: additionalData.executionId,
nodeName: node.name,
workflowName: workflow.name,
nodeType: node.type,
workflowId: workflow.id,
msg,
});
});
});
});

View file

@ -0,0 +1,212 @@
import type {
ICredentialDataDecryptedObject,
IGetNodeParameterOptions,
INode,
INodeExecutionData,
IRunExecutionData,
IExecuteSingleFunctions,
IWorkflowExecuteAdditionalData,
Workflow,
WorkflowExecuteMode,
ITaskDataConnections,
IExecuteData,
ContextType,
AiEvent,
ISourceData,
} from 'n8n-workflow';
import {
ApplicationError,
createDeferredPromise,
NodeHelpers,
WorkflowDataProxy,
} from 'n8n-workflow';
// eslint-disable-next-line import/no-cycle
import {
assertBinaryData,
continueOnFail,
getAdditionalKeys,
getBinaryDataBuffer,
getCredentials,
getNodeParameter,
returnJsonArray,
} from '@/NodeExecuteFunctions';
import { BinaryHelpers } from './helpers/binary-helpers';
import { RequestHelpers } from './helpers/request-helpers';
import { NodeExecutionContext } from './node-execution-context';
export class ExecuteSingleContext extends NodeExecutionContext implements IExecuteSingleFunctions {
readonly helpers: IExecuteSingleFunctions['helpers'];
constructor(
workflow: Workflow,
node: INode,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
private readonly runExecutionData: IRunExecutionData,
private readonly runIndex: number,
private readonly connectionInputData: INodeExecutionData[],
private readonly inputData: ITaskDataConnections,
private readonly itemIndex: number,
private readonly executeData: IExecuteData,
private readonly abortSignal?: AbortSignal,
) {
super(workflow, node, additionalData, mode);
this.helpers = {
createDeferredPromise,
returnJsonArray,
...new BinaryHelpers(workflow, additionalData).exported,
...new RequestHelpers(this, workflow, node, additionalData).exported,
assertBinaryData: (propertyName, inputIndex = 0) =>
assertBinaryData(inputData, node, itemIndex, propertyName, inputIndex),
getBinaryDataBuffer: async (propertyName, inputIndex = 0) =>
await getBinaryDataBuffer(inputData, itemIndex, propertyName, inputIndex),
};
}
getExecutionCancelSignal() {
return this.abortSignal;
}
onExecutionCancellation(handler: () => unknown) {
const fn = () => {
this.abortSignal?.removeEventListener('abort', fn);
handler();
};
this.abortSignal?.addEventListener('abort', fn);
}
continueOnFail() {
return continueOnFail(this.node);
}
evaluateExpression(expression: string, evaluateItemIndex: number | undefined) {
evaluateItemIndex = evaluateItemIndex ?? this.itemIndex;
return this.workflow.expression.resolveSimpleParameterValue(
`=${expression}`,
{},
this.runExecutionData,
this.runIndex,
evaluateItemIndex,
this.node.name,
this.connectionInputData,
this.mode,
getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData),
this.executeData,
);
}
getContext(type: ContextType) {
return NodeHelpers.getContext(this.runExecutionData, type, this.node);
}
getInputData(inputIndex = 0, inputName = 'main') {
if (!this.inputData.hasOwnProperty(inputName)) {
// Return empty array because else it would throw error when nothing is connected to input
return { json: {} };
}
// TODO: Check if nodeType has input with that index defined
if (this.inputData[inputName].length < inputIndex) {
throw new ApplicationError('Could not get input index', {
extra: { inputIndex, inputName },
});
}
const allItems = this.inputData[inputName][inputIndex];
if (allItems === null || allItems === undefined) {
throw new ApplicationError('Input index was not set', {
extra: { inputIndex, inputName },
});
}
const data = allItems[this.itemIndex];
if (data === null || data === undefined) {
throw new ApplicationError('Value of input with given index was not set', {
extra: { inputIndex, inputName, itemIndex: this.itemIndex },
});
}
return data;
}
getItemIndex() {
return this.itemIndex;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
getNodeParameter(parameterName: string, fallbackValue?: any, options?: IGetNodeParameterOptions) {
return getNodeParameter(
this.workflow,
this.runExecutionData,
this.runIndex,
this.connectionInputData,
this.node,
parameterName,
this.itemIndex,
this.mode,
getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData),
this.executeData,
fallbackValue,
options,
);
}
// TODO: extract out in a BaseExecutionContext
async getCredentials<T extends object = ICredentialDataDecryptedObject>(type: string) {
return await getCredentials<T>(
this.workflow,
this.node,
type,
this.additionalData,
this.mode,
this.executeData,
this.runExecutionData,
this.runIndex,
this.connectionInputData,
this.itemIndex,
);
}
getExecuteData() {
return this.executeData;
}
getWorkflowDataProxy() {
return new WorkflowDataProxy(
this.workflow,
this.runExecutionData,
this.runIndex,
this.itemIndex,
this.node.name,
this.connectionInputData,
{},
this.mode,
getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData),
this.executeData,
).getDataProxy();
}
getInputSourceData(inputIndex = 0, inputName = 'main'): ISourceData {
if (this.executeData?.source === null) {
// Should never happen as n8n sets it automatically
throw new ApplicationError('Source data is missing');
}
return this.executeData.source[inputName][inputIndex] as ISourceData;
}
logAiEvent(eventName: AiEvent, msg: string) {
return this.additionalData.logAiEvent(eventName, {
executionId: this.additionalData.executionId ?? 'unsaved-execution',
nodeName: this.node.name,
workflowName: this.workflow.name ?? 'Unnamed workflow',
nodeType: this.node.type,
workflowId: this.workflow.id ?? 'unsaved-workflow',
msg,
});
}
}

View file

@ -1,4 +1,5 @@
// eslint-disable-next-line import/no-cycle // eslint-disable-next-line import/no-cycle
export { ExecuteSingleContext } from './execute-single-context';
export { HookContext } from './hook-context'; export { HookContext } from './hook-context';
export { LoadOptionsContext } from './load-options-context'; export { LoadOptionsContext } from './load-options-context';
export { PollContext } from './poll-context'; export { PollContext } from './poll-context';

View file

@ -37,6 +37,7 @@ import {
N8nText, N8nText,
N8nTooltip, N8nTooltip,
} from 'n8n-design-system'; } from 'n8n-design-system';
import { isEmpty } from '@/utils/typesUtils';
interface CredentialDropdownOption extends ICredentialsResponse { interface CredentialDropdownOption extends ICredentialsResponse {
typeDisplayName: string; typeDisplayName: string;
@ -87,9 +88,9 @@ const credentialTypesNode = computed(() =>
); );
const credentialTypesNodeDescriptionDisplayed = computed(() => const credentialTypesNodeDescriptionDisplayed = computed(() =>
credentialTypesNodeDescription.value.filter((credentialTypeDescription) => credentialTypesNodeDescription.value
displayCredentials(credentialTypeDescription), .filter((credentialTypeDescription) => displayCredentials(credentialTypeDescription))
), .map((type) => ({ type, options: getCredentialOptions(getAllRelatedCredentialTypes(type)) })),
); );
const credentialTypesNodeDescription = computed(() => { const credentialTypesNodeDescription = computed(() => {
if (typeof props.overrideCredType !== 'string') return []; if (typeof props.overrideCredType !== 'string') return [];
@ -149,6 +150,27 @@ watch(
{ immediate: true, deep: true }, { immediate: true, deep: true },
); );
// Select most recent credential by default
watch(
credentialTypesNodeDescriptionDisplayed,
(types) => {
if (types.length === 0 || !isEmpty(selected.value)) return;
const allOptions = types.map((type) => type.options).flat();
if (allOptions.length === 0) return;
const mostRecentCredential = allOptions.reduce(
(mostRecent, current) =>
mostRecent && mostRecent.updatedAt > current.updatedAt ? mostRecent : current,
allOptions[0],
);
onCredentialSelected(mostRecentCredential.type, mostRecentCredential.id);
},
{ immediate: true },
);
onMounted(() => { onMounted(() => {
credentialsStore.$onAction(({ name, after, args }) => { credentialsStore.$onAction(({ name, after, args }) => {
const listeningForActions = ['createNewCredential', 'updateCredential', 'deleteCredential']; const listeningForActions = ['createNewCredential', 'updateCredential', 'deleteCredential'];
@ -481,12 +503,9 @@ function getCredentialsFieldLabel(credentialType: INodeCredentialDescription): s
v-if="credentialTypesNodeDescriptionDisplayed.length" v-if="credentialTypesNodeDescriptionDisplayed.length"
:class="['node-credentials', $style.container]" :class="['node-credentials', $style.container]"
> >
<div <div v-for="{ type, options } in credentialTypesNodeDescriptionDisplayed" :key="type.name">
v-for="credentialTypeDescription in credentialTypesNodeDescriptionDisplayed"
:key="credentialTypeDescription.name"
>
<N8nInputLabel <N8nInputLabel
:label="getCredentialsFieldLabel(credentialTypeDescription)" :label="getCredentialsFieldLabel(type)"
:bold="false" :bold="false"
size="small" size="small"
color="text-dark" color="text-dark"
@ -494,7 +513,7 @@ function getCredentialsFieldLabel(credentialType: INodeCredentialDescription): s
> >
<div v-if="readonly"> <div v-if="readonly">
<N8nInput <N8nInput
:model-value="getSelectedName(credentialTypeDescription.name)" :model-value="getSelectedName(type.name)"
disabled disabled
size="small" size="small"
data-test-id="node-credentials-select" data-test-id="node-credentials-select"
@ -502,36 +521,20 @@ function getCredentialsFieldLabel(credentialType: INodeCredentialDescription): s
</div> </div>
<div <div
v-else v-else
:class=" :class="getIssues(type.name).length && !hideIssues ? $style.hasIssues : $style.input"
getIssues(credentialTypeDescription.name).length && !hideIssues
? $style.hasIssues
: $style.input
"
data-test-id="node-credentials-select" data-test-id="node-credentials-select"
> >
<N8nSelect <N8nSelect
:model-value="getSelectedId(credentialTypeDescription.name)" :model-value="getSelectedId(type.name)"
:placeholder=" :placeholder="getSelectPlaceholder(type.name, getIssues(type.name))"
getSelectPlaceholder(
credentialTypeDescription.name,
getIssues(credentialTypeDescription.name),
)
"
size="small" size="small"
@update:model-value=" @update:model-value="
(value: string) => (value: string) => onCredentialSelected(type.name, value, showMixedCredentials(type))
onCredentialSelected(
credentialTypeDescription.name,
value,
showMixedCredentials(credentialTypeDescription),
)
" "
@blur="emit('blur', 'credentials')" @blur="emit('blur', 'credentials')"
> >
<N8nOption <N8nOption
v-for="item in getCredentialOptions( v-for="item in options"
getAllRelatedCredentialTypes(credentialTypeDescription),
)"
:key="item.id" :key="item.id"
:data-test-id="`node-credentials-select-item-${item.id}`" :data-test-id="`node-credentials-select-item-${item.id}`"
:label="item.name" :label="item.name"
@ -551,15 +554,12 @@ function getCredentialsFieldLabel(credentialType: INodeCredentialDescription): s
</N8nOption> </N8nOption>
</N8nSelect> </N8nSelect>
<div <div v-if="getIssues(type.name).length && !hideIssues" :class="$style.warning">
v-if="getIssues(credentialTypeDescription.name).length && !hideIssues"
:class="$style.warning"
>
<N8nTooltip placement="top"> <N8nTooltip placement="top">
<template #content> <template #content>
<TitledList <TitledList
:title="`${$locale.baseText('nodeCredentials.issues')}:`" :title="`${$locale.baseText('nodeCredentials.issues')}:`"
:items="getIssues(credentialTypeDescription.name)" :items="getIssues(type.name)"
/> />
</template> </template>
<font-awesome-icon icon="exclamation-triangle" /> <font-awesome-icon icon="exclamation-triangle" />
@ -567,10 +567,7 @@ function getCredentialsFieldLabel(credentialType: INodeCredentialDescription): s
</div> </div>
<div <div
v-if=" v-if="selected[type.name] && isCredentialExisting(type.name)"
selected[credentialTypeDescription.name] &&
isCredentialExisting(credentialTypeDescription.name)
"
:class="$style.edit" :class="$style.edit"
data-test-id="credential-edit-button" data-test-id="credential-edit-button"
> >
@ -578,7 +575,7 @@ function getCredentialsFieldLabel(credentialType: INodeCredentialDescription): s
icon="pen" icon="pen"
class="clickable" class="clickable"
:title="$locale.baseText('nodeCredentials.updateCredential')" :title="$locale.baseText('nodeCredentials.updateCredential')"
@click="editCredential(credentialTypeDescription.name)" @click="editCredential(type.name)"
/> />
</div> </div>
</div> </div>

View file

@ -7,7 +7,7 @@ import type {
} from 'n8n-workflow'; } from 'n8n-workflow';
import { NodeConnectionType, NodeHelpers } from 'n8n-workflow'; import { NodeConnectionType, NodeHelpers } from 'n8n-workflow';
import { useCanvasOperations } from '@/composables/useCanvasOperations'; import { useCanvasOperations } from '@/composables/useCanvasOperations';
import type { CanvasNode } from '@/types'; import type { CanvasConnection, CanvasNode } from '@/types';
import { CanvasConnectionMode } from '@/types'; import { CanvasConnectionMode } from '@/types';
import type { ICredentialsResponse, INodeUi, IWorkflowDb } from '@/Interface'; import type { ICredentialsResponse, INodeUi, IWorkflowDb } from '@/Interface';
import { RemoveNodeCommand } from '@/models/history'; import { RemoveNodeCommand } from '@/models/history';
@ -191,36 +191,6 @@ describe('useCanvasOperations', () => {
expect(result.position).toEqual([20, 20]); expect(result.position).toEqual([20, 20]);
}); });
it('should create node with default credentials when only one credential is available', () => {
const credentialsStore = useCredentialsStore();
const credential = mock<ICredentialsResponse>({ id: '1', name: 'cred', type: 'cred' });
const nodeTypeName = 'type';
const nodeTypeDescription = mockNodeTypeDescription({
name: nodeTypeName,
credentials: [{ name: credential.name }],
});
credentialsStore.state.credentials = {
[credential.id]: credential,
};
// @ts-expect-error Known pinia issue when spying on store getters
vi.spyOn(credentialsStore, 'getUsableCredentialByType', 'get').mockReturnValue(() => [
credential,
]);
const { addNode } = useCanvasOperations({ router });
const result = addNode(
{
type: nodeTypeName,
typeVersion: 1,
},
nodeTypeDescription,
);
expect(result.credentials).toEqual({ [credential.name]: { id: '1', name: credential.name } });
});
it('should not assign credentials when multiple credentials are available', () => { it('should not assign credentials when multiple credentials are available', () => {
const credentialsStore = useCredentialsStore(); const credentialsStore = useCredentialsStore();
const credentialA = mock<ICredentialsResponse>({ id: '1', name: 'credA', type: 'cred' }); const credentialA = mock<ICredentialsResponse>({ id: '1', name: 'credA', type: 'cred' });
@ -648,6 +618,48 @@ describe('useCanvasOperations', () => {
const added = await addNodes(nodes, {}); const added = await addNodes(nodes, {});
expect(added.length).toBe(2); expect(added.length).toBe(2);
}); });
it('should mark UI state as dirty', async () => {
const workflowsStore = mockedStore(useWorkflowsStore);
const uiStore = mockedStore(useUIStore);
const nodeTypesStore = useNodeTypesStore();
const nodeTypeName = 'type';
const nodes = [mockNode({ name: 'Node 1', type: nodeTypeName, position: [30, 40] })];
workflowsStore.getCurrentWorkflow.mockReturnValue(
createTestWorkflowObject(workflowsStore.workflow),
);
nodeTypesStore.nodeTypes = {
[nodeTypeName]: { 1: mockNodeTypeDescription({ name: nodeTypeName }) },
};
const { addNodes } = useCanvasOperations({ router });
await addNodes(nodes, { keepPristine: false });
expect(uiStore.stateIsDirty).toEqual(true);
});
it('should not mark UI state as dirty if keepPristine is true', async () => {
const workflowsStore = mockedStore(useWorkflowsStore);
const uiStore = mockedStore(useUIStore);
const nodeTypesStore = useNodeTypesStore();
const nodeTypeName = 'type';
const nodes = [mockNode({ name: 'Node 1', type: nodeTypeName, position: [30, 40] })];
workflowsStore.getCurrentWorkflow.mockReturnValue(
createTestWorkflowObject(workflowsStore.workflow),
);
nodeTypesStore.nodeTypes = {
[nodeTypeName]: { 1: mockNodeTypeDescription({ name: nodeTypeName }) },
};
const { addNodes } = useCanvasOperations({ router });
await addNodes(nodes, { keepPristine: true });
expect(uiStore.stateIsDirty).toEqual(false);
});
}); });
describe('revertAddNode', () => { describe('revertAddNode', () => {
@ -1043,6 +1055,26 @@ describe('useCanvasOperations', () => {
], ],
}); });
}); });
it('should set UI state as dirty', async () => {
const uiStore = mockedStore(useUIStore);
const connections: CanvasConnection[] = [];
const { addConnections } = useCanvasOperations({ router });
await addConnections(connections, { keepPristine: false });
expect(uiStore.stateIsDirty).toBe(true);
});
it('should not set UI state as dirty if keepPristine is true', async () => {
const uiStore = mockedStore(useUIStore);
const connections: CanvasConnection[] = [];
const { addConnections } = useCanvasOperations({ router });
await addConnections(connections, { keepPristine: true });
expect(uiStore.stateIsDirty).toBe(false);
});
}); });
describe('createConnection', () => { describe('createConnection', () => {
@ -1132,6 +1164,57 @@ describe('useCanvasOperations', () => {
}); });
expect(uiStore.stateIsDirty).toBe(true); expect(uiStore.stateIsDirty).toBe(true);
}); });
it('should not set UI state as dirty if keepPristine is true', () => {
const workflowsStore = mockedStore(useWorkflowsStore);
const uiStore = mockedStore(useUIStore);
const nodeTypesStore = mockedStore(useNodeTypesStore);
const nodeTypeDescription = mockNodeTypeDescription({
name: SET_NODE_TYPE,
inputs: [NodeConnectionType.Main],
outputs: [NodeConnectionType.Main],
});
const nodeA = createTestNode({
id: 'a',
type: nodeTypeDescription.name,
name: 'Node A',
});
const nodeB = createTestNode({
id: 'b',
type: nodeTypeDescription.name,
name: 'Node B',
});
const connection: Connection = {
source: nodeA.id,
sourceHandle: `outputs/${NodeConnectionType.Main}/0`,
target: nodeB.id,
targetHandle: `inputs/${NodeConnectionType.Main}/0`,
};
nodeTypesStore.nodeTypes = {
node: { 1: nodeTypeDescription },
};
workflowsStore.workflow.nodes = [nodeA, nodeB];
workflowsStore.getNodeById.mockReturnValueOnce(nodeA).mockReturnValueOnce(nodeB);
nodeTypesStore.getNodeType = vi.fn().mockReturnValue(nodeTypeDescription);
const workflowObject = createTestWorkflowObject(workflowsStore.workflow);
workflowsStore.getCurrentWorkflow.mockReturnValue(workflowObject);
const { createConnection, editableWorkflowObject } = useCanvasOperations({ router });
editableWorkflowObject.value.nodes[nodeA.name] = nodeA;
editableWorkflowObject.value.nodes[nodeB.name] = nodeB;
createConnection(connection, { keepPristine: true });
expect(uiStore.stateIsDirty).toBe(false);
});
}); });
describe('revertCreateConnection', () => { describe('revertCreateConnection', () => {

View file

@ -105,14 +105,23 @@ type AddNodeDataWithTypeVersion = AddNodeData & {
typeVersion: INodeUi['typeVersion']; typeVersion: INodeUi['typeVersion'];
}; };
type AddNodeOptions = { type AddNodesBaseOptions = {
dragAndDrop?: boolean; dragAndDrop?: boolean;
openNDV?: boolean;
trackHistory?: boolean; trackHistory?: boolean;
isAutoAdd?: boolean; keepPristine?: boolean;
telemetry?: boolean; telemetry?: boolean;
}; };
type AddNodesOptions = AddNodesBaseOptions & {
position?: XYPosition;
trackBulk?: boolean;
};
type AddNodeOptions = AddNodesBaseOptions & {
openNDV?: boolean;
isAutoAdd?: boolean;
};
export function useCanvasOperations({ router }: { router: ReturnType<typeof useRouter> }) { export function useCanvasOperations({ router }: { router: ReturnType<typeof useRouter> }) {
const rootStore = useRootStore(); const rootStore = useRootStore();
const workflowsStore = useWorkflowsStore(); const workflowsStore = useWorkflowsStore();
@ -479,17 +488,7 @@ export function useCanvasOperations({ router }: { router: ReturnType<typeof useR
); );
} }
async function addNodes( async function addNodes(nodes: AddedNodesAndConnections['nodes'], options: AddNodesOptions = {}) {
nodes: AddedNodesAndConnections['nodes'],
options: {
dragAndDrop?: boolean;
position?: XYPosition;
trackHistory?: boolean;
trackBulk?: boolean;
keepPristine?: boolean;
telemetry?: boolean;
} = {},
) {
let insertPosition = options.position; let insertPosition = options.position;
let lastAddedNode: INodeUi | undefined; let lastAddedNode: INodeUi | undefined;
const addedNodes: INodeUi[] = []; const addedNodes: INodeUi[] = [];
@ -615,6 +614,10 @@ export function useCanvasOperations({ router }: { router: ReturnType<typeof useR
void nextTick(() => { void nextTick(() => {
workflowsStore.setNodePristine(nodeData.name, true); workflowsStore.setNodePristine(nodeData.name, true);
if (!options.keepPristine) {
uiStore.stateIsDirty = true;
}
nodeHelpers.matchCredentials(nodeData); nodeHelpers.matchCredentials(nodeData);
nodeHelpers.updateNodeParameterIssues(nodeData); nodeHelpers.updateNodeParameterIssues(nodeData);
nodeHelpers.updateNodeCredentialIssues(nodeData); nodeHelpers.updateNodeCredentialIssues(nodeData);
@ -777,7 +780,6 @@ export function useCanvasOperations({ router }: { router: ReturnType<typeof useR
}; };
resolveNodeParameters(nodeData); resolveNodeParameters(nodeData);
resolveNodeCredentials(nodeData, nodeTypeDescription);
resolveNodeName(nodeData); resolveNodeName(nodeData);
resolveNodeWebhook(nodeData, nodeTypeDescription); resolveNodeWebhook(nodeData, nodeTypeDescription);
@ -840,60 +842,6 @@ export function useCanvasOperations({ router }: { router: ReturnType<typeof useR
node.parameters = nodeParameters ?? {}; node.parameters = nodeParameters ?? {};
} }
function resolveNodeCredentials(node: INodeUi, nodeTypeDescription: INodeTypeDescription) {
const credentialPerType = nodeTypeDescription.credentials
?.map((type) => credentialsStore.getUsableCredentialByType(type.name))
.flat();
if (credentialPerType?.length === 1) {
const defaultCredential = credentialPerType[0];
const selectedCredentials = credentialsStore.getCredentialById(defaultCredential.id);
const selected = { id: selectedCredentials.id, name: selectedCredentials.name };
const credentials = {
[defaultCredential.type]: selected,
};
if (nodeTypeDescription.credentials) {
const authentication = nodeTypeDescription.credentials.find(
(type) => type.name === defaultCredential.type,
);
const authDisplayOptionsHide = authentication?.displayOptions?.hide;
const authDisplayOptionsShow = authentication?.displayOptions?.show;
if (!authDisplayOptionsHide) {
if (!authDisplayOptionsShow) {
node.credentials = credentials;
} else if (
Object.keys(authDisplayOptionsShow).length === 1 &&
authDisplayOptionsShow.authentication
) {
// ignore complex case when there's multiple dependencies
node.credentials = credentials;
let parameters: { [key: string]: string } = {};
for (const displayOption of Object.keys(authDisplayOptionsShow)) {
if (node.parameters && !node.parameters[displayOption]) {
parameters = {};
node.credentials = undefined;
break;
}
const optionValue = authDisplayOptionsShow[displayOption]?.[0];
if (optionValue && typeof optionValue === 'string') {
parameters[displayOption] = optionValue;
}
node.parameters = {
...node.parameters,
...parameters,
};
}
}
}
}
}
}
function resolveNodePosition( function resolveNodePosition(
node: Omit<INodeUi, 'position'> & { position?: INodeUi['position'] }, node: Omit<INodeUi, 'position'> & { position?: INodeUi['position'] },
nodeTypeDescription: INodeTypeDescription, nodeTypeDescription: INodeTypeDescription,
@ -1128,7 +1076,10 @@ export function useCanvasOperations({ router }: { router: ReturnType<typeof useR
* Connection operations * Connection operations
*/ */
function createConnection(connection: Connection, { trackHistory = false } = {}) { function createConnection(
connection: Connection,
{ trackHistory = false, keepPristine = false } = {},
) {
const sourceNode = workflowsStore.getNodeById(connection.source); const sourceNode = workflowsStore.getNodeById(connection.source);
const targetNode = workflowsStore.getNodeById(connection.target); const targetNode = workflowsStore.getNodeById(connection.target);
if (!sourceNode || !targetNode) { if (!sourceNode || !targetNode) {
@ -1162,8 +1113,10 @@ export function useCanvasOperations({ router }: { router: ReturnType<typeof useR
nodeHelpers.updateNodeInputIssues(targetNode); nodeHelpers.updateNodeInputIssues(targetNode);
}); });
if (!keepPristine) {
uiStore.stateIsDirty = true; uiStore.stateIsDirty = true;
} }
}
function revertCreateConnection(connection: [IConnection, IConnection]) { function revertCreateConnection(connection: [IConnection, IConnection]) {
const sourceNodeName = connection[0].node; const sourceNodeName = connection[0].node;
@ -1371,7 +1324,7 @@ export function useCanvasOperations({ router }: { router: ReturnType<typeof useR
async function addConnections( async function addConnections(
connections: CanvasConnectionCreateData[] | CanvasConnection[], connections: CanvasConnectionCreateData[] | CanvasConnection[],
{ trackBulk = true, trackHistory = false } = {}, { trackBulk = true, trackHistory = false, keepPristine = false } = {},
) { ) {
await nextTick(); // Connection creation relies on the nodes being already added to the store await nextTick(); // Connection creation relies on the nodes being already added to the store
@ -1380,12 +1333,16 @@ export function useCanvasOperations({ router }: { router: ReturnType<typeof useR
} }
for (const connection of connections) { for (const connection of connections) {
createConnection(connection, { trackHistory }); createConnection(connection, { trackHistory, keepPristine });
} }
if (trackBulk && trackHistory) { if (trackBulk && trackHistory) {
historyStore.stopRecordingUndo(); historyStore.stopRecordingUndo();
} }
if (!keepPristine) {
uiStore.stateIsDirty = true;
}
} }
/** /**
@ -1427,7 +1384,9 @@ export function useCanvasOperations({ router }: { router: ReturnType<typeof useR
// Add nodes and connections // Add nodes and connections
await addNodes(data.nodes, { keepPristine: true }); await addNodes(data.nodes, { keepPristine: true });
await addConnections(mapLegacyConnectionsToCanvasConnections(data.connections, data.nodes)); await addConnections(mapLegacyConnectionsToCanvasConnections(data.connections, data.nodes), {
keepPristine: true,
});
} }
/** /**

View file

@ -8,9 +8,9 @@ import type {
export class Rapid7InsightVmApi implements ICredentialType { export class Rapid7InsightVmApi implements ICredentialType {
name = 'rapid7InsightVmApi'; name = 'rapid7InsightVmApi';
displayName = 'Rapid7 InsightVm API'; displayName = 'Rapid7 InsightVM API';
documentationUrl = 'Rapid7 InsightVm'; documentationUrl = 'rapid7insightvm';
icon = { icon = {
light: 'file:icons/Rapid7InsightVm.svg', light: 'file:icons/Rapid7InsightVm.svg',
@ -18,7 +18,7 @@ export class Rapid7InsightVmApi implements ICredentialType {
} as const; } as const;
httpRequestNode = { httpRequestNode = {
name: 'Rapid7 Insight Vm', name: 'Rapid7 InsightVM',
docsUrl: 'https://docs.rapid7.com/', docsUrl: 'https://docs.rapid7.com/',
apiBaseUrlPlaceholder: 'https://insight.rapid7.com/', apiBaseUrlPlaceholder: 'https://insight.rapid7.com/',
}; };

View file

@ -42,6 +42,7 @@ import type {
JsonObject, JsonObject,
CloseFunction, CloseFunction,
INodeCredentialDescription, INodeCredentialDescription,
IExecutePaginationFunctions,
} from './Interfaces'; } from './Interfaces';
import * as NodeHelpers from './NodeHelpers'; import * as NodeHelpers from './NodeHelpers';
import { sleep } from './utils'; import { sleep } from './utils';
@ -623,9 +624,7 @@ export class RoutingNode {
); );
} }
const executePaginationFunctions = { const makeRoutingRequest = async (requestOptions: DeclarativeRestApiSettings.ResultOptions) => {
...executeSingleFunctions,
makeRoutingRequest: async (requestOptions: DeclarativeRestApiSettings.ResultOptions) => {
return await this.rawRoutingRequest( return await this.rawRoutingRequest(
executeSingleFunctions, executeSingleFunctions,
requestOptions, requestOptions,
@ -641,9 +640,12 @@ export class RoutingNode {
runIndex, runIndex,
), ),
); );
},
}; };
const executePaginationFunctions = Object.create(executeSingleFunctions, {
makeRoutingRequest: { value: makeRoutingRequest },
}) as IExecutePaginationFunctions;
if (requestData.paginate && requestOperations?.pagination) { if (requestData.paginate && requestOperations?.pagination) {
// Has pagination // Has pagination