Merge branch 'master' into node-1608-credential-parameters-tech-debt-project

This commit is contained in:
Elias Meire 2024-10-10 09:51:26 +02:00
commit c3de63fc68
No known key found for this signature in database
226 changed files with 7845 additions and 4819 deletions

View file

@ -1,3 +1,50 @@
# [1.63.0](https://github.com/n8n-io/n8n/compare/n8n@1.62.1...n8n@1.63.0) (2024-10-09)
### Bug Fixes
* **Convert to File Node:** Convert to ICS start date defaults to now ([#11114](https://github.com/n8n-io/n8n/issues/11114)) ([1146c4e](https://github.com/n8n-io/n8n/commit/1146c4e98d8c85c15ac67fa1c3bfb731234531e3))
* **core:** Allow loading nodes from multiple custom directories ([#11130](https://github.com/n8n-io/n8n/issues/11130)) ([1b84b0e](https://github.com/n8n-io/n8n/commit/1b84b0e5e7485d9f99d61a8ae3df49efadca0745))
* **core:** Always set `startedAt` when executions start running ([#11098](https://github.com/n8n-io/n8n/issues/11098)) ([722f4a8](https://github.com/n8n-io/n8n/commit/722f4a8b771058800b992a482ad5f644b650960d))
* **core:** Fix AI nodes not working with new partial execution flow ([#11055](https://github.com/n8n-io/n8n/issues/11055)) ([0eee5df](https://github.com/n8n-io/n8n/commit/0eee5dfd597817819dbe0463a63f671fde53432f))
* **core:** Print errors that happen before the execution starts on the worker instead of just on the main instance ([#11099](https://github.com/n8n-io/n8n/issues/11099)) ([1d14557](https://github.com/n8n-io/n8n/commit/1d145574611661ecd9ab1a39d815c0ea915b9a1c))
* **core:** Separate error handlers for main and worker ([#11091](https://github.com/n8n-io/n8n/issues/11091)) ([bb59cc7](https://github.com/n8n-io/n8n/commit/bb59cc71acc9e494e54abc8402d58db39e5a664e))
* **editor:** Shorten overflowing Node Label in InputLabels on hover and focus ([#11110](https://github.com/n8n-io/n8n/issues/11110)) ([87a0b68](https://github.com/n8n-io/n8n/commit/87a0b68f9009c1c776d937c6ca62096e88c95ed6))
* **editor:** Add safety to prevent undefined errors ([#11104](https://github.com/n8n-io/n8n/issues/11104)) ([565b117](https://github.com/n8n-io/n8n/commit/565b117a52f8eac9202a1a62c43daf78b293dcf8))
* **editor:** Fix design system form element sizing ([#11040](https://github.com/n8n-io/n8n/issues/11040)) ([67c3453](https://github.com/n8n-io/n8n/commit/67c3453885bc619fedc8338a6dd0d8d66dead931))
* **editor:** Fix getInitials when Intl.Segmenter is not supported ([#11103](https://github.com/n8n-io/n8n/issues/11103)) ([7e8955b](https://github.com/n8n-io/n8n/commit/7e8955b322b1d2c84c0f479a5977484d8d5e3135))
* **editor:** Fix schema view in AI tools ([#11089](https://github.com/n8n-io/n8n/issues/11089)) ([09cfdbd](https://github.com/n8n-io/n8n/commit/09cfdbd1817eba46c935308880fe9f95ded252b0))
* **editor:** Respect tag querystring filter when listing workflows ([#11029](https://github.com/n8n-io/n8n/issues/11029)) ([59c5ff6](https://github.com/n8n-io/n8n/commit/59c5ff61354302562ba5a2340c66811afdd1523b))
* **editor:** Show previous nodes autocomplete in AI tool nodes ([#11111](https://github.com/n8n-io/n8n/issues/11111)) ([8566b3a](https://github.com/n8n-io/n8n/commit/8566b3a99939f45ac263830eee30d0d4ade9305c))
* **editor:** Update Usage page for Community+ edition ([#11074](https://github.com/n8n-io/n8n/issues/11074)) ([3974981](https://github.com/n8n-io/n8n/commit/3974981ea5c67f6f2bbb90a96b405d9d0cfa21af))
* Fix transaction handling for 'revert' command ([#11145](https://github.com/n8n-io/n8n/issues/11145)) ([a782336](https://github.com/n8n-io/n8n/commit/a7823367f13c3dba0c339eaafaad0199bd524b13))
* Forbid access to files outside source control work directory ([#11152](https://github.com/n8n-io/n8n/issues/11152)) ([606eedb](https://github.com/n8n-io/n8n/commit/606eedbf1b302e153bd13b7cef80847711e3a9ee))
* **Gitlab Node:** Author name and email not being set ([#11077](https://github.com/n8n-io/n8n/issues/11077)) ([fce1233](https://github.com/n8n-io/n8n/commit/fce1233b58624d502c9c68f4b32a4bb7d76f1814))
* Incorrect error message on calling wrong webhook method ([#11093](https://github.com/n8n-io/n8n/issues/11093)) ([d974b01](https://github.com/n8n-io/n8n/commit/d974b015d030c608158ff0c3fa3b7f4cbb8eadd3))
* **n8n Form Trigger Node:** When clicking on a multiple choice label, the wrong one is selected ([#11059](https://github.com/n8n-io/n8n/issues/11059)) ([948edd1](https://github.com/n8n-io/n8n/commit/948edd1a047cf3dbddb3b0e9ec5de4bac3e97b9f))
* **NASA Node:** Astronomy-Picture-Of-The-Day fails when it's a YouTube video ([#11046](https://github.com/n8n-io/n8n/issues/11046)) ([c70969d](https://github.com/n8n-io/n8n/commit/c70969da2bcabeb33394073a69ccef208311461b))
* **Postgres PGVector Store Node:** Fix filtering in retriever mode ([#11075](https://github.com/n8n-io/n8n/issues/11075)) ([dbd2ae1](https://github.com/n8n-io/n8n/commit/dbd2ae199506a24c2df4c983111a56f2adf63eee))
* Show result of waiting execution on canvas after execution completes ([#10815](https://github.com/n8n-io/n8n/issues/10815)) ([90b4bfc](https://github.com/n8n-io/n8n/commit/90b4bfc472ef132d2280b175ae7410dfb8e549b2))
* **Slack Node:** User id not sent correctly to API when updating user profile ([#11153](https://github.com/n8n-io/n8n/issues/11153)) ([ed9e61c](https://github.com/n8n-io/n8n/commit/ed9e61c46055d8e636a70c9c175d7d4ba596dd48))
### Features
* **core:** Introduce scoped logging ([#11127](https://github.com/n8n-io/n8n/issues/11127)) ([c68782c](https://github.com/n8n-io/n8n/commit/c68782c633b7ef6253ea705c5a222d4536491fd5))
* **editor:** Add navigation dropdown component ([#11047](https://github.com/n8n-io/n8n/issues/11047)) ([e081fd1](https://github.com/n8n-io/n8n/commit/e081fd1f0b5a0700017a8dc92f013f0abdbad319))
* **editor:** Add route for create / edit / share credentials ([#11134](https://github.com/n8n-io/n8n/issues/11134)) ([5697de4](https://github.com/n8n-io/n8n/commit/5697de4429c5d94f25ce1bd14c84fb4266ea47a7))
* **editor:** Community+ enrollment ([#10776](https://github.com/n8n-io/n8n/issues/10776)) ([92cf860](https://github.com/n8n-io/n8n/commit/92cf860f9f2994442facfddc758bc60f5cbec520))
* Human in the loop ([#10675](https://github.com/n8n-io/n8n/issues/10675)) ([41228b4](https://github.com/n8n-io/n8n/commit/41228b472de11affc8cd0821284427c2c9e8b421))
* **OpenAI Node:** Allow specifying thread ID for Assistant -> Message operation ([#11080](https://github.com/n8n-io/n8n/issues/11080)) ([6a2f9e7](https://github.com/n8n-io/n8n/commit/6a2f9e72959fb0e89006b69c31fbcee1ead1cde9))
* Opt in to additional features on community for existing users ([#11166](https://github.com/n8n-io/n8n/issues/11166)) ([c2adfc8](https://github.com/n8n-io/n8n/commit/c2adfc85451c5103eaad068f882066fd36c4aebe))
### Performance Improvements
* **core:** Optimize worker healthchecks ([#11092](https://github.com/n8n-io/n8n/issues/11092)) ([19fb728](https://github.com/n8n-io/n8n/commit/19fb728da0839c57603e55da4e407715e6c5b081))
## [1.62.1](https://github.com/n8n-io/n8n/compare/n8n@1.61.0...n8n@1.62.1) (2024-10-02)

View file

@ -78,7 +78,7 @@ describe('AI Assistant::enabled', () => {
});
it('should start chat session from node error view', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/simple_message_response.json',
}).as('chatRequest');
@ -96,7 +96,7 @@ describe('AI Assistant::enabled', () => {
});
it('should render chat input correctly', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/simple_message_response.json',
}).as('chatRequest');
@ -129,7 +129,7 @@ describe('AI Assistant::enabled', () => {
});
it('should render and handle quick replies', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/quick_reply_message_response.json',
}).as('chatRequest');
@ -146,7 +146,7 @@ describe('AI Assistant::enabled', () => {
});
it('should show quick replies when node is executed after new suggestion', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', (req) => {
cy.intercept('POST', '/rest/ai/chat', (req) => {
req.reply((res) => {
if (['init-error-helper', 'message'].includes(req.body.payload.type)) {
res.send({ statusCode: 200, fixture: 'aiAssistant/simple_message_response.json' });
@ -177,7 +177,7 @@ describe('AI Assistant::enabled', () => {
});
it('should warn before starting a new session', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/simple_message_response.json',
}).as('chatRequest');
@ -204,11 +204,11 @@ describe('AI Assistant::enabled', () => {
});
it('should apply code diff to code node', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/code_diff_suggestion_response.json',
}).as('chatRequest');
cy.intercept('POST', '/rest/ai-assistant/chat/apply-suggestion', {
cy.intercept('POST', '/rest/ai/chat/apply-suggestion', {
statusCode: 200,
fixture: 'aiAssistant/apply_code_diff_response.json',
}).as('applySuggestion');
@ -254,7 +254,7 @@ describe('AI Assistant::enabled', () => {
});
it('should end chat session when `end_session` event is received', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/end_session_response.json',
}).as('chatRequest');
@ -268,7 +268,7 @@ describe('AI Assistant::enabled', () => {
});
it('should reset session after it ended and sidebar is closed', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', (req) => {
cy.intercept('POST', '/rest/ai/chat', (req) => {
req.reply((res) => {
if (['init-support-chat'].includes(req.body.payload.type)) {
res.send({ statusCode: 200, fixture: 'aiAssistant/simple_message_response.json' });
@ -296,7 +296,7 @@ describe('AI Assistant::enabled', () => {
});
it('Should not reset assistant session when workflow is saved', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/simple_message_response.json',
}).as('chatRequest');
@ -321,7 +321,7 @@ describe('AI Assistant Credential Help', () => {
});
it('should start credential help from node credential', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/simple_message_response.json',
}).as('chatRequest');
@ -347,7 +347,7 @@ describe('AI Assistant Credential Help', () => {
});
it('should start credential help from credential list', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/simple_message_response.json',
}).as('chatRequest');
@ -446,7 +446,7 @@ describe('General help', () => {
});
it('assistant returns code snippet', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/code_snippet_response.json',
}).as('chatRequest');

View file

@ -91,28 +91,12 @@ return []
});
describe('Ask AI', () => {
it('tab should display based on experiment', () => {
WorkflowPage.actions.visit();
cy.window().then((win) => {
win.featureFlags.override('011_ask_AI', 'control');
WorkflowPage.actions.addInitialNodeToCanvas('Manual');
WorkflowPage.actions.addNodeToCanvas('Code');
WorkflowPage.actions.openNode('Code');
cy.getByTestId('code-node-tab-ai').should('not.exist');
ndv.actions.close();
win.featureFlags.override('011_ask_AI', undefined);
WorkflowPage.actions.openNode('Code');
cy.getByTestId('code-node-tab-ai').should('not.exist');
});
});
describe('Enabled', () => {
beforeEach(() => {
cy.enableFeature('askAi');
WorkflowPage.actions.visit();
cy.window().then((win) => {
win.featureFlags.override('011_ask_AI', 'gpt3');
cy.window().then(() => {
WorkflowPage.actions.addInitialNodeToCanvas('Manual');
WorkflowPage.actions.addNodeToCanvas('Code', true, true);
});
@ -157,7 +141,7 @@ return []
cy.getByTestId('ask-ai-prompt-input').type(prompt);
cy.intercept('POST', '/rest/ask-ai', {
cy.intercept('POST', '/rest/ai/ask-ai', {
statusCode: 200,
body: {
data: {
@ -169,9 +153,7 @@ return []
cy.getByTestId('ask-ai-cta').click();
const askAiReq = cy.wait('@ask-ai');
askAiReq
.its('request.body')
.should('have.keys', ['question', 'model', 'context', 'n8nVersion']);
askAiReq.its('request.body').should('have.keys', ['question', 'context', 'forNode']);
askAiReq.its('context').should('have.keys', ['schema', 'ndvPushRef', 'pushRef']);
@ -195,7 +177,7 @@ return []
];
handledCodes.forEach(({ code, message }) => {
cy.intercept('POST', '/rest/ask-ai', {
cy.intercept('POST', '/rest/ai/ask-ai', {
statusCode: code,
status: code,
}).as('ask-ai');

View file

@ -1,6 +1,6 @@
{
"name": "n8n-monorepo",
"version": "1.62.1",
"version": "1.63.0",
"private": true,
"engines": {
"node": ">=20.15",
@ -69,8 +69,8 @@
],
"overrides": {
"@types/node": "^18.16.16",
"chokidar": "3.5.2",
"esbuild": "^0.20.2",
"chokidar": "^4.0.1",
"esbuild": "^0.21.5",
"formidable": "3.5.1",
"pug": "^3.0.3",
"semver": "^7.5.4",

View file

@ -1,6 +1,6 @@
{
"name": "@n8n/api-types",
"version": "0.3.0",
"version": "0.4.0",
"scripts": {
"clean": "rimraf dist .turbo",
"dev": "pnpm watch",

View file

@ -2,3 +2,4 @@ export { PasswordUpdateRequestDto } from './user/password-update-request.dto';
export { RoleChangeRequestDto } from './user/role-change-request.dto';
export { SettingsUpdateRequestDto } from './user/settings-update-request.dto';
export { UserUpdateRequestDto } from './user/user-update-request.dto';
export { CommunityRegisteredRequestDto } from './license/community-registered-request.dto';

View file

@ -0,0 +1,27 @@
import { CommunityRegisteredRequestDto } from '../community-registered-request.dto';
describe('CommunityRegisteredRequestDto', () => {
it('should fail validation for missing email', () => {
const invalidRequest = {};
const result = CommunityRegisteredRequestDto.safeParse(invalidRequest);
expect(result.success).toBe(false);
expect(result.error?.issues[0]).toEqual(
expect.objectContaining({ message: 'Required', path: ['email'] }),
);
});
it('should fail validation for an invalid email', () => {
const invalidRequest = {
email: 'invalid-email',
};
const result = CommunityRegisteredRequestDto.safeParse(invalidRequest);
expect(result.success).toBe(false);
expect(result.error?.issues[0]).toEqual(
expect.objectContaining({ message: 'Invalid email', path: ['email'] }),
);
});
});

View file

@ -0,0 +1,4 @@
import { z } from 'zod';
import { Z } from 'zod-class';
export class CommunityRegisteredRequestDto extends Z.class({ email: z.string().email() }) {}
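
Since the DTO is built with `zod-class`, it exposes zod's static `safeParse`, which the test above relies on. A minimal usage sketch, assuming the class is reachable through the package's root export:

```ts
import { CommunityRegisteredRequestDto } from '@n8n/api-types';

// zod's safeParse returns { success: true, data } or { success: false, error }.
const result = CommunityRegisteredRequestDto.safeParse({ email: 'user@example.com' });

if (result.success) {
	console.log('registered community email:', result.data.email);
} else {
	console.error(result.error.issues); // e.g. [{ message: 'Invalid email', path: ['email'] }]
}
```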

View file

@ -107,6 +107,9 @@ export interface FrontendSettings {
aiAssistant: {
enabled: boolean;
};
askAi: {
enabled: boolean;
};
deployment: {
type: string;
};
@ -154,9 +157,6 @@ export interface FrontendSettings {
banners: {
dismissed: string[];
};
ai: {
enabled: boolean;
};
workflowHistory: {
pruneTime: number;
licensePruneTime: number;

View file

@ -1,6 +1,6 @@
{
"name": "@n8n/n8n-benchmark",
"version": "1.6.1",
"version": "1.7.0",
"description": "Cli for running benchmark tests for n8n",
"main": "dist/index",
"scripts": {

View file

@ -7,7 +7,7 @@ services:
- ${MOCK_API_DATA_PATH}/mappings:/home/wiremock/mappings
postgres:
image: postgres:16
image: postgres:16.4
restart: always
user: root:root
environment:

View file

@ -7,7 +7,7 @@ services:
- ${MOCK_API_DATA_PATH}/mappings:/home/wiremock/mappings
redis:
image: redis:6-alpine
image: redis:6.2.14-alpine
restart: always
ports:
- 6379:6379
@ -17,7 +17,7 @@ services:
timeout: 3s
postgres:
image: postgres:16
image: postgres:16.4
restart: always
environment:
- POSTGRES_DB=n8n

View file

@ -7,7 +7,7 @@ services:
- ${MOCK_API_DATA_PATH}/mappings:/home/wiremock/mappings
redis:
image: redis:6-alpine
image: redis:6.2.14-alpine
ports:
- 6379:6379
healthcheck:
@ -16,7 +16,7 @@ services:
timeout: 3s
postgres:
image: postgres:16
image: postgres:16.4
user: root:root
restart: always
environment:

View file

@ -184,6 +184,16 @@ createChat({
- **Type**: `string[]`
- **Description**: The initial messages to be displayed in the Chat window.
### `allowFileUploads`
- **Type**: `Ref<boolean> | boolean`
- **Default**: `false`
- **Description**: Whether to allow file uploads in the chat. If set to `true`, users will be able to upload files through the chat interface.
### `allowedFilesMimeTypes`
- **Type**: `Ref<string> | string`
- **Default**: `''`
- **Description**: A comma-separated list of allowed MIME types for file uploads. Only applicable if `allowFileUploads` is set to `true`. If left empty, all file types are allowed. For example: `'image/*,application/pdf'`.
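
A minimal usage sketch combining both options; the `webhookUrl` value is a placeholder:

```ts
import '@n8n/chat/style.css';
import { createChat } from '@n8n/chat';

// Enable uploads in the chat widget, restricted to images and PDFs.
createChat({
	webhookUrl: 'https://your-n8n-instance/webhook/your-webhook-id/chat', // placeholder
	allowFileUploads: true,
	allowedFilesMimeTypes: 'image/*,application/pdf',
});
```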
## Customization
The Chat window is entirely customizable using CSS variables.

View file

@ -1,6 +1,6 @@
{
"name": "@n8n/chat",
"version": "0.27.1",
"version": "0.28.0",
"scripts": {
"dev": "pnpm run storybook",
"build": "pnpm build:vite && pnpm build:bundle",

View file

@ -1,6 +1,6 @@
{
"name": "@n8n/config",
"version": "1.12.1",
"version": "1.13.0",
"scripts": {
"clean": "rimraf dist .turbo",
"dev": "pnpm watch",

View file

@ -1,6 +1,17 @@
import { Config, Env, Nested } from '../decorators';
import { StringArray } from '../utils';
/**
* Scopes (areas of functionality) to filter logs by.
*
* `executions` -> execution lifecycle
* `license` -> license SDK
* `scaling` -> scaling mode
*/
export const LOG_SCOPES = ['executions', 'license', 'scaling'] as const;
export type LogScope = (typeof LOG_SCOPES)[number];
@Config
class FileLoggingConfig {
/**
@ -44,4 +55,19 @@ export class LoggingConfig {
@Nested
file: FileLoggingConfig;
/**
* Scopes to filter logs by. Nothing is filtered by default.
*
* Currently supported log scopes:
* - `executions`
* - `license`
* - `scaling`
*
* @example
* `N8N_LOG_SCOPES=license`
* `N8N_LOG_SCOPES=license,executions`
*/
@Env('N8N_LOG_SCOPES')
scopes: StringArray<LogScope> = [];
}
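
A hypothetical sketch of the filtering this enables; `scopedLog` below is not n8n's Logger API, and only `N8N_LOG_SCOPES` and the `LogScope` type come from this change:

```ts
import type { LogScope } from '@n8n/config';

// Hypothetical helper. Parses N8N_LOG_SCOPES as a comma-separated list,
// e.g. N8N_LOG_SCOPES=license,executions
const enabledScopes = new Set(
	(process.env.N8N_LOG_SCOPES ?? '')
		.split(',')
		.map((scope) => scope.trim())
		.filter(Boolean) as LogScope[],
);

function scopedLog(scope: LogScope, message: string): void {
	// Empty list: nothing is filtered (the documented default); otherwise only enabled scopes log.
	if (enabledScopes.size === 0 || enabledScopes.has(scope)) {
		console.log(`[${scope}] ${message}`);
	}
}

scopedLog('license', 'license SDK initialized'); // printed when the license scope is enabled or no filter is set
```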

View file

@ -2,7 +2,11 @@ import { Config, Env, Nested } from '../decorators';
@Config
class HealthConfig {
/** Whether to enable the worker health check endpoint `/healthz`. */
/**
* Whether to enable the worker health check endpoints:
* - `/healthz` (worker alive)
* - `/healthz/readiness` (worker connected to migrated database and connected to Redis)
*/
@Env('QUEUE_HEALTH_CHECK_ACTIVE')
active: boolean = false;
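
A sketch of probing both endpoints on a worker started with `QUEUE_HEALTH_CHECK_ACTIVE=true`; the host and port below are assumptions, and the port is whatever `QUEUE_HEALTH_CHECK_PORT` resolves to:

```ts
// Assumed worker address; adjust to your QUEUE_HEALTH_CHECK_PORT setting.
const base = 'http://127.0.0.1:5678';

async function probe(path: string): Promise<void> {
	const res = await fetch(`${base}${path}`);
	console.log(path, '->', res.status);
}

// /healthz: the worker process is alive.
await probe('/healthz');
// /healthz/readiness: the worker is also connected to a migrated database and to Redis.
await probe('/healthz/readiness');
```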

View file

@ -18,6 +18,9 @@ import { VersionNotificationsConfig } from './configs/version-notifications.conf
import { WorkflowsConfig } from './configs/workflows.config';
import { Config, Env, Nested } from './decorators';
export { LOG_SCOPES } from './configs/logging.config';
export type { LogScope } from './configs/logging.config';
@Config
export class GlobalConfig {
@Nested

View file

@ -241,13 +241,13 @@ describe('GlobalConfig', () => {
fileSizeMax: 16,
location: 'logs/n8n.log',
},
scopes: [],
},
};
it('should use all default values when no env variables are defined', () => {
process.env = {};
const config = Container.get(GlobalConfig);
expect(deepCopy(config)).toEqual(defaultConfig);
expect(mockFs.readFileSync).not.toHaveBeenCalled();
});

View file

@ -262,7 +262,7 @@ export class InformationExtractor implements INodeType {
}
const zodSchemaSandbox = getSandboxWithZod(this, jsonSchema, 0);
const zodSchema = (await zodSchemaSandbox.runCode()) as z.ZodSchema<object>;
const zodSchema = await zodSchemaSandbox.runCode<z.ZodSchema<object>>();
parser = OutputFixingParser.fromLLM(llm, StructuredOutputParser.fromZodSchema(zodSchema));
}

View file

@ -107,7 +107,7 @@ function getSandbox(
}
// eslint-disable-next-line @typescript-eslint/unbound-method
const sandbox = new JavaScriptSandbox(context, code, itemIndex, this.helpers, {
const sandbox = new JavaScriptSandbox(context, code, this.helpers, {
resolver: vmResolver,
});
@ -368,7 +368,7 @@ export class Code implements INodeType {
}
const sandbox = getSandbox.call(this, code.supplyData.code, { itemIndex });
const response = (await sandbox.runCode()) as Tool;
const response = await sandbox.runCode<Tool>();
return {
response: logWrapper(response, this),

View file

@ -48,7 +48,7 @@ export class N8nStructuredOutputParser<T extends z.ZodTypeAny> extends Structure
sandboxedSchema: JavaScriptSandbox,
nodeVersion: number,
): Promise<StructuredOutputParser<z.ZodType<object, z.ZodTypeDef, object>>> {
const zodSchema = (await sandboxedSchema.runCode()) as z.ZodSchema<object>;
const zodSchema = await sandboxedSchema.runCode<z.ZodSchema<object>>();
let returnSchema: z.ZodSchema<object>;
if (nodeVersion === 1) {

View file

@ -199,9 +199,9 @@ export class ToolCode implements INodeType {
let sandbox: Sandbox;
if (language === 'javaScript') {
sandbox = new JavaScriptSandbox(context, code, index, this.helpers);
sandbox = new JavaScriptSandbox(context, code, this.helpers);
} else {
sandbox = new PythonSandbox(context, code, index, this.helpers);
sandbox = new PythonSandbox(context, code, this.helpers);
}
sandbox.on(
@ -216,7 +216,7 @@ export class ToolCode implements INodeType {
const runFunction = async (query: string | IDataObject): Promise<string> => {
const sandbox = getSandbox(query, itemIndex);
return await (sandbox.runCode() as Promise<string>);
return await sandbox.runCode<string>();
};
const toolHandler = async (query: string | IDataObject): Promise<string> => {
@ -274,7 +274,7 @@ export class ToolCode implements INodeType {
: jsonParse<JSONSchema7>(inputSchema);
const zodSchemaSandbox = getSandboxWithZod(this, jsonSchema, 0);
const zodSchema = (await zodSchemaSandbox.runCode()) as DynamicZodObject;
const zodSchema = await zodSchemaSandbox.runCode<DynamicZodObject>();
tool = new DynamicStructuredTool<typeof zodSchema>({
schema: zodSchema,

View file

@ -530,7 +530,7 @@ export class ToolWorkflow implements INodeType {
: jsonParse<JSONSchema7>(inputSchema);
const zodSchemaSandbox = getSandboxWithZod(this, jsonSchema, 0);
const zodSchema = (await zodSchemaSandbox.runCode()) as DynamicZodObject;
const zodSchema = await zodSchemaSandbox.runCode<DynamicZodObject>();
tool = new DynamicStructuredTool<typeof zodSchema>({
schema: zodSchema,

View file

@ -1,6 +1,6 @@
{
"name": "@n8n/n8n-nodes-langchain",
"version": "1.62.1",
"version": "1.63.0",
"description": "",
"main": "index.js",
"scripts": {
@ -124,18 +124,20 @@
"@types/cheerio": "^0.22.15",
"@types/html-to-text": "^9.0.1",
"@types/json-schema": "^7.0.15",
"@types/pg": "^8.11.6",
"@types/temp": "^0.9.1",
"n8n-core": "workspace:*"
},
"dependencies": {
"@getzep/zep-cloud": "1.0.11",
"@aws-sdk/client-sso-oidc": "3.666.0",
"@getzep/zep-cloud": "1.0.12",
"@getzep/zep-js": "0.9.0",
"@google-ai/generativelanguage": "2.6.0",
"@google-cloud/resource-manager": "5.3.0",
"@google/generative-ai": "0.19.0",
"@huggingface/inference": "2.8.0",
"@langchain/anthropic": "0.3.1",
"@langchain/aws": "^0.1.0",
"@langchain/aws": "0.1.0",
"@langchain/cohere": "0.3.0",
"@langchain/community": "0.3.2",
"@langchain/core": "catalog:",
@ -149,23 +151,22 @@
"@langchain/qdrant": "0.1.0",
"@langchain/redis": "0.1.0",
"@langchain/textsplitters": "0.1.0",
"@mozilla/readability": "^0.5.0",
"@mozilla/readability": "0.5.0",
"@n8n/typeorm": "0.3.20-12",
"@n8n/vm2": "3.9.25",
"@pinecone-database/pinecone": "3.0.3",
"@qdrant/js-client-rest": "1.11.0",
"@supabase/supabase-js": "2.45.4",
"@types/pg": "^8.11.6",
"@xata.io/client": "0.30.0",
"@xata.io/client": "0.28.4",
"basic-auth": "catalog:",
"cheerio": "1.0.0-rc.12",
"cheerio": "1.0.0",
"cohere-ai": "7.13.2",
"d3-dsv": "2.0.0",
"epub2": "3.0.2",
"form-data": "catalog:",
"generate-schema": "2.6.0",
"html-to-text": "9.0.5",
"jsdom": "^23.0.1",
"jsdom": "23.0.1",
"json-schema-to-zod": "2.1.0",
"langchain": "0.3.2",
"lodash": "catalog:",

View file

@ -57,7 +57,6 @@ export function getSandboxWithZod(ctx: IExecuteFunctions, schema: JSONSchema7, i
const itemSchema = new Function('z', 'return (' + zodSchema + ')')(z)
return itemSchema
`,
itemIndex,
ctx.helpers,
{ resolver: vmResolver },
);

View file

@ -1,18 +1,19 @@
{
"name": "@n8n/task-runner",
"version": "1.0.1",
"version": "1.1.0",
"scripts": {
"clean": "rimraf dist .turbo",
"start": "node dist/start.js",
"dev": "pnpm build && pnpm start",
"typecheck": "tsc --noEmit",
"build": "tsc -p ./tsconfig.build.json",
"build": "tsc -p ./tsconfig.build.json && tsc-alias -p tsconfig.build.json",
"format": "biome format --write src",
"format:check": "biome ci src",
"test": "echo \"Error: no tests in this package\" && exit 0",
"test": "jest",
"test:watch": "jest --watch",
"lint": "eslint . --quiet",
"lintfix": "eslint . --fix",
"watch": "tsc -p tsconfig.build.json --watch"
"watch": "concurrently \"tsc -w -p tsconfig.build.json\" \"tsc-alias -w -p tsconfig.build.json\""
},
"main": "dist/start.js",
"module": "src/start.ts",
@ -25,5 +26,8 @@
"n8n-core": "workspace:*",
"nanoid": "^3.3.6",
"ws": "^8.18.0"
},
"devDependencies": {
"luxon": "catalog:"
}
}

View file

@ -11,7 +11,7 @@ export type AuthOpts = {
*/
export async function authenticate(opts: AuthOpts) {
try {
const authEndpoint = `http://${opts.n8nUri}/rest/runners/auth`;
const authEndpoint = `http://${opts.n8nUri}/runners/auth`;
const response = await fetch(authEndpoint, {
method: 'POST',
headers: {

View file

@ -1,147 +0,0 @@
import { getAdditionalKeys } from 'n8n-core';
import {
type INode,
type INodeType,
type ITaskDataConnections,
type IWorkflowExecuteAdditionalData,
WorkflowDataProxy,
type WorkflowParameters,
type IDataObject,
type IExecuteData,
type INodeExecutionData,
type INodeParameters,
type IRunExecutionData,
// type IWorkflowDataProxyAdditionalKeys,
Workflow,
type WorkflowExecuteMode,
} from 'n8n-workflow';
import * as a from 'node:assert';
import { runInNewContext, type Context } from 'node:vm';
import type { TaskResultData } from './runner-types';
import { type Task, TaskRunner } from './task-runner';
interface JSExecSettings {
code: string;
// For workflow data proxy
mode: WorkflowExecuteMode;
}
export interface PartialAdditionalData {
executionId?: string;
restartExecutionId?: string;
restApiUrl: string;
instanceBaseUrl: string;
formWaitingBaseUrl: string;
webhookBaseUrl: string;
webhookWaitingBaseUrl: string;
webhookTestBaseUrl: string;
currentNodeParameters?: INodeParameters;
executionTimeoutTimestamp?: number;
userId?: string;
variables: IDataObject;
}
export interface AllCodeTaskData {
workflow: Omit<WorkflowParameters, 'nodeTypes'>;
inputData: ITaskDataConnections;
node: INode;
runExecutionData: IRunExecutionData;
runIndex: number;
itemIndex: number;
activeNodeName: string;
connectionInputData: INodeExecutionData[];
siblingParameters: INodeParameters;
mode: WorkflowExecuteMode;
executeData?: IExecuteData;
defaultReturnRunIndex: number;
selfData: IDataObject;
contextNodeName: string;
additionalData: PartialAdditionalData;
}
export class JsTaskRunner extends TaskRunner {
constructor(
taskType: string,
wsUrl: string,
grantToken: string,
maxConcurrency: number,
name?: string,
) {
super(taskType, wsUrl, grantToken, maxConcurrency, name ?? 'JS Task Runner');
}
async executeTask(task: Task<JSExecSettings>): Promise<TaskResultData> {
const allData = await this.requestData<AllCodeTaskData>(task.taskId, 'all');
const settings = task.settings;
a.ok(settings, 'JS Code not sent to runner');
const workflowParams = allData.workflow;
const workflow = new Workflow({
...workflowParams,
nodeTypes: {
getByNameAndVersion() {
return undefined as unknown as INodeType;
},
getByName() {
return undefined as unknown as INodeType;
},
getKnownTypes() {
return {};
},
},
});
const dataProxy = new WorkflowDataProxy(
workflow,
allData.runExecutionData,
allData.runIndex,
allData.itemIndex,
allData.activeNodeName,
allData.connectionInputData,
allData.siblingParameters,
settings.mode,
getAdditionalKeys(
allData.additionalData as IWorkflowExecuteAdditionalData,
allData.mode,
allData.runExecutionData,
),
allData.executeData,
allData.defaultReturnRunIndex,
allData.selfData,
allData.contextNodeName,
);
const customConsole = {
log: (...args: unknown[]) => {
const logOutput = args
.map((arg) => (typeof arg === 'object' && arg !== null ? JSON.stringify(arg) : arg))
.join(' ');
console.log('[JS Code]', logOutput);
void this.makeRpcCall(task.taskId, 'logNodeOutput', [logOutput]);
},
};
const context: Context = {
require,
module: {},
console: customConsole,
...dataProxy.getDataProxy(),
...this.buildRpcCallObject(task.taskId),
};
const result = (await runInNewContext(
`module.exports = async function() {${settings.code}\n}()`,
context,
)) as TaskResultData['result'];
return {
result,
customData: allData.runExecutionData.resultData.metadata,
};
}
}

View file

@ -0,0 +1,455 @@
import { DateTime } from 'luxon';
import type { CodeExecutionMode, IDataObject } from 'n8n-workflow';
import { ValidationError } from '@/js-task-runner/errors/validation-error';
import {
JsTaskRunner,
type AllCodeTaskData,
type JSExecSettings,
} from '@/js-task-runner/js-task-runner';
import type { Task } from '@/task-runner';
import { newAllCodeTaskData, newTaskWithSettings, withPairedItem, wrapIntoJson } from './test-data';
jest.mock('ws');
describe('JsTaskRunner', () => {
const jsTaskRunner = new JsTaskRunner('taskType', 'ws://localhost', 'grantToken', 1);
const execTaskWithParams = async ({
task,
taskData,
}: {
task: Task<JSExecSettings>;
taskData: AllCodeTaskData;
}) => {
jest.spyOn(jsTaskRunner, 'requestData').mockResolvedValue(taskData);
return await jsTaskRunner.executeTask(task);
};
afterEach(() => {
jest.restoreAllMocks();
});
const executeForAllItems = async ({
code,
inputItems,
settings,
}: { code: string; inputItems: IDataObject[]; settings?: Partial<JSExecSettings> }) => {
return await execTaskWithParams({
task: newTaskWithSettings({
code,
nodeMode: 'runOnceForAllItems',
...settings,
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson)),
});
};
const executeForEachItem = async ({
code,
inputItems,
settings,
}: { code: string; inputItems: IDataObject[]; settings?: Partial<JSExecSettings> }) => {
return await execTaskWithParams({
task: newTaskWithSettings({
code,
nodeMode: 'runOnceForEachItem',
...settings,
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson)),
});
};
describe('console', () => {
test.each<[CodeExecutionMode]>([['runOnceForAllItems'], ['runOnceForEachItem']])(
'should make an rpc call for console log in %s mode',
async (nodeMode) => {
jest.spyOn(jsTaskRunner, 'makeRpcCall').mockResolvedValue(undefined);
const task = newTaskWithSettings({
code: "console.log('Hello', 'world!'); return {}",
nodeMode,
});
await execTaskWithParams({
task,
taskData: newAllCodeTaskData([wrapIntoJson({})]),
});
expect(jsTaskRunner.makeRpcCall).toHaveBeenCalledWith(task.taskId, 'logNodeOutput', [
'Hello world!',
]);
},
);
});
describe('built-in methods and variables available in the context', () => {
const inputItems = [{ a: 1 }];
const testExpressionForAllItems = async (
expression: string,
expected: IDataObject | string | number | boolean,
) => {
const needsWrapping = typeof expected !== 'object';
const outcome = await executeForAllItems({
code: needsWrapping ? `return { val: ${expression} }` : `return ${expression}`,
inputItems,
});
expect(outcome.result).toEqual([wrapIntoJson(needsWrapping ? { val: expected } : expected)]);
};
const testExpressionForEachItem = async (
expression: string,
expected: IDataObject | string | number | boolean,
) => {
const needsWrapping = typeof expected !== 'object';
const outcome = await executeForEachItem({
code: needsWrapping ? `return { val: ${expression} }` : `return ${expression}`,
inputItems,
});
expect(outcome.result).toEqual([
withPairedItem(0, wrapIntoJson(needsWrapping ? { val: expected } : expected)),
]);
};
const testGroups = {
// https://docs.n8n.io/code/builtin/current-node-input/
'current node input': [
['$input.first()', inputItems[0]],
['$input.last()', inputItems[inputItems.length - 1]],
['$input.params', { manualTriggerParam: 'empty' }],
],
// https://docs.n8n.io/code/builtin/output-other-nodes/
'output of other nodes': [
['$("Trigger").first()', inputItems[0]],
['$("Trigger").last()', inputItems[inputItems.length - 1]],
['$("Trigger").params', { manualTriggerParam: 'empty' }],
],
// https://docs.n8n.io/code/builtin/date-time/
'date and time': [
['$now', expect.any(DateTime)],
['$today', expect.any(DateTime)],
['{dt: DateTime}', { dt: expect.any(Function) }],
],
// https://docs.n8n.io/code/builtin/jmespath/
JMESPath: [['{ val: $jmespath([{ f: 1 },{ f: 2 }], "[*].f") }', { val: [1, 2] }]],
// https://docs.n8n.io/code/builtin/n8n-metadata/
'n8n metadata': [
[
'$execution',
{
id: 'exec-id',
mode: 'test',
resumeFormUrl: 'http://formWaitingBaseUrl/exec-id',
resumeUrl: 'http://webhookWaitingBaseUrl/exec-id',
customData: {
get: expect.any(Function),
getAll: expect.any(Function),
set: expect.any(Function),
setAll: expect.any(Function),
},
},
],
['$("Trigger").isExecuted', true],
['$nodeVersion', 2],
['$prevNode.name', 'Trigger'],
['$prevNode.outputIndex', 0],
['$runIndex', 0],
['{ wf: $workflow }', { wf: { active: true, id: '1', name: 'Test Workflow' } }],
['$vars', { var: 'value' }],
],
};
for (const [groupName, tests] of Object.entries(testGroups)) {
describe(`${groupName} runOnceForAllItems`, () => {
test.each(tests)(
'should have the %s available in the context',
async (expression, expected) => {
await testExpressionForAllItems(expression, expected);
},
);
});
describe(`${groupName} runOnceForEachItem`, () => {
test.each(tests)(
'should have the %s available in the context',
async (expression, expected) => {
await testExpressionForEachItem(expression, expected);
},
);
});
}
describe('$env', () => {
it('should have the env available in context when access has not been blocked', async () => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
code: 'return { val: $env.VAR1 }',
nodeMode: 'runOnceForAllItems',
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson), {
envProviderState: {
isEnvAccessBlocked: false,
isProcessAvailable: true,
env: { VAR1: 'value' },
},
}),
});
expect(outcome.result).toEqual([wrapIntoJson({ val: 'value' })]);
});
it('should not be possible to access env if it has been blocked', async () => {
await expect(
execTaskWithParams({
task: newTaskWithSettings({
code: 'return { val: $env.VAR1 }',
nodeMode: 'runOnceForAllItems',
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson), {
envProviderState: {
isEnvAccessBlocked: true,
isProcessAvailable: true,
env: { VAR1: 'value' },
},
}),
}),
).rejects.toThrow('access to env vars denied');
});
it('should not be possible to iterate $env', async () => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
code: 'return Object.values($env).concat(Object.keys($env))',
nodeMode: 'runOnceForAllItems',
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson), {
envProviderState: {
isEnvAccessBlocked: false,
isProcessAvailable: true,
env: { VAR1: '1', VAR2: '2', VAR3: '3' },
},
}),
});
expect(outcome.result).toEqual([]);
});
it("should not expose task runner's env variables even if no env state is received", async () => {
process.env.N8N_RUNNERS_N8N_URI = 'http://127.0.0.1:5679';
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
code: 'return { val: $env.N8N_RUNNERS_N8N_URI }',
nodeMode: 'runOnceForAllItems',
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson), {
envProviderState: undefined,
}),
});
expect(outcome.result).toEqual([wrapIntoJson({ val: undefined })]);
});
});
});
describe('runOnceForAllItems', () => {
describe('continue on fail', () => {
it('should return an item with the error if continueOnFail is true', async () => {
const outcome = await executeForAllItems({
code: 'throw new Error("Error message")',
inputItems: [{ a: 1 }],
settings: { continueOnFail: true },
});
expect(outcome).toEqual({
result: [wrapIntoJson({ error: 'Error message' })],
customData: undefined,
});
});
it('should throw an error if continueOnFail is false', async () => {
await expect(
executeForAllItems({
code: 'throw new Error("Error message")',
inputItems: [{ a: 1 }],
settings: { continueOnFail: false },
}),
).rejects.toThrow('Error message');
});
});
describe('invalid output', () => {
test.each([['undefined'], ['42'], ['"a string"']])(
'should throw a ValidationError if the code output is %s',
async (output) => {
await expect(
executeForAllItems({
code: `return ${output}`,
inputItems: [{ a: 1 }],
}),
).rejects.toThrow(ValidationError);
},
);
it('should throw a ValidationError if some items are wrapped in json and some are not', async () => {
await expect(
executeForAllItems({
code: 'return [{b: 1}, {json: {b: 2}}]',
inputItems: [{ a: 1 }],
}),
).rejects.toThrow(ValidationError);
});
});
it('should return static items', async () => {
const outcome = await executeForAllItems({
code: 'return [{json: {b: 1}}]',
inputItems: [{ a: 1 }],
});
expect(outcome).toEqual({
result: [wrapIntoJson({ b: 1 })],
customData: undefined,
});
});
it('maps null into an empty array', async () => {
const outcome = await executeForAllItems({
code: 'return null',
inputItems: [{ a: 1 }],
});
expect(outcome).toEqual({
result: [],
customData: undefined,
});
});
it("should wrap items into json if they aren't", async () => {
const outcome = await executeForAllItems({
code: 'return [{b: 1}]',
inputItems: [{ a: 1 }],
});
expect(outcome).toEqual({
result: [wrapIntoJson({ b: 1 })],
customData: undefined,
});
});
it('should wrap single item into an array and json', async () => {
const outcome = await executeForAllItems({
code: 'return {b: 1}',
inputItems: [{ a: 1 }],
});
expect(outcome).toEqual({
result: [wrapIntoJson({ b: 1 })],
customData: undefined,
});
});
test.each([['items'], ['$input.all()'], ["$('Trigger').all()"]])(
'should have all input items in the context as %s',
async (expression) => {
const outcome = await executeForAllItems({
code: `return ${expression}`,
inputItems: [{ a: 1 }, { a: 2 }],
});
expect(outcome).toEqual({
result: [wrapIntoJson({ a: 1 }), wrapIntoJson({ a: 2 })],
customData: undefined,
});
},
);
});
describe('runForEachItem', () => {
describe('continue on fail', () => {
it('should return an item with the error if continueOnFail is true', async () => {
const outcome = await executeForEachItem({
code: 'throw new Error("Error message")',
inputItems: [{ a: 1 }, { a: 2 }],
settings: { continueOnFail: true },
});
expect(outcome).toEqual({
result: [
withPairedItem(0, wrapIntoJson({ error: 'Error message' })),
withPairedItem(1, wrapIntoJson({ error: 'Error message' })),
],
customData: undefined,
});
});
it('should throw an error if continueOnFail is false', async () => {
await expect(
executeForEachItem({
code: 'throw new Error("Error message")',
inputItems: [{ a: 1 }],
settings: { continueOnFail: false },
}),
).rejects.toThrow('Error message');
});
});
describe('invalid output', () => {
test.each([['undefined'], ['42'], ['"a string"'], ['[]'], ['[1,2,3]']])(
'should throw a ValidationError if the code output is %s',
async (output) => {
await expect(
executeForEachItem({
code: `return ${output}`,
inputItems: [{ a: 1 }],
}),
).rejects.toThrow(ValidationError);
},
);
});
it('should return static items', async () => {
const outcome = await executeForEachItem({
code: 'return {json: {b: 1}}',
inputItems: [{ a: 1 }],
});
expect(outcome).toEqual({
result: [withPairedItem(0, wrapIntoJson({ b: 1 }))],
customData: undefined,
});
});
it('should filter out null values', async () => {
const outcome = await executeForEachItem({
code: 'return item.json.a === 1 ? item : null',
inputItems: [{ a: 1 }, { a: 2 }, { a: 3 }],
});
expect(outcome).toEqual({
result: [withPairedItem(0, wrapIntoJson({ a: 1 }))],
customData: undefined,
});
});
test.each([['item'], ['$input.item'], ['{ json: $json }']])(
'should have the current input item in the context as %s',
async (expression) => {
const outcome = await executeForEachItem({
code: `return ${expression}`,
inputItems: [{ a: 1 }, { a: 2 }],
});
expect(outcome).toEqual({
result: [
withPairedItem(0, wrapIntoJson({ a: 1 })),
withPairedItem(1, wrapIntoJson({ a: 2 })),
],
customData: undefined,
});
},
);
});
});

View file

@ -0,0 +1,168 @@
import type { IDataObject, INode, INodeExecutionData, ITaskData } from 'n8n-workflow';
import { NodeConnectionType } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import type { AllCodeTaskData, JSExecSettings } from '@/js-task-runner/js-task-runner';
import type { Task } from '@/task-runner';
/**
* Creates a new task with the given settings
*/
export const newTaskWithSettings = (
settings: Partial<JSExecSettings> & Pick<JSExecSettings, 'code' | 'nodeMode'>,
): Task<JSExecSettings> => ({
taskId: '1',
settings: {
workflowMode: 'manual',
continueOnFail: false,
mode: 'manual',
...settings,
},
active: true,
cancelled: false,
});
/**
* Creates a new node with the given options
*/
export const newNode = (opts: Partial<INode> = {}): INode => ({
id: nanoid(),
name: 'Test Node' + nanoid(),
parameters: {},
position: [0, 0],
type: 'n8n-nodes-base.code',
typeVersion: 1,
...opts,
});
/**
* Creates a new task data with the given options
*/
export const newTaskData = (opts: Partial<ITaskData> & Pick<ITaskData, 'source'>): ITaskData => ({
startTime: Date.now(),
executionTime: 0,
executionStatus: 'success',
...opts,
});
/**
* Creates a new all code task data with the given options
*/
export const newAllCodeTaskData = (
codeNodeInputData: INodeExecutionData[],
opts: Partial<AllCodeTaskData> = {},
): AllCodeTaskData => {
const codeNode = newNode({
name: 'JsCode',
parameters: {
mode: 'runOnceForEachItem',
language: 'javaScript',
jsCode: 'return item',
},
type: 'n8n-nodes-base.code',
typeVersion: 2,
});
const manualTriggerNode = newNode({
name: 'Trigger',
type: 'n8n-nodes-base.manualTrigger',
parameters: {
manualTriggerParam: 'empty',
},
});
return {
workflow: {
id: '1',
name: 'Test Workflow',
active: true,
connections: {
[manualTriggerNode.name]: {
main: [[{ node: codeNode.name, type: NodeConnectionType.Main, index: 0 }]],
},
},
nodes: [manualTriggerNode, codeNode],
},
inputData: {
main: [codeNodeInputData],
},
connectionInputData: codeNodeInputData,
node: codeNode,
runExecutionData: {
startData: {},
resultData: {
runData: {
[manualTriggerNode.name]: [
newTaskData({
source: [],
data: {
main: [codeNodeInputData],
},
}),
],
},
pinData: {},
lastNodeExecuted: manualTriggerNode.name,
},
executionData: {
contextData: {},
nodeExecutionStack: [],
metadata: {},
waitingExecution: {},
waitingExecutionSource: {},
},
},
runIndex: 0,
itemIndex: 0,
activeNodeName: codeNode.name,
contextNodeName: codeNode.name,
defaultReturnRunIndex: -1,
siblingParameters: {},
mode: 'manual',
selfData: {},
envProviderState: {
env: {},
isEnvAccessBlocked: true,
isProcessAvailable: true,
},
additionalData: {
executionId: 'exec-id',
instanceBaseUrl: '',
restartExecutionId: '',
restApiUrl: '',
formWaitingBaseUrl: 'http://formWaitingBaseUrl',
webhookBaseUrl: 'http://webhookBaseUrl',
webhookTestBaseUrl: 'http://webhookTestBaseUrl',
webhookWaitingBaseUrl: 'http://webhookWaitingBaseUrl',
variables: {
var: 'value',
},
},
executeData: {
node: codeNode,
data: {
main: [codeNodeInputData],
},
source: {
main: [{ previousNode: manualTriggerNode.name }],
},
},
...opts,
};
};
/**
* Wraps the given value into an INodeExecutionData object's json property
*/
export const wrapIntoJson = (json: IDataObject): INodeExecutionData => ({
json,
});
/**
* Adds the given index as the pairedItem property to the given INodeExecutionData object
*/
export const withPairedItem = (index: number, data: INodeExecutionData): INodeExecutionData => ({
...data,
pairedItem: {
item: index,
},
});

View file

@ -0,0 +1,84 @@
import { ApplicationError } from 'n8n-workflow';
export class ExecutionError extends ApplicationError {
description: string | null = null;
itemIndex: number | undefined = undefined;
context: { itemIndex: number } | undefined = undefined;
stack = '';
lineNumber: number | undefined = undefined;
constructor(error: Error & { stack?: string }, itemIndex?: number) {
super(error.message);
this.itemIndex = itemIndex;
if (this.itemIndex !== undefined) {
this.context = { itemIndex: this.itemIndex };
}
this.stack = error.stack ?? '';
this.populateFromStack();
}
/**
* Populate error `message` and `description` from error `stack`.
*/
private populateFromStack() {
const stackRows = this.stack.split('\n');
if (stackRows.length === 0) {
this.message = 'Unknown error';
}
const messageRow = stackRows.find((line) => line.includes('Error:'));
const lineNumberRow = stackRows.find((line) => line.includes('Code:'));
const lineNumberDisplay = this.toLineNumberDisplay(lineNumberRow);
if (!messageRow) {
this.message = `Unknown error ${lineNumberDisplay}`;
return;
}
const [errorDetails, errorType] = this.toErrorDetailsAndType(messageRow);
if (errorType) this.description = errorType;
if (!errorDetails) {
this.message = `Unknown error ${lineNumberDisplay}`;
return;
}
this.message = `${errorDetails} ${lineNumberDisplay}`;
}
private toLineNumberDisplay(lineNumberRow?: string) {
const errorLineNumberMatch = lineNumberRow?.match(/Code:(?<lineNumber>\d+)/);
if (!errorLineNumberMatch?.groups?.lineNumber) return null;
const lineNumber = errorLineNumberMatch.groups.lineNumber;
this.lineNumber = Number(lineNumber);
if (!lineNumber) return '';
return this.itemIndex === undefined
? `[line ${lineNumber}]`
: `[line ${lineNumber}, for item ${this.itemIndex}]`;
}
private toErrorDetailsAndType(messageRow?: string) {
if (!messageRow) return [null, null];
const [errorDetails, errorType] = messageRow
.split(':')
.reverse()
.map((i) => i.trim());
return [errorDetails, errorType === 'Error' ? null : errorType];
}
}

View file

@ -0,0 +1,44 @@
import { ApplicationError } from 'n8n-workflow';
export class ValidationError extends ApplicationError {
description = '';
itemIndex: number | undefined = undefined;
context: { itemIndex: number } | undefined = undefined;
lineNumber: number | undefined = undefined;
constructor({
message,
description,
itemIndex,
lineNumber,
}: {
message: string;
description: string;
itemIndex?: number;
lineNumber?: number;
}) {
super(message);
this.lineNumber = lineNumber;
this.itemIndex = itemIndex;
if (this.lineNumber !== undefined && this.itemIndex !== undefined) {
this.message = `${message} [line ${lineNumber}, for item ${itemIndex}]`;
} else if (this.lineNumber !== undefined) {
this.message = `${message} [line ${lineNumber}]`;
} else if (this.itemIndex !== undefined) {
this.message = `${message} [item ${itemIndex}]`;
} else {
this.message = message;
}
this.description = description;
if (this.itemIndex !== undefined) {
this.context = { itemIndex: this.itemIndex };
}
}
}

View file

@ -0,0 +1,284 @@
import { getAdditionalKeys } from 'n8n-core';
import {
WorkflowDataProxy,
// type IWorkflowDataProxyAdditionalKeys,
Workflow,
} from 'n8n-workflow';
import type {
CodeExecutionMode,
INode,
INodeType,
ITaskDataConnections,
IWorkflowExecuteAdditionalData,
WorkflowParameters,
IDataObject,
IExecuteData,
INodeExecutionData,
INodeParameters,
IRunExecutionData,
WorkflowExecuteMode,
EnvProviderState,
} from 'n8n-workflow';
import * as a from 'node:assert';
import { runInNewContext, type Context } from 'node:vm';
import type { TaskResultData } from '@/runner-types';
import { type Task, TaskRunner } from '@/task-runner';
import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation';
export interface JSExecSettings {
code: string;
nodeMode: CodeExecutionMode;
workflowMode: WorkflowExecuteMode;
continueOnFail: boolean;
// For workflow data proxy
mode: WorkflowExecuteMode;
}
export interface PartialAdditionalData {
executionId?: string;
restartExecutionId?: string;
restApiUrl: string;
instanceBaseUrl: string;
formWaitingBaseUrl: string;
webhookBaseUrl: string;
webhookWaitingBaseUrl: string;
webhookTestBaseUrl: string;
currentNodeParameters?: INodeParameters;
executionTimeoutTimestamp?: number;
userId?: string;
variables: IDataObject;
}
export interface AllCodeTaskData {
workflow: Omit<WorkflowParameters, 'nodeTypes'>;
inputData: ITaskDataConnections;
node: INode;
runExecutionData: IRunExecutionData;
runIndex: number;
itemIndex: number;
activeNodeName: string;
connectionInputData: INodeExecutionData[];
siblingParameters: INodeParameters;
mode: WorkflowExecuteMode;
envProviderState?: EnvProviderState;
executeData?: IExecuteData;
defaultReturnRunIndex: number;
selfData: IDataObject;
contextNodeName: string;
additionalData: PartialAdditionalData;
}
type CustomConsole = {
log: (...args: unknown[]) => void;
};
export class JsTaskRunner extends TaskRunner {
constructor(
taskType: string,
wsUrl: string,
grantToken: string,
maxConcurrency: number,
name?: string,
) {
super(taskType, wsUrl, grantToken, maxConcurrency, name ?? 'JS Task Runner');
}
async executeTask(task: Task<JSExecSettings>): Promise<TaskResultData> {
const allData = await this.requestData<AllCodeTaskData>(task.taskId, 'all');
const settings = task.settings;
a.ok(settings, 'JS Code not sent to runner');
const workflowParams = allData.workflow;
const workflow = new Workflow({
...workflowParams,
nodeTypes: {
getByNameAndVersion() {
return undefined as unknown as INodeType;
},
getByName() {
return undefined as unknown as INodeType;
},
getKnownTypes() {
return {};
},
},
});
const customConsole = {
// Send log output back to the main process. It will take care of forwarding
// it to the UI or printing to console.
log: (...args: unknown[]) => {
const logOutput = args
.map((arg) => (typeof arg === 'object' && arg !== null ? JSON.stringify(arg) : arg))
.join(' ');
void this.makeRpcCall(task.taskId, 'logNodeOutput', [logOutput]);
},
};
const result =
settings.nodeMode === 'runOnceForAllItems'
? await this.runForAllItems(task.taskId, settings, allData, workflow, customConsole)
: await this.runForEachItem(task.taskId, settings, allData, workflow, customConsole);
return {
result,
customData: allData.runExecutionData.resultData.metadata,
};
}
/**
* Executes the requested code for all items in a single run
*/
private async runForAllItems(
taskId: string,
settings: JSExecSettings,
allData: AllCodeTaskData,
workflow: Workflow,
customConsole: CustomConsole,
): Promise<INodeExecutionData[]> {
const dataProxy = this.createDataProxy(allData, workflow, allData.itemIndex);
const inputItems = allData.connectionInputData;
const context: Context = {
require,
module: {},
console: customConsole,
items: inputItems,
...dataProxy,
...this.buildRpcCallObject(taskId),
};
try {
const result = (await runInNewContext(
`module.exports = async function() {${settings.code}\n}()`,
context,
)) as TaskResultData['result'];
if (result === null) {
return [];
}
return validateRunForAllItemsOutput(result);
} catch (error) {
if (settings.continueOnFail) {
return [{ json: { error: this.getErrorMessageFromVmError(error) } }];
}
(error as Record<string, unknown>).node = allData.node;
throw error;
}
}
/**
* Executes the requested code for each item in the input data
*/
private async runForEachItem(
taskId: string,
settings: JSExecSettings,
allData: AllCodeTaskData,
workflow: Workflow,
customConsole: CustomConsole,
): Promise<INodeExecutionData[]> {
const inputItems = allData.connectionInputData;
const returnData: INodeExecutionData[] = [];
for (let index = 0; index < inputItems.length; index++) {
const item = inputItems[index];
const dataProxy = this.createDataProxy(allData, workflow, index);
const context: Context = {
require,
module: {},
console: customConsole,
item,
...dataProxy,
...this.buildRpcCallObject(taskId),
};
try {
let result = (await runInNewContext(
`module.exports = async function() {${settings.code}\n}()`,
context,
)) as INodeExecutionData | undefined;
// Filter out null values
if (result === null) {
continue;
}
result = validateRunForEachItemOutput(result, index);
if (result) {
returnData.push(
result.binary
? {
json: result.json,
pairedItem: { item: index },
binary: result.binary,
}
: {
json: result.json,
pairedItem: { item: index },
},
);
}
} catch (error) {
if (!settings.continueOnFail) {
(error as Record<string, unknown>).node = allData.node;
throw error;
}
returnData.push({
json: { error: this.getErrorMessageFromVmError(error) },
pairedItem: {
item: index,
},
});
}
}
return returnData;
}
private createDataProxy(allData: AllCodeTaskData, workflow: Workflow, itemIndex: number) {
return new WorkflowDataProxy(
workflow,
allData.runExecutionData,
allData.runIndex,
itemIndex,
allData.activeNodeName,
allData.connectionInputData,
allData.siblingParameters,
allData.mode,
getAdditionalKeys(
allData.additionalData as IWorkflowExecuteAdditionalData,
allData.mode,
allData.runExecutionData,
),
allData.executeData,
allData.defaultReturnRunIndex,
allData.selfData,
allData.contextNodeName,
// Make sure that even if we don't receive the envProviderState for
// whatever reason, we don't expose the task runner's env to the code
allData.envProviderState ?? {
env: {},
isEnvAccessBlocked: false,
isProcessAvailable: true,
},
).getDataProxy();
}
private getErrorMessageFromVmError(error: unknown): string {
if (typeof error === 'object' && !!error && 'message' in error) {
return error.message as string;
}
return JSON.stringify(error);
}
}

View file

@ -0,0 +1,5 @@
export function isObject(maybe: unknown): maybe is { [key: string]: unknown } {
return (
typeof maybe === 'object' && maybe !== null && !Array.isArray(maybe) && !(maybe instanceof Date)
);
}

View file

@ -0,0 +1,116 @@
import { normalizeItems } from 'n8n-core';
import type { INodeExecutionData } from 'n8n-workflow';
import { ValidationError } from './errors/validation-error';
import { isObject } from './obj-utils';
export const REQUIRED_N8N_ITEM_KEYS = new Set(['json', 'binary', 'pairedItem', 'error']);
function validateTopLevelKeys(item: INodeExecutionData, itemIndex: number) {
for (const key in item) {
if (Object.prototype.hasOwnProperty.call(item, key)) {
if (REQUIRED_N8N_ITEM_KEYS.has(key)) return;
throw new ValidationError({
message: `Unknown top-level item key: ${key}`,
description: 'Access the properties of an item under `.json`, e.g. `item.json`',
itemIndex,
});
}
}
}
function validateItem({ json, binary }: INodeExecutionData, itemIndex: number) {
if (json === undefined || !isObject(json)) {
throw new ValidationError({
message: "A 'json' property isn't an object",
description: "In the returned data, every key named 'json' must point to an object.",
itemIndex,
});
}
if (binary !== undefined && !isObject(binary)) {
throw new ValidationError({
message: "A 'binary' property isn't an object",
description: "In the returned data, every key named 'binary' must point to an object.",
itemIndex,
});
}
}
/**
* Validates the output of a code node in 'Run for All Items' mode.
*/
export function validateRunForAllItemsOutput(
executionResult: INodeExecutionData | INodeExecutionData[] | undefined,
) {
if (typeof executionResult !== 'object') {
throw new ValidationError({
message: "Code doesn't return items properly",
description: 'Please return an array of objects, one for each item you would like to output.',
});
}
if (Array.isArray(executionResult)) {
/**
* If at least one top-level key is an n8n item key (`json`, `binary`, etc.),
* then require all item keys to be an n8n item key.
*
* If no top-level key is an n8n key, then skip this check, allowing non-n8n
* item keys to be wrapped in `json` when normalizing items below.
*/
const mustHaveTopLevelN8nKey = executionResult.some((item) =>
Object.keys(item).find((key) => REQUIRED_N8N_ITEM_KEYS.has(key)),
);
if (mustHaveTopLevelN8nKey) {
for (let index = 0; index < executionResult.length; index++) {
const item = executionResult[index];
validateTopLevelKeys(item, index);
}
}
}
const returnData = normalizeItems(executionResult);
returnData.forEach(validateItem);
return returnData;
}
/**
* Validates the output of a code node in 'Run for Each Item' mode for single item
*/
export function validateRunForEachItemOutput(
executionResult: INodeExecutionData | undefined,
itemIndex: number,
) {
if (typeof executionResult !== 'object') {
throw new ValidationError({
message: "Code doesn't return an object",
description: `Please return an object representing the output item. ('${executionResult}' was returned instead.)`,
itemIndex,
});
}
if (Array.isArray(executionResult)) {
const firstSentence =
executionResult.length > 0
? `An array of ${typeof executionResult[0]}s was returned.`
: 'An empty array was returned.';
throw new ValidationError({
message: "Code doesn't return a single object",
description: `${firstSentence} If you need to output multiple items, please use the 'Run Once for All Items' mode instead.`,
itemIndex,
});
}
const [returnData] = normalizeItems([executionResult]);
validateItem(returnData, itemIndex);
// If at least one top-level key is a supported item key (`json`, `binary`, etc.),
// and another top-level key is unrecognized, then the user mis-added a property
// directly on the item, when they intended to add it on the `json` property
validateTopLevelKeys(returnData, itemIndex);
return returnData;
}
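To make the rules above concrete, here is a minimal sketch of return shapes the two validators accept or reject. The import path `./result-validation` and the example payloads are assumptions for illustration; only the function names and their behavior come from the file above.

```ts
import type { INodeExecutionData } from 'n8n-workflow';
// The module path is an assumption; the diff does not show the file name.
import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation';

// 'Run for All Items': an array of items, each wrapping its data under `json`.
validateRunForAllItemsOutput([{ json: { name: 'Alice' } }, { json: { name: 'Bob' } }]);

// Items with no recognized n8n key at all are wrapped in `json` by `normalizeItems`.
validateRunForAllItemsOutput([{ name: 'Alice' }] as unknown as INodeExecutionData[]);

// Mixing `json` with an unrecognized top-level key throws:
// "Unknown top-level item key: name"
// validateRunForAllItemsOutput([{ json: {}, name: 'Alice' } as INodeExecutionData]);

// 'Run for Each Item': a single object per item; arrays are rejected with a hint
// to switch to 'Run Once for All Items' mode.
validateRunForEachItemOutput({ json: { name: 'Alice' } }, 0);
```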

View file

@ -2,7 +2,10 @@ import { ApplicationError, ensureError } from 'n8n-workflow';
import * as a from 'node:assert/strict';
import { authenticate } from './authenticator';
import { JsTaskRunner } from './code';
import { JsTaskRunner } from './js-task-runner/js-task-runner';
let runner: JsTaskRunner | undefined;
let isShuttingDown = false;
type Config = {
n8nUri: string;
@ -20,12 +23,35 @@ function readAndParseConfig(): Config {
}
return {
n8nUri: process.env.N8N_RUNNERS_N8N_URI ?? 'localhost:5678',
n8nUri: process.env.N8N_RUNNERS_N8N_URI ?? '127.0.0.1:5679',
authToken,
grantToken,
};
}
function createSignalHandler(signal: string) {
return async function onSignal() {
if (isShuttingDown) {
return;
}
console.log(`Received ${signal} signal, shutting down...`);
isShuttingDown = true;
try {
if (runner) {
await runner.stop();
runner = undefined;
}
} catch (e) {
const error = ensureError(e);
console.error('Error stopping task runner', { error });
} finally {
process.exit(0);
}
};
}
void (async function start() {
const config = readAndParseConfig();
@ -40,7 +66,10 @@ void (async function start() {
}
const wsUrl = `ws://${config.n8nUri}/runners/_ws`;
new JsTaskRunner('javascript', wsUrl, grantToken, 5);
runner = new JsTaskRunner('javascript', wsUrl, grantToken, 5);
process.on('SIGINT', createSignalHandler('SIGINT'));
process.on('SIGTERM', createSignalHandler('SIGTERM'));
})().catch((e) => {
const error = ensureError(e);
console.error('Task runner failed to start', { error });

View file

@ -257,11 +257,8 @@ export abstract class TaskRunner {
const data = await this.executeTask(task);
this.taskDone(taskId, data);
} catch (e) {
if (ensureError(e)) {
this.taskErrored(taskId, (e as Error).message);
} else {
this.taskErrored(taskId, e);
}
const error = ensureError(e);
this.taskErrored(taskId, error);
}
}
@ -359,4 +356,36 @@ export abstract class TaskRunner {
}
return rpcObject;
}
/** Close the connection gracefully and wait until it has been closed */
async stop() {
this.stopTaskOffers();
await this.waitUntilAllTasksAreDone();
await this.closeConnection();
}
private async closeConnection() {
// 1000 is the standard close code
// https://www.rfc-editor.org/rfc/rfc6455.html#section-7.1.5
this.ws.close(1000, 'Shutting down');
await new Promise((resolve) => {
this.ws.once('close', resolve);
});
}
private async waitUntilAllTasksAreDone(maxWaitTimeInMs = 30_000) {
// TODO: Make maxWaitTimeInMs configurable
const start = Date.now();
while (this.runningTasks.size > 0) {
if (Date.now() - start > maxWaitTimeInMs) {
throw new ApplicationError('Timeout while waiting for tasks to finish');
}
await new Promise((resolve) => setTimeout(resolve, 100));
}
}
}

View file

@ -6,11 +6,13 @@ This list shows all the versions which include breaking changes and how to upgra
### What changed?
The worker server used to bind to IPv6 by default. It now binds to IPv4 by default.
1. The worker server used to bind to IPv6 by default. It now binds to IPv4 by default.
2. The worker server's `/healthz` used to report healthy status based on database and Redis checks. It now reports healthy status regardless of database and Redis status, and the database and Redis checks are part of `/healthz/readiness`.
### When is action necessary?
If you experience a port conflict error when starting a worker server using its default port, set a different port for the worker server with `QUEUE_HEALTH_CHECK_PORT`.
1. If you experience a port conflict error when starting a worker server using its default port, set a different port for the worker server with `QUEUE_HEALTH_CHECK_PORT`.
2. If you are relying on database and Redis checks for worker health status, switch to checking `/healthz/readiness` instead of `/healthz`, as shown in the sketch below.
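A minimal probing sketch under stated assumptions: the host, the default port 5678, and the use of the global `fetch` are illustrative; only the endpoint paths and their new semantics come from the notes above.

```ts
// Assumed worker address; override the port via QUEUE_HEALTH_CHECK_PORT if it conflicts.
const base = 'http://127.0.0.1:5678';

async function checkWorker() {
	// `/healthz` now only reports that the worker process is up.
	const liveness = await fetch(`${base}/healthz`);
	// Database and Redis checks moved to `/healthz/readiness`.
	const readiness = await fetch(`${base}/healthz/readiness`);
	console.log('live:', liveness.ok, 'ready:', readiness.ok);
}

void checkWorker();
```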
## 1.57.0

View file

@ -1,6 +1,6 @@
{
"name": "n8n",
"version": "1.62.1",
"version": "1.63.0",
"description": "n8n Workflow Automation Tool",
"main": "dist/index",
"types": "dist/index.d.ts",
@ -51,12 +51,12 @@
"!dist/**/e2e.*"
],
"devDependencies": {
"@redocly/cli": "^1.6.0",
"@redocly/cli": "^1.25.5",
"@types/aws4": "^1.5.1",
"@types/bcryptjs": "^2.4.2",
"@types/compression": "1.0.1",
"@types/convict": "^6.1.1",
"@types/cookie-parser": "^1.4.2",
"@types/cookie-parser": "^1.4.7",
"@types/express": "catalog:",
"@types/flat": "^5.0.5",
"@types/formidable": "^3.4.5",
@ -76,7 +76,6 @@
"@types/xml2js": "catalog:",
"@types/yamljs": "^0.2.31",
"@vvo/tzdb": "^6.141.0",
"chokidar": "^3.5.2",
"concurrently": "^8.2.0",
"ioredis-mock": "^8.8.1",
"mjml": "^4.15.3",
@ -94,7 +93,7 @@
"@n8n/permissions": "workspace:*",
"@n8n/task-runner": "workspace:*",
"@n8n/typeorm": "0.3.20-12",
"@n8n_io/ai-assistant-sdk": "1.9.4",
"@n8n_io/ai-assistant-sdk": "1.10.3",
"@n8n_io/license-sdk": "2.13.1",
"@oclif/core": "4.0.7",
"@rudderstack/rudder-sdk-node": "2.0.9",
@ -111,14 +110,14 @@
"class-validator": "0.14.0",
"compression": "1.7.4",
"convict": "6.2.4",
"cookie-parser": "1.4.6",
"cookie-parser": "1.4.7",
"csrf": "3.1.0",
"curlconverter": "3.21.0",
"dotenv": "8.6.0",
"express": "4.21.0",
"express": "4.21.1",
"express-async-errors": "3.1.1",
"express-handlebars": "7.1.2",
"express-openapi-validator": "5.3.3",
"express-openapi-validator": "5.3.7",
"express-prom-bundle": "6.6.0",
"express-rate-limit": "7.2.0",
"fast-glob": "catalog:",
@ -145,7 +144,7 @@
"nodemailer": "6.9.9",
"oauth-1.0a": "2.2.6",
"open": "7.4.2",
"openapi-types": "10.0.0",
"openapi-types": "12.1.3",
"otpauth": "9.1.1",
"p-cancelable": "2.1.1",
"p-lazy": "3.1.0",
@ -166,7 +165,7 @@
"sqlite3": "5.1.7",
"sse-channel": "4.0.0",
"sshpk": "1.17.0",
"swagger-ui-express": "5.0.0",
"swagger-ui-express": "5.0.1",
"syslog-client": "1.1.1",
"typedi": "catalog:",
"uuid": "catalog:",

View file

@ -5,7 +5,7 @@ import type { InstanceSettings } from 'n8n-core';
import config from '@/config';
import { N8N_VERSION } from '@/constants';
import { License } from '@/license';
import type { Logger } from '@/logging/logger.service';
import { mockLogger } from '@test/mocking';
jest.mock('@n8n_io/license-sdk');
@ -25,37 +25,39 @@ describe('License', () => {
});
let license: License;
const logger = mock<Logger>();
const instanceSettings = mock<InstanceSettings>({
instanceId: MOCK_INSTANCE_ID,
instanceType: 'main',
});
beforeEach(async () => {
license = new License(logger, instanceSettings, mock(), mock(), mock());
license = new License(mockLogger(), instanceSettings, mock(), mock(), mock());
await license.init();
});
test('initializes license manager', async () => {
expect(LicenseManager).toHaveBeenCalledWith({
autoRenewEnabled: true,
autoRenewOffset: MOCK_RENEW_OFFSET,
offlineMode: false,
renewOnInit: true,
deviceFingerprint: expect.any(Function),
productIdentifier: `n8n-${N8N_VERSION}`,
logger,
loadCertStr: expect.any(Function),
saveCertStr: expect.any(Function),
onFeatureChange: expect.any(Function),
collectUsageMetrics: expect.any(Function),
collectPassthroughData: expect.any(Function),
server: MOCK_SERVER_URL,
tenantId: 1,
});
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({
autoRenewEnabled: true,
autoRenewOffset: MOCK_RENEW_OFFSET,
offlineMode: false,
renewOnInit: true,
deviceFingerprint: expect.any(Function),
productIdentifier: `n8n-${N8N_VERSION}`,
loadCertStr: expect.any(Function),
saveCertStr: expect.any(Function),
onFeatureChange: expect.any(Function),
collectUsageMetrics: expect.any(Function),
collectPassthroughData: expect.any(Function),
server: MOCK_SERVER_URL,
tenantId: 1,
}),
);
});
test('initializes license manager for worker', async () => {
const logger = mockLogger();
license = new License(
logger,
mock<InstanceSettings>({ instanceType: 'worker' }),
@ -64,22 +66,23 @@ describe('License', () => {
mock(),
);
await license.init();
expect(LicenseManager).toHaveBeenCalledWith({
autoRenewEnabled: false,
autoRenewOffset: MOCK_RENEW_OFFSET,
offlineMode: true,
renewOnInit: false,
deviceFingerprint: expect.any(Function),
productIdentifier: `n8n-${N8N_VERSION}`,
logger,
loadCertStr: expect.any(Function),
saveCertStr: expect.any(Function),
onFeatureChange: expect.any(Function),
collectUsageMetrics: expect.any(Function),
collectPassthroughData: expect.any(Function),
server: MOCK_SERVER_URL,
tenantId: 1,
});
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({
autoRenewEnabled: false,
autoRenewOffset: MOCK_RENEW_OFFSET,
offlineMode: true,
renewOnInit: false,
deviceFingerprint: expect.any(Function),
productIdentifier: `n8n-${N8N_VERSION}`,
loadCertStr: expect.any(Function),
saveCertStr: expect.any(Function),
onFeatureChange: expect.any(Function),
collectUsageMetrics: expect.any(Function),
collectPassthroughData: expect.any(Function),
server: MOCK_SERVER_URL,
tenantId: 1,
}),
);
});
test('attempts to activate license with provided key', async () => {
@ -196,7 +199,7 @@ describe('License', () => {
it('should enable renewal', async () => {
config.set('multiMainSetup.enabled', false);
await new License(mock(), mock(), mock(), mock(), mock()).init();
await new License(mockLogger(), mock(), mock(), mock(), mock()).init();
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: true, renewOnInit: true }),
@ -208,7 +211,7 @@ describe('License', () => {
it('should disable renewal', async () => {
config.set('license.autoRenewEnabled', false);
await new License(mock(), mock(), mock(), mock(), mock()).init();
await new License(mockLogger(), mock(), mock(), mock(), mock()).init();
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }),
@ -226,7 +229,7 @@ describe('License', () => {
config.set('multiMainSetup.instanceType', status);
config.set('license.autoRenewEnabled', false);
await new License(mock(), mock(), mock(), mock(), mock()).init();
await new License(mockLogger(), mock(), mock(), mock(), mock()).init();
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }),
@ -241,7 +244,7 @@ describe('License', () => {
config.set('multiMainSetup.instanceType', status);
config.set('license.autoRenewEnabled', false);
await new License(mock(), mock(), mock(), mock(), mock()).init();
await new License(mockLogger(), mock(), mock(), mock(), mock()).init();
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }),
@ -252,7 +255,7 @@ describe('License', () => {
config.set('multiMainSetup.enabled', true);
config.set('multiMainSetup.instanceType', 'leader');
await new License(mock(), mock(), mock(), mock(), mock()).init();
await new License(mockLogger(), mock(), mock(), mock(), mock()).init();
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: true, renewOnInit: true }),
@ -264,7 +267,7 @@ describe('License', () => {
describe('reinit', () => {
it('should reinitialize license manager', async () => {
const license = new License(mock(), mock(), mock(), mock(), mock());
const license = new License(mockLogger(), mock(), mock(), mock(), mock());
await license.init();
const initSpy = jest.spyOn(license, 'init');

View file

@ -0,0 +1,37 @@
import { mock } from 'jest-mock-extended';
import type { DirectoryLoader } from 'n8n-core';
import { LoadNodesAndCredentials } from '../load-nodes-and-credentials';
describe('LoadNodesAndCredentials', () => {
describe('resolveIcon', () => {
let instance: LoadNodesAndCredentials;
beforeEach(() => {
instance = new LoadNodesAndCredentials(mock(), mock(), mock());
instance.loaders.package1 = mock<DirectoryLoader>({
directory: '/icons/package1',
});
});
it('should return undefined if the loader for the package is not found', () => {
const result = instance.resolveIcon('unknownPackage', '/icons/unknownPackage/icon.png');
expect(result).toBeUndefined();
});
it('should return undefined if the resolved file path is outside the loader directory', () => {
const result = instance.resolveIcon('package1', '/some/other/path/icon.png');
expect(result).toBeUndefined();
});
it('should return the file path if the file is within the loader directory', () => {
const result = instance.resolveIcon('package1', '/icons/package1/icon.png');
expect(result).toBe('/icons/package1/icon.png');
});
it('should return undefined if the URL is outside the package directory', () => {
const result = instance.resolveIcon('package1', '/icons/package1/../../../etc/passwd');
expect(result).toBeUndefined();
});
});
});

View file

@ -5,6 +5,7 @@ import type { IExecutionResponse } from '@/interfaces';
import type { MultiMainSetup } from '@/services/orchestration/main/multi-main-setup.ee';
import { OrchestrationService } from '@/services/orchestration.service';
import { WaitTracker } from '@/wait-tracker';
import { mockLogger } from '@test/mocking';
jest.useFakeTimers();
@ -21,7 +22,7 @@ describe('WaitTracker', () => {
let waitTracker: WaitTracker;
beforeEach(() => {
waitTracker = new WaitTracker(
mock(),
mockLogger(),
executionRepository,
mock(),
mock(),

View file

@ -750,7 +750,7 @@ export class ActiveWorkflowManager {
const wasRemoved = await this.activeWorkflows.remove(workflowId);
if (wasRemoved) {
this.logger.warn(`Removed triggers and pollers for workflow "${workflowId}"`, {
this.logger.debug(`Removed triggers and pollers for workflow "${workflowId}"`, {
workflowId,
});
}

View file

@ -2,7 +2,12 @@ import 'reflect-metadata';
import { GlobalConfig } from '@n8n/config';
import { Command, Errors } from '@oclif/core';
import { BinaryDataService, InstanceSettings, ObjectStoreService } from 'n8n-core';
import { ApplicationError, ErrorReporterProxy as ErrorReporter, sleep } from 'n8n-workflow';
import {
ApplicationError,
ensureError,
ErrorReporterProxy as ErrorReporter,
sleep,
} from 'n8n-workflow';
import { Container } from 'typedi';
import type { AbstractServer } from '@/abstract-server';
@ -283,8 +288,9 @@ export abstract class BaseCommand extends Command {
this.logger.debug('Attempting license activation');
await this.license.activate(activationKey);
this.logger.debug('License init complete');
} catch (e) {
this.logger.error('Could not activate license', e as Error);
} catch (e: unknown) {
const error = ensureError(e);
this.logger.error('Could not activate license', { error });
}
}
}

View file

@ -170,3 +170,38 @@ test('revert the last migration if it has a down migration', async () => {
expect(dataSource.undoLastMigration).toHaveBeenCalled();
expect(dataSource.destroy).toHaveBeenCalled();
});
test("don't use transaction if the last migration has transaction = false", async () => {
//
// ARRANGE
//
class TestMigration implements ReversibleMigration {
name = 'ReversibleMigration';
transaction = false as const;
async up() {}
async down() {}
}
const migrationsInDb: Migration[] = [
{ id: 1, timestamp: Date.now(), name: 'ReversibleMigration' },
];
const dataSource = mock<DataSource>({ migrations: [new TestMigration()] });
const migrationExecutor = mock<MigrationExecutor>();
migrationExecutor.getExecutedMigrations.mockResolvedValue(migrationsInDb);
//
// ACT
//
await main(logger, dataSource, migrationExecutor);
//
// ASSERT
//
expect(dataSource.undoLastMigration).toHaveBeenCalledWith({
transaction: 'none',
});
});

View file

@ -55,7 +55,9 @@ export async function main(
return;
}
await connection.undoLastMigration();
await connection.undoLastMigration({
transaction: lastMigrationInstance.transaction === false ? 'none' : 'each',
});
await connection.destroy();
}

View file

@ -112,10 +112,9 @@ export class Webhook extends BaseCommand {
async initOrchestration() {
await Container.get(OrchestrationWebhookService).init();
Container.get(PubSubHandler).init();
const subscriber = Container.get(Subscriber);
await subscriber.subscribe('n8n.commands');
subscriber.setCommandMessageHandler();
Container.get(PubSubHandler).init();
}
}

View file

@ -8,10 +8,10 @@ import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-mess
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay';
import { JobProcessor } from '@/scaling/job-processor';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import type { ScalingService } from '@/scaling/scaling.service';
import type { WorkerServerEndpointsConfig } from '@/scaling/worker-server';
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
import { BaseCommand } from './base-command';
@ -128,12 +128,11 @@ export class Worker extends BaseCommand {
*/
async initOrchestration() {
await Container.get(OrchestrationWorkerService).init();
await Container.get(OrchestrationHandlerWorkerService).initWithOptions({
queueModeId: this.queueModeId,
publisher: Container.get(Publisher),
getRunningJobIds: () => this.jobProcessor.getRunningJobIds(),
getRunningJobsSummary: () => this.jobProcessor.getRunningJobsSummary(),
});
Container.get(PubSubHandler).init();
const subscriber = Container.get(Subscriber);
await subscriber.subscribe('n8n.commands');
subscriber.setCommandMessageHandler();
}
async setConcurrency() {

View file

@ -11,13 +11,13 @@ import type { ExecutionRepository } from '@/databases/repositories/execution.rep
import { InvalidConcurrencyLimitError } from '@/errors/invalid-concurrency-limit.error';
import type { EventService } from '@/events/event.service';
import type { IExecutingWorkflowData } from '@/interfaces';
import type { Logger } from '@/logging/logger.service';
import type { Telemetry } from '@/telemetry';
import { mockLogger } from '@test/mocking';
import { ConcurrencyQueue } from '../concurrency-queue';
describe('ConcurrencyControlService', () => {
const logger = mock<Logger>();
const logger = mockLogger();
const executionRepository = mock<ExecutionRepository>();
const telemetry = mock<Telemetry>();
const eventService = mock<EventService>();

View file

@ -8,7 +8,6 @@ import { UnknownExecutionModeError } from '@/errors/unknown-execution-mode.error
import { EventService } from '@/events/event.service';
import type { IExecutingWorkflowData } from '@/interfaces';
import { Logger } from '@/logging/logger.service';
import type { LogMetadata } from '@/logging/types';
import { Telemetry } from '@/telemetry';
import { ConcurrencyQueue } from './concurrency-queue';
@ -34,6 +33,8 @@ export class ConcurrencyControlService {
private readonly telemetry: Telemetry,
private readonly eventService: EventService,
) {
this.logger = this.logger.withScope('executions');
this.productionLimit = config.getEnv('executions.concurrency.productionLimit');
if (this.productionLimit === 0) {
@ -46,7 +47,6 @@ export class ConcurrencyControlService {
if (this.productionLimit === -1 || config.getEnv('executions.mode') === 'queue') {
this.isEnabled = false;
this.log('Service disabled');
return;
}
@ -65,12 +65,12 @@ export class ConcurrencyControlService {
});
this.productionQueue.on('execution-throttled', ({ executionId }) => {
this.log('Execution throttled', { executionId });
this.logger.debug('Execution throttled', { executionId });
this.eventService.emit('execution-throttled', { executionId });
});
this.productionQueue.on('execution-released', async (executionId) => {
this.log('Execution released', { executionId });
this.logger.debug('Execution released', { executionId });
});
}
@ -144,9 +144,9 @@ export class ConcurrencyControlService {
// ----------------------------------
private logInit() {
this.log('Enabled');
this.logger.debug('Enabled');
this.log(
this.logger.debug(
[
'Production execution concurrency is',
this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.productionLimit.toString(),
@ -171,10 +171,6 @@ export class ConcurrencyControlService {
throw new UnknownExecutionModeError(mode);
}
private log(message: string, metadata?: LogMetadata) {
this.logger.debug(['[Concurrency Control]', message].join(' '), metadata);
}
private shouldReport(capacity: number) {
return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity);
}

View file

@ -91,6 +91,7 @@ export const LICENSE_FEATURES = {
PROJECT_ROLE_EDITOR: 'feat:projectRole:editor',
PROJECT_ROLE_VIEWER: 'feat:projectRole:viewer',
AI_ASSISTANT: 'feat:aiAssistant',
ASK_AI: 'feat:askAi',
COMMUNITY_NODES_CUSTOM_REGISTRY: 'feat:communityNodes:customRegistry',
} as const;

View file

@ -7,18 +7,18 @@ import { WritableStream } from 'node:stream/web';
import { Post, RestController } from '@/decorators';
import { InternalServerError } from '@/errors/response-errors/internal-server.error';
import { AiAssistantRequest } from '@/requests';
import { AiAssistantService } from '@/services/ai-assistant.service';
import { AiService } from '@/services/ai.service';
type FlushableResponse = Response & { flush: () => void };
@RestController('/ai-assistant')
export class AiAssistantController {
constructor(private readonly aiAssistantService: AiAssistantService) {}
@RestController('/ai')
export class AiController {
constructor(private readonly aiService: AiService) {}
@Post('/chat', { rateLimit: { limit: 100 } })
async chat(req: AiAssistantRequest.Chat, res: FlushableResponse) {
try {
const aiResponse = await this.aiAssistantService.chat(req.body, req.user);
const aiResponse = await this.aiService.chat(req.body, req.user);
if (aiResponse.body) {
res.header('Content-type', 'application/json-lines').flush();
await aiResponse.body.pipeTo(
@ -40,10 +40,21 @@ export class AiAssistantController {
@Post('/chat/apply-suggestion')
async applySuggestion(
req: AiAssistantRequest.ApplySuggestion,
req: AiAssistantRequest.ApplySuggestionPayload,
): Promise<AiAssistantSDK.ApplySuggestionResponse> {
try {
return await this.aiAssistantService.applySuggestion(req.body, req.user);
return await this.aiService.applySuggestion(req.body, req.user);
} catch (e) {
assert(e instanceof Error);
ErrorReporterProxy.error(e);
throw new InternalServerError(`Something went wrong: ${e.message}`);
}
}
@Post('/ask-ai')
async askAi(req: AiAssistantRequest.AskAiPayload): Promise<AiAssistantSDK.AskAiResponsePayload> {
try {
return await this.aiService.askAi(req.body, req.user);
} catch (e) {
assert(e instanceof Error);
ErrorReporterProxy.error(e);

View file

@ -92,6 +92,7 @@ export class E2EController {
[LICENSE_FEATURES.PROJECT_ROLE_VIEWER]: false,
[LICENSE_FEATURES.AI_ASSISTANT]: false,
[LICENSE_FEATURES.COMMUNITY_NODES_CUSTOM_REGISTRY]: false,
[LICENSE_FEATURES.ASK_AI]: false,
};
private numericFeatures: Record<NumericLicenseFeature, number> = {

View file

@ -28,11 +28,4 @@ export class OrchestrationController {
if (!this.licenseService.isWorkerViewLicensed()) return;
return await this.orchestrationService.getWorkerStatus();
}
@GlobalScope('orchestration:list')
@Post('/worker/ids')
async getWorkerIdsAll() {
if (!this.licenseService.isWorkerViewLicensed()) return;
return await this.orchestrationService.getWorkerIds();
}
}

View file

@ -1,8 +1,8 @@
import type { ApiKey } from '@/databases/entities/api-key';
import type { MigrationContext } from '@/databases/types';
import type { MigrationContext, ReversibleMigration } from '@/databases/types';
import { generateNanoId } from '@/databases/utils/generators';
export class AddApiKeysTable1724951148974 {
export class AddApiKeysTable1724951148974 implements ReversibleMigration {
async up({
queryRunner,
escape,
@ -55,4 +55,55 @@ export class AddApiKeysTable1724951148974 {
// Drop apiKey column on user's table
await queryRunner.query(`ALTER TABLE ${userTable} DROP COLUMN ${apiKeyColumn};`);
}
async down({
queryRunner,
runQuery,
schemaBuilder: { dropTable, addColumns, createIndex, column },
escape,
isMysql,
}: MigrationContext) {
const userTable = escape.tableName('user');
const userApiKeysTable = escape.tableName('user_api_keys');
const apiKeyColumn = escape.columnName('apiKey');
const userIdColumn = escape.columnName('userId');
const idColumn = escape.columnName('id');
const createdAtColumn = escape.columnName('createdAt');
await addColumns('user', [column('apiKey').varchar()]);
await createIndex('user', ['apiKey'], true);
const queryToGetUsersApiKeys = isMysql
? `
SELECT ${userIdColumn},
${apiKeyColumn},
${createdAtColumn}
FROM ${userApiKeysTable} u
WHERE ${createdAtColumn} = (SELECT Min(${createdAtColumn})
FROM ${userApiKeysTable}
WHERE ${userIdColumn} = u.${userIdColumn});`
: `
SELECT DISTINCT ON
(${userIdColumn}) ${userIdColumn},
${apiKeyColumn}, ${createdAtColumn}
FROM ${userApiKeysTable}
ORDER BY ${userIdColumn}, ${createdAtColumn} ASC;`;
const oldestApiKeysPerUser = (await queryRunner.query(queryToGetUsersApiKeys)) as Array<
Partial<ApiKey>
>;
await Promise.all(
oldestApiKeysPerUser.map(
async (user: { userId: string; apiKey: string }) =>
await runQuery(
`UPDATE ${userTable} SET ${apiKeyColumn} = :apiKey WHERE ${idColumn} = :userId`,
user,
),
),
);
await dropTable('user_api_keys');
}
}

View file

@ -1,8 +1,10 @@
import type { ApiKey } from '@/databases/entities/api-key';
import type { MigrationContext } from '@/databases/types';
import type { MigrationContext, ReversibleMigration } from '@/databases/types';
import { generateNanoId } from '@/databases/utils/generators';
export class AddApiKeysTable1724951148974 {
export class AddApiKeysTable1724951148974 implements ReversibleMigration {
transaction = false as const;
async up({ queryRunner, tablePrefix, runQuery }: MigrationContext) {
const tableName = `${tablePrefix}user_api_keys`;
@ -74,4 +76,52 @@ export class AddApiKeysTable1724951148974 {
// Rename the temporary table to users
await queryRunner.query('ALTER TABLE users_new RENAME TO user;');
}
async down({
queryRunner,
runQuery,
tablePrefix,
schemaBuilder: { dropTable, createIndex },
escape,
}: MigrationContext) {
const userApiKeysTable = escape.tableName('user_api_keys');
const apiKeyColumn = escape.columnName('apiKey');
const userIdColumn = escape.columnName('userId');
const idColumn = escape.columnName('id');
const createdAtColumn = escape.columnName('createdAt');
const queryToGetUsersApiKeys = `
SELECT
${userIdColumn},
${apiKeyColumn},
${createdAtColumn}
FROM
${userApiKeysTable}
WHERE
${createdAtColumn} IN(
SELECT
MIN(${createdAtColumn})
FROM ${userApiKeysTable}
GROUP BY ${userIdColumn});`;
const oldestApiKeysPerUser = (await queryRunner.query(queryToGetUsersApiKeys)) as Array<
Partial<ApiKey>
>;
await queryRunner.query(`ALTER TABLE ${tablePrefix}user ADD COLUMN "apiKey" varchar;`);
await createIndex('user', ['apiKey'], true);
await Promise.all(
oldestApiKeysPerUser.map(
async (user: { userId: string; apiKey: string }) =>
await runQuery(
`UPDATE ${tablePrefix}user SET ${apiKeyColumn} = :apiKey WHERE ${idColumn} = :userId`,
user,
),
),
);
await dropTable('user_api_keys');
}
}

View file

@ -12,7 +12,9 @@ import { mockInstance, mockEntityManager } from '@test/mocking';
describe('ExecutionRepository', () => {
const entityManager = mockEntityManager(ExecutionEntity);
const globalConfig = mockInstance(GlobalConfig, { logging: { outputs: ['console'] } });
const globalConfig = mockInstance(GlobalConfig, {
logging: { outputs: ['console'], scopes: [] },
});
const binaryDataService = mockInstance(BinaryDataService);
const executionRepository = Container.get(ExecutionRepository);
const mockDate = new Date('2023-12-28 12:34:56.789Z');

View file

@ -0,0 +1,46 @@
import { mock } from 'jest-mock-extended';
import { InstanceSettings } from 'n8n-core';
import { SourceControlPreferencesService } from '@/environments/source-control/source-control-preferences.service.ee';
import { SourceControlService } from '@/environments/source-control/source-control.service.ee';
describe('SourceControlService', () => {
const preferencesService = new SourceControlPreferencesService(
new InstanceSettings(),
mock(),
mock(),
);
const sourceControlService = new SourceControlService(
mock(),
mock(),
preferencesService,
mock(),
mock(),
mock(),
mock(),
);
describe('pushWorkfolder', () => {
it('should throw an error if a file is given that is not in the workfolder', async () => {
jest.spyOn(sourceControlService, 'sanityCheck').mockResolvedValue(undefined);
await expect(
sourceControlService.pushWorkfolder({
fileNames: [
{
file: '/etc/passwd',
id: 'test',
name: 'secret-file',
type: 'file',
status: 'modified',
location: 'local',
conflict: false,
updatedAt: new Date().toISOString(),
pushed: false,
},
],
}),
).rejects.toThrow('File path /etc/passwd is invalid');
});
});
});

View file

@ -1,10 +1,13 @@
import { generateKeyPairSync } from 'crypto';
import { constants as fsConstants, mkdirSync, accessSync } from 'fs';
import { ApplicationError } from 'n8n-workflow';
import { ok } from 'node:assert/strict';
import path from 'path';
import { Container } from 'typedi';
import { License } from '@/license';
import { Logger } from '@/logging/logger.service';
import { isContainedWithin } from '@/utils/path-util';
import {
SOURCE_CONTROL_GIT_KEY_COMMENT,
@ -163,3 +166,24 @@ export function getTrackingInformationFromPostPushResult(result: SourceControlle
uniques.filter((file) => file.pushed && file.file.startsWith('variable_stubs')).length ?? 0,
};
}
/**
* Normalizes and validates the given source controlled file path. Ensures
* the path is absolute and contained within the git folder.
*
* @throws {ApplicationError} If the path is not within the git folder
*/
export function normalizeAndValidateSourceControlledFilePath(
gitFolderPath: string,
filePath: string,
) {
ok(path.isAbsolute(gitFolderPath), 'gitFolder must be an absolute path');
const normalizedPath = path.isAbsolute(filePath) ? filePath : path.join(gitFolderPath, filePath);
if (!isContainedWithin(gitFolderPath, normalizedPath)) {
throw new ApplicationError(`File path ${filePath} is invalid`);
}
return normalizedPath;
}
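A short usage sketch of the helper above; the git folder path is hypothetical, while the error message matches the service test earlier in this diff.

```ts
// Relative paths are resolved against the git folder and returned as absolute paths.
normalizeAndValidateSourceControlledFilePath(
	'/home/node/.n8n/git',
	'workflows/my-workflow.json',
); // -> '/home/node/.n8n/git/workflows/my-workflow.json'

// Paths that escape the git folder are rejected.
normalizeAndValidateSourceControlledFilePath('/home/node/.n8n/git', '/etc/passwd');
// -> throws ApplicationError: 'File path /etc/passwd is invalid'
```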

View file

@ -2,7 +2,12 @@
import { In } from '@n8n/typeorm';
import glob from 'fast-glob';
import { Credentials, InstanceSettings } from 'n8n-core';
import { ApplicationError, jsonParse, ErrorReporterProxy as ErrorReporter } from 'n8n-workflow';
import {
ApplicationError,
jsonParse,
ErrorReporterProxy as ErrorReporter,
ensureError,
} from 'n8n-workflow';
import { readFile as fsReadFile } from 'node:fs/promises';
import path from 'path';
import { Container, Service } from 'typedi';
@ -274,8 +279,9 @@ export class SourceControlImportService {
this.logger.debug(`Reactivating workflow id ${existingWorkflow.id}`);
await workflowManager.add(existingWorkflow.id, 'activate');
// update the versionId of the workflow to match the imported workflow
} catch (error) {
this.logger.error(`Failed to activate workflow ${existingWorkflow.id}`, error as Error);
} catch (e) {
const error = ensureError(e);
this.logger.error(`Failed to activate workflow ${existingWorkflow.id}`, { error });
} finally {
await Container.get(WorkflowRepository).update(
{ id: existingWorkflow.id },
@ -377,8 +383,9 @@ export class SourceControlImportService {
await fsReadFile(candidate.file, { encoding: 'utf8' }),
{ fallbackValue: { tags: [], mappings: [] } },
);
} catch (error) {
this.logger.error(`Failed to import tags from file ${candidate.file}`, error as Error);
} catch (e) {
const error = ensureError(e);
this.logger.error(`Failed to import tags from file ${candidate.file}`, { error });
return;
}
@ -444,8 +451,8 @@ export class SourceControlImportService {
await fsReadFile(candidate.file, { encoding: 'utf8' }),
{ fallbackValue: [] },
);
} catch (error) {
this.logger.error(`Failed to import tags from file ${candidate.file}`, error as Error);
} catch (e) {
this.logger.error(`Failed to import tags from file ${candidate.file}`, { error: e });
return;
}
const overriddenKeys = Object.keys(valueOverrides ?? {});

View file

@ -170,6 +170,7 @@ export class SourceControlController {
if (this.sourceControlPreferencesService.isBranchReadOnly()) {
throw new BadRequestError('Cannot push onto read-only branch.');
}
try {
await this.sourceControlService.setGitUserDetails(
`${req.user.firstName} ${req.user.lastName}`,

View file

@ -25,6 +25,7 @@ import {
getTrackingInformationFromPrePushResult,
getTrackingInformationFromPullResult,
getVariablesPath,
normalizeAndValidateSourceControlledFilePath,
sourceControlFoldersExistCheck,
} from './source-control-helper.ee';
import { SourceControlImportService } from './source-control-import.service.ee';
@ -80,7 +81,7 @@ export class SourceControlService {
});
}
private async sanityCheck(): Promise<void> {
public async sanityCheck(): Promise<void> {
try {
const foldersExisted = sourceControlFoldersExistCheck(
[this.gitFolder, this.sshFolder],
@ -217,8 +218,20 @@ export class SourceControlService {
throw new BadRequestError('Cannot push onto read-only branch.');
}
const filesToPush = options.fileNames.map((file) => {
const normalizedPath = normalizeAndValidateSourceControlledFilePath(
this.gitFolder,
file.file,
);
return {
...file,
file: normalizedPath,
};
});
// only determine file status if not provided by the frontend
let statusResult: SourceControlledFile[] = options.fileNames;
let statusResult: SourceControlledFile[] = filesToPush;
if (statusResult.length === 0) {
statusResult = (await this.getStatus({
direction: 'push',
@ -240,7 +253,7 @@ export class SourceControlService {
const filesToBePushed = new Set<string>();
const filesToBeDeleted = new Set<string>();
options.fileNames.forEach((e) => {
filesToPush.forEach((e) => {
if (e.status !== 'deleted') {
filesToBePushed.add(e.file);
} else {
@ -250,12 +263,12 @@ export class SourceControlService {
this.sourceControlExportService.rmFilesFromExportFolder(filesToBeDeleted);
const workflowsToBeExported = options.fileNames.filter(
const workflowsToBeExported = filesToPush.filter(
(e) => e.type === 'workflow' && e.status !== 'deleted',
);
await this.sourceControlExportService.exportWorkflowsToWorkFolder(workflowsToBeExported);
const credentialsToBeExported = options.fileNames.filter(
const credentialsToBeExported = filesToPush.filter(
(e) => e.type === 'credential' && e.status !== 'deleted',
);
const credentialExportResult =
@ -269,11 +282,11 @@ export class SourceControlService {
});
}
if (options.fileNames.find((e) => e.type === 'tags')) {
if (filesToPush.find((e) => e.type === 'tags')) {
await this.sourceControlExportService.exportTagsToWorkFolder();
}
if (options.fileNames.find((e) => e.type === 'variables')) {
if (filesToPush.find((e) => e.type === 'variables')) {
await this.sourceControlExportService.exportVariablesToWorkFolder();
}
@ -281,7 +294,7 @@ export class SourceControlService {
for (let i = 0; i < statusResult.length; i++) {
// eslint-disable-next-line @typescript-eslint/no-loop-func
if (options.fileNames.find((file) => file.file === statusResult[i].file)) {
if (filesToPush.find((file) => file.file === statusResult[i].file)) {
statusResult[i].pushed = true;
}
}

View file

@ -149,7 +149,7 @@ export class MessageEventBusLogWriter {
this._worker = new Worker(workerFileName);
if (this.worker) {
this.worker.on('messageerror', async (error) => {
this.logger.error('Event Bus Log Writer thread error, attempting to restart...', error);
this.logger.error('Event Bus Log Writer thread error, attempting to restart...', { error });
await MessageEventBusLogWriter.instance.startThread();
});
return true;

View file

@ -1057,4 +1057,20 @@ describe('TelemetryEventRelay', () => {
);
});
});
describe('Community+ registered', () => {
it('should track `license-community-plus-registered` event', () => {
const event: RelayEventMap['license-community-plus-registered'] = {
email: 'user@example.com',
licenseKey: 'license123',
};
eventService.emit('license-community-plus-registered', event);
expect(telemetry.track).toHaveBeenCalledWith('User registered for license community plus', {
email: 'user@example.com',
licenseKey: 'license123',
});
});
});
});

View file

@ -80,25 +80,5 @@ export type PubSubCommandMap = {
};
export type PubSubWorkerResponseMap = {
// #region Lifecycle
'restart-event-bus': {
result: 'success' | 'error';
error?: string;
};
'reload-external-secrets-providers': {
result: 'success' | 'error';
error?: string;
};
// #endregion
// #region Worker view
'get-worker-id': never;
'get-worker-status': WorkerStatus;
// #endregion
};

View file

@ -420,6 +420,11 @@ export type RelayEventMap = {
success: boolean;
};
'license-community-plus-registered': {
email: string;
licenseKey: string;
};
// #endregion
// #region Variable

View file

@ -54,6 +54,7 @@ export class TelemetryEventRelay extends EventRelay {
'source-control-user-finished-push-ui': (event) =>
this.sourceControlUserFinishedPushUi(event),
'license-renewal-attempted': (event) => this.licenseRenewalAttempted(event),
'license-community-plus-registered': (event) => this.licenseCommunityPlusRegistered(event),
'variable-created': () => this.variableCreated(),
'external-secrets-provider-settings-saved': (event) =>
this.externalSecretsProviderSettingsSaved(event),
@ -234,6 +235,16 @@ export class TelemetryEventRelay extends EventRelay {
});
}
private licenseCommunityPlusRegistered({
email,
licenseKey,
}: RelayEventMap['license-community-plus-registered']) {
this.telemetry.track('User registered for license community plus', {
email,
licenseKey,
});
}
// #endregion
// #region Variable

View file

@ -1,5 +1,5 @@
import pick from 'lodash/pick';
import type { ExecutionStatus, IRun, IWorkflowBase } from 'n8n-workflow';
import { ensureError, type ExecutionStatus, type IRun, type IWorkflowBase } from 'n8n-workflow';
import { Container } from 'typedi';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
@ -95,7 +95,8 @@ export async function updateExistingExecution(parameters: {
);
}
} catch (e) {
logger.error(`Failed to save metadata for execution ID ${executionId}`, e as Error);
const error = ensureError(e);
logger.error(`Failed to save metadata for execution ID ${executionId}`, { error });
}
if (executionData.finished === true && executionData.retryOf !== undefined) {

View file

@ -37,7 +37,9 @@ export class License {
private readonly orchestrationService: OrchestrationService,
private readonly settingsRepository: SettingsRepository,
private readonly licenseMetricsService: LicenseMetricsService,
) {}
) {
this.logger = this.logger.withScope('license');
}
/**
* Whether this instance should renew the license - on init and periodically.
@ -109,9 +111,9 @@ export class License {
await this.manager.initialize();
this.logger.debug('License initialized');
} catch (e: unknown) {
if (e instanceof Error) {
this.logger.error('Could not initialize license manager sdk', e);
} catch (error: unknown) {
if (error instanceof Error) {
this.logger.error('Could not initialize license manager sdk', { error });
}
}
}
@ -253,6 +255,10 @@ export class License {
return this.isFeatureEnabled(LICENSE_FEATURES.AI_ASSISTANT);
}
isAskAiEnabled() {
return this.isFeatureEnabled(LICENSE_FEATURES.ASK_AI);
}
isAdvancedExecutionFiltersEnabled() {
return this.isFeatureEnabled(LICENSE_FEATURES.ADVANCED_EXECUTION_FILTERS);
}

View file

@ -1,4 +1,5 @@
import type { TEntitlement } from '@n8n_io/license-sdk';
import axios, { AxiosError } from 'axios';
import { mock } from 'jest-mock-extended';
import type { WorkflowRepository } from '@/databases/repositories/workflow.repository';
@ -7,6 +8,8 @@ import type { EventService } from '@/events/event.service';
import type { License } from '@/license';
import { LicenseErrors, LicenseService } from '@/license/license.service';
jest.mock('axios');
describe('LicenseService', () => {
const license = mock<License>();
const workflowRepository = mock<WorkflowRepository>();
@ -84,4 +87,37 @@ describe('LicenseService', () => {
});
});
});
describe('registerCommunityEdition', () => {
test('on success', async () => {
jest
.spyOn(axios, 'post')
.mockResolvedValueOnce({ data: { title: 'Title', text: 'Text', licenseKey: 'abc-123' } });
const data = await licenseService.registerCommunityEdition({
email: 'test@ema.il',
instanceId: '123',
instanceUrl: 'http://localhost',
licenseType: 'community-registered',
});
expect(data).toEqual({ title: 'Title', text: 'Text' });
expect(eventService.emit).toHaveBeenCalledWith('license-community-plus-registered', {
email: 'test@ema.il',
licenseKey: 'abc-123',
});
});
test('on failure', async () => {
jest.spyOn(axios, 'post').mockRejectedValueOnce(new AxiosError('Failed'));
await expect(
licenseService.registerCommunityEdition({
email: 'test@ema.il',
instanceId: '123',
instanceUrl: 'http://localhost',
licenseType: 'community-registered',
}),
).rejects.toThrowError('Failed');
expect(eventService.emit).not.toHaveBeenCalled();
});
});
});

View file

@ -1,14 +1,21 @@
import { CommunityRegisteredRequestDto } from '@n8n/api-types';
import type { AxiosError } from 'axios';
import { InstanceSettings } from 'n8n-core';
import { Get, Post, RestController, GlobalScope } from '@/decorators';
import { Get, Post, RestController, GlobalScope, Body } from '@/decorators';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { AuthenticatedRequest, LicenseRequest } from '@/requests';
import { AuthenticatedRequest, AuthlessRequest, LicenseRequest } from '@/requests';
import { UrlService } from '@/services/url.service';
import { LicenseService } from './license.service';
@RestController('/license')
export class LicenseController {
constructor(private readonly licenseService: LicenseService) {}
constructor(
private readonly licenseService: LicenseService,
private readonly instanceSettings: InstanceSettings,
private readonly urlService: UrlService,
) {}
@Get('/')
async getLicenseData() {
@ -32,6 +39,20 @@ export class LicenseController {
}
}
@Post('/enterprise/community-registered')
async registerCommunityEdition(
_req: AuthlessRequest,
_res: Response,
@Body payload: CommunityRegisteredRequestDto,
) {
return await this.licenseService.registerCommunityEdition({
email: payload.email,
instanceId: this.instanceSettings.instanceId,
instanceUrl: this.urlService.getInstanceBaseUrl(),
licenseType: 'community-registered',
});
}
@Post('/activate')
@GlobalScope('license:manage')
async activateLicense(req: LicenseRequest.Activate) {

View file

@ -1,4 +1,5 @@
import axios from 'axios';
import axios, { AxiosError } from 'axios';
import { ensureError } from 'n8n-workflow';
import { Service } from 'typedi';
import type { User } from '@/databases/entities/user';
@ -13,8 +14,7 @@ type LicenseError = Error & { errorId?: keyof typeof LicenseErrors };
export const LicenseErrors = {
SCHEMA_VALIDATION: 'Activation key is in the wrong format',
RESERVATION_EXHAUSTED:
'Activation key has been used too many times. Please contact sales@n8n.io if you would like to extend it',
RESERVATION_EXHAUSTED: 'Activation key has been used too many times',
RESERVATION_EXPIRED: 'Activation key has expired',
NOT_FOUND: 'Activation key not found',
RESERVATION_CONFLICT: 'Activation key not found',
@ -60,6 +60,43 @@ export class LicenseService {
});
}
async registerCommunityEdition({
email,
instanceId,
instanceUrl,
licenseType,
}: {
email: string;
instanceId: string;
instanceUrl: string;
licenseType: string;
}): Promise<{ title: string; text: string }> {
try {
const {
data: { licenseKey, ...rest },
} = await axios.post<{ title: string; text: string; licenseKey: string }>(
'https://enterprise.n8n.io/community-registered',
{
email,
instanceId,
instanceUrl,
licenseType,
},
);
this.eventService.emit('license-community-plus-registered', { email, licenseKey });
return rest;
} catch (e: unknown) {
if (e instanceof AxiosError) {
const error = e as AxiosError<{ message: string }>;
const errorMsg = error.response?.data?.message ?? e.message;
throw new BadRequestError('Failed to register community edition: ' + errorMsg);
} else {
this.logger.error('Failed to register community edition', { error: ensureError(e) });
throw new BadRequestError('Failed to register community edition');
}
}
}
getManagementJwt(): string {
return this.license.getManagementJwt();
}

View file

@ -29,6 +29,7 @@ import {
inE2ETests,
} from '@/constants';
import { Logger } from '@/logging/logger.service';
import { isContainedWithin } from '@/utils/path-util';
interface LoadedNodesAndCredentials {
nodes: INodeTypeData;
@ -155,14 +156,13 @@ export class LoadNodesAndCredentials {
resolveIcon(packageName: string, url: string): string | undefined {
const loader = this.loaders[packageName];
if (loader) {
const pathPrefix = `/icons/${packageName}/`;
const filePath = path.resolve(loader.directory, url.substring(pathPrefix.length));
if (!path.relative(loader.directory, filePath).includes('..')) {
return filePath;
}
if (!loader) {
return undefined;
}
return undefined;
const pathPrefix = `/icons/${packageName}/`;
const filePath = path.resolve(loader.directory, url.substring(pathPrefix.length));
return isContainedWithin(loader.directory, filePath) ? filePath : undefined;
}
getCustomDirectories(): string[] {
@ -260,7 +260,7 @@ export class LoadNodesAndCredentials {
dir: string,
) {
const loader = new constructor(dir, this.excludeNodes, this.includeNodes);
if (loader.packageName in this.loaders) {
if (loader instanceof PackageDirectoryLoader && loader.packageName in this.loaders) {
throw new ApplicationError(
picocolors.red(
`nodes package ${loader.packageName} is already loaded.\n Please delete this second copy at path ${dir}`,

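The `isContainedWithin` helper from `@/utils/path-util` is imported above but its body is not part of this diff; the sketch below is an assumed implementation based on the `path.relative` check it replaces, not the actual source.

```ts
import path from 'node:path';

/** Returns true if `child` resolves to `parent` itself or to a path inside it (assumed implementation). */
export function isContainedWithin(parent: string, child: string): boolean {
	const resolvedParent = path.resolve(parent);
	const resolvedChild = path.resolve(child);
	return resolvedChild === resolvedParent || resolvedChild.startsWith(resolvedParent + path.sep);
}
```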
View file

@ -11,6 +11,7 @@ describe('Logger', () => {
logging: {
level: 'info',
outputs: ['console'],
scopes: [],
},
});
@ -30,6 +31,7 @@ describe('Logger', () => {
logging: {
level: 'info',
outputs: ['file'],
scopes: [],
file: {
fileSizeMax: 100,
fileCountMax: 16,
@ -56,6 +58,7 @@ describe('Logger', () => {
logging: {
level: 'error',
outputs: ['console'],
scopes: [],
},
});
@ -74,6 +77,7 @@ describe('Logger', () => {
logging: {
level: 'warn',
outputs: ['console'],
scopes: [],
},
});
@ -92,6 +96,7 @@ describe('Logger', () => {
logging: {
level: 'info',
outputs: ['console'],
scopes: [],
},
});
@ -110,6 +115,7 @@ describe('Logger', () => {
logging: {
level: 'debug',
outputs: ['console'],
scopes: [],
},
});
@ -128,6 +134,7 @@ describe('Logger', () => {
logging: {
level: 'silent',
outputs: ['console'],
scopes: [],
},
});

View file

@ -1,11 +1,15 @@
import type { LogScope } from '@n8n/config';
import { GlobalConfig } from '@n8n/config';
import callsites from 'callsites';
import type { TransformableInfo } from 'logform';
import { InstanceSettings } from 'n8n-core';
import { LoggerProxy, LOG_LEVELS } from 'n8n-workflow';
import path, { basename } from 'node:path';
import pc from 'picocolors';
import { Service } from 'typedi';
import winston from 'winston';
import { inDevelopment, inProduction } from '@/constants';
import { isObjectLiteral } from '@/utils';
import { noOp } from './constants';
@ -13,10 +17,16 @@ import type { LogLocationMetadata, LogLevel, LogMetadata } from './types';
@Service()
export class Logger {
private readonly internalLogger: winston.Logger;
private internalLogger: winston.Logger;
private readonly level: LogLevel;
private readonly scopes: Set<LogScope>;
private get isScopingEnabled() {
return this.scopes.size > 0;
}
constructor(
private readonly globalConfig: GlobalConfig,
private readonly instanceSettings: InstanceSettings,
@ -33,15 +43,30 @@ export class Logger {
if (!isSilent) {
this.setLevel();
const { outputs } = this.globalConfig.logging;
const { outputs, scopes } = this.globalConfig.logging;
if (outputs.includes('console')) this.setConsoleTransport();
if (outputs.includes('file')) this.setFileTransport();
this.scopes = new Set(scopes);
}
LoggerProxy.init(this);
}
private setInternalLogger(internalLogger: winston.Logger) {
this.internalLogger = internalLogger;
}
withScope(scope: LogScope) {
const scopedLogger = new Logger(this.globalConfig, this.instanceSettings);
const childLogger = this.internalLogger.child({ scope });
scopedLogger.setInternalLogger(childLogger);
return scopedLogger;
}
private log(level: LogLevel, message: string, metadata: LogMetadata) {
const location: LogLocationMetadata = {};
@ -61,8 +86,7 @@ export class Logger {
for (const logLevel of LOG_LEVELS) {
if (levels[logLevel] > levels[this.level]) {
// winston defines `{ error: 0, warn: 1, info: 2, debug: 5 }`
// so numerically higher (less severe) log levels become no-op
// numerically higher (less severe) log levels become no-op
// to prevent overhead from `callsites` calls
Object.defineProperty(this, logLevel, { value: noOp });
}
@ -71,24 +95,72 @@ export class Logger {
private setConsoleTransport() {
const format =
this.level === 'debug'
? winston.format.combine(
winston.format.metadata(),
winston.format.timestamp(),
winston.format.colorize({ all: true }),
winston.format.printf(({ level, message, timestamp, metadata }) => {
const _metadata = this.toPrintable(metadata);
return `${timestamp} | ${level.padEnd(18)} | ${message}${_metadata}`;
}),
)
: winston.format.printf(({ message }: { message: string }) => message);
this.level === 'debug' && inDevelopment
? this.debugDevConsoleFormat()
: this.level === 'debug' && inProduction
? this.debugProdConsoleFormat()
: winston.format.printf(({ message }: { message: string }) => message);
this.internalLogger.add(new winston.transports.Console({ format }));
}
private scopeFilter() {
return winston.format((info: TransformableInfo & { metadata: LogMetadata }) => {
const shouldIncludeScope = info.metadata.scope && this.scopes.has(info.metadata.scope);
if (this.isScopingEnabled && !shouldIncludeScope) return false;
return info;
})();
}
private debugDevConsoleFormat() {
return winston.format.combine(
winston.format.metadata(),
winston.format.timestamp({ format: () => this.devTsFormat() }),
winston.format.colorize({ all: true }),
this.scopeFilter(),
winston.format.printf(({ level: _level, message, timestamp, metadata: _metadata }) => {
const SEPARATOR = ' '.repeat(3);
const LOG_LEVEL_COLUMN_WIDTH = 15; // 5 columns + ANSI color codes
const level = _level.toLowerCase().padEnd(LOG_LEVEL_COLUMN_WIDTH, ' ');
const metadata = this.toPrintable(_metadata);
return [timestamp, level, message + ' ' + pc.dim(metadata)].join(SEPARATOR);
}),
);
}
private debugProdConsoleFormat() {
return winston.format.combine(
winston.format.metadata(),
winston.format.timestamp(),
this.scopeFilter(),
winston.format.printf(({ level, message, timestamp, metadata }) => {
const _metadata = this.toPrintable(metadata);
return `${timestamp} | ${level.padEnd(5)} | ${message}${_metadata ? ' ' + _metadata : ''}`;
}),
);
}
private devTsFormat() {
const now = new Date();
const pad = (num: number, digits: number = 2) => num.toString().padStart(digits, '0');
const hours = pad(now.getHours());
const minutes = pad(now.getMinutes());
const seconds = pad(now.getSeconds());
const milliseconds = pad(now.getMilliseconds(), 3);
return `${hours}:${minutes}:${seconds}.${milliseconds}`;
}
private toPrintable(metadata: unknown) {
if (isObjectLiteral(metadata) && Object.keys(metadata).length > 0) {
return ' ' + JSON.stringify(metadata);
return inProduction
? JSON.stringify(metadata)
: JSON.stringify(metadata)
.replace(/{"/g, '{ "')
.replace(/,"/g, ', "')
.replace(/:/g, ': ')
.replace(/}/g, ' }'); // spacing for readability
}
return '';

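For reference, a small sketch of how the new scoping is used; the `license` scope and the `withScope` call mirror the `License` constructor change earlier in this diff, while the container wiring shown here is an assumption.

```ts
import { Container } from 'typedi';
import { Logger } from '@/logging/logger.service';

// A scoped logger tags every entry with `{ scope: 'license' }` via winston's child logger.
const logger = Container.get(Logger).withScope('license');
logger.debug('License initialized'); // carries scope metadata

// When `logging.scopes` is non-empty (for example ['license']), the debug console
// formats apply `scopeFilter()`, which drops entries whose scope is not allow-listed.
```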
View file

@ -1,7 +1,14 @@
import type { LogScope } from '@n8n/config';
import type { LOG_LEVELS } from './constants';
export type LogLevel = (typeof LOG_LEVELS)[number];
export type LogLocationMetadata = Partial<{ file: string; function: string }>;
export type LogMetadata = {
[key: string]: unknown;
scope?: LogScope;
file?: string;
function?: string;
};
export type LogMetadata = Record<string, unknown> | Error;
export type LogLocationMetadata = Pick<LogMetadata, 'file' | 'function'>;

View file

@ -586,5 +586,6 @@ export declare namespace AiAssistantRequest {
type Chat = AuthenticatedRequest<{}, {}, AiAssistantSDK.ChatRequestPayload>;
type SuggestionPayload = { sessionId: string; suggestionId: string };
type ApplySuggestion = AuthenticatedRequest<{}, {}, SuggestionPayload>;
type ApplySuggestionPayload = AuthenticatedRequest<{}, {}, SuggestionPayload>;
type AskAiPayload = AuthenticatedRequest<{}, {}, AiAssistantSDK.AskAiRequestPayload>;
}

View file

@ -1,16 +1,17 @@
import {
type IExecuteFunctions,
type Workflow,
type IRunExecutionData,
type INodeExecutionData,
type ITaskDataConnections,
type INode,
type WorkflowParameters,
type INodeParameters,
type WorkflowExecuteMode,
type IExecuteData,
type IDataObject,
type IWorkflowExecuteAdditionalData,
import type {
EnvProviderState,
IExecuteFunctions,
Workflow,
IRunExecutionData,
INodeExecutionData,
ITaskDataConnections,
INode,
WorkflowParameters,
INodeParameters,
WorkflowExecuteMode,
IExecuteData,
IDataObject,
IWorkflowExecuteAdditionalData,
} from 'n8n-workflow';
import { nanoid } from 'nanoid';
@ -42,6 +43,7 @@ export interface TaskData {
connectionInputData: INodeExecutionData[];
siblingParameters: INodeParameters;
mode: WorkflowExecuteMode;
envProviderState: EnvProviderState;
executeData?: IExecuteData;
defaultReturnRunIndex: number;
selfData: IDataObject;
@ -76,6 +78,7 @@ export interface AllCodeTaskData {
connectionInputData: INodeExecutionData[];
siblingParameters: INodeParameters;
mode: WorkflowExecuteMode;
envProviderState: EnvProviderState;
executeData?: IExecuteData;
defaultReturnRunIndex: number;
selfData: IDataObject;
@ -137,6 +140,7 @@ export class TaskManager {
connectionInputData: INodeExecutionData[],
siblingParameters: INodeParameters,
mode: WorkflowExecuteMode,
envProviderState: EnvProviderState,
executeData?: IExecuteData,
defaultReturnRunIndex = -1,
selfData: IDataObject = {},
@ -153,6 +157,7 @@ export class TaskManager {
itemIndex,
siblingParameters,
mode,
envProviderState,
executeData,
defaultReturnRunIndex,
selfData,
@ -311,6 +316,7 @@ export class TaskManager {
contextNodeName: jd.contextNodeName,
defaultReturnRunIndex: jd.defaultReturnRunIndex,
mode: jd.mode,
envProviderState: jd.envProviderState,
node: jd.node,
runExecutionData: jd.runExecutionData,
runIndex: jd.runIndex,

View file

@ -52,7 +52,7 @@ describe('Publisher', () => {
expect(client.publish).toHaveBeenCalledWith(
'n8n.commands',
JSON.stringify({ ...msg, senderId: queueModeId }),
JSON.stringify({ ...msg, senderId: queueModeId, selfSend: false, debounce: true }),
);
});
});
@ -61,7 +61,7 @@ describe('Publisher', () => {
it('should publish worker response into `n8n.worker-response` pubsub channel', async () => {
const publisher = new Publisher(mock(), redisClientService);
const msg = mock<PubSub.WorkerResponse>({
command: 'reload-external-secrets-providers',
command: 'get-worker-status',
});
await publisher.publishWorkerResponse(msg);

View file

@ -7,7 +7,9 @@ import type { ExternalSecretsManager } from '@/external-secrets/external-secrets
import type { License } from '@/license';
import type { CommunityPackagesService } from '@/services/community-packages.service';
import type { Publisher } from '../pubsub/publisher.service';
import { PubSubHandler } from '../pubsub/pubsub-handler';
import type { WorkerStatus } from '../worker-status';
describe('PubSubHandler', () => {
const eventService = new EventService();
@ -15,13 +17,19 @@ describe('PubSubHandler', () => {
const eventbus = mock<MessageEventBus>();
const externalSecretsManager = mock<ExternalSecretsManager>();
const communityPackagesService = mock<CommunityPackagesService>();
const publisher = mock<Publisher>();
const workerStatus = mock<WorkerStatus>();
afterEach(() => {
eventService.removeAllListeners();
});
describe('in webhook process', () => {
const instanceSettings = mock<InstanceSettings>({ instanceType: 'webhook' });
it('should set up handlers in webhook process', () => {
// @ts-expect-error Spying on private method
const setupWebhookHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupWebhookHandlers');
const setupHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupHandlers');
new PubSubHandler(
eventService,
@ -30,9 +38,18 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
expect(setupWebhookHandlersSpy).toHaveBeenCalled();
expect(setupHandlersSpy).toHaveBeenCalledWith({
'reload-license': expect.any(Function),
'restart-event-bus': expect.any(Function),
'reload-external-secrets-providers': expect.any(Function),
'community-package-install': expect.any(Function),
'community-package-update': expect.any(Function),
'community-package-uninstall': expect.any(Function),
});
});
it('should reload license on `reload-license` event', () => {
@ -43,6 +60,8 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('reload-license');
@ -58,6 +77,8 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('restart-event-bus');
@ -73,6 +94,8 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('reload-external-secrets-providers');
@ -88,6 +111,8 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('community-package-install', {
@ -109,6 +134,8 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('community-package-update', {
@ -130,6 +157,8 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('community-package-uninstall', {
@ -139,4 +168,102 @@ describe('PubSubHandler', () => {
expect(communityPackagesService.removeNpmPackage).toHaveBeenCalledWith('test-package');
});
});
describe('in worker process', () => {
const instanceSettings = mock<InstanceSettings>({ instanceType: 'worker' });
it('should set up handlers in worker process', () => {
// @ts-expect-error Spying on private method
const setupHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupHandlers');
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
expect(setupHandlersSpy).toHaveBeenCalledWith({
'reload-license': expect.any(Function),
'restart-event-bus': expect.any(Function),
'reload-external-secrets-providers': expect.any(Function),
'community-package-install': expect.any(Function),
'community-package-update': expect.any(Function),
'community-package-uninstall': expect.any(Function),
'get-worker-status': expect.any(Function),
});
});
it('should reload license on `reload-license` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('reload-license');
expect(license.reload).toHaveBeenCalled();
});
it('should restart event bus on `restart-event-bus` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('restart-event-bus');
expect(eventbus.restart).toHaveBeenCalled();
});
it('should reload providers on `reload-external-secrets-providers` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('reload-external-secrets-providers');
expect(externalSecretsManager.reloadAllProviders).toHaveBeenCalled();
});
it('should generate status on `get-worker-status` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('get-worker-status');
expect(workerStatus.generateStatus).toHaveBeenCalled();
});
});
});

View file

@ -6,7 +6,7 @@ import { ApplicationError } from 'n8n-workflow';
import Container from 'typedi';
import type { OrchestrationService } from '@/services/orchestration.service';
import { mockInstance } from '@test/mocking';
import { mockInstance, mockLogger } from '@test/mocking';
import { JOB_TYPE_NAME, QUEUE_NAME } from '../constants';
import type { JobProcessor } from '../job-processor';
@ -74,7 +74,7 @@ describe('ScalingService', () => {
instanceSettings.markAsLeader();
scalingService = new ScalingService(
mock(),
mockLogger(),
mock(),
jobProcessor,
globalConfig,

View file

@ -50,10 +50,10 @@ describe('WorkerServer', () => {
globalConfig,
mock(),
mock(),
mock(),
externalHooks,
mock<InstanceSettings>({ instanceType: 'webhook' }),
prometheusMetricsService,
mock(),
),
).toThrowError(AssertionError);
});
@ -75,10 +75,10 @@ describe('WorkerServer', () => {
globalConfig,
mock(),
mock(),
mock(),
externalHooks,
instanceSettings,
prometheusMetricsService,
mock(),
);
expect(procesExitSpy).toHaveBeenCalledWith(1);
@ -102,10 +102,10 @@ describe('WorkerServer', () => {
globalConfig,
mock(),
mock(),
mock(),
externalHooks,
instanceSettings,
prometheusMetricsService,
mock(),
);
const CREDENTIALS_OVERWRITE_ENDPOINT = 'credentials/overwrites';
@ -137,10 +137,10 @@ describe('WorkerServer', () => {
globalConfig,
mock(),
mock(),
mock(),
externalHooks,
instanceSettings,
prometheusMetricsService,
mock(),
);
await workerServer.init({ health: true, overwrites: false, metrics: true });
@ -158,10 +158,10 @@ describe('WorkerServer', () => {
globalConfig,
mock(),
mock(),
mock(),
externalHooks,
instanceSettings,
prometheusMetricsService,
mock(),
);
await expect(
workerServer.init({ health: false, overwrites: false, metrics: false }),
@ -176,10 +176,10 @@ describe('WorkerServer', () => {
globalConfig,
mock(),
mock(),
mock(),
externalHooks,
instanceSettings,
prometheusMetricsService,
mock(),
);
server.listen.mockImplementation((...args: unknown[]) => {

View file

@ -7,3 +7,17 @@ export const COMMAND_PUBSUB_CHANNEL = 'n8n.commands';
/** Pubsub channel for messages sent by workers in response to commands from main processes. */
export const WORKER_RESPONSE_PUBSUB_CHANNEL = 'n8n.worker-response';
/**
* Commands that should be sent to the sender as well, e.g. during workflow activation and
* deactivation in multi-main setup. */
export const SELF_SEND_COMMANDS = new Set([
'add-webhooks-triggers-and-pollers',
'remove-triggers-and-pollers',
]);
/**
* Commands that should not be debounced when received, e.g. during webhook handling in
* multi-main setup.
*/
export const IMMEDIATE_COMMANDS = new Set(['relay-execution-lifecycle-event']);
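
These two sets centralize per-command delivery behavior: commands in `SELF_SEND_COMMANDS` are also delivered back to the instance that published them, and commands in `IMMEDIATE_COMMANDS` skip the receive-side debounce. A hedged sketch of how a publisher can derive the message flags from the sets (the `OutgoingCommand` shape is a simplified stand-in for the project's `PubSub.Command` type):

```ts
// Sketch: deriving delivery flags from the command name.
const SELF_SEND_COMMANDS = new Set(['add-webhooks-triggers-and-pollers', 'remove-triggers-and-pollers']);
const IMMEDIATE_COMMANDS = new Set(['relay-execution-lifecycle-event']);

interface OutgoingCommand {
  command: string;
  senderId: string;
  selfSend: boolean;
  debounce: boolean;
}

function toOutgoingCommand(command: string, senderId: string): OutgoingCommand {
  return {
    command,
    senderId,
    selfSend: SELF_SEND_COMMANDS.has(command), // deliver back to the sender as well
    debounce: !IMMEDIATE_COMMANDS.has(command), // everything else is debounced on receipt
  };
}

// e.g. toOutgoingCommand('relay-execution-lifecycle-event', 'main-1')
//      → { selfSend: false, debounce: false, ... }
```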

View file

@ -6,6 +6,7 @@ import { Logger } from '@/logging/logger.service';
import { RedisClientService } from '@/services/redis-client.service';
import type { PubSub } from './pubsub.types';
import { IMMEDIATE_COMMANDS, SELF_SEND_COMMANDS } from '../constants';
/**
* Responsible for publishing messages into the pubsub channels used by scaling mode.
@ -43,7 +44,12 @@ export class Publisher {
async publishCommand(msg: Omit<PubSub.Command, 'senderId'>) {
await this.client.publish(
'n8n.commands',
JSON.stringify({ ...msg, senderId: config.getEnv('redis.queueModeId') }),
JSON.stringify({
...msg,
senderId: config.getEnv('redis.queueModeId'),
selfSend: SELF_SEND_COMMANDS.has(msg.command),
debounce: !IMMEDIATE_COMMANDS.has(msg.command),
}),
);
this.logger.debug(`Published ${msg.command} to command channel`);
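
With this change every published command carries `selfSend` and `debounce` flags computed at publish time, so subscribers no longer need per-command knowledge. Purely for illustration (the sender id is invented), a `reload-license` command would serialize roughly as follows, since it appears in neither of the new sets:

```ts
// Roughly what lands on the `n8n.commands` channel for a non-self-send, debounced command.
const example = JSON.stringify({
  command: 'reload-license',
  senderId: 'queue-mode-id-123', // hypothetical redis.queueModeId value
  selfSend: false,               // not in SELF_SEND_COMMANDS
  debounce: true,                // not in IMMEDIATE_COMMANDS
});
```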

View file

@ -1,12 +1,17 @@
import { InstanceSettings } from 'n8n-core';
import { Service } from 'typedi';
import config from '@/config';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
import type { PubSubEventMap } from '@/events/maps/pub-sub.event-map';
import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
import { License } from '@/license';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { CommunityPackagesService } from '@/services/community-packages.service';
import { assertNever } from '@/utils';
import { WorkerStatus } from '../worker-status';
/**
* Responsible for handling events emitted from messages received via a pubsub channel.
@ -20,10 +25,32 @@ export class PubSubHandler {
private readonly eventbus: MessageEventBus,
private readonly externalSecretsManager: ExternalSecretsManager,
private readonly communityPackagesService: CommunityPackagesService,
private readonly publisher: Publisher,
private readonly workerStatus: WorkerStatus,
) {}
init() {
if (this.instanceSettings.instanceType === 'webhook') this.setupWebhookHandlers();
switch (this.instanceSettings.instanceType) {
case 'webhook':
this.setupHandlers(this.commonHandlers);
break;
case 'worker':
this.setupHandlers({
...this.commonHandlers,
'get-worker-status': async () =>
await this.publisher.publishWorkerResponse({
workerId: config.getEnv('redis.queueModeId'),
command: 'get-worker-status',
payload: this.workerStatus.generateStatus(),
}),
});
break;
case 'main':
// TODO
break;
default:
assertNever(this.instanceSettings.instanceType);
}
}
private setupHandlers<EventNames extends keyof PubSubEventMap>(
@ -40,22 +67,27 @@ export class PubSubHandler {
}
}
// #region Webhook process
private setupWebhookHandlers() {
this.setupHandlers({
'reload-license': async () => await this.license.reload(),
'restart-event-bus': async () => await this.eventbus.restart(),
'reload-external-secrets-providers': async () =>
await this.externalSecretsManager.reloadAllProviders(),
'community-package-install': async ({ packageName, packageVersion }) =>
await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion),
'community-package-update': async ({ packageName, packageVersion }) =>
await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion),
'community-package-uninstall': async ({ packageName }) =>
await this.communityPackagesService.removeNpmPackage(packageName),
});
}
// #endregion
/** Handlers shared by webhook and worker processes. */
private commonHandlers: {
[K in keyof Pick<
PubSubEventMap,
| 'reload-license'
| 'restart-event-bus'
| 'reload-external-secrets-providers'
| 'community-package-install'
| 'community-package-update'
| 'community-package-uninstall'
>]: (event: PubSubEventMap[K]) => Promise<void>;
} = {
'reload-license': async () => await this.license.reload(),
'restart-event-bus': async () => await this.eventbus.restart(),
'reload-external-secrets-providers': async () =>
await this.externalSecretsManager.reloadAllProviders(),
'community-package-install': async ({ packageName, packageVersion }) =>
await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion),
'community-package-update': async ({ packageName, packageVersion }) =>
await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion),
'community-package-uninstall': async ({ packageName }) =>
await this.communityPackagesService.removeNpmPackage(packageName),
};
}
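
The handler setup now branches on instance type: webhook processes register only the shared handlers, while workers additionally answer `get-worker-status` by publishing a worker response. A hedged sketch of the same registration pattern in isolation (the event names mirror the diff; the `EventEmitter`-based wiring is an assumption for illustration, not the project's `EventService`):

```ts
import { EventEmitter } from 'node:events';

// Sketch: registering a map of named handlers, as setupHandlers does.
type HandlerMap = Record<string, (payload?: unknown) => Promise<void> | void>;

function setupHandlers(events: EventEmitter, handlers: HandlerMap) {
  for (const [eventName, handler] of Object.entries(handlers)) {
    events.on(eventName, (payload) => void handler(payload));
  }
}

const events = new EventEmitter();

const commonHandlers: HandlerMap = {
  'reload-license': async () => { /* license.reload() */ },
  'restart-event-bus': async () => { /* eventbus.restart() */ },
};

// A worker extends the shared map with its own handler:
setupHandlers(events, {
  ...commonHandlers,
  'get-worker-status': async () => { /* publisher.publishWorkerResponse(...) */ },
});
```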

View file

@ -22,6 +22,12 @@ export namespace PubSub {
senderId: string;
targets?: string[];
command: CommandKey;
/** Whether the command should be sent to the sender as well. */
selfSend?: boolean;
/** Whether the command should be debounced when received. */
debounce?: boolean;
} & (PubSubCommandMap[CommandKey] extends never
? { payload?: never } // some commands carry no payload
: { payload: PubSubCommandMap[CommandKey] });
@ -80,18 +86,6 @@ export namespace PubSub {
_ToWorkerResponse<WorkerResponseKey>
>;
namespace WorkerResponses {
export type RestartEventBus = ToWorkerResponse<'restart-event-bus'>;
export type ReloadExternalSecretsProviders =
ToWorkerResponse<'reload-external-secrets-providers'>;
export type GetWorkerId = ToWorkerResponse<'get-worker-id'>;
export type GetWorkerStatus = ToWorkerResponse<'get-worker-status'>;
}
/** Response sent via the `n8n.worker-response` pubsub channel. */
export type WorkerResponse =
| WorkerResponses.RestartEventBus
| WorkerResponses.ReloadExternalSecretsProviders
| WorkerResponses.GetWorkerId
| WorkerResponses.GetWorkerStatus;
export type WorkerResponse = ToWorkerResponse<'get-worker-status'>;
}
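
The conditional payload typing means commands whose map entry is `never` must omit `payload`, while all others must supply one, and `WorkerResponse` is now narrowed to `get-worker-status` only. A small hedged example of the same conditional-payload pattern (type names are simplified stand-ins for the project's `PubSubCommandMap`):

```ts
// Sketch of the conditional-payload pattern used by PubSub.Command.
type CommandMap = {
  'reload-license': never; // no payload
  'community-package-install': { packageName: string; packageVersion: string };
};

type Command<K extends keyof CommandMap = keyof CommandMap> = {
  command: K;
  selfSend?: boolean;
  debounce?: boolean;
} & (CommandMap[K] extends never
  ? { payload?: never }                // payload must be absent
  : { payload: CommandMap[K] });       // payload is required

const ok: Command<'reload-license'> = { command: 'reload-license' };
const alsoOk: Command<'community-package-install'> = {
  command: 'community-package-install',
  payload: { packageName: 'test-package', packageVersion: '1.0.0' },
};
// const bad: Command<'reload-license'> = { command: 'reload-license', payload: {} }; // type error
```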

View file

@ -70,12 +70,15 @@ export class Subscriber {
// #region Commands
setCommandMessageHandler() {
const handlerFn = debounce((str: string) => {
const msg = this.parseCommandMessage(str);
if (msg) this.eventService.emit(msg.command, msg.payload);
}, 300);
const handlerFn = (msg: PubSub.Command) => this.eventService.emit(msg.command, msg.payload);
const debouncedHandlerFn = debounce(handlerFn, 300);
this.setMessageHandler('n8n.commands', handlerFn);
this.setMessageHandler('n8n.commands', (str: string) => {
const msg = this.parseCommandMessage(str);
if (!msg) return;
if (msg.debounce) debouncedHandlerFn(msg);
else handlerFn(msg);
});
}
private parseCommandMessage(str: string) {
@ -91,7 +94,10 @@ export class Subscriber {
const queueModeId = config.getEnv('redis.queueModeId');
if (msg.senderId === queueModeId || (msg.targets && !msg.targets.includes(queueModeId))) {
if (
!msg.selfSend &&
(msg.senderId === queueModeId || (msg.targets && !msg.targets.includes(queueModeId)))
) {
this.logger.debug('Disregarding message - not for this instance', msg);
return null;
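
On the receive side, the subscriber now parses the message first and then decides per message whether to use the debounced path or dispatch immediately, and it honors `selfSend` when filtering out its own messages. A hedged sketch of the dispatch split (lodash's `debounce` is used, as in the subscriber; the message shape and handler are simplified):

```ts
import debounce from 'lodash/debounce';

interface CommandMessage {
  command: string;
  debounce?: boolean;
  payload?: unknown;
}

// Sketch: one debounced and one immediate path over the same handler.
const handle = (msg: CommandMessage) => {
  console.log('dispatching', msg.command, msg.payload);
};
const handleDebounced = debounce(handle, 300);

function onRawMessage(raw: string) {
  let msg: CommandMessage;
  try {
    msg = JSON.parse(raw) as CommandMessage;
  } catch {
    return; // ignore malformed messages
  }
  if (msg.debounce) handleDebounced(msg);
  else handle(msg);
}

onRawMessage(JSON.stringify({ command: 'relay-execution-lifecycle-event', debounce: false }));
```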

View file

@ -47,7 +47,9 @@ export class ScalingService {
private readonly instanceSettings: InstanceSettings,
private readonly orchestrationService: OrchestrationService,
private readonly eventService: EventService,
) {}
) {
this.logger = this.logger.withScope('scaling');
}
// #region Lifecycle
@ -77,7 +79,7 @@ export class ScalingService {
this.scheduleQueueMetrics();
this.logger.debug('[ScalingService] Queue setup completed');
this.logger.debug('Queue setup completed');
}
setupWorker(concurrency: number) {
@ -91,7 +93,7 @@ export class ScalingService {
// Errors thrown here will be sent to the main instance by bull. Logging
// them out and rethrowing them makes it possible to find out which worker
// had the issue.
this.logger.error('[ScalingService] Executing a job errored', {
this.logger.error('Executing a job errored', {
jobId: job.id,
executionId: job.data.executionId,
error,
@ -101,19 +103,19 @@ export class ScalingService {
}
});
this.logger.debug('[ScalingService] Worker setup completed');
this.logger.debug('Worker setup completed');
}
@OnShutdown(HIGHEST_SHUTDOWN_PRIORITY)
async stop() {
await this.queue.pause(true, true);
this.logger.debug('[ScalingService] Queue paused');
this.logger.debug('Queue paused');
this.stopQueueRecovery();
this.stopQueueMetrics();
this.logger.debug('[ScalingService] Queue recovery and metrics stopped');
this.logger.debug('Queue recovery and metrics stopped');
let count = 0;
@ -159,7 +161,7 @@ export class ScalingService {
const job = await this.queue.add(JOB_TYPE_NAME, jobData, jobOptions);
this.logger.info(`[ScalingService] Added job ${job.id} (execution ${jobData.executionId})`);
this.logger.info(`Added job ${job.id} (execution ${jobData.executionId})`);
return job;
}
@ -180,16 +182,16 @@ export class ScalingService {
try {
if (await job.isActive()) {
await job.progress({ kind: 'abort-job' }); // being processed by worker
this.logger.debug('[ScalingService] Stopped active job', props);
this.logger.debug('Stopped active job', props);
return true;
}
await job.remove(); // not yet picked up, or waiting for next pickup (stalled)
this.logger.debug('[ScalingService] Stopped inactive job', props);
this.logger.debug('Stopped inactive job', props);
return true;
} catch (error: unknown) {
await job.progress({ kind: 'abort-job' });
this.logger.error('[ScalingService] Failed to stop job', { ...props, error });
this.logger.error('Failed to stop job', { ...props, error });
return false;
}
}
@ -233,12 +235,12 @@ export class ScalingService {
* Even if Redis recovers, worker will remain unable to process jobs.
*/
if (error.message.includes('Error initializing Lua scripts')) {
this.logger.error('[ScalingService] Fatal error initializing worker', { error });
this.logger.error('[ScalingService] Exiting process...');
this.logger.error('Fatal error initializing worker', { error });
this.logger.error('Exiting process...');
process.exit(1);
}
this.logger.error('[ScalingService] Queue errored', { error });
this.logger.error('Queue errored', { error });
throw error;
});
@ -251,7 +253,7 @@ export class ScalingService {
this.queue.on('error', (error: Error) => {
if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by RedisClientService.retryStrategy
this.logger.error('[ScalingService] Queue errored', { error });
this.logger.error('Queue errored', { error });
throw error;
});
@ -361,10 +363,10 @@ export class ScalingService {
const nextWaitMs = await this.recoverFromQueue();
this.scheduleQueueRecovery(nextWaitMs);
} catch (error) {
this.logger.error('[ScalingService] Failed to recover dangling executions from queue', {
this.logger.error('Failed to recover dangling executions from queue', {
msg: this.toErrorMsg(error),
});
this.logger.error('[ScalingService] Retrying...');
this.logger.error('Retrying...');
this.scheduleQueueRecovery();
}
@ -372,7 +374,7 @@ export class ScalingService {
const wait = [this.queueRecoveryContext.waitMs / Time.minutes.toMilliseconds, 'min'].join(' ');
this.logger.debug(`[ScalingService] Scheduled queue recovery check for next ${wait}`);
this.logger.debug(`Scheduled queue recovery check for next ${wait}`);
}
private stopQueueRecovery() {
@ -389,7 +391,7 @@ export class ScalingService {
const storedIds = await this.executionRepository.getInProgressExecutionIds(batchSize);
if (storedIds.length === 0) {
this.logger.debug('[ScalingService] Completed queue recovery check, no dangling executions');
this.logger.debug('Completed queue recovery check, no dangling executions');
return waitMs;
}
@ -398,23 +400,22 @@ export class ScalingService {
const queuedIds = new Set(runningJobs.map((job) => job.data.executionId));
if (queuedIds.size === 0) {
this.logger.debug('[ScalingService] Completed queue recovery check, no dangling executions');
this.logger.debug('Completed queue recovery check, no dangling executions');
return waitMs;
}
const danglingIds = storedIds.filter((id) => !queuedIds.has(id));
if (danglingIds.length === 0) {
this.logger.debug('[ScalingService] Completed queue recovery check, no dangling executions');
this.logger.debug('Completed queue recovery check, no dangling executions');
return waitMs;
}
await this.executionRepository.markAsCrashed(danglingIds);
this.logger.info(
'[ScalingService] Completed queue recovery check, recovered dangling executions',
{ danglingIds },
);
this.logger.info('Completed queue recovery check, recovered dangling executions', {
danglingIds,
});
// if this cycle used up the whole batch size, it is possible for there to be
// dangling executions outside this check, so speed up next cycle
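
The `[ScalingService]` prefixes are dropped throughout because the logger is now scoped once in the constructor via `this.logger = this.logger.withScope('scaling')`, letting the logger attach the scope instead of repeating it in every message. A minimal sketch of that pattern (a hand-rolled scoped logger; n8n's real `Logger.withScope` is only assumed to behave like this):

```ts
// Sketch: a logger whose scope is set once and prepended to every message.
interface ScopedLogger {
  withScope(scope: string): ScopedLogger;
  debug(message: string, meta?: object): void;
}

function createLogger(scope?: string): ScopedLogger {
  const prefix = scope ? `[${scope}] ` : '';
  return {
    withScope: (s) => createLogger(s),
    debug: (message, meta) => console.debug(`${prefix}${message}`, meta ?? {}),
  };
}

const logger = createLogger().withScope('scaling');
logger.debug('Queue setup completed'); // → "[scaling] Queue setup completed"
```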

View file

@ -2,7 +2,6 @@ import { GlobalConfig } from '@n8n/config';
import type { Application } from 'express';
import express from 'express';
import { InstanceSettings } from 'n8n-core';
import { ensureError } from 'n8n-workflow';
import { strict as assert } from 'node:assert';
import http from 'node:http';
import type { Server } from 'node:http';
@ -12,14 +11,13 @@ import { CredentialsOverwrites } from '@/credentials-overwrites';
import * as Db from '@/db';
import { CredentialsOverwritesAlreadySetError } from '@/errors/credentials-overwrites-already-set.error';
import { NonJsonBodyError } from '@/errors/non-json-body.error';
import { ServiceUnavailableError } from '@/errors/response-errors/service-unavailable.error';
import { ExternalHooks } from '@/external-hooks';
import type { ICredentialsOverwrite } from '@/interfaces';
import { Logger } from '@/logging/logger.service';
import { PrometheusMetricsService } from '@/metrics/prometheus-metrics.service';
import { rawBodyReader, bodyParser } from '@/middlewares';
import * as ResponseHelper from '@/response-helper';
import { ScalingService } from '@/scaling/scaling.service';
import { RedisClientService } from '@/services/redis-client.service';
export type WorkerServerEndpointsConfig = {
/** Whether the `/healthz` endpoint is enabled. */
@ -52,11 +50,11 @@ export class WorkerServer {
constructor(
private readonly globalConfig: GlobalConfig,
private readonly logger: Logger,
private readonly scalingService: ScalingService,
private readonly credentialsOverwrites: CredentialsOverwrites,
private readonly externalHooks: ExternalHooks,
private readonly instanceSettings: InstanceSettings,
private readonly prometheusMetricsService: PrometheusMetricsService,
private readonly redisClientService: RedisClientService,
) {
assert(this.instanceSettings.instanceType === 'worker');
@ -94,11 +92,14 @@ export class WorkerServer {
}
private async mountEndpoints() {
if (this.endpointsConfig.health) {
this.app.get('/healthz', async (req, res) => await this.healthcheck(req, res));
const { health, overwrites, metrics } = this.endpointsConfig;
if (health) {
this.app.get('/healthz', async (_, res) => res.send({ status: 'ok' }));
this.app.get('/healthz/readiness', async (_, res) => await this.readiness(_, res));
}
if (this.endpointsConfig.overwrites) {
if (overwrites) {
const { endpoint } = this.globalConfig.credentials.overwrite;
this.app.post(`/${endpoint}`, rawBodyReader, bodyParser, (req, res) =>
@ -106,39 +107,20 @@ export class WorkerServer {
);
}
if (this.endpointsConfig.metrics) {
if (metrics) {
await this.prometheusMetricsService.init(this.app);
}
}
private async healthcheck(_req: express.Request, res: express.Response) {
this.logger.debug('[WorkerServer] Health check started');
private async readiness(_req: express.Request, res: express.Response) {
const isReady =
Db.connectionState.connected &&
Db.connectionState.migrated &&
this.redisClientService.isConnected();
try {
await Db.getConnection().query('SELECT 1');
} catch (value) {
this.logger.error('[WorkerServer] No database connection', ensureError(value));
return ResponseHelper.sendErrorResponse(
res,
new ServiceUnavailableError('No database connection'),
);
}
try {
await this.scalingService.pingQueue();
} catch (value) {
this.logger.error('[WorkerServer] No Redis connection', ensureError(value));
return ResponseHelper.sendErrorResponse(
res,
new ServiceUnavailableError('No Redis connection'),
);
}
this.logger.debug('[WorkerServer] Health check succeeded');
ResponseHelper.sendSuccessResponse(res, { status: 'ok' }, true, 200);
return isReady
? res.status(200).send({ status: 'ok' })
: res.status(503).send({ status: 'error' });
}
private handleOverwrites(
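
The health check is split in two: `/healthz` now only reports liveness, while `/healthz/readiness` checks the cached connection state (database connected and migrated, Redis client connected) instead of issuing a live `SELECT 1` and queue ping. A hedged sketch of the resulting endpoints (the state names mirror the diff; the express wiring and stand-in values are assumptions):

```ts
import express from 'express';

// Sketch: liveness vs readiness endpoints backed by cached connection state.
const connectionState = { connected: true, migrated: true }; // stand-in for Db.connectionState
const redisConnected = () => true;                           // stand-in for redisClientService.isConnected()

const app = express();

app.get('/healthz', (_req, res) => res.send({ status: 'ok' })); // liveness: the process is up

app.get('/healthz/readiness', (_req, res) => {
  const isReady = connectionState.connected && connectionState.migrated && redisConnected();
  return isReady
    ? res.status(200).send({ status: 'ok' })
    : res.status(503).send({ status: 'error' });
});

app.listen(5678);
```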

View file

@ -0,0 +1,43 @@
import os from 'node:os';
import { Service } from 'typedi';
import config from '@/config';
import { N8N_VERSION } from '@/constants';
import { JobProcessor } from './job-processor';
@Service()
export class WorkerStatus {
constructor(private readonly jobProcessor: JobProcessor) {}
generateStatus() {
return {
workerId: config.getEnv('redis.queueModeId'),
runningJobsSummary: this.jobProcessor.getRunningJobsSummary(),
freeMem: os.freemem(),
totalMem: os.totalmem(),
uptime: process.uptime(),
loadAvg: os.loadavg(),
cpus: this.getOsCpuString(),
arch: os.arch(),
platform: os.platform(),
hostname: os.hostname(),
interfaces: Object.values(os.networkInterfaces()).flatMap((interfaces) =>
(interfaces ?? [])?.map((net) => ({
family: net.family,
address: net.address,
internal: net.internal,
})),
),
version: N8N_VERSION,
};
}
private getOsCpuString() {
const cpus = os.cpus();
if (cpus.length === 0) return 'no CPU info';
return `${cpus.length}x ${cpus[0].model} - speed: ${cpus[0].speed}`;
}
}
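
`WorkerStatus.generateStatus()` assembles a point-in-time snapshot from `node:os`, `process`, and the job processor. For reference, a hedged example of the kind of object a consumer of the `get-worker-status` response might see (field names follow the diff; all values are invented):

```ts
// Sketch: illustrative shape of a worker status snapshot (values invented).
const exampleStatus = {
  workerId: 'worker-1',
  runningJobsSummary: [],          // from JobProcessor.getRunningJobsSummary()
  freeMem: 4_000_000_000,          // bytes, os.freemem()
  totalMem: 16_000_000_000,        // bytes, os.totalmem()
  uptime: 1234.5,                  // seconds, process.uptime()
  loadAvg: [0.2, 0.3, 0.1],
  cpus: '8x Example CPU - speed: 2400',
  arch: 'arm64',
  platform: 'linux',
  hostname: 'worker-host',
  interfaces: [{ family: 'IPv4', address: '10.0.0.5', internal: false }],
  version: '1.63.0',
};
console.log(exampleStatus);
```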

View file

@ -39,7 +39,7 @@ import '@/controllers/annotation-tags.controller.ee';
import '@/controllers/auth.controller';
import '@/controllers/binary-data.controller';
import '@/controllers/curl.controller';
import '@/controllers/ai-assistant.controller';
import '@/controllers/ai.controller';
import '@/controllers/dynamic-node-parameters.controller';
import '@/controllers/invitation.controller';
import '@/controllers/me.controller';

View file

@ -1,3 +1,4 @@
import type { WorkerStatus } from '@n8n/api-types';
import type Redis from 'ioredis';
import { mock } from 'jest-mock-extended';
import { InstanceSettings } from 'n8n-core';
@ -34,12 +35,10 @@ mockInstance(ActiveWorkflowManager);
let queueModeId: string;
const workerRestartEventBusResponse: PubSub.WorkerResponse = {
const workerStatusResponse: PubSub.WorkerResponse = {
workerId: 'test',
command: 'restart-event-bus',
payload: {
result: 'success',
},
command: 'get-worker-status',
payload: mock<WorkerStatus>(),
};
describe('Orchestration Service', () => {
@ -74,10 +73,10 @@ describe('Orchestration Service', () => {
test('should handle worker responses', async () => {
const response = await handleWorkerResponseMessageMain(
JSON.stringify(workerRestartEventBusResponse),
JSON.stringify(workerStatusResponse),
mock<MainResponseReceivedHandlerOptions>(),
);
expect(response?.command).toEqual('restart-event-bus');
expect(response?.command).toEqual('get-worker-status');
});
test('should handle command messages from others', async () => {
@ -94,10 +93,10 @@ describe('Orchestration Service', () => {
test('should reject command messages from itself', async () => {
const response = await handleCommandMessageMain(
JSON.stringify({ ...workerRestartEventBusResponse, senderId: queueModeId }),
JSON.stringify({ ...workerStatusResponse, senderId: queueModeId }),
);
expect(response).toBeDefined();
expect(response!.command).toEqual('restart-event-bus');
expect(response!.command).toEqual('get-worker-status');
expect(response!.senderId).toEqual(queueModeId);
expect(eventBus.restart).not.toHaveBeenCalled();
});
@ -105,7 +104,7 @@ describe('Orchestration Service', () => {
test('should send command messages', async () => {
// @ts-expect-error Private field
jest.spyOn(os.publisher, 'publishCommand').mockImplementation(async () => {});
await os.getWorkerIds();
await os.getWorkerStatus();
// @ts-expect-error Private field
expect(os.publisher.publishCommand).toHaveBeenCalled();
// @ts-expect-error Private field

View file

@ -3,7 +3,6 @@ import type { AiAssistantSDK } from '@n8n_io/ai-assistant-sdk';
import { AiAssistantClient } from '@n8n_io/ai-assistant-sdk';
import { assert, type IUser } from 'n8n-workflow';
import { Service } from 'typedi';
import type { Response } from 'undici';
import config from '@/config';
import type { AiAssistantRequest } from '@/requests';
@ -12,7 +11,7 @@ import { N8N_VERSION } from '../constants';
import { License } from '../license';
@Service()
export class AiAssistantService {
export class AiService {
private client: AiAssistantClient | undefined;
constructor(
@ -40,7 +39,7 @@ export class AiAssistantService {
});
}
async chat(payload: AiAssistantSDK.ChatRequestPayload, user: IUser): Promise<Response> {
async chat(payload: AiAssistantSDK.ChatRequestPayload, user: IUser) {
if (!this.client) {
await this.init();
}
@ -57,4 +56,13 @@ export class AiAssistantService {
return await this.client.applySuggestion(payload, { id: user.id });
}
async askAi(payload: AiAssistantSDK.AskAiRequestPayload, user: IUser) {
if (!this.client) {
await this.init();
}
assert(this.client, 'Assistant client not setup');
return await this.client.askAi(payload, { id: user.id });
}
}
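
`AiAssistantService` is renamed to `AiService` and gains an `askAi` method alongside `chat` and `applySuggestion`, each lazily initializing the SDK client before delegating to it. A hedged sketch of that lazy-init-then-delegate pattern (the client here is a stub, not the real `AiAssistantClient`):

```ts
// Sketch: lazily create a client once, then delegate calls to it.
interface AskAiClient {
  askAi(payload: unknown, user: { id: string }): Promise<unknown>;
}

class LazyAiService {
  private client: AskAiClient | undefined;

  private async init() {
    // the real service builds an AiAssistantClient from license and config here
    this.client = { askAi: async () => ({ answer: 'stub' }) };
  }

  async askAi(payload: unknown, user: { id: string }) {
    if (!this.client) await this.init();
    if (!this.client) throw new Error('Assistant client not set up');
    return await this.client.askAi(payload, { id: user.id });
  }
}
```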

View file

@ -212,8 +212,8 @@ export class FrontendService {
banners: {
dismissed: [],
},
ai: {
enabled: config.getEnv('ai.enabled'),
askAi: {
enabled: false,
},
workflowHistory: {
pruneTime: -1,
@ -274,6 +274,7 @@ export class FrontendService {
const isS3Available = config.getEnv('binaryDataManager.availableModes').includes('s3');
const isS3Licensed = this.license.isBinaryDataS3Licensed();
const isAiAssistantEnabled = this.license.isAiAssistantEnabled();
const isAskAiEnabled = this.license.isAskAiEnabled();
this.settings.license.planName = this.license.getPlanName();
this.settings.license.consumerId = this.license.getConsumerId();
@ -330,6 +331,10 @@ export class FrontendService {
this.settings.aiAssistant.enabled = isAiAssistantEnabled;
}
if (isAskAiEnabled) {
this.settings.askAi.enabled = isAskAiEnabled;
}
this.settings.mfa.enabled = config.get('mfa.enabled');
this.settings.executionMode = config.getEnv('executions.mode');
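
The settings gain an `askAi.enabled` flag that defaults to `false` and is switched on only when the license reports Ask AI as enabled, mirroring the existing `aiAssistant` flag. A tiny hedged example of how a client could gate the feature on that flag (the settings shape is trimmed to the relevant fields):

```ts
// Sketch: gating the Ask AI feature on the new settings flag.
interface FrontendSettings {
  aiAssistant: { enabled: boolean };
  askAi: { enabled: boolean };
}

const canShowAskAi = (settings: FrontendSettings) => settings.askAi.enabled;

console.log(canShowAskAi({ aiAssistant: { enabled: true }, askAi: { enabled: false } })); // false
```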

View file

@ -88,8 +88,7 @@ export class ImportService {
try {
await replaceInvalidCredentials(workflow);
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
this.logger.error('Failed to replace invalid credential', error);
this.logger.error('Failed to replace invalid credential', { error: e });
}
}

View file

@ -128,16 +128,6 @@ export class OrchestrationService {
});
}
async getWorkerIds() {
if (!this.sanityCheck()) return;
const command = 'get-worker-id';
this.logger.debug(`Sending "${command}" to command channel`);
await this.publisher.publishCommand({ command });
}
// ----------------------------------
// activations
// ----------------------------------

View file

@ -27,17 +27,11 @@ export async function handleCommandMessageMain(messageString: string) {
`RedisCommandHandler(main): Received command message ${message.command} from ${message.senderId}`,
);
const selfSendingAllowed = [
'add-webhooks-triggers-and-pollers',
'remove-triggers-and-pollers',
].includes(message.command);
if (
!selfSendingAllowed &&
!message.selfSend &&
(message.senderId === queueModeId ||
(message.targets && !message.targets.includes(queueModeId)))
) {
// Skipping command message because it's not for this instance
logger.debug(
`Skipping command message ${message.command} because it's not for this instance.`,
);

View file

@ -4,6 +4,7 @@ import Container from 'typedi';
import { Logger } from '@/logging/logger.service';
import { WORKER_RESPONSE_PUBSUB_CHANNEL } from '@/scaling/constants';
import type { PubSub } from '@/scaling/pubsub/pubsub.types';
import { assertNever } from '@/utils';
import type { MainResponseReceivedHandlerOptions } from './types';
import { Push } from '../../../push';
@ -32,12 +33,8 @@ export async function handleWorkerResponseMessageMain(
status: workerResponse.payload,
});
break;
case 'get-worker-id':
break;
default:
Container.get(Logger).debug(
`Received worker response ${workerResponse.command} from ${workerResponse.workerId}`,
);
assertNever(workerResponse.command);
}
return workerResponse;
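
With `WorkerResponse` narrowed to a single command, the default branch can call `assertNever` so the compiler flags any future response command that is not handled here. A hedged sketch of the exhaustiveness-check idiom (this `assertNever` is a generic stand-in, not necessarily identical to the one in `@/utils`):

```ts
// Sketch: exhaustiveness checking with assertNever.
function assertNever(value: never): never {
  throw new Error(`Unhandled case: ${String(value)}`);
}

type WorkerResponseCommand = 'get-worker-status';

function handle(command: WorkerResponseCommand) {
  switch (command) {
    case 'get-worker-status':
      return 'push status to frontend';
    default:
      // if WorkerResponseCommand ever gains a member not handled above,
      // this line stops compiling because `command` is no longer `never`
      return assertNever(command);
  }
}
```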

Some files were not shown because too many files have changed in this diff.