diff --git a/packages/@n8n/api-types/src/push/execution.ts b/packages/@n8n/api-types/src/push/execution.ts index 3c7459dec5..9c723e2817 100644 --- a/packages/@n8n/api-types/src/push/execution.ts +++ b/packages/@n8n/api-types/src/push/execution.ts @@ -12,6 +12,13 @@ type ExecutionStarted = { }; }; +type ExecutionWaiting = { + type: 'executionWaiting'; + data: { + executionId: string; + }; +}; + type ExecutionFinished = { type: 'executionFinished'; data: { @@ -45,6 +52,7 @@ type NodeExecuteAfter = { export type ExecutionPushMessage = | ExecutionStarted + | ExecutionWaiting | ExecutionFinished | ExecutionRecovered | NodeExecuteBefore diff --git a/packages/@n8n/config/src/configs/diagnostics.config.ts b/packages/@n8n/config/src/configs/diagnostics.config.ts new file mode 100644 index 0000000000..58e4740b35 --- /dev/null +++ b/packages/@n8n/config/src/configs/diagnostics.config.ts @@ -0,0 +1,30 @@ +import { Config, Env, Nested } from '../decorators'; + +@Config +class PostHogConfig { + /** API key for PostHog. */ + @Env('N8N_DIAGNOSTICS_POSTHOG_API_KEY') + apiKey: string = 'phc_4URIAm1uYfJO7j8kWSe0J8lc8IqnstRLS7Jx8NcakHo'; + + /** API host for PostHog. */ + @Env('N8N_DIAGNOSTICS_POSTHOG_API_HOST') + apiHost: string = 'https://ph.n8n.io'; +} + +@Config +export class DiagnosticsConfig { + /** Whether diagnostics are enabled. */ + @Env('N8N_DIAGNOSTICS_ENABLED') + enabled: boolean = false; + + /** Diagnostics config for frontend. */ + @Env('N8N_DIAGNOSTICS_CONFIG_FRONTEND') + frontendConfig: string = '1zPn9bgWPzlQc0p8Gj1uiK6DOTn;https://telemetry.n8n.io'; + + /** Diagnostics config for backend. */ + @Env('N8N_DIAGNOSTICS_CONFIG_BACKEND') + backendConfig: string = '1zPn7YoGC3ZXE9zLeTKLuQCB4F6;https://telemetry.n8n.io'; + + @Nested + posthogConfig: PostHogConfig; +} diff --git a/packages/@n8n/config/src/configs/runners.config.ts b/packages/@n8n/config/src/configs/runners.config.ts index 5a6969ba6f..b7d125cf53 100644 --- a/packages/@n8n/config/src/configs/runners.config.ts +++ b/packages/@n8n/config/src/configs/runners.config.ts @@ -53,4 +53,12 @@ export class TaskRunnersConfig { /** Should the output of deduplication be asserted for correctness */ @Env('N8N_RUNNERS_ASSERT_DEDUPLICATION_OUTPUT') assertDeduplicationOutput: boolean = false; + + /** How long (in seconds) a task is allowed to take for completion, else the task will be aborted and the runner restarted. Must be greater than 0. */ + @Env('N8N_RUNNERS_TASK_TIMEOUT') + taskTimeout: number = 60; + + /** How often (in seconds) the runner must send a heartbeat to the broker, else the task will be aborted and the runner restarted. Must be greater than 0. */ + @Env('N8N_RUNNERS_HEARTBEAT_INTERVAL') + heartbeatInterval: number = 30; } diff --git a/packages/@n8n/config/src/index.ts b/packages/@n8n/config/src/index.ts index 0a89535ee3..a1c0a1f43b 100644 --- a/packages/@n8n/config/src/index.ts +++ b/packages/@n8n/config/src/index.ts @@ -1,6 +1,7 @@ import { CacheConfig } from './configs/cache.config'; import { CredentialsConfig } from './configs/credentials.config'; import { DatabaseConfig } from './configs/database.config'; +import { DiagnosticsConfig } from './configs/diagnostics.config'; import { EndpointsConfig } from './configs/endpoints.config'; import { EventBusConfig } from './configs/event-bus.config'; import { ExternalSecretsConfig } from './configs/external-secrets.config'; @@ -117,4 +118,7 @@ export class GlobalConfig { @Nested pruning: PruningConfig; + + @Nested + diagnostics: DiagnosticsConfig; } diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index eeb98269de..c60431e97a 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -234,6 +234,8 @@ describe('GlobalConfig', () => { maxOldSpaceSize: '', maxConcurrency: 5, assertDeduplicationOutput: false, + taskTimeout: 60, + heartbeatInterval: 30, }, sentry: { backendDsn: '', @@ -280,6 +282,15 @@ describe('GlobalConfig', () => { hardDeleteInterval: 15, softDeleteInterval: 60, }, + diagnostics: { + enabled: false, + frontendConfig: '1zPn9bgWPzlQc0p8Gj1uiK6DOTn;https://telemetry.n8n.io', + backendConfig: '1zPn7YoGC3ZXE9zLeTKLuQCB4F6;https://telemetry.n8n.io', + posthogConfig: { + apiKey: 'phc_4URIAm1uYfJO7j8kWSe0J8lc8IqnstRLS7Jx8NcakHo', + apiHost: 'https://ph.n8n.io', + }, + }, }; it('should use all default values when no env variables are defined', () => { diff --git a/packages/@n8n/nodes-langchain/credentials/AnthropicApi.credentials.ts b/packages/@n8n/nodes-langchain/credentials/AnthropicApi.credentials.ts index 2ee2aa94dc..80bea68713 100644 --- a/packages/@n8n/nodes-langchain/credentials/AnthropicApi.credentials.ts +++ b/packages/@n8n/nodes-langchain/credentials/AnthropicApi.credentials.ts @@ -35,15 +35,15 @@ export class AnthropicApi implements ICredentialType { test: ICredentialTestRequest = { request: { baseURL: 'https://api.anthropic.com', - url: '/v1/complete', + url: '/v1/messages', method: 'POST', headers: { 'anthropic-version': '2023-06-01', }, body: { - model: 'claude-2', - prompt: '\n\nHuman: Hello, world!\n\nAssistant:', - max_tokens_to_sample: 256, + model: 'claude-3-haiku-20240307', + messages: [{ role: 'user', content: 'Hey' }], + max_tokens: 1, }, }, }; diff --git a/packages/@n8n/nodes-langchain/nodes/embeddings/EmbeddingsAzureOpenAi/EmbeddingsAzureOpenAi.node.ts b/packages/@n8n/nodes-langchain/nodes/embeddings/EmbeddingsAzureOpenAi/EmbeddingsAzureOpenAi.node.ts index a75a93c9f4..8c178543fc 100644 --- a/packages/@n8n/nodes-langchain/nodes/embeddings/EmbeddingsAzureOpenAi/EmbeddingsAzureOpenAi.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/embeddings/EmbeddingsAzureOpenAi/EmbeddingsAzureOpenAi.node.ts @@ -87,6 +87,36 @@ export class EmbeddingsAzureOpenAi implements INodeType { 'Maximum amount of time a request is allowed to take in seconds. Set to -1 for no timeout.', type: 'number', }, + { + displayName: 'Dimensions', + name: 'dimensions', + default: undefined, + description: + 'The number of dimensions the resulting output embeddings should have. Only supported in text-embedding-3 and later models.', + type: 'options', + options: [ + { + name: '256', + value: 256, + }, + { + name: '512', + value: 512, + }, + { + name: '1024', + value: 1024, + }, + { + name: '1536', + value: 1536, + }, + { + name: '3072', + value: 3072, + }, + ], + }, ], }, ], @@ -105,6 +135,7 @@ export class EmbeddingsAzureOpenAi implements INodeType { batchSize?: number; stripNewLines?: boolean; timeout?: number; + dimensions?: number | undefined; }; if (options.timeout === -1) { diff --git a/packages/@n8n/nodes-langchain/nodes/embeddings/EmbeddingsOpenAI/EmbeddingsOpenAi.node.ts b/packages/@n8n/nodes-langchain/nodes/embeddings/EmbeddingsOpenAI/EmbeddingsOpenAi.node.ts index 167581ed2e..aececc09ae 100644 --- a/packages/@n8n/nodes-langchain/nodes/embeddings/EmbeddingsOpenAI/EmbeddingsOpenAi.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/embeddings/EmbeddingsOpenAI/EmbeddingsOpenAi.node.ts @@ -135,6 +135,36 @@ export class EmbeddingsOpenAi implements INodeType { type: 'collection', default: {}, options: [ + { + displayName: 'Dimensions', + name: 'dimensions', + default: undefined, + description: + 'The number of dimensions the resulting output embeddings should have. Only supported in text-embedding-3 and later models.', + type: 'options', + options: [ + { + name: '256', + value: 256, + }, + { + name: '512', + value: 512, + }, + { + name: '1024', + value: 1024, + }, + { + name: '1536', + value: 1536, + }, + { + name: '3072', + value: 3072, + }, + ], + }, { displayName: 'Base URL', name: 'baseURL', @@ -179,6 +209,7 @@ export class EmbeddingsOpenAi implements INodeType { batchSize?: number; stripNewLines?: boolean; timeout?: number; + dimensions?: number | undefined; }; if (options.timeout === -1) { diff --git a/packages/@n8n/task-runner/src/config/base-runner-config.ts b/packages/@n8n/task-runner/src/config/base-runner-config.ts index 01e00c177a..e7949d9704 100644 --- a/packages/@n8n/task-runner/src/config/base-runner-config.ts +++ b/packages/@n8n/task-runner/src/config/base-runner-config.ts @@ -1,4 +1,16 @@ -import { Config, Env } from '@n8n/config'; +import { Config, Env, Nested } from '@n8n/config'; + +@Config +class HealthcheckServerConfig { + @Env('N8N_RUNNERS_SERVER_ENABLED') + enabled: boolean = false; + + @Env('N8N_RUNNERS_SERVER_HOST') + host: string = '127.0.0.1'; + + @Env('N8N_RUNNERS_SERVER_PORT') + port: number = 5680; +} @Config export class BaseRunnerConfig { @@ -13,4 +25,7 @@ export class BaseRunnerConfig { @Env('N8N_RUNNERS_MAX_CONCURRENCY') maxConcurrency: number = 5; + + @Nested + healthcheckServer!: HealthcheckServerConfig; } diff --git a/packages/@n8n/task-runner/src/healthcheck-server.ts b/packages/@n8n/task-runner/src/healthcheck-server.ts new file mode 100644 index 0000000000..c6d8965a86 --- /dev/null +++ b/packages/@n8n/task-runner/src/healthcheck-server.ts @@ -0,0 +1,38 @@ +import { ApplicationError } from 'n8n-workflow'; +import { createServer } from 'node:http'; + +export class HealthcheckServer { + private server = createServer((_, res) => { + res.writeHead(200); + res.end('OK'); + }); + + async start(host: string, port: number) { + return await new Promise((resolve, reject) => { + const portInUseErrorHandler = (error: NodeJS.ErrnoException) => { + if (error.code === 'EADDRINUSE') { + reject(new ApplicationError(`Port ${port} is already in use`)); + } else { + reject(error); + } + }; + + this.server.on('error', portInUseErrorHandler); + + this.server.listen(port, host, () => { + this.server.removeListener('error', portInUseErrorHandler); + console.log(`Healthcheck server listening on ${host}, port ${port}`); + resolve(); + }); + }); + } + + async stop() { + return await new Promise((resolve, reject) => { + this.server.close((error) => { + if (error) reject(error); + else resolve(); + }); + }); + } +} diff --git a/packages/@n8n/task-runner/src/start.ts b/packages/@n8n/task-runner/src/start.ts index c6e8cb314c..e09ddf3332 100644 --- a/packages/@n8n/task-runner/src/start.ts +++ b/packages/@n8n/task-runner/src/start.ts @@ -3,8 +3,10 @@ import Container from 'typedi'; import { MainConfig } from './config/main-config'; import type { ErrorReporter } from './error-reporter'; +import type { HealthcheckServer } from './healthcheck-server'; import { JsTaskRunner } from './js-task-runner/js-task-runner'; +let healthcheckServer: HealthcheckServer | undefined; let runner: JsTaskRunner | undefined; let isShuttingDown = false; let errorReporter: ErrorReporter | undefined; @@ -22,6 +24,7 @@ function createSignalHandler(signal: string) { if (runner) { await runner.stop(); runner = undefined; + void healthcheckServer?.stop(); } if (errorReporter) { @@ -49,6 +52,14 @@ void (async function start() { runner = new JsTaskRunner(config); + const { enabled, host, port } = config.baseRunnerConfig.healthcheckServer; + + if (enabled) { + const { HealthcheckServer } = await import('./healthcheck-server'); + healthcheckServer = new HealthcheckServer(); + await healthcheckServer.start(host, port); + } + process.on('SIGINT', createSignalHandler('SIGINT')); process.on('SIGTERM', createSignalHandler('SIGTERM')); })().catch((e) => { diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 9f8bc45232..54fa07e7f5 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -296,43 +296,6 @@ export const schema = { }, }, - diagnostics: { - enabled: { - doc: 'Whether diagnostic mode is enabled.', - format: Boolean, - default: true, - env: 'N8N_DIAGNOSTICS_ENABLED', - }, - config: { - posthog: { - apiKey: { - doc: 'API key for PostHog', - format: String, - default: 'phc_4URIAm1uYfJO7j8kWSe0J8lc8IqnstRLS7Jx8NcakHo', - env: 'N8N_DIAGNOSTICS_POSTHOG_API_KEY', - }, - apiHost: { - doc: 'API host for PostHog', - format: String, - default: 'https://ph.n8n.io', - env: 'N8N_DIAGNOSTICS_POSTHOG_API_HOST', - }, - }, - frontend: { - doc: 'Diagnostics config for frontend.', - format: String, - default: '1zPn9bgWPzlQc0p8Gj1uiK6DOTn;https://telemetry.n8n.io', - env: 'N8N_DIAGNOSTICS_CONFIG_FRONTEND', - }, - backend: { - doc: 'Diagnostics config for backend.', - format: String, - default: '1zPn7YoGC3ZXE9zLeTKLuQCB4F6;https://telemetry.n8n.io', - env: 'N8N_DIAGNOSTICS_CONFIG_BACKEND', - }, - }, - }, - defaultLocale: { doc: 'Default locale for the UI', format: String, diff --git a/packages/cli/src/events/__tests__/telemetry-event-relay.test.ts b/packages/cli/src/events/__tests__/telemetry-event-relay.test.ts index 7e98877dc7..58d694e556 100644 --- a/packages/cli/src/events/__tests__/telemetry-event-relay.test.ts +++ b/packages/cli/src/events/__tests__/telemetry-event-relay.test.ts @@ -2,7 +2,6 @@ import type { GlobalConfig } from '@n8n/config'; import { mock } from 'jest-mock-extended'; import type { IWorkflowBase } from 'n8n-workflow'; -import config from '@/config'; import { N8N_VERSION } from '@/constants'; import type { WorkflowEntity } from '@/databases/entities/workflow-entity'; import type { ProjectRelationRepository } from '@/databases/repositories/project-relation.repository'; @@ -66,7 +65,7 @@ describe('TelemetryEventRelay', () => { }); beforeEach(() => { - config.set('diagnostics.enabled', true); + globalConfig.diagnostics.enabled = true; }); afterEach(() => { @@ -75,7 +74,7 @@ describe('TelemetryEventRelay', () => { describe('init', () => { it('with diagnostics enabled, should init telemetry and register listeners', async () => { - config.set('diagnostics.enabled', true); + globalConfig.diagnostics.enabled = true; const telemetryEventRelay = new TelemetryEventRelay( eventService, telemetry, @@ -96,7 +95,7 @@ describe('TelemetryEventRelay', () => { }); it('with diagnostics disabled, should neither init telemetry nor register listeners', async () => { - config.set('diagnostics.enabled', false); + globalConfig.diagnostics.enabled = false; const telemetryEventRelay = new TelemetryEventRelay( eventService, telemetry, diff --git a/packages/cli/src/events/relays/telemetry.event-relay.ts b/packages/cli/src/events/relays/telemetry.event-relay.ts index 88f954ab93..9e7b026597 100644 --- a/packages/cli/src/events/relays/telemetry.event-relay.ts +++ b/packages/cli/src/events/relays/telemetry.event-relay.ts @@ -37,7 +37,7 @@ export class TelemetryEventRelay extends EventRelay { } async init() { - if (!config.getEnv('diagnostics.enabled')) return; + if (!this.globalConfig.diagnostics.enabled) return; await this.telemetry.init(); diff --git a/packages/cli/src/execution-lifecycle-hooks/__tests__/save-execution-progress.test.ts b/packages/cli/src/execution-lifecycle-hooks/__tests__/save-execution-progress.test.ts index b0db5becac..d89f2fb734 100644 --- a/packages/cli/src/execution-lifecycle-hooks/__tests__/save-execution-progress.test.ts +++ b/packages/cli/src/execution-lifecycle-hooks/__tests__/save-execution-progress.test.ts @@ -1,5 +1,4 @@ import { - deepCopy, ErrorReporterProxy, type IRunExecutionData, type ITaskData, @@ -87,37 +86,6 @@ test('should update execution when saving progress is enabled', async () => { expect(reporterSpy).not.toHaveBeenCalled(); }); -test('should update execution when saving progress is disabled, but waitTill is defined', async () => { - jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({ - ...commonSettings, - progress: false, - }); - - const reporterSpy = jest.spyOn(ErrorReporterProxy, 'error'); - - executionRepository.findSingleExecution.mockResolvedValue({} as IExecutionResponse); - - const args = deepCopy(commonArgs); - args[4].waitTill = new Date(); - await saveExecutionProgress(...args); - - expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith('some-execution-id', { - data: { - executionData: undefined, - resultData: { - lastNodeExecuted: 'My Node', - runData: { - 'My Node': [{}], - }, - }, - startData: {}, - }, - status: 'running', - }); - - expect(reporterSpy).not.toHaveBeenCalled(); -}); - test('should report error on failure', async () => { jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({ ...commonSettings, diff --git a/packages/cli/src/execution-lifecycle-hooks/save-execution-progress.ts b/packages/cli/src/execution-lifecycle-hooks/save-execution-progress.ts index 6cd1cfd08f..ca9899e1ec 100644 --- a/packages/cli/src/execution-lifecycle-hooks/save-execution-progress.ts +++ b/packages/cli/src/execution-lifecycle-hooks/save-execution-progress.ts @@ -16,7 +16,7 @@ export async function saveExecutionProgress( ) { const saveSettings = toSaveSettings(workflowData.settings); - if (!saveSettings.progress && !executionData.waitTill) return; + if (!saveSettings.progress) return; const logger = Container.get(Logger); diff --git a/packages/cli/src/execution-lifecycle-hooks/to-save-settings.ts b/packages/cli/src/execution-lifecycle-hooks/to-save-settings.ts index 7a25adaeba..a7af8f3ddc 100644 --- a/packages/cli/src/execution-lifecycle-hooks/to-save-settings.ts +++ b/packages/cli/src/execution-lifecycle-hooks/to-save-settings.ts @@ -18,20 +18,20 @@ export function toSaveSettings(workflowSettings: IWorkflowSettings = {}) { PROGRESS: config.getEnv('executions.saveExecutionProgress'), }; + const { + saveDataErrorExecution = DEFAULTS.ERROR, + saveDataSuccessExecution = DEFAULTS.SUCCESS, + saveManualExecutions = DEFAULTS.MANUAL, + saveExecutionProgress = DEFAULTS.PROGRESS, + } = workflowSettings; + return { - error: workflowSettings.saveDataErrorExecution - ? workflowSettings.saveDataErrorExecution !== 'none' - : DEFAULTS.ERROR !== 'none', - success: workflowSettings.saveDataSuccessExecution - ? workflowSettings.saveDataSuccessExecution !== 'none' - : DEFAULTS.SUCCESS !== 'none', - manual: - workflowSettings === undefined || workflowSettings.saveManualExecutions === 'DEFAULT' - ? DEFAULTS.MANUAL - : (workflowSettings.saveManualExecutions ?? DEFAULTS.MANUAL), - progress: - workflowSettings === undefined || workflowSettings.saveExecutionProgress === 'DEFAULT' - ? DEFAULTS.PROGRESS - : (workflowSettings.saveExecutionProgress ?? DEFAULTS.PROGRESS), + error: saveDataErrorExecution === 'DEFAULT' ? DEFAULTS.ERROR : saveDataErrorExecution === 'all', + success: + saveDataSuccessExecution === 'DEFAULT' + ? DEFAULTS.SUCCESS + : saveDataSuccessExecution === 'all', + manual: saveManualExecutions === 'DEFAULT' ? DEFAULTS.MANUAL : saveManualExecutions, + progress: saveExecutionProgress === 'DEFAULT' ? DEFAULTS.PROGRESS : saveExecutionProgress, }; } diff --git a/packages/cli/src/posthog/__tests__/posthog.test.ts b/packages/cli/src/posthog/__tests__/posthog.test.ts index 5c8fe282bf..5e11d24773 100644 --- a/packages/cli/src/posthog/__tests__/posthog.test.ts +++ b/packages/cli/src/posthog/__tests__/posthog.test.ts @@ -3,7 +3,6 @@ import { mock } from 'jest-mock-extended'; import { InstanceSettings } from 'n8n-core'; import { PostHog } from 'posthog-node'; -import config from '@/config'; import { PostHogClient } from '@/posthog'; import { mockInstance } from '@test/mocking'; @@ -20,12 +19,11 @@ describe('PostHog', () => { const globalConfig = mock({ logging: { level: 'debug' } }); beforeAll(() => { - config.set('diagnostics.config.posthog.apiKey', apiKey); - config.set('diagnostics.config.posthog.apiHost', apiHost); + globalConfig.diagnostics.posthogConfig = { apiKey, apiHost }; }); beforeEach(() => { - config.set('diagnostics.enabled', true); + globalConfig.diagnostics.enabled = true; jest.resetAllMocks(); }); @@ -37,7 +35,7 @@ describe('PostHog', () => { }); it('does not initialize or track if diagnostics are not enabled', async () => { - config.set('diagnostics.enabled', false); + globalConfig.diagnostics.enabled = false; const ph = new PostHogClient(instanceSettings, globalConfig); await ph.init(); diff --git a/packages/cli/src/posthog/index.ts b/packages/cli/src/posthog/index.ts index 8dec9755b3..be025c8a85 100644 --- a/packages/cli/src/posthog/index.ts +++ b/packages/cli/src/posthog/index.ts @@ -4,7 +4,6 @@ import type { FeatureFlags, ITelemetryTrackProperties } from 'n8n-workflow'; import type { PostHog } from 'posthog-node'; import { Service } from 'typedi'; -import config from '@/config'; import type { PublicUser } from '@/interfaces'; @Service() @@ -17,14 +16,14 @@ export class PostHogClient { ) {} async init() { - const enabled = config.getEnv('diagnostics.enabled'); + const { enabled, posthogConfig } = this.globalConfig.diagnostics; if (!enabled) { return; } const { PostHog } = await import('posthog-node'); - this.postHog = new PostHog(config.getEnv('diagnostics.config.posthog.apiKey'), { - host: config.getEnv('diagnostics.config.posthog.apiHost'), + this.postHog = new PostHog(posthogConfig.apiKey, { + host: posthogConfig.apiHost, }); const logLevel = this.globalConfig.logging.level; diff --git a/packages/cli/src/runners/__tests__/task-broker.test.ts b/packages/cli/src/runners/__tests__/task-broker.test.ts index c3b8e69409..2f9a6a26e7 100644 --- a/packages/cli/src/runners/__tests__/task-broker.test.ts +++ b/packages/cli/src/runners/__tests__/task-broker.test.ts @@ -1,8 +1,12 @@ +import type { TaskRunnersConfig } from '@n8n/config'; import type { RunnerMessage, TaskResultData } from '@n8n/task-runner'; import { mock } from 'jest-mock-extended'; -import type { INodeTypeBaseDescription } from 'n8n-workflow'; +import { ApplicationError, type INodeTypeBaseDescription } from 'n8n-workflow'; + +import { Time } from '@/constants'; import { TaskRejectError } from '../errors'; +import type { RunnerLifecycleEvents } from '../runner-lifecycle-events'; import { TaskBroker } from '../task-broker.service'; import type { TaskOffer, TaskRequest, TaskRunner } from '../task-broker.service'; @@ -12,7 +16,7 @@ describe('TaskBroker', () => { let taskBroker: TaskBroker; beforeEach(() => { - taskBroker = new TaskBroker(mock()); + taskBroker = new TaskBroker(mock(), mock(), mock()); jest.restoreAllMocks(); }); @@ -707,4 +711,131 @@ describe('TaskBroker', () => { }); }); }); + + describe('task timeouts', () => { + let taskBroker: TaskBroker; + let config: TaskRunnersConfig; + let runnerLifecycleEvents = mock(); + + beforeAll(() => { + jest.useFakeTimers(); + config = mock({ taskTimeout: 30 }); + taskBroker = new TaskBroker(mock(), config, runnerLifecycleEvents); + }); + + afterAll(() => { + jest.useRealTimers(); + }); + + it('on sending task, we should set up task timeout', async () => { + jest.spyOn(global, 'setTimeout'); + + const taskId = 'task1'; + const runnerId = 'runner1'; + const runner = mock({ id: runnerId }); + const runnerMessageCallback = jest.fn(); + + taskBroker.registerRunner(runner, runnerMessageCallback); + taskBroker.setTasks({ + [taskId]: { id: taskId, runnerId, requesterId: 'requester1', taskType: 'test' }, + }); + + await taskBroker.sendTaskSettings(taskId, {}); + + expect(setTimeout).toHaveBeenCalledWith( + expect.any(Function), + config.taskTimeout * Time.seconds.toMilliseconds, + ); + }); + + it('on task completion, we should clear timeout', async () => { + jest.spyOn(global, 'clearTimeout'); + + const taskId = 'task1'; + const runnerId = 'runner1'; + const requesterId = 'requester1'; + const requesterCallback = jest.fn(); + + taskBroker.registerRequester(requesterId, requesterCallback); + taskBroker.setTasks({ + [taskId]: { + id: taskId, + runnerId, + requesterId, + taskType: 'test', + timeout: setTimeout(() => {}, config.taskTimeout * Time.seconds.toMilliseconds), + }, + }); + + await taskBroker.taskDoneHandler(taskId, { result: [] }); + + expect(clearTimeout).toHaveBeenCalled(); + expect(taskBroker.getTasks().get(taskId)).toBeUndefined(); + }); + + it('on task error, we should clear timeout', async () => { + jest.spyOn(global, 'clearTimeout'); + + const taskId = 'task1'; + const runnerId = 'runner1'; + const requesterId = 'requester1'; + const requesterCallback = jest.fn(); + + taskBroker.registerRequester(requesterId, requesterCallback); + taskBroker.setTasks({ + [taskId]: { + id: taskId, + runnerId, + requesterId, + taskType: 'test', + timeout: setTimeout(() => {}, config.taskTimeout * Time.seconds.toMilliseconds), + }, + }); + + await taskBroker.taskErrorHandler(taskId, new Error('Test error')); + + expect(clearTimeout).toHaveBeenCalled(); + expect(taskBroker.getTasks().get(taskId)).toBeUndefined(); + }); + + it('on timeout, we should emit `runner:timed-out-during-task` event and send error to requester', async () => { + jest.spyOn(global, 'clearTimeout'); + + const taskId = 'task1'; + const runnerId = 'runner1'; + const requesterId = 'requester1'; + const runner = mock({ id: runnerId }); + const runnerCallback = jest.fn(); + const requesterCallback = jest.fn(); + + taskBroker.registerRunner(runner, runnerCallback); + taskBroker.registerRequester(requesterId, requesterCallback); + + taskBroker.setTasks({ + [taskId]: { id: taskId, runnerId, requesterId, taskType: 'test' }, + }); + + await taskBroker.sendTaskSettings(taskId, {}); + + jest.runAllTimers(); + + await Promise.resolve(); + + expect(runnerLifecycleEvents.emit).toHaveBeenCalledWith('runner:timed-out-during-task'); + + await Promise.resolve(); + + expect(clearTimeout).toHaveBeenCalled(); + + expect(requesterCallback).toHaveBeenCalledWith({ + type: 'broker:taskerror', + taskId, + error: new ApplicationError(`Task execution timed out after ${config.taskTimeout} seconds`), + }); + + await Promise.resolve(); + + expect(taskBroker.getTasks().get(taskId)).toBeUndefined(); + }); + }); }); diff --git a/packages/cli/src/runners/__tests__/task-runner-process.test.ts b/packages/cli/src/runners/__tests__/task-runner-process.test.ts index 92e8483d03..9eeb8d69fc 100644 --- a/packages/cli/src/runners/__tests__/task-runner-process.test.ts +++ b/packages/cli/src/runners/__tests__/task-runner-process.test.ts @@ -7,6 +7,8 @@ import type { TaskRunnerAuthService } from '@/runners/auth/task-runner-auth.serv import { TaskRunnerProcess } from '@/runners/task-runner-process'; import { mockInstance } from '@test/mocking'; +import type { RunnerLifecycleEvents } from '../runner-lifecycle-events'; + const spawnMock = jest.fn(() => mock({ stdout: { @@ -25,7 +27,7 @@ describe('TaskRunnerProcess', () => { runnerConfig.enabled = true; runnerConfig.mode = 'internal_childprocess'; const authService = mock(); - let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService); + let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService, mock()); afterEach(async () => { spawnMock.mockClear(); @@ -35,15 +37,35 @@ describe('TaskRunnerProcess', () => { it('should throw if runner mode is external', () => { runnerConfig.mode = 'external'; - expect(() => new TaskRunnerProcess(logger, runnerConfig, authService)).toThrow(); + expect(() => new TaskRunnerProcess(logger, runnerConfig, authService, mock())).toThrow(); runnerConfig.mode = 'internal_childprocess'; }); + + it('should register listener for `runner:failed-heartbeat-check` event', () => { + const runnerLifecycleEvents = mock(); + new TaskRunnerProcess(logger, runnerConfig, authService, runnerLifecycleEvents); + + expect(runnerLifecycleEvents.on).toHaveBeenCalledWith( + 'runner:failed-heartbeat-check', + expect.any(Function), + ); + }); + + it('should register listener for `runner:timed-out-during-task` event', () => { + const runnerLifecycleEvents = mock(); + new TaskRunnerProcess(logger, runnerConfig, authService, runnerLifecycleEvents); + + expect(runnerLifecycleEvents.on).toHaveBeenCalledWith( + 'runner:timed-out-during-task', + expect.any(Function), + ); + }); }); describe('start', () => { beforeEach(() => { - taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService); + taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService, mock()); }); test.each([ diff --git a/packages/cli/src/runners/__tests__/task-runner-ws-server.test.ts b/packages/cli/src/runners/__tests__/task-runner-ws-server.test.ts new file mode 100644 index 0000000000..223cdbdc54 --- /dev/null +++ b/packages/cli/src/runners/__tests__/task-runner-ws-server.test.ts @@ -0,0 +1,45 @@ +import type { TaskRunnersConfig } from '@n8n/config'; +import { mock } from 'jest-mock-extended'; + +import { Time } from '@/constants'; +import { TaskRunnerWsServer } from '@/runners/runner-ws-server'; + +describe('TaskRunnerWsServer', () => { + describe('heartbeat timer', () => { + it('should set up heartbeat timer on server start', async () => { + const setIntervalSpy = jest.spyOn(global, 'setInterval'); + + const server = new TaskRunnerWsServer( + mock(), + mock(), + mock(), + mock({ path: '/runners', heartbeatInterval: 30 }), + mock(), + ); + + expect(setIntervalSpy).toHaveBeenCalledWith( + expect.any(Function), + 30 * Time.seconds.toMilliseconds, + ); + + await server.shutdown(); + }); + + it('should clear heartbeat timer on server stop', async () => { + jest.spyOn(global, 'setInterval'); + const clearIntervalSpy = jest.spyOn(global, 'clearInterval'); + + const server = new TaskRunnerWsServer( + mock(), + mock(), + mock(), + mock({ path: '/runners', heartbeatInterval: 30 }), + mock(), + ); + + await server.shutdown(); + + expect(clearIntervalSpy).toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/cli/src/runners/default-task-runner-disconnect-analyzer.ts b/packages/cli/src/runners/default-task-runner-disconnect-analyzer.ts index e101c65e28..d61179372b 100644 --- a/packages/cli/src/runners/default-task-runner-disconnect-analyzer.ts +++ b/packages/cli/src/runners/default-task-runner-disconnect-analyzer.ts @@ -1,8 +1,10 @@ import { Service } from 'typedi'; +import config from '@/config'; + import { TaskRunnerDisconnectedError } from './errors/task-runner-disconnected-error'; -import type { DisconnectAnalyzer } from './runner-types'; -import type { TaskRunner } from './task-broker.service'; +import { TaskRunnerFailedHeartbeatError } from './errors/task-runner-failed-heartbeat.error'; +import type { DisconnectAnalyzer, DisconnectErrorOptions } from './runner-types'; /** * Analyzes the disconnect reason of a task runner to provide a more @@ -10,7 +12,16 @@ import type { TaskRunner } from './task-broker.service'; */ @Service() export class DefaultTaskRunnerDisconnectAnalyzer implements DisconnectAnalyzer { - async determineDisconnectReason(runnerId: TaskRunner['id']): Promise { - return new TaskRunnerDisconnectedError(runnerId); + async toDisconnectError(opts: DisconnectErrorOptions): Promise { + const { reason, heartbeatInterval } = opts; + + if (reason === 'failed-heartbeat-check' && heartbeatInterval) { + return new TaskRunnerFailedHeartbeatError( + heartbeatInterval, + config.get('deployment.type') !== 'cloud', + ); + } + + return new TaskRunnerDisconnectedError(opts.runnerId ?? 'Unknown runner ID'); } } diff --git a/packages/cli/src/runners/errors/missing-auth-token.error.ts b/packages/cli/src/runners/errors/missing-auth-token.error.ts new file mode 100644 index 0000000000..3c99a09edb --- /dev/null +++ b/packages/cli/src/runners/errors/missing-auth-token.error.ts @@ -0,0 +1,7 @@ +export class MissingAuthTokenError extends Error { + constructor() { + super( + 'Missing auth token. When `N8N_RUNNERS_MODE` is `external`, it is required to set `N8N_RUNNERS_AUTH_TOKEN`. Its value should be a shared secret between the main instance and the launcher.', + ); + } +} diff --git a/packages/cli/src/runners/errors/task-runner-failed-heartbeat.error.ts b/packages/cli/src/runners/errors/task-runner-failed-heartbeat.error.ts new file mode 100644 index 0000000000..55b9448574 --- /dev/null +++ b/packages/cli/src/runners/errors/task-runner-failed-heartbeat.error.ts @@ -0,0 +1,32 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class TaskRunnerFailedHeartbeatError extends ApplicationError { + description: string; + + constructor(heartbeatInterval: number, isSelfHosted: boolean) { + super('Task execution aborted because runner became unresponsive'); + + const subtitle = + 'The task runner failed to respond as expected, so it was considered unresponsive, and the task was aborted. You can try the following:'; + + const fixes = { + optimizeScript: + 'Optimize your script to prevent CPU-intensive operations, e.g. by breaking them down into smaller chunks or batch processing.', + ensureTermination: + 'Ensure that all paths in your script are able to terminate, i.e. no infinite loops.', + increaseInterval: `If your task can reasonably keep the task runner busy for more than ${heartbeatInterval} ${heartbeatInterval === 1 ? 'second' : 'seconds'}, increase the heartbeat interval using the N8N_RUNNERS_HEARTBEAT_INTERVAL environment variable.`, + }; + + const suggestions = [fixes.optimizeScript, fixes.ensureTermination]; + + if (isSelfHosted) suggestions.push(fixes.increaseInterval); + + const suggestionsText = suggestions + .map((suggestion, index) => `${index + 1}. ${suggestion}`) + .join('
'); + + const description = `${subtitle}

${suggestionsText}`; + + this.description = description; + } +} diff --git a/packages/cli/src/runners/errors/task-runner-timeout.error.ts b/packages/cli/src/runners/errors/task-runner-timeout.error.ts new file mode 100644 index 0000000000..88f3533028 --- /dev/null +++ b/packages/cli/src/runners/errors/task-runner-timeout.error.ts @@ -0,0 +1,34 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class TaskRunnerTimeoutError extends ApplicationError { + description: string; + + constructor(taskTimeout: number, isSelfHosted: boolean) { + super( + `Task execution timed out after ${taskTimeout} ${taskTimeout === 1 ? 'second' : 'seconds'}`, + ); + + const subtitle = + 'The task runner was taking too long on this task, so it was suspected of being unresponsive and restarted, and the task was aborted. You can try the following:'; + + const fixes = { + optimizeScript: + 'Optimize your script to prevent long-running tasks, e.g. by processing data in smaller batches.', + ensureTermination: + 'Ensure that all paths in your script are able to terminate, i.e. no infinite loops.', + increaseTimeout: `If your task can reasonably take more than ${taskTimeout} ${taskTimeout === 1 ? 'second' : 'seconds'}, increase the timeout using the N8N_RUNNERS_TASK_TIMEOUT environment variable.`, + }; + + const suggestions = [fixes.optimizeScript, fixes.ensureTermination]; + + if (isSelfHosted) suggestions.push(fixes.increaseTimeout); + + const suggestionsText = suggestions + .map((suggestion, index) => `${index + 1}. ${suggestion}`) + .join('
'); + + const description = `${subtitle}

${suggestionsText}`; + + this.description = description; + } +} diff --git a/packages/cli/src/runners/internal-task-runner-disconnect-analyzer.ts b/packages/cli/src/runners/internal-task-runner-disconnect-analyzer.ts index e3b9520f77..e27f76b628 100644 --- a/packages/cli/src/runners/internal-task-runner-disconnect-analyzer.ts +++ b/packages/cli/src/runners/internal-task-runner-disconnect-analyzer.ts @@ -5,8 +5,8 @@ import config from '@/config'; import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer'; import { TaskRunnerOomError } from './errors/task-runner-oom-error'; +import type { DisconnectErrorOptions } from './runner-types'; import { SlidingWindowSignal } from './sliding-window-signal'; -import type { TaskRunner } from './task-broker.service'; import type { ExitReason, TaskRunnerProcessEventMap } from './task-runner-process'; import { TaskRunnerProcess } from './task-runner-process'; @@ -38,13 +38,13 @@ export class InternalTaskRunnerDisconnectAnalyzer extends DefaultTaskRunnerDisco }); } - async determineDisconnectReason(runnerId: TaskRunner['id']): Promise { + async toDisconnectError(opts: DisconnectErrorOptions): Promise { const exitCode = await this.awaitExitSignal(); if (exitCode === 'oom') { - return new TaskRunnerOomError(runnerId, this.isCloudDeployment); + return new TaskRunnerOomError(opts.runnerId ?? 'Unknown runner ID', this.isCloudDeployment); } - return await super.determineDisconnectReason(runnerId); + return await super.toDisconnectError(opts); } private async awaitExitSignal(): Promise { diff --git a/packages/cli/src/runners/runner-lifecycle-events.ts b/packages/cli/src/runners/runner-lifecycle-events.ts new file mode 100644 index 0000000000..8ea2da38b1 --- /dev/null +++ b/packages/cli/src/runners/runner-lifecycle-events.ts @@ -0,0 +1,11 @@ +import { Service } from 'typedi'; + +import { TypedEmitter } from '@/typed-emitter'; + +type RunnerLifecycleEventMap = { + 'runner:failed-heartbeat-check': never; + 'runner:timed-out-during-task': never; +}; + +@Service() +export class RunnerLifecycleEvents extends TypedEmitter {} diff --git a/packages/cli/src/runners/runner-types.ts b/packages/cli/src/runners/runner-types.ts index b373d3051e..132d688e98 100644 --- a/packages/cli/src/runners/runner-types.ts +++ b/packages/cli/src/runners/runner-types.ts @@ -6,7 +6,7 @@ import type { TaskRunner } from './task-broker.service'; import type { AuthlessRequest } from '../requests'; export interface DisconnectAnalyzer { - determineDisconnectReason(runnerId: TaskRunner['id']): Promise; + toDisconnectError(opts: DisconnectErrorOptions): Promise; } export type DataRequestType = 'input' | 'node' | 'all'; @@ -22,3 +22,11 @@ export interface TaskRunnerServerInitRequest } export type TaskRunnerServerInitResponse = Response & { req: TaskRunnerServerInitRequest }; + +export type DisconnectReason = 'shutting-down' | 'failed-heartbeat-check' | 'unknown'; + +export type DisconnectErrorOptions = { + runnerId?: TaskRunner['id']; + reason?: DisconnectReason; + heartbeatInterval?: number; +}; diff --git a/packages/cli/src/runners/runner-ws-server.ts b/packages/cli/src/runners/runner-ws-server.ts index c691462558..27a0d779e7 100644 --- a/packages/cli/src/runners/runner-ws-server.ts +++ b/packages/cli/src/runners/runner-ws-server.ts @@ -1,12 +1,17 @@ +import { TaskRunnersConfig } from '@n8n/config'; import type { BrokerMessage, RunnerMessage } from '@n8n/task-runner'; +import { ApplicationError } from 'n8n-workflow'; import { Service } from 'typedi'; import type WebSocket from 'ws'; +import { Time } from '@/constants'; import { Logger } from '@/logging/logger.service'; import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer'; +import { RunnerLifecycleEvents } from './runner-lifecycle-events'; import type { DisconnectAnalyzer, + DisconnectReason, TaskRunnerServerInitRequest, TaskRunnerServerInitResponse, } from './runner-types'; @@ -20,11 +25,50 @@ function heartbeat(this: WebSocket) { export class TaskRunnerWsServer { runnerConnections: Map = new Map(); + private heartbeatTimer: NodeJS.Timer | undefined; + constructor( private readonly logger: Logger, private readonly taskBroker: TaskBroker, private disconnectAnalyzer: DefaultTaskRunnerDisconnectAnalyzer, - ) {} + private readonly taskTunnersConfig: TaskRunnersConfig, + private readonly runnerLifecycleEvents: RunnerLifecycleEvents, + ) { + this.startHeartbeatChecks(); + } + + private startHeartbeatChecks() { + const { heartbeatInterval } = this.taskTunnersConfig; + + if (heartbeatInterval <= 0) { + throw new ApplicationError('Heartbeat interval must be greater than 0'); + } + + this.heartbeatTimer = setInterval(() => { + for (const [runnerId, connection] of this.runnerConnections.entries()) { + if (!connection.isAlive) { + void this.removeConnection(runnerId, 'failed-heartbeat-check'); + this.runnerLifecycleEvents.emit('runner:failed-heartbeat-check'); + return; + } + connection.isAlive = false; + connection.ping(); + } + }, heartbeatInterval * Time.seconds.toMilliseconds); + } + + async shutdown() { + if (this.heartbeatTimer) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = undefined; + } + + await Promise.all( + Array.from(this.runnerConnections.keys()).map( + async (id) => await this.removeConnection(id, 'shutting-down'), + ), + ); + } setDisconnectAnalyzer(disconnectAnalyzer: DisconnectAnalyzer) { this.disconnectAnalyzer = disconnectAnalyzer; @@ -97,11 +141,15 @@ export class TaskRunnerWsServer { ); } - async removeConnection(id: TaskRunner['id']) { + async removeConnection(id: TaskRunner['id'], reason: DisconnectReason = 'unknown') { const connection = this.runnerConnections.get(id); if (connection) { - const disconnectReason = await this.disconnectAnalyzer.determineDisconnectReason(id); - this.taskBroker.deregisterRunner(id, disconnectReason); + const disconnectError = await this.disconnectAnalyzer.toDisconnectError({ + runnerId: id, + reason, + heartbeatInterval: this.taskTunnersConfig.heartbeatInterval, + }); + this.taskBroker.deregisterRunner(id, disconnectError); connection.close(); this.runnerConnections.delete(id); } diff --git a/packages/cli/src/runners/task-broker.service.ts b/packages/cli/src/runners/task-broker.service.ts index 22cc6e8548..af76fb6cac 100644 --- a/packages/cli/src/runners/task-broker.service.ts +++ b/packages/cli/src/runners/task-broker.service.ts @@ -1,3 +1,4 @@ +import { TaskRunnersConfig } from '@n8n/config'; import type { BrokerMessage, RequesterMessage, @@ -8,9 +9,13 @@ import { ApplicationError } from 'n8n-workflow'; import { nanoid } from 'nanoid'; import { Service } from 'typedi'; +import config from '@/config'; +import { Time } from '@/constants'; import { Logger } from '@/logging/logger.service'; import { TaskDeferredError, TaskRejectError } from './errors'; +import { TaskRunnerTimeoutError } from './errors/task-runner-timeout.error'; +import { RunnerLifecycleEvents } from './runner-lifecycle-events'; export interface TaskRunner { id: string; @@ -24,6 +29,7 @@ export interface Task { runnerId: TaskRunner['id']; requesterId: string; taskType: string; + timeout?: NodeJS.Timeout; } export interface TaskOffer { @@ -80,7 +86,15 @@ export class TaskBroker { private pendingTaskRequests: TaskRequest[] = []; - constructor(private readonly logger: Logger) {} + constructor( + private readonly logger: Logger, + private readonly taskRunnersConfig: TaskRunnersConfig, + private readonly runnerLifecycleEvents: RunnerLifecycleEvents, + ) { + if (this.taskRunnersConfig.taskTimeout <= 0) { + throw new ApplicationError('Task timeout must be greater than 0'); + } + } expireTasks() { const now = process.hrtime.bigint(); @@ -426,6 +440,14 @@ export class TaskBroker { async sendTaskSettings(taskId: Task['id'], settings: unknown) { const runner = await this.getRunnerOrFailTask(taskId); + + const task = this.tasks.get(taskId); + if (!task) return; + + task.timeout = setTimeout(async () => { + await this.handleTaskTimeout(taskId); + }, this.taskRunnersConfig.taskTimeout * Time.seconds.toMilliseconds); + await this.messageRunner(runner.id, { type: 'broker:tasksettings', taskId, @@ -433,11 +455,27 @@ export class TaskBroker { }); } + private async handleTaskTimeout(taskId: Task['id']) { + const task = this.tasks.get(taskId); + if (!task) return; + + this.runnerLifecycleEvents.emit('runner:timed-out-during-task'); + + await this.taskErrorHandler( + taskId, + new TaskRunnerTimeoutError( + this.taskRunnersConfig.taskTimeout, + config.getEnv('deployment.type') !== 'cloud', + ), + ); + } + async taskDoneHandler(taskId: Task['id'], data: TaskResultData) { const task = this.tasks.get(taskId); - if (!task) { - return; - } + if (!task) return; + + clearTimeout(task.timeout); + await this.requesters.get(task.requesterId)?.({ type: 'broker:taskdone', taskId: task.id, @@ -448,9 +486,10 @@ export class TaskBroker { async taskErrorHandler(taskId: Task['id'], error: unknown) { const task = this.tasks.get(taskId); - if (!task) { - return; - } + if (!task) return; + + clearTimeout(task.timeout); + await this.requesters.get(task.requesterId)?.({ type: 'broker:taskerror', taskId: task.id, diff --git a/packages/cli/src/runners/task-runner-module.ts b/packages/cli/src/runners/task-runner-module.ts index fe476ad341..612f3d4fc1 100644 --- a/packages/cli/src/runners/task-runner-module.ts +++ b/packages/cli/src/runners/task-runner-module.ts @@ -4,6 +4,7 @@ import Container, { Service } from 'typedi'; import type { TaskRunnerProcess } from '@/runners/task-runner-process'; +import { MissingAuthTokenError } from './errors/missing-auth-token.error'; import { TaskRunnerWsServer } from './runner-ws-server'; import type { LocalTaskManager } from './task-managers/local-task-manager'; import type { TaskRunnerServer } from './task-runner-server'; @@ -28,13 +29,14 @@ export class TaskRunnerModule { async start() { a.ok(this.runnerConfig.enabled, 'Task runner is disabled'); + const { mode, authToken } = this.runnerConfig; + + if (mode === 'external' && !authToken) throw new MissingAuthTokenError(); + await this.loadTaskManager(); await this.loadTaskRunnerServer(); - if ( - this.runnerConfig.mode === 'internal_childprocess' || - this.runnerConfig.mode === 'internal_launcher' - ) { + if (mode === 'internal_childprocess' || mode === 'internal_launcher') { await this.startInternalTaskRunner(); } } diff --git a/packages/cli/src/runners/task-runner-process.ts b/packages/cli/src/runners/task-runner-process.ts index ba63cbe9e7..3129fcb524 100644 --- a/packages/cli/src/runners/task-runner-process.ts +++ b/packages/cli/src/runners/task-runner-process.ts @@ -10,6 +10,7 @@ import { Logger } from '@/logging/logger.service'; import { TaskRunnerAuthService } from './auth/task-runner-auth.service'; import { forwardToLogger } from './forward-to-logger'; import { NodeProcessOomDetector } from './node-process-oom-detector'; +import { RunnerLifecycleEvents } from './runner-lifecycle-events'; import { TypedEmitter } from '../typed-emitter'; type ChildProcess = ReturnType; @@ -70,6 +71,7 @@ export class TaskRunnerProcess extends TypedEmitter { logger: Logger, private readonly runnerConfig: TaskRunnersConfig, private readonly authService: TaskRunnerAuthService, + private readonly runnerLifecycleEvents: RunnerLifecycleEvents, ) { super(); @@ -79,6 +81,16 @@ export class TaskRunnerProcess extends TypedEmitter { ); this.logger = logger.scoped('task-runner'); + + this.runnerLifecycleEvents.on('runner:failed-heartbeat-check', () => { + this.logger.warn('Task runner failed heartbeat check, restarting...'); + void this.forceRestart(); + }); + + this.runnerLifecycleEvents.on('runner:timed-out-during-task', () => { + this.logger.warn('Task runner timed out during task, restarting...'); + void this.forceRestart(); + }); } async start() { @@ -116,9 +128,7 @@ export class TaskRunnerProcess extends TypedEmitter { @OnShutdown() async stop() { - if (!this.process) { - return; - } + if (!this.process) return; this.isShuttingDown = true; @@ -133,10 +143,22 @@ export class TaskRunnerProcess extends TypedEmitter { this.isShuttingDown = false; } - killNode() { - if (!this.process) { - return; + /** Force-restart a runner suspected of being unresponsive. */ + async forceRestart() { + if (!this.process) return; + + if (this.useLauncher) { + await this.killLauncher(); // @TODO: Implement SIGKILL in launcher + } else { + this.process.kill('SIGKILL'); } + + await this._runPromise; + } + + killNode() { + if (!this.process) return; + this.process.kill(); } @@ -173,7 +195,6 @@ export class TaskRunnerProcess extends TypedEmitter { this.emit('exit', { reason: this.oomDetector?.didProcessOom ? 'oom' : 'unknown' }); resolveFn(); - // If we are not shutting down, restart the process if (!this.isShuttingDown) { setImmediate(async () => await this.start()); } diff --git a/packages/cli/src/runners/task-runner-server.ts b/packages/cli/src/runners/task-runner-server.ts index 56c56e02ae..eb428b52fa 100644 --- a/packages/cli/src/runners/task-runner-server.ts +++ b/packages/cli/src/runners/task-runner-server.ts @@ -44,7 +44,7 @@ export class TaskRunnerServer { private readonly logger: Logger, private readonly globalConfig: GlobalConfig, private readonly taskRunnerAuthController: TaskRunnerAuthController, - private readonly taskRunnerService: TaskRunnerWsServer, + private readonly taskRunnerWsServer: TaskRunnerWsServer, ) { this.app = express(); this.app.disable('x-powered-by'); @@ -148,7 +148,7 @@ export class TaskRunnerServer { // eslint-disable-next-line @typescript-eslint/unbound-method this.taskRunnerAuthController.authMiddleware, (req: TaskRunnerServerInitRequest, res: TaskRunnerServerInitResponse) => - this.taskRunnerService.handleRequest(req, res), + this.taskRunnerWsServer.handleRequest(req, res), ); const authEndpoint = `${this.getEndpointBasePath()}/auth`; @@ -181,7 +181,10 @@ export class TaskRunnerServer { const response = new ServerResponse(request); response.writeHead = (statusCode) => { - if (statusCode > 200) ws.close(); + if (statusCode > 200) { + this.logger.error(`Task runner connection attempt failed with status code ${statusCode}`); + ws.close(); + } return response; }; diff --git a/packages/cli/src/security-audit/risk-reporters/instance-risk-reporter.ts b/packages/cli/src/security-audit/risk-reporters/instance-risk-reporter.ts index b0d6ccfad3..fafad308ad 100644 --- a/packages/cli/src/security-audit/risk-reporters/instance-risk-reporter.ts +++ b/packages/cli/src/security-audit/risk-reporters/instance-risk-reporter.ts @@ -103,7 +103,7 @@ export class InstanceRiskReporter implements RiskReporter { }; settings.telemetry = { - diagnosticsEnabled: config.getEnv('diagnostics.enabled'), + diagnosticsEnabled: this.globalConfig.diagnostics.enabled, }; return settings; diff --git a/packages/cli/src/services/__tests__/workflow-statistics.service.test.ts b/packages/cli/src/services/__tests__/workflow-statistics.service.test.ts index fdecb7ae5a..6d28dbe563 100644 --- a/packages/cli/src/services/__tests__/workflow-statistics.service.test.ts +++ b/packages/cli/src/services/__tests__/workflow-statistics.service.test.ts @@ -39,7 +39,7 @@ describe('WorkflowStatisticsService', () => { }); Object.assign(entityManager, { connection: dataSource }); - config.set('diagnostics.enabled', true); + globalConfig.diagnostics.enabled = true; config.set('deployment.type', 'n8n-testing'); mocked(ownershipService.getWorkflowProjectCached).mockResolvedValue(fakeProject); mocked(ownershipService.getPersonalProjectOwnerCached).mockResolvedValue(fakeUser); diff --git a/packages/cli/src/services/frontend.service.ts b/packages/cli/src/services/frontend.service.ts index 081ef5fd4a..db77bd418d 100644 --- a/packages/cli/src/services/frontend.service.ts +++ b/packages/cli/src/services/frontend.service.ts @@ -66,11 +66,11 @@ export class FrontendService { const restEndpoint = this.globalConfig.endpoints.rest; const telemetrySettings: ITelemetrySettings = { - enabled: config.getEnv('diagnostics.enabled'), + enabled: this.globalConfig.diagnostics.enabled, }; if (telemetrySettings.enabled) { - const conf = config.getEnv('diagnostics.config.frontend'); + const conf = this.globalConfig.diagnostics.frontendConfig; const [key, url] = conf.split(';'); if (!key || !url) { @@ -122,15 +122,15 @@ export class FrontendService { instanceId: this.instanceSettings.instanceId, telemetry: telemetrySettings, posthog: { - enabled: config.getEnv('diagnostics.enabled'), - apiHost: config.getEnv('diagnostics.config.posthog.apiHost'), - apiKey: config.getEnv('diagnostics.config.posthog.apiKey'), + enabled: this.globalConfig.diagnostics.enabled, + apiHost: this.globalConfig.diagnostics.posthogConfig.apiHost, + apiKey: this.globalConfig.diagnostics.posthogConfig.apiKey, autocapture: false, disableSessionRecording: config.getEnv('deployment.type') !== 'cloud', debug: this.globalConfig.logging.level === 'debug', }, personalizationSurveyEnabled: - config.getEnv('personalization.enabled') && config.getEnv('diagnostics.enabled'), + config.getEnv('personalization.enabled') && this.globalConfig.diagnostics.enabled, defaultLocale: config.getEnv('defaultLocale'), userManagement: { quota: this.license.getUsersLimit(), diff --git a/packages/cli/src/telemetry/__tests__/telemetry.test.ts b/packages/cli/src/telemetry/__tests__/telemetry.test.ts index 04a6cecfca..d0f9fae3cd 100644 --- a/packages/cli/src/telemetry/__tests__/telemetry.test.ts +++ b/packages/cli/src/telemetry/__tests__/telemetry.test.ts @@ -21,6 +21,10 @@ describe('Telemetry', () => { const instanceId = 'Telemetry unit test'; const testDateTime = new Date('2022-01-01 00:00:00'); const instanceSettings = mockInstance(InstanceSettings, { instanceId }); + const globalConfig = mock({ + diagnostics: { enabled: true }, + logging: { level: 'info', outputs: ['console'] }, + }); beforeAll(() => { // @ts-expect-error Spying on private method @@ -28,7 +32,6 @@ describe('Telemetry', () => { jest.useFakeTimers(); jest.setSystemTime(testDateTime); - config.set('diagnostics.enabled', true); config.set('deployment.type', 'n8n-testing'); }); @@ -45,14 +48,7 @@ describe('Telemetry', () => { const postHog = new PostHogClient(instanceSettings, mock()); await postHog.init(); - telemetry = new Telemetry( - mock(), - postHog, - mock(), - instanceSettings, - mock(), - mock({ logging: { level: 'info', outputs: ['console'] } }), - ); + telemetry = new Telemetry(mock(), postHog, mock(), instanceSettings, mock(), globalConfig); // @ts-expect-error Assigning to private property telemetry.rudderStack = mockRudderStack; }); diff --git a/packages/cli/src/telemetry/index.ts b/packages/cli/src/telemetry/index.ts index d9a8e590f4..a8d39d898e 100644 --- a/packages/cli/src/telemetry/index.ts +++ b/packages/cli/src/telemetry/index.ts @@ -5,7 +5,6 @@ import { InstanceSettings } from 'n8n-core'; import type { ITelemetryTrackProperties } from 'n8n-workflow'; import { Container, Service } from 'typedi'; -import config from '@/config'; import { LOWEST_SHUTDOWN_PRIORITY, N8N_VERSION } from '@/constants'; import { ProjectRelationRepository } from '@/databases/repositories/project-relation.repository'; import { ProjectRepository } from '@/databases/repositories/project.repository'; @@ -54,10 +53,9 @@ export class Telemetry { ) {} async init() { - const enabled = config.getEnv('diagnostics.enabled'); + const { enabled, backendConfig } = this.globalConfig.diagnostics; if (enabled) { - const conf = config.getEnv('diagnostics.config.backend'); - const [key, dataPlaneUrl] = conf.split(';'); + const [key, dataPlaneUrl] = backendConfig.split(';'); if (!key || !dataPlaneUrl) { this.logger.warn('Diagnostics backend config is invalid'); diff --git a/packages/cli/src/webhooks/webhook-helpers.ts b/packages/cli/src/webhooks/webhook-helpers.ts index 72628b8351..6110584f7e 100644 --- a/packages/cli/src/webhooks/webhook-helpers.ts +++ b/packages/cli/src/webhooks/webhook-helpers.ts @@ -464,6 +464,11 @@ export async function executeWebhook( projectId: project?.id, }; + // When resuming from a wait node, copy over the pushRef from the execution-data + if (!runData.pushRef) { + runData.pushRef = runExecutionData.pushRef; + } + let responsePromise: IDeferredPromise | undefined; if (responseMode === 'responseNode') { responsePromise = createDeferredPromise(); diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index 08d6ba09e4..97322f4fe0 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -307,7 +307,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { }, ], workflowExecuteAfter: [ - async function (this: WorkflowHooks): Promise { + async function (this: WorkflowHooks, fullRunData: IRun): Promise { const { pushRef, executionId } = this; if (pushRef === undefined) return; @@ -318,7 +318,9 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { workflowId, }); - pushInstance.send('executionFinished', { executionId }, pushRef); + const pushType = + fullRunData.status === 'waiting' ? 'executionWaiting' : 'executionFinished'; + pushInstance.send(pushType, { executionId }, pushRef); }, ], }; @@ -430,22 +432,21 @@ function hookFunctionsSave(): IWorkflowExecuteHooks { (executionStatus === 'success' && !saveSettings.success) || (executionStatus !== 'success' && !saveSettings.error); - if (shouldNotSave && !fullRunData.waitTill) { - if (!fullRunData.waitTill && !isManualMode) { - executeErrorWorkflow( - this.workflowData, - fullRunData, - this.mode, - this.executionId, - this.retryOf, - ); - await Container.get(ExecutionRepository).hardDelete({ - workflowId: this.workflowData.id, - executionId: this.executionId, - }); + if (shouldNotSave && !fullRunData.waitTill && !isManualMode) { + executeErrorWorkflow( + this.workflowData, + fullRunData, + this.mode, + this.executionId, + this.retryOf, + ); - return; - } + await Container.get(ExecutionRepository).hardDelete({ + workflowId: this.workflowData.id, + executionId: this.executionId, + }); + + return; } // Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive @@ -1110,6 +1111,9 @@ export function getWorkflowHooksWorkerMain( hookFunctions.nodeExecuteAfter = []; hookFunctions.workflowExecuteAfter = [ async function (this: WorkflowHooks, fullRunData: IRun): Promise { + // Don't delete executions before they are finished + if (!fullRunData.finished) return; + const executionStatus = determineFinalExecutionStatus(fullRunData); const saveSettings = toSaveSettings(this.workflowData.settings); diff --git a/packages/cli/templates/form-trigger.handlebars b/packages/cli/templates/form-trigger.handlebars index 57d93cb291..02611f5b5b 100644 --- a/packages/cli/templates/form-trigger.handlebars +++ b/packages/cli/templates/form-trigger.handlebars @@ -740,14 +740,6 @@ } return; - }).then(() => { - window.addEventListener('storage', function(event) { - if (event.key === 'n8n_redirect_to_next_form_test_page' && event.newValue) { - const newUrl = event.newValue; - localStorage.removeItem('n8n_redirect_to_next_form_test_page'); - window.location.replace(newUrl); - } - }); }) .catch(function (error) { console.error('Error:', error); diff --git a/packages/cli/test/integration/runners/task-runner-module.external.test.ts b/packages/cli/test/integration/runners/task-runner-module.external.test.ts index 4974abfb39..bdabdf56ae 100644 --- a/packages/cli/test/integration/runners/task-runner-module.external.test.ts +++ b/packages/cli/test/integration/runners/task-runner-module.external.test.ts @@ -1,6 +1,7 @@ import { TaskRunnersConfig } from '@n8n/config'; import Container from 'typedi'; +import { MissingAuthTokenError } from '@/runners/errors/missing-auth-token.error'; import { TaskRunnerModule } from '@/runners/task-runner-module'; import { DefaultTaskRunnerDisconnectAnalyzer } from '../../../src/runners/default-task-runner-disconnect-analyzer'; @@ -10,6 +11,7 @@ describe('TaskRunnerModule in external mode', () => { const runnerConfig = Container.get(TaskRunnersConfig); runnerConfig.mode = 'external'; runnerConfig.port = 0; + runnerConfig.authToken = 'test'; const module = Container.get(TaskRunnerModule); afterEach(async () => { @@ -24,6 +26,17 @@ describe('TaskRunnerModule in external mode', () => { await expect(module.start()).rejects.toThrow('Task runner is disabled'); }); + it('should throw if auth token is missing', async () => { + const runnerConfig = new TaskRunnersConfig(); + runnerConfig.mode = 'external'; + runnerConfig.enabled = true; + runnerConfig.authToken = ''; + + const module = new TaskRunnerModule(runnerConfig); + + await expect(module.start()).rejects.toThrowError(MissingAuthTokenError); + }); + it('should start the task runner', async () => { runnerConfig.enabled = true; diff --git a/packages/cli/test/integration/security-audit/instance-risk-reporter.test.ts b/packages/cli/test/integration/security-audit/instance-risk-reporter.test.ts index 928667b518..58a2a2c9a8 100644 --- a/packages/cli/test/integration/security-audit/instance-risk-reporter.test.ts +++ b/packages/cli/test/integration/security-audit/instance-risk-reporter.test.ts @@ -1,9 +1,9 @@ +import { GlobalConfig } from '@n8n/config'; import { mock } from 'jest-mock-extended'; import { NodeConnectionType } from 'n8n-workflow'; import Container from 'typedi'; import { v4 as uuid } from 'uuid'; -import config from '@/config'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { generateNanoId } from '@/databases/utils/generators'; import { INSTANCE_REPORT, WEBHOOK_VALIDATOR_NODE_TYPES } from '@/security-audit/constants'; @@ -239,8 +239,7 @@ test('should not report outdated instance when up to date', async () => { }); test('should report security settings', async () => { - config.set('diagnostics.enabled', true); - + Container.get(GlobalConfig).diagnostics.enabled = true; const testAudit = await securityAuditService.run(['instance']); const section = getRiskSection( diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index 2ae12908ac..c6e0316038 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -916,7 +916,6 @@ export class WorkflowExecute { let nodeSuccessData: INodeExecutionData[][] | null | undefined; let runIndex: number; let startTime: number; - let taskData: ITaskData; if (this.runExecutionData.startData === undefined) { this.runExecutionData.startData = {}; @@ -1446,13 +1445,13 @@ export class WorkflowExecute { this.runExecutionData.resultData.runData[executionNode.name] = []; } - taskData = { + const taskData: ITaskData = { hints: executionHints, startTime, executionTime: new Date().getTime() - startTime, source: !executionData.source ? [] : executionData.source.main, metadata: executionData.metadata, - executionStatus: 'success', + executionStatus: this.runExecutionData.waitTill ? 'waiting' : 'success', }; if (executionError !== undefined) { diff --git a/packages/editor-ui/src/components/InputPanel.vue b/packages/editor-ui/src/components/InputPanel.vue index 169c61de46..a3ddc8ed64 100644 --- a/packages/editor-ui/src/components/InputPanel.vue +++ b/packages/editor-ui/src/components/InputPanel.vue @@ -212,7 +212,10 @@ const activeNodeType = computed(() => { return nodeTypesStore.getNodeType(activeNode.value.type, activeNode.value.typeVersion); }); -const waitingMessage = computed(() => waitingNodeTooltip()); +const waitingMessage = computed(() => { + const parentNode = parentNodes.value[0]; + return parentNode && waitingNodeTooltip(workflowsStore.getNodeByName(parentNode.name)); +}); watch( inputMode, diff --git a/packages/editor-ui/src/components/NodeExecuteButton.vue b/packages/editor-ui/src/components/NodeExecuteButton.vue index 43b4dfa7dc..f801f0701c 100644 --- a/packages/editor-ui/src/components/NodeExecuteButton.vue +++ b/packages/editor-ui/src/components/NodeExecuteButton.vue @@ -65,7 +65,7 @@ const lastPopupCountUpdate = ref(0); const codeGenerationInProgress = ref(false); const router = useRouter(); -const { runWorkflow, runWorkflowResolvePending, stopCurrentExecution } = useRunWorkflow({ router }); +const { runWorkflow, stopCurrentExecution } = useRunWorkflow({ router }); const workflowsStore = useWorkflowsStore(); const externalHooks = useExternalHooks(); @@ -353,17 +353,10 @@ async function onClick() { telemetry.track('User clicked execute node button', telemetryPayload); await externalHooks.run('nodeExecuteButton.onClick', telemetryPayload); - if (workflowsStore.isWaitingExecution) { - await runWorkflowResolvePending({ - destinationNode: props.nodeName, - source: 'RunData.ExecuteNodeButton', - }); - } else { - await runWorkflow({ - destinationNode: props.nodeName, - source: 'RunData.ExecuteNodeButton', - }); - } + await runWorkflow({ + destinationNode: props.nodeName, + source: 'RunData.ExecuteNodeButton', + }); emit('execute'); } diff --git a/packages/editor-ui/src/components/OutputPanel.vue b/packages/editor-ui/src/components/OutputPanel.vue index 0503ad5c94..0ca7e3830e 100644 --- a/packages/editor-ui/src/components/OutputPanel.vue +++ b/packages/editor-ui/src/components/OutputPanel.vue @@ -352,7 +352,7 @@ const activatePane = () => {