From bf7392a878d1f44a642dbeb465601f3adf785864 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 27 Sep 2024 12:35:01 +0200 Subject: [PATCH] refactor(core): Make all pubsub messages type-safe (#10990) --- .../message-event-bus/message-event-bus.ts | 4 +- .../external-secrets-manager.ee.ts | 2 +- packages/cli/src/license.ts | 4 +- .../__tests__/publisher.service.test.ts | 11 +- .../src/scaling/pubsub/publisher.service.ts | 10 +- .../cli/src/scaling/pubsub/pubsub.types.ts | 260 ++++++++++++------ .../src/scaling/pubsub/subscriber.service.ts | 10 +- .../scaling/redis/redis-service-commands.ts | 103 ------- packages/cli/src/scaling/redis/redis.types.ts | 4 +- .../__tests__/orchestration.service.test.ts | 19 +- .../cli/src/services/orchestration.service.ts | 23 +- .../cli/src/services/orchestration/helpers.ts | 9 +- .../main/handle-command-message-main.ts | 47 ++-- .../handle-worker-response-message-main.ts | 11 +- .../webhook/handle-command-message-webhook.ts | 28 +- .../worker/handle-command-message-worker.ts | 40 ++- packages/cli/src/utlity.types.ts | 6 + 17 files changed, 285 insertions(+), 306 deletions(-) delete mode 100644 packages/cli/src/scaling/redis/redis-service-commands.ts create mode 100644 packages/cli/src/utlity.types.ts diff --git a/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts b/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts index cf9a585478..fec5b4845b 100644 --- a/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts +++ b/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts @@ -210,7 +210,7 @@ export class MessageEventBus extends EventEmitter { this.destinations[destination.getId()] = destination; this.destinations[destination.getId()].startListening(); if (notifyWorkers) { - await this.orchestrationService.publish('restartEventBus'); + await this.orchestrationService.publish('restart-event-bus'); } return destination; } @@ -236,7 +236,7 @@ export class MessageEventBus extends EventEmitter { delete this.destinations[id]; } if (notifyWorkers) { - await this.orchestrationService.publish('restartEventBus'); + await this.orchestrationService.publish('restart-event-bus'); } return result; } diff --git a/packages/cli/src/external-secrets/external-secrets-manager.ee.ts b/packages/cli/src/external-secrets/external-secrets-manager.ee.ts index dd33132d37..76db778d0a 100644 --- a/packages/cli/src/external-secrets/external-secrets-manager.ee.ts +++ b/packages/cli/src/external-secrets/external-secrets-manager.ee.ts @@ -79,7 +79,7 @@ export class ExternalSecretsManager { } async broadcastReloadExternalSecretsProviders() { - await Container.get(OrchestrationService).publish('reloadExternalSecretsProviders'); + await Container.get(OrchestrationService).publish('reload-external-secrets-providers'); } private decryptSecretsSettings(value: string): ExternalSecretsSettings { diff --git a/packages/cli/src/license.ts b/packages/cli/src/license.ts index 75a57efd2c..81fb9eb608 100644 --- a/packages/cli/src/license.ts +++ b/packages/cli/src/license.ts @@ -146,7 +146,7 @@ export class License { this.orchestrationService.isFollower ) { this.logger.debug( - '[Multi-main setup] Instance is follower, skipping sending of "reloadLicense" command...', + '[Multi-main setup] Instance is follower, skipping sending of "reload-license" command...', ); return; } @@ -160,7 +160,7 @@ export class License { if (config.getEnv('executions.mode') === 'queue') { const { Publisher } = await import('@/scaling/pubsub/publisher.service'); - await Container.get(Publisher).publishCommand({ command: 'reloadLicense' }); + await Container.get(Publisher).publishCommand({ command: 'reload-license' }); } const isS3Selected = config.getEnv('binaryDataManager.mode') === 's3'; diff --git a/packages/cli/src/scaling/__tests__/publisher.service.test.ts b/packages/cli/src/scaling/__tests__/publisher.service.test.ts index 311ee0bbb8..f26c93cc42 100644 --- a/packages/cli/src/scaling/__tests__/publisher.service.test.ts +++ b/packages/cli/src/scaling/__tests__/publisher.service.test.ts @@ -3,13 +3,10 @@ import { mock } from 'jest-mock-extended'; import config from '@/config'; import { generateNanoId } from '@/databases/utils/generators'; -import type { - RedisServiceCommandObject, - RedisServiceWorkerResponseObject, -} from '@/scaling/redis/redis-service-commands'; import type { RedisClientService } from '@/services/redis-client.service'; import { Publisher } from '../pubsub/publisher.service'; +import type { PubSub } from '../pubsub/pubsub.types'; describe('Publisher', () => { let queueModeId: string; @@ -49,7 +46,7 @@ describe('Publisher', () => { describe('publishCommand', () => { it('should publish command into `n8n.commands` pubsub channel', async () => { const publisher = new Publisher(mock(), redisClientService); - const msg = mock({ command: 'reloadLicense' }); + const msg = mock({ command: 'reload-license' }); await publisher.publishCommand(msg); @@ -63,8 +60,8 @@ describe('Publisher', () => { describe('publishWorkerResponse', () => { it('should publish worker response into `n8n.worker-response` pubsub channel', async () => { const publisher = new Publisher(mock(), redisClientService); - const msg = mock({ - command: 'reloadExternalSecretsProviders', + const msg = mock({ + command: 'reload-external-secrets-providers', }); await publisher.publishWorkerResponse(msg); diff --git a/packages/cli/src/scaling/pubsub/publisher.service.ts b/packages/cli/src/scaling/pubsub/publisher.service.ts index 1f13ef9896..f015890d48 100644 --- a/packages/cli/src/scaling/pubsub/publisher.service.ts +++ b/packages/cli/src/scaling/pubsub/publisher.service.ts @@ -3,12 +3,10 @@ import { Service } from 'typedi'; import config from '@/config'; import { Logger } from '@/logger'; -import type { - RedisServiceCommandObject, - RedisServiceWorkerResponseObject, -} from '@/scaling/redis/redis-service-commands'; import { RedisClientService } from '@/services/redis-client.service'; +import type { PubSub } from './pubsub.types'; + /** * Responsible for publishing messages into the pubsub channels used by scaling mode. */ @@ -44,7 +42,7 @@ export class Publisher { // #region Publishing /** Publish a command into the `n8n.commands` channel. */ - async publishCommand(msg: Omit) { + async publishCommand(msg: Omit) { await this.client.publish( 'n8n.commands', JSON.stringify({ ...msg, senderId: config.getEnv('redis.queueModeId') }), @@ -54,7 +52,7 @@ export class Publisher { } /** Publish a response for a command into the `n8n.worker-response` channel. */ - async publishWorkerResponse(msg: RedisServiceWorkerResponseObject) { + async publishWorkerResponse(msg: PubSub.WorkerResponse) { await this.client.publish('n8n.worker-response', JSON.stringify(msg)); this.logger.debug(`Published response for ${msg.command} to worker response channel`); diff --git a/packages/cli/src/scaling/pubsub/pubsub.types.ts b/packages/cli/src/scaling/pubsub/pubsub.types.ts index 191f4b62d9..13643440fb 100644 --- a/packages/cli/src/scaling/pubsub/pubsub.types.ts +++ b/packages/cli/src/scaling/pubsub/pubsub.types.ts @@ -1,96 +1,198 @@ import type { PushType, WorkerStatus } from '@n8n/api-types'; import type { IWorkflowDb } from '@/interfaces'; +import type { Resolve } from '@/utlity.types'; import type { COMMAND_PUBSUB_CHANNEL, WORKER_RESPONSE_PUBSUB_CHANNEL } from '../constants'; -/** Pubsub channel used by scaling mode. */ -export type PubSubChannel = typeof COMMAND_PUBSUB_CHANNEL | typeof WORKER_RESPONSE_PUBSUB_CHANNEL; +export namespace PubSub { + // ---------------------------------- + // channels + // ---------------------------------- -/** Handler function for every message received via a `PubSubChannel`. */ -export type PubSubHandlerFn = (msg: string) => void; + /** Pubsub channel used by scaling mode. */ + export type Channel = typeof COMMAND_PUBSUB_CHANNEL | typeof WORKER_RESPONSE_PUBSUB_CHANNEL; -export type PubSubMessageMap = { - // #region Lifecycle + /** Handler function for every message received via a pubsub channel. */ + export type HandlerFn = (msg: string) => void; - 'reload-license': never; + // ---------------------------------- + // commands + // ---------------------------------- - 'restart-event-bus': { - result: 'success' | 'error'; - error?: string; + export type CommandMap = { + // #region Lifecycle + + 'reload-license': never; + + 'restart-event-bus': never; + + 'reload-external-secrets-providers': never; + + // #endregion + + // #region Community packages + + 'community-package-install': { + packageName: string; + packageVersion: string; + }; + + 'community-package-update': { + packageName: string; + packageVersion: string; + }; + + 'community-package-uninstall': { + packageName: string; + }; + + // #endregion + + // #region Worker view + + 'get-worker-id': never; + + 'get-worker-status': never; + + // #endregion + + // #region Multi-main setup + + 'add-webhooks-triggers-and-pollers': { + workflowId: string; + }; + + 'remove-triggers-and-pollers': { + workflowId: string; + }; + + 'display-workflow-activation': { + workflowId: string; + }; + + 'display-workflow-deactivation': { + workflowId: string; + }; + + 'display-workflow-activation-error': { + workflowId: string; + errorMessage: string; + }; + + 'relay-execution-lifecycle-event': { + type: PushType; + args: Record; + pushRef: string; + }; + + 'clear-test-webhooks': { + webhookKey: string; + workflowEntity: IWorkflowDb; + pushRef: string; + }; + + // #endregion }; - 'reload-external-secrets-providers': { - result: 'success' | 'error'; - error?: string; + type _ToCommand = { + senderId: string; + targets?: string[]; + command: CommandKey; + } & (CommandMap[CommandKey] extends never + ? { payload?: never } // some commands carry no payload + : { payload: CommandMap[CommandKey] }); + + type ToCommand = Resolve<_ToCommand>; + + namespace Command { + export type ReloadLicense = ToCommand<'reload-license'>; + export type RestartEventBus = ToCommand<'restart-event-bus'>; + export type ReloadExternalSecretsProviders = ToCommand<'reload-external-secrets-providers'>; + export type CommunityPackageInstall = ToCommand<'community-package-install'>; + export type CommunityPackageUpdate = ToCommand<'community-package-update'>; + export type CommunityPackageUninstall = ToCommand<'community-package-uninstall'>; + export type GetWorkerId = ToCommand<'get-worker-id'>; + export type GetWorkerStatus = ToCommand<'get-worker-status'>; + export type AddWebhooksTriggersAndPollers = ToCommand<'add-webhooks-triggers-and-pollers'>; + export type RemoveTriggersAndPollers = ToCommand<'remove-triggers-and-pollers'>; + export type DisplayWorkflowActivation = ToCommand<'display-workflow-activation'>; + export type DisplayWorkflowDeactivation = ToCommand<'display-workflow-deactivation'>; + export type DisplayWorkflowActivationError = ToCommand<'display-workflow-activation-error'>; + export type RelayExecutionLifecycleEvent = ToCommand<'relay-execution-lifecycle-event'>; + export type ClearTestWebhooks = ToCommand<'clear-test-webhooks'>; + } + + /** Command sent via the `n8n.commands` pubsub channel. */ + export type Command = + | Command.ReloadLicense + | Command.RestartEventBus + | Command.ReloadExternalSecretsProviders + | Command.CommunityPackageInstall + | Command.CommunityPackageUpdate + | Command.CommunityPackageUninstall + | Command.GetWorkerId + | Command.GetWorkerStatus + | Command.AddWebhooksTriggersAndPollers + | Command.RemoveTriggersAndPollers + | Command.DisplayWorkflowActivation + | Command.DisplayWorkflowDeactivation + | Command.DisplayWorkflowActivationError + | Command.RelayExecutionLifecycleEvent + | Command.ClearTestWebhooks; + + // ---------------------------------- + // worker responses + // ---------------------------------- + + export type WorkerResponseMap = { + // #region Lifecycle + + 'restart-event-bus': { + result: 'success' | 'error'; + error?: string; + }; + + 'reload-external-secrets-providers': { + result: 'success' | 'error'; + error?: string; + }; + + // #endregion + + // #region Worker view + + 'get-worker-id': never; + + 'get-worker-status': WorkerStatus; + + // #endregion }; - 'stop-worker': never; + type _ToWorkerResponse = { + workerId: string; + targets?: string[]; + command: WorkerResponseKey; + } & (WorkerResponseMap[WorkerResponseKey] extends never + ? { payload?: never } // some responses carry no payload + : { payload: WorkerResponseMap[WorkerResponseKey] }); - // #endregion + type ToWorkerResponse = Resolve< + _ToWorkerResponse + >; - // #region Community packages + namespace WorkerResponse { + export type RestartEventBus = ToWorkerResponse<'restart-event-bus'>; + export type ReloadExternalSecretsProviders = + ToWorkerResponse<'reload-external-secrets-providers'>; + export type GetWorkerId = ToWorkerResponse<'get-worker-id'>; + export type GetWorkerStatus = ToWorkerResponse<'get-worker-status'>; + } - 'community-package-install': { - packageName: string; - packageVersion: string; - }; - - 'community-package-update': { - packageName: string; - packageVersion: string; - }; - - 'community-package-uninstall': { - packageName: string; - packageVersion: string; - }; - - // #endregion - - // #region Worker view - - 'get-worker-id': never; - - 'get-worker-status': WorkerStatus; - - // #endregion - - // #region Multi-main setup - - 'add-webhooks-triggers-and-pollers': { - workflowId: string; - }; - - 'remove-triggers-and-pollers': { - workflowId: string; - }; - - 'display-workflow-activation': { - workflowId: string; - }; - - 'display-workflow-deactivation': { - workflowId: string; - }; - - // currently 'workflow-failed-to-activate' - 'display-workflow-activation-error': { - workflowId: string; - errorMessage: string; - }; - - 'relay-execution-lifecycle-event': { - type: PushType; - args: Record; - pushRef: string; - }; - - 'clear-test-webhooks': { - webhookKey: string; - workflowEntity: IWorkflowDb; - pushRef: string; - }; - - // #endregion -}; + /** Response sent via the `n8n.worker-response` pubsub channel. */ + export type WorkerResponse = + | WorkerResponse.RestartEventBus + | WorkerResponse.ReloadExternalSecretsProviders + | WorkerResponse.GetWorkerId + | WorkerResponse.GetWorkerStatus; +} diff --git a/packages/cli/src/scaling/pubsub/subscriber.service.ts b/packages/cli/src/scaling/pubsub/subscriber.service.ts index 5d4529fdb9..e1951f924e 100644 --- a/packages/cli/src/scaling/pubsub/subscriber.service.ts +++ b/packages/cli/src/scaling/pubsub/subscriber.service.ts @@ -5,7 +5,7 @@ import config from '@/config'; import { Logger } from '@/logger'; import { RedisClientService } from '@/services/redis-client.service'; -import type { PubSubHandlerFn, PubSubChannel } from './pubsub.types'; +import type { PubSub } from './pubsub.types'; /** * Responsible for subscribing to the pubsub channels used by scaling mode. @@ -14,7 +14,7 @@ import type { PubSubHandlerFn, PubSubChannel } from './pubsub.types'; export class Subscriber { private readonly client: SingleNodeClient | MultiNodeClient; - private readonly handlers: Map = new Map(); + private readonly handlers: Map = new Map(); // #region Lifecycle @@ -29,7 +29,7 @@ export class Subscriber { this.client.on('error', (error) => this.logger.error(error.message)); - this.client.on('message', (channel: PubSubChannel, message) => { + this.client.on('message', (channel: PubSub.Channel, message) => { this.handlers.get(channel)?.(message); }); } @@ -47,7 +47,7 @@ export class Subscriber { // #region Subscribing - async subscribe(channel: PubSubChannel) { + async subscribe(channel: PubSub.Channel) { await this.client.subscribe(channel, (error) => { if (error) { this.logger.error('Failed to subscribe to channel', { channel, cause: error }); @@ -59,7 +59,7 @@ export class Subscriber { } /** Set the message handler function for a channel. */ - setMessageHandler(channel: PubSubChannel, handlerFn: PubSubHandlerFn) { + setMessageHandler(channel: PubSub.Channel, handlerFn: PubSub.HandlerFn) { this.handlers.set(channel, handlerFn); } diff --git a/packages/cli/src/scaling/redis/redis-service-commands.ts b/packages/cli/src/scaling/redis/redis-service-commands.ts deleted file mode 100644 index e64d1e97fc..0000000000 --- a/packages/cli/src/scaling/redis/redis-service-commands.ts +++ /dev/null @@ -1,103 +0,0 @@ -import type { PushType, WorkerStatus } from '@n8n/api-types'; - -import type { IWorkflowDb } from '@/interfaces'; - -export type RedisServiceCommand = - | 'getStatus' - | 'getId' - | 'restartEventBus' - | 'stopWorker' - | 'reloadLicense' - | 'reloadExternalSecretsProviders' - | 'community-package-install' - | 'community-package-update' - | 'community-package-uninstall' - | 'display-workflow-activation' // multi-main only - | 'display-workflow-deactivation' // multi-main only - | 'add-webhooks-triggers-and-pollers' // multi-main only - | 'remove-triggers-and-pollers' // multi-main only - | 'workflow-failed-to-activate' // multi-main only - | 'relay-execution-lifecycle-event' // multi-main only - | 'clear-test-webhooks'; // multi-main only - -/** - * An object to be sent via Redis pubsub from the main process to the workers. - * @field command: The command to be executed. - * @field targets: The targets to execute the command on. Leave empty to execute on all workers or specify worker ids. - * @field payload: Optional arguments to be sent with the command. - */ -export type RedisServiceBaseCommand = - | { - senderId: string; - command: Exclude< - RedisServiceCommand, - | 'relay-execution-lifecycle-event' - | 'clear-test-webhooks' - | 'community-package-install' - | 'community-package-update' - | 'community-package-uninstall' - >; - payload?: { - [key: string]: string | number | boolean | string[] | number[] | boolean[]; - }; - } - | { - senderId: string; - command: 'relay-execution-lifecycle-event'; - payload: { type: PushType; args: Record; pushRef: string }; - } - | { - senderId: string; - command: 'clear-test-webhooks'; - payload: { webhookKey: string; workflowEntity: IWorkflowDb; pushRef: string }; - } - | { - senderId: string; - command: - | 'community-package-install' - | 'community-package-update' - | 'community-package-uninstall'; - payload: { packageName: string; packageVersion: string }; - }; - -export type RedisServiceWorkerResponseObject = { - workerId: string; -} & ( - | RedisServiceBaseCommand - | { - command: 'getStatus'; - payload: WorkerStatus; - } - | { - command: 'getId'; - } - | { - command: 'restartEventBus'; - payload: { - result: 'success' | 'error'; - error?: string; - }; - } - | { - command: 'reloadExternalSecretsProviders'; - payload: { - result: 'success' | 'error'; - error?: string; - }; - } - | { - command: 'stopWorker'; - } - | { - command: 'workflowActiveStateChanged'; - payload: { - oldState: boolean; - newState: boolean; - workflowId: string; - }; - } -) & { targets?: string[] }; - -export type RedisServiceCommandObject = { - targets?: string[]; -} & RedisServiceBaseCommand; diff --git a/packages/cli/src/scaling/redis/redis.types.ts b/packages/cli/src/scaling/redis/redis.types.ts index ed694904d7..ec7f2397fa 100644 --- a/packages/cli/src/scaling/redis/redis.types.ts +++ b/packages/cli/src/scaling/redis/redis.types.ts @@ -3,8 +3,8 @@ export type RedisClientType = N8nRedisClientType | BullRedisClientType; /** * Redis client used by n8n. * - * - `subscriber(n8n)` to listen for messages from scaling mode communication channels - * - `publisher(n8n)` to send messages into scaling mode communication channels + * - `subscriber(n8n)` to listen for messages from scaling mode pubsub channels + * - `publisher(n8n)` to send messages into scaling mode pubsub channels * - `cache(n8n)` for caching operations (variables, resource ownership, etc.) */ type N8nRedisClientType = 'subscriber(n8n)' | 'publisher(n8n)' | 'cache(n8n)'; diff --git a/packages/cli/src/services/__tests__/orchestration.service.test.ts b/packages/cli/src/services/__tests__/orchestration.service.test.ts index f77dcd90cc..7bbd797000 100644 --- a/packages/cli/src/services/__tests__/orchestration.service.test.ts +++ b/packages/cli/src/services/__tests__/orchestration.service.test.ts @@ -9,7 +9,7 @@ import config from '@/config'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; import { Push } from '@/push'; -import type { RedisServiceWorkerResponseObject } from '@/scaling/redis/redis-service-commands'; +import type { PubSub } from '@/scaling/pubsub/pubsub.types'; import * as helpers from '@/services/orchestration/helpers'; import { handleCommandMessageMain } from '@/services/orchestration/main/handle-command-message-main'; import { handleWorkerResponseMessageMain } from '@/services/orchestration/main/handle-worker-response-message-main'; @@ -34,10 +34,9 @@ mockInstance(ActiveWorkflowManager); let queueModeId: string; -const workerRestartEventBusResponse: RedisServiceWorkerResponseObject = { - senderId: 'test', +const workerRestartEventBusResponse: PubSub.WorkerResponse = { workerId: 'test', - command: 'restartEventBus', + command: 'restart-event-bus', payload: { result: 'success', }, @@ -78,18 +77,18 @@ describe('Orchestration Service', () => { JSON.stringify(workerRestartEventBusResponse), mock(), ); - expect(response?.command).toEqual('restartEventBus'); + expect(response?.command).toEqual('restart-event-bus'); }); test('should handle command messages from others', async () => { const responseFalseId = await handleCommandMessageMain( JSON.stringify({ senderId: 'test', - command: 'reloadLicense', + command: 'reload-license', }), ); expect(responseFalseId).toBeDefined(); - expect(responseFalseId!.command).toEqual('reloadLicense'); + expect(responseFalseId!.command).toEqual('reload-license'); expect(responseFalseId!.senderId).toEqual('test'); }); @@ -98,7 +97,7 @@ describe('Orchestration Service', () => { JSON.stringify({ ...workerRestartEventBusResponse, senderId: queueModeId }), ); expect(response).toBeDefined(); - expect(response!.command).toEqual('restartEventBus'); + expect(response!.command).toEqual('restart-event-bus'); expect(response!.senderId).toEqual(queueModeId); expect(eventBus.restart).not.toHaveBeenCalled(); }); @@ -118,13 +117,13 @@ describe('Orchestration Service', () => { const res1 = await handleCommandMessageMain( JSON.stringify({ senderId: 'test', - command: 'reloadExternalSecretsProviders', + command: 'reload-external-secrets-providers', }), ); const res2 = await handleCommandMessageMain( JSON.stringify({ senderId: 'test', - command: 'reloadExternalSecretsProviders', + command: 'reload-external-secrets-providers', }), ); expect(helpers.debounceMessageReceiver).toHaveBeenCalledTimes(2); diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index 80f428bb81..1ee7c26876 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -5,13 +5,10 @@ import Container, { Service } from 'typedi'; import config from '@/config'; import { Logger } from '@/logger'; import type { Publisher } from '@/scaling/pubsub/publisher.service'; +import type { PubSub } from '@/scaling/pubsub/pubsub.types'; import type { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { MultiMainSetup } from './orchestration/main/multi-main-setup.ee'; -import type { - RedisServiceBaseCommand, - RedisServiceCommand, -} from '../scaling/redis/redis-service-commands'; @Service() export class OrchestrationService { @@ -100,14 +97,18 @@ export class OrchestrationService { // pubsub // ---------------------------------- - async publish(command: RedisServiceCommand, data?: unknown) { + async publish( + commandKey: CommandKey, + payload?: PubSub.CommandMap[CommandKey], + ) { if (!this.sanityCheck()) return; - const payload = data as RedisServiceBaseCommand['payload']; + this.logger.debug( + `[Instance ID ${this.instanceId}] Publishing command "${commandKey}"`, + payload, + ); - this.logger.debug(`[Instance ID ${this.instanceId}] Publishing command "${command}"`, payload); - - await this.publisher.publishCommand({ command, payload }); + await this.publisher.publishCommand({ command: commandKey, payload }); } // ---------------------------------- @@ -117,7 +118,7 @@ export class OrchestrationService { async getWorkerStatus(id?: string) { if (!this.sanityCheck()) return; - const command = 'getStatus'; + const command = 'get-worker-status'; this.logger.debug(`Sending "${command}" to command channel`); @@ -130,7 +131,7 @@ export class OrchestrationService { async getWorkerIds() { if (!this.sanityCheck()) return; - const command = 'getId'; + const command = 'get-worker-id'; this.logger.debug(`Sending "${command}" to command channel`); diff --git a/packages/cli/src/services/orchestration/helpers.ts b/packages/cli/src/services/orchestration/helpers.ts index fd5e444cff..a5470138dc 100644 --- a/packages/cli/src/services/orchestration/helpers.ts +++ b/packages/cli/src/services/orchestration/helpers.ts @@ -4,8 +4,7 @@ import { Container } from 'typedi'; import { Logger } from '@/logger'; import { COMMAND_PUBSUB_CHANNEL } from '@/scaling/constants'; - -import type { RedisServiceCommandObject } from '../../scaling/redis/redis-service-commands'; +import type { PubSub } from '@/scaling/pubsub/pubsub.types'; export interface RedisServiceCommandLastReceived { [date: string]: Date; @@ -13,9 +12,9 @@ export interface RedisServiceCommandLastReceived { export function messageToRedisServiceCommandObject(messageString: string) { if (!messageString) return; - let message: RedisServiceCommandObject; + let message: PubSub.Command; try { - message = jsonParse(messageString); + message = jsonParse(messageString); } catch { Container.get(Logger).debug( `Received invalid message via channel ${COMMAND_PUBSUB_CHANNEL}: "${messageString}"`, @@ -27,7 +26,7 @@ export function messageToRedisServiceCommandObject(messageString: string) { const lastReceived: RedisServiceCommandLastReceived = {}; -export function debounceMessageReceiver(message: RedisServiceCommandObject, timeout: number = 100) { +export function debounceMessageReceiver(message: PubSub.Command, timeout: number = 100) { const now = new Date(); const lastReceivedDate = lastReceived[message.command]; if (lastReceivedDate && now.getTime() - lastReceivedDate.getTime() < timeout) { diff --git a/packages/cli/src/services/orchestration/main/handle-command-message-main.ts b/packages/cli/src/services/orchestration/main/handle-command-message-main.ts index 15917e11b7..49fce27aef 100644 --- a/packages/cli/src/services/orchestration/main/handle-command-message-main.ts +++ b/packages/cli/src/services/orchestration/main/handle-command-message-main.ts @@ -47,12 +47,9 @@ export async function handleCommandMessageMain(messageString: string) { const push = Container.get(Push); switch (message.command) { - case 'reloadLicense': + case 'reload-license': if (!debounceMessageReceiver(message, 500)) { - message.payload = { - result: 'debounced', - }; - return message; + return { ...message, payload: { result: 'debounced' } }; } if (isMainInstance && !config.getEnv('multiMainSetup.enabled')) { @@ -60,20 +57,14 @@ export async function handleCommandMessageMain(messageString: string) { } await Container.get(License).reload(); break; - case 'restartEventBus': + case 'restart-event-bus': if (!debounceMessageReceiver(message, 200)) { - message.payload = { - result: 'debounced', - }; - return message; + return { ...message, payload: { result: 'debounced' } }; } await Container.get(MessageEventBus).restart(); - case 'reloadExternalSecretsProviders': + case 'reload-external-secrets-providers': if (!debounceMessageReceiver(message, 200)) { - message.payload = { - result: 'debounced', - }; - return message; + return { ...message, payload: { result: 'debounced' } }; } await Container.get(ExternalSecretsManager).reloadAllProviders(); break; @@ -83,19 +74,21 @@ export async function handleCommandMessageMain(messageString: string) { if (!debounceMessageReceiver(message, 200)) { return message; } - const { packageName, packageVersion } = message.payload; + const { packageName } = message.payload; const communityPackagesService = Container.get(CommunityPackagesService); if (message.command === 'community-package-uninstall') { await communityPackagesService.removeNpmPackage(packageName); } else { - await communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion); + await communityPackagesService.installOrUpdateNpmPackage( + packageName, + message.payload.packageVersion, + ); } break; case 'add-webhooks-triggers-and-pollers': { if (!debounceMessageReceiver(message, 100)) { - message.payload = { result: 'debounced' }; - return message; + return { ...message, payload: { result: 'debounced' } }; } const orchestrationService = Container.get(OrchestrationService); @@ -124,7 +117,7 @@ export async function handleCommandMessageMain(messageString: string) { errorMessage: error.message, }); - await Container.get(OrchestrationService).publish('workflow-failed-to-activate', { + await Container.get(OrchestrationService).publish('display-workflow-activation-error', { workflowId, errorMessage: error.message, }); @@ -136,8 +129,7 @@ export async function handleCommandMessageMain(messageString: string) { case 'remove-triggers-and-pollers': { if (!debounceMessageReceiver(message, 100)) { - message.payload = { result: 'debounced' }; - return message; + return { ...message, payload: { result: 'debounced' } }; } const orchestrationService = Container.get(OrchestrationService); @@ -163,8 +155,7 @@ export async function handleCommandMessageMain(messageString: string) { case 'display-workflow-activation': { if (!debounceMessageReceiver(message, 100)) { - message.payload = { result: 'debounced' }; - return message; + return { ...message, payload: { result: 'debounced' } }; } const { workflowId } = message.payload ?? {}; @@ -178,8 +169,7 @@ export async function handleCommandMessageMain(messageString: string) { case 'display-workflow-deactivation': { if (!debounceMessageReceiver(message, 100)) { - message.payload = { result: 'debounced' }; - return message; + return { ...message, payload: { result: 'debounced' } }; } const { workflowId } = message.payload ?? {}; @@ -191,10 +181,9 @@ export async function handleCommandMessageMain(messageString: string) { break; } - case 'workflow-failed-to-activate': { + case 'display-workflow-activation-error': { if (!debounceMessageReceiver(message, 100)) { - message.payload = { result: 'debounced' }; - return message; + return { ...message, payload: { result: 'debounced' } }; } const { workflowId, errorMessage } = message.payload ?? {}; diff --git a/packages/cli/src/services/orchestration/main/handle-worker-response-message-main.ts b/packages/cli/src/services/orchestration/main/handle-worker-response-message-main.ts index da3cf5a507..7842c71463 100644 --- a/packages/cli/src/services/orchestration/main/handle-worker-response-message-main.ts +++ b/packages/cli/src/services/orchestration/main/handle-worker-response-message-main.ts @@ -1,19 +1,18 @@ -import type { WorkerStatus } from '@n8n/api-types'; import { jsonParse } from 'n8n-workflow'; import Container from 'typedi'; import { Logger } from '@/logger'; import { WORKER_RESPONSE_PUBSUB_CHANNEL } from '@/scaling/constants'; +import type { PubSub } from '@/scaling/pubsub/pubsub.types'; import type { MainResponseReceivedHandlerOptions } from './types'; import { Push } from '../../../push'; -import type { RedisServiceWorkerResponseObject } from '../../../scaling/redis/redis-service-commands'; export async function handleWorkerResponseMessageMain( messageString: string, options: MainResponseReceivedHandlerOptions, ) { - const workerResponse = jsonParse(messageString, { + const workerResponse = jsonParse(messageString, { fallbackValue: null, }); @@ -27,13 +26,13 @@ export async function handleWorkerResponseMessageMain( if (workerResponse.targets && !workerResponse.targets.includes(options.queueModeId)) return; switch (workerResponse.command) { - case 'getStatus': + case 'get-worker-status': Container.get(Push).broadcast('sendWorkerStatusMessage', { workerId: workerResponse.workerId, - status: workerResponse.payload as WorkerStatus, + status: workerResponse.payload, }); break; - case 'getId': + case 'get-worker-id': break; default: Container.get(Logger).debug( diff --git a/packages/cli/src/services/orchestration/webhook/handle-command-message-webhook.ts b/packages/cli/src/services/orchestration/webhook/handle-command-message-webhook.ts index 542b8f1f52..3555139a99 100644 --- a/packages/cli/src/services/orchestration/webhook/handle-command-message-webhook.ts +++ b/packages/cli/src/services/orchestration/webhook/handle-command-message-webhook.ts @@ -33,12 +33,9 @@ export async function handleCommandMessageWebhook(messageString: string) { } switch (message.command) { - case 'reloadLicense': + case 'reload-license': if (!debounceMessageReceiver(message, 500)) { - message.payload = { - result: 'debounced', - }; - return message; + return { ...message, payload: { result: 'debounced' } }; } if (isMainInstance && !config.getEnv('multiMainSetup.enabled')) { @@ -50,20 +47,14 @@ export async function handleCommandMessageWebhook(messageString: string) { } await Container.get(License).reload(); break; - case 'restartEventBus': + case 'restart-event-bus': if (!debounceMessageReceiver(message, 200)) { - message.payload = { - result: 'debounced', - }; - return message; + return { ...message, payload: { result: 'debounced' } }; } await Container.get(MessageEventBus).restart(); - case 'reloadExternalSecretsProviders': + case 'reload-external-secrets-providers': if (!debounceMessageReceiver(message, 200)) { - message.payload = { - result: 'debounced', - }; - return message; + return { ...message, payload: { result: 'debounced' } }; } await Container.get(ExternalSecretsManager).reloadAllProviders(); break; @@ -73,12 +64,15 @@ export async function handleCommandMessageWebhook(messageString: string) { if (!debounceMessageReceiver(message, 200)) { return message; } - const { packageName, packageVersion } = message.payload; + const { packageName } = message.payload; const communityPackagesService = Container.get(CommunityPackagesService); if (message.command === 'community-package-uninstall') { await communityPackagesService.removeNpmPackage(packageName); } else { - await communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion); + await communityPackagesService.installOrUpdateNpmPackage( + packageName, + message.payload.packageVersion, + ); } break; diff --git a/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts b/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts index 45b6bd57c9..3c5b108010 100644 --- a/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts +++ b/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts @@ -8,7 +8,7 @@ import { ExternalSecretsManager } from '@/external-secrets/external-secrets-mana import { License } from '@/license'; import { Logger } from '@/logger'; import { COMMAND_PUBSUB_CHANNEL } from '@/scaling/constants'; -import type { RedisServiceCommandObject } from '@/scaling/redis/redis-service-commands'; +import type { PubSub } from '@/scaling/pubsub/pubsub.types'; import { CommunityPackagesService } from '@/services/community-packages.service'; import type { WorkerCommandReceivedHandlerOptions } from './types'; @@ -22,9 +22,9 @@ export async function getWorkerCommandReceivedHandler( if (!messageString) return; const logger = Container.get(Logger); - let message: RedisServiceCommandObject; + let message: PubSub.Command; try { - message = jsonParse(messageString); + message = jsonParse(messageString); } catch { logger.debug( `Received invalid message via channel ${COMMAND_PUBSUB_CHANNEL}: "${messageString}"`, @@ -39,11 +39,11 @@ export async function getWorkerCommandReceivedHandler( return; // early return if the message is not for this worker } switch (message.command) { - case 'getStatus': + case 'get-worker-status': if (!debounceMessageReceiver(message, 500)) return; await options.publisher.publishWorkerResponse({ workerId: options.queueModeId, - command: 'getStatus', + command: 'get-worker-status', payload: { workerId: options.queueModeId, runningJobsSummary: options.getRunningJobsSummary(), @@ -66,20 +66,20 @@ export async function getWorkerCommandReceivedHandler( }, }); break; - case 'getId': + case 'get-worker-id': if (!debounceMessageReceiver(message, 500)) return; await options.publisher.publishWorkerResponse({ workerId: options.queueModeId, - command: 'getId', + command: 'get-worker-id', }); break; - case 'restartEventBus': + case 'restart-event-bus': if (!debounceMessageReceiver(message, 500)) return; try { await Container.get(MessageEventBus).restart(); await options.publisher.publishWorkerResponse({ workerId: options.queueModeId, - command: 'restartEventBus', + command: 'restart-event-bus', payload: { result: 'success', }, @@ -87,7 +87,7 @@ export async function getWorkerCommandReceivedHandler( } catch (error) { await options.publisher.publishWorkerResponse({ workerId: options.queueModeId, - command: 'restartEventBus', + command: 'restart-event-bus', payload: { result: 'error', error: (error as Error).message, @@ -95,13 +95,13 @@ export async function getWorkerCommandReceivedHandler( }); } break; - case 'reloadExternalSecretsProviders': + case 'reload-external-secrets-providers': if (!debounceMessageReceiver(message, 500)) return; try { await Container.get(ExternalSecretsManager).reloadAllProviders(); await options.publisher.publishWorkerResponse({ workerId: options.queueModeId, - command: 'reloadExternalSecretsProviders', + command: 'reload-external-secrets-providers', payload: { result: 'success', }, @@ -109,7 +109,7 @@ export async function getWorkerCommandReceivedHandler( } catch (error) { await options.publisher.publishWorkerResponse({ workerId: options.queueModeId, - command: 'reloadExternalSecretsProviders', + command: 'reload-external-secrets-providers', payload: { result: 'error', error: (error as Error).message, @@ -121,23 +121,21 @@ export async function getWorkerCommandReceivedHandler( case 'community-package-update': case 'community-package-uninstall': if (!debounceMessageReceiver(message, 500)) return; - const { packageName, packageVersion } = message.payload; + const { packageName } = message.payload; const communityPackagesService = Container.get(CommunityPackagesService); if (message.command === 'community-package-uninstall') { await communityPackagesService.removeNpmPackage(packageName); } else { - await communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion); + await communityPackagesService.installOrUpdateNpmPackage( + packageName, + message.payload.packageVersion, + ); } break; - case 'reloadLicense': + case 'reload-license': if (!debounceMessageReceiver(message, 500)) return; await Container.get(License).reload(); break; - case 'stopWorker': - if (!debounceMessageReceiver(message, 500)) return; - // TODO: implement proper shutdown - // await this.stopProcess(); - break; default: if ( message.command === 'relay-execution-lifecycle-event' || diff --git a/packages/cli/src/utlity.types.ts b/packages/cli/src/utlity.types.ts new file mode 100644 index 0000000000..c126a31717 --- /dev/null +++ b/packages/cli/src/utlity.types.ts @@ -0,0 +1,6 @@ +/** + * Display an intersection type without implementation details. + * @doc https://effectivetypescript.com/2022/02/25/gentips-4-display/ + */ +// eslint-disable-next-line @typescript-eslint/ban-types +export type Resolve = T extends Function ? T : { [K in keyof T]: T[K] };