refactor(editor): Migrate pushConnection mixin to composable and remove collaboration store side effects (no-changelog) (#9249)

This commit is contained in:
Alex Grozav 2024-05-03 10:26:15 +03:00 committed by GitHub
parent 0a2de093c0
commit ff0955c995
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 879 additions and 710 deletions

View file

@ -447,6 +447,7 @@ export type IPushData =
| PushDataExecutionStarted
| PushDataExecuteAfter
| PushDataExecuteBefore
| PushDataNodeDescriptionUpdated
| PushDataConsoleMessage
| PushDataReloadNodeType
| PushDataRemoveNodeType
@ -458,67 +459,72 @@ export type IPushData =
| PushDataWorkflowFailedToActivate
| PushDataWorkflowUsersChanged;
type PushDataActiveWorkflowAdded = {
export type PushDataActiveWorkflowAdded = {
data: IActiveWorkflowAdded;
type: 'workflowActivated';
};
type PushDataActiveWorkflowRemoved = {
export type PushDataActiveWorkflowRemoved = {
data: IActiveWorkflowRemoved;
type: 'workflowDeactivated';
};
type PushDataWorkflowFailedToActivate = {
export type PushDataWorkflowFailedToActivate = {
data: IWorkflowFailedToActivate;
type: 'workflowFailedToActivate';
};
type PushDataExecutionRecovered = {
export type PushDataExecutionRecovered = {
data: IPushDataExecutionRecovered;
type: 'executionRecovered';
};
type PushDataExecutionFinished = {
export type PushDataExecutionFinished = {
data: IPushDataExecutionFinished;
type: 'executionFinished';
};
type PushDataExecutionStarted = {
export type PushDataExecutionStarted = {
data: IPushDataExecutionStarted;
type: 'executionStarted';
};
type PushDataExecuteAfter = {
export type PushDataExecuteAfter = {
data: IPushDataNodeExecuteAfter;
type: 'nodeExecuteAfter';
};
type PushDataExecuteBefore = {
export type PushDataExecuteBefore = {
data: IPushDataNodeExecuteBefore;
type: 'nodeExecuteBefore';
};
type PushDataConsoleMessage = {
export type PushDataNodeDescriptionUpdated = {
data: {};
type: 'nodeDescriptionUpdated';
};
export type PushDataConsoleMessage = {
data: IPushDataConsoleMessage;
type: 'sendConsoleMessage';
};
type PushDataReloadNodeType = {
export type PushDataReloadNodeType = {
data: IPushDataReloadNodeType;
type: 'reloadNodeType';
};
type PushDataRemoveNodeType = {
export type PushDataRemoveNodeType = {
data: IPushDataRemoveNodeType;
type: 'removeNodeType';
};
type PushDataTestWebhook = {
export type PushDataTestWebhook = {
data: IPushDataTestWebhook;
type: 'testWebhookDeleted' | 'testWebhookReceived';
};
type PushDataWorkerStatusMessage = {
export type PushDataWorkerStatusMessage = {
data: IPushDataWorkerStatusMessage;
type: 'sendWorkerStatusMessage';
};

View file

@ -54,6 +54,7 @@ const onDocumentVisibilityChange = () => {
};
onMounted(() => {
collaborationStore.initialize();
startHeartbeat();
document.addEventListener('visibilitychange', onDocumentVisibilityChange);
});
@ -61,6 +62,7 @@ onMounted(() => {
onBeforeUnmount(() => {
document.removeEventListener('visibilitychange', onDocumentVisibilityChange);
stopHeartbeat();
collaborationStore.terminate();
});
</script>

View file

@ -16,9 +16,9 @@
<script lang="ts">
import { defineComponent } from 'vue';
import type { Route, RouteLocationRaw } from 'vue-router';
import type { RouteLocation, RouteLocationRaw } from 'vue-router';
import { useRouter } from 'vue-router';
import { mapStores } from 'pinia';
import { pushConnection } from '@/mixins/pushConnection';
import WorkflowDetails from '@/components/MainHeader/WorkflowDetails.vue';
import TabBar from '@/components/MainHeader/TabBar.vue';
import {
@ -33,6 +33,7 @@ import { useSourceControlStore } from '@/stores/sourceControl.store';
import { useUIStore } from '@/stores/ui.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { useExecutionsStore } from '@/stores/executions.store';
import { usePushConnection } from '@/composables/usePushConnection';
export default defineComponent({
name: 'MainHeader',
@ -40,11 +41,12 @@ export default defineComponent({
WorkflowDetails,
TabBar,
},
mixins: [pushConnection],
setup(props, ctx) {
setup() {
const router = useRouter();
const pushConnection = usePushConnection({ router });
return {
// eslint-disable-next-line @typescript-eslint/no-misused-promises
...pushConnection.setup?.(props, ctx),
pushConnection,
};
},
data() {
@ -99,12 +101,18 @@ export default defineComponent({
this.syncTabsWithRoute(to, from);
},
},
beforeMount() {
this.pushConnection.initialize();
},
mounted() {
this.dirtyState = this.uiStore.stateIsDirty;
this.syncTabsWithRoute(this.$route);
},
beforeUnmount() {
this.pushConnection.terminate();
},
methods: {
syncTabsWithRoute(to: Route, from?: Route): void {
syncTabsWithRoute(to: RouteLocation, from?: RouteLocation): void {
if (
to.name === VIEWS.EXECUTION_HOME ||
to.name === VIEWS.WORKFLOW_EXECUTIONS ||

View file

@ -29,53 +29,36 @@ import type { ExecutionStatus } from 'n8n-workflow';
import { useUIStore } from '@/stores/ui.store';
import { useOrchestrationStore } from '@/stores/orchestration.store';
import { setPageTitle } from '@/utils/htmlUtils';
import { pushConnection } from '@/mixins/pushConnection';
import WorkerCard from './Workers/WorkerCard.ee.vue';
import { usePushConnection } from '@/composables/usePushConnection';
import { useRouter } from 'vue-router';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
import { useRootStore } from '@/stores/n8nRoot.store';
// eslint-disable-next-line import/no-default-export
export default defineComponent({
name: 'WorkerList',
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/naming-convention
components: { PushConnectionTracker, WorkerCard },
mixins: [pushConnection],
props: {
autoRefreshEnabled: {
type: Boolean,
default: true,
},
},
setup(props, ctx) {
setup() {
const router = useRouter();
const i18n = useI18n();
const pushConnection = usePushConnection({ router });
return {
i18n,
pushConnection,
...useToast(),
// eslint-disable-next-line @typescript-eslint/no-misused-promises
...pushConnection.setup?.(props, ctx),
};
},
mounted() {
setPageTitle(`n8n - ${this.pageTitle}`);
this.$telemetry.track('User viewed worker view', {
instance_id: this.rootStore.instanceId,
});
},
beforeMount() {
if (window.Cypress !== undefined) {
return;
}
this.pushStore.pushConnect();
this.orchestrationManagerStore.startWorkerStatusPolling();
},
beforeUnmount() {
if (window.Cypress !== undefined) {
return;
}
this.orchestrationManagerStore.stopWorkerStatusPolling();
this.pushStore.pushDisconnect();
},
computed: {
...mapStores(useUIStore, useOrchestrationStore),
...mapStores(useRootStore, useUIStore, usePushConnectionStore, useOrchestrationStore),
combinedWorkers(): IPushDataWorkerStatusPayload[] {
const returnData: IPushDataWorkerStatusPayload[] = [];
for (const workerId in this.orchestrationManagerStore.workers) {
@ -93,6 +76,31 @@ export default defineComponent({
return this.i18n.baseText('workerList.pageTitle');
},
},
mounted() {
setPageTitle(`n8n - ${this.pageTitle}`);
this.$telemetry.track('User viewed worker view', {
instance_id: this.rootStore.instanceId,
});
},
beforeMount() {
if (window.Cypress !== undefined) {
return;
}
this.pushConnection.initialize();
this.pushStore.pushConnect();
this.orchestrationManagerStore.startWorkerStatusPolling();
},
beforeUnmount() {
if (window.Cypress !== undefined) {
return;
}
this.orchestrationManagerStore.stopWorkerStatusPolling();
this.pushStore.pushDisconnect();
this.pushConnection.terminate();
},
methods: {
averageLoadAvg(loads: number[]) {
return (loads.reduce((prev, curr) => prev + curr, 0) / loads.length).toFixed(2);

View file

@ -0,0 +1,123 @@
import { usePushConnection } from '@/composables/usePushConnection';
import { useRouter } from 'vue-router';
import { createPinia, setActivePinia } from 'pinia';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
import { useCollaborationStore } from '@/stores/collaboration.store';
import type { IPushData, PushDataWorkerStatusMessage } from '@/Interface';
import { useOrchestrationStore } from '@/stores/orchestration.store';
vi.mock('vue-router', () => {
return {
RouterLink: vi.fn(),
useRouter: () => ({
push: vi.fn(),
}),
};
});
vi.useFakeTimers();
describe('usePushConnection()', () => {
let router: ReturnType<typeof useRouter>;
let pushStore: ReturnType<typeof usePushConnectionStore>;
let collaborationStore: ReturnType<typeof useCollaborationStore>;
let orchestrationStore: ReturnType<typeof useOrchestrationStore>;
let pushConnection: ReturnType<typeof usePushConnection>;
beforeEach(() => {
setActivePinia(createPinia());
router = vi.mocked(useRouter)();
pushStore = usePushConnectionStore();
collaborationStore = useCollaborationStore();
orchestrationStore = useOrchestrationStore();
pushConnection = usePushConnection({ router });
});
describe('initialize()', () => {
it('should add event listener to the pushStore', () => {
const spy = vi.spyOn(pushStore, 'addEventListener').mockImplementation(() => () => {});
pushConnection.initialize();
expect(spy).toHaveBeenCalled();
});
it('should initialize collaborationStore', () => {
const spy = vi.spyOn(collaborationStore, 'initialize').mockImplementation(() => {});
pushConnection.initialize();
expect(spy).toHaveBeenCalled();
});
});
describe('terminate()', () => {
it('should remove event listener from the pushStore', () => {
const returnFn = vi.fn();
vi.spyOn(pushStore, 'addEventListener').mockImplementation(() => returnFn);
pushConnection.initialize();
pushConnection.terminate();
expect(returnFn).toHaveBeenCalled();
});
it('should terminate collaborationStore', () => {
const spy = vi.spyOn(collaborationStore, 'terminate').mockImplementation(() => {});
pushConnection.terminate();
expect(spy).toHaveBeenCalled();
});
});
describe('queuePushMessage()', () => {
it('should add message to the queue and sets timeout if not already set', () => {
const event: IPushData = {
type: 'sendWorkerStatusMessage',
data: {
workerId: '1',
status: {},
},
} as PushDataWorkerStatusMessage;
pushConnection.queuePushMessage(event, 5);
expect(pushConnection.pushMessageQueue.value).toHaveLength(1);
expect(pushConnection.pushMessageQueue.value[0]).toEqual({ message: event, retriesLeft: 5 });
expect(pushConnection.retryTimeout.value).not.toBeNull();
});
});
describe('processWaitingPushMessages()', () => {
it('should clear the queue and reset the timeout', async () => {
const event: IPushData = { type: 'executionRecovered', data: { executionId: '1' } };
pushConnection.queuePushMessage(event, 0);
expect(pushConnection.pushMessageQueue.value).toHaveLength(1);
expect(pushConnection.retryTimeout.value).toBeDefined();
await pushConnection.processWaitingPushMessages();
expect(pushConnection.pushMessageQueue.value).toHaveLength(0);
expect(pushConnection.retryTimeout.value).toBeNull();
});
});
describe('pushMessageReceived()', () => {
describe('sendWorkerStatusMessage', () => {
it('should handle event type correctly', async () => {
const spy = vi.spyOn(orchestrationStore, 'updateWorkerStatus').mockImplementation(() => {});
const event: IPushData = {
type: 'sendWorkerStatusMessage',
data: {
workerId: '1',
status: {},
},
} as PushDataWorkerStatusMessage;
const result = await pushConnection.pushMessageReceived(event);
expect(spy).toHaveBeenCalledWith(event.data.status);
expect(result).toBeTruthy();
});
});
});
});

View file

@ -0,0 +1,632 @@
import type {
IExecutionResponse,
IExecutionsCurrentSummaryExtended,
IPushData,
IPushDataExecutionFinished,
} from '@/Interface';
import { useNodeHelpers } from '@/composables/useNodeHelpers';
import { useTitleChange } from '@/composables/useTitleChange';
import { useToast } from '@/composables/useToast';
import type {
ExpressionError,
IDataObject,
INodeTypeNameVersion,
IRun,
IRunExecutionData,
IWorkflowBase,
SubworkflowOperationError,
IExecuteContextData,
NodeOperationError,
INodeTypeDescription,
} from 'n8n-workflow';
import { TelemetryHelpers } from 'n8n-workflow';
import { WORKFLOW_SETTINGS_MODAL_KEY } from '@/constants';
import { getTriggerNodeServiceName } from '@/utils/nodeTypesUtils';
import { codeNodeEditorEventBus, globalLinkActionsEventBus } from '@/event-bus';
import { useUIStore } from '@/stores/ui.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { useNodeTypesStore } from '@/stores/nodeTypes.store';
import { useCredentialsStore } from '@/stores/credentials.store';
import { useSettingsStore } from '@/stores/settings.store';
import { parse } from 'flatted';
import { useSegment } from '@/stores/segment.store';
import { ref } from 'vue';
import { useOrchestrationStore } from '@/stores/orchestration.store';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
import { useCollaborationStore } from '@/stores/collaboration.store';
import { useExternalHooks } from '@/composables/useExternalHooks';
import type { useRouter } from 'vue-router';
import { useWorkflowHelpers } from '@/composables/useWorkflowHelpers';
import { useI18n } from '@/composables/useI18n';
import { useTelemetry } from '@/composables/useTelemetry';
import type { PushMessageQueueItem } from '@/types';
export function usePushConnection({ router }: { router: ReturnType<typeof useRouter> }) {
const workflowHelpers = useWorkflowHelpers({ router });
const nodeHelpers = useNodeHelpers();
const titleChange = useTitleChange();
const toast = useToast();
const i18n = useI18n();
const telemetry = useTelemetry();
const collaborationStore = useCollaborationStore();
const credentialsStore = useCredentialsStore();
const nodeTypesStore = useNodeTypesStore();
const orchestrationManagerStore = useOrchestrationStore();
const pushStore = usePushConnectionStore();
const settingsStore = useSettingsStore();
const segmentStore = useSegment();
const uiStore = useUIStore();
const workflowsStore = useWorkflowsStore();
const retryTimeout = ref<NodeJS.Timeout | null>(null);
const pushMessageQueue = ref<PushMessageQueueItem[]>([]);
const removeEventListener = ref<(() => void) | null>(null);
function initialize() {
removeEventListener.value = pushStore.addEventListener((message) => {
void pushMessageReceived(message);
});
collaborationStore.initialize();
}
function terminate() {
collaborationStore.terminate();
if (typeof removeEventListener.value === 'function') {
removeEventListener.value();
}
}
/**
* Sometimes the push message is faster as the result from
* the REST API so we do not know yet what execution ID
* is currently active. So internally resend the message
* a few more times
*/
function queuePushMessage(event: IPushData, retryAttempts: number) {
pushMessageQueue.value.push({ message: event, retriesLeft: retryAttempts });
if (retryTimeout.value === null) {
retryTimeout.value = setTimeout(processWaitingPushMessages, 20);
}
}
/**
* Process the push messages which are waiting in the queue
*/
async function processWaitingPushMessages() {
if (retryTimeout.value !== null) {
clearTimeout(retryTimeout.value);
retryTimeout.value = null;
}
const queueLength = pushMessageQueue.value.length;
for (let i = 0; i < queueLength; i++) {
const messageData = pushMessageQueue.value.shift() as PushMessageQueueItem;
const result = await pushMessageReceived(messageData.message, true);
if (!result) {
// Was not successful
messageData.retriesLeft -= 1;
if (messageData.retriesLeft > 0) {
// If still retries are left add it back and stop execution
pushMessageQueue.value.unshift(messageData);
}
break;
}
}
if (pushMessageQueue.value.length !== 0 && retryTimeout.value === null) {
retryTimeout.value = setTimeout(processWaitingPushMessages, 25);
}
}
/**
* Process a newly received message
*/
async function pushMessageReceived(receivedData: IPushData, isRetry?: boolean): Promise<boolean> {
const retryAttempts = 5;
if (receivedData.type === 'sendWorkerStatusMessage') {
const pushData = receivedData.data;
orchestrationManagerStore.updateWorkerStatus(pushData.status);
return true;
}
if (receivedData.type === 'sendConsoleMessage') {
const pushData = receivedData.data;
console.log(pushData.source, ...pushData.messages);
return true;
}
if (
!['testWebhookReceived'].includes(receivedData.type) &&
isRetry !== true &&
pushMessageQueue.value.length
) {
// If there are already messages in the queue add the new one that all of them
// get executed in order
queuePushMessage(receivedData, retryAttempts);
return false;
}
if (receivedData.type === 'nodeExecuteAfter' || receivedData.type === 'nodeExecuteBefore') {
if (!uiStore.isActionActive('workflowRunning')) {
// No workflow is running so ignore the messages
return false;
}
const pushData = receivedData.data;
if (workflowsStore.activeExecutionId !== pushData.executionId) {
// The data is not for the currently active execution or
// we do not have the execution id yet.
if (isRetry !== true) {
queuePushMessage(event as unknown as IPushData, retryAttempts);
}
return false;
}
}
// recovered execution data is handled like executionFinished data, however for security reasons
// we need to fetch the data from the server again rather than push it to all clients
let recoveredPushData: IPushDataExecutionFinished | undefined = undefined;
if (receivedData.type === 'executionRecovered') {
const recoveredExecutionId = receivedData.data?.executionId;
const isWorkflowRunning = uiStore.isActionActive('workflowRunning');
if (isWorkflowRunning && workflowsStore.activeExecutionId === recoveredExecutionId) {
// pull execution data for the recovered execution from the server
const executionData = await workflowsStore.fetchExecutionDataById(
workflowsStore.activeExecutionId,
);
if (executionData?.data) {
// data comes in as 'flatten' object, so we need to parse it
executionData.data = parse(executionData.data as unknown as string) as IRunExecutionData;
const iRunExecutionData: IRunExecutionData = {
startData: executionData.data?.startData,
resultData: executionData.data?.resultData ?? { runData: {} },
executionData: executionData.data?.executionData,
};
if (workflowsStore.workflowExecutionData?.workflowId === executionData.workflowId) {
const activeRunData = workflowsStore.workflowExecutionData?.data?.resultData?.runData;
if (activeRunData) {
for (const key of Object.keys(activeRunData)) {
iRunExecutionData.resultData.runData[key] = activeRunData[key];
}
}
}
const iRun: IRun = {
data: iRunExecutionData,
finished: executionData.finished,
mode: executionData.mode,
waitTill: executionData.data?.waitTill,
startedAt: executionData.startedAt,
stoppedAt: executionData.stoppedAt,
status: 'crashed',
};
if (executionData.data) {
recoveredPushData = {
executionId: executionData.id,
data: iRun,
};
}
}
}
}
if (
receivedData.type === 'workflowFailedToActivate' &&
workflowsStore.workflowId === receivedData.data.workflowId
) {
workflowsStore.setWorkflowInactive(receivedData.data.workflowId);
workflowsStore.setActive(false);
toast.showError(
new Error(receivedData.data.errorMessage),
i18n.baseText('workflowActivator.showError.title', {
interpolate: { newStateName: 'activated' },
}) + ':',
);
return true;
}
if (receivedData.type === 'workflowActivated') {
workflowsStore.setWorkflowActive(receivedData.data.workflowId);
return true;
}
if (receivedData.type === 'workflowDeactivated') {
workflowsStore.setWorkflowInactive(receivedData.data.workflowId);
return true;
}
if (receivedData.type === 'executionFinished' || receivedData.type === 'executionRecovered') {
// The workflow finished executing
let pushData: IPushDataExecutionFinished;
if (receivedData.type === 'executionRecovered' && recoveredPushData !== undefined) {
pushData = recoveredPushData;
} else {
pushData = receivedData.data as IPushDataExecutionFinished;
}
const { activeExecutionId } = workflowsStore;
if (activeExecutionId === pushData.executionId) {
const activeRunData = workflowsStore.workflowExecutionData?.data?.resultData?.runData;
if (activeRunData) {
for (const key of Object.keys(activeRunData)) {
if (
pushData.data.data.resultData.runData[key]?.[0]?.data?.main?.[0]?.[0]?.json
?.isArtificialRecoveredEventItem === true &&
activeRunData[key].length > 0
)
pushData.data.data.resultData.runData[key] = activeRunData[key];
}
}
workflowsStore.finishActiveExecution(pushData);
}
if (!uiStore.isActionActive('workflowRunning')) {
// No workflow is running so ignore the messages
return false;
}
if (activeExecutionId !== pushData.executionId) {
// The workflow which did finish execution did either not get started
// by this session or we do not have the execution id yet.
if (isRetry !== true) {
queuePushMessage(event as unknown as IPushData, retryAttempts);
}
return false;
}
const runDataExecuted = pushData.data;
let runDataExecutedErrorMessage = getExecutionError(runDataExecuted.data);
if (runDataExecuted.status === 'crashed') {
runDataExecutedErrorMessage = i18n.baseText('pushConnection.executionFailed.message');
} else if (runDataExecuted.status === 'canceled') {
runDataExecutedErrorMessage = i18n.baseText(
'executionsList.showMessage.stopExecution.message',
{
interpolate: { activeExecutionId },
},
);
}
const lineNumber = runDataExecuted?.data?.resultData?.error?.lineNumber;
codeNodeEditorEventBus.emit('error-line-number', lineNumber || 'final');
const workflow = workflowHelpers.getCurrentWorkflow();
if (runDataExecuted.waitTill !== undefined) {
const workflowSettings = workflowsStore.workflowSettings;
const saveManualExecutions = settingsStore.saveManualExecutions;
const isSavingExecutions =
workflowSettings.saveManualExecutions === undefined
? saveManualExecutions
: workflowSettings.saveManualExecutions;
let action;
if (!isSavingExecutions) {
globalLinkActionsEventBus.emit('registerGlobalLinkAction', {
key: 'open-settings',
action: async () => {
if (workflowsStore.isNewWorkflow) await workflowHelpers.saveAsNewWorkflow();
uiStore.openModal(WORKFLOW_SETTINGS_MODAL_KEY);
},
});
action =
'<a data-action="open-settings">Turn on saving manual executions</a> and run again to see what happened after this node.';
} else {
action = `<a href="/workflow/${workflow.id}/executions/${activeExecutionId}">View the execution</a> to see what happened after this node.`;
}
// Workflow did start but had been put to wait
titleChange.titleSet(workflow.name as string, 'IDLE');
toast.showToast({
title: 'Workflow started waiting',
message: `${action} <a href="https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-base.wait/" target="_blank">More info</a>`,
type: 'success',
duration: 0,
});
} else if (runDataExecuted.finished !== true) {
titleChange.titleSet(workflow.name as string, 'ERROR');
if (
runDataExecuted.data.resultData.error?.name === 'ExpressionError' &&
(runDataExecuted.data.resultData.error as ExpressionError).context.functionality ===
'pairedItem'
) {
const error = runDataExecuted.data.resultData.error as ExpressionError;
void workflowHelpers.getWorkflowDataToSave().then((workflowData) => {
const eventData: IDataObject = {
caused_by_credential: false,
error_message: error.description,
error_title: error.message,
error_type: error.context.type,
node_graph_string: JSON.stringify(
TelemetryHelpers.generateNodesGraph(
workflowData as IWorkflowBase,
workflowHelpers.getNodeTypes(),
).nodeGraph,
),
workflow_id: workflowsStore.workflowId,
};
if (
error.context.nodeCause &&
['paired_item_no_info', 'paired_item_invalid_info'].includes(
error.context.type as string,
)
) {
const node = workflow.getNode(error.context.nodeCause as string);
if (node) {
eventData.is_pinned = !!workflow.getPinDataOfNode(node.name);
eventData.mode = node.parameters.mode;
eventData.node_type = node.type;
eventData.operation = node.parameters.operation;
eventData.resource = node.parameters.resource;
}
}
telemetry.track('Instance FE emitted paired item error', eventData, {
withPostHog: true,
});
});
}
if (runDataExecuted.data.resultData.error?.name === 'SubworkflowOperationError') {
const error = runDataExecuted.data.resultData.error as SubworkflowOperationError;
workflowsStore.subWorkflowExecutionError = error;
toast.showMessage({
title: error.message,
message: error.description,
type: 'error',
duration: 0,
});
} else if (
runDataExecuted.data.resultData.error?.name === 'NodeOperationError' &&
(runDataExecuted.data.resultData.error as NodeOperationError).functionality ===
'configuration-node'
) {
// If the error is a configuration error of the node itself doesn't get executed so we can't use lastNodeExecuted for the title
let title: string;
const nodeError = runDataExecuted.data.resultData.error as NodeOperationError;
if (nodeError.node.name) {
title = `Error in sub-node ${nodeError.node.name}`;
} else {
title = 'Problem executing workflow';
}
toast.showMessage({
title,
message:
(nodeError?.description ?? runDataExecutedErrorMessage) +
i18n.baseText('pushConnection.executionError.openNode', {
interpolate: {
node: nodeError.node.name,
},
}),
type: 'error',
duration: 0,
dangerouslyUseHTMLString: true,
});
} else {
let title: string;
const isManualExecutionCancelled =
runDataExecuted.mode === 'manual' && runDataExecuted.status === 'canceled';
// Do not show the error message if the workflow got canceled manually
if (isManualExecutionCancelled) {
toast.showMessage({
title: i18n.baseText('nodeView.showMessage.stopExecutionTry.title'),
type: 'success',
});
} else {
if (runDataExecuted.data.resultData.lastNodeExecuted) {
title = `Problem in node ${runDataExecuted.data.resultData.lastNodeExecuted}`;
} else {
title = 'Problem executing workflow';
}
toast.showMessage({
title,
message: runDataExecutedErrorMessage,
type: 'error',
duration: 0,
dangerouslyUseHTMLString: true,
});
}
}
} else {
// Workflow did execute without a problem
titleChange.titleSet(workflow.name as string, 'IDLE');
const execution = workflowsStore.getWorkflowExecution;
if (execution?.executedNode) {
const node = workflowsStore.getNodeByName(execution.executedNode);
const nodeType = node && nodeTypesStore.getNodeType(node.type, node.typeVersion);
const nodeOutput =
execution &&
execution.executedNode &&
execution.data?.resultData?.runData?.[execution.executedNode];
if (nodeType?.polling && !nodeOutput) {
toast.showMessage({
title: i18n.baseText('pushConnection.pollingNode.dataNotFound', {
interpolate: {
service: getTriggerNodeServiceName(nodeType),
},
}),
message: i18n.baseText('pushConnection.pollingNode.dataNotFound.message', {
interpolate: {
service: getTriggerNodeServiceName(nodeType),
},
}),
type: 'success',
});
} else {
toast.showMessage({
title: i18n.baseText('pushConnection.nodeExecutedSuccessfully'),
type: 'success',
});
}
} else {
toast.showMessage({
title: i18n.baseText('pushConnection.workflowExecutedSuccessfully'),
type: 'success',
});
}
}
// It does not push the runData as it got already pushed with each
// node that did finish. For that reason copy in here the data
// which we already have.
if (workflowsStore.getWorkflowRunData) {
runDataExecuted.data.resultData.runData = workflowsStore.getWorkflowRunData;
}
workflowsStore.executingNode.length = 0;
workflowsStore.setWorkflowExecutionData(runDataExecuted as IExecutionResponse);
uiStore.removeActiveAction('workflowRunning');
// Set the node execution issues on all the nodes which produced an error so that
// it can be displayed in the node-view
nodeHelpers.updateNodesExecutionIssues();
const lastNodeExecuted: string | undefined = runDataExecuted.data.resultData.lastNodeExecuted;
let itemsCount = 0;
if (
lastNodeExecuted &&
runDataExecuted.data.resultData.runData[lastNodeExecuted] &&
!runDataExecutedErrorMessage
) {
itemsCount =
runDataExecuted.data.resultData.runData[lastNodeExecuted][0].data!.main[0]!.length;
}
void useExternalHooks().run('pushConnection.executionFinished', {
itemsCount,
nodeName: runDataExecuted.data.resultData.lastNodeExecuted,
errorMessage: runDataExecutedErrorMessage,
runDataExecutedStartData: runDataExecuted.data.startData,
resultDataError: runDataExecuted.data.resultData.error,
});
if (!runDataExecuted.data.resultData.error) {
segmentStore.trackSuccessfulWorkflowExecution(runDataExecuted);
}
} else if (receivedData.type === 'executionStarted') {
const pushData = receivedData.data;
const executionData: IExecutionsCurrentSummaryExtended = {
id: pushData.executionId,
finished: false,
mode: pushData.mode,
startedAt: pushData.startedAt,
retryOf: pushData.retryOf,
workflowId: pushData.workflowId,
workflowName: pushData.workflowName,
};
workflowsStore.addActiveExecution(executionData);
} else if (receivedData.type === 'nodeExecuteAfter') {
// A node finished to execute. Add its data
const pushData = receivedData.data;
workflowsStore.addNodeExecutionData(pushData);
workflowsStore.removeExecutingNode(pushData.nodeName);
} else if (receivedData.type === 'nodeExecuteBefore') {
// A node started to be executed. Set it as executing.
const pushData = receivedData.data;
workflowsStore.addExecutingNode(pushData.nodeName);
} else if (receivedData.type === 'testWebhookDeleted') {
// A test-webhook was deleted
const pushData = receivedData.data;
if (pushData.workflowId === workflowsStore.workflowId) {
workflowsStore.executionWaitingForWebhook = false;
uiStore.removeActiveAction('workflowRunning');
}
} else if (receivedData.type === 'testWebhookReceived') {
// A test-webhook did get called
const pushData = receivedData.data;
if (pushData.workflowId === workflowsStore.workflowId) {
workflowsStore.executionWaitingForWebhook = false;
workflowsStore.activeExecutionId = pushData.executionId;
}
void processWaitingPushMessages();
} else if (receivedData.type === 'reloadNodeType') {
await nodeTypesStore.getNodeTypes();
await nodeTypesStore.getFullNodesProperties([receivedData.data]);
} else if (receivedData.type === 'removeNodeType') {
const pushData = receivedData.data;
const nodesToBeRemoved: INodeTypeNameVersion[] = [pushData];
// Force reload of all credential types
await credentialsStore.fetchCredentialTypes(false).then(() => {
nodeTypesStore.removeNodeTypes(nodesToBeRemoved as INodeTypeDescription[]);
});
} else if (receivedData.type === 'nodeDescriptionUpdated') {
await nodeTypesStore.getNodeTypes();
await credentialsStore.fetchCredentialTypes(true);
}
return true;
}
function getExecutionError(data: IRunExecutionData | IExecuteContextData) {
const error = data.resultData.error;
let errorMessage: string;
if (data.resultData.lastNodeExecuted && error) {
errorMessage = error.message || error.description;
} else {
errorMessage = i18n.baseText('pushConnection.executionError', {
interpolate: { error: '!' },
});
if (error?.message) {
let nodeName: string | undefined;
if ('node' in error) {
nodeName = typeof error.node === 'string' ? error.node : error.node!.name;
}
const receivedError = nodeName ? `${nodeName}: ${error.message}` : error.message;
errorMessage = i18n.baseText('pushConnection.executionError', {
interpolate: {
error: `.${i18n.baseText('pushConnection.executionError.details', {
interpolate: {
details: receivedError,
},
})}`,
},
});
}
}
return errorMessage;
}
return {
initialize,
terminate,
pushMessageReceived,
queuePushMessage,
processWaitingPushMessages,
pushMessageQueue,
removeEventListener,
retryTimeout,
};
}

View file

@ -1,637 +0,0 @@
import type {
IExecutionResponse,
IExecutionsCurrentSummaryExtended,
IPushData,
IPushDataExecutionFinished,
} from '@/Interface';
import { useNodeHelpers } from '@/composables/useNodeHelpers';
import { useTitleChange } from '@/composables/useTitleChange';
import { useToast } from '@/composables/useToast';
import type {
ExpressionError,
IDataObject,
INodeTypeNameVersion,
IRun,
IRunExecutionData,
IWorkflowBase,
SubworkflowOperationError,
IExecuteContextData,
NodeOperationError,
} from 'n8n-workflow';
import { TelemetryHelpers } from 'n8n-workflow';
import { WORKFLOW_SETTINGS_MODAL_KEY } from '@/constants';
import { getTriggerNodeServiceName } from '@/utils/nodeTypesUtils';
import { codeNodeEditorEventBus, globalLinkActionsEventBus } from '@/event-bus';
import { mapStores } from 'pinia';
import { useUIStore } from '@/stores/ui.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { useNodeTypesStore } from '@/stores/nodeTypes.store';
import { useCredentialsStore } from '@/stores/credentials.store';
import { useSettingsStore } from '@/stores/settings.store';
import { parse } from 'flatted';
import { useSegment } from '@/stores/segment.store';
import { defineComponent } from 'vue';
import { useOrchestrationStore } from '@/stores/orchestration.store';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
import { useCollaborationStore } from '@/stores/collaboration.store';
import { useExternalHooks } from '@/composables/useExternalHooks';
import { useRouter } from 'vue-router';
import { useWorkflowHelpers } from '@/composables/useWorkflowHelpers';
export const pushConnection = defineComponent({
setup() {
const router = useRouter();
const workflowHelpers = useWorkflowHelpers({ router });
const nodeHelpers = useNodeHelpers();
return {
...useTitleChange(),
...useToast(),
nodeHelpers,
workflowHelpers,
};
},
data() {
return {
retryTimeout: null as NodeJS.Timeout | null,
pushMessageQueue: [] as Array<{ message: IPushData; retriesLeft: number }>,
removeEventListener: null as (() => void) | null,
};
},
created() {
this.removeEventListener = this.pushStore.addEventListener((message) => {
void this.pushMessageReceived(message);
});
},
unmounted() {
if (typeof this.removeEventListener === 'function') {
this.removeEventListener();
}
},
computed: {
...mapStores(
useCredentialsStore,
useNodeTypesStore,
useUIStore,
useWorkflowsStore,
useSettingsStore,
useSegment,
useOrchestrationStore,
usePushConnectionStore,
useCollaborationStore,
),
pushRef(): string {
return this.rootStore.pushRef;
},
},
methods: {
/**
* Sometimes the push message is faster as the result from
* the REST API so we do not know yet what execution ID
* is currently active. So internally resend the message
* a few more times
*/
queuePushMessage(event: IPushData, retryAttempts: number) {
this.pushMessageQueue.push({ message: event, retriesLeft: retryAttempts });
if (this.retryTimeout === null) {
this.retryTimeout = setTimeout(this.processWaitingPushMessages, 20);
}
},
/**
* Process the push messages which are waiting in the queue
*/
async processWaitingPushMessages() {
if (this.retryTimeout !== null) {
clearTimeout(this.retryTimeout);
this.retryTimeout = null;
}
const queueLength = this.pushMessageQueue.length;
for (let i = 0; i < queueLength; i++) {
const messageData = this.pushMessageQueue.shift();
const result = await this.pushMessageReceived(messageData!.message, true);
if (!result) {
// Was not successful
messageData!.retriesLeft -= 1;
if (messageData!.retriesLeft > 0) {
// If still retries are left add it back and stop execution
this.pushMessageQueue.unshift(messageData!);
}
break;
}
}
if (this.pushMessageQueue.length !== 0 && this.retryTimeout === null) {
this.retryTimeout = setTimeout(this.processWaitingPushMessages, 25);
}
},
/**
* Process a newly received message
*/
async pushMessageReceived(receivedData: IPushData, isRetry?: boolean): Promise<boolean> {
const retryAttempts = 5;
if (receivedData.type === 'sendWorkerStatusMessage') {
const pushData = receivedData.data;
this.orchestrationManagerStore.updateWorkerStatus(pushData.status);
return true;
}
if (receivedData.type === 'sendConsoleMessage') {
const pushData = receivedData.data;
console.log(pushData.source, ...pushData.messages);
return true;
}
if (
!['testWebhookReceived'].includes(receivedData.type) &&
isRetry !== true &&
this.pushMessageQueue.length
) {
// If there are already messages in the queue add the new one that all of them
// get executed in order
this.queuePushMessage(receivedData, retryAttempts);
return false;
}
if (receivedData.type === 'nodeExecuteAfter' || receivedData.type === 'nodeExecuteBefore') {
if (!this.uiStore.isActionActive('workflowRunning')) {
// No workflow is running so ignore the messages
return false;
}
const pushData = receivedData.data;
if (this.workflowsStore.activeExecutionId !== pushData.executionId) {
// The data is not for the currently active execution or
// we do not have the execution id yet.
if (isRetry !== true) {
this.queuePushMessage(event, retryAttempts);
}
return false;
}
}
// recovered execution data is handled like executionFinished data, however for security reasons
// we need to fetch the data from the server again rather than push it to all clients
let recoveredPushData: IPushDataExecutionFinished | undefined = undefined;
if (receivedData.type === 'executionRecovered') {
const recoveredExecutionId = receivedData.data?.executionId;
const isWorkflowRunning = this.uiStore.isActionActive('workflowRunning');
if (isWorkflowRunning && this.workflowsStore.activeExecutionId === recoveredExecutionId) {
// pull execution data for the recovered execution from the server
const executionData = await this.workflowsStore.fetchExecutionDataById(
this.workflowsStore.activeExecutionId,
);
if (executionData?.data) {
// data comes in as 'flatten' object, so we need to parse it
executionData.data = parse(
executionData.data as unknown as string,
) as IRunExecutionData;
const iRunExecutionData: IRunExecutionData = {
startData: executionData.data?.startData,
resultData: executionData.data?.resultData ?? { runData: {} },
executionData: executionData.data?.executionData,
};
if (
this.workflowsStore.workflowExecutionData?.workflowId === executionData.workflowId
) {
const activeRunData =
this.workflowsStore.workflowExecutionData?.data?.resultData?.runData;
if (activeRunData) {
for (const key of Object.keys(activeRunData)) {
iRunExecutionData.resultData.runData[key] = activeRunData[key];
}
}
}
const iRun: IRun = {
data: iRunExecutionData,
finished: executionData.finished,
mode: executionData.mode,
waitTill: executionData.data?.waitTill,
startedAt: executionData.startedAt,
stoppedAt: executionData.stoppedAt,
status: 'crashed',
};
if (executionData.data) {
recoveredPushData = {
executionId: executionData.id,
data: iRun,
};
}
}
}
}
if (
receivedData.type === 'workflowFailedToActivate' &&
this.workflowsStore.workflowId === receivedData.data.workflowId
) {
this.workflowsStore.setWorkflowInactive(receivedData.data.workflowId);
this.workflowsStore.setActive(false);
this.showError(
new Error(receivedData.data.errorMessage),
this.$locale.baseText('workflowActivator.showError.title', {
interpolate: { newStateName: 'activated' },
}) + ':',
);
return true;
}
if (receivedData.type === 'workflowActivated') {
this.workflowsStore.setWorkflowActive(receivedData.data.workflowId);
return true;
}
if (receivedData.type === 'workflowDeactivated') {
this.workflowsStore.setWorkflowInactive(receivedData.data.workflowId);
return true;
}
if (receivedData.type === 'executionFinished' || receivedData.type === 'executionRecovered') {
// The workflow finished executing
let pushData: IPushDataExecutionFinished;
if (receivedData.type === 'executionRecovered' && recoveredPushData !== undefined) {
pushData = recoveredPushData;
} else {
pushData = receivedData.data as IPushDataExecutionFinished;
}
const { activeExecutionId } = this.workflowsStore;
if (activeExecutionId === pushData.executionId) {
const activeRunData =
this.workflowsStore.workflowExecutionData?.data?.resultData?.runData;
if (activeRunData) {
for (const key of Object.keys(activeRunData)) {
if (
pushData.data.data.resultData.runData[key]?.[0]?.data?.main?.[0]?.[0]?.json
?.isArtificialRecoveredEventItem === true &&
activeRunData[key].length > 0
)
pushData.data.data.resultData.runData[key] = activeRunData[key];
}
}
this.workflowsStore.finishActiveExecution(pushData);
}
if (!this.uiStore.isActionActive('workflowRunning')) {
// No workflow is running so ignore the messages
return false;
}
if (activeExecutionId !== pushData.executionId) {
// The workflow which did finish execution did either not get started
// by this session or we do not have the execution id yet.
if (isRetry !== true) {
this.queuePushMessage(event, retryAttempts);
}
return false;
}
const runDataExecuted = pushData.data;
let runDataExecutedErrorMessage = this.getExecutionError(runDataExecuted.data);
if (runDataExecuted.status === 'crashed') {
runDataExecutedErrorMessage = this.$locale.baseText(
'pushConnection.executionFailed.message',
);
} else if (runDataExecuted.status === 'canceled') {
runDataExecutedErrorMessage = this.$locale.baseText(
'executionsList.showMessage.stopExecution.message',
{
interpolate: { activeExecutionId },
},
);
}
const lineNumber = runDataExecuted?.data?.resultData?.error?.lineNumber;
codeNodeEditorEventBus.emit('error-line-number', lineNumber || 'final');
const workflow = this.workflowHelpers.getCurrentWorkflow();
if (runDataExecuted.waitTill !== undefined) {
const workflowSettings = this.workflowsStore.workflowSettings;
const saveManualExecutions = this.settingsStore.saveManualExecutions;
const isSavingExecutions =
workflowSettings.saveManualExecutions === undefined
? saveManualExecutions
: workflowSettings.saveManualExecutions;
let action;
if (!isSavingExecutions) {
globalLinkActionsEventBus.emit('registerGlobalLinkAction', {
key: 'open-settings',
action: async () => {
if (this.workflowsStore.isNewWorkflow)
await this.workflowHelpers.saveAsNewWorkflow();
this.uiStore.openModal(WORKFLOW_SETTINGS_MODAL_KEY);
},
});
action =
'<a data-action="open-settings">Turn on saving manual executions</a> and run again to see what happened after this node.';
} else {
action = `<a href="/workflow/${workflow.id}/executions/${activeExecutionId}">View the execution</a> to see what happened after this node.`;
}
// Workflow did start but had been put to wait
this.titleSet(workflow.name as string, 'IDLE');
this.showToast({
title: 'Workflow started waiting',
message: `${action} <a href="https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-base.wait/" target="_blank">More info</a>`,
type: 'success',
duration: 0,
});
} else if (runDataExecuted.finished !== true) {
this.titleSet(workflow.name as string, 'ERROR');
if (
runDataExecuted.data.resultData.error?.name === 'ExpressionError' &&
(runDataExecuted.data.resultData.error as ExpressionError).context.functionality ===
'pairedItem'
) {
const error = runDataExecuted.data.resultData.error as ExpressionError;
void this.workflowHelpers.getWorkflowDataToSave().then((workflowData) => {
const eventData: IDataObject = {
caused_by_credential: false,
error_message: error.description,
error_title: error.message,
error_type: error.context.type,
node_graph_string: JSON.stringify(
TelemetryHelpers.generateNodesGraph(
workflowData as IWorkflowBase,
this.workflowHelpers.getNodeTypes(),
).nodeGraph,
),
workflow_id: this.workflowsStore.workflowId,
};
if (
error.context.nodeCause &&
['paired_item_no_info', 'paired_item_invalid_info'].includes(
error.context.type as string,
)
) {
const node = workflow.getNode(error.context.nodeCause as string);
if (node) {
eventData.is_pinned = !!workflow.getPinDataOfNode(node.name);
eventData.mode = node.parameters.mode;
eventData.node_type = node.type;
eventData.operation = node.parameters.operation;
eventData.resource = node.parameters.resource;
}
}
this.$telemetry.track('Instance FE emitted paired item error', eventData, {
withPostHog: true,
});
});
}
if (runDataExecuted.data.resultData.error?.name === 'SubworkflowOperationError') {
const error = runDataExecuted.data.resultData.error as SubworkflowOperationError;
this.workflowsStore.subWorkflowExecutionError = error;
this.showMessage({
title: error.message,
message: error.description,
type: 'error',
duration: 0,
});
} else if (
runDataExecuted.data.resultData.error?.name === 'NodeOperationError' &&
(runDataExecuted.data.resultData.error as NodeOperationError).functionality ===
'configuration-node'
) {
// If the error is a configuration error of the node itself doesn't get executed so we can't use lastNodeExecuted for the title
let title: string;
const nodeError = runDataExecuted.data.resultData.error as NodeOperationError;
if (nodeError.node.name) {
title = `Error in sub-node ${nodeError.node.name}`;
} else {
title = 'Problem executing workflow';
}
this.showMessage({
title,
message:
(nodeError?.description ?? runDataExecutedErrorMessage) +
this.$locale.baseText('pushConnection.executionError.openNode', {
interpolate: {
node: nodeError.node.name,
},
}),
type: 'error',
duration: 0,
dangerouslyUseHTMLString: true,
});
} else {
let title: string;
const isManualExecutionCancelled =
runDataExecuted.mode === 'manual' && runDataExecuted.status === 'canceled';
// Do not show the error message if the workflow got canceled manually
if (isManualExecutionCancelled) {
this.showMessage({
title: this.$locale.baseText('nodeView.showMessage.stopExecutionTry.title'),
type: 'success',
});
} else {
if (runDataExecuted.data.resultData.lastNodeExecuted) {
title = `Problem in node ${runDataExecuted.data.resultData.lastNodeExecuted}`;
} else {
title = 'Problem executing workflow';
}
this.showMessage({
title,
message: runDataExecutedErrorMessage,
type: 'error',
duration: 0,
dangerouslyUseHTMLString: true,
});
}
}
} else {
// Workflow did execute without a problem
this.titleSet(workflow.name as string, 'IDLE');
const execution = this.workflowsStore.getWorkflowExecution;
if (execution?.executedNode) {
const node = this.workflowsStore.getNodeByName(execution.executedNode);
const nodeType = node && this.nodeTypesStore.getNodeType(node.type, node.typeVersion);
const nodeOutput =
execution &&
execution.executedNode &&
execution.data?.resultData?.runData?.[execution.executedNode];
if (nodeType?.polling && !nodeOutput) {
this.showMessage({
title: this.$locale.baseText('pushConnection.pollingNode.dataNotFound', {
interpolate: {
service: getTriggerNodeServiceName(nodeType),
},
}),
message: this.$locale.baseText('pushConnection.pollingNode.dataNotFound.message', {
interpolate: {
service: getTriggerNodeServiceName(nodeType),
},
}),
type: 'success',
});
} else {
this.showMessage({
title: this.$locale.baseText('pushConnection.nodeExecutedSuccessfully'),
type: 'success',
});
}
} else {
this.showMessage({
title: this.$locale.baseText('pushConnection.workflowExecutedSuccessfully'),
type: 'success',
});
}
}
// It does not push the runData as it got already pushed with each
// node that did finish. For that reason copy in here the data
// which we already have.
if (this.workflowsStore.getWorkflowRunData) {
runDataExecuted.data.resultData.runData = this.workflowsStore.getWorkflowRunData;
}
this.workflowsStore.executingNode.length = 0;
this.workflowsStore.setWorkflowExecutionData(runDataExecuted as IExecutionResponse);
this.uiStore.removeActiveAction('workflowRunning');
// Set the node execution issues on all the nodes which produced an error so that
// it can be displayed in the node-view
this.nodeHelpers.updateNodesExecutionIssues();
const lastNodeExecuted: string | undefined =
runDataExecuted.data.resultData.lastNodeExecuted;
let itemsCount = 0;
if (
lastNodeExecuted &&
runDataExecuted.data.resultData.runData[lastNodeExecuted] &&
!runDataExecutedErrorMessage
) {
itemsCount =
runDataExecuted.data.resultData.runData[lastNodeExecuted][0].data!.main[0]!.length;
}
void useExternalHooks().run('pushConnection.executionFinished', {
itemsCount,
nodeName: runDataExecuted.data.resultData.lastNodeExecuted,
errorMessage: runDataExecutedErrorMessage,
runDataExecutedStartData: runDataExecuted.data.startData,
resultDataError: runDataExecuted.data.resultData.error,
});
if (!runDataExecuted.data.resultData.error) {
this.segmentStore.trackSuccessfulWorkflowExecution(runDataExecuted);
}
} else if (receivedData.type === 'executionStarted') {
const pushData = receivedData.data;
const executionData: IExecutionsCurrentSummaryExtended = {
id: pushData.executionId,
finished: false,
mode: pushData.mode,
startedAt: pushData.startedAt,
retryOf: pushData.retryOf,
workflowId: pushData.workflowId,
workflowName: pushData.workflowName,
};
this.workflowsStore.addActiveExecution(executionData);
} else if (receivedData.type === 'nodeExecuteAfter') {
// A node finished to execute. Add its data
const pushData = receivedData.data;
this.workflowsStore.addNodeExecutionData(pushData);
this.workflowsStore.removeExecutingNode(pushData.nodeName);
} else if (receivedData.type === 'nodeExecuteBefore') {
// A node started to be executed. Set it as executing.
const pushData = receivedData.data;
this.workflowsStore.addExecutingNode(pushData.nodeName);
} else if (receivedData.type === 'testWebhookDeleted') {
// A test-webhook was deleted
const pushData = receivedData.data;
if (pushData.workflowId === this.workflowsStore.workflowId) {
this.workflowsStore.executionWaitingForWebhook = false;
this.uiStore.removeActiveAction('workflowRunning');
}
} else if (receivedData.type === 'testWebhookReceived') {
// A test-webhook did get called
const pushData = receivedData.data;
if (pushData.workflowId === this.workflowsStore.workflowId) {
this.workflowsStore.executionWaitingForWebhook = false;
this.workflowsStore.activeExecutionId = pushData.executionId;
}
void this.processWaitingPushMessages();
} else if (receivedData.type === 'reloadNodeType') {
await this.nodeTypesStore.getNodeTypes();
await this.nodeTypesStore.getFullNodesProperties([receivedData.data]);
} else if (receivedData.type === 'removeNodeType') {
const pushData = receivedData.data;
const nodesToBeRemoved: INodeTypeNameVersion[] = [pushData];
// Force reload of all credential types
await this.credentialsStore.fetchCredentialTypes(false).then(() => {
this.nodeTypesStore.removeNodeTypes(nodesToBeRemoved);
});
} else if (receivedData.type === 'nodeDescriptionUpdated') {
await this.nodeTypesStore.getNodeTypes();
await this.credentialsStore.fetchCredentialTypes(true);
}
return true;
},
getExecutionError(data: IRunExecutionData | IExecuteContextData) {
const error = data.resultData.error;
let errorMessage: string;
if (data.resultData.lastNodeExecuted && error) {
errorMessage = error.message || error.description;
} else {
errorMessage = this.$locale.baseText('pushConnection.executionError', {
interpolate: { error: '!' },
});
if (error?.message) {
let nodeName: string | undefined;
if ('node' in error) {
nodeName = typeof error.node === 'string' ? error.node : error.node!.name;
}
const receivedError = nodeName ? `${nodeName}: ${error.message}` : error.message;
errorMessage = this.$locale.baseText('pushConnection.executionError', {
interpolate: {
error: `.${this.$locale.baseText('pushConnection.executionError.details', {
interpolate: {
details: receivedError,
},
})}`,
},
});
}
}
return errorMessage;
},
},
});

View file

@ -14,37 +14,48 @@ export const useCollaborationStore = defineStore(STORES.COLLABORATION, () => {
const workflowStore = useWorkflowsStore();
const usersForWorkflows = ref<ActiveUsersForWorkflows>({});
const pushStoreEventListenerRemovalFn = ref<(() => void) | null>(null);
pushStore.addEventListener((event) => {
if (event.type === 'activeWorkflowUsersChanged') {
const activeWorkflowId = workflowStore.workflowId;
if (event.data.workflowId === activeWorkflowId) {
usersForWorkflows.value[activeWorkflowId] = event.data.activeUsers;
}
}
const getUsersForCurrentWorkflow = computed(() => {
return usersForWorkflows.value[workflowStore.workflowId] ?? [];
});
const workflowUsersUpdated = (data: ActiveUsersForWorkflows) => {
usersForWorkflows.value = data;
};
function initialize() {
pushStoreEventListenerRemovalFn.value = pushStore.addEventListener((event) => {
if (event.type === 'activeWorkflowUsersChanged') {
const activeWorkflowId = workflowStore.workflowId;
if (event.data.workflowId === activeWorkflowId) {
usersForWorkflows.value[activeWorkflowId] = event.data.activeUsers;
}
}
});
}
const notifyWorkflowOpened = (workflowId: string) => {
function terminate() {
if (typeof pushStoreEventListenerRemovalFn.value === 'function') {
pushStoreEventListenerRemovalFn.value();
}
}
function workflowUsersUpdated(data: ActiveUsersForWorkflows) {
usersForWorkflows.value = data;
}
function notifyWorkflowOpened(workflowId: string) {
pushStore.send({
type: 'workflowOpened',
workflowId,
});
};
}
const notifyWorkflowClosed = (workflowId: string) => {
function notifyWorkflowClosed(workflowId: string) {
pushStore.send({ type: 'workflowClosed', workflowId });
};
const getUsersForCurrentWorkflow = computed(() => {
return usersForWorkflows.value[workflowStore.workflowId];
});
}
return {
usersForWorkflows,
initialize,
terminate,
notifyWorkflowOpened,
notifyWorkflowClosed,
workflowUsersUpdated,

View file

@ -154,6 +154,7 @@ export const usePushConnectionStore = defineStore(STORES.PUSH, () => {
pushRef,
pushSource,
isConnectionOpen,
onMessageReceivedHandlers,
addEventListener,
pushConnect,
pushDisconnect,

View file

@ -1 +1,2 @@
export * from './externalHooks';
export * from './pushConnection';

View file

@ -0,0 +1,6 @@
import type { IPushData } from '@/Interface';
export type PushMessageQueueItem = {
message: IPushData;
retriesLeft: number;
};

View file

@ -806,6 +806,7 @@ export default defineComponent({
this.resetWorkspace();
this.canvasStore.initInstance(this.nodeViewRef as HTMLElement);
this.titleReset();
window.addEventListener('message', this.onPostMessageReceived);
this.clipboard.onPaste.value = this.onClipboardPasteEvent;
@ -980,6 +981,7 @@ export default defineComponent({
if (!this.isDemo) {
this.pushStore.pushConnect();
}
this.collaborationStore.initialize();
},
beforeUnmount() {
// Make sure the event listeners get removed again else we
@ -993,6 +995,7 @@ export default defineComponent({
if (!this.isDemo) {
this.pushStore.pushDisconnect();
}
this.collaborationStore.terminate();
this.resetWorkspace();
this.instance.unbind();

View file

@ -60,7 +60,6 @@ import {
} from '@/constants';
import CommunityPackageCard from '@/components/CommunityPackageCard.vue';
import { useToast } from '@/composables/useToast';
import { pushConnection } from '@/mixins/pushConnection';
import type { PublicInstalledPackage } from 'n8n-workflow';
import { useCommunityNodesStore } from '@/stores/communityNodes.store';
@ -69,6 +68,9 @@ import { mapStores } from 'pinia';
import { useSettingsStore } from '@/stores/settings.store';
import { defineComponent } from 'vue';
import { useExternalHooks } from '@/composables/useExternalHooks';
import { useRouter } from 'vue-router';
import { usePushConnection } from '@/composables/usePushConnection';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
const PACKAGE_COUNT_THRESHOLD = 31;
@ -77,15 +79,15 @@ export default defineComponent({
components: {
CommunityPackageCard,
},
mixins: [pushConnection],
setup(props, ctx) {
setup() {
const router = useRouter();
const pushConnection = usePushConnection({ router });
const externalHooks = useExternalHooks();
return {
externalHooks,
...useToast(),
// eslint-disable-next-line @typescript-eslint/no-misused-promises
...pushConnection.setup?.(props, ctx),
pushConnection,
};
},
data() {
@ -93,10 +95,12 @@ export default defineComponent({
loading: false,
};
},
async mounted() {
beforeMount() {
this.pushConnection.initialize();
// The push connection is needed here to receive `reloadNodeType` and `removeNodeType` events when community nodes are installed, updated, or removed.
this.pushStore.pushConnect();
},
async mounted() {
try {
this.loading = true;
await this.communityNodesStore.fetchInstalledPackages();
@ -142,9 +146,10 @@ export default defineComponent({
},
beforeUnmount() {
this.pushStore.pushDisconnect();
this.pushConnection.terminate();
},
computed: {
...mapStores(useCommunityNodesStore, useSettingsStore, useUIStore),
...mapStores(useCommunityNodesStore, useSettingsStore, useUIStore, usePushConnectionStore),
getEmptyStateDescription(): string {
const packageCount = this.communityNodesStore.availablePackageCount;