feat(core): Offload manual executions to workers (#11284)

This commit is contained in:
Iván Ovejero 2025-01-03 10:43:05 +01:00 committed by GitHub
parent b6230b63f2
commit 9432aa0b00
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 287 additions and 61 deletions

View file

@ -9,6 +9,7 @@ export const LOG_SCOPES = [
'multi-main-setup', 'multi-main-setup',
'pruning', 'pruning',
'pubsub', 'pubsub',
'push',
'redis', 'redis',
'scaling', 'scaling',
'waiting-executions', 'waiting-executions',
@ -70,10 +71,13 @@ export class LoggingConfig {
* - `external-secrets` * - `external-secrets`
* - `license` * - `license`
* - `multi-main-setup` * - `multi-main-setup`
* - `pruning`
* - `pubsub` * - `pubsub`
* - `push`
* - `redis` * - `redis`
* - `scaling` * - `scaling`
* - `waiting-executions` * - `waiting-executions`
* - `task-runner`
* *
* @example * @example
* `N8N_LOG_SCOPES=license` * `N8N_LOG_SCOPES=license`

View file

@ -1,6 +1,7 @@
import { readFileSync } from 'fs'; import { readFileSync } from 'fs';
import type { n8n } from 'n8n-core'; import type { n8n } from 'n8n-core';
import { jsonParse } from 'n8n-workflow'; import type { ITaskDataConnections } from 'n8n-workflow';
import { jsonParse, TRIMMED_TASK_DATA_CONNECTIONS_KEY } from 'n8n-workflow';
import { resolve, join, dirname } from 'path'; import { resolve, join, dirname } from 'path';
const { NODE_ENV, E2E_TESTS } = process.env; const { NODE_ENV, E2E_TESTS } = process.env;
@ -161,6 +162,22 @@ export const ARTIFICIAL_TASK_DATA = {
], ],
}; };
/**
* Connections for an item standing in for a manual execution data item too
* large to be sent live via pubsub. This signals to the client to direct the
* user to the execution history.
*/
export const TRIMMED_TASK_DATA_CONNECTIONS: ITaskDataConnections = {
main: [
[
{
json: { [TRIMMED_TASK_DATA_CONNECTIONS_KEY]: true },
pairedItem: undefined,
},
],
],
};
/** Lowest priority, meaning shut down happens after other groups */ /** Lowest priority, meaning shut down happens after other groups */
export const LOWEST_SHUTDOWN_PRIORITY = 0; export const LOWEST_SHUTDOWN_PRIORITY = 0;
export const DEFAULT_SHUTDOWN_PRIORITY = 100; export const DEFAULT_SHUTDOWN_PRIORITY = 100;

View file

@ -183,7 +183,7 @@ describe('ExecutionService', () => {
describe('scaling mode', () => { describe('scaling mode', () => {
describe('manual execution', () => { describe('manual execution', () => {
it('should delegate to regular mode in scaling mode', async () => { it('should stop a `running` execution in scaling mode', async () => {
/** /**
* Arrange * Arrange
*/ */
@ -197,6 +197,8 @@ describe('ExecutionService', () => {
concurrencyControl.has.mockReturnValue(false); concurrencyControl.has.mockReturnValue(false);
activeExecutions.has.mockReturnValue(true); activeExecutions.has.mockReturnValue(true);
waitTracker.has.mockReturnValue(false); waitTracker.has.mockReturnValue(false);
const job = mock<Job>({ data: { executionId: '123' } });
scalingService.findJobsByStatus.mockResolvedValue([job]);
executionRepository.stopDuringRun.mockResolvedValue(mock<IExecutionResponse>()); executionRepository.stopDuringRun.mockResolvedValue(mock<IExecutionResponse>());
// @ts-expect-error Private method // @ts-expect-error Private method
const stopInRegularModeSpy = jest.spyOn(executionService, 'stopInRegularMode'); const stopInRegularModeSpy = jest.spyOn(executionService, 'stopInRegularMode');
@ -209,7 +211,7 @@ describe('ExecutionService', () => {
/** /**
* Assert * Assert
*/ */
expect(stopInRegularModeSpy).toHaveBeenCalledWith(execution); expect(stopInRegularModeSpy).not.toHaveBeenCalled();
expect(activeExecutions.stopExecution).toHaveBeenCalledWith(execution.id); expect(activeExecutions.stopExecution).toHaveBeenCalledWith(execution.id);
expect(executionRepository.stopDuringRun).toHaveBeenCalledWith(execution); expect(executionRepository.stopDuringRun).toHaveBeenCalledWith(execution);

View file

@ -464,11 +464,6 @@ export class ExecutionService {
} }
private async stopInScalingMode(execution: IExecutionResponse) { private async stopInScalingMode(execution: IExecutionResponse) {
if (execution.mode === 'manual') {
// manual executions in scaling mode are processed by main
return await this.stopInRegularMode(execution);
}
if (this.activeExecutions.has(execution.id)) { if (this.activeExecutions.has(execution.id)) {
this.activeExecutions.stopExecution(execution.id); this.activeExecutions.stopExecution(execution.id);
} }

View file

@ -20,7 +20,7 @@ describe('Push', () => {
test('should validate pushRef on requests for websocket backend', () => { test('should validate pushRef on requests for websocket backend', () => {
config.set('push.backend', 'websocket'); config.set('push.backend', 'websocket');
const push = new Push(mock(), mock()); const push = new Push(mock(), mock(), mock());
const ws = mock<WebSocket>(); const ws = mock<WebSocket>();
const request = mock<WebSocketPushRequest>({ user, ws }); const request = mock<WebSocketPushRequest>({ user, ws });
request.query = { pushRef: '' }; request.query = { pushRef: '' };
@ -33,7 +33,7 @@ describe('Push', () => {
test('should validate pushRef on requests for SSE backend', () => { test('should validate pushRef on requests for SSE backend', () => {
config.set('push.backend', 'sse'); config.set('push.backend', 'sse');
const push = new Push(mock(), mock()); const push = new Push(mock(), mock(), mock());
const request = mock<SSEPushRequest>({ user, ws: undefined }); const request = mock<SSEPushRequest>({ user, ws: undefined });
request.query = { pushRef: '' }; request.query = { pushRef: '' };
expect(() => push.handleRequest(request, mock())).toThrow(BadRequestError); expect(() => push.handleRequest(request, mock())).toThrow(BadRequestError);

View file

@ -2,7 +2,8 @@ import type { PushMessage } from '@n8n/api-types';
import type { Application } from 'express'; import type { Application } from 'express';
import { ServerResponse } from 'http'; import { ServerResponse } from 'http';
import type { Server } from 'http'; import type { Server } from 'http';
import { InstanceSettings } from 'n8n-core'; import { InstanceSettings, Logger } from 'n8n-core';
import { deepCopy } from 'n8n-workflow';
import type { Socket } from 'net'; import type { Socket } from 'net';
import { Container, Service } from 'typedi'; import { Container, Service } from 'typedi';
import { parse as parseUrl } from 'url'; import { parse as parseUrl } from 'url';
@ -10,6 +11,7 @@ import { Server as WSServer } from 'ws';
import { AuthService } from '@/auth/auth.service'; import { AuthService } from '@/auth/auth.service';
import config from '@/config'; import config from '@/config';
import { TRIMMED_TASK_DATA_CONNECTIONS } from '@/constants';
import type { User } from '@/databases/entities/user'; import type { User } from '@/databases/entities/user';
import { OnShutdown } from '@/decorators/on-shutdown'; import { OnShutdown } from '@/decorators/on-shutdown';
import { BadRequestError } from '@/errors/response-errors/bad-request.error'; import { BadRequestError } from '@/errors/response-errors/bad-request.error';
@ -27,6 +29,12 @@ type PushEvents = {
const useWebSockets = config.getEnv('push.backend') === 'websocket'; const useWebSockets = config.getEnv('push.backend') === 'websocket';
/**
* Max allowed size of a push message in bytes. Events going through the pubsub
* channel are trimmed if exceeding this size.
*/
const MAX_PAYLOAD_SIZE_BYTES = 5 * 1024 * 1024; // 5 MiB
/** /**
* Push service for uni- or bi-directional communication with frontend clients. * Push service for uni- or bi-directional communication with frontend clients.
* Uses either server-sent events (SSE, unidirectional from backend --> frontend) * Uses either server-sent events (SSE, unidirectional from backend --> frontend)
@ -43,8 +51,10 @@ export class Push extends TypedEmitter<PushEvents> {
constructor( constructor(
private readonly instanceSettings: InstanceSettings, private readonly instanceSettings: InstanceSettings,
private readonly publisher: Publisher, private readonly publisher: Publisher,
private readonly logger: Logger,
) { ) {
super(); super();
this.logger = this.logger.scoped('push');
if (useWebSockets) this.backend.on('message', (msg) => this.emit('message', msg)); if (useWebSockets) this.backend.on('message', (msg) => this.emit('message', msg));
} }
@ -85,18 +95,14 @@ export class Push extends TypedEmitter<PushEvents> {
this.backend.sendToAll(pushMsg); this.backend.sendToAll(pushMsg);
} }
/** Returns whether a given push ref is registered. */
hasPushRef(pushRef: string) {
return this.backend.hasPushRef(pushRef);
}
send(pushMsg: PushMessage, pushRef: string) { send(pushMsg: PushMessage, pushRef: string) {
/** if (this.shouldRelayViaPubSub(pushRef)) {
* Multi-main setup: In a manual webhook execution, the main process that this.relayViaPubSub(pushMsg, pushRef);
* handles a webhook might not be the same as the main process that created
* the webhook. If so, the handler process commands the creator process to
* relay the former's execution lifecycle events to the creator's frontend.
*/
if (this.instanceSettings.isMultiMain && !this.backend.hasPushRef(pushRef)) {
void this.publisher.publishCommand({
command: 'relay-execution-lifecycle-event',
payload: { ...pushMsg, pushRef },
});
return; return;
} }
@ -111,6 +117,66 @@ export class Push extends TypedEmitter<PushEvents> {
onShutdown() { onShutdown() {
this.backend.closeAllConnections(); this.backend.closeAllConnections();
} }
/**
* Whether to relay a push message via pubsub channel to other instances,
* instead of pushing the message directly to the frontend.
*
* This is needed in two scenarios:
*
* In scaling mode, in single- or multi-main setup, in a manual execution, a
* worker has no connection to a frontend and so relays to all mains lifecycle
* events for manual executions. Only the main who holds the session for the
* execution will push to the frontend who commissioned the execution.
*
* In scaling mode, in multi-main setup, in a manual webhook execution, if
* the main who handles a webhook is not the main who created the webhook,
* the handler main relays execution lifecycle events to all mains. Only
* the main who holds the session for the execution will push events to
* the frontend who commissioned the execution.
*/
private shouldRelayViaPubSub(pushRef: string) {
const { isWorker, isMultiMain } = this.instanceSettings;
return isWorker || (isMultiMain && !this.hasPushRef(pushRef));
}
/**
* Relay a push message via the `n8n.commands` pubsub channel,
* reducing the payload size if too large.
*
* See {@link shouldRelayViaPubSub} for more details.
*/
private relayViaPubSub(pushMsg: PushMessage, pushRef: string) {
const eventSizeBytes = new TextEncoder().encode(JSON.stringify(pushMsg.data)).length;
if (eventSizeBytes <= MAX_PAYLOAD_SIZE_BYTES) {
void this.publisher.publishCommand({
command: 'relay-execution-lifecycle-event',
payload: { ...pushMsg, pushRef },
});
return;
}
// too large for pubsub channel, trim it
const pushMsgCopy = deepCopy(pushMsg);
const toMb = (bytes: number) => (bytes / (1024 * 1024)).toFixed(0);
const eventMb = toMb(eventSizeBytes);
const maxMb = toMb(MAX_PAYLOAD_SIZE_BYTES);
const { type } = pushMsgCopy;
this.logger.warn(`Size of "${type}" (${eventMb} MB) exceeds max size ${maxMb} MB. Trimming...`);
if (type === 'nodeExecuteAfter') pushMsgCopy.data.data.data = TRIMMED_TASK_DATA_CONNECTIONS;
else if (type === 'executionFinished') pushMsgCopy.data.rawData = ''; // prompt client to fetch from DB
void this.publisher.publishCommand({
command: 'relay-execution-lifecycle-event',
payload: { ...pushMsgCopy, pushRef },
});
}
} }
export const setupPushServer = (restEndpoint: string, server: Server, app: Application) => { export const setupPushServer = (restEndpoint: string, server: Server, app: Application) => {

View file

@ -19,6 +19,7 @@ describe('JobProcessor', () => {
mock(), mock(),
mock(), mock(),
mock(), mock(),
mock(),
); );
const result = await jobProcessor.processJob(mock<Job>()); const result = await jobProcessor.processJob(mock<Job>());

View file

@ -11,7 +11,6 @@ import type { ExternalSecretsManager } from '@/external-secrets.ee/external-secr
import type { IWorkflowDb } from '@/interfaces'; import type { IWorkflowDb } from '@/interfaces';
import type { License } from '@/license'; import type { License } from '@/license';
import type { Push } from '@/push'; import type { Push } from '@/push';
import type { WebSocketPush } from '@/push/websocket.push';
import type { CommunityPackagesService } from '@/services/community-packages.service'; import type { CommunityPackagesService } from '@/services/community-packages.service';
import type { TestWebhooks } from '@/webhooks/test-webhooks'; import type { TestWebhooks } from '@/webhooks/test-webhooks';
@ -829,9 +828,7 @@ describe('PubSubHandler', () => {
flattedRunData: '[]', flattedRunData: '[]',
}; };
push.getBackend.mockReturnValue( push.hasPushRef.mockReturnValue(true);
mock<WebSocketPush>({ hasPushRef: jest.fn().mockReturnValue(true) }),
);
eventService.emit('relay-execution-lifecycle-event', { type, data, pushRef }); eventService.emit('relay-execution-lifecycle-event', { type, data, pushRef });
@ -858,9 +855,7 @@ describe('PubSubHandler', () => {
const workflowEntity = mock<IWorkflowDb>({ id: 'test-workflow-id' }); const workflowEntity = mock<IWorkflowDb>({ id: 'test-workflow-id' });
const pushRef = 'test-push-ref'; const pushRef = 'test-push-ref';
push.getBackend.mockReturnValue( push.hasPushRef.mockReturnValue(true);
mock<WebSocketPush>({ hasPushRef: jest.fn().mockReturnValue(true) }),
);
testWebhooks.toWorkflow.mockReturnValue(mock<Workflow>({ id: 'test-workflow-id' })); testWebhooks.toWorkflow.mockReturnValue(mock<Workflow>({ id: 'test-workflow-id' }));
eventService.emit('clear-test-webhooks', { webhookKey, workflowEntity, pushRef }); eventService.emit('clear-test-webhooks', { webhookKey, workflowEntity, pushRef });

View file

@ -1,6 +1,11 @@
import type { RunningJobSummary } from '@n8n/api-types'; import type { RunningJobSummary } from '@n8n/api-types';
import { ErrorReporter, InstanceSettings, WorkflowExecute, Logger } from 'n8n-core'; import { InstanceSettings, WorkflowExecute, ErrorReporter, Logger } from 'n8n-core';
import type { ExecutionStatus, IExecuteResponsePromiseData, IRun } from 'n8n-workflow'; import type {
ExecutionStatus,
IExecuteResponsePromiseData,
IRun,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import { BINARY_ENCODING, ApplicationError, Workflow } from 'n8n-workflow'; import { BINARY_ENCODING, ApplicationError, Workflow } from 'n8n-workflow';
import type PCancelable from 'p-cancelable'; import type PCancelable from 'p-cancelable';
import { Service } from 'typedi'; import { Service } from 'typedi';
@ -8,6 +13,7 @@ import { Service } from 'typedi';
import config from '@/config'; import config from '@/config';
import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { ManualExecutionService } from '@/manual-execution.service';
import { NodeTypes } from '@/node-types'; import { NodeTypes } from '@/node-types';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
@ -34,6 +40,7 @@ export class JobProcessor {
private readonly workflowRepository: WorkflowRepository, private readonly workflowRepository: WorkflowRepository,
private readonly nodeTypes: NodeTypes, private readonly nodeTypes: NodeTypes,
private readonly instanceSettings: InstanceSettings, private readonly instanceSettings: InstanceSettings,
private readonly manualExecutionService: ManualExecutionService,
) { ) {
this.logger = this.logger.scoped('scaling'); this.logger = this.logger.scoped('scaling');
} }
@ -115,13 +122,20 @@ export class JobProcessor {
executionTimeoutTimestamp, executionTimeoutTimestamp,
); );
const { pushRef } = job.data;
additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter( additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter(
execution.mode, execution.mode,
job.data.executionId, job.data.executionId,
execution.workflowData, execution.workflowData,
{ retryOf: execution.retryOf as string }, { retryOf: execution.retryOf as string, pushRef },
); );
if (pushRef) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
additionalData.sendDataToUI = WorkflowExecuteAdditionalData.sendDataToUI.bind({ pushRef });
}
additionalData.hooks.hookFunctions.sendResponse = [ additionalData.hooks.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => { async (response: IExecuteResponsePromiseData): Promise<void> => {
const msg: RespondToWebhookMessage = { const msg: RespondToWebhookMessage = {
@ -146,7 +160,31 @@ export class JobProcessor {
let workflowExecute: WorkflowExecute; let workflowExecute: WorkflowExecute;
let workflowRun: PCancelable<IRun>; let workflowRun: PCancelable<IRun>;
if (execution.data !== undefined) {
const { startData, resultData, manualData, isTestWebhook } = execution.data;
if (execution.mode === 'manual' && !isTestWebhook) {
const data: IWorkflowExecutionDataProcess = {
executionMode: execution.mode,
workflowData: execution.workflowData,
destinationNode: startData?.destinationNode,
startNodes: startData?.startNodes,
runData: resultData.runData,
pinData: resultData.pinData,
partialExecutionVersion: manualData?.partialExecutionVersion,
dirtyNodeNames: manualData?.dirtyNodeNames,
triggerToStartFrom: manualData?.triggerToStartFrom,
userId: manualData?.userId,
};
workflowRun = this.manualExecutionService.runManually(
data,
workflow,
additionalData,
executionId,
resultData.pinData,
);
} else if (execution.data !== undefined) {
workflowExecute = new WorkflowExecute(additionalData, execution.mode, execution.data); workflowExecute = new WorkflowExecute(additionalData, execution.mode, execution.data);
workflowRun = workflowExecute.processRunExecutionData(workflow); workflowRun = workflowExecute.processRunExecutionData(workflow);
} else { } else {

View file

@ -160,12 +160,12 @@ export class PubSubHandler {
'display-workflow-activation-error': async ({ workflowId, errorMessage }) => 'display-workflow-activation-error': async ({ workflowId, errorMessage }) =>
this.push.broadcast({ type: 'workflowFailedToActivate', data: { workflowId, errorMessage } }), this.push.broadcast({ type: 'workflowFailedToActivate', data: { workflowId, errorMessage } }),
'relay-execution-lifecycle-event': async ({ pushRef, ...pushMsg }) => { 'relay-execution-lifecycle-event': async ({ pushRef, ...pushMsg }) => {
if (!this.push.getBackend().hasPushRef(pushRef)) return; if (!this.push.hasPushRef(pushRef)) return;
this.push.send(pushMsg, pushRef); this.push.send(pushMsg, pushRef);
}, },
'clear-test-webhooks': async ({ webhookKey, workflowEntity, pushRef }) => { 'clear-test-webhooks': async ({ webhookKey, workflowEntity, pushRef }) => {
if (!this.push.getBackend().hasPushRef(pushRef)) return; if (!this.push.hasPushRef(pushRef)) return;
this.testWebhooks.clearTimeout(webhookKey); this.testWebhooks.clearTimeout(webhookKey);

View file

@ -12,6 +12,7 @@ export type JobId = Job['id'];
export type JobData = { export type JobData = {
executionId: string; executionId: string;
loadStaticData: boolean; loadStaticData: boolean;
pushRef?: string;
}; };
export type JobResult = { export type JobResult = {

View file

@ -154,11 +154,7 @@ export class TestWebhooks implements IWebhookManager {
* the webhook. If so, after the test webhook has been successfully executed, * the webhook. If so, after the test webhook has been successfully executed,
* the handler process commands the creator process to clear its test webhooks. * the handler process commands the creator process to clear its test webhooks.
*/ */
if ( if (this.instanceSettings.isMultiMain && pushRef && !this.push.hasPushRef(pushRef)) {
this.instanceSettings.isMultiMain &&
pushRef &&
!this.push.getBackend().hasPushRef(pushRef)
) {
void this.publisher.publishCommand({ void this.publisher.publishCommand({
command: 'clear-test-webhooks', command: 'clear-test-webhooks',
payload: { webhookKey: key, workflowEntity, pushRef }, payload: { webhookKey: key, workflowEntity, pushRef },

View file

@ -37,10 +37,12 @@ import {
FORM_NODE_TYPE, FORM_NODE_TYPE,
NodeOperationError, NodeOperationError,
} from 'n8n-workflow'; } from 'n8n-workflow';
import assert from 'node:assert';
import { finished } from 'stream/promises'; import { finished } from 'stream/promises';
import { Container } from 'typedi'; import { Container } from 'typedi';
import { ActiveExecutions } from '@/active-executions'; import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import type { Project } from '@/databases/entities/project'; import type { Project } from '@/databases/entities/project';
import { InternalServerError } from '@/errors/response-errors/internal-server.error'; import { InternalServerError } from '@/errors/response-errors/internal-server.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error'; import { NotFoundError } from '@/errors/response-errors/not-found.error';
@ -531,6 +533,15 @@ export async function executeWebhook(
}); });
} }
if (
config.getEnv('executions.mode') === 'queue' &&
process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS === 'true' &&
runData.executionMode === 'manual'
) {
assert(runData.executionData);
runData.executionData.isTestWebhook = true;
}
// Start now to run the workflow // Start now to run the workflow
executionId = await Container.get(WorkflowRunner).run( executionId = await Container.get(WorkflowRunner).run(
runData, runData,

View file

@ -5,7 +5,13 @@
import type { PushMessage, PushType } from '@n8n/api-types'; import type { PushMessage, PushType } from '@n8n/api-types';
import { GlobalConfig } from '@n8n/config'; import { GlobalConfig } from '@n8n/config';
import { stringify } from 'flatted'; import { stringify } from 'flatted';
import { ErrorReporter, Logger, WorkflowExecute, isObjectLiteral } from 'n8n-core'; import {
ErrorReporter,
Logger,
InstanceSettings,
WorkflowExecute,
isObjectLiteral,
} from 'n8n-core';
import { ApplicationError, NodeOperationError, Workflow, WorkflowHooks } from 'n8n-workflow'; import { ApplicationError, NodeOperationError, Workflow, WorkflowHooks } from 'n8n-workflow';
import type { import type {
IDataObject, IDataObject,
@ -1076,8 +1082,7 @@ function getWorkflowHooksIntegrated(
} }
/** /**
* Returns WorkflowHooks instance for running integrated workflows * Returns WorkflowHooks instance for worker in scaling mode.
* (Workflows which get started inside of another workflow)
*/ */
export function getWorkflowHooksWorkerExecuter( export function getWorkflowHooksWorkerExecuter(
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
@ -1093,6 +1098,17 @@ export function getWorkflowHooksWorkerExecuter(
hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]); hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]);
} }
if (mode === 'manual' && Container.get(InstanceSettings).isWorker) {
const pushHooks = hookFunctionsPush();
for (const key of Object.keys(pushHooks)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
// eslint-disable-next-line prefer-spread
hookFunctions[key].push.apply(hookFunctions[key], pushHooks[key]);
}
}
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
} }

View file

@ -82,7 +82,7 @@ export class WorkflowRunner {
// in queue mode, first do a sanity run for the edge case that the execution was not marked as stalled // in queue mode, first do a sanity run for the edge case that the execution was not marked as stalled
// by Bull even though it executed successfully, see https://github.com/OptimalBits/bull/issues/1415 // by Bull even though it executed successfully, see https://github.com/OptimalBits/bull/issues/1415
if (isQueueMode && executionMode !== 'manual') { if (isQueueMode) {
const executionWithoutData = await this.executionRepository.findSingleExecution(executionId, { const executionWithoutData = await this.executionRepository.findSingleExecution(executionId, {
includeData: false, includeData: false,
}); });
@ -153,9 +153,13 @@ export class WorkflowRunner {
this.activeExecutions.attachResponsePromise(executionId, responsePromise); this.activeExecutions.attachResponsePromise(executionId, responsePromise);
} }
if (this.executionsMode === 'queue' && data.executionMode !== 'manual') { // @TODO: Reduce to true branch once feature is stable
// Do not run "manual" executions in bull because sending events to the const shouldEnqueue =
// frontend would not be possible process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS === 'true'
? this.executionsMode === 'queue'
: this.executionsMode === 'queue' && data.executionMode !== 'manual';
if (shouldEnqueue) {
await this.enqueueExecution(executionId, data, loadStaticData, realtime); await this.enqueueExecution(executionId, data, loadStaticData, realtime);
} else { } else {
await this.runMainProcess(executionId, data, loadStaticData, restartExecutionId); await this.runMainProcess(executionId, data, loadStaticData, restartExecutionId);
@ -349,6 +353,7 @@ export class WorkflowRunner {
const jobData: JobData = { const jobData: JobData = {
executionId, executionId,
loadStaticData: !!loadStaticData, loadStaticData: !!loadStaticData,
pushRef: data.pushRef,
}; };
if (!this.scalingService) { if (!this.scalingService) {

View file

@ -15,6 +15,7 @@ import type {
import { SubworkflowOperationError, Workflow } from 'n8n-workflow'; import { SubworkflowOperationError, Workflow } from 'n8n-workflow';
import { Service } from 'typedi'; import { Service } from 'typedi';
import config from '@/config';
import type { Project } from '@/databases/entities/project'; import type { Project } from '@/databases/entities/project';
import type { User } from '@/databases/entities/user'; import type { User } from '@/databases/entities/user';
import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { ExecutionRepository } from '@/databases/repositories/execution.repository';
@ -146,6 +147,35 @@ export class WorkflowExecutionService {
triggerToStartFrom, triggerToStartFrom,
}; };
/**
* Historically, manual executions in scaling mode ran in the main process,
* so some execution details were never persisted in the database.
*
* Currently, manual executions in scaling mode are offloaded to workers,
* so we persist all details to give workers full access to them.
*/
if (
config.getEnv('executions.mode') === 'queue' &&
process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS === 'true'
) {
data.executionData = {
startData: {
startNodes,
destinationNode,
},
resultData: {
pinData,
runData,
},
manualData: {
userId: data.userId,
partialExecutionVersion: data.partialExecutionVersion,
dirtyNodeNames,
triggerToStartFrom,
},
};
}
const hasRunData = (node: INode) => runData !== undefined && !!runData[node.name]; const hasRunData = (node: INode) => runData !== undefined && !!runData[node.name];
if (pinnedTrigger && !hasRunData(pinnedTrigger)) { if (pinnedTrigger && !hasRunData(pinnedTrigger)) {

View file

@ -16,7 +16,7 @@ import { createWorkflow, shareWorkflowWithUsers } from '@test-integration/db/wor
import * as testDb from '@test-integration/test-db'; import * as testDb from '@test-integration/test-db';
describe('CollaborationService', () => { describe('CollaborationService', () => {
mockInstance(Push, new Push(mock(), mock())); mockInstance(Push, new Push(mock(), mock(), mock()));
let pushService: Push; let pushService: Push;
let collaborationService: CollaborationService; let collaborationService: CollaborationService;
let owner: User; let owner: User;

View file

@ -113,6 +113,10 @@ export class InstanceSettings {
return !this.isMultiMain; return !this.isMultiMain;
} }
get isWorker() {
return this.instanceType === 'worker';
}
get isLeader() { get isLeader() {
return this.instanceRole === 'leader'; return this.instanceRole === 'leader';
} }

View file

@ -36,6 +36,7 @@ export { default as N8nOption } from './N8nOption';
export { default as N8nPopover } from './N8nPopover'; export { default as N8nPopover } from './N8nPopover';
export { default as N8nPulse } from './N8nPulse'; export { default as N8nPulse } from './N8nPulse';
export { default as N8nRadioButtons } from './N8nRadioButtons'; export { default as N8nRadioButtons } from './N8nRadioButtons';
export { default as N8nRoute } from './N8nRoute';
export { default as N8nRecycleScroller } from './N8nRecycleScroller'; export { default as N8nRecycleScroller } from './N8nRecycleScroller';
export { default as N8nResizeWrapper } from './N8nResizeWrapper'; export { default as N8nResizeWrapper } from './N8nResizeWrapper';
export { default as N8nSelect } from './N8nSelect'; export { default as N8nSelect } from './N8nSelect';

View file

@ -1,18 +1,19 @@
<script setup lang="ts"> <script setup lang="ts">
import { useStorage } from '@/composables/useStorage'; import { useStorage } from '@/composables/useStorage';
import { saveAs } from 'file-saver'; import { saveAs } from 'file-saver';
import type { import {
IBinaryData, type IBinaryData,
IConnectedNode, type IConnectedNode,
IDataObject, type IDataObject,
INodeExecutionData, type INodeExecutionData,
INodeOutputConfiguration, type INodeOutputConfiguration,
IRunData, type IRunData,
IRunExecutionData, type IRunExecutionData,
ITaskMetadata, type ITaskMetadata,
NodeError, type NodeError,
NodeHint, type NodeHint,
Workflow, type Workflow,
TRIMMED_TASK_DATA_CONNECTIONS_KEY,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { NodeConnectionType, NodeHelpers } from 'n8n-workflow'; import { NodeConnectionType, NodeHelpers } from 'n8n-workflow';
import { computed, defineAsyncComponent, onBeforeUnmount, onMounted, ref, toRef, watch } from 'vue'; import { computed, defineAsyncComponent, onBeforeUnmount, onMounted, ref, toRef, watch } from 'vue';
@ -64,6 +65,7 @@ import { isEqual, isObject } from 'lodash-es';
import { import {
N8nBlockUi, N8nBlockUi,
N8nButton, N8nButton,
N8nRoute,
N8nCallout, N8nCallout,
N8nIconButton, N8nIconButton,
N8nInfoTip, N8nInfoTip,
@ -275,6 +277,10 @@ const isArtificialRecoveredEventItem = computed(
() => rawInputData.value?.[0]?.json?.isArtificialRecoveredEventItem, () => rawInputData.value?.[0]?.json?.isArtificialRecoveredEventItem,
); );
const isTrimmedManualExecutionDataItem = computed(
() => rawInputData.value?.[0]?.json?.[TRIMMED_TASK_DATA_CONNECTIONS_KEY],
);
const subworkflowExecutionError = computed(() => { const subworkflowExecutionError = computed(() => {
if (!node.value) return null; if (!node.value) return null;
return { return {
@ -1245,6 +1251,10 @@ function onSearchClear() {
document.dispatchEvent(new KeyboardEvent('keyup', { key: '/' })); document.dispatchEvent(new KeyboardEvent('keyup', { key: '/' }));
} }
function onExecutionHistoryNavigate() {
ndvStore.setActiveNodeName(null);
}
function getExecutionLinkLabel(task: ITaskMetadata): string | undefined { function getExecutionLinkLabel(task: ITaskMetadata): string | undefined {
if (task.parentExecution) { if (task.parentExecution) {
return i18n.baseText('runData.openParentExecution', { return i18n.baseText('runData.openParentExecution', {
@ -1310,7 +1320,7 @@ defineExpose({ enterEditMode });
<slot name="header"></slot> <slot name="header"></slot>
<div <div
v-show="!hasRunError" v-show="!hasRunError && !isTrimmedManualExecutionDataItem"
:class="$style.displayModes" :class="$style.displayModes"
data-test-id="run-data-pane-header" data-test-id="run-data-pane-header"
@click.stop @click.stop
@ -1591,6 +1601,20 @@ defineExpose({ enterEditMode });
</N8nText> </N8nText>
</div> </div>
<div v-else-if="isTrimmedManualExecutionDataItem" :class="$style.center">
<N8nText bold color="text-dark" size="large">
{{ i18n.baseText('runData.trimmedData.title') }}
</N8nText>
<N8nText>
{{ i18n.baseText('runData.trimmedData.message') }}
</N8nText>
<N8nButton size="small" @click="onExecutionHistoryNavigate">
<N8nRoute :to="`/workflow/${workflowsStore.workflowId}/executions`">
{{ i18n.baseText('runData.trimmedData.button') }}
</N8nRoute>
</N8nButton>
</div>
<div v-else-if="hasNodeRun && isArtificialRecoveredEventItem" :class="$style.center"> <div v-else-if="hasNodeRun && isArtificialRecoveredEventItem" :class="$style.center">
<slot name="recovered-artificial-output-data"></slot> <slot name="recovered-artificial-output-data"></slot>
</div> </div>

View file

@ -1662,6 +1662,9 @@
"runData.aiContentBlock.tokens": "{count} Tokens", "runData.aiContentBlock.tokens": "{count} Tokens",
"runData.aiContentBlock.tokens.prompt": "Prompt:", "runData.aiContentBlock.tokens.prompt": "Prompt:",
"runData.aiContentBlock.tokens.completion": "Completion:", "runData.aiContentBlock.tokens.completion": "Completion:",
"runData.trimmedData.title": "Data too large to display",
"runData.trimmedData.message": "The data is too large to be shown here. View the full details in 'Executions' tab.",
"runData.trimmedData.button": "See execution",
"saveButton.save": "@:_reusableBaseText.save", "saveButton.save": "@:_reusableBaseText.save",
"saveButton.saved": "Saved", "saveButton.saved": "Saved",
"saveWorkflowButton.hint": "Save workflow", "saveWorkflowButton.hint": "Save workflow",

View file

@ -88,3 +88,10 @@ export const LANGCHAIN_CUSTOM_TOOLS = [
export const SEND_AND_WAIT_OPERATION = 'sendAndWait'; export const SEND_AND_WAIT_OPERATION = 'sendAndWait';
export const AI_TRANSFORM_CODE_GENERATED_FOR_PROMPT = 'codeGeneratedForPrompt'; export const AI_TRANSFORM_CODE_GENERATED_FOR_PROMPT = 'codeGeneratedForPrompt';
export const AI_TRANSFORM_JS_CODE = 'jsCode'; export const AI_TRANSFORM_JS_CODE = 'jsCode';
/**
* Key for an item standing in for a manual execution data item too large to be
* sent live via pubsub. See {@link TRIMMED_TASK_DATA_CONNECTIONS} in constants
* in `cli` package.
*/
export const TRIMMED_TASK_DATA_CONNECTIONS_KEY = '__isTrimmedManualExecutionDataItem';

View file

@ -2118,6 +2118,7 @@ export interface IRun {
// The RunData, ExecuteData and WaitForExecution contain often the same data. // The RunData, ExecuteData and WaitForExecution contain often the same data.
export interface IRunExecutionData { export interface IRunExecutionData {
startData?: { startData?: {
startNodes?: StartNodeData[];
destinationNode?: string; destinationNode?: string;
runNodeFilter?: string[]; runNodeFilter?: string[];
}; };
@ -2141,6 +2142,15 @@ export interface IRunExecutionData {
parentExecution?: RelatedExecution; parentExecution?: RelatedExecution;
waitTill?: Date; waitTill?: Date;
pushRef?: string; pushRef?: string;
/** Whether this execution was started by a test webhook call. */
isTestWebhook?: boolean;
/** Data needed for a worker to run a manual execution. */
manualData?: Pick<
IWorkflowExecutionDataProcess,
'partialExecutionVersion' | 'dirtyNodeNames' | 'triggerToStartFrom' | 'userId'
>;
} }
export interface IRunData { export interface IRunData {