mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 12:57:29 -08:00
refactor(core): Simplify Redis client types (no-changelog) (#10397)
This commit is contained in:
parent
613cdd2ba2
commit
60b15e16cb
|
@ -45,7 +45,7 @@ export class CacheService extends TypedEmitter<CacheEvents> {
|
||||||
);
|
);
|
||||||
|
|
||||||
const redisClient = redisClientService.createClient({
|
const redisClient = redisClientService.createClient({
|
||||||
type: 'client(cache)',
|
type: 'cache(n8n)',
|
||||||
extraOptions: { keyPrefix: prefix },
|
extraOptions: { keyPrefix: prefix },
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -4,20 +4,7 @@ import { Service } from 'typedi';
|
||||||
import config from '@/config';
|
import config from '@/config';
|
||||||
import { Logger } from '@/Logger';
|
import { Logger } from '@/Logger';
|
||||||
import { RedisClientService } from './redis-client.service';
|
import { RedisClientService } from './redis-client.service';
|
||||||
|
import type { RedisClientType } from './redis.types';
|
||||||
export type RedisClientType =
|
|
||||||
| 'subscriber'
|
|
||||||
| 'client'
|
|
||||||
| 'bclient'
|
|
||||||
| 'subscriber(bull)'
|
|
||||||
| 'client(bull)'
|
|
||||||
| 'bclient(bull)'
|
|
||||||
| 'client(cache)'
|
|
||||||
| 'publisher'
|
|
||||||
| 'consumer'
|
|
||||||
| 'producer'
|
|
||||||
| 'list-sender'
|
|
||||||
| 'list-receiver';
|
|
||||||
|
|
||||||
export type RedisServiceMessageHandler =
|
export type RedisServiceMessageHandler =
|
||||||
| ((channel: string, message: string) => void)
|
| ((channel: string, message: string) => void)
|
||||||
|
@ -34,7 +21,7 @@ class RedisServiceBase {
|
||||||
private readonly redisClientService: RedisClientService,
|
private readonly redisClientService: RedisClientService,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
async init(type: RedisClientType = 'client'): Promise<void> {
|
async init(type: RedisClientType): Promise<void> {
|
||||||
if (this.redisClient && this.isInitialized) {
|
if (this.redisClient && this.isInitialized) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -62,7 +49,7 @@ class RedisServiceBase {
|
||||||
export abstract class RedisServiceBaseSender extends RedisServiceBase {
|
export abstract class RedisServiceBaseSender extends RedisServiceBase {
|
||||||
senderId: string;
|
senderId: string;
|
||||||
|
|
||||||
async init(type: RedisClientType = 'client'): Promise<void> {
|
async init(type: RedisClientType): Promise<void> {
|
||||||
await super.init(type);
|
await super.init(type);
|
||||||
this.senderId = config.get('redis.queueModeId');
|
this.senderId = config.get('redis.queueModeId');
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,7 @@ import { RedisServiceBaseSender } from './RedisServiceBaseClasses';
|
||||||
@Service()
|
@Service()
|
||||||
export class RedisServicePubSubPublisher extends RedisServiceBaseSender {
|
export class RedisServicePubSubPublisher extends RedisServiceBaseSender {
|
||||||
async init(): Promise<void> {
|
async init(): Promise<void> {
|
||||||
await super.init('publisher');
|
await super.init('publisher(n8n)');
|
||||||
}
|
}
|
||||||
|
|
||||||
async publish(channel: string, message: string): Promise<void> {
|
async publish(channel: string, message: string): Promise<void> {
|
||||||
|
|
|
@ -5,7 +5,7 @@ import { RedisServiceBaseReceiver } from './RedisServiceBaseClasses';
|
||||||
@Service()
|
@Service()
|
||||||
export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver {
|
export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver {
|
||||||
async init(): Promise<void> {
|
async init(): Promise<void> {
|
||||||
await super.init('subscriber');
|
await super.init('subscriber(n8n)');
|
||||||
|
|
||||||
this.redisClient?.on('message', (channel: string, message: string) => {
|
this.redisClient?.on('message', (channel: string, message: string) => {
|
||||||
this.messageHandlers.forEach((handler: (channel: string, message: string) => void) =>
|
this.messageHandlers.forEach((handler: (channel: string, message: string) => void) =>
|
||||||
|
|
|
@ -2,7 +2,8 @@ import { Service } from 'typedi';
|
||||||
import { Logger } from '@/Logger';
|
import { Logger } from '@/Logger';
|
||||||
import ioRedis from 'ioredis';
|
import ioRedis from 'ioredis';
|
||||||
import type { Cluster, RedisOptions } from 'ioredis';
|
import type { Cluster, RedisOptions } from 'ioredis';
|
||||||
import type { RedisClientType } from './RedisServiceBaseClasses';
|
import type { RedisClientType } from './redis.types';
|
||||||
|
|
||||||
import { OnShutdown } from '@/decorators/OnShutdown';
|
import { OnShutdown } from '@/decorators/OnShutdown';
|
||||||
import { LOWEST_SHUTDOWN_PRIORITY } from '@/constants';
|
import { LOWEST_SHUTDOWN_PRIORITY } from '@/constants';
|
||||||
import { GlobalConfig } from '@n8n/config';
|
import { GlobalConfig } from '@n8n/config';
|
||||||
|
|
19
packages/cli/src/services/redis/redis.types.ts
Normal file
19
packages/cli/src/services/redis/redis.types.ts
Normal file
|
@ -0,0 +1,19 @@
|
||||||
|
export type RedisClientType = N8nRedisClientType | BullRedisClientType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Redis client used by n8n.
|
||||||
|
*
|
||||||
|
* - `subscriber(n8n)` to listen for messages from scaling mode communication channels
|
||||||
|
* - `publisher(n8n)` to send messages into scaling mode communication channels
|
||||||
|
* - `cache(n8n)` for caching operations (variables, resource ownership, etc.)
|
||||||
|
*/
|
||||||
|
type N8nRedisClientType = 'subscriber(n8n)' | 'publisher(n8n)' | 'cache(n8n)';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Redis client used internally by Bull. Suffixed with `(bull)` at `ScalingService.setupQueue`.
|
||||||
|
*
|
||||||
|
* - `subscriber(bull)` for event listening
|
||||||
|
* - `client(bull)` for general queue operations
|
||||||
|
* - `bclient(bull)` for blocking operations when processing jobs
|
||||||
|
*/
|
||||||
|
type BullRedisClientType = 'subscriber(bull)' | 'client(bull)' | 'bclient(bull)';
|
Loading…
Reference in a new issue