feat(core): Support bidirectional communication between specific mains and specific workers (#10377)

This commit is contained in:
Iván Ovejero 2024-08-20 12:32:31 +02:00 committed by GitHub
parent 51f3e84dff
commit d0fc9dee0e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 57 additions and 25 deletions

View file

@ -239,7 +239,10 @@ export class Start extends BaseCommand {
await orchestrationService.init(); await orchestrationService.init();
await Container.get(OrchestrationHandlerMainService).init(); await Container.get(OrchestrationHandlerMainService).initWithOptions({
queueModeId: this.queueModeId,
redisPublisher: Container.get(OrchestrationService).redisPublisher,
});
if (!orchestrationService.isMultiMainSetupEnabled) return; if (!orchestrationService.isMultiMainSetupEnabled) return;

View file

@ -19,6 +19,7 @@ import { Push } from '@/push';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager'; import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { mockInstance } from '@test/mocking'; import { mockInstance } from '@test/mocking';
import { RedisClientService } from '@/services/redis/redis-client.service'; import { RedisClientService } from '@/services/redis/redis-client.service';
import type { MainResponseReceivedHandlerOptions } from '../orchestration/main/types';
const instanceSettings = Container.get(InstanceSettings); const instanceSettings = Container.get(InstanceSettings);
const redisClientService = mockInstance(RedisClientService); const redisClientService = mockInstance(RedisClientService);
@ -96,8 +97,9 @@ describe('Orchestration Service', () => {
test('should handle worker responses', async () => { test('should handle worker responses', async () => {
const response = await handleWorkerResponseMessageMain( const response = await handleWorkerResponseMessageMain(
JSON.stringify(workerRestartEventBusResponse), JSON.stringify(workerRestartEventBusResponse),
mock<MainResponseReceivedHandlerOptions>(),
); );
expect(response.command).toEqual('restartEventBus'); expect(response?.command).toEqual('restartEventBus');
}); });
test('should handle command messages from others', async () => { test('should handle command messages from others', async () => {

View file

@ -2,6 +2,7 @@ import Container from 'typedi';
import { RedisService } from './redis.service'; import { RedisService } from './redis.service';
import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/types'; import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/types';
import type { MainResponseReceivedHandlerOptions } from './orchestration/main/types';
export abstract class OrchestrationHandlerService { export abstract class OrchestrationHandlerService {
protected initialized = false; protected initialized = false;
@ -19,7 +20,9 @@ export abstract class OrchestrationHandlerService {
this.initialized = true; this.initialized = true;
} }
async initWithOptions(options: WorkerCommandReceivedHandlerOptions) { async initWithOptions(
options: WorkerCommandReceivedHandlerOptions | MainResponseReceivedHandlerOptions,
) {
await this.initSubscriber(options); await this.initSubscriber(options);
this.initialized = true; this.initialized = true;
} }
@ -29,5 +32,7 @@ export abstract class OrchestrationHandlerService {
this.initialized = false; this.initialized = false;
} }
protected abstract initSubscriber(options?: WorkerCommandReceivedHandlerOptions): Promise<void>; protected abstract initSubscriber(
options?: WorkerCommandReceivedHandlerOptions | MainResponseReceivedHandlerOptions,
): Promise<void>;
} }

View file

@ -3,25 +3,40 @@ import Container from 'typedi';
import { Logger } from '@/Logger'; import { Logger } from '@/Logger';
import { Push } from '../../../push'; import { Push } from '../../../push';
import type { RedisServiceWorkerResponseObject } from '../../redis/RedisServiceCommands'; import type { RedisServiceWorkerResponseObject } from '../../redis/RedisServiceCommands';
import { WORKER_RESPONSE_REDIS_CHANNEL } from '@/services/redis/RedisConstants';
import type { MainResponseReceivedHandlerOptions } from './types';
export async function handleWorkerResponseMessageMain(messageString: string) { export async function handleWorkerResponseMessageMain(
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(messageString); messageString: string,
if (workerResponse) { options: MainResponseReceivedHandlerOptions,
switch (workerResponse.command) { ) {
case 'getStatus': const workerResponse = jsonParse<RedisServiceWorkerResponseObject | null>(messageString, {
const push = Container.get(Push); fallbackValue: null,
push.broadcast('sendWorkerStatusMessage', { });
workerId: workerResponse.workerId,
status: workerResponse.payload, if (!workerResponse) {
}); Container.get(Logger).debug(
break; `Received invalid message via channel ${WORKER_RESPONSE_REDIS_CHANNEL}: "${messageString}"`,
case 'getId': );
break; return;
default:
Container.get(Logger).debug(
`Received worker response ${workerResponse.command} from ${workerResponse.workerId}`,
);
}
} }
if (workerResponse.targets && !workerResponse.targets.includes(options.queueModeId)) return;
switch (workerResponse.command) {
case 'getStatus':
Container.get(Push).broadcast('sendWorkerStatusMessage', {
workerId: workerResponse.workerId,
status: workerResponse.payload,
});
break;
case 'getId':
break;
default:
Container.get(Logger).debug(
`Received worker response ${workerResponse.command} from ${workerResponse.workerId}`,
);
}
return workerResponse; return workerResponse;
} }

View file

@ -3,10 +3,11 @@ import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from '../../redi
import { handleWorkerResponseMessageMain } from './handleWorkerResponseMessageMain'; import { handleWorkerResponseMessageMain } from './handleWorkerResponseMessageMain';
import { handleCommandMessageMain } from './handleCommandMessageMain'; import { handleCommandMessageMain } from './handleCommandMessageMain';
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
import type { MainResponseReceivedHandlerOptions } from './types';
@Service() @Service()
export class OrchestrationHandlerMainService extends OrchestrationHandlerService { export class OrchestrationHandlerMainService extends OrchestrationHandlerService {
async initSubscriber() { async initSubscriber(options: MainResponseReceivedHandlerOptions) {
this.redisSubscriber = await this.redisService.getPubSubSubscriber(); this.redisSubscriber = await this.redisService.getPubSubSubscriber();
await this.redisSubscriber.subscribeToCommandChannel(); await this.redisSubscriber.subscribeToCommandChannel();
@ -16,7 +17,7 @@ export class OrchestrationHandlerMainService extends OrchestrationHandlerService
'OrchestrationMessageReceiver', 'OrchestrationMessageReceiver',
async (channel: string, messageString: string) => { async (channel: string, messageString: string) => {
if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { if (channel === WORKER_RESPONSE_REDIS_CHANNEL) {
await handleWorkerResponseMessageMain(messageString); await handleWorkerResponseMessageMain(messageString, options);
} else if (channel === COMMAND_REDIS_CHANNEL) { } else if (channel === COMMAND_REDIS_CHANNEL) {
await handleCommandMessageMain(messageString); await handleCommandMessageMain(messageString);
} }

View file

@ -0,0 +1,6 @@
import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
export type MainResponseReceivedHandlerOptions = {
queueModeId: string;
redisPublisher: RedisServicePubSubPublisher;
};

View file

@ -94,7 +94,7 @@ export type RedisServiceWorkerResponseObject = {
workflowId: string; workflowId: string;
}; };
} }
); ) & { targets?: string[] };
export type RedisServiceCommandObject = { export type RedisServiceCommandObject = {
targets?: string[]; targets?: string[];