From 9432aa0b00e74faf4651ac673f18e16b7e56e145 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 3 Jan 2025 10:43:05 +0100 Subject: [PATCH] feat(core): Offload manual executions to workers (#11284) --- .../@n8n/config/src/configs/logging.config.ts | 4 + packages/cli/src/constants.ts | 19 +++- .../__tests__/execution.service.test.ts | 6 +- .../cli/src/executions/execution.service.ts | 5 -- packages/cli/src/push/__tests__/index.test.ts | 4 +- packages/cli/src/push/index.ts | 90 ++++++++++++++++--- .../__tests__/job-processor.service.test.ts | 1 + .../scaling/__tests__/pubsub-handler.test.ts | 9 +- packages/cli/src/scaling/job-processor.ts | 46 +++++++++- .../cli/src/scaling/pubsub/pubsub-handler.ts | 4 +- packages/cli/src/scaling/scaling.types.ts | 1 + packages/cli/src/webhooks/test-webhooks.ts | 6 +- packages/cli/src/webhooks/webhook-helpers.ts | 11 +++ .../src/workflow-execute-additional-data.ts | 22 ++++- packages/cli/src/workflow-runner.ts | 13 ++- .../workflows/workflow-execution.service.ts | 30 +++++++ .../collaboration.service.test.ts | 2 +- packages/core/src/InstanceSettings.ts | 4 + .../design-system/src/components/index.ts | 1 + packages/editor-ui/src/components/RunData.vue | 50 ++++++++--- .../src/plugins/i18n/locales/en.json | 3 + packages/workflow/src/Constants.ts | 7 ++ packages/workflow/src/Interfaces.ts | 10 +++ 23 files changed, 287 insertions(+), 61 deletions(-) diff --git a/packages/@n8n/config/src/configs/logging.config.ts b/packages/@n8n/config/src/configs/logging.config.ts index ef4661c115..733278c3e3 100644 --- a/packages/@n8n/config/src/configs/logging.config.ts +++ b/packages/@n8n/config/src/configs/logging.config.ts @@ -9,6 +9,7 @@ export const LOG_SCOPES = [ 'multi-main-setup', 'pruning', 'pubsub', + 'push', 'redis', 'scaling', 'waiting-executions', @@ -70,10 +71,13 @@ export class LoggingConfig { * - `external-secrets` * - `license` * - `multi-main-setup` + * - `pruning` * - `pubsub` + * - `push` * - `redis` * - `scaling` * - `waiting-executions` + * - `task-runner` * * @example * `N8N_LOG_SCOPES=license` diff --git a/packages/cli/src/constants.ts b/packages/cli/src/constants.ts index 6d742ecc8c..4bd1890c4e 100644 --- a/packages/cli/src/constants.ts +++ b/packages/cli/src/constants.ts @@ -1,6 +1,7 @@ import { readFileSync } from 'fs'; import type { n8n } from 'n8n-core'; -import { jsonParse } from 'n8n-workflow'; +import type { ITaskDataConnections } from 'n8n-workflow'; +import { jsonParse, TRIMMED_TASK_DATA_CONNECTIONS_KEY } from 'n8n-workflow'; import { resolve, join, dirname } from 'path'; const { NODE_ENV, E2E_TESTS } = process.env; @@ -161,6 +162,22 @@ export const ARTIFICIAL_TASK_DATA = { ], }; +/** + * Connections for an item standing in for a manual execution data item too + * large to be sent live via pubsub. This signals to the client to direct the + * user to the execution history. + */ +export const TRIMMED_TASK_DATA_CONNECTIONS: ITaskDataConnections = { + main: [ + [ + { + json: { [TRIMMED_TASK_DATA_CONNECTIONS_KEY]: true }, + pairedItem: undefined, + }, + ], + ], +}; + /** Lowest priority, meaning shut down happens after other groups */ export const LOWEST_SHUTDOWN_PRIORITY = 0; export const DEFAULT_SHUTDOWN_PRIORITY = 100; diff --git a/packages/cli/src/executions/__tests__/execution.service.test.ts b/packages/cli/src/executions/__tests__/execution.service.test.ts index 101773a0f7..e425c1e588 100644 --- a/packages/cli/src/executions/__tests__/execution.service.test.ts +++ b/packages/cli/src/executions/__tests__/execution.service.test.ts @@ -183,7 +183,7 @@ describe('ExecutionService', () => { describe('scaling mode', () => { describe('manual execution', () => { - it('should delegate to regular mode in scaling mode', async () => { + it('should stop a `running` execution in scaling mode', async () => { /** * Arrange */ @@ -197,6 +197,8 @@ describe('ExecutionService', () => { concurrencyControl.has.mockReturnValue(false); activeExecutions.has.mockReturnValue(true); waitTracker.has.mockReturnValue(false); + const job = mock({ data: { executionId: '123' } }); + scalingService.findJobsByStatus.mockResolvedValue([job]); executionRepository.stopDuringRun.mockResolvedValue(mock()); // @ts-expect-error Private method const stopInRegularModeSpy = jest.spyOn(executionService, 'stopInRegularMode'); @@ -209,7 +211,7 @@ describe('ExecutionService', () => { /** * Assert */ - expect(stopInRegularModeSpy).toHaveBeenCalledWith(execution); + expect(stopInRegularModeSpy).not.toHaveBeenCalled(); expect(activeExecutions.stopExecution).toHaveBeenCalledWith(execution.id); expect(executionRepository.stopDuringRun).toHaveBeenCalledWith(execution); diff --git a/packages/cli/src/executions/execution.service.ts b/packages/cli/src/executions/execution.service.ts index 86338da924..9eba37773f 100644 --- a/packages/cli/src/executions/execution.service.ts +++ b/packages/cli/src/executions/execution.service.ts @@ -464,11 +464,6 @@ export class ExecutionService { } private async stopInScalingMode(execution: IExecutionResponse) { - if (execution.mode === 'manual') { - // manual executions in scaling mode are processed by main - return await this.stopInRegularMode(execution); - } - if (this.activeExecutions.has(execution.id)) { this.activeExecutions.stopExecution(execution.id); } diff --git a/packages/cli/src/push/__tests__/index.test.ts b/packages/cli/src/push/__tests__/index.test.ts index 03457926b1..ae46487a38 100644 --- a/packages/cli/src/push/__tests__/index.test.ts +++ b/packages/cli/src/push/__tests__/index.test.ts @@ -20,7 +20,7 @@ describe('Push', () => { test('should validate pushRef on requests for websocket backend', () => { config.set('push.backend', 'websocket'); - const push = new Push(mock(), mock()); + const push = new Push(mock(), mock(), mock()); const ws = mock(); const request = mock({ user, ws }); request.query = { pushRef: '' }; @@ -33,7 +33,7 @@ describe('Push', () => { test('should validate pushRef on requests for SSE backend', () => { config.set('push.backend', 'sse'); - const push = new Push(mock(), mock()); + const push = new Push(mock(), mock(), mock()); const request = mock({ user, ws: undefined }); request.query = { pushRef: '' }; expect(() => push.handleRequest(request, mock())).toThrow(BadRequestError); diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index 7325981d0b..20d0c7a9c5 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -2,7 +2,8 @@ import type { PushMessage } from '@n8n/api-types'; import type { Application } from 'express'; import { ServerResponse } from 'http'; import type { Server } from 'http'; -import { InstanceSettings } from 'n8n-core'; +import { InstanceSettings, Logger } from 'n8n-core'; +import { deepCopy } from 'n8n-workflow'; import type { Socket } from 'net'; import { Container, Service } from 'typedi'; import { parse as parseUrl } from 'url'; @@ -10,6 +11,7 @@ import { Server as WSServer } from 'ws'; import { AuthService } from '@/auth/auth.service'; import config from '@/config'; +import { TRIMMED_TASK_DATA_CONNECTIONS } from '@/constants'; import type { User } from '@/databases/entities/user'; import { OnShutdown } from '@/decorators/on-shutdown'; import { BadRequestError } from '@/errors/response-errors/bad-request.error'; @@ -27,6 +29,12 @@ type PushEvents = { const useWebSockets = config.getEnv('push.backend') === 'websocket'; +/** + * Max allowed size of a push message in bytes. Events going through the pubsub + * channel are trimmed if exceeding this size. + */ +const MAX_PAYLOAD_SIZE_BYTES = 5 * 1024 * 1024; // 5 MiB + /** * Push service for uni- or bi-directional communication with frontend clients. * Uses either server-sent events (SSE, unidirectional from backend --> frontend) @@ -43,8 +51,10 @@ export class Push extends TypedEmitter { constructor( private readonly instanceSettings: InstanceSettings, private readonly publisher: Publisher, + private readonly logger: Logger, ) { super(); + this.logger = this.logger.scoped('push'); if (useWebSockets) this.backend.on('message', (msg) => this.emit('message', msg)); } @@ -85,18 +95,14 @@ export class Push extends TypedEmitter { this.backend.sendToAll(pushMsg); } + /** Returns whether a given push ref is registered. */ + hasPushRef(pushRef: string) { + return this.backend.hasPushRef(pushRef); + } + send(pushMsg: PushMessage, pushRef: string) { - /** - * Multi-main setup: In a manual webhook execution, the main process that - * handles a webhook might not be the same as the main process that created - * the webhook. If so, the handler process commands the creator process to - * relay the former's execution lifecycle events to the creator's frontend. - */ - if (this.instanceSettings.isMultiMain && !this.backend.hasPushRef(pushRef)) { - void this.publisher.publishCommand({ - command: 'relay-execution-lifecycle-event', - payload: { ...pushMsg, pushRef }, - }); + if (this.shouldRelayViaPubSub(pushRef)) { + this.relayViaPubSub(pushMsg, pushRef); return; } @@ -111,6 +117,66 @@ export class Push extends TypedEmitter { onShutdown() { this.backend.closeAllConnections(); } + + /** + * Whether to relay a push message via pubsub channel to other instances, + * instead of pushing the message directly to the frontend. + * + * This is needed in two scenarios: + * + * In scaling mode, in single- or multi-main setup, in a manual execution, a + * worker has no connection to a frontend and so relays to all mains lifecycle + * events for manual executions. Only the main who holds the session for the + * execution will push to the frontend who commissioned the execution. + * + * In scaling mode, in multi-main setup, in a manual webhook execution, if + * the main who handles a webhook is not the main who created the webhook, + * the handler main relays execution lifecycle events to all mains. Only + * the main who holds the session for the execution will push events to + * the frontend who commissioned the execution. + */ + private shouldRelayViaPubSub(pushRef: string) { + const { isWorker, isMultiMain } = this.instanceSettings; + + return isWorker || (isMultiMain && !this.hasPushRef(pushRef)); + } + + /** + * Relay a push message via the `n8n.commands` pubsub channel, + * reducing the payload size if too large. + * + * See {@link shouldRelayViaPubSub} for more details. + */ + private relayViaPubSub(pushMsg: PushMessage, pushRef: string) { + const eventSizeBytes = new TextEncoder().encode(JSON.stringify(pushMsg.data)).length; + + if (eventSizeBytes <= MAX_PAYLOAD_SIZE_BYTES) { + void this.publisher.publishCommand({ + command: 'relay-execution-lifecycle-event', + payload: { ...pushMsg, pushRef }, + }); + return; + } + + // too large for pubsub channel, trim it + + const pushMsgCopy = deepCopy(pushMsg); + + const toMb = (bytes: number) => (bytes / (1024 * 1024)).toFixed(0); + const eventMb = toMb(eventSizeBytes); + const maxMb = toMb(MAX_PAYLOAD_SIZE_BYTES); + const { type } = pushMsgCopy; + + this.logger.warn(`Size of "${type}" (${eventMb} MB) exceeds max size ${maxMb} MB. Trimming...`); + + if (type === 'nodeExecuteAfter') pushMsgCopy.data.data.data = TRIMMED_TASK_DATA_CONNECTIONS; + else if (type === 'executionFinished') pushMsgCopy.data.rawData = ''; // prompt client to fetch from DB + + void this.publisher.publishCommand({ + command: 'relay-execution-lifecycle-event', + payload: { ...pushMsgCopy, pushRef }, + }); + } } export const setupPushServer = (restEndpoint: string, server: Server, app: Application) => { diff --git a/packages/cli/src/scaling/__tests__/job-processor.service.test.ts b/packages/cli/src/scaling/__tests__/job-processor.service.test.ts index 73264e6382..897986c915 100644 --- a/packages/cli/src/scaling/__tests__/job-processor.service.test.ts +++ b/packages/cli/src/scaling/__tests__/job-processor.service.test.ts @@ -19,6 +19,7 @@ describe('JobProcessor', () => { mock(), mock(), mock(), + mock(), ); const result = await jobProcessor.processJob(mock()); diff --git a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts index b92c18c885..bc15911913 100644 --- a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts +++ b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts @@ -11,7 +11,6 @@ import type { ExternalSecretsManager } from '@/external-secrets.ee/external-secr import type { IWorkflowDb } from '@/interfaces'; import type { License } from '@/license'; import type { Push } from '@/push'; -import type { WebSocketPush } from '@/push/websocket.push'; import type { CommunityPackagesService } from '@/services/community-packages.service'; import type { TestWebhooks } from '@/webhooks/test-webhooks'; @@ -829,9 +828,7 @@ describe('PubSubHandler', () => { flattedRunData: '[]', }; - push.getBackend.mockReturnValue( - mock({ hasPushRef: jest.fn().mockReturnValue(true) }), - ); + push.hasPushRef.mockReturnValue(true); eventService.emit('relay-execution-lifecycle-event', { type, data, pushRef }); @@ -858,9 +855,7 @@ describe('PubSubHandler', () => { const workflowEntity = mock({ id: 'test-workflow-id' }); const pushRef = 'test-push-ref'; - push.getBackend.mockReturnValue( - mock({ hasPushRef: jest.fn().mockReturnValue(true) }), - ); + push.hasPushRef.mockReturnValue(true); testWebhooks.toWorkflow.mockReturnValue(mock({ id: 'test-workflow-id' })); eventService.emit('clear-test-webhooks', { webhookKey, workflowEntity, pushRef }); diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts index 5e760e40c1..16a5efd3b3 100644 --- a/packages/cli/src/scaling/job-processor.ts +++ b/packages/cli/src/scaling/job-processor.ts @@ -1,6 +1,11 @@ import type { RunningJobSummary } from '@n8n/api-types'; -import { ErrorReporter, InstanceSettings, WorkflowExecute, Logger } from 'n8n-core'; -import type { ExecutionStatus, IExecuteResponsePromiseData, IRun } from 'n8n-workflow'; +import { InstanceSettings, WorkflowExecute, ErrorReporter, Logger } from 'n8n-core'; +import type { + ExecutionStatus, + IExecuteResponsePromiseData, + IRun, + IWorkflowExecutionDataProcess, +} from 'n8n-workflow'; import { BINARY_ENCODING, ApplicationError, Workflow } from 'n8n-workflow'; import type PCancelable from 'p-cancelable'; import { Service } from 'typedi'; @@ -8,6 +13,7 @@ import { Service } from 'typedi'; import config from '@/config'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; +import { ManualExecutionService } from '@/manual-execution.service'; import { NodeTypes } from '@/node-types'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; @@ -34,6 +40,7 @@ export class JobProcessor { private readonly workflowRepository: WorkflowRepository, private readonly nodeTypes: NodeTypes, private readonly instanceSettings: InstanceSettings, + private readonly manualExecutionService: ManualExecutionService, ) { this.logger = this.logger.scoped('scaling'); } @@ -115,13 +122,20 @@ export class JobProcessor { executionTimeoutTimestamp, ); + const { pushRef } = job.data; + additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter( execution.mode, job.data.executionId, execution.workflowData, - { retryOf: execution.retryOf as string }, + { retryOf: execution.retryOf as string, pushRef }, ); + if (pushRef) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + additionalData.sendDataToUI = WorkflowExecuteAdditionalData.sendDataToUI.bind({ pushRef }); + } + additionalData.hooks.hookFunctions.sendResponse = [ async (response: IExecuteResponsePromiseData): Promise => { const msg: RespondToWebhookMessage = { @@ -146,7 +160,31 @@ export class JobProcessor { let workflowExecute: WorkflowExecute; let workflowRun: PCancelable; - if (execution.data !== undefined) { + + const { startData, resultData, manualData, isTestWebhook } = execution.data; + + if (execution.mode === 'manual' && !isTestWebhook) { + const data: IWorkflowExecutionDataProcess = { + executionMode: execution.mode, + workflowData: execution.workflowData, + destinationNode: startData?.destinationNode, + startNodes: startData?.startNodes, + runData: resultData.runData, + pinData: resultData.pinData, + partialExecutionVersion: manualData?.partialExecutionVersion, + dirtyNodeNames: manualData?.dirtyNodeNames, + triggerToStartFrom: manualData?.triggerToStartFrom, + userId: manualData?.userId, + }; + + workflowRun = this.manualExecutionService.runManually( + data, + workflow, + additionalData, + executionId, + resultData.pinData, + ); + } else if (execution.data !== undefined) { workflowExecute = new WorkflowExecute(additionalData, execution.mode, execution.data); workflowRun = workflowExecute.processRunExecutionData(workflow); } else { diff --git a/packages/cli/src/scaling/pubsub/pubsub-handler.ts b/packages/cli/src/scaling/pubsub/pubsub-handler.ts index 10a763f8f1..9059027704 100644 --- a/packages/cli/src/scaling/pubsub/pubsub-handler.ts +++ b/packages/cli/src/scaling/pubsub/pubsub-handler.ts @@ -160,12 +160,12 @@ export class PubSubHandler { 'display-workflow-activation-error': async ({ workflowId, errorMessage }) => this.push.broadcast({ type: 'workflowFailedToActivate', data: { workflowId, errorMessage } }), 'relay-execution-lifecycle-event': async ({ pushRef, ...pushMsg }) => { - if (!this.push.getBackend().hasPushRef(pushRef)) return; + if (!this.push.hasPushRef(pushRef)) return; this.push.send(pushMsg, pushRef); }, 'clear-test-webhooks': async ({ webhookKey, workflowEntity, pushRef }) => { - if (!this.push.getBackend().hasPushRef(pushRef)) return; + if (!this.push.hasPushRef(pushRef)) return; this.testWebhooks.clearTimeout(webhookKey); diff --git a/packages/cli/src/scaling/scaling.types.ts b/packages/cli/src/scaling/scaling.types.ts index ae7e790a16..3c69294172 100644 --- a/packages/cli/src/scaling/scaling.types.ts +++ b/packages/cli/src/scaling/scaling.types.ts @@ -12,6 +12,7 @@ export type JobId = Job['id']; export type JobData = { executionId: string; loadStaticData: boolean; + pushRef?: string; }; export type JobResult = { diff --git a/packages/cli/src/webhooks/test-webhooks.ts b/packages/cli/src/webhooks/test-webhooks.ts index b90b1db59d..ff5d47fd50 100644 --- a/packages/cli/src/webhooks/test-webhooks.ts +++ b/packages/cli/src/webhooks/test-webhooks.ts @@ -154,11 +154,7 @@ export class TestWebhooks implements IWebhookManager { * the webhook. If so, after the test webhook has been successfully executed, * the handler process commands the creator process to clear its test webhooks. */ - if ( - this.instanceSettings.isMultiMain && - pushRef && - !this.push.getBackend().hasPushRef(pushRef) - ) { + if (this.instanceSettings.isMultiMain && pushRef && !this.push.hasPushRef(pushRef)) { void this.publisher.publishCommand({ command: 'clear-test-webhooks', payload: { webhookKey: key, workflowEntity, pushRef }, diff --git a/packages/cli/src/webhooks/webhook-helpers.ts b/packages/cli/src/webhooks/webhook-helpers.ts index 51b81fad83..0566e72a10 100644 --- a/packages/cli/src/webhooks/webhook-helpers.ts +++ b/packages/cli/src/webhooks/webhook-helpers.ts @@ -37,10 +37,12 @@ import { FORM_NODE_TYPE, NodeOperationError, } from 'n8n-workflow'; +import assert from 'node:assert'; import { finished } from 'stream/promises'; import { Container } from 'typedi'; import { ActiveExecutions } from '@/active-executions'; +import config from '@/config'; import type { Project } from '@/databases/entities/project'; import { InternalServerError } from '@/errors/response-errors/internal-server.error'; import { NotFoundError } from '@/errors/response-errors/not-found.error'; @@ -531,6 +533,15 @@ export async function executeWebhook( }); } + if ( + config.getEnv('executions.mode') === 'queue' && + process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS === 'true' && + runData.executionMode === 'manual' + ) { + assert(runData.executionData); + runData.executionData.isTestWebhook = true; + } + // Start now to run the workflow executionId = await Container.get(WorkflowRunner).run( runData, diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index de9bc85c14..ac2dab4d88 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -5,7 +5,13 @@ import type { PushMessage, PushType } from '@n8n/api-types'; import { GlobalConfig } from '@n8n/config'; import { stringify } from 'flatted'; -import { ErrorReporter, Logger, WorkflowExecute, isObjectLiteral } from 'n8n-core'; +import { + ErrorReporter, + Logger, + InstanceSettings, + WorkflowExecute, + isObjectLiteral, +} from 'n8n-core'; import { ApplicationError, NodeOperationError, Workflow, WorkflowHooks } from 'n8n-workflow'; import type { IDataObject, @@ -1076,8 +1082,7 @@ function getWorkflowHooksIntegrated( } /** - * Returns WorkflowHooks instance for running integrated workflows - * (Workflows which get started inside of another workflow) + * Returns WorkflowHooks instance for worker in scaling mode. */ export function getWorkflowHooksWorkerExecuter( mode: WorkflowExecuteMode, @@ -1093,6 +1098,17 @@ export function getWorkflowHooksWorkerExecuter( hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]); } + if (mode === 'manual' && Container.get(InstanceSettings).isWorker) { + const pushHooks = hookFunctionsPush(); + for (const key of Object.keys(pushHooks)) { + if (hookFunctions[key] === undefined) { + hookFunctions[key] = []; + } + // eslint-disable-next-line prefer-spread + hookFunctions[key].push.apply(hookFunctions[key], pushHooks[key]); + } + } + return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); } diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index 93917d7987..1b31feb7c7 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -82,7 +82,7 @@ export class WorkflowRunner { // in queue mode, first do a sanity run for the edge case that the execution was not marked as stalled // by Bull even though it executed successfully, see https://github.com/OptimalBits/bull/issues/1415 - if (isQueueMode && executionMode !== 'manual') { + if (isQueueMode) { const executionWithoutData = await this.executionRepository.findSingleExecution(executionId, { includeData: false, }); @@ -153,9 +153,13 @@ export class WorkflowRunner { this.activeExecutions.attachResponsePromise(executionId, responsePromise); } - if (this.executionsMode === 'queue' && data.executionMode !== 'manual') { - // Do not run "manual" executions in bull because sending events to the - // frontend would not be possible + // @TODO: Reduce to true branch once feature is stable + const shouldEnqueue = + process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS === 'true' + ? this.executionsMode === 'queue' + : this.executionsMode === 'queue' && data.executionMode !== 'manual'; + + if (shouldEnqueue) { await this.enqueueExecution(executionId, data, loadStaticData, realtime); } else { await this.runMainProcess(executionId, data, loadStaticData, restartExecutionId); @@ -349,6 +353,7 @@ export class WorkflowRunner { const jobData: JobData = { executionId, loadStaticData: !!loadStaticData, + pushRef: data.pushRef, }; if (!this.scalingService) { diff --git a/packages/cli/src/workflows/workflow-execution.service.ts b/packages/cli/src/workflows/workflow-execution.service.ts index 842ddfe726..10d882121c 100644 --- a/packages/cli/src/workflows/workflow-execution.service.ts +++ b/packages/cli/src/workflows/workflow-execution.service.ts @@ -15,6 +15,7 @@ import type { import { SubworkflowOperationError, Workflow } from 'n8n-workflow'; import { Service } from 'typedi'; +import config from '@/config'; import type { Project } from '@/databases/entities/project'; import type { User } from '@/databases/entities/user'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; @@ -146,6 +147,35 @@ export class WorkflowExecutionService { triggerToStartFrom, }; + /** + * Historically, manual executions in scaling mode ran in the main process, + * so some execution details were never persisted in the database. + * + * Currently, manual executions in scaling mode are offloaded to workers, + * so we persist all details to give workers full access to them. + */ + if ( + config.getEnv('executions.mode') === 'queue' && + process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS === 'true' + ) { + data.executionData = { + startData: { + startNodes, + destinationNode, + }, + resultData: { + pinData, + runData, + }, + manualData: { + userId: data.userId, + partialExecutionVersion: data.partialExecutionVersion, + dirtyNodeNames, + triggerToStartFrom, + }, + }; + } + const hasRunData = (node: INode) => runData !== undefined && !!runData[node.name]; if (pinnedTrigger && !hasRunData(pinnedTrigger)) { diff --git a/packages/cli/test/integration/collaboration/collaboration.service.test.ts b/packages/cli/test/integration/collaboration/collaboration.service.test.ts index ab7a8314b3..e6951644fd 100644 --- a/packages/cli/test/integration/collaboration/collaboration.service.test.ts +++ b/packages/cli/test/integration/collaboration/collaboration.service.test.ts @@ -16,7 +16,7 @@ import { createWorkflow, shareWorkflowWithUsers } from '@test-integration/db/wor import * as testDb from '@test-integration/test-db'; describe('CollaborationService', () => { - mockInstance(Push, new Push(mock(), mock())); + mockInstance(Push, new Push(mock(), mock(), mock())); let pushService: Push; let collaborationService: CollaborationService; let owner: User; diff --git a/packages/core/src/InstanceSettings.ts b/packages/core/src/InstanceSettings.ts index 814f75ef94..ebdfb7fd63 100644 --- a/packages/core/src/InstanceSettings.ts +++ b/packages/core/src/InstanceSettings.ts @@ -113,6 +113,10 @@ export class InstanceSettings { return !this.isMultiMain; } + get isWorker() { + return this.instanceType === 'worker'; + } + get isLeader() { return this.instanceRole === 'leader'; } diff --git a/packages/design-system/src/components/index.ts b/packages/design-system/src/components/index.ts index 38484308ef..ce4360fe46 100644 --- a/packages/design-system/src/components/index.ts +++ b/packages/design-system/src/components/index.ts @@ -36,6 +36,7 @@ export { default as N8nOption } from './N8nOption'; export { default as N8nPopover } from './N8nPopover'; export { default as N8nPulse } from './N8nPulse'; export { default as N8nRadioButtons } from './N8nRadioButtons'; +export { default as N8nRoute } from './N8nRoute'; export { default as N8nRecycleScroller } from './N8nRecycleScroller'; export { default as N8nResizeWrapper } from './N8nResizeWrapper'; export { default as N8nSelect } from './N8nSelect'; diff --git a/packages/editor-ui/src/components/RunData.vue b/packages/editor-ui/src/components/RunData.vue index 5c5fa0dbac..d852d3ee45 100644 --- a/packages/editor-ui/src/components/RunData.vue +++ b/packages/editor-ui/src/components/RunData.vue @@ -1,18 +1,19 @@