mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 12:57:29 -08:00
refactor(core): Validate job data on worker (#12548)
This commit is contained in:
parent
e2b6fa4d3d
commit
b8d5847592
|
@ -7,8 +7,8 @@ import {
|
||||||
jsonStringify,
|
jsonStringify,
|
||||||
ErrorReporterProxy,
|
ErrorReporterProxy,
|
||||||
ensureError,
|
ensureError,
|
||||||
|
type IExecuteResponsePromiseData,
|
||||||
} from 'n8n-workflow';
|
} from 'n8n-workflow';
|
||||||
import type { IExecuteResponsePromiseData } from 'n8n-workflow';
|
|
||||||
import { strict } from 'node:assert';
|
import { strict } from 'node:assert';
|
||||||
import Container, { Service } from 'typedi';
|
import Container, { Service } from 'typedi';
|
||||||
|
|
||||||
|
@ -21,7 +21,7 @@ import { MaxStalledCountError } from '@/errors/max-stalled-count.error';
|
||||||
import { EventService } from '@/events/event.service';
|
import { EventService } from '@/events/event.service';
|
||||||
import { Logger } from '@/logging/logger.service';
|
import { Logger } from '@/logging/logger.service';
|
||||||
import { OrchestrationService } from '@/services/orchestration.service';
|
import { OrchestrationService } from '@/services/orchestration.service';
|
||||||
import { assertNever } from '@/utils';
|
import { assertNever, isObjectLiteral } from '@/utils';
|
||||||
|
|
||||||
import { JOB_TYPE_NAME, QUEUE_NAME } from './constants';
|
import { JOB_TYPE_NAME, QUEUE_NAME } from './constants';
|
||||||
import { JobProcessor } from './job-processor';
|
import { JobProcessor } from './job-processor';
|
||||||
|
@ -91,6 +91,12 @@ export class ScalingService {
|
||||||
|
|
||||||
void this.queue.process(JOB_TYPE_NAME, concurrency, async (job: Job) => {
|
void this.queue.process(JOB_TYPE_NAME, concurrency, async (job: Job) => {
|
||||||
try {
|
try {
|
||||||
|
if (!this.hasValidJobData(job)) {
|
||||||
|
throw new ApplicationError('Worker received invalid job', {
|
||||||
|
extra: { jobData: jsonStringify(job, { replaceCircularRefs: true }) },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
await this.jobProcessor.processJob(job);
|
await this.jobProcessor.processJob(job);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
await this.reportJobProcessingError(ensureError(error), job);
|
await this.reportJobProcessingError(ensureError(error), job);
|
||||||
|
@ -493,5 +499,9 @@ export class ScalingService {
|
||||||
: jsonStringify(error, { replaceCircularRefs: true });
|
: jsonStringify(error, { replaceCircularRefs: true });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private hasValidJobData(job: Job) {
|
||||||
|
return isObjectLiteral(job.data) && 'executionId' in job.data && 'loadStaticData' in job.data;
|
||||||
|
}
|
||||||
|
|
||||||
// #endregion
|
// #endregion
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue