diff --git a/packages/cli/src/services/redis.service.ts b/packages/cli/src/services/redis.service.ts index 46557fb0df..d6cb11b294 100644 --- a/packages/cli/src/services/redis.service.ts +++ b/packages/cli/src/services/redis.service.ts @@ -1,10 +1,6 @@ import { Service } from 'typedi'; import { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; import { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher'; -import { RedisServiceListReceiver } from './redis/RedisServiceListReceiver'; -import { RedisServiceListSender } from './redis/RedisServiceListSender'; -import { RedisServiceStreamConsumer } from './redis/RedisServiceStreamConsumer'; -import { RedisServiceStreamProducer } from './redis/RedisServiceStreamProducer'; /* * This is a convenience service that provides access to all the Redis clients. @@ -14,10 +10,6 @@ export class RedisService { constructor( private redisServicePubSubSubscriber: RedisServicePubSubSubscriber, private redisServicePubSubPublisher: RedisServicePubSubPublisher, - private redisServiceListReceiver: RedisServiceListReceiver, - private redisServiceListSender: RedisServiceListSender, - private redisServiceStreamConsumer: RedisServiceStreamConsumer, - private redisServiceStreamProducer: RedisServiceStreamProducer, ) {} async getPubSubSubscriber() { @@ -29,24 +21,4 @@ export class RedisService { await this.redisServicePubSubPublisher.init(); return this.redisServicePubSubPublisher; } - - async getListSender() { - await this.redisServiceListSender.init(); - return this.redisServiceListSender; - } - - async getListReceiver() { - await this.redisServiceListReceiver.init(); - return this.redisServiceListReceiver; - } - - async getStreamProducer() { - await this.redisServiceStreamProducer.init(); - return this.redisServiceStreamProducer; - } - - async getStreamConsumer() { - await this.redisServiceStreamConsumer.init(); - return this.redisServiceStreamConsumer; - } } diff --git a/packages/cli/src/services/redis/RedisServiceListReceiver.ts b/packages/cli/src/services/redis/RedisServiceListReceiver.ts deleted file mode 100644 index 1de36fee6a..0000000000 --- a/packages/cli/src/services/redis/RedisServiceListReceiver.ts +++ /dev/null @@ -1,57 +0,0 @@ -import { Service } from 'typedi'; -import { jsonParse } from 'n8n-workflow'; -import { WORKER_RESPONSE_REDIS_LIST } from './RedisServiceHelper'; -import type { RedisServiceWorkerResponseObject } from './RedisServiceCommands'; -import { RedisServiceBaseReceiver } from './RedisServiceBaseClasses'; - -@Service() -export class RedisServiceListReceiver extends RedisServiceBaseReceiver { - async init(): Promise<void> { - await super.init('list-receiver'); - } - - async popFromHead(list: string): Promise<string | null | undefined> { - if (!this.redisClient) { - await this.init(); - } - return this.redisClient?.lpop(list); - } - - async popFromTail(list: string): Promise<string | null | undefined> { - if (!this.redisClient) { - await this.init(); - } - return this.redisClient?.rpop(list); - } - - private poppedResultToWorkerResponse( - poppedResult: string | null | undefined, - list: string = WORKER_RESPONSE_REDIS_LIST, - ): RedisServiceWorkerResponseObject | null { - if (poppedResult) { - try { - const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(poppedResult); - if (workerResponse) { - // TODO: Handle worker response - console.log('Received worker response', workerResponse); - } - return workerResponse; - } catch (error) { - this.logger.warn( - `Error parsing worker response on list ${list}: ${(error as Error).message}`, - ); - } - } - return null; - } - - async popOldestWorkerResponse(): Promise<RedisServiceWorkerResponseObject | null> { - const poppedResult = await this.popFromTail(WORKER_RESPONSE_REDIS_LIST); - return this.poppedResultToWorkerResponse(poppedResult); - } - - async popLatestWorkerResponse(): Promise<RedisServiceWorkerResponseObject | null> { - const poppedResult = await this.popFromHead(WORKER_RESPONSE_REDIS_LIST); - return this.poppedResultToWorkerResponse(poppedResult); - } -} diff --git a/packages/cli/src/services/redis/RedisServiceListSender.ts b/packages/cli/src/services/redis/RedisServiceListSender.ts deleted file mode 100644 index 93dcd15992..0000000000 --- a/packages/cli/src/services/redis/RedisServiceListSender.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { Service } from 'typedi'; -import { WORKER_RESPONSE_REDIS_LIST } from './RedisServiceHelper'; -import type { RedisServiceWorkerResponseObject } from './RedisServiceCommands'; -import { RedisServiceBaseSender } from './RedisServiceBaseClasses'; - -@Service() -export class RedisServiceListSender extends RedisServiceBaseSender { - async init(): Promise<void> { - await super.init('list-sender'); - } - - async prepend(list: string, message: string): Promise<void> { - if (!this.redisClient) { - await this.init(); - } - await this.redisClient?.lpush(list, message); - } - - async append(list: string, message: string): Promise<void> { - if (!this.redisClient) { - await this.init(); - } - await this.redisClient?.rpush(list, message); - } - - async appendWorkerResponse(message: RedisServiceWorkerResponseObject): Promise<void> { - await this.prepend(WORKER_RESPONSE_REDIS_LIST, JSON.stringify(message)); - } -} diff --git a/packages/cli/src/services/redis/RedisServiceStreamConsumer.ts b/packages/cli/src/services/redis/RedisServiceStreamConsumer.ts deleted file mode 100644 index 64bff03634..0000000000 --- a/packages/cli/src/services/redis/RedisServiceStreamConsumer.ts +++ /dev/null @@ -1,91 +0,0 @@ -import { Service } from 'typedi'; -import { RedisServiceBaseReceiver } from './RedisServiceBaseClasses'; - -type LastId = string; - -type StreamName = string; - -type StreamDetails = { - lastId: LastId; - pollingInterval: number; - waiter: NodeJS.Timer | undefined; -}; - -@Service() -export class RedisServiceStreamConsumer extends RedisServiceBaseReceiver { - // while actively listening, the stream name and last id are stored here - // removing the entry will stop the listener - streams: Map<StreamName, StreamDetails> = new Map(); - - async init(): Promise<void> { - await super.init('consumer'); - } - - async listenToStream(stream: StreamName, lastId = '$'): Promise<void> { - if (!this.redisClient) { - await this.init(); - } - this.logger.debug(`Redis client now listening to stream ${stream} starting with id ${lastId}`); - this.setLastId(stream, lastId); - const interval = this.streams.get(stream)?.pollingInterval ?? 1000; - const waiter = setInterval(async () => { - const currentLastId = this.streams.get(stream)?.lastId ?? '$'; - const results = await this.redisClient?.xread( - 'BLOCK', - interval, - 'STREAMS', - stream, - currentLastId, - ); - if (results && results.length > 0) { - const [_key, messages] = results[0]; - if (messages.length > 0) { - messages.forEach(([id, message]) => { - this.messageHandlers.forEach((handler) => handler(stream, id, message)); - }); - // Pass the last id of the results to the next round. - const newLastId = messages[messages.length - 1][0]; - this.setLastId(stream, newLastId); - } - } - }, interval); - this.setWaiter(stream, waiter); - } - - stopListeningToStream(stream: StreamName): void { - this.logger.debug(`Redis client stopped listening to stream ${stream}`); - const existing = this.streams.get(stream); - if (existing?.waiter) { - clearInterval(existing.waiter); - } - this.streams.delete(stream); - } - - private updateStreamDetails(stream: StreamName, details: Partial<StreamDetails>): void { - const existing = this.streams.get(stream); - this.streams.set(stream, { - lastId: details.lastId ?? existing?.lastId ?? '$', - waiter: details.waiter ?? existing?.waiter, - pollingInterval: details.pollingInterval ?? existing?.pollingInterval ?? 1000, - }); - } - - async setPollingInterval(stream: StreamName, pollingInterval: number): Promise<void> { - this.updateStreamDetails(stream, { pollingInterval }); - if (this.streams.get(stream)?.waiter) { - this.stopListeningToStream(stream); - await this.listenToStream(stream); - } - } - - setLastId(stream: StreamName, lastId: string): void { - this.updateStreamDetails(stream, { lastId }); - } - - setWaiter(stream: StreamName, waiter: NodeJS.Timeout): void { - // only update the waiter if the stream is still being listened to - if (this.streams.get(stream)) { - this.updateStreamDetails(stream, { waiter }); - } - } -} diff --git a/packages/cli/src/services/redis/RedisServiceStreamProducer.ts b/packages/cli/src/services/redis/RedisServiceStreamProducer.ts deleted file mode 100644 index 6fe03208e0..0000000000 --- a/packages/cli/src/services/redis/RedisServiceStreamProducer.ts +++ /dev/null @@ -1,41 +0,0 @@ -import type { RedisValue } from 'ioredis'; -import { Service } from 'typedi'; -import type { AbstractEventMessage } from '@/eventbus/EventMessageClasses/AbstractEventMessage'; -import { - COMMAND_REDIS_STREAM, - EVENT_BUS_REDIS_STREAM, - WORKER_RESPONSE_REDIS_STREAM, -} from './RedisServiceHelper'; -import type { - RedisServiceCommandObject, - RedisServiceWorkerResponseObject, -} from './RedisServiceCommands'; -import { RedisServiceBaseSender } from './RedisServiceBaseClasses'; - -@Service() -export class RedisServiceStreamProducer extends RedisServiceBaseSender { - async init(): Promise<void> { - await super.init('producer'); - } - - async add(streamName: string, values: RedisValue[]): Promise<void> { - await this.redisClient?.xadd(streamName, '*', 'senderId', this.senderId, ...values); - } - - async addToEventStream(message: AbstractEventMessage): Promise<void> { - await this.add(EVENT_BUS_REDIS_STREAM, [ - 'message', - message.eventName, - 'event', - message.toString(), - ]); - } - - async addToCommandChannel(message: RedisServiceCommandObject): Promise<void> { - await this.add(COMMAND_REDIS_STREAM, ['command', JSON.stringify(message)]); - } - - async addToWorkerChannel(message: RedisServiceWorkerResponseObject): Promise<void> { - await this.add(WORKER_RESPONSE_REDIS_STREAM, ['response', JSON.stringify(message)]); - } -} diff --git a/packages/cli/test/unit/services/redis.service.test.ts b/packages/cli/test/unit/services/redis.service.test.ts index 309593ce24..04fb980db6 100644 --- a/packages/cli/test/unit/services/redis.service.test.ts +++ b/packages/cli/test/unit/services/redis.service.test.ts @@ -55,25 +55,6 @@ describe('RedisService', () => { await pub.destroy(); }); - test('should create list sender and receiver', async () => { - const sender = await redisService.getListSender(); - const receiver = await redisService.getListReceiver(); - expect(sender).toBeDefined(); - expect(receiver).toBeDefined(); - await sender.prepend(LIST_CHANNEL, 'middle'); - await sender.prepend(LIST_CHANNEL, 'first'); - await sender.append(LIST_CHANNEL, 'end'); - let popResult = await receiver.popFromHead(LIST_CHANNEL); - expect(popResult).toBe('first'); - popResult = await receiver.popFromTail(LIST_CHANNEL); - expect(popResult).toBe('end'); - await sender.prepend(LIST_CHANNEL, 'somevalue'); - popResult = await receiver.popFromTail(LIST_CHANNEL); - expect(popResult).toBe('middle'); - await sender.destroy(); - await receiver.destroy(); - }); - // NOTE: This test is failing because the mock Redis client does not support streams apparently // eslint-disable-next-line n8n-local-rules/no-skipped-tests test.skip('should create stream producer and consumer', async () => {