fix: Retain execution data between partial executions (new flow) (#11828)

This commit is contained in:
Danny Martini 2024-11-26 13:32:39 +01:00 committed by GitHub
parent 75e2b6fd9e
commit 3320436a6f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 159 additions and 14 deletions

View file

@ -9,6 +9,7 @@ type ExecutionStarted = {
workflowId: string;
workflowName?: string;
retryOf?: string;
flattedRunData: string;
};
};

View file

@ -281,7 +281,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
},
],
workflowExecuteBefore: [
async function (this: WorkflowHooks): Promise<void> {
async function (this: WorkflowHooks, _workflow, data): Promise<void> {
const { pushRef, executionId } = this;
const { id: workflowId, name: workflowName } = this.workflowData;
logger.debug('Executing hook (hookFunctionsPush)', {
@ -302,6 +302,9 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
retryOf: this.retryOf,
workflowId,
workflowName,
flattedRunData: data?.resultData.runData
? stringify(data.resultData.runData)
: stringify({}),
},
pushRef,
);

View file

@ -137,7 +137,10 @@ export class WorkflowRunner {
// Create a failed execution with the data for the node, save it and abort execution
const runData = generateFailedExecutionFromError(data.executionMode, error, error.node);
const workflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId);
await workflowHooks.executeHookFunctions('workflowExecuteBefore', []);
await workflowHooks.executeHookFunctions('workflowExecuteBefore', [
undefined,
data.executionData,
]);
await workflowHooks.executeHookFunctions('workflowExecuteAfter', [runData]);
responsePromise?.reject(error);
this.activeExecutions.finalizeExecution(executionId);
@ -401,7 +404,7 @@ export class WorkflowRunner {
// Normally also workflow should be supplied here but as it only used for sending
// data to editor-UI is not needed.
await hooks.executeHookFunctions('workflowExecuteBefore', []);
await hooks.executeHookFunctions('workflowExecuteBefore', [undefined, data.executionData]);
} catch (error) {
// We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the
// "workflowExecuteAfter" which we require.

View file

@ -5,13 +5,14 @@ interface StubNode {
name: string;
parameters?: INodeParameters;
disabled?: boolean;
type?: string;
}
export function createNodeData(stubData: StubNode): INode {
return {
name: stubData.name,
parameters: stubData.parameters ?? {},
type: 'test.set',
type: stubData.type ?? 'n8n-nodes-base.set',
typeVersion: 1,
id: 'uuid-1234',
position: [100, 100],

View file

@ -948,7 +948,7 @@ export class WorkflowExecute {
const returnPromise = (async () => {
try {
if (!this.additionalData.restartExecutionId) {
await this.executeHook('workflowExecuteBefore', [workflow]);
await this.executeHook('workflowExecuteBefore', [workflow, this.runExecutionData]);
}
} catch (error) {
const e = error as unknown as ExecutionBaseError;

View file

@ -206,7 +206,7 @@ describe('WorkflowExecute', () => {
}
});
describe('WorkflowExecute, NodeExecutionOutput type test', () => {
test('WorkflowExecute, NodeExecutionOutput type test', () => {
//TODO Add more tests here when execution hints are added to some node types
const nodeExecutionOutput = new NodeExecutionOutput(
[[{ json: { data: 123 } }]],

View file

@ -2,7 +2,7 @@ import { stringify } from 'flatted';
import { useRouter } from 'vue-router';
import { createPinia, setActivePinia } from 'pinia';
import type { PushMessage, PushPayload } from '@n8n/api-types';
import type { ITaskData, WorkflowOperationError } from 'n8n-workflow';
import type { ITaskData, WorkflowOperationError, IRunData } from 'n8n-workflow';
import { usePushConnection } from '@/composables/usePushConnection';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
@ -10,6 +10,7 @@ import { useOrchestrationStore } from '@/stores/orchestration.store';
import { useUIStore } from '@/stores/ui.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { useToast } from '@/composables/useToast';
import type { IExecutionResponse } from '@/Interface';
vi.mock('vue-router', () => {
return {
@ -162,7 +163,7 @@ describe('usePushConnection()', () => {
expect(result).toBeTruthy();
expect(workflowsStore.workflowExecutionData).toBeDefined();
expect(uiStore.isActionActive['workflowRunning']).toBeTruthy();
expect(uiStore.isActionActive.workflowRunning).toBeTruthy();
expect(toast.showMessage).toHaveBeenCalledWith({
title: 'Workflow executed successfully',
@ -236,5 +237,62 @@ describe('usePushConnection()', () => {
expect(pushConnection.retryTimeout).not.toBeNull();
});
});
describe('executionStarted', async () => {
it("enqueues messages if we don't have the active execution id yet", async () => {
uiStore.isActionActive.workflowRunning = true;
const event: PushMessage = {
type: 'executionStarted',
data: {
executionId: '1',
mode: 'manual',
startedAt: new Date(),
workflowId: '1',
flattedRunData: stringify({}),
},
};
expect(pushConnection.retryTimeout.value).toBeNull();
expect(pushConnection.pushMessageQueue.value.length).toBe(0);
const result = await pushConnection.pushMessageReceived(event);
expect(result).toBe(false);
expect(pushConnection.pushMessageQueue.value).toHaveLength(1);
expect(pushConnection.pushMessageQueue.value).toContainEqual({
message: event,
retriesLeft: 5,
});
expect(pushConnection.retryTimeout).not.toBeNull();
});
it('overwrites the run data in the workflow store', async () => {
// ARRANGE
uiStore.isActionActive.workflowRunning = true;
const oldRunData: IRunData = { foo: [] };
workflowsStore.workflowExecutionData = {
data: { resultData: { runData: oldRunData } },
} as IExecutionResponse;
const newRunData: IRunData = { bar: [] };
const event: PushMessage = {
type: 'executionStarted',
data: {
executionId: '1',
flattedRunData: stringify(newRunData),
mode: 'manual',
startedAt: new Date(),
workflowId: '1',
},
};
workflowsStore.activeExecutionId = event.data.executionId;
// ACT
const result = await pushConnection.pushMessageReceived(event);
// ASSERT
expect(result).toBe(true);
expect(workflowsStore.workflowExecutionData.data?.resultData.runData).toEqual(newRunData);
});
});
});
});

View file

@ -146,7 +146,11 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
return false;
}
if (receivedData.type === 'nodeExecuteAfter' || receivedData.type === 'nodeExecuteBefore') {
if (
receivedData.type === 'nodeExecuteAfter' ||
receivedData.type === 'nodeExecuteBefore' ||
receivedData.type === 'executionStarted'
) {
if (!uiStore.isActionActive['workflowRunning']) {
// No workflow is running so ignore the messages
return false;
@ -455,7 +459,11 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
} else if (receivedData.type === 'executionWaiting') {
// Nothing to do
} else if (receivedData.type === 'executionStarted') {
// Nothing to do
if (workflowsStore.workflowExecutionData?.data && receivedData.data.flattedRunData) {
workflowsStore.workflowExecutionData.data.resultData.runData = parse(
receivedData.data.flattedRunData,
);
}
} else if (receivedData.type === 'nodeExecuteAfter') {
// A node finished to execute. Add its data
const pushData = receivedData.data;

View file

@ -12,6 +12,8 @@ import { useUIStore } from '@/stores/ui.store';
import { useWorkflowHelpers } from '@/composables/useWorkflowHelpers';
import { useToast } from './useToast';
import { useI18n } from '@/composables/useI18n';
import { useLocalStorage } from '@vueuse/core';
import { ref } from 'vue';
vi.mock('@/stores/workflows.store', () => ({
useWorkflowsStore: vi.fn().mockReturnValue({
@ -29,6 +31,16 @@ vi.mock('@/stores/workflows.store', () => ({
}),
}));
vi.mock('@vueuse/core', async () => {
// eslint-disable-next-line @typescript-eslint/consistent-type-imports
const originalModule = await vi.importActual<typeof import('@vueuse/core')>('@vueuse/core');
return {
...originalModule, // Keep all original exports
useLocalStorage: vi.fn().mockReturnValue({ value: undefined }), // Mock useLocalStorage
};
});
vi.mock('@/composables/useTelemetry', () => ({
useTelemetry: vi.fn().mockReturnValue({ track: vi.fn() }),
}));
@ -99,6 +111,7 @@ describe('useRunWorkflow({ router })', () => {
beforeEach(() => {
uiStore.activeActions = [];
vi.clearAllMocks();
});
describe('runWorkflowApi()', () => {
@ -168,8 +181,6 @@ describe('useRunWorkflow({ router })', () => {
it('should prevent execution and show error message when workflow is active with single webhook trigger', async () => {
const pinia = createTestingPinia({ stubActions: false });
setActivePinia(pinia);
const router = useRouter();
const workflowsStore = useWorkflowsStore();
const toast = useToast();
const i18n = useI18n();
const { runWorkflow } = useRunWorkflow({ router });
@ -250,6 +261,66 @@ describe('useRunWorkflow({ router })', () => {
const result = await runWorkflow({});
expect(result).toEqual(mockExecutionResponse);
});
it('does not use the original run data if `PartialExecution.version` is set to 0', async () => {
// ARRANGE
const mockExecutionResponse = { executionId: '123' };
const mockRunData = { nodeName: [] };
const { runWorkflow } = useRunWorkflow({ router });
vi.mocked(useLocalStorage).mockReturnValueOnce(ref(0));
vi.mocked(rootStore).pushConnectionActive = true;
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).nodesIssuesExist = false;
vi.mocked(workflowHelpers).getCurrentWorkflow.mockReturnValue({
name: 'Test Workflow',
} as Workflow);
vi.mocked(workflowHelpers).getWorkflowDataToSave.mockResolvedValue({
id: 'workflowId',
nodes: [],
} as unknown as IWorkflowData);
vi.mocked(workflowsStore).getWorkflowRunData = mockRunData;
// ACT
const result = await runWorkflow({});
// ASSERT
expect(result).toEqual(mockExecutionResponse);
expect(workflowsStore.setWorkflowExecutionData).toHaveBeenCalledTimes(1);
expect(vi.mocked(workflowsStore.setWorkflowExecutionData).mock.calls[0][0]).toMatchObject({
data: { resultData: { runData: {} } },
});
});
it('retains the original run data if `PartialExecution.version` is set to 1', async () => {
// ARRANGE
const mockExecutionResponse = { executionId: '123' };
const mockRunData = { nodeName: [] };
const { runWorkflow } = useRunWorkflow({ router });
vi.mocked(useLocalStorage).mockReturnValueOnce(ref(1));
vi.mocked(rootStore).pushConnectionActive = true;
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).nodesIssuesExist = false;
vi.mocked(workflowHelpers).getCurrentWorkflow.mockReturnValue({
name: 'Test Workflow',
} as Workflow);
vi.mocked(workflowHelpers).getWorkflowDataToSave.mockResolvedValue({
id: 'workflowId',
nodes: [],
} as unknown as IWorkflowData);
vi.mocked(workflowsStore).getWorkflowRunData = mockRunData;
// ACT
const result = await runWorkflow({});
// ASSERT
expect(result).toEqual(mockExecutionResponse);
expect(workflowsStore.setWorkflowExecutionData).toHaveBeenCalledTimes(1);
expect(vi.mocked(workflowsStore.setWorkflowExecutionData).mock.calls[0][0]).toMatchObject({
data: { resultData: { runData: mockRunData } },
});
});
});
describe('consolidateRunDataAndStartNodes()', () => {

View file

@ -259,7 +259,7 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
executedNode,
data: {
resultData: {
runData: newRunData ?? {},
runData: startRunData.runData ?? {},
pinData: workflowData.pinData,
workflowData,
},

View file

@ -2244,7 +2244,7 @@ export interface IWorkflowExecuteHooks {
>;
nodeExecuteBefore?: Array<(nodeName: string) => Promise<void>>;
workflowExecuteAfter?: Array<(data: IRun, newStaticData: IDataObject) => Promise<void>>;
workflowExecuteBefore?: Array<(workflow: Workflow, data: IRunExecutionData) => Promise<void>>;
workflowExecuteBefore?: Array<(workflow?: Workflow, data?: IRunExecutionData) => Promise<void>>;
sendResponse?: Array<(response: IExecuteResponsePromiseData) => Promise<void>>;
}