Merge remote-tracking branch 'origin/master' into revert-eslint-prettier

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-09-19 16:02:52 +02:00
commit 81f084cf9d
No known key found for this signature in database
GPG key ID: 9300FF7CDEA1FBAA
34 changed files with 1596 additions and 4708 deletions

View file

@ -3,8 +3,9 @@ import { check } from 'k6';
const apiBaseUrl = __ENV.API_BASE_URL; const apiBaseUrl = __ENV.API_BASE_URL;
const file = open(__ENV.SCRIPT_FILE_PATH, 'b'); // This creates a 2MB file (16 * 128 * 1024 = 2 * 1024 * 1024 = 2MB)
const filename = String(__ENV.SCRIPT_FILE_PATH).split('/').pop(); const file = Array.from({ length: 128 * 1024 }, () => Math.random().toString().slice(2)).join('');
const filename = 'test.bin';
export default function () { export default function () {
const data = { const data = {

View file

@ -77,7 +77,6 @@ export function handleSummary(data) {
env: { env: {
API_BASE_URL: this.opts.n8nApiBaseUrl, API_BASE_URL: this.opts.n8nApiBaseUrl,
K6_CLOUD_TOKEN: this.opts.k6ApiToken, K6_CLOUD_TOKEN: this.opts.k6ApiToken,
SCRIPT_FILE_PATH: augmentedTestScriptPath,
}, },
stdio: 'inherit', stdio: 'inherit',
})`${k6ExecutablePath} run ${flattedFlags} ${augmentedTestScriptPath}`; })`${k6ExecutablePath} run ${flattedFlags} ${augmentedTestScriptPath}`;

View file

@ -3,19 +3,19 @@
"private": true, "private": true,
"version": "0.0.1", "version": "0.0.1",
"devDependencies": { "devDependencies": {
"@chromatic-com/storybook": "^1.5.0", "@chromatic-com/storybook": "^2.0.2",
"@storybook/addon-a11y": "^8.1.4", "@storybook/addon-a11y": "^8.3.1",
"@storybook/addon-actions": "^8.1.4", "@storybook/addon-actions": "^8.3.1",
"@storybook/addon-docs": "^8.1.4", "@storybook/addon-docs": "^8.3.1",
"@storybook/addon-essentials": "^8.1.4", "@storybook/addon-essentials": "^8.3.1",
"@storybook/addon-interactions": "^8.1.4", "@storybook/addon-interactions": "^8.3.1",
"@storybook/addon-links": "^8.1.4", "@storybook/addon-links": "^8.3.1",
"@storybook/addon-themes": "^8.1.4", "@storybook/addon-themes": "^8.3.1",
"@storybook/blocks": "^8.1.4", "@storybook/blocks": "^8.3.1",
"@storybook/test": "^8.1.4", "@storybook/test": "^8.3.1",
"@storybook/vue3": "^8.1.4", "@storybook/vue3": "^8.3.1",
"@storybook/vue3-vite": "^8.1.4", "@storybook/vue3-vite": "^8.3.1",
"chromatic": "^11.4.1", "chromatic": "^11.10.2",
"storybook": "^8.1.4" "storybook": "^8.3.1"
} }
} }

View file

@ -3,11 +3,11 @@ import { mock } from 'jest-mock-extended';
import config from '@/config'; import config from '@/config';
import { generateNanoId } from '@/databases/utils/generators'; import { generateNanoId } from '@/databases/utils/generators';
import type { RedisClientService } from '@/services/redis/redis-client.service';
import type { import type {
RedisServiceCommandObject, RedisServiceCommandObject,
RedisServiceWorkerResponseObject, RedisServiceWorkerResponseObject,
} from '@/services/redis/redis-service-commands'; } from '@/scaling/redis/redis-service-commands';
import type { RedisClientService } from '@/services/redis-client.service';
import { Publisher } from '../pubsub/publisher.service'; import { Publisher } from '../pubsub/publisher.service';

View file

@ -2,7 +2,7 @@ import type { Redis as SingleNodeClient } from 'ioredis';
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import config from '@/config'; import config from '@/config';
import type { RedisClientService } from '@/services/redis/redis-client.service'; import type { RedisClientService } from '@/services/redis-client.service';
import { Subscriber } from '../pubsub/subscriber.service'; import { Subscriber } from '../pubsub/subscriber.service';

View file

@ -1,3 +1,7 @@
export const QUEUE_NAME = 'jobs'; export const QUEUE_NAME = 'jobs';
export const JOB_TYPE_NAME = 'job'; export const JOB_TYPE_NAME = 'job';
export const COMMAND_PUBSUB_CHANNEL = 'n8n.commands';
export const WORKER_RESPONSE_PUBSUB_CHANNEL = 'n8n.worker-response';

View file

@ -3,11 +3,11 @@ import { Service } from 'typedi';
import config from '@/config'; import config from '@/config';
import { Logger } from '@/logger'; import { Logger } from '@/logger';
import { RedisClientService } from '@/services/redis/redis-client.service';
import type { import type {
RedisServiceCommandObject, RedisServiceCommandObject,
RedisServiceWorkerResponseObject, RedisServiceWorkerResponseObject,
} from '@/services/redis/redis-service-commands'; } from '@/scaling/redis/redis-service-commands';
import { RedisClientService } from '@/services/redis-client.service';
/** /**
* Responsible for publishing messages into the pubsub channels used by scaling mode. * Responsible for publishing messages into the pubsub channels used by scaling mode.

View file

@ -1,7 +1,8 @@
import type { import type { PushType, WorkerStatus } from '@n8n/api-types';
COMMAND_REDIS_CHANNEL,
WORKER_RESPONSE_REDIS_CHANNEL, import type { IWorkflowDb } from '@/interfaces';
} from '@/services/redis/redis-constants';
import type { COMMAND_PUBSUB_CHANNEL, WORKER_RESPONSE_PUBSUB_CHANNEL } from '../constants';
/** /**
* Pubsub channel used by scaling mode: * Pubsub channel used by scaling mode:
@ -10,5 +11,90 @@ import type {
* - `n8n.worker-response` for messages sent by workers in response to commands from main processes * - `n8n.worker-response` for messages sent by workers in response to commands from main processes
*/ */
export type ScalingPubSubChannel = export type ScalingPubSubChannel =
| typeof COMMAND_REDIS_CHANNEL | typeof COMMAND_PUBSUB_CHANNEL
| typeof WORKER_RESPONSE_REDIS_CHANNEL; | typeof WORKER_RESPONSE_PUBSUB_CHANNEL;
export type PubSubMessageMap = {
// #region Lifecycle
'reload-license': never;
'restart-event-bus': {
result: 'success' | 'error';
error?: string;
};
'reload-external-secrets-providers': {
result: 'success' | 'error';
error?: string;
};
'stop-worker': never;
// #endregion
// #region Community packages
'community-package-install': {
packageName: string;
packageVersion: string;
};
'community-package-update': {
packageName: string;
packageVersion: string;
};
'community-package-uninstall': {
packageName: string;
packageVersion: string;
};
// #endregion
// #region Worker view
'get-worker-id': never;
'get-worker-status': WorkerStatus;
// #endregion
// #region Multi-main setup
'add-webhooks-triggers-and-pollers': {
workflowId: string;
};
'remove-triggers-and-pollers': {
workflowId: string;
};
'display-workflow-activation': {
workflowId: string;
};
'display-workflow-deactivation': {
workflowId: string;
};
// currently 'workflow-failed-to-activate'
'display-workflow-activation-error': {
workflowId: string;
errorMessage: string;
};
'relay-execution-lifecycle-event': {
type: PushType;
args: Record<string, unknown>;
pushRef: string;
};
'clear-test-webhooks': {
webhookKey: string;
workflowEntity: IWorkflowDb;
pushRef: string;
};
// #endregion
};

View file

@ -3,7 +3,7 @@ import { Service } from 'typedi';
import config from '@/config'; import config from '@/config';
import { Logger } from '@/logger'; import { Logger } from '@/logger';
import { RedisClientService } from '@/services/redis/redis-client.service'; import { RedisClientService } from '@/services/redis-client.service';
import type { ScalingPubSubChannel } from './pubsub.types'; import type { ScalingPubSubChannel } from './pubsub.types';

View file

@ -24,7 +24,7 @@ import type {
JobStatus, JobStatus,
JobId, JobId,
QueueRecoveryContext, QueueRecoveryContext,
PubSubMessage, JobReport,
} from './scaling.types'; } from './scaling.types';
@Service() @Service()
@ -46,7 +46,7 @@ export class ScalingService {
async setupQueue() { async setupQueue() {
const { default: BullQueue } = await import('bull'); const { default: BullQueue } = await import('bull');
const { RedisClientService } = await import('@/services/redis/redis-client.service'); const { RedisClientService } = await import('@/services/redis-client.service');
const service = Container.get(RedisClientService); const service = Container.get(RedisClientService);
const bullPrefix = this.globalConfig.queue.bull.prefix; const bullPrefix = this.globalConfig.queue.bull.prefix;
@ -265,7 +265,7 @@ export class ScalingService {
} }
} }
private isPubSubMessage(candidate: unknown): candidate is PubSubMessage { private isPubSubMessage(candidate: unknown): candidate is JobReport {
return typeof candidate === 'object' && candidate !== null && 'kind' in candidate; return typeof candidate === 'object' && candidate !== null && 'kind' in candidate;
} }

View file

@ -23,11 +23,11 @@ export type JobStatus = Bull.JobStatus;
export type JobOptions = Bull.JobOptions; export type JobOptions = Bull.JobOptions;
export type PubSubMessage = MessageToMain | MessageToWorker; export type JobReport = JobReportToMain | JobReportToWorker;
type MessageToMain = RespondToWebhookMessage; type JobReportToMain = RespondToWebhookMessage;
type MessageToWorker = AbortJobMessage; type JobReportToWorker = AbortJobMessage;
type RespondToWebhookMessage = { type RespondToWebhookMessage = {
kind: 'respond-to-webhook'; kind: 'respond-to-webhook';

View file

@ -9,13 +9,13 @@ import config from '@/config';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
import { Push } from '@/push'; import { Push } from '@/push';
import type { RedisServiceWorkerResponseObject } from '@/scaling/redis/redis-service-commands';
import * as helpers from '@/services/orchestration/helpers'; import * as helpers from '@/services/orchestration/helpers';
import { handleCommandMessageMain } from '@/services/orchestration/main/handle-command-message-main'; import { handleCommandMessageMain } from '@/services/orchestration/main/handle-command-message-main';
import { handleWorkerResponseMessageMain } from '@/services/orchestration/main/handle-worker-response-message-main'; import { handleWorkerResponseMessageMain } from '@/services/orchestration/main/handle-worker-response-message-main';
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service'; import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
import { OrchestrationService } from '@/services/orchestration.service'; import { OrchestrationService } from '@/services/orchestration.service';
import { RedisClientService } from '@/services/redis/redis-client.service'; import { RedisClientService } from '@/services/redis-client.service';
import type { RedisServiceWorkerResponseObject } from '@/services/redis/redis-service-commands';
import { mockInstance } from '@test/mocking'; import { mockInstance } from '@test/mocking';
import type { MainResponseReceivedHandlerOptions } from '../orchestration/main/types'; import type { MainResponseReceivedHandlerOptions } from '../orchestration/main/types';

View file

@ -36,7 +36,7 @@ export class CacheService extends TypedEmitter<CacheEvents> {
const useRedis = backend === 'redis' || (backend === 'auto' && mode === 'queue'); const useRedis = backend === 'redis' || (backend === 'auto' && mode === 'queue');
if (useRedis) { if (useRedis) {
const { RedisClientService } = await import('../redis/redis-client.service'); const { RedisClientService } = await import('../redis-client.service');
const redisClientService = Container.get(RedisClientService); const redisClientService = Container.get(RedisClientService);
const prefixBase = config.getEnv('redis.prefix'); const prefixBase = config.getEnv('redis.prefix');

View file

@ -8,7 +8,10 @@ import type { Publisher } from '@/scaling/pubsub/publisher.service';
import type { Subscriber } from '@/scaling/pubsub/subscriber.service'; import type { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { MultiMainSetup } from './orchestration/main/multi-main-setup.ee'; import { MultiMainSetup } from './orchestration/main/multi-main-setup.ee';
import type { RedisServiceBaseCommand, RedisServiceCommand } from './redis/redis-service-commands'; import type {
RedisServiceBaseCommand,
RedisServiceCommand,
} from '../scaling/redis/redis-service-commands';
@Service() @Service()
export class OrchestrationService { export class OrchestrationService {

View file

@ -3,9 +3,9 @@ import os from 'node:os';
import { Container } from 'typedi'; import { Container } from 'typedi';
import { Logger } from '@/logger'; import { Logger } from '@/logger';
import { COMMAND_PUBSUB_CHANNEL } from '@/scaling/constants';
import { COMMAND_REDIS_CHANNEL } from '../redis/redis-constants'; import type { RedisServiceCommandObject } from '../../scaling/redis/redis-service-commands';
import type { RedisServiceCommandObject } from '../redis/redis-service-commands';
export interface RedisServiceCommandLastReceived { export interface RedisServiceCommandLastReceived {
[date: string]: Date; [date: string]: Date;
@ -18,7 +18,7 @@ export function messageToRedisServiceCommandObject(messageString: string) {
message = jsonParse<RedisServiceCommandObject>(messageString); message = jsonParse<RedisServiceCommandObject>(messageString);
} catch { } catch {
Container.get(Logger).debug( Container.get(Logger).debug(
`Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`, `Received invalid message via channel ${COMMAND_PUBSUB_CHANNEL}: "${messageString}"`,
); );
return; return;
} }

View file

@ -3,11 +3,11 @@ import { jsonParse } from 'n8n-workflow';
import Container from 'typedi'; import Container from 'typedi';
import { Logger } from '@/logger'; import { Logger } from '@/logger';
import { WORKER_RESPONSE_REDIS_CHANNEL } from '@/services/redis/redis-constants'; import { WORKER_RESPONSE_PUBSUB_CHANNEL } from '@/scaling/constants';
import type { MainResponseReceivedHandlerOptions } from './types'; import type { MainResponseReceivedHandlerOptions } from './types';
import { Push } from '../../../push'; import { Push } from '../../../push';
import type { RedisServiceWorkerResponseObject } from '../../redis/redis-service-commands'; import type { RedisServiceWorkerResponseObject } from '../../../scaling/redis/redis-service-commands';
export async function handleWorkerResponseMessageMain( export async function handleWorkerResponseMessageMain(
messageString: string, messageString: string,
@ -19,7 +19,7 @@ export async function handleWorkerResponseMessageMain(
if (!workerResponse) { if (!workerResponse) {
Container.get(Logger).debug( Container.get(Logger).debug(
`Received invalid message via channel ${WORKER_RESPONSE_REDIS_CHANNEL}: "${messageString}"`, `Received invalid message via channel ${WORKER_RESPONSE_PUBSUB_CHANNEL}: "${messageString}"`,
); );
return; return;
} }

View file

@ -6,7 +6,7 @@ import config from '@/config';
import { TIME } from '@/constants'; import { TIME } from '@/constants';
import { Logger } from '@/logger'; import { Logger } from '@/logger';
import { Publisher } from '@/scaling/pubsub/publisher.service'; import { Publisher } from '@/scaling/pubsub/publisher.service';
import { RedisClientService } from '@/services/redis/redis-client.service'; import { RedisClientService } from '@/services/redis-client.service';
import { TypedEmitter } from '@/typed-emitter'; import { TypedEmitter } from '@/typed-emitter';
type MultiMainEvents = { type MultiMainEvents = {

View file

@ -1,12 +1,12 @@
import { Service } from 'typedi'; import { Service } from 'typedi';
import { COMMAND_PUBSUB_CHANNEL, WORKER_RESPONSE_PUBSUB_CHANNEL } from '@/scaling/constants';
import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { handleCommandMessageMain } from './handle-command-message-main'; import { handleCommandMessageMain } from './handle-command-message-main';
import { handleWorkerResponseMessageMain } from './handle-worker-response-message-main'; import { handleWorkerResponseMessageMain } from './handle-worker-response-message-main';
import type { MainResponseReceivedHandlerOptions } from './types'; import type { MainResponseReceivedHandlerOptions } from './types';
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from '../../redis/redis-constants';
@Service() @Service()
export class OrchestrationHandlerMainService extends OrchestrationHandlerService { export class OrchestrationHandlerMainService extends OrchestrationHandlerService {
@ -19,9 +19,9 @@ export class OrchestrationHandlerMainService extends OrchestrationHandlerService
await this.subscriber.subscribe('n8n.worker-response'); await this.subscriber.subscribe('n8n.worker-response');
this.subscriber.addMessageHandler(async (channel: string, messageString: string) => { this.subscriber.addMessageHandler(async (channel: string, messageString: string) => {
if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { if (channel === WORKER_RESPONSE_PUBSUB_CHANNEL) {
await handleWorkerResponseMessageMain(messageString, options); await handleWorkerResponseMessageMain(messageString, options);
} else if (channel === COMMAND_REDIS_CHANNEL) { } else if (channel === COMMAND_PUBSUB_CHANNEL) {
await handleCommandMessageMain(messageString); await handleCommandMessageMain(messageString);
} }
}); });

View file

@ -1,88 +0,0 @@
import type { PushType, WorkerStatus } from '@n8n/api-types';
import type { IWorkflowDb } from '@/interfaces';
export type PubSubMessageMap = {
// #region Lifecycle
'reload-license': never;
'restart-event-bus': {
result: 'success' | 'error';
error?: string;
};
'reload-external-secrets-providers': {
result: 'success' | 'error';
error?: string;
};
'stop-worker': never;
// #endregion
// #region Community packages
'community-package-install': {
packageName: string;
packageVersion: string;
};
'community-package-update': {
packageName: string;
packageVersion: string;
};
'community-package-uninstall': {
packageName: string;
packageVersion: string;
};
// #endregion
// #region Worker view
'get-worker-id': never;
'get-worker-status': WorkerStatus;
// #endregion
// #region Multi-main setup
'add-webhooks-triggers-and-pollers': {
workflowId: string;
};
'remove-triggers-and-pollers': {
workflowId: string;
};
'display-workflow-activation': {
workflowId: string;
};
'display-workflow-deactivation': {
workflowId: string;
};
// currently 'workflow-failed-to-activate'
'display-workflow-activation-error': {
workflowId: string;
errorMessage: string;
};
'relay-execution-lifecycle-event': {
type: PushType;
args: Record<string, unknown>;
pushRef: string;
};
'clear-test-webhooks': {
webhookKey: string;
workflowEntity: IWorkflowDb;
pushRef: string;
};
// #endregion
};

View file

@ -1,10 +1,10 @@
import { Service } from 'typedi'; import { Service } from 'typedi';
import { COMMAND_PUBSUB_CHANNEL } from '@/scaling/constants';
import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { handleCommandMessageWebhook } from './handle-command-message-webhook'; import { handleCommandMessageWebhook } from './handle-command-message-webhook';
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
import { COMMAND_REDIS_CHANNEL } from '../../redis/redis-constants';
@Service() @Service()
export class OrchestrationHandlerWebhookService extends OrchestrationHandlerService { export class OrchestrationHandlerWebhookService extends OrchestrationHandlerService {
@ -16,7 +16,7 @@ export class OrchestrationHandlerWebhookService extends OrchestrationHandlerServ
await this.subscriber.subscribe('n8n.commands'); await this.subscriber.subscribe('n8n.commands');
this.subscriber.addMessageHandler(async (channel: string, messageString: string) => { this.subscriber.addMessageHandler(async (channel: string, messageString: string) => {
if (channel === COMMAND_REDIS_CHANNEL) { if (channel === COMMAND_PUBSUB_CHANNEL) {
await handleCommandMessageWebhook(messageString); await handleCommandMessageWebhook(messageString);
} }
}); });

View file

@ -7,9 +7,9 @@ import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'
import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee'; import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
import { License } from '@/license'; import { License } from '@/license';
import { Logger } from '@/logger'; import { Logger } from '@/logger';
import { COMMAND_PUBSUB_CHANNEL } from '@/scaling/constants';
import type { RedisServiceCommandObject } from '@/scaling/redis/redis-service-commands';
import { CommunityPackagesService } from '@/services/community-packages.service'; import { CommunityPackagesService } from '@/services/community-packages.service';
import { COMMAND_REDIS_CHANNEL } from '@/services/redis/redis-constants';
import type { RedisServiceCommandObject } from '@/services/redis/redis-service-commands';
import type { WorkerCommandReceivedHandlerOptions } from './types'; import type { WorkerCommandReceivedHandlerOptions } from './types';
import { debounceMessageReceiver, getOsCpuString } from '../helpers'; import { debounceMessageReceiver, getOsCpuString } from '../helpers';
@ -17,7 +17,7 @@ import { debounceMessageReceiver, getOsCpuString } from '../helpers';
export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHandlerOptions) { export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHandlerOptions) {
// eslint-disable-next-line complexity // eslint-disable-next-line complexity
return async (channel: string, messageString: string) => { return async (channel: string, messageString: string) => {
if (channel === COMMAND_REDIS_CHANNEL) { if (channel === COMMAND_PUBSUB_CHANNEL) {
if (!messageString) return; if (!messageString) return;
const logger = Container.get(Logger); const logger = Container.get(Logger);
let message: RedisServiceCommandObject; let message: RedisServiceCommandObject;
@ -25,7 +25,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
message = jsonParse<RedisServiceCommandObject>(messageString); message = jsonParse<RedisServiceCommandObject>(messageString);
} catch { } catch {
logger.debug( logger.debug(
`Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`, `Received invalid message via channel ${COMMAND_PUBSUB_CHANNEL}: "${messageString}"`,
); );
return; return;
} }
@ -145,7 +145,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
} }
logger.debug( logger.debug(
`Received unknown command via channel ${COMMAND_REDIS_CHANNEL}: "${message.command}"`, `Received unknown command via channel ${COMMAND_PUBSUB_CHANNEL}: "${message.command}"`,
); );
break; break;
} }

View file

@ -1,5 +1,4 @@
import type { RunningJobSummary } from '@n8n/api-types'; import type { RunningJobSummary } from '@n8n/api-types';
import type { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow';
import type { Publisher } from '@/scaling/pubsub/publisher.service'; import type { Publisher } from '@/scaling/pubsub/publisher.service';
@ -9,14 +8,3 @@ export interface WorkerCommandReceivedHandlerOptions {
getRunningJobIds: () => Array<string | number>; getRunningJobIds: () => Array<string | number>;
getRunningJobsSummary: () => RunningJobSummary[]; getRunningJobsSummary: () => RunningJobSummary[];
} }
export interface WorkerJobStatusSummary {
jobId: string;
executionId: string;
retryOf?: string;
startedAt: Date;
mode: WorkflowExecuteMode;
workflowName: string;
workflowId: string;
status: ExecutionStatus;
}

View file

@ -5,7 +5,7 @@ import { Service } from 'typedi';
import { Logger } from '@/logger'; import { Logger } from '@/logger';
import type { RedisClientType } from './redis.types'; import type { RedisClientType } from '../scaling/redis/redis.types';
@Service() @Service()
export class RedisClientService { export class RedisClientService {

View file

@ -1,2 +0,0 @@
export const COMMAND_REDIS_CHANNEL = 'n8n.commands';
export const WORKER_RESPONSE_REDIS_CHANNEL = 'n8n.worker-response';

View file

@ -1,70 +0,0 @@
import type Redis from 'ioredis';
import type { Cluster } from 'ioredis';
import { Service } from 'typedi';
import config from '@/config';
import { Logger } from '@/logger';
import { RedisClientService } from './redis-client.service';
import type { RedisClientType } from './redis.types';
export type RedisServiceMessageHandler =
| ((channel: string, message: string) => void)
| ((stream: string, id: string, message: string[]) => void);
@Service()
class RedisServiceBase {
redisClient: Redis | Cluster | undefined;
isInitialized = false;
constructor(
protected readonly logger: Logger,
private readonly redisClientService: RedisClientService,
) {}
async init(type: RedisClientType): Promise<void> {
if (this.redisClient && this.isInitialized) {
return;
}
this.redisClient = this.redisClientService.createClient({ type });
this.redisClient.on('error', (error) => {
if (!String(error).includes('ECONNREFUSED')) {
this.logger.warn('Error with Redis: ', error);
}
});
this.isInitialized = true;
}
async destroy(): Promise<void> {
if (!this.redisClient) {
return;
}
await this.redisClient.quit();
this.isInitialized = false;
this.redisClient = undefined;
}
}
export abstract class RedisServiceBaseSender extends RedisServiceBase {
senderId: string;
async init(type: RedisClientType): Promise<void> {
await super.init(type);
this.senderId = config.get('redis.queueModeId');
}
}
export abstract class RedisServiceBaseReceiver extends RedisServiceBase {
messageHandlers: Map<string, RedisServiceMessageHandler> = new Map();
addMessageHandler(handlerName: string, handler: RedisServiceMessageHandler): void {
this.messageHandlers.set(handlerName, handler);
}
removeMessageHandler(handlerName: string): void {
this.messageHandlers.delete(handlerName);
}
}

View file

@ -14,6 +14,7 @@ import { nonExistingJsonPath } from '@/constants';
import { useExternalHooks } from '@/composables/useExternalHooks'; import { useExternalHooks } from '@/composables/useExternalHooks';
import TextWithHighlights from './TextWithHighlights.vue'; import TextWithHighlights from './TextWithHighlights.vue';
import { useTelemetry } from '@/composables/useTelemetry'; import { useTelemetry } from '@/composables/useTelemetry';
import { useElementSize } from '@vueuse/core';
const LazyRunDataJsonActions = defineAsyncComponent( const LazyRunDataJsonActions = defineAsyncComponent(
async () => await import('@/components/RunDataJsonActions.vue'), async () => await import('@/components/RunDataJsonActions.vue'),
@ -45,6 +46,9 @@ const telemetry = useTelemetry();
const selectedJsonPath = ref(nonExistingJsonPath); const selectedJsonPath = ref(nonExistingJsonPath);
const draggingPath = ref<null | string>(null); const draggingPath = ref<null | string>(null);
const displayMode = ref('json'); const displayMode = ref('json');
const jsonDataContainer = ref(null);
const { height } = useElementSize(jsonDataContainer);
const jsonData = computed(() => executionDataToJson(props.inputData)); const jsonData = computed(() => executionDataToJson(props.inputData));
@ -110,7 +114,7 @@ const getListItemName = (path: string) => {
</script> </script>
<template> <template>
<div :class="[$style.jsonDisplay, { [$style.highlight]: highlight }]"> <div ref="jsonDataContainer" :class="[$style.jsonDisplay, { [$style.highlight]: highlight }]">
<Suspense> <Suspense>
<LazyRunDataJsonActions <LazyRunDataJsonActions
v-if="!editMode.enabled" v-if="!editMode.enabled"
@ -141,6 +145,8 @@ const getListItemName = (path: string) => {
root-path="" root-path=""
selectable-type="single" selectable-type="single"
class="json-data" class="json-data"
:virtual="true"
:height="height"
@update:selected-value="selectedJsonPath = $event" @update:selected-value="selectedJsonPath = $event"
> >
<template #renderNodeKey="{ node }"> <template #renderNodeKey="{ node }">
@ -192,11 +198,10 @@ const getListItemName = (path: string) => {
left: 0; left: 0;
padding-left: var(--spacing-s); padding-left: var(--spacing-s);
right: 0; right: 0;
overflow-y: auto; overflow-y: hidden;
line-height: 1.5; line-height: 1.5;
word-break: normal; word-break: normal;
height: 100%; height: 100%;
padding-bottom: var(--spacing-3xl);
&:hover { &:hover {
/* Shows .actionsGroup element from <run-data-json-actions /> child component */ /* Shows .actionsGroup element from <run-data-json-actions /> child component */
@ -299,4 +304,8 @@ const getListItemName = (path: string) => {
.vjs-tree .vjs-tree__content.has-line { .vjs-tree .vjs-tree__content.has-line {
border-left: 1px dotted var(--color-json-line); border-left: 1px dotted var(--color-json-line);
} }
.vjs-tree .vjs-tree-list-holder-inner {
padding-bottom: var(--spacing-3xl);
}
</style> </style>

View file

@ -2,6 +2,22 @@ import { createTestingPinia } from '@pinia/testing';
import { screen, cleanup } from '@testing-library/vue'; import { screen, cleanup } from '@testing-library/vue';
import RunDataJson from '@/components/RunDataJson.vue'; import RunDataJson from '@/components/RunDataJson.vue';
import { createComponentRenderer } from '@/__tests__/render'; import { createComponentRenderer } from '@/__tests__/render';
import { useElementSize } from '@vueuse/core'; // Import the composable to mock
vi.mock('@vueuse/core', async () => {
// eslint-disable-next-line @typescript-eslint/consistent-type-imports
const originalModule = await vi.importActual<typeof import('@vueuse/core')>('@vueuse/core');
return {
...originalModule, // Keep all original exports
useElementSize: vi.fn(), // Mock useElementSize
};
});
(useElementSize as jest.Mock).mockReturnValue({
height: 500, // Mocked height value
width: 300, // Mocked width value
});
const renderComponent = createComponentRenderer(RunDataJson, { const renderComponent = createComponentRenderer(RunDataJson, {
props: { props: {
@ -34,18 +50,6 @@ const renderComponent = createComponentRenderer(RunDataJson, {
disabled: false, disabled: false,
}, },
}, },
global: {
mocks: {
$locale: {
baseText() {
return '';
},
},
$store: {
getters: {},
},
},
},
}); });
describe('RunDataJson.vue', () => { describe('RunDataJson.vue', () => {

View file

@ -11,7 +11,19 @@ exports[`RunDataJson.vue > renders json values properly 1`] = `
> >
<div <div
class="vjs-tree json-data" class="vjs-tree is-virtual json-data"
>
<div
class="vjs-tree-list"
style="height: 500px;"
>
<div
class="vjs-tree-list-holder"
style="height: 340px;"
>
<div
class="vjs-tree-list-holder-inner"
style="transform: translateY(0px);"
> >
<div <div
@ -828,6 +840,9 @@ exports[`RunDataJson.vue > renders json values properly 1`] = `
</div> </div>
</div> </div>
</div>
</div>
</div>
<!--teleport start--> <!--teleport start-->
<!--teleport end--> <!--teleport end-->

View file

@ -356,9 +356,9 @@ export const useAssistantStore = defineStore(STORES.ASSISTANT, () => {
resetAssistantChat(); resetAssistantChat();
chatSessionTask.value = credentialType ? 'credentials' : 'support'; chatSessionTask.value = credentialType ? 'credentials' : 'support';
chatSessionCredType.value = credentialType; chatSessionCredType.value = credentialType;
chatWindowOpen.value = true;
addUserMessage(userMessage, id); addUserMessage(userMessage, id);
addLoadingAssistantMessage(locale.baseText('aiAssistant.thinkingSteps.thinking')); addLoadingAssistantMessage(locale.baseText('aiAssistant.thinkingSteps.thinking'));
openChat();
streaming.value = true; streaming.value = true;
let payload: ChatRequest.InitSupportChat | ChatRequest.InitCredHelp = { let payload: ChatRequest.InitSupportChat | ChatRequest.InitCredHelp = {

View file

@ -7,8 +7,9 @@ import {
hasDestinationId, hasDestinationId,
saveDestinationToDb, saveDestinationToDb,
sendTestMessageToDestination, sendTestMessageToDestination,
} from '../api/eventbus.ee'; } from '@/api/eventbus.ee';
import { useRootStore } from './root.store'; import { useRootStore } from './root.store';
import { ref } from 'vue';
export interface EventSelectionItem { export interface EventSelectionItem {
selected: boolean; selected: boolean;
@ -32,80 +33,142 @@ export interface DestinationSettingsStore {
[key: string]: DestinationStoreItem; [key: string]: DestinationStoreItem;
} }
export const useLogStreamingStore = defineStore('logStreaming', { export const useLogStreamingStore = defineStore('logStreaming', () => {
state: () => ({ const items = ref<DestinationSettingsStore>({});
items: {} as DestinationSettingsStore, const eventNames = ref(new Set<string>());
eventNames: new Set<string>(),
}), const rootStore = useRootStore();
getters: {},
actions: { const addDestination = (destination: MessageEventBusDestinationOptions) => {
addDestination(destination: MessageEventBusDestinationOptions) { if (destination.id && items.value[destination.id]) {
if (destination.id && this.items[destination.id]) { items.value[destination.id].destination = destination;
this.items[destination.id].destination = destination;
} else { } else {
this.setSelectionAndBuildItems(destination); setSelectionAndBuildItems(destination);
} }
}, };
getDestination(destinationId: string): MessageEventBusDestinationOptions | undefined {
if (this.items[destinationId]) { const setSelectionAndBuildItems = (destination: MessageEventBusDestinationOptions) => {
return this.items[destinationId].destination; if (destination.id) {
if (!items.value[destination.id]) {
items.value[destination.id] = {
destination,
selectedEvents: new Set<string>(),
eventGroups: [],
isNew: false,
} as DestinationStoreItem;
}
items.value[destination.id]?.selectedEvents?.clear();
if (destination.subscribedEvents) {
for (const eventName of destination.subscribedEvents) {
items.value[destination.id]?.selectedEvents?.add(eventName);
}
}
items.value[destination.id].eventGroups = eventGroupsFromStringList(
eventNames.value,
items.value[destination.id]?.selectedEvents,
);
}
};
const getDestination = (destinationId: string) => {
if (items.value[destinationId]) {
return items.value[destinationId].destination;
} else { } else {
return; return;
} }
}, };
getAllDestinations(): MessageEventBusDestinationOptions[] {
const getAllDestinations = () => {
const destinations: MessageEventBusDestinationOptions[] = []; const destinations: MessageEventBusDestinationOptions[] = [];
for (const key of Object.keys(this.items)) { for (const key of Object.keys(items)) {
destinations.push(this.items[key].destination); destinations.push(items.value[key].destination);
} }
return destinations; return destinations;
}, };
updateDestination(destination: MessageEventBusDestinationOptions) {
if (destination.id && this.items[destination.id]) { const clearDestinations = () => {
this.$patch((state) => { items.value = {};
if (destination.id && this.items[destination.id]) { };
state.items[destination.id].destination = destination;
const addEventName = (name: string) => {
eventNames.value.add(name);
};
const removeEventName = (name: string) => {
eventNames.value.delete(name);
};
const clearEventNames = () => {
eventNames.value.clear();
};
const addSelectedEvent = (id: string, name: string) => {
items.value[id]?.selectedEvents?.add(name);
setSelectedInGroup(id, name, true);
};
const removeSelectedEvent = (id: string, name: string) => {
items.value[id]?.selectedEvents?.delete(name);
setSelectedInGroup(id, name, false);
};
const setSelectedInGroup = (destinationId: string, name: string, isSelected: boolean) => {
if (items.value[destinationId]) {
const groupName = eventGroupFromEventName(name);
const groupIndex = items.value[destinationId].eventGroups.findIndex(
(e) => e.name === groupName,
);
if (groupIndex > -1) {
if (groupName === name) {
items.value[destinationId].eventGroups[groupIndex].selected = isSelected;
} else {
const eventIndex = items.value[destinationId].eventGroups[groupIndex].children.findIndex(
(e) => e.name === name,
);
if (eventIndex > -1) {
items.value[destinationId].eventGroups[groupIndex].children[eventIndex].selected =
isSelected;
if (isSelected) {
items.value[destinationId].eventGroups[groupIndex].indeterminate = isSelected;
} else {
let anySelected = false;
for (
let i = 0;
i < items.value[destinationId].eventGroups[groupIndex].children.length;
i++
) {
anySelected =
anySelected ||
items.value[destinationId].eventGroups[groupIndex].children[i].selected;
} }
// to trigger refresh items.value[destinationId].eventGroups[groupIndex].indeterminate = anySelected;
state.items = { ...state.items };
});
} }
}, }
removeDestination(destinationId: string) { }
}
}
};
const removeDestinationItemTree = (id: string) => {
delete items.value[id];
};
const updateDestination = (destination: MessageEventBusDestinationOptions) => {
if (destination.id && items.value[destination.id]) {
items.value[destination.id].destination = destination;
}
};
const removeDestination = (destinationId: string) => {
if (!destinationId) return; if (!destinationId) return;
delete this.items[destinationId]; delete items.value[destinationId];
if (this.items[destinationId]) { };
this.$patch({
items: { const getSelectedEvents = (destinationId: string): string[] => {
...this.items,
},
});
}
},
clearDestinations() {
this.items = {};
},
addEventName(name: string) {
this.eventNames.add(name);
},
removeEventName(name: string) {
this.eventNames.delete(name);
},
clearEventNames() {
this.eventNames.clear();
},
addSelectedEvent(id: string, name: string) {
this.items[id]?.selectedEvents?.add(name);
this.setSelectedInGroup(id, name, true);
},
removeSelectedEvent(id: string, name: string) {
this.items[id]?.selectedEvents?.delete(name);
this.setSelectedInGroup(id, name, false);
},
getSelectedEvents(destinationId: string): string[] {
const selectedEvents: string[] = []; const selectedEvents: string[] = [];
if (this.items[destinationId]) { if (items.value[destinationId]) {
for (const group of this.items[destinationId].eventGroups) { for (const group of items.value[destinationId].eventGroups) {
if (group.selected) { if (group.selected) {
selectedEvents.push(group.name); selectedEvents.push(group.name);
} }
@ -117,136 +180,95 @@ export const useLogStreamingStore = defineStore('logStreaming', {
} }
} }
return selectedEvents; return selectedEvents;
}, };
setSelectedInGroup(destinationId: string, name: string, isSelected: boolean) {
if (this.items[destinationId]) { const saveDestination = async (
const groupName = eventGroupFromEventName(name); destination: MessageEventBusDestinationOptions,
const groupIndex = this.items[destinationId].eventGroups.findIndex( ): Promise<boolean> => {
(e) => e.name === groupName,
);
if (groupIndex > -1) {
if (groupName === name) {
this.$patch((state) => {
state.items[destinationId].eventGroups[groupIndex].selected = isSelected;
});
} else {
const eventIndex = this.items[destinationId].eventGroups[groupIndex].children.findIndex(
(e) => e.name === name,
);
if (eventIndex > -1) {
this.$patch((state) => {
state.items[destinationId].eventGroups[groupIndex].children[eventIndex].selected =
isSelected;
if (isSelected) {
state.items[destinationId].eventGroups[groupIndex].indeterminate = isSelected;
} else {
let anySelected = false;
for (
let i = 0;
i < state.items[destinationId].eventGroups[groupIndex].children.length;
i++
) {
anySelected =
anySelected ||
state.items[destinationId].eventGroups[groupIndex].children[i].selected;
}
state.items[destinationId].eventGroups[groupIndex].indeterminate = anySelected;
}
});
}
}
}
}
},
removeDestinationItemTree(id: string) {
delete this.items[id];
},
clearDestinationItemTrees() {
this.items = {} as DestinationSettingsStore;
},
setSelectionAndBuildItems(destination: MessageEventBusDestinationOptions) {
if (destination.id) {
if (!this.items[destination.id]) {
this.items[destination.id] = {
destination,
selectedEvents: new Set<string>(),
eventGroups: [],
isNew: false,
} as DestinationStoreItem;
}
this.items[destination.id]?.selectedEvents?.clear();
if (destination.subscribedEvents) {
for (const eventName of destination.subscribedEvents) {
this.items[destination.id]?.selectedEvents?.add(eventName);
}
}
this.items[destination.id].eventGroups = eventGroupsFromStringList(
this.eventNames,
this.items[destination.id]?.selectedEvents,
);
}
},
async saveDestination(destination: MessageEventBusDestinationOptions): Promise<boolean> {
if (!hasDestinationId(destination)) { if (!hasDestinationId(destination)) {
return false; return false;
} }
const rootStore = useRootStore(); const selectedEvents = getSelectedEvents(destination.id);
const selectedEvents = this.getSelectedEvents(destination.id);
try { try {
await saveDestinationToDb(rootStore.restApiContext, destination, selectedEvents); await saveDestinationToDb(rootStore.restApiContext, destination, selectedEvents);
this.updateDestination(destination); updateDestination(destination);
return true; return true;
} catch (e) { } catch (e) {
return false; return false;
} }
}, };
async sendTestMessage(destination: MessageEventBusDestinationOptions): Promise<boolean> {
const sendTestMessage = async (
destination: MessageEventBusDestinationOptions,
): Promise<boolean> => {
if (!hasDestinationId(destination)) { if (!hasDestinationId(destination)) {
return false; return false;
} }
const rootStore = useRootStore();
const testResult = await sendTestMessageToDestination(rootStore.restApiContext, destination); const testResult = await sendTestMessageToDestination(rootStore.restApiContext, destination);
return testResult; return testResult;
}, };
async fetchEventNames(): Promise<string[]> {
const rootStore = useRootStore(); const fetchEventNames = async () => {
return await getEventNamesFromBackend(rootStore.restApiContext); return await getEventNamesFromBackend(rootStore.restApiContext);
}, };
async fetchDestinations(): Promise<MessageEventBusDestinationOptions[]> {
const rootStore = useRootStore(); const fetchDestinations = async (): Promise<MessageEventBusDestinationOptions[]> => {
return await getDestinationsFromBackend(rootStore.restApiContext); return await getDestinationsFromBackend(rootStore.restApiContext);
}, };
async deleteDestination(destinationId: string) {
const rootStore = useRootStore(); const deleteDestination = async (destinationId: string) => {
await deleteDestinationFromDb(rootStore.restApiContext, destinationId); await deleteDestinationFromDb(rootStore.restApiContext, destinationId);
this.removeDestination(destinationId); removeDestination(destinationId);
}, };
},
return {
addDestination,
setSelectionAndBuildItems,
getDestination,
getAllDestinations,
clearDestinations,
addEventName,
removeEventName,
clearEventNames,
addSelectedEvent,
removeSelectedEvent,
setSelectedInGroup,
removeDestinationItemTree,
updateDestination,
removeDestination,
getSelectedEvents,
saveDestination,
sendTestMessage,
fetchEventNames,
fetchDestinations,
deleteDestination,
items,
};
}); });
export function eventGroupFromEventName(eventName: string): string | undefined { export const eventGroupFromEventName = (eventName: string): string | undefined => {
const matches = eventName.match(/^[\w\s]+\.[\w\s]+/); const matches = eventName.match(/^[\w\s]+\.[\w\s]+/);
if (matches && matches?.length > 0) { if (matches && matches?.length > 0) {
return matches[0]; return matches[0];
} }
return undefined; return undefined;
} };
function prettifyEventName(label: string, group = ''): string { const prettifyEventName = (label: string, group = ''): string => {
label = label.replace(group + '.', ''); label = label.replace(group + '.', '');
if (label.length > 0) { if (label.length > 0) {
label = label[0].toUpperCase() + label.substring(1); label = label[0].toUpperCase() + label.substring(1);
label = label.replaceAll('.', ' '); label = label.replaceAll('.', ' ');
} }
return label; return label;
} };
export function eventGroupsFromStringList( export const eventGroupsFromStringList = (
dottedList: Set<string>, dottedList: Set<string>,
selectionList: Set<string> = new Set(), selectionList: Set<string> = new Set(),
) { ) => {
const result = [] as EventSelectionGroup[]; const result = [] as EventSelectionGroup[];
const eventNameArray = Array.from(dottedList.values()); const eventNameArray = Array.from(dottedList.values());
@ -287,4 +309,4 @@ export function eventGroupsFromStringList(
result.push(collection); result.push(collection);
} }
return result; return result;
} };

View file

@ -95,7 +95,7 @@ export default defineComponent({
}, },
async getDestinationDataFromBackend(): Promise<void> { async getDestinationDataFromBackend(): Promise<void> {
this.logStreamingStore.clearEventNames(); this.logStreamingStore.clearEventNames();
this.logStreamingStore.clearDestinationItemTrees(); this.logStreamingStore.clearDestinations();
this.allDestinations = []; this.allDestinations = [];
const eventNamesData = await this.logStreamingStore.fetchEventNames(); const eventNamesData = await this.logStreamingStore.fetchEventNames();
if (eventNamesData) { if (eventNamesData) {

File diff suppressed because it is too large Load diff