From 124ac26e43bef5e00f19abb356dd483ebc32bd21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Sun, 17 Nov 2024 15:21:28 +0100 Subject: [PATCH 1/8] feat(core): Implement task timeouts and heartbeats for runners (no-changelog) (#11690) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ --- .../@n8n/config/src/configs/runners.config.ts | 8 ++ packages/@n8n/config/test/config.test.ts | 2 + .../src/config/base-runner-config.ts | 17 ++- .../task-runner/src/healthcheck-server.ts | 38 +++++ packages/@n8n/task-runner/src/start.ts | 11 ++ .../src/runners/__tests__/task-broker.test.ts | 135 +++++++++++++++++- .../__tests__/task-runner-process.test.ts | 28 +++- .../__tests__/task-runner-ws-server.test.ts | 45 ++++++ ...default-task-runner-disconnect-analyzer.ts | 19 ++- .../task-runner-failed-heartbeat.error.ts | 32 +++++ .../errors/task-runner-timeout.error.ts | 34 +++++ ...nternal-task-runner-disconnect-analyzer.ts | 8 +- .../src/runners/runner-lifecycle-events.ts | 11 ++ packages/cli/src/runners/runner-types.ts | 10 +- packages/cli/src/runners/runner-ws-server.ts | 56 +++++++- .../cli/src/runners/task-broker.service.ts | 53 ++++++- .../cli/src/runners/task-runner-process.ts | 35 ++++- .../cli/src/runners/task-runner-server.ts | 4 +- 18 files changed, 511 insertions(+), 35 deletions(-) create mode 100644 packages/@n8n/task-runner/src/healthcheck-server.ts create mode 100644 packages/cli/src/runners/__tests__/task-runner-ws-server.test.ts create mode 100644 packages/cli/src/runners/errors/task-runner-failed-heartbeat.error.ts create mode 100644 packages/cli/src/runners/errors/task-runner-timeout.error.ts create mode 100644 packages/cli/src/runners/runner-lifecycle-events.ts diff --git a/packages/@n8n/config/src/configs/runners.config.ts b/packages/@n8n/config/src/configs/runners.config.ts index 5a6969ba6f..b7d125cf53 100644 --- a/packages/@n8n/config/src/configs/runners.config.ts +++ b/packages/@n8n/config/src/configs/runners.config.ts @@ -53,4 +53,12 @@ export class TaskRunnersConfig { /** Should the output of deduplication be asserted for correctness */ @Env('N8N_RUNNERS_ASSERT_DEDUPLICATION_OUTPUT') assertDeduplicationOutput: boolean = false; + + /** How long (in seconds) a task is allowed to take for completion, else the task will be aborted and the runner restarted. Must be greater than 0. */ + @Env('N8N_RUNNERS_TASK_TIMEOUT') + taskTimeout: number = 60; + + /** How often (in seconds) the runner must send a heartbeat to the broker, else the task will be aborted and the runner restarted. Must be greater than 0. */ + @Env('N8N_RUNNERS_HEARTBEAT_INTERVAL') + heartbeatInterval: number = 30; } diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index eeb98269de..e8f6e549a9 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -234,6 +234,8 @@ describe('GlobalConfig', () => { maxOldSpaceSize: '', maxConcurrency: 5, assertDeduplicationOutput: false, + taskTimeout: 60, + heartbeatInterval: 30, }, sentry: { backendDsn: '', diff --git a/packages/@n8n/task-runner/src/config/base-runner-config.ts b/packages/@n8n/task-runner/src/config/base-runner-config.ts index 01e00c177a..e7949d9704 100644 --- a/packages/@n8n/task-runner/src/config/base-runner-config.ts +++ b/packages/@n8n/task-runner/src/config/base-runner-config.ts @@ -1,4 +1,16 @@ -import { Config, Env } from '@n8n/config'; +import { Config, Env, Nested } from '@n8n/config'; + +@Config +class HealthcheckServerConfig { + @Env('N8N_RUNNERS_SERVER_ENABLED') + enabled: boolean = false; + + @Env('N8N_RUNNERS_SERVER_HOST') + host: string = '127.0.0.1'; + + @Env('N8N_RUNNERS_SERVER_PORT') + port: number = 5680; +} @Config export class BaseRunnerConfig { @@ -13,4 +25,7 @@ export class BaseRunnerConfig { @Env('N8N_RUNNERS_MAX_CONCURRENCY') maxConcurrency: number = 5; + + @Nested + healthcheckServer!: HealthcheckServerConfig; } diff --git a/packages/@n8n/task-runner/src/healthcheck-server.ts b/packages/@n8n/task-runner/src/healthcheck-server.ts new file mode 100644 index 0000000000..c6d8965a86 --- /dev/null +++ b/packages/@n8n/task-runner/src/healthcheck-server.ts @@ -0,0 +1,38 @@ +import { ApplicationError } from 'n8n-workflow'; +import { createServer } from 'node:http'; + +export class HealthcheckServer { + private server = createServer((_, res) => { + res.writeHead(200); + res.end('OK'); + }); + + async start(host: string, port: number) { + return await new Promise((resolve, reject) => { + const portInUseErrorHandler = (error: NodeJS.ErrnoException) => { + if (error.code === 'EADDRINUSE') { + reject(new ApplicationError(`Port ${port} is already in use`)); + } else { + reject(error); + } + }; + + this.server.on('error', portInUseErrorHandler); + + this.server.listen(port, host, () => { + this.server.removeListener('error', portInUseErrorHandler); + console.log(`Healthcheck server listening on ${host}, port ${port}`); + resolve(); + }); + }); + } + + async stop() { + return await new Promise((resolve, reject) => { + this.server.close((error) => { + if (error) reject(error); + else resolve(); + }); + }); + } +} diff --git a/packages/@n8n/task-runner/src/start.ts b/packages/@n8n/task-runner/src/start.ts index c6e8cb314c..e09ddf3332 100644 --- a/packages/@n8n/task-runner/src/start.ts +++ b/packages/@n8n/task-runner/src/start.ts @@ -3,8 +3,10 @@ import Container from 'typedi'; import { MainConfig } from './config/main-config'; import type { ErrorReporter } from './error-reporter'; +import type { HealthcheckServer } from './healthcheck-server'; import { JsTaskRunner } from './js-task-runner/js-task-runner'; +let healthcheckServer: HealthcheckServer | undefined; let runner: JsTaskRunner | undefined; let isShuttingDown = false; let errorReporter: ErrorReporter | undefined; @@ -22,6 +24,7 @@ function createSignalHandler(signal: string) { if (runner) { await runner.stop(); runner = undefined; + void healthcheckServer?.stop(); } if (errorReporter) { @@ -49,6 +52,14 @@ void (async function start() { runner = new JsTaskRunner(config); + const { enabled, host, port } = config.baseRunnerConfig.healthcheckServer; + + if (enabled) { + const { HealthcheckServer } = await import('./healthcheck-server'); + healthcheckServer = new HealthcheckServer(); + await healthcheckServer.start(host, port); + } + process.on('SIGINT', createSignalHandler('SIGINT')); process.on('SIGTERM', createSignalHandler('SIGTERM')); })().catch((e) => { diff --git a/packages/cli/src/runners/__tests__/task-broker.test.ts b/packages/cli/src/runners/__tests__/task-broker.test.ts index 614d04c3b5..4cbc4ebfc0 100644 --- a/packages/cli/src/runners/__tests__/task-broker.test.ts +++ b/packages/cli/src/runners/__tests__/task-broker.test.ts @@ -1,8 +1,12 @@ +import type { TaskRunnersConfig } from '@n8n/config'; import type { RunnerMessage, TaskResultData } from '@n8n/task-runner'; import { mock } from 'jest-mock-extended'; -import type { INodeTypeBaseDescription } from 'n8n-workflow'; +import { ApplicationError, type INodeTypeBaseDescription } from 'n8n-workflow'; + +import { Time } from '@/constants'; import { TaskRejectError } from '../errors'; +import type { RunnerLifecycleEvents } from '../runner-lifecycle-events'; import { TaskBroker } from '../task-broker.service'; import type { TaskOffer, TaskRequest, TaskRunner } from '../task-broker.service'; @@ -12,7 +16,7 @@ describe('TaskBroker', () => { let taskBroker: TaskBroker; beforeEach(() => { - taskBroker = new TaskBroker(mock()); + taskBroker = new TaskBroker(mock(), mock(), mock()); jest.restoreAllMocks(); }); @@ -618,4 +622,131 @@ describe('TaskBroker', () => { }); }); }); + + describe('task timeouts', () => { + let taskBroker: TaskBroker; + let config: TaskRunnersConfig; + let runnerLifecycleEvents = mock(); + + beforeAll(() => { + jest.useFakeTimers(); + config = mock({ taskTimeout: 30 }); + taskBroker = new TaskBroker(mock(), config, runnerLifecycleEvents); + }); + + afterAll(() => { + jest.useRealTimers(); + }); + + it('on sending task, we should set up task timeout', async () => { + jest.spyOn(global, 'setTimeout'); + + const taskId = 'task1'; + const runnerId = 'runner1'; + const runner = mock({ id: runnerId }); + const runnerMessageCallback = jest.fn(); + + taskBroker.registerRunner(runner, runnerMessageCallback); + taskBroker.setTasks({ + [taskId]: { id: taskId, runnerId, requesterId: 'requester1', taskType: 'test' }, + }); + + await taskBroker.sendTaskSettings(taskId, {}); + + expect(setTimeout).toHaveBeenCalledWith( + expect.any(Function), + config.taskTimeout * Time.seconds.toMilliseconds, + ); + }); + + it('on task completion, we should clear timeout', async () => { + jest.spyOn(global, 'clearTimeout'); + + const taskId = 'task1'; + const runnerId = 'runner1'; + const requesterId = 'requester1'; + const requesterCallback = jest.fn(); + + taskBroker.registerRequester(requesterId, requesterCallback); + taskBroker.setTasks({ + [taskId]: { + id: taskId, + runnerId, + requesterId, + taskType: 'test', + timeout: setTimeout(() => {}, config.taskTimeout * Time.seconds.toMilliseconds), + }, + }); + + await taskBroker.taskDoneHandler(taskId, { result: [] }); + + expect(clearTimeout).toHaveBeenCalled(); + expect(taskBroker.getTasks().get(taskId)).toBeUndefined(); + }); + + it('on task error, we should clear timeout', async () => { + jest.spyOn(global, 'clearTimeout'); + + const taskId = 'task1'; + const runnerId = 'runner1'; + const requesterId = 'requester1'; + const requesterCallback = jest.fn(); + + taskBroker.registerRequester(requesterId, requesterCallback); + taskBroker.setTasks({ + [taskId]: { + id: taskId, + runnerId, + requesterId, + taskType: 'test', + timeout: setTimeout(() => {}, config.taskTimeout * Time.seconds.toMilliseconds), + }, + }); + + await taskBroker.taskErrorHandler(taskId, new Error('Test error')); + + expect(clearTimeout).toHaveBeenCalled(); + expect(taskBroker.getTasks().get(taskId)).toBeUndefined(); + }); + + it('on timeout, we should emit `runner:timed-out-during-task` event and send error to requester', async () => { + jest.spyOn(global, 'clearTimeout'); + + const taskId = 'task1'; + const runnerId = 'runner1'; + const requesterId = 'requester1'; + const runner = mock({ id: runnerId }); + const runnerCallback = jest.fn(); + const requesterCallback = jest.fn(); + + taskBroker.registerRunner(runner, runnerCallback); + taskBroker.registerRequester(requesterId, requesterCallback); + + taskBroker.setTasks({ + [taskId]: { id: taskId, runnerId, requesterId, taskType: 'test' }, + }); + + await taskBroker.sendTaskSettings(taskId, {}); + + jest.runAllTimers(); + + await Promise.resolve(); + + expect(runnerLifecycleEvents.emit).toHaveBeenCalledWith('runner:timed-out-during-task'); + + await Promise.resolve(); + + expect(clearTimeout).toHaveBeenCalled(); + + expect(requesterCallback).toHaveBeenCalledWith({ + type: 'broker:taskerror', + taskId, + error: new ApplicationError(`Task execution timed out after ${config.taskTimeout} seconds`), + }); + + await Promise.resolve(); + + expect(taskBroker.getTasks().get(taskId)).toBeUndefined(); + }); + }); }); diff --git a/packages/cli/src/runners/__tests__/task-runner-process.test.ts b/packages/cli/src/runners/__tests__/task-runner-process.test.ts index 92e8483d03..9eeb8d69fc 100644 --- a/packages/cli/src/runners/__tests__/task-runner-process.test.ts +++ b/packages/cli/src/runners/__tests__/task-runner-process.test.ts @@ -7,6 +7,8 @@ import type { TaskRunnerAuthService } from '@/runners/auth/task-runner-auth.serv import { TaskRunnerProcess } from '@/runners/task-runner-process'; import { mockInstance } from '@test/mocking'; +import type { RunnerLifecycleEvents } from '../runner-lifecycle-events'; + const spawnMock = jest.fn(() => mock({ stdout: { @@ -25,7 +27,7 @@ describe('TaskRunnerProcess', () => { runnerConfig.enabled = true; runnerConfig.mode = 'internal_childprocess'; const authService = mock(); - let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService); + let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService, mock()); afterEach(async () => { spawnMock.mockClear(); @@ -35,15 +37,35 @@ describe('TaskRunnerProcess', () => { it('should throw if runner mode is external', () => { runnerConfig.mode = 'external'; - expect(() => new TaskRunnerProcess(logger, runnerConfig, authService)).toThrow(); + expect(() => new TaskRunnerProcess(logger, runnerConfig, authService, mock())).toThrow(); runnerConfig.mode = 'internal_childprocess'; }); + + it('should register listener for `runner:failed-heartbeat-check` event', () => { + const runnerLifecycleEvents = mock(); + new TaskRunnerProcess(logger, runnerConfig, authService, runnerLifecycleEvents); + + expect(runnerLifecycleEvents.on).toHaveBeenCalledWith( + 'runner:failed-heartbeat-check', + expect.any(Function), + ); + }); + + it('should register listener for `runner:timed-out-during-task` event', () => { + const runnerLifecycleEvents = mock(); + new TaskRunnerProcess(logger, runnerConfig, authService, runnerLifecycleEvents); + + expect(runnerLifecycleEvents.on).toHaveBeenCalledWith( + 'runner:timed-out-during-task', + expect.any(Function), + ); + }); }); describe('start', () => { beforeEach(() => { - taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService); + taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService, mock()); }); test.each([ diff --git a/packages/cli/src/runners/__tests__/task-runner-ws-server.test.ts b/packages/cli/src/runners/__tests__/task-runner-ws-server.test.ts new file mode 100644 index 0000000000..223cdbdc54 --- /dev/null +++ b/packages/cli/src/runners/__tests__/task-runner-ws-server.test.ts @@ -0,0 +1,45 @@ +import type { TaskRunnersConfig } from '@n8n/config'; +import { mock } from 'jest-mock-extended'; + +import { Time } from '@/constants'; +import { TaskRunnerWsServer } from '@/runners/runner-ws-server'; + +describe('TaskRunnerWsServer', () => { + describe('heartbeat timer', () => { + it('should set up heartbeat timer on server start', async () => { + const setIntervalSpy = jest.spyOn(global, 'setInterval'); + + const server = new TaskRunnerWsServer( + mock(), + mock(), + mock(), + mock({ path: '/runners', heartbeatInterval: 30 }), + mock(), + ); + + expect(setIntervalSpy).toHaveBeenCalledWith( + expect.any(Function), + 30 * Time.seconds.toMilliseconds, + ); + + await server.shutdown(); + }); + + it('should clear heartbeat timer on server stop', async () => { + jest.spyOn(global, 'setInterval'); + const clearIntervalSpy = jest.spyOn(global, 'clearInterval'); + + const server = new TaskRunnerWsServer( + mock(), + mock(), + mock(), + mock({ path: '/runners', heartbeatInterval: 30 }), + mock(), + ); + + await server.shutdown(); + + expect(clearIntervalSpy).toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/cli/src/runners/default-task-runner-disconnect-analyzer.ts b/packages/cli/src/runners/default-task-runner-disconnect-analyzer.ts index e101c65e28..d61179372b 100644 --- a/packages/cli/src/runners/default-task-runner-disconnect-analyzer.ts +++ b/packages/cli/src/runners/default-task-runner-disconnect-analyzer.ts @@ -1,8 +1,10 @@ import { Service } from 'typedi'; +import config from '@/config'; + import { TaskRunnerDisconnectedError } from './errors/task-runner-disconnected-error'; -import type { DisconnectAnalyzer } from './runner-types'; -import type { TaskRunner } from './task-broker.service'; +import { TaskRunnerFailedHeartbeatError } from './errors/task-runner-failed-heartbeat.error'; +import type { DisconnectAnalyzer, DisconnectErrorOptions } from './runner-types'; /** * Analyzes the disconnect reason of a task runner to provide a more @@ -10,7 +12,16 @@ import type { TaskRunner } from './task-broker.service'; */ @Service() export class DefaultTaskRunnerDisconnectAnalyzer implements DisconnectAnalyzer { - async determineDisconnectReason(runnerId: TaskRunner['id']): Promise { - return new TaskRunnerDisconnectedError(runnerId); + async toDisconnectError(opts: DisconnectErrorOptions): Promise { + const { reason, heartbeatInterval } = opts; + + if (reason === 'failed-heartbeat-check' && heartbeatInterval) { + return new TaskRunnerFailedHeartbeatError( + heartbeatInterval, + config.get('deployment.type') !== 'cloud', + ); + } + + return new TaskRunnerDisconnectedError(opts.runnerId ?? 'Unknown runner ID'); } } diff --git a/packages/cli/src/runners/errors/task-runner-failed-heartbeat.error.ts b/packages/cli/src/runners/errors/task-runner-failed-heartbeat.error.ts new file mode 100644 index 0000000000..55b9448574 --- /dev/null +++ b/packages/cli/src/runners/errors/task-runner-failed-heartbeat.error.ts @@ -0,0 +1,32 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class TaskRunnerFailedHeartbeatError extends ApplicationError { + description: string; + + constructor(heartbeatInterval: number, isSelfHosted: boolean) { + super('Task execution aborted because runner became unresponsive'); + + const subtitle = + 'The task runner failed to respond as expected, so it was considered unresponsive, and the task was aborted. You can try the following:'; + + const fixes = { + optimizeScript: + 'Optimize your script to prevent CPU-intensive operations, e.g. by breaking them down into smaller chunks or batch processing.', + ensureTermination: + 'Ensure that all paths in your script are able to terminate, i.e. no infinite loops.', + increaseInterval: `If your task can reasonably keep the task runner busy for more than ${heartbeatInterval} ${heartbeatInterval === 1 ? 'second' : 'seconds'}, increase the heartbeat interval using the N8N_RUNNERS_HEARTBEAT_INTERVAL environment variable.`, + }; + + const suggestions = [fixes.optimizeScript, fixes.ensureTermination]; + + if (isSelfHosted) suggestions.push(fixes.increaseInterval); + + const suggestionsText = suggestions + .map((suggestion, index) => `${index + 1}. ${suggestion}`) + .join('
'); + + const description = `${subtitle}

${suggestionsText}`; + + this.description = description; + } +} diff --git a/packages/cli/src/runners/errors/task-runner-timeout.error.ts b/packages/cli/src/runners/errors/task-runner-timeout.error.ts new file mode 100644 index 0000000000..88f3533028 --- /dev/null +++ b/packages/cli/src/runners/errors/task-runner-timeout.error.ts @@ -0,0 +1,34 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class TaskRunnerTimeoutError extends ApplicationError { + description: string; + + constructor(taskTimeout: number, isSelfHosted: boolean) { + super( + `Task execution timed out after ${taskTimeout} ${taskTimeout === 1 ? 'second' : 'seconds'}`, + ); + + const subtitle = + 'The task runner was taking too long on this task, so it was suspected of being unresponsive and restarted, and the task was aborted. You can try the following:'; + + const fixes = { + optimizeScript: + 'Optimize your script to prevent long-running tasks, e.g. by processing data in smaller batches.', + ensureTermination: + 'Ensure that all paths in your script are able to terminate, i.e. no infinite loops.', + increaseTimeout: `If your task can reasonably take more than ${taskTimeout} ${taskTimeout === 1 ? 'second' : 'seconds'}, increase the timeout using the N8N_RUNNERS_TASK_TIMEOUT environment variable.`, + }; + + const suggestions = [fixes.optimizeScript, fixes.ensureTermination]; + + if (isSelfHosted) suggestions.push(fixes.increaseTimeout); + + const suggestionsText = suggestions + .map((suggestion, index) => `${index + 1}. ${suggestion}`) + .join('
'); + + const description = `${subtitle}

${suggestionsText}`; + + this.description = description; + } +} diff --git a/packages/cli/src/runners/internal-task-runner-disconnect-analyzer.ts b/packages/cli/src/runners/internal-task-runner-disconnect-analyzer.ts index e3b9520f77..e27f76b628 100644 --- a/packages/cli/src/runners/internal-task-runner-disconnect-analyzer.ts +++ b/packages/cli/src/runners/internal-task-runner-disconnect-analyzer.ts @@ -5,8 +5,8 @@ import config from '@/config'; import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer'; import { TaskRunnerOomError } from './errors/task-runner-oom-error'; +import type { DisconnectErrorOptions } from './runner-types'; import { SlidingWindowSignal } from './sliding-window-signal'; -import type { TaskRunner } from './task-broker.service'; import type { ExitReason, TaskRunnerProcessEventMap } from './task-runner-process'; import { TaskRunnerProcess } from './task-runner-process'; @@ -38,13 +38,13 @@ export class InternalTaskRunnerDisconnectAnalyzer extends DefaultTaskRunnerDisco }); } - async determineDisconnectReason(runnerId: TaskRunner['id']): Promise { + async toDisconnectError(opts: DisconnectErrorOptions): Promise { const exitCode = await this.awaitExitSignal(); if (exitCode === 'oom') { - return new TaskRunnerOomError(runnerId, this.isCloudDeployment); + return new TaskRunnerOomError(opts.runnerId ?? 'Unknown runner ID', this.isCloudDeployment); } - return await super.determineDisconnectReason(runnerId); + return await super.toDisconnectError(opts); } private async awaitExitSignal(): Promise { diff --git a/packages/cli/src/runners/runner-lifecycle-events.ts b/packages/cli/src/runners/runner-lifecycle-events.ts new file mode 100644 index 0000000000..8ea2da38b1 --- /dev/null +++ b/packages/cli/src/runners/runner-lifecycle-events.ts @@ -0,0 +1,11 @@ +import { Service } from 'typedi'; + +import { TypedEmitter } from '@/typed-emitter'; + +type RunnerLifecycleEventMap = { + 'runner:failed-heartbeat-check': never; + 'runner:timed-out-during-task': never; +}; + +@Service() +export class RunnerLifecycleEvents extends TypedEmitter {} diff --git a/packages/cli/src/runners/runner-types.ts b/packages/cli/src/runners/runner-types.ts index b373d3051e..132d688e98 100644 --- a/packages/cli/src/runners/runner-types.ts +++ b/packages/cli/src/runners/runner-types.ts @@ -6,7 +6,7 @@ import type { TaskRunner } from './task-broker.service'; import type { AuthlessRequest } from '../requests'; export interface DisconnectAnalyzer { - determineDisconnectReason(runnerId: TaskRunner['id']): Promise; + toDisconnectError(opts: DisconnectErrorOptions): Promise; } export type DataRequestType = 'input' | 'node' | 'all'; @@ -22,3 +22,11 @@ export interface TaskRunnerServerInitRequest } export type TaskRunnerServerInitResponse = Response & { req: TaskRunnerServerInitRequest }; + +export type DisconnectReason = 'shutting-down' | 'failed-heartbeat-check' | 'unknown'; + +export type DisconnectErrorOptions = { + runnerId?: TaskRunner['id']; + reason?: DisconnectReason; + heartbeatInterval?: number; +}; diff --git a/packages/cli/src/runners/runner-ws-server.ts b/packages/cli/src/runners/runner-ws-server.ts index c691462558..27a0d779e7 100644 --- a/packages/cli/src/runners/runner-ws-server.ts +++ b/packages/cli/src/runners/runner-ws-server.ts @@ -1,12 +1,17 @@ +import { TaskRunnersConfig } from '@n8n/config'; import type { BrokerMessage, RunnerMessage } from '@n8n/task-runner'; +import { ApplicationError } from 'n8n-workflow'; import { Service } from 'typedi'; import type WebSocket from 'ws'; +import { Time } from '@/constants'; import { Logger } from '@/logging/logger.service'; import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer'; +import { RunnerLifecycleEvents } from './runner-lifecycle-events'; import type { DisconnectAnalyzer, + DisconnectReason, TaskRunnerServerInitRequest, TaskRunnerServerInitResponse, } from './runner-types'; @@ -20,11 +25,50 @@ function heartbeat(this: WebSocket) { export class TaskRunnerWsServer { runnerConnections: Map = new Map(); + private heartbeatTimer: NodeJS.Timer | undefined; + constructor( private readonly logger: Logger, private readonly taskBroker: TaskBroker, private disconnectAnalyzer: DefaultTaskRunnerDisconnectAnalyzer, - ) {} + private readonly taskTunnersConfig: TaskRunnersConfig, + private readonly runnerLifecycleEvents: RunnerLifecycleEvents, + ) { + this.startHeartbeatChecks(); + } + + private startHeartbeatChecks() { + const { heartbeatInterval } = this.taskTunnersConfig; + + if (heartbeatInterval <= 0) { + throw new ApplicationError('Heartbeat interval must be greater than 0'); + } + + this.heartbeatTimer = setInterval(() => { + for (const [runnerId, connection] of this.runnerConnections.entries()) { + if (!connection.isAlive) { + void this.removeConnection(runnerId, 'failed-heartbeat-check'); + this.runnerLifecycleEvents.emit('runner:failed-heartbeat-check'); + return; + } + connection.isAlive = false; + connection.ping(); + } + }, heartbeatInterval * Time.seconds.toMilliseconds); + } + + async shutdown() { + if (this.heartbeatTimer) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = undefined; + } + + await Promise.all( + Array.from(this.runnerConnections.keys()).map( + async (id) => await this.removeConnection(id, 'shutting-down'), + ), + ); + } setDisconnectAnalyzer(disconnectAnalyzer: DisconnectAnalyzer) { this.disconnectAnalyzer = disconnectAnalyzer; @@ -97,11 +141,15 @@ export class TaskRunnerWsServer { ); } - async removeConnection(id: TaskRunner['id']) { + async removeConnection(id: TaskRunner['id'], reason: DisconnectReason = 'unknown') { const connection = this.runnerConnections.get(id); if (connection) { - const disconnectReason = await this.disconnectAnalyzer.determineDisconnectReason(id); - this.taskBroker.deregisterRunner(id, disconnectReason); + const disconnectError = await this.disconnectAnalyzer.toDisconnectError({ + runnerId: id, + reason, + heartbeatInterval: this.taskTunnersConfig.heartbeatInterval, + }); + this.taskBroker.deregisterRunner(id, disconnectError); connection.close(); this.runnerConnections.delete(id); } diff --git a/packages/cli/src/runners/task-broker.service.ts b/packages/cli/src/runners/task-broker.service.ts index daa5b48c07..9af7b19f55 100644 --- a/packages/cli/src/runners/task-broker.service.ts +++ b/packages/cli/src/runners/task-broker.service.ts @@ -1,3 +1,4 @@ +import { TaskRunnersConfig } from '@n8n/config'; import type { BrokerMessage, RequesterMessage, @@ -8,9 +9,13 @@ import { ApplicationError } from 'n8n-workflow'; import { nanoid } from 'nanoid'; import { Service } from 'typedi'; +import config from '@/config'; +import { Time } from '@/constants'; import { Logger } from '@/logging/logger.service'; import { TaskRejectError } from './errors'; +import { TaskRunnerTimeoutError } from './errors/task-runner-timeout.error'; +import { RunnerLifecycleEvents } from './runner-lifecycle-events'; export interface TaskRunner { id: string; @@ -24,6 +29,7 @@ export interface Task { runnerId: TaskRunner['id']; requesterId: string; taskType: string; + timeout?: NodeJS.Timeout; } export interface TaskOffer { @@ -78,7 +84,15 @@ export class TaskBroker { private pendingTaskRequests: TaskRequest[] = []; - constructor(private readonly logger: Logger) {} + constructor( + private readonly logger: Logger, + private readonly taskRunnersConfig: TaskRunnersConfig, + private readonly runnerLifecycleEvents: RunnerLifecycleEvents, + ) { + if (this.taskRunnersConfig.taskTimeout <= 0) { + throw new ApplicationError('Task timeout must be greater than 0'); + } + } expireTasks() { const now = process.hrtime.bigint(); @@ -408,6 +422,14 @@ export class TaskBroker { async sendTaskSettings(taskId: Task['id'], settings: unknown) { const runner = await this.getRunnerOrFailTask(taskId); + + const task = this.tasks.get(taskId); + if (!task) return; + + task.timeout = setTimeout(async () => { + await this.handleTaskTimeout(taskId); + }, this.taskRunnersConfig.taskTimeout * Time.seconds.toMilliseconds); + await this.messageRunner(runner.id, { type: 'broker:tasksettings', taskId, @@ -415,11 +437,27 @@ export class TaskBroker { }); } + private async handleTaskTimeout(taskId: Task['id']) { + const task = this.tasks.get(taskId); + if (!task) return; + + this.runnerLifecycleEvents.emit('runner:timed-out-during-task'); + + await this.taskErrorHandler( + taskId, + new TaskRunnerTimeoutError( + this.taskRunnersConfig.taskTimeout, + config.getEnv('deployment.type') !== 'cloud', + ), + ); + } + async taskDoneHandler(taskId: Task['id'], data: TaskResultData) { const task = this.tasks.get(taskId); - if (!task) { - return; - } + if (!task) return; + + clearTimeout(task.timeout); + await this.requesters.get(task.requesterId)?.({ type: 'broker:taskdone', taskId: task.id, @@ -430,9 +468,10 @@ export class TaskBroker { async taskErrorHandler(taskId: Task['id'], error: unknown) { const task = this.tasks.get(taskId); - if (!task) { - return; - } + if (!task) return; + + clearTimeout(task.timeout); + await this.requesters.get(task.requesterId)?.({ type: 'broker:taskerror', taskId: task.id, diff --git a/packages/cli/src/runners/task-runner-process.ts b/packages/cli/src/runners/task-runner-process.ts index ba63cbe9e7..3129fcb524 100644 --- a/packages/cli/src/runners/task-runner-process.ts +++ b/packages/cli/src/runners/task-runner-process.ts @@ -10,6 +10,7 @@ import { Logger } from '@/logging/logger.service'; import { TaskRunnerAuthService } from './auth/task-runner-auth.service'; import { forwardToLogger } from './forward-to-logger'; import { NodeProcessOomDetector } from './node-process-oom-detector'; +import { RunnerLifecycleEvents } from './runner-lifecycle-events'; import { TypedEmitter } from '../typed-emitter'; type ChildProcess = ReturnType; @@ -70,6 +71,7 @@ export class TaskRunnerProcess extends TypedEmitter { logger: Logger, private readonly runnerConfig: TaskRunnersConfig, private readonly authService: TaskRunnerAuthService, + private readonly runnerLifecycleEvents: RunnerLifecycleEvents, ) { super(); @@ -79,6 +81,16 @@ export class TaskRunnerProcess extends TypedEmitter { ); this.logger = logger.scoped('task-runner'); + + this.runnerLifecycleEvents.on('runner:failed-heartbeat-check', () => { + this.logger.warn('Task runner failed heartbeat check, restarting...'); + void this.forceRestart(); + }); + + this.runnerLifecycleEvents.on('runner:timed-out-during-task', () => { + this.logger.warn('Task runner timed out during task, restarting...'); + void this.forceRestart(); + }); } async start() { @@ -116,9 +128,7 @@ export class TaskRunnerProcess extends TypedEmitter { @OnShutdown() async stop() { - if (!this.process) { - return; - } + if (!this.process) return; this.isShuttingDown = true; @@ -133,10 +143,22 @@ export class TaskRunnerProcess extends TypedEmitter { this.isShuttingDown = false; } - killNode() { - if (!this.process) { - return; + /** Force-restart a runner suspected of being unresponsive. */ + async forceRestart() { + if (!this.process) return; + + if (this.useLauncher) { + await this.killLauncher(); // @TODO: Implement SIGKILL in launcher + } else { + this.process.kill('SIGKILL'); } + + await this._runPromise; + } + + killNode() { + if (!this.process) return; + this.process.kill(); } @@ -173,7 +195,6 @@ export class TaskRunnerProcess extends TypedEmitter { this.emit('exit', { reason: this.oomDetector?.didProcessOom ? 'oom' : 'unknown' }); resolveFn(); - // If we are not shutting down, restart the process if (!this.isShuttingDown) { setImmediate(async () => await this.start()); } diff --git a/packages/cli/src/runners/task-runner-server.ts b/packages/cli/src/runners/task-runner-server.ts index 56c56e02ae..1564195fe2 100644 --- a/packages/cli/src/runners/task-runner-server.ts +++ b/packages/cli/src/runners/task-runner-server.ts @@ -44,7 +44,7 @@ export class TaskRunnerServer { private readonly logger: Logger, private readonly globalConfig: GlobalConfig, private readonly taskRunnerAuthController: TaskRunnerAuthController, - private readonly taskRunnerService: TaskRunnerWsServer, + private readonly taskRunnerWsServer: TaskRunnerWsServer, ) { this.app = express(); this.app.disable('x-powered-by'); @@ -148,7 +148,7 @@ export class TaskRunnerServer { // eslint-disable-next-line @typescript-eslint/unbound-method this.taskRunnerAuthController.authMiddleware, (req: TaskRunnerServerInitRequest, res: TaskRunnerServerInitResponse) => - this.taskRunnerService.handleRequest(req, res), + this.taskRunnerWsServer.handleRequest(req, res), ); const authEndpoint = `${this.getEndpointBasePath()}/auth`; From d5ba1a059b7a67154f17f8ad3fcfe66c5c031059 Mon Sep 17 00:00:00 2001 From: Jon Date: Mon, 18 Nov 2024 11:54:20 +0000 Subject: [PATCH 2/8] fix(Google Sheets Trigger Node): Fix issue with regex showing correct sheet as invalid (#11770) --- .../Google/Sheet/GoogleSheetsTrigger.node.ts | 8 +- .../Google/Sheet/test/v2/utils/utils.test.ts | 81 ++++++++++++++++++- .../Sheet/v2/actions/sheet/Sheet.resource.ts | 8 +- packages/nodes-base/nodes/Google/constants.ts | 3 + 4 files changed, 89 insertions(+), 11 deletions(-) diff --git a/packages/nodes-base/nodes/Google/Sheet/GoogleSheetsTrigger.node.ts b/packages/nodes-base/nodes/Google/Sheet/GoogleSheetsTrigger.node.ts index 1558b7fd19..8993760e57 100644 --- a/packages/nodes-base/nodes/Google/Sheet/GoogleSheetsTrigger.node.ts +++ b/packages/nodes-base/nodes/Google/Sheet/GoogleSheetsTrigger.node.ts @@ -7,7 +7,7 @@ import type { } from 'n8n-workflow'; import { NodeConnectionType, NodeOperationError } from 'n8n-workflow'; -import { GOOGLE_DRIVE_FILE_URL_REGEX } from '../constants'; +import { GOOGLE_DRIVE_FILE_URL_REGEX, GOOGLE_SHEETS_SHEET_URL_REGEX } from '../constants'; import { apiRequest } from './v2/transport'; import { sheetsSearch, spreadSheetsSearch } from './v2/methods/listSearch'; import { GoogleSheet } from './v2/helpers/GoogleSheet'; @@ -137,15 +137,13 @@ export class GoogleSheetsTrigger implements INodeType { type: 'string', extractValue: { type: 'regex', - regex: - 'https:\\/\\/docs\\.google\\.com/spreadsheets\\/d\\/[0-9a-zA-Z\\-_]+\\/edit\\#gid=([0-9]+)', + regex: GOOGLE_SHEETS_SHEET_URL_REGEX, }, validation: [ { type: 'regex', properties: { - regex: - 'https:\\/\\/docs\\.google\\.com/spreadsheets\\/d\\/[0-9a-zA-Z\\-_]+\\/edit\\#gid=([0-9]+)', + regex: GOOGLE_SHEETS_SHEET_URL_REGEX, errorMessage: 'Not a valid Sheet URL', }, }, diff --git a/packages/nodes-base/nodes/Google/Sheet/test/v2/utils/utils.test.ts b/packages/nodes-base/nodes/Google/Sheet/test/v2/utils/utils.test.ts index 033f7249f8..c725d03bb9 100644 --- a/packages/nodes-base/nodes/Google/Sheet/test/v2/utils/utils.test.ts +++ b/packages/nodes-base/nodes/Google/Sheet/test/v2/utils/utils.test.ts @@ -1,9 +1,16 @@ -import type { IExecuteFunctions, INode, ResourceMapperField } from 'n8n-workflow'; +import { + NodeOperationError, + type IExecuteFunctions, + type INode, + type ResourceMapperField, +} from 'n8n-workflow'; + import { GoogleSheet } from '../../../v2/helpers/GoogleSheet'; import { addRowNumber, autoMapInputData, checkForSchemaChanges, + getSpreadsheetId, prepareSheetData, removeEmptyColumns, removeEmptyRows, @@ -11,6 +18,8 @@ import { trimToFirstEmptyRow, } from '../../../v2/helpers/GoogleSheets.utils'; +import { GOOGLE_SHEETS_SHEET_URL_REGEX } from '../../../../constants'; + describe('Test Google Sheets, addRowNumber', () => { it('should add row nomber', () => { const data = [ @@ -444,3 +453,73 @@ describe('Test Google Sheets, checkForSchemaChanges', () => { ).toThrow("Column names were updated after the node's setup"); }); }); + +describe('Test Google Sheets, getSpreadsheetId', () => { + let mockNode: INode; + + beforeEach(() => { + mockNode = { name: 'Google Sheets' } as INode; + jest.clearAllMocks(); + }); + + it('should throw an error if value is empty', () => { + expect(() => getSpreadsheetId(mockNode, 'url', '')).toThrow(NodeOperationError); + }); + + it('should return the ID from a valid URL', () => { + const url = + 'https://docs.google.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/edit#gid=0'; + const result = getSpreadsheetId(mockNode, 'url', url); + expect(result).toBe('1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms'); + }); + + it('should return an empty string for an invalid URL', () => { + const url = 'https://docs.google.com/spreadsheets/d/'; + const result = getSpreadsheetId(mockNode, 'url', url); + expect(result).toBe(''); + }); + + it('should return the value for documentIdType byId or byList', () => { + const value = '1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms'; + expect(getSpreadsheetId(mockNode, 'id', value)).toBe(value); + expect(getSpreadsheetId(mockNode, 'list', value)).toBe(value); + }); +}); + +describe('Test Google Sheets, Google Sheets Sheet URL Regex', () => { + const regex = new RegExp(GOOGLE_SHEETS_SHEET_URL_REGEX); + + it('should match a valid Google Sheets URL', () => { + const urls = [ + 'https://docs.google.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/edit#gid=0', + 'https://docs.google.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/edit#gid=123456', + 'https://docs.google.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/edit?gid=654321#gid=654321', + ]; + for (const url of urls) { + expect(regex.test(url)).toBe(true); + } + }); + + it('should not match an invalid Google Sheets URL', () => { + const url = 'https://docs.google.com/spreadsheets/d/'; + expect(regex.test(url)).toBe(false); + }); + + it('should not match a URL that does not match the pattern', () => { + const url = + 'https://example.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/edit#gid=0'; + expect(regex.test(url)).toBe(false); + }); + + it('should extract the gid from a valid Google Sheets URL', () => { + const urls = [ + 'https://docs.google.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/edit#gid=12345', + 'https://docs.google.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/edit?gid=12345#gid=12345', + ]; + for (const url of urls) { + const match = url.match(regex); + expect(match).not.toBeNull(); + expect(match?.[1]).toBe('12345'); + } + }); +}); diff --git a/packages/nodes-base/nodes/Google/Sheet/v2/actions/sheet/Sheet.resource.ts b/packages/nodes-base/nodes/Google/Sheet/v2/actions/sheet/Sheet.resource.ts index 3cb7866aa5..d46e72811a 100644 --- a/packages/nodes-base/nodes/Google/Sheet/v2/actions/sheet/Sheet.resource.ts +++ b/packages/nodes-base/nodes/Google/Sheet/v2/actions/sheet/Sheet.resource.ts @@ -1,5 +1,5 @@ import type { INodeProperties } from 'n8n-workflow'; -import { GOOGLE_DRIVE_FILE_URL_REGEX } from '../../../../constants'; +import { GOOGLE_DRIVE_FILE_URL_REGEX, GOOGLE_SHEETS_SHEET_URL_REGEX } from '../../../../constants'; import * as append from './append.operation'; import * as appendOrUpdate from './appendOrUpdate.operation'; import * as clear from './clear.operation'; @@ -156,15 +156,13 @@ export const descriptions: INodeProperties[] = [ type: 'string', extractValue: { type: 'regex', - regex: - 'https:\\/\\/docs\\.google.com\\/spreadsheets\\/d\\/[0-9a-zA-Z\\-_]+.*\\#gid=([0-9]+)', + regex: GOOGLE_SHEETS_SHEET_URL_REGEX, }, validation: [ { type: 'regex', properties: { - regex: - 'https:\\/\\/docs\\.google.com\\/spreadsheets\\/d\\/[0-9a-zA-Z\\-_]+.*\\#gid=([0-9]+)', + regex: GOOGLE_SHEETS_SHEET_URL_REGEX, errorMessage: 'Not a valid Sheet URL', }, }, diff --git a/packages/nodes-base/nodes/Google/constants.ts b/packages/nodes-base/nodes/Google/constants.ts index 374cb345b5..e0cd0555c9 100644 --- a/packages/nodes-base/nodes/Google/constants.ts +++ b/packages/nodes-base/nodes/Google/constants.ts @@ -3,3 +3,6 @@ export const GOOGLE_DRIVE_FILE_URL_REGEX = export const GOOGLE_DRIVE_FOLDER_URL_REGEX = 'https:\\/\\/drive\\.google\\.com(?:\\/.*|)\\/folders\\/([0-9a-zA-Z\\-_]+)(?:\\/.*|)'; + +export const GOOGLE_SHEETS_SHEET_URL_REGEX = + 'https:\\/\\/docs\\.google\\.com\\/spreadsheets\\/d\\/[0-9a-zA-Z\\-_]+.*\\#gid=([0-9]+)'; From 61696c3db313cdc97925af728ff5c68415f9b6b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Mon, 18 Nov 2024 12:58:26 +0100 Subject: [PATCH 3/8] feat(core): Improve handling of manual executions with wait nodes (#11750) Co-authored-by: Michael Kret --- packages/@n8n/api-types/src/push/execution.ts | 8 + .../__tests__/save-execution-progress.test.ts | 32 ---- .../save-execution-progress.ts | 2 +- .../to-save-settings.ts | 28 +-- packages/cli/src/webhooks/webhook-helpers.ts | 5 + .../src/workflow-execute-additional-data.ts | 38 ++-- .../cli/templates/form-trigger.handlebars | 8 - packages/core/src/WorkflowExecute.ts | 5 +- .../editor-ui/src/components/InputPanel.vue | 5 +- .../src/components/NodeExecuteButton.vue | 17 +- .../editor-ui/src/components/OutputPanel.vue | 2 +- packages/editor-ui/src/components/RunData.vue | 19 +- .../src/composables/usePushConnection.ts | 5 +- .../src/composables/useRunWorkflow.test.ts | 75 +------- .../src/composables/useRunWorkflow.ts | 164 +----------------- .../src/stores/workflows.store.test.ts | 29 ++-- .../editor-ui/src/stores/workflows.store.ts | 90 +++++++--- .../src/utils/executionUtils.test.ts | 148 +++++++++++++++- .../editor-ui/src/utils/executionUtils.ts | 15 +- packages/editor-ui/src/views/NodeView.v2.vue | 15 +- packages/editor-ui/src/views/NodeView.vue | 17 +- 21 files changed, 325 insertions(+), 402 deletions(-) diff --git a/packages/@n8n/api-types/src/push/execution.ts b/packages/@n8n/api-types/src/push/execution.ts index 3c7459dec5..9c723e2817 100644 --- a/packages/@n8n/api-types/src/push/execution.ts +++ b/packages/@n8n/api-types/src/push/execution.ts @@ -12,6 +12,13 @@ type ExecutionStarted = { }; }; +type ExecutionWaiting = { + type: 'executionWaiting'; + data: { + executionId: string; + }; +}; + type ExecutionFinished = { type: 'executionFinished'; data: { @@ -45,6 +52,7 @@ type NodeExecuteAfter = { export type ExecutionPushMessage = | ExecutionStarted + | ExecutionWaiting | ExecutionFinished | ExecutionRecovered | NodeExecuteBefore diff --git a/packages/cli/src/execution-lifecycle-hooks/__tests__/save-execution-progress.test.ts b/packages/cli/src/execution-lifecycle-hooks/__tests__/save-execution-progress.test.ts index b0db5becac..d89f2fb734 100644 --- a/packages/cli/src/execution-lifecycle-hooks/__tests__/save-execution-progress.test.ts +++ b/packages/cli/src/execution-lifecycle-hooks/__tests__/save-execution-progress.test.ts @@ -1,5 +1,4 @@ import { - deepCopy, ErrorReporterProxy, type IRunExecutionData, type ITaskData, @@ -87,37 +86,6 @@ test('should update execution when saving progress is enabled', async () => { expect(reporterSpy).not.toHaveBeenCalled(); }); -test('should update execution when saving progress is disabled, but waitTill is defined', async () => { - jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({ - ...commonSettings, - progress: false, - }); - - const reporterSpy = jest.spyOn(ErrorReporterProxy, 'error'); - - executionRepository.findSingleExecution.mockResolvedValue({} as IExecutionResponse); - - const args = deepCopy(commonArgs); - args[4].waitTill = new Date(); - await saveExecutionProgress(...args); - - expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith('some-execution-id', { - data: { - executionData: undefined, - resultData: { - lastNodeExecuted: 'My Node', - runData: { - 'My Node': [{}], - }, - }, - startData: {}, - }, - status: 'running', - }); - - expect(reporterSpy).not.toHaveBeenCalled(); -}); - test('should report error on failure', async () => { jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({ ...commonSettings, diff --git a/packages/cli/src/execution-lifecycle-hooks/save-execution-progress.ts b/packages/cli/src/execution-lifecycle-hooks/save-execution-progress.ts index 6cd1cfd08f..ca9899e1ec 100644 --- a/packages/cli/src/execution-lifecycle-hooks/save-execution-progress.ts +++ b/packages/cli/src/execution-lifecycle-hooks/save-execution-progress.ts @@ -16,7 +16,7 @@ export async function saveExecutionProgress( ) { const saveSettings = toSaveSettings(workflowData.settings); - if (!saveSettings.progress && !executionData.waitTill) return; + if (!saveSettings.progress) return; const logger = Container.get(Logger); diff --git a/packages/cli/src/execution-lifecycle-hooks/to-save-settings.ts b/packages/cli/src/execution-lifecycle-hooks/to-save-settings.ts index 7a25adaeba..a7af8f3ddc 100644 --- a/packages/cli/src/execution-lifecycle-hooks/to-save-settings.ts +++ b/packages/cli/src/execution-lifecycle-hooks/to-save-settings.ts @@ -18,20 +18,20 @@ export function toSaveSettings(workflowSettings: IWorkflowSettings = {}) { PROGRESS: config.getEnv('executions.saveExecutionProgress'), }; + const { + saveDataErrorExecution = DEFAULTS.ERROR, + saveDataSuccessExecution = DEFAULTS.SUCCESS, + saveManualExecutions = DEFAULTS.MANUAL, + saveExecutionProgress = DEFAULTS.PROGRESS, + } = workflowSettings; + return { - error: workflowSettings.saveDataErrorExecution - ? workflowSettings.saveDataErrorExecution !== 'none' - : DEFAULTS.ERROR !== 'none', - success: workflowSettings.saveDataSuccessExecution - ? workflowSettings.saveDataSuccessExecution !== 'none' - : DEFAULTS.SUCCESS !== 'none', - manual: - workflowSettings === undefined || workflowSettings.saveManualExecutions === 'DEFAULT' - ? DEFAULTS.MANUAL - : (workflowSettings.saveManualExecutions ?? DEFAULTS.MANUAL), - progress: - workflowSettings === undefined || workflowSettings.saveExecutionProgress === 'DEFAULT' - ? DEFAULTS.PROGRESS - : (workflowSettings.saveExecutionProgress ?? DEFAULTS.PROGRESS), + error: saveDataErrorExecution === 'DEFAULT' ? DEFAULTS.ERROR : saveDataErrorExecution === 'all', + success: + saveDataSuccessExecution === 'DEFAULT' + ? DEFAULTS.SUCCESS + : saveDataSuccessExecution === 'all', + manual: saveManualExecutions === 'DEFAULT' ? DEFAULTS.MANUAL : saveManualExecutions, + progress: saveExecutionProgress === 'DEFAULT' ? DEFAULTS.PROGRESS : saveExecutionProgress, }; } diff --git a/packages/cli/src/webhooks/webhook-helpers.ts b/packages/cli/src/webhooks/webhook-helpers.ts index 72628b8351..6110584f7e 100644 --- a/packages/cli/src/webhooks/webhook-helpers.ts +++ b/packages/cli/src/webhooks/webhook-helpers.ts @@ -464,6 +464,11 @@ export async function executeWebhook( projectId: project?.id, }; + // When resuming from a wait node, copy over the pushRef from the execution-data + if (!runData.pushRef) { + runData.pushRef = runExecutionData.pushRef; + } + let responsePromise: IDeferredPromise | undefined; if (responseMode === 'responseNode') { responsePromise = createDeferredPromise(); diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index 08d6ba09e4..97322f4fe0 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -307,7 +307,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { }, ], workflowExecuteAfter: [ - async function (this: WorkflowHooks): Promise { + async function (this: WorkflowHooks, fullRunData: IRun): Promise { const { pushRef, executionId } = this; if (pushRef === undefined) return; @@ -318,7 +318,9 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { workflowId, }); - pushInstance.send('executionFinished', { executionId }, pushRef); + const pushType = + fullRunData.status === 'waiting' ? 'executionWaiting' : 'executionFinished'; + pushInstance.send(pushType, { executionId }, pushRef); }, ], }; @@ -430,22 +432,21 @@ function hookFunctionsSave(): IWorkflowExecuteHooks { (executionStatus === 'success' && !saveSettings.success) || (executionStatus !== 'success' && !saveSettings.error); - if (shouldNotSave && !fullRunData.waitTill) { - if (!fullRunData.waitTill && !isManualMode) { - executeErrorWorkflow( - this.workflowData, - fullRunData, - this.mode, - this.executionId, - this.retryOf, - ); - await Container.get(ExecutionRepository).hardDelete({ - workflowId: this.workflowData.id, - executionId: this.executionId, - }); + if (shouldNotSave && !fullRunData.waitTill && !isManualMode) { + executeErrorWorkflow( + this.workflowData, + fullRunData, + this.mode, + this.executionId, + this.retryOf, + ); - return; - } + await Container.get(ExecutionRepository).hardDelete({ + workflowId: this.workflowData.id, + executionId: this.executionId, + }); + + return; } // Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive @@ -1110,6 +1111,9 @@ export function getWorkflowHooksWorkerMain( hookFunctions.nodeExecuteAfter = []; hookFunctions.workflowExecuteAfter = [ async function (this: WorkflowHooks, fullRunData: IRun): Promise { + // Don't delete executions before they are finished + if (!fullRunData.finished) return; + const executionStatus = determineFinalExecutionStatus(fullRunData); const saveSettings = toSaveSettings(this.workflowData.settings); diff --git a/packages/cli/templates/form-trigger.handlebars b/packages/cli/templates/form-trigger.handlebars index 57d93cb291..02611f5b5b 100644 --- a/packages/cli/templates/form-trigger.handlebars +++ b/packages/cli/templates/form-trigger.handlebars @@ -740,14 +740,6 @@ } return; - }).then(() => { - window.addEventListener('storage', function(event) { - if (event.key === 'n8n_redirect_to_next_form_test_page' && event.newValue) { - const newUrl = event.newValue; - localStorage.removeItem('n8n_redirect_to_next_form_test_page'); - window.location.replace(newUrl); - } - }); }) .catch(function (error) { console.error('Error:', error); diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index 2ae12908ac..c6e0316038 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -916,7 +916,6 @@ export class WorkflowExecute { let nodeSuccessData: INodeExecutionData[][] | null | undefined; let runIndex: number; let startTime: number; - let taskData: ITaskData; if (this.runExecutionData.startData === undefined) { this.runExecutionData.startData = {}; @@ -1446,13 +1445,13 @@ export class WorkflowExecute { this.runExecutionData.resultData.runData[executionNode.name] = []; } - taskData = { + const taskData: ITaskData = { hints: executionHints, startTime, executionTime: new Date().getTime() - startTime, source: !executionData.source ? [] : executionData.source.main, metadata: executionData.metadata, - executionStatus: 'success', + executionStatus: this.runExecutionData.waitTill ? 'waiting' : 'success', }; if (executionError !== undefined) { diff --git a/packages/editor-ui/src/components/InputPanel.vue b/packages/editor-ui/src/components/InputPanel.vue index 169c61de46..a3ddc8ed64 100644 --- a/packages/editor-ui/src/components/InputPanel.vue +++ b/packages/editor-ui/src/components/InputPanel.vue @@ -212,7 +212,10 @@ const activeNodeType = computed(() => { return nodeTypesStore.getNodeType(activeNode.value.type, activeNode.value.typeVersion); }); -const waitingMessage = computed(() => waitingNodeTooltip()); +const waitingMessage = computed(() => { + const parentNode = parentNodes.value[0]; + return parentNode && waitingNodeTooltip(workflowsStore.getNodeByName(parentNode.name)); +}); watch( inputMode, diff --git a/packages/editor-ui/src/components/NodeExecuteButton.vue b/packages/editor-ui/src/components/NodeExecuteButton.vue index 43b4dfa7dc..f801f0701c 100644 --- a/packages/editor-ui/src/components/NodeExecuteButton.vue +++ b/packages/editor-ui/src/components/NodeExecuteButton.vue @@ -65,7 +65,7 @@ const lastPopupCountUpdate = ref(0); const codeGenerationInProgress = ref(false); const router = useRouter(); -const { runWorkflow, runWorkflowResolvePending, stopCurrentExecution } = useRunWorkflow({ router }); +const { runWorkflow, stopCurrentExecution } = useRunWorkflow({ router }); const workflowsStore = useWorkflowsStore(); const externalHooks = useExternalHooks(); @@ -353,17 +353,10 @@ async function onClick() { telemetry.track('User clicked execute node button', telemetryPayload); await externalHooks.run('nodeExecuteButton.onClick', telemetryPayload); - if (workflowsStore.isWaitingExecution) { - await runWorkflowResolvePending({ - destinationNode: props.nodeName, - source: 'RunData.ExecuteNodeButton', - }); - } else { - await runWorkflow({ - destinationNode: props.nodeName, - source: 'RunData.ExecuteNodeButton', - }); - } + await runWorkflow({ + destinationNode: props.nodeName, + source: 'RunData.ExecuteNodeButton', + }); emit('execute'); } diff --git a/packages/editor-ui/src/components/OutputPanel.vue b/packages/editor-ui/src/components/OutputPanel.vue index 0503ad5c94..0ca7e3830e 100644 --- a/packages/editor-ui/src/components/OutputPanel.vue +++ b/packages/editor-ui/src/components/OutputPanel.vue @@ -352,7 +352,7 @@ const activatePane = () => {