diff --git a/packages/@n8n/api-types/src/push/execution.ts b/packages/@n8n/api-types/src/push/execution.ts index 320b3dc264..b87bb67d0f 100644 --- a/packages/@n8n/api-types/src/push/execution.ts +++ b/packages/@n8n/api-types/src/push/execution.ts @@ -52,6 +52,17 @@ type NodeExecuteAfter = { executionId: string; nodeName: string; data: ITaskData; + + /** + * When a worker relays updates about a manual execution to main, if the + * payload size is above a limit, we send only a placeholder to the client. + * Later we fetch the entire execution data and fill in any placeholders. + * + * When sending a placheolder, we also send the number of output items, so + * the client knows ahead of time how many items are there, to prevent the + * items count from jumping up when the execution finishes. + */ + itemCount?: number; }; }; diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index e795df5722..4c2d8ac032 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -169,8 +169,12 @@ export class Push extends TypedEmitter { this.logger.warn(`Size of "${type}" (${eventMb} MB) exceeds max size ${maxMb} MB. Trimming...`); - if (type === 'nodeExecuteAfter') pushMsgCopy.data.data.data = TRIMMED_TASK_DATA_CONNECTIONS; - else if (type === 'executionFinished') pushMsgCopy.data.rawData = ''; // prompt client to fetch from DB + if (type === 'nodeExecuteAfter') { + pushMsgCopy.data.itemCount = pushMsgCopy.data.data.data?.main[0]?.length ?? 1; + pushMsgCopy.data.data.data = TRIMMED_TASK_DATA_CONNECTIONS; + } else if (type === 'executionFinished') { + pushMsgCopy.data.rawData = ''; // prompt client to fetch from DB + } void this.publisher.publishCommand({ command: 'relay-execution-lifecycle-event', diff --git a/packages/editor-ui/src/components/RunData.vue b/packages/editor-ui/src/components/RunData.vue index 9634e1e04f..478008b21d 100644 --- a/packages/editor-ui/src/components/RunData.vue +++ b/packages/editor-ui/src/components/RunData.vue @@ -81,6 +81,7 @@ import { import { storeToRefs } from 'pinia'; import { useRoute } from 'vue-router'; import { useExecutionHelpers } from '@/composables/useExecutionHelpers'; +import { useUIStore } from '@/stores/ui.store'; const LazyRunDataTable = defineAsyncComponent( async () => await import('@/components/RunDataTable.vue'), @@ -180,6 +181,7 @@ const ndvStore = useNDVStore(); const workflowsStore = useWorkflowsStore(); const sourceControlStore = useSourceControlStore(); const rootStore = useRootStore(); +const uiStore = useUIStore(); const toast = useToast(); const route = useRoute(); @@ -1611,6 +1613,16 @@ defineExpose({ enterEditMode }); +
+
+ + {{ i18n.baseText('runData.trimmedData.loading') }} + +
+
{{ i18n.baseText('runData.trimmedData.title') }} diff --git a/packages/editor-ui/src/composables/usePushConnection.ts b/packages/editor-ui/src/composables/usePushConnection.ts index 1c4d0c4b48..3bf9cd4cc1 100644 --- a/packages/editor-ui/src/composables/usePushConnection.ts +++ b/packages/editor-ui/src/composables/usePushConnection.ts @@ -36,7 +36,7 @@ import type { PushMessageQueueItem } from '@/types'; import { useAssistantStore } from '@/stores/assistant.store'; import NodeExecutionErrorMessage from '@/components/NodeExecutionErrorMessage.vue'; import type { IExecutionResponse } from '@/Interface'; -import { clearPopupWindowState } from '../utils/executionUtils'; +import { clearPopupWindowState, hasTrimmedData, hasTrimmedItem } from '../utils/executionUtils'; import { usePostHog } from '@/stores/posthog.store'; import { getEasyAiWorkflowJson } from '@/utils/easyAiWorkflowUtils'; @@ -237,18 +237,51 @@ export function usePushConnection({ router }: { router: ReturnType; if (receivedData.type === 'executionFinished' && receivedData.data.rawData) { const { workflowId, status, rawData } = receivedData.data; executionData = { workflowId, data: parse(rawData), status }; } else { - const execution = await workflowsStore.fetchExecutionDataById(executionId); - if (!execution?.data) return false; - executionData = { - workflowId: execution.workflowId, - data: parse(execution.data as unknown as string), - status: execution.status, - }; + uiStore.setProcessingExecutionResults(true); + + /** + * On successful completion without data, we show a success toast + * immediately, even though we still need to fetch and deserialize the + * full execution data, to minimize perceived latency. + */ + if (receivedData.type === 'executionFinished' && receivedData.data.status === 'success') { + workflowHelpers.setDocumentTitle( + workflowsStore.getWorkflowById(receivedData.data.workflowId)?.name, + 'IDLE', + ); + uiStore.removeActiveAction('workflowRunning'); + toast.showMessage({ + title: i18n.baseText('pushConnection.workflowExecutedSuccessfully'), + type: 'success', + }); + showedSuccessToast = true; + } + + let execution: IExecutionResponse | null; + + try { + execution = await workflowsStore.fetchExecutionDataById(executionId); + if (!execution?.data) { + uiStore.setProcessingExecutionResults(false); + return false; + } + + executionData = { + workflowId: execution.workflowId, + data: parse(execution.data as unknown as string), + status: execution.status, + }; + } catch { + uiStore.setProcessingExecutionResults(false); + return false; + } } const iRunExecutionData: IRunExecutionData = { @@ -261,11 +294,14 @@ export function usePushConnection({ router }: { router: ReturnType { const bannersHeight = ref(0); const bannerStack = ref([]); const pendingNotificationsForViews = ref<{ [key in VIEWS]?: NotificationOptions[] }>({}); + const processingExecutionResults = ref(false); const appGridWidth = ref(0); @@ -329,6 +330,12 @@ export const useUIStore = defineStore(STORES.UI, () => { return modalStack.value.length > 0; }); + /** + * Whether we are currently in the process of fetching and deserializing + * the full execution data and loading it to the store. + */ + const isProcessingExecutionResults = computed(() => processingExecutionResults.value); + // Methods const setTheme = (newTheme: ThemeOption): void => { @@ -566,6 +573,14 @@ export const useUIStore = defineStore(STORES.UI, () => { lastCancelledConnectionPosition.value = undefined; } + /** + * Set whether we are currently in the process of fetching and deserializing + * the full execution data and loading it to the store. + */ + const setProcessingExecutionResults = (value: boolean) => { + processingExecutionResults.value = value; + }; + return { appGridWidth, appliedTheme, @@ -604,6 +619,7 @@ export const useUIStore = defineStore(STORES.UI, () => { isAnyModalOpen, pendingNotificationsForViews, activeModals, + isProcessingExecutionResults, setTheme, setMode, setActiveId, @@ -638,6 +654,7 @@ export const useUIStore = defineStore(STORES.UI, () => { setNotificationsForView, deleteNotificationsForView, resetLastInteractedWith, + setProcessingExecutionResults, }; }); diff --git a/packages/editor-ui/src/utils/executionUtils.ts b/packages/editor-ui/src/utils/executionUtils.ts index a6fc615aed..9a744c16e7 100644 --- a/packages/editor-ui/src/utils/executionUtils.ts +++ b/packages/editor-ui/src/utils/executionUtils.ts @@ -1,10 +1,11 @@ -import { - SEND_AND_WAIT_OPERATION, - type ExecutionStatus, - type IDataObject, - type INode, - type IPinData, - type IRunData, +import { SEND_AND_WAIT_OPERATION, TRIMMED_TASK_DATA_CONNECTIONS_KEY } from 'n8n-workflow'; +import type { + ITaskData, + ExecutionStatus, + IDataObject, + INode, + IPinData, + IRunData, } from 'n8n-workflow'; import type { ExecutionFilterType, ExecutionsQueryFilter, INodeUi } from '@/Interface'; import { isEmpty } from '@/utils/typesUtils'; @@ -180,3 +181,25 @@ export const waitingNodeTooltip = (node: INodeUi | null | undefined) => { return ''; }; + +/** + * Check whether task data contains a trimmed item. + * + * In manual executions in scaling mode, the payload in push messages may be + * arbitrarily large. To protect Redis as it relays run data from workers to + * main process, we set a limit on payload size. If the payload is oversize, + * we replace it with a placeholder, which is later overridden on execution + * finish, when the client receives the full data. + */ +export function hasTrimmedItem(taskData: ITaskData[]) { + return taskData[0]?.data?.main[0]?.[0].json?.[TRIMMED_TASK_DATA_CONNECTIONS_KEY] ?? false; +} + +/** + * Check whether run data contains any trimmed items. + * + * See {@link hasTrimmedItem} for more details. + */ +export function hasTrimmedData(runData: IRunData) { + return Object.keys(runData).some((nodeName) => hasTrimmedItem(runData[nodeName])); +}