refactor(core): Rename ActiveWorkflowRunner to ActiveWorkflowManager (no-changelog) (#9280)

This commit is contained in:
Iván Ovejero 2024-05-06 17:54:05 +02:00 committed by GitHub
parent 552cf8f3db
commit 7b925ab871
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 129 additions and 126 deletions

View file

@ -58,7 +58,7 @@ interface QueuedActivation {
}
@Service()
export class ActiveWorkflowRunner {
export class ActiveWorkflowManager {
private queuedActivations: { [workflowId: string]: QueuedActivation } = {};
constructor(

View file

@ -26,7 +26,7 @@ import type {
StartNodeData,
} from 'n8n-workflow';
import type { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import type { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import type { WorkflowExecute } from 'n8n-core';
@ -638,7 +638,7 @@ export interface N8nApp {
app: Application;
restEndpoint: string;
externalHooks: ExternalHooks;
activeWorkflowRunner: ActiveWorkflowRunner;
activeWorkflowManager: ActiveWorkflowManager;
}
export type UserSettings = Pick<User, 'id' | 'settings'>;

View file

@ -5,7 +5,7 @@ import type { FindOptionsWhere } from '@n8n/typeorm';
import { In, Like, QueryFailedError } from '@n8n/typeorm';
import { v4 as uuid } from 'uuid';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import config from '@/config';
import { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { ExternalHooks } from '@/ExternalHooks';
@ -179,12 +179,12 @@ export = {
await replaceInvalidCredentials(updateData);
addNodeIds(updateData);
const workflowRunner = Container.get(ActiveWorkflowRunner);
const workflowManager = Container.get(ActiveWorkflowManager);
if (sharedWorkflow.workflow.active) {
// When workflow gets saved always remove it as the triggers could have been
// changed and so the changes would not take effect
await workflowRunner.remove(id);
await workflowManager.remove(id);
}
try {
@ -197,7 +197,7 @@ export = {
if (sharedWorkflow.workflow.active) {
try {
await workflowRunner.add(sharedWorkflow.workflowId, 'update');
await workflowManager.add(sharedWorkflow.workflowId, 'update');
} catch (error) {
if (error instanceof Error) {
return res.status(400).json({ message: error.message });
@ -236,7 +236,7 @@ export = {
if (!sharedWorkflow.workflow.active) {
try {
await Container.get(ActiveWorkflowRunner).add(sharedWorkflow.workflowId, 'activate');
await Container.get(ActiveWorkflowManager).add(sharedWorkflow.workflowId, 'activate');
} catch (error) {
if (error instanceof Error) {
return res.status(400).json({ message: error.message });
@ -268,10 +268,10 @@ export = {
return res.status(404).json({ message: 'Not Found' });
}
const workflowRunner = Container.get(ActiveWorkflowRunner);
const activeWorkflowManager = Container.get(ActiveWorkflowManager);
if (sharedWorkflow.workflow.active) {
await workflowRunner.remove(sharedWorkflow.workflowId);
await activeWorkflowManager.remove(sharedWorkflow.workflowId);
await setWorkflowAsInactive(sharedWorkflow.workflow);

View file

@ -12,7 +12,7 @@ import { jsonParse } from 'n8n-workflow';
import config from '@/config';
import { ActiveExecutions } from '@/ActiveExecutions';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { Server } from '@/Server';
import { EDITOR_UI_DIST_DIR, LICENSE_FEATURES } from '@/constants';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
@ -57,7 +57,7 @@ export class Start extends BaseCommand {
}),
};
protected activeWorkflowRunner: ActiveWorkflowRunner;
protected activeWorkflowManager: ActiveWorkflowManager;
protected server = Container.get(Server);
@ -92,14 +92,14 @@ export class Start extends BaseCommand {
try {
// Stop with trying to activate workflows that could not be activated
this.activeWorkflowRunner.removeAllQueuedWorkflowActivations();
this.activeWorkflowManager.removeAllQueuedWorkflowActivations();
Container.get(WaitTracker).stopTracking();
await this.externalHooks?.run('n8n.stop', []);
if (Container.get(OrchestrationService).isMultiMainSetupEnabled) {
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
await this.activeWorkflowManager.removeAllTriggerAndPollerBasedWorkflows();
await Container.get(OrchestrationService).shutdown();
}
@ -171,7 +171,7 @@ export class Start extends BaseCommand {
}
await super.init();
this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
this.activeWorkflowManager = Container.get(ActiveWorkflowManager);
await this.initLicense();
@ -212,11 +212,11 @@ export class Start extends BaseCommand {
orchestrationService.multiMainSetup
.on('leader-stepdown', async () => {
await this.license.reinit(); // to disable renewal
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
await this.activeWorkflowManager.removeAllTriggerAndPollerBasedWorkflows();
})
.on('leader-takeover', async () => {
await this.license.reinit(); // to enable renewal
await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows();
await this.activeWorkflowManager.addAllTriggerAndPollerBasedWorkflows();
});
}
@ -286,7 +286,7 @@ export class Start extends BaseCommand {
await this.initPruning();
// Start to get active workflows and run their triggers
await this.activeWorkflowRunner.init();
await this.activeWorkflowManager.init();
const editorUrl = Container.get(UrlService).baseUrl;
this.log(`\nEditor is now accessible via:\n${editorUrl}`);

View file

@ -1,5 +1,5 @@
import { Get, RestController } from '@/decorators';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { OrchestrationService } from '@/services/orchestration.service';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
@ -7,7 +7,7 @@ import { WorkflowRepository } from '@/databases/repositories/workflow.repository
export class DebugController {
constructor(
private readonly orchestrationService: OrchestrationService,
private readonly activeWorkflowRunner: ActiveWorkflowRunner,
private readonly activeWorkflowManager: ActiveWorkflowManager,
private readonly workflowRepository: WorkflowRepository,
) {}
@ -16,12 +16,12 @@ export class DebugController {
const leaderKey = await this.orchestrationService.multiMainSetup.fetchLeaderKey();
const triggersAndPollers = await this.workflowRepository.findIn(
this.activeWorkflowRunner.allActiveInMemory(),
this.activeWorkflowManager.allActiveInMemory(),
);
const webhooks = await this.workflowRepository.findWebhookBasedActiveWorkflows();
const activationErrors = await this.activeWorkflowRunner.getAllWorkflowActivationErrors();
const activationErrors = await this.activeWorkflowManager.getAllWorkflowActivationErrors();
return {
instanceId: this.orchestrationService.instanceId,

View file

@ -3,7 +3,7 @@ import { v4 as uuid } from 'uuid';
import config from '@/config';
import { SettingsRepository } from '@db/repositories/settings.repository';
import { UserRepository } from '@db/repositories/user.repository';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { License } from '@/License';
import { LICENSE_FEATURES, inE2ETests } from '@/constants';
@ -87,7 +87,7 @@ export class E2EController {
license: License,
private readonly settingsRepo: SettingsRepository,
private readonly userRepo: UserRepository,
private readonly workflowRunner: ActiveWorkflowRunner,
private readonly workflowRunner: ActiveWorkflowManager,
private readonly mfaService: MfaService,
private readonly cacheService: CacheService,
private readonly push: Push,

View file

@ -11,7 +11,7 @@ import {
UserRoleChangePayload,
UserSettingsUpdatePayload,
} from '@/requests';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import type { PublicUser, ITelemetryUserDeletionData } from '@/Interfaces';
import { AuthIdentity } from '@db/entities/AuthIdentity';
import { SharedCredentialsRepository } from '@db/repositories/sharedCredentials.repository';
@ -36,7 +36,7 @@ export class UsersController {
private readonly sharedCredentialsRepository: SharedCredentialsRepository,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
private readonly userRepository: UserRepository,
private readonly activeWorkflowRunner: ActiveWorkflowRunner,
private readonly activeWorkflowManager: ActiveWorkflowManager,
private readonly authService: AuthService,
private readonly userService: UserService,
) {}
@ -264,7 +264,7 @@ export class UsersController {
ownedSharedWorkflows.map(async ({ workflow }) => {
if (workflow.active) {
// deactivate before deleting
await this.activeWorkflowRunner.remove(workflow.id);
await this.activeWorkflowManager.remove(workflow.id);
}
return workflow;
}),

View file

@ -5,7 +5,7 @@ export class AddTriggerCountColumn1669823906994 implements ReversibleMigration {
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity ADD COLUMN triggerCount integer NOT NULL DEFAULT 0`,
);
// Table will be populated by n8n startup - see ActiveWorkflowRunner.ts
// Table will be populated by n8n startup - see ActiveWorkflowManager.ts
}
async down({ queryRunner, tablePrefix }: MigrationContext) {

View file

@ -5,7 +5,7 @@ export class AddTriggerCountColumn1669823906995 implements ReversibleMigration {
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity ADD COLUMN "triggerCount" integer NOT NULL DEFAULT 0`,
);
// Table will be populated by n8n startup - see ActiveWorkflowRunner.ts
// Table will be populated by n8n startup - see ActiveWorkflowManager.ts
}
async down({ queryRunner, tablePrefix }: MigrationContext) {

View file

@ -5,7 +5,7 @@ export class AddTriggerCountColumn1669823906993 implements ReversibleMigration {
await queryRunner.query(
`ALTER TABLE \`${tablePrefix}workflow_entity\` ADD COLUMN "triggerCount" integer NOT NULL DEFAULT 0`,
);
// Table will be populated by n8n startup - see ActiveWorkflowRunner.ts
// Table will be populated by n8n startup - see ActiveWorkflowManager.ts
}
async down({ queryRunner, tablePrefix }: MigrationContext) {

View file

@ -17,7 +17,7 @@ import type { Variables } from '@db/entities/Variables';
import { SharedCredentials } from '@db/entities/SharedCredentials';
import type { WorkflowTagMapping } from '@db/entities/WorkflowTagMapping';
import type { TagEntity } from '@db/entities/TagEntity';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { In } from '@n8n/typeorm';
import { isUniqueConstraintError } from '@/ResponseHelper';
import type { SourceControlWorkflowVersionId } from './types/sourceControlWorkflowVersionId';
@ -45,7 +45,7 @@ export class SourceControlImportService {
constructor(
private readonly logger: Logger,
private readonly variablesService: VariablesService,
private readonly activeWorkflowRunner: ActiveWorkflowRunner,
private readonly activeWorkflowManager: ActiveWorkflowManager,
private readonly tagRepository: TagRepository,
instanceSettings: InstanceSettings,
) {
@ -203,7 +203,7 @@ export class SourceControlImportService {
}
public async importWorkflowFromWorkFolder(candidates: SourceControlledFile[], userId: string) {
const workflowRunner = this.activeWorkflowRunner;
const workflowRunner = this.activeWorkflowManager;
const candidateIds = candidates.map((c) => c.id);
const existingWorkflows = await Container.get(WorkflowRepository).findByIds(candidateIds, {
fields: ['id', 'name', 'versionId', 'active'],

View file

@ -5,7 +5,7 @@ import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import { License } from '@/License';
import { Logger } from '@/Logger';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { Push } from '@/push';
import { TestWebhooks } from '@/TestWebhooks';
import { OrchestrationService } from '@/services/orchestration.service';
@ -93,7 +93,7 @@ export async function handleCommandMessageMain(messageString: string) {
const { workflowId } = message.payload;
try {
await Container.get(ActiveWorkflowRunner).add(workflowId, 'activate', undefined, {
await Container.get(ActiveWorkflowManager).add(workflowId, 'activate', undefined, {
shouldPublish: false, // prevent leader re-publishing message
});
@ -134,10 +134,10 @@ export async function handleCommandMessageMain(messageString: string) {
const { workflowId } = message.payload;
const activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
const activeWorkflowManager = Container.get(ActiveWorkflowManager);
await activeWorkflowRunner.removeActivationError(workflowId);
await activeWorkflowRunner.removeWorkflowTriggersAndPollers(workflowId);
await activeWorkflowManager.removeActivationError(workflowId);
await activeWorkflowManager.removeWorkflowTriggersAndPollers(workflowId);
push.broadcast('workflowDeactivated', { workflowId });

View file

@ -13,7 +13,7 @@ import { ExecutionRepository } from '@db/repositories/execution.repository';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { WorkflowTagMappingRepository } from '@db/repositories/workflowTagMapping.repository';
import { WorkflowRepository } from '@db/repositories/workflow.repository';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import { validateEntity } from '@/GenericHelpers';
import { ExternalHooks } from '@/ExternalHooks';
@ -41,7 +41,7 @@ export class WorkflowService {
private readonly workflowHistoryService: WorkflowHistoryService,
private readonly orchestrationService: OrchestrationService,
private readonly externalHooks: ExternalHooks,
private readonly activeWorkflowRunner: ActiveWorkflowRunner,
private readonly activeWorkflowManager: ActiveWorkflowManager,
) {}
async getMany(sharedWorkflowIds: string[], options?: ListQuery.Options) {
@ -120,7 +120,7 @@ export class WorkflowService {
* will take effect only on removing and re-adding.
*/
if (shared.workflow.active) {
await this.activeWorkflowRunner.remove(workflowId);
await this.activeWorkflowManager.remove(workflowId);
}
const workflowSettings = workflow.settings ?? {};
@ -200,7 +200,7 @@ export class WorkflowService {
// When the workflow is supposed to be active add it again
try {
await this.externalHooks.run('workflow.activate', [updatedWorkflow]);
await this.activeWorkflowRunner.add(
await this.activeWorkflowManager.add(
workflowId,
shared.workflow.active ? 'update' : 'activate',
);
@ -245,7 +245,7 @@ export class WorkflowService {
if (sharedWorkflow.workflow.active) {
// deactivate before deleting
await this.activeWorkflowRunner.remove(workflowId);
await this.activeWorkflowManager.remove(workflowId);
}
const idsForDeletion = await this.executionRepository

View file

@ -4,7 +4,7 @@ import { NodeApiError, NodeOperationError, Workflow } from 'n8n-workflow';
import type { IWebhookData, WorkflowActivateMode } from 'n8n-workflow';
import { ActiveExecutions } from '@/ActiveExecutions';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { ExternalHooks } from '@/ExternalHooks';
import { Push } from '@/push';
import { SecretsHelper } from '@/SecretsHelpers';
@ -47,7 +47,7 @@ Object.assign(loader.loadedNodes, {
const webhookService = mockInstance(WebhookService);
const externalHooks = mockInstance(ExternalHooks);
let runner: ActiveWorkflowRunner;
let activeWorkflowManager: ActiveWorkflowManager;
let createActiveWorkflow: () => Promise<WorkflowEntity>;
let createInactiveWorkflow: () => Promise<WorkflowEntity>;
@ -55,7 +55,7 @@ let createInactiveWorkflow: () => Promise<WorkflowEntity>;
beforeAll(async () => {
await testDb.init();
runner = Container.get(ActiveWorkflowRunner);
activeWorkflowManager = Container.get(ActiveWorkflowManager);
const owner = await createOwner();
createActiveWorkflow = async () => await createWorkflow({ active: true }, owner);
@ -64,7 +64,7 @@ beforeAll(async () => {
afterEach(async () => {
await testDb.truncate(['Workflow', 'Webhook']);
await runner.removeAll();
await activeWorkflowManager.removeAll();
jest.restoreAllMocks();
});
@ -74,18 +74,18 @@ afterAll(async () => {
describe('init()', () => {
it('should load workflows into memory', async () => {
await runner.init();
await activeWorkflowManager.init();
expect(runner.allActiveInMemory()).toHaveLength(0);
expect(activeWorkflowManager.allActiveInMemory()).toHaveLength(0);
await createActiveWorkflow();
await runner.init();
await activeWorkflowManager.init();
expect(runner.allActiveInMemory()).toHaveLength(1);
expect(activeWorkflowManager.allActiveInMemory()).toHaveLength(1);
});
it('should call external hook', async () => {
await runner.init();
await activeWorkflowManager.init();
const [hook, arg] = externalHooks.run.mock.calls[0];
@ -100,7 +100,7 @@ describe('init()', () => {
.spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated')
.mockReturnValue(true);
await runner.init();
await activeWorkflowManager.init();
expect(checkSpy).toHaveBeenCalledTimes(2);
});
@ -110,17 +110,17 @@ describe('isActive()', () => {
it('should return `true` for active workflow in storage', async () => {
const dbWorkflow = await createActiveWorkflow();
await runner.init();
await activeWorkflowManager.init();
await expect(runner.isActive(dbWorkflow.id)).resolves.toBe(true);
await expect(activeWorkflowManager.isActive(dbWorkflow.id)).resolves.toBe(true);
});
it('should return `false` for inactive workflow in storage', async () => {
const dbWorkflow = await createInactiveWorkflow();
await runner.init();
await activeWorkflowManager.init();
await expect(runner.isActive(dbWorkflow.id)).resolves.toBe(false);
await expect(activeWorkflowManager.isActive(dbWorkflow.id)).resolves.toBe(false);
});
});
@ -129,13 +129,13 @@ describe('add()', () => {
test.each(['activate', 'update'])(
"should add webhooks, triggers and pollers for workflow in '%s' activation mode",
async (mode: WorkflowActivateMode) => {
await runner.init();
await activeWorkflowManager.init();
const dbWorkflow = await createActiveWorkflow();
const addWebhooksSpy = jest.spyOn(runner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(runner, 'addTriggersAndPollers');
const addWebhooksSpy = jest.spyOn(activeWorkflowManager, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowManager, 'addTriggersAndPollers');
await runner.add(dbWorkflow.id, mode);
await activeWorkflowManager.add(dbWorkflow.id, mode);
const [argWorkflow] = addWebhooksSpy.mock.calls[0];
const [_, _argWorkflow] = addTriggersAndPollersSpy.mock.calls[0];
@ -158,10 +158,10 @@ describe('removeAll()', () => {
await createActiveWorkflow();
await createActiveWorkflow();
await runner.init();
await runner.removeAll();
await activeWorkflowManager.init();
await activeWorkflowManager.removeAll();
expect(runner.allActiveInMemory()).toHaveLength(0);
expect(activeWorkflowManager.allActiveInMemory()).toHaveLength(0);
});
});
@ -170,8 +170,8 @@ describe('remove()', () => {
it('should remove all webhooks of a workflow from database', async () => {
const dbWorkflow = await createActiveWorkflow();
await runner.init();
await runner.remove(dbWorkflow.id);
await activeWorkflowManager.init();
await activeWorkflowManager.remove(dbWorkflow.id);
expect(webhookService.deleteWorkflowWebhooks).toHaveBeenCalledTimes(1);
});
@ -183,18 +183,21 @@ describe('remove()', () => {
.spyOn(WebhookHelpers, 'getWorkflowWebhooks')
.mockReturnValue([mock<IWebhookData>({ path: 'some-path' })]);
await runner.init();
await runner.remove(dbWorkflow.id);
await activeWorkflowManager.init();
await activeWorkflowManager.remove(dbWorkflow.id);
expect(deleteWebhookSpy).toHaveBeenCalledTimes(1);
});
it('should stop running triggers and pollers', async () => {
const dbWorkflow = await createActiveWorkflow();
const removeTriggersAndPollersSpy = jest.spyOn(runner, 'removeWorkflowTriggersAndPollers');
const removeTriggersAndPollersSpy = jest.spyOn(
activeWorkflowManager,
'removeWorkflowTriggersAndPollers',
);
await runner.init();
await runner.remove(dbWorkflow.id);
await activeWorkflowManager.init();
await activeWorkflowManager.remove(dbWorkflow.id);
expect(removeTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
});
@ -208,9 +211,9 @@ describe('executeErrorWorkflow()', () => {
const executeSpy = jest.spyOn(AdditionalData, 'executeErrorWorkflow');
await runner.init();
await activeWorkflowManager.init();
runner.executeErrorWorkflow(
activeWorkflowManager.executeErrorWorkflow(
new NodeOperationError(node, 'Something went wrong'),
dbWorkflow,
'trigger',
@ -222,16 +225,16 @@ describe('executeErrorWorkflow()', () => {
it('should be called on failure to activate due to 401', async () => {
const dbWorkflow = await createActiveWorkflow();
const [node] = dbWorkflow.nodes;
const executeSpy = jest.spyOn(runner, 'executeErrorWorkflow');
const executeSpy = jest.spyOn(activeWorkflowManager, 'executeErrorWorkflow');
jest.spyOn(runner, 'add').mockImplementation(() => {
jest.spyOn(activeWorkflowManager, 'add').mockImplementation(() => {
throw new NodeApiError(node, {
httpCode: '401',
message: 'Authorization failed - please check your credentials',
});
});
await runner.init();
await activeWorkflowManager.init();
expect(executeSpy).toHaveBeenCalledTimes(1);
const [error, _dbWorkflow] = executeSpy.mock.calls[0];
@ -270,7 +273,7 @@ describe('addWebhooks()', () => {
jest.spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated').mockReturnValue(true);
jest.spyOn(Workflow.prototype, 'createWebhookIfNotExists').mockResolvedValue(undefined);
await runner.addWebhooks(workflow, additionalData, 'trigger', 'init');
await activeWorkflowManager.addWebhooks(workflow, additionalData, 'trigger', 'init');
expect(webhookService.storeWebhook).toHaveBeenCalledTimes(1);
});

View file

@ -1,4 +1,4 @@
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import type { SuperAgentTest } from 'supertest';
import * as utils from './shared/utils/';
@ -6,7 +6,7 @@ import { createUser } from './shared/db/users';
import { mockInstance } from '../shared/mocking';
describe('Auth Middleware', () => {
mockInstance(ActiveWorkflowRunner);
mockInstance(ActiveWorkflowManager);
const testServer = utils.setupTestServer({
endpointGroups: ['me', 'auth', 'owner', 'users', 'invitations'],

View file

@ -1,5 +1,5 @@
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { mockInstance } from '../shared/mocking';
import { randomName } from './shared/random';
import { generateNanoId } from '@/databases/utils/generators';
@ -12,7 +12,7 @@ import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'
describe('DebugController', () => {
const workflowRepository = mockInstance(WorkflowRepository);
const activeWorkflowRunner = mockInstance(ActiveWorkflowRunner);
const activeWorkflowManager = mockInstance(ActiveWorkflowManager);
let testServer = setupTestServer({ endpointGroups: ['debug'] });
let ownerAgent: SuperAgentTest;
@ -34,8 +34,8 @@ describe('DebugController', () => {
workflowRepository.findIn.mockResolvedValue(triggersAndPollers);
workflowRepository.findWebhookBasedActiveWorkflows.mockResolvedValue(webhooks);
activeWorkflowRunner.allActiveInMemory.mockReturnValue([workflowId]);
activeWorkflowRunner.getAllWorkflowActivationErrors.mockResolvedValue(activationErrors);
activeWorkflowManager.allActiveInMemory.mockReturnValue([workflowId]);
activeWorkflowManager.getAllWorkflowActivationErrors.mockResolvedValue(activationErrors);
jest.spyOn(OrchestrationService.prototype, 'instanceId', 'get').mockReturnValue(instanceId);
jest.spyOn(MultiMainSetup.prototype, 'fetchLeaderKey').mockResolvedValue(leaderKey);

View file

@ -1,6 +1,6 @@
import type { SuperAgentTest } from 'supertest';
import type { User } from '@db/entities/User';
import type { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import type { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { randomApiKey } from '../shared/random';
import * as utils from '../shared/utils/';
@ -24,7 +24,7 @@ let user2: User;
let authOwnerAgent: SuperAgentTest;
let authUser1Agent: SuperAgentTest;
let authUser2Agent: SuperAgentTest;
let workflowRunner: ActiveWorkflowRunner;
let workflowRunner: ActiveWorkflowManager;
const testServer = utils.setupTestServer({ endpointGroups: ['publicApi'] });
@ -37,7 +37,7 @@ beforeAll(async () => {
await utils.initBinaryDataService();
await utils.initNodeTypes();
workflowRunner = await utils.initActiveWorkflowRunner();
workflowRunner = await utils.initActiveWorkflowManager();
});
beforeEach(async () => {

View file

@ -7,7 +7,7 @@ import type { TagEntity } from '@db/entities/TagEntity';
import type { User } from '@db/entities/User';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { WorkflowHistoryRepository } from '@db/repositories/workflowHistory.repository';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { ExecutionService } from '@/executions/execution.service';
import { randomApiKey } from '../shared/random';
@ -22,7 +22,7 @@ let owner: User;
let member: User;
let authOwnerAgent: SuperAgentTest;
let authMemberAgent: SuperAgentTest;
let workflowRunner: ActiveWorkflowRunner;
let activeWorkflowManager: ActiveWorkflowManager;
const testServer = utils.setupTestServer({ endpointGroups: ['publicApi'] });
const license = testServer.license;
@ -42,9 +42,9 @@ beforeAll(async () => {
await utils.initNodeTypes();
workflowRunner = Container.get(ActiveWorkflowRunner);
activeWorkflowManager = Container.get(ActiveWorkflowManager);
await workflowRunner.init();
await activeWorkflowManager.init();
});
beforeEach(async () => {
@ -62,7 +62,7 @@ beforeEach(async () => {
});
afterEach(async () => {
await workflowRunner?.removeAll();
await activeWorkflowManager?.removeAll();
});
const testWithAPIKey =
@ -517,7 +517,7 @@ describe('POST /workflows/:id/activate', () => {
expect(sharedWorkflow?.workflow.active).toBe(true);
// check whether the workflow is on the active workflow runner
expect(await workflowRunner.isActive(workflow.id)).toBe(true);
expect(await activeWorkflowManager.isActive(workflow.id)).toBe(true);
});
test('should set non-owned workflow as active when owner', async () => {
@ -561,7 +561,7 @@ describe('POST /workflows/:id/activate', () => {
expect(sharedWorkflow?.workflow.active).toBe(true);
// check whether the workflow is on the active workflow runner
expect(await workflowRunner.isActive(workflow.id)).toBe(true);
expect(await activeWorkflowManager.isActive(workflow.id)).toBe(true);
});
});
@ -615,7 +615,7 @@ describe('POST /workflows/:id/deactivate', () => {
// check whether the workflow is deactivated in the database
expect(sharedWorkflow?.workflow.active).toBe(false);
expect(await workflowRunner.isActive(workflow.id)).toBe(false);
expect(await activeWorkflowManager.isActive(workflow.id)).toBe(false);
});
test('should deactivate non-owned workflow when owner', async () => {
@ -660,7 +660,7 @@ describe('POST /workflows/:id/deactivate', () => {
expect(sharedWorkflow?.workflow.active).toBe(false);
expect(await workflowRunner.isActive(workflow.id)).toBe(false);
expect(await activeWorkflowManager.isActive(workflow.id)).toBe(false);
});
});

View file

@ -29,7 +29,7 @@ export { setupTestServer } from './testServer';
/**
* Initialize node types.
*/
export async function initActiveWorkflowRunner() {
export async function initActiveWorkflowManager() {
mockInstance(OrchestrationService, {
isMultiMainSetupEnabled: false,
shouldAddWebhooks: jest.fn().mockReturnValue(true),
@ -37,10 +37,10 @@ export async function initActiveWorkflowRunner() {
mockInstance(Push);
mockInstance(ExecutionService);
const { ActiveWorkflowRunner } = await import('@/ActiveWorkflowRunner');
const workflowRunner = Container.get(ActiveWorkflowRunner);
await workflowRunner.init();
return workflowRunner;
const { ActiveWorkflowManager } = await import('@/ActiveWorkflowManager');
const activeWorkflowManager = Container.get(ActiveWorkflowManager);
await activeWorkflowManager.init();
return activeWorkflowManager;
}
/**

View file

@ -11,7 +11,7 @@ import { Push } from '@/push';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { mockInstance } from '../shared/mocking';
import { initActiveWorkflowRunner } from './shared/utils';
import { initActiveWorkflowManager } from './shared/utils';
import * as testDb from './shared/testDb';
import { createUser } from './shared/db/users';
import { createWorkflow } from './shared/db/workflows';
@ -41,7 +41,7 @@ describe('Webhook API', () => {
nodeTypes.getByName.mockReturnValue(node);
nodeTypes.getByNameAndVersion.mockReturnValue(node);
await initActiveWorkflowRunner();
await initActiveWorkflowManager();
const server = new (class extends AbstractServer {})();
await server.start();
@ -144,7 +144,7 @@ describe('Webhook API', () => {
nodeTypes.getByName.mockReturnValue(node);
nodeTypes.getByNameAndVersion.mockReturnValue(node);
await initActiveWorkflowRunner();
await initActiveWorkflowManager();
const server = new (class extends AbstractServer {})();
await server.start();

View file

@ -1,6 +1,6 @@
import Container from 'typedi';
import { mock } from 'jest-mock-extended';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { WorkflowRepository } from '@db/repositories/workflow.repository';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
@ -14,7 +14,7 @@ import { createOwner } from '../shared/db/users';
import { createWorkflow } from '../shared/db/workflows';
let workflowService: WorkflowService;
const activeWorkflowRunner = mockInstance(ActiveWorkflowRunner);
const activeWorkflowManager = mockInstance(ActiveWorkflowManager);
const orchestrationService = mockInstance(OrchestrationService);
mockInstance(MessageEventBus);
mockInstance(Telemetry);
@ -34,7 +34,7 @@ beforeAll(async () => {
mock(),
orchestrationService,
mock(),
activeWorkflowRunner,
activeWorkflowManager,
);
});
@ -52,8 +52,8 @@ describe('update()', () => {
const owner = await createOwner();
const workflow = await createWorkflow({ active: true }, owner);
const removeSpy = jest.spyOn(activeWorkflowRunner, 'remove');
const addSpy = jest.spyOn(activeWorkflowRunner, 'add');
const removeSpy = jest.spyOn(activeWorkflowManager, 'remove');
const addSpy = jest.spyOn(activeWorkflowManager, 'add');
await workflowService.update(owner, workflow, workflow.id);
@ -71,8 +71,8 @@ describe('update()', () => {
const owner = await createOwner();
const workflow = await createWorkflow({ active: true }, owner);
const removeSpy = jest.spyOn(activeWorkflowRunner, 'remove');
const addSpy = jest.spyOn(activeWorkflowRunner, 'add');
const removeSpy = jest.spyOn(activeWorkflowManager, 'remove');
const addSpy = jest.spyOn(activeWorkflowManager, 'add');
workflow.active = false;
await workflowService.update(owner, workflow, workflow.id);

View file

@ -5,7 +5,7 @@ import type { INode } from 'n8n-workflow';
import type { User } from '@db/entities/User';
import { WorkflowHistoryRepository } from '@db/repositories/workflowHistory.repository';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { WorkflowSharingService } from '@/workflows/workflowSharing.service';
import { mockInstance } from '../../shared/mocking';
@ -29,7 +29,7 @@ let authMemberAgent: SuperAgentTest;
let authAnotherMemberAgent: SuperAgentTest;
let saveCredential: SaveCredentialFunction;
const activeWorkflowRunner = mockInstance(ActiveWorkflowRunner);
const activeWorkflowManager = mockInstance(ActiveWorkflowManager);
const sharingSpy = jest.spyOn(License.prototype, 'isSharingEnabled').mockReturnValue(true);
const testServer = utils.setupTestServer({
@ -54,8 +54,8 @@ beforeAll(async () => {
});
beforeEach(async () => {
activeWorkflowRunner.add.mockReset();
activeWorkflowRunner.remove.mockReset();
activeWorkflowManager.add.mockReset();
activeWorkflowManager.remove.mockReset();
await testDb.truncate(['Workflow', 'SharedWorkflow', 'WorkflowHistory']);
});
@ -1152,7 +1152,7 @@ describe('PATCH /workflows/:id - activate workflow', () => {
const response = await authOwnerAgent.patch(`/workflows/${workflow.id}`).send(payload);
expect(response.statusCode).toBe(200);
expect(activeWorkflowRunner.add).toBeCalled();
expect(activeWorkflowManager.add).toBeCalled();
const {
data: { id, versionId, active },
@ -1174,8 +1174,8 @@ describe('PATCH /workflows/:id - activate workflow', () => {
const response = await authOwnerAgent.patch(`/workflows/${workflow.id}`).send(payload);
expect(response.statusCode).toBe(200);
expect(activeWorkflowRunner.add).not.toBeCalled();
expect(activeWorkflowRunner.remove).toBeCalled();
expect(activeWorkflowManager.add).not.toBeCalled();
expect(activeWorkflowManager.remove).toBeCalled();
const {
data: { id, versionId, active },

View file

@ -8,7 +8,7 @@ import { WorkflowRepository } from '@db/repositories/workflow.repository';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { ListQuery } from '@/requests';
import { WorkflowHistoryRepository } from '@db/repositories/workflowHistory.repository';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { EnterpriseWorkflowService } from '@/workflows/workflow.service.ee';
import { mockInstance } from '../../shared/mocking';
@ -32,7 +32,7 @@ const license = testServer.license;
const { objectContaining, arrayContaining, any } = expect;
const activeWorkflowRunnerLike = mockInstance(ActiveWorkflowRunner);
const activeWorkflowManagerLike = mockInstance(ActiveWorkflowManager);
beforeAll(async () => {
owner = await createOwner();
@ -574,7 +574,7 @@ describe('PATCH /workflows/:id', () => {
const response = await authOwnerAgent.patch(`/workflows/${workflow.id}`).send(payload);
expect(response.statusCode).toBe(200);
expect(activeWorkflowRunnerLike.add).toBeCalled();
expect(activeWorkflowManagerLike.add).toBeCalled();
const {
data: { id, versionId, active },
@ -596,8 +596,8 @@ describe('PATCH /workflows/:id', () => {
const response = await authOwnerAgent.patch(`/workflows/${workflow.id}`).send(payload);
expect(response.statusCode).toBe(200);
expect(activeWorkflowRunnerLike.add).not.toBeCalled();
expect(activeWorkflowRunnerLike.remove).toBeCalled();
expect(activeWorkflowManagerLike.add).not.toBeCalled();
expect(activeWorkflowManagerLike.remove).toBeCalled();
const {
data: { id, versionId, active },

View file

@ -11,13 +11,13 @@ import * as helpers from '@/services/orchestration/helpers';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import { Logger } from '@/Logger';
import { Push } from '@/push';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { mockInstance } from '../../shared/mocking';
import type { WorkflowActivateMode } from 'n8n-workflow';
const os = Container.get(OrchestrationService);
const handler = Container.get(OrchestrationHandlerMainService);
mockInstance(ActiveWorkflowRunner);
mockInstance(ActiveWorkflowManager);
let queueModeId: string;

View file

@ -3319,7 +3319,7 @@ export function copyInputItems(items: INodeExecutionData[], properties: string[]
/**
* Returns the execute functions the poll nodes have access to.
*/
// TODO: Check if I can get rid of: additionalData, and so then maybe also at ActiveWorkflowRunner.add
// TODO: Check if I can get rid of: additionalData, and so then maybe also at ActiveWorkflowManager.add
export function getExecutePollFunctions(
workflow: Workflow,
node: INode,
@ -3382,7 +3382,7 @@ export function getExecutePollFunctions(
/**
* Returns the execute functions the trigger nodes have access to.
*/
// TODO: Check if I can get rid of: additionalData, and so then maybe also at ActiveWorkflowRunner.add
// TODO: Check if I can get rid of: additionalData, and so then maybe also at ActiveWorkflowManager.add
export function getExecuteTriggerFunctions(
workflow: Workflow,
node: INode,