refactor(core): Move active workflows endpoints to a decorated controller class (no-changelog) (#8101)

This is a continuation of migrating all rest endpoints to decorated controller classes
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2023-12-22 11:28:42 +01:00 committed by GitHub
parent 39e45d8b92
commit 021add0f39
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 226 additions and 133 deletions

View file

@ -99,17 +99,17 @@ const switchBetweenEditorAndHistory = () => {
workflowPage.getters.canvasNodes().first().should('be.visible');
workflowPage.getters.canvasNodes().last().should('be.visible');
}
};
const switchBetweenEditorAndWorkflowlist = () => {
cy.getByTestId('menu-item').first().click();
cy.wait(['@getUsers', '@getWorkflows', '@getActive', '@getCredentials']);
cy.wait(['@getUsers', '@getWorkflows', '@getActiveWorkflows', '@getCredentials']);
cy.getByTestId('resources-list-item').first().click();
workflowPage.getters.canvasNodes().first().should('be.visible');
workflowPage.getters.canvasNodes().last().should('be.visible');
}
};
const zoomInAndCheckNodes = () => {
cy.getByTestId('zoom-in-button').click();
@ -119,7 +119,7 @@ const zoomInAndCheckNodes = () => {
workflowPage.getters.canvasNodes().first().should('not.be.visible');
workflowPage.getters.canvasNodes().last().should('not.be.visible');
}
};
describe('Editor actions should work', () => {
beforeEach(() => {
@ -199,7 +199,7 @@ describe('Editor zoom should work after route changes', () => {
cy.intercept('GET', '/rest/workflow-history/workflow/*').as('getHistory');
cy.intercept('GET', '/rest/users').as('getUsers');
cy.intercept('GET', '/rest/workflows').as('getWorkflows');
cy.intercept('GET', '/rest/active').as('getActive');
cy.intercept('GET', '/rest/active-workflows').as('getActiveWorkflows');
cy.intercept('GET', '/rest/credentials').as('getCredentials');
switchBetweenEditorAndHistory();

View file

@ -45,7 +45,6 @@ import type {
import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import type { User } from '@db/entities/User';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { ActiveExecutions } from '@/ActiveExecutions';
import { ExecutionsService } from './executions/executions.service';
@ -57,44 +56,40 @@ import {
import { NodeTypes } from '@/NodeTypes';
import { WorkflowRunner } from '@/WorkflowRunner';
import { ExternalHooks } from '@/ExternalHooks';
import { whereClause } from './UserManagement/UserManagementHelper';
import { WebhookNotFoundError } from './errors/response-errors/webhook-not-found.error';
import { In } from 'typeorm';
import { WebhookService } from './services/webhook.service';
import { Logger } from './Logger';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { WorkflowRepository } from '@db/repositories/workflow.repository';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { ActivationErrorsService } from '@/ActivationErrors.service';
import type { Scope } from '@n8n/permissions';
import { NotFoundError } from './errors/response-errors/not-found.error';
import { ActiveWorkflowsService } from '@/services/activeWorkflows.service';
import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service';
interface QueuedActivation {
activationMode: WorkflowActivateMode;
lastTimeout: number;
timeout: NodeJS.Timeout;
workflowData: IWorkflowDb;
}
@Service()
export class ActiveWorkflowRunner implements IWebhookManager {
activeWorkflows = new ActiveWorkflows();
private queuedActivations: {
[workflowId: string]: {
activationMode: WorkflowActivateMode;
lastTimeout: number;
timeout: NodeJS.Timeout;
workflowData: IWorkflowDb;
};
} = {};
private queuedActivations: { [workflowId: string]: QueuedActivation } = {};
constructor(
private readonly logger: Logger,
private readonly activeWorkflows: ActiveWorkflows,
private readonly activeExecutions: ActiveExecutions,
private readonly externalHooks: ExternalHooks,
private readonly nodeTypes: NodeTypes,
private readonly webhookService: WebhookService,
private readonly workflowRepository: WorkflowRepository,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
private readonly multiMainSetup: MultiMainSetup,
private readonly activationErrorsService: ActivationErrorsService,
private readonly executionService: ExecutionsService,
private readonly workflowStaticDataService: WorkflowStaticDataService,
private readonly activeWorkflowsService: ActiveWorkflowsService,
) {}
async init() {
@ -119,7 +114,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
activeWorkflowIds.push(...this.activeWorkflows.allActiveWorkflows());
const activeWorkflows = await this.allActiveInStorage();
const activeWorkflows = await this.activeWorkflowsService.getAllActiveIdsInStorage();
activeWorkflowIds = [...activeWorkflowIds, ...activeWorkflows];
// Make sure IDs are unique
activeWorkflowIds = Array.from(new Set(activeWorkflowIds));
@ -269,50 +264,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
return this.activeWorkflows.allActiveWorkflows();
}
/**
* Get the IDs of active workflows from storage.
*/
async allActiveInStorage(options?: { user: User; scope: Scope | Scope[] }) {
const isFullAccess = !options?.user || options.user.hasGlobalScope(options.scope);
const activationErrors = await this.activationErrorsService.getAll();
if (isFullAccess) {
const activeWorkflows = await this.workflowRepository.find({
select: ['id'],
where: { active: true },
});
return activeWorkflows
.map((workflow) => workflow.id)
.filter((workflowId) => !activationErrors[workflowId]);
}
const where = whereClause({
user: options.user,
globalScope: 'workflow:list',
entityType: 'workflow',
});
const activeWorkflows = await this.workflowRepository.find({
select: ['id'],
where: { active: true },
});
const activeIds = activeWorkflows.map((workflow) => workflow.id);
Object.assign(where, { workflowId: In(activeIds) });
const sharings = await this.sharedWorkflowRepository.find({
select: ['workflowId'],
where,
});
return sharings
.map((sharing) => sharing.workflowId)
.filter((workflowId) => !activationErrors[workflowId]);
}
/**
* Returns if the workflow is stored as `active`.
*
@ -328,13 +279,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
return !!workflow?.active;
}
/**
* Return error if there was a problem activating the workflow
*/
async getActivationError(workflowId: string) {
return this.activationErrorsService.get(workflowId);
}
/**
* Register workflow-defined webhooks in the `workflow_entity` table.
*/

View file

@ -46,7 +46,7 @@ import {
TEMPLATES_DIR,
} from '@/constants';
import { credentialsController } from '@/credentials/credentials.controller';
import type { CurlHelper, ExecutionRequest, WorkflowRequest } from '@/requests';
import type { CurlHelper, ExecutionRequest } from '@/requests';
import { registerController } from '@/decorators';
import { AuthController } from '@/controllers/auth.controller';
import { BinaryDataController } from '@/controllers/binaryData.controller';
@ -66,7 +66,6 @@ import { WorkflowStatisticsController } from '@/controllers/workflowStatistics.c
import { ExternalSecretsController } from '@/ExternalSecrets/ExternalSecrets.controller.ee';
import { executionsController } from '@/executions/executions.controller';
import { isApiEnabled, loadPublicApiVersions } from '@/PublicApi';
import { whereClause } from '@/UserManagement/UserManagementHelper';
import type { ICredentialsOverwrite, IDiagnosticInfo, IExecutionsStopData } from '@/Interfaces';
import { ActiveExecutions } from '@/ActiveExecutions';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
@ -112,6 +111,7 @@ import { handleMfaDisable, isMfaFeatureEnabled } from './Mfa/helpers';
import type { FrontendService } from './services/frontend.service';
import { RoleService } from './services/role.service';
import { UserService } from './services/user.service';
import { ActiveWorkflowsController } from './controllers/activeWorkflows.controller';
import { OrchestrationController } from './controllers/orchestration.controller';
import { WorkflowHistoryController } from './workflows/workflowHistory/workflowHistory.controller.ee';
import { InvitationController } from './controllers/invitation.controller';
@ -305,6 +305,7 @@ export class Server extends AbstractServer {
),
Container.get(VariablesController),
Container.get(RoleController),
Container.get(ActiveWorkflowsController),
];
if (Container.get(MultiMainSetup).isEnabled) {
@ -443,50 +444,6 @@ export class Server extends AbstractServer {
this.logger.warn(`Source Control initialization failed: ${error.message}`);
}
// ----------------------------------------
// Active Workflows
// ----------------------------------------
// Returns the active workflow ids
this.app.get(
`/${this.restEndpoint}/active`,
ResponseHelper.send(async (req: WorkflowRequest.GetAllActive) => {
return this.activeWorkflowRunner.allActiveInStorage({
user: req.user,
scope: 'workflow:list',
});
}),
);
// Returns if the workflow with the given id had any activation errors
this.app.get(
`/${this.restEndpoint}/active/error/:id`,
ResponseHelper.send(async (req: WorkflowRequest.GetActivationError) => {
const { id: workflowId } = req.params;
const shared = await Container.get(SharedWorkflowRepository).findOne({
relations: ['workflow'],
where: whereClause({
user: req.user,
globalScope: 'workflow:read',
entityType: 'workflow',
entityId: workflowId,
}),
});
if (!shared) {
this.logger.verbose('User attempted to access workflow errors without permissions', {
workflowId,
userId: req.user.id,
});
throw new BadRequestError(`Workflow with ID "${workflowId}" could not be found.`);
}
return this.activeWorkflowRunner.getActivationError(workflowId);
}),
);
// ----------------------------------------
// curl-converter
// ----------------------------------------

View file

@ -0,0 +1,25 @@
import { Service } from 'typedi';
import { Authorized, Get, RestController } from '@/decorators';
import { WorkflowRequest } from '@/requests';
import { ActiveWorkflowsService } from '@/services/activeWorkflows.service';
@Service()
@Authorized()
@RestController('/active-workflows')
export class ActiveWorkflowsController {
constructor(private readonly activeWorkflowsService: ActiveWorkflowsService) {}
@Get('/')
async getActiveWorkflows(req: WorkflowRequest.GetAllActive) {
return this.activeWorkflowsService.getAllActiveIdsFor(req.user);
}
@Get('/error/:id')
async getActivationError(req: WorkflowRequest.GetActivationError) {
const {
user,
params: { id: workflowId },
} = req;
return this.activeWorkflowsService.getActivationError(workflowId, user);
}
}

View file

@ -1,10 +1,31 @@
import { Service } from 'typedi';
import { DataSource, Repository } from 'typeorm';
import { DataSource, type FindOptionsWhere, Repository, In } from 'typeorm';
import { SharedWorkflow } from '../entities/SharedWorkflow';
import { type User } from '../entities/User';
@Service()
export class SharedWorkflowRepository extends Repository<SharedWorkflow> {
constructor(dataSource: DataSource) {
super(SharedWorkflow, dataSource.manager);
}
async hasAccess(workflowId: string, user: User) {
const where: FindOptionsWhere<SharedWorkflow> = {
workflowId,
};
if (!user.hasGlobalScope('workflow:read')) {
where.userId = user.id;
}
return this.exist({ where });
}
async getSharedWorkflowIds(workflowIds: string[]) {
const sharedWorkflows = await this.find({
select: ['workflowId'],
where: {
workflowId: In(workflowIds),
},
});
return sharedWorkflows.map((sharing) => sharing.workflowId);
}
}

View file

@ -23,6 +23,14 @@ export class WorkflowRepository extends Repository<WorkflowEntity> {
});
}
async getActiveIds() {
const activeWorkflows = await this.find({
select: ['id'],
where: { active: true },
});
return activeWorkflows.map((workflow) => workflow.id);
}
async findById(workflowId: string) {
return this.findOne({
where: { id: workflowId },

View file

@ -0,0 +1,52 @@
import { Service } from 'typedi';
import type { User } from '@db/entities/User';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { WorkflowRepository } from '@db/repositories/workflow.repository';
import { ActivationErrorsService } from '@/ActivationErrors.service';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { Logger } from '@/Logger';
@Service()
export class ActiveWorkflowsService {
constructor(
private readonly logger: Logger,
private readonly workflowRepository: WorkflowRepository,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
private readonly activationErrorsService: ActivationErrorsService,
) {}
async getAllActiveIdsInStorage() {
const activationErrors = await this.activationErrorsService.getAll();
const activeWorkflowIds = await this.workflowRepository.getActiveIds();
return activeWorkflowIds.filter((workflowId) => !activationErrors[workflowId]);
}
async getAllActiveIdsFor(user: User) {
const activationErrors = await this.activationErrorsService.getAll();
const activeWorkflowIds = await this.workflowRepository.getActiveIds();
const hasFullAccess = user.hasGlobalScope('workflow:list');
if (hasFullAccess) {
return activeWorkflowIds.filter((workflowId) => !activationErrors[workflowId]);
}
const sharedWorkflowIds =
await this.sharedWorkflowRepository.getSharedWorkflowIds(activeWorkflowIds);
return sharedWorkflowIds.filter((workflowId) => !activationErrors[workflowId]);
}
async getActivationError(workflowId: string, user: User) {
const hasAccess = await this.sharedWorkflowRepository.hasAccess(workflowId, user);
if (!hasAccess) {
this.logger.verbose('User attempted to access workflow errors without permissions', {
workflowId,
userId: user.id,
});
throw new BadRequestError(`Workflow with ID "${workflowId}" could not be found.`);
}
return this.activationErrorsService.get(workflowId);
}
}

View file

@ -2,7 +2,6 @@ import { Container } from 'typedi';
import { NodeApiError, NodeOperationError, Workflow } from 'n8n-workflow';
import type { IWebhookData, WorkflowActivateMode } from 'n8n-workflow';
import { ActiveWorkflows } from 'n8n-core';
import { ActiveExecutions } from '@/ActiveExecutions';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
@ -26,9 +25,9 @@ import { createOwner } from './shared/db/users';
import { createWorkflow } from './shared/db/workflows';
import { ExecutionsService } from '@/executions/executions.service';
import { WorkflowService } from '@/workflows/workflow.service';
import { ActiveWorkflowsService } from '@/services/activeWorkflows.service';
mockInstance(ActiveExecutions);
mockInstance(ActiveWorkflows);
mockInstance(Push);
mockInstance(SecretsHelper);
mockInstance(ExecutionsService);
@ -45,6 +44,7 @@ setSchedulerAsLoadedNode();
const externalHooks = mockInstance(ExternalHooks);
let activeWorkflowsService: ActiveWorkflowsService;
let activeWorkflowRunner: ActiveWorkflowRunner;
let owner: User;
@ -59,6 +59,7 @@ const NON_LEADERSHIP_CHANGE_MODES: WorkflowActivateMode[] = [
beforeAll(async () => {
await testDb.init();
activeWorkflowsService = Container.get(ActiveWorkflowsService);
activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
owner = await createOwner();
});
@ -90,8 +91,8 @@ describe('init()', () => {
test('should start with no active workflows', async () => {
await activeWorkflowRunner.init();
const inStorage = activeWorkflowRunner.allActiveInStorage();
await expect(inStorage).resolves.toHaveLength(0);
const inStorage = await activeWorkflowsService.getAllActiveIdsInStorage();
expect(inStorage).toHaveLength(0);
const inMemory = activeWorkflowRunner.allActiveInMemory();
expect(inMemory).toHaveLength(0);
@ -102,8 +103,8 @@ describe('init()', () => {
await activeWorkflowRunner.init();
const inStorage = activeWorkflowRunner.allActiveInStorage();
await expect(inStorage).resolves.toHaveLength(1);
const inStorage = await activeWorkflowsService.getAllActiveIdsInStorage();
expect(inStorage).toHaveLength(1);
const inMemory = activeWorkflowRunner.allActiveInMemory();
expect(inMemory).toHaveLength(1);
@ -115,8 +116,8 @@ describe('init()', () => {
await activeWorkflowRunner.init();
const inStorage = activeWorkflowRunner.allActiveInStorage();
await expect(inStorage).resolves.toHaveLength(2);
const inStorage = await activeWorkflowsService.getAllActiveIdsInStorage();
expect(inStorage).toHaveLength(2);
const inMemory = activeWorkflowRunner.allActiveInMemory();
expect(inMemory).toHaveLength(2);

View file

@ -0,0 +1,81 @@
import type { ActivationErrorsService } from '@/ActivationErrors.service';
import type { User } from '@db/entities/User';
import type { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import type { WorkflowRepository } from '@db/repositories/workflow.repository';
import { ActiveWorkflowsService } from '@/services/activeWorkflows.service';
import { mock } from 'jest-mock-extended';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
describe('ActiveWorkflowsService', () => {
const user = mock<User>();
const workflowRepository = mock<WorkflowRepository>();
const sharedWorkflowRepository = mock<SharedWorkflowRepository>();
const activationErrorsService = mock<ActivationErrorsService>();
const service = new ActiveWorkflowsService(
mock(),
workflowRepository,
sharedWorkflowRepository,
activationErrorsService,
);
const activeIds = ['1', '2', '3', '4'];
beforeEach(() => jest.clearAllMocks());
describe('getAllActiveIdsInStorage', () => {
it('should filter out any workflow ids that have activation errors', async () => {
activationErrorsService.getAll.mockResolvedValue({ 1: 'some error' });
workflowRepository.getActiveIds.mockResolvedValue(activeIds);
const ids = await service.getAllActiveIdsInStorage();
expect(ids).toEqual(['2', '3', '4']);
});
});
describe('getAllActiveIdsFor', () => {
beforeEach(() => {
activationErrorsService.getAll.mockResolvedValue({ 1: 'some error' });
workflowRepository.getActiveIds.mockResolvedValue(activeIds);
});
it('should return all workflow ids when user has full access', async () => {
user.hasGlobalScope.mockReturnValue(true);
const ids = await service.getAllActiveIdsFor(user);
expect(ids).toEqual(['2', '3', '4']);
expect(user.hasGlobalScope).toHaveBeenCalledWith('workflow:list');
expect(sharedWorkflowRepository.getSharedWorkflowIds).not.toHaveBeenCalled();
});
it('should filter out workflow ids that the user does not have access to', async () => {
user.hasGlobalScope.mockReturnValue(false);
sharedWorkflowRepository.getSharedWorkflowIds.mockResolvedValue(['3']);
const ids = await service.getAllActiveIdsFor(user);
expect(ids).toEqual(['3']);
expect(user.hasGlobalScope).toHaveBeenCalledWith('workflow:list');
expect(sharedWorkflowRepository.getSharedWorkflowIds).toHaveBeenCalledWith(activeIds);
});
});
describe('getActivationError', () => {
const workflowId = 'workflowId';
it('should throw a BadRequestError a user does not have access to the workflow id', async () => {
sharedWorkflowRepository.hasAccess.mockResolvedValue(false);
await expect(service.getActivationError(workflowId, user)).rejects.toThrow(BadRequestError);
expect(sharedWorkflowRepository.hasAccess).toHaveBeenCalledWith(workflowId, user);
expect(activationErrorsService.get).not.toHaveBeenCalled();
});
it('should return the error when the user has access', async () => {
sharedWorkflowRepository.hasAccess.mockResolvedValue(true);
activationErrorsService.get.mockResolvedValue('some-error');
const error = await service.getActivationError(workflowId, user);
expect(error).toEqual('some-error');
expect(sharedWorkflowRepository.hasAccess).toHaveBeenCalledWith(workflowId, user);
expect(activationErrorsService.get).toHaveBeenCalledWith(workflowId);
});
});
});

View file

@ -1,3 +1,4 @@
import { Service } from 'typedi';
import { CronJob } from 'cron';
import type {
@ -22,10 +23,9 @@ import {
import type { IWorkflowData } from './Interfaces';
@Service()
export class ActiveWorkflows {
private activeWorkflows: {
[workflowId: string]: IWorkflowData;
} = {};
private activeWorkflows: { [workflowId: string]: IWorkflowData } = {};
/**
* Returns if the workflow is active in memory.

View file

@ -24,7 +24,7 @@ export async function getWorkflows(context: IRestApiContext, filter?: object) {
}
export async function getActiveWorkflows(context: IRestApiContext) {
return makeRestApiRequest(context, 'GET', '/active');
return makeRestApiRequest(context, 'GET', '/active-workflows');
}
export async function getCurrentExecutions(context: IRestApiContext, filter: IDataObject) {

View file

@ -373,7 +373,11 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, {
async getActivationError(id: string): Promise<string | undefined> {
const rootStore = useRootStore();
return makeRestApiRequest(rootStore.getRestApiContext, 'GET', `/active/error/${id}`);
return makeRestApiRequest(
rootStore.getRestApiContext,
'GET',
`/active-workflows/error/${id}`,
);
},
async fetchAllWorkflows(): Promise<IWorkflowDb[]> {