From 722f4a8b771058800b992a482ad5f644b650960d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Fri, 4 Oct 2024 16:08:52 +0200 Subject: [PATCH] fix(core): Always set `startedAt` when executions start running (#11098) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Iván Ovejero --- .../workflow-execute-additional-data.test.ts | 75 ++++++++++++++++++- .../repositories/execution.repository.ts | 4 - .../src/workflow-execute-additional-data.ts | 10 ++- packages/cli/src/workflow-runner.ts | 2 +- 4 files changed, 83 insertions(+), 8 deletions(-) diff --git a/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts b/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts index 6b715f175d..88aee51540 100644 --- a/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts +++ b/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts @@ -1,21 +1,80 @@ +import { mock } from 'jest-mock-extended'; +import type { + IExecuteWorkflowInfo, + IWorkflowExecuteAdditionalData, + ExecuteWorkflowOptions, + IRun, +} from 'n8n-workflow'; +import type PCancelable from 'p-cancelable'; import Container from 'typedi'; +import { ActiveExecutions } from '@/active-executions'; import { CredentialsHelper } from '@/credentials-helper'; +import type { WorkflowEntity } from '@/databases/entities/workflow-entity'; +import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { VariablesService } from '@/environments/variables/variables.service.ee'; import { EventService } from '@/events/event.service'; +import { ExternalHooks } from '@/external-hooks'; import { SecretsHelper } from '@/secrets-helpers'; -import { getBase } from '@/workflow-execute-additional-data'; +import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; +import { SubworkflowPolicyChecker } from '@/subworkflows/subworkflow-policy-checker.service'; +import { Telemetry } from '@/telemetry'; +import { PermissionChecker } from '@/user-management/permission-checker'; +import { executeWorkflow, getBase } from '@/workflow-execute-additional-data'; import { mockInstance } from '@test/mocking'; +const run = mock({ + data: { resultData: {} }, + finished: true, + mode: 'manual', + startedAt: new Date(), + status: 'new', +}); + +const cancelablePromise = mock>({ + then: jest + .fn() + .mockImplementation(async (onfulfilled) => await Promise.resolve(run).then(onfulfilled)), + catch: jest + .fn() + .mockImplementation(async (onrejected) => await Promise.resolve(run).catch(onrejected)), + finally: jest + .fn() + .mockImplementation(async (onfinally) => await Promise.resolve(run).finally(onfinally)), + [Symbol.toStringTag]: 'PCancelable', +}); + +jest.mock('n8n-core', () => ({ + __esModule: true, + ...jest.requireActual('n8n-core'), + WorkflowExecute: jest.fn().mockImplementation(() => ({ + processRunExecutionData: jest.fn().mockReturnValue(cancelablePromise), + })), +})); + +jest.mock('../workflow-helpers', () => ({ + ...jest.requireActual('../workflow-helpers'), + getDataLastExecutedNodeData: jest.fn().mockReturnValue({ data: { main: [] } }), +})); + describe('WorkflowExecuteAdditionalData', () => { const variablesService = mockInstance(VariablesService); variablesService.getAllCached.mockResolvedValue([]); const credentialsHelper = mockInstance(CredentialsHelper); const secretsHelper = mockInstance(SecretsHelper); const eventService = mockInstance(EventService); + mockInstance(ExternalHooks); Container.set(VariablesService, variablesService); Container.set(CredentialsHelper, credentialsHelper); Container.set(SecretsHelper, secretsHelper); + const executionRepository = mockInstance(ExecutionRepository); + mockInstance(Telemetry); + const workflowRepository = mockInstance(WorkflowRepository); + const activeExecutions = mockInstance(ActiveExecutions); + mockInstance(PermissionChecker); + mockInstance(SubworkflowPolicyChecker); + mockInstance(WorkflowStatisticsService); test('logAiEvent should call MessageEventBus', async () => { const additionalData = await getBase('user-id'); @@ -35,4 +94,18 @@ describe('WorkflowExecuteAdditionalData', () => { expect(eventService.emit).toHaveBeenCalledTimes(1); expect(eventService.emit).toHaveBeenCalledWith(eventName, payload); }); + + it('`executeWorkflow` should set subworkflow execution as running', async () => { + const executionId = '123'; + workflowRepository.get.mockResolvedValue(mock({ id: executionId, nodes: [] })); + activeExecutions.add.mockResolvedValue(executionId); + + await executeWorkflow( + mock(), + mock(), + mock({ loadedWorkflowData: undefined }), + ); + + expect(executionRepository.setRunning).toHaveBeenCalledWith(executionId); + }); }); diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index aee2752494..7b26463969 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -340,10 +340,6 @@ export class ExecutionRepository extends Repository { ]); } - async updateStatus(executionId: string, status: ExecutionStatus) { - await this.update({ id: executionId }, { status }); - } - async setRunning(executionId: string) { const startedAt = new Date(); diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index 73e14f2cb7..f357bbc018 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -766,7 +766,7 @@ export async function getWorkflowData( /** * Executes the workflow with the given ID */ -async function executeWorkflow( +export async function executeWorkflow( workflowInfo: IExecuteWorkflowInfo, additionalData: IWorkflowExecuteAdditionalData, options: ExecuteWorkflowOptions, @@ -798,7 +798,13 @@ async function executeWorkflow( const runData = options.loadedRunData ?? (await getRunData(workflowData, options.inputData)); const executionId = await activeExecutions.add(runData); - await executionRepository.updateStatus(executionId, 'running'); + + /** + * A subworkflow execution in queue mode is not enqueued, but rather runs in the + * same worker process as the parent execution. Hence ensure the subworkflow + * execution is marked as started as well. + */ + await executionRepository.setRunning(executionId); Container.get(EventService).emit('workflow-pre-execute', { executionId, data: runData }); diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index aaed763500..8d1e147e85 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -245,7 +245,7 @@ export class WorkflowRunner { { executionId }, ); let workflowExecution: PCancelable; - await this.executionRepository.updateStatus(executionId, 'running'); // write + await this.executionRepository.setRunning(executionId); // write try { additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId);