Merge branch 'master' of https://github.com/n8n-io/n8n into node-1863-input-fields-dragging

This commit is contained in:
Michael Kret 2024-10-16 11:19:47 +03:00
commit 9ca133e5f7
54 changed files with 1190 additions and 263 deletions

View file

@ -1,3 +1,6 @@
import { nanoid } from 'nanoid';
import { simpleWebhookCall, waitForWebhook } from './16-webhook-node.cy';
import {
HTTP_REQUEST_NODE_NAME,
MANUAL_TRIGGER_NODE_NAME,
@ -7,6 +10,7 @@ import {
} from '../constants';
import { WorkflowPage, NDV } from '../pages';
import { errorToast } from '../pages/notifications';
import { getVisiblePopper } from '../utils';
const workflowPage = new WorkflowPage();
const ndv = new NDV();
@ -212,6 +216,42 @@ describe('Data pinning', () => {
},
);
});
it('should show pinned data tooltip', () => {
const { callEndpoint } = simpleWebhookCall({
method: 'GET',
webhookPath: nanoid(),
executeNow: false,
});
ndv.actions.close();
workflowPage.actions.executeWorkflow();
cy.wait(waitForWebhook);
// hide other visible popper on workflow execute button
workflowPage.getters.canvasNodes().eq(0).click();
callEndpoint((response) => {
expect(response.status).to.eq(200);
getVisiblePopper().should('have.length', 1);
getVisiblePopper()
.eq(0)
.should(
'have.text',
'You can pin this output instead of waiting for a test event. Open node to do so.',
);
});
});
it('should not show pinned data tooltip', () => {
cy.createFixtureWorkflow('Pinned_webhook_node.json', 'Test');
workflowPage.actions.executeWorkflow();
// hide other visible popper on workflow execute button
workflowPage.getters.canvasNodes().eq(0).click();
getVisiblePopper().should('have.length', 0);
});
});
function setExpressionOnStringValueInSet(expression: string) {

View file

@ -9,7 +9,7 @@ const workflowPage = new WorkflowPage();
const ndv = new NDV();
const credentialsModal = new CredentialsModal();
const waitForWebhook = 500;
export const waitForWebhook = 500;
interface SimpleWebhookCallOptions {
method: string;
@ -21,7 +21,7 @@ interface SimpleWebhookCallOptions {
authentication?: string;
}
const simpleWebhookCall = (options: SimpleWebhookCallOptions) => {
export const simpleWebhookCall = (options: SimpleWebhookCallOptions) => {
const {
authentication,
method,
@ -65,15 +65,23 @@ const simpleWebhookCall = (options: SimpleWebhookCallOptions) => {
getVisibleSelect().find('.option-headline').contains(responseData).click();
}
const callEndpoint = (cb: (response: Cypress.Response<unknown>) => void) => {
cy.request(method, `${BACKEND_BASE_URL}/webhook-test/${webhookPath}`).then(cb);
};
if (executeNow) {
ndv.actions.execute();
cy.wait(waitForWebhook);
cy.request(method, `${BACKEND_BASE_URL}/webhook-test/${webhookPath}`).then((response) => {
callEndpoint((response) => {
expect(response.status).to.eq(200);
ndv.getters.outputPanel().contains('headers');
});
}
return {
callEndpoint,
};
};
describe('Webhook Trigger node', () => {

View file

@ -0,0 +1,39 @@
{
"nodes": [
{
"parameters": {
"path": "FwrbSiaua2Xmvn6-Z-7CQ",
"options": {}
},
"id": "8fcc7e5f-2cef-4938-9564-eea504c20aa0",
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"typeVersion": 2,
"position": [
360,
220
],
"webhookId": "9c778f2a-e882-46ed-a0e4-c8e2f76ccd65"
}
],
"connections": {},
"pinData": {
"Webhook": [
{
"headers": {
"connection": "keep-alive",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
"accept": "*/*",
"cookie": "n8n-auth=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6IjNiM2FhOTE5LWRhZDgtNDE5MS1hZWZiLTlhZDIwZTZkMjJjNiIsImhhc2giOiJ1ZVAxR1F3U2paIiwiaWF0IjoxNzI4OTE1NTQyLCJleHAiOjE3Mjk1MjAzNDJ9.fV02gpUnSiUoMxHwfB0npBjcjct7Mv9vGfj-jRTT3-I",
"host": "localhost:5678",
"accept-encoding": "gzip, deflate"
},
"params": {},
"query": {},
"body": {},
"webhookUrl": "http://localhost:5678/webhook-test/FwrbSiaua2Xmvn6-Z-7CQ",
"executionMode": "test"
}
]
}
}

View file

@ -0,0 +1,100 @@
import { mock } from 'jest-mock-extended';
import type { INodeType, IVersionedNodeType } from 'n8n-workflow';
import type { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
import { NodeTypes } from '../node-types';
describe('NodeTypes', () => {
let nodeTypes: NodeTypes;
const loadNodesAndCredentials = mock<LoadNodesAndCredentials>();
beforeEach(() => {
jest.clearAllMocks();
nodeTypes = new NodeTypes(loadNodesAndCredentials);
});
describe('getByNameAndVersion', () => {
const nodeTypeName = 'n8n-nodes-base.testNode';
it('should throw an error if the node-type does not exist', () => {
const nodeTypeName = 'unknownNode';
// @ts-expect-error overwriting a readonly property
loadNodesAndCredentials.loadedNodes = {};
// @ts-expect-error overwriting a readonly property
loadNodesAndCredentials.knownNodes = {};
expect(() => nodeTypes.getByNameAndVersion(nodeTypeName)).toThrow(
'Unrecognized node type: unknownNode',
);
});
it('should return a regular node-type without version', () => {
const nodeType = mock<INodeType>();
// @ts-expect-error overwriting a readonly property
loadNodesAndCredentials.loadedNodes = {
[nodeTypeName]: { type: nodeType },
};
const result = nodeTypes.getByNameAndVersion(nodeTypeName);
expect(result).toEqual(nodeType);
});
it('should return a regular node-type with version', () => {
const nodeTypeV1 = mock<INodeType>();
const nodeType = mock<IVersionedNodeType>({
nodeVersions: { 1: nodeTypeV1 },
getNodeType: () => nodeTypeV1,
});
// @ts-expect-error overwriting a readonly property
loadNodesAndCredentials.loadedNodes = {
[nodeTypeName]: { type: nodeType },
};
const result = nodeTypes.getByNameAndVersion(nodeTypeName);
expect(result).toEqual(nodeTypeV1);
});
it('should throw when a node-type is requested as tool, but does not support being used as one', () => {
const nodeType = mock<INodeType>();
// @ts-expect-error overwriting a readonly property
loadNodesAndCredentials.loadedNodes = {
[nodeTypeName]: { type: nodeType },
};
expect(() => nodeTypes.getByNameAndVersion(`${nodeTypeName}Tool`)).toThrow(
'Node cannot be used as a tool',
);
});
it('should return the tool node-type when requested as tool', () => {
const nodeType = mock<INodeType>();
// @ts-expect-error can't use a mock here
nodeType.description = {
name: nodeTypeName,
displayName: 'TestNode',
usableAsTool: true,
properties: [],
};
// @ts-expect-error overwriting a readonly property
loadNodesAndCredentials.loadedNodes = {
[nodeTypeName]: { type: nodeType },
};
const result = nodeTypes.getByNameAndVersion(`${nodeTypeName}Tool`);
expect(result).not.toEqual(nodeType);
expect(result.description.name).toEqual('n8n-nodes-base.testNodeTool');
expect(result.description.displayName).toEqual('TestNode Tool');
expect(result.description.codex?.categories).toContain('AI');
expect(result.description.inputs).toEqual([]);
expect(result.description.outputs).toEqual(['ai_tool']);
});
});
});

View file

@ -5,7 +5,6 @@ import { engine as expressHandlebars } from 'express-handlebars';
import { readFile } from 'fs/promises';
import type { Server } from 'http';
import isbot from 'isbot';
import type { InstanceType } from 'n8n-core';
import { Container, Service } from 'typedi';
import config from '@/config';
@ -22,7 +21,6 @@ import { TestWebhooks } from '@/webhooks/test-webhooks';
import { WaitingWebhooks } from '@/webhooks/waiting-webhooks';
import { createWebhookHandlerFor } from '@/webhooks/webhook-request-handler';
import { generateHostInstanceId } from './databases/utils/generators';
import { ServiceUnavailableError } from './errors/response-errors/service-unavailable.error';
@Service()
@ -61,7 +59,7 @@ export abstract class AbstractServer {
readonly uniqueInstanceId: string;
constructor(instanceType: Exclude<InstanceType, 'worker'>) {
constructor() {
this.app = express();
this.app.disable('x-powered-by');
@ -85,8 +83,6 @@ export abstract class AbstractServer {
this.endpointWebhookTest = this.globalConfig.endpoints.webhookTest;
this.endpointWebhookWaiting = this.globalConfig.endpoints.webhookWaiting;
this.uniqueInstanceId = generateHostInstanceId(instanceType);
this.logger = Container.get(Logger);
}

View file

@ -19,7 +19,6 @@ import type { AbstractServer } from '@/abstract-server';
import config from '@/config';
import { LICENSE_FEATURES, inDevelopment, inTest } from '@/constants';
import * as CrashJournal from '@/crash-journal';
import { generateHostInstanceId } from '@/databases/utils/generators';
import * as Db from '@/db';
import { getDataDeduplicationService } from '@/deduplication';
import { initErrorHandling } from '@/error-reporting';
@ -45,8 +44,6 @@ export abstract class BaseCommand extends Command {
protected instanceSettings: InstanceSettings = Container.get(InstanceSettings);
queueModeId: string;
protected server?: AbstractServer;
protected shutdownService: ShutdownService = Container.get(ShutdownService);
@ -133,16 +130,6 @@ export abstract class BaseCommand extends Command {
await Container.get(TelemetryEventRelay).init();
}
protected setInstanceQueueModeId() {
if (config.get('redis.queueModeId')) {
this.queueModeId = config.get('redis.queueModeId');
return;
}
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
this.queueModeId = generateHostInstanceId(this.instanceSettings.instanceType!);
config.set('redis.queueModeId', this.queueModeId);
}
protected async stopProcess() {
// This needs to be overridden
}

View file

@ -1,6 +1,6 @@
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import { Flags, type Config } from '@oclif/core';
import { Flags } from '@oclif/core';
import glob from 'fast-glob';
import { createReadStream, createWriteStream, existsSync } from 'fs';
import { mkdir } from 'fs/promises';
@ -21,7 +21,7 @@ import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'
import { EventService } from '@/events/event.service';
import { ExecutionService } from '@/executions/execution.service';
import { License } from '@/license';
import { SingleMainTaskManager } from '@/runners/task-managers/single-main-task-manager';
import { LocalTaskManager } from '@/runners/task-managers/local-task-manager';
import { TaskManager } from '@/runners/task-managers/task-manager';
import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
@ -70,11 +70,6 @@ export class Start extends BaseCommand {
override needsCommunityPackages = true;
constructor(argv: string[], cmdConfig: Config) {
super(argv, cmdConfig);
this.setInstanceQueueModeId();
}
/**
* Opens the UI in browser
*/
@ -176,7 +171,7 @@ export class Start extends BaseCommand {
if (config.getEnv('executions.mode') === 'queue') {
const scopedLogger = this.logger.withScope('scaling');
scopedLogger.debug('Starting main instance in scaling mode');
scopedLogger.debug(`Host ID: ${this.queueModeId}`);
scopedLogger.debug(`Host ID: ${this.instanceSettings.hostId}`);
}
const { flags } = await this.parse(Start);
@ -227,7 +222,7 @@ export class Start extends BaseCommand {
}
if (!this.globalConfig.taskRunners.disabled) {
Container.set(TaskManager, new SingleMainTaskManager());
Container.set(TaskManager, new LocalTaskManager());
const { TaskRunnerServer } = await import('@/runners/task-runner-server');
const taskRunnerServer = Container.get(TaskRunnerServer);
await taskRunnerServer.start();

View file

@ -1,4 +1,4 @@
import { Flags, type Config } from '@oclif/core';
import { Flags } from '@oclif/core';
import { ApplicationError } from 'n8n-workflow';
import { Container } from 'typedi';
@ -6,7 +6,7 @@ import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { OrchestrationWebhookService } from '@/services/orchestration/webhook/orchestration.webhook.service';
import { OrchestrationService } from '@/services/orchestration.service';
import { WebhookServer } from '@/webhooks/webhook-server';
import { BaseCommand } from './base-command';
@ -24,14 +24,6 @@ export class Webhook extends BaseCommand {
override needsCommunityPackages = true;
constructor(argv: string[], cmdConfig: Config) {
super(argv, cmdConfig);
if (this.queueModeId) {
this.logger.debug(`Webhook Instance queue mode id: ${this.queueModeId}`);
}
this.setInstanceQueueModeId();
}
/**
* Stops n8n in a graceful way.
* Make for example sure that all the webhooks from third party services
@ -71,8 +63,8 @@ export class Webhook extends BaseCommand {
await this.initCrashJournal();
this.logger.debug('Crash journal initialized');
this.logger.info('Initializing n8n webhook process');
this.logger.debug(`Queue mode id: ${this.queueModeId}`);
this.logger.info('Starting n8n webhook process...');
this.logger.debug(`Host ID: ${this.instanceSettings.hostId}`);
await super.init();
@ -100,7 +92,6 @@ export class Webhook extends BaseCommand {
const { ScalingService } = await import('@/scaling/scaling.service');
await Container.get(ScalingService).setupQueue();
await this.server.start();
this.logger.debug(`Webhook listener ID: ${this.server.uniqueInstanceId}`);
this.logger.info('Webhook listener waiting for requests.');
// Make sure that the process does not close
@ -112,7 +103,7 @@ export class Webhook extends BaseCommand {
}
async initOrchestration() {
await Container.get(OrchestrationWebhookService).init();
await Container.get(OrchestrationService).init();
Container.get(PubSubHandler).init();
await Container.get(Subscriber).subscribe('n8n.commands');

View file

@ -8,11 +8,13 @@ import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-mess
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay';
import { Logger } from '@/logging/logger.service';
import { LocalTaskManager } from '@/runners/task-managers/local-task-manager';
import { TaskManager } from '@/runners/task-managers/task-manager';
import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import type { ScalingService } from '@/scaling/scaling.service';
import type { WorkerServerEndpointsConfig } from '@/scaling/worker-server';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
import { OrchestrationService } from '@/services/orchestration.service';
import { BaseCommand } from './base-command';
@ -68,8 +70,6 @@ export class Worker extends BaseCommand {
super(argv, cmdConfig);
this.logger = Container.get(Logger).withScope('scaling');
this.setInstanceQueueModeId();
}
async init() {
@ -84,7 +84,7 @@ export class Worker extends BaseCommand {
await this.initCrashJournal();
this.logger.debug('Starting n8n worker...');
this.logger.debug(`Host ID: ${this.queueModeId}`);
this.logger.debug(`Host ID: ${this.instanceSettings.hostId}`);
await this.setConcurrency();
await super.init();
@ -109,15 +109,26 @@ export class Worker extends BaseCommand {
new EventMessageGeneric({
eventName: 'n8n.worker.started',
payload: {
workerId: this.queueModeId,
workerId: this.instanceSettings.hostId,
},
}),
);
if (!this.globalConfig.taskRunners.disabled) {
Container.set(TaskManager, new LocalTaskManager());
const { TaskRunnerServer } = await import('@/runners/task-runner-server');
const taskRunnerServer = Container.get(TaskRunnerServer);
await taskRunnerServer.start();
const { TaskRunnerProcess } = await import('@/runners/task-runner-process');
const runnerProcess = Container.get(TaskRunnerProcess);
await runnerProcess.start();
}
}
async initEventBus() {
await Container.get(MessageEventBus).initialize({
workerId: this.queueModeId,
workerId: this.instanceSettings.hostId,
});
Container.get(LogStreamingEventRelay).init();
}
@ -129,7 +140,7 @@ export class Worker extends BaseCommand {
* The subscription connection adds a handler to handle the command messages
*/
async initOrchestration() {
await Container.get(OrchestrationWorkerService).init();
await Container.get(OrchestrationService).init();
Container.get(PubSubHandler).init();
await Container.get(Subscriber).subscribe('n8n.commands');

View file

@ -491,11 +491,6 @@ export const schema = {
default: 'n8n',
env: 'N8N_REDIS_KEY_PREFIX',
},
queueModeId: {
doc: 'Unique ID for this n8n instance, is usually set automatically by n8n during startup',
format: String,
default: '',
},
},
/**

View file

@ -7,7 +7,7 @@ export class WorkerMissingEncryptionKey extends ApplicationError {
'Failed to start worker because of missing encryption key.',
'Please set the `N8N_ENCRYPTION_KEY` env var when starting the worker.',
'See: https://docs.n8n.io/hosting/configuration/configuration-examples/encryption-key/',
].join(''),
].join(' '),
{ level: 'warning' },
);
}

View file

@ -651,7 +651,9 @@ export class TelemetryEventRelay extends EventRelay {
}
if (telemetryProperties.is_manual) {
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes, {
runData: runData.data.resultData?.runData,
});
telemetryProperties.node_graph = nodeGraphResult.nodeGraph;
telemetryProperties.node_graph_string = JSON.stringify(nodeGraphResult.nodeGraph);
@ -663,7 +665,9 @@ export class TelemetryEventRelay extends EventRelay {
if (telemetryProperties.is_manual) {
if (!nodeGraphResult) {
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes, {
runData: runData.data.resultData?.runData,
});
}
let userRole: 'owner' | 'sharee' | undefined = undefined;
@ -688,7 +692,9 @@ export class TelemetryEventRelay extends EventRelay {
};
if (!manualExecEventProperties.node_graph_string) {
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes, {
runData: runData.data.resultData?.runData,
});
manualExecEventProperties.node_graph_string = JSON.stringify(nodeGraphResult.nodeGraph);
}

View file

@ -1,5 +1,4 @@
import { ClientSecretCredential } from '@azure/identity';
import { SecretClient } from '@azure/keyvault-secrets';
import type { SecretClient } from '@azure/keyvault-secrets';
import type { INodeProperties } from 'n8n-workflow';
import { DOCS_HELP_NOTICE, EXTERNAL_SECRETS_NAME_REGEX } from '@/external-secrets/constants';
@ -72,6 +71,9 @@ export class AzureKeyVault implements SecretsProvider {
async connect() {
const { vaultName, tenantId, clientId, clientSecret } = this.settings;
const { ClientSecretCredential } = await import('@azure/identity');
const { SecretClient } = await import('@azure/keyvault-secrets');
try {
const credential = new ClientSecretCredential(tenantId, clientId, clientSecret);
this.client = new SecretClient(`https://${vaultName}.vault.azure.net/`, credential);

View file

@ -1,4 +1,4 @@
import { SecretManagerServiceClient as GcpClient } from '@google-cloud/secret-manager';
import type { SecretManagerServiceClient as GcpClient } from '@google-cloud/secret-manager';
import { jsonParse, type INodeProperties } from 'n8n-workflow';
import { DOCS_HELP_NOTICE, EXTERNAL_SECRETS_NAME_REGEX } from '@/external-secrets/constants';
@ -45,6 +45,8 @@ export class GcpSecretsManager implements SecretsProvider {
async connect() {
const { projectId, privateKey, clientEmail } = this.settings;
const { SecretManagerServiceClient: GcpClient } = await import('@google-cloud/secret-manager');
try {
this.client = new GcpClient({
credentials: { client_email: clientEmail, private_key: privateKey },

View file

@ -44,15 +44,38 @@ export class NodeTypes implements INodeTypes {
}
getByNameAndVersion(nodeType: string, version?: number): INodeType {
const versionedNodeType = NodeHelpers.getVersionedNodeType(
this.getNode(nodeType).type,
version,
);
if (versionedNodeType.description.usableAsTool) {
return NodeHelpers.convertNodeToAiTool(versionedNodeType);
const origType = nodeType;
const toolRequested = nodeType.startsWith('n8n-nodes-base') && nodeType.endsWith('Tool');
// Make sure the nodeType to actually get from disk is the un-wrapped type
if (toolRequested) {
nodeType = nodeType.replace(/Tool$/, '');
}
return versionedNodeType;
const node = this.getNode(nodeType);
const versionedNodeType = NodeHelpers.getVersionedNodeType(node.type, version);
if (!toolRequested) return versionedNodeType;
if (!versionedNodeType.description.usableAsTool)
throw new ApplicationError('Node cannot be used as a tool', { extra: { nodeType } });
const { loadedNodes } = this.loadNodesAndCredentials;
if (origType in loadedNodes) {
return loadedNodes[origType].type as INodeType;
}
// Instead of modifying the existing type, we extend it into a new type object
const clonedProperties = Object.create(
versionedNodeType.description.properties,
) as INodeTypeDescription['properties'];
const clonedDescription = Object.create(versionedNodeType.description, {
properties: { value: clonedProperties },
}) as INodeTypeDescription;
const clonedNode = Object.create(versionedNodeType, {
description: { value: clonedDescription },
}) as INodeType;
const tool = NodeHelpers.convertNodeToAiTool(clonedNode);
loadedNodes[nodeType + 'Tool'] = { sourcePath: '', type: tool };
return tool;
}
/* Some nodeTypes need to get special parameters applied like the polling nodes the polling times */

View file

@ -38,7 +38,6 @@ export const GLOBAL_OWNER_SCOPES: Scope[] = [
'license:manage',
'logStreaming:manage',
'orchestration:read',
'orchestration:list',
'saml:manage',
'securityAudit:generate',
'sourceControl:pull',

View file

@ -5,6 +5,8 @@ import type { RunnerMessage, TaskResultData } from '../runner-types';
import { TaskBroker } from '../task-broker.service';
import type { TaskOffer, TaskRequest, TaskRunner } from '../task-broker.service';
const createValidUntil = (ms: number) => process.hrtime.bigint() + BigInt(ms * 1_000_000);
describe('TaskBroker', () => {
let taskBroker: TaskBroker;
@ -15,14 +17,12 @@ describe('TaskBroker', () => {
describe('expireTasks', () => {
it('should remove expired task offers and keep valid task offers', () => {
const now = process.hrtime.bigint();
const validOffer: TaskOffer = {
offerId: 'valid',
runnerId: 'runner1',
taskType: 'taskType1',
validFor: 1000,
validUntil: now + BigInt(1000 * 1_000_000), // 1 second in the future
validUntil: createValidUntil(1000), // 1 second in the future
};
const expiredOffer1: TaskOffer = {
@ -30,7 +30,7 @@ describe('TaskBroker', () => {
runnerId: 'runner2',
taskType: 'taskType1',
validFor: 1000,
validUntil: now - BigInt(1000 * 1_000_000), // 1 second in the past
validUntil: createValidUntil(-1000), // 1 second in the past
};
const expiredOffer2: TaskOffer = {
@ -38,7 +38,7 @@ describe('TaskBroker', () => {
runnerId: 'runner3',
taskType: 'taskType1',
validFor: 2000,
validUntil: now - BigInt(2000 * 1_000_000), // 2 seconds in the past
validUntil: createValidUntil(-2000), // 2 seconds in the past
};
taskBroker.setPendingTaskOffers([validOffer, expiredOffer1, expiredOffer2]);
@ -102,6 +102,55 @@ describe('TaskBroker', () => {
expect(runnerIds).toHaveLength(0);
});
it('should remove any pending offers for that runner', () => {
const runnerId = 'runner1';
const runner = mock<TaskRunner>({ id: runnerId });
const messageCallback = jest.fn();
taskBroker.registerRunner(runner, messageCallback);
taskBroker.taskOffered({
offerId: 'offer1',
runnerId,
taskType: 'mock',
validFor: 1000,
validUntil: createValidUntil(1000),
});
taskBroker.taskOffered({
offerId: 'offer2',
runnerId: 'runner2',
taskType: 'mock',
validFor: 1000,
validUntil: createValidUntil(1000),
});
taskBroker.deregisterRunner(runnerId);
const offers = taskBroker.getPendingTaskOffers();
expect(offers).toHaveLength(1);
expect(offers[0].runnerId).toBe('runner2');
});
it('should fail any running tasks for that runner', () => {
const runnerId = 'runner1';
const runner = mock<TaskRunner>({ id: runnerId });
const messageCallback = jest.fn();
const taskId = 'task1';
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const failSpy = jest.spyOn(taskBroker as any, 'failTask');
const rejectSpy = jest.spyOn(taskBroker, 'handleRunnerReject');
taskBroker.registerRunner(runner, messageCallback);
taskBroker.setTasks({
[taskId]: { id: taskId, requesterId: 'requester1', runnerId, taskType: 'mock' },
task2: { id: 'task2', requesterId: 'requester1', runnerId: 'runner2', taskType: 'mock' },
});
taskBroker.deregisterRunner(runnerId);
expect(failSpy).toBeCalledWith(taskId, `The Task Runner (${runnerId}) has disconnected`);
expect(rejectSpy).toBeCalledWith(taskId, `The Task Runner (${runnerId}) has disconnected`);
});
});
describe('deregisterRequester', () => {
@ -121,14 +170,12 @@ describe('TaskBroker', () => {
describe('taskRequested', () => {
it('should match a pending offer to an incoming request', async () => {
const now = process.hrtime.bigint();
const offer: TaskOffer = {
offerId: 'offer1',
runnerId: 'runner1',
taskType: 'taskType1',
validFor: 1000,
validUntil: now + BigInt(1000 * 1_000_000),
validUntil: createValidUntil(1000),
};
taskBroker.setPendingTaskOffers([offer]);
@ -150,8 +197,6 @@ describe('TaskBroker', () => {
describe('taskOffered', () => {
it('should match a pending request to an incoming offer', () => {
const now = process.hrtime.bigint();
const request: TaskRequest = {
requestId: 'request1',
requesterId: 'requester1',
@ -166,7 +211,7 @@ describe('TaskBroker', () => {
runnerId: 'runner1',
taskType: 'taskType1',
validFor: 1000,
validUntil: now + BigInt(1000 * 1_000_000),
validUntil: createValidUntil(1000),
};
jest.spyOn(taskBroker, 'acceptOffer').mockResolvedValue(); // allow Jest to exit cleanly
@ -180,14 +225,12 @@ describe('TaskBroker', () => {
describe('settleTasks', () => {
it('should match task offers with task requests by task type', () => {
const now = process.hrtime.bigint();
const offer1: TaskOffer = {
offerId: 'offer1',
runnerId: 'runner1',
taskType: 'taskType1',
validFor: 1000,
validUntil: now + BigInt(1000 * 1_000_000),
validUntil: createValidUntil(1000),
};
const offer2: TaskOffer = {
@ -195,7 +238,7 @@ describe('TaskBroker', () => {
runnerId: 'runner2',
taskType: 'taskType2',
validFor: 1000,
validUntil: now + BigInt(1000 * 1_000_000),
validUntil: createValidUntil(1000),
};
const request1: TaskRequest = {
@ -235,14 +278,12 @@ describe('TaskBroker', () => {
});
it('should not match a request whose acceptance is in progress', () => {
const now = process.hrtime.bigint();
const offer: TaskOffer = {
offerId: 'offer1',
runnerId: 'runner1',
taskType: 'taskType1',
validFor: 1000,
validUntil: now + BigInt(1000 * 1_000_000),
validUntil: createValidUntil(1000),
};
const request: TaskRequest = {
@ -271,14 +312,12 @@ describe('TaskBroker', () => {
});
it('should expire tasks before settling', () => {
const now = process.hrtime.bigint();
const validOffer: TaskOffer = {
offerId: 'valid',
runnerId: 'runner1',
taskType: 'taskType1',
validFor: 1000,
validUntil: now + BigInt(1000 * 1_000_000), // 1 second in the future
validUntil: createValidUntil(1000), // 1 second in the future
};
const expiredOffer: TaskOffer = {
@ -286,7 +325,7 @@ describe('TaskBroker', () => {
runnerId: 'runner2',
taskType: 'taskType2', // will be removed before matching
validFor: 1000,
validUntil: now - BigInt(1000 * 1_000_000), // 1 second in the past
validUntil: createValidUntil(-1000), // 1 second in the past
};
const request1: TaskRequest = {

View file

@ -75,15 +75,11 @@ export class TaskBroker {
expireTasks() {
const now = process.hrtime.bigint();
const invalidOffers: number[] = [];
for (let i = 0; i < this.pendingTaskOffers.length; i++) {
for (let i = this.pendingTaskOffers.length - 1; i >= 0; i--) {
if (this.pendingTaskOffers[i].validUntil < now) {
invalidOffers.push(i);
this.pendingTaskOffers.splice(i, 1);
}
}
// We reverse the list so the later indexes are valid after deleting earlier ones
invalidOffers.reverse().forEach((i) => this.pendingTaskOffers.splice(i, 1));
}
registerRunner(runner: TaskRunner, messageCallback: MessageCallback) {
@ -92,6 +88,21 @@ export class TaskBroker {
deregisterRunner(runnerId: string) {
this.knownRunners.delete(runnerId);
// Remove any pending offers
for (let i = this.pendingTaskOffers.length - 1; i >= 0; i--) {
if (this.pendingTaskOffers[i].runnerId === runnerId) {
this.pendingTaskOffers.splice(i, 1);
}
}
// Fail any tasks
for (const task of this.tasks.values()) {
if (task.runnerId === runnerId) {
void this.failTask(task.id, `The Task Runner (${runnerId}) has disconnected`);
this.handleRunnerReject(task.id, `The Task Runner (${runnerId}) has disconnected`);
}
}
}
registerRequester(requesterId: string, messageCallback: RequesterMessageCallback) {

View file

@ -5,7 +5,7 @@ import type { RequesterMessage } from '../runner-types';
import type { RequesterMessageCallback } from '../task-broker.service';
import { TaskBroker } from '../task-broker.service';
export class SingleMainTaskManager extends TaskManager {
export class LocalTaskManager extends TaskManager {
taskBroker: TaskBroker;
id: string = 'single-main';

View file

@ -1,8 +1,8 @@
import type { Redis as SingleNodeClient } from 'ioredis';
import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core';
import config from '@/config';
import { generateNanoId } from '@/databases/utils/generators';
import type { RedisClientService } from '@/services/redis-client.service';
import { mockLogger } from '@test/mocking';
@ -10,28 +10,26 @@ import { Publisher } from '../pubsub/publisher.service';
import type { PubSub } from '../pubsub/pubsub.types';
describe('Publisher', () => {
let queueModeId: string;
beforeEach(() => {
config.set('executions.mode', 'queue');
queueModeId = generateNanoId();
config.set('redis.queueModeId', queueModeId);
});
const client = mock<SingleNodeClient>();
const logger = mockLogger();
const hostId = 'main-bnxa1riryKUNHtln';
const instanceSettings = mock<InstanceSettings>({ hostId });
const redisClientService = mock<RedisClientService>({ createClient: () => client });
describe('constructor', () => {
it('should init Redis client in scaling mode', () => {
const publisher = new Publisher(logger, redisClientService);
const publisher = new Publisher(logger, redisClientService, instanceSettings);
expect(publisher.getClient()).toEqual(client);
});
it('should not init Redis client in regular mode', () => {
config.set('executions.mode', 'regular');
const publisher = new Publisher(logger, redisClientService);
const publisher = new Publisher(logger, redisClientService, instanceSettings);
expect(publisher.getClient()).toBeUndefined();
});
@ -39,7 +37,7 @@ describe('Publisher', () => {
describe('shutdown', () => {
it('should disconnect Redis client', () => {
const publisher = new Publisher(logger, redisClientService);
const publisher = new Publisher(logger, redisClientService, instanceSettings);
publisher.shutdown();
expect(client.disconnect).toHaveBeenCalled();
});
@ -47,21 +45,21 @@ describe('Publisher', () => {
describe('publishCommand', () => {
it('should publish command into `n8n.commands` pubsub channel', async () => {
const publisher = new Publisher(logger, redisClientService);
const publisher = new Publisher(logger, redisClientService, instanceSettings);
const msg = mock<PubSub.Command>({ command: 'reload-license' });
await publisher.publishCommand(msg);
expect(client.publish).toHaveBeenCalledWith(
'n8n.commands',
JSON.stringify({ ...msg, senderId: queueModeId, selfSend: false, debounce: true }),
JSON.stringify({ ...msg, senderId: hostId, selfSend: false, debounce: true }),
);
});
});
describe('publishWorkerResponse', () => {
it('should publish worker response into `n8n.worker-response` pubsub channel', async () => {
const publisher = new Publisher(logger, redisClientService);
const publisher = new Publisher(logger, redisClientService, instanceSettings);
const msg = mock<PubSub.WorkerResponse>({
response: 'response-to-get-worker-status',
});

View file

@ -17,14 +17,14 @@ describe('Subscriber', () => {
describe('constructor', () => {
it('should init Redis client in scaling mode', () => {
const subscriber = new Subscriber(mock(), redisClientService, mock());
const subscriber = new Subscriber(mock(), redisClientService, mock(), mock());
expect(subscriber.getClient()).toEqual(client);
});
it('should not init Redis client in regular mode', () => {
config.set('executions.mode', 'regular');
const subscriber = new Subscriber(mock(), redisClientService, mock());
const subscriber = new Subscriber(mock(), redisClientService, mock(), mock());
expect(subscriber.getClient()).toBeUndefined();
});
@ -32,7 +32,7 @@ describe('Subscriber', () => {
describe('shutdown', () => {
it('should disconnect Redis client', () => {
const subscriber = new Subscriber(mock(), redisClientService, mock());
const subscriber = new Subscriber(mock(), redisClientService, mock(), mock());
subscriber.shutdown();
expect(client.disconnect).toHaveBeenCalled();
});
@ -40,7 +40,7 @@ describe('Subscriber', () => {
describe('subscribe', () => {
it('should subscribe to pubsub channel', async () => {
const subscriber = new Subscriber(mock(), redisClientService, mock());
const subscriber = new Subscriber(mock(), redisClientService, mock(), mock());
await subscriber.subscribe('n8n.commands');

View file

@ -1,5 +1,5 @@
import type { RunningJobSummary } from '@n8n/api-types';
import { WorkflowExecute } from 'n8n-core';
import { InstanceSettings, WorkflowExecute } from 'n8n-core';
import { BINARY_ENCODING, ApplicationError, Workflow } from 'n8n-workflow';
import type { ExecutionStatus, IExecuteResponsePromiseData, IRun } from 'n8n-workflow';
import type PCancelable from 'p-cancelable';
@ -33,6 +33,7 @@ export class JobProcessor {
private readonly executionRepository: ExecutionRepository,
private readonly workflowRepository: WorkflowRepository,
private readonly nodeTypes: NodeTypes,
private readonly instanceSettings: InstanceSettings,
) {
this.logger = this.logger.withScope('scaling');
}
@ -120,7 +121,7 @@ export class JobProcessor {
kind: 'respond-to-webhook',
executionId,
response: this.encodeWebhookResponse(response),
workerId: config.getEnv('redis.queueModeId'),
workerId: this.instanceSettings.hostId,
};
await job.progress(msg);
@ -173,7 +174,7 @@ export class JobProcessor {
const msg: JobFinishedMessage = {
kind: 'job-finished',
executionId,
workerId: config.getEnv('redis.queueModeId'),
workerId: this.instanceSettings.hostId,
};
await job.progress(msg);

View file

@ -1,4 +1,5 @@
import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis';
import { InstanceSettings } from 'n8n-core';
import { Service } from 'typedi';
import config from '@/config';
@ -20,6 +21,7 @@ export class Publisher {
constructor(
private readonly logger: Logger,
private readonly redisClientService: RedisClientService,
private readonly instanceSettings: InstanceSettings,
) {
// @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead.
if (config.getEnv('executions.mode') !== 'queue') return;
@ -48,7 +50,7 @@ export class Publisher {
'n8n.commands',
JSON.stringify({
...msg,
senderId: config.getEnv('redis.queueModeId'),
senderId: this.instanceSettings.hostId,
selfSend: SELF_SEND_COMMANDS.has(msg.command),
debounce: !IMMEDIATE_COMMANDS.has(msg.command),
}),

View file

@ -3,7 +3,6 @@ import { ensureError } from 'n8n-workflow';
import { Service } from 'typedi';
import { ActiveWorkflowManager } from '@/active-workflow-manager';
import config from '@/config';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
@ -49,7 +48,7 @@ export class PubSubHandler {
...this.commonHandlers,
'get-worker-status': async () =>
await this.publisher.publishWorkerResponse({
senderId: config.getEnv('redis.queueModeId'),
senderId: this.instanceSettings.hostId,
response: 'response-to-get-worker-status',
payload: this.workerStatusService.generateStatus(),
}),

View file

@ -1,5 +1,6 @@
import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis';
import debounce from 'lodash/debounce';
import { InstanceSettings } from 'n8n-core';
import { jsonParse } from 'n8n-workflow';
import { Service } from 'typedi';
@ -21,6 +22,7 @@ export class Subscriber {
private readonly logger: Logger,
private readonly redisClientService: RedisClientService,
private readonly eventService: EventService,
private readonly instanceSettings: InstanceSettings,
) {
// @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead.
if (config.getEnv('executions.mode') !== 'queue') return;
@ -77,12 +79,12 @@ export class Subscriber {
return null;
}
const queueModeId = config.getEnv('redis.queueModeId');
const { hostId } = this.instanceSettings;
if (
'command' in msg &&
!msg.selfSend &&
(msg.senderId === queueModeId || (msg.targets && !msg.targets.includes(queueModeId)))
(msg.senderId === hostId || (msg.targets && !msg.targets.includes(hostId)))
) {
return null;
}

View file

@ -112,8 +112,9 @@ export class ScalingService {
const msg: JobFailedMessage = {
kind: 'job-failed',
executionId,
workerId: config.getEnv('redis.queueModeId'),
workerId: this.instanceSettings.hostId,
errorMsg: error.message,
errorStack: error.stack ?? '',
};
await job.progress(msg);
@ -295,12 +296,18 @@ export class ScalingService {
});
break;
case 'job-failed':
this.logger.error(`Execution ${msg.executionId} (job ${jobId}) failed`, {
workerId: msg.workerId,
errorMsg: msg.errorMsg,
executionId: msg.executionId,
jobId,
});
this.logger.error(
[
`Execution ${msg.executionId} (job ${jobId}) failed`,
msg.errorStack ? `\n${msg.errorStack}\n` : '',
].join(''),
{
workerId: msg.workerId,
errorMsg: msg.errorMsg,
executionId: msg.executionId,
jobId,
},
);
break;
case 'abort-job':
break; // only for worker

View file

@ -56,6 +56,7 @@ export type JobFailedMessage = {
executionId: string;
workerId: string;
errorMsg: string;
errorStack: string;
};
/** Message sent by main to worker to abort a job. */

View file

@ -1,19 +1,22 @@
import type { WorkerStatus } from '@n8n/api-types';
import { InstanceSettings } from 'n8n-core';
import os from 'node:os';
import { Service } from 'typedi';
import config from '@/config';
import { N8N_VERSION } from '@/constants';
import { JobProcessor } from './job-processor';
@Service()
export class WorkerStatusService {
constructor(private readonly jobProcessor: JobProcessor) {}
constructor(
private readonly jobProcessor: JobProcessor,
private readonly instanceSettings: InstanceSettings,
) {}
generateStatus(): WorkerStatus {
return {
senderId: config.getEnv('redis.queueModeId'),
senderId: this.instanceSettings.hostId,
runningJobsSummary: this.jobProcessor.getRunningJobsSummary(),
freeMem: os.freemem(),
totalMem: os.totalmem(),

View file

@ -79,8 +79,9 @@ export class Server extends AbstractServer {
private readonly orchestrationService: OrchestrationService,
private readonly postHogClient: PostHogClient,
private readonly eventService: EventService,
private readonly instanceSettings: InstanceSettings,
) {
super('main');
super();
this.testWebhooksEnabled = true;
this.webhooksEnabled = !this.globalConfig.endpoints.disableProductionWebhooksOnMainProcess;
@ -97,7 +98,7 @@ export class Server extends AbstractServer {
this.endpointPresetCredentials = this.globalConfig.credentials.overwrite.endpoint;
await super.start();
this.logger.debug(`Server ID: ${this.uniqueInstanceId}`);
this.logger.debug(`Server ID: ${this.instanceSettings.hostId}`);
if (inDevelopment && process.env.N8N_DEV_RELOAD === 'true') {
void this.loadNodesAndCredentials.setupHotReload();

View file

@ -23,15 +23,11 @@ redisClientService.createClient.mockReturnValue(mockRedisClient);
const os = Container.get(OrchestrationService);
mockInstance(ActiveWorkflowManager);
let queueModeId: string;
describe('Orchestration Service', () => {
mockInstance(Push);
mockInstance(ExternalSecretsManager);
beforeAll(async () => {
queueModeId = config.get('redis.queueModeId');
// @ts-expect-error readonly property
instanceSettings.instanceType = 'main';
});
@ -48,7 +44,6 @@ describe('Orchestration Service', () => {
await os.init();
// @ts-expect-error Private field
expect(os.publisher).toBeDefined();
expect(queueModeId).toBeDefined();
});
describe('shouldAddWebhooks', () => {

View file

@ -1,26 +0,0 @@
import type { MainResponseReceivedHandlerOptions } from './orchestration/main/types';
import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/types';
export abstract class OrchestrationHandlerService {
protected initialized = false;
async init() {
await this.initSubscriber();
this.initialized = true;
}
async initWithOptions(
options: WorkerCommandReceivedHandlerOptions | MainResponseReceivedHandlerOptions,
) {
await this.initSubscriber(options);
this.initialized = true;
}
async shutdown() {
this.initialized = false;
}
protected abstract initSubscriber(
options?: WorkerCommandReceivedHandlerOptions | MainResponseReceivedHandlerOptions,
): Promise<void>;
}

View file

@ -43,10 +43,6 @@ export class OrchestrationService {
return !this.isMultiMainSetupEnabled;
}
get instanceId() {
return config.getEnv('redis.queueModeId');
}
sanityCheck() {
return this.isInitialized && config.get('executions.mode') === 'queue';
}
@ -94,7 +90,7 @@ export class OrchestrationService {
if (!this.sanityCheck()) return;
this.logger.debug(
`[Instance ID ${this.instanceId}] Publishing command "${commandKey}"`,
`[Instance ID ${this.instanceSettings.hostId}] Publishing command "${commandKey}"`,
payload,
);

View file

@ -24,10 +24,6 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
super();
}
get instanceId() {
return config.getEnv('redis.queueModeId');
}
private leaderKey: string;
private readonly leaderKeyTtl = config.getEnv('multiMainSetup.ttl');
@ -57,16 +53,18 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
private async checkLeader() {
const leaderId = await this.publisher.get(this.leaderKey);
if (leaderId === this.instanceId) {
this.logger.debug(`[Instance ID ${this.instanceId}] Leader is this instance`);
const { hostId } = this.instanceSettings;
if (leaderId === hostId) {
this.logger.debug(`[Instance ID ${hostId}] Leader is this instance`);
await this.publisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
return;
}
if (leaderId && leaderId !== this.instanceId) {
this.logger.debug(`[Instance ID ${this.instanceId}] Leader is other instance "${leaderId}"`);
if (leaderId && leaderId !== hostId) {
this.logger.debug(`[Instance ID ${hostId}] Leader is other instance "${leaderId}"`);
if (this.instanceSettings.isLeader) {
this.instanceSettings.markAsFollower();
@ -81,7 +79,7 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
if (!leaderId) {
this.logger.debug(
`[Instance ID ${this.instanceId}] Leadership vacant, attempting to become leader...`,
`[Instance ID ${hostId}] Leadership vacant, attempting to become leader...`,
);
this.instanceSettings.markAsFollower();
@ -96,11 +94,13 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
}
private async tryBecomeLeader() {
const { hostId } = this.instanceSettings;
// this can only succeed if leadership is currently vacant
const keySetSuccessfully = await this.publisher.setIfNotExists(this.leaderKey, this.instanceId);
const keySetSuccessfully = await this.publisher.setIfNotExists(this.leaderKey, hostId);
if (keySetSuccessfully) {
this.logger.debug(`[Instance ID ${this.instanceId}] Leader is now this instance`);
this.logger.debug(`[Instance ID ${hostId}] Leader is now this instance`);
this.instanceSettings.markAsLeader();

View file

@ -1,6 +0,0 @@
import type { Publisher } from '@/scaling/pubsub/publisher.service';
export type MainResponseReceivedHandlerOptions = {
queueModeId: string;
publisher: Publisher;
};

View file

@ -1,16 +0,0 @@
import { Service } from 'typedi';
import config from '@/config';
import { OrchestrationService } from '../../orchestration.service';
@Service()
export class OrchestrationWebhookService extends OrchestrationService {
sanityCheck(): boolean {
return (
this.isInitialized &&
config.get('executions.mode') === 'queue' &&
this.instanceSettings.instanceType === 'webhook'
);
}
}

View file

@ -1,16 +0,0 @@
import { Service } from 'typedi';
import config from '@/config';
import { OrchestrationService } from '../../orchestration.service';
@Service()
export class OrchestrationWorkerService extends OrchestrationService {
sanityCheck(): boolean {
return (
this.isInitialized &&
config.get('executions.mode') === 'queue' &&
this.instanceSettings.instanceType === 'worker'
);
}
}

View file

@ -1,10 +0,0 @@
import type { RunningJobSummary } from '@n8n/api-types';
import type { Publisher } from '@/scaling/pubsub/publisher.service';
export interface WorkerCommandReceivedHandlerOptions {
queueModeId: string;
publisher: Publisher;
getRunningJobIds: () => Array<string | number>;
getRunningJobsSummary: () => RunningJobSummary[];
}

View file

@ -3,8 +3,4 @@ import { Service } from 'typedi';
import { AbstractServer } from '@/abstract-server';
@Service()
export class WebhookServer extends AbstractServer {
constructor() {
super('webhook');
}
}
export class WebhookServer extends AbstractServer {}

View file

@ -1,6 +1,8 @@
process.argv[2] = 'worker';
import { TaskRunnersConfig } from '@n8n/config';
import { BinaryDataService } from 'n8n-core';
import Container from 'typedi';
import { Worker } from '@/commands/worker';
import config from '@/config';
@ -11,10 +13,12 @@ import { ExternalSecretsManager } from '@/external-secrets/external-secrets-mana
import { License } from '@/license';
import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
import { Push } from '@/push';
import { TaskRunnerProcess } from '@/runners/task-runner-process';
import { TaskRunnerServer } from '@/runners/task-runner-server';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { ScalingService } from '@/scaling/scaling.service';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
import { OrchestrationService } from '@/services/orchestration.service';
import { Telemetry } from '@/telemetry';
import { setupTestCommand } from '@test-integration/utils/test-command';
@ -22,6 +26,7 @@ import { mockInstance } from '../../shared/mocking';
config.set('executions.mode', 'queue');
config.set('binaryDataManager.availableModes', 'filesystem');
Container.get(TaskRunnersConfig).disabled = false;
mockInstance(LoadNodesAndCredentials);
const binaryDataService = mockInstance(BinaryDataService);
const externalHooks = mockInstance(ExternalHooks);
@ -30,7 +35,9 @@ const license = mockInstance(License, { loadCertStr: async () => '' });
const messageEventBus = mockInstance(MessageEventBus);
const logStreamingEventRelay = mockInstance(LogStreamingEventRelay);
const scalingService = mockInstance(ScalingService);
const orchestrationWorkerService = mockInstance(OrchestrationWorkerService);
const orchestrationService = mockInstance(OrchestrationService);
const taskRunnerServer = mockInstance(TaskRunnerServer);
const taskRunnerProcess = mockInstance(TaskRunnerProcess);
mockInstance(Publisher);
mockInstance(Subscriber);
mockInstance(Telemetry);
@ -41,10 +48,8 @@ const command = setupTestCommand(Worker);
test('worker initializes all its components', async () => {
config.set('executions.mode', 'regular'); // should be overridden
const worker = await command.run();
expect(worker.queueModeId).toBeDefined();
expect(worker.queueModeId).toContain('worker');
expect(worker.queueModeId.length).toBeGreaterThan(15);
await command.run();
expect(license.init).toHaveBeenCalledTimes(1);
expect(binaryDataService.init).toHaveBeenCalledTimes(1);
expect(externalHooks.init).toHaveBeenCalledTimes(1);
@ -53,8 +58,10 @@ test('worker initializes all its components', async () => {
expect(scalingService.setupQueue).toHaveBeenCalledTimes(1);
expect(scalingService.setupWorker).toHaveBeenCalledTimes(1);
expect(logStreamingEventRelay.init).toHaveBeenCalledTimes(1);
expect(orchestrationWorkerService.init).toHaveBeenCalledTimes(1);
expect(orchestrationService.init).toHaveBeenCalledTimes(1);
expect(messageEventBus.send).toHaveBeenCalledTimes(1);
expect(taskRunnerServer.start).toHaveBeenCalledTimes(1);
expect(taskRunnerProcess.start).toHaveBeenCalledTimes(1);
expect(config.getEnv('executions.mode')).toBe('queue');
});

View file

@ -36,6 +36,7 @@
"@types/xml2js": "catalog:"
},
"dependencies": {
"@langchain/core": "catalog:",
"@n8n/client-oauth2": "workspace:*",
"aws4": "1.11.0",
"axios": "catalog:",
@ -45,10 +46,10 @@
"file-type": "16.5.4",
"form-data": "catalog:",
"lodash": "catalog:",
"@langchain/core": "catalog:",
"luxon": "catalog:",
"mime-types": "2.1.35",
"n8n-workflow": "workspace:*",
"nanoid": "catalog:",
"oauth-1.0a": "2.2.6",
"p-cancelable": "2.1.1",
"pretty-bytes": "5.6.0",

View file

@ -1,9 +1,12 @@
import { createHash, randomBytes } from 'crypto';
import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'fs';
import { ApplicationError, jsonParse } from 'n8n-workflow';
import { ApplicationError, jsonParse, ALPHABET } from 'n8n-workflow';
import { customAlphabet } from 'nanoid';
import path from 'path';
import { Service } from 'typedi';
const nanoid = customAlphabet(ALPHABET, 16);
interface ReadOnlySettings {
encryptionKey: string;
}
@ -40,6 +43,12 @@ export class InstanceSettings {
private settings = this.loadOrCreate();
/**
* Fixed ID of this n8n instance, for telemetry.
* Derived from encryption key. Do not confuse with `hostId`.
*
* @example '258fce876abf5ea60eb86a2e777e5e190ff8f3e36b5b37aafec6636c31d4d1f9'
*/
readonly instanceId = this.generateInstanceId();
readonly instanceType: InstanceType;
@ -49,6 +58,8 @@ export class InstanceSettings {
this.instanceType = ['webhook', 'worker'].includes(command)
? (command as InstanceType)
: 'main';
this.hostId = `${this.instanceType}-${nanoid()}`;
}
/**
@ -61,6 +72,16 @@ export class InstanceSettings {
*/
instanceRole: InstanceRole = 'unset';
/**
* Transient ID of this n8n instance, for scaling mode.
* Reset on restart. Do not confuse with `instanceId`.
*
* @example 'main-bnxa1riryKUNHtln'
* @example 'worker-nDJR0FnSd2Vf6DB5'
* @example 'webhook-jxQ7AO8IzxEtfW1F'
*/
readonly hostId: string;
get isLeader() {
return this.instanceRole === 'leader';
}

View file

@ -69,4 +69,19 @@ describe('InstanceSettings', () => {
);
});
});
describe('constructor', () => {
it('should generate a `hostId`', () => {
const encryptionKey = 'test_key';
process.env.N8N_ENCRYPTION_KEY = encryptionKey;
jest.spyOn(fs, 'existsSync').mockReturnValueOnce(true);
jest.spyOn(fs, 'readFileSync').mockReturnValueOnce(JSON.stringify({ encryptionKey }));
const settings = new InstanceSettings();
const [instanceType, nanoid] = settings.hostId.split('-');
expect(instanceType).toEqual('main');
expect(nanoid).toHaveLength(16); // e.g. sDX6ZPc0bozv66zM
});
});
});

View file

@ -523,7 +523,8 @@ function showPinDataDiscoveryTooltip(dataItemsCount: number): void {
isManualTypeNode.value ||
isScheduledGroup.value ||
uiStore.isAnyModalOpen ||
dataItemsCount === 0
dataItemsCount === 0 ||
pinnedData.hasData.value
)
return;

View file

@ -15,6 +15,7 @@ vi.mock('vue-router', () => ({
fullPath: vi.fn(),
}),
RouterLink: vi.fn(),
useRouter: vi.fn(),
}));
let route: ReturnType<typeof useRoute>;

View file

@ -9,6 +9,7 @@ vi.mock('vue-router', () => ({
params: {},
}),
RouterLink: vi.fn(),
useRouter: vi.fn(),
}));
const initialState = {

View file

@ -10,6 +10,7 @@ vi.mock('vue-router', async (importOriginal) => {
useRoute: () => ({
params: {},
}),
useRouter: vi.fn(),
};
});

View file

@ -8,10 +8,12 @@ import {
import { useWorkflowsStore } from '@/stores/workflows.store';
import type { IExecutionResponse, INodeUi, IWorkflowDb, IWorkflowSettings } from '@/Interface';
import { useNodeTypesStore } from '@/stores/nodeTypes.store';
import type { ExecutionSummary, IConnection, INodeExecutionData } from 'n8n-workflow';
import { type ExecutionSummary, type IConnection, type INodeExecutionData } from 'n8n-workflow';
import { stringSizeInBytes } from '@/utils/typesUtils';
import { dataPinningEventBus } from '@/event-bus';
import { useUIStore } from '@/stores/ui.store';
import type { PushPayload } from '@n8n/api-types';
import { flushPromises } from '@vue/test-utils';
vi.mock('@/api/workflows', () => ({
getWorkflows: vi.fn(),
@ -19,12 +21,18 @@ vi.mock('@/api/workflows', () => ({
getNewWorkflow: vi.fn(),
}));
const getNodeType = vi.fn();
vi.mock('@/stores/nodeTypes.store', () => ({
useNodeTypesStore: vi.fn(() => ({
getNodeType: vi.fn(),
getNodeType,
})),
}));
const track = vi.fn();
vi.mock('@/composables/useTelemetry', () => ({
useTelemetry: () => ({ track }),
}));
describe('useWorkflowsStore', () => {
let workflowsStore: ReturnType<typeof useWorkflowsStore>;
let uiStore: ReturnType<typeof useUIStore>;
@ -33,6 +41,7 @@ describe('useWorkflowsStore', () => {
setActivePinia(createPinia());
workflowsStore = useWorkflowsStore();
uiStore = useUIStore();
track.mockReset();
});
it('should initialize with default state', () => {
@ -441,4 +450,197 @@ describe('useWorkflowsStore', () => {
expect(uiStore.stateIsDirty).toBe(true);
});
});
describe('addNodeExecutionData', () => {
const { successEvent, errorEvent, executionReponse } = generateMockExecutionEvents();
it('should throw error if not initalized', () => {
expect(() => workflowsStore.addNodeExecutionData(successEvent)).toThrowError();
});
it('should add node success run data', () => {
workflowsStore.setWorkflowExecutionData(executionReponse);
// ACT
workflowsStore.addNodeExecutionData(successEvent);
expect(workflowsStore.workflowExecutionData).toEqual({
...executionReponse,
data: {
resultData: {
runData: {
[successEvent.nodeName]: [successEvent.data],
},
},
},
});
});
it('should add node error event and track errored executions', async () => {
workflowsStore.setWorkflowExecutionData(executionReponse);
workflowsStore.addNode({
parameters: {},
id: '554c7ff4-7ee2-407c-8931-e34234c5056a',
name: 'Edit Fields',
type: 'n8n-nodes-base.set',
position: [680, 180],
typeVersion: 3.4,
});
getNodeType.mockReturnValue(getMockEditFieldsNode());
// ACT
workflowsStore.addNodeExecutionData(errorEvent);
await flushPromises();
expect(workflowsStore.workflowExecutionData).toEqual({
...executionReponse,
data: {
resultData: {
runData: {
[errorEvent.nodeName]: [errorEvent.data],
},
},
},
});
expect(track).toHaveBeenCalledWith(
'Manual exec errored',
{
error_title: 'invalid syntax',
node_type: 'n8n-nodes-base.set',
node_type_version: 3.4,
node_id: '554c7ff4-7ee2-407c-8931-e34234c5056a',
node_graph_string:
'{"node_types":["n8n-nodes-base.set"],"node_connections":[],"nodes":{"0":{"id":"554c7ff4-7ee2-407c-8931-e34234c5056a","type":"n8n-nodes-base.set","version":3.4,"position":[680,180]}},"notes":{},"is_pinned":false}',
},
{
withPostHog: true,
},
);
});
});
});
function getMockEditFieldsNode() {
return {
displayName: 'Edit Fields (Set)',
name: 'n8n-nodes-base.set',
icon: 'fa:pen',
group: ['input'],
description: 'Modify, add, or remove item fields',
defaultVersion: 3.4,
iconColor: 'blue',
version: [3, 3.1, 3.2, 3.3, 3.4],
subtitle: '={{$parameter["mode"]}}',
defaults: {
name: 'Edit Fields',
},
inputs: ['main'],
outputs: ['main'],
properties: [],
};
}
function generateMockExecutionEvents() {
const executionReponse: IExecutionResponse = {
id: '1',
workflowData: {
id: '1',
name: '',
createdAt: '1',
updatedAt: '1',
nodes: [],
connections: {},
active: false,
versionId: '1',
},
finished: false,
mode: 'cli',
startedAt: new Date(),
status: 'new',
data: {
resultData: {
runData: {},
},
},
};
const successEvent: PushPayload<'nodeExecuteAfter'> = {
executionId: '59',
nodeName: 'When clicking Test workflow',
data: {
hints: [],
startTime: 1727867966633,
executionTime: 1,
source: [],
executionStatus: 'success',
data: {
main: [
[
{
json: {},
pairedItem: {
item: 0,
},
},
],
],
},
},
};
const errorEvent: PushPayload<'nodeExecuteAfter'> = {
executionId: '61',
nodeName: 'Edit Fields',
data: {
hints: [],
startTime: 1727869043441,
executionTime: 2,
source: [
{
previousNode: 'When clicking Test workflow',
},
],
executionStatus: 'error',
// @ts-expect-error simpler data type, not BE class with methods
error: {
level: 'error',
tags: {
packageName: 'workflow',
},
context: {
itemIndex: 0,
},
functionality: 'regular',
name: 'NodeOperationError',
timestamp: 1727869043442,
node: {
parameters: {
mode: 'manual',
duplicateItem: false,
assignments: {
assignments: [
{
id: '87afdb19-4056-4551-93ef-d0126a34eb83',
name: "={{ $('Wh }}",
value: '',
type: 'string',
},
],
},
includeOtherFields: false,
options: {},
},
id: '9fb34d2d-7191-48de-8f18-91a6a28d0230',
name: 'Edit Fields',
type: 'n8n-nodes-base.set',
typeVersion: 3.4,
position: [1120, 180],
},
messages: [],
message: 'invalid syntax',
stack: 'NodeOperationError: invalid syntax',
},
},
};
return { executionReponse, errorEvent, successEvent };
}

View file

@ -80,6 +80,11 @@ import { useProjectsStore } from '@/stores/projects.store';
import type { ProjectSharingData } from '@/types/projects.types';
import type { PushPayload } from '@n8n/api-types';
import { useLocalStorage } from '@vueuse/core';
import { useTelemetry } from '@/composables/useTelemetry';
import { TelemetryHelpers } from 'n8n-workflow';
import { useWorkflowHelpers } from '@/composables/useWorkflowHelpers';
import { useRouter } from 'vue-router';
import { useSettingsStore } from './settings.store';
const defaults: Omit<IWorkflowDb, 'id'> & { settings: NonNullable<IWorkflowDb['settings']> } = {
name: '',
@ -107,6 +112,10 @@ let cachedWorkflow: Workflow | null = null;
export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
const uiStore = useUIStore();
const telemetry = useTelemetry();
const router = useRouter();
const workflowHelpers = useWorkflowHelpers({ router });
const settingsStore = useSettingsStore();
// -1 means the backend chooses the default
// 0 is the old flow
// 1 is the new flow
@ -1188,6 +1197,33 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
}
async function trackNodeExecution(pushData: PushPayload<'nodeExecuteAfter'>): Promise<void> {
const nodeName = pushData.nodeName;
if (pushData.data.error) {
const node = getNodeByName(nodeName);
telemetry.track(
'Manual exec errored',
{
error_title: pushData.data.error.message,
node_type: node?.type,
node_type_version: node?.typeVersion,
node_id: node?.id,
node_graph_string: JSON.stringify(
TelemetryHelpers.generateNodesGraph(
await workflowHelpers.getWorkflowDataToSave(),
workflowHelpers.getNodeTypes(),
{
isCloudDeployment: settingsStore.isCloudDeployment,
},
).nodeGraph,
),
},
{ withPostHog: true },
);
}
}
function addNodeExecutionData(pushData: PushPayload<'nodeExecuteAfter'>): void {
if (!workflowExecutionData.value?.data) {
throw new Error('The "workflowExecutionData" is not initialized!');
@ -1209,6 +1245,8 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
};
}
workflowExecutionData.value.data!.resultData.runData[pushData.nodeName].push(pushData.data);
void trackNodeExecution(pushData);
}
function clearNodeExecutionData(nodeName: string): void {

View file

@ -15,6 +15,7 @@ vi.mock('vue-router', () => {
push,
}),
RouterLink: vi.fn(),
useRoute: vi.fn(),
};
});

View file

@ -178,6 +178,7 @@ const versionDescription: INodeTypeDescription = {
displayOptions: {
show: {
include: ['selected'],
'@version': [3, 3.1, 3.2],
},
},
},
@ -193,6 +194,45 @@ const versionDescription: INodeTypeDescription = {
displayOptions: {
show: {
include: ['except'],
'@version': [3, 3.1, 3.2],
},
},
},
{
displayName: 'Fields to Include',
name: 'includeFields',
type: 'string',
default: '',
placeholder: 'e.g. fieldToInclude1,fieldToInclude2',
description:
'Comma-separated list of the field names you want to include in the output. You can drag the selected fields from the input panel.',
requiresDataPath: 'multiple',
displayOptions: {
show: {
include: ['selected'],
'/includeOtherFields': [true],
},
hide: {
'@version': [3, 3.1, 3.2],
},
},
},
{
displayName: 'Fields to Exclude',
name: 'excludeFields',
type: 'string',
default: '',
placeholder: 'e.g. fieldToExclude1,fieldToExclude2',
description:
'Comma-separated list of the field names you want to exclude from the output. You can drag the selected fields from the input panel.',
requiresDataPath: 'multiple',
displayOptions: {
show: {
include: ['except'],
'/includeOtherFields': [true],
},
hide: {
'@version': [3, 3.1, 3.2],
},
},
},

View file

@ -2482,6 +2482,8 @@ export interface INodeGraphItem {
toolSettings?: IDataObject; //various langchain tool's settings
sql?: string; //merge node combineBySql, cloud only
workflow_id?: string; //@n8n/n8n-nodes-langchain.toolWorkflow and n8n-nodes-base.executeWorkflow
runs?: number;
items_total?: number;
}
export interface INodeNameIndex {

View file

@ -24,6 +24,8 @@ import type {
IWorkflowBase,
INodeTypes,
IDataObject,
IRunData,
ITaskData,
} from './Interfaces';
import { getNodeParameters } from './NodeHelpers';
@ -131,6 +133,21 @@ export function getDomainPath(raw: string, urlParts = URL_PARTS_REGEX): string {
}
}
function getNumberOfItemsInRuns(runs: ITaskData[]): number {
return runs.reduce((total, run) => {
const data = run.data ?? {};
let count = 0;
Object.keys(data).forEach((type) => {
const conn = data[type] ?? [];
conn.forEach((branch) => {
count += (branch ?? []).length;
});
});
return total + count;
}, 0);
}
export function generateNodesGraph(
workflow: Partial<IWorkflowBase>,
nodeTypes: INodeTypes,
@ -138,8 +155,10 @@ export function generateNodesGraph(
sourceInstanceId?: string;
nodeIdMap?: { [curr: string]: string };
isCloudDeployment?: boolean;
runData?: IRunData;
},
): INodesGraphResult {
const { runData } = options ?? {};
const nodeGraph: INodesGraph = {
node_types: [],
node_connections: [],
@ -200,6 +219,13 @@ export function generateNodesGraph(
position: node.position,
};
if (runData?.[node.name]) {
const runs = runData[node.name] ?? [];
nodeItem.runs = runs.length;
nodeItem.items_total = getNumberOfItemsInRuns(runs);
}
if (options?.sourceInstanceId) {
nodeItem.src_instance_id = options.sourceInstanceId;
}

View file

@ -3,6 +3,7 @@ import { v5 as uuidv5, v3 as uuidv3, v4 as uuidv4, v1 as uuidv1 } from 'uuid';
import { STICKY_NODE_TYPE } from '@/Constants';
import { ApplicationError } from '@/errors';
import type { IRunData } from '@/Interfaces';
import { NodeConnectionType, type IWorkflowBase } from '@/Interfaces';
import * as nodeHelpers from '@/NodeHelpers';
import {
@ -780,6 +781,108 @@ describe('generateNodesGraph', () => {
expect(() => generateNodesGraph(workflow, nodeTypes)).not.toThrow();
});
test('should add run and items count', () => {
const { workflow, runData } = generateTestWorkflowAndRunData();
expect(generateNodesGraph(workflow, nodeTypes, { runData })).toEqual({
nameIndices: {
DebugHelper: '4',
'Edit Fields': '1',
'Edit Fields1': '2',
'Edit Fields2': '3',
'Execute Workflow Trigger': '0',
Switch: '5',
},
nodeGraph: {
is_pinned: false,
node_connections: [
{
end: '1',
start: '0',
},
{
end: '4',
start: '0',
},
{
end: '5',
start: '1',
},
{
end: '1',
start: '4',
},
{
end: '2',
start: '5',
},
{
end: '3',
start: '5',
},
],
node_types: [
'n8n-nodes-base.executeWorkflowTrigger',
'n8n-nodes-base.set',
'n8n-nodes-base.set',
'n8n-nodes-base.set',
'n8n-nodes-base.debugHelper',
'n8n-nodes-base.switch',
],
nodes: {
'0': {
id: 'a2372c14-87de-42de-9f9e-1c499aa2c279',
items_total: 1,
position: [1000, 240],
runs: 1,
type: 'n8n-nodes-base.executeWorkflowTrigger',
version: 1,
},
'1': {
id: '0f7aa00e-248c-452c-8cd0-62cb55941633',
items_total: 4,
position: [1460, 640],
runs: 2,
type: 'n8n-nodes-base.set',
version: 3.1,
},
'2': {
id: '9165c185-9f1c-4ec1-87bf-76ca66dfae38',
items_total: 4,
position: [1860, 260],
runs: 2,
type: 'n8n-nodes-base.set',
version: 3.4,
},
'3': {
id: '7a915fd5-5987-4ff1-9509-06b24a0a4613',
position: [1940, 680],
type: 'n8n-nodes-base.set',
version: 3.4,
},
'4': {
id: '63050e7c-8ad5-4f44-8fdd-da555e40471b',
items_total: 3,
position: [1220, 240],
runs: 1,
type: 'n8n-nodes-base.debugHelper',
version: 1,
},
'5': {
id: 'fbf7525d-2d1d-4dcf-97a0-43b53d087ef3',
items_total: 4,
position: [1680, 640],
runs: 2,
type: 'n8n-nodes-base.switch',
version: 3.2,
},
},
notes: {},
},
webhookNodeNames: [],
});
});
});
function validUrls(idMaker: typeof alphanumericId | typeof email, char = CHAR) {
@ -886,3 +989,293 @@ function alphanumericId() {
}
const chooseRandomly = <T>(array: T[]) => array[randomInt(array.length)];
function generateTestWorkflowAndRunData(): { workflow: IWorkflowBase; runData: IRunData } {
const workflow: IWorkflowBase = {
meta: {
instanceId: 'a786b722078489c1fa382391a9f3476c2784761624deb2dfb4634827256d51a0',
},
nodes: [
{
parameters: {},
id: 'a2372c14-87de-42de-9f9e-1c499aa2c279',
name: 'Execute Workflow Trigger',
type: 'n8n-nodes-base.executeWorkflowTrigger',
typeVersion: 1,
position: [1000, 240],
},
{
parameters: {
options: {},
},
id: '0f7aa00e-248c-452c-8cd0-62cb55941633',
name: 'Edit Fields',
type: 'n8n-nodes-base.set',
typeVersion: 3.1,
position: [1460, 640],
},
{
parameters: {
options: {},
},
id: '9165c185-9f1c-4ec1-87bf-76ca66dfae38',
name: 'Edit Fields1',
type: 'n8n-nodes-base.set',
typeVersion: 3.4,
position: [1860, 260],
},
{
parameters: {
options: {},
},
id: '7a915fd5-5987-4ff1-9509-06b24a0a4613',
name: 'Edit Fields2',
type: 'n8n-nodes-base.set',
typeVersion: 3.4,
position: [1940, 680],
},
{
parameters: {
category: 'randomData',
randomDataSeed: '0',
randomDataCount: 3,
},
id: '63050e7c-8ad5-4f44-8fdd-da555e40471b',
name: 'DebugHelper',
type: 'n8n-nodes-base.debugHelper',
typeVersion: 1,
position: [1220, 240],
},
{
id: 'fbf7525d-2d1d-4dcf-97a0-43b53d087ef3',
name: 'Switch',
type: 'n8n-nodes-base.switch',
typeVersion: 3.2,
position: [1680, 640],
parameters: {},
},
],
connections: {
'Execute Workflow Trigger': {
main: [
[
{
node: 'Edit Fields',
type: 'main' as NodeConnectionType,
index: 0,
},
{
node: 'DebugHelper',
type: 'main' as NodeConnectionType,
index: 0,
},
],
],
},
'Edit Fields': {
main: [
[
{
node: 'Switch',
type: 'main' as NodeConnectionType,
index: 0,
},
],
],
},
DebugHelper: {
main: [
[
{
node: 'Edit Fields',
type: 'main' as NodeConnectionType,
index: 0,
},
],
],
},
Switch: {
main: [
// @ts-ignore
null,
// @ts-ignore
null,
[
{
node: 'Edit Fields1',
type: 'main' as NodeConnectionType,
index: 0,
},
],
[
{
node: 'Edit Fields2',
type: 'main' as NodeConnectionType,
index: 0,
},
],
],
},
},
pinData: {},
};
const runData: IRunData = {
'Execute Workflow Trigger': [
{
hints: [],
startTime: 1727793340927,
executionTime: 0,
source: [],
executionStatus: 'success',
data: { main: [[{ json: {}, pairedItem: { item: 0 } }]] },
},
],
DebugHelper: [
{
hints: [],
startTime: 1727793340928,
executionTime: 0,
source: [{ previousNode: 'Execute Workflow Trigger' }],
executionStatus: 'success',
data: {
main: [
[
{
json: {
test: 'abc',
},
pairedItem: { item: 0 },
},
{
json: {
test: 'abc',
},
pairedItem: { item: 0 },
},
{
json: {
test: 'abc',
},
pairedItem: { item: 0 },
},
],
],
},
},
],
'Edit Fields': [
{
hints: [],
startTime: 1727793340928,
executionTime: 1,
source: [{ previousNode: 'DebugHelper' }],
executionStatus: 'success',
data: {
main: [
[
{
json: {
test: 'abc',
},
pairedItem: { item: 0 },
},
{
json: {
test: 'abc',
},
pairedItem: { item: 1 },
},
{
json: {
test: 'abc',
},
pairedItem: { item: 2 },
},
],
],
},
},
{
hints: [],
startTime: 1727793340931,
executionTime: 0,
source: [{ previousNode: 'Execute Workflow Trigger' }],
executionStatus: 'success',
data: { main: [[{ json: {}, pairedItem: { item: 0 } }]] },
},
],
Switch: [
{
hints: [],
startTime: 1727793340929,
executionTime: 1,
source: [{ previousNode: 'Edit Fields' }],
executionStatus: 'success',
data: {
main: [
[],
[],
[
{
json: {
test: 'abc',
},
pairedItem: { item: 0 },
},
{
json: {
test: 'abc',
},
pairedItem: { item: 1 },
},
{
json: {
test: 'abc',
},
pairedItem: { item: 2 },
},
],
[],
],
},
},
{
hints: [],
startTime: 1727793340931,
executionTime: 0,
source: [{ previousNode: 'Edit Fields', previousNodeRun: 1 }],
executionStatus: 'success',
data: { main: [[], [], [{ json: {}, pairedItem: { item: 0 } }], []] },
},
],
'Edit Fields1': [
{
hints: [],
startTime: 1727793340930,
executionTime: 0,
source: [{ previousNode: 'Switch', previousNodeOutput: 2 }],
executionStatus: 'success',
data: {
main: [
[
{ json: {}, pairedItem: { item: 0 } },
{ json: {}, pairedItem: { item: 1 } },
{ json: {}, pairedItem: { item: 2 } },
],
],
},
},
{
hints: [],
startTime: 1727793340932,
executionTime: 1,
source: [{ previousNode: 'Switch', previousNodeOutput: 2, previousNodeRun: 1 }],
executionStatus: 'success',
data: { main: [[{ json: {}, pairedItem: { item: 0 } }]] },
},
],
};
return { workflow, runData };
}

View file

@ -1112,6 +1112,9 @@ importers:
n8n-workflow:
specifier: workspace:*
version: link:../workflow
nanoid:
specifier: 'catalog:'
version: 3.3.6
oauth-1.0a:
specifier: 2.2.6
version: 2.2.6
@ -2206,7 +2209,7 @@ packages:
'@azure/core-http@3.0.4':
resolution: {integrity: sha512-Fok9VVhMdxAFOtqiiAtg74fL0UJkt0z3D+ouUUxcRLzZNBioPRAMJFVxiWoJljYpXsRi4GDQHzQHDc9AiYaIUQ==}
engines: {node: '>=14.0.0'}
deprecated: This package is no longer supported. Please migrate to use @azure/core-rest-pipeline
deprecated: deprecating as we migrated to core v2
'@azure/core-lro@2.4.0':
resolution: {integrity: sha512-F65+rYkll1dpw3RGm8/SSiSj+/QkMeYDanzS/QKlM1dmuneVyXbO46C88V1MRHluLGdMP6qfD3vDRYALn0z0tQ==}
@ -5769,6 +5772,9 @@ packages:
axios-retry@3.7.0:
resolution: {integrity: sha512-ZTnCkJbRtfScvwiRnoVskFAfvU0UG3xNcsjwTR0mawSbIJoothxn67gKsMaNAFHRXJ1RmuLhmZBzvyXi3+9WyQ==}
axios@1.7.3:
resolution: {integrity: sha512-Ar7ND9pU99eJ9GpoGQKhKf58GpUOgnzuaB7ueNQ5BMi0p+LZ5oaEnfF999fAArcTIBwXTCHAmGcHOZJaWPq9Nw==}
axios@1.7.4:
resolution: {integrity: sha512-DukmaFRnY6AzAALSH4J2M3k6PkaC+MfaAGdEERRWcC9q3/TWQwLpHR8ZRLKTdQ3aBDL64EdluRDjJqKw+BPZEw==}
@ -14925,7 +14931,7 @@ snapshots:
'@n8n/localtunnel@3.0.0':
dependencies:
axios: 1.7.7(debug@4.3.6)
axios: 1.7.3(debug@4.3.6)
debug: 4.3.6(supports-color@8.1.1)
transitivePeerDependencies:
- supports-color
@ -17630,6 +17636,14 @@ snapshots:
'@babel/runtime': 7.24.7
is-retry-allowed: 2.2.0
axios@1.7.3(debug@4.3.6):
dependencies:
follow-redirects: 1.15.6(debug@4.3.6)
form-data: 4.0.0
proxy-from-env: 1.1.0
transitivePeerDependencies:
- debug
axios@1.7.4:
dependencies:
follow-redirects: 1.15.6(debug@4.3.6)
@ -17646,14 +17660,6 @@ snapshots:
transitivePeerDependencies:
- debug
axios@1.7.7(debug@4.3.6):
dependencies:
follow-redirects: 1.15.6(debug@4.3.6)
form-data: 4.0.0
proxy-from-env: 1.1.0
transitivePeerDependencies:
- debug
axios@1.7.7(debug@4.3.7):
dependencies:
follow-redirects: 1.15.6(debug@4.3.7)
@ -19293,7 +19299,7 @@ snapshots:
eslint-import-resolver-node@0.3.9:
dependencies:
debug: 3.2.7(supports-color@8.1.1)
debug: 3.2.7(supports-color@5.5.0)
is-core-module: 2.13.1
resolve: 1.22.8
transitivePeerDependencies:
@ -19318,7 +19324,7 @@ snapshots:
eslint-module-utils@2.8.0(@typescript-eslint/parser@7.2.0(eslint@8.57.0)(typescript@5.6.2))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.2.0(eslint@8.57.0)(typescript@5.6.2))(eslint-plugin-import@2.29.1)(eslint@8.57.0))(eslint@8.57.0):
dependencies:
debug: 3.2.7(supports-color@8.1.1)
debug: 3.2.7(supports-color@5.5.0)
optionalDependencies:
'@typescript-eslint/parser': 7.2.0(eslint@8.57.0)(typescript@5.6.2)
eslint: 8.57.0
@ -19338,7 +19344,7 @@ snapshots:
array.prototype.findlastindex: 1.2.3
array.prototype.flat: 1.3.2
array.prototype.flatmap: 1.3.2
debug: 3.2.7(supports-color@8.1.1)
debug: 3.2.7(supports-color@5.5.0)
doctrine: 2.1.0
eslint: 8.57.0
eslint-import-resolver-node: 0.3.9
@ -20136,7 +20142,7 @@ snapshots:
array-parallel: 0.1.3
array-series: 0.1.5
cross-spawn: 4.0.2
debug: 3.2.7(supports-color@8.1.1)
debug: 3.2.7(supports-color@5.5.0)
transitivePeerDependencies:
- supports-color
@ -23039,7 +23045,7 @@ snapshots:
pdf-parse@1.1.1:
dependencies:
debug: 3.2.7(supports-color@8.1.1)
debug: 3.2.7(supports-color@5.5.0)
node-ensure: 0.0.0
transitivePeerDependencies:
- supports-color
@ -23868,7 +23874,7 @@ snapshots:
rhea@1.0.24:
dependencies:
debug: 3.2.7(supports-color@8.1.1)
debug: 3.2.7(supports-color@5.5.0)
transitivePeerDependencies:
- supports-color