From e1acb5911a85ce2199553671b744c3fcb65c0e7d Mon Sep 17 00:00:00 2001 From: Omar Ajoue Date: Tue, 16 Jan 2024 09:53:17 +0000 Subject: [PATCH] refactor: Make execution IDs mandatory in BE (#8299) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Iván Ovejero --- packages/cli/src/ActiveExecutions.ts | 1 + packages/cli/src/Interfaces.ts | 10 +-- .../handlers/executions/executions.handler.ts | 2 +- packages/cli/src/ResponseHelper.ts | 71 +--------------- packages/cli/src/WaitTracker.ts | 45 +++------- packages/cli/src/WaitingWebhooks.ts | 4 +- .../cli/src/WorkflowExecuteAdditionalData.ts | 26 +++--- packages/cli/src/WorkflowRunnerProcess.ts | 8 +- packages/cli/src/commands/start.ts | 3 + packages/cli/src/commands/worker.ts | 2 +- packages/cli/src/config/schema.ts | 6 ++ .../repositories/execution.repository.ts | 2 +- .../shared/sharedHookFunctions.ts | 1 + .../cli/src/executions/executions.service.ts | 4 +- packages/cli/test/unit/WaitTracker.test.ts | 83 +++++++++++++++++++ packages/workflow/src/Interfaces.ts | 4 +- 16 files changed, 136 insertions(+), 136 deletions(-) create mode 100644 packages/cli/test/unit/WaitTracker.test.ts diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index cd8ce8ecdb..d9526b84c5 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -49,6 +49,7 @@ export class ActiveExecutions { startedAt: new Date(), workflowData: executionData.workflowData, status: executionStatus, + workflowId: executionData.workflowData.id, }; if (executionData.retryOf !== undefined) { diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index 3a62df243f..88a0c2f610 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -81,7 +81,6 @@ export type ITagWithCountDb = Pick; - status: ExecutionStatus; } export interface IExecutionFlattedResponse extends IExecutionFlatted { diff --git a/packages/cli/src/PublicApi/v1/handlers/executions/executions.handler.ts b/packages/cli/src/PublicApi/v1/handlers/executions/executions.handler.ts index 68aac7595b..4acad28696 100644 --- a/packages/cli/src/PublicApi/v1/handlers/executions/executions.handler.ts +++ b/packages/cli/src/PublicApi/v1/handlers/executions/executions.handler.ts @@ -33,7 +33,7 @@ export = { } await Container.get(ExecutionRepository).hardDelete({ - workflowId: execution.workflowId as string, + workflowId: execution.workflowId, executionId: execution.id, }); diff --git a/packages/cli/src/ResponseHelper.ts b/packages/cli/src/ResponseHelper.ts index d477af6060..e9e27d02be 100644 --- a/packages/cli/src/ResponseHelper.ts +++ b/packages/cli/src/ResponseHelper.ts @@ -1,6 +1,5 @@ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ import type { Request, Response } from 'express'; -import { parse, stringify } from 'flatted'; import picocolors from 'picocolors'; import { ErrorReporterProxy as ErrorReporter, @@ -8,13 +7,7 @@ import { NodeApiError, } from 'n8n-workflow'; import { Readable } from 'node:stream'; -import type { - IExecutionDb, - IExecutionFlatted, - IExecutionFlattedDb, - IExecutionResponse, - IWorkflowDb, -} from '@/Interfaces'; + import { inDevelopment } from '@/constants'; import { ResponseError } from './errors/response-errors/abstract/response.error'; @@ -173,68 +166,6 @@ export function send( }; } -/** - * Flattens the Execution data. - * As it contains a lot of references which normally would be saved as duplicate data - * with regular JSON.stringify it gets flattened which keeps the references in place. - * - * @param {IExecutionDb} fullExecutionData The data to flatten - */ -// TODO: Remove this functions since it's purpose should be fulfilled by the execution repository -export function flattenExecutionData(fullExecutionData: IExecutionDb): IExecutionFlatted { - // Flatten the data - const returnData: IExecutionFlatted = { - data: stringify(fullExecutionData.data), - mode: fullExecutionData.mode, - // @ts-ignore - waitTill: fullExecutionData.waitTill, - startedAt: fullExecutionData.startedAt, - stoppedAt: fullExecutionData.stoppedAt, - finished: fullExecutionData.finished ? fullExecutionData.finished : false, - workflowId: fullExecutionData.workflowId, - - workflowData: fullExecutionData.workflowData!, - status: fullExecutionData.status, - }; - - if (fullExecutionData.id !== undefined) { - returnData.id = fullExecutionData.id; - } - - if (fullExecutionData.retryOf !== undefined) { - returnData.retryOf = fullExecutionData.retryOf.toString(); - } - - if (fullExecutionData.retrySuccessId !== undefined) { - returnData.retrySuccessId = fullExecutionData.retrySuccessId.toString(); - } - - return returnData; -} - -/** - * Unflattens the Execution data. - * - * @param {IExecutionFlattedDb} fullExecutionData The data to unflatten - */ -// TODO: Remove this functions since it's purpose should be fulfilled by the execution repository -export function unflattenExecutionData(fullExecutionData: IExecutionFlattedDb): IExecutionResponse { - const returnData: IExecutionResponse = { - id: fullExecutionData.id, - workflowData: fullExecutionData.workflowData as IWorkflowDb, - data: parse(fullExecutionData.data), - mode: fullExecutionData.mode, - waitTill: fullExecutionData.waitTill ? fullExecutionData.waitTill : undefined, - startedAt: fullExecutionData.startedAt, - stoppedAt: fullExecutionData.stoppedAt, - finished: fullExecutionData.finished ? fullExecutionData.finished : false, - workflowId: fullExecutionData.workflowId, - status: fullExecutionData.status, - }; - - return returnData; -} - export const flattenObject = (obj: { [x: string]: any }, prefix = '') => Object.keys(obj).reduce((acc, k) => { const pre = prefix.length ? prefix + '.' : ''; diff --git a/packages/cli/src/WaitTracker.ts b/packages/cli/src/WaitTracker.ts index 4bbf3dd026..435b6f5b94 100644 --- a/packages/cli/src/WaitTracker.ts +++ b/packages/cli/src/WaitTracker.ts @@ -4,14 +4,8 @@ import { WorkflowOperationError, } from 'n8n-workflow'; import { Container, Service } from 'typedi'; -import * as ResponseHelper from '@/ResponseHelper'; -import type { - IExecutionResponse, - IExecutionsStopData, - IWorkflowExecutionDataProcess, -} from '@/Interfaces'; +import type { IExecutionsStopData, IWorkflowExecutionDataProcess } from '@/Interfaces'; import { WorkflowRunner } from '@/WorkflowRunner'; -import { recoverExecutionDataFromEventLogMessages } from './eventbus/MessageEventBus/recoverEvents'; import { ExecutionRepository } from '@db/repositories/execution.repository'; import { OwnershipService } from './services/ownership.service'; import { Logger } from '@/Logger'; @@ -79,42 +73,22 @@ export class WaitTracker { } // Also check in database - const execution = await this.executionRepository.findSingleExecution(executionId, { + const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, { includeData: true, + unflattenData: true, }); - if (!execution) { + if (!fullExecutionData) { throw new ApplicationError('Execution not found.', { extra: { executionId }, }); } - if (!['new', 'unknown', 'waiting', 'running'].includes(execution.status)) { + if (!['new', 'unknown', 'waiting', 'running'].includes(fullExecutionData.status)) { throw new WorkflowOperationError( - `Only running or waiting executions can be stopped and ${executionId} is currently ${execution.status}.`, + `Only running or waiting executions can be stopped and ${executionId} is currently ${fullExecutionData.status}.`, ); } - let fullExecutionData: IExecutionResponse; - try { - fullExecutionData = ResponseHelper.unflattenExecutionData(execution); - } catch (error) { - // if the execution ended in an unforseen, non-cancelable state, try to recover it - await recoverExecutionDataFromEventLogMessages(executionId, [], true); - // find recovered data - const restoredExecution = await Container.get(ExecutionRepository).findSingleExecution( - executionId, - { - includeData: true, - unflattenData: true, - }, - ); - if (!restoredExecution) { - throw new ApplicationError('Execution could not be recovered or canceled.', { - extra: { executionId }, - }); - } - fullExecutionData = restoredExecution; - } // Set in execution in DB as failed and remove waitTill time const error = new WorkflowOperationError('Workflow-Execution has been canceled!'); @@ -184,4 +158,11 @@ export class WaitTracker { ); }); } + + shutdown() { + clearInterval(this.mainTimer); + Object.keys(this.waitingExecutions).forEach((executionId) => { + clearTimeout(this.waitingExecutions[executionId].timer); + }); + } } diff --git a/packages/cli/src/WaitingWebhooks.ts b/packages/cli/src/WaitingWebhooks.ts index f91ffe1267..223fc92820 100644 --- a/packages/cli/src/WaitingWebhooks.ts +++ b/packages/cli/src/WaitingWebhooks.ts @@ -78,7 +78,7 @@ export class WaitingWebhooks implements IWebhookManager { const { workflowData } = execution; const workflow = new Workflow({ - id: workflowData.id!, + id: workflowData.id, name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, @@ -90,7 +90,7 @@ export class WaitingWebhooks implements IWebhookManager { let workflowOwner; try { - workflowOwner = await this.ownershipService.getWorkflowOwnerCached(workflowData.id!); + workflowOwner = await this.ownershipService.getWorkflowOwnerCached(workflowData.id); } catch (error) { throw new NotFoundError('Could not find workflow'); } diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index cdb809a83c..143ef169a4 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -420,7 +420,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { // Workflow is saved so update in database try { await Container.get(WorkflowStaticDataService).saveStaticDataById( - this.workflowData.id as string, + this.workflowData.id, newStaticData, ); } catch (e) { @@ -464,7 +464,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { this.retryOf, ); await Container.get(ExecutionRepository).hardDelete({ - workflowId: this.workflowData.id as string, + workflowId: this.workflowData.id, executionId: this.executionId, }); @@ -483,7 +483,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { await updateExistingExecution({ executionId: this.executionId, - workflowId: this.workflowData.id as string, + workflowId: this.workflowData.id, executionData: fullExecutionData, }); @@ -566,7 +566,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { // Workflow is saved so update in database try { await Container.get(WorkflowStaticDataService).saveStaticDataById( - this.workflowData.id as string, + this.workflowData.id, newStaticData, ); } catch (e) { @@ -601,7 +601,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { await updateExistingExecution({ executionId: this.executionId, - workflowId: this.workflowData.id as string, + workflowId: this.workflowData.id, executionData: fullExecutionData, }); } catch (error) { @@ -702,7 +702,7 @@ export async function getRunData( export async function getWorkflowData( workflowInfo: IExecuteWorkflowInfo, - parentWorkflowId?: string, + parentWorkflowId: string, parentWorkflowSettings?: IWorkflowSettings, ): Promise { if (workflowInfo.id === undefined && workflowInfo.code === undefined) { @@ -748,7 +748,7 @@ async function executeWorkflow( additionalData: IWorkflowExecuteAdditionalData, options: { node?: INode; - parentWorkflowId?: string; + parentWorkflowId: string; inputData?: INodeExecutionData[]; parentExecutionId?: string; loadedWorkflowData?: IWorkflowBase; @@ -769,7 +769,7 @@ async function executeWorkflow( const workflowName = workflowData ? workflowData.name : undefined; const workflow = new Workflow({ - id: workflowData.id?.toString(), + id: workflowData.id, name: workflowName, nodes: workflowData.nodes, connections: workflowData.connections, @@ -788,10 +788,7 @@ async function executeWorkflow( if (options.parentExecutionId !== undefined) { executionId = options.parentExecutionId; } else { - executionId = - options.parentExecutionId !== undefined - ? options.parentExecutionId - : await activeExecutions.add(runData); + executionId = options.parentExecutionId ?? (await activeExecutions.add(runData)); } void internalHooks.onWorkflowBeforeExecute(executionId || '', runData); @@ -801,7 +798,7 @@ async function executeWorkflow( await PermissionChecker.check(workflow, additionalData.userId); await PermissionChecker.checkSubworkflowExecutePolicy( workflow, - options.parentWorkflowId!, + options.parentWorkflowId, options.node, ); @@ -879,6 +876,7 @@ async function executeWorkflow( stoppedAt: fullRunData.stoppedAt, status: fullRunData.status, workflowData, + workflowId: workflowData.id, }; if (workflowData.id) { fullExecutionData.workflowId = workflowData.id; @@ -1082,7 +1080,7 @@ export function getWorkflowHooksWorkerMain( if (shouldNotSave) { await Container.get(ExecutionRepository).hardDelete({ - workflowId: this.workflowData.id as string, + workflowId: this.workflowData.id, executionId: this.executionId, }); } diff --git a/packages/cli/src/WorkflowRunnerProcess.ts b/packages/cli/src/WorkflowRunnerProcess.ts index 18992db4ac..4bb8ba24cc 100644 --- a/packages/cli/src/WorkflowRunnerProcess.ts +++ b/packages/cli/src/WorkflowRunnerProcess.ts @@ -197,16 +197,16 @@ class WorkflowRunnerProcess { additionalData.executeWorkflow = async ( workflowInfo: IExecuteWorkflowInfo, additionalData: IWorkflowExecuteAdditionalData, - options?: { - parentWorkflowId?: string; + options: { + parentWorkflowId: string; inputData?: INodeExecutionData[]; parentWorkflowSettings?: IWorkflowSettings; }, ): Promise | IRun> => { const workflowData = await WorkflowExecuteAdditionalData.getWorkflowData( workflowInfo, - options?.parentWorkflowId, - options?.parentWorkflowSettings, + options.parentWorkflowId, + options.parentWorkflowSettings, ); const runData = await WorkflowExecuteAdditionalData.getRunData( workflowData, diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index a2d674d9ad..b9c43a9a69 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -30,6 +30,7 @@ import { UrlService } from '@/services/url.service'; import { SettingsRepository } from '@db/repositories/settings.repository'; import { ExecutionRepository } from '@db/repositories/execution.repository'; import { FeatureNotLicensedError } from '@/errors/feature-not-licensed.error'; +import { WaitTracker } from '@/WaitTracker'; // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires const open = require('open'); @@ -99,6 +100,8 @@ export class Start extends BaseCommand { // Stop with trying to activate workflows that could not be activated this.activeWorkflowRunner.removeAllQueuedWorkflowActivations(); + Container.get(WaitTracker).shutdown(); + await this.externalHooks?.run('n8n.stop', []); if (Container.get(MultiMainSetup).isEnabled) { diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 31a862dfd8..4848ff7d10 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -122,7 +122,7 @@ export class Worker extends BaseCommand { { extra: { executionId } }, ); } - const workflowId = fullExecutionData.workflowData.id!; // @tech_debt Ensure this is not optional + const workflowId = fullExecutionData.workflowData.id; this.logger.info( `Start job: ${job.id} (Workflow ID: ${workflowId} | Execution: ${executionId})`, diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 78247a8dac..3a2c3a8ee4 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -762,6 +762,12 @@ export const schema = { default: '', env: 'N8N_USER_MANAGEMENT_JWT_SECRET', }, + jwtDuration: { + doc: 'Set a specific JWT secret (optional - n8n can generate one)', // Generated @ start.ts + format: Number, + default: 168, + env: 'N8N_USER_MANAGEMENT_JWT_DURATION', + }, isInstanceOwnerSetUp: { // n8n loads this setting from DB on startup doc: "Whether the instance owner's account has been set up", diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index 32ddc25f65..8021433f3b 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -229,7 +229,7 @@ export class ExecutionRepository extends Repository { const { connections, nodes, name, settings } = workflowData ?? {}; await this.executionDataRepository.insert({ executionId, - workflowData: { connections, nodes, name, settings, id: workflowData?.id }, + workflowData: { connections, nodes, name, settings, id: workflowData.id }, data: stringify(data), }); return String(executionId); diff --git a/packages/cli/src/executionLifecycleHooks/shared/sharedHookFunctions.ts b/packages/cli/src/executionLifecycleHooks/shared/sharedHookFunctions.ts index 477b6be0ee..dc520d90b2 100644 --- a/packages/cli/src/executionLifecycleHooks/shared/sharedHookFunctions.ts +++ b/packages/cli/src/executionLifecycleHooks/shared/sharedHookFunctions.ts @@ -54,6 +54,7 @@ export function prepareExecutionDataForDbUpdate(parameters: { workflowData: pristineWorkflowData, waitTill: runData.waitTill, status: workflowStatusFinal, + workflowId: pristineWorkflowData.id, }; if (retryOf !== undefined) { diff --git a/packages/cli/src/executions/executions.service.ts b/packages/cli/src/executions/executions.service.ts index 4e2d23ec20..64fb3ff6c9 100644 --- a/packages/cli/src/executions/executions.service.ts +++ b/packages/cli/src/executions/executions.service.ts @@ -281,7 +281,7 @@ export class ExecutionsService { if (req.body.loadWorkflow) { // Loads the currently saved workflow to execute instead of the // one saved at the time of the execution. - const workflowId = execution.workflowData.id as string; + const workflowId = execution.workflowData.id; const workflowData = (await Container.get(WorkflowRepository).findOneBy({ id: workflowId, })) as IWorkflowBase; @@ -296,7 +296,7 @@ export class ExecutionsService { data.workflowData = workflowData; const nodeTypes = Container.get(NodeTypes); const workflowInstance = new Workflow({ - id: workflowData.id as string, + id: workflowData.id, name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, diff --git a/packages/cli/test/unit/WaitTracker.test.ts b/packages/cli/test/unit/WaitTracker.test.ts new file mode 100644 index 0000000000..f1f8f306dd --- /dev/null +++ b/packages/cli/test/unit/WaitTracker.test.ts @@ -0,0 +1,83 @@ +import { WaitTracker } from '@/WaitTracker'; +import { mock } from 'jest-mock-extended'; +import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import type { IExecutionResponse } from '@/Interfaces'; + +jest.useFakeTimers(); + +describe('WaitTracker', () => { + const executionRepository = mock(); + + const execution = mock({ + id: '123', + waitTill: new Date(Date.now() + 1000), + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + describe('constructor()', () => { + it('should query DB for waiting executions', async () => { + executionRepository.getWaitingExecutions.mockResolvedValue([execution]); + + new WaitTracker(mock(), executionRepository, mock()); + + expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1); + }); + + it('if no executions to start, should do nothing', () => { + executionRepository.getWaitingExecutions.mockResolvedValue([]); + + new WaitTracker(mock(), executionRepository, mock()); + + expect(executionRepository.findSingleExecution).not.toHaveBeenCalled(); + }); + + describe('if execution to start', () => { + it('if not enough time passed, should not start execution', async () => { + executionRepository.getWaitingExecutions.mockResolvedValue([execution]); + const waitTracker = new WaitTracker(mock(), executionRepository, mock()); + + executionRepository.getWaitingExecutions.mockResolvedValue([execution]); + await waitTracker.getWaitingExecutions(); + + const startExecutionSpy = jest.spyOn(waitTracker, 'startExecution'); + + jest.advanceTimersByTime(100); + + expect(startExecutionSpy).not.toHaveBeenCalled(); + }); + + it('if enough time passed, should start execution', async () => { + executionRepository.getWaitingExecutions.mockResolvedValue([]); + const waitTracker = new WaitTracker(mock(), executionRepository, mock()); + + executionRepository.getWaitingExecutions.mockResolvedValue([execution]); + await waitTracker.getWaitingExecutions(); + + const startExecutionSpy = jest.spyOn(waitTracker, 'startExecution'); + + jest.advanceTimersByTime(2_000); + + expect(startExecutionSpy).toHaveBeenCalledWith(execution.id); + }); + }); + }); + + describe('startExecution()', () => { + it('should query for execution to start', async () => { + executionRepository.getWaitingExecutions.mockResolvedValue([]); + const waitTracker = new WaitTracker(mock(), executionRepository, mock()); + + executionRepository.findSingleExecution.mockResolvedValue(execution); + waitTracker.startExecution(execution.id); + jest.advanceTimersByTime(5); + + expect(executionRepository.findSingleExecution).toHaveBeenCalledWith(execution.id, { + includeData: true, + unflattenData: true, + }); + }); + }); +}); diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index a4e9040e84..b3ca075803 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -1884,7 +1884,7 @@ export interface IWaitingForExecutionSource { } export interface IWorkflowBase { - id?: string; + id: string; name: string; active: boolean; createdAt: Date; @@ -1921,7 +1921,7 @@ export interface IWorkflowExecuteAdditionalData { additionalData: IWorkflowExecuteAdditionalData, options: { node?: INode; - parentWorkflowId?: string; + parentWorkflowId: string; inputData?: INodeExecutionData[]; parentExecutionId?: string; loadedWorkflowData?: IWorkflowBase;