feat(core): Improve handling of manual executions with wait nodes (#11750)

Co-authored-by: Michael Kret <michael.k@radency.com>
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-11-18 12:58:26 +01:00 committed by GitHub
parent d5ba1a059b
commit 61696c3db3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 325 additions and 402 deletions

View file

@ -12,6 +12,13 @@ type ExecutionStarted = {
};
};
type ExecutionWaiting = {
type: 'executionWaiting';
data: {
executionId: string;
};
};
type ExecutionFinished = {
type: 'executionFinished';
data: {
@ -45,6 +52,7 @@ type NodeExecuteAfter = {
export type ExecutionPushMessage =
| ExecutionStarted
| ExecutionWaiting
| ExecutionFinished
| ExecutionRecovered
| NodeExecuteBefore

View file

@ -1,5 +1,4 @@
import {
deepCopy,
ErrorReporterProxy,
type IRunExecutionData,
type ITaskData,
@ -87,37 +86,6 @@ test('should update execution when saving progress is enabled', async () => {
expect(reporterSpy).not.toHaveBeenCalled();
});
test('should update execution when saving progress is disabled, but waitTill is defined', async () => {
jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({
...commonSettings,
progress: false,
});
const reporterSpy = jest.spyOn(ErrorReporterProxy, 'error');
executionRepository.findSingleExecution.mockResolvedValue({} as IExecutionResponse);
const args = deepCopy(commonArgs);
args[4].waitTill = new Date();
await saveExecutionProgress(...args);
expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith('some-execution-id', {
data: {
executionData: undefined,
resultData: {
lastNodeExecuted: 'My Node',
runData: {
'My Node': [{}],
},
},
startData: {},
},
status: 'running',
});
expect(reporterSpy).not.toHaveBeenCalled();
});
test('should report error on failure', async () => {
jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({
...commonSettings,

View file

@ -16,7 +16,7 @@ export async function saveExecutionProgress(
) {
const saveSettings = toSaveSettings(workflowData.settings);
if (!saveSettings.progress && !executionData.waitTill) return;
if (!saveSettings.progress) return;
const logger = Container.get(Logger);

View file

@ -18,20 +18,20 @@ export function toSaveSettings(workflowSettings: IWorkflowSettings = {}) {
PROGRESS: config.getEnv('executions.saveExecutionProgress'),
};
const {
saveDataErrorExecution = DEFAULTS.ERROR,
saveDataSuccessExecution = DEFAULTS.SUCCESS,
saveManualExecutions = DEFAULTS.MANUAL,
saveExecutionProgress = DEFAULTS.PROGRESS,
} = workflowSettings;
return {
error: workflowSettings.saveDataErrorExecution
? workflowSettings.saveDataErrorExecution !== 'none'
: DEFAULTS.ERROR !== 'none',
success: workflowSettings.saveDataSuccessExecution
? workflowSettings.saveDataSuccessExecution !== 'none'
: DEFAULTS.SUCCESS !== 'none',
manual:
workflowSettings === undefined || workflowSettings.saveManualExecutions === 'DEFAULT'
? DEFAULTS.MANUAL
: (workflowSettings.saveManualExecutions ?? DEFAULTS.MANUAL),
progress:
workflowSettings === undefined || workflowSettings.saveExecutionProgress === 'DEFAULT'
? DEFAULTS.PROGRESS
: (workflowSettings.saveExecutionProgress ?? DEFAULTS.PROGRESS),
error: saveDataErrorExecution === 'DEFAULT' ? DEFAULTS.ERROR : saveDataErrorExecution === 'all',
success:
saveDataSuccessExecution === 'DEFAULT'
? DEFAULTS.SUCCESS
: saveDataSuccessExecution === 'all',
manual: saveManualExecutions === 'DEFAULT' ? DEFAULTS.MANUAL : saveManualExecutions,
progress: saveExecutionProgress === 'DEFAULT' ? DEFAULTS.PROGRESS : saveExecutionProgress,
};
}

View file

@ -464,6 +464,11 @@ export async function executeWebhook(
projectId: project?.id,
};
// When resuming from a wait node, copy over the pushRef from the execution-data
if (!runData.pushRef) {
runData.pushRef = runExecutionData.pushRef;
}
let responsePromise: IDeferredPromise<IN8nHttpFullResponse> | undefined;
if (responseMode === 'responseNode') {
responsePromise = createDeferredPromise<IN8nHttpFullResponse>();

View file

@ -307,7 +307,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
},
],
workflowExecuteAfter: [
async function (this: WorkflowHooks): Promise<void> {
async function (this: WorkflowHooks, fullRunData: IRun): Promise<void> {
const { pushRef, executionId } = this;
if (pushRef === undefined) return;
@ -318,7 +318,9 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
workflowId,
});
pushInstance.send('executionFinished', { executionId }, pushRef);
const pushType =
fullRunData.status === 'waiting' ? 'executionWaiting' : 'executionFinished';
pushInstance.send(pushType, { executionId }, pushRef);
},
],
};
@ -430,22 +432,21 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
(executionStatus === 'success' && !saveSettings.success) ||
(executionStatus !== 'success' && !saveSettings.error);
if (shouldNotSave && !fullRunData.waitTill) {
if (!fullRunData.waitTill && !isManualMode) {
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
await Container.get(ExecutionRepository).hardDelete({
workflowId: this.workflowData.id,
executionId: this.executionId,
});
if (shouldNotSave && !fullRunData.waitTill && !isManualMode) {
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
return;
}
await Container.get(ExecutionRepository).hardDelete({
workflowId: this.workflowData.id,
executionId: this.executionId,
});
return;
}
// Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive
@ -1110,6 +1111,9 @@ export function getWorkflowHooksWorkerMain(
hookFunctions.nodeExecuteAfter = [];
hookFunctions.workflowExecuteAfter = [
async function (this: WorkflowHooks, fullRunData: IRun): Promise<void> {
// Don't delete executions before they are finished
if (!fullRunData.finished) return;
const executionStatus = determineFinalExecutionStatus(fullRunData);
const saveSettings = toSaveSettings(this.workflowData.settings);

View file

@ -740,14 +740,6 @@
}
return;
}).then(() => {
window.addEventListener('storage', function(event) {
if (event.key === 'n8n_redirect_to_next_form_test_page' && event.newValue) {
const newUrl = event.newValue;
localStorage.removeItem('n8n_redirect_to_next_form_test_page');
window.location.replace(newUrl);
}
});
})
.catch(function (error) {
console.error('Error:', error);

View file

@ -916,7 +916,6 @@ export class WorkflowExecute {
let nodeSuccessData: INodeExecutionData[][] | null | undefined;
let runIndex: number;
let startTime: number;
let taskData: ITaskData;
if (this.runExecutionData.startData === undefined) {
this.runExecutionData.startData = {};
@ -1446,13 +1445,13 @@ export class WorkflowExecute {
this.runExecutionData.resultData.runData[executionNode.name] = [];
}
taskData = {
const taskData: ITaskData = {
hints: executionHints,
startTime,
executionTime: new Date().getTime() - startTime,
source: !executionData.source ? [] : executionData.source.main,
metadata: executionData.metadata,
executionStatus: 'success',
executionStatus: this.runExecutionData.waitTill ? 'waiting' : 'success',
};
if (executionError !== undefined) {

View file

@ -212,7 +212,10 @@ const activeNodeType = computed(() => {
return nodeTypesStore.getNodeType(activeNode.value.type, activeNode.value.typeVersion);
});
const waitingMessage = computed(() => waitingNodeTooltip());
const waitingMessage = computed(() => {
const parentNode = parentNodes.value[0];
return parentNode && waitingNodeTooltip(workflowsStore.getNodeByName(parentNode.name));
});
watch(
inputMode,

View file

@ -65,7 +65,7 @@ const lastPopupCountUpdate = ref(0);
const codeGenerationInProgress = ref(false);
const router = useRouter();
const { runWorkflow, runWorkflowResolvePending, stopCurrentExecution } = useRunWorkflow({ router });
const { runWorkflow, stopCurrentExecution } = useRunWorkflow({ router });
const workflowsStore = useWorkflowsStore();
const externalHooks = useExternalHooks();
@ -353,17 +353,10 @@ async function onClick() {
telemetry.track('User clicked execute node button', telemetryPayload);
await externalHooks.run('nodeExecuteButton.onClick', telemetryPayload);
if (workflowsStore.isWaitingExecution) {
await runWorkflowResolvePending({
destinationNode: props.nodeName,
source: 'RunData.ExecuteNodeButton',
});
} else {
await runWorkflow({
destinationNode: props.nodeName,
source: 'RunData.ExecuteNodeButton',
});
}
await runWorkflow({
destinationNode: props.nodeName,
source: 'RunData.ExecuteNodeButton',
});
emit('execute');
}

View file

@ -352,7 +352,7 @@ const activatePane = () => {
<template #node-waiting>
<N8nText :bold="true" color="text-dark" size="large">Waiting for input</N8nText>
<N8nText v-n8n-html="waitingNodeTooltip()"></N8nText>
<N8nText v-n8n-html="waitingNodeTooltip(node)"></N8nText>
</template>
<template #no-output-data>

View file

@ -200,12 +200,13 @@ const displayMode = computed(() =>
);
const isReadOnlyRoute = computed(() => route.meta.readOnlyCanvas === true);
const isWaitNodeWaiting = computed(
() =>
workflowExecution.value?.status === 'waiting' &&
workflowExecution.value.data?.waitTill &&
workflowExecution.value?.data?.resultData?.lastNodeExecuted === node.value?.name,
);
const isWaitNodeWaiting = computed(() => {
return (
node.value?.name &&
workflowExecution.value?.data?.resultData?.runData?.[node.value?.name]?.[props.runIndex]
?.executionStatus === 'waiting'
);
});
const { activeNode } = storeToRefs(ndvStore);
const nodeType = computed(() => {
@ -1508,7 +1509,11 @@ defineExpose({ enterEditMode });
</div>
<div ref="dataContainerRef" :class="$style.dataContainer" data-test-id="ndv-data-container">
<div v-if="isExecuting" :class="$style.center" data-test-id="ndv-executing">
<div
v-if="isExecuting && !isWaitNodeWaiting"
:class="$style.center"
data-test-id="ndv-executing"
>
<div :class="$style.spinner"><N8nSpinner type="ring" /></div>
<N8nText>{{ executingMessage }}</N8nText>
</div>

View file

@ -446,13 +446,14 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
runDataExecutedStartData: iRunExecutionData.startData,
resultDataError: iRunExecutionData.resultData.error,
});
} else if (receivedData.type === 'executionWaiting') {
// Nothing to do
} else if (receivedData.type === 'executionStarted') {
// Nothing to do
} else if (receivedData.type === 'nodeExecuteAfter') {
// A node finished to execute. Add its data
const pushData = receivedData.data;
workflowsStore.addNodeExecutionData(pushData);
workflowsStore.removeExecutingNode(pushData.nodeName);
workflowsStore.updateNodeExecutionData(pushData);
void assistantStore.onNodeExecution(pushData);
} else if (receivedData.type === 'nodeExecuteBefore') {
// A node started to be executed. Set it as executing.

View file

@ -6,7 +6,7 @@ import { ExpressionError, type IPinData, type IRunData, type Workflow } from 'n8
import { useRootStore } from '@/stores/root.store';
import { useRunWorkflow } from '@/composables/useRunWorkflow';
import type { IExecutionResponse, IStartRunData, IWorkflowData } from '@/Interface';
import type { IStartRunData, IWorkflowData } from '@/Interface';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { useUIStore } from '@/stores/ui.store';
import { useWorkflowHelpers } from '@/composables/useWorkflowHelpers';
@ -321,77 +321,4 @@ describe('useRunWorkflow({ router })', () => {
expect(result.runData).toEqual(undefined);
});
});
describe('useRunWorkflow({ router }) - runWorkflowResolvePending', () => {
let uiStore: ReturnType<typeof useUIStore>;
let workflowsStore: ReturnType<typeof useWorkflowsStore>;
let router: ReturnType<typeof useRouter>;
beforeAll(() => {
const pinia = createTestingPinia({ stubActions: false });
setActivePinia(pinia);
rootStore = useRootStore();
uiStore = useUIStore();
workflowsStore = useWorkflowsStore();
router = useRouter();
workflowHelpers = useWorkflowHelpers({ router });
});
beforeEach(() => {
uiStore.activeActions = [];
vi.mocked(workflowsStore).runWorkflow.mockReset();
});
it('should resolve when runWorkflow finished', async () => {
const { runWorkflowResolvePending } = useRunWorkflow({ router });
const mockExecutionResponse = { executionId: '123' };
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).allNodes = [];
vi.mocked(workflowsStore).getExecution.mockResolvedValue({
finished: true,
workflowData: { nodes: [] },
} as unknown as IExecutionResponse);
vi.mocked(workflowsStore).workflowExecutionData = {
id: '123',
} as unknown as IExecutionResponse;
const result = await runWorkflowResolvePending({});
expect(result).toEqual(mockExecutionResponse);
});
it('should return when workflowExecutionData is null', async () => {
const { runWorkflowResolvePending } = useRunWorkflow({ router });
const mockExecutionResponse = { executionId: '123' };
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).allNodes = [];
vi.mocked(workflowsStore).getExecution.mockResolvedValue({
finished: true,
} as unknown as IExecutionResponse);
vi.mocked(workflowsStore).workflowExecutionData = null;
const result = await runWorkflowResolvePending({});
expect(result).toEqual(mockExecutionResponse);
});
it('should handle workflow execution error properly', async () => {
const { runWorkflowResolvePending } = useRunWorkflow({ router });
const mockExecutionResponse = { executionId: '123' };
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).allNodes = [];
vi.mocked(workflowsStore).getExecution.mockResolvedValue({
finished: false,
status: 'error',
} as unknown as IExecutionResponse);
await runWorkflowResolvePending({});
expect(workflowsStore.setWorkflowExecutionData).toHaveBeenCalled();
expect(workflowsStore.workflowExecutionData).toBe(null);
});
});
});

View file

@ -17,17 +17,17 @@ import type {
IDataObject,
} from 'n8n-workflow';
import { FORM_NODE_TYPE, NodeConnectionType } from 'n8n-workflow';
import { NodeConnectionType } from 'n8n-workflow';
import { useToast } from '@/composables/useToast';
import { useNodeHelpers } from '@/composables/useNodeHelpers';
import { CHAT_TRIGGER_NODE_TYPE, FORM_TRIGGER_NODE_TYPE, WAIT_NODE_TYPE } from '@/constants';
import { CHAT_TRIGGER_NODE_TYPE } from '@/constants';
import { useRootStore } from '@/stores/root.store';
import { useUIStore } from '@/stores/ui.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { displayForm, openPopUpWindow } from '@/utils/executionUtils';
import { displayForm } from '@/utils/executionUtils';
import { useExternalHooks } from '@/composables/useExternalHooks';
import { useWorkflowHelpers } from '@/composables/useWorkflowHelpers';
import type { useRouter } from 'vue-router';
@ -37,8 +37,6 @@ import { get } from 'lodash-es';
import { useExecutionsStore } from '@/stores/executions.store';
import { useLocalStorage } from '@vueuse/core';
const FORM_RELOAD = 'n8n_redirect_to_next_form_test_page';
export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof useRouter> }) {
const nodeHelpers = useNodeHelpers();
const workflowHelpers = useWorkflowHelpers({ router: useRunWorkflowOpts.router });
@ -303,152 +301,6 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
}
}
function getFormResumeUrl(node: INode, executionId: string) {
const { webhookSuffix } = (node.parameters.options ?? {}) as IDataObject;
const suffix = webhookSuffix && typeof webhookSuffix !== 'object' ? `/${webhookSuffix}` : '';
const testUrl = `${rootStore.formWaitingUrl}/${executionId}${suffix}`;
return testUrl;
}
async function runWorkflowResolvePending(options: {
destinationNode?: string;
triggerNode?: string;
nodeData?: ITaskData;
source?: string;
}): Promise<IExecutionPushResponse | undefined> {
let runWorkflowApiResponse = await runWorkflow(options);
let { executionId } = runWorkflowApiResponse || {};
const MAX_DELAY = 3000;
const waitForWebhook = async (): Promise<string> => {
return await new Promise<string>((resolve) => {
let delay = 300;
let timeoutId: NodeJS.Timeout | null = null;
const checkWebhook = async () => {
await useExternalHooks().run('workflowRun.runWorkflow', {
nodeName: options.destinationNode,
source: options.source,
});
if (workflowsStore.activeExecutionId) {
executionId = workflowsStore.activeExecutionId;
runWorkflowApiResponse = { executionId };
if (timeoutId) clearTimeout(timeoutId);
resolve(executionId);
}
delay = Math.min(delay * 1.1, MAX_DELAY);
timeoutId = setTimeout(checkWebhook, delay);
};
timeoutId = setTimeout(checkWebhook, delay);
});
};
if (!executionId) executionId = await waitForWebhook();
let isFormShown =
!options.destinationNode &&
workflowsStore.allNodes.some(
(node) =>
node.type === FORM_TRIGGER_NODE_TYPE && !workflowsStore?.pinnedWorkflowData?.[node.name],
);
const resolveWaitingNodesData = async (): Promise<void> => {
return await new Promise<void>((resolve) => {
let delay = 300;
let timeoutId: NodeJS.Timeout | null = null;
const processExecution = async () => {
await useExternalHooks().run('workflowRun.runWorkflow', {
nodeName: options.destinationNode,
source: options.source,
});
const execution = await workflowsStore.getExecution((executionId as string) || '');
localStorage.removeItem(FORM_RELOAD);
if (!execution || workflowsStore.workflowExecutionData === null) {
uiStore.removeActiveAction('workflowRunning');
if (timeoutId) clearTimeout(timeoutId);
resolve();
return;
}
const { lastNodeExecuted } = execution.data?.resultData || {};
const lastNode = execution.workflowData.nodes.find((node) => {
return node.name === lastNodeExecuted;
});
if (
execution.finished ||
['error', 'canceled', 'crashed', 'success'].includes(execution.status)
) {
workflowsStore.setWorkflowExecutionData(execution);
uiStore.removeActiveAction('workflowRunning');
workflowsStore.activeExecutionId = null;
if (timeoutId) clearTimeout(timeoutId);
resolve();
return;
}
if (execution.status === 'waiting' && execution.data?.waitTill) {
delete execution.data.resultData.runData[
execution.data.resultData.lastNodeExecuted as string
];
workflowsStore.setWorkflowExecutionRunData(execution.data);
if (
lastNode &&
(lastNode.type === FORM_NODE_TYPE ||
(lastNode.type === WAIT_NODE_TYPE && lastNode.parameters.resume === 'form'))
) {
let testUrl = getFormResumeUrl(lastNode, executionId as string);
if (isFormShown) {
localStorage.setItem(FORM_RELOAD, testUrl);
} else {
if (options.destinationNode) {
// Check if the form trigger has starting data
// if not do not show next form as trigger would redirect to page
// otherwise there would be duplicate popup
const formTrigger = execution?.workflowData.nodes.find((node) => {
return node.type === FORM_TRIGGER_NODE_TYPE;
});
const runNodeFilter = execution?.data?.startData?.runNodeFilter || [];
if (formTrigger && !runNodeFilter.includes(formTrigger.name)) {
isFormShown = true;
}
}
if (!isFormShown) {
if (lastNode.type === FORM_NODE_TYPE) {
testUrl = `${rootStore.formWaitingUrl}/${executionId}`;
} else {
testUrl = getFormResumeUrl(lastNode, executionId as string);
}
isFormShown = true;
if (testUrl) openPopUpWindow(testUrl);
}
}
}
}
delay = Math.min(delay * 1.1, MAX_DELAY);
timeoutId = setTimeout(processExecution, delay);
};
timeoutId = setTimeout(processExecution, delay);
});
};
await resolveWaitingNodesData();
return runWorkflowApiResponse;
}
function consolidateRunDataAndStartNodes(
directParentNodes: string[],
runData: IRunData | null,
@ -514,10 +366,6 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
if (execution === undefined) {
// execution finished but was not saved (e.g. due to low connectivity)
workflowsStore.executingNode.length = 0;
uiStore.removeActiveAction('workflowRunning');
workflowHelpers.setDocumentTitle(workflowsStore.workflowName, 'IDLE');
toast.showMessage({
title: i18n.baseText('nodeView.showMessage.stopExecutionCatch.unsaved.title'),
message: i18n.baseText('nodeView.showMessage.stopExecutionCatch.unsaved.message'),
@ -532,10 +380,7 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
startedAt: execution.startedAt,
stoppedAt: execution.stoppedAt,
} as IRun;
workflowHelpers.setDocumentTitle(execution.workflowData.name, 'IDLE');
workflowsStore.executingNode.length = 0;
workflowsStore.setWorkflowExecutionData(executedData as IExecutionResponse);
uiStore.removeActiveAction('workflowRunning');
toast.showMessage({
title: i18n.baseText('nodeView.showMessage.stopExecutionCatch.title'),
message: i18n.baseText('nodeView.showMessage.stopExecutionCatch.message'),
@ -544,6 +389,8 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
} else {
toast.showError(error, i18n.baseText('nodeView.showError.stopExecution.title'));
}
} finally {
workflowsStore.markExecutionAsStopped();
}
}
@ -559,7 +406,6 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
return {
consolidateRunDataAndStartNodes,
runWorkflow,
runWorkflowResolvePending,
runWorkflowApi,
stopCurrentExecution,
stopWaitingForWebhook,

View file

@ -19,6 +19,7 @@ import { useUIStore } from '@/stores/ui.store';
import type { PushPayload } from '@n8n/api-types';
import { flushPromises } from '@vue/test-utils';
import { useNDVStore } from '@/stores/ndv.store';
import { mock } from 'vitest-mock-extended';
vi.mock('@/stores/ndv.store', () => ({
useNDVStore: vi.fn(() => ({
@ -523,20 +524,24 @@ describe('useWorkflowsStore', () => {
});
});
describe('addNodeExecutionData', () => {
const { successEvent, errorEvent, executionReponse } = generateMockExecutionEvents();
it('should throw error if not initalized', () => {
expect(() => workflowsStore.addNodeExecutionData(successEvent)).toThrowError();
describe('updateNodeExecutionData', () => {
const { successEvent, errorEvent, executionResponse } = generateMockExecutionEvents();
it('should throw error if not initialized', () => {
expect(() => workflowsStore.updateNodeExecutionData(successEvent)).toThrowError();
});
it('should add node success run data', () => {
workflowsStore.setWorkflowExecutionData(executionReponse);
workflowsStore.setWorkflowExecutionData(executionResponse);
workflowsStore.nodesByName[successEvent.nodeName] = mock<INodeUi>({
type: 'n8n-nodes-base.manualTrigger',
});
// ACT
workflowsStore.addNodeExecutionData(successEvent);
workflowsStore.updateNodeExecutionData(successEvent);
expect(workflowsStore.workflowExecutionData).toEqual({
...executionReponse,
...executionResponse,
data: {
resultData: {
runData: {
@ -548,7 +553,7 @@ describe('useWorkflowsStore', () => {
});
it('should add node error event and track errored executions', async () => {
workflowsStore.setWorkflowExecutionData(executionReponse);
workflowsStore.setWorkflowExecutionData(executionResponse);
workflowsStore.addNode({
parameters: {},
id: '554c7ff4-7ee2-407c-8931-e34234c5056a',
@ -561,11 +566,11 @@ describe('useWorkflowsStore', () => {
getNodeType.mockReturnValue(getMockEditFieldsNode());
// ACT
workflowsStore.addNodeExecutionData(errorEvent);
workflowsStore.updateNodeExecutionData(errorEvent);
await flushPromises();
expect(workflowsStore.workflowExecutionData).toEqual({
...executionReponse,
...executionResponse,
data: {
resultData: {
runData: {
@ -636,7 +641,7 @@ function getMockEditFieldsNode() {
}
function generateMockExecutionEvents() {
const executionReponse: IExecutionResponse = {
const executionResponse: IExecutionResponse = {
id: '1',
workflowData: {
id: '1',
@ -737,5 +742,5 @@ function generateMockExecutionEvents() {
},
};
return { executionReponse, errorEvent, successEvent };
return { executionResponse, errorEvent, successEvent };
}

View file

@ -83,6 +83,7 @@ import { TelemetryHelpers } from 'n8n-workflow';
import { useWorkflowHelpers } from '@/composables/useWorkflowHelpers';
import { useRouter } from 'vue-router';
import { useSettingsStore } from './settings.store';
import { openPopUpWindow } from '@/utils/executionUtils';
const defaults: Omit<IWorkflowDb, 'id'> & { settings: NonNullable<IWorkflowDb['settings']> } = {
name: '',
@ -114,6 +115,8 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
const router = useRouter();
const workflowHelpers = useWorkflowHelpers({ router });
const settingsStore = useSettingsStore();
const rootStore = useRootStore();
// -1 means the backend chooses the default
// 0 is the old flow
// 1 is the new flow
@ -137,6 +140,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
const chatMessages = ref<string[]>([]);
const isChatPanelOpen = ref(false);
const isLogsPanelOpen = ref(false);
const formPopupWindow = ref<Window | null>(null);
const workflowName = computed(() => workflow.value.name);
@ -453,14 +457,12 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
async function getWorkflowFromUrl(url: string): Promise<IWorkflowDb> {
const rootStore = useRootStore();
return await makeRestApiRequest(rootStore.restApiContext, 'GET', '/workflows/from-url', {
url,
});
}
async function getActivationError(id: string): Promise<string | undefined> {
const rootStore = useRootStore();
return await makeRestApiRequest(
rootStore.restApiContext,
'GET',
@ -469,8 +471,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
async function fetchAllWorkflows(projectId?: string): Promise<IWorkflowDb[]> {
const rootStore = useRootStore();
const filter = {
projectId,
};
@ -484,7 +484,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
async function fetchWorkflow(id: string): Promise<IWorkflowDb> {
const rootStore = useRootStore();
const workflowData = await workflowsApi.getWorkflow(rootStore.restApiContext, id);
addWorkflow(workflowData);
return workflowData;
@ -497,8 +496,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
settings: { ...defaults.settings },
};
try {
const rootStore = useRootStore();
const data: IDataObject = {
name,
projectId,
@ -632,7 +629,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
async function deleteWorkflow(id: string) {
const rootStore = useRootStore();
await makeRestApiRequest(rootStore.restApiContext, 'DELETE', `/workflows/${id}`);
const { [id]: deletedWorkflow, ...workflows } = workflowsById.value;
workflowsById.value = workflows;
@ -676,7 +672,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
async function fetchActiveWorkflows(): Promise<string[]> {
const rootStore = useRootStore();
const data = await workflowsApi.getActiveWorkflows(rootStore.restApiContext);
activeWorkflows.value = data;
return data;
@ -696,7 +691,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
let newName = `${currentWorkflowName}${DUPLICATE_POSTFFIX}`;
try {
const rootStore = useRootStore();
const newWorkflow = await workflowsApi.getNewWorkflow(rootStore.restApiContext, {
name: newName,
});
@ -1276,12 +1270,24 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
}
function addNodeExecutionData(pushData: PushPayload<'nodeExecuteAfter'>): void {
function getFormResumeUrl(node: INode, executionId: string) {
const { webhookSuffix } = (node.parameters.options ?? {}) as IDataObject;
const suffix = webhookSuffix && typeof webhookSuffix !== 'object' ? `/${webhookSuffix}` : '';
const testUrl = `${rootStore.formWaitingUrl}/${executionId}${suffix}`;
return testUrl;
}
function updateNodeExecutionData(pushData: PushPayload<'nodeExecuteAfter'>): void {
if (!workflowExecutionData.value?.data) {
throw new Error('The "workflowExecutionData" is not initialized!');
}
if (workflowExecutionData.value.data.resultData.runData[pushData.nodeName] === undefined) {
const { nodeName, data, executionId } = pushData;
const isNodeWaiting = data.executionStatus === 'waiting';
const node = getNodeByName(nodeName);
if (!node) return;
if (workflowExecutionData.value.data.resultData.runData[nodeName] === undefined) {
workflowExecutionData.value = {
...workflowExecutionData.value,
data: {
@ -1290,15 +1296,38 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
...workflowExecutionData.value.data.resultData,
runData: {
...workflowExecutionData.value.data.resultData.runData,
[pushData.nodeName]: [],
[nodeName]: [],
},
},
},
};
}
workflowExecutionData.value.data!.resultData.runData[pushData.nodeName].push(pushData.data);
void trackNodeExecution(pushData);
const tasksData = workflowExecutionData.value.data!.resultData.runData[nodeName];
if (isNodeWaiting) {
tasksData.push(data);
if (
node.type === FORM_NODE_TYPE ||
(node.type === WAIT_NODE_TYPE && node.parameters.resume === 'form')
) {
const testUrl = getFormResumeUrl(node, executionId);
if (!formPopupWindow.value || formPopupWindow.value.closed) {
formPopupWindow.value = openPopUpWindow(testUrl);
} else {
formPopupWindow.value.location = testUrl;
formPopupWindow.value.focus();
}
}
} else {
if (tasksData.length && tasksData[tasksData.length - 1].executionStatus === 'waiting') {
tasksData.splice(tasksData.length - 1, 1, data);
} else {
tasksData.push(data);
}
removeExecutingNode(nodeName);
void trackNodeExecution(pushData);
}
}
function clearNodeExecutionData(nodeName: string): void {
@ -1348,12 +1377,10 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
limit,
};
}
const rootStore = useRootStore();
return await makeRestApiRequest(rootStore.restApiContext, 'GET', '/executions', sendData);
}
async function getExecution(id: string): Promise<IExecutionResponse | undefined> {
const rootStore = useRootStore();
const response = await makeRestApiRequest<IExecutionFlattedResponse | undefined>(
rootStore.restApiContext,
'GET',
@ -1367,7 +1394,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
// make sure that the new ones are not active
sendData.active = false;
const rootStore = useRootStore();
const projectStore = useProjectsStore();
if (projectStore.currentProjectId) {
@ -1387,8 +1413,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
data: IWorkflowDataUpdate,
forceSave = false,
): Promise<IWorkflowDb> {
const rootStore = useRootStore();
if (data.settings === null) {
data.settings = undefined;
}
@ -1402,8 +1426,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
async function runWorkflow(startRunData: IStartRunData): Promise<IExecutionPushResponse> {
const rootStore = useRootStore();
if (startRunData.workflowData.settings === null) {
startRunData.workflowData.settings = undefined;
}
@ -1427,7 +1449,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
async function removeTestWebhook(targetWorkflowId: string): Promise<boolean> {
const rootStore = useRootStore();
return await makeRestApiRequest(
rootStore.restApiContext,
'DELETE',
@ -1436,7 +1457,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
async function fetchExecutionDataById(executionId: string): Promise<IExecutionResponse | null> {
const rootStore = useRootStore();
return await workflowsApi.getExecutionData(rootStore.restApiContext, executionId);
}
@ -1459,7 +1479,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
fileName: string,
mimeType: string,
): string {
const rootStore = useRootStore();
let restUrl = rootStore.restUrl;
if (restUrl.startsWith('/')) restUrl = window.location.origin + restUrl;
const url = new URL(`${restUrl}/binary-data`);
@ -1538,6 +1557,24 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
isLogsPanelOpen.value = isOpen;
}
function markExecutionAsStopped() {
activeExecutionId.value = null;
executingNode.value.length = 0;
executionWaitingForWebhook.value = false;
uiStore.removeActiveAction('workflowRunning');
workflowHelpers.setDocumentTitle(workflowName.value, 'IDLE');
formPopupWindow.value?.close();
formPopupWindow.value = null;
const runData = workflowExecutionData.value?.data?.resultData.runData ?? {};
for (const nodeName in runData) {
runData[nodeName] = runData[nodeName].filter(
({ executionStatus }) => executionStatus === 'success',
);
}
}
return {
workflow,
usedCredentials,
@ -1651,7 +1688,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
setNodeValue,
setNodeParameters,
setLastNodeParameters,
addNodeExecutionData,
updateNodeExecutionData,
clearNodeExecutionData,
pinDataByNodeName,
activeNode,
@ -1675,5 +1712,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
removeNodeExecutionDataById,
setNodes,
setConnections,
markExecutionAsStopped,
};
});

View file

@ -1,6 +1,12 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { displayForm, openPopUpWindow, executionFilterToQueryFilter } from './executionUtils';
import {
displayForm,
openPopUpWindow,
executionFilterToQueryFilter,
waitingNodeTooltip,
} from './executionUtils';
import type { INode, IRunData, IPinData } from 'n8n-workflow';
import { type INodeUi } from '../Interface';
const FORM_TRIGGER_NODE_TYPE = 'formTrigger';
const WAIT_NODE_TYPE = 'waitNode';
@ -13,6 +19,33 @@ vi.mock('./executionUtils', async () => {
};
});
vi.mock('@/stores/root.store', () => ({
useRootStore: () => ({
formWaitingUrl: 'http://localhost:5678/form-waiting',
webhookWaitingUrl: 'http://localhost:5678/webhook-waiting',
}),
}));
vi.mock('@/stores/workflows.store', () => ({
useWorkflowsStore: () => ({
activeExecutionId: '123',
}),
}));
vi.mock('@/plugins/i18n', () => ({
i18n: {
baseText: (key: string) => {
const texts: { [key: string]: string } = {
'ndv.output.waitNodeWaiting': 'Waiting for execution to resume...',
'ndv.output.waitNodeWaitingForFormSubmission': 'Waiting for form submission: ',
'ndv.output.waitNodeWaitingForWebhook': 'Waiting for webhook call: ',
'ndv.output.sendAndWaitWaitingApproval': 'Waiting for approval...',
};
return texts[key] || key;
},
},
}));
describe('displayForm', () => {
const getTestUrlMock = vi.fn();
@ -124,3 +157,116 @@ describe('displayForm', () => {
});
});
});
describe('waitingNodeTooltip', () => {
it('should return empty string for null or undefined node', () => {
expect(waitingNodeTooltip(null)).toBe('');
expect(waitingNodeTooltip(undefined)).toBe('');
});
it('should return default waiting message for time resume types', () => {
const node: INodeUi = {
id: '1',
name: 'Wait',
type: 'n8n-nodes-base.wait',
typeVersion: 1,
position: [0, 0],
parameters: {
resume: 'timeInterval',
},
};
expect(waitingNodeTooltip(node)).toBe('Waiting for execution to resume...');
});
it('should return form submission message with URL for form resume type', () => {
const node: INodeUi = {
id: '1',
name: 'Wait',
type: 'n8n-nodes-base.wait',
typeVersion: 1,
position: [0, 0],
parameters: {
resume: 'form',
},
};
const expectedUrl = 'http://localhost:5678/form-waiting/123';
expect(waitingNodeTooltip(node)).toBe(
`Waiting for form submission: <a href="${expectedUrl}" target="_blank">${expectedUrl}</a>`,
);
});
it('should include webhook suffix in URL when provided', () => {
const node: INodeUi = {
id: '1',
name: 'Wait',
type: 'n8n-nodes-base.wait',
typeVersion: 1,
position: [0, 0],
parameters: {
resume: 'webhook',
options: {
webhookSuffix: 'test-suffix',
},
},
};
const expectedUrl = 'http://localhost:5678/webhook-waiting/123/test-suffix';
expect(waitingNodeTooltip(node)).toBe(
`Waiting for webhook call: <a href="${expectedUrl}" target="_blank">${expectedUrl}</a>`,
);
});
it('should handle form node type', () => {
const node: INodeUi = {
id: '1',
name: 'Form',
type: 'n8n-nodes-base.form',
typeVersion: 1,
position: [0, 0],
parameters: {},
};
const expectedUrl = 'http://localhost:5678/form-waiting/123';
expect(waitingNodeTooltip(node)).toBe(
`Waiting for form submission: <a href="${expectedUrl}" target="_blank">${expectedUrl}</a>`,
);
});
it('should handle send and wait operation', () => {
const node: INodeUi = {
id: '1',
name: 'SendWait',
type: 'n8n-nodes-base.sendWait',
typeVersion: 1,
position: [0, 0],
parameters: {
operation: 'sendAndWait',
},
};
expect(waitingNodeTooltip(node)).toBe('Waiting for approval...');
});
it('should ignore object-type webhook suffix', () => {
const node: INodeUi = {
id: '1',
name: 'Wait',
type: 'n8n-nodes-base.wait',
typeVersion: 1,
position: [0, 0],
parameters: {
resume: 'webhook',
options: {
webhookSuffix: { some: 'object' },
},
},
};
const expectedUrl = 'http://localhost:5678/webhook-waiting/123';
expect(waitingNodeTooltip(node)).toBe(
`Waiting for webhook call: <a href="${expectedUrl}" target="_blank">${expectedUrl}</a>`,
);
});
});

View file

@ -6,7 +6,7 @@ import {
type IPinData,
type IRunData,
} from 'n8n-workflow';
import type { ExecutionFilterType, ExecutionsQueryFilter } from '@/Interface';
import type { ExecutionFilterType, ExecutionsQueryFilter, INodeUi } from '@/Interface';
import { isEmpty } from '@/utils/typesUtils';
import { FORM_NODE_TYPE, FORM_TRIGGER_NODE_TYPE } from '../constants';
import { useWorkflowsStore } from '@/stores/workflows.store';
@ -136,18 +136,17 @@ export function displayForm({
}
}
export const waitingNodeTooltip = () => {
export const waitingNodeTooltip = (node: INodeUi | null | undefined) => {
if (!node) return '';
try {
const lastNode =
useWorkflowsStore().workflowExecutionData?.data?.executionData?.nodeExecutionStack[0]?.node;
const resume = lastNode?.parameters?.resume;
const resume = node?.parameters?.resume;
if (resume) {
if (!['webhook', 'form'].includes(resume as string)) {
return i18n.baseText('ndv.output.waitNodeWaiting');
}
const { webhookSuffix } = (lastNode.parameters.options ?? {}) as { webhookSuffix: string };
const { webhookSuffix } = (node.parameters.options ?? {}) as { webhookSuffix: string };
const suffix = webhookSuffix && typeof webhookSuffix !== 'object' ? `/${webhookSuffix}` : '';
let message = '';
@ -168,13 +167,13 @@ export const waitingNodeTooltip = () => {
}
}
if (lastNode?.type === FORM_NODE_TYPE) {
if (node?.type === FORM_NODE_TYPE) {
const message = i18n.baseText('ndv.output.waitNodeWaitingForFormSubmission');
const resumeUrl = `${useRootStore().formWaitingUrl}/${useWorkflowsStore().activeExecutionId}`;
return `${message}<a href="${resumeUrl}" target="_blank">${resumeUrl}</a>`;
}
if (lastNode?.parameters.operation === SEND_AND_WAIT_OPERATION) {
if (node?.parameters.operation === SEND_AND_WAIT_OPERATION) {
return i18n.baseText('ndv.output.sendAndWaitWaitingApproval');
}
} catch (error) {

View file

@ -154,8 +154,7 @@ const { addBeforeUnloadEventBindings, removeBeforeUnloadEventBindings } = useBef
route,
});
const { registerCustomAction, unregisterCustomAction } = useGlobalLinkActions();
const { runWorkflow, runWorkflowResolvePending, stopCurrentExecution, stopWaitingForWebhook } =
useRunWorkflow({ router });
const { runWorkflow, stopCurrentExecution, stopWaitingForWebhook } = useRunWorkflow({ router });
const {
updateNodePosition,
updateNodesPosition,
@ -1011,11 +1010,7 @@ const workflowExecutionData = computed(() => workflowsStore.workflowExecutionDat
async function onRunWorkflow() {
trackRunWorkflow();
if (!isExecutionPreview.value && workflowsStore.isWaitingExecution) {
void runWorkflowResolvePending({});
} else {
void runWorkflow({});
}
void runWorkflow({});
}
function trackRunWorkflow() {
@ -1041,11 +1036,7 @@ async function onRunWorkflowToNode(id: string) {
trackRunWorkflowToNode(node);
if (!isExecutionPreview.value && workflowsStore.isWaitingExecution) {
void runWorkflowResolvePending({ destinationNode: node.name, source: 'Node.executeNode' });
} else {
void runWorkflow({ destinationNode: node.name, source: 'Node.executeNode' });
}
void runWorkflow({ destinationNode: node.name, source: 'Node.executeNode' });
}
function trackRunWorkflowToNode(node: INodeUi) {

View file

@ -232,7 +232,7 @@ export default defineComponent({
const { callDebounced } = useDebounce();
const canvasPanning = useCanvasPanning(nodeViewRootRef, { onMouseMoveEnd });
const workflowHelpers = useWorkflowHelpers({ router });
const { runWorkflow, stopCurrentExecution, runWorkflowResolvePending } = useRunWorkflow({
const { runWorkflow, stopCurrentExecution } = useRunWorkflow({
router,
});
const { addBeforeUnloadEventBindings, removeBeforeUnloadEventBindings } = useBeforeUnload({
@ -254,7 +254,6 @@ export default defineComponent({
onMouseMoveEnd,
workflowHelpers,
runWorkflow,
runWorkflowResolvePending,
stopCurrentExecution,
callDebounced,
...useCanvasMouseSelect(),
@ -852,11 +851,7 @@ export default defineComponent({
this.$telemetry.track('User clicked execute node button', telemetryPayload);
void this.externalHooks.run('nodeView.onRunNode', telemetryPayload);
if (!this.isExecutionPreview && this.workflowsStore.isWaitingExecution) {
void this.runWorkflowResolvePending({ destinationNode: nodeName, source });
} else {
void this.runWorkflow({ destinationNode: nodeName, source });
}
void this.runWorkflow({ destinationNode: nodeName, source });
},
async onOpenChat() {
const telemetryPayload = {
@ -883,11 +878,7 @@ export default defineComponent({
void this.externalHooks.run('nodeView.onRunWorkflow', telemetryPayload);
});
if (!this.isExecutionPreview && this.workflowsStore.isWaitingExecution) {
void this.runWorkflowResolvePending({});
} else {
void this.runWorkflow({});
}
void this.runWorkflow({});
this.refreshEndpointsErrorsState();
},
@ -1758,6 +1749,8 @@ export default defineComponent({
} else {
this.showError(error, this.i18n.baseText('nodeView.showError.stopExecution.title'));
}
} finally {
this.workflowsStore.markExecutionAsStopped();
}
this.stopExecutionInProgress = false;
void this.workflowHelpers.getWorkflowDataToSave().then((workflowData) => {