fix(editor): Fix issues with push connect reconnection (#13085)

This commit is contained in:
Tomi Turtiainen 2025-02-06 16:39:18 +02:00 committed by GitHub
parent e59d9830bf
commit fff98b16bb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 907 additions and 137 deletions

View file

@ -889,7 +889,6 @@ export interface RootState {
endpointWebhook: string;
endpointWebhookTest: string;
endpointWebhookWaiting: string;
pushConnectionActive: boolean;
timezone: string;
executionTimeout: number;
maxExecutionTimeout: number;

View file

@ -38,6 +38,13 @@ vi.mock('@/composables/useToast', () => {
},
};
});
vi.mock('@/stores/pushConnection.store', () => ({
usePushConnectionStore: vi.fn().mockReturnValue({
isConnected: true,
}),
}));
// Test data
const mockNodes: INodeUi[] = [
{

View file

@ -11,6 +11,12 @@ vi.mock('vue-router', () => ({
RouterLink: vi.fn(),
}));
vi.mock('@/stores/pushConnection.store', () => ({
usePushConnectionStore: vi.fn().mockReturnValue({
isConnected: true,
}),
}));
const initialState = {
[STORES.SETTINGS]: {
settings: {

View file

@ -0,0 +1,58 @@
import { createComponentRenderer } from '@/__tests__/render';
import PushConnectionTracker from '@/components/PushConnectionTracker.vue';
import { STORES } from '@/constants';
import { createTestingPinia } from '@pinia/testing';
import { setActivePinia } from 'pinia';
let isConnected = true;
let isConnectionRequested = true;
vi.mock('@/stores/pushConnection.store', () => {
return {
usePushConnectionStore: vi.fn(() => ({
isConnected,
isConnectionRequested,
})),
};
});
describe('PushConnectionTracker', () => {
const render = () => {
const pinia = createTestingPinia({
stubActions: false,
initialState: {
[STORES.PUSH]: {
isConnected,
isConnectionRequested,
},
},
});
setActivePinia(pinia);
return createComponentRenderer(PushConnectionTracker)();
};
it('should not render error when connected and connection requested', () => {
isConnected = true;
isConnectionRequested = true;
const { container } = render();
expect(container).toMatchSnapshot();
});
it('should render error when disconnected and connection requested', () => {
isConnected = false;
isConnectionRequested = true;
const { container } = render();
expect(container).toMatchSnapshot();
});
it('should not render error when connected and connection not requested', () => {
isConnected = true;
isConnectionRequested = false;
const { container } = render();
expect(container).toMatchSnapshot();
});
});

View file

@ -1,16 +1,23 @@
<script setup lang="ts">
import { computed } from 'vue';
import { useRootStore } from '@/stores/root.store';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
import { useI18n } from '@/composables/useI18n';
import { computed } from 'vue';
const rootStore = useRootStore();
const pushConnectionActive = computed(() => rootStore.pushConnectionActive);
const pushConnectionStore = usePushConnectionStore();
const i18n = useI18n();
const showConnectionLostError = computed(() => {
// Only show the connection lost error if the connection has been requested
// and the connection is not currently connected. This is to prevent the
// connection error from being shown e.g. when user navigates directly to
// the workflow executions list, which doesn't open the connection.
return pushConnectionStore.isConnectionRequested && !pushConnectionStore.isConnected;
});
</script>
<template>
<span>
<div v-if="!pushConnectionActive" class="push-connection-lost primary-color">
<div v-if="showConnectionLostError" class="push-connection-lost primary-color">
<n8n-tooltip placement="bottom-end">
<template #content>
<div v-n8n-html="i18n.baseText('pushConnectionTracker.cannotConnectToServer')"></div>

View file

@ -0,0 +1,55 @@
// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html
exports[`PushConnectionTracker > should not render error when connected and connection not requested 1`] = `
<div>
<span>
</span>
</div>
`;
exports[`PushConnectionTracker > should not render error when connected and connection requested 1`] = `
<div>
<span>
</span>
</div>
`;
exports[`PushConnectionTracker > should render error when disconnected and connection requested 1`] = `
<div>
<span>
<div
class="push-connection-lost primary-color"
>
<span
class="el-tooltip__trigger"
>
<svg
aria-hidden="true"
class="svg-inline--fa fa-exclamation-triangle fa-w-18"
data-icon="exclamation-triangle"
data-prefix="fas"
focusable="false"
role="img"
viewBox="0 0 576 512"
xmlns="http://www.w3.org/2000/svg"
>
<path
class=""
d="M569.517 440.013C587.975 472.007 564.806 512 527.94 512H48.054c-36.937 0-59.999-40.055-41.577-71.987L246.423 23.985c18.467-32.009 64.72-31.951 83.154 0l239.94 416.028zM288 354c-25.405 0-46 20.595-46 46s20.595 46 46 46 46-20.595 46-46-20.595-46-46-46zm-43.673-165.346l7.418 136c.347 6.364 5.609 11.346 11.982 11.346h48.546c6.373 0 11.635-4.982 11.982-11.346l7.418-136c.375-6.874-5.098-12.654-11.982-12.654h-63.383c-6.884 0-12.356 5.78-11.981 12.654z"
fill="currentColor"
/>
</svg>
  Connection lost
</span>
<!--teleport start-->
<!--teleport end-->
</div>
</span>
</div>
`;

View file

@ -11,7 +11,6 @@ import {
type ITaskData,
} from 'n8n-workflow';
import { useRootStore } from '@/stores/root.store';
import { useRunWorkflow } from '@/composables/useRunWorkflow';
import type { IStartRunData, IWorkflowData } from '@/Interface';
import { useWorkflowsStore } from '@/stores/workflows.store';
@ -21,6 +20,7 @@ import { useToast } from './useToast';
import { useI18n } from '@/composables/useI18n';
import { captor, mock } from 'vitest-mock-extended';
import { useSettingsStore } from '@/stores/settings.store';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
vi.mock('@/stores/workflows.store', () => ({
useWorkflowsStore: vi.fn().mockReturnValue({
@ -40,6 +40,12 @@ vi.mock('@/stores/workflows.store', () => ({
}),
}));
vi.mock('@/stores/pushConnection.store', () => ({
usePushConnectionStore: vi.fn().mockReturnValue({
isConnected: true,
}),
}));
vi.mock('@/composables/useTelemetry', () => ({
useTelemetry: vi.fn().mockReturnValue({ track: vi.fn() }),
}));
@ -90,7 +96,7 @@ vi.mock('vue-router', async (importOriginal) => {
});
describe('useRunWorkflow({ router })', () => {
let rootStore: ReturnType<typeof useRootStore>;
let pushConnectionStore: ReturnType<typeof usePushConnectionStore>;
let uiStore: ReturnType<typeof useUIStore>;
let workflowsStore: ReturnType<typeof useWorkflowsStore>;
let router: ReturnType<typeof useRouter>;
@ -102,7 +108,7 @@ describe('useRunWorkflow({ router })', () => {
setActivePinia(pinia);
rootStore = useRootStore();
pushConnectionStore = usePushConnectionStore();
uiStore = useUIStore();
workflowsStore = useWorkflowsStore();
settingsStore = useSettingsStore();
@ -120,7 +126,7 @@ describe('useRunWorkflow({ router })', () => {
it('should throw an error if push connection is not active', async () => {
const { runWorkflowApi } = useRunWorkflow({ router });
rootStore.setPushConnectionInactive();
vi.mocked(pushConnectionStore).isConnected = false;
await expect(runWorkflowApi({} as IStartRunData)).rejects.toThrow(
'workflowRun.noActiveConnectionToTheServer',
@ -130,7 +136,7 @@ describe('useRunWorkflow({ router })', () => {
it('should successfully run a workflow', async () => {
const { runWorkflowApi } = useRunWorkflow({ router });
rootStore.setPushConnectionActive();
vi.mocked(pushConnectionStore).isConnected = true;
const mockResponse = { executionId: '123', waitingForWebhook: false };
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockResponse);
@ -161,7 +167,7 @@ describe('useRunWorkflow({ router })', () => {
it('should handle workflow run failure', async () => {
const { runWorkflowApi } = useRunWorkflow({ router });
rootStore.setPushConnectionActive();
vi.mocked(pushConnectionStore).isConnected = true;
vi.mocked(workflowsStore).runWorkflow.mockRejectedValue(new Error('Failed to run workflow'));
await expect(runWorkflowApi({} as IStartRunData)).rejects.toThrow('Failed to run workflow');
@ -171,7 +177,7 @@ describe('useRunWorkflow({ router })', () => {
it('should set waitingForWebhook if response indicates waiting', async () => {
const { runWorkflowApi } = useRunWorkflow({ router });
rootStore.setPushConnectionActive();
vi.mocked(pushConnectionStore).isConnected = true;
const mockResponse = { executionId: '123', waitingForWebhook: true };
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockResponse);
@ -210,6 +216,7 @@ describe('useRunWorkflow({ router })', () => {
type: 'error',
});
});
it('should execute workflow has pin data and is active with single webhook trigger', async () => {
const pinia = createTestingPinia({ stubActions: false });
setActivePinia(pinia);
@ -295,7 +302,7 @@ describe('useRunWorkflow({ router })', () => {
const mockExecutionResponse = { executionId: '123' };
const { runWorkflow } = useRunWorkflow({ router });
vi.mocked(rootStore).pushConnectionActive = true;
vi.mocked(pushConnectionStore).isConnected = true;
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).nodesIssuesExist = false;
vi.mocked(workflowHelpers).getCurrentWorkflow.mockReturnValue({
@ -405,7 +412,7 @@ describe('useRunWorkflow({ router })', () => {
workflow.getParentNodes.mockReturnValue([]);
vi.mocked(settingsStore).partialExecutionVersion = 1;
vi.mocked(rootStore).pushConnectionActive = true;
vi.mocked(pushConnectionStore).isConnected = true;
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).nodesIssuesExist = false;
vi.mocked(workflowHelpers).getCurrentWorkflow.mockReturnValue(workflow);
@ -436,7 +443,7 @@ describe('useRunWorkflow({ router })', () => {
workflow.getParentNodes.mockReturnValue([]);
vi.mocked(settingsStore).partialExecutionVersion = 2;
vi.mocked(rootStore).pushConnectionActive = true;
vi.mocked(pushConnectionStore).isConnected = true;
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).nodesIssuesExist = false;
vi.mocked(workflowHelpers).getCurrentWorkflow.mockReturnValue(workflow);
@ -465,7 +472,7 @@ describe('useRunWorkflow({ router })', () => {
workflow.getParentNodes.mockReturnValue([]);
vi.mocked(settingsStore).partialExecutionVersion = 2;
vi.mocked(rootStore).pushConnectionActive = true;
vi.mocked(pushConnectionStore).isConnected = true;
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).nodesIssuesExist = false;
vi.mocked(workflowHelpers).getCurrentWorkflow.mockReturnValue(workflow);

View file

@ -36,6 +36,7 @@ import { useI18n } from '@/composables/useI18n';
import { get } from 'lodash-es';
import { useExecutionsStore } from '@/stores/executions.store';
import { useSettingsStore } from '@/stores/settings.store';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
const getDirtyNodeNames = (
runData: IRunData,
@ -63,12 +64,13 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
const toast = useToast();
const rootStore = useRootStore();
const pushConnectionStore = usePushConnectionStore();
const uiStore = useUIStore();
const workflowsStore = useWorkflowsStore();
const executionsStore = useExecutionsStore();
// Starts to execute a workflow on server
async function runWorkflowApi(runData: IStartRunData): Promise<IExecutionPushResponse> {
if (!rootStore.pushConnectionActive) {
if (!pushConnectionStore.isConnected) {
// Do not start if the connection to server is not active
// because then it can not receive the data as it executes.
throw new Error(i18n.baseText('workflowRun.noActiveConnectionToTheServer'));

View file

@ -0,0 +1,20 @@
/** Mocked EventSource class to help testing */
export class MockEventSource extends EventTarget {
constructor(public url: string) {
super();
}
simulateConnectionOpen() {
this.dispatchEvent(new Event('open'));
}
simulateConnectionClose() {
this.dispatchEvent(new Event('close'));
}
simulateMessageEvent(data: string) {
this.dispatchEvent(new MessageEvent('message', { data }));
}
close = vi.fn();
}

View file

@ -0,0 +1,32 @@
import { WebSocketState } from '@/push-connection/useWebSocketClient';
/** Mocked WebSocket class to help testing */
export class MockWebSocket extends EventTarget {
readyState: number = WebSocketState.CONNECTING;
constructor(public url: string) {
super();
}
simulateConnectionOpen() {
this.dispatchEvent(new Event('open'));
this.readyState = WebSocketState.OPEN;
}
simulateConnectionClose(code: number) {
this.dispatchEvent(new CloseEvent('close', { code }));
this.readyState = WebSocketState.CLOSED;
}
simulateMessageEvent(data: string) {
this.dispatchEvent(new MessageEvent('message', { data }));
}
dispatchErrorEvent() {
this.dispatchEvent(new Event('error'));
}
send = vi.fn();
close = vi.fn();
}

View file

@ -0,0 +1,153 @@
import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest';
import { useEventSourceClient } from '../useEventSourceClient';
import { MockEventSource } from './mockEventSource';
describe('useEventSourceClient', () => {
let mockEventSource: MockEventSource;
beforeEach(() => {
mockEventSource = new MockEventSource('http://test.com');
// @ts-expect-error - mock EventSource
global.EventSource = vi.fn(() => mockEventSource);
vi.useFakeTimers();
});
afterEach(() => {
vi.clearAllTimers();
vi.clearAllMocks();
});
test('should create EventSource connection with provided URL', () => {
const url = 'http://test.com';
const onMessage = vi.fn();
const { connect } = useEventSourceClient({ url, onMessage });
connect();
expect(EventSource).toHaveBeenCalledWith(url, { withCredentials: true });
});
test('should update connection status on successful connection', () => {
const { connect, isConnected } = useEventSourceClient({
url: 'http://test.com',
onMessage: vi.fn(),
});
connect();
mockEventSource.simulateConnectionOpen();
expect(isConnected.value).toBe(true);
});
test('should handle incoming messages', () => {
const onMessage = vi.fn();
const { connect } = useEventSourceClient({ url: 'http://test.com', onMessage });
connect();
mockEventSource.simulateMessageEvent('test data');
expect(onMessage).toHaveBeenCalledWith('test data');
});
test('should handle disconnection', () => {
const { connect, disconnect, isConnected } = useEventSourceClient({
url: 'http://test.com',
onMessage: vi.fn(),
});
connect();
// Simulate successful connection
mockEventSource.simulateConnectionOpen();
expect(isConnected.value).toBe(true);
disconnect();
expect(isConnected.value).toBe(false);
expect(mockEventSource.close).toHaveBeenCalled();
});
test('should handle connection loss', () => {
const { connect, isConnected } = useEventSourceClient({
url: 'http://test.com',
onMessage: vi.fn(),
});
connect();
expect(EventSource).toHaveBeenCalledTimes(1);
// Simulate successful connection
mockEventSource.simulateConnectionOpen();
expect(isConnected.value).toBe(true);
// Simulate connection loss
mockEventSource.simulateConnectionClose();
expect(isConnected.value).toBe(false);
// Advance timer to trigger reconnect
vi.advanceTimersByTime(1_000);
expect(EventSource).toHaveBeenCalledTimes(2);
});
test('sendMessage should be a noop function', () => {
const { connect, sendMessage } = useEventSourceClient({
url: 'http://test.com',
onMessage: vi.fn(),
});
connect();
// Simulate successful connection
mockEventSource.simulateConnectionOpen();
const message = 'test message';
// Should not throw error and should do nothing
expect(() => sendMessage(message)).not.toThrow();
});
test('should attempt reconnection with increasing delays', () => {
const { connect } = useEventSourceClient({
url: 'http://test.com',
onMessage: vi.fn(),
});
connect();
mockEventSource.simulateConnectionOpen();
mockEventSource.simulateConnectionClose();
// First reconnection attempt after 1 second
vi.advanceTimersByTime(1_000);
expect(EventSource).toHaveBeenCalledTimes(2);
mockEventSource.simulateConnectionClose();
// Second reconnection attempt after 2 seconds
vi.advanceTimersByTime(2_000);
expect(EventSource).toHaveBeenCalledTimes(3);
});
test('should reset connection attempts on successful connection', () => {
const { connect } = useEventSourceClient({
url: 'http://test.com',
onMessage: vi.fn(),
});
connect();
// First connection attempt
mockEventSource.simulateConnectionOpen();
mockEventSource.simulateConnectionClose();
// First reconnection attempt
vi.advanceTimersByTime(1_000);
expect(EventSource).toHaveBeenCalledTimes(2);
// Successful connection
mockEventSource.simulateConnectionOpen();
// Connection lost again
mockEventSource.simulateConnectionClose();
// Should start with initial delay again
vi.advanceTimersByTime(1_000);
expect(EventSource).toHaveBeenCalledTimes(3);
});
});

View file

@ -0,0 +1,88 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { useHeartbeat } from '../useHeartbeat';
describe('useHeartbeat', () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.clearAllTimers();
});
it('should start heartbeat and call onHeartbeat at specified intervals', () => {
const onHeartbeat = vi.fn();
const interval = 1000;
const heartbeat = useHeartbeat({ interval, onHeartbeat });
heartbeat.startHeartbeat();
// Initially, the callback should not be called
expect(onHeartbeat).not.toHaveBeenCalled();
// Advance timer by interval
vi.advanceTimersByTime(interval);
expect(onHeartbeat).toHaveBeenCalledTimes(1);
// Advance timer by another interval
vi.advanceTimersByTime(interval);
expect(onHeartbeat).toHaveBeenCalledTimes(2);
});
it('should stop heartbeat when stopHeartbeat is called', () => {
const onHeartbeat = vi.fn();
const interval = 1000;
const heartbeat = useHeartbeat({ interval, onHeartbeat });
heartbeat.startHeartbeat();
// Advance timer by interval
vi.advanceTimersByTime(interval);
expect(onHeartbeat).toHaveBeenCalledTimes(1);
// Stop the heartbeat
heartbeat.stopHeartbeat();
// Advance timer by multiple intervals
vi.advanceTimersByTime(interval * 3);
expect(onHeartbeat).toHaveBeenCalledTimes(1); // Should still be 1
});
it('should be safe to call stopHeartbeat multiple times', () => {
const onHeartbeat = vi.fn();
const interval = 1000;
const heartbeat = useHeartbeat({ interval, onHeartbeat });
heartbeat.startHeartbeat();
// Stop multiple times
heartbeat.stopHeartbeat();
heartbeat.stopHeartbeat();
heartbeat.stopHeartbeat();
vi.advanceTimersByTime(interval * 2);
expect(onHeartbeat).not.toHaveBeenCalled();
});
it('should restart heartbeat after stopping', () => {
const onHeartbeat = vi.fn();
const interval = 1000;
const heartbeat = useHeartbeat({ interval, onHeartbeat });
// First start
heartbeat.startHeartbeat();
vi.advanceTimersByTime(interval);
expect(onHeartbeat).toHaveBeenCalledTimes(1);
// Stop
heartbeat.stopHeartbeat();
vi.advanceTimersByTime(interval);
expect(onHeartbeat).toHaveBeenCalledTimes(1);
// Restart
heartbeat.startHeartbeat();
vi.advanceTimersByTime(interval);
expect(onHeartbeat).toHaveBeenCalledTimes(2);
});
});

View file

@ -0,0 +1,140 @@
import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest';
import { useWebSocketClient } from '../useWebSocketClient';
import { MockWebSocket } from './mockWebSocketClient';
describe('useWebSocketClient', () => {
let mockWebSocket: MockWebSocket;
beforeEach(() => {
mockWebSocket = new MockWebSocket('ws://test.com');
// @ts-expect-error - mock WebSocket
global.WebSocket = vi.fn(() => mockWebSocket);
vi.useFakeTimers();
});
afterEach(() => {
vi.clearAllTimers();
vi.clearAllMocks();
});
test('should create WebSocket connection with provided URL', () => {
const url = 'ws://test.com';
const onMessage = vi.fn();
const { connect } = useWebSocketClient({ url, onMessage });
connect();
expect(WebSocket).toHaveBeenCalledWith(url);
});
test('should update connection status and start heartbeat on successful connection', () => {
const { connect, isConnected } = useWebSocketClient({
url: 'ws://test.com',
onMessage: vi.fn(),
});
connect();
mockWebSocket.simulateConnectionOpen();
expect(isConnected.value).toBe(true);
// Advance timer to trigger heartbeat
vi.advanceTimersByTime(30_000);
expect(mockWebSocket.send).toHaveBeenCalledWith(JSON.stringify({ type: 'heartbeat' }));
});
test('should handle incoming messages', () => {
const onMessage = vi.fn();
const { connect } = useWebSocketClient({ url: 'ws://test.com', onMessage });
connect();
mockWebSocket.simulateMessageEvent('test data');
expect(onMessage).toHaveBeenCalledWith('test data');
});
test('should handle disconnection', () => {
const { connect, disconnect, isConnected } = useWebSocketClient({
url: 'ws://test.com',
onMessage: vi.fn(),
});
connect();
// Simulate successful connection
mockWebSocket.simulateConnectionOpen();
expect(isConnected.value).toBe(true);
disconnect();
expect(isConnected.value).toBe(false);
expect(mockWebSocket.close).toHaveBeenCalledWith(1000);
});
test('should handle connection loss', () => {
const { connect, isConnected } = useWebSocketClient({
url: 'ws://test.com',
onMessage: vi.fn(),
});
connect();
expect(WebSocket).toHaveBeenCalledTimes(1);
// Simulate successful connection
mockWebSocket.simulateConnectionOpen();
expect(isConnected.value).toBe(true);
// Simulate connection loss
mockWebSocket.simulateConnectionClose(1006);
expect(isConnected.value).toBe(false);
// Advance timer to reconnect
vi.advanceTimersByTime(1_000);
expect(WebSocket).toHaveBeenCalledTimes(2);
});
test('should throw error when trying to send message while disconnected', () => {
const { sendMessage } = useWebSocketClient({ url: 'ws://test.com', onMessage: vi.fn() });
expect(() => sendMessage('test')).toThrow('Not connected to the server');
});
test('should attempt reconnection with increasing delays', () => {
const { connect } = useWebSocketClient({
url: 'http://test.com',
onMessage: vi.fn(),
});
connect();
mockWebSocket.simulateConnectionOpen();
mockWebSocket.simulateConnectionClose(1006);
// First reconnection attempt after 1 second
vi.advanceTimersByTime(1_000);
expect(WebSocket).toHaveBeenCalledTimes(2);
mockWebSocket.simulateConnectionClose(1006);
// Second reconnection attempt after 2 seconds
vi.advanceTimersByTime(2_000);
expect(WebSocket).toHaveBeenCalledTimes(3);
});
test('should send message when connected', () => {
const { connect, sendMessage } = useWebSocketClient({
url: 'ws://test.com',
onMessage: vi.fn(),
});
connect();
// Simulate successful connection
mockWebSocket.simulateConnectionOpen();
const message = 'test message';
sendMessage(message);
expect(mockWebSocket.send).toHaveBeenCalledWith(message);
});
});

View file

@ -0,0 +1,69 @@
import { useReconnectTimer } from '@/push-connection/useReconnectTimer';
import { ref } from 'vue';
export type UseEventSourceClientOptions = {
url: string;
onMessage: (data: string) => void;
};
/**
* Creates an EventSource connection to the server. Uses reconnection logic
* to reconnect if the connection is lost.
*/
export const useEventSourceClient = (options: UseEventSourceClientOptions) => {
const isConnected = ref(false);
const eventSource = ref<EventSource | null>(null);
const onConnected = () => {
isConnected.value = true;
reconnectTimer.resetConnectionAttempts();
};
const onConnectionLost = () => {
console.warn('[EventSourceClient] Connection lost');
isConnected.value = false;
reconnectTimer.scheduleReconnect();
};
const onMessage = (event: MessageEvent) => {
options.onMessage(event.data);
};
const disconnect = () => {
if (eventSource.value) {
reconnectTimer.stopReconnectTimer();
eventSource.value.close();
eventSource.value = null;
}
isConnected.value = false;
};
const connect = () => {
// Ensure we disconnect any existing connection
disconnect();
eventSource.value = new EventSource(options.url, { withCredentials: true });
eventSource.value.addEventListener('open', onConnected);
eventSource.value.addEventListener('message', onMessage);
eventSource.value.addEventListener('close', onConnectionLost);
};
const reconnectTimer = useReconnectTimer({
onAttempt: connect,
onAttemptScheduled: (delay) => {
console.log(`[EventSourceClient] Attempting to reconnect in ${delay}ms`);
},
});
const sendMessage = (_: string) => {
// Noop, EventSource does not support sending messages
};
return {
isConnected,
connect,
disconnect,
sendMessage,
};
};

View file

@ -0,0 +1,32 @@
import { ref } from 'vue';
export type UseHeartbeatOptions = {
interval: number;
onHeartbeat: () => void;
};
/**
* Creates a heartbeat timer using the given interval. The timer needs
* to be started and stopped manually.
*/
export const useHeartbeat = (options: UseHeartbeatOptions) => {
const { interval, onHeartbeat } = options;
const heartbeatTimer = ref<ReturnType<typeof setInterval> | null>(null);
const startHeartbeat = () => {
heartbeatTimer.value = setInterval(onHeartbeat, interval);
};
const stopHeartbeat = () => {
if (heartbeatTimer.value) {
clearInterval(heartbeatTimer.value);
heartbeatTimer.value = null;
}
};
return {
startHeartbeat,
stopHeartbeat,
};
};

View file

@ -0,0 +1,48 @@
import { ref } from 'vue';
type UseReconnectTimerOptions = {
/** Callback that an attempt should be made */
onAttempt: () => void;
/** Callback that a future attempt was scheduled */
onAttemptScheduled: (delay: number) => void;
};
/**
* A timer for exponential backoff reconnect attempts.
*/
export const useReconnectTimer = ({ onAttempt, onAttemptScheduled }: UseReconnectTimerOptions) => {
const initialReconnectDelay = 1000;
const maxReconnectDelay = 15_000;
const reconnectTimer = ref<ReturnType<typeof setTimeout> | null>(null);
const reconnectAttempts = ref(0);
const scheduleReconnect = () => {
const delay = Math.min(initialReconnectDelay * 2 ** reconnectAttempts.value, maxReconnectDelay);
reconnectAttempts.value++;
onAttemptScheduled(delay);
reconnectTimer.value = setTimeout(() => {
onAttempt();
}, delay);
};
const stopReconnectTimer = () => {
if (reconnectTimer.value) {
clearTimeout(reconnectTimer.value);
reconnectTimer.value = null;
}
};
const resetConnectionAttempts = () => {
reconnectAttempts.value = 0;
};
return {
scheduleReconnect,
stopReconnectTimer,
resetConnectionAttempts,
};
};

View file

@ -0,0 +1,106 @@
import { useHeartbeat } from '@/push-connection/useHeartbeat';
import { useReconnectTimer } from '@/push-connection/useReconnectTimer';
import { ref } from 'vue';
export type UseWebSocketClientOptions<T> = {
url: string;
onMessage: (data: T) => void;
};
/** Defined here as not available in tests */
export const WebSocketState = {
CONNECTING: 0,
OPEN: 1,
CLOSING: 2,
CLOSED: 3,
};
/**
* Creates a WebSocket connection to the server. Uses reconnection logic
* to reconnect if the connection is lost.
*/
export const useWebSocketClient = <T>(options: UseWebSocketClientOptions<T>) => {
const isConnected = ref(false);
const socket = ref<WebSocket | null>(null);
/**
* Heartbeat timer to keep the connection alive. This is an additional
* mechanism to the protocol level ping/pong mechanism the server sends.
* This is used the ensure the client notices connection issues.
*/
const { startHeartbeat, stopHeartbeat } = useHeartbeat({
interval: 30_000,
onHeartbeat: () => {
socket.value?.send(JSON.stringify({ type: 'heartbeat' }));
},
});
const onConnected = () => {
socket.value?.removeEventListener('open', onConnected);
isConnected.value = true;
startHeartbeat();
reconnectTimer.resetConnectionAttempts();
};
const onConnectionLost = (event: CloseEvent) => {
console.warn(`[WebSocketClient] Connection lost, code=${event.code ?? 'unknown'}`);
isConnected.value = false;
stopHeartbeat();
reconnectTimer.scheduleReconnect();
};
const onMessage = (event: MessageEvent) => {
options.onMessage(event.data);
};
const onError = (error: unknown) => {
console.warn('[WebSocketClient] Connection error:', error);
};
const disconnect = () => {
if (socket.value) {
stopHeartbeat();
reconnectTimer.stopReconnectTimer();
socket.value.removeEventListener('message', onMessage);
socket.value.removeEventListener('error', onError);
socket.value.removeEventListener('close', onConnectionLost);
socket.value.close(1000);
socket.value = null;
}
isConnected.value = false;
};
const connect = () => {
// Ensure we disconnect any existing connection
disconnect();
socket.value = new WebSocket(options.url);
socket.value.addEventListener('open', onConnected);
socket.value.addEventListener('message', onMessage);
socket.value.addEventListener('error', onError);
socket.value.addEventListener('close', onConnectionLost);
};
const reconnectTimer = useReconnectTimer({
onAttempt: connect,
onAttemptScheduled: (delay) => {
console.log(`[WebSocketClient] Attempting to reconnect in ${delay}ms`);
},
});
const sendMessage = (serializedMessage: string) => {
if (!isConnected.value || !socket.value) {
throw new Error('Not connected to the server');
}
socket.value.send(serializedMessage);
};
return {
isConnected,
connect,
disconnect,
sendMessage,
};
};

View file

@ -1,22 +1,12 @@
import { defineStore } from 'pinia';
import { ref, computed } from 'vue';
import { computed, ref, watch } from 'vue';
import type { PushMessage } from '@n8n/api-types';
import { STORES, TIME } from '@/constants';
import { STORES } from '@/constants';
import { useSettingsStore } from './settings.store';
import { useRootStore } from './root.store';
export interface PushState {
pushRef: string;
pushSource: WebSocket | EventSource | null;
reconnectTimeout: NodeJS.Timeout | null;
retryTimeout: NodeJS.Timeout | null;
pushMessageQueue: Array<{ event: Event; retriesLeft: number }>;
connectRetries: number;
lostConnection: boolean;
outgoingQueue: unknown[];
isConnectionOpen: boolean;
}
import { useWebSocketClient } from '@/push-connection/useWebSocketClient';
import { useEventSourceClient } from '@/push-connection/useEventSourceClient';
export type OnPushMessageHandler = (event: PushMessage) => void;
@ -27,18 +17,20 @@ export const usePushConnectionStore = defineStore(STORES.PUSH, () => {
const rootStore = useRootStore();
const settingsStore = useSettingsStore();
const pushRef = computed(() => rootStore.pushRef);
const pushSource = ref<WebSocket | EventSource | null>(null);
const reconnectTimeout = ref<NodeJS.Timeout | null>(null);
const connectRetries = ref(0);
const lostConnection = ref(false);
/**
* Queue of messages to be sent to the server. Messages are queued if
* the connection is down.
*/
const outgoingQueue = ref<unknown[]>([]);
const isConnectionOpen = ref(false);
/** Whether the connection has been requested */
const isConnectionRequested = ref(false);
const onMessageReceivedHandlers = ref<OnPushMessageHandler[]>([]);
const addEventListener = (handler: OnPushMessageHandler) => {
onMessageReceivedHandlers.value.push(handler);
return () => {
const index = onMessageReceivedHandlers.value.indexOf(handler);
if (index !== -1) {
@ -47,103 +39,30 @@ export const usePushConnectionStore = defineStore(STORES.PUSH, () => {
};
};
function onConnectionError() {
pushDisconnect();
connectRetries.value++;
reconnectTimeout.value = setTimeout(
attemptReconnect,
Math.min(connectRetries.value * 2000, 8 * TIME.SECOND), // maximum 8 seconds backoff
);
}
/**
* Close connection to server
*/
function pushDisconnect() {
if (pushSource.value !== null) {
pushSource.value.removeEventListener('error', onConnectionError);
pushSource.value.removeEventListener('close', onConnectionError);
pushSource.value.removeEventListener('message', pushMessageReceived);
if (pushSource.value.readyState < 2) pushSource.value.close();
pushSource.value = null;
}
isConnectionOpen.value = false;
}
/**
* Connect to server to receive data via a WebSocket or EventSource
*/
function pushConnect() {
// always close the previous connection so that we do not end up with multiple connections
pushDisconnect();
if (reconnectTimeout.value) {
clearTimeout(reconnectTimeout.value);
reconnectTimeout.value = null;
}
const useWebSockets = settingsStore.pushBackend === 'websocket';
const useWebSockets = settingsStore.pushBackend === 'websocket';
const getConnectionUrl = () => {
const restUrl = rootStore.restUrl;
const url = `/push?pushRef=${pushRef.value}`;
const url = `/push?pushRef=${rootStore.pushRef}`;
if (useWebSockets) {
const { protocol, host } = window.location;
const baseUrl = restUrl.startsWith('http')
? restUrl.replace(/^http/, 'ws')
: `${protocol === 'https:' ? 'wss' : 'ws'}://${host + restUrl}`;
pushSource.value = new WebSocket(`${baseUrl}${url}`);
return `${baseUrl}${url}`;
} else {
pushSource.value = new EventSource(`${restUrl}${url}`, { withCredentials: true });
return `${restUrl}${url}`;
}
pushSource.value.addEventListener('open', onConnectionSuccess, false);
pushSource.value.addEventListener('message', pushMessageReceived, false);
pushSource.value.addEventListener(useWebSockets ? 'close' : 'error', onConnectionError, false);
}
function attemptReconnect() {
pushConnect();
}
function serializeAndSend(message: unknown) {
if (pushSource.value && 'send' in pushSource.value) {
pushSource.value.send(JSON.stringify(message));
}
}
function onConnectionSuccess() {
isConnectionOpen.value = true;
connectRetries.value = 0;
lostConnection.value = false;
rootStore.setPushConnectionActive();
pushSource.value?.removeEventListener('open', onConnectionSuccess);
if (outgoingQueue.value.length) {
for (const message of outgoingQueue.value) {
serializeAndSend(message);
}
outgoingQueue.value = [];
}
}
function send(message: unknown) {
if (!isConnectionOpen.value) {
outgoingQueue.value.push(message);
return;
}
serializeAndSend(message);
}
};
/**
* Process a newly received message
*/
async function pushMessageReceived(event: Event) {
async function onMessage(data: unknown) {
let receivedData: PushMessage;
try {
// @ts-ignore
receivedData = JSON.parse(event.data);
receivedData = JSON.parse(data as string);
} catch (error) {
return;
}
@ -151,19 +70,55 @@ export const usePushConnectionStore = defineStore(STORES.PUSH, () => {
onMessageReceivedHandlers.value.forEach((handler) => handler(receivedData));
}
const url = getConnectionUrl();
const client = useWebSockets
? useWebSocketClient({ url, onMessage })
: useEventSourceClient({ url, onMessage });
function serializeAndSend(message: unknown) {
client.sendMessage(JSON.stringify(message));
}
const pushConnect = () => {
isConnectionRequested.value = true;
client.connect();
};
const pushDisconnect = () => {
isConnectionRequested.value = false;
client.disconnect();
};
watch(client.isConnected, (didConnect) => {
if (!didConnect) {
return;
}
// Send any buffered messages
if (outgoingQueue.value.length) {
for (const message of outgoingQueue.value) {
serializeAndSend(message);
}
outgoingQueue.value = [];
}
});
/** Removes all buffered messages from the sent queue */
const clearQueue = () => {
outgoingQueue.value = [];
};
const isConnected = computed(() => client.isConnected.value);
return {
pushRef,
pushSource,
isConnectionOpen,
isConnected,
isConnectionRequested,
onMessageReceivedHandlers,
addEventListener,
pushConnect,
pushDisconnect,
send,
send: serializeAndSend,
clearQueue,
};
});

View file

@ -20,7 +20,6 @@ export const useRootStore = defineStore(STORES.ROOT, () => {
endpointWebhook: 'webhook',
endpointWebhookTest: 'webhook-test',
endpointWebhookWaiting: 'webhook-waiting',
pushConnectionActive: true,
timezone: 'America/New_York',
executionTimeout: -1,
maxExecutionTimeout: Number.MAX_SAFE_INTEGER,
@ -66,8 +65,6 @@ export const useRootStore = defineStore(STORES.ROOT, () => {
const versionCli = computed(() => state.value.versionCli);
const pushConnectionActive = computed(() => state.value.pushConnectionActive);
const OAuthCallbackUrls = computed(() => state.value.oauthCallbackUrls);
const webhookTestUrl = computed(
@ -105,14 +102,6 @@ export const useRootStore = defineStore(STORES.ROOT, () => {
state.value.urlBaseWebhook = url;
};
const setPushConnectionActive = () => {
state.value.pushConnectionActive = true;
};
const setPushConnectionInactive = () => {
state.value.pushConnectionActive = false;
};
const setUrlBaseEditor = (urlBaseEditor: string) => {
const url = urlBaseEditor.endsWith('/') ? urlBaseEditor : `${urlBaseEditor}/`;
state.value.urlBaseEditor = url;
@ -198,13 +187,10 @@ export const useRootStore = defineStore(STORES.ROOT, () => {
pushRef,
defaultLocale,
binaryDataMode,
pushConnectionActive,
OAuthCallbackUrls,
executionTimeout,
maxExecutionTimeout,
timezone,
setPushConnectionInactive,
setPushConnectionActive,
setUrlBaseWebhook,
setUrlBaseEditor,
setEndpointForm,