diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index 536e835c72..f20d0764c6 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -1,6 +1,6 @@ import { GlobalConfig } from '@n8n/config'; import { Container, Service } from '@n8n/di'; -import { ErrorReporter, InstanceSettings, Logger } from 'n8n-core'; +import { ErrorReporter, InstanceSettings, isObjectLiteral, Logger } from 'n8n-core'; import { ApplicationError, BINARY_ENCODING, @@ -93,6 +93,12 @@ export class ScalingService { void this.queue.process(JOB_TYPE_NAME, concurrency, async (job: Job) => { try { + if (!this.hasValidJobData(job)) { + throw new ApplicationError('Worker received invalid job', { + extra: { jobData: jsonStringify(job, { replaceCircularRefs: true }) }, + }); + } + await this.jobProcessor.processJob(job); } catch (error) { await this.reportJobProcessingError(ensureError(error), job); @@ -503,5 +509,9 @@ export class ScalingService { : jsonStringify(error, { replaceCircularRefs: true }); } + private hasValidJobData(job: Job) { + return isObjectLiteral(job.data) && 'executionId' in job.data && 'loadStaticData' in job.data; + } + // #endregion }