diff --git a/packages/cli/src/scaling/__tests__/publisher.service.test.ts b/packages/cli/src/scaling/__tests__/publisher.service.test.ts index 06b7fe05b4..311ee0bbb8 100644 --- a/packages/cli/src/scaling/__tests__/publisher.service.test.ts +++ b/packages/cli/src/scaling/__tests__/publisher.service.test.ts @@ -3,11 +3,11 @@ import { mock } from 'jest-mock-extended'; import config from '@/config'; import { generateNanoId } from '@/databases/utils/generators'; -import type { RedisClientService } from '@/services/redis/redis-client.service'; import type { RedisServiceCommandObject, RedisServiceWorkerResponseObject, -} from '@/services/redis/redis-service-commands'; +} from '@/scaling/redis/redis-service-commands'; +import type { RedisClientService } from '@/services/redis-client.service'; import { Publisher } from '../pubsub/publisher.service'; diff --git a/packages/cli/src/scaling/__tests__/subscriber.service.test.ts b/packages/cli/src/scaling/__tests__/subscriber.service.test.ts index 96566b7152..9b3eb53452 100644 --- a/packages/cli/src/scaling/__tests__/subscriber.service.test.ts +++ b/packages/cli/src/scaling/__tests__/subscriber.service.test.ts @@ -2,7 +2,7 @@ import type { Redis as SingleNodeClient } from 'ioredis'; import { mock } from 'jest-mock-extended'; import config from '@/config'; -import type { RedisClientService } from '@/services/redis/redis-client.service'; +import type { RedisClientService } from '@/services/redis-client.service'; import { Subscriber } from '../pubsub/subscriber.service'; diff --git a/packages/cli/src/scaling/constants.ts b/packages/cli/src/scaling/constants.ts index 8ef5f716b1..4cf0563be3 100644 --- a/packages/cli/src/scaling/constants.ts +++ b/packages/cli/src/scaling/constants.ts @@ -1,3 +1,7 @@ export const QUEUE_NAME = 'jobs'; export const JOB_TYPE_NAME = 'job'; + +export const COMMAND_PUBSUB_CHANNEL = 'n8n.commands'; + +export const WORKER_RESPONSE_PUBSUB_CHANNEL = 'n8n.worker-response'; diff --git a/packages/cli/src/scaling/pubsub/publisher.service.ts b/packages/cli/src/scaling/pubsub/publisher.service.ts index fee4724d87..1f13ef9896 100644 --- a/packages/cli/src/scaling/pubsub/publisher.service.ts +++ b/packages/cli/src/scaling/pubsub/publisher.service.ts @@ -3,11 +3,11 @@ import { Service } from 'typedi'; import config from '@/config'; import { Logger } from '@/logger'; -import { RedisClientService } from '@/services/redis/redis-client.service'; import type { RedisServiceCommandObject, RedisServiceWorkerResponseObject, -} from '@/services/redis/redis-service-commands'; +} from '@/scaling/redis/redis-service-commands'; +import { RedisClientService } from '@/services/redis-client.service'; /** * Responsible for publishing messages into the pubsub channels used by scaling mode. diff --git a/packages/cli/src/scaling/pubsub/pubsub.types.ts b/packages/cli/src/scaling/pubsub/pubsub.types.ts index b7f56904b6..9a02cab6ea 100644 --- a/packages/cli/src/scaling/pubsub/pubsub.types.ts +++ b/packages/cli/src/scaling/pubsub/pubsub.types.ts @@ -1,7 +1,8 @@ -import type { - COMMAND_REDIS_CHANNEL, - WORKER_RESPONSE_REDIS_CHANNEL, -} from '@/services/redis/redis-constants'; +import type { PushType, WorkerStatus } from '@n8n/api-types'; + +import type { IWorkflowDb } from '@/interfaces'; + +import type { COMMAND_PUBSUB_CHANNEL, WORKER_RESPONSE_PUBSUB_CHANNEL } from '../constants'; /** * Pubsub channel used by scaling mode: @@ -10,5 +11,90 @@ import type { * - `n8n.worker-response` for messages sent by workers in response to commands from main processes */ export type ScalingPubSubChannel = - | typeof COMMAND_REDIS_CHANNEL - | typeof WORKER_RESPONSE_REDIS_CHANNEL; + | typeof COMMAND_PUBSUB_CHANNEL + | typeof WORKER_RESPONSE_PUBSUB_CHANNEL; + +export type PubSubMessageMap = { + // #region Lifecycle + + 'reload-license': never; + + 'restart-event-bus': { + result: 'success' | 'error'; + error?: string; + }; + + 'reload-external-secrets-providers': { + result: 'success' | 'error'; + error?: string; + }; + + 'stop-worker': never; + + // #endregion + + // #region Community packages + + 'community-package-install': { + packageName: string; + packageVersion: string; + }; + + 'community-package-update': { + packageName: string; + packageVersion: string; + }; + + 'community-package-uninstall': { + packageName: string; + packageVersion: string; + }; + + // #endregion + + // #region Worker view + + 'get-worker-id': never; + + 'get-worker-status': WorkerStatus; + + // #endregion + + // #region Multi-main setup + + 'add-webhooks-triggers-and-pollers': { + workflowId: string; + }; + + 'remove-triggers-and-pollers': { + workflowId: string; + }; + + 'display-workflow-activation': { + workflowId: string; + }; + + 'display-workflow-deactivation': { + workflowId: string; + }; + + // currently 'workflow-failed-to-activate' + 'display-workflow-activation-error': { + workflowId: string; + errorMessage: string; + }; + + 'relay-execution-lifecycle-event': { + type: PushType; + args: Record; + pushRef: string; + }; + + 'clear-test-webhooks': { + webhookKey: string; + workflowEntity: IWorkflowDb; + pushRef: string; + }; + + // #endregion +}; diff --git a/packages/cli/src/scaling/pubsub/subscriber.service.ts b/packages/cli/src/scaling/pubsub/subscriber.service.ts index 5335c4b04e..83a18c761a 100644 --- a/packages/cli/src/scaling/pubsub/subscriber.service.ts +++ b/packages/cli/src/scaling/pubsub/subscriber.service.ts @@ -3,7 +3,7 @@ import { Service } from 'typedi'; import config from '@/config'; import { Logger } from '@/logger'; -import { RedisClientService } from '@/services/redis/redis-client.service'; +import { RedisClientService } from '@/services/redis-client.service'; import type { ScalingPubSubChannel } from './pubsub.types'; diff --git a/packages/cli/src/services/redis/redis-service-commands.ts b/packages/cli/src/scaling/redis/redis-service-commands.ts similarity index 100% rename from packages/cli/src/services/redis/redis-service-commands.ts rename to packages/cli/src/scaling/redis/redis-service-commands.ts diff --git a/packages/cli/src/services/redis/redis.types.ts b/packages/cli/src/scaling/redis/redis.types.ts similarity index 100% rename from packages/cli/src/services/redis/redis.types.ts rename to packages/cli/src/scaling/redis/redis.types.ts diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index 3e3461908b..552802ba70 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -24,7 +24,7 @@ import type { JobStatus, JobId, QueueRecoveryContext, - PubSubMessage, + JobReport, } from './scaling.types'; @Service() @@ -46,7 +46,7 @@ export class ScalingService { async setupQueue() { const { default: BullQueue } = await import('bull'); - const { RedisClientService } = await import('@/services/redis/redis-client.service'); + const { RedisClientService } = await import('@/services/redis-client.service'); const service = Container.get(RedisClientService); const bullPrefix = this.globalConfig.queue.bull.prefix; @@ -265,7 +265,7 @@ export class ScalingService { } } - private isPubSubMessage(candidate: unknown): candidate is PubSubMessage { + private isPubSubMessage(candidate: unknown): candidate is JobReport { return typeof candidate === 'object' && candidate !== null && 'kind' in candidate; } diff --git a/packages/cli/src/scaling/scaling.types.ts b/packages/cli/src/scaling/scaling.types.ts index ad5b7b3643..fa8210450f 100644 --- a/packages/cli/src/scaling/scaling.types.ts +++ b/packages/cli/src/scaling/scaling.types.ts @@ -23,11 +23,11 @@ export type JobStatus = Bull.JobStatus; export type JobOptions = Bull.JobOptions; -export type PubSubMessage = MessageToMain | MessageToWorker; +export type JobReport = JobReportToMain | JobReportToWorker; -type MessageToMain = RespondToWebhookMessage; +type JobReportToMain = RespondToWebhookMessage; -type MessageToWorker = AbortJobMessage; +type JobReportToWorker = AbortJobMessage; type RespondToWebhookMessage = { kind: 'respond-to-webhook'; diff --git a/packages/cli/src/services/__tests__/orchestration.service.test.ts b/packages/cli/src/services/__tests__/orchestration.service.test.ts index b5c6c6a80c..f77dcd90cc 100644 --- a/packages/cli/src/services/__tests__/orchestration.service.test.ts +++ b/packages/cli/src/services/__tests__/orchestration.service.test.ts @@ -9,13 +9,13 @@ import config from '@/config'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; import { Push } from '@/push'; +import type { RedisServiceWorkerResponseObject } from '@/scaling/redis/redis-service-commands'; import * as helpers from '@/services/orchestration/helpers'; import { handleCommandMessageMain } from '@/services/orchestration/main/handle-command-message-main'; import { handleWorkerResponseMessageMain } from '@/services/orchestration/main/handle-worker-response-message-main'; import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service'; import { OrchestrationService } from '@/services/orchestration.service'; -import { RedisClientService } from '@/services/redis/redis-client.service'; -import type { RedisServiceWorkerResponseObject } from '@/services/redis/redis-service-commands'; +import { RedisClientService } from '@/services/redis-client.service'; import { mockInstance } from '@test/mocking'; import type { MainResponseReceivedHandlerOptions } from '../orchestration/main/types'; diff --git a/packages/cli/src/services/cache/cache.service.ts b/packages/cli/src/services/cache/cache.service.ts index 3fe674b4b3..3eda66ecb8 100644 --- a/packages/cli/src/services/cache/cache.service.ts +++ b/packages/cli/src/services/cache/cache.service.ts @@ -36,7 +36,7 @@ export class CacheService extends TypedEmitter { const useRedis = backend === 'redis' || (backend === 'auto' && mode === 'queue'); if (useRedis) { - const { RedisClientService } = await import('../redis/redis-client.service'); + const { RedisClientService } = await import('../redis-client.service'); const redisClientService = Container.get(RedisClientService); const prefixBase = config.getEnv('redis.prefix'); diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index 8e4963070d..80f428bb81 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -8,7 +8,10 @@ import type { Publisher } from '@/scaling/pubsub/publisher.service'; import type { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { MultiMainSetup } from './orchestration/main/multi-main-setup.ee'; -import type { RedisServiceBaseCommand, RedisServiceCommand } from './redis/redis-service-commands'; +import type { + RedisServiceBaseCommand, + RedisServiceCommand, +} from '../scaling/redis/redis-service-commands'; @Service() export class OrchestrationService { diff --git a/packages/cli/src/services/orchestration/helpers.ts b/packages/cli/src/services/orchestration/helpers.ts index cfd5bca51a..fd5e444cff 100644 --- a/packages/cli/src/services/orchestration/helpers.ts +++ b/packages/cli/src/services/orchestration/helpers.ts @@ -3,9 +3,9 @@ import os from 'node:os'; import { Container } from 'typedi'; import { Logger } from '@/logger'; +import { COMMAND_PUBSUB_CHANNEL } from '@/scaling/constants'; -import { COMMAND_REDIS_CHANNEL } from '../redis/redis-constants'; -import type { RedisServiceCommandObject } from '../redis/redis-service-commands'; +import type { RedisServiceCommandObject } from '../../scaling/redis/redis-service-commands'; export interface RedisServiceCommandLastReceived { [date: string]: Date; @@ -18,7 +18,7 @@ export function messageToRedisServiceCommandObject(messageString: string) { message = jsonParse(messageString); } catch { Container.get(Logger).debug( - `Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`, + `Received invalid message via channel ${COMMAND_PUBSUB_CHANNEL}: "${messageString}"`, ); return; } diff --git a/packages/cli/src/services/orchestration/main/handle-worker-response-message-main.ts b/packages/cli/src/services/orchestration/main/handle-worker-response-message-main.ts index 79e09116d5..da3cf5a507 100644 --- a/packages/cli/src/services/orchestration/main/handle-worker-response-message-main.ts +++ b/packages/cli/src/services/orchestration/main/handle-worker-response-message-main.ts @@ -3,11 +3,11 @@ import { jsonParse } from 'n8n-workflow'; import Container from 'typedi'; import { Logger } from '@/logger'; -import { WORKER_RESPONSE_REDIS_CHANNEL } from '@/services/redis/redis-constants'; +import { WORKER_RESPONSE_PUBSUB_CHANNEL } from '@/scaling/constants'; import type { MainResponseReceivedHandlerOptions } from './types'; import { Push } from '../../../push'; -import type { RedisServiceWorkerResponseObject } from '../../redis/redis-service-commands'; +import type { RedisServiceWorkerResponseObject } from '../../../scaling/redis/redis-service-commands'; export async function handleWorkerResponseMessageMain( messageString: string, @@ -19,7 +19,7 @@ export async function handleWorkerResponseMessageMain( if (!workerResponse) { Container.get(Logger).debug( - `Received invalid message via channel ${WORKER_RESPONSE_REDIS_CHANNEL}: "${messageString}"`, + `Received invalid message via channel ${WORKER_RESPONSE_PUBSUB_CHANNEL}: "${messageString}"`, ); return; } diff --git a/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts b/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts index a03389ce15..98dbce7fde 100644 --- a/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts +++ b/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts @@ -6,7 +6,7 @@ import config from '@/config'; import { TIME } from '@/constants'; import { Logger } from '@/logger'; import { Publisher } from '@/scaling/pubsub/publisher.service'; -import { RedisClientService } from '@/services/redis/redis-client.service'; +import { RedisClientService } from '@/services/redis-client.service'; import { TypedEmitter } from '@/typed-emitter'; type MultiMainEvents = { diff --git a/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts b/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts index 0aec5c0f08..1ba2e4a177 100644 --- a/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts +++ b/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts @@ -1,12 +1,12 @@ import { Service } from 'typedi'; +import { COMMAND_PUBSUB_CHANNEL, WORKER_RESPONSE_PUBSUB_CHANNEL } from '@/scaling/constants'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { handleCommandMessageMain } from './handle-command-message-main'; import { handleWorkerResponseMessageMain } from './handle-worker-response-message-main'; import type { MainResponseReceivedHandlerOptions } from './types'; import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; -import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from '../../redis/redis-constants'; @Service() export class OrchestrationHandlerMainService extends OrchestrationHandlerService { @@ -19,9 +19,9 @@ export class OrchestrationHandlerMainService extends OrchestrationHandlerService await this.subscriber.subscribe('n8n.worker-response'); this.subscriber.addMessageHandler(async (channel: string, messageString: string) => { - if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { + if (channel === WORKER_RESPONSE_PUBSUB_CHANNEL) { await handleWorkerResponseMessageMain(messageString, options); - } else if (channel === COMMAND_REDIS_CHANNEL) { + } else if (channel === COMMAND_PUBSUB_CHANNEL) { await handleCommandMessageMain(messageString); } }); diff --git a/packages/cli/src/services/orchestration/pubsub/pubsub-message-map.ts b/packages/cli/src/services/orchestration/pubsub/pubsub-message-map.ts deleted file mode 100644 index 64769cb4e2..0000000000 --- a/packages/cli/src/services/orchestration/pubsub/pubsub-message-map.ts +++ /dev/null @@ -1,88 +0,0 @@ -import type { PushType, WorkerStatus } from '@n8n/api-types'; - -import type { IWorkflowDb } from '@/interfaces'; - -export type PubSubMessageMap = { - // #region Lifecycle - - 'reload-license': never; - - 'restart-event-bus': { - result: 'success' | 'error'; - error?: string; - }; - - 'reload-external-secrets-providers': { - result: 'success' | 'error'; - error?: string; - }; - - 'stop-worker': never; - - // #endregion - - // #region Community packages - - 'community-package-install': { - packageName: string; - packageVersion: string; - }; - - 'community-package-update': { - packageName: string; - packageVersion: string; - }; - - 'community-package-uninstall': { - packageName: string; - packageVersion: string; - }; - - // #endregion - - // #region Worker view - - 'get-worker-id': never; - - 'get-worker-status': WorkerStatus; - - // #endregion - - // #region Multi-main setup - - 'add-webhooks-triggers-and-pollers': { - workflowId: string; - }; - - 'remove-triggers-and-pollers': { - workflowId: string; - }; - - 'display-workflow-activation': { - workflowId: string; - }; - - 'display-workflow-deactivation': { - workflowId: string; - }; - - // currently 'workflow-failed-to-activate' - 'display-workflow-activation-error': { - workflowId: string; - errorMessage: string; - }; - - 'relay-execution-lifecycle-event': { - type: PushType; - args: Record; - pushRef: string; - }; - - 'clear-test-webhooks': { - webhookKey: string; - workflowEntity: IWorkflowDb; - pushRef: string; - }; - - // #endregion -}; diff --git a/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts b/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts index 5ecef9ade6..0a6c9b7b45 100644 --- a/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts +++ b/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts @@ -1,10 +1,10 @@ import { Service } from 'typedi'; +import { COMMAND_PUBSUB_CHANNEL } from '@/scaling/constants'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { handleCommandMessageWebhook } from './handle-command-message-webhook'; import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; -import { COMMAND_REDIS_CHANNEL } from '../../redis/redis-constants'; @Service() export class OrchestrationHandlerWebhookService extends OrchestrationHandlerService { @@ -16,7 +16,7 @@ export class OrchestrationHandlerWebhookService extends OrchestrationHandlerServ await this.subscriber.subscribe('n8n.commands'); this.subscriber.addMessageHandler(async (channel: string, messageString: string) => { - if (channel === COMMAND_REDIS_CHANNEL) { + if (channel === COMMAND_PUBSUB_CHANNEL) { await handleCommandMessageWebhook(messageString); } }); diff --git a/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts b/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts index 13cb8ceaff..7d5d1cb5f2 100644 --- a/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts +++ b/packages/cli/src/services/orchestration/worker/handle-command-message-worker.ts @@ -7,9 +7,9 @@ import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus' import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; import { License } from '@/license'; import { Logger } from '@/logger'; +import { COMMAND_PUBSUB_CHANNEL } from '@/scaling/constants'; +import type { RedisServiceCommandObject } from '@/scaling/redis/redis-service-commands'; import { CommunityPackagesService } from '@/services/community-packages.service'; -import { COMMAND_REDIS_CHANNEL } from '@/services/redis/redis-constants'; -import type { RedisServiceCommandObject } from '@/services/redis/redis-service-commands'; import type { WorkerCommandReceivedHandlerOptions } from './types'; import { debounceMessageReceiver, getOsCpuString } from '../helpers'; @@ -17,7 +17,7 @@ import { debounceMessageReceiver, getOsCpuString } from '../helpers'; export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHandlerOptions) { // eslint-disable-next-line complexity return async (channel: string, messageString: string) => { - if (channel === COMMAND_REDIS_CHANNEL) { + if (channel === COMMAND_PUBSUB_CHANNEL) { if (!messageString) return; const logger = Container.get(Logger); let message: RedisServiceCommandObject; @@ -25,7 +25,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa message = jsonParse(messageString); } catch { logger.debug( - `Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`, + `Received invalid message via channel ${COMMAND_PUBSUB_CHANNEL}: "${messageString}"`, ); return; } @@ -145,7 +145,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa } logger.debug( - `Received unknown command via channel ${COMMAND_REDIS_CHANNEL}: "${message.command}"`, + `Received unknown command via channel ${COMMAND_PUBSUB_CHANNEL}: "${message.command}"`, ); break; } diff --git a/packages/cli/src/services/orchestration/worker/types.ts b/packages/cli/src/services/orchestration/worker/types.ts index df500ee3c1..d821a194b2 100644 --- a/packages/cli/src/services/orchestration/worker/types.ts +++ b/packages/cli/src/services/orchestration/worker/types.ts @@ -1,5 +1,4 @@ import type { RunningJobSummary } from '@n8n/api-types'; -import type { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow'; import type { Publisher } from '@/scaling/pubsub/publisher.service'; @@ -9,14 +8,3 @@ export interface WorkerCommandReceivedHandlerOptions { getRunningJobIds: () => Array; getRunningJobsSummary: () => RunningJobSummary[]; } - -export interface WorkerJobStatusSummary { - jobId: string; - executionId: string; - retryOf?: string; - startedAt: Date; - mode: WorkflowExecuteMode; - workflowName: string; - workflowId: string; - status: ExecutionStatus; -} diff --git a/packages/cli/src/services/redis/redis-client.service.ts b/packages/cli/src/services/redis-client.service.ts similarity index 98% rename from packages/cli/src/services/redis/redis-client.service.ts rename to packages/cli/src/services/redis-client.service.ts index 30bad8b631..dc0d3b8cde 100644 --- a/packages/cli/src/services/redis/redis-client.service.ts +++ b/packages/cli/src/services/redis-client.service.ts @@ -5,7 +5,7 @@ import { Service } from 'typedi'; import { Logger } from '@/logger'; -import type { RedisClientType } from './redis.types'; +import type { RedisClientType } from '../scaling/redis/redis.types'; @Service() export class RedisClientService { diff --git a/packages/cli/src/services/redis/redis-constants.ts b/packages/cli/src/services/redis/redis-constants.ts deleted file mode 100644 index 038e94e9ce..0000000000 --- a/packages/cli/src/services/redis/redis-constants.ts +++ /dev/null @@ -1,2 +0,0 @@ -export const COMMAND_REDIS_CHANNEL = 'n8n.commands'; -export const WORKER_RESPONSE_REDIS_CHANNEL = 'n8n.worker-response'; diff --git a/packages/cli/src/services/redis/redis-service-base-classes.ts b/packages/cli/src/services/redis/redis-service-base-classes.ts deleted file mode 100644 index d652ef10c5..0000000000 --- a/packages/cli/src/services/redis/redis-service-base-classes.ts +++ /dev/null @@ -1,70 +0,0 @@ -import type Redis from 'ioredis'; -import type { Cluster } from 'ioredis'; -import { Service } from 'typedi'; - -import config from '@/config'; -import { Logger } from '@/logger'; - -import { RedisClientService } from './redis-client.service'; -import type { RedisClientType } from './redis.types'; - -export type RedisServiceMessageHandler = - | ((channel: string, message: string) => void) - | ((stream: string, id: string, message: string[]) => void); - -@Service() -class RedisServiceBase { - redisClient: Redis | Cluster | undefined; - - isInitialized = false; - - constructor( - protected readonly logger: Logger, - private readonly redisClientService: RedisClientService, - ) {} - - async init(type: RedisClientType): Promise { - if (this.redisClient && this.isInitialized) { - return; - } - this.redisClient = this.redisClientService.createClient({ type }); - - this.redisClient.on('error', (error) => { - if (!String(error).includes('ECONNREFUSED')) { - this.logger.warn('Error with Redis: ', error); - } - }); - - this.isInitialized = true; - } - - async destroy(): Promise { - if (!this.redisClient) { - return; - } - await this.redisClient.quit(); - this.isInitialized = false; - this.redisClient = undefined; - } -} - -export abstract class RedisServiceBaseSender extends RedisServiceBase { - senderId: string; - - async init(type: RedisClientType): Promise { - await super.init(type); - this.senderId = config.get('redis.queueModeId'); - } -} - -export abstract class RedisServiceBaseReceiver extends RedisServiceBase { - messageHandlers: Map = new Map(); - - addMessageHandler(handlerName: string, handler: RedisServiceMessageHandler): void { - this.messageHandlers.set(handlerName, handler); - } - - removeMessageHandler(handlerName: string): void { - this.messageHandlers.delete(handlerName); - } -}