From 60b15e16cb9bf7f6518ba961a056d3fd66e05d7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Thu, 15 Aug 2024 10:41:49 +0200 Subject: [PATCH] refactor(core): Simplify Redis client types (no-changelog) (#10397) --- .../cli/src/services/cache/cache.service.ts | 2 +- .../services/redis/RedisServiceBaseClasses.ts | 19 +++---------------- .../redis/RedisServicePubSubPublisher.ts | 2 +- .../redis/RedisServicePubSubSubscriber.ts | 2 +- .../services/redis/redis-client.service.ts | 3 ++- .../cli/src/services/redis/redis.types.ts | 19 +++++++++++++++++++ 6 files changed, 27 insertions(+), 20 deletions(-) create mode 100644 packages/cli/src/services/redis/redis.types.ts diff --git a/packages/cli/src/services/cache/cache.service.ts b/packages/cli/src/services/cache/cache.service.ts index daf51911ff..eaa58c32f0 100644 --- a/packages/cli/src/services/cache/cache.service.ts +++ b/packages/cli/src/services/cache/cache.service.ts @@ -45,7 +45,7 @@ export class CacheService extends TypedEmitter { ); const redisClient = redisClientService.createClient({ - type: 'client(cache)', + type: 'cache(n8n)', extraOptions: { keyPrefix: prefix }, }); diff --git a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts index d694a1a774..b9db6125b0 100644 --- a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts +++ b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts @@ -4,20 +4,7 @@ import { Service } from 'typedi'; import config from '@/config'; import { Logger } from '@/Logger'; import { RedisClientService } from './redis-client.service'; - -export type RedisClientType = - | 'subscriber' - | 'client' - | 'bclient' - | 'subscriber(bull)' - | 'client(bull)' - | 'bclient(bull)' - | 'client(cache)' - | 'publisher' - | 'consumer' - | 'producer' - | 'list-sender' - | 'list-receiver'; +import type { RedisClientType } from './redis.types'; export type RedisServiceMessageHandler = | ((channel: string, message: string) => void) @@ -34,7 +21,7 @@ class RedisServiceBase { private readonly redisClientService: RedisClientService, ) {} - async init(type: RedisClientType = 'client'): Promise { + async init(type: RedisClientType): Promise { if (this.redisClient && this.isInitialized) { return; } @@ -62,7 +49,7 @@ class RedisServiceBase { export abstract class RedisServiceBaseSender extends RedisServiceBase { senderId: string; - async init(type: RedisClientType = 'client'): Promise { + async init(type: RedisClientType): Promise { await super.init(type); this.senderId = config.get('redis.queueModeId'); } diff --git a/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts b/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts index ff810ba7c6..23ca9a5b69 100644 --- a/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts +++ b/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts @@ -9,7 +9,7 @@ import { RedisServiceBaseSender } from './RedisServiceBaseClasses'; @Service() export class RedisServicePubSubPublisher extends RedisServiceBaseSender { async init(): Promise { - await super.init('publisher'); + await super.init('publisher(n8n)'); } async publish(channel: string, message: string): Promise { diff --git a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts index 8751826428..144647009f 100644 --- a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts +++ b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts @@ -5,7 +5,7 @@ import { RedisServiceBaseReceiver } from './RedisServiceBaseClasses'; @Service() export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver { async init(): Promise { - await super.init('subscriber'); + await super.init('subscriber(n8n)'); this.redisClient?.on('message', (channel: string, message: string) => { this.messageHandlers.forEach((handler: (channel: string, message: string) => void) => diff --git a/packages/cli/src/services/redis/redis-client.service.ts b/packages/cli/src/services/redis/redis-client.service.ts index b5c86523e0..21f74bf074 100644 --- a/packages/cli/src/services/redis/redis-client.service.ts +++ b/packages/cli/src/services/redis/redis-client.service.ts @@ -2,7 +2,8 @@ import { Service } from 'typedi'; import { Logger } from '@/Logger'; import ioRedis from 'ioredis'; import type { Cluster, RedisOptions } from 'ioredis'; -import type { RedisClientType } from './RedisServiceBaseClasses'; +import type { RedisClientType } from './redis.types'; + import { OnShutdown } from '@/decorators/OnShutdown'; import { LOWEST_SHUTDOWN_PRIORITY } from '@/constants'; import { GlobalConfig } from '@n8n/config'; diff --git a/packages/cli/src/services/redis/redis.types.ts b/packages/cli/src/services/redis/redis.types.ts new file mode 100644 index 0000000000..ed694904d7 --- /dev/null +++ b/packages/cli/src/services/redis/redis.types.ts @@ -0,0 +1,19 @@ +export type RedisClientType = N8nRedisClientType | BullRedisClientType; + +/** + * Redis client used by n8n. + * + * - `subscriber(n8n)` to listen for messages from scaling mode communication channels + * - `publisher(n8n)` to send messages into scaling mode communication channels + * - `cache(n8n)` for caching operations (variables, resource ownership, etc.) + */ +type N8nRedisClientType = 'subscriber(n8n)' | 'publisher(n8n)' | 'cache(n8n)'; + +/** + * Redis client used internally by Bull. Suffixed with `(bull)` at `ScalingService.setupQueue`. + * + * - `subscriber(bull)` for event listening + * - `client(bull)` for general queue operations + * - `bclient(bull)` for blocking operations when processing jobs + */ +type BullRedisClientType = 'subscriber(bull)' | 'client(bull)' | 'bclient(bull)';