refactor(core): Move push message types to a new shared package (no-changelog) (#10742)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-09-13 13:02:00 +02:00 committed by GitHub
parent 7f1c131b72
commit 2f8c8448d3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
56 changed files with 477 additions and 660 deletions

View file

@ -26,7 +26,7 @@ jobs:
run: pnpm install --frozen-lockfile
- name: Build relevant packages
run: pnpm --filter @n8n/client-oauth2 --filter @n8n/imap --filter n8n-workflow --filter n8n-core --filter n8n-nodes-base --filter @n8n/n8n-nodes-langchain build
run: pnpm build:nodes
- run: npm install --prefix=.github/scripts --no-package-lock

View file

@ -0,0 +1,7 @@
const sharedOptions = require('@n8n_io/eslint-config/shared');
/** @type {import('@types/eslint').ESLint.ConfigData} */
module.exports = {
extends: ['@n8n_io/eslint-config/base'],
...sharedOptions(__dirname),
};

View file

@ -0,0 +1,3 @@
## @n8n/api-types
This package contains types and schema definitions for the n8n internal API, so that these can be shared between the backend and the frontend code.

View file

@ -0,0 +1,2 @@
/** @type {import('jest').Config} */
module.exports = require('../../../jest.config');

View file

@ -0,0 +1,24 @@
{
"name": "@n8n/api-types",
"version": "0.1.0",
"scripts": {
"clean": "rimraf dist .turbo",
"dev": "pnpm watch",
"typecheck": "tsc --noEmit",
"build": "tsc -p tsconfig.build.json",
"format": "prettier --write . --ignore-path ../../../.prettierignore",
"lint": "eslint .",
"lintfix": "eslint . --fix",
"watch": "tsc -p tsconfig.build.json --watch",
"test": "echo \"No tests yet\" && exit 0"
},
"main": "dist/index.js",
"module": "src/index.ts",
"types": "dist/index.d.ts",
"files": [
"dist/**/*"
],
"devDependencies": {
"n8n-workflow": "workspace:*"
}
}

View file

@ -0,0 +1,2 @@
/** Date time in the ISO 8601 format, e.g. 2024-10-31T00:00:00.123Z */
export type Iso8601DateTimeString = string;

View file

@ -0,0 +1,7 @@
export type * from './push';
export type * from './scaling';
export type * from './datetime';
export type * from './user';
export type { Collaborator } from './push/collaboration';
export type { SendWorkerStatusMessage } from './push/worker';

View file

@ -0,0 +1,17 @@
import type { Iso8601DateTimeString } from '../datetime';
import type { MinimalUser } from '../user';
export type Collaborator = {
user: MinimalUser;
lastSeen: Iso8601DateTimeString;
};
type CollaboratorsChanged = {
type: 'collaboratorsChanged';
data: {
workflowId: string;
collaborators: Collaborator[];
};
};
export type CollaborationPushMessage = CollaboratorsChanged;

View file

@ -0,0 +1,9 @@
type SendConsoleMessage = {
type: 'sendConsoleMessage';
data: {
source: string;
messages: unknown[];
};
};
export type DebugPushMessage = SendConsoleMessage;

View file

@ -0,0 +1,53 @@
import type { IRun, ITaskData, WorkflowExecuteMode } from 'n8n-workflow';
type ExecutionStarted = {
type: 'executionStarted';
data: {
executionId: string;
mode: WorkflowExecuteMode;
startedAt: Date;
workflowId: string;
workflowName?: string;
retryOf?: string;
};
};
type ExecutionFinished = {
type: 'executionFinished';
data: {
executionId: string;
data: IRun;
retryOf?: string;
};
};
type ExecutionRecovered = {
type: 'executionRecovered';
data: {
executionId: string;
};
};
type NodeExecuteBefore = {
type: 'nodeExecuteBefore';
data: {
executionId: string;
nodeName: string;
};
};
type NodeExecuteAfter = {
type: 'nodeExecuteAfter';
data: {
executionId: string;
nodeName: string;
data: ITaskData;
};
};
export type ExecutionPushMessage =
| ExecutionStarted
| ExecutionFinished
| ExecutionRecovered
| NodeExecuteBefore
| NodeExecuteAfter;

View file

@ -0,0 +1,21 @@
type NodeTypeData = {
name: string;
version: number;
};
type ReloadNodeType = {
type: 'reloadNodeType';
data: NodeTypeData;
};
type RemoveNodeType = {
type: 'removeNodeType';
data: NodeTypeData;
};
type NodeDescriptionUpdated = {
type: 'nodeDescriptionUpdated';
data: {};
};
export type HotReloadPushMessage = ReloadNodeType | RemoveNodeType | NodeDescriptionUpdated;

View file

@ -0,0 +1,20 @@
import type { ExecutionPushMessage } from './execution';
import type { WorkflowPushMessage } from './workflow';
import type { HotReloadPushMessage } from './hot-reload';
import type { WorkerPushMessage } from './worker';
import type { WebhookPushMessage } from './webhook';
import type { CollaborationPushMessage } from './collaboration';
import type { DebugPushMessage } from './debug';
export type PushMessage =
| ExecutionPushMessage
| WorkflowPushMessage
| HotReloadPushMessage
| WebhookPushMessage
| WorkerPushMessage
| CollaborationPushMessage
| DebugPushMessage;
export type PushType = PushMessage['type'];
export type PushPayload<T extends PushType> = Extract<PushMessage, { type: T }>['data'];

View file

@ -0,0 +1,17 @@
type TestWebhookDeleted = {
type: 'testWebhookDeleted';
data: {
executionId?: string;
workflowId: string;
};
};
type TestWebhookReceived = {
type: 'testWebhookReceived';
data: {
executionId: string;
workflowId: string;
};
};
export type WebhookPushMessage = TestWebhookDeleted | TestWebhookReceived;

View file

@ -0,0 +1,11 @@
import type { WorkerStatus } from '../scaling';
export type SendWorkerStatusMessage = {
type: 'sendWorkerStatusMessage';
data: {
workerId: string;
status: WorkerStatus;
};
};
export type WorkerPushMessage = SendWorkerStatusMessage;

View file

@ -0,0 +1,26 @@
type WorkflowActivated = {
type: 'workflowActivated';
data: {
workflowId: string;
};
};
type WorkflowFailedToActivate = {
type: 'workflowFailedToActivate';
data: {
workflowId: string;
errorMessage: string;
};
};
type WorkflowDeactivated = {
type: 'workflowDeactivated';
data: {
workflowId: string;
};
};
export type WorkflowPushMessage =
| WorkflowActivated
| WorkflowFailedToActivate
| WorkflowDeactivated;

View file

@ -0,0 +1,30 @@
import type { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow';
export type RunningJobSummary = {
executionId: string;
workflowId: string;
workflowName: string;
mode: WorkflowExecuteMode;
startedAt: Date;
retryOf: string;
status: ExecutionStatus;
};
export type WorkerStatus = {
workerId: string;
runningJobsSummary: RunningJobSummary[];
freeMem: number;
totalMem: number;
uptime: number;
loadAvg: number[];
cpus: string;
arch: string;
platform: NodeJS.Platform;
hostname: string;
interfaces: Array<{
family: 'IPv4' | 'IPv6';
address: string;
internal: boolean;
}>;
version: string;
};

View file

@ -0,0 +1,6 @@
export type MinimalUser = {
id: string;
email: string;
firstName: string;
lastName: string;
};

View file

@ -0,0 +1,11 @@
{
"extends": ["./tsconfig.json", "../../../tsconfig.build.json"],
"compilerOptions": {
"composite": true,
"rootDir": "src",
"outDir": "dist",
"tsBuildInfoFile": "dist/build.tsbuildinfo"
},
"include": ["src/**/*.ts"],
"exclude": ["test/**", "src/**/__tests__/**"]
}

View file

@ -0,0 +1,10 @@
{
"extends": "../../../tsconfig.json",
"compilerOptions": {
"rootDir": ".",
"types": ["node", "jest"],
"baseUrl": "src",
"tsBuildInfoFile": "dist/typecheck.tsbuildinfo"
},
"include": ["src/**/*.ts", "test/**/*.ts"]
}

View file

@ -85,6 +85,7 @@
"@azure/identity": "^4.3.0",
"@azure/keyvault-secrets": "^4.8.0",
"@google-cloud/secret-manager": "^5.6.0",
"@n8n/api-types": "workspace:*",
"@n8n/client-oauth2": "workspace:*",
"@n8n/config": "workspace:*",
"@n8n/localtunnel": "3.0.0",

View file

@ -1,3 +1,4 @@
import type { PushPayload } from '@n8n/api-types';
import type { Workflow } from 'n8n-workflow';
import { ApplicationError, ErrorReporterProxy } from 'n8n-workflow';
import { Service } from 'typedi';
@ -5,7 +6,6 @@ import { Service } from 'typedi';
import { CollaborationState } from '@/collaboration/collaboration.state';
import type { User } from '@/databases/entities/user';
import { UserRepository } from '@/databases/repositories/user.repository';
import type { ICollaboratorsChanged } from '@/interfaces';
import { Push } from '@/push';
import type { OnPushMessage } from '@/push/types';
import { AccessService } from '@/services/access.service';
@ -92,7 +92,7 @@ export class CollaborationService {
user: user.toIUser(),
lastSeen: collaborators.find(({ userId }) => userId === user.id)!.lastSeen,
}));
const msgData: ICollaboratorsChanged = {
const msgData: PushPayload<'collaboratorsChanged'> = {
workflowId,
collaborators: activeCollaborators,
};

View file

@ -1,9 +1,9 @@
import type { Iso8601DateTimeString } from '@n8n/api-types';
import type { Workflow } from 'n8n-workflow';
import { Service } from 'typedi';
import { Time } from '@/constants';
import type { User } from '@/databases/entities/user';
import type { Iso8601DateTimeString } from '@/interfaces';
import { CacheService } from '@/services/cache/cache.service';
type WorkflowCacheHash = Record<User['id'], Iso8601DateTimeString>;

View file

@ -1,3 +1,4 @@
import type { PushPayload, PushType } from '@n8n/api-types';
import { Request } from 'express';
import Container from 'typedi';
import { v4 as uuid } from 'uuid';
@ -10,7 +11,7 @@ import { SettingsRepository } from '@/databases/repositories/settings.repository
import { UserRepository } from '@/databases/repositories/user.repository';
import { Patch, Post, RestController } from '@/decorators';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import type { BooleanLicenseFeature, IPushDataType, NumericLicenseFeature } from '@/interfaces';
import type { BooleanLicenseFeature, NumericLicenseFeature } from '@/interfaces';
import { License } from '@/license';
import { Logger } from '@/logger';
import { MfaService } from '@/mfa/mfa.service';
@ -56,13 +57,13 @@ type ResetRequest = Request<
}
>;
type PushRequest = Request<
type PushRequest<T extends PushType> = Request<
{},
{},
{
type: IPushDataType;
type: T;
pushRef: string;
data: object;
data: PushPayload<T>;
}
>;
@ -132,7 +133,7 @@ export class E2EController {
}
@Post('/push', { skipAuth: true })
async pushSend(req: PushRequest) {
async pushSend(req: PushRequest<any>) {
this.push.broadcast(req.body.type, req.body.data);
}

View file

@ -11,7 +11,6 @@ import type {
IExecuteResponsePromiseData,
IRun,
IRunExecutionData,
ITaskData,
ITelemetryTrackProperties,
IWorkflowBase,
CredentialLoadingDetails,
@ -23,7 +22,6 @@ import type {
INodeProperties,
IUserSettings,
IWorkflowExecutionDataProcess,
IUser,
} from 'n8n-workflow';
import type PCancelable from 'p-cancelable';
@ -40,7 +38,6 @@ import type { WorkflowRepository } from '@/databases/repositories/workflow.repos
import type { LICENSE_FEATURES, LICENSE_QUOTAS } from './constants';
import type { ExternalHooks } from './external-hooks';
import type { RunningJobSummary } from './scaling/scaling.types';
import type { WorkflowWithSharingsAndCredentials } from './workflows/workflows.types';
export interface ICredentialsTypeData {
@ -139,11 +136,6 @@ export interface IExecutionDb extends IExecutionBase {
*/
export type ExecutionPayload = Omit<IExecutionDb, 'id'>;
export interface IExecutionPushResponse {
executionId?: string;
waitingForWebhook?: boolean;
}
export interface IExecutionResponse extends IExecutionBase {
id: string;
data: IRunExecutionData;
@ -271,207 +263,6 @@ export interface IPackageVersions {
cli: string;
}
export type IPushDataType = IPushData['type'];
export type IPushData =
| PushDataExecutionFinished
| PushDataExecutionStarted
| PushDataExecuteAfter
| PushDataExecuteBefore
| PushDataConsoleMessage
| PushDataReloadNodeType
| PushDataRemoveNodeType
| PushDataTestWebhook
| PushDataNodeDescriptionUpdated
| PushDataExecutionRecovered
| PushDataWorkerStatusMessage
| PushDataWorkflowActivated
| PushDataWorkflowDeactivated
| PushDataWorkflowFailedToActivate
| PushDataCollaboratorsChanged;
type PushDataCollaboratorsChanged = {
data: ICollaboratorsChanged;
type: 'collaboratorsChanged';
};
type PushDataWorkflowFailedToActivate = {
data: IWorkflowFailedToActivate;
type: 'workflowFailedToActivate';
};
type PushDataWorkflowActivated = {
data: IActiveWorkflowChanged;
type: 'workflowActivated';
};
type PushDataWorkflowDeactivated = {
data: IActiveWorkflowChanged;
type: 'workflowDeactivated';
};
export type PushDataExecutionRecovered = {
data: IPushDataExecutionRecovered;
type: 'executionRecovered';
};
export type PushDataExecutionFinished = {
data: IPushDataExecutionFinished;
type: 'executionFinished';
};
export type PushDataExecutionStarted = {
data: IPushDataExecutionStarted;
type: 'executionStarted';
};
export type PushDataExecuteAfter = {
data: IPushDataNodeExecuteAfter;
type: 'nodeExecuteAfter';
};
export type PushDataExecuteBefore = {
data: IPushDataNodeExecuteBefore;
type: 'nodeExecuteBefore';
};
export type PushDataConsoleMessage = {
data: IPushDataConsoleMessage;
type: 'sendConsoleMessage';
};
type PushDataWorkerStatusMessage = {
data: IPushDataWorkerStatusMessage;
type: 'sendWorkerStatusMessage';
};
type PushDataReloadNodeType = {
data: IPushDataReloadNodeType;
type: 'reloadNodeType';
};
export type PushDataRemoveNodeType = {
data: IPushDataRemoveNodeType;
type: 'removeNodeType';
};
export type PushDataTestWebhook = {
data: IPushDataTestWebhook;
type: 'testWebhookDeleted' | 'testWebhookReceived';
};
export type PushDataNodeDescriptionUpdated = {
data: undefined;
type: 'nodeDescriptionUpdated';
};
/** DateTime in the Iso8601 format, e.g. 2024-10-31T00:00:00.123Z */
export type Iso8601DateTimeString = string;
export interface ICollaborator {
user: IUser;
lastSeen: Iso8601DateTimeString;
}
export interface ICollaboratorsChanged {
workflowId: Workflow['id'];
collaborators: ICollaborator[];
}
export interface IActiveWorkflowAdded {
workflowId: Workflow['id'];
}
interface IActiveWorkflowChanged {
workflowId: Workflow['id'];
}
interface IWorkflowFailedToActivate {
workflowId: Workflow['id'];
errorMessage: string;
}
export interface IPushDataExecutionRecovered {
executionId: string;
}
export interface IPushDataExecutionFinished {
data: IRun;
executionId: string;
retryOf?: string;
}
export interface IPushDataExecutionStarted {
executionId: string;
mode: WorkflowExecuteMode;
startedAt: Date;
retryOf?: string;
workflowId: string;
workflowName?: string;
}
export interface IPushDataNodeExecuteAfter {
data: ITaskData;
executionId: string;
nodeName: string;
}
export interface IPushDataNodeExecuteBefore {
executionId: string;
nodeName: string;
}
export interface IPushDataReloadNodeType {
name: string;
version: number;
}
export interface IPushDataRemoveNodeType {
name: string;
version: number;
}
export interface IPushDataTestWebhook {
executionId: string;
workflowId: string;
}
export interface IPushDataConsoleMessage {
source: string;
message: string;
}
export interface IPushDataWorkerStatusMessage {
workerId: string;
status: IPushDataWorkerStatusPayload;
}
export interface IPushDataWorkerStatusPayload {
workerId: string;
runningJobsSummary: RunningJobSummary[];
freeMem: number;
totalMem: number;
uptime: number;
loadAvg: number[];
cpus: string;
arch: string;
platform: NodeJS.Platform;
hostname: string;
interfaces: Array<{
family: 'IPv4' | 'IPv6';
address: string;
internal: boolean;
}>;
version: string;
}
export interface INodesTypeData {
[key: string]: {
className: string;
sourcePath: string;
};
}
export interface IWorkflowErrorData {
[key: string]: any;
execution?: {

View file

@ -375,7 +375,7 @@ export class LoadNodesAndCredentials {
loader.reset();
await loader.loadAll();
await this.postProcessLoaders();
push.broadcast('nodeDescriptionUpdated');
push.broadcast('nodeDescriptionUpdated', {});
}, 100);
const toWatch = loader.isLazyLoaded

View file

@ -1,9 +1,9 @@
import type { PushMessage } from '@n8n/api-types';
import { EventEmitter } from 'events';
import { Container } from 'typedi';
import type WebSocket from 'ws';
import type { User } from '@/databases/entities/user';
import type { PushDataExecutionRecovered } from '@/interfaces';
import { Logger } from '@/logger';
import { WebSocketPush } from '@/push/websocket.push';
import { mockInstance } from '@test/mocking';
@ -28,6 +28,18 @@ describe('WebSocketPush', () => {
const pushRef1 = 'test-session1';
const pushRef2 = 'test-session2';
const userId: User['id'] = 'test-user';
const pushMessage: PushMessage = {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
};
const expectedMsg = JSON.stringify({
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
});
mockInstance(Logger);
const webSocketPush = Container.get(WebSocketPush);
@ -61,50 +73,17 @@ describe('WebSocketPush', () => {
it('sends data to one connection', () => {
webSocketPush.add(pushRef1, userId, mockWebSocket1);
webSocketPush.add(pushRef2, userId, mockWebSocket2);
const data: PushDataExecutionRecovered = {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
};
webSocketPush.sendToOne(pushMessage.type, pushMessage.data, pushRef1);
webSocketPush.sendToOne('executionRecovered', data, pushRef1);
expect(mockWebSocket1.send).toHaveBeenCalledWith(
JSON.stringify({
type: 'executionRecovered',
data: {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
},
}),
);
expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
expect(mockWebSocket2.send).not.toHaveBeenCalled();
});
it('sends data to all connections', () => {
webSocketPush.add(pushRef1, userId, mockWebSocket1);
webSocketPush.add(pushRef2, userId, mockWebSocket2);
const data: PushDataExecutionRecovered = {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
};
webSocketPush.sendToAll(pushMessage.type, pushMessage.data);
webSocketPush.sendToAll('executionRecovered', data);
const expectedMsg = JSON.stringify({
type: 'executionRecovered',
data: {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
},
});
expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg);
});
@ -122,24 +101,8 @@ describe('WebSocketPush', () => {
it('sends data to all users connections', () => {
webSocketPush.add(pushRef1, userId, mockWebSocket1);
webSocketPush.add(pushRef2, userId, mockWebSocket2);
const data: PushDataExecutionRecovered = {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
};
webSocketPush.sendToUsers(pushMessage.type, pushMessage.data, [userId]);
webSocketPush.sendToUsers('executionRecovered', data, [userId]);
const expectedMsg = JSON.stringify({
type: 'executionRecovered',
data: {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
},
});
expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg);
});

View file

@ -1,7 +1,7 @@
import type { PushPayload, PushType } from '@n8n/api-types';
import { assert, jsonStringify } from 'n8n-workflow';
import type { User } from '@/databases/entities/user';
import type { IPushDataType } from '@/interfaces';
import type { Logger } from '@/logger';
import type { OnPushMessage } from '@/push/types';
import { TypedEmitter } from '@/typed-emitter';
@ -16,19 +16,19 @@ export interface AbstractPushEvents {
*
* @emits message when a message is received from a client
*/
export abstract class AbstractPush<T> extends TypedEmitter<AbstractPushEvents> {
protected connections: Record<string, T> = {};
export abstract class AbstractPush<Connection> extends TypedEmitter<AbstractPushEvents> {
protected connections: Record<string, Connection> = {};
protected userIdByPushRef: Record<string, string> = {};
protected abstract close(connection: T): void;
protected abstract sendToOneConnection(connection: T, data: string): void;
protected abstract close(connection: Connection): void;
protected abstract sendToOneConnection(connection: Connection, data: string): void;
constructor(protected readonly logger: Logger) {
super();
}
protected add(pushRef: string, userId: User['id'], connection: T) {
protected add(pushRef: string, userId: User['id'], connection: Connection) {
const { connections, userIdByPushRef } = this;
this.logger.debug('Add editor-UI session', { pushRef });
@ -60,7 +60,7 @@ export abstract class AbstractPush<T> extends TypedEmitter<AbstractPushEvents> {
delete this.userIdByPushRef[pushRef];
}
private sendTo(type: IPushDataType, data: unknown, pushRefs: string[]) {
private sendTo<Type extends PushType>(type: Type, data: PushPayload<Type>, pushRefs: string[]) {
this.logger.debug(`Send data of type "${type}" to editor-UI`, {
dataType: type,
pushRefs: pushRefs.join(', '),
@ -75,11 +75,11 @@ export abstract class AbstractPush<T> extends TypedEmitter<AbstractPushEvents> {
}
}
sendToAll(type: IPushDataType, data?: unknown) {
sendToAll<Type extends PushType>(type: Type, data: PushPayload<Type>) {
this.sendTo(type, data, Object.keys(this.connections));
}
sendToOne(type: IPushDataType, data: unknown, pushRef: string) {
sendToOne<Type extends PushType>(type: Type, data: PushPayload<Type>, pushRef: string) {
if (this.connections[pushRef] === undefined) {
this.logger.error(`The session "${pushRef}" is not registered.`, { pushRef });
return;
@ -88,7 +88,11 @@ export abstract class AbstractPush<T> extends TypedEmitter<AbstractPushEvents> {
this.sendTo(type, data, [pushRef]);
}
sendToUsers(type: IPushDataType, data: unknown, userIds: Array<User['id']>) {
sendToUsers<Type extends PushType>(
type: Type,
data: PushPayload<Type>,
userIds: Array<User['id']>,
) {
const { connections } = this;
const userPushRefs = Object.keys(connections).filter((pushRef) =>
userIds.includes(this.userIdByPushRef[pushRef]),

View file

@ -1,3 +1,4 @@
import type { PushPayload, PushType } from '@n8n/api-types';
import type { Application } from 'express';
import { ServerResponse } from 'http';
import type { Server } from 'http';
@ -11,7 +12,6 @@ import config from '@/config';
import type { User } from '@/databases/entities/user';
import { OnShutdown } from '@/decorators/on-shutdown';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import type { IPushDataType } from '@/interfaces';
import { OrchestrationService } from '@/services/orchestration.service';
import { TypedEmitter } from '@/typed-emitter';
@ -45,6 +45,10 @@ export class Push extends TypedEmitter<PushEvents> {
if (useWebSockets) this.backend.on('message', (msg) => this.emit('message', msg));
}
getBackend() {
return this.backend;
}
handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) {
const {
ws,
@ -73,11 +77,11 @@ export class Push extends TypedEmitter<PushEvents> {
this.emit('editorUiConnected', pushRef);
}
broadcast(type: IPushDataType, data?: unknown) {
broadcast<Type extends PushType>(type: Type, data: PushPayload<Type>) {
this.backend.sendToAll(type, data);
}
send(type: IPushDataType, data: unknown, pushRef: string) {
send<Type extends PushType>(type: Type, data: PushPayload<Type>, pushRef: string) {
/**
* Multi-main setup: In a manual webhook execution, the main process that
* handles a webhook might not be the same as the main process that created
@ -93,11 +97,11 @@ export class Push extends TypedEmitter<PushEvents> {
this.backend.sendToOne(type, data, pushRef);
}
getBackend() {
return this.backend;
}
sendToUsers(type: IPushDataType, data: unknown, userIds: Array<User['id']>) {
sendToUsers<Type extends PushType>(
type: Type,
data: PushPayload<Type>,
userIds: Array<User['id']>,
) {
this.backend.sendToUsers(type, data, userIds);
}

View file

@ -1,3 +1,4 @@
import type { RunningJobSummary } from '@n8n/api-types';
import { WorkflowExecute } from 'n8n-core';
import { BINARY_ENCODING, ApplicationError, Workflow } from 'n8n-workflow';
import type { ExecutionStatus, IExecuteResponsePromiseData, IRun } from 'n8n-workflow';
@ -11,7 +12,7 @@ import { Logger } from '@/logger';
import { NodeTypes } from '@/node-types';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
import type { Job, JobId, JobResult, RunningJob, RunningJobSummary } from './scaling.types';
import type { Job, JobId, JobResult, RunningJob } from './scaling.types';
/**
* Responsible for processing jobs from the queue, i.e. running enqueued executions.

View file

@ -1,11 +1,6 @@
import type { RunningJobSummary } from '@n8n/api-types';
import type Bull from 'bull';
import type {
ExecutionError,
ExecutionStatus,
IExecuteResponsePromiseData,
IRun,
WorkflowExecuteMode as WorkflowExecutionMode,
} from 'n8n-workflow';
import type { ExecutionError, IExecuteResponsePromiseData, IRun } from 'n8n-workflow';
import type PCancelable from 'p-cancelable';
export type JobQueue = Bull.Queue<JobData>;
@ -30,11 +25,11 @@ export type JobOptions = Bull.JobOptions;
export type PubSubMessage = MessageToMain | MessageToWorker;
type MessageToMain = RepondToWebhookMessage;
type MessageToMain = RespondToWebhookMessage;
type MessageToWorker = AbortJobMessage;
type RepondToWebhookMessage = {
type RespondToWebhookMessage = {
kind: 'respond-to-webhook';
executionId: string;
response: IExecuteResponsePromiseData;
@ -44,19 +39,10 @@ type AbortJobMessage = {
kind: 'abort-job';
};
export type RunningJob = {
executionId: string;
workflowId: string;
workflowName: string;
mode: WorkflowExecutionMode;
startedAt: Date;
retryOf: string;
status: ExecutionStatus;
export type RunningJob = RunningJobSummary & {
run: PCancelable<IRun>;
};
export type RunningJobSummary = Omit<RunningJob, 'run'>;
export type QueueRecoveryContext = {
/** ID of timeout for next scheduled recovery cycle. */
timeout?: NodeJS.Timeout;

View file

@ -1,5 +1,5 @@
import { jsonParse } from 'n8n-workflow';
import * as os from 'os';
import os from 'node:os';
import { Container } from 'typedi';
import { Logger } from '@/logger';

View file

@ -1,3 +1,4 @@
import type { WorkerStatus } from '@n8n/api-types';
import { jsonParse } from 'n8n-workflow';
import Container from 'typedi';
@ -29,7 +30,7 @@ export async function handleWorkerResponseMessageMain(
case 'getStatus':
Container.get(Push).broadcast('sendWorkerStatusMessage', {
workerId: workerResponse.workerId,
status: workerResponse.payload,
status: workerResponse.payload as WorkerStatus,
});
break;
case 'getId':

View file

@ -1,5 +1,6 @@
import type { IPushDataType, IWorkflowDb } from '@/interfaces';
import type { RunningJobSummary } from '@/scaling/scaling.types';
import type { PushType, WorkerStatus } from '@n8n/api-types';
import type { IWorkflowDb } from '@/interfaces';
export type PubSubMessageMap = {
// #region Lifecycle
@ -43,24 +44,7 @@ export type PubSubMessageMap = {
'get-worker-id': never;
'get-worker-status': {
workerId: string;
runningJobsSummary: RunningJobSummary[];
freeMem: number;
totalMem: number;
uptime: number;
loadAvg: number[];
cpus: string;
arch: string;
platform: NodeJS.Platform;
hostname: string;
interfaces: Array<{
family: 'IPv4' | 'IPv6';
address: string;
internal: boolean;
}>;
version: string;
};
'get-worker-status': WorkerStatus;
// #endregion
@ -89,7 +73,7 @@ export type PubSubMessageMap = {
};
'relay-execution-lifecycle-event': {
type: IPushDataType;
type: PushType;
args: Record<string, unknown>;
pushRef: string;
};

View file

@ -1,5 +1,5 @@
import { jsonParse } from 'n8n-workflow';
import * as os from 'os';
import os from 'node:os';
import Container from 'typedi';
import { N8N_VERSION } from '@/constants';

View file

@ -1,7 +1,6 @@
import type { RunningJobSummary } from '@n8n/api-types';
import type { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow';
import type { RunningJobSummary } from '@/scaling/scaling.types';
import type { RedisServicePubSubPublisher } from '../../redis/redis-service-pub-sub-publisher';
export interface WorkerCommandReceivedHandlerOptions {

View file

@ -1,4 +1,6 @@
import type { IPushDataType, IPushDataWorkerStatusPayload, IWorkflowDb } from '@/interfaces';
import type { PushType, WorkerStatus } from '@n8n/api-types';
import type { IWorkflowDb } from '@/interfaces';
export type RedisServiceCommand =
| 'getStatus'
@ -42,7 +44,7 @@ export type RedisServiceBaseCommand =
| {
senderId: string;
command: 'relay-execution-lifecycle-event';
payload: { type: IPushDataType; args: Record<string, unknown>; pushRef: string };
payload: { type: PushType; args: Record<string, unknown>; pushRef: string };
}
| {
senderId: string;
@ -64,7 +66,7 @@ export type RedisServiceWorkerResponseObject = {
| RedisServiceBaseCommand
| {
command: 'getStatus';
payload: IPushDataWorkerStatusPayload;
payload: WorkerStatus;
}
| {
command: 'getId';

View file

@ -3,6 +3,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import type { PushType } from '@n8n/api-types';
import { GlobalConfig } from '@n8n/config';
import { WorkflowExecute } from 'n8n-core';
import type {
@ -40,13 +41,7 @@ import config from '@/config';
import { CredentialsHelper } from '@/credentials-helper';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { ExternalHooks } from '@/external-hooks';
import type {
IPushDataExecutionFinished,
IWorkflowExecuteProcess,
IWorkflowErrorData,
IPushDataType,
ExecutionPayload,
} from '@/interfaces';
import type { IWorkflowExecuteProcess, IWorkflowErrorData, ExecutionPayload } from '@/interfaces';
import { NodeTypes } from '@/node-types';
import { Push } from '@/push';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
@ -299,7 +294,6 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
startedAt: new Date(),
retryOf: this.retryOf,
workflowId,
pushRef,
workflowName,
},
pushRef,
@ -346,13 +340,15 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
workflowId,
});
// TODO: Look at this again
const sendData: IPushDataExecutionFinished = {
executionId,
data: pushRunData,
retryOf,
};
pushInstance.send('executionFinished', sendData, pushRef);
pushInstance.send(
'executionFinished',
{
executionId,
data: pushRunData,
retryOf,
},
pushRef,
);
},
],
};
@ -938,7 +934,7 @@ export function setExecutionStatus(status: ExecutionStatus) {
Container.get(ActiveExecutions).setStatus(this.executionId, status);
}
export function sendDataToUI(type: string, data: IDataObject | IDataObject[]) {
export function sendDataToUI(type: PushType, data: IDataObject | IDataObject[]) {
const { pushRef } = this;
if (pushRef === undefined) {
return;
@ -947,7 +943,7 @@ export function sendDataToUI(type: string, data: IDataObject | IDataObject[]) {
// Push data to session which started workflow
try {
const pushInstance = Container.get(Push);
pushInstance.send(type as IPushDataType, data, pushRef);
pushInstance.send(type, data, pushRef);
} catch (error) {
const logger = Container.get(Logger);
logger.warn(`There was a problem sending message to UI: ${error.message}`);

View file

@ -19,6 +19,7 @@
"references": [
{ "path": "../workflow/tsconfig.build.json" },
{ "path": "../core/tsconfig.build.json" },
{ "path": "../@n8n/api-types/tsconfig.build.json" },
{ "path": "../@n8n/client-oauth2/tsconfig.build.json" },
{ "path": "../@n8n/config/tsconfig.build.json" },
{ "path": "../@n8n/permissions/tsconfig.build.json" }

View file

@ -32,6 +32,7 @@
"@jsplumb/core": "^5.13.2",
"@jsplumb/util": "^5.13.2",
"@lezer/common": "^1.0.4",
"@n8n/api-types": "workspace:*",
"@n8n/chat": "workspace:*",
"@n8n/codemirror-lang": "workspace:*",
"@n8n/codemirror-lang-sql": "^1.0.2",

View file

@ -1,13 +1,8 @@
import type {
AI_NODE_CREATOR_VIEW,
CREDENTIAL_EDIT_MODAL_KEY,
SignInType,
FAKE_DOOR_FEATURES,
TRIGGER_NODE_CREATOR_VIEW,
REGULAR_NODE_CREATOR_VIEW,
AI_OTHERS_NODE_CREATOR_VIEW,
ROLE,
} from '@/constants';
import type { Component } from 'vue';
import type { NotificationOptions as ElementNotificationOptions } from 'element-plus';
import type { Connection } from '@jsplumb/core';
import type { Iso8601DateTimeString } from '@n8n/api-types';
import type { Scope } from '@n8n/permissions';
import type { IMenuItem, NodeCreatorTag } from 'n8n-design-system';
import type {
GenericValue,
@ -22,9 +17,7 @@ import type {
INodeTypeDescription,
IPinData,
IRunExecutionData,
IRun,
IRunData,
ITaskData,
IWorkflowSettings as IWorkflowSettingsWorkflow,
WorkflowExecuteMode,
PublicInstalledPackage,
@ -51,13 +44,21 @@ import type {
IPersonalizationSurveyAnswersV4,
AnnotationVote,
} from 'n8n-workflow';
import type {
AI_NODE_CREATOR_VIEW,
CREDENTIAL_EDIT_MODAL_KEY,
SignInType,
FAKE_DOOR_FEATURES,
TRIGGER_NODE_CREATOR_VIEW,
REGULAR_NODE_CREATOR_VIEW,
AI_OTHERS_NODE_CREATOR_VIEW,
ROLE,
} from '@/constants';
import type { BulkCommand, Undoable } from '@/models/history';
import type { PartialBy, TupleToUnion } from '@/utils/typeHelpers';
import type { Component } from 'vue';
import type { Scope } from '@n8n/permissions';
import type { NotificationOptions as ElementNotificationOptions } from 'element-plus';
import type { ProjectSharingData } from '@/types/projects.types';
import type { Connection } from '@jsplumb/core';
import type { BaseTextKey } from './plugins/i18n';
export * from 'n8n-design-system/types';
@ -119,9 +120,6 @@ declare global {
}
}
/** String that represents a timestamp in the ISO8601 format, i.e. YYYY-MM-DDTHH:MM:SS.sssZ */
export type Iso8601String = string;
export type EndpointStyle = {
width?: number;
height?: number;
@ -336,8 +334,8 @@ export interface IShareWorkflowsPayload {
export interface ICredentialsResponse extends ICredentialsEncrypted {
id: string;
createdAt: Iso8601String;
updatedAt: Iso8601String;
createdAt: Iso8601DateTimeString;
updatedAt: Iso8601DateTimeString;
sharedWithProjects?: ProjectSharingData[];
homeProject?: ProjectSharingData;
currentUserHasAccess?: boolean;
@ -346,8 +344,8 @@ export interface ICredentialsResponse extends ICredentialsEncrypted {
}
export interface ICredentialsBase {
createdAt: Iso8601String;
updatedAt: Iso8601String;
createdAt: Iso8601DateTimeString;
updatedAt: Iso8601DateTimeString;
}
export interface ICredentialsDecryptedResponse extends ICredentialsBase, ICredentialsDecrypted {
@ -422,213 +420,6 @@ export interface IExecutionDeleteFilter {
ids?: string[];
}
export interface Collaborator {
user: IUser;
lastSeen: string;
}
export type PushDataCollaborators = {
workflowId: string;
collaborators: Collaborator[];
};
type PushDataCollaboratorsChanged = {
data: PushDataCollaborators;
type: 'collaboratorsChanged';
};
export type IPushData =
| PushDataExecutionFinished
| PushDataExecutionStarted
| PushDataExecuteAfter
| PushDataExecuteBefore
| PushDataNodeDescriptionUpdated
| PushDataConsoleMessage
| PushDataReloadNodeType
| PushDataRemoveNodeType
| PushDataTestWebhook
| PushDataExecutionRecovered
| PushDataWorkerStatusMessage
| PushDataActiveWorkflowAdded
| PushDataActiveWorkflowRemoved
| PushDataCollaboratorsChanged
| PushDataWorkflowFailedToActivate;
export type PushDataActiveWorkflowAdded = {
data: IActiveWorkflowAdded;
type: 'workflowActivated';
};
export type PushDataActiveWorkflowRemoved = {
data: IActiveWorkflowRemoved;
type: 'workflowDeactivated';
};
export type PushDataWorkflowFailedToActivate = {
data: IWorkflowFailedToActivate;
type: 'workflowFailedToActivate';
};
export type PushDataExecutionRecovered = {
data: IPushDataExecutionRecovered;
type: 'executionRecovered';
};
export type PushDataExecutionFinished = {
data: IPushDataExecutionFinished;
type: 'executionFinished';
};
export type PushDataExecutionStarted = {
data: IPushDataExecutionStarted;
type: 'executionStarted';
};
export type PushDataExecuteAfter = {
data: IPushDataNodeExecuteAfter;
type: 'nodeExecuteAfter';
};
export type PushDataExecuteBefore = {
data: IPushDataNodeExecuteBefore;
type: 'nodeExecuteBefore';
};
export type PushDataNodeDescriptionUpdated = {
data: {};
type: 'nodeDescriptionUpdated';
};
export type PushDataConsoleMessage = {
data: IPushDataConsoleMessage;
type: 'sendConsoleMessage';
};
export type PushDataReloadNodeType = {
data: IPushDataReloadNodeType;
type: 'reloadNodeType';
};
export type PushDataRemoveNodeType = {
data: IPushDataRemoveNodeType;
type: 'removeNodeType';
};
export type PushDataTestWebhook = {
data: IPushDataTestWebhook;
type: 'testWebhookDeleted' | 'testWebhookReceived';
};
export type PushDataWorkerStatusMessage = {
data: IPushDataWorkerStatusMessage;
type: 'sendWorkerStatusMessage';
};
export interface IPushDataExecutionStarted {
executionId: string;
mode: WorkflowExecuteMode;
startedAt: Date;
retryOf?: string;
workflowId: string;
workflowName?: string;
}
export interface IPushDataExecutionRecovered {
executionId: string;
}
export interface IPushDataExecutionFinished {
data: IRun;
executionId: string;
retryOf?: string;
}
export interface IActiveWorkflowAdded {
workflowId: string;
}
export interface IActiveWorkflowRemoved {
workflowId: string;
}
export interface IWorkflowFailedToActivate {
workflowId: string;
errorMessage: string;
}
export interface IPushDataUnsavedExecutionFinished {
executionId: string;
data: { finished: true; stoppedAt: Date };
}
export interface IPushDataExecutionStarted {
executionId: string;
}
export interface IPushDataNodeExecuteAfter {
data: ITaskData;
executionId: string;
nodeName: string;
}
export interface IPushDataNodeExecuteBefore {
executionId: string;
nodeName: string;
}
export interface IPushDataReloadNodeType {
name: string;
version: number;
}
export interface IPushDataRemoveNodeType {
name: string;
version: number;
}
export interface IPushDataTestWebhook {
executionId: string;
workflowId: string;
}
export interface IPushDataConsoleMessage {
source: string;
messages: string[];
}
export interface WorkerJobStatusSummary {
jobId: string;
executionId: string;
retryOf?: string;
startedAt: Date;
mode: WorkflowExecuteMode;
workflowName: string;
workflowId: string;
status: ExecutionStatus;
}
export interface IPushDataWorkerStatusPayload {
workerId: string;
runningJobsSummary: WorkerJobStatusSummary[];
freeMem: number;
totalMem: number;
uptime: number;
loadAvg: number[];
cpus: string;
arch: string;
platform: NodeJS.Platform;
hostname: string;
interfaces: Array<{
family: 'IPv4' | 'IPv6';
address: string;
internal: boolean;
}>;
version: string;
}
export interface IPushDataWorkerStatusMessage {
workerId: string;
status: IPushDataWorkerStatusPayload;
}
export type IPersonalizationSurveyAnswersV1 = {
codingSkill?: string | null;
companyIndustry?: string[] | null;

View file

@ -1,17 +1,18 @@
<script lang="ts">
import { defineComponent } from 'vue';
import { useRouter } from 'vue-router';
import { mapStores } from 'pinia';
import type { WorkerStatus } from '@n8n/api-types';
import type { ExecutionStatus } from 'n8n-workflow';
import PushConnectionTracker from '@/components/PushConnectionTracker.vue';
import { useI18n } from '@/composables/useI18n';
import { useToast } from '@/composables/useToast';
import type { IPushDataWorkerStatusPayload } from '@/Interface';
import type { ExecutionStatus } from 'n8n-workflow';
import { useUIStore } from '@/stores/ui.store';
import { useOrchestrationStore } from '@/stores/orchestration.store';
import { setPageTitle } from '@/utils/htmlUtils';
import WorkerCard from './Workers/WorkerCard.ee.vue';
import { usePushConnection } from '@/composables/usePushConnection';
import { useRouter } from 'vue-router';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
import { useRootStore } from '@/stores/root.store';
@ -39,8 +40,8 @@ export default defineComponent({
},
computed: {
...mapStores(useRootStore, useUIStore, usePushConnectionStore, useOrchestrationStore),
combinedWorkers(): IPushDataWorkerStatusPayload[] {
const returnData: IPushDataWorkerStatusPayload[] = [];
combinedWorkers(): WorkerStatus[] {
const returnData: WorkerStatus[] = [];
for (const workerId in this.orchestrationManagerStore.workers) {
returnData.push(this.orchestrationManagerStore.workers[workerId]);
}
@ -85,14 +86,14 @@ export default defineComponent({
averageLoadAvg(loads: number[]) {
return (loads.reduce((prev, curr) => prev + curr, 0) / loads.length).toFixed(2);
},
getStatus(payload: IPushDataWorkerStatusPayload): ExecutionStatus {
getStatus(payload: WorkerStatus): ExecutionStatus {
if (payload.runningJobsSummary.length > 0) {
return 'running';
} else {
return 'success';
}
},
getRowClass(payload: IPushDataWorkerStatusPayload): string {
getRowClass(payload: WorkerStatus): string {
return [this.$style.execRow, this.$style[this.getStatus(payload)]].join(' ');
},
},

View file

@ -1,7 +1,8 @@
<script setup lang="ts">
import { useOrchestrationStore } from '@/stores/orchestration.store';
import type { IPushDataWorkerStatusPayload } from '@/Interface';
import { computed, onMounted, onBeforeUnmount, ref } from 'vue';
import type { WorkerStatus } from '@n8n/api-types';
import { useOrchestrationStore } from '@/stores/orchestration.store';
import { averageWorkerLoadFromLoadsAsString, memAsGb } from '../../utils/workerUtils';
import WorkerJobAccordion from './WorkerJobAccordion.ee.vue';
import WorkerNetAccordion from './WorkerNetAccordion.ee.vue';
@ -18,7 +19,7 @@ const props = defineProps<{
const secondsSinceLastUpdateString = ref<string>('0');
const stale = ref<boolean>(false);
const worker = computed((): IPushDataWorkerStatusPayload | undefined => {
const worker = computed((): WorkerStatus | undefined => {
return orchestrationStore.getWorkerStatus(props.workerId);
});

View file

@ -1,9 +1,9 @@
<script setup lang="ts">
import type { WorkerJobStatusSummary } from '@/Interface';
import type { RunningJobSummary } from '@n8n/api-types';
import WorkerAccordion from './WorkerAccordion.ee.vue';
const props = defineProps<{
items: WorkerJobStatusSummary[];
items: RunningJobSummary[];
}>();
function runningSince(started: Date): string {

View file

@ -1,12 +1,12 @@
<script setup lang="ts">
import type { IPushDataWorkerStatusPayload } from '@/Interface';
import type { WorkerStatus } from '@n8n/api-types';
import WorkerAccordion from './WorkerAccordion.ee.vue';
import { useClipboard } from '@/composables/useClipboard';
import { useI18n } from '@/composables/useI18n';
import { useToast } from '@/composables/useToast';
const props = defineProps<{
items: IPushDataWorkerStatusPayload['interfaces'];
items: WorkerStatus['interfaces'];
}>();
const i18n = useI18n();

View file

@ -1,8 +1,9 @@
import { usePushConnection } from '@/composables/usePushConnection';
import { useRouter } from 'vue-router';
import { createPinia, setActivePinia } from 'pinia';
import type { PushMessage, PushPayload } from '@n8n/api-types';
import { usePushConnection } from '@/composables/usePushConnection';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
import type { IPushData, PushDataWorkerStatusMessage } from '@/Interface';
import { useOrchestrationStore } from '@/stores/orchestration.store';
vi.mock('vue-router', () => {
@ -56,13 +57,13 @@ describe('usePushConnection()', () => {
describe('queuePushMessage()', () => {
it('should add message to the queue and sets timeout if not already set', () => {
const event: IPushData = {
const event: PushMessage = {
type: 'sendWorkerStatusMessage',
data: {
workerId: '1',
status: {},
status: {} as PushPayload<'sendWorkerStatusMessage'>['status'],
},
} as PushDataWorkerStatusMessage;
};
pushConnection.queuePushMessage(event, 5);
@ -74,7 +75,7 @@ describe('usePushConnection()', () => {
describe('processWaitingPushMessages()', () => {
it('should clear the queue and reset the timeout', async () => {
const event: IPushData = { type: 'executionRecovered', data: { executionId: '1' } };
const event: PushMessage = { type: 'executionRecovered', data: { executionId: '1' } };
pushConnection.queuePushMessage(event, 0);
expect(pushConnection.pushMessageQueue.value).toHaveLength(1);
@ -91,13 +92,13 @@ describe('usePushConnection()', () => {
describe('sendWorkerStatusMessage', () => {
it('should handle event type correctly', async () => {
const spy = vi.spyOn(orchestrationStore, 'updateWorkerStatus').mockImplementation(() => {});
const event: IPushData = {
const event: PushMessage = {
type: 'sendWorkerStatusMessage',
data: {
workerId: '1',
status: {},
status: {} as PushPayload<'sendWorkerStatusMessage'>['status'],
},
} as PushDataWorkerStatusMessage;
};
const result = await pushConnection.pushMessageReceived(event);

View file

@ -1,14 +1,6 @@
import type {
IExecutionResponse,
IExecutionsCurrentSummaryExtended,
IPushData,
IPushDataExecutionFinished,
} from '@/Interface';
import { useNodeHelpers } from '@/composables/useNodeHelpers';
import { useTitleChange } from '@/composables/useTitleChange';
import { useToast } from '@/composables/useToast';
import { parse } from 'flatted';
import { h, ref } from 'vue';
import type { useRouter } from 'vue-router';
import type {
ExpressionError,
IDataObject,
@ -22,7 +14,12 @@ import type {
INodeTypeDescription,
} from 'n8n-workflow';
import { TelemetryHelpers } from 'n8n-workflow';
import type { PushMessage, PushPayload } from '@n8n/api-types';
import type { IExecutionResponse, IExecutionsCurrentSummaryExtended } from '@/Interface';
import { useNodeHelpers } from '@/composables/useNodeHelpers';
import { useTitleChange } from '@/composables/useTitleChange';
import { useToast } from '@/composables/useToast';
import { WORKFLOW_SETTINGS_MODAL_KEY } from '@/constants';
import { getTriggerNodeServiceName } from '@/utils/nodeTypesUtils';
import { codeNodeEditorEventBus, globalLinkActionsEventBus } from '@/event-bus';
@ -31,12 +28,9 @@ import { useWorkflowsStore } from '@/stores/workflows.store';
import { useNodeTypesStore } from '@/stores/nodeTypes.store';
import { useCredentialsStore } from '@/stores/credentials.store';
import { useSettingsStore } from '@/stores/settings.store';
import { parse } from 'flatted';
import { h, ref } from 'vue';
import { useOrchestrationStore } from '@/stores/orchestration.store';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
import { useExternalHooks } from '@/composables/useExternalHooks';
import type { useRouter } from 'vue-router';
import { useWorkflowHelpers } from '@/composables/useWorkflowHelpers';
import { useI18n } from '@/composables/useI18n';
import { useTelemetry } from '@/composables/useTelemetry';
@ -44,6 +38,8 @@ import type { PushMessageQueueItem } from '@/types';
import { useAssistantStore } from '@/stores/assistant.store';
import NodeExecutionErrorMessage from '@/components/NodeExecutionErrorMessage.vue';
type IPushDataExecutionFinishedPayload = PushPayload<'executionFinished'>;
export function usePushConnection({ router }: { router: ReturnType<typeof useRouter> }) {
const workflowHelpers = useWorkflowHelpers({ router });
const nodeHelpers = useNodeHelpers();
@ -83,7 +79,7 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
* is currently active. So internally resend the message
* a few more times
*/
function queuePushMessage(event: IPushData, retryAttempts: number) {
function queuePushMessage(event: PushMessage, retryAttempts: number) {
pushMessageQueue.value.push({ message: event, retriesLeft: retryAttempts });
if (retryTimeout.value === null) {
@ -125,7 +121,10 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
/**
* Process a newly received message
*/
async function pushMessageReceived(receivedData: IPushData, isRetry?: boolean): Promise<boolean> {
async function pushMessageReceived(
receivedData: PushMessage,
isRetry?: boolean,
): Promise<boolean> {
const retryAttempts = 5;
if (receivedData.type === 'sendWorkerStatusMessage') {
@ -161,7 +160,7 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
// The data is not for the currently active execution or
// we do not have the execution id yet.
if (isRetry !== true) {
queuePushMessage(event as unknown as IPushData, retryAttempts);
queuePushMessage(event as unknown as PushMessage, retryAttempts);
}
return false;
}
@ -169,7 +168,7 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
// recovered execution data is handled like executionFinished data, however for security reasons
// we need to fetch the data from the server again rather than push it to all clients
let recoveredPushData: IPushDataExecutionFinished | undefined = undefined;
let recoveredPushData: IPushDataExecutionFinishedPayload | undefined = undefined;
if (receivedData.type === 'executionRecovered') {
const recoveredExecutionId = receivedData.data?.executionId;
const isWorkflowRunning = uiStore.isActionActive['workflowRunning'];
@ -242,11 +241,11 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
if (receivedData.type === 'executionFinished' || receivedData.type === 'executionRecovered') {
// The workflow finished executing
let pushData: IPushDataExecutionFinished;
let pushData: IPushDataExecutionFinishedPayload;
if (receivedData.type === 'executionRecovered' && recoveredPushData !== undefined) {
pushData = recoveredPushData;
} else {
pushData = receivedData.data as IPushDataExecutionFinished;
pushData = receivedData.data as IPushDataExecutionFinishedPayload;
}
const { activeExecutionId } = workflowsStore;
@ -274,7 +273,7 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
// The workflow which did finish execution did either not get started
// by this session or we do not have the execution id yet.
if (isRetry !== true) {
queuePushMessage(event as unknown as IPushData, retryAttempts);
queuePushMessage(event as unknown as PushMessage, retryAttempts);
}
return false;
}

View file

@ -1,7 +1,6 @@
import type {
IExecutionPushResponse,
IExecutionResponse,
IPushDataExecutionFinished,
IStartRunData,
IWorkflowDb,
} from '@/Interface';
@ -34,6 +33,7 @@ import { isEmpty } from '@/utils/typesUtils';
import { useI18n } from '@/composables/useI18n';
import { get } from 'lodash-es';
import { useExecutionsStore } from '@/stores/executions.store';
import type { PushPayload } from '@n8n/api-types';
export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof useRouter> }) {
const nodeHelpers = useNodeHelpers();
@ -375,7 +375,7 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
// execution finished but was not saved (e.g. due to low connectivity)
workflowsStore.finishActiveExecution({
executionId,
data: { finished: true, stoppedAt: new Date() },
data: { finished: true, stoppedAt: new Date() } as IRun,
});
workflowsStore.executingNode.length = 0;
uiStore.removeActiveAction('workflowRunning');
@ -395,11 +395,11 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
startedAt: execution.startedAt,
stoppedAt: execution.stoppedAt,
} as IRun;
const pushData = {
const pushData: PushPayload<'executionFinished'> = {
data: executedData,
executionId,
retryOf: execution.retryOf,
} as IPushDataExecutionFinished;
};
workflowsStore.finishActiveExecution(pushData);
titleSet(execution.workflowData.name, 'IDLE');
workflowsStore.executingNode.length = 0;

View file

@ -9,6 +9,7 @@ import {
import type { ChatRequest } from '@/types/assistant.types';
import type { ChatUI } from 'n8n-design-system/types/assistant';
import { defineStore } from 'pinia';
import type { PushPayload } from '@n8n/api-types';
import { computed, h, ref, watch } from 'vue';
import { useRootStore } from './root.store';
import { useUsersStore } from './users.store';
@ -20,7 +21,7 @@ import type { ICredentialType, INodeParameters } from 'n8n-workflow';
import { deepCopy } from 'n8n-workflow';
import { ndvEventBus, codeNodeEditorEventBus } from '@/event-bus';
import { useNDVStore } from './ndv.store';
import type { IPushDataNodeExecuteAfter, IUpdateInformation } from '@/Interface';
import type { IUpdateInformation } from '@/Interface';
import {
getMainAuthField,
getNodeAuthOptions,
@ -473,7 +474,7 @@ export const useAssistantStore = defineStore(STORES.ASSISTANT, () => {
(e) => handleServiceError(e, id),
);
}
async function onNodeExecution(pushEvent: IPushDataNodeExecuteAfter) {
async function onNodeExecution(pushEvent: PushPayload<'nodeExecuteAfter'>) {
if (!chatSessionError.value || pushEvent.nodeName !== chatSessionError.value.node.name) {
return;
}

View file

@ -1,10 +1,10 @@
import { defineStore } from 'pinia';
import { ref } from 'vue';
import { useRoute } from 'vue-router';
import type { Collaborator } from '@n8n/api-types';
import { STORES, PLACEHOLDER_EMPTY_WORKFLOW_ID, TIME } from '@/constants';
import { useBeforeUnload } from '@/composables/useBeforeUnload';
import type { Collaborator } from '@/Interface';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { usePushConnectionStore } from '@/stores/pushConnection.store';
import { useUsersStore } from '@/stores/users.store';

View file

@ -1,5 +1,6 @@
import { defineStore } from 'pinia';
import type { IPushDataWorkerStatusPayload } from '../Interface';
import type { WorkerStatus } from '@n8n/api-types';
import { useRootStore } from './root.store';
import { sendGetWorkerStatus } from '../api/orchestration';
@ -8,7 +9,7 @@ const STALE_SECONDS = 120 * 1000;
export interface IOrchestrationStoreState {
initialStatusReceived: boolean;
workers: { [id: string]: IPushDataWorkerStatusPayload };
workers: { [id: string]: WorkerStatus };
workersHistory: {
[id: string]: IWorkerHistoryItem[];
};
@ -18,7 +19,7 @@ export interface IOrchestrationStoreState {
export interface IWorkerHistoryItem {
timestamp: number;
data: IPushDataWorkerStatusPayload;
data: WorkerStatus;
}
export const useOrchestrationStore = defineStore('orchestrationManager', {
@ -30,7 +31,7 @@ export const useOrchestrationStore = defineStore('orchestrationManager', {
statusInterval: null,
}),
actions: {
updateWorkerStatus(data: IPushDataWorkerStatusPayload) {
updateWorkerStatus(data: WorkerStatus) {
this.workers[data.workerId] = data;
if (!this.workersHistory[data.workerId]) {
this.workersHistory[data.workerId] = [];
@ -70,7 +71,7 @@ export const useOrchestrationStore = defineStore('orchestrationManager', {
getWorkerLastUpdated(workerId: string): number {
return this.workersLastUpdated[workerId] ?? 0;
},
getWorkerStatus(workerId: string): IPushDataWorkerStatusPayload | undefined {
getWorkerStatus(workerId: string): WorkerStatus | undefined {
return this.workers[workerId];
},
getWorkerStatusHistory(workerId: string): IWorkerHistoryItem[] {

View file

@ -1,9 +1,10 @@
import { defineStore } from 'pinia';
import { STORES, TIME } from '@/constants';
import { ref, computed } from 'vue';
import type { PushMessage } from '@n8n/api-types';
import { STORES, TIME } from '@/constants';
import { useSettingsStore } from './settings.store';
import { useRootStore } from './root.store';
import type { IPushData } from '../Interface';
export interface PushState {
pushRef: string;
@ -17,7 +18,7 @@ export interface PushState {
isConnectionOpen: boolean;
}
export type OnPushMessageHandler = (event: IPushData) => void;
export type OnPushMessageHandler = (event: PushMessage) => void;
/**
* Store for managing a push connection to the server
@ -139,7 +140,7 @@ export const usePushConnectionStore = defineStore(STORES.PUSH, () => {
* Process a newly received message
*/
async function pushMessageReceived(event: Event) {
let receivedData: IPushData;
let receivedData: PushMessage;
try {
// @ts-ignore
receivedData = JSON.parse(event.data);

View file

@ -18,9 +18,6 @@ import type {
INodeMetadata,
INodeUi,
INodeUpdatePropertiesInformation,
IPushDataExecutionFinished,
IPushDataNodeExecuteAfter,
IPushDataUnsavedExecutionFinished,
IStartRunData,
IUpdateInformation,
IUsedCredential,
@ -74,6 +71,7 @@ import { i18n } from '@/plugins/i18n';
import { computed, ref } from 'vue';
import { useProjectsStore } from '@/stores/projects.store';
import type { ProjectSharingData } from '@/types/projects.types';
import type { PushPayload } from '@n8n/api-types';
const defaults: Omit<IWorkflowDb, 'id'> & { settings: NonNullable<IWorkflowDb['settings']> } = {
name: '',
@ -1185,7 +1183,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
}
function addNodeExecutionData(pushData: IPushDataNodeExecuteAfter): void {
function addNodeExecutionData(pushData: PushPayload<'nodeExecuteAfter'>): void {
if (!workflowExecutionData.value?.data) {
throw new Error('The "workflowExecutionData" is not initialized!');
}
@ -1257,9 +1255,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
activeExecutionId.value = newActiveExecution.id;
}
function finishActiveExecution(
finishedActiveExecution: IPushDataExecutionFinished | IPushDataUnsavedExecutionFinished,
): void {
function finishActiveExecution(finishedActiveExecution: PushPayload<'executionFinished'>): void {
// Find the execution to set to finished
const activeExecutionIndex = activeExecutions.value.findIndex((execution) => {
return execution.id === finishedActiveExecution.executionId;

View file

@ -1,6 +1,6 @@
import type { IPushData } from '@/Interface';
import type { PushMessage } from '@n8n/api-types';
export type PushMessageQueueItem = {
message: IPushData;
message: PushMessage;
retriesLeft: number;
};

View file

@ -103,7 +103,6 @@ import type {
NodeCreatorOpenSource,
AddedNodesAndConnections,
ToggleNodeCreatorOptions,
IPushDataExecutionFinished,
NodeFilterType,
} from '@/Interface';
@ -183,6 +182,7 @@ import { useNpsSurveyStore } from '@/stores/npsSurvey.store';
import { getResourcePermissions } from '@/permissions';
import { useBeforeUnload } from '@/composables/useBeforeUnload';
import NodeViewUnfinishedWorkflowMessage from '@/components/NodeViewUnfinishedWorkflowMessage.vue';
import type { PushPayload } from '@n8n/api-types';
interface AddNodeOptions {
position?: XYPosition;
@ -1714,7 +1714,7 @@ export default defineComponent({
this.workflowsStore.finishActiveExecution({
executionId,
data: { finished: true, stoppedAt: new Date() },
data: { finished: true, stoppedAt: new Date() } as IRun,
});
this.workflowsStore.executingNode.length = 0;
this.uiStore.removeActiveAction('workflowRunning');
@ -1737,11 +1737,11 @@ export default defineComponent({
startedAt: execution.startedAt,
stoppedAt: execution.stoppedAt,
} as IRun;
const pushData = {
const pushData: PushPayload<'executionFinished'> = {
data: executedData,
executionId,
retryOf: execution.retryOf,
} as IPushDataExecutionFinished;
};
this.workflowsStore.finishActiveExecution(pushData);
this.titleSet(execution.workflowData.name, 'IDLE');
this.workflowsStore.executingNode.length = 0;

View file

@ -221,6 +221,12 @@ importers:
specifier: workspace:*
version: link:../packages/workflow
packages/@n8n/api-types:
devDependencies:
n8n-workflow:
specifier: workspace:*
version: link:../../workflow
packages/@n8n/benchmark:
dependencies:
'@oclif/core':
@ -662,6 +668,9 @@ importers:
'@google-cloud/secret-manager':
specifier: ^5.6.0
version: 5.6.0(encoding@0.1.13)
'@n8n/api-types':
specifier: workspace:*
version: link:../@n8n/api-types
'@n8n/client-oauth2':
specifier: workspace:*
version: link:../@n8n/client-oauth2
@ -1286,6 +1295,9 @@ importers:
'@lezer/common':
specifier: ^1.0.4
version: 1.1.0
'@n8n/api-types':
specifier: workspace:*
version: link:../@n8n/api-types
'@n8n/chat':
specifier: workspace:*
version: link:../@n8n/chat

View file

@ -25,6 +25,7 @@
"format": {},
"lint:backend": {
"dependsOn": [
"@n8n/api-types#lint",
"@n8n/config#lint",
"@n8n/client-oauth2#lint",
"@n8n/imap#lint",
@ -52,6 +53,7 @@
"lintfix": {},
"test:backend": {
"dependsOn": [
"@n8n/api-types#test",
"@n8n/config#test",
"@n8n/client-oauth2#test",
"@n8n/imap#test",