From 420b4271a90fd2902f6b4ae855fe17716c99695b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 16 Jan 2024 13:35:43 +0100 Subject: [PATCH] refactor(core): Move `typeorm` operators from Public API (no-changelog) (#8319) --- .../handlers/executions/executions.handler.ts | 15 +- .../handlers/executions/executions.service.ts | 110 --------------- .../repositories/execution.repository.ts | 130 +++++++++++++++++- packages/cli/src/databases/types.ts | 2 + .../cli/src/executions/executions.service.ts | 41 ++---- 5 files changed, 150 insertions(+), 148 deletions(-) delete mode 100644 packages/cli/src/PublicApi/v1/handlers/executions/executions.service.ts diff --git a/packages/cli/src/PublicApi/v1/handlers/executions/executions.handler.ts b/packages/cli/src/PublicApi/v1/handlers/executions/executions.handler.ts index 4acad28696..a54a57fbca 100644 --- a/packages/cli/src/PublicApi/v1/handlers/executions/executions.handler.ts +++ b/packages/cli/src/PublicApi/v1/handlers/executions/executions.handler.ts @@ -2,7 +2,6 @@ import type express from 'express'; import { Container } from 'typedi'; import { replaceCircularReferences } from 'n8n-workflow'; -import { getExecutions, getExecutionInWorkflows, getExecutionsCount } from './executions.service'; import { ActiveExecutions } from '@/ActiveExecutions'; import { authorize, validCursor } from '../../shared/middlewares/global.middleware'; import type { ExecutionRequest } from '../../../types'; @@ -26,7 +25,9 @@ export = { const { id } = req.params; // look for the execution on the workflow the user owns - const execution = await getExecutionInWorkflows(id, sharedWorkflowsIds, false); + const execution = await Container.get( + ExecutionRepository, + ).getExecutionInWorkflowsForPublicApi(id, sharedWorkflowsIds, false); if (!execution) { return res.status(404).json({ message: 'Not Found' }); @@ -57,7 +58,9 @@ export = { const { includeData = false } = req.query; // look for the execution on the workflow the user owns - const execution = await getExecutionInWorkflows(id, sharedWorkflowsIds, includeData); + const execution = await Container.get( + ExecutionRepository, + ).getExecutionInWorkflowsForPublicApi(id, sharedWorkflowsIds, includeData); if (!execution) { return res.status(404).json({ message: 'Not Found' }); @@ -105,13 +108,15 @@ export = { excludedExecutionsIds: runningExecutionsIds, }; - const executions = await getExecutions(filters); + const executions = + await Container.get(ExecutionRepository).getExecutionsForPublicApi(filters); const newLastId = !executions.length ? '0' : executions.slice(-1)[0].id; filters.lastId = newLastId; - const count = await getExecutionsCount(filters); + const count = + await Container.get(ExecutionRepository).getExecutionsCountForPublicApi(filters); void Container.get(InternalHooks).onUserRetrievedAllExecutions({ user_id: req.user.id, diff --git a/packages/cli/src/PublicApi/v1/handlers/executions/executions.service.ts b/packages/cli/src/PublicApi/v1/handlers/executions/executions.service.ts deleted file mode 100644 index b2811bdcda..0000000000 --- a/packages/cli/src/PublicApi/v1/handlers/executions/executions.service.ts +++ /dev/null @@ -1,110 +0,0 @@ -import type { FindOptionsWhere } from 'typeorm'; -import { In, Not, Raw, LessThan } from 'typeorm'; -import { Container } from 'typedi'; -import type { ExecutionStatus } from 'n8n-workflow'; - -import type { IExecutionBase, IExecutionFlattedDb } from '@/Interfaces'; -import { ExecutionRepository } from '@db/repositories/execution.repository'; - -function getStatusCondition(status: ExecutionStatus) { - const condition: Pick, 'status'> = {}; - - if (status === 'success') { - condition.status = 'success'; - } else if (status === 'waiting') { - condition.status = 'waiting'; - } else if (status === 'error') { - condition.status = In(['error', 'crashed', 'failed']); - } - - return condition; -} - -export async function getExecutions(params: { - limit: number; - includeData?: boolean; - lastId?: string; - workflowIds?: string[]; - status?: ExecutionStatus; - excludedExecutionsIds?: string[]; -}): Promise { - let where: FindOptionsWhere = {}; - - if (params.lastId && params.excludedExecutionsIds?.length) { - where.id = Raw((id) => `${id} < :lastId AND ${id} NOT IN (:...excludedExecutionsIds)`, { - lastId: params.lastId, - excludedExecutionsIds: params.excludedExecutionsIds, - }); - } else if (params.lastId) { - where.id = LessThan(params.lastId); - } else if (params.excludedExecutionsIds?.length) { - where.id = Not(In(params.excludedExecutionsIds)); - } - - if (params.status) { - where = { ...where, ...getStatusCondition(params.status) }; - } - - if (params.workflowIds) { - where = { ...where, workflowId: In(params.workflowIds) }; - } - - return Container.get(ExecutionRepository).findMultipleExecutions( - { - select: [ - 'id', - 'mode', - 'retryOf', - 'retrySuccessId', - 'startedAt', - 'stoppedAt', - 'workflowId', - 'waitTill', - 'finished', - ], - where, - order: { id: 'DESC' }, - take: params.limit, - relations: ['executionData'], - }, - { - includeData: params.includeData, - unflattenData: true, - }, - ); -} - -export async function getExecutionsCount(data: { - limit: number; - lastId?: string; - workflowIds?: string[]; - status?: ExecutionStatus; - excludedWorkflowIds?: string[]; -}): Promise { - // TODO: Consider moving this to the repository as well - const executions = await Container.get(ExecutionRepository).count({ - where: { - ...(data.lastId && { id: LessThan(data.lastId) }), - ...(data.status && { ...getStatusCondition(data.status) }), - ...(data.workflowIds && { workflowId: In(data.workflowIds) }), - ...(data.excludedWorkflowIds && { workflowId: Not(In(data.excludedWorkflowIds)) }), - }, - take: data.limit, - }); - - return executions; -} - -export async function getExecutionInWorkflows( - id: string, - workflowIds: string[], - includeData?: boolean, -): Promise { - return Container.get(ExecutionRepository).findSingleExecution(id, { - where: { - workflowId: In(workflowIds), - }, - includeData, - unflattenData: true, - }); -} diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index 8021433f3b..9266604d7f 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -4,15 +4,18 @@ import { DataSource, In, IsNull, + LessThan, LessThanOrEqual, MoreThanOrEqual, Not, + Raw, Repository, } from 'typeorm'; import { DateUtils } from 'typeorm/util/DateUtils'; import type { FindManyOptions, FindOneOptions, + FindOperator, FindOptionsWhere, SelectQueryBuilder, } from 'typeorm'; @@ -32,7 +35,6 @@ import type { } from '@/Interfaces'; import config from '@/config'; -import type { IGetExecutionsQueryFilter } from '@/executions/executions.service'; import { isAdvancedExecutionFiltersEnabled } from '@/executions/executionHelpers'; import type { ExecutionData } from '../entities/ExecutionData'; import { ExecutionEntity } from '../entities/ExecutionEntity'; @@ -540,4 +542,130 @@ export class ExecutionRepository extends Repository { }, }); } + + async getExecutionsCountForPublicApi(data: { + limit: number; + lastId?: string; + workflowIds?: string[]; + status?: ExecutionStatus; + excludedWorkflowIds?: string[]; + }): Promise { + const executions = await this.count({ + where: { + ...(data.lastId && { id: LessThan(data.lastId) }), + ...(data.status && { ...this.getStatusCondition(data.status) }), + ...(data.workflowIds && { workflowId: In(data.workflowIds) }), + ...(data.excludedWorkflowIds && { workflowId: Not(In(data.excludedWorkflowIds)) }), + }, + take: data.limit, + }); + + return executions; + } + + private getStatusCondition(status: ExecutionStatus) { + const condition: Pick, 'status'> = {}; + + if (status === 'success') { + condition.status = 'success'; + } else if (status === 'waiting') { + condition.status = 'waiting'; + } else if (status === 'error') { + condition.status = In(['error', 'crashed', 'failed']); + } + + return condition; + } + + async getExecutionsForPublicApi(params: { + limit: number; + includeData?: boolean; + lastId?: string; + workflowIds?: string[]; + status?: ExecutionStatus; + excludedExecutionsIds?: string[]; + }): Promise { + let where: FindOptionsWhere = {}; + + if (params.lastId && params.excludedExecutionsIds?.length) { + where.id = Raw((id) => `${id} < :lastId AND ${id} NOT IN (:...excludedExecutionsIds)`, { + lastId: params.lastId, + excludedExecutionsIds: params.excludedExecutionsIds, + }); + } else if (params.lastId) { + where.id = LessThan(params.lastId); + } else if (params.excludedExecutionsIds?.length) { + where.id = Not(In(params.excludedExecutionsIds)); + } + + if (params.status) { + where = { ...where, ...this.getStatusCondition(params.status) }; + } + + if (params.workflowIds) { + where = { ...where, workflowId: In(params.workflowIds) }; + } + + return this.findMultipleExecutions( + { + select: [ + 'id', + 'mode', + 'retryOf', + 'retrySuccessId', + 'startedAt', + 'stoppedAt', + 'workflowId', + 'waitTill', + 'finished', + ], + where, + order: { id: 'DESC' }, + take: params.limit, + relations: ['executionData'], + }, + { + includeData: params.includeData, + unflattenData: true, + }, + ); + } + + async getExecutionInWorkflowsForPublicApi( + id: string, + workflowIds: string[], + includeData?: boolean, + ): Promise { + return this.findSingleExecution(id, { + where: { + workflowId: In(workflowIds), + }, + includeData, + unflattenData: true, + }); + } + + async findIfShared(executionId: string, sharedWorkflowIds: string[]) { + return this.findSingleExecution(executionId, { + where: { + workflowId: In(sharedWorkflowIds), + }, + includeData: true, + unflattenData: false, + }); + } +} + +export interface IGetExecutionsQueryFilter { + id?: FindOperator | string; + finished?: boolean; + mode?: string; + retryOf?: string; + retrySuccessId?: string; + status?: ExecutionStatus[]; + workflowId?: string; + waitTill?: FindOperator | boolean; + metadata?: Array<{ key: string; value: string }>; + startedAfter?: string; + startedBefore?: string; } diff --git a/packages/cli/src/databases/types.ts b/packages/cli/src/databases/types.ts index 9574e5f201..25a9363a00 100644 --- a/packages/cli/src/databases/types.ts +++ b/packages/cli/src/databases/types.ts @@ -59,3 +59,5 @@ export interface Migration extends Function { } export type InsertResult = Array<{ insertId: number }>; + +export { QueryFailedError } from 'typeorm/error/QueryFailedError'; diff --git a/packages/cli/src/executions/executions.service.ts b/packages/cli/src/executions/executions.service.ts index 64fb3ff6c9..0b39da6ce9 100644 --- a/packages/cli/src/executions/executions.service.ts +++ b/packages/cli/src/executions/executions.service.ts @@ -2,15 +2,12 @@ import { validate as jsonSchemaValidate } from 'jsonschema'; import type { IWorkflowBase, JsonObject, - ExecutionStatus, ExecutionError, INode, IRunExecutionData, WorkflowExecuteMode, } from 'n8n-workflow'; import { ApplicationError, jsonParse, Workflow, WorkflowOperationError } from 'n8n-workflow'; -import type { FindOperator } from 'typeorm'; -import { In } from 'typeorm'; import { ActiveExecutions } from '@/ActiveExecutions'; import config from '@/config'; import type { User } from '@db/entities/User'; @@ -30,26 +27,13 @@ import { WorkflowRunner } from '@/WorkflowRunner'; import * as GenericHelpers from '@/GenericHelpers'; import { Container, Service } from 'typedi'; import { getStatusUsingPreviousExecutionStatusMethod } from './executionHelpers'; +import type { IGetExecutionsQueryFilter } from '@db/repositories/execution.repository'; import { ExecutionRepository } from '@db/repositories/execution.repository'; import { WorkflowRepository } from '@db/repositories/workflow.repository'; import { Logger } from '@/Logger'; import { InternalServerError } from '@/errors/response-errors/internal-server.error'; import { NotFoundError } from '@/errors/response-errors/not-found.error'; -export interface IGetExecutionsQueryFilter { - id?: FindOperator | string; - finished?: boolean; - mode?: string; - retryOf?: string; - retrySuccessId?: string; - status?: ExecutionStatus[]; - workflowId?: string; - waitTill?: FindOperator | boolean; - metadata?: Array<{ key: string; value: string }>; - startedAfter?: string; - startedBefore?: string; -} - const schemaGetExecutionsQueryFilter = { $id: '/IGetExecutionsQueryFilter', type: 'object', @@ -193,14 +177,10 @@ export class ExecutionsService { if (!sharedWorkflowIds.length) return undefined; const { id: executionId } = req.params; - const execution = await Container.get(ExecutionRepository).findSingleExecution(executionId, { - where: { - id: executionId, - workflowId: In(sharedWorkflowIds), - }, - includeData: true, - unflattenData: false, - }); + const execution = await Container.get(ExecutionRepository).findIfShared( + executionId, + sharedWorkflowIds, + ); if (!execution) { Container.get(Logger).info( @@ -225,13 +205,10 @@ export class ExecutionsService { if (!sharedWorkflowIds.length) return false; const { id: executionId } = req.params; - const execution = await Container.get(ExecutionRepository).findSingleExecution(executionId, { - where: { - workflowId: In(sharedWorkflowIds), - }, - includeData: true, - unflattenData: true, - }); + const execution = (await Container.get(ExecutionRepository).findIfShared( + executionId, + sharedWorkflowIds, + )) as unknown as IExecutionResponse; if (!execution) { Container.get(Logger).info(