refactor(core): Remove dead orchestration code (#11266)

This commit is contained in:
Iván Ovejero 2024-10-15 16:47:31 +02:00 committed by GitHub
parent c2ad15646d
commit 76ab780cdd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 7 additions and 82 deletions

View file

@ -6,7 +6,7 @@ import { ActiveExecutions } from '@/active-executions';
import config from '@/config'; import config from '@/config';
import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { OrchestrationWebhookService } from '@/services/orchestration/webhook/orchestration.webhook.service'; import { OrchestrationService } from '@/services/orchestration.service';
import { WebhookServer } from '@/webhooks/webhook-server'; import { WebhookServer } from '@/webhooks/webhook-server';
import { BaseCommand } from './base-command'; import { BaseCommand } from './base-command';
@ -103,7 +103,7 @@ export class Webhook extends BaseCommand {
} }
async initOrchestration() { async initOrchestration() {
await Container.get(OrchestrationWebhookService).init(); await Container.get(OrchestrationService).init();
Container.get(PubSubHandler).init(); Container.get(PubSubHandler).init();
await Container.get(Subscriber).subscribe('n8n.commands'); await Container.get(Subscriber).subscribe('n8n.commands');

View file

@ -14,7 +14,7 @@ import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import type { ScalingService } from '@/scaling/scaling.service'; import type { ScalingService } from '@/scaling/scaling.service';
import type { WorkerServerEndpointsConfig } from '@/scaling/worker-server'; import type { WorkerServerEndpointsConfig } from '@/scaling/worker-server';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; import { OrchestrationService } from '@/services/orchestration.service';
import { BaseCommand } from './base-command'; import { BaseCommand } from './base-command';
@ -140,7 +140,7 @@ export class Worker extends BaseCommand {
* The subscription connection adds a handler to handle the command messages * The subscription connection adds a handler to handle the command messages
*/ */
async initOrchestration() { async initOrchestration() {
await Container.get(OrchestrationWorkerService).init(); await Container.get(OrchestrationService).init();
Container.get(PubSubHandler).init(); Container.get(PubSubHandler).init();
await Container.get(Subscriber).subscribe('n8n.commands'); await Container.get(Subscriber).subscribe('n8n.commands');

View file

@ -38,7 +38,6 @@ export const GLOBAL_OWNER_SCOPES: Scope[] = [
'license:manage', 'license:manage',
'logStreaming:manage', 'logStreaming:manage',
'orchestration:read', 'orchestration:read',
'orchestration:list',
'saml:manage', 'saml:manage',
'securityAudit:generate', 'securityAudit:generate',
'sourceControl:pull', 'sourceControl:pull',

View file

@ -1,26 +0,0 @@
import type { MainResponseReceivedHandlerOptions } from './orchestration/main/types';
import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/types';
export abstract class OrchestrationHandlerService {
protected initialized = false;
async init() {
await this.initSubscriber();
this.initialized = true;
}
async initWithOptions(
options: WorkerCommandReceivedHandlerOptions | MainResponseReceivedHandlerOptions,
) {
await this.initSubscriber(options);
this.initialized = true;
}
async shutdown() {
this.initialized = false;
}
protected abstract initSubscriber(
options?: WorkerCommandReceivedHandlerOptions | MainResponseReceivedHandlerOptions,
): Promise<void>;
}

View file

@ -1,6 +0,0 @@
import type { Publisher } from '@/scaling/pubsub/publisher.service';
export type MainResponseReceivedHandlerOptions = {
hostId: string;
publisher: Publisher;
};

View file

@ -1,16 +0,0 @@
import { Service } from 'typedi';
import config from '@/config';
import { OrchestrationService } from '../../orchestration.service';
@Service()
export class OrchestrationWebhookService extends OrchestrationService {
sanityCheck(): boolean {
return (
this.isInitialized &&
config.get('executions.mode') === 'queue' &&
this.instanceSettings.instanceType === 'webhook'
);
}
}

View file

@ -1,16 +0,0 @@
import { Service } from 'typedi';
import config from '@/config';
import { OrchestrationService } from '../../orchestration.service';
@Service()
export class OrchestrationWorkerService extends OrchestrationService {
sanityCheck(): boolean {
return (
this.isInitialized &&
config.get('executions.mode') === 'queue' &&
this.instanceSettings.instanceType === 'worker'
);
}
}

View file

@ -1,10 +0,0 @@
import type { RunningJobSummary } from '@n8n/api-types';
import type { Publisher } from '@/scaling/pubsub/publisher.service';
export interface WorkerCommandReceivedHandlerOptions {
hostId: string;
publisher: Publisher;
getRunningJobIds: () => Array<string | number>;
getRunningJobsSummary: () => RunningJobSummary[];
}

View file

@ -18,7 +18,7 @@ import { TaskRunnerServer } from '@/runners/task-runner-server';
import { Publisher } from '@/scaling/pubsub/publisher.service'; import { Publisher } from '@/scaling/pubsub/publisher.service';
import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { ScalingService } from '@/scaling/scaling.service'; import { ScalingService } from '@/scaling/scaling.service';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; import { OrchestrationService } from '@/services/orchestration.service';
import { Telemetry } from '@/telemetry'; import { Telemetry } from '@/telemetry';
import { setupTestCommand } from '@test-integration/utils/test-command'; import { setupTestCommand } from '@test-integration/utils/test-command';
@ -35,7 +35,7 @@ const license = mockInstance(License, { loadCertStr: async () => '' });
const messageEventBus = mockInstance(MessageEventBus); const messageEventBus = mockInstance(MessageEventBus);
const logStreamingEventRelay = mockInstance(LogStreamingEventRelay); const logStreamingEventRelay = mockInstance(LogStreamingEventRelay);
const scalingService = mockInstance(ScalingService); const scalingService = mockInstance(ScalingService);
const orchestrationWorkerService = mockInstance(OrchestrationWorkerService); const orchestrationService = mockInstance(OrchestrationService);
const taskRunnerServer = mockInstance(TaskRunnerServer); const taskRunnerServer = mockInstance(TaskRunnerServer);
const taskRunnerProcess = mockInstance(TaskRunnerProcess); const taskRunnerProcess = mockInstance(TaskRunnerProcess);
mockInstance(Publisher); mockInstance(Publisher);
@ -58,7 +58,7 @@ test('worker initializes all its components', async () => {
expect(scalingService.setupQueue).toHaveBeenCalledTimes(1); expect(scalingService.setupQueue).toHaveBeenCalledTimes(1);
expect(scalingService.setupWorker).toHaveBeenCalledTimes(1); expect(scalingService.setupWorker).toHaveBeenCalledTimes(1);
expect(logStreamingEventRelay.init).toHaveBeenCalledTimes(1); expect(logStreamingEventRelay.init).toHaveBeenCalledTimes(1);
expect(orchestrationWorkerService.init).toHaveBeenCalledTimes(1); expect(orchestrationService.init).toHaveBeenCalledTimes(1);
expect(messageEventBus.send).toHaveBeenCalledTimes(1); expect(messageEventBus.send).toHaveBeenCalledTimes(1);
expect(taskRunnerServer.start).toHaveBeenCalledTimes(1); expect(taskRunnerServer.start).toHaveBeenCalledTimes(1);
expect(taskRunnerProcess.start).toHaveBeenCalledTimes(1); expect(taskRunnerProcess.start).toHaveBeenCalledTimes(1);