fix(core): Fix push for waiting executions (#11984)
Some checks failed
Test Master / install-and-build (push) Has been cancelled
Benchmark Docker Image CI / build (push) Has been cancelled
Test Master / Unit tests (18.x) (push) Has been cancelled
Test Master / Unit tests (20.x) (push) Has been cancelled
Test Master / Unit tests (22.4) (push) Has been cancelled
Test Master / Lint (push) Has been cancelled
Test Master / Notify Slack on failure (push) Has been cancelled

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-11-29 16:14:05 +01:00 committed by GitHub
parent 1eb94bcaf5
commit 8d71307da0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 77 additions and 58 deletions

View file

@ -1,33 +1,46 @@
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core'; import type { InstanceSettings } from 'n8n-core';
import type { IWorkflowBase } from 'n8n-workflow';
import type { Project } from '@/databases/entities/project';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutionResponse } from '@/interfaces'; import type { IExecutionResponse } from '@/interfaces';
import type { MultiMainSetup } from '@/scaling/multi-main-setup.ee'; import type { MultiMainSetup } from '@/scaling/multi-main-setup.ee';
import { OrchestrationService } from '@/services/orchestration.service'; import { OrchestrationService } from '@/services/orchestration.service';
import type { OwnershipService } from '@/services/ownership.service';
import { WaitTracker } from '@/wait-tracker'; import { WaitTracker } from '@/wait-tracker';
import type { WorkflowRunner } from '@/workflow-runner';
import { mockLogger } from '@test/mocking'; import { mockLogger } from '@test/mocking';
jest.useFakeTimers(); jest.useFakeTimers();
describe('WaitTracker', () => { describe('WaitTracker', () => {
const ownershipService = mock<OwnershipService>();
const workflowRunner = mock<WorkflowRunner>();
const executionRepository = mock<ExecutionRepository>(); const executionRepository = mock<ExecutionRepository>();
const multiMainSetup = mock<MultiMainSetup>(); const multiMainSetup = mock<MultiMainSetup>();
const orchestrationService = new OrchestrationService(mock(), multiMainSetup, mock()); const orchestrationService = new OrchestrationService(mock(), multiMainSetup, mock());
const instanceSettings = mock<InstanceSettings>({ isLeader: true }); const instanceSettings = mock<InstanceSettings>({ isLeader: true });
const project = mock<Project>({ id: 'projectId' });
const execution = mock<IExecutionResponse>({ const execution = mock<IExecutionResponse>({
id: '123', id: '123',
finished: false,
waitTill: new Date(Date.now() + 1000), waitTill: new Date(Date.now() + 1000),
mode: 'manual',
data: mock({
pushRef: 'push_ref',
}),
}); });
execution.workflowData = mock<IWorkflowBase>({ id: 'abcd' });
let waitTracker: WaitTracker; let waitTracker: WaitTracker;
beforeEach(() => { beforeEach(() => {
waitTracker = new WaitTracker( waitTracker = new WaitTracker(
mockLogger(), mockLogger(),
executionRepository, executionRepository,
mock(), ownershipService,
mock(), workflowRunner,
orchestrationService, orchestrationService,
instanceSettings, instanceSettings,
); );
@ -64,29 +77,31 @@ describe('WaitTracker', () => {
}); });
describe('if execution to start', () => { describe('if execution to start', () => {
it('if not enough time passed, should not start execution', async () => { let startExecutionSpy: jest.SpyInstance<Promise<void>, [executionId: string]>;
beforeEach(() => {
executionRepository.findSingleExecution.mockResolvedValue(execution);
executionRepository.getWaitingExecutions.mockResolvedValue([execution]); executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
ownershipService.getWorkflowProjectCached.mockResolvedValue(project);
startExecutionSpy = jest
.spyOn(waitTracker, 'startExecution')
.mockImplementation(async () => {});
waitTracker.init(); waitTracker.init();
});
executionRepository.getWaitingExecutions.mockResolvedValue([execution]); it('if not enough time passed, should not start execution', async () => {
await waitTracker.getWaitingExecutions(); await waitTracker.getWaitingExecutions();
const startExecutionSpy = jest.spyOn(waitTracker, 'startExecution');
jest.advanceTimersByTime(100); jest.advanceTimersByTime(100);
expect(startExecutionSpy).not.toHaveBeenCalled(); expect(startExecutionSpy).not.toHaveBeenCalled();
}); });
it('if enough time passed, should start execution', async () => { it('if enough time passed, should start execution', async () => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);
waitTracker.init();
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
await waitTracker.getWaitingExecutions(); await waitTracker.getWaitingExecutions();
const startExecutionSpy = jest.spyOn(waitTracker, 'startExecution');
jest.advanceTimersByTime(2_000); jest.advanceTimersByTime(2_000);
expect(startExecutionSpy).toHaveBeenCalledWith(execution.id); expect(startExecutionSpy).toHaveBeenCalledWith(execution.id);
@ -100,13 +115,27 @@ describe('WaitTracker', () => {
waitTracker.init(); waitTracker.init();
executionRepository.findSingleExecution.mockResolvedValue(execution); executionRepository.findSingleExecution.mockResolvedValue(execution);
waitTracker.startExecution(execution.id); ownershipService.getWorkflowProjectCached.mockResolvedValue(project);
jest.advanceTimersByTime(5);
await waitTracker.startExecution(execution.id);
expect(executionRepository.findSingleExecution).toHaveBeenCalledWith(execution.id, { expect(executionRepository.findSingleExecution).toHaveBeenCalledWith(execution.id, {
includeData: true, includeData: true,
unflattenData: true, unflattenData: true,
}); });
expect(workflowRunner.run).toHaveBeenCalledWith(
{
executionMode: execution.mode,
executionData: execution.data,
workflowData: execution.workflowData,
projectId: project.id,
pushRef: execution.data.pushRef,
},
false,
false,
execution.id,
);
}); });
}); });
@ -135,8 +164,8 @@ describe('WaitTracker', () => {
const waitTracker = new WaitTracker( const waitTracker = new WaitTracker(
mockLogger(), mockLogger(),
executionRepository, executionRepository,
mock(), ownershipService,
mock(), workflowRunner,
orchestrationService, orchestrationService,
mock<InstanceSettings>({ isLeader: false }), mock<InstanceSettings>({ isLeader: false }),
); );

View file

@ -1,9 +1,5 @@
import { InstanceSettings } from 'n8n-core'; import { InstanceSettings } from 'n8n-core';
import { import { ApplicationError, type IWorkflowExecutionDataProcess } from 'n8n-workflow';
ApplicationError,
ErrorReporterProxy as ErrorReporter,
type IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import { Service } from 'typedi'; import { Service } from 'typedi';
import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { ExecutionRepository } from '@/databases/repositories/execution.repository';
@ -88,7 +84,7 @@ export class WaitTracker {
this.waitingExecutions[executionId] = { this.waitingExecutions[executionId] = {
executionId, executionId,
timer: setTimeout(() => { timer: setTimeout(() => {
this.startExecution(executionId); void this.startExecution(executionId);
}, triggerTime), }, triggerTime),
}; };
} }
@ -103,11 +99,10 @@ export class WaitTracker {
delete this.waitingExecutions[executionId]; delete this.waitingExecutions[executionId];
} }
startExecution(executionId: string) { async startExecution(executionId: string) {
this.logger.debug(`Resuming execution ${executionId}`, { executionId }); this.logger.debug(`Resuming execution ${executionId}`, { executionId });
delete this.waitingExecutions[executionId]; delete this.waitingExecutions[executionId];
(async () => {
// Get the data to execute // Get the data to execute
const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, { const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, {
includeData: true, includeData: true,
@ -124,6 +119,7 @@ export class WaitTracker {
if (!fullExecutionData.workflowData.id) { if (!fullExecutionData.workflowData.id) {
throw new ApplicationError('Only saved workflows can be resumed.'); throw new ApplicationError('Only saved workflows can be resumed.');
} }
const workflowId = fullExecutionData.workflowData.id; const workflowId = fullExecutionData.workflowData.id;
const project = await this.ownershipService.getWorkflowProjectCached(workflowId); const project = await this.ownershipService.getWorkflowProjectCached(workflowId);
@ -132,17 +128,11 @@ export class WaitTracker {
executionData: fullExecutionData.data, executionData: fullExecutionData.data,
workflowData: fullExecutionData.workflowData, workflowData: fullExecutionData.workflowData,
projectId: project.id, projectId: project.id,
pushRef: fullExecutionData.data.pushRef,
}; };
// Start the execution again // Start the execution again
await this.workflowRunner.run(data, false, false, executionId); await this.workflowRunner.run(data, false, false, executionId);
})().catch((error: Error) => {
ErrorReporter.error(error);
this.logger.error(
`There was a problem starting the waiting execution with id "${executionId}": "${error.message}"`,
{ executionId },
);
});
} }
stopTracking() { stopTracking() {