fix(core): Minor improvements to multi-main setup (no-changelog) (#8012)

- Move webhook, poller and trigger activation logs closer to activation
event
- Enrich response of `/debug/multi-main-setup`
- Ensure workflow updates broadcast activation state changes only if
state changed
- Fix bug on workflow activation after leadership change
- Ensure debug controller is not available in production

---------

Co-authored-by: Omar Ajoue <krynble@gmail.com>
This commit is contained in:
Iván Ovejero 2023-12-27 16:55:01 +01:00 committed by GitHub
parent f69ddcd796
commit 2c6ffb0153
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 93 additions and 23 deletions

View file

@ -292,6 +292,10 @@ export class ActiveWorkflowRunner implements IWebhookManager {
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true);
let path = '';
if (webhooks.length === 0) return;
this.logger.debug(`Adding webhooks for workflow "${workflow.name}" (ID ${workflow.id})`);
for (const webhookData of webhooks) {
const node = workflow.getNode(webhookData.node) as INode;
node.name = webhookData.node;
@ -699,14 +703,26 @@ export class ActiveWorkflowRunner implements IWebhookManager {
let shouldAddWebhooks = true;
let shouldAddTriggersAndPollers = true;
if (this.multiMainSetup.isEnabled && activationMode !== 'leadershipChange') {
shouldAddWebhooks = this.multiMainSetup.isLeader;
shouldAddTriggersAndPollers = this.multiMainSetup.isLeader;
}
if (this.multiMainSetup.isEnabled && activationMode === 'leadershipChange') {
shouldAddWebhooks = false;
shouldAddTriggersAndPollers = true;
/**
* In a multi-main scenario, webhooks are stored in the database, while triggers
* and pollers are run only by the leader main instance.
*
* - During a regular workflow activation (i.e. not leadership change), only the
* leader should add webhooks to prevent duplicate insertions, and only the leader
* should handle triggers and pollers to prevent duplicate work.
*
* - During a leadership change, webhooks remain in storage and so need not be added
* again, and the new leader should take over the triggers and pollers that stopped
* running when the former leader became unresponsive.
*/
if (this.multiMainSetup.isEnabled) {
if (activationMode !== 'leadershipChange') {
shouldAddWebhooks = this.multiMainSetup.isLeader;
shouldAddTriggersAndPollers = this.multiMainSetup.isLeader;
} else {
shouldAddWebhooks = false;
shouldAddTriggersAndPollers = this.multiMainSetup.isLeader;
}
}
try {
@ -744,14 +760,10 @@ export class ActiveWorkflowRunner implements IWebhookManager {
const additionalData = await WorkflowExecuteAdditionalData.getBase(sharing.user.id);
if (shouldAddWebhooks) {
this.logger.debug(`Adding webhooks for workflow ${dbWorkflow.display()}`);
await this.addWebhooks(workflow, additionalData, 'trigger', activationMode);
}
if (shouldAddTriggersAndPollers) {
this.logger.debug(`Adding triggers and pollers for workflow ${dbWorkflow.display()}`);
await this.addTriggersAndPollers(dbWorkflow, workflow, {
activationMode,
executionMode: 'trigger',
@ -936,6 +948,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
);
if (workflow.getTriggerNodes().length !== 0 || workflow.getPollNodes().length !== 0) {
this.logger.debug(`Adding triggers and pollers for workflow "${dbWorkflow.display()}"`);
await this.activeWorkflows.add(
workflow.id,
workflow,

View file

@ -248,7 +248,7 @@ export class Server extends AbstractServer {
ActiveWorkflowsController,
];
if (Container.get(MultiMainSetup).isEnabled) {
if (process.env.NODE_ENV !== 'production' && Container.get(MultiMainSetup).isEnabled) {
const { DebugController } = await import('./controllers/debug.controller');
controllers.push(DebugController);
}

View file

@ -3,6 +3,7 @@ import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { In } from 'typeorm';
import { WebhookEntity } from '@/databases/entities/WebhookEntity';
@RestController('/debug')
export class DebugController {
@ -16,17 +17,27 @@ export class DebugController {
async getMultiMainSetupDetails() {
const leaderKey = await this.multiMainSetup.fetchLeaderKey();
const activeWorkflows = await this.workflowRepository.find({
const triggersAndPollers = await this.workflowRepository.find({
select: ['id', 'name'],
where: { id: In(this.activeWorkflowRunner.allActiveInMemory()) },
});
const webhooks = (await this.workflowRepository
.createQueryBuilder('workflow')
.select('DISTINCT workflow.id, workflow.name')
.innerJoin(WebhookEntity, 'webhook_entity', 'workflow.id = webhook_entity.workflowId')
.execute()) as Array<{ id: string; name: string }>;
const activationErrors = await this.activeWorkflowRunner.getAllWorkflowActivationErrors();
return {
instanceId: this.multiMainSetup.instanceId,
leaderKey,
activeWorkflows,
isLeader: this.multiMainSetup.isLeader,
activeWorkflows: {
webhooks, // webhook-based active workflows
triggersAndPollers, // poller- and trigger-based active workflows
},
activationErrors,
};
}

View file

@ -307,11 +307,13 @@ export class WorkflowService {
await this.multiMainSetup.init();
if (this.multiMainSetup.isEnabled) {
const newState = updatedWorkflow.active;
if (this.multiMainSetup.isEnabled && oldState !== newState) {
await this.multiMainSetup.broadcastWorkflowActiveStateChanged({
workflowId,
oldState,
newState: updatedWorkflow.active,
newState,
versionId: shared.workflow.versionId,
});
}

View file

@ -25,22 +25,39 @@ describe('DebugController', () => {
describe('GET /debug/multi-main-setup', () => {
test('should return multi-main setup details', async () => {
const workflowId = generateNanoId();
const activeWorkflows = [{ id: workflowId, name: randomName() }] as WorkflowEntity[];
const webhooks = [{ id: workflowId, name: randomName() }] as WorkflowEntity[];
const triggersAndPollers = [{ id: workflowId, name: randomName() }] as WorkflowEntity[];
const activationErrors = { [workflowId]: 'Failed to activate' };
const instanceId = 'main-71JdWtq306epIFki';
const leaderKey = 'some-leader-key';
workflowRepository.find.mockResolvedValue(activeWorkflows);
const createQueryBuilder = {
select: () => createQueryBuilder,
innerJoin: () => createQueryBuilder,
execute: () => webhooks,
};
workflowRepository.find.mockResolvedValue(triggersAndPollers);
activeWorkflowRunner.allActiveInMemory.mockReturnValue([workflowId]);
activeWorkflowRunner.getAllWorkflowActivationErrors.mockResolvedValue(activationErrors);
jest
.spyOn(workflowRepository, 'createQueryBuilder')
.mockImplementation(() => createQueryBuilder);
jest.spyOn(MultiMainSetup.prototype, 'instanceId', 'get').mockReturnValue(instanceId);
jest.spyOn(MultiMainSetup.prototype, 'fetchLeaderKey').mockResolvedValue('some-leader-key');
jest.spyOn(MultiMainSetup.prototype, 'fetchLeaderKey').mockResolvedValue(leaderKey);
jest.spyOn(MultiMainSetup.prototype, 'isLeader', 'get').mockReturnValue(true);
const response = await ownerAgent.get('/debug/multi-main-setup').expect(200);
expect(response.body.data).toMatchObject({
instanceId,
leaderKey: 'some-leader-key',
activeWorkflows,
leaderKey,
isLeader: true,
activeWorkflows: {
webhooks,
triggersAndPollers,
},
activationErrors,
});
});

View file

@ -9,14 +9,17 @@ import { SharedWorkflowRepository } from '@/databases/repositories/sharedWorkflo
import { mock } from 'jest-mock-extended';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { Telemetry } from '@/telemetry';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
let workflowService: WorkflowService;
let activeWorkflowRunner: ActiveWorkflowRunner;
let multiMainSetup: MultiMainSetup;
beforeAll(async () => {
await testDb.init();
activeWorkflowRunner = mockInstance(ActiveWorkflowRunner);
multiMainSetup = mockInstance(MultiMainSetup);
mockInstance(Telemetry);
workflowService = new WorkflowService(
@ -29,7 +32,7 @@ beforeAll(async () => {
mock(),
mock(),
mock(),
mock(),
multiMainSetup,
mock(),
mock(),
mock(),
@ -82,4 +85,27 @@ describe('update()', () => {
expect(addSpy).not.toHaveBeenCalled();
});
test('should broadcast active workflow state change if state changed', async () => {
const owner = await createOwner();
const workflow = await createWorkflow({ active: true }, owner);
const broadcastSpy = jest.spyOn(multiMainSetup, 'broadcastWorkflowActiveStateChanged');
workflow.active = false;
await workflowService.update(owner, workflow, workflow.id);
expect(broadcastSpy).toHaveBeenCalledTimes(1);
});
test('should not broadcast active workflow state change if state did not change', async () => {
const owner = await createOwner();
const workflow = await createWorkflow({ active: true }, owner);
const broadcastSpy = jest.spyOn(multiMainSetup, 'broadcastWorkflowActiveStateChanged');
await workflowService.update(owner, workflow, workflow.id);
expect(broadcastSpy).not.toHaveBeenCalled();
});
});