refactor(core): Move typeorm operators from Public API (no-changelog) (#8319)

This commit is contained in:
Iván Ovejero 2024-01-16 13:35:43 +01:00 committed by GitHub
parent e1acb5911a
commit 420b4271a9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 150 additions and 148 deletions

View file

@ -2,7 +2,6 @@ import type express from 'express';
import { Container } from 'typedi'; import { Container } from 'typedi';
import { replaceCircularReferences } from 'n8n-workflow'; import { replaceCircularReferences } from 'n8n-workflow';
import { getExecutions, getExecutionInWorkflows, getExecutionsCount } from './executions.service';
import { ActiveExecutions } from '@/ActiveExecutions'; import { ActiveExecutions } from '@/ActiveExecutions';
import { authorize, validCursor } from '../../shared/middlewares/global.middleware'; import { authorize, validCursor } from '../../shared/middlewares/global.middleware';
import type { ExecutionRequest } from '../../../types'; import type { ExecutionRequest } from '../../../types';
@ -26,7 +25,9 @@ export = {
const { id } = req.params; const { id } = req.params;
// look for the execution on the workflow the user owns // look for the execution on the workflow the user owns
const execution = await getExecutionInWorkflows(id, sharedWorkflowsIds, false); const execution = await Container.get(
ExecutionRepository,
).getExecutionInWorkflowsForPublicApi(id, sharedWorkflowsIds, false);
if (!execution) { if (!execution) {
return res.status(404).json({ message: 'Not Found' }); return res.status(404).json({ message: 'Not Found' });
@ -57,7 +58,9 @@ export = {
const { includeData = false } = req.query; const { includeData = false } = req.query;
// look for the execution on the workflow the user owns // look for the execution on the workflow the user owns
const execution = await getExecutionInWorkflows(id, sharedWorkflowsIds, includeData); const execution = await Container.get(
ExecutionRepository,
).getExecutionInWorkflowsForPublicApi(id, sharedWorkflowsIds, includeData);
if (!execution) { if (!execution) {
return res.status(404).json({ message: 'Not Found' }); return res.status(404).json({ message: 'Not Found' });
@ -105,13 +108,15 @@ export = {
excludedExecutionsIds: runningExecutionsIds, excludedExecutionsIds: runningExecutionsIds,
}; };
const executions = await getExecutions(filters); const executions =
await Container.get(ExecutionRepository).getExecutionsForPublicApi(filters);
const newLastId = !executions.length ? '0' : executions.slice(-1)[0].id; const newLastId = !executions.length ? '0' : executions.slice(-1)[0].id;
filters.lastId = newLastId; filters.lastId = newLastId;
const count = await getExecutionsCount(filters); const count =
await Container.get(ExecutionRepository).getExecutionsCountForPublicApi(filters);
void Container.get(InternalHooks).onUserRetrievedAllExecutions({ void Container.get(InternalHooks).onUserRetrievedAllExecutions({
user_id: req.user.id, user_id: req.user.id,

View file

@ -1,110 +0,0 @@
import type { FindOptionsWhere } from 'typeorm';
import { In, Not, Raw, LessThan } from 'typeorm';
import { Container } from 'typedi';
import type { ExecutionStatus } from 'n8n-workflow';
import type { IExecutionBase, IExecutionFlattedDb } from '@/Interfaces';
import { ExecutionRepository } from '@db/repositories/execution.repository';
function getStatusCondition(status: ExecutionStatus) {
const condition: Pick<FindOptionsWhere<IExecutionFlattedDb>, 'status'> = {};
if (status === 'success') {
condition.status = 'success';
} else if (status === 'waiting') {
condition.status = 'waiting';
} else if (status === 'error') {
condition.status = In(['error', 'crashed', 'failed']);
}
return condition;
}
export async function getExecutions(params: {
limit: number;
includeData?: boolean;
lastId?: string;
workflowIds?: string[];
status?: ExecutionStatus;
excludedExecutionsIds?: string[];
}): Promise<IExecutionBase[]> {
let where: FindOptionsWhere<IExecutionFlattedDb> = {};
if (params.lastId && params.excludedExecutionsIds?.length) {
where.id = Raw((id) => `${id} < :lastId AND ${id} NOT IN (:...excludedExecutionsIds)`, {
lastId: params.lastId,
excludedExecutionsIds: params.excludedExecutionsIds,
});
} else if (params.lastId) {
where.id = LessThan(params.lastId);
} else if (params.excludedExecutionsIds?.length) {
where.id = Not(In(params.excludedExecutionsIds));
}
if (params.status) {
where = { ...where, ...getStatusCondition(params.status) };
}
if (params.workflowIds) {
where = { ...where, workflowId: In(params.workflowIds) };
}
return Container.get(ExecutionRepository).findMultipleExecutions(
{
select: [
'id',
'mode',
'retryOf',
'retrySuccessId',
'startedAt',
'stoppedAt',
'workflowId',
'waitTill',
'finished',
],
where,
order: { id: 'DESC' },
take: params.limit,
relations: ['executionData'],
},
{
includeData: params.includeData,
unflattenData: true,
},
);
}
export async function getExecutionsCount(data: {
limit: number;
lastId?: string;
workflowIds?: string[];
status?: ExecutionStatus;
excludedWorkflowIds?: string[];
}): Promise<number> {
// TODO: Consider moving this to the repository as well
const executions = await Container.get(ExecutionRepository).count({
where: {
...(data.lastId && { id: LessThan(data.lastId) }),
...(data.status && { ...getStatusCondition(data.status) }),
...(data.workflowIds && { workflowId: In(data.workflowIds) }),
...(data.excludedWorkflowIds && { workflowId: Not(In(data.excludedWorkflowIds)) }),
},
take: data.limit,
});
return executions;
}
export async function getExecutionInWorkflows(
id: string,
workflowIds: string[],
includeData?: boolean,
): Promise<IExecutionBase | undefined> {
return Container.get(ExecutionRepository).findSingleExecution(id, {
where: {
workflowId: In(workflowIds),
},
includeData,
unflattenData: true,
});
}

View file

@ -4,15 +4,18 @@ import {
DataSource, DataSource,
In, In,
IsNull, IsNull,
LessThan,
LessThanOrEqual, LessThanOrEqual,
MoreThanOrEqual, MoreThanOrEqual,
Not, Not,
Raw,
Repository, Repository,
} from 'typeorm'; } from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils'; import { DateUtils } from 'typeorm/util/DateUtils';
import type { import type {
FindManyOptions, FindManyOptions,
FindOneOptions, FindOneOptions,
FindOperator,
FindOptionsWhere, FindOptionsWhere,
SelectQueryBuilder, SelectQueryBuilder,
} from 'typeorm'; } from 'typeorm';
@ -32,7 +35,6 @@ import type {
} from '@/Interfaces'; } from '@/Interfaces';
import config from '@/config'; import config from '@/config';
import type { IGetExecutionsQueryFilter } from '@/executions/executions.service';
import { isAdvancedExecutionFiltersEnabled } from '@/executions/executionHelpers'; import { isAdvancedExecutionFiltersEnabled } from '@/executions/executionHelpers';
import type { ExecutionData } from '../entities/ExecutionData'; import type { ExecutionData } from '../entities/ExecutionData';
import { ExecutionEntity } from '../entities/ExecutionEntity'; import { ExecutionEntity } from '../entities/ExecutionEntity';
@ -540,4 +542,130 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
}, },
}); });
} }
async getExecutionsCountForPublicApi(data: {
limit: number;
lastId?: string;
workflowIds?: string[];
status?: ExecutionStatus;
excludedWorkflowIds?: string[];
}): Promise<number> {
const executions = await this.count({
where: {
...(data.lastId && { id: LessThan(data.lastId) }),
...(data.status && { ...this.getStatusCondition(data.status) }),
...(data.workflowIds && { workflowId: In(data.workflowIds) }),
...(data.excludedWorkflowIds && { workflowId: Not(In(data.excludedWorkflowIds)) }),
},
take: data.limit,
});
return executions;
}
private getStatusCondition(status: ExecutionStatus) {
const condition: Pick<FindOptionsWhere<IExecutionFlattedDb>, 'status'> = {};
if (status === 'success') {
condition.status = 'success';
} else if (status === 'waiting') {
condition.status = 'waiting';
} else if (status === 'error') {
condition.status = In(['error', 'crashed', 'failed']);
}
return condition;
}
async getExecutionsForPublicApi(params: {
limit: number;
includeData?: boolean;
lastId?: string;
workflowIds?: string[];
status?: ExecutionStatus;
excludedExecutionsIds?: string[];
}): Promise<IExecutionBase[]> {
let where: FindOptionsWhere<IExecutionFlattedDb> = {};
if (params.lastId && params.excludedExecutionsIds?.length) {
where.id = Raw((id) => `${id} < :lastId AND ${id} NOT IN (:...excludedExecutionsIds)`, {
lastId: params.lastId,
excludedExecutionsIds: params.excludedExecutionsIds,
});
} else if (params.lastId) {
where.id = LessThan(params.lastId);
} else if (params.excludedExecutionsIds?.length) {
where.id = Not(In(params.excludedExecutionsIds));
}
if (params.status) {
where = { ...where, ...this.getStatusCondition(params.status) };
}
if (params.workflowIds) {
where = { ...where, workflowId: In(params.workflowIds) };
}
return this.findMultipleExecutions(
{
select: [
'id',
'mode',
'retryOf',
'retrySuccessId',
'startedAt',
'stoppedAt',
'workflowId',
'waitTill',
'finished',
],
where,
order: { id: 'DESC' },
take: params.limit,
relations: ['executionData'],
},
{
includeData: params.includeData,
unflattenData: true,
},
);
}
async getExecutionInWorkflowsForPublicApi(
id: string,
workflowIds: string[],
includeData?: boolean,
): Promise<IExecutionBase | undefined> {
return this.findSingleExecution(id, {
where: {
workflowId: In(workflowIds),
},
includeData,
unflattenData: true,
});
}
async findIfShared(executionId: string, sharedWorkflowIds: string[]) {
return this.findSingleExecution(executionId, {
where: {
workflowId: In(sharedWorkflowIds),
},
includeData: true,
unflattenData: false,
});
}
}
export interface IGetExecutionsQueryFilter {
id?: FindOperator<string> | string;
finished?: boolean;
mode?: string;
retryOf?: string;
retrySuccessId?: string;
status?: ExecutionStatus[];
workflowId?: string;
waitTill?: FindOperator<any> | boolean;
metadata?: Array<{ key: string; value: string }>;
startedAfter?: string;
startedBefore?: string;
} }

View file

@ -59,3 +59,5 @@ export interface Migration extends Function {
} }
export type InsertResult = Array<{ insertId: number }>; export type InsertResult = Array<{ insertId: number }>;
export { QueryFailedError } from 'typeorm/error/QueryFailedError';

View file

@ -2,15 +2,12 @@ import { validate as jsonSchemaValidate } from 'jsonschema';
import type { import type {
IWorkflowBase, IWorkflowBase,
JsonObject, JsonObject,
ExecutionStatus,
ExecutionError, ExecutionError,
INode, INode,
IRunExecutionData, IRunExecutionData,
WorkflowExecuteMode, WorkflowExecuteMode,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { ApplicationError, jsonParse, Workflow, WorkflowOperationError } from 'n8n-workflow'; import { ApplicationError, jsonParse, Workflow, WorkflowOperationError } from 'n8n-workflow';
import type { FindOperator } from 'typeorm';
import { In } from 'typeorm';
import { ActiveExecutions } from '@/ActiveExecutions'; import { ActiveExecutions } from '@/ActiveExecutions';
import config from '@/config'; import config from '@/config';
import type { User } from '@db/entities/User'; import type { User } from '@db/entities/User';
@ -30,26 +27,13 @@ import { WorkflowRunner } from '@/WorkflowRunner';
import * as GenericHelpers from '@/GenericHelpers'; import * as GenericHelpers from '@/GenericHelpers';
import { Container, Service } from 'typedi'; import { Container, Service } from 'typedi';
import { getStatusUsingPreviousExecutionStatusMethod } from './executionHelpers'; import { getStatusUsingPreviousExecutionStatusMethod } from './executionHelpers';
import type { IGetExecutionsQueryFilter } from '@db/repositories/execution.repository';
import { ExecutionRepository } from '@db/repositories/execution.repository'; import { ExecutionRepository } from '@db/repositories/execution.repository';
import { WorkflowRepository } from '@db/repositories/workflow.repository'; import { WorkflowRepository } from '@db/repositories/workflow.repository';
import { Logger } from '@/Logger'; import { Logger } from '@/Logger';
import { InternalServerError } from '@/errors/response-errors/internal-server.error'; import { InternalServerError } from '@/errors/response-errors/internal-server.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error'; import { NotFoundError } from '@/errors/response-errors/not-found.error';
export interface IGetExecutionsQueryFilter {
id?: FindOperator<string> | string;
finished?: boolean;
mode?: string;
retryOf?: string;
retrySuccessId?: string;
status?: ExecutionStatus[];
workflowId?: string;
waitTill?: FindOperator<any> | boolean;
metadata?: Array<{ key: string; value: string }>;
startedAfter?: string;
startedBefore?: string;
}
const schemaGetExecutionsQueryFilter = { const schemaGetExecutionsQueryFilter = {
$id: '/IGetExecutionsQueryFilter', $id: '/IGetExecutionsQueryFilter',
type: 'object', type: 'object',
@ -193,14 +177,10 @@ export class ExecutionsService {
if (!sharedWorkflowIds.length) return undefined; if (!sharedWorkflowIds.length) return undefined;
const { id: executionId } = req.params; const { id: executionId } = req.params;
const execution = await Container.get(ExecutionRepository).findSingleExecution(executionId, { const execution = await Container.get(ExecutionRepository).findIfShared(
where: { executionId,
id: executionId, sharedWorkflowIds,
workflowId: In(sharedWorkflowIds), );
},
includeData: true,
unflattenData: false,
});
if (!execution) { if (!execution) {
Container.get(Logger).info( Container.get(Logger).info(
@ -225,13 +205,10 @@ export class ExecutionsService {
if (!sharedWorkflowIds.length) return false; if (!sharedWorkflowIds.length) return false;
const { id: executionId } = req.params; const { id: executionId } = req.params;
const execution = await Container.get(ExecutionRepository).findSingleExecution(executionId, { const execution = (await Container.get(ExecutionRepository).findIfShared(
where: { executionId,
workflowId: In(sharedWorkflowIds), sharedWorkflowIds,
}, )) as unknown as IExecutionResponse;
includeData: true,
unflattenData: true,
});
if (!execution) { if (!execution) {
Container.get(Logger).info( Container.get(Logger).info(