From 563a0fc446f0e6fd64e10ee473a40fd5333081d2 Mon Sep 17 00:00:00 2001 From: Danny Martini Date: Fri, 4 Oct 2024 15:36:04 +0200 Subject: [PATCH] fix(core): Print errors that happen before the execution starts on the worker instead of just on the main instance (#11099) --- packages/cli/src/scaling/scaling.service.ts | 29 ++++++++++++++++----- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index c670c52b99..75f1ff5833 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -1,6 +1,12 @@ import { GlobalConfig } from '@n8n/config'; import { InstanceSettings } from 'n8n-core'; -import { ApplicationError, BINARY_ENCODING, sleep, jsonStringify } from 'n8n-workflow'; +import { + ApplicationError, + BINARY_ENCODING, + sleep, + jsonStringify, + ErrorReporterProxy, +} from 'n8n-workflow'; import type { IExecuteResponsePromiseData } from 'n8n-workflow'; import { strict } from 'node:assert'; import Container, { Service } from 'typedi'; @@ -78,11 +84,22 @@ export class ScalingService { this.assertWorker(); this.assertQueue(); - void this.queue.process( - JOB_TYPE_NAME, - concurrency, - async (job: Job) => await this.jobProcessor.processJob(job), - ); + void this.queue.process(JOB_TYPE_NAME, concurrency, async (job: Job) => { + try { + await this.jobProcessor.processJob(job); + } catch (error: unknown) { + // Errors thrown here will be sent to the main instance by bull. Logging + // them out and rethrowing them allows to find out which worker had the + // issue. + this.logger.error('[ScalingService] Executing a job errored', { + jobId: job.id, + executionId: job.data.executionId, + error, + }); + ErrorReporterProxy.error(error); + throw error; + } + }); this.logger.debug('[ScalingService] Worker setup completed'); }