Merge master, fix conflict

This commit is contained in:
Iván Ovejero 2024-11-19 11:39:03 +01:00
commit c271e8b37d
No known key found for this signature in database
66 changed files with 1419 additions and 557 deletions

View file

@ -12,6 +12,13 @@ type ExecutionStarted = {
};
};
type ExecutionWaiting = {
type: 'executionWaiting';
data: {
executionId: string;
};
};
type ExecutionFinished = {
type: 'executionFinished';
data: {
@ -45,6 +52,7 @@ type NodeExecuteAfter = {
export type ExecutionPushMessage =
| ExecutionStarted
| ExecutionWaiting
| ExecutionFinished
| ExecutionRecovered
| NodeExecuteBefore

View file

@ -0,0 +1,30 @@
import { Config, Env, Nested } from '../decorators';
@Config
class PostHogConfig {
/** API key for PostHog. */
@Env('N8N_DIAGNOSTICS_POSTHOG_API_KEY')
apiKey: string = 'phc_4URIAm1uYfJO7j8kWSe0J8lc8IqnstRLS7Jx8NcakHo';
/** API host for PostHog. */
@Env('N8N_DIAGNOSTICS_POSTHOG_API_HOST')
apiHost: string = 'https://ph.n8n.io';
}
@Config
export class DiagnosticsConfig {
/** Whether diagnostics are enabled. */
@Env('N8N_DIAGNOSTICS_ENABLED')
enabled: boolean = false;
/** Diagnostics config for frontend. */
@Env('N8N_DIAGNOSTICS_CONFIG_FRONTEND')
frontendConfig: string = '1zPn9bgWPzlQc0p8Gj1uiK6DOTn;https://telemetry.n8n.io';
/** Diagnostics config for backend. */
@Env('N8N_DIAGNOSTICS_CONFIG_BACKEND')
backendConfig: string = '1zPn7YoGC3ZXE9zLeTKLuQCB4F6;https://telemetry.n8n.io';
@Nested
posthogConfig: PostHogConfig;
}

View file

@ -53,4 +53,12 @@ export class TaskRunnersConfig {
/** Should the output of deduplication be asserted for correctness */
@Env('N8N_RUNNERS_ASSERT_DEDUPLICATION_OUTPUT')
assertDeduplicationOutput: boolean = false;
/** How long (in seconds) a task is allowed to take for completion, else the task will be aborted and the runner restarted. Must be greater than 0. */
@Env('N8N_RUNNERS_TASK_TIMEOUT')
taskTimeout: number = 60;
/** How often (in seconds) the runner must send a heartbeat to the broker, else the task will be aborted and the runner restarted. Must be greater than 0. */
@Env('N8N_RUNNERS_HEARTBEAT_INTERVAL')
heartbeatInterval: number = 30;
}

View file

@ -1,6 +1,7 @@
import { CacheConfig } from './configs/cache.config';
import { CredentialsConfig } from './configs/credentials.config';
import { DatabaseConfig } from './configs/database.config';
import { DiagnosticsConfig } from './configs/diagnostics.config';
import { EndpointsConfig } from './configs/endpoints.config';
import { EventBusConfig } from './configs/event-bus.config';
import { ExternalSecretsConfig } from './configs/external-secrets.config';
@ -117,4 +118,7 @@ export class GlobalConfig {
@Nested
pruning: PruningConfig;
@Nested
diagnostics: DiagnosticsConfig;
}

View file

@ -234,6 +234,8 @@ describe('GlobalConfig', () => {
maxOldSpaceSize: '',
maxConcurrency: 5,
assertDeduplicationOutput: false,
taskTimeout: 60,
heartbeatInterval: 30,
},
sentry: {
backendDsn: '',
@ -280,6 +282,15 @@ describe('GlobalConfig', () => {
hardDeleteInterval: 15,
softDeleteInterval: 60,
},
diagnostics: {
enabled: false,
frontendConfig: '1zPn9bgWPzlQc0p8Gj1uiK6DOTn;https://telemetry.n8n.io',
backendConfig: '1zPn7YoGC3ZXE9zLeTKLuQCB4F6;https://telemetry.n8n.io',
posthogConfig: {
apiKey: 'phc_4URIAm1uYfJO7j8kWSe0J8lc8IqnstRLS7Jx8NcakHo',
apiHost: 'https://ph.n8n.io',
},
},
};
it('should use all default values when no env variables are defined', () => {

View file

@ -35,15 +35,15 @@ export class AnthropicApi implements ICredentialType {
test: ICredentialTestRequest = {
request: {
baseURL: 'https://api.anthropic.com',
url: '/v1/complete',
url: '/v1/messages',
method: 'POST',
headers: {
'anthropic-version': '2023-06-01',
},
body: {
model: 'claude-2',
prompt: '\n\nHuman: Hello, world!\n\nAssistant:',
max_tokens_to_sample: 256,
model: 'claude-3-haiku-20240307',
messages: [{ role: 'user', content: 'Hey' }],
max_tokens: 1,
},
},
};

View file

@ -87,6 +87,36 @@ export class EmbeddingsAzureOpenAi implements INodeType {
'Maximum amount of time a request is allowed to take in seconds. Set to -1 for no timeout.',
type: 'number',
},
{
displayName: 'Dimensions',
name: 'dimensions',
default: undefined,
description:
'The number of dimensions the resulting output embeddings should have. Only supported in text-embedding-3 and later models.',
type: 'options',
options: [
{
name: '256',
value: 256,
},
{
name: '512',
value: 512,
},
{
name: '1024',
value: 1024,
},
{
name: '1536',
value: 1536,
},
{
name: '3072',
value: 3072,
},
],
},
],
},
],
@ -105,6 +135,7 @@ export class EmbeddingsAzureOpenAi implements INodeType {
batchSize?: number;
stripNewLines?: boolean;
timeout?: number;
dimensions?: number | undefined;
};
if (options.timeout === -1) {

View file

@ -135,6 +135,36 @@ export class EmbeddingsOpenAi implements INodeType {
type: 'collection',
default: {},
options: [
{
displayName: 'Dimensions',
name: 'dimensions',
default: undefined,
description:
'The number of dimensions the resulting output embeddings should have. Only supported in text-embedding-3 and later models.',
type: 'options',
options: [
{
name: '256',
value: 256,
},
{
name: '512',
value: 512,
},
{
name: '1024',
value: 1024,
},
{
name: '1536',
value: 1536,
},
{
name: '3072',
value: 3072,
},
],
},
{
displayName: 'Base URL',
name: 'baseURL',
@ -179,6 +209,7 @@ export class EmbeddingsOpenAi implements INodeType {
batchSize?: number;
stripNewLines?: boolean;
timeout?: number;
dimensions?: number | undefined;
};
if (options.timeout === -1) {

View file

@ -1,4 +1,16 @@
import { Config, Env } from '@n8n/config';
import { Config, Env, Nested } from '@n8n/config';
@Config
class HealthcheckServerConfig {
@Env('N8N_RUNNERS_SERVER_ENABLED')
enabled: boolean = false;
@Env('N8N_RUNNERS_SERVER_HOST')
host: string = '127.0.0.1';
@Env('N8N_RUNNERS_SERVER_PORT')
port: number = 5680;
}
@Config
export class BaseRunnerConfig {
@ -13,4 +25,7 @@ export class BaseRunnerConfig {
@Env('N8N_RUNNERS_MAX_CONCURRENCY')
maxConcurrency: number = 5;
@Nested
healthcheckServer!: HealthcheckServerConfig;
}

View file

@ -0,0 +1,38 @@
import { ApplicationError } from 'n8n-workflow';
import { createServer } from 'node:http';
export class HealthcheckServer {
private server = createServer((_, res) => {
res.writeHead(200);
res.end('OK');
});
async start(host: string, port: number) {
return await new Promise<void>((resolve, reject) => {
const portInUseErrorHandler = (error: NodeJS.ErrnoException) => {
if (error.code === 'EADDRINUSE') {
reject(new ApplicationError(`Port ${port} is already in use`));
} else {
reject(error);
}
};
this.server.on('error', portInUseErrorHandler);
this.server.listen(port, host, () => {
this.server.removeListener('error', portInUseErrorHandler);
console.log(`Healthcheck server listening on ${host}, port ${port}`);
resolve();
});
});
}
async stop() {
return await new Promise<void>((resolve, reject) => {
this.server.close((error) => {
if (error) reject(error);
else resolve();
});
});
}
}

View file

@ -3,8 +3,10 @@ import Container from 'typedi';
import { MainConfig } from './config/main-config';
import type { ErrorReporter } from './error-reporter';
import type { HealthcheckServer } from './healthcheck-server';
import { JsTaskRunner } from './js-task-runner/js-task-runner';
let healthcheckServer: HealthcheckServer | undefined;
let runner: JsTaskRunner | undefined;
let isShuttingDown = false;
let errorReporter: ErrorReporter | undefined;
@ -22,6 +24,7 @@ function createSignalHandler(signal: string) {
if (runner) {
await runner.stop();
runner = undefined;
void healthcheckServer?.stop();
}
if (errorReporter) {
@ -49,6 +52,14 @@ void (async function start() {
runner = new JsTaskRunner(config);
const { enabled, host, port } = config.baseRunnerConfig.healthcheckServer;
if (enabled) {
const { HealthcheckServer } = await import('./healthcheck-server');
healthcheckServer = new HealthcheckServer();
await healthcheckServer.start(host, port);
}
process.on('SIGINT', createSignalHandler('SIGINT'));
process.on('SIGTERM', createSignalHandler('SIGTERM'));
})().catch((e) => {

View file

@ -296,43 +296,6 @@ export const schema = {
},
},
diagnostics: {
enabled: {
doc: 'Whether diagnostic mode is enabled.',
format: Boolean,
default: true,
env: 'N8N_DIAGNOSTICS_ENABLED',
},
config: {
posthog: {
apiKey: {
doc: 'API key for PostHog',
format: String,
default: 'phc_4URIAm1uYfJO7j8kWSe0J8lc8IqnstRLS7Jx8NcakHo',
env: 'N8N_DIAGNOSTICS_POSTHOG_API_KEY',
},
apiHost: {
doc: 'API host for PostHog',
format: String,
default: 'https://ph.n8n.io',
env: 'N8N_DIAGNOSTICS_POSTHOG_API_HOST',
},
},
frontend: {
doc: 'Diagnostics config for frontend.',
format: String,
default: '1zPn9bgWPzlQc0p8Gj1uiK6DOTn;https://telemetry.n8n.io',
env: 'N8N_DIAGNOSTICS_CONFIG_FRONTEND',
},
backend: {
doc: 'Diagnostics config for backend.',
format: String,
default: '1zPn7YoGC3ZXE9zLeTKLuQCB4F6;https://telemetry.n8n.io',
env: 'N8N_DIAGNOSTICS_CONFIG_BACKEND',
},
},
},
defaultLocale: {
doc: 'Default locale for the UI',
format: String,

View file

@ -2,7 +2,6 @@ import type { GlobalConfig } from '@n8n/config';
import { mock } from 'jest-mock-extended';
import type { IWorkflowBase } from 'n8n-workflow';
import config from '@/config';
import { N8N_VERSION } from '@/constants';
import type { WorkflowEntity } from '@/databases/entities/workflow-entity';
import type { ProjectRelationRepository } from '@/databases/repositories/project-relation.repository';
@ -66,7 +65,7 @@ describe('TelemetryEventRelay', () => {
});
beforeEach(() => {
config.set('diagnostics.enabled', true);
globalConfig.diagnostics.enabled = true;
});
afterEach(() => {
@ -75,7 +74,7 @@ describe('TelemetryEventRelay', () => {
describe('init', () => {
it('with diagnostics enabled, should init telemetry and register listeners', async () => {
config.set('diagnostics.enabled', true);
globalConfig.diagnostics.enabled = true;
const telemetryEventRelay = new TelemetryEventRelay(
eventService,
telemetry,
@ -96,7 +95,7 @@ describe('TelemetryEventRelay', () => {
});
it('with diagnostics disabled, should neither init telemetry nor register listeners', async () => {
config.set('diagnostics.enabled', false);
globalConfig.diagnostics.enabled = false;
const telemetryEventRelay = new TelemetryEventRelay(
eventService,
telemetry,

View file

@ -37,7 +37,7 @@ export class TelemetryEventRelay extends EventRelay {
}
async init() {
if (!config.getEnv('diagnostics.enabled')) return;
if (!this.globalConfig.diagnostics.enabled) return;
await this.telemetry.init();

View file

@ -1,5 +1,4 @@
import {
deepCopy,
ErrorReporterProxy,
type IRunExecutionData,
type ITaskData,
@ -87,37 +86,6 @@ test('should update execution when saving progress is enabled', async () => {
expect(reporterSpy).not.toHaveBeenCalled();
});
test('should update execution when saving progress is disabled, but waitTill is defined', async () => {
jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({
...commonSettings,
progress: false,
});
const reporterSpy = jest.spyOn(ErrorReporterProxy, 'error');
executionRepository.findSingleExecution.mockResolvedValue({} as IExecutionResponse);
const args = deepCopy(commonArgs);
args[4].waitTill = new Date();
await saveExecutionProgress(...args);
expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith('some-execution-id', {
data: {
executionData: undefined,
resultData: {
lastNodeExecuted: 'My Node',
runData: {
'My Node': [{}],
},
},
startData: {},
},
status: 'running',
});
expect(reporterSpy).not.toHaveBeenCalled();
});
test('should report error on failure', async () => {
jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({
...commonSettings,

View file

@ -16,7 +16,7 @@ export async function saveExecutionProgress(
) {
const saveSettings = toSaveSettings(workflowData.settings);
if (!saveSettings.progress && !executionData.waitTill) return;
if (!saveSettings.progress) return;
const logger = Container.get(Logger);

View file

@ -18,20 +18,20 @@ export function toSaveSettings(workflowSettings: IWorkflowSettings = {}) {
PROGRESS: config.getEnv('executions.saveExecutionProgress'),
};
const {
saveDataErrorExecution = DEFAULTS.ERROR,
saveDataSuccessExecution = DEFAULTS.SUCCESS,
saveManualExecutions = DEFAULTS.MANUAL,
saveExecutionProgress = DEFAULTS.PROGRESS,
} = workflowSettings;
return {
error: workflowSettings.saveDataErrorExecution
? workflowSettings.saveDataErrorExecution !== 'none'
: DEFAULTS.ERROR !== 'none',
success: workflowSettings.saveDataSuccessExecution
? workflowSettings.saveDataSuccessExecution !== 'none'
: DEFAULTS.SUCCESS !== 'none',
manual:
workflowSettings === undefined || workflowSettings.saveManualExecutions === 'DEFAULT'
? DEFAULTS.MANUAL
: (workflowSettings.saveManualExecutions ?? DEFAULTS.MANUAL),
progress:
workflowSettings === undefined || workflowSettings.saveExecutionProgress === 'DEFAULT'
? DEFAULTS.PROGRESS
: (workflowSettings.saveExecutionProgress ?? DEFAULTS.PROGRESS),
error: saveDataErrorExecution === 'DEFAULT' ? DEFAULTS.ERROR : saveDataErrorExecution === 'all',
success:
saveDataSuccessExecution === 'DEFAULT'
? DEFAULTS.SUCCESS
: saveDataSuccessExecution === 'all',
manual: saveManualExecutions === 'DEFAULT' ? DEFAULTS.MANUAL : saveManualExecutions,
progress: saveExecutionProgress === 'DEFAULT' ? DEFAULTS.PROGRESS : saveExecutionProgress,
};
}

View file

@ -3,7 +3,6 @@ import { mock } from 'jest-mock-extended';
import { InstanceSettings } from 'n8n-core';
import { PostHog } from 'posthog-node';
import config from '@/config';
import { PostHogClient } from '@/posthog';
import { mockInstance } from '@test/mocking';
@ -20,12 +19,11 @@ describe('PostHog', () => {
const globalConfig = mock<GlobalConfig>({ logging: { level: 'debug' } });
beforeAll(() => {
config.set('diagnostics.config.posthog.apiKey', apiKey);
config.set('diagnostics.config.posthog.apiHost', apiHost);
globalConfig.diagnostics.posthogConfig = { apiKey, apiHost };
});
beforeEach(() => {
config.set('diagnostics.enabled', true);
globalConfig.diagnostics.enabled = true;
jest.resetAllMocks();
});
@ -37,7 +35,7 @@ describe('PostHog', () => {
});
it('does not initialize or track if diagnostics are not enabled', async () => {
config.set('diagnostics.enabled', false);
globalConfig.diagnostics.enabled = false;
const ph = new PostHogClient(instanceSettings, globalConfig);
await ph.init();

View file

@ -4,7 +4,6 @@ import type { FeatureFlags, ITelemetryTrackProperties } from 'n8n-workflow';
import type { PostHog } from 'posthog-node';
import { Service } from 'typedi';
import config from '@/config';
import type { PublicUser } from '@/interfaces';
@Service()
@ -17,14 +16,14 @@ export class PostHogClient {
) {}
async init() {
const enabled = config.getEnv('diagnostics.enabled');
const { enabled, posthogConfig } = this.globalConfig.diagnostics;
if (!enabled) {
return;
}
const { PostHog } = await import('posthog-node');
this.postHog = new PostHog(config.getEnv('diagnostics.config.posthog.apiKey'), {
host: config.getEnv('diagnostics.config.posthog.apiHost'),
this.postHog = new PostHog(posthogConfig.apiKey, {
host: posthogConfig.apiHost,
});
const logLevel = this.globalConfig.logging.level;

View file

@ -1,8 +1,12 @@
import type { TaskRunnersConfig } from '@n8n/config';
import type { RunnerMessage, TaskResultData } from '@n8n/task-runner';
import { mock } from 'jest-mock-extended';
import type { INodeTypeBaseDescription } from 'n8n-workflow';
import { ApplicationError, type INodeTypeBaseDescription } from 'n8n-workflow';
import { Time } from '@/constants';
import { TaskRejectError } from '../errors';
import type { RunnerLifecycleEvents } from '../runner-lifecycle-events';
import { TaskBroker } from '../task-broker.service';
import type { TaskOffer, TaskRequest, TaskRunner } from '../task-broker.service';
@ -12,7 +16,7 @@ describe('TaskBroker', () => {
let taskBroker: TaskBroker;
beforeEach(() => {
taskBroker = new TaskBroker(mock());
taskBroker = new TaskBroker(mock(), mock(), mock());
jest.restoreAllMocks();
});
@ -707,4 +711,131 @@ describe('TaskBroker', () => {
});
});
});
describe('task timeouts', () => {
let taskBroker: TaskBroker;
let config: TaskRunnersConfig;
let runnerLifecycleEvents = mock<RunnerLifecycleEvents>();
beforeAll(() => {
jest.useFakeTimers();
config = mock<TaskRunnersConfig>({ taskTimeout: 30 });
taskBroker = new TaskBroker(mock(), config, runnerLifecycleEvents);
});
afterAll(() => {
jest.useRealTimers();
});
it('on sending task, we should set up task timeout', async () => {
jest.spyOn(global, 'setTimeout');
const taskId = 'task1';
const runnerId = 'runner1';
const runner = mock<TaskRunner>({ id: runnerId });
const runnerMessageCallback = jest.fn();
taskBroker.registerRunner(runner, runnerMessageCallback);
taskBroker.setTasks({
[taskId]: { id: taskId, runnerId, requesterId: 'requester1', taskType: 'test' },
});
await taskBroker.sendTaskSettings(taskId, {});
expect(setTimeout).toHaveBeenCalledWith(
expect.any(Function),
config.taskTimeout * Time.seconds.toMilliseconds,
);
});
it('on task completion, we should clear timeout', async () => {
jest.spyOn(global, 'clearTimeout');
const taskId = 'task1';
const runnerId = 'runner1';
const requesterId = 'requester1';
const requesterCallback = jest.fn();
taskBroker.registerRequester(requesterId, requesterCallback);
taskBroker.setTasks({
[taskId]: {
id: taskId,
runnerId,
requesterId,
taskType: 'test',
timeout: setTimeout(() => {}, config.taskTimeout * Time.seconds.toMilliseconds),
},
});
await taskBroker.taskDoneHandler(taskId, { result: [] });
expect(clearTimeout).toHaveBeenCalled();
expect(taskBroker.getTasks().get(taskId)).toBeUndefined();
});
it('on task error, we should clear timeout', async () => {
jest.spyOn(global, 'clearTimeout');
const taskId = 'task1';
const runnerId = 'runner1';
const requesterId = 'requester1';
const requesterCallback = jest.fn();
taskBroker.registerRequester(requesterId, requesterCallback);
taskBroker.setTasks({
[taskId]: {
id: taskId,
runnerId,
requesterId,
taskType: 'test',
timeout: setTimeout(() => {}, config.taskTimeout * Time.seconds.toMilliseconds),
},
});
await taskBroker.taskErrorHandler(taskId, new Error('Test error'));
expect(clearTimeout).toHaveBeenCalled();
expect(taskBroker.getTasks().get(taskId)).toBeUndefined();
});
it('on timeout, we should emit `runner:timed-out-during-task` event and send error to requester', async () => {
jest.spyOn(global, 'clearTimeout');
const taskId = 'task1';
const runnerId = 'runner1';
const requesterId = 'requester1';
const runner = mock<TaskRunner>({ id: runnerId });
const runnerCallback = jest.fn();
const requesterCallback = jest.fn();
taskBroker.registerRunner(runner, runnerCallback);
taskBroker.registerRequester(requesterId, requesterCallback);
taskBroker.setTasks({
[taskId]: { id: taskId, runnerId, requesterId, taskType: 'test' },
});
await taskBroker.sendTaskSettings(taskId, {});
jest.runAllTimers();
await Promise.resolve();
expect(runnerLifecycleEvents.emit).toHaveBeenCalledWith('runner:timed-out-during-task');
await Promise.resolve();
expect(clearTimeout).toHaveBeenCalled();
expect(requesterCallback).toHaveBeenCalledWith({
type: 'broker:taskerror',
taskId,
error: new ApplicationError(`Task execution timed out after ${config.taskTimeout} seconds`),
});
await Promise.resolve();
expect(taskBroker.getTasks().get(taskId)).toBeUndefined();
});
});
});

View file

@ -7,6 +7,8 @@ import type { TaskRunnerAuthService } from '@/runners/auth/task-runner-auth.serv
import { TaskRunnerProcess } from '@/runners/task-runner-process';
import { mockInstance } from '@test/mocking';
import type { RunnerLifecycleEvents } from '../runner-lifecycle-events';
const spawnMock = jest.fn(() =>
mock<ChildProcess>({
stdout: {
@ -25,7 +27,7 @@ describe('TaskRunnerProcess', () => {
runnerConfig.enabled = true;
runnerConfig.mode = 'internal_childprocess';
const authService = mock<TaskRunnerAuthService>();
let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService);
let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService, mock());
afterEach(async () => {
spawnMock.mockClear();
@ -35,15 +37,35 @@ describe('TaskRunnerProcess', () => {
it('should throw if runner mode is external', () => {
runnerConfig.mode = 'external';
expect(() => new TaskRunnerProcess(logger, runnerConfig, authService)).toThrow();
expect(() => new TaskRunnerProcess(logger, runnerConfig, authService, mock())).toThrow();
runnerConfig.mode = 'internal_childprocess';
});
it('should register listener for `runner:failed-heartbeat-check` event', () => {
const runnerLifecycleEvents = mock<RunnerLifecycleEvents>();
new TaskRunnerProcess(logger, runnerConfig, authService, runnerLifecycleEvents);
expect(runnerLifecycleEvents.on).toHaveBeenCalledWith(
'runner:failed-heartbeat-check',
expect.any(Function),
);
});
it('should register listener for `runner:timed-out-during-task` event', () => {
const runnerLifecycleEvents = mock<RunnerLifecycleEvents>();
new TaskRunnerProcess(logger, runnerConfig, authService, runnerLifecycleEvents);
expect(runnerLifecycleEvents.on).toHaveBeenCalledWith(
'runner:timed-out-during-task',
expect.any(Function),
);
});
});
describe('start', () => {
beforeEach(() => {
taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService);
taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService, mock());
});
test.each([

View file

@ -0,0 +1,45 @@
import type { TaskRunnersConfig } from '@n8n/config';
import { mock } from 'jest-mock-extended';
import { Time } from '@/constants';
import { TaskRunnerWsServer } from '@/runners/runner-ws-server';
describe('TaskRunnerWsServer', () => {
describe('heartbeat timer', () => {
it('should set up heartbeat timer on server start', async () => {
const setIntervalSpy = jest.spyOn(global, 'setInterval');
const server = new TaskRunnerWsServer(
mock(),
mock(),
mock(),
mock<TaskRunnersConfig>({ path: '/runners', heartbeatInterval: 30 }),
mock(),
);
expect(setIntervalSpy).toHaveBeenCalledWith(
expect.any(Function),
30 * Time.seconds.toMilliseconds,
);
await server.shutdown();
});
it('should clear heartbeat timer on server stop', async () => {
jest.spyOn(global, 'setInterval');
const clearIntervalSpy = jest.spyOn(global, 'clearInterval');
const server = new TaskRunnerWsServer(
mock(),
mock(),
mock(),
mock<TaskRunnersConfig>({ path: '/runners', heartbeatInterval: 30 }),
mock(),
);
await server.shutdown();
expect(clearIntervalSpy).toHaveBeenCalled();
});
});
});

View file

@ -1,8 +1,10 @@
import { Service } from 'typedi';
import config from '@/config';
import { TaskRunnerDisconnectedError } from './errors/task-runner-disconnected-error';
import type { DisconnectAnalyzer } from './runner-types';
import type { TaskRunner } from './task-broker.service';
import { TaskRunnerFailedHeartbeatError } from './errors/task-runner-failed-heartbeat.error';
import type { DisconnectAnalyzer, DisconnectErrorOptions } from './runner-types';
/**
* Analyzes the disconnect reason of a task runner to provide a more
@ -10,7 +12,16 @@ import type { TaskRunner } from './task-broker.service';
*/
@Service()
export class DefaultTaskRunnerDisconnectAnalyzer implements DisconnectAnalyzer {
async determineDisconnectReason(runnerId: TaskRunner['id']): Promise<Error> {
return new TaskRunnerDisconnectedError(runnerId);
async toDisconnectError(opts: DisconnectErrorOptions): Promise<Error> {
const { reason, heartbeatInterval } = opts;
if (reason === 'failed-heartbeat-check' && heartbeatInterval) {
return new TaskRunnerFailedHeartbeatError(
heartbeatInterval,
config.get('deployment.type') !== 'cloud',
);
}
return new TaskRunnerDisconnectedError(opts.runnerId ?? 'Unknown runner ID');
}
}

View file

@ -0,0 +1,7 @@
export class MissingAuthTokenError extends Error {
constructor() {
super(
'Missing auth token. When `N8N_RUNNERS_MODE` is `external`, it is required to set `N8N_RUNNERS_AUTH_TOKEN`. Its value should be a shared secret between the main instance and the launcher.',
);
}
}

View file

@ -0,0 +1,32 @@
import { ApplicationError } from 'n8n-workflow';
export class TaskRunnerFailedHeartbeatError extends ApplicationError {
description: string;
constructor(heartbeatInterval: number, isSelfHosted: boolean) {
super('Task execution aborted because runner became unresponsive');
const subtitle =
'The task runner failed to respond as expected, so it was considered unresponsive, and the task was aborted. You can try the following:';
const fixes = {
optimizeScript:
'Optimize your script to prevent CPU-intensive operations, e.g. by breaking them down into smaller chunks or batch processing.',
ensureTermination:
'Ensure that all paths in your script are able to terminate, i.e. no infinite loops.',
increaseInterval: `If your task can reasonably keep the task runner busy for more than ${heartbeatInterval} ${heartbeatInterval === 1 ? 'second' : 'seconds'}, increase the heartbeat interval using the N8N_RUNNERS_HEARTBEAT_INTERVAL environment variable.`,
};
const suggestions = [fixes.optimizeScript, fixes.ensureTermination];
if (isSelfHosted) suggestions.push(fixes.increaseInterval);
const suggestionsText = suggestions
.map((suggestion, index) => `${index + 1}. ${suggestion}`)
.join('<br/>');
const description = `${subtitle}<br/><br/>${suggestionsText}`;
this.description = description;
}
}

View file

@ -0,0 +1,34 @@
import { ApplicationError } from 'n8n-workflow';
export class TaskRunnerTimeoutError extends ApplicationError {
description: string;
constructor(taskTimeout: number, isSelfHosted: boolean) {
super(
`Task execution timed out after ${taskTimeout} ${taskTimeout === 1 ? 'second' : 'seconds'}`,
);
const subtitle =
'The task runner was taking too long on this task, so it was suspected of being unresponsive and restarted, and the task was aborted. You can try the following:';
const fixes = {
optimizeScript:
'Optimize your script to prevent long-running tasks, e.g. by processing data in smaller batches.',
ensureTermination:
'Ensure that all paths in your script are able to terminate, i.e. no infinite loops.',
increaseTimeout: `If your task can reasonably take more than ${taskTimeout} ${taskTimeout === 1 ? 'second' : 'seconds'}, increase the timeout using the N8N_RUNNERS_TASK_TIMEOUT environment variable.`,
};
const suggestions = [fixes.optimizeScript, fixes.ensureTermination];
if (isSelfHosted) suggestions.push(fixes.increaseTimeout);
const suggestionsText = suggestions
.map((suggestion, index) => `${index + 1}. ${suggestion}`)
.join('<br/>');
const description = `${subtitle}<br/><br/>${suggestionsText}`;
this.description = description;
}
}

View file

@ -5,8 +5,8 @@ import config from '@/config';
import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer';
import { TaskRunnerOomError } from './errors/task-runner-oom-error';
import type { DisconnectErrorOptions } from './runner-types';
import { SlidingWindowSignal } from './sliding-window-signal';
import type { TaskRunner } from './task-broker.service';
import type { ExitReason, TaskRunnerProcessEventMap } from './task-runner-process';
import { TaskRunnerProcess } from './task-runner-process';
@ -38,13 +38,13 @@ export class InternalTaskRunnerDisconnectAnalyzer extends DefaultTaskRunnerDisco
});
}
async determineDisconnectReason(runnerId: TaskRunner['id']): Promise<Error> {
async toDisconnectError(opts: DisconnectErrorOptions): Promise<Error> {
const exitCode = await this.awaitExitSignal();
if (exitCode === 'oom') {
return new TaskRunnerOomError(runnerId, this.isCloudDeployment);
return new TaskRunnerOomError(opts.runnerId ?? 'Unknown runner ID', this.isCloudDeployment);
}
return await super.determineDisconnectReason(runnerId);
return await super.toDisconnectError(opts);
}
private async awaitExitSignal(): Promise<ExitReason> {

View file

@ -0,0 +1,11 @@
import { Service } from 'typedi';
import { TypedEmitter } from '@/typed-emitter';
type RunnerLifecycleEventMap = {
'runner:failed-heartbeat-check': never;
'runner:timed-out-during-task': never;
};
@Service()
export class RunnerLifecycleEvents extends TypedEmitter<RunnerLifecycleEventMap> {}

View file

@ -6,7 +6,7 @@ import type { TaskRunner } from './task-broker.service';
import type { AuthlessRequest } from '../requests';
export interface DisconnectAnalyzer {
determineDisconnectReason(runnerId: TaskRunner['id']): Promise<Error>;
toDisconnectError(opts: DisconnectErrorOptions): Promise<Error>;
}
export type DataRequestType = 'input' | 'node' | 'all';
@ -22,3 +22,11 @@ export interface TaskRunnerServerInitRequest
}
export type TaskRunnerServerInitResponse = Response & { req: TaskRunnerServerInitRequest };
export type DisconnectReason = 'shutting-down' | 'failed-heartbeat-check' | 'unknown';
export type DisconnectErrorOptions = {
runnerId?: TaskRunner['id'];
reason?: DisconnectReason;
heartbeatInterval?: number;
};

View file

@ -1,12 +1,17 @@
import { TaskRunnersConfig } from '@n8n/config';
import type { BrokerMessage, RunnerMessage } from '@n8n/task-runner';
import { ApplicationError } from 'n8n-workflow';
import { Service } from 'typedi';
import type WebSocket from 'ws';
import { Time } from '@/constants';
import { Logger } from '@/logging/logger.service';
import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer';
import { RunnerLifecycleEvents } from './runner-lifecycle-events';
import type {
DisconnectAnalyzer,
DisconnectReason,
TaskRunnerServerInitRequest,
TaskRunnerServerInitResponse,
} from './runner-types';
@ -20,11 +25,50 @@ function heartbeat(this: WebSocket) {
export class TaskRunnerWsServer {
runnerConnections: Map<TaskRunner['id'], WebSocket> = new Map();
private heartbeatTimer: NodeJS.Timer | undefined;
constructor(
private readonly logger: Logger,
private readonly taskBroker: TaskBroker,
private disconnectAnalyzer: DefaultTaskRunnerDisconnectAnalyzer,
) {}
private readonly taskTunnersConfig: TaskRunnersConfig,
private readonly runnerLifecycleEvents: RunnerLifecycleEvents,
) {
this.startHeartbeatChecks();
}
private startHeartbeatChecks() {
const { heartbeatInterval } = this.taskTunnersConfig;
if (heartbeatInterval <= 0) {
throw new ApplicationError('Heartbeat interval must be greater than 0');
}
this.heartbeatTimer = setInterval(() => {
for (const [runnerId, connection] of this.runnerConnections.entries()) {
if (!connection.isAlive) {
void this.removeConnection(runnerId, 'failed-heartbeat-check');
this.runnerLifecycleEvents.emit('runner:failed-heartbeat-check');
return;
}
connection.isAlive = false;
connection.ping();
}
}, heartbeatInterval * Time.seconds.toMilliseconds);
}
async shutdown() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = undefined;
}
await Promise.all(
Array.from(this.runnerConnections.keys()).map(
async (id) => await this.removeConnection(id, 'shutting-down'),
),
);
}
setDisconnectAnalyzer(disconnectAnalyzer: DisconnectAnalyzer) {
this.disconnectAnalyzer = disconnectAnalyzer;
@ -97,11 +141,15 @@ export class TaskRunnerWsServer {
);
}
async removeConnection(id: TaskRunner['id']) {
async removeConnection(id: TaskRunner['id'], reason: DisconnectReason = 'unknown') {
const connection = this.runnerConnections.get(id);
if (connection) {
const disconnectReason = await this.disconnectAnalyzer.determineDisconnectReason(id);
this.taskBroker.deregisterRunner(id, disconnectReason);
const disconnectError = await this.disconnectAnalyzer.toDisconnectError({
runnerId: id,
reason,
heartbeatInterval: this.taskTunnersConfig.heartbeatInterval,
});
this.taskBroker.deregisterRunner(id, disconnectError);
connection.close();
this.runnerConnections.delete(id);
}

View file

@ -1,3 +1,4 @@
import { TaskRunnersConfig } from '@n8n/config';
import type {
BrokerMessage,
RequesterMessage,
@ -8,9 +9,13 @@ import { ApplicationError } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { Service } from 'typedi';
import config from '@/config';
import { Time } from '@/constants';
import { Logger } from '@/logging/logger.service';
import { TaskDeferredError, TaskRejectError } from './errors';
import { TaskRunnerTimeoutError } from './errors/task-runner-timeout.error';
import { RunnerLifecycleEvents } from './runner-lifecycle-events';
export interface TaskRunner {
id: string;
@ -24,6 +29,7 @@ export interface Task {
runnerId: TaskRunner['id'];
requesterId: string;
taskType: string;
timeout?: NodeJS.Timeout;
}
export interface TaskOffer {
@ -80,7 +86,15 @@ export class TaskBroker {
private pendingTaskRequests: TaskRequest[] = [];
constructor(private readonly logger: Logger) {}
constructor(
private readonly logger: Logger,
private readonly taskRunnersConfig: TaskRunnersConfig,
private readonly runnerLifecycleEvents: RunnerLifecycleEvents,
) {
if (this.taskRunnersConfig.taskTimeout <= 0) {
throw new ApplicationError('Task timeout must be greater than 0');
}
}
expireTasks() {
const now = process.hrtime.bigint();
@ -426,6 +440,14 @@ export class TaskBroker {
async sendTaskSettings(taskId: Task['id'], settings: unknown) {
const runner = await this.getRunnerOrFailTask(taskId);
const task = this.tasks.get(taskId);
if (!task) return;
task.timeout = setTimeout(async () => {
await this.handleTaskTimeout(taskId);
}, this.taskRunnersConfig.taskTimeout * Time.seconds.toMilliseconds);
await this.messageRunner(runner.id, {
type: 'broker:tasksettings',
taskId,
@ -433,11 +455,27 @@ export class TaskBroker {
});
}
private async handleTaskTimeout(taskId: Task['id']) {
const task = this.tasks.get(taskId);
if (!task) return;
this.runnerLifecycleEvents.emit('runner:timed-out-during-task');
await this.taskErrorHandler(
taskId,
new TaskRunnerTimeoutError(
this.taskRunnersConfig.taskTimeout,
config.getEnv('deployment.type') !== 'cloud',
),
);
}
async taskDoneHandler(taskId: Task['id'], data: TaskResultData) {
const task = this.tasks.get(taskId);
if (!task) {
return;
}
if (!task) return;
clearTimeout(task.timeout);
await this.requesters.get(task.requesterId)?.({
type: 'broker:taskdone',
taskId: task.id,
@ -448,9 +486,10 @@ export class TaskBroker {
async taskErrorHandler(taskId: Task['id'], error: unknown) {
const task = this.tasks.get(taskId);
if (!task) {
return;
}
if (!task) return;
clearTimeout(task.timeout);
await this.requesters.get(task.requesterId)?.({
type: 'broker:taskerror',
taskId: task.id,

View file

@ -4,6 +4,7 @@ import Container, { Service } from 'typedi';
import type { TaskRunnerProcess } from '@/runners/task-runner-process';
import { MissingAuthTokenError } from './errors/missing-auth-token.error';
import { TaskRunnerWsServer } from './runner-ws-server';
import type { LocalTaskManager } from './task-managers/local-task-manager';
import type { TaskRunnerServer } from './task-runner-server';
@ -28,13 +29,14 @@ export class TaskRunnerModule {
async start() {
a.ok(this.runnerConfig.enabled, 'Task runner is disabled');
const { mode, authToken } = this.runnerConfig;
if (mode === 'external' && !authToken) throw new MissingAuthTokenError();
await this.loadTaskManager();
await this.loadTaskRunnerServer();
if (
this.runnerConfig.mode === 'internal_childprocess' ||
this.runnerConfig.mode === 'internal_launcher'
) {
if (mode === 'internal_childprocess' || mode === 'internal_launcher') {
await this.startInternalTaskRunner();
}
}

View file

@ -10,6 +10,7 @@ import { Logger } from '@/logging/logger.service';
import { TaskRunnerAuthService } from './auth/task-runner-auth.service';
import { forwardToLogger } from './forward-to-logger';
import { NodeProcessOomDetector } from './node-process-oom-detector';
import { RunnerLifecycleEvents } from './runner-lifecycle-events';
import { TypedEmitter } from '../typed-emitter';
type ChildProcess = ReturnType<typeof spawn>;
@ -70,6 +71,7 @@ export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
logger: Logger,
private readonly runnerConfig: TaskRunnersConfig,
private readonly authService: TaskRunnerAuthService,
private readonly runnerLifecycleEvents: RunnerLifecycleEvents,
) {
super();
@ -79,6 +81,16 @@ export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
);
this.logger = logger.scoped('task-runner');
this.runnerLifecycleEvents.on('runner:failed-heartbeat-check', () => {
this.logger.warn('Task runner failed heartbeat check, restarting...');
void this.forceRestart();
});
this.runnerLifecycleEvents.on('runner:timed-out-during-task', () => {
this.logger.warn('Task runner timed out during task, restarting...');
void this.forceRestart();
});
}
async start() {
@ -116,9 +128,7 @@ export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
@OnShutdown()
async stop() {
if (!this.process) {
return;
}
if (!this.process) return;
this.isShuttingDown = true;
@ -133,10 +143,22 @@ export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
this.isShuttingDown = false;
}
killNode() {
if (!this.process) {
return;
/** Force-restart a runner suspected of being unresponsive. */
async forceRestart() {
if (!this.process) return;
if (this.useLauncher) {
await this.killLauncher(); // @TODO: Implement SIGKILL in launcher
} else {
this.process.kill('SIGKILL');
}
await this._runPromise;
}
killNode() {
if (!this.process) return;
this.process.kill();
}
@ -173,7 +195,6 @@ export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
this.emit('exit', { reason: this.oomDetector?.didProcessOom ? 'oom' : 'unknown' });
resolveFn();
// If we are not shutting down, restart the process
if (!this.isShuttingDown) {
setImmediate(async () => await this.start());
}

View file

@ -44,7 +44,7 @@ export class TaskRunnerServer {
private readonly logger: Logger,
private readonly globalConfig: GlobalConfig,
private readonly taskRunnerAuthController: TaskRunnerAuthController,
private readonly taskRunnerService: TaskRunnerWsServer,
private readonly taskRunnerWsServer: TaskRunnerWsServer,
) {
this.app = express();
this.app.disable('x-powered-by');
@ -148,7 +148,7 @@ export class TaskRunnerServer {
// eslint-disable-next-line @typescript-eslint/unbound-method
this.taskRunnerAuthController.authMiddleware,
(req: TaskRunnerServerInitRequest, res: TaskRunnerServerInitResponse) =>
this.taskRunnerService.handleRequest(req, res),
this.taskRunnerWsServer.handleRequest(req, res),
);
const authEndpoint = `${this.getEndpointBasePath()}/auth`;
@ -181,7 +181,10 @@ export class TaskRunnerServer {
const response = new ServerResponse(request);
response.writeHead = (statusCode) => {
if (statusCode > 200) ws.close();
if (statusCode > 200) {
this.logger.error(`Task runner connection attempt failed with status code ${statusCode}`);
ws.close();
}
return response;
};

View file

@ -103,7 +103,7 @@ export class InstanceRiskReporter implements RiskReporter {
};
settings.telemetry = {
diagnosticsEnabled: config.getEnv('diagnostics.enabled'),
diagnosticsEnabled: this.globalConfig.diagnostics.enabled,
};
return settings;

View file

@ -39,7 +39,7 @@ describe('WorkflowStatisticsService', () => {
});
Object.assign(entityManager, { connection: dataSource });
config.set('diagnostics.enabled', true);
globalConfig.diagnostics.enabled = true;
config.set('deployment.type', 'n8n-testing');
mocked(ownershipService.getWorkflowProjectCached).mockResolvedValue(fakeProject);
mocked(ownershipService.getPersonalProjectOwnerCached).mockResolvedValue(fakeUser);

View file

@ -66,11 +66,11 @@ export class FrontendService {
const restEndpoint = this.globalConfig.endpoints.rest;
const telemetrySettings: ITelemetrySettings = {
enabled: config.getEnv('diagnostics.enabled'),
enabled: this.globalConfig.diagnostics.enabled,
};
if (telemetrySettings.enabled) {
const conf = config.getEnv('diagnostics.config.frontend');
const conf = this.globalConfig.diagnostics.frontendConfig;
const [key, url] = conf.split(';');
if (!key || !url) {
@ -122,15 +122,15 @@ export class FrontendService {
instanceId: this.instanceSettings.instanceId,
telemetry: telemetrySettings,
posthog: {
enabled: config.getEnv('diagnostics.enabled'),
apiHost: config.getEnv('diagnostics.config.posthog.apiHost'),
apiKey: config.getEnv('diagnostics.config.posthog.apiKey'),
enabled: this.globalConfig.diagnostics.enabled,
apiHost: this.globalConfig.diagnostics.posthogConfig.apiHost,
apiKey: this.globalConfig.diagnostics.posthogConfig.apiKey,
autocapture: false,
disableSessionRecording: config.getEnv('deployment.type') !== 'cloud',
debug: this.globalConfig.logging.level === 'debug',
},
personalizationSurveyEnabled:
config.getEnv('personalization.enabled') && config.getEnv('diagnostics.enabled'),
config.getEnv('personalization.enabled') && this.globalConfig.diagnostics.enabled,
defaultLocale: config.getEnv('defaultLocale'),
userManagement: {
quota: this.license.getUsersLimit(),

View file

@ -21,6 +21,10 @@ describe('Telemetry', () => {
const instanceId = 'Telemetry unit test';
const testDateTime = new Date('2022-01-01 00:00:00');
const instanceSettings = mockInstance(InstanceSettings, { instanceId });
const globalConfig = mock<GlobalConfig>({
diagnostics: { enabled: true },
logging: { level: 'info', outputs: ['console'] },
});
beforeAll(() => {
// @ts-expect-error Spying on private method
@ -28,7 +32,6 @@ describe('Telemetry', () => {
jest.useFakeTimers();
jest.setSystemTime(testDateTime);
config.set('diagnostics.enabled', true);
config.set('deployment.type', 'n8n-testing');
});
@ -45,14 +48,7 @@ describe('Telemetry', () => {
const postHog = new PostHogClient(instanceSettings, mock());
await postHog.init();
telemetry = new Telemetry(
mock(),
postHog,
mock(),
instanceSettings,
mock(),
mock<GlobalConfig>({ logging: { level: 'info', outputs: ['console'] } }),
);
telemetry = new Telemetry(mock(), postHog, mock(), instanceSettings, mock(), globalConfig);
// @ts-expect-error Assigning to private property
telemetry.rudderStack = mockRudderStack;
});

View file

@ -5,7 +5,6 @@ import { InstanceSettings } from 'n8n-core';
import type { ITelemetryTrackProperties } from 'n8n-workflow';
import { Container, Service } from 'typedi';
import config from '@/config';
import { LOWEST_SHUTDOWN_PRIORITY, N8N_VERSION } from '@/constants';
import { ProjectRelationRepository } from '@/databases/repositories/project-relation.repository';
import { ProjectRepository } from '@/databases/repositories/project.repository';
@ -54,10 +53,9 @@ export class Telemetry {
) {}
async init() {
const enabled = config.getEnv('diagnostics.enabled');
const { enabled, backendConfig } = this.globalConfig.diagnostics;
if (enabled) {
const conf = config.getEnv('diagnostics.config.backend');
const [key, dataPlaneUrl] = conf.split(';');
const [key, dataPlaneUrl] = backendConfig.split(';');
if (!key || !dataPlaneUrl) {
this.logger.warn('Diagnostics backend config is invalid');

View file

@ -464,6 +464,11 @@ export async function executeWebhook(
projectId: project?.id,
};
// When resuming from a wait node, copy over the pushRef from the execution-data
if (!runData.pushRef) {
runData.pushRef = runExecutionData.pushRef;
}
let responsePromise: IDeferredPromise<IN8nHttpFullResponse> | undefined;
if (responseMode === 'responseNode') {
responsePromise = createDeferredPromise<IN8nHttpFullResponse>();

View file

@ -307,7 +307,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
},
],
workflowExecuteAfter: [
async function (this: WorkflowHooks): Promise<void> {
async function (this: WorkflowHooks, fullRunData: IRun): Promise<void> {
const { pushRef, executionId } = this;
if (pushRef === undefined) return;
@ -318,7 +318,9 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
workflowId,
});
pushInstance.send('executionFinished', { executionId }, pushRef);
const pushType =
fullRunData.status === 'waiting' ? 'executionWaiting' : 'executionFinished';
pushInstance.send(pushType, { executionId }, pushRef);
},
],
};
@ -430,22 +432,21 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
(executionStatus === 'success' && !saveSettings.success) ||
(executionStatus !== 'success' && !saveSettings.error);
if (shouldNotSave && !fullRunData.waitTill) {
if (!fullRunData.waitTill && !isManualMode) {
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
await Container.get(ExecutionRepository).hardDelete({
workflowId: this.workflowData.id,
executionId: this.executionId,
});
if (shouldNotSave && !fullRunData.waitTill && !isManualMode) {
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
return;
}
await Container.get(ExecutionRepository).hardDelete({
workflowId: this.workflowData.id,
executionId: this.executionId,
});
return;
}
// Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive
@ -1110,6 +1111,9 @@ export function getWorkflowHooksWorkerMain(
hookFunctions.nodeExecuteAfter = [];
hookFunctions.workflowExecuteAfter = [
async function (this: WorkflowHooks, fullRunData: IRun): Promise<void> {
// Don't delete executions before they are finished
if (!fullRunData.finished) return;
const executionStatus = determineFinalExecutionStatus(fullRunData);
const saveSettings = toSaveSettings(this.workflowData.settings);

View file

@ -740,14 +740,6 @@
}
return;
}).then(() => {
window.addEventListener('storage', function(event) {
if (event.key === 'n8n_redirect_to_next_form_test_page' && event.newValue) {
const newUrl = event.newValue;
localStorage.removeItem('n8n_redirect_to_next_form_test_page');
window.location.replace(newUrl);
}
});
})
.catch(function (error) {
console.error('Error:', error);

View file

@ -1,6 +1,7 @@
import { TaskRunnersConfig } from '@n8n/config';
import Container from 'typedi';
import { MissingAuthTokenError } from '@/runners/errors/missing-auth-token.error';
import { TaskRunnerModule } from '@/runners/task-runner-module';
import { DefaultTaskRunnerDisconnectAnalyzer } from '../../../src/runners/default-task-runner-disconnect-analyzer';
@ -10,6 +11,7 @@ describe('TaskRunnerModule in external mode', () => {
const runnerConfig = Container.get(TaskRunnersConfig);
runnerConfig.mode = 'external';
runnerConfig.port = 0;
runnerConfig.authToken = 'test';
const module = Container.get(TaskRunnerModule);
afterEach(async () => {
@ -24,6 +26,17 @@ describe('TaskRunnerModule in external mode', () => {
await expect(module.start()).rejects.toThrow('Task runner is disabled');
});
it('should throw if auth token is missing', async () => {
const runnerConfig = new TaskRunnersConfig();
runnerConfig.mode = 'external';
runnerConfig.enabled = true;
runnerConfig.authToken = '';
const module = new TaskRunnerModule(runnerConfig);
await expect(module.start()).rejects.toThrowError(MissingAuthTokenError);
});
it('should start the task runner', async () => {
runnerConfig.enabled = true;

View file

@ -1,9 +1,9 @@
import { GlobalConfig } from '@n8n/config';
import { mock } from 'jest-mock-extended';
import { NodeConnectionType } from 'n8n-workflow';
import Container from 'typedi';
import { v4 as uuid } from 'uuid';
import config from '@/config';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { generateNanoId } from '@/databases/utils/generators';
import { INSTANCE_REPORT, WEBHOOK_VALIDATOR_NODE_TYPES } from '@/security-audit/constants';
@ -239,8 +239,7 @@ test('should not report outdated instance when up to date', async () => {
});
test('should report security settings', async () => {
config.set('diagnostics.enabled', true);
Container.get(GlobalConfig).diagnostics.enabled = true;
const testAudit = await securityAuditService.run(['instance']);
const section = getRiskSection(

View file

@ -916,7 +916,6 @@ export class WorkflowExecute {
let nodeSuccessData: INodeExecutionData[][] | null | undefined;
let runIndex: number;
let startTime: number;
let taskData: ITaskData;
if (this.runExecutionData.startData === undefined) {
this.runExecutionData.startData = {};
@ -1446,13 +1445,13 @@ export class WorkflowExecute {
this.runExecutionData.resultData.runData[executionNode.name] = [];
}
taskData = {
const taskData: ITaskData = {
hints: executionHints,
startTime,
executionTime: new Date().getTime() - startTime,
source: !executionData.source ? [] : executionData.source.main,
metadata: executionData.metadata,
executionStatus: 'success',
executionStatus: this.runExecutionData.waitTill ? 'waiting' : 'success',
};
if (executionError !== undefined) {

View file

@ -212,7 +212,10 @@ const activeNodeType = computed(() => {
return nodeTypesStore.getNodeType(activeNode.value.type, activeNode.value.typeVersion);
});
const waitingMessage = computed(() => waitingNodeTooltip());
const waitingMessage = computed(() => {
const parentNode = parentNodes.value[0];
return parentNode && waitingNodeTooltip(workflowsStore.getNodeByName(parentNode.name));
});
watch(
inputMode,

View file

@ -65,7 +65,7 @@ const lastPopupCountUpdate = ref(0);
const codeGenerationInProgress = ref(false);
const router = useRouter();
const { runWorkflow, runWorkflowResolvePending, stopCurrentExecution } = useRunWorkflow({ router });
const { runWorkflow, stopCurrentExecution } = useRunWorkflow({ router });
const workflowsStore = useWorkflowsStore();
const externalHooks = useExternalHooks();
@ -353,17 +353,10 @@ async function onClick() {
telemetry.track('User clicked execute node button', telemetryPayload);
await externalHooks.run('nodeExecuteButton.onClick', telemetryPayload);
if (workflowsStore.isWaitingExecution) {
await runWorkflowResolvePending({
destinationNode: props.nodeName,
source: 'RunData.ExecuteNodeButton',
});
} else {
await runWorkflow({
destinationNode: props.nodeName,
source: 'RunData.ExecuteNodeButton',
});
}
await runWorkflow({
destinationNode: props.nodeName,
source: 'RunData.ExecuteNodeButton',
});
emit('execute');
}

View file

@ -352,7 +352,7 @@ const activatePane = () => {
<template #node-waiting>
<N8nText :bold="true" color="text-dark" size="large">Waiting for input</N8nText>
<N8nText v-n8n-html="waitingNodeTooltip()"></N8nText>
<N8nText v-n8n-html="waitingNodeTooltip(node)"></N8nText>
</template>
<template #no-output-data>

View file

@ -200,12 +200,13 @@ const displayMode = computed(() =>
);
const isReadOnlyRoute = computed(() => route.meta.readOnlyCanvas === true);
const isWaitNodeWaiting = computed(
() =>
workflowExecution.value?.status === 'waiting' &&
workflowExecution.value.data?.waitTill &&
workflowExecution.value?.data?.resultData?.lastNodeExecuted === node.value?.name,
);
const isWaitNodeWaiting = computed(() => {
return (
node.value?.name &&
workflowExecution.value?.data?.resultData?.runData?.[node.value?.name]?.[props.runIndex]
?.executionStatus === 'waiting'
);
});
const { activeNode } = storeToRefs(ndvStore);
const nodeType = computed(() => {
@ -1508,7 +1509,11 @@ defineExpose({ enterEditMode });
</div>
<div ref="dataContainerRef" :class="$style.dataContainer" data-test-id="ndv-data-container">
<div v-if="isExecuting" :class="$style.center" data-test-id="ndv-executing">
<div
v-if="isExecuting && !isWaitNodeWaiting"
:class="$style.center"
data-test-id="ndv-executing"
>
<div :class="$style.spinner"><N8nSpinner type="ring" /></div>
<N8nText>{{ executingMessage }}</N8nText>
</div>

View file

@ -119,13 +119,6 @@ describe('Canvas', () => {
[
{
id: '1',
type: 'position',
dragging: true,
from: {
x: 100,
y: 100,
z: 0,
},
position: { x: 120, y: 120 },
},
],

View file

@ -10,8 +10,8 @@ import type {
Connection,
XYPosition,
ViewportTransform,
NodeChange,
NodePositionChange,
NodeDragEvent,
GraphNode,
} from '@vue-flow/core';
import { useVueFlow, VueFlow, PanelPosition, MarkerType } from '@vue-flow/core';
import { Background } from '@vue-flow/background';
@ -29,9 +29,10 @@ import type { PinDataSource } from '@/composables/usePinnedData';
import { isPresent } from '@/utils/typesUtils';
import { GRID_SIZE } from '@/utils/nodeViewUtils';
import { CanvasKey } from '@/constants';
import { onKeyDown, onKeyUp, useDebounceFn } from '@vueuse/core';
import { onKeyDown, onKeyUp } from '@vueuse/core';
import CanvasArrowHeadMarker from './elements/edges/CanvasArrowHeadMarker.vue';
import CanvasBackgroundStripedPattern from './elements/CanvasBackgroundStripedPattern.vue';
import { useCanvasTraversal } from '@/composables/useCanvasTraversal';
const $style = useCssModule();
@ -97,6 +98,7 @@ const props = withDefaults(
const { controlKeyCode } = useDeviceSupport();
const vueFlow = useVueFlow({ id: props.id, deleteKeyCode: null });
const {
getSelectedNodes: selectedNodes,
addSelectedNodes,
@ -113,7 +115,14 @@ const {
onPaneReady,
findNode,
viewport,
} = useVueFlow({ id: props.id, deleteKeyCode: null });
} = vueFlow;
const {
getIncomingNodes,
getOutgoingNodes,
getSiblingNodes,
getDownstreamNodes,
getUpstreamNodes,
} = useCanvasTraversal(vueFlow);
const isPaneReady = ref(false);
@ -131,7 +140,6 @@ const disableKeyBindings = computed(() => !props.keyBindings);
/**
* @see https://developer.mozilla.org/en-US/docs/Web/API/UI_Events/Keyboard_event_key_values#whitespace_keys
*/
const panningKeyCode = ref<string[]>([' ', controlKeyCode]);
const panningMouseButton = ref<number[]>([1]);
const selectionKeyCode = ref<true | null>(true);
@ -144,6 +152,54 @@ onKeyUp(panningKeyCode.value, () => {
selectionKeyCode.value = true;
});
function selectLeftNode(id: string) {
const incomingNodes = getIncomingNodes(id);
const previousNode = incomingNodes[0];
if (previousNode) {
onSelectNodes({ ids: [previousNode.id] });
}
}
function selectRightNode(id: string) {
const outgoingNodes = getOutgoingNodes(id);
const nextNode = outgoingNodes[0];
if (nextNode) {
onSelectNodes({ ids: [nextNode.id] });
}
}
function selectLowerSiblingNode(id: string) {
const siblingNodes = getSiblingNodes(id);
const index = siblingNodes.findIndex((n) => n.id === id);
const nextNode = siblingNodes[index + 1] ?? siblingNodes[0];
if (nextNode) {
onSelectNodes({
ids: [nextNode.id],
});
}
}
function selectUpperSiblingNode(id: string) {
const siblingNodes = getSiblingNodes(id);
const index = siblingNodes.findIndex((n) => n.id === id);
const previousNode = siblingNodes[index - 1] ?? siblingNodes[siblingNodes.length - 1];
if (previousNode) {
onSelectNodes({
ids: [previousNode.id],
});
}
}
function selectDownstreamNodes(id: string) {
const downstreamNodes = getDownstreamNodes(id);
onSelectNodes({ ids: [...downstreamNodes.map((node) => node.id), id] });
}
function selectUpstreamNodes(id: string) {
const upstreamNodes = getUpstreamNodes(id);
onSelectNodes({ ids: [...upstreamNodes.map((node) => node.id), id] });
}
const keyMap = computed(() => ({
ctrl_c: emitWithSelectedNodes((ids) => emit('copy:nodes', ids)),
enter: emitWithLastSelectedNode((id) => onSetNodeActive(id)),
@ -152,6 +208,12 @@ const keyMap = computed(() => ({
'-|_': async () => await onZoomOut(),
0: async () => await onResetZoom(),
1: async () => await onFitView(),
ArrowUp: emitWithLastSelectedNode(selectUpperSiblingNode),
ArrowDown: emitWithLastSelectedNode(selectLowerSiblingNode),
ArrowLeft: emitWithLastSelectedNode(selectLeftNode),
ArrowRight: emitWithLastSelectedNode(selectRightNode),
shift_ArrowLeft: emitWithLastSelectedNode(selectUpstreamNodes),
shift_ArrowRight: emitWithLastSelectedNode(selectDownstreamNodes),
// @TODO implement arrow key shortcuts to modify selection
...(props.readOnly
@ -177,31 +239,30 @@ useKeybindings(keyMap, { disabled: disableKeyBindings });
* Nodes
*/
const lastSelectedNode = computed(() => selectedNodes.value[selectedNodes.value.length - 1]);
const hasSelection = computed(() => selectedNodes.value.length > 0);
const selectedNodeIds = computed(() => selectedNodes.value.map((node) => node.id));
const lastSelectedNode = ref<GraphNode>();
watch(selectedNodes, (nodes) => {
if (!lastSelectedNode.value || !nodes.find((node) => node.id === lastSelectedNode.value?.id)) {
lastSelectedNode.value = nodes[nodes.length - 1];
}
});
function onClickNodeAdd(id: string, handle: string) {
emit('click:node:add', id, handle);
}
// Debounced to prevent emitting too many events, necessary for undo/redo
const onUpdateNodesPosition = useDebounceFn((events: NodePositionChange[]) => {
function onUpdateNodesPosition(events: CanvasNodeMoveEvent[]) {
emit('update:nodes:position', events);
}, 200);
}
function onUpdateNodePosition(id: string, position: XYPosition) {
emit('update:node:position', id, position);
}
const isPositionChangeEvent = (event: NodeChange): event is NodePositionChange =>
event.type === 'position' && 'position' in event;
function onNodesChange(events: NodeChange[]) {
const positionChangeEndEvents = events.filter(isPositionChangeEvent);
if (positionChangeEndEvents.length > 0) {
void onUpdateNodesPosition(positionChangeEndEvents);
}
function onNodeDragStop(event: NodeDragEvent) {
onUpdateNodesPosition(event.nodes.map(({ id, position }) => ({ id, position })));
}
function onSetNodeActive(id: string) {
@ -517,6 +578,7 @@ provide(CanvasKey, {
:class="classes"
:selection-key-code="selectionKeyCode"
:pan-activation-key-code="panningKeyCode"
:disable-keyboard-a11y="true"
data-test-id="canvas"
@connect-start="onConnectStart"
@connect="onConnect"
@ -524,9 +586,9 @@ provide(CanvasKey, {
@pane-click="onClickPane"
@contextmenu="onOpenContextMenu"
@viewport-change="onViewportChange"
@nodes-change="onNodesChange"
@move-start="onPaneMoveStart"
@move-end="onPaneMoveEnd"
@node-drag-stop="onNodeDragStop"
>
<template #node-canvas-node="canvasNodeProps">
<Node

View file

@ -0,0 +1,180 @@
import { useCanvasTraversal } from '@/composables/useCanvasTraversal';
import type { CanvasNode } from '@/types';
import type { VueFlowStore } from '@vue-flow/core';
import { mock } from 'vitest-mock-extended';
describe('useCanvasTraversal', () => {
const mockGetIncomers = vi.fn();
const mockGetOutgoers = vi.fn();
const vueFlow = mock<VueFlowStore>({
getIncomers: mockGetIncomers,
getOutgoers: mockGetOutgoers,
});
const {
sortNodesByVerticalPosition,
getIncomingNodes,
getOutgoingNodes,
getSiblingNodes,
getDownstreamNodes,
getUpstreamNodes,
} = useCanvasTraversal(vueFlow);
describe('sortNodesByVerticalPosition', () => {
it('should sort nodes by their vertical position', () => {
const nodes: CanvasNode[] = [
{ id: '1', position: { x: 0, y: 200 } },
{ id: '2', position: { x: 0, y: 100 } },
];
const result = sortNodesByVerticalPosition(nodes);
expect(result).toEqual([
{ id: '2', position: { x: 0, y: 100 } },
{ id: '1', position: { x: 0, y: 200 } },
]);
});
});
describe('getIncomingNodes', () => {
it('should return sorted incoming nodes by vertical position', () => {
const incomingNodes = [
{ id: '1', position: { x: 0, y: 200 } },
{ id: '2', position: { x: 0, y: 100 } },
];
mockGetIncomers.mockReturnValue([...incomingNodes]);
const result = getIncomingNodes('3');
expect(result).toEqual([incomingNodes[1], incomingNodes[0]]);
});
});
describe('getOutgoingNodes', () => {
it('should return sorted outgoing nodes by vertical position', () => {
const outgoingNodes = [
{ id: '1', position: { x: 0, y: 300 } },
{ id: '2', position: { x: 0, y: 150 } },
];
mockGetOutgoers.mockReturnValue([...outgoingNodes]);
const result = getOutgoingNodes('3');
expect(result).toEqual([outgoingNodes[1], outgoingNodes[0]]);
});
});
describe('getSiblingNodes', () => {
it('should return sorted sibling nodes by vertical position', () => {
const incomingNodes = [{ id: '1', position: { x: 0, y: 200 } }];
const incomingNodesOutgoingChildren = [{ id: '2', position: { x: 0, y: 400 } }];
const outgoingNodes = [{ id: '3', position: { x: 0, y: 300 } }];
const outgoingNodesIncomingChildren = [{ id: '4', position: { x: 0, y: 500 } }];
mockGetIncomers.mockReturnValueOnce(incomingNodes);
mockGetOutgoers.mockReturnValueOnce(incomingNodesOutgoingChildren);
mockGetOutgoers.mockReturnValueOnce(outgoingNodes);
mockGetIncomers.mockReturnValueOnce(outgoingNodesIncomingChildren);
const result = getSiblingNodes('5');
expect(result).toEqual([...incomingNodesOutgoingChildren, ...outgoingNodesIncomingChildren]);
});
});
describe('getDownstreamNodes', () => {
it('should return sorted downstream nodes by vertical position - one level', () => {
const outgoingNodes = [{ id: '2', position: { x: 0, y: 100 } }];
mockGetOutgoers.mockReturnValue(outgoingNodes);
const result = getDownstreamNodes('1');
expect(result).toEqual([{ id: '2', position: { x: 0, y: 100 } }]);
});
it('should return sorted downstream nodes by vertical position - multiple levels', () => {
const outgoingNodes1 = [{ id: '1', position: { x: 0, y: 100 } }];
const outgoingNodes2 = [{ id: '2', position: { x: 0, y: 200 } }];
const outgoingNodes3 = [{ id: '3', position: { x: 0, y: 300 } }];
mockGetOutgoers
.mockReturnValueOnce(outgoingNodes1)
.mockReturnValueOnce(outgoingNodes2)
.mockReturnValueOnce(outgoingNodes3);
const result = getDownstreamNodes('4');
expect(result).toEqual([...outgoingNodes1, ...outgoingNodes2, ...outgoingNodes3]);
});
it('should handle circular references in downstream nodes', () => {
const outgoingNodes1 = [{ id: '1', position: { x: 0, y: 100 } }];
const outgoingNodes2 = [
{ id: '1', position: { x: 0, y: 100 } },
{ id: '2', position: { x: 0, y: 300 } },
{ id: '3', position: { x: 0, y: 400 } },
];
const outgoingNodes3 = [
{ id: '1', position: { x: 0, y: 100 } },
{ id: '4', position: { x: 0, y: 600 } },
];
mockGetOutgoers
.mockReturnValueOnce(outgoingNodes1)
.mockReturnValueOnce(outgoingNodes2)
.mockReturnValueOnce(outgoingNodes3);
const result = getDownstreamNodes('5');
expect(result).toEqual([
outgoingNodes1[0],
outgoingNodes2[1],
outgoingNodes2[2],
outgoingNodes3[1],
]);
});
});
describe('getUpstreamNodes', () => {
it('should return sorted upstream nodes by vertical position - one level', () => {
const incomingNodes = [{ id: '2', position: { x: 0, y: 100 } }];
mockGetIncomers.mockReturnValue(incomingNodes);
const result = getUpstreamNodes('1');
expect(result).toEqual([{ id: '2', position: { x: 0, y: 100 } }]);
});
it('should return sorted upstream nodes by vertical position - multiple levels', () => {
const incomingNodes1 = [{ id: '1', position: { x: 0, y: 100 } }];
const incomingNodes2 = [{ id: '2', position: { x: 0, y: 200 } }];
const incomingNodes3 = [{ id: '3', position: { x: 0, y: 300 } }];
mockGetIncomers
.mockReturnValueOnce(incomingNodes1)
.mockReturnValueOnce(incomingNodes2)
.mockReturnValueOnce(incomingNodes3);
const result = getUpstreamNodes('4');
expect(result).toEqual([...incomingNodes1, ...incomingNodes2, ...incomingNodes3]);
});
it('should handle circular references in upstream nodes', () => {
const incomingNodes1 = [{ id: '1', position: { x: 0, y: 100 } }];
const incomingNodes2 = [
{ id: '1', position: { x: 0, y: 100 } },
{ id: '2', position: { x: 0, y: 300 } },
{ id: '3', position: { x: 0, y: 400 } },
];
const incomingNodes3 = [
{ id: '1', position: { x: 0, y: 100 } },
{ id: '4', position: { x: 0, y: 600 } },
];
mockGetIncomers
.mockReturnValueOnce(incomingNodes1)
.mockReturnValueOnce(incomingNodes2)
.mockReturnValueOnce(incomingNodes3);
const result = getUpstreamNodes('5');
expect(result).toEqual([
incomingNodes1[0],
incomingNodes2[1],
incomingNodes2[2],
incomingNodes3[1],
]);
});
});
});

View file

@ -0,0 +1,68 @@
import type { CanvasNode } from '@/types';
import type { VueFlowStore } from '@vue-flow/core';
export function useCanvasTraversal({ getIncomers, getOutgoers }: VueFlowStore) {
function sortNodesByVerticalPosition(nodes: CanvasNode[]) {
return nodes.sort((a, b) => a.position.y - b.position.y);
}
function getIncomingNodes(id: string) {
return sortNodesByVerticalPosition(getIncomers(id));
}
function getOutgoingNodes(id: string) {
return sortNodesByVerticalPosition(getOutgoers(id));
}
function getSiblingNodes(id: string) {
const incomingSiblings = getIncomers(id).flatMap((incomingNode) =>
getOutgoers(incomingNode.id),
);
const outgoingSiblings = getOutgoers(id).flatMap((outgoingNode) =>
getIncomers(outgoingNode.id),
);
return sortNodesByVerticalPosition(
[...incomingSiblings, ...outgoingSiblings].filter(
(node, index, nodes) => nodes.findIndex((n) => n.id === node.id) === index,
),
);
}
function getDownstreamNodes(id: string, visited: string[] = []): CanvasNode[] {
if (visited.includes(id)) {
return [];
}
visited.push(id);
const downstreamNodes = getOutgoers(id);
return [
...downstreamNodes,
...downstreamNodes.flatMap((node) => getDownstreamNodes(node.id, visited)),
].filter((node, index, nodes) => nodes.findIndex((n) => n.id === node.id) === index);
}
function getUpstreamNodes(id: string, visited: string[] = []): CanvasNode[] {
if (visited.includes(id)) {
return [];
}
visited.push(id);
const upstreamNodes = getIncomers(id);
return [
...upstreamNodes,
...upstreamNodes.flatMap((node) => getUpstreamNodes(node.id, visited)),
].filter((node, index, nodes) => nodes.findIndex((n) => n.id === node.id) === index);
}
return {
sortNodesByVerticalPosition,
getIncomingNodes,
getOutgoingNodes,
getSiblingNodes,
getDownstreamNodes,
getUpstreamNodes,
};
}

View file

@ -446,13 +446,14 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
runDataExecutedStartData: iRunExecutionData.startData,
resultDataError: iRunExecutionData.resultData.error,
});
} else if (receivedData.type === 'executionWaiting') {
// Nothing to do
} else if (receivedData.type === 'executionStarted') {
// Nothing to do
} else if (receivedData.type === 'nodeExecuteAfter') {
// A node finished to execute. Add its data
const pushData = receivedData.data;
workflowsStore.addNodeExecutionData(pushData);
workflowsStore.removeExecutingNode(pushData.nodeName);
workflowsStore.updateNodeExecutionData(pushData);
void assistantStore.onNodeExecution(pushData);
} else if (receivedData.type === 'nodeExecuteBefore') {
// A node started to be executed. Set it as executing.

View file

@ -6,7 +6,7 @@ import { ExpressionError, type IPinData, type IRunData, type Workflow } from 'n8
import { useRootStore } from '@/stores/root.store';
import { useRunWorkflow } from '@/composables/useRunWorkflow';
import type { IExecutionResponse, IStartRunData, IWorkflowData } from '@/Interface';
import type { IStartRunData, IWorkflowData } from '@/Interface';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { useUIStore } from '@/stores/ui.store';
import { useWorkflowHelpers } from '@/composables/useWorkflowHelpers';
@ -321,77 +321,4 @@ describe('useRunWorkflow({ router })', () => {
expect(result.runData).toEqual(undefined);
});
});
describe('useRunWorkflow({ router }) - runWorkflowResolvePending', () => {
let uiStore: ReturnType<typeof useUIStore>;
let workflowsStore: ReturnType<typeof useWorkflowsStore>;
let router: ReturnType<typeof useRouter>;
beforeAll(() => {
const pinia = createTestingPinia({ stubActions: false });
setActivePinia(pinia);
rootStore = useRootStore();
uiStore = useUIStore();
workflowsStore = useWorkflowsStore();
router = useRouter();
workflowHelpers = useWorkflowHelpers({ router });
});
beforeEach(() => {
uiStore.activeActions = [];
vi.mocked(workflowsStore).runWorkflow.mockReset();
});
it('should resolve when runWorkflow finished', async () => {
const { runWorkflowResolvePending } = useRunWorkflow({ router });
const mockExecutionResponse = { executionId: '123' };
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).allNodes = [];
vi.mocked(workflowsStore).getExecution.mockResolvedValue({
finished: true,
workflowData: { nodes: [] },
} as unknown as IExecutionResponse);
vi.mocked(workflowsStore).workflowExecutionData = {
id: '123',
} as unknown as IExecutionResponse;
const result = await runWorkflowResolvePending({});
expect(result).toEqual(mockExecutionResponse);
});
it('should return when workflowExecutionData is null', async () => {
const { runWorkflowResolvePending } = useRunWorkflow({ router });
const mockExecutionResponse = { executionId: '123' };
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).allNodes = [];
vi.mocked(workflowsStore).getExecution.mockResolvedValue({
finished: true,
} as unknown as IExecutionResponse);
vi.mocked(workflowsStore).workflowExecutionData = null;
const result = await runWorkflowResolvePending({});
expect(result).toEqual(mockExecutionResponse);
});
it('should handle workflow execution error properly', async () => {
const { runWorkflowResolvePending } = useRunWorkflow({ router });
const mockExecutionResponse = { executionId: '123' };
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).allNodes = [];
vi.mocked(workflowsStore).getExecution.mockResolvedValue({
finished: false,
status: 'error',
} as unknown as IExecutionResponse);
await runWorkflowResolvePending({});
expect(workflowsStore.setWorkflowExecutionData).toHaveBeenCalled();
expect(workflowsStore.workflowExecutionData).toBe(null);
});
});
});

View file

@ -17,17 +17,17 @@ import type {
IDataObject,
} from 'n8n-workflow';
import { FORM_NODE_TYPE, NodeConnectionType } from 'n8n-workflow';
import { NodeConnectionType } from 'n8n-workflow';
import { useToast } from '@/composables/useToast';
import { useNodeHelpers } from '@/composables/useNodeHelpers';
import { CHAT_TRIGGER_NODE_TYPE, FORM_TRIGGER_NODE_TYPE, WAIT_NODE_TYPE } from '@/constants';
import { CHAT_TRIGGER_NODE_TYPE } from '@/constants';
import { useRootStore } from '@/stores/root.store';
import { useUIStore } from '@/stores/ui.store';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { displayForm, openPopUpWindow } from '@/utils/executionUtils';
import { displayForm } from '@/utils/executionUtils';
import { useExternalHooks } from '@/composables/useExternalHooks';
import { useWorkflowHelpers } from '@/composables/useWorkflowHelpers';
import type { useRouter } from 'vue-router';
@ -37,8 +37,6 @@ import { get } from 'lodash-es';
import { useExecutionsStore } from '@/stores/executions.store';
import { useLocalStorage } from '@vueuse/core';
const FORM_RELOAD = 'n8n_redirect_to_next_form_test_page';
export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof useRouter> }) {
const nodeHelpers = useNodeHelpers();
const workflowHelpers = useWorkflowHelpers({ router: useRunWorkflowOpts.router });
@ -303,152 +301,6 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
}
}
function getFormResumeUrl(node: INode, executionId: string) {
const { webhookSuffix } = (node.parameters.options ?? {}) as IDataObject;
const suffix = webhookSuffix && typeof webhookSuffix !== 'object' ? `/${webhookSuffix}` : '';
const testUrl = `${rootStore.formWaitingUrl}/${executionId}${suffix}`;
return testUrl;
}
async function runWorkflowResolvePending(options: {
destinationNode?: string;
triggerNode?: string;
nodeData?: ITaskData;
source?: string;
}): Promise<IExecutionPushResponse | undefined> {
let runWorkflowApiResponse = await runWorkflow(options);
let { executionId } = runWorkflowApiResponse || {};
const MAX_DELAY = 3000;
const waitForWebhook = async (): Promise<string> => {
return await new Promise<string>((resolve) => {
let delay = 300;
let timeoutId: NodeJS.Timeout | null = null;
const checkWebhook = async () => {
await useExternalHooks().run('workflowRun.runWorkflow', {
nodeName: options.destinationNode,
source: options.source,
});
if (workflowsStore.activeExecutionId) {
executionId = workflowsStore.activeExecutionId;
runWorkflowApiResponse = { executionId };
if (timeoutId) clearTimeout(timeoutId);
resolve(executionId);
}
delay = Math.min(delay * 1.1, MAX_DELAY);
timeoutId = setTimeout(checkWebhook, delay);
};
timeoutId = setTimeout(checkWebhook, delay);
});
};
if (!executionId) executionId = await waitForWebhook();
let isFormShown =
!options.destinationNode &&
workflowsStore.allNodes.some(
(node) =>
node.type === FORM_TRIGGER_NODE_TYPE && !workflowsStore?.pinnedWorkflowData?.[node.name],
);
const resolveWaitingNodesData = async (): Promise<void> => {
return await new Promise<void>((resolve) => {
let delay = 300;
let timeoutId: NodeJS.Timeout | null = null;
const processExecution = async () => {
await useExternalHooks().run('workflowRun.runWorkflow', {
nodeName: options.destinationNode,
source: options.source,
});
const execution = await workflowsStore.getExecution((executionId as string) || '');
localStorage.removeItem(FORM_RELOAD);
if (!execution || workflowsStore.workflowExecutionData === null) {
uiStore.removeActiveAction('workflowRunning');
if (timeoutId) clearTimeout(timeoutId);
resolve();
return;
}
const { lastNodeExecuted } = execution.data?.resultData || {};
const lastNode = execution.workflowData.nodes.find((node) => {
return node.name === lastNodeExecuted;
});
if (
execution.finished ||
['error', 'canceled', 'crashed', 'success'].includes(execution.status)
) {
workflowsStore.setWorkflowExecutionData(execution);
uiStore.removeActiveAction('workflowRunning');
workflowsStore.activeExecutionId = null;
if (timeoutId) clearTimeout(timeoutId);
resolve();
return;
}
if (execution.status === 'waiting' && execution.data?.waitTill) {
delete execution.data.resultData.runData[
execution.data.resultData.lastNodeExecuted as string
];
workflowsStore.setWorkflowExecutionRunData(execution.data);
if (
lastNode &&
(lastNode.type === FORM_NODE_TYPE ||
(lastNode.type === WAIT_NODE_TYPE && lastNode.parameters.resume === 'form'))
) {
let testUrl = getFormResumeUrl(lastNode, executionId as string);
if (isFormShown) {
localStorage.setItem(FORM_RELOAD, testUrl);
} else {
if (options.destinationNode) {
// Check if the form trigger has starting data
// if not do not show next form as trigger would redirect to page
// otherwise there would be duplicate popup
const formTrigger = execution?.workflowData.nodes.find((node) => {
return node.type === FORM_TRIGGER_NODE_TYPE;
});
const runNodeFilter = execution?.data?.startData?.runNodeFilter || [];
if (formTrigger && !runNodeFilter.includes(formTrigger.name)) {
isFormShown = true;
}
}
if (!isFormShown) {
if (lastNode.type === FORM_NODE_TYPE) {
testUrl = `${rootStore.formWaitingUrl}/${executionId}`;
} else {
testUrl = getFormResumeUrl(lastNode, executionId as string);
}
isFormShown = true;
if (testUrl) openPopUpWindow(testUrl);
}
}
}
}
delay = Math.min(delay * 1.1, MAX_DELAY);
timeoutId = setTimeout(processExecution, delay);
};
timeoutId = setTimeout(processExecution, delay);
});
};
await resolveWaitingNodesData();
return runWorkflowApiResponse;
}
function consolidateRunDataAndStartNodes(
directParentNodes: string[],
runData: IRunData | null,
@ -514,10 +366,6 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
if (execution === undefined) {
// execution finished but was not saved (e.g. due to low connectivity)
workflowsStore.executingNode.length = 0;
uiStore.removeActiveAction('workflowRunning');
workflowHelpers.setDocumentTitle(workflowsStore.workflowName, 'IDLE');
toast.showMessage({
title: i18n.baseText('nodeView.showMessage.stopExecutionCatch.unsaved.title'),
message: i18n.baseText('nodeView.showMessage.stopExecutionCatch.unsaved.message'),
@ -532,10 +380,7 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
startedAt: execution.startedAt,
stoppedAt: execution.stoppedAt,
} as IRun;
workflowHelpers.setDocumentTitle(execution.workflowData.name, 'IDLE');
workflowsStore.executingNode.length = 0;
workflowsStore.setWorkflowExecutionData(executedData as IExecutionResponse);
uiStore.removeActiveAction('workflowRunning');
toast.showMessage({
title: i18n.baseText('nodeView.showMessage.stopExecutionCatch.title'),
message: i18n.baseText('nodeView.showMessage.stopExecutionCatch.message'),
@ -544,6 +389,8 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
} else {
toast.showError(error, i18n.baseText('nodeView.showError.stopExecution.title'));
}
} finally {
workflowsStore.markExecutionAsStopped();
}
}
@ -559,7 +406,6 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
return {
consolidateRunDataAndStartNodes,
runWorkflow,
runWorkflowResolvePending,
runWorkflowApi,
stopCurrentExecution,
stopWaitingForWebhook,

View file

@ -19,6 +19,7 @@ import { useUIStore } from '@/stores/ui.store';
import type { PushPayload } from '@n8n/api-types';
import { flushPromises } from '@vue/test-utils';
import { useNDVStore } from '@/stores/ndv.store';
import { mock } from 'vitest-mock-extended';
vi.mock('@/stores/ndv.store', () => ({
useNDVStore: vi.fn(() => ({
@ -523,20 +524,24 @@ describe('useWorkflowsStore', () => {
});
});
describe('addNodeExecutionData', () => {
const { successEvent, errorEvent, executionReponse } = generateMockExecutionEvents();
it('should throw error if not initalized', () => {
expect(() => workflowsStore.addNodeExecutionData(successEvent)).toThrowError();
describe('updateNodeExecutionData', () => {
const { successEvent, errorEvent, executionResponse } = generateMockExecutionEvents();
it('should throw error if not initialized', () => {
expect(() => workflowsStore.updateNodeExecutionData(successEvent)).toThrowError();
});
it('should add node success run data', () => {
workflowsStore.setWorkflowExecutionData(executionReponse);
workflowsStore.setWorkflowExecutionData(executionResponse);
workflowsStore.nodesByName[successEvent.nodeName] = mock<INodeUi>({
type: 'n8n-nodes-base.manualTrigger',
});
// ACT
workflowsStore.addNodeExecutionData(successEvent);
workflowsStore.updateNodeExecutionData(successEvent);
expect(workflowsStore.workflowExecutionData).toEqual({
...executionReponse,
...executionResponse,
data: {
resultData: {
runData: {
@ -548,7 +553,7 @@ describe('useWorkflowsStore', () => {
});
it('should add node error event and track errored executions', async () => {
workflowsStore.setWorkflowExecutionData(executionReponse);
workflowsStore.setWorkflowExecutionData(executionResponse);
workflowsStore.addNode({
parameters: {},
id: '554c7ff4-7ee2-407c-8931-e34234c5056a',
@ -561,11 +566,11 @@ describe('useWorkflowsStore', () => {
getNodeType.mockReturnValue(getMockEditFieldsNode());
// ACT
workflowsStore.addNodeExecutionData(errorEvent);
workflowsStore.updateNodeExecutionData(errorEvent);
await flushPromises();
expect(workflowsStore.workflowExecutionData).toEqual({
...executionReponse,
...executionResponse,
data: {
resultData: {
runData: {
@ -636,7 +641,7 @@ function getMockEditFieldsNode() {
}
function generateMockExecutionEvents() {
const executionReponse: IExecutionResponse = {
const executionResponse: IExecutionResponse = {
id: '1',
workflowData: {
id: '1',
@ -737,5 +742,5 @@ function generateMockExecutionEvents() {
},
};
return { executionReponse, errorEvent, successEvent };
return { executionResponse, errorEvent, successEvent };
}

View file

@ -83,6 +83,7 @@ import { TelemetryHelpers } from 'n8n-workflow';
import { useWorkflowHelpers } from '@/composables/useWorkflowHelpers';
import { useRouter } from 'vue-router';
import { useSettingsStore } from './settings.store';
import { openPopUpWindow } from '@/utils/executionUtils';
const defaults: Omit<IWorkflowDb, 'id'> & { settings: NonNullable<IWorkflowDb['settings']> } = {
name: '',
@ -114,6 +115,8 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
const router = useRouter();
const workflowHelpers = useWorkflowHelpers({ router });
const settingsStore = useSettingsStore();
const rootStore = useRootStore();
// -1 means the backend chooses the default
// 0 is the old flow
// 1 is the new flow
@ -137,6 +140,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
const chatMessages = ref<string[]>([]);
const isChatPanelOpen = ref(false);
const isLogsPanelOpen = ref(false);
const formPopupWindow = ref<Window | null>(null);
const workflowName = computed(() => workflow.value.name);
@ -453,14 +457,12 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
async function getWorkflowFromUrl(url: string): Promise<IWorkflowDb> {
const rootStore = useRootStore();
return await makeRestApiRequest(rootStore.restApiContext, 'GET', '/workflows/from-url', {
url,
});
}
async function getActivationError(id: string): Promise<string | undefined> {
const rootStore = useRootStore();
return await makeRestApiRequest(
rootStore.restApiContext,
'GET',
@ -469,8 +471,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
async function fetchAllWorkflows(projectId?: string): Promise<IWorkflowDb[]> {
const rootStore = useRootStore();
const filter = {
projectId,
};
@ -484,7 +484,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
async function fetchWorkflow(id: string): Promise<IWorkflowDb> {
const rootStore = useRootStore();
const workflowData = await workflowsApi.getWorkflow(rootStore.restApiContext, id);
addWorkflow(workflowData);
return workflowData;
@ -497,8 +496,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
settings: { ...defaults.settings },
};
try {
const rootStore = useRootStore();
const data: IDataObject = {
name,
projectId,
@ -632,7 +629,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
async function deleteWorkflow(id: string) {
const rootStore = useRootStore();
await makeRestApiRequest(rootStore.restApiContext, 'DELETE', `/workflows/${id}`);
const { [id]: deletedWorkflow, ...workflows } = workflowsById.value;
workflowsById.value = workflows;
@ -676,7 +672,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
async function fetchActiveWorkflows(): Promise<string[]> {
const rootStore = useRootStore();
const data = await workflowsApi.getActiveWorkflows(rootStore.restApiContext);
activeWorkflows.value = data;
return data;
@ -696,7 +691,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
let newName = `${currentWorkflowName}${DUPLICATE_POSTFFIX}`;
try {
const rootStore = useRootStore();
const newWorkflow = await workflowsApi.getNewWorkflow(rootStore.restApiContext, {
name: newName,
});
@ -1276,12 +1270,24 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
}
function addNodeExecutionData(pushData: PushPayload<'nodeExecuteAfter'>): void {
function getFormResumeUrl(node: INode, executionId: string) {
const { webhookSuffix } = (node.parameters.options ?? {}) as IDataObject;
const suffix = webhookSuffix && typeof webhookSuffix !== 'object' ? `/${webhookSuffix}` : '';
const testUrl = `${rootStore.formWaitingUrl}/${executionId}${suffix}`;
return testUrl;
}
function updateNodeExecutionData(pushData: PushPayload<'nodeExecuteAfter'>): void {
if (!workflowExecutionData.value?.data) {
throw new Error('The "workflowExecutionData" is not initialized!');
}
if (workflowExecutionData.value.data.resultData.runData[pushData.nodeName] === undefined) {
const { nodeName, data, executionId } = pushData;
const isNodeWaiting = data.executionStatus === 'waiting';
const node = getNodeByName(nodeName);
if (!node) return;
if (workflowExecutionData.value.data.resultData.runData[nodeName] === undefined) {
workflowExecutionData.value = {
...workflowExecutionData.value,
data: {
@ -1290,15 +1296,38 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
...workflowExecutionData.value.data.resultData,
runData: {
...workflowExecutionData.value.data.resultData.runData,
[pushData.nodeName]: [],
[nodeName]: [],
},
},
},
};
}
workflowExecutionData.value.data!.resultData.runData[pushData.nodeName].push(pushData.data);
void trackNodeExecution(pushData);
const tasksData = workflowExecutionData.value.data!.resultData.runData[nodeName];
if (isNodeWaiting) {
tasksData.push(data);
if (
node.type === FORM_NODE_TYPE ||
(node.type === WAIT_NODE_TYPE && node.parameters.resume === 'form')
) {
const testUrl = getFormResumeUrl(node, executionId);
if (!formPopupWindow.value || formPopupWindow.value.closed) {
formPopupWindow.value = openPopUpWindow(testUrl);
} else {
formPopupWindow.value.location = testUrl;
formPopupWindow.value.focus();
}
}
} else {
if (tasksData.length && tasksData[tasksData.length - 1].executionStatus === 'waiting') {
tasksData.splice(tasksData.length - 1, 1, data);
} else {
tasksData.push(data);
}
removeExecutingNode(nodeName);
void trackNodeExecution(pushData);
}
}
function clearNodeExecutionData(nodeName: string): void {
@ -1348,12 +1377,10 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
limit,
};
}
const rootStore = useRootStore();
return await makeRestApiRequest(rootStore.restApiContext, 'GET', '/executions', sendData);
}
async function getExecution(id: string): Promise<IExecutionResponse | undefined> {
const rootStore = useRootStore();
const response = await makeRestApiRequest<IExecutionFlattedResponse | undefined>(
rootStore.restApiContext,
'GET',
@ -1367,7 +1394,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
// make sure that the new ones are not active
sendData.active = false;
const rootStore = useRootStore();
const projectStore = useProjectsStore();
if (projectStore.currentProjectId) {
@ -1387,8 +1413,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
data: IWorkflowDataUpdate,
forceSave = false,
): Promise<IWorkflowDb> {
const rootStore = useRootStore();
if (data.settings === null) {
data.settings = undefined;
}
@ -1402,8 +1426,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
async function runWorkflow(startRunData: IStartRunData): Promise<IExecutionPushResponse> {
const rootStore = useRootStore();
if (startRunData.workflowData.settings === null) {
startRunData.workflowData.settings = undefined;
}
@ -1427,7 +1449,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
async function removeTestWebhook(targetWorkflowId: string): Promise<boolean> {
const rootStore = useRootStore();
return await makeRestApiRequest(
rootStore.restApiContext,
'DELETE',
@ -1436,7 +1457,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
}
async function fetchExecutionDataById(executionId: string): Promise<IExecutionResponse | null> {
const rootStore = useRootStore();
return await workflowsApi.getExecutionData(rootStore.restApiContext, executionId);
}
@ -1459,7 +1479,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
fileName: string,
mimeType: string,
): string {
const rootStore = useRootStore();
let restUrl = rootStore.restUrl;
if (restUrl.startsWith('/')) restUrl = window.location.origin + restUrl;
const url = new URL(`${restUrl}/binary-data`);
@ -1538,6 +1557,24 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
isLogsPanelOpen.value = isOpen;
}
function markExecutionAsStopped() {
activeExecutionId.value = null;
executingNode.value.length = 0;
executionWaitingForWebhook.value = false;
uiStore.removeActiveAction('workflowRunning');
workflowHelpers.setDocumentTitle(workflowName.value, 'IDLE');
formPopupWindow.value?.close();
formPopupWindow.value = null;
const runData = workflowExecutionData.value?.data?.resultData.runData ?? {};
for (const nodeName in runData) {
runData[nodeName] = runData[nodeName].filter(
({ executionStatus }) => executionStatus === 'success',
);
}
}
return {
workflow,
usedCredentials,
@ -1651,7 +1688,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
setNodeValue,
setNodeParameters,
setLastNodeParameters,
addNodeExecutionData,
updateNodeExecutionData,
clearNodeExecutionData,
pinDataByNodeName,
activeNode,
@ -1675,5 +1712,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
removeNodeExecutionDataById,
setNodes,
setConnections,
markExecutionAsStopped,
};
});

View file

@ -1,6 +1,12 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { displayForm, openPopUpWindow, executionFilterToQueryFilter } from './executionUtils';
import {
displayForm,
openPopUpWindow,
executionFilterToQueryFilter,
waitingNodeTooltip,
} from './executionUtils';
import type { INode, IRunData, IPinData } from 'n8n-workflow';
import { type INodeUi } from '../Interface';
const FORM_TRIGGER_NODE_TYPE = 'formTrigger';
const WAIT_NODE_TYPE = 'waitNode';
@ -13,6 +19,33 @@ vi.mock('./executionUtils', async () => {
};
});
vi.mock('@/stores/root.store', () => ({
useRootStore: () => ({
formWaitingUrl: 'http://localhost:5678/form-waiting',
webhookWaitingUrl: 'http://localhost:5678/webhook-waiting',
}),
}));
vi.mock('@/stores/workflows.store', () => ({
useWorkflowsStore: () => ({
activeExecutionId: '123',
}),
}));
vi.mock('@/plugins/i18n', () => ({
i18n: {
baseText: (key: string) => {
const texts: { [key: string]: string } = {
'ndv.output.waitNodeWaiting': 'Waiting for execution to resume...',
'ndv.output.waitNodeWaitingForFormSubmission': 'Waiting for form submission: ',
'ndv.output.waitNodeWaitingForWebhook': 'Waiting for webhook call: ',
'ndv.output.sendAndWaitWaitingApproval': 'Waiting for approval...',
};
return texts[key] || key;
},
},
}));
describe('displayForm', () => {
const getTestUrlMock = vi.fn();
@ -124,3 +157,116 @@ describe('displayForm', () => {
});
});
});
describe('waitingNodeTooltip', () => {
it('should return empty string for null or undefined node', () => {
expect(waitingNodeTooltip(null)).toBe('');
expect(waitingNodeTooltip(undefined)).toBe('');
});
it('should return default waiting message for time resume types', () => {
const node: INodeUi = {
id: '1',
name: 'Wait',
type: 'n8n-nodes-base.wait',
typeVersion: 1,
position: [0, 0],
parameters: {
resume: 'timeInterval',
},
};
expect(waitingNodeTooltip(node)).toBe('Waiting for execution to resume...');
});
it('should return form submission message with URL for form resume type', () => {
const node: INodeUi = {
id: '1',
name: 'Wait',
type: 'n8n-nodes-base.wait',
typeVersion: 1,
position: [0, 0],
parameters: {
resume: 'form',
},
};
const expectedUrl = 'http://localhost:5678/form-waiting/123';
expect(waitingNodeTooltip(node)).toBe(
`Waiting for form submission: <a href="${expectedUrl}" target="_blank">${expectedUrl}</a>`,
);
});
it('should include webhook suffix in URL when provided', () => {
const node: INodeUi = {
id: '1',
name: 'Wait',
type: 'n8n-nodes-base.wait',
typeVersion: 1,
position: [0, 0],
parameters: {
resume: 'webhook',
options: {
webhookSuffix: 'test-suffix',
},
},
};
const expectedUrl = 'http://localhost:5678/webhook-waiting/123/test-suffix';
expect(waitingNodeTooltip(node)).toBe(
`Waiting for webhook call: <a href="${expectedUrl}" target="_blank">${expectedUrl}</a>`,
);
});
it('should handle form node type', () => {
const node: INodeUi = {
id: '1',
name: 'Form',
type: 'n8n-nodes-base.form',
typeVersion: 1,
position: [0, 0],
parameters: {},
};
const expectedUrl = 'http://localhost:5678/form-waiting/123';
expect(waitingNodeTooltip(node)).toBe(
`Waiting for form submission: <a href="${expectedUrl}" target="_blank">${expectedUrl}</a>`,
);
});
it('should handle send and wait operation', () => {
const node: INodeUi = {
id: '1',
name: 'SendWait',
type: 'n8n-nodes-base.sendWait',
typeVersion: 1,
position: [0, 0],
parameters: {
operation: 'sendAndWait',
},
};
expect(waitingNodeTooltip(node)).toBe('Waiting for approval...');
});
it('should ignore object-type webhook suffix', () => {
const node: INodeUi = {
id: '1',
name: 'Wait',
type: 'n8n-nodes-base.wait',
typeVersion: 1,
position: [0, 0],
parameters: {
resume: 'webhook',
options: {
webhookSuffix: { some: 'object' },
},
},
};
const expectedUrl = 'http://localhost:5678/webhook-waiting/123';
expect(waitingNodeTooltip(node)).toBe(
`Waiting for webhook call: <a href="${expectedUrl}" target="_blank">${expectedUrl}</a>`,
);
});
});

View file

@ -6,7 +6,7 @@ import {
type IPinData,
type IRunData,
} from 'n8n-workflow';
import type { ExecutionFilterType, ExecutionsQueryFilter } from '@/Interface';
import type { ExecutionFilterType, ExecutionsQueryFilter, INodeUi } from '@/Interface';
import { isEmpty } from '@/utils/typesUtils';
import { FORM_NODE_TYPE, FORM_TRIGGER_NODE_TYPE } from '../constants';
import { useWorkflowsStore } from '@/stores/workflows.store';
@ -136,18 +136,17 @@ export function displayForm({
}
}
export const waitingNodeTooltip = () => {
export const waitingNodeTooltip = (node: INodeUi | null | undefined) => {
if (!node) return '';
try {
const lastNode =
useWorkflowsStore().workflowExecutionData?.data?.executionData?.nodeExecutionStack[0]?.node;
const resume = lastNode?.parameters?.resume;
const resume = node?.parameters?.resume;
if (resume) {
if (!['webhook', 'form'].includes(resume as string)) {
return i18n.baseText('ndv.output.waitNodeWaiting');
}
const { webhookSuffix } = (lastNode.parameters.options ?? {}) as { webhookSuffix: string };
const { webhookSuffix } = (node.parameters.options ?? {}) as { webhookSuffix: string };
const suffix = webhookSuffix && typeof webhookSuffix !== 'object' ? `/${webhookSuffix}` : '';
let message = '';
@ -168,13 +167,13 @@ export const waitingNodeTooltip = () => {
}
}
if (lastNode?.type === FORM_NODE_TYPE) {
if (node?.type === FORM_NODE_TYPE) {
const message = i18n.baseText('ndv.output.waitNodeWaitingForFormSubmission');
const resumeUrl = `${useRootStore().formWaitingUrl}/${useWorkflowsStore().activeExecutionId}`;
return `${message}<a href="${resumeUrl}" target="_blank">${resumeUrl}</a>`;
}
if (lastNode?.parameters.operation === SEND_AND_WAIT_OPERATION) {
if (node?.parameters.operation === SEND_AND_WAIT_OPERATION) {
return i18n.baseText('ndv.output.sendAndWaitWaitingApproval');
}
} catch (error) {

View file

@ -154,8 +154,7 @@ const { addBeforeUnloadEventBindings, removeBeforeUnloadEventBindings } = useBef
route,
});
const { registerCustomAction, unregisterCustomAction } = useGlobalLinkActions();
const { runWorkflow, runWorkflowResolvePending, stopCurrentExecution, stopWaitingForWebhook } =
useRunWorkflow({ router });
const { runWorkflow, stopCurrentExecution, stopWaitingForWebhook } = useRunWorkflow({ router });
const {
updateNodePosition,
updateNodesPosition,
@ -1011,11 +1010,7 @@ const workflowExecutionData = computed(() => workflowsStore.workflowExecutionDat
async function onRunWorkflow() {
trackRunWorkflow();
if (!isExecutionPreview.value && workflowsStore.isWaitingExecution) {
void runWorkflowResolvePending({});
} else {
void runWorkflow({});
}
void runWorkflow({});
}
function trackRunWorkflow() {
@ -1041,11 +1036,7 @@ async function onRunWorkflowToNode(id: string) {
trackRunWorkflowToNode(node);
if (!isExecutionPreview.value && workflowsStore.isWaitingExecution) {
void runWorkflowResolvePending({ destinationNode: node.name, source: 'Node.executeNode' });
} else {
void runWorkflow({ destinationNode: node.name, source: 'Node.executeNode' });
}
void runWorkflow({ destinationNode: node.name, source: 'Node.executeNode' });
}
function trackRunWorkflowToNode(node: INodeUi) {

View file

@ -232,7 +232,7 @@ export default defineComponent({
const { callDebounced } = useDebounce();
const canvasPanning = useCanvasPanning(nodeViewRootRef, { onMouseMoveEnd });
const workflowHelpers = useWorkflowHelpers({ router });
const { runWorkflow, stopCurrentExecution, runWorkflowResolvePending } = useRunWorkflow({
const { runWorkflow, stopCurrentExecution } = useRunWorkflow({
router,
});
const { addBeforeUnloadEventBindings, removeBeforeUnloadEventBindings } = useBeforeUnload({
@ -254,7 +254,6 @@ export default defineComponent({
onMouseMoveEnd,
workflowHelpers,
runWorkflow,
runWorkflowResolvePending,
stopCurrentExecution,
callDebounced,
...useCanvasMouseSelect(),
@ -852,11 +851,7 @@ export default defineComponent({
this.$telemetry.track('User clicked execute node button', telemetryPayload);
void this.externalHooks.run('nodeView.onRunNode', telemetryPayload);
if (!this.isExecutionPreview && this.workflowsStore.isWaitingExecution) {
void this.runWorkflowResolvePending({ destinationNode: nodeName, source });
} else {
void this.runWorkflow({ destinationNode: nodeName, source });
}
void this.runWorkflow({ destinationNode: nodeName, source });
},
async onOpenChat() {
const telemetryPayload = {
@ -883,11 +878,7 @@ export default defineComponent({
void this.externalHooks.run('nodeView.onRunWorkflow', telemetryPayload);
});
if (!this.isExecutionPreview && this.workflowsStore.isWaitingExecution) {
void this.runWorkflowResolvePending({});
} else {
void this.runWorkflow({});
}
void this.runWorkflow({});
this.refreshEndpointsErrorsState();
},
@ -1758,6 +1749,8 @@ export default defineComponent({
} else {
this.showError(error, this.i18n.baseText('nodeView.showError.stopExecution.title'));
}
} finally {
this.workflowsStore.markExecutionAsStopped();
}
this.stopExecutionInProgress = false;
void this.workflowHelpers.getWorkflowDataToSave().then((workflowData) => {

View file

@ -7,7 +7,7 @@ import type {
} from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import { GOOGLE_DRIVE_FILE_URL_REGEX } from '../constants';
import { GOOGLE_DRIVE_FILE_URL_REGEX, GOOGLE_SHEETS_SHEET_URL_REGEX } from '../constants';
import { apiRequest } from './v2/transport';
import { sheetsSearch, spreadSheetsSearch } from './v2/methods/listSearch';
import { GoogleSheet } from './v2/helpers/GoogleSheet';
@ -137,15 +137,13 @@ export class GoogleSheetsTrigger implements INodeType {
type: 'string',
extractValue: {
type: 'regex',
regex:
'https:\\/\\/docs\\.google\\.com/spreadsheets\\/d\\/[0-9a-zA-Z\\-_]+\\/edit\\#gid=([0-9]+)',
regex: GOOGLE_SHEETS_SHEET_URL_REGEX,
},
validation: [
{
type: 'regex',
properties: {
regex:
'https:\\/\\/docs\\.google\\.com/spreadsheets\\/d\\/[0-9a-zA-Z\\-_]+\\/edit\\#gid=([0-9]+)',
regex: GOOGLE_SHEETS_SHEET_URL_REGEX,
errorMessage: 'Not a valid Sheet URL',
},
},

View file

@ -1,9 +1,16 @@
import type { IExecuteFunctions, INode, ResourceMapperField } from 'n8n-workflow';
import {
NodeOperationError,
type IExecuteFunctions,
type INode,
type ResourceMapperField,
} from 'n8n-workflow';
import { GoogleSheet } from '../../../v2/helpers/GoogleSheet';
import {
addRowNumber,
autoMapInputData,
checkForSchemaChanges,
getSpreadsheetId,
prepareSheetData,
removeEmptyColumns,
removeEmptyRows,
@ -11,6 +18,8 @@ import {
trimToFirstEmptyRow,
} from '../../../v2/helpers/GoogleSheets.utils';
import { GOOGLE_SHEETS_SHEET_URL_REGEX } from '../../../../constants';
describe('Test Google Sheets, addRowNumber', () => {
it('should add row nomber', () => {
const data = [
@ -444,3 +453,73 @@ describe('Test Google Sheets, checkForSchemaChanges', () => {
).toThrow("Column names were updated after the node's setup");
});
});
describe('Test Google Sheets, getSpreadsheetId', () => {
let mockNode: INode;
beforeEach(() => {
mockNode = { name: 'Google Sheets' } as INode;
jest.clearAllMocks();
});
it('should throw an error if value is empty', () => {
expect(() => getSpreadsheetId(mockNode, 'url', '')).toThrow(NodeOperationError);
});
it('should return the ID from a valid URL', () => {
const url =
'https://docs.google.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/edit#gid=0';
const result = getSpreadsheetId(mockNode, 'url', url);
expect(result).toBe('1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms');
});
it('should return an empty string for an invalid URL', () => {
const url = 'https://docs.google.com/spreadsheets/d/';
const result = getSpreadsheetId(mockNode, 'url', url);
expect(result).toBe('');
});
it('should return the value for documentIdType byId or byList', () => {
const value = '1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms';
expect(getSpreadsheetId(mockNode, 'id', value)).toBe(value);
expect(getSpreadsheetId(mockNode, 'list', value)).toBe(value);
});
});
describe('Test Google Sheets, Google Sheets Sheet URL Regex', () => {
const regex = new RegExp(GOOGLE_SHEETS_SHEET_URL_REGEX);
it('should match a valid Google Sheets URL', () => {
const urls = [
'https://docs.google.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/edit#gid=0',
'https://docs.google.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/edit#gid=123456',
'https://docs.google.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/edit?gid=654321#gid=654321',
];
for (const url of urls) {
expect(regex.test(url)).toBe(true);
}
});
it('should not match an invalid Google Sheets URL', () => {
const url = 'https://docs.google.com/spreadsheets/d/';
expect(regex.test(url)).toBe(false);
});
it('should not match a URL that does not match the pattern', () => {
const url =
'https://example.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/edit#gid=0';
expect(regex.test(url)).toBe(false);
});
it('should extract the gid from a valid Google Sheets URL', () => {
const urls = [
'https://docs.google.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/edit#gid=12345',
'https://docs.google.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/edit?gid=12345#gid=12345',
];
for (const url of urls) {
const match = url.match(regex);
expect(match).not.toBeNull();
expect(match?.[1]).toBe('12345');
}
});
});

View file

@ -1,5 +1,5 @@
import type { INodeProperties } from 'n8n-workflow';
import { GOOGLE_DRIVE_FILE_URL_REGEX } from '../../../../constants';
import { GOOGLE_DRIVE_FILE_URL_REGEX, GOOGLE_SHEETS_SHEET_URL_REGEX } from '../../../../constants';
import * as append from './append.operation';
import * as appendOrUpdate from './appendOrUpdate.operation';
import * as clear from './clear.operation';
@ -156,15 +156,13 @@ export const descriptions: INodeProperties[] = [
type: 'string',
extractValue: {
type: 'regex',
regex:
'https:\\/\\/docs\\.google.com\\/spreadsheets\\/d\\/[0-9a-zA-Z\\-_]+.*\\#gid=([0-9]+)',
regex: GOOGLE_SHEETS_SHEET_URL_REGEX,
},
validation: [
{
type: 'regex',
properties: {
regex:
'https:\\/\\/docs\\.google.com\\/spreadsheets\\/d\\/[0-9a-zA-Z\\-_]+.*\\#gid=([0-9]+)',
regex: GOOGLE_SHEETS_SHEET_URL_REGEX,
errorMessage: 'Not a valid Sheet URL',
},
},

View file

@ -3,3 +3,6 @@ export const GOOGLE_DRIVE_FILE_URL_REGEX =
export const GOOGLE_DRIVE_FOLDER_URL_REGEX =
'https:\\/\\/drive\\.google\\.com(?:\\/.*|)\\/folders\\/([0-9a-zA-Z\\-_]+)(?:\\/.*|)';
export const GOOGLE_SHEETS_SHEET_URL_REGEX =
'https:\\/\\/docs\\.google\\.com\\/spreadsheets\\/d\\/[0-9a-zA-Z\\-_]+.*\\#gid=([0-9]+)';