mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-23 02:21:42 -08:00
refactor(core): Move error execution creation to execution service (no-changelog) (#8006)
Continue breaking down legacy helpers. Note: `getUserById` is unused.
This commit is contained in:
parent
d1b2affd2c
commit
9ac8825a67
|
@ -49,7 +49,7 @@ import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'
|
|||
import type { User } from '@db/entities/User';
|
||||
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
|
||||
import { ActiveExecutions } from '@/ActiveExecutions';
|
||||
import { createErrorExecution } from '@/GenericHelpers';
|
||||
import { ExecutionsService } from './executions/executions.service';
|
||||
import {
|
||||
STARTING_NODES,
|
||||
WORKFLOW_REACTIVATE_INITIAL_TIMEOUT,
|
||||
|
@ -94,6 +94,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
|||
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
|
||||
private readonly multiMainSetup: MultiMainSetup,
|
||||
private readonly activationErrorsService: ActivationErrorsService,
|
||||
private readonly executionService: ExecutionsService,
|
||||
) {}
|
||||
|
||||
async init() {
|
||||
|
@ -547,9 +548,11 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
|||
};
|
||||
|
||||
returnFunctions.__emitError = (error: ExecutionError): void => {
|
||||
void createErrorExecution(error, node, workflowData, workflow, mode).then(() => {
|
||||
this.executeErrorWorkflow(error, workflowData, mode);
|
||||
});
|
||||
void this.executionService
|
||||
.createErrorExecution(error, node, workflowData, workflow, mode)
|
||||
.then(() => {
|
||||
this.executeErrorWorkflow(error, workflowData, mode);
|
||||
});
|
||||
};
|
||||
return returnFunctions;
|
||||
};
|
||||
|
|
|
@ -1,21 +1,11 @@
|
|||
import type express from 'express';
|
||||
import type {
|
||||
ExecutionError,
|
||||
INode,
|
||||
IRunExecutionData,
|
||||
Workflow,
|
||||
WorkflowExecuteMode,
|
||||
} from 'n8n-workflow';
|
||||
import { validate } from 'class-validator';
|
||||
import { Container } from 'typedi';
|
||||
import config from '@/config';
|
||||
import type { ExecutionPayload, IWorkflowDb } from '@/Interfaces';
|
||||
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
|
||||
import type { CredentialsEntity } from '@db/entities/CredentialsEntity';
|
||||
import type { TagEntity } from '@db/entities/TagEntity';
|
||||
import type { User } from '@db/entities/User';
|
||||
import type { UserUpdatePayload } from '@/requests';
|
||||
import { ExecutionRepository } from '@db/repositories/execution.repository';
|
||||
import { BadRequestError } from './errors/response-errors/bad-request.error';
|
||||
|
||||
/**
|
||||
|
@ -58,85 +48,4 @@ export async function validateEntity(
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an error execution
|
||||
*
|
||||
* @param {INode} node
|
||||
* @param {IWorkflowDb} workflowData
|
||||
* @param {Workflow} workflow
|
||||
* @param {WorkflowExecuteMode} mode
|
||||
* @returns
|
||||
* @memberof ActiveWorkflowRunner
|
||||
*/
|
||||
|
||||
export async function createErrorExecution(
|
||||
error: ExecutionError,
|
||||
node: INode,
|
||||
workflowData: IWorkflowDb,
|
||||
workflow: Workflow,
|
||||
mode: WorkflowExecuteMode,
|
||||
): Promise<void> {
|
||||
const saveDataErrorExecutionDisabled = workflowData?.settings?.saveDataErrorExecution === 'none';
|
||||
|
||||
if (saveDataErrorExecutionDisabled) return;
|
||||
|
||||
const executionData: IRunExecutionData = {
|
||||
startData: {
|
||||
destinationNode: node.name,
|
||||
runNodeFilter: [node.name],
|
||||
},
|
||||
executionData: {
|
||||
contextData: {},
|
||||
metadata: {},
|
||||
nodeExecutionStack: [
|
||||
{
|
||||
node,
|
||||
data: {
|
||||
main: [
|
||||
[
|
||||
{
|
||||
json: {},
|
||||
pairedItem: {
|
||||
item: 0,
|
||||
},
|
||||
},
|
||||
],
|
||||
],
|
||||
},
|
||||
source: null,
|
||||
},
|
||||
],
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
resultData: {
|
||||
runData: {
|
||||
[node.name]: [
|
||||
{
|
||||
startTime: 0,
|
||||
executionTime: 0,
|
||||
error,
|
||||
source: [],
|
||||
},
|
||||
],
|
||||
},
|
||||
error,
|
||||
lastNodeExecuted: node.name,
|
||||
},
|
||||
};
|
||||
|
||||
const fullExecutionData: ExecutionPayload = {
|
||||
data: executionData,
|
||||
mode,
|
||||
finished: false,
|
||||
startedAt: new Date(),
|
||||
workflowData,
|
||||
workflowId: workflow.id,
|
||||
stoppedAt: new Date(),
|
||||
status: 'error',
|
||||
};
|
||||
|
||||
await Container.get(ExecutionRepository).createNewExecution(fullExecutionData);
|
||||
}
|
||||
|
||||
export const DEFAULT_EXECUTIONS_GET_ALL_LIMIT = 20;
|
||||
|
|
|
@ -6,7 +6,6 @@ import type { User } from '@db/entities/User';
|
|||
import config from '@/config';
|
||||
import { License } from '@/License';
|
||||
import { getWebhookBaseUrl } from '@/WebhookHelpers';
|
||||
import { UserRepository } from '@db/repositories/user.repository';
|
||||
import type { Scope } from '@n8n/permissions';
|
||||
|
||||
export function isSharingEnabled(): boolean {
|
||||
|
@ -26,14 +25,6 @@ export function generateUserInviteUrl(inviterId: string, inviteeId: string): str
|
|||
return `${getInstanceBaseUrl()}/signup?inviterId=${inviterId}&inviteeId=${inviteeId}`;
|
||||
}
|
||||
|
||||
export async function getUserById(userId: string): Promise<User> {
|
||||
const user = await Container.get(UserRepository).findOneOrFail({
|
||||
where: { id: userId },
|
||||
relations: ['globalRole'],
|
||||
});
|
||||
return user;
|
||||
}
|
||||
|
||||
// return the difference between two arrays
|
||||
export function rightDiff<T1, T2>(
|
||||
[arr1, keyExtractor1]: [T1[], (item: T1) => string],
|
||||
|
|
|
@ -35,10 +35,7 @@ export type CredentialsGetSharedOptions =
|
|||
| { allowGlobalScope: false };
|
||||
|
||||
export class CredentialsService {
|
||||
static async get(
|
||||
where: FindOptionsWhere<ICredentialsDb>,
|
||||
options?: { relations: string[] },
|
||||
): Promise<ICredentialsDb | null> {
|
||||
static async get(where: FindOptionsWhere<ICredentialsDb>, options?: { relations: string[] }) {
|
||||
return Container.get(CredentialsRepository).findOne({
|
||||
relations: options?.relations,
|
||||
where,
|
||||
|
|
|
@ -1,5 +1,13 @@
|
|||
import { validate as jsonSchemaValidate } from 'jsonschema';
|
||||
import type { IWorkflowBase, JsonObject, ExecutionStatus } from 'n8n-workflow';
|
||||
import type {
|
||||
IWorkflowBase,
|
||||
JsonObject,
|
||||
ExecutionStatus,
|
||||
ExecutionError,
|
||||
INode,
|
||||
IRunExecutionData,
|
||||
WorkflowExecuteMode,
|
||||
} from 'n8n-workflow';
|
||||
import { ApplicationError, jsonParse, Workflow, WorkflowOperationError } from 'n8n-workflow';
|
||||
import type { FindOperator } from 'typeorm';
|
||||
import { In } from 'typeorm';
|
||||
|
@ -7,9 +15,11 @@ import { ActiveExecutions } from '@/ActiveExecutions';
|
|||
import config from '@/config';
|
||||
import type { User } from '@db/entities/User';
|
||||
import type {
|
||||
ExecutionPayload,
|
||||
IExecutionFlattedResponse,
|
||||
IExecutionResponse,
|
||||
IExecutionsListResponse,
|
||||
IWorkflowDb,
|
||||
IWorkflowExecutionDataProcess,
|
||||
} from '@/Interfaces';
|
||||
import { NodeTypes } from '@/NodeTypes';
|
||||
|
@ -18,7 +28,7 @@ import type { ExecutionRequest } from '@/requests';
|
|||
import { getSharedWorkflowIds } from '@/WorkflowHelpers';
|
||||
import { WorkflowRunner } from '@/WorkflowRunner';
|
||||
import * as GenericHelpers from '@/GenericHelpers';
|
||||
import { Container } from 'typedi';
|
||||
import { Container, Service } from 'typedi';
|
||||
import { getStatusUsingPreviousExecutionStatusMethod } from './executionHelpers';
|
||||
import { ExecutionRepository } from '@db/repositories/execution.repository';
|
||||
import { WorkflowRepository } from '@db/repositories/workflow.repository';
|
||||
|
@ -75,6 +85,7 @@ const schemaGetExecutionsQueryFilter = {
|
|||
|
||||
const allowedExecutionsQueryFilterFields = Object.keys(schemaGetExecutionsQueryFilter.properties);
|
||||
|
||||
@Service()
|
||||
export class ExecutionsService {
|
||||
/**
|
||||
* Function to get the workflow Ids for a User
|
||||
|
@ -362,4 +373,75 @@ export class ExecutionsService {
|
|||
},
|
||||
);
|
||||
}
|
||||
|
||||
async createErrorExecution(
|
||||
error: ExecutionError,
|
||||
node: INode,
|
||||
workflowData: IWorkflowDb,
|
||||
workflow: Workflow,
|
||||
mode: WorkflowExecuteMode,
|
||||
): Promise<void> {
|
||||
const saveDataErrorExecutionDisabled =
|
||||
workflowData?.settings?.saveDataErrorExecution === 'none';
|
||||
|
||||
if (saveDataErrorExecutionDisabled) return;
|
||||
|
||||
const executionData: IRunExecutionData = {
|
||||
startData: {
|
||||
destinationNode: node.name,
|
||||
runNodeFilter: [node.name],
|
||||
},
|
||||
executionData: {
|
||||
contextData: {},
|
||||
metadata: {},
|
||||
nodeExecutionStack: [
|
||||
{
|
||||
node,
|
||||
data: {
|
||||
main: [
|
||||
[
|
||||
{
|
||||
json: {},
|
||||
pairedItem: {
|
||||
item: 0,
|
||||
},
|
||||
},
|
||||
],
|
||||
],
|
||||
},
|
||||
source: null,
|
||||
},
|
||||
],
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
resultData: {
|
||||
runData: {
|
||||
[node.name]: [
|
||||
{
|
||||
startTime: 0,
|
||||
executionTime: 0,
|
||||
error,
|
||||
source: [],
|
||||
},
|
||||
],
|
||||
},
|
||||
error,
|
||||
lastNodeExecuted: node.name,
|
||||
},
|
||||
};
|
||||
|
||||
const fullExecutionData: ExecutionPayload = {
|
||||
data: executionData,
|
||||
mode,
|
||||
finished: false,
|
||||
startedAt: new Date(),
|
||||
workflowData,
|
||||
workflowId: workflow.id,
|
||||
stoppedAt: new Date(),
|
||||
status: 'error',
|
||||
};
|
||||
|
||||
await Container.get(ExecutionRepository).createNewExecution(fullExecutionData);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,12 +24,14 @@ import { setSchedulerAsLoadedNode } from './shared/utils';
|
|||
import * as testDb from './shared/testDb';
|
||||
import { createOwner } from './shared/db/users';
|
||||
import { createWorkflow } from './shared/db/workflows';
|
||||
import { ExecutionsService } from '@/executions/executions.service';
|
||||
import { WorkflowService } from '@/workflows/workflow.service';
|
||||
|
||||
mockInstance(ActiveExecutions);
|
||||
mockInstance(ActiveWorkflows);
|
||||
mockInstance(Push);
|
||||
mockInstance(SecretsHelper);
|
||||
mockInstance(ExecutionsService);
|
||||
mockInstance(WorkflowService);
|
||||
|
||||
const webhookService = mockInstance(WebhookService);
|
||||
|
|
|
@ -2,8 +2,12 @@ import type { SuperAgentTest } from 'supertest';
|
|||
import * as utils from './shared/utils/';
|
||||
import { getGlobalMemberRole } from './shared/db/roles';
|
||||
import { createUser } from './shared/db/users';
|
||||
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
|
||||
import { mockInstance } from '../shared/mocking';
|
||||
|
||||
describe('Auth Middleware', () => {
|
||||
mockInstance(ActiveWorkflowRunner);
|
||||
|
||||
const testServer = utils.setupTestServer({
|
||||
endpointGroups: ['me', 'auth', 'owner', 'users', 'invitations'],
|
||||
});
|
||||
|
|
|
@ -18,6 +18,7 @@ import { createWorkflow, createWorkflowWithTrigger } from '../shared/db/workflow
|
|||
import { createTag } from '../shared/db/tags';
|
||||
import { mockInstance } from '../../shared/mocking';
|
||||
import { Push } from '@/push';
|
||||
import { ExecutionsService } from '@/executions/executions.service';
|
||||
|
||||
let workflowOwnerRole: Role;
|
||||
let owner: User;
|
||||
|
@ -30,6 +31,7 @@ const testServer = utils.setupTestServer({ endpointGroups: ['publicApi'] });
|
|||
const license = testServer.license;
|
||||
|
||||
mockInstance(Push);
|
||||
mockInstance(ExecutionsService);
|
||||
|
||||
beforeAll(async () => {
|
||||
const [globalOwnerRole, globalMemberRole, fetchedWorkflowOwnerRole] = await getAllRoles();
|
||||
|
|
|
@ -18,6 +18,7 @@ import { SettingsRepository } from '@db/repositories/settings.repository';
|
|||
import { mockNodeTypesData } from '../../../unit/Helpers';
|
||||
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||
import { mockInstance } from '../../../shared/mocking';
|
||||
import { ExecutionsService } from '@/executions/executions.service';
|
||||
|
||||
export { setupTestServer } from './testServer';
|
||||
|
||||
|
@ -31,6 +32,7 @@ export { setupTestServer } from './testServer';
|
|||
export async function initActiveWorkflowRunner() {
|
||||
mockInstance(MultiMainSetup);
|
||||
|
||||
mockInstance(ExecutionsService);
|
||||
const { ActiveWorkflowRunner } = await import('@/ActiveWorkflowRunner');
|
||||
const workflowRunner = Container.get(ActiveWorkflowRunner);
|
||||
await workflowRunner.init();
|
||||
|
|
|
@ -18,6 +18,10 @@ import * as testDb from './shared/testDb';
|
|||
import type { SuperAgentTest } from 'supertest';
|
||||
import type { Role } from '@db/entities/Role';
|
||||
import type { User } from '@db/entities/User';
|
||||
import { ExecutionsService } from '@/executions/executions.service';
|
||||
import { mockInstance } from '../shared/mocking';
|
||||
|
||||
mockInstance(ExecutionsService);
|
||||
|
||||
const testServer = utils.setupTestServer({
|
||||
endpointGroups: ['users'],
|
||||
|
|
Loading…
Reference in a new issue