mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
refactor(core): Validate job data on worker (#12548)
This commit is contained in:
parent
176c068524
commit
b4ae44d9ce
|
@ -1,6 +1,6 @@
|
||||||
import { GlobalConfig } from '@n8n/config';
|
import { GlobalConfig } from '@n8n/config';
|
||||||
import { Container, Service } from '@n8n/di';
|
import { Container, Service } from '@n8n/di';
|
||||||
import { ErrorReporter, InstanceSettings, Logger } from 'n8n-core';
|
import { ErrorReporter, InstanceSettings, isObjectLiteral, Logger } from 'n8n-core';
|
||||||
import {
|
import {
|
||||||
ApplicationError,
|
ApplicationError,
|
||||||
BINARY_ENCODING,
|
BINARY_ENCODING,
|
||||||
|
@ -93,6 +93,12 @@ export class ScalingService {
|
||||||
|
|
||||||
void this.queue.process(JOB_TYPE_NAME, concurrency, async (job: Job) => {
|
void this.queue.process(JOB_TYPE_NAME, concurrency, async (job: Job) => {
|
||||||
try {
|
try {
|
||||||
|
if (!this.hasValidJobData(job)) {
|
||||||
|
throw new ApplicationError('Worker received invalid job', {
|
||||||
|
extra: { jobData: jsonStringify(job, { replaceCircularRefs: true }) },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
await this.jobProcessor.processJob(job);
|
await this.jobProcessor.processJob(job);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
await this.reportJobProcessingError(ensureError(error), job);
|
await this.reportJobProcessingError(ensureError(error), job);
|
||||||
|
@ -503,5 +509,9 @@ export class ScalingService {
|
||||||
: jsonStringify(error, { replaceCircularRefs: true });
|
: jsonStringify(error, { replaceCircularRefs: true });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private hasValidJobData(job: Job) {
|
||||||
|
return isObjectLiteral(job.data) && 'executionId' in job.data && 'loadStaticData' in job.data;
|
||||||
|
}
|
||||||
|
|
||||||
// #endregion
|
// #endregion
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue