From 9bdb85c4ced96fde75435e334dc757d6c7679926 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Wed, 1 Nov 2023 13:51:13 +0100 Subject: [PATCH] fix(core): Prevent executions from becoming forever running (#7569) Fixes CP-867 Possibly also fixes PAY-323 and PAY-412 --- packages/cli/src/InternalHooks.ts | 15 ++++----------- packages/cli/src/WorkflowRunner.ts | 7 ++----- packages/cli/src/commands/worker.ts | 13 ++++++------- .../repositories/execution.repository.ts | 8 ++++++-- 4 files changed, 18 insertions(+), 25 deletions(-) diff --git a/packages/cli/src/InternalHooks.ts b/packages/cli/src/InternalHooks.ts index 4f4c365b19..863a0e05bc 100644 --- a/packages/cli/src/InternalHooks.ts +++ b/packages/cli/src/InternalHooks.ts @@ -27,7 +27,6 @@ import type { User } from '@db/entities/User'; import { N8N_VERSION } from '@/constants'; import { NodeTypes } from './NodeTypes'; import type { ExecutionMetadata } from '@db/entities/ExecutionMetadata'; -import { ExecutionRepository } from '@db/repositories'; import { RoleService } from './services/role.service'; import type { EventPayloadWorkflow } from './eventbus/EventMessageClasses/EventMessageWorkflow'; import { determineFinalExecutionStatus } from './executionLifecycleHooks/shared/sharedHookFunctions'; @@ -55,7 +54,6 @@ export class InternalHooks implements IInternalHooksClass { private telemetry: Telemetry, private nodeTypes: NodeTypes, private roleService: RoleService, - private executionRepository: ExecutionRepository, eventsService: EventsService, private readonly instanceSettings: InstanceSettings, ) { @@ -256,15 +254,10 @@ export class InternalHooks implements IInternalHooksClass { workflowName: (data as IWorkflowBase).name, }; } - void Promise.all([ - this.executionRepository.updateExistingExecution(executionId, { - status: 'running', - }), - eventBus.sendWorkflowEvent({ - eventName: 'n8n.workflow.started', - payload, - }), - ]); + void eventBus.sendWorkflowEvent({ + eventName: 'n8n.workflow.started', + payload, + }); } async onWorkflowCrashed( diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index b004aec93c..dad876e47e 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -306,13 +306,9 @@ export class WorkflowRunner { { executionId }, ); let workflowExecution: PCancelable; + await Container.get(ExecutionRepository).updateStatus(executionId, 'running'); try { - this.logger.verbose( - `Execution for workflow ${data.workflowData.name} was assigned id ${executionId}`, - { executionId }, - ); - additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain( data, executionId, @@ -701,6 +697,7 @@ export class WorkflowRunner { const executionId = await this.activeExecutions.add(data, subprocess, restartExecutionId); (data as unknown as IWorkflowExecutionDataProcessWithExecution).executionId = executionId; + await Container.get(ExecutionRepository).updateStatus(executionId, 'running'); const workflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId); diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 928aecd15c..5fb75b2c96 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -112,13 +112,11 @@ export class Worker extends BaseCommand { async runJob(job: Job, nodeTypes: INodeTypes): Promise { const { executionId, loadStaticData } = job.data; - const fullExecutionData = await Container.get(ExecutionRepository).findSingleExecution( - executionId, - { - includeData: true, - unflattenData: true, - }, - ); + const executionRepository = Container.get(ExecutionRepository); + const fullExecutionData = await executionRepository.findSingleExecution(executionId, { + includeData: true, + unflattenData: true, + }); if (!fullExecutionData) { this.logger.error( @@ -133,6 +131,7 @@ export class Worker extends BaseCommand { this.logger.info( `Start job: ${job.id} (Workflow ID: ${workflowId} | Execution: ${executionId})`, ); + await executionRepository.updateStatus(executionId, 'running'); const workflowOwner = await Container.get(OwnershipService).getWorkflowOwnerCached(workflowId); diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index a6f0387405..9a80bb15c7 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -17,7 +17,7 @@ import type { SelectQueryBuilder, } from 'typeorm'; import { parse, stringify } from 'flatted'; -import type { IExecutionsSummary, IRunExecutionData } from 'n8n-workflow'; +import type { ExecutionStatus, IExecutionsSummary, IRunExecutionData } from 'n8n-workflow'; import { BinaryDataService } from 'n8n-core'; import type { ExecutionPayload, @@ -298,11 +298,15 @@ export class ExecutionRepository extends Repository { return Promise.all([this.delete(ids.executionId), this.binaryDataService.deleteMany([ids])]); } + async updateStatus(executionId: string, status: ExecutionStatus) { + await this.update({ id: executionId }, { status }); + } + async updateExistingExecution(executionId: string, execution: Partial) { // Se isolate startedAt because it must be set when the execution starts and should never change. // So we prevent updating it, if it's sent (it usually is and causes problems to executions that // are resumed after waiting for some time, as a new startedAt is set) - const { id, data, workflowData, startedAt, ...executionInformation } = execution; + const { id, data, workflowId, workflowData, startedAt, ...executionInformation } = execution; if (Object.keys(executionInformation).length > 0) { await this.update({ id: executionId }, executionInformation); }