From 581894fb552ef822ac20520f45a136d6279ac8dd Mon Sep 17 00:00:00 2001 From: cwang14 Date: Sun, 29 Sep 2024 17:28:51 +0800 Subject: [PATCH] bugfix: - destination will be removed by worker unexpected - get destinations should load from DB instead of cache --- .../message-event-bus/message-event-bus.ts | 33 ++++++++++++++----- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts b/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts index fec5b4845b..2340acc474 100644 --- a/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts +++ b/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts @@ -99,7 +99,7 @@ export class MessageEventBus extends EventEmitter { for (const destinationData of savedEventDestinations) { try { const destination = messageEventBusDestinationFromDb(this, destinationData); - if (destination) { + if (destination && destination.enabled) { await this.addDestination(destination, false); } } catch (error) { @@ -206,21 +206,36 @@ export class MessageEventBus extends EventEmitter { } async addDestination(destination: MessageEventBusDestination, notifyWorkers: boolean = true) { - await this.removeDestination(destination.getId(), false); - this.destinations[destination.getId()] = destination; - this.destinations[destination.getId()].startListening(); if (notifyWorkers) { + await this.removeDestination(destination.getId(), false); + await destination.saveToDb(); await this.orchestrationService.publish('restart-event-bus'); } + this.destinations[destination.getId()] = destination; + this.destinations[destination.getId()].startListening(); return destination; } async findDestination(id?: string): Promise { - let result: MessageEventBusDestinationOptions[]; - if (id && Object.keys(this.destinations).includes(id)) { - result = [this.destinations[id].serialize()]; - } else { - result = Object.keys(this.destinations).map((e) => this.destinations[e].serialize()); + let result: MessageEventBusDestinationOptions[] = []; + if (id) { + const savedEventDestination = await this.eventDestinationsRepository.findOne({ where: { id } }); + if (savedEventDestination) { + const destination = messageEventBusDestinationFromDb(this, savedEventDestination); + if (destination) { + result = [destination.serialize()]; + } + } + } else { + const savedEventDestinations = await this.eventDestinationsRepository.find(); + if (savedEventDestinations.length > 0) { + for (const destinationData of savedEventDestinations) { + const destination = messageEventBusDestinationFromDb(this, destinationData) + if (destination) { + result.push(destination.serialize()); + } + } + } } return result.sort((a, b) => (a.__type ?? '').localeCompare(b.__type ?? '')); }