diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index 43a9703087..d9d2f011fb 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -6,7 +6,7 @@ import { ActiveExecutions } from '@/active-executions'; import config from '@/config'; import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; -import { OrchestrationWebhookService } from '@/services/orchestration/webhook/orchestration.webhook.service'; +import { OrchestrationService } from '@/services/orchestration.service'; import { WebhookServer } from '@/webhooks/webhook-server'; import { BaseCommand } from './base-command'; @@ -103,7 +103,7 @@ export class Webhook extends BaseCommand { } async initOrchestration() { - await Container.get(OrchestrationWebhookService).init(); + await Container.get(OrchestrationService).init(); Container.get(PubSubHandler).init(); await Container.get(Subscriber).subscribe('n8n.commands'); diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 5dfb5c210b..96f151f547 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -14,7 +14,7 @@ import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import type { ScalingService } from '@/scaling/scaling.service'; import type { WorkerServerEndpointsConfig } from '@/scaling/worker-server'; -import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; +import { OrchestrationService } from '@/services/orchestration.service'; import { BaseCommand } from './base-command'; @@ -140,7 +140,7 @@ export class Worker extends BaseCommand { * The subscription connection adds a handler to handle the command messages */ async initOrchestration() { - await Container.get(OrchestrationWorkerService).init(); + await Container.get(OrchestrationService).init(); Container.get(PubSubHandler).init(); await Container.get(Subscriber).subscribe('n8n.commands'); diff --git a/packages/cli/src/permissions/global-roles.ts b/packages/cli/src/permissions/global-roles.ts index 664cd8384e..6315c3c617 100644 --- a/packages/cli/src/permissions/global-roles.ts +++ b/packages/cli/src/permissions/global-roles.ts @@ -38,7 +38,6 @@ export const GLOBAL_OWNER_SCOPES: Scope[] = [ 'license:manage', 'logStreaming:manage', 'orchestration:read', - 'orchestration:list', 'saml:manage', 'securityAudit:generate', 'sourceControl:pull', diff --git a/packages/cli/src/services/orchestration.handler.base.service.ts b/packages/cli/src/services/orchestration.handler.base.service.ts deleted file mode 100644 index e994ff6308..0000000000 --- a/packages/cli/src/services/orchestration.handler.base.service.ts +++ /dev/null @@ -1,26 +0,0 @@ -import type { MainResponseReceivedHandlerOptions } from './orchestration/main/types'; -import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/types'; - -export abstract class OrchestrationHandlerService { - protected initialized = false; - - async init() { - await this.initSubscriber(); - this.initialized = true; - } - - async initWithOptions( - options: WorkerCommandReceivedHandlerOptions | MainResponseReceivedHandlerOptions, - ) { - await this.initSubscriber(options); - this.initialized = true; - } - - async shutdown() { - this.initialized = false; - } - - protected abstract initSubscriber( - options?: WorkerCommandReceivedHandlerOptions | MainResponseReceivedHandlerOptions, - ): Promise; -} diff --git a/packages/cli/src/services/orchestration/main/types.ts b/packages/cli/src/services/orchestration/main/types.ts deleted file mode 100644 index 461630d396..0000000000 --- a/packages/cli/src/services/orchestration/main/types.ts +++ /dev/null @@ -1,6 +0,0 @@ -import type { Publisher } from '@/scaling/pubsub/publisher.service'; - -export type MainResponseReceivedHandlerOptions = { - hostId: string; - publisher: Publisher; -}; diff --git a/packages/cli/src/services/orchestration/webhook/orchestration.webhook.service.ts b/packages/cli/src/services/orchestration/webhook/orchestration.webhook.service.ts deleted file mode 100644 index 6b1c86fc6a..0000000000 --- a/packages/cli/src/services/orchestration/webhook/orchestration.webhook.service.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { Service } from 'typedi'; - -import config from '@/config'; - -import { OrchestrationService } from '../../orchestration.service'; - -@Service() -export class OrchestrationWebhookService extends OrchestrationService { - sanityCheck(): boolean { - return ( - this.isInitialized && - config.get('executions.mode') === 'queue' && - this.instanceSettings.instanceType === 'webhook' - ); - } -} diff --git a/packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts b/packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts deleted file mode 100644 index 1d0d822aeb..0000000000 --- a/packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { Service } from 'typedi'; - -import config from '@/config'; - -import { OrchestrationService } from '../../orchestration.service'; - -@Service() -export class OrchestrationWorkerService extends OrchestrationService { - sanityCheck(): boolean { - return ( - this.isInitialized && - config.get('executions.mode') === 'queue' && - this.instanceSettings.instanceType === 'worker' - ); - } -} diff --git a/packages/cli/src/services/orchestration/worker/types.ts b/packages/cli/src/services/orchestration/worker/types.ts deleted file mode 100644 index afe7362210..0000000000 --- a/packages/cli/src/services/orchestration/worker/types.ts +++ /dev/null @@ -1,10 +0,0 @@ -import type { RunningJobSummary } from '@n8n/api-types'; - -import type { Publisher } from '@/scaling/pubsub/publisher.service'; - -export interface WorkerCommandReceivedHandlerOptions { - hostId: string; - publisher: Publisher; - getRunningJobIds: () => Array; - getRunningJobsSummary: () => RunningJobSummary[]; -} diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index fad6eb3fd7..ce3280aa48 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -18,7 +18,7 @@ import { TaskRunnerServer } from '@/runners/task-runner-server'; import { Publisher } from '@/scaling/pubsub/publisher.service'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { ScalingService } from '@/scaling/scaling.service'; -import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; +import { OrchestrationService } from '@/services/orchestration.service'; import { Telemetry } from '@/telemetry'; import { setupTestCommand } from '@test-integration/utils/test-command'; @@ -35,7 +35,7 @@ const license = mockInstance(License, { loadCertStr: async () => '' }); const messageEventBus = mockInstance(MessageEventBus); const logStreamingEventRelay = mockInstance(LogStreamingEventRelay); const scalingService = mockInstance(ScalingService); -const orchestrationWorkerService = mockInstance(OrchestrationWorkerService); +const orchestrationService = mockInstance(OrchestrationService); const taskRunnerServer = mockInstance(TaskRunnerServer); const taskRunnerProcess = mockInstance(TaskRunnerProcess); mockInstance(Publisher); @@ -58,7 +58,7 @@ test('worker initializes all its components', async () => { expect(scalingService.setupQueue).toHaveBeenCalledTimes(1); expect(scalingService.setupWorker).toHaveBeenCalledTimes(1); expect(logStreamingEventRelay.init).toHaveBeenCalledTimes(1); - expect(orchestrationWorkerService.init).toHaveBeenCalledTimes(1); + expect(orchestrationService.init).toHaveBeenCalledTimes(1); expect(messageEventBus.send).toHaveBeenCalledTimes(1); expect(taskRunnerServer.start).toHaveBeenCalledTimes(1); expect(taskRunnerProcess.start).toHaveBeenCalledTimes(1);