mirror of
https://github.com/n8n-io/n8n.git
synced 2024-12-24 20:24:05 -08:00
fix(core): Do not load ScalingService in regular mode (no-changelog) (#10333)
Some checks are pending
Test Master / install-and-build (push) Waiting to run
Test Master / Unit tests (18.x) (push) Blocked by required conditions
Test Master / Unit tests (20.x) (push) Blocked by required conditions
Test Master / Unit tests (22.4) (push) Blocked by required conditions
Test Master / Lint (push) Blocked by required conditions
Test Master / Notify Slack on failure (push) Blocked by required conditions
Some checks are pending
Test Master / install-and-build (push) Waiting to run
Test Master / Unit tests (18.x) (push) Blocked by required conditions
Test Master / Unit tests (20.x) (push) Blocked by required conditions
Test Master / Unit tests (22.4) (push) Blocked by required conditions
Test Master / Lint (push) Blocked by required conditions
Test Master / Notify Slack on failure (push) Blocked by required conditions
This commit is contained in:
parent
eef4fb8bb4
commit
1869c396f3
|
@ -29,7 +29,7 @@ import { ExternalHooks } from '@/ExternalHooks';
|
||||||
import type { IExecutionResponse, IWorkflowExecutionDataProcess } from '@/Interfaces';
|
import type { IExecutionResponse, IWorkflowExecutionDataProcess } from '@/Interfaces';
|
||||||
import { NodeTypes } from '@/NodeTypes';
|
import { NodeTypes } from '@/NodeTypes';
|
||||||
import type { Job, JobData, JobResult } from '@/scaling/types';
|
import type { Job, JobData, JobResult } from '@/scaling/types';
|
||||||
import { ScalingService } from '@/scaling/scaling.service';
|
import type { ScalingService } from '@/scaling/scaling.service';
|
||||||
import * as WorkflowHelpers from '@/WorkflowHelpers';
|
import * as WorkflowHelpers from '@/WorkflowHelpers';
|
||||||
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
|
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
|
||||||
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
|
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
|
||||||
|
@ -40,7 +40,7 @@ import { EventService } from './events/event.service';
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export class WorkflowRunner {
|
export class WorkflowRunner {
|
||||||
private readonly scalingService: ScalingService;
|
private scalingService: ScalingService;
|
||||||
|
|
||||||
private executionsMode = config.getEnv('executions.mode');
|
private executionsMode = config.getEnv('executions.mode');
|
||||||
|
|
||||||
|
@ -53,11 +53,7 @@ export class WorkflowRunner {
|
||||||
private readonly nodeTypes: NodeTypes,
|
private readonly nodeTypes: NodeTypes,
|
||||||
private readonly permissionChecker: PermissionChecker,
|
private readonly permissionChecker: PermissionChecker,
|
||||||
private readonly eventService: EventService,
|
private readonly eventService: EventService,
|
||||||
) {
|
) {}
|
||||||
if (this.executionsMode === 'queue') {
|
|
||||||
this.scalingService = Container.get(ScalingService);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** The process did error */
|
/** The process did error */
|
||||||
async processError(
|
async processError(
|
||||||
|
@ -360,6 +356,11 @@ export class WorkflowRunner {
|
||||||
loadStaticData: !!loadStaticData,
|
loadStaticData: !!loadStaticData,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if (!this.scalingService) {
|
||||||
|
const { ScalingService } = await import('@/scaling/scaling.service');
|
||||||
|
this.scalingService = Container.get(ScalingService);
|
||||||
|
}
|
||||||
|
|
||||||
let priority = 100;
|
let priority = 100;
|
||||||
if (realtime === true) {
|
if (realtime === true) {
|
||||||
// Jobs which require a direct response get a higher priority
|
// Jobs which require a direct response get a higher priority
|
||||||
|
@ -404,7 +405,7 @@ export class WorkflowRunner {
|
||||||
async (resolve, reject, onCancel) => {
|
async (resolve, reject, onCancel) => {
|
||||||
onCancel.shouldReject = false;
|
onCancel.shouldReject = false;
|
||||||
onCancel(async () => {
|
onCancel(async () => {
|
||||||
await Container.get(ScalingService).stopJob(job);
|
await this.scalingService.stopJob(job);
|
||||||
|
|
||||||
// We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the
|
// We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the
|
||||||
// "workflowExecuteAfter" which we require.
|
// "workflowExecuteAfter" which we require.
|
||||||
|
|
|
@ -4,7 +4,6 @@ import { ApplicationError } from 'n8n-workflow';
|
||||||
|
|
||||||
import config from '@/config';
|
import config from '@/config';
|
||||||
import { ActiveExecutions } from '@/ActiveExecutions';
|
import { ActiveExecutions } from '@/ActiveExecutions';
|
||||||
import { ScalingService } from '@/scaling/scaling.service';
|
|
||||||
import { WebhookServer } from '@/webhooks/WebhookServer';
|
import { WebhookServer } from '@/webhooks/WebhookServer';
|
||||||
import { BaseCommand } from './BaseCommand';
|
import { BaseCommand } from './BaseCommand';
|
||||||
|
|
||||||
|
@ -96,6 +95,7 @@ export class Webhook extends BaseCommand {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const { ScalingService } = await import('@/scaling/scaling.service');
|
||||||
await Container.get(ScalingService).setupQueue();
|
await Container.get(ScalingService).setupQueue();
|
||||||
await this.server.start();
|
await this.server.start();
|
||||||
this.logger.debug(`Webhook listener ID: ${this.server.uniqueInstanceId}`);
|
this.logger.debug(`Webhook listener ID: ${this.server.uniqueInstanceId}`);
|
||||||
|
|
|
@ -7,7 +7,7 @@ import { sleep, ApplicationError } from 'n8n-workflow';
|
||||||
import * as Db from '@/Db';
|
import * as Db from '@/Db';
|
||||||
import * as ResponseHelper from '@/ResponseHelper';
|
import * as ResponseHelper from '@/ResponseHelper';
|
||||||
import config from '@/config';
|
import config from '@/config';
|
||||||
import { ScalingService } from '@/scaling/scaling.service';
|
import type { ScalingService } from '@/scaling/scaling.service';
|
||||||
import { N8N_VERSION, inTest } from '@/constants';
|
import { N8N_VERSION, inTest } from '@/constants';
|
||||||
import type { ICredentialsOverwrite } from '@/Interfaces';
|
import type { ICredentialsOverwrite } from '@/Interfaces';
|
||||||
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
|
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
|
||||||
|
@ -171,6 +171,7 @@ export class Worker extends BaseCommand {
|
||||||
}
|
}
|
||||||
|
|
||||||
async initScalingService() {
|
async initScalingService() {
|
||||||
|
const { ScalingService } = await import('@/scaling/scaling.service');
|
||||||
this.scalingService = Container.get(ScalingService);
|
this.scalingService = Container.get(ScalingService);
|
||||||
|
|
||||||
await this.scalingService.setupQueue();
|
await this.scalingService.setupQueue();
|
||||||
|
|
|
@ -6,15 +6,16 @@ import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.err
|
||||||
import { MissingExecutionStopError } from '@/errors/missing-execution-stop.error';
|
import { MissingExecutionStopError } from '@/errors/missing-execution-stop.error';
|
||||||
import type { ActiveExecutions } from '@/ActiveExecutions';
|
import type { ActiveExecutions } from '@/ActiveExecutions';
|
||||||
import type { IExecutionResponse } from '@/Interfaces';
|
import type { IExecutionResponse } from '@/Interfaces';
|
||||||
import type { ScalingService } from '@/scaling/scaling.service';
|
import { ScalingService } from '@/scaling/scaling.service';
|
||||||
import type { WaitTracker } from '@/WaitTracker';
|
import type { WaitTracker } from '@/WaitTracker';
|
||||||
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
||||||
import type { ExecutionRequest } from '@/executions/execution.types';
|
import type { ExecutionRequest } from '@/executions/execution.types';
|
||||||
import type { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
|
import type { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
|
||||||
import type { Job } from '@/scaling/types';
|
import type { Job } from '@/scaling/types';
|
||||||
|
import { mockInstance } from '@test/mocking';
|
||||||
|
|
||||||
describe('ExecutionService', () => {
|
describe('ExecutionService', () => {
|
||||||
const scalingService = mock<ScalingService>();
|
const scalingService = mockInstance(ScalingService);
|
||||||
const activeExecutions = mock<ActiveExecutions>();
|
const activeExecutions = mock<ActiveExecutions>();
|
||||||
const executionRepository = mock<ExecutionRepository>();
|
const executionRepository = mock<ExecutionRepository>();
|
||||||
const waitTracker = mock<WaitTracker>();
|
const waitTracker = mock<WaitTracker>();
|
||||||
|
@ -23,7 +24,6 @@ describe('ExecutionService', () => {
|
||||||
const executionService = new ExecutionService(
|
const executionService = new ExecutionService(
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
scalingService,
|
|
||||||
activeExecutions,
|
activeExecutions,
|
||||||
executionRepository,
|
executionRepository,
|
||||||
mock(),
|
mock(),
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
import { Service } from 'typedi';
|
import { Container, Service } from 'typedi';
|
||||||
import { GlobalConfig } from '@n8n/config';
|
import { GlobalConfig } from '@n8n/config';
|
||||||
import { validate as jsonSchemaValidate } from 'jsonschema';
|
import { validate as jsonSchemaValidate } from 'jsonschema';
|
||||||
import type {
|
import type {
|
||||||
|
@ -24,7 +24,6 @@ import type {
|
||||||
IWorkflowExecutionDataProcess,
|
IWorkflowExecutionDataProcess,
|
||||||
} from '@/Interfaces';
|
} from '@/Interfaces';
|
||||||
import { NodeTypes } from '@/NodeTypes';
|
import { NodeTypes } from '@/NodeTypes';
|
||||||
import { ScalingService } from '@/scaling/scaling.service';
|
|
||||||
import type { ExecutionRequest, ExecutionSummaries, StopResult } from './execution.types';
|
import type { ExecutionRequest, ExecutionSummaries, StopResult } from './execution.types';
|
||||||
import { WorkflowRunner } from '@/WorkflowRunner';
|
import { WorkflowRunner } from '@/WorkflowRunner';
|
||||||
import type { IGetExecutionsQueryFilter } from '@db/repositories/execution.repository';
|
import type { IGetExecutionsQueryFilter } from '@db/repositories/execution.repository';
|
||||||
|
@ -85,7 +84,6 @@ export class ExecutionService {
|
||||||
constructor(
|
constructor(
|
||||||
private readonly globalConfig: GlobalConfig,
|
private readonly globalConfig: GlobalConfig,
|
||||||
private readonly logger: Logger,
|
private readonly logger: Logger,
|
||||||
private readonly scalingService: ScalingService,
|
|
||||||
private readonly activeExecutions: ActiveExecutions,
|
private readonly activeExecutions: ActiveExecutions,
|
||||||
private readonly executionRepository: ExecutionRepository,
|
private readonly executionRepository: ExecutionRepository,
|
||||||
private readonly workflowRepository: WorkflowRepository,
|
private readonly workflowRepository: WorkflowRepository,
|
||||||
|
@ -471,12 +469,14 @@ export class ExecutionService {
|
||||||
this.waitTracker.stopExecution(execution.id);
|
this.waitTracker.stopExecution(execution.id);
|
||||||
}
|
}
|
||||||
|
|
||||||
const jobs = await this.scalingService.findJobsByStatus(['active', 'waiting']);
|
const { ScalingService } = await import('@/scaling/scaling.service');
|
||||||
|
const scalingService = Container.get(ScalingService);
|
||||||
|
const jobs = await scalingService.findJobsByStatus(['active', 'waiting']);
|
||||||
|
|
||||||
const job = jobs.find(({ data }) => data.executionId === execution.id);
|
const job = jobs.find(({ data }) => data.executionId === execution.id);
|
||||||
|
|
||||||
if (job) {
|
if (job) {
|
||||||
await this.scalingService.stopJob(job);
|
await scalingService.stopJob(job);
|
||||||
} else {
|
} else {
|
||||||
this.logger.debug('Job to stop not in queue', { executionId: execution.id });
|
this.logger.debug('Job to stop not in queue', { executionId: execution.id });
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,6 @@ describe('ExecutionService', () => {
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
mock(),
|
||||||
mock(),
|
|
||||||
executionRepository,
|
executionRepository,
|
||||||
Container.get(WorkflowRepository),
|
Container.get(WorkflowRepository),
|
||||||
mock(),
|
mock(),
|
||||||
|
|
Loading…
Reference in a new issue