- destination will be removed by worker unexpected
- get destinations should load from DB instead of cache
This commit is contained in:
cwang14 2024-09-29 17:28:51 +08:00
parent 0ca9c076ca
commit 581894fb55
No known key found for this signature in database
GPG key ID: CCEB28E51A507A37

View file

@ -99,7 +99,7 @@ export class MessageEventBus extends EventEmitter {
for (const destinationData of savedEventDestinations) { for (const destinationData of savedEventDestinations) {
try { try {
const destination = messageEventBusDestinationFromDb(this, destinationData); const destination = messageEventBusDestinationFromDb(this, destinationData);
if (destination) { if (destination && destination.enabled) {
await this.addDestination(destination, false); await this.addDestination(destination, false);
} }
} catch (error) { } catch (error) {
@ -206,21 +206,36 @@ export class MessageEventBus extends EventEmitter {
} }
async addDestination(destination: MessageEventBusDestination, notifyWorkers: boolean = true) { async addDestination(destination: MessageEventBusDestination, notifyWorkers: boolean = true) {
await this.removeDestination(destination.getId(), false);
this.destinations[destination.getId()] = destination;
this.destinations[destination.getId()].startListening();
if (notifyWorkers) { if (notifyWorkers) {
await this.removeDestination(destination.getId(), false);
await destination.saveToDb();
await this.orchestrationService.publish('restart-event-bus'); await this.orchestrationService.publish('restart-event-bus');
} }
this.destinations[destination.getId()] = destination;
this.destinations[destination.getId()].startListening();
return destination; return destination;
} }
async findDestination(id?: string): Promise<MessageEventBusDestinationOptions[]> { async findDestination(id?: string): Promise<MessageEventBusDestinationOptions[]> {
let result: MessageEventBusDestinationOptions[]; let result: MessageEventBusDestinationOptions[] = [];
if (id && Object.keys(this.destinations).includes(id)) { if (id) {
result = [this.destinations[id].serialize()]; const savedEventDestination = await this.eventDestinationsRepository.findOne({ where: { id } });
if (savedEventDestination) {
const destination = messageEventBusDestinationFromDb(this, savedEventDestination);
if (destination) {
result = [destination.serialize()];
}
}
} else { } else {
result = Object.keys(this.destinations).map((e) => this.destinations[e].serialize()); const savedEventDestinations = await this.eventDestinationsRepository.find();
if (savedEventDestinations.length > 0) {
for (const destinationData of savedEventDestinations) {
const destination = messageEventBusDestinationFromDb(this, destinationData)
if (destination) {
result.push(destination.serialize());
}
}
}
} }
return result.sort((a, b) => (a.__type ?? '').localeCompare(b.__type ?? '')); return result.sort((a, b) => (a.__type ?? '').localeCompare(b.__type ?? ''));
} }