refactor(core): Make placeholders in manual executions in workers temporary (#12463)

Iván Ovejero 2025-01-14 14:32:00 +01:00 committed by GitHub
parent ce22f065c2
commit c2569a0607
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 142 additions and 22 deletions

View file

@@ -52,6 +52,17 @@ type NodeExecuteAfter = {
executionId: string;
nodeName: string;
data: ITaskData;
/**
* When a worker relays updates about a manual execution to main, if the
* payload size is above a limit, we send only a placeholder to the client.
* Later we fetch the entire execution data and fill in any placeholders.
*
* When sending a placeholder, we also send the number of output items, so
* the client knows ahead of time how many items to expect and the item
* count does not jump up when the execution finishes.
*/
itemCount?: number;
};
};

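For illustration, a trimmed `nodeExecuteAfter` event carrying the new `itemCount` field might look roughly like the sketch below. Everything except `itemCount` and the placeholder-connections idea is simplified or made up for the example; the local constants stand in for `TRIMMED_TASK_DATA_CONNECTIONS_KEY` and `TRIMMED_TASK_DATA_CONNECTIONS` from n8n-workflow.

// Sketch only: simplified stand-ins for the real n8n types and constants.
const TRIMMED_KEY = 'trimmed'; // stand-in for TRIMMED_TASK_DATA_CONNECTIONS_KEY
const trimmedConnections = { main: [[{ json: { [TRIMMED_KEY]: true } }]] }; // stand-in for TRIMMED_TASK_DATA_CONNECTIONS

const nodeExecuteAfterEvent = {
  type: 'nodeExecuteAfter' as const,
  data: {
    executionId: '1042', // illustrative values
    nodeName: 'HTTP Request',
    data: { data: trimmedConnections }, // task data whose output was replaced by the placeholder
    itemCount: 250, // real number of output items, so the UI can show the count right away
  },
};
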
View file

@@ -169,8 +169,12 @@ export class Push extends TypedEmitter<PushEvents> {
this.logger.warn(`Size of "${type}" (${eventMb} MB) exceeds max size ${maxMb} MB. Trimming...`);
if (type === 'nodeExecuteAfter') pushMsgCopy.data.data.data = TRIMMED_TASK_DATA_CONNECTIONS;
else if (type === 'executionFinished') pushMsgCopy.data.rawData = ''; // prompt client to fetch from DB
if (type === 'nodeExecuteAfter') {
pushMsgCopy.data.itemCount = pushMsgCopy.data.data.data?.main?.[0]?.length ?? 1;
pushMsgCopy.data.data.data = TRIMMED_TASK_DATA_CONNECTIONS;
} else if (type === 'executionFinished') {
pushMsgCopy.data.rawData = ''; // prompt client to fetch from DB
}
void this.publisher.publishCommand({
command: 'relay-execution-lifecycle-event',

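A condensed, self-contained sketch of the trimming step above, pulled out of the Push class. The payload shape is flattened, the size threshold is an assumption made for the sketch, and the local placeholder stands in for TRIMMED_TASK_DATA_CONNECTIONS from n8n-workflow.

// Sketch: if a push payload is too large to relay via Redis, swap the node
// output for a placeholder while preserving the real item count.
type Connections = { main: Array<Array<{ json: Record<string, unknown> }> | null | undefined> };

const PLACEHOLDER: Connections = { main: [[{ json: { trimmed: true } }]] }; // local stand-in

function trimIfOversized(
  payload: { data?: Connections; itemCount?: number },
  maxMb = 5, // assumed threshold for this sketch; the diff only shows maxMb being compared
) {
  const sizeMb = Buffer.byteLength(JSON.stringify(payload), 'utf8') / (1024 * 1024);
  if (sizeMb <= maxMb) return payload;
  payload.itemCount = payload.data?.main?.[0]?.length ?? 1; // remember the real output size
  payload.data = PLACEHOLDER; // relay a tiny placeholder instead of the oversized output
  return payload;
}
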
View file

@@ -81,6 +81,7 @@ import {
import { storeToRefs } from 'pinia';
import { useRoute } from 'vue-router';
import { useExecutionHelpers } from '@/composables/useExecutionHelpers';
import { useUIStore } from '@/stores/ui.store';
const LazyRunDataTable = defineAsyncComponent(
async () => await import('@/components/RunDataTable.vue'),
@@ -180,6 +181,7 @@ const ndvStore = useNDVStore();
const workflowsStore = useWorkflowsStore();
const sourceControlStore = useSourceControlStore();
const rootStore = useRootStore();
const uiStore = useUIStore();
const toast = useToast();
const route = useRoute();
@@ -1611,6 +1613,16 @@ defineExpose({ enterEditMode });
</N8nText>
</div>
<div
v-else-if="isTrimmedManualExecutionDataItem && uiStore.isProcessingExecutionResults"
:class="$style.center"
>
<div :class="$style.spinner"><N8nSpinner type="ring" /></div>
<N8nText color="text-dark" size="large">
{{ i18n.baseText('runData.trimmedData.loading') }}
</N8nText>
</div>
<div v-else-if="isTrimmedManualExecutionDataItem" :class="$style.center">
<N8nText bold color="text-dark" size="large">
{{ i18n.baseText('runData.trimmedData.title') }}

View file

@@ -36,7 +36,7 @@ import type { PushMessageQueueItem } from '@/types';
import { useAssistantStore } from '@/stores/assistant.store';
import NodeExecutionErrorMessage from '@/components/NodeExecutionErrorMessage.vue';
import type { IExecutionResponse } from '@/Interface';
import { clearPopupWindowState } from '../utils/executionUtils';
import { clearPopupWindowState, hasTrimmedData, hasTrimmedItem } from '../utils/executionUtils';
import { usePostHog } from '@/stores/posthog.store';
import { getEasyAiWorkflowJson } from '@/utils/easyAiWorkflowUtils';
@@ -237,18 +237,51 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
return false;
}
let showedSuccessToast = false;
let executionData: Pick<IExecutionResponse, 'workflowId' | 'data' | 'status'>;
if (receivedData.type === 'executionFinished' && receivedData.data.rawData) {
const { workflowId, status, rawData } = receivedData.data;
executionData = { workflowId, data: parse(rawData), status };
} else {
const execution = await workflowsStore.fetchExecutionDataById(executionId);
if (!execution?.data) return false;
executionData = {
workflowId: execution.workflowId,
data: parse(execution.data as unknown as string),
status: execution.status,
};
uiStore.setProcessingExecutionResults(true);
/**
* On successful completion without data, we show a success toast
* immediately, even though we still need to fetch and deserialize the
* full execution data, to minimize perceived latency.
*/
if (receivedData.type === 'executionFinished' && receivedData.data.status === 'success') {
workflowHelpers.setDocumentTitle(
workflowsStore.getWorkflowById(receivedData.data.workflowId)?.name,
'IDLE',
);
uiStore.removeActiveAction('workflowRunning');
toast.showMessage({
title: i18n.baseText('pushConnection.workflowExecutedSuccessfully'),
type: 'success',
});
showedSuccessToast = true;
}
let execution: IExecutionResponse | null;
try {
execution = await workflowsStore.fetchExecutionDataById(executionId);
if (!execution?.data) {
uiStore.setProcessingExecutionResults(false);
return false;
}
executionData = {
workflowId: execution.workflowId,
data: parse(execution.data as unknown as string),
status: execution.status,
};
} catch {
uiStore.setProcessingExecutionResults(false);
return false;
}
}
const iRunExecutionData: IRunExecutionData = {
@@ -261,11 +294,14 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
const activeRunData = workflowsStore.workflowExecutionData?.data?.resultData?.runData;
if (activeRunData) {
for (const key of Object.keys(activeRunData)) {
if (hasTrimmedItem(activeRunData[key])) continue;
iRunExecutionData.resultData.runData[key] = activeRunData[key];
}
}
}
uiStore.setProcessingExecutionResults(false);
let runDataExecutedErrorMessage = getExecutionError(iRunExecutionData);
if (executionData.status === 'crashed') {
@@ -410,7 +446,6 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
}
}
} else {
// Workflow did execute without a problem
workflowHelpers.setDocumentTitle(workflow.name as string, 'IDLE');
const execution = workflowsStore.getWorkflowExecution;
@@ -441,7 +476,7 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
type: 'success',
});
}
} else {
} else if (!showedSuccessToast) {
toast.showMessage({
title: i18n.baseText('pushConnection.workflowExecutedSuccessfully'),
type: 'success',
@@ -451,8 +486,9 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
// It does not push the runData, as that was already pushed with each
// node that finished. For that reason, copy in here the data
// which we already have.
if (workflowsStore.getWorkflowRunData) {
// which we already have. But if the run data in the store is trimmed,
// we skip copying so we use the full data from the final message.
if (workflowsStore.getWorkflowRunData && !hasTrimmedData(workflowsStore.getWorkflowRunData)) {
iRunExecutionData.resultData.runData = workflowsStore.getWorkflowRunData;
}
@@ -493,6 +529,22 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
} else if (receivedData.type === 'nodeExecuteAfter') {
// A node finished executing. Add its data
const pushData = receivedData.data;
/**
* When we receive a placeholder in `nodeExecuteAfter`, we pad the items
* to match the count of the data the placeholder is standing in for.
* This prevents the item count from jumping up when the execution
* finishes and the full data replaces the placeholder.
*/
if (
pushData.itemCount &&
pushData.data?.data?.main &&
Array.isArray(pushData.data.data.main[0]) &&
pushData.data.data.main[0].length < pushData.itemCount
) {
pushData.data.data.main[0]?.push(...new Array(pushData.itemCount - 1).fill({ json: {} }));
}
workflowsStore.updateNodeExecutionData(pushData);
void assistantStore.onNodeExecution(pushData);
} else if (receivedData.type === 'nodeExecuteBefore') {

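To make the padding above concrete, here is a standalone walkthrough (not the store code): the placeholder arrives with a single marker item while `itemCount` reports the real size, so pushing `itemCount - 1` fillers brings the visible count up to match.

// Standalone illustration of the item-count padding done in the handler above.
const itemCount = 5; // reported by the worker for the trimmed node run
const main: Array<Array<{ json: Record<string, unknown> }>> = [
  [{ json: { trimmed: true } }], // placeholder output: exactly one marker item
];

if (Array.isArray(main[0]) && main[0].length < itemCount) {
  // The placeholder already holds one item, so itemCount - 1 fillers are enough.
  main[0].push(...new Array(itemCount - 1).fill({ json: {} }));
}
// main[0].length is now 5, matching what the full execution data will contain.
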
View file

@@ -1665,8 +1665,9 @@
"runData.aiContentBlock.tokens.prompt": "Prompt:",
"runData.aiContentBlock.tokens.completion": "Completion:",
"runData.trimmedData.title": "Data too large to display",
"runData.trimmedData.message": "The data is too large to be shown here. View the full details in 'Executions' tab.",
"runData.trimmedData.message": "Large amount of data will be loaded once the execution is finished.",
"runData.trimmedData.button": "See execution",
"runData.trimmedData.loading": "Loading data",
"saveButton.save": "@:_reusableBaseText.save",
"saveButton.saved": "Saved",
"saveWorkflowButton.hint": "Save workflow",

View file

@@ -175,6 +175,7 @@ export const useUIStore = defineStore(STORES.UI, () => {
const bannersHeight = ref<number>(0);
const bannerStack = ref<BannerName[]>([]);
const pendingNotificationsForViews = ref<{ [key in VIEWS]?: NotificationOptions[] }>({});
const processingExecutionResults = ref<boolean>(false);
const appGridWidth = ref<number>(0);
@@ -329,6 +330,12 @@ export const useUIStore = defineStore(STORES.UI, () => {
return modalStack.value.length > 0;
});
/**
* Whether we are currently in the process of fetching and deserializing
* the full execution data and loading it into the store.
*/
const isProcessingExecutionResults = computed(() => processingExecutionResults.value);
// Methods
const setTheme = (newTheme: ThemeOption): void => {
@@ -566,6 +573,14 @@ export const useUIStore = defineStore(STORES.UI, () => {
lastCancelledConnectionPosition.value = undefined;
}
/**
* Set whether we are currently in the process of fetching and deserializing
* the full execution data and loading it into the store.
*/
const setProcessingExecutionResults = (value: boolean) => {
processingExecutionResults.value = value;
};
return {
appGridWidth,
appliedTheme,
@@ -604,6 +619,7 @@ export const useUIStore = defineStore(STORES.UI, () => {
isAnyModalOpen,
pendingNotificationsForViews,
activeModals,
isProcessingExecutionResults,
setTheme,
setMode,
setActiveId,
@@ -638,6 +654,7 @@ export const useUIStore = defineStore(STORES.UI, () => {
setNotificationsForView,
deleteNotificationsForView,
resetLastInteractedWith,
setProcessingExecutionResults,
};
});

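A minimal usage sketch of the new UI-store flag, mirroring how usePushConnection wraps the fetch above but simplified to a try/finally. `fetchFullExecution` is a hypothetical stand-in for the real fetch-and-parse step; only `useUIStore` and `setProcessingExecutionResults` come from this diff.

// Sketch: flag the fetch-and-deserialize phase so views (e.g. RunData.vue) can
// show a loading state instead of the stale placeholder.
import { useUIStore } from '@/stores/ui.store';

declare function fetchFullExecution(executionId: string): Promise<void>; // hypothetical stand-in

async function loadFinishedExecution(executionId: string) {
  const uiStore = useUIStore();
  uiStore.setProcessingExecutionResults(true);
  try {
    await fetchFullExecution(executionId); // fetch + parse the full execution data
  } finally {
    uiStore.setProcessingExecutionResults(false); // clear the flag on success and failure alike
  }
}
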
View file

@@ -1,10 +1,11 @@
import {
SEND_AND_WAIT_OPERATION,
type ExecutionStatus,
type IDataObject,
type INode,
type IPinData,
type IRunData,
import { SEND_AND_WAIT_OPERATION, TRIMMED_TASK_DATA_CONNECTIONS_KEY } from 'n8n-workflow';
import type {
ITaskData,
ExecutionStatus,
IDataObject,
INode,
IPinData,
IRunData,
} from 'n8n-workflow';
import type { ExecutionFilterType, ExecutionsQueryFilter, INodeUi } from '@/Interface';
import { isEmpty } from '@/utils/typesUtils';
@@ -180,3 +181,25 @@ export const waitingNodeTooltip = (node: INodeUi | null | undefined) => {
return '';
};
/**
* Check whether task data contains a trimmed item.
*
* In manual executions in scaling mode, the payload in push messages may be
* arbitrarily large. To protect Redis as it relays run data from workers to
* the main process, we set a limit on payload size. If the payload is oversized,
* we replace it with a placeholder, which is later overridden when the
* execution finishes and the client receives the full data.
*/
export function hasTrimmedItem(taskData: ITaskData[]) {
return taskData[0]?.data?.main?.[0]?.[0]?.json?.[TRIMMED_TASK_DATA_CONNECTIONS_KEY] ?? false;
}
/**
* Check whether run data contains any trimmed items.
*
* See {@link hasTrimmedItem} for more details.
*/
export function hasTrimmedData(runData: IRunData) {
return Object.keys(runData).some((nodeName) => hasTrimmedItem(runData[nodeName]));
}