refactor(core): Organize Redis under scaling mode (#10864)

This commit is contained in:
Iván Ovejero 2024-09-19 09:52:48 +02:00 committed by GitHub
parent 91008b2676
commit 69c6e0790d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 133 additions and 212 deletions

View file

@ -3,11 +3,11 @@ import { mock } from 'jest-mock-extended';
import config from '@/config';
import { generateNanoId } from '@/databases/utils/generators';
import type { RedisClientService } from '@/services/redis/redis-client.service';
import type {
RedisServiceCommandObject,
RedisServiceWorkerResponseObject,
} from '@/services/redis/redis-service-commands';
} from '@/scaling/redis/redis-service-commands';
import type { RedisClientService } from '@/services/redis-client.service';
import { Publisher } from '../pubsub/publisher.service';

View file

@ -2,7 +2,7 @@ import type { Redis as SingleNodeClient } from 'ioredis';
import { mock } from 'jest-mock-extended';
import config from '@/config';
import type { RedisClientService } from '@/services/redis/redis-client.service';
import type { RedisClientService } from '@/services/redis-client.service';
import { Subscriber } from '../pubsub/subscriber.service';

View file

@ -1,3 +1,7 @@
export const QUEUE_NAME = 'jobs';
export const JOB_TYPE_NAME = 'job';
export const COMMAND_PUBSUB_CHANNEL = 'n8n.commands';
export const WORKER_RESPONSE_PUBSUB_CHANNEL = 'n8n.worker-response';

View file

@ -3,11 +3,11 @@ import { Service } from 'typedi';
import config from '@/config';
import { Logger } from '@/logger';
import { RedisClientService } from '@/services/redis/redis-client.service';
import type {
RedisServiceCommandObject,
RedisServiceWorkerResponseObject,
} from '@/services/redis/redis-service-commands';
} from '@/scaling/redis/redis-service-commands';
import { RedisClientService } from '@/services/redis-client.service';
/**
* Responsible for publishing messages into the pubsub channels used by scaling mode.

View file

@ -1,7 +1,8 @@
import type {
COMMAND_REDIS_CHANNEL,
WORKER_RESPONSE_REDIS_CHANNEL,
} from '@/services/redis/redis-constants';
import type { PushType, WorkerStatus } from '@n8n/api-types';
import type { IWorkflowDb } from '@/interfaces';
import type { COMMAND_PUBSUB_CHANNEL, WORKER_RESPONSE_PUBSUB_CHANNEL } from '../constants';
/**
* Pubsub channel used by scaling mode:
@ -10,5 +11,90 @@ import type {
* - `n8n.worker-response` for messages sent by workers in response to commands from main processes
*/
export type ScalingPubSubChannel =
| typeof COMMAND_REDIS_CHANNEL
| typeof WORKER_RESPONSE_REDIS_CHANNEL;
| typeof COMMAND_PUBSUB_CHANNEL
| typeof WORKER_RESPONSE_PUBSUB_CHANNEL;
export type PubSubMessageMap = {
// #region Lifecycle
'reload-license': never;
'restart-event-bus': {
result: 'success' | 'error';
error?: string;
};
'reload-external-secrets-providers': {
result: 'success' | 'error';
error?: string;
};
'stop-worker': never;
// #endregion
// #region Community packages
'community-package-install': {
packageName: string;
packageVersion: string;
};
'community-package-update': {
packageName: string;
packageVersion: string;
};
'community-package-uninstall': {
packageName: string;
packageVersion: string;
};
// #endregion
// #region Worker view
'get-worker-id': never;
'get-worker-status': WorkerStatus;
// #endregion
// #region Multi-main setup
'add-webhooks-triggers-and-pollers': {
workflowId: string;
};
'remove-triggers-and-pollers': {
workflowId: string;
};
'display-workflow-activation': {
workflowId: string;
};
'display-workflow-deactivation': {
workflowId: string;
};
// currently 'workflow-failed-to-activate'
'display-workflow-activation-error': {
workflowId: string;
errorMessage: string;
};
'relay-execution-lifecycle-event': {
type: PushType;
args: Record<string, unknown>;
pushRef: string;
};
'clear-test-webhooks': {
webhookKey: string;
workflowEntity: IWorkflowDb;
pushRef: string;
};
// #endregion
};

View file

@ -3,7 +3,7 @@ import { Service } from 'typedi';
import config from '@/config';
import { Logger } from '@/logger';
import { RedisClientService } from '@/services/redis/redis-client.service';
import { RedisClientService } from '@/services/redis-client.service';
import type { ScalingPubSubChannel } from './pubsub.types';

View file

@ -24,7 +24,7 @@ import type {
JobStatus,
JobId,
QueueRecoveryContext,
PubSubMessage,
JobReport,
} from './scaling.types';
@Service()
@ -46,7 +46,7 @@ export class ScalingService {
async setupQueue() {
const { default: BullQueue } = await import('bull');
const { RedisClientService } = await import('@/services/redis/redis-client.service');
const { RedisClientService } = await import('@/services/redis-client.service');
const service = Container.get(RedisClientService);
const bullPrefix = this.globalConfig.queue.bull.prefix;
@ -265,7 +265,7 @@ export class ScalingService {
}
}
private isPubSubMessage(candidate: unknown): candidate is PubSubMessage {
private isPubSubMessage(candidate: unknown): candidate is JobReport {
return typeof candidate === 'object' && candidate !== null && 'kind' in candidate;
}

View file

@ -23,11 +23,11 @@ export type JobStatus = Bull.JobStatus;
export type JobOptions = Bull.JobOptions;
export type PubSubMessage = MessageToMain | MessageToWorker;
export type JobReport = JobReportToMain | JobReportToWorker;
type MessageToMain = RespondToWebhookMessage;
type JobReportToMain = RespondToWebhookMessage;
type MessageToWorker = AbortJobMessage;
type JobReportToWorker = AbortJobMessage;
type RespondToWebhookMessage = {
kind: 'respond-to-webhook';

View file

@ -9,13 +9,13 @@ import config from '@/config';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
import { Push } from '@/push';
import type { RedisServiceWorkerResponseObject } from '@/scaling/redis/redis-service-commands';
import * as helpers from '@/services/orchestration/helpers';
import { handleCommandMessageMain } from '@/services/orchestration/main/handle-command-message-main';
import { handleWorkerResponseMessageMain } from '@/services/orchestration/main/handle-worker-response-message-main';
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
import { OrchestrationService } from '@/services/orchestration.service';
import { RedisClientService } from '@/services/redis/redis-client.service';
import type { RedisServiceWorkerResponseObject } from '@/services/redis/redis-service-commands';
import { RedisClientService } from '@/services/redis-client.service';
import { mockInstance } from '@test/mocking';
import type { MainResponseReceivedHandlerOptions } from '../orchestration/main/types';

View file

@ -36,7 +36,7 @@ export class CacheService extends TypedEmitter<CacheEvents> {
const useRedis = backend === 'redis' || (backend === 'auto' && mode === 'queue');
if (useRedis) {
const { RedisClientService } = await import('../redis/redis-client.service');
const { RedisClientService } = await import('../redis-client.service');
const redisClientService = Container.get(RedisClientService);
const prefixBase = config.getEnv('redis.prefix');

View file

@ -8,7 +8,10 @@ import type { Publisher } from '@/scaling/pubsub/publisher.service';
import type { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { MultiMainSetup } from './orchestration/main/multi-main-setup.ee';
import type { RedisServiceBaseCommand, RedisServiceCommand } from './redis/redis-service-commands';
import type {
RedisServiceBaseCommand,
RedisServiceCommand,
} from '../scaling/redis/redis-service-commands';
@Service()
export class OrchestrationService {

View file

@ -3,9 +3,9 @@ import os from 'node:os';
import { Container } from 'typedi';
import { Logger } from '@/logger';
import { COMMAND_PUBSUB_CHANNEL } from '@/scaling/constants';
import { COMMAND_REDIS_CHANNEL } from '../redis/redis-constants';
import type { RedisServiceCommandObject } from '../redis/redis-service-commands';
import type { RedisServiceCommandObject } from '../../scaling/redis/redis-service-commands';
export interface RedisServiceCommandLastReceived {
[date: string]: Date;
@ -18,7 +18,7 @@ export function messageToRedisServiceCommandObject(messageString: string) {
message = jsonParse<RedisServiceCommandObject>(messageString);
} catch {
Container.get(Logger).debug(
`Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`,
`Received invalid message via channel ${COMMAND_PUBSUB_CHANNEL}: "${messageString}"`,
);
return;
}

View file

@ -3,11 +3,11 @@ import { jsonParse } from 'n8n-workflow';
import Container from 'typedi';
import { Logger } from '@/logger';
import { WORKER_RESPONSE_REDIS_CHANNEL } from '@/services/redis/redis-constants';
import { WORKER_RESPONSE_PUBSUB_CHANNEL } from '@/scaling/constants';
import type { MainResponseReceivedHandlerOptions } from './types';
import { Push } from '../../../push';
import type { RedisServiceWorkerResponseObject } from '../../redis/redis-service-commands';
import type { RedisServiceWorkerResponseObject } from '../../../scaling/redis/redis-service-commands';
export async function handleWorkerResponseMessageMain(
messageString: string,
@ -19,7 +19,7 @@ export async function handleWorkerResponseMessageMain(
if (!workerResponse) {
Container.get(Logger).debug(
`Received invalid message via channel ${WORKER_RESPONSE_REDIS_CHANNEL}: "${messageString}"`,
`Received invalid message via channel ${WORKER_RESPONSE_PUBSUB_CHANNEL}: "${messageString}"`,
);
return;
}

View file

@ -6,7 +6,7 @@ import config from '@/config';
import { TIME } from '@/constants';
import { Logger } from '@/logger';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { RedisClientService } from '@/services/redis/redis-client.service';
import { RedisClientService } from '@/services/redis-client.service';
import { TypedEmitter } from '@/typed-emitter';
type MultiMainEvents = {

View file

@ -1,12 +1,12 @@
import { Service } from 'typedi';
import { COMMAND_PUBSUB_CHANNEL, WORKER_RESPONSE_PUBSUB_CHANNEL } from '@/scaling/constants';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { handleCommandMessageMain } from './handle-command-message-main';
import { handleWorkerResponseMessageMain } from './handle-worker-response-message-main';
import type { MainResponseReceivedHandlerOptions } from './types';
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from '../../redis/redis-constants';
@Service()
export class OrchestrationHandlerMainService extends OrchestrationHandlerService {
@ -19,9 +19,9 @@ export class OrchestrationHandlerMainService extends OrchestrationHandlerService
await this.subscriber.subscribe('n8n.worker-response');
this.subscriber.addMessageHandler(async (channel: string, messageString: string) => {
if (channel === WORKER_RESPONSE_REDIS_CHANNEL) {
if (channel === WORKER_RESPONSE_PUBSUB_CHANNEL) {
await handleWorkerResponseMessageMain(messageString, options);
} else if (channel === COMMAND_REDIS_CHANNEL) {
} else if (channel === COMMAND_PUBSUB_CHANNEL) {
await handleCommandMessageMain(messageString);
}
});

View file

@ -1,88 +0,0 @@
import type { PushType, WorkerStatus } from '@n8n/api-types';
import type { IWorkflowDb } from '@/interfaces';
export type PubSubMessageMap = {
// #region Lifecycle
'reload-license': never;
'restart-event-bus': {
result: 'success' | 'error';
error?: string;
};
'reload-external-secrets-providers': {
result: 'success' | 'error';
error?: string;
};
'stop-worker': never;
// #endregion
// #region Community packages
'community-package-install': {
packageName: string;
packageVersion: string;
};
'community-package-update': {
packageName: string;
packageVersion: string;
};
'community-package-uninstall': {
packageName: string;
packageVersion: string;
};
// #endregion
// #region Worker view
'get-worker-id': never;
'get-worker-status': WorkerStatus;
// #endregion
// #region Multi-main setup
'add-webhooks-triggers-and-pollers': {
workflowId: string;
};
'remove-triggers-and-pollers': {
workflowId: string;
};
'display-workflow-activation': {
workflowId: string;
};
'display-workflow-deactivation': {
workflowId: string;
};
// currently 'workflow-failed-to-activate'
'display-workflow-activation-error': {
workflowId: string;
errorMessage: string;
};
'relay-execution-lifecycle-event': {
type: PushType;
args: Record<string, unknown>;
pushRef: string;
};
'clear-test-webhooks': {
webhookKey: string;
workflowEntity: IWorkflowDb;
pushRef: string;
};
// #endregion
};

View file

@ -1,10 +1,10 @@
import { Service } from 'typedi';
import { COMMAND_PUBSUB_CHANNEL } from '@/scaling/constants';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { handleCommandMessageWebhook } from './handle-command-message-webhook';
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
import { COMMAND_REDIS_CHANNEL } from '../../redis/redis-constants';
@Service()
export class OrchestrationHandlerWebhookService extends OrchestrationHandlerService {
@ -16,7 +16,7 @@ export class OrchestrationHandlerWebhookService extends OrchestrationHandlerServ
await this.subscriber.subscribe('n8n.commands');
this.subscriber.addMessageHandler(async (channel: string, messageString: string) => {
if (channel === COMMAND_REDIS_CHANNEL) {
if (channel === COMMAND_PUBSUB_CHANNEL) {
await handleCommandMessageWebhook(messageString);
}
});

View file

@ -7,9 +7,9 @@ import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'
import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
import { License } from '@/license';
import { Logger } from '@/logger';
import { COMMAND_PUBSUB_CHANNEL } from '@/scaling/constants';
import type { RedisServiceCommandObject } from '@/scaling/redis/redis-service-commands';
import { CommunityPackagesService } from '@/services/community-packages.service';
import { COMMAND_REDIS_CHANNEL } from '@/services/redis/redis-constants';
import type { RedisServiceCommandObject } from '@/services/redis/redis-service-commands';
import type { WorkerCommandReceivedHandlerOptions } from './types';
import { debounceMessageReceiver, getOsCpuString } from '../helpers';
@ -17,7 +17,7 @@ import { debounceMessageReceiver, getOsCpuString } from '../helpers';
export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHandlerOptions) {
// eslint-disable-next-line complexity
return async (channel: string, messageString: string) => {
if (channel === COMMAND_REDIS_CHANNEL) {
if (channel === COMMAND_PUBSUB_CHANNEL) {
if (!messageString) return;
const logger = Container.get(Logger);
let message: RedisServiceCommandObject;
@ -25,7 +25,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
message = jsonParse<RedisServiceCommandObject>(messageString);
} catch {
logger.debug(
`Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`,
`Received invalid message via channel ${COMMAND_PUBSUB_CHANNEL}: "${messageString}"`,
);
return;
}
@ -145,7 +145,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
}
logger.debug(
`Received unknown command via channel ${COMMAND_REDIS_CHANNEL}: "${message.command}"`,
`Received unknown command via channel ${COMMAND_PUBSUB_CHANNEL}: "${message.command}"`,
);
break;
}

View file

@ -1,5 +1,4 @@
import type { RunningJobSummary } from '@n8n/api-types';
import type { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow';
import type { Publisher } from '@/scaling/pubsub/publisher.service';
@ -9,14 +8,3 @@ export interface WorkerCommandReceivedHandlerOptions {
getRunningJobIds: () => Array<string | number>;
getRunningJobsSummary: () => RunningJobSummary[];
}
export interface WorkerJobStatusSummary {
jobId: string;
executionId: string;
retryOf?: string;
startedAt: Date;
mode: WorkflowExecuteMode;
workflowName: string;
workflowId: string;
status: ExecutionStatus;
}

View file

@ -5,7 +5,7 @@ import { Service } from 'typedi';
import { Logger } from '@/logger';
import type { RedisClientType } from './redis.types';
import type { RedisClientType } from '../scaling/redis/redis.types';
@Service()
export class RedisClientService {

View file

@ -1,2 +0,0 @@
export const COMMAND_REDIS_CHANNEL = 'n8n.commands';
export const WORKER_RESPONSE_REDIS_CHANNEL = 'n8n.worker-response';

View file

@ -1,70 +0,0 @@
import type Redis from 'ioredis';
import type { Cluster } from 'ioredis';
import { Service } from 'typedi';
import config from '@/config';
import { Logger } from '@/logger';
import { RedisClientService } from './redis-client.service';
import type { RedisClientType } from './redis.types';
export type RedisServiceMessageHandler =
| ((channel: string, message: string) => void)
| ((stream: string, id: string, message: string[]) => void);
@Service()
class RedisServiceBase {
redisClient: Redis | Cluster | undefined;
isInitialized = false;
constructor(
protected readonly logger: Logger,
private readonly redisClientService: RedisClientService,
) {}
async init(type: RedisClientType): Promise<void> {
if (this.redisClient && this.isInitialized) {
return;
}
this.redisClient = this.redisClientService.createClient({ type });
this.redisClient.on('error', (error) => {
if (!String(error).includes('ECONNREFUSED')) {
this.logger.warn('Error with Redis: ', error);
}
});
this.isInitialized = true;
}
async destroy(): Promise<void> {
if (!this.redisClient) {
return;
}
await this.redisClient.quit();
this.isInitialized = false;
this.redisClient = undefined;
}
}
export abstract class RedisServiceBaseSender extends RedisServiceBase {
senderId: string;
async init(type: RedisClientType): Promise<void> {
await super.init(type);
this.senderId = config.get('redis.queueModeId');
}
}
export abstract class RedisServiceBaseReceiver extends RedisServiceBase {
messageHandlers: Map<string, RedisServiceMessageHandler> = new Map();
addMessageHandler(handlerName: string, handler: RedisServiceMessageHandler): void {
this.messageHandlers.set(handlerName, handler);
}
removeMessageHandler(handlerName: string): void {
this.messageHandlers.delete(handlerName);
}
}