feat(editor): Show avatars for users currently working on the same workflow (#7763)

This PR introduces the following changes:
- New Vue stores: `collaborationStore` and `pushConnectionStore`
- Front-end push connection handling overhaul: Keep only a singe
connection open and handle it from the new store
- Add user avatars in the editor header when there are multiple users
working on the same workflow
- Sending a heartbeat event to back-end service periodically to confirm
user is still active

- Back-end overhauls (authored by @tomi):
  - Implementing a cleanup procedure that removes inactive users
  - Refactoring collaboration service current implementation

---------

Co-authored-by: Tomi Turtiainen <10324676+tomi@users.noreply.github.com>
This commit is contained in:
Milorad FIlipović 2023-11-23 10:14:34 +01:00 committed by GitHub
parent 99a9ea497a
commit 77bc8ecd4b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 654 additions and 148 deletions

View file

@ -116,6 +116,7 @@ import { UserService } from './services/user.service';
import { OrchestrationController } from './controllers/orchestration.controller';
import { WorkflowHistoryController } from './workflows/workflowHistory/workflowHistory.controller.ee';
import { InvitationController } from './controllers/invitation.controller';
import { CollaborationService } from './collaboration/collaboration.service';
const exec = promisify(callbackExec);
@ -138,6 +139,8 @@ export class Server extends AbstractServer {
private postHog: PostHogClient;
private collaborationService: CollaborationService;
constructor() {
super('main');
@ -233,6 +236,7 @@ export class Server extends AbstractServer {
.then(async (workflow) =>
Container.get(InternalHooks).onServerStarted(diagnosticInfo, workflow?.createdAt),
);
this.collaborationService = Container.get(CollaborationService);
}
private async registerControllers(ignoredEndpoints: Readonly<string[]>) {

View file

@ -1,5 +1,6 @@
import type { Workflow } from 'n8n-workflow';
import { Service } from 'typedi';
import config from '@/config';
import { Push } from '../push';
import { Logger } from '@/Logger';
import type { WorkflowClosedMessage, WorkflowOpenedMessage } from './collaboration.message';
@ -8,6 +9,13 @@ import { UserService } from '../services/user.service';
import type { IActiveWorkflowUsersChanged } from '../Interfaces';
import type { OnPushMessageEvent } from '@/push/types';
import { CollaborationState } from '@/collaboration/collaboration.state';
import { TIME } from '@/constants';
/**
* After how many minutes of inactivity a user should be removed
* as being an active user of a workflow.
*/
const INACTIVITY_CLEAN_UP_TIME_IN_MS = 15 * TIME.MINUTE;
/**
* Service for managing collaboration feature between users. E.g. keeping
@ -28,6 +36,14 @@ export class CollaborationService {
return;
}
const isMultiMainSetup = config.get('multiMainSetup.enabled');
if (isMultiMainSetup) {
// TODO: We should support collaboration in multi-main setup as well
// This requires using redis as the state store instead of in-memory
logger.warn('Collaboration features are disabled because multi-main setup is enabled.');
return;
}
this.push.on('message', async (event: OnPushMessageEvent) => {
try {
await this.handleUserMessage(event.userId, event.msg);
@ -53,6 +69,7 @@ export class CollaborationService {
const { workflowId } = msg;
this.state.addActiveWorkflowUser(workflowId, userId);
this.state.cleanInactiveUsers(workflowId, INACTIVITY_CLEAN_UP_TIME_IN_MS);
await this.sendWorkflowUsersChangedMessage(workflowId);
}

View file

@ -59,4 +59,21 @@ export class CollaborationState {
return [...workflowState.values()];
}
/**
* Removes all users that have not been seen in a given time
*/
cleanInactiveUsers(workflowId: Workflow['id'], inactivityCleanUpTimeInMs: number) {
const activeUsers = this.state.activeUsersByWorkflowId.get(workflowId);
if (!activeUsers) {
return;
}
const now = Date.now();
for (const user of activeUsers.values()) {
if (now - user.lastSeen.getTime() > inactivityCleanUpTimeInMs) {
activeUsers.delete(user.userId);
}
}
}
}

View file

@ -30,6 +30,14 @@ export class Push extends EventEmitter {
private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush);
constructor() {
super();
if (useWebSockets) {
this.backend.on('message', (msg) => this.emit('message', msg));
}
}
handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) {
const {
userId,
@ -37,7 +45,6 @@ export class Push extends EventEmitter {
} = req;
if (req.ws) {
(this.backend as WebSocketPush).add(sessionId, userId, req.ws);
this.backend.on('message', (msg) => this.emit('message', msg));
} else if (!useWebSockets) {
(this.backend as SSEPush).add(sessionId, userId, { req, res });
} else {

View file

@ -0,0 +1,61 @@
import { TIME } from '@/constants';
import { CollaborationState } from '@/collaboration/collaboration.state';
const origDate = global.Date;
const mockDateFactory = (currentDate: string) => {
return class CustomDate extends origDate {
constructor() {
super(currentDate);
}
} as DateConstructor;
};
describe('CollaborationState', () => {
let collaborationState: CollaborationState;
beforeEach(() => {
collaborationState = new CollaborationState();
});
describe('cleanInactiveUsers', () => {
const workflowId = 'workflow';
it('should remove inactive users', () => {
// Setup
global.Date = mockDateFactory('2023-01-01T00:00:00.000Z');
collaborationState.addActiveWorkflowUser(workflowId, 'inactiveUser');
global.Date = mockDateFactory('2023-01-01T00:30:00.000Z');
collaborationState.addActiveWorkflowUser(workflowId, 'activeUser');
// Act: Clean inactive users
jest
.spyOn(global.Date, 'now')
.mockReturnValue(new origDate('2023-01-01T00:35:00.000Z').getTime());
collaborationState.cleanInactiveUsers(workflowId, 10 * TIME.MINUTE);
// Assert: The inactive user should be removed
expect(collaborationState.getActiveWorkflowUsers(workflowId)).toEqual([
{ userId: 'activeUser', lastSeen: new origDate('2023-01-01T00:30:00.000Z') },
]);
});
it('should not remove active users', () => {
// Setup: Add an active user to the state
global.Date = mockDateFactory('2023-01-01T00:30:00.000Z');
collaborationState.addActiveWorkflowUser(workflowId, 'activeUser');
// Act: Clean inactive users
jest
.spyOn(global.Date, 'now')
.mockReturnValue(new origDate('2023-01-01T00:35:00.000Z').getTime());
collaborationState.cleanInactiveUsers(workflowId, 10 * TIME.MINUTE);
// Assert: The active user should still be present
expect(collaborationState.getActiveWorkflowUsers(workflowId)).toEqual([
{ userId: 'activeUser', lastSeen: new origDate('2023-01-01T00:30:00.000Z') },
]);
});
});
});

View file

@ -75,26 +75,31 @@ const menuHeight = computed(() => {
:max-height="menuHeight"
popper-class="user-stack-popper"
>
<div :class="$style.avatars">
<div :class="$style.avatars" data-test-id="user-stack-avatars">
<n8n-avatar
v-for="user in flatUserList.slice(0, visibleAvatarCount)"
:key="user.id"
:firstName="user.firstName"
:lastName="user.lastName"
:class="$style.avatar"
:data-test-id="`user-stack-avatar-${user.id}`"
size="small"
/>
<div v-if="hiddenUsersCount > 0" :class="$style.hiddenBadge">+{{ hiddenUsersCount }}</div>
</div>
<template #dropdown>
<el-dropdown-menu class="user-stack-list">
<el-dropdown-menu class="user-stack-list" data-test-id="user-stack-list">
<div v-for="(groupUsers, index) in nonEmptyGroups" :key="index">
<div :class="$style.groupContainer">
<el-dropdown-item>
<header v-if="groupCount > 1" :class="$style.groupName">{{ index }}</header>
</el-dropdown-item>
<div :class="$style.groupUsers">
<el-dropdown-item v-for="user in groupUsers" :key="user.id">
<el-dropdown-item
v-for="user in groupUsers"
:key="user.id"
:data-test-id="`user-stack-info-${user.id}`"
>
<n8n-user-info
v-bind="user"
:isCurrentUser="user.email === props.currentUserEmail"
@ -156,11 +161,12 @@ const menuHeight = computed(() => {
</style>
<style lang="scss">
.user-stack-list {
ul.user-stack-list {
border: none;
display: flex;
flex-direction: column;
gap: 16px;
gap: var(--spacing-s);
padding-bottom: var(--spacing-2xs);
.el-dropdown-menu__item {
line-height: var(--font-line-height-regular);

View file

@ -57,6 +57,7 @@ import {
useCloudPlanStore,
useSourceControlStore,
useUsageStore,
usePushConnectionStore,
} from '@/stores';
import { useHistoryHelper } from '@/composables/useHistoryHelper';
import { useRoute } from 'vue-router';
@ -92,6 +93,7 @@ export default defineComponent({
useSourceControlStore,
useCloudPlanStore,
useUsageStore,
usePushConnectionStore,
),
defaultLocale(): string {
return this.rootStore.defaultLocale;
@ -168,6 +170,7 @@ export default defineComponent({
void this.onAfterAuthenticate();
void runExternalHook('app.mount');
this.pushStore.pushConnect();
this.loading = false;
},
watch: {

View file

@ -411,6 +411,16 @@ export interface IExecutionDeleteFilter {
ids?: string[];
}
export type PushDataUsersForWorkflow = {
workflowId: string;
activeUsers: Array<{ user: IUser; lastSeen: string }>;
};
type PushDataWorkflowUsersChanged = {
data: PushDataUsersForWorkflow;
type: 'activeWorkflowUsersChanged';
};
export type IPushData =
| PushDataExecutionFinished
| PushDataExecutionStarted
@ -424,7 +434,8 @@ export type IPushData =
| PushDataWorkerStatusMessage
| PushDataActiveWorkflowAdded
| PushDataActiveWorkflowRemoved
| PushDataWorkflowFailedToActivate;
| PushDataWorkflowFailedToActivate
| PushDataWorkflowUsersChanged;
type PushDataActiveWorkflowAdded = {
data: IActiveWorkflowAdded;
@ -690,6 +701,7 @@ export interface IUser extends IUserResponse {
fullName?: string;
createdAt?: string;
mfaEnabled: boolean;
globalRoleId?: number;
}
export interface IVersionNotificationSettings {

View file

@ -0,0 +1,81 @@
<script setup lang="ts">
import { useUsersStore } from '@/stores/users.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { useCollaborationStore } from '@/stores/collaboration.store';
import { onBeforeUnmount } from 'vue';
import { onMounted } from 'vue';
import { computed, ref } from 'vue';
import { TIME } from '@/constants';
const collaborationStore = useCollaborationStore();
const usersStore = useUsersStore();
const workflowsStore = useWorkflowsStore();
const HEARTBEAT_INTERVAL = 5 * TIME.MINUTE;
const heartbeatTimer = ref<number | null>(null);
const activeUsersSorted = computed(() => {
const currentWorkflowUsers = (collaborationStore.getUsersForCurrentWorkflow ?? []).map(
(userInfo) => userInfo.user,
);
const owner = currentWorkflowUsers.find((user) => user.globalRoleId === 1);
return {
defaultGroup: owner
? [owner, ...currentWorkflowUsers.filter((user) => user.id !== owner.id)]
: currentWorkflowUsers,
};
});
const currentUserEmail = computed(() => {
return usersStore.currentUser?.email;
});
const startHeartbeat = () => {
if (heartbeatTimer.value !== null) {
clearInterval(heartbeatTimer.value);
heartbeatTimer.value = null;
}
heartbeatTimer.value = window.setInterval(() => {
collaborationStore.notifyWorkflowOpened(workflowsStore.workflow.id);
}, HEARTBEAT_INTERVAL);
};
const stopHeartbeat = () => {
if (heartbeatTimer.value !== null) {
clearInterval(heartbeatTimer.value);
}
};
const onDocumentVisibilityChange = () => {
if (document.visibilityState === 'hidden') {
stopHeartbeat();
} else {
startHeartbeat();
}
};
onMounted(() => {
startHeartbeat();
document.addEventListener('visibilitychange', onDocumentVisibilityChange);
});
onBeforeUnmount(() => {
document.removeEventListener('visibilitychange', onDocumentVisibilityChange);
stopHeartbeat();
});
</script>
<template>
<div
:class="`collaboration-pane-container ${$style.container}`"
data-test-id="collaboration-pane"
>
<n8n-user-stack :users="activeUsersSorted" :currentUserEmail="currentUserEmail" />
</div>
</template>
<style lang="scss" module>
.container {
margin: 0 var(--spacing-4xs);
}
</style>

View file

@ -90,11 +90,6 @@ export default defineComponent({
mounted() {
this.dirtyState = this.uiStore.stateIsDirty;
this.syncTabsWithRoute(this.$route);
// Initialize the push connection
this.pushConnect();
},
beforeUnmount() {
this.pushDisconnect();
},
watch: {
$route(to, from) {

View file

@ -56,18 +56,20 @@
<span v-else class="tags"></span>
<PushConnectionTracker class="actions">
<span class="activator">
<span :class="`activator ${$style.group}`">
<WorkflowActivator :workflow-active="isWorkflowActive" :workflow-id="currentWorkflowId" />
</span>
<enterprise-edition :features="[EnterpriseEditionFeature.Sharing]">
<div :class="$style.group">
<collaboration-pane />
<n8n-button
type="secondary"
class="mr-2xs"
@click="onShareButtonClick"
data-test-id="workflow-share-button"
>
{{ $locale.baseText('workflowDetails.share') }}
</n8n-button>
</div>
<template #fallback>
<n8n-tooltip>
<n8n-button type="secondary" :class="['mr-2xs', $style.disabledShareButton]">
@ -94,6 +96,7 @@
</n8n-tooltip>
</template>
</enterprise-edition>
<div :class="$style.group">
<SaveButton
type="primary"
:saved="!this.isDirty && !this.isNewWorkflow"
@ -115,7 +118,8 @@
text
/>
</router-link>
<div :class="$style.workflowMenuContainer">
</div>
<div :class="[$style.workflowMenuContainer, $style.group]">
<input
:class="$style.hiddenInput"
type="file"
@ -159,6 +163,7 @@ import SaveButton from '@/components/SaveButton.vue';
import TagsDropdown from '@/components/TagsDropdown.vue';
import InlineTextEdit from '@/components/InlineTextEdit.vue';
import BreakpointsObserver from '@/components/BreakpointsObserver.vue';
import CollaborationPane from '@/components/MainHeader/CollaborationPane.vue';
import type { IUser, IWorkflowDataUpdate, IWorkflowDb, IWorkflowToShare } from '@/Interface';
import { saveAs } from 'file-saver';
@ -201,6 +206,7 @@ export default defineComponent({
TagsDropdown,
InlineTextEdit,
BreakpointsObserver,
CollaborationPane,
},
props: {
readOnly: {
@ -681,7 +687,6 @@ $--header-spacing: 20px;
line-height: $--text-line-height;
display: flex;
align-items: center;
margin-right: 30px;
> span {
margin-right: 5px;
@ -717,14 +722,15 @@ $--header-spacing: 20px;
.actions {
display: flex;
align-items: center;
gap: var(--spacing-m);
}
</style>
<style module lang="scss">
.workflowMenuContainer {
margin-left: var(--spacing-2xs);
.group {
display: flex;
gap: var(--spacing-xs);
}
.hiddenInput {
display: none;
}
@ -740,8 +746,6 @@ $--header-spacing: 20px;
.workflowHistoryButton {
width: 30px;
height: 30px;
margin-left: var(--spacing-m);
margin-right: var(--spacing-4xs);
color: var(--color-text-dark);
border-radius: var(--border-radius-base);

View file

@ -0,0 +1,135 @@
import { merge } from 'lodash-es';
import { SETTINGS_STORE_DEFAULT_STATE, waitAllPromises } from '@/__tests__/utils';
import { STORES } from '@/constants';
import { createTestingPinia } from '@pinia/testing';
import { useUIStore } from '@/stores/ui.store';
import CollaborationPane from '@/components//MainHeader/CollaborationPane.vue';
import type { RenderOptions } from '@/__tests__/render';
import { createComponentRenderer } from '@/__tests__/render';
const OWNER_USER = {
createdAt: '2023-11-22T10:17:12.246Z',
id: 'aaaaaa',
email: 'owner@user.com',
firstName: 'Owner',
lastName: 'User',
globalRoleId: 1,
disabled: false,
globalRole: {
id: '1',
name: 'owner',
scope: 'global',
},
isPending: false,
isOwner: true,
fullName: 'Owner User',
};
const MEMBER_USER = {
createdAt: '2023-11-22T10:17:12.246Z',
id: 'aaabbb',
email: 'member@user.com',
firstName: 'Member',
lastName: 'User',
globalRoleId: 2,
disabled: false,
globalRole: {
id: '2',
name: 'member',
scope: 'global',
},
isPending: false,
isOwner: false,
fullName: 'Member User',
};
const MEMBER_USER_2 = {
createdAt: '2023-11-22T10:17:12.246Z',
id: 'aaaccc',
email: 'member2@user.com',
firstName: 'Another Member',
lastName: 'User',
globalRoleId: 2,
disabled: false,
globalRole: {
id: '2',
name: 'member',
scope: 'global',
},
isPending: false,
isOwner: false,
fullName: 'Another Member User',
};
let uiStore: ReturnType<typeof useUIStore>;
const initialState = {
[STORES.SETTINGS]: {
settings: merge({}, SETTINGS_STORE_DEFAULT_STATE.settings),
},
[STORES.WORKFLOWS]: {
workflow: {
id: 'w1',
},
},
[STORES.USERS]: {
currentUserId: 'aaaaaa',
users: {
aaaaaa: OWNER_USER,
aaabbb: MEMBER_USER,
aaaccc: MEMBER_USER_2,
},
},
[STORES.COLLABORATION]: {
usersForWorkflows: {
w1: [
{ lastSeen: '2023-11-22T10:17:12.246Z', user: MEMBER_USER },
{ lastSeen: '2023-11-22T10:17:12.246Z', user: OWNER_USER },
],
w2: [{ lastSeen: '2023-11-22T10:17:12.246Z', user: MEMBER_USER_2 }],
},
},
};
const defaultRenderOptions: RenderOptions = {
pinia: createTestingPinia({ initialState }),
};
const renderComponent = createComponentRenderer(CollaborationPane, defaultRenderOptions);
describe('CollaborationPane', () => {
beforeEach(() => {
uiStore = useUIStore();
});
afterEach(() => {
vi.clearAllMocks();
});
it('should show only current workflow users', async () => {
const { getByTestId, queryByTestId } = renderComponent();
await waitAllPromises();
expect(getByTestId('collaboration-pane')).toBeInTheDocument();
expect(getByTestId('user-stack-avatars')).toBeInTheDocument();
expect(getByTestId(`user-stack-avatar-${OWNER_USER.id}`)).toBeInTheDocument();
expect(getByTestId(`user-stack-avatar-${MEMBER_USER.id}`)).toBeInTheDocument();
expect(queryByTestId(`user-stack-avatar-${MEMBER_USER_2.id}`)).toBeNull();
});
it('should render current user correctly', async () => {
const { getByText, queryByText } = renderComponent();
await waitAllPromises();
expect(getByText(`${OWNER_USER.fullName} (you)`)).toBeInTheDocument();
expect(queryByText(`${MEMBER_USER.fullName} (you)`)).toBeNull();
expect(queryByText(`${MEMBER_USER.fullName}`)).toBeInTheDocument();
});
it('should always render owner first in the list', async () => {
const { getByTestId } = renderComponent();
await waitAllPromises();
const firstAvatar = getByTestId('user-stack-avatars').querySelector('.n8n-avatar');
// Owner is second in the store bur shourld be rendered first
expect(firstAvatar).toHaveAttribute('data-test-id', `user-stack-avatar-${OWNER_USER.id}`);
});
});

View file

@ -567,6 +567,8 @@ export const enum STORES {
WEBHOOKS = 'webhooks',
HISTORY = 'history',
CLOUD_PLAN = 'cloudPlan',
COLLABORATION = 'collaboration',
PUSH = 'push',
}
export const enum SignInType {
@ -651,3 +653,13 @@ export const NOT_DUPLICATABE_NODE_TYPES = [FORM_TRIGGER_NODE_TYPE];
export const UPDATE_WEBHOOK_ID_NODE_TYPES = [FORM_TRIGGER_NODE_TYPE];
export const CREATOR_HUB_URL = 'https://creators.n8n.io/hub';
/**
* Units of time in milliseconds
*/
export const TIME = {
SECOND: 1000,
MINUTE: 60 * 1000,
HOUR: 60 * 60 * 1000,
DAY: 24 * 60 * 60 * 1000,
};

View file

@ -35,6 +35,8 @@ import { parse } from 'flatted';
import { useSegment } from '@/stores/segment.store';
import { defineComponent } from 'vue';
import { useOrchestrationStore } from '@/stores/orchestration.store';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
import { useCollaborationStore } from '@/stores/collaboration.store';
export const pushConnection = defineComponent({
setup() {
@ -43,15 +45,16 @@ export const pushConnection = defineComponent({
...useToast(),
};
},
created() {
this.pushStore.addEventListener((message) => {
void this.pushMessageReceived(message);
});
},
mixins: [externalHooks, nodeHelpers, workflowHelpers],
data() {
return {
pushSource: null as WebSocket | EventSource | null,
reconnectTimeout: null as NodeJS.Timeout | null,
retryTimeout: null as NodeJS.Timeout | null,
pushMessageQueue: [] as Array<{ event: Event; retriesLeft: number }>,
connectRetries: 0,
lostConnection: false,
pushMessageQueue: [] as Array<{ message: IPushData; retriesLeft: number }>,
};
},
computed: {
@ -63,95 +66,22 @@ export const pushConnection = defineComponent({
useSettingsStore,
useSegment,
useOrchestrationStore,
usePushConnectionStore,
useCollaborationStore,
),
sessionId(): string {
return this.rootStore.sessionId;
},
},
methods: {
attemptReconnect() {
this.pushConnect();
},
/**
* Connect to server to receive data via a WebSocket or EventSource
*/
pushConnect(): void {
// always close the previous connection so that we do not end up with multiple connections
this.pushDisconnect();
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
const useWebSockets = this.settingsStore.pushBackend === 'websocket';
const { getRestUrl: restUrl } = this.rootStore;
const url = `/push?sessionId=${this.sessionId}`;
if (useWebSockets) {
const { protocol, host } = window.location;
const baseUrl = restUrl.startsWith('http')
? restUrl.replace(/^http/, 'ws')
: `${protocol === 'https:' ? 'wss' : 'ws'}://${host + restUrl}`;
this.pushSource = new WebSocket(`${baseUrl}${url}`);
} else {
this.pushSource = new EventSource(`${restUrl}${url}`, { withCredentials: true });
}
this.pushSource.addEventListener('open', this.onConnectionSuccess, false);
this.pushSource.addEventListener('message', this.pushMessageReceived, false);
this.pushSource.addEventListener(
useWebSockets ? 'close' : 'error',
this.onConnectionError,
false,
);
},
onConnectionSuccess() {
this.connectRetries = 0;
this.lostConnection = false;
this.rootStore.pushConnectionActive = true;
try {
// in the workers view context this fn is not defined
this.clearAllStickyNotifications();
} catch {}
this.pushSource?.removeEventListener('open', this.onConnectionSuccess);
},
onConnectionError() {
this.pushDisconnect();
this.connectRetries++;
this.reconnectTimeout = setTimeout(
this.attemptReconnect,
Math.min(this.connectRetries * 2000, 8000), // maximum 8 seconds backoff
);
},
/**
* Close connection to server
*/
pushDisconnect(): void {
if (this.pushSource !== null) {
this.pushSource.removeEventListener('error', this.onConnectionError);
this.pushSource.removeEventListener('close', this.onConnectionError);
this.pushSource.removeEventListener('message', this.pushMessageReceived);
if (this.pushSource.readyState < 2) this.pushSource.close();
this.pushSource = null;
}
this.rootStore.pushConnectionActive = false;
},
/**
* Sometimes the push message is faster as the result from
* the REST API so we do not know yet what execution ID
* is currently active. So internally resend the message
* a few more times
*/
queuePushMessage(event: Event, retryAttempts: number) {
this.pushMessageQueue.push({ event, retriesLeft: retryAttempts });
queuePushMessage(event: IPushData, retryAttempts: number) {
this.pushMessageQueue.push({ message: event, retriesLeft: retryAttempts });
if (this.retryTimeout === null) {
this.retryTimeout = setTimeout(this.processWaitingPushMessages, 20);
@ -161,7 +91,7 @@ export const pushConnection = defineComponent({
/**
* Process the push messages which are waiting in the queue
*/
processWaitingPushMessages() {
async processWaitingPushMessages() {
if (this.retryTimeout !== null) {
clearTimeout(this.retryTimeout);
this.retryTimeout = null;
@ -171,7 +101,8 @@ export const pushConnection = defineComponent({
for (let i = 0; i < queueLength; i++) {
const messageData = this.pushMessageQueue.shift();
if (this.pushMessageReceived(messageData!.event, true) === false) {
const result = await this.pushMessageReceived(messageData!.message, true);
if (result === false) {
// Was not successful
messageData!.retriesLeft -= 1;
@ -191,15 +122,8 @@ export const pushConnection = defineComponent({
/**
* Process a newly received message
*/
async pushMessageReceived(event: Event, isRetry?: boolean): Promise<boolean> {
async pushMessageReceived(receivedData: IPushData, isRetry?: boolean): Promise<boolean> {
const retryAttempts = 5;
let receivedData: IPushData;
try {
// @ts-ignore
receivedData = JSON.parse(event.data);
} catch (error) {
return false;
}
if (receivedData.type === 'sendWorkerStatusMessage') {
const pushData = receivedData.data;
@ -220,7 +144,7 @@ export const pushConnection = defineComponent({
) {
// If there are already messages in the queue add the new one that all of them
// get executed in order
this.queuePushMessage(event, retryAttempts);
this.queuePushMessage(receivedData, retryAttempts);
return false;
}
@ -596,7 +520,7 @@ export const pushConnection = defineComponent({
this.workflowsStore.activeExecutionId = pushData.executionId;
}
this.processWaitingPushMessages();
void this.processWaitingPushMessages();
} else if (receivedData.type === 'reloadNodeType') {
await this.nodeTypesStore.getNodeTypes();
await this.nodeTypesStore.getFullNodesProperties([receivedData.data]);

View file

@ -0,0 +1,53 @@
import { defineStore } from 'pinia';
import { computed, ref } from 'vue';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
import { STORES } from '@/constants';
import type { IUser } from '@/Interface';
type ActiveUsersForWorkflows = {
[workflowId: string]: Array<{ user: IUser; lastSeen: string }>;
};
export const useCollaborationStore = defineStore(STORES.COLLABORATION, () => {
const pushStore = usePushConnectionStore();
const workflowStore = useWorkflowsStore();
const usersForWorkflows = ref<ActiveUsersForWorkflows>({});
pushStore.addEventListener((event) => {
if (event.type === 'activeWorkflowUsersChanged') {
const activeWorkflowId = workflowStore.workflowId;
if (event.data.workflowId === activeWorkflowId) {
usersForWorkflows.value[activeWorkflowId] = event.data.activeUsers;
}
}
});
const workflowUsersUpdated = (data: ActiveUsersForWorkflows) => {
usersForWorkflows.value = data;
};
const notifyWorkflowOpened = (workflowId: string) => {
pushStore.send({
type: 'workflowOpened',
workflowId,
});
};
const notifyWorkflowClosed = (workflowId: string) => {
pushStore.send({ type: 'workflowClosed', workflowId });
};
const getUsersForCurrentWorkflow = computed(() => {
return usersForWorkflows.value[workflowStore.workflowId];
});
return {
usersForWorkflows,
notifyWorkflowOpened,
notifyWorkflowClosed,
workflowUsersUpdated,
getUsersForCurrentWorkflow,
};
});

View file

@ -26,3 +26,5 @@ export * from './cloudPlan.store';
export * from './sourceControl.store';
export * from './sso.store';
export * from './auditLogs.store';
export * from './collaboration.store';
export * from './pushConnection.store';

View file

@ -0,0 +1,154 @@
import { defineStore } from 'pinia';
import { STORES, TIME } from '@/constants';
import { ref, computed } from 'vue';
import { useSettingsStore } from './settings.store';
import { useRootStore } from './n8nRoot.store';
import type { IPushData } from '../Interface';
export interface PushState {
sessionId: string;
pushSource: WebSocket | EventSource | null;
reconnectTimeout: NodeJS.Timeout | null;
retryTimeout: NodeJS.Timeout | null;
pushMessageQueue: Array<{ event: Event; retriesLeft: number }>;
connectRetries: number;
lostConnection: boolean;
outgoingQueue: unknown[];
isConnectionOpen: boolean;
}
export type OnPushMessageHandler = (event: IPushData) => void;
/**
* Store for managing a push connection to the server
*/
export const usePushConnectionStore = defineStore(STORES.PUSH, () => {
const rootStore = useRootStore();
const settingsStore = useSettingsStore();
const sessionId = computed(() => rootStore.sessionId);
const pushSource = ref<WebSocket | EventSource | null>(null);
const reconnectTimeout = ref<NodeJS.Timeout | null>(null);
const connectRetries = ref(0);
const lostConnection = ref(false);
const outgoingQueue = ref<unknown[]>([]);
const isConnectionOpen = ref(false);
const onMessageReceivedHandlers = ref<OnPushMessageHandler[]>([]);
const addEventListener = (handler: OnPushMessageHandler) => {
onMessageReceivedHandlers.value.push(handler);
};
function onConnectionError() {
pushDisconnect();
connectRetries.value++;
reconnectTimeout.value = setTimeout(
attemptReconnect,
Math.min(connectRetries.value * 2000, 8 * TIME.SECOND), // maximum 8 seconds backoff
);
}
/**
* Close connection to server
*/
function pushDisconnect() {
if (pushSource.value !== null) {
pushSource.value.removeEventListener('error', onConnectionError);
pushSource.value.removeEventListener('close', onConnectionError);
pushSource.value.removeEventListener('message', pushMessageReceived);
if (pushSource.value.readyState < 2) pushSource.value.close();
pushSource.value = null;
}
isConnectionOpen.value = false;
}
/**
* Connect to server to receive data via a WebSocket or EventSource
*/
function pushConnect() {
// always close the previous connection so that we do not end up with multiple connections
pushDisconnect();
if (reconnectTimeout.value) {
clearTimeout(reconnectTimeout.value);
reconnectTimeout.value = null;
}
const useWebSockets = settingsStore.pushBackend === 'websocket';
const { getRestUrl: restUrl } = rootStore;
const url = `/push?sessionId=${sessionId.value}`;
if (useWebSockets) {
const { protocol, host } = window.location;
const baseUrl = restUrl.startsWith('http')
? restUrl.replace(/^http/, 'ws')
: `${protocol === 'https:' ? 'wss' : 'ws'}://${host + restUrl}`;
pushSource.value = new WebSocket(`${baseUrl}${url}`);
} else {
pushSource.value = new EventSource(`${restUrl}${url}`, { withCredentials: true });
}
pushSource.value.addEventListener('open', onConnectionSuccess, false);
pushSource.value.addEventListener('message', pushMessageReceived, false);
pushSource.value.addEventListener(useWebSockets ? 'close' : 'error', onConnectionError, false);
}
function attemptReconnect() {
pushConnect();
}
function serializeAndSend(message: unknown) {
if (pushSource.value && 'send' in pushSource.value) {
pushSource.value.send(JSON.stringify(message));
}
}
function onConnectionSuccess() {
isConnectionOpen.value = true;
connectRetries.value = 0;
lostConnection.value = false;
rootStore.pushConnectionActive = true;
pushSource.value?.removeEventListener('open', onConnectionSuccess);
if (outgoingQueue.value.length) {
for (const message of outgoingQueue.value) {
serializeAndSend(message);
}
outgoingQueue.value = [];
}
}
function send(message: unknown) {
if (!isConnectionOpen.value) {
outgoingQueue.value.push(message);
return;
}
serializeAndSend(message);
}
/**
* Process a newly received message
*/
async function pushMessageReceived(event: Event) {
let receivedData: IPushData;
try {
// @ts-ignore
receivedData = JSON.parse(event.data);
} catch (error) {
return;
}
// TODO: Why is this received multiple times?
onMessageReceivedHandlers.value.forEach((handler) => handler(receivedData));
}
return {
sessionId,
pushSource,
isConnectionOpen,
addEventListener,
pushConnect,
send,
};
});

View file

@ -230,6 +230,7 @@ import {
AI_NODE_CREATOR_VIEW,
DRAG_EVENT_DATA_KEY,
UPDATE_WEBHOOK_ID_NODE_TYPES,
TIME,
} from '@/constants';
import { copyPaste } from '@/mixins/copyPaste';
import { externalHooks } from '@/mixins/externalHooks';
@ -329,6 +330,7 @@ import {
useUIStore,
useHistoryStore,
useExternalSecretsStore,
useCollaborationStore,
} from '@/stores';
import * as NodeViewUtils from '@/utils/nodeViewUtils';
import { getAccountAge, getConnectionInfo, getNodeViewTab } from '@/utils';
@ -523,15 +525,18 @@ export default defineComponent({
},
);
} else {
this.collaborationStore.notifyWorkflowClosed(this.currentWorkflow);
next();
}
} else if (confirmModal === MODAL_CANCEL) {
this.collaborationStore.notifyWorkflowClosed(this.currentWorkflow);
this.workflowsStore.setWorkflowId(PLACEHOLDER_EMPTY_WORKFLOW_ID);
this.resetWorkspace();
this.uiStore.stateIsDirty = false;
next();
}
} else {
this.collaborationStore.notifyWorkflowClosed(this.currentWorkflow);
next();
}
},
@ -554,6 +559,7 @@ export default defineComponent({
useWorkflowsEEStore,
useHistoryStore,
useExternalSecretsStore,
useCollaborationStore,
),
nativelyNumberSuffixedDefaults(): string[] {
return this.nodeTypesStore.nativelyNumberSuffixedDefaults;
@ -717,6 +723,7 @@ export default defineComponent({
suspendRecordingDetachedConnections: false,
NODE_CREATOR_OPEN_SOURCES,
eventsAttached: false,
unloadTimeout: undefined as undefined | ReturnType<typeof setTimeout>,
};
},
methods: {
@ -1064,6 +1071,7 @@ export default defineComponent({
this.workflowsStore.activeWorkflowExecution = selectedExecution;
}
this.stopLoading();
this.collaborationStore.notifyWorkflowOpened(workflow.id);
},
touchTap(e: MouseEvent | TouchEvent) {
if (this.isTouchDevice) {
@ -3047,20 +3055,30 @@ export default defineComponent({
this.eventsAttached = false;
},
onBeforeUnload(e) {
onBeforeUnload(e: BeforeUnloadEvent) {
if (this.isDemo || window.preventNodeViewBeforeUnload) {
return;
} else if (this.uiStore.stateIsDirty) {
const confirmationMessage = this.$locale.baseText(
'nodeView.itLooksLikeYouHaveBeenEditingSomething',
);
(e || window.event).returnValue = confirmationMessage; //Gecko + IE
return confirmationMessage; //Gecko + Webkit, Safari, Chrome etc.
// A bit hacky solution to detecting users leaving the page after prompt:
// 1. Notify that workflow is closed straight away
this.collaborationStore.notifyWorkflowClosed(this.workflowsStore.workflowId);
// 2. If user decided to stay on the page we notify that the workflow is opened again
this.unloadTimeout = setTimeout(() => {
this.collaborationStore.notifyWorkflowOpened(this.workflowsStore.workflowId);
}, 5 * TIME.SECOND);
e.returnValue = true; //Gecko + IE
return true; //Gecko + Webkit, Safari, Chrome etc.
} else {
this.startLoading(this.$locale.baseText('nodeView.redirecting'));
this.collaborationStore.notifyWorkflowClosed(this.workflowsStore.workflowId);
return;
}
},
onUnload() {
// This will fire if users decides to leave the page after prompted
// Clear the interval to prevent the notification from being sent
clearTimeout(this.unloadTimeout);
},
async newWorkflow(): Promise<void> {
this.startLoading();
this.resetWorkspace();
@ -3159,6 +3177,7 @@ export default defineComponent({
document.addEventListener('keyup', this.keyUp);
window.addEventListener('beforeunload', this.onBeforeUnload);
window.addEventListener('unload', this.onUnload);
},
getOutputEndpointUUID(
nodeName: string,