mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 21:07:28 -08:00
refactor(core): Eliminate dead Redis code (no-changelog) (#8292)
This commit is contained in:
parent
5fbd7971e0
commit
0f4f472a72
|
@ -1,10 +1,6 @@
|
|||
import { Service } from 'typedi';
|
||||
import { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
|
||||
import { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher';
|
||||
import { RedisServiceListReceiver } from './redis/RedisServiceListReceiver';
|
||||
import { RedisServiceListSender } from './redis/RedisServiceListSender';
|
||||
import { RedisServiceStreamConsumer } from './redis/RedisServiceStreamConsumer';
|
||||
import { RedisServiceStreamProducer } from './redis/RedisServiceStreamProducer';
|
||||
|
||||
/*
|
||||
* This is a convenience service that provides access to all the Redis clients.
|
||||
|
@ -14,10 +10,6 @@ export class RedisService {
|
|||
constructor(
|
||||
private redisServicePubSubSubscriber: RedisServicePubSubSubscriber,
|
||||
private redisServicePubSubPublisher: RedisServicePubSubPublisher,
|
||||
private redisServiceListReceiver: RedisServiceListReceiver,
|
||||
private redisServiceListSender: RedisServiceListSender,
|
||||
private redisServiceStreamConsumer: RedisServiceStreamConsumer,
|
||||
private redisServiceStreamProducer: RedisServiceStreamProducer,
|
||||
) {}
|
||||
|
||||
async getPubSubSubscriber() {
|
||||
|
@ -29,24 +21,4 @@ export class RedisService {
|
|||
await this.redisServicePubSubPublisher.init();
|
||||
return this.redisServicePubSubPublisher;
|
||||
}
|
||||
|
||||
async getListSender() {
|
||||
await this.redisServiceListSender.init();
|
||||
return this.redisServiceListSender;
|
||||
}
|
||||
|
||||
async getListReceiver() {
|
||||
await this.redisServiceListReceiver.init();
|
||||
return this.redisServiceListReceiver;
|
||||
}
|
||||
|
||||
async getStreamProducer() {
|
||||
await this.redisServiceStreamProducer.init();
|
||||
return this.redisServiceStreamProducer;
|
||||
}
|
||||
|
||||
async getStreamConsumer() {
|
||||
await this.redisServiceStreamConsumer.init();
|
||||
return this.redisServiceStreamConsumer;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,57 +0,0 @@
|
|||
import { Service } from 'typedi';
|
||||
import { jsonParse } from 'n8n-workflow';
|
||||
import { WORKER_RESPONSE_REDIS_LIST } from './RedisServiceHelper';
|
||||
import type { RedisServiceWorkerResponseObject } from './RedisServiceCommands';
|
||||
import { RedisServiceBaseReceiver } from './RedisServiceBaseClasses';
|
||||
|
||||
@Service()
|
||||
export class RedisServiceListReceiver extends RedisServiceBaseReceiver {
|
||||
async init(): Promise<void> {
|
||||
await super.init('list-receiver');
|
||||
}
|
||||
|
||||
async popFromHead(list: string): Promise<string | null | undefined> {
|
||||
if (!this.redisClient) {
|
||||
await this.init();
|
||||
}
|
||||
return this.redisClient?.lpop(list);
|
||||
}
|
||||
|
||||
async popFromTail(list: string): Promise<string | null | undefined> {
|
||||
if (!this.redisClient) {
|
||||
await this.init();
|
||||
}
|
||||
return this.redisClient?.rpop(list);
|
||||
}
|
||||
|
||||
private poppedResultToWorkerResponse(
|
||||
poppedResult: string | null | undefined,
|
||||
list: string = WORKER_RESPONSE_REDIS_LIST,
|
||||
): RedisServiceWorkerResponseObject | null {
|
||||
if (poppedResult) {
|
||||
try {
|
||||
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(poppedResult);
|
||||
if (workerResponse) {
|
||||
// TODO: Handle worker response
|
||||
console.log('Received worker response', workerResponse);
|
||||
}
|
||||
return workerResponse;
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
`Error parsing worker response on list ${list}: ${(error as Error).message}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
async popOldestWorkerResponse(): Promise<RedisServiceWorkerResponseObject | null> {
|
||||
const poppedResult = await this.popFromTail(WORKER_RESPONSE_REDIS_LIST);
|
||||
return this.poppedResultToWorkerResponse(poppedResult);
|
||||
}
|
||||
|
||||
async popLatestWorkerResponse(): Promise<RedisServiceWorkerResponseObject | null> {
|
||||
const poppedResult = await this.popFromHead(WORKER_RESPONSE_REDIS_LIST);
|
||||
return this.poppedResultToWorkerResponse(poppedResult);
|
||||
}
|
||||
}
|
|
@ -1,29 +0,0 @@
|
|||
import { Service } from 'typedi';
|
||||
import { WORKER_RESPONSE_REDIS_LIST } from './RedisServiceHelper';
|
||||
import type { RedisServiceWorkerResponseObject } from './RedisServiceCommands';
|
||||
import { RedisServiceBaseSender } from './RedisServiceBaseClasses';
|
||||
|
||||
@Service()
|
||||
export class RedisServiceListSender extends RedisServiceBaseSender {
|
||||
async init(): Promise<void> {
|
||||
await super.init('list-sender');
|
||||
}
|
||||
|
||||
async prepend(list: string, message: string): Promise<void> {
|
||||
if (!this.redisClient) {
|
||||
await this.init();
|
||||
}
|
||||
await this.redisClient?.lpush(list, message);
|
||||
}
|
||||
|
||||
async append(list: string, message: string): Promise<void> {
|
||||
if (!this.redisClient) {
|
||||
await this.init();
|
||||
}
|
||||
await this.redisClient?.rpush(list, message);
|
||||
}
|
||||
|
||||
async appendWorkerResponse(message: RedisServiceWorkerResponseObject): Promise<void> {
|
||||
await this.prepend(WORKER_RESPONSE_REDIS_LIST, JSON.stringify(message));
|
||||
}
|
||||
}
|
|
@ -1,91 +0,0 @@
|
|||
import { Service } from 'typedi';
|
||||
import { RedisServiceBaseReceiver } from './RedisServiceBaseClasses';
|
||||
|
||||
type LastId = string;
|
||||
|
||||
type StreamName = string;
|
||||
|
||||
type StreamDetails = {
|
||||
lastId: LastId;
|
||||
pollingInterval: number;
|
||||
waiter: NodeJS.Timer | undefined;
|
||||
};
|
||||
|
||||
@Service()
|
||||
export class RedisServiceStreamConsumer extends RedisServiceBaseReceiver {
|
||||
// while actively listening, the stream name and last id are stored here
|
||||
// removing the entry will stop the listener
|
||||
streams: Map<StreamName, StreamDetails> = new Map();
|
||||
|
||||
async init(): Promise<void> {
|
||||
await super.init('consumer');
|
||||
}
|
||||
|
||||
async listenToStream(stream: StreamName, lastId = '$'): Promise<void> {
|
||||
if (!this.redisClient) {
|
||||
await this.init();
|
||||
}
|
||||
this.logger.debug(`Redis client now listening to stream ${stream} starting with id ${lastId}`);
|
||||
this.setLastId(stream, lastId);
|
||||
const interval = this.streams.get(stream)?.pollingInterval ?? 1000;
|
||||
const waiter = setInterval(async () => {
|
||||
const currentLastId = this.streams.get(stream)?.lastId ?? '$';
|
||||
const results = await this.redisClient?.xread(
|
||||
'BLOCK',
|
||||
interval,
|
||||
'STREAMS',
|
||||
stream,
|
||||
currentLastId,
|
||||
);
|
||||
if (results && results.length > 0) {
|
||||
const [_key, messages] = results[0];
|
||||
if (messages.length > 0) {
|
||||
messages.forEach(([id, message]) => {
|
||||
this.messageHandlers.forEach((handler) => handler(stream, id, message));
|
||||
});
|
||||
// Pass the last id of the results to the next round.
|
||||
const newLastId = messages[messages.length - 1][0];
|
||||
this.setLastId(stream, newLastId);
|
||||
}
|
||||
}
|
||||
}, interval);
|
||||
this.setWaiter(stream, waiter);
|
||||
}
|
||||
|
||||
stopListeningToStream(stream: StreamName): void {
|
||||
this.logger.debug(`Redis client stopped listening to stream ${stream}`);
|
||||
const existing = this.streams.get(stream);
|
||||
if (existing?.waiter) {
|
||||
clearInterval(existing.waiter);
|
||||
}
|
||||
this.streams.delete(stream);
|
||||
}
|
||||
|
||||
private updateStreamDetails(stream: StreamName, details: Partial<StreamDetails>): void {
|
||||
const existing = this.streams.get(stream);
|
||||
this.streams.set(stream, {
|
||||
lastId: details.lastId ?? existing?.lastId ?? '$',
|
||||
waiter: details.waiter ?? existing?.waiter,
|
||||
pollingInterval: details.pollingInterval ?? existing?.pollingInterval ?? 1000,
|
||||
});
|
||||
}
|
||||
|
||||
async setPollingInterval(stream: StreamName, pollingInterval: number): Promise<void> {
|
||||
this.updateStreamDetails(stream, { pollingInterval });
|
||||
if (this.streams.get(stream)?.waiter) {
|
||||
this.stopListeningToStream(stream);
|
||||
await this.listenToStream(stream);
|
||||
}
|
||||
}
|
||||
|
||||
setLastId(stream: StreamName, lastId: string): void {
|
||||
this.updateStreamDetails(stream, { lastId });
|
||||
}
|
||||
|
||||
setWaiter(stream: StreamName, waiter: NodeJS.Timeout): void {
|
||||
// only update the waiter if the stream is still being listened to
|
||||
if (this.streams.get(stream)) {
|
||||
this.updateStreamDetails(stream, { waiter });
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,41 +0,0 @@
|
|||
import type { RedisValue } from 'ioredis';
|
||||
import { Service } from 'typedi';
|
||||
import type { AbstractEventMessage } from '@/eventbus/EventMessageClasses/AbstractEventMessage';
|
||||
import {
|
||||
COMMAND_REDIS_STREAM,
|
||||
EVENT_BUS_REDIS_STREAM,
|
||||
WORKER_RESPONSE_REDIS_STREAM,
|
||||
} from './RedisServiceHelper';
|
||||
import type {
|
||||
RedisServiceCommandObject,
|
||||
RedisServiceWorkerResponseObject,
|
||||
} from './RedisServiceCommands';
|
||||
import { RedisServiceBaseSender } from './RedisServiceBaseClasses';
|
||||
|
||||
@Service()
|
||||
export class RedisServiceStreamProducer extends RedisServiceBaseSender {
|
||||
async init(): Promise<void> {
|
||||
await super.init('producer');
|
||||
}
|
||||
|
||||
async add(streamName: string, values: RedisValue[]): Promise<void> {
|
||||
await this.redisClient?.xadd(streamName, '*', 'senderId', this.senderId, ...values);
|
||||
}
|
||||
|
||||
async addToEventStream(message: AbstractEventMessage): Promise<void> {
|
||||
await this.add(EVENT_BUS_REDIS_STREAM, [
|
||||
'message',
|
||||
message.eventName,
|
||||
'event',
|
||||
message.toString(),
|
||||
]);
|
||||
}
|
||||
|
||||
async addToCommandChannel(message: RedisServiceCommandObject): Promise<void> {
|
||||
await this.add(COMMAND_REDIS_STREAM, ['command', JSON.stringify(message)]);
|
||||
}
|
||||
|
||||
async addToWorkerChannel(message: RedisServiceWorkerResponseObject): Promise<void> {
|
||||
await this.add(WORKER_RESPONSE_REDIS_STREAM, ['response', JSON.stringify(message)]);
|
||||
}
|
||||
}
|
|
@ -55,25 +55,6 @@ describe('RedisService', () => {
|
|||
await pub.destroy();
|
||||
});
|
||||
|
||||
test('should create list sender and receiver', async () => {
|
||||
const sender = await redisService.getListSender();
|
||||
const receiver = await redisService.getListReceiver();
|
||||
expect(sender).toBeDefined();
|
||||
expect(receiver).toBeDefined();
|
||||
await sender.prepend(LIST_CHANNEL, 'middle');
|
||||
await sender.prepend(LIST_CHANNEL, 'first');
|
||||
await sender.append(LIST_CHANNEL, 'end');
|
||||
let popResult = await receiver.popFromHead(LIST_CHANNEL);
|
||||
expect(popResult).toBe('first');
|
||||
popResult = await receiver.popFromTail(LIST_CHANNEL);
|
||||
expect(popResult).toBe('end');
|
||||
await sender.prepend(LIST_CHANNEL, 'somevalue');
|
||||
popResult = await receiver.popFromTail(LIST_CHANNEL);
|
||||
expect(popResult).toBe('middle');
|
||||
await sender.destroy();
|
||||
await receiver.destroy();
|
||||
});
|
||||
|
||||
// NOTE: This test is failing because the mock Redis client does not support streams apparently
|
||||
// eslint-disable-next-line n8n-local-rules/no-skipped-tests
|
||||
test.skip('should create stream producer and consumer', async () => {
|
||||
|
|
Loading…
Reference in a new issue