mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
calculate saving settings only once for an execution
This commit is contained in:
parent
e4ee65fdc5
commit
4471c0f066
|
@ -185,16 +185,11 @@ describe('Execution Lifecycle Hooks', () => {
|
||||||
};
|
};
|
||||||
|
|
||||||
describe('getWorkflowHooksMain', () => {
|
describe('getWorkflowHooksMain', () => {
|
||||||
|
const createHooks = () =>
|
||||||
|
getWorkflowHooksMain({ executionMode, workflowData, pushRef, retryOf }, executionId);
|
||||||
|
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
hooks = getWorkflowHooksMain(
|
hooks = createHooks();
|
||||||
{
|
|
||||||
executionMode,
|
|
||||||
workflowData,
|
|
||||||
pushRef,
|
|
||||||
retryOf,
|
|
||||||
},
|
|
||||||
executionId,
|
|
||||||
);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
workflowEventTests();
|
workflowEventTests();
|
||||||
|
@ -245,6 +240,7 @@ describe('Execution Lifecycle Hooks', () => {
|
||||||
|
|
||||||
it('should save execution progress when enabled', async () => {
|
it('should save execution progress when enabled', async () => {
|
||||||
workflowData.settings = { saveExecutionProgress: true };
|
workflowData.settings = { saveExecutionProgress: true };
|
||||||
|
hooks = createHooks();
|
||||||
|
|
||||||
await hooks.executeHookFunctions('nodeExecuteAfter', [
|
await hooks.executeHookFunctions('nodeExecuteAfter', [
|
||||||
nodeName,
|
nodeName,
|
||||||
|
@ -260,6 +256,7 @@ describe('Execution Lifecycle Hooks', () => {
|
||||||
|
|
||||||
it('should not save execution progress when disabled', async () => {
|
it('should not save execution progress when disabled', async () => {
|
||||||
workflowData.settings = { saveExecutionProgress: false };
|
workflowData.settings = { saveExecutionProgress: false };
|
||||||
|
hooks = createHooks();
|
||||||
|
|
||||||
await hooks.executeHookFunctions('nodeExecuteAfter', [
|
await hooks.executeHookFunctions('nodeExecuteAfter', [
|
||||||
nodeName,
|
nodeName,
|
||||||
|
@ -391,6 +388,7 @@ describe('Execution Lifecycle Hooks', () => {
|
||||||
|
|
||||||
it('should soft delete manual executions when manual saving is disabled', async () => {
|
it('should soft delete manual executions when manual saving is disabled', async () => {
|
||||||
hooks.workflowData.settings = { saveManualExecutions: false };
|
hooks.workflowData.settings = { saveManualExecutions: false };
|
||||||
|
hooks = createHooks();
|
||||||
|
|
||||||
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]);
|
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]);
|
||||||
|
|
||||||
|
@ -399,6 +397,7 @@ describe('Execution Lifecycle Hooks', () => {
|
||||||
|
|
||||||
it('should not soft delete manual executions with waitTill', async () => {
|
it('should not soft delete manual executions with waitTill', async () => {
|
||||||
hooks.workflowData.settings = { saveManualExecutions: false };
|
hooks.workflowData.settings = { saveManualExecutions: false };
|
||||||
|
hooks = createHooks();
|
||||||
|
|
||||||
await hooks.executeHookFunctions('workflowExecuteAfter', [waitingRun, {}]);
|
await hooks.executeHookFunctions('workflowExecuteAfter', [waitingRun, {}]);
|
||||||
|
|
||||||
|
|
|
@ -1,13 +1,12 @@
|
||||||
import { ErrorReporter } from 'n8n-core';
|
import { ErrorReporter } from 'n8n-core';
|
||||||
import { Logger } from 'n8n-core';
|
import { Logger } from 'n8n-core';
|
||||||
import type { IRunExecutionData, ITaskData, IWorkflowBase } from 'n8n-workflow';
|
import type { IRunExecutionData, ITaskData } from 'n8n-workflow';
|
||||||
|
|
||||||
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
||||||
import type { IExecutionResponse } from '@/interfaces';
|
import type { IExecutionResponse } from '@/interfaces';
|
||||||
import { mockInstance } from '@test/mocking';
|
import { mockInstance } from '@test/mocking';
|
||||||
|
|
||||||
import { saveExecutionProgress } from '../save-execution-progress';
|
import { saveExecutionProgress } from '../save-execution-progress';
|
||||||
import * as fnModule from '../to-save-settings';
|
|
||||||
|
|
||||||
mockInstance(Logger);
|
mockInstance(Logger);
|
||||||
const errorReporter = mockInstance(ErrorReporter);
|
const errorReporter = mockInstance(ErrorReporter);
|
||||||
|
@ -17,8 +16,8 @@ afterEach(() => {
|
||||||
jest.clearAllMocks();
|
jest.clearAllMocks();
|
||||||
});
|
});
|
||||||
|
|
||||||
const commonArgs: [IWorkflowBase, string, string, ITaskData, IRunExecutionData, string] = [
|
const commonArgs: [string, string, string, ITaskData, IRunExecutionData, string] = [
|
||||||
{} as IWorkflowBase,
|
'some-workflow-id',
|
||||||
'some-execution-id',
|
'some-execution-id',
|
||||||
'My Node',
|
'My Node',
|
||||||
{} as ITaskData,
|
{} as ITaskData,
|
||||||
|
@ -29,40 +28,25 @@ const commonArgs: [IWorkflowBase, string, string, ITaskData, IRunExecutionData,
|
||||||
const commonSettings = { error: true, success: true, manual: true };
|
const commonSettings = { error: true, success: true, manual: true };
|
||||||
|
|
||||||
test('should ignore if save settings say so', async () => {
|
test('should ignore if save settings say so', async () => {
|
||||||
jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({
|
await saveExecutionProgress({ ...commonSettings, progress: false }, ...commonArgs);
|
||||||
...commonSettings,
|
|
||||||
progress: false,
|
|
||||||
});
|
|
||||||
|
|
||||||
await saveExecutionProgress(...commonArgs);
|
|
||||||
|
|
||||||
expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled();
|
expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
test('should ignore on leftover async call', async () => {
|
test('should ignore on leftover async call', async () => {
|
||||||
jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({
|
|
||||||
...commonSettings,
|
|
||||||
progress: true,
|
|
||||||
});
|
|
||||||
|
|
||||||
executionRepository.findSingleExecution.mockResolvedValue({
|
executionRepository.findSingleExecution.mockResolvedValue({
|
||||||
finished: true,
|
finished: true,
|
||||||
} as IExecutionResponse);
|
} as IExecutionResponse);
|
||||||
|
|
||||||
await saveExecutionProgress(...commonArgs);
|
await saveExecutionProgress({ ...commonSettings, progress: true }, ...commonArgs);
|
||||||
|
|
||||||
expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled();
|
expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
test('should update execution when saving progress is enabled', async () => {
|
test('should update execution when saving progress is enabled', async () => {
|
||||||
jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({
|
|
||||||
...commonSettings,
|
|
||||||
progress: true,
|
|
||||||
});
|
|
||||||
|
|
||||||
executionRepository.findSingleExecution.mockResolvedValue({} as IExecutionResponse);
|
executionRepository.findSingleExecution.mockResolvedValue({} as IExecutionResponse);
|
||||||
|
|
||||||
await saveExecutionProgress(...commonArgs);
|
await saveExecutionProgress({ ...commonSettings, progress: true }, ...commonArgs);
|
||||||
|
|
||||||
expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith('some-execution-id', {
|
expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith('some-execution-id', {
|
||||||
data: {
|
data: {
|
||||||
|
@ -82,18 +66,13 @@ test('should update execution when saving progress is enabled', async () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
test('should report error on failure', async () => {
|
test('should report error on failure', async () => {
|
||||||
jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({
|
|
||||||
...commonSettings,
|
|
||||||
progress: true,
|
|
||||||
});
|
|
||||||
|
|
||||||
const error = new Error('Something went wrong');
|
const error = new Error('Something went wrong');
|
||||||
|
|
||||||
executionRepository.findSingleExecution.mockImplementation(() => {
|
executionRepository.findSingleExecution.mockImplementation(() => {
|
||||||
throw error;
|
throw error;
|
||||||
});
|
});
|
||||||
|
|
||||||
await saveExecutionProgress(...commonArgs);
|
await saveExecutionProgress({ ...commonSettings, progress: true }, ...commonArgs);
|
||||||
|
|
||||||
expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled();
|
expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled();
|
||||||
expect(errorReporter.error).toHaveBeenCalledWith(error);
|
expect(errorReporter.error).toHaveBeenCalledWith(error);
|
||||||
|
|
|
@ -32,7 +32,7 @@ import {
|
||||||
prepareExecutionDataForDbUpdate,
|
prepareExecutionDataForDbUpdate,
|
||||||
updateExistingExecution,
|
updateExistingExecution,
|
||||||
} from './shared/shared-hook-functions';
|
} from './shared/shared-hook-functions';
|
||||||
import { toSaveSettings } from './to-save-settings';
|
import { type ExecutionSavingSettings, toSaveSettings } from './to-save-settings';
|
||||||
|
|
||||||
function mergeHookFunctions(...hookFunctions: IWorkflowExecuteHooks[]): IWorkflowExecuteHooks {
|
function mergeHookFunctions(...hookFunctions: IWorkflowExecuteHooks[]): IWorkflowExecuteHooks {
|
||||||
const result: IWorkflowExecuteHooks = {
|
const result: IWorkflowExecuteHooks = {
|
||||||
|
@ -212,7 +212,7 @@ function hookFunctionsExternalHooks(): IWorkflowExecuteHooks {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
function hookFunctionsSaveProgress(): IWorkflowExecuteHooks {
|
function hookFunctionsSaveProgress(saveSettings: ExecutionSavingSettings): IWorkflowExecuteHooks {
|
||||||
return {
|
return {
|
||||||
nodeExecuteAfter: [
|
nodeExecuteAfter: [
|
||||||
async function (
|
async function (
|
||||||
|
@ -222,7 +222,8 @@ function hookFunctionsSaveProgress(): IWorkflowExecuteHooks {
|
||||||
executionData: IRunExecutionData,
|
executionData: IRunExecutionData,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
await saveExecutionProgress(
|
await saveExecutionProgress(
|
||||||
this.workflowData,
|
saveSettings,
|
||||||
|
this.workflowData.id,
|
||||||
this.executionId,
|
this.executionId,
|
||||||
nodeName,
|
nodeName,
|
||||||
data,
|
data,
|
||||||
|
@ -248,7 +249,7 @@ function hookFunctionsFinalizeExecutionStatus(): IWorkflowExecuteHooks {
|
||||||
/**
|
/**
|
||||||
* Returns hook functions to save workflow execution and call error workflow
|
* Returns hook functions to save workflow execution and call error workflow
|
||||||
*/
|
*/
|
||||||
function hookFunctionsSave(): IWorkflowExecuteHooks {
|
function hookFunctionsSave(saveSettings: ExecutionSavingSettings): IWorkflowExecuteHooks {
|
||||||
const logger = Container.get(Logger);
|
const logger = Container.get(Logger);
|
||||||
const errorReporter = Container.get(ErrorReporter);
|
const errorReporter = Container.get(ErrorReporter);
|
||||||
const executionRepository = Container.get(ExecutionRepository);
|
const executionRepository = Container.get(ExecutionRepository);
|
||||||
|
@ -288,8 +289,6 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const saveSettings = toSaveSettings(this.workflowData.settings);
|
|
||||||
|
|
||||||
if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) {
|
if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) {
|
||||||
/**
|
/**
|
||||||
* When manual executions are not being saved, we only soft-delete
|
* When manual executions are not being saved, we only soft-delete
|
||||||
|
@ -472,12 +471,13 @@ export function getWorkflowHooksIntegrated(
|
||||||
workflowData: IWorkflowBase,
|
workflowData: IWorkflowBase,
|
||||||
userId?: string,
|
userId?: string,
|
||||||
): WorkflowHooks {
|
): WorkflowHooks {
|
||||||
|
const saveSettings = toSaveSettings(workflowData.settings);
|
||||||
const hookFunctions = mergeHookFunctions(
|
const hookFunctions = mergeHookFunctions(
|
||||||
hookFunctionsWorkflowEvents(userId),
|
hookFunctionsWorkflowEvents(userId),
|
||||||
hookFunctionsNodeEvents(),
|
hookFunctionsNodeEvents(),
|
||||||
hookFunctionsFinalizeExecutionStatus(),
|
hookFunctionsFinalizeExecutionStatus(),
|
||||||
hookFunctionsSave(),
|
hookFunctionsSave(saveSettings),
|
||||||
hookFunctionsSaveProgress(),
|
hookFunctionsSaveProgress(saveSettings),
|
||||||
hookFunctionsExternalHooks(),
|
hookFunctionsExternalHooks(),
|
||||||
);
|
);
|
||||||
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData);
|
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData);
|
||||||
|
@ -492,11 +492,12 @@ export function getWorkflowHooksWorkerExecuter(
|
||||||
workflowData: IWorkflowBase,
|
workflowData: IWorkflowBase,
|
||||||
optionalParameters: IWorkflowHooksOptionalParameters = {},
|
optionalParameters: IWorkflowHooksOptionalParameters = {},
|
||||||
): WorkflowHooks {
|
): WorkflowHooks {
|
||||||
|
const saveSettings = toSaveSettings(workflowData.settings);
|
||||||
const toMerge = [
|
const toMerge = [
|
||||||
hookFunctionsNodeEvents(),
|
hookFunctionsNodeEvents(),
|
||||||
hookFunctionsFinalizeExecutionStatus(),
|
hookFunctionsFinalizeExecutionStatus(),
|
||||||
hookFunctionsSaveWorker(),
|
hookFunctionsSaveWorker(),
|
||||||
hookFunctionsSaveProgress(),
|
hookFunctionsSaveProgress(saveSettings),
|
||||||
hookFunctionsExternalHooks(),
|
hookFunctionsExternalHooks(),
|
||||||
];
|
];
|
||||||
|
|
||||||
|
@ -517,10 +518,11 @@ export function getWorkflowHooksWorkerMain(
|
||||||
workflowData: IWorkflowBase,
|
workflowData: IWorkflowBase,
|
||||||
optionalParameters: IWorkflowHooksOptionalParameters = {},
|
optionalParameters: IWorkflowHooksOptionalParameters = {},
|
||||||
): WorkflowHooks {
|
): WorkflowHooks {
|
||||||
|
const saveSettings = toSaveSettings(workflowData.settings);
|
||||||
const executionRepository = Container.get(ExecutionRepository);
|
const executionRepository = Container.get(ExecutionRepository);
|
||||||
const hookFunctions = mergeHookFunctions(
|
const hookFunctions = mergeHookFunctions(
|
||||||
hookFunctionsWorkflowEvents(),
|
hookFunctionsWorkflowEvents(),
|
||||||
hookFunctionsSaveProgress(),
|
hookFunctionsSaveProgress(saveSettings),
|
||||||
hookFunctionsExternalHooks(),
|
hookFunctionsExternalHooks(),
|
||||||
hookFunctionsFinalizeExecutionStatus(),
|
hookFunctionsFinalizeExecutionStatus(),
|
||||||
{
|
{
|
||||||
|
@ -529,8 +531,6 @@ export function getWorkflowHooksWorkerMain(
|
||||||
// Don't delete executions before they are finished
|
// Don't delete executions before they are finished
|
||||||
if (!fullRunData.finished) return;
|
if (!fullRunData.finished) return;
|
||||||
|
|
||||||
const saveSettings = toSaveSettings(this.workflowData.settings);
|
|
||||||
|
|
||||||
const isManualMode = this.mode === 'manual';
|
const isManualMode = this.mode === 'manual';
|
||||||
|
|
||||||
if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) {
|
if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) {
|
||||||
|
@ -579,13 +579,14 @@ export function getWorkflowHooksMain(
|
||||||
data: IWorkflowExecutionDataProcess,
|
data: IWorkflowExecutionDataProcess,
|
||||||
executionId: string,
|
executionId: string,
|
||||||
): WorkflowHooks {
|
): WorkflowHooks {
|
||||||
|
const saveSettings = toSaveSettings(data.workflowData.settings);
|
||||||
const hookFunctions = mergeHookFunctions(
|
const hookFunctions = mergeHookFunctions(
|
||||||
hookFunctionsWorkflowEvents(),
|
hookFunctionsWorkflowEvents(),
|
||||||
hookFunctionsNodeEvents(),
|
hookFunctionsNodeEvents(),
|
||||||
hookFunctionsFinalizeExecutionStatus(),
|
hookFunctionsFinalizeExecutionStatus(),
|
||||||
hookFunctionsSave(),
|
hookFunctionsSave(saveSettings),
|
||||||
hookFunctionsPush(),
|
hookFunctionsPush(),
|
||||||
hookFunctionsSaveProgress(),
|
hookFunctionsSaveProgress(saveSettings),
|
||||||
hookFunctionsExternalHooks(),
|
hookFunctionsExternalHooks(),
|
||||||
);
|
);
|
||||||
return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, {
|
return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, {
|
||||||
|
|
|
@ -1,21 +1,20 @@
|
||||||
import { Container } from '@n8n/di';
|
import { Container } from '@n8n/di';
|
||||||
import { ErrorReporter, Logger } from 'n8n-core';
|
import { ErrorReporter, Logger } from 'n8n-core';
|
||||||
import type { IRunExecutionData, ITaskData, IWorkflowBase } from 'n8n-workflow';
|
import type { IRunExecutionData, ITaskData } from 'n8n-workflow';
|
||||||
|
|
||||||
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
||||||
|
|
||||||
import { toSaveSettings } from './to-save-settings';
|
import { type ExecutionSavingSettings } from './to-save-settings';
|
||||||
|
|
||||||
export async function saveExecutionProgress(
|
export async function saveExecutionProgress(
|
||||||
workflowData: IWorkflowBase,
|
saveSettings: ExecutionSavingSettings,
|
||||||
|
workflowId: string,
|
||||||
executionId: string,
|
executionId: string,
|
||||||
nodeName: string,
|
nodeName: string,
|
||||||
data: ITaskData,
|
data: ITaskData,
|
||||||
executionData: IRunExecutionData,
|
executionData: IRunExecutionData,
|
||||||
pushRef?: string,
|
pushRef?: string,
|
||||||
) {
|
) {
|
||||||
const saveSettings = toSaveSettings(workflowData.settings);
|
|
||||||
|
|
||||||
if (!saveSettings.progress) return;
|
if (!saveSettings.progress) return;
|
||||||
|
|
||||||
const logger = Container.get(Logger);
|
const logger = Container.get(Logger);
|
||||||
|
@ -97,7 +96,7 @@ export async function saveExecutionProgress(
|
||||||
...error,
|
...error,
|
||||||
executionId,
|
executionId,
|
||||||
pushRef,
|
pushRef,
|
||||||
workflowId: workflowData.id,
|
workflowId,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,13 @@ import type { IWorkflowSettings } from 'n8n-workflow';
|
||||||
|
|
||||||
import config from '@/config';
|
import config from '@/config';
|
||||||
|
|
||||||
|
export type ExecutionSavingSettings = {
|
||||||
|
error: boolean | 'all' | 'none';
|
||||||
|
success: boolean | 'all' | 'none';
|
||||||
|
manual: boolean;
|
||||||
|
progress: boolean;
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return whether a workflow execution is configured to be saved or not:
|
* Return whether a workflow execution is configured to be saved or not:
|
||||||
*
|
*
|
||||||
|
@ -10,7 +17,7 @@ import config from '@/config';
|
||||||
* - `manual`: Whether to save successful or failed manual executions.
|
* - `manual`: Whether to save successful or failed manual executions.
|
||||||
* - `progress`: Whether to save execution progress, i.e. after each node's execution.
|
* - `progress`: Whether to save execution progress, i.e. after each node's execution.
|
||||||
*/
|
*/
|
||||||
export function toSaveSettings(workflowSettings: IWorkflowSettings = {}) {
|
export function toSaveSettings(workflowSettings: IWorkflowSettings = {}): ExecutionSavingSettings {
|
||||||
const DEFAULTS = {
|
const DEFAULTS = {
|
||||||
ERROR: config.getEnv('executions.saveDataOnError'),
|
ERROR: config.getEnv('executions.saveDataOnError'),
|
||||||
SUCCESS: config.getEnv('executions.saveDataOnSuccess'),
|
SUCCESS: config.getEnv('executions.saveDataOnSuccess'),
|
||||||
|
|
Loading…
Reference in a new issue