refactor: Make execution IDs mandatory in BE (#8299)

Co-authored-by: Iván Ovejero <ivov.src@gmail.com>
This commit is contained in:
Omar Ajoue 2024-01-16 09:53:17 +00:00 committed by GitHub
parent 0f4f472a72
commit e1acb5911a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 136 additions and 136 deletions

View file

@ -49,6 +49,7 @@ export class ActiveExecutions {
startedAt: new Date(),
workflowData: executionData.workflowData,
status: executionStatus,
workflowId: executionData.workflowData.id,
};
if (executionData.retryOf !== undefined) {

View file

@ -81,7 +81,6 @@ export type ITagWithCountDb = Pick<TagEntity, 'id' | 'name' | 'createdAt' | 'upd
// Almost identical to editor-ui.Interfaces.ts
export interface IWorkflowDb extends IWorkflowBase {
id: string;
tags?: TagEntity[];
}
@ -119,18 +118,18 @@ export interface IExecutionBase {
mode: WorkflowExecuteMode;
startedAt: Date;
stoppedAt?: Date; // empty value means execution is still running
workflowId?: string; // To be able to filter executions easily //
workflowId: string;
finished: boolean;
retryOf?: string; // If it is a retry, the id of the execution it is a retry of.
retrySuccessId?: string; // If it failed and a retry did succeed. The id of the successful retry.
status: ExecutionStatus;
waitTill?: Date | null;
}
// Data in regular format with references
export interface IExecutionDb extends IExecutionBase {
data: IRunExecutionData;
waitTill?: Date | null;
workflowData?: IWorkflowBase;
workflowData: IWorkflowBase;
}
/**
@ -148,7 +147,6 @@ export interface IExecutionResponse extends IExecutionBase {
data: IRunExecutionData;
retryOf?: string;
retrySuccessId?: string;
waitTill?: Date | null;
workflowData: IWorkflowBase | WorkflowWithSharingsAndCredentials;
}
@ -162,9 +160,7 @@ export interface IExecutionFlatted extends IExecutionBase {
export interface IExecutionFlattedDb extends IExecutionBase {
id: string;
data: string;
waitTill?: Date | null;
workflowData: Omit<IWorkflowBase, 'pinData'>;
status: ExecutionStatus;
}
export interface IExecutionFlattedResponse extends IExecutionFlatted {

View file

@ -33,7 +33,7 @@ export = {
}
await Container.get(ExecutionRepository).hardDelete({
workflowId: execution.workflowId as string,
workflowId: execution.workflowId,
executionId: execution.id,
});

View file

@ -1,6 +1,5 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import type { Request, Response } from 'express';
import { parse, stringify } from 'flatted';
import picocolors from 'picocolors';
import {
ErrorReporterProxy as ErrorReporter,
@ -8,13 +7,7 @@ import {
NodeApiError,
} from 'n8n-workflow';
import { Readable } from 'node:stream';
import type {
IExecutionDb,
IExecutionFlatted,
IExecutionFlattedDb,
IExecutionResponse,
IWorkflowDb,
} from '@/Interfaces';
import { inDevelopment } from '@/constants';
import { ResponseError } from './errors/response-errors/abstract/response.error';
@ -173,68 +166,6 @@ export function send<T, R extends Request, S extends Response>(
};
}
/**
* Flattens the Execution data.
* As it contains a lot of references which normally would be saved as duplicate data
* with regular JSON.stringify it gets flattened which keeps the references in place.
*
* @param {IExecutionDb} fullExecutionData The data to flatten
*/
// TODO: Remove this functions since it's purpose should be fulfilled by the execution repository
export function flattenExecutionData(fullExecutionData: IExecutionDb): IExecutionFlatted {
// Flatten the data
const returnData: IExecutionFlatted = {
data: stringify(fullExecutionData.data),
mode: fullExecutionData.mode,
// @ts-ignore
waitTill: fullExecutionData.waitTill,
startedAt: fullExecutionData.startedAt,
stoppedAt: fullExecutionData.stoppedAt,
finished: fullExecutionData.finished ? fullExecutionData.finished : false,
workflowId: fullExecutionData.workflowId,
workflowData: fullExecutionData.workflowData!,
status: fullExecutionData.status,
};
if (fullExecutionData.id !== undefined) {
returnData.id = fullExecutionData.id;
}
if (fullExecutionData.retryOf !== undefined) {
returnData.retryOf = fullExecutionData.retryOf.toString();
}
if (fullExecutionData.retrySuccessId !== undefined) {
returnData.retrySuccessId = fullExecutionData.retrySuccessId.toString();
}
return returnData;
}
/**
* Unflattens the Execution data.
*
* @param {IExecutionFlattedDb} fullExecutionData The data to unflatten
*/
// TODO: Remove this functions since it's purpose should be fulfilled by the execution repository
export function unflattenExecutionData(fullExecutionData: IExecutionFlattedDb): IExecutionResponse {
const returnData: IExecutionResponse = {
id: fullExecutionData.id,
workflowData: fullExecutionData.workflowData as IWorkflowDb,
data: parse(fullExecutionData.data),
mode: fullExecutionData.mode,
waitTill: fullExecutionData.waitTill ? fullExecutionData.waitTill : undefined,
startedAt: fullExecutionData.startedAt,
stoppedAt: fullExecutionData.stoppedAt,
finished: fullExecutionData.finished ? fullExecutionData.finished : false,
workflowId: fullExecutionData.workflowId,
status: fullExecutionData.status,
};
return returnData;
}
export const flattenObject = (obj: { [x: string]: any }, prefix = '') =>
Object.keys(obj).reduce((acc, k) => {
const pre = prefix.length ? prefix + '.' : '';

View file

@ -4,14 +4,8 @@ import {
WorkflowOperationError,
} from 'n8n-workflow';
import { Container, Service } from 'typedi';
import * as ResponseHelper from '@/ResponseHelper';
import type {
IExecutionResponse,
IExecutionsStopData,
IWorkflowExecutionDataProcess,
} from '@/Interfaces';
import type { IExecutionsStopData, IWorkflowExecutionDataProcess } from '@/Interfaces';
import { WorkflowRunner } from '@/WorkflowRunner';
import { recoverExecutionDataFromEventLogMessages } from './eventbus/MessageEventBus/recoverEvents';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { OwnershipService } from './services/ownership.service';
import { Logger } from '@/Logger';
@ -79,42 +73,22 @@ export class WaitTracker {
}
// Also check in database
const execution = await this.executionRepository.findSingleExecution(executionId, {
const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, {
includeData: true,
unflattenData: true,
});
if (!execution) {
if (!fullExecutionData) {
throw new ApplicationError('Execution not found.', {
extra: { executionId },
});
}
if (!['new', 'unknown', 'waiting', 'running'].includes(execution.status)) {
if (!['new', 'unknown', 'waiting', 'running'].includes(fullExecutionData.status)) {
throw new WorkflowOperationError(
`Only running or waiting executions can be stopped and ${executionId} is currently ${execution.status}.`,
`Only running or waiting executions can be stopped and ${executionId} is currently ${fullExecutionData.status}.`,
);
}
let fullExecutionData: IExecutionResponse;
try {
fullExecutionData = ResponseHelper.unflattenExecutionData(execution);
} catch (error) {
// if the execution ended in an unforseen, non-cancelable state, try to recover it
await recoverExecutionDataFromEventLogMessages(executionId, [], true);
// find recovered data
const restoredExecution = await Container.get(ExecutionRepository).findSingleExecution(
executionId,
{
includeData: true,
unflattenData: true,
},
);
if (!restoredExecution) {
throw new ApplicationError('Execution could not be recovered or canceled.', {
extra: { executionId },
});
}
fullExecutionData = restoredExecution;
}
// Set in execution in DB as failed and remove waitTill time
const error = new WorkflowOperationError('Workflow-Execution has been canceled!');
@ -184,4 +158,11 @@ export class WaitTracker {
);
});
}
shutdown() {
clearInterval(this.mainTimer);
Object.keys(this.waitingExecutions).forEach((executionId) => {
clearTimeout(this.waitingExecutions[executionId].timer);
});
}
}

View file

@ -78,7 +78,7 @@ export class WaitingWebhooks implements IWebhookManager {
const { workflowData } = execution;
const workflow = new Workflow({
id: workflowData.id!,
id: workflowData.id,
name: workflowData.name,
nodes: workflowData.nodes,
connections: workflowData.connections,
@ -90,7 +90,7 @@ export class WaitingWebhooks implements IWebhookManager {
let workflowOwner;
try {
workflowOwner = await this.ownershipService.getWorkflowOwnerCached(workflowData.id!);
workflowOwner = await this.ownershipService.getWorkflowOwnerCached(workflowData.id);
} catch (error) {
throw new NotFoundError('Could not find workflow');
}

View file

@ -420,7 +420,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
// Workflow is saved so update in database
try {
await Container.get(WorkflowStaticDataService).saveStaticDataById(
this.workflowData.id as string,
this.workflowData.id,
newStaticData,
);
} catch (e) {
@ -464,7 +464,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
this.retryOf,
);
await Container.get(ExecutionRepository).hardDelete({
workflowId: this.workflowData.id as string,
workflowId: this.workflowData.id,
executionId: this.executionId,
});
@ -483,7 +483,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
await updateExistingExecution({
executionId: this.executionId,
workflowId: this.workflowData.id as string,
workflowId: this.workflowData.id,
executionData: fullExecutionData,
});
@ -566,7 +566,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
// Workflow is saved so update in database
try {
await Container.get(WorkflowStaticDataService).saveStaticDataById(
this.workflowData.id as string,
this.workflowData.id,
newStaticData,
);
} catch (e) {
@ -601,7 +601,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
await updateExistingExecution({
executionId: this.executionId,
workflowId: this.workflowData.id as string,
workflowId: this.workflowData.id,
executionData: fullExecutionData,
});
} catch (error) {
@ -702,7 +702,7 @@ export async function getRunData(
export async function getWorkflowData(
workflowInfo: IExecuteWorkflowInfo,
parentWorkflowId?: string,
parentWorkflowId: string,
parentWorkflowSettings?: IWorkflowSettings,
): Promise<IWorkflowBase> {
if (workflowInfo.id === undefined && workflowInfo.code === undefined) {
@ -748,7 +748,7 @@ async function executeWorkflow(
additionalData: IWorkflowExecuteAdditionalData,
options: {
node?: INode;
parentWorkflowId?: string;
parentWorkflowId: string;
inputData?: INodeExecutionData[];
parentExecutionId?: string;
loadedWorkflowData?: IWorkflowBase;
@ -769,7 +769,7 @@ async function executeWorkflow(
const workflowName = workflowData ? workflowData.name : undefined;
const workflow = new Workflow({
id: workflowData.id?.toString(),
id: workflowData.id,
name: workflowName,
nodes: workflowData.nodes,
connections: workflowData.connections,
@ -788,10 +788,7 @@ async function executeWorkflow(
if (options.parentExecutionId !== undefined) {
executionId = options.parentExecutionId;
} else {
executionId =
options.parentExecutionId !== undefined
? options.parentExecutionId
: await activeExecutions.add(runData);
executionId = options.parentExecutionId ?? (await activeExecutions.add(runData));
}
void internalHooks.onWorkflowBeforeExecute(executionId || '', runData);
@ -801,7 +798,7 @@ async function executeWorkflow(
await PermissionChecker.check(workflow, additionalData.userId);
await PermissionChecker.checkSubworkflowExecutePolicy(
workflow,
options.parentWorkflowId!,
options.parentWorkflowId,
options.node,
);
@ -879,6 +876,7 @@ async function executeWorkflow(
stoppedAt: fullRunData.stoppedAt,
status: fullRunData.status,
workflowData,
workflowId: workflowData.id,
};
if (workflowData.id) {
fullExecutionData.workflowId = workflowData.id;
@ -1082,7 +1080,7 @@ export function getWorkflowHooksWorkerMain(
if (shouldNotSave) {
await Container.get(ExecutionRepository).hardDelete({
workflowId: this.workflowData.id as string,
workflowId: this.workflowData.id,
executionId: this.executionId,
});
}

View file

@ -197,16 +197,16 @@ class WorkflowRunnerProcess {
additionalData.executeWorkflow = async (
workflowInfo: IExecuteWorkflowInfo,
additionalData: IWorkflowExecuteAdditionalData,
options?: {
parentWorkflowId?: string;
options: {
parentWorkflowId: string;
inputData?: INodeExecutionData[];
parentWorkflowSettings?: IWorkflowSettings;
},
): Promise<Array<INodeExecutionData[] | null> | IRun> => {
const workflowData = await WorkflowExecuteAdditionalData.getWorkflowData(
workflowInfo,
options?.parentWorkflowId,
options?.parentWorkflowSettings,
options.parentWorkflowId,
options.parentWorkflowSettings,
);
const runData = await WorkflowExecuteAdditionalData.getRunData(
workflowData,

View file

@ -30,6 +30,7 @@ import { UrlService } from '@/services/url.service';
import { SettingsRepository } from '@db/repositories/settings.repository';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { FeatureNotLicensedError } from '@/errors/feature-not-licensed.error';
import { WaitTracker } from '@/WaitTracker';
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open');
@ -99,6 +100,8 @@ export class Start extends BaseCommand {
// Stop with trying to activate workflows that could not be activated
this.activeWorkflowRunner.removeAllQueuedWorkflowActivations();
Container.get(WaitTracker).shutdown();
await this.externalHooks?.run('n8n.stop', []);
if (Container.get(MultiMainSetup).isEnabled) {

View file

@ -122,7 +122,7 @@ export class Worker extends BaseCommand {
{ extra: { executionId } },
);
}
const workflowId = fullExecutionData.workflowData.id!; // @tech_debt Ensure this is not optional
const workflowId = fullExecutionData.workflowData.id;
this.logger.info(
`Start job: ${job.id} (Workflow ID: ${workflowId} | Execution: ${executionId})`,

View file

@ -762,6 +762,12 @@ export const schema = {
default: '',
env: 'N8N_USER_MANAGEMENT_JWT_SECRET',
},
jwtDuration: {
doc: 'Set a specific JWT secret (optional - n8n can generate one)', // Generated @ start.ts
format: Number,
default: 168,
env: 'N8N_USER_MANAGEMENT_JWT_DURATION',
},
isInstanceOwnerSetUp: {
// n8n loads this setting from DB on startup
doc: "Whether the instance owner's account has been set up",

View file

@ -229,7 +229,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
const { connections, nodes, name, settings } = workflowData ?? {};
await this.executionDataRepository.insert({
executionId,
workflowData: { connections, nodes, name, settings, id: workflowData?.id },
workflowData: { connections, nodes, name, settings, id: workflowData.id },
data: stringify(data),
});
return String(executionId);

View file

@ -54,6 +54,7 @@ export function prepareExecutionDataForDbUpdate(parameters: {
workflowData: pristineWorkflowData,
waitTill: runData.waitTill,
status: workflowStatusFinal,
workflowId: pristineWorkflowData.id,
};
if (retryOf !== undefined) {

View file

@ -281,7 +281,7 @@ export class ExecutionsService {
if (req.body.loadWorkflow) {
// Loads the currently saved workflow to execute instead of the
// one saved at the time of the execution.
const workflowId = execution.workflowData.id as string;
const workflowId = execution.workflowData.id;
const workflowData = (await Container.get(WorkflowRepository).findOneBy({
id: workflowId,
})) as IWorkflowBase;
@ -296,7 +296,7 @@ export class ExecutionsService {
data.workflowData = workflowData;
const nodeTypes = Container.get(NodeTypes);
const workflowInstance = new Workflow({
id: workflowData.id as string,
id: workflowData.id,
name: workflowData.name,
nodes: workflowData.nodes,
connections: workflowData.connections,

View file

@ -0,0 +1,83 @@
import { WaitTracker } from '@/WaitTracker';
import { mock } from 'jest-mock-extended';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutionResponse } from '@/Interfaces';
jest.useFakeTimers();
describe('WaitTracker', () => {
const executionRepository = mock<ExecutionRepository>();
const execution = mock<IExecutionResponse>({
id: '123',
waitTill: new Date(Date.now() + 1000),
});
afterEach(() => {
jest.clearAllMocks();
});
describe('constructor()', () => {
it('should query DB for waiting executions', async () => {
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
new WaitTracker(mock(), executionRepository, mock());
expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1);
});
it('if no executions to start, should do nothing', () => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);
new WaitTracker(mock(), executionRepository, mock());
expect(executionRepository.findSingleExecution).not.toHaveBeenCalled();
});
describe('if execution to start', () => {
it('if not enough time passed, should not start execution', async () => {
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
const waitTracker = new WaitTracker(mock(), executionRepository, mock());
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
await waitTracker.getWaitingExecutions();
const startExecutionSpy = jest.spyOn(waitTracker, 'startExecution');
jest.advanceTimersByTime(100);
expect(startExecutionSpy).not.toHaveBeenCalled();
});
it('if enough time passed, should start execution', async () => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);
const waitTracker = new WaitTracker(mock(), executionRepository, mock());
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
await waitTracker.getWaitingExecutions();
const startExecutionSpy = jest.spyOn(waitTracker, 'startExecution');
jest.advanceTimersByTime(2_000);
expect(startExecutionSpy).toHaveBeenCalledWith(execution.id);
});
});
});
describe('startExecution()', () => {
it('should query for execution to start', async () => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);
const waitTracker = new WaitTracker(mock(), executionRepository, mock());
executionRepository.findSingleExecution.mockResolvedValue(execution);
waitTracker.startExecution(execution.id);
jest.advanceTimersByTime(5);
expect(executionRepository.findSingleExecution).toHaveBeenCalledWith(execution.id, {
includeData: true,
unflattenData: true,
});
});
});
});

View file

@ -1884,7 +1884,7 @@ export interface IWaitingForExecutionSource {
}
export interface IWorkflowBase {
id?: string;
id: string;
name: string;
active: boolean;
createdAt: Date;
@ -1921,7 +1921,7 @@ export interface IWorkflowExecuteAdditionalData {
additionalData: IWorkflowExecuteAdditionalData,
options: {
node?: INode;
parentWorkflowId?: string;
parentWorkflowId: string;
inputData?: INodeExecutionData[];
parentExecutionId?: string;
loadedWorkflowData?: IWorkflowBase;