Merge branch 'master' into ai-520-impact-of-test-executions-on-concurrency-limits

# Conflicts:
#	packages/cli/src/concurrency/concurrency-control.service.ts
This commit is contained in:
Eugene Molodkin 2025-01-06 14:45:49 +01:00
commit 29603f2dd5
No known key found for this signature in database
614 changed files with 8205 additions and 1928 deletions

View file

@ -1,6 +1,8 @@
name: Chromatic
on:
schedule:
- cron: '0 0 * * *'
workflow_dispatch:
pull_request_review:
types: [submitted]
@ -70,7 +72,7 @@ jobs:
exitZeroOnChanges: false
- name: Success comment
if: steps.chromatic_tests.outcome == 'success'
if: steps.chromatic_tests.outcome == 'success' && github.ref != 'refs/heads/master'
uses: peter-evans/create-or-update-comment@v4.0.0
with:
issue-number: ${{ github.event.pull_request.number }}
@ -80,7 +82,7 @@ jobs:
:white_check_mark: No visual regressions found.
- name: Fail comment
if: steps.chromatic_tests.outcome != 'success'
if: steps.chromatic_tests.outcome != 'success' && github.ref != 'refs/heads/master'
uses: peter-evans/create-or-update-comment@v4.0.0
with:
issue-number: ${{ github.event.pull_request.number }}

View file

@ -47,6 +47,7 @@ jobs:
nodeVersion: ${{ matrix.node-version }}
cacheKey: ${{ github.sha }}-base:build
collectCoverage: ${{ matrix.node-version == '20.x' }}
ignoreTurboCache: ${{ matrix.node-version == '20.x' }}
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

View file

@ -106,7 +106,7 @@ jobs:
- name: Test MariaDB
working-directory: packages/cli
run: pnpm test:mariadb --testTimeout 20000
run: pnpm test:mariadb --testTimeout 30000
postgres:
name: Postgres

View file

@ -22,6 +22,10 @@ on:
required: false
default: false
type: boolean
ignoreTurboCache:
required: false
default: false
type: boolean
secrets:
CODECOV_TOKEN:
description: 'Codecov upload token.'
@ -32,6 +36,7 @@ jobs:
name: Unit tests
runs-on: ubuntu-latest
env:
TURBO_FORCE: ${{ inputs.ignoreTurboCache }}
COVERAGE_ENABLED: ${{ inputs.collectCoverage }}
steps:
- uses: actions/checkout@v4.1.1
@ -49,7 +54,6 @@ jobs:
run: pnpm install --frozen-lockfile
- name: Setup build cache
if: inputs.collectCoverage != true
uses: rharkor/caching-for-turbo@v1.5
- name: Build
@ -74,6 +78,6 @@ jobs:
- name: Upload coverage to Codecov
if: inputs.collectCoverage
uses: codecov/codecov-action@v4.5.0
uses: codecov/codecov-action@v5.1.2
with:
token: ${{ secrets.CODECOV_TOKEN }}

View file

@ -18,11 +18,16 @@ n8n is a workflow automation platform that gives technical teams the flexibility
Try n8n instantly with [npx](https://docs.n8n.io/hosting/installation/npm/) (requires [Node.js](https://nodejs.org/en/)):
`npx n8n`
```
npx n8n
```
Or deploy with [Docker](https://docs.n8n.io/hosting/installation/docker/):
`docker run -it --rm --name n8n -p 5678:5678 docker.n8n.io/n8nio/n8n`
```
docker volume create n8n_data
docker run -it --rm --name n8n -p 5678:5678 -v n8n_data:/home/node/.n8n docker.n8n.io/n8nio/n8n
```
Access the editor at http://localhost:5678

58
codecov.yml Normal file
View file

@ -0,0 +1,58 @@
codecov:
max_report_age: off
require_ci_to_pass: true
coverage:
status:
patch: false
project:
default:
threshold: 0.5%
github_checks:
annotations: false
flags:
tests:
paths:
- "**"
carryforward: true
component_management:
default_rules:
statuses:
- type: project
target: auto
branches:
- "!master"
individual_components:
- component_id: backend_packages
name: Backend
paths:
- packages/@n8n/api-types/**
- packages/@n8n/config/**
- packages/@n8n/client-oauth2/**
- packages/@n8n/imap/**
- packages/@n8n/permissions/**
- packages/@n8n/task-runner/**
- packages/n8n-workflow/**
- packages/n8n-core/**
- packages/n8n-node-dev/**
- packages/n8n/**
- component_id: frontend_packages
name: Frontend
paths:
- packages/@n8n/chat/**
- packages/@n8n/codemirror-lang/**
- packages/n8n-design-system/**
- packages/n8n-editor-ui/**
- component_id: nodes_packages
name: Nodes
paths:
- packages/n8n-nodes-base/**
- packages/@n8n/n8n-nodes-langchain/**
ignore:
- (?s:.*/[^\/]*\.spec\.ts.*)\Z
- (?s:.*/[^\/]*\.test\.ts.*)\Z
- (?s:.*/[^\/]*e2e[^\/]*\.ts.*)\Z

View file

@ -2,7 +2,7 @@
* Getters
*/
import { getVisibleSelect } from '../utils';
import { getVisibleSelect } from '../utils/popper';
export function getCredentialSelect(eq = 0) {
return cy.getByTestId('node-credentials-select').eq(eq);

View file

@ -29,7 +29,11 @@ export const getAddProjectButton = () => {
return cy.get('@button');
};
export const getAddFirstProjectButton = () => cy.getByTestId('add-first-project-button');
export const getIconPickerButton = () => cy.getByTestId('icon-picker-button');
export const getIconPickerTab = (tab: string) => cy.getByTestId('icon-picker-tabs').contains(tab);
export const getIconPickerIcons = () => cy.getByTestId('icon-picker-icon');
export const getIconPickerEmojis = () => cy.getByTestId('icon-picker-emoji');
// export const getAddProjectButton = () =>
// cy.getByTestId('universal-add').should('contain', 'Add project').should('be.visible');
export const getProjectTabs = () => cy.getByTestId('project-tabs').find('a');

View file

@ -1,4 +1,5 @@
import { getManualChatModal } from './modals/chat-modal';
import { clickGetBackToCanvas, getParameterInputByName } from './ndv';
import { ROUTES } from '../constants';
/**
@ -127,7 +128,7 @@ export function navigateToNewWorkflowPage(preventNodeViewUnload = true) {
});
}
export function addSupplementalNodeToParent(
function connectNodeToParent(
nodeName: string,
endpointType: EndpointType,
parentNodeName: string,
@ -141,6 +142,15 @@ export function addSupplementalNodeToParent(
} else {
getNodeCreatorItems().contains(nodeName).click();
}
}
export function addSupplementalNodeToParent(
nodeName: string,
endpointType: EndpointType,
parentNodeName: string,
exactMatch = false,
) {
connectNodeToParent(nodeName, endpointType, parentNodeName, exactMatch);
getConnectionBySourceAndTarget(parentNodeName, nodeName).should('exist');
}
@ -160,6 +170,15 @@ export function addToolNodeToParent(nodeName: string, parentNodeName: string) {
addSupplementalNodeToParent(nodeName, 'ai_tool', parentNodeName);
}
export function addVectorStoreToolToParent(nodeName: string, parentNodeName: string) {
connectNodeToParent(nodeName, 'ai_tool', parentNodeName, false);
getParameterInputByName('mode')
.find('input')
.should('have.value', 'Retrieve Documents (As Tool for AI Agent)');
clickGetBackToCanvas();
getConnectionBySourceAndTarget(nodeName, parentNodeName).should('exist');
}
export function addOutputParserNodeToParent(nodeName: string, parentNodeName: string) {
addSupplementalNodeToParent(nodeName, 'ai_outputParser', parentNodeName);
}

View file

@ -41,7 +41,9 @@ describe('Data mapping', () => {
ndv.actions.mapDataFromHeader(1, 'value');
ndv.getters.inlineExpressionEditorInput().should('have.text', '{{ $json.timestamp }}');
ndv.getters.inlineExpressionEditorInput().type('{esc}');
ndv.getters.parameterExpressionPreview('value').should('include.text', '2024');
ndv.getters
.parameterExpressionPreview('value')
.should('include.text', new Date().getFullYear());
ndv.actions.mapDataFromHeader(2, 'value');
ndv.getters
@ -113,6 +115,8 @@ describe('Data mapping', () => {
});
it('maps expressions from json view', () => {
// ADO-3063 - followup to make this viewport global
cy.viewport('macbook-16');
cy.fixture('Test_workflow_3.json').then((data) => {
cy.get('body').paste(JSON.stringify(data));
});
@ -121,17 +125,17 @@ describe('Data mapping', () => {
workflowPage.actions.openNode('Set');
ndv.actions.switchInputMode('JSON');
ndv.getters.inputDataContainer().should('exist');
ndv.getters
.inputDataContainer()
.should('exist')
.find('.json-data')
.should(
'have.text',
'[{"input": [{"count": 0,"with space": "!!","with.dot": "!!","with"quotes": "!!"}]},{"input": [{"count": 1}]}]',
)
.find('span')
.contains('"count"')
.realMouseDown();
);
ndv.getters.inputDataContainer().find('span').contains('"count"').realMouseDown();
ndv.actions.mapToParameter('value');
ndv.getters.inlineExpressionEditorInput().should('have.text', '{{ $json.input[0].count }}');

View file

@ -15,7 +15,7 @@ import {
NDV,
MainSidebar,
} from '../pages';
import { clearNotifications } from '../pages/notifications';
import { clearNotifications, successToast } from '../pages/notifications';
import { getVisibleDropdown, getVisibleModalOverlay, getVisibleSelect } from '../utils';
const workflowsPage = new WorkflowsPage();
@ -830,4 +830,23 @@ describe('Projects', { disableAutoLogin: true }, () => {
.should('not.have.length');
});
});
it('should set and update project icon', () => {
const DEFAULT_ICON = 'fa-layer-group';
const NEW_PROJECT_NAME = 'Test Project';
cy.signinAsAdmin();
cy.visit(workflowsPage.url);
projects.createProject(NEW_PROJECT_NAME);
// New project should have default icon
projects.getIconPickerButton().find('svg').should('have.class', DEFAULT_ICON);
// Choose another icon
projects.getIconPickerButton().click();
projects.getIconPickerTab('Emojis').click();
projects.getIconPickerEmojis().first().click();
// Project should be updated with new icon
successToast().contains('Project icon updated successfully');
projects.getIconPickerButton().should('contain', '😀');
projects.getMenuItems().contains(NEW_PROJECT_NAME).should('contain', '😀');
});
});

View file

@ -1,10 +1,12 @@
import { clickGetBackToCanvas } from '../composables/ndv';
import {
addNodeToCanvas,
addRetrieverNodeToParent,
addVectorStoreNodeToParent,
addVectorStoreToolToParent,
getNodeCreatorItems,
} from '../composables/workflow';
import { IF_NODE_NAME } from '../constants';
import { AGENT_NODE_NAME, IF_NODE_NAME, MANUAL_CHAT_TRIGGER_NODE_NAME } from '../constants';
import { NodeCreator } from '../pages/features/node-creator';
import { NDV } from '../pages/ndv';
import { WorkflowPage as WorkflowPageClass } from '../pages/workflow';
@ -536,7 +538,7 @@ describe('Node Creator', () => {
});
});
it('should add node directly for sub-connection', () => {
it('should add node directly for sub-connection as vector store', () => {
addNodeToCanvas('Question and Answer Chain', true);
addRetrieverNodeToParent('Vector Store Retriever', 'Question and Answer Chain');
cy.realPress('Escape');
@ -544,4 +546,12 @@ describe('Node Creator', () => {
cy.realPress('Escape');
WorkflowPage.getters.canvasNodes().should('have.length', 4);
});
it('should add node directly for sub-connection as tool', () => {
addNodeToCanvas(MANUAL_CHAT_TRIGGER_NODE_NAME, true);
addNodeToCanvas(AGENT_NODE_NAME, true, true);
clickGetBackToCanvas();
addVectorStoreToolToParent('In-Memory Vector Store', AGENT_NODE_NAME);
});
});

View file

@ -84,7 +84,6 @@
"ws": ">=8.17.1"
},
"patchedDependencies": {
"typedi@0.10.0": "patches/typedi@0.10.0.patch",
"pkce-challenge@3.0.0": "patches/pkce-challenge@3.0.0.patch",
"pyodide@0.23.4": "patches/pyodide@0.23.4.patch",
"@types/express-serve-static-core@4.17.43": "patches/@types__express-serve-static-core@4.17.43.patch",

View file

@ -27,6 +27,6 @@
"dependencies": {
"xss": "catalog:",
"zod": "catalog:",
"zod-class": "0.0.15"
"zod-class": "0.0.16"
}
}

View file

@ -0,0 +1,55 @@
import { CredentialsGetManyRequestQuery } from '../credentials-get-many-request.dto';
describe('CredentialsGetManyRequestQuery', () => {
describe('should pass validation', () => {
it('with empty object', () => {
const data = {};
const result = CredentialsGetManyRequestQuery.safeParse(data);
expect(result.success).toBe(true);
});
test.each([
{ field: 'includeScopes', value: 'true' },
{ field: 'includeScopes', value: 'false' },
{ field: 'includeData', value: 'true' },
{ field: 'includeData', value: 'false' },
])('with $field set to $value', ({ field, value }) => {
const data = { [field]: value };
const result = CredentialsGetManyRequestQuery.safeParse(data);
expect(result.success).toBe(true);
});
it('with both parameters set', () => {
const data = {
includeScopes: 'true',
includeData: 'true',
};
const result = CredentialsGetManyRequestQuery.safeParse(data);
expect(result.success).toBe(true);
});
});
describe('should fail validation', () => {
test.each([
{ field: 'includeScopes', value: true },
{ field: 'includeScopes', value: false },
{ field: 'includeScopes', value: 'invalid' },
{ field: 'includeData', value: true },
{ field: 'includeData', value: false },
{ field: 'includeData', value: 'invalid' },
])('with invalid value $value for $field', ({ field, value }) => {
const data = { [field]: value };
const result = CredentialsGetManyRequestQuery.safeParse(data);
expect(result.success).toBe(false);
expect(result.error?.issues[0].path[0]).toBe(field);
});
});
});

View file

@ -0,0 +1,52 @@
import { CredentialsGetOneRequestQuery } from '../credentials-get-one-request.dto';
describe('CredentialsGetManyRequestQuery', () => {
describe('should pass validation', () => {
it('with empty object', () => {
const data = {};
const result = CredentialsGetOneRequestQuery.safeParse(data);
expect(result.success).toBe(true);
// defaults to false
expect(result.data?.includeData).toBe(false);
});
test.each([
{ field: 'includeData', value: 'true' },
{ field: 'includeData', value: 'false' },
])('with $field set to $value', ({ field, value }) => {
const data = { [field]: value };
const result = CredentialsGetOneRequestQuery.safeParse(data);
expect(result.success).toBe(true);
});
it('with both parameters set', () => {
const data = {
includeScopes: 'true',
includeData: 'true',
};
const result = CredentialsGetOneRequestQuery.safeParse(data);
expect(result.success).toBe(true);
});
});
describe('should fail validation', () => {
test.each([
{ field: 'includeData', value: true },
{ field: 'includeData', value: false },
{ field: 'includeData', value: 'invalid' },
])('with invalid value $value for $field', ({ field, value }) => {
const data = { [field]: value };
const result = CredentialsGetOneRequestQuery.safeParse(data);
expect(result.success).toBe(false);
expect(result.error?.issues[0].path[0]).toBe(field);
});
});
});

View file

@ -0,0 +1,22 @@
import { Z } from 'zod-class';
import { booleanFromString } from '../../schemas/booleanFromString';
export class CredentialsGetManyRequestQuery extends Z.class({
/**
* Adds the `scopes` field to each credential which includes all scopes the
* requesting user has in relation to the credential, e.g.
* ['credential:read', 'credential:update']
*/
includeScopes: booleanFromString.optional(),
/**
* Adds the decrypted `data` field to each credential.
*
* It only does this for credentials for which the user has the
* `credential:update` scope.
*
* This switches `includeScopes` to true to be able to check for the scopes
*/
includeData: booleanFromString.optional(),
}) {}

View file

@ -0,0 +1,13 @@
import { Z } from 'zod-class';
import { booleanFromString } from '../../schemas/booleanFromString';
export class CredentialsGetOneRequestQuery extends Z.class({
/**
* Adds the decrypted `data` field to each credential.
*
* It only does this for credentials for which the user has the
* `credential:update` scope.
*/
includeData: booleanFromString.optional().default('false'),
}) {}

View file

@ -0,0 +1,81 @@
import { ActionResultRequestDto } from '../action-result-request.dto';
describe('ActionResultRequestDto', () => {
const baseValidRequest = {
path: '/test/path',
nodeTypeAndVersion: { name: 'TestNode', version: 1 },
handler: 'testHandler',
currentNodeParameters: {},
};
describe('Valid requests', () => {
test.each([
{
name: 'minimal valid request',
request: baseValidRequest,
},
{
name: 'request with payload',
request: {
...baseValidRequest,
payload: { key: 'value' },
},
},
{
name: 'request with credentials',
request: {
...baseValidRequest,
credentials: { testCredential: { id: 'cred1', name: 'Test Cred' } },
},
},
{
name: 'request with current node parameters',
request: {
...baseValidRequest,
currentNodeParameters: { param1: 'value1' },
},
},
])('should validate $name', ({ request }) => {
const result = ActionResultRequestDto.safeParse(request);
expect(result.success).toBe(true);
});
});
describe('Invalid requests', () => {
test.each([
{
name: 'missing path',
request: {
nodeTypeAndVersion: { name: 'TestNode', version: 1 },
handler: 'testHandler',
},
expectedErrorPath: ['path'],
},
{
name: 'missing handler',
request: {
path: '/test/path',
currentNodeParameters: {},
nodeTypeAndVersion: { name: 'TestNode', version: 1 },
},
expectedErrorPath: ['handler'],
},
{
name: 'invalid node version',
request: {
...baseValidRequest,
nodeTypeAndVersion: { name: 'TestNode', version: 0 },
},
expectedErrorPath: ['nodeTypeAndVersion', 'version'],
},
])('should fail validation for $name', ({ request, expectedErrorPath }) => {
const result = ActionResultRequestDto.safeParse(request);
expect(result.success).toBe(false);
if (expectedErrorPath) {
expect(result.error?.issues[0].path).toEqual(expectedErrorPath);
}
});
});
});

View file

@ -0,0 +1,90 @@
import { OptionsRequestDto } from '../options-request.dto';
describe('OptionsRequestDto', () => {
const baseValidRequest = {
path: '/test/path',
nodeTypeAndVersion: { name: 'TestNode', version: 1 },
currentNodeParameters: {},
};
describe('Valid requests', () => {
test.each([
{
name: 'minimal valid request',
request: baseValidRequest,
},
{
name: 'request with method name',
request: {
...baseValidRequest,
methodName: 'testMethod',
},
},
{
name: 'request with load options',
request: {
...baseValidRequest,
loadOptions: {
routing: {
operations: { someOperation: 'test' },
output: { someOutput: 'test' },
request: { someRequest: 'test' },
},
},
},
},
{
name: 'request with credentials',
request: {
...baseValidRequest,
credentials: { testCredential: { id: 'cred1', name: 'Test Cred' } },
},
},
{
name: 'request with current node parameters',
request: {
...baseValidRequest,
currentNodeParameters: { param1: 'value1' },
},
},
])('should validate $name', ({ request }) => {
const result = OptionsRequestDto.safeParse(request);
expect(result.success).toBe(true);
});
});
describe('Invalid requests', () => {
test.each([
{
name: 'missing path',
request: {
nodeTypeAndVersion: { name: 'TestNode', version: 1 },
},
expectedErrorPath: ['path'],
},
{
name: 'missing node type and version',
request: {
path: '/test/path',
},
expectedErrorPath: ['nodeTypeAndVersion'],
},
{
name: 'invalid node version',
request: {
...baseValidRequest,
nodeTypeAndVersion: { name: 'TestNode', version: 0 },
},
expectedErrorPath: ['nodeTypeAndVersion', 'version'],
},
])('should fail validation for $name', ({ request, expectedErrorPath }) => {
const result = OptionsRequestDto.safeParse(request);
expect(result.success).toBe(false);
if (expectedErrorPath) {
expect(result.error?.issues[0].path).toEqual(expectedErrorPath);
}
});
});
});

View file

@ -0,0 +1,95 @@
import { ResourceLocatorRequestDto } from '../resource-locator-request.dto';
describe('ResourceLocatorRequestDto', () => {
const baseValidRequest = {
path: '/test/path',
nodeTypeAndVersion: { name: 'TestNode', version: 1 },
methodName: 'testMethod',
currentNodeParameters: {},
};
describe('Valid requests', () => {
test.each([
{
name: 'minimal valid request',
request: baseValidRequest,
},
{
name: 'request with filter',
request: {
...baseValidRequest,
filter: 'testFilter',
},
},
{
name: 'request with pagination token',
request: {
...baseValidRequest,
paginationToken: 'token123',
},
},
{
name: 'request with credentials',
request: {
...baseValidRequest,
credentials: { testCredential: { id: 'cred1', name: 'Test Cred' } },
},
},
{
name: 'request with current node parameters',
request: {
...baseValidRequest,
currentNodeParameters: { param1: 'value1' },
},
},
{
name: 'request with a semver node version',
request: {
...baseValidRequest,
nodeTypeAndVersion: { name: 'TestNode', version: 1.1 },
},
},
])('should validate $name', ({ request }) => {
const result = ResourceLocatorRequestDto.safeParse(request);
expect(result.success).toBe(true);
});
});
describe('Invalid requests', () => {
test.each([
{
name: 'missing path',
request: {
nodeTypeAndVersion: { name: 'TestNode', version: 1 },
methodName: 'testMethod',
},
expectedErrorPath: ['path'],
},
{
name: 'missing method name',
request: {
path: '/test/path',
nodeTypeAndVersion: { name: 'TestNode', version: 1 },
currentNodeParameters: {},
},
expectedErrorPath: ['methodName'],
},
{
name: 'invalid node version',
request: {
...baseValidRequest,
nodeTypeAndVersion: { name: 'TestNode', version: 0 },
},
expectedErrorPath: ['nodeTypeAndVersion', 'version'],
},
])('should fail validation for $name', ({ request, expectedErrorPath }) => {
const result = ResourceLocatorRequestDto.safeParse(request);
expect(result.success).toBe(false);
if (expectedErrorPath) {
expect(result.error?.issues[0].path).toEqual(expectedErrorPath);
}
});
});
});

View file

@ -0,0 +1,74 @@
import { ResourceMapperFieldsRequestDto } from '../resource-mapper-fields-request.dto';
describe('ResourceMapperFieldsRequestDto', () => {
const baseValidRequest = {
path: '/test/path',
nodeTypeAndVersion: { name: 'TestNode', version: 1 },
methodName: 'testMethod',
currentNodeParameters: {},
};
describe('Valid requests', () => {
test.each([
{
name: 'minimal valid request',
request: baseValidRequest,
},
{
name: 'request with credentials',
request: {
...baseValidRequest,
credentials: { testCredential: { id: 'cred1', name: 'Test Cred' } },
},
},
{
name: 'request with current node parameters',
request: {
...baseValidRequest,
currentNodeParameters: { param1: 'value1' },
},
},
])('should validate $name', ({ request }) => {
const result = ResourceMapperFieldsRequestDto.safeParse(request);
expect(result.success).toBe(true);
});
});
describe('Invalid requests', () => {
test.each([
{
name: 'missing path',
request: {
nodeTypeAndVersion: { name: 'TestNode', version: 1 },
methodName: 'testMethod',
},
expectedErrorPath: ['path'],
},
{
name: 'missing method name',
request: {
path: '/test/path',
nodeTypeAndVersion: { name: 'TestNode', version: 1 },
currentNodeParameters: {},
},
expectedErrorPath: ['methodName'],
},
{
name: 'invalid node version',
request: {
...baseValidRequest,
nodeTypeAndVersion: { name: 'TestNode', version: 0 },
},
expectedErrorPath: ['nodeTypeAndVersion', 'version'],
},
])('should fail validation for $name', ({ request, expectedErrorPath }) => {
const result = ResourceMapperFieldsRequestDto.safeParse(request);
expect(result.success).toBe(false);
if (expectedErrorPath) {
expect(result.error?.issues[0].path).toEqual(expectedErrorPath);
}
});
});
});

View file

@ -0,0 +1,11 @@
import type { IDataObject } from 'n8n-workflow';
import { z } from 'zod';
import { BaseDynamicParametersRequestDto } from './base-dynamic-parameters-request.dto';
export class ActionResultRequestDto extends BaseDynamicParametersRequestDto.extend({
handler: z.string(),
payload: z
.union([z.object({}).catchall(z.any()) satisfies z.ZodType<IDataObject>, z.string()])
.optional(),
}) {}

View file

@ -0,0 +1,18 @@
import type { INodeCredentials, INodeParameters, INodeTypeNameVersion } from 'n8n-workflow';
import { z } from 'zod';
import { Z } from 'zod-class';
import { nodeVersionSchema } from '../../schemas/nodeVersion.schema';
export class BaseDynamicParametersRequestDto extends Z.class({
path: z.string(),
nodeTypeAndVersion: z.object({
name: z.string(),
version: nodeVersionSchema,
}) satisfies z.ZodType<INodeTypeNameVersion>,
currentNodeParameters: z.record(z.string(), z.any()) satisfies z.ZodType<INodeParameters>,
methodName: z.string().optional(),
credentials: z.record(z.string(), z.any()).optional() satisfies z.ZodType<
INodeCredentials | undefined
>,
}) {}

View file

@ -0,0 +1,18 @@
import type { ILoadOptions } from 'n8n-workflow';
import { z } from 'zod';
import { BaseDynamicParametersRequestDto } from './base-dynamic-parameters-request.dto';
export class OptionsRequestDto extends BaseDynamicParametersRequestDto.extend({
loadOptions: z
.object({
routing: z
.object({
operations: z.any().optional(),
output: z.any().optional(),
request: z.any().optional(),
})
.optional(),
})
.optional() as z.ZodType<ILoadOptions | undefined>,
}) {}

View file

@ -0,0 +1,9 @@
import { z } from 'zod';
import { BaseDynamicParametersRequestDto } from './base-dynamic-parameters-request.dto';
export class ResourceLocatorRequestDto extends BaseDynamicParametersRequestDto.extend({
methodName: z.string(),
filter: z.string().optional(),
paginationToken: z.string().optional(),
}) {}

View file

@ -0,0 +1,7 @@
import { z } from 'zod';
import { BaseDynamicParametersRequestDto } from './base-dynamic-parameters-request.dto';
export class ResourceMapperFieldsRequestDto extends BaseDynamicParametersRequestDto.extend({
methodName: z.string(),
}) {}

View file

@ -6,6 +6,11 @@ export { AiFreeCreditsRequestDto } from './ai/ai-free-credits-request.dto';
export { LoginRequestDto } from './auth/login-request.dto';
export { ResolveSignupTokenQueryDto } from './auth/resolve-signup-token-query.dto';
export { OptionsRequestDto } from './dynamic-node-parameters/options-request.dto';
export { ResourceLocatorRequestDto } from './dynamic-node-parameters/resource-locator-request.dto';
export { ResourceMapperFieldsRequestDto } from './dynamic-node-parameters/resource-mapper-fields-request.dto';
export { ActionResultRequestDto } from './dynamic-node-parameters/action-result-request.dto';
export { InviteUsersRequestDto } from './invitation/invite-users-request.dto';
export { AcceptInvitationRequestDto } from './invitation/accept-invitation-request.dto';
@ -16,6 +21,10 @@ export { ForgotPasswordRequestDto } from './password-reset/forgot-password-reque
export { ResolvePasswordTokenQueryDto } from './password-reset/resolve-password-token-query.dto';
export { ChangePasswordRequestDto } from './password-reset/change-password-request.dto';
export { SamlAcsDto } from './saml/saml-acs.dto';
export { SamlPreferences } from './saml/saml-preferences.dto';
export { SamlToggleDto } from './saml/saml-toggle.dto';
export { PasswordUpdateRequestDto } from './user/password-update-request.dto';
export { RoleChangeRequestDto } from './user/role-change-request.dto';
export { SettingsUpdateRequestDto } from './user/settings-update-request.dto';
@ -24,3 +33,7 @@ export { UserUpdateRequestDto } from './user/user-update-request.dto';
export { CommunityRegisteredRequestDto } from './license/community-registered-request.dto';
export { VariableListRequestDto } from './variables/variables-list-request.dto';
export { CredentialsGetOneRequestQuery } from './credentials/credentials-get-one-request.dto';
export { CredentialsGetManyRequestQuery } from './credentials/credentials-get-many-request.dto';
export { ImportWorkflowFromUrlDto } from './workflows/import-workflow-from-url.dto';

View file

@ -0,0 +1,155 @@
import { SamlPreferences } from '../saml-preferences.dto';
describe('SamlPreferences', () => {
describe('Valid requests', () => {
test.each([
{
name: 'valid minimal configuration',
request: {
mapping: {
email: 'user@example.com',
firstName: 'John',
lastName: 'Doe',
userPrincipalName: 'johndoe',
},
metadata: '<xml>metadata</xml>',
metadataUrl: 'https://example.com/metadata',
loginEnabled: true,
loginLabel: 'Login with SAML',
},
},
{
name: 'valid full configuration',
request: {
mapping: {
email: 'user@example.com',
firstName: 'John',
lastName: 'Doe',
userPrincipalName: 'johndoe',
},
metadata: '<xml>metadata</xml>',
metadataUrl: 'https://example.com/metadata',
ignoreSSL: true,
loginBinding: 'post',
loginEnabled: true,
loginLabel: 'Login with SAML',
authnRequestsSigned: true,
wantAssertionsSigned: true,
wantMessageSigned: true,
acsBinding: 'redirect',
signatureConfig: {
prefix: 'ds',
location: {
reference: '/samlp:Response/saml:Issuer',
action: 'after',
},
},
relayState: 'https://example.com/relay',
},
},
])('should validate $name', ({ request }) => {
const result = SamlPreferences.safeParse(request);
expect(result.success).toBe(true);
});
});
describe('Invalid requests', () => {
test.each([
{
name: 'invalid loginBinding',
request: {
loginBinding: 'invalid',
},
expectedErrorPath: ['loginBinding'],
},
{
name: 'invalid acsBinding',
request: {
acsBinding: 'invalid',
},
expectedErrorPath: ['acsBinding'],
},
{
name: 'invalid signatureConfig location action',
request: {
signatureConfig: {
prefix: 'ds',
location: {
reference: '/samlp:Response/saml:Issuer',
action: 'invalid',
},
},
},
expectedErrorPath: ['signatureConfig', 'location', 'action'],
},
{
name: 'missing signatureConfig location reference',
request: {
signatureConfig: {
prefix: 'ds',
location: {
action: 'after',
},
},
},
expectedErrorPath: ['signatureConfig', 'location', 'reference'],
},
{
name: 'invalid mapping email',
request: {
mapping: {
email: 123,
firstName: 'John',
lastName: 'Doe',
userPrincipalName: 'johndoe',
},
},
expectedErrorPath: ['mapping', 'email'],
},
])('should fail validation for $name', ({ request, expectedErrorPath }) => {
const result = SamlPreferences.safeParse(request);
expect(result.success).toBe(false);
if (expectedErrorPath) {
expect(result.error?.issues[0].path).toEqual(expectedErrorPath);
}
});
describe('Edge cases', () => {
test('should handle optional fields correctly', () => {
const validRequest = {
mapping: undefined,
metadata: undefined,
metadataUrl: undefined,
loginEnabled: undefined,
loginLabel: undefined,
};
const result = SamlPreferences.safeParse(validRequest);
expect(result.success).toBe(true);
});
test('should handle default values correctly', () => {
const validRequest = {};
const result = SamlPreferences.safeParse(validRequest);
expect(result.success).toBe(true);
expect(result.data?.ignoreSSL).toBe(false);
expect(result.data?.loginBinding).toBe('redirect');
expect(result.data?.authnRequestsSigned).toBe(false);
expect(result.data?.wantAssertionsSigned).toBe(true);
expect(result.data?.wantMessageSigned).toBe(true);
expect(result.data?.acsBinding).toBe('post');
expect(result.data?.signatureConfig).toEqual({
prefix: 'ds',
location: {
reference: '/samlp:Response/saml:Issuer',
action: 'after',
},
});
expect(result.data?.relayState).toBe('');
});
});
});
});

View file

@ -0,0 +1,6 @@
import { z } from 'zod';
import { Z } from 'zod-class';
export class SamlAcsDto extends Z.class({
RelayState: z.string().optional(),
}) {}

View file

@ -0,0 +1,50 @@
import { z } from 'zod';
import { Z } from 'zod-class';
const SamlLoginBindingSchema = z.enum(['redirect', 'post']);
/** Schema for configuring the signature in SAML requests/responses. */
const SignatureConfigSchema = z.object({
prefix: z.string().default('ds'),
location: z.object({
reference: z.string(),
action: z.enum(['before', 'after', 'prepend', 'append']),
}),
});
export class SamlPreferences extends Z.class({
/** Mapping of SAML attributes to user fields. */
mapping: z
.object({
email: z.string(),
firstName: z.string(),
lastName: z.string(),
userPrincipalName: z.string(),
})
.optional(),
/** SAML metadata in XML format. */
metadata: z.string().optional(),
metadataUrl: z.string().optional(),
ignoreSSL: z.boolean().default(false),
loginBinding: SamlLoginBindingSchema.default('redirect'),
/** Whether SAML login is enabled. */
loginEnabled: z.boolean().optional(),
/** Label for the SAML login button. on the Auth screen */
loginLabel: z.string().optional(),
authnRequestsSigned: z.boolean().default(false),
wantAssertionsSigned: z.boolean().default(true),
wantMessageSigned: z.boolean().default(true),
acsBinding: SamlLoginBindingSchema.default('post'),
signatureConfig: SignatureConfigSchema.default({
prefix: 'ds',
location: {
reference: '/samlp:Response/saml:Issuer',
action: 'after',
},
}),
relayState: z.string().default(''),
}) {}

View file

@ -0,0 +1,6 @@
import { z } from 'zod';
import { Z } from 'zod-class';
export class SamlToggleDto extends Z.class({
loginEnabled: z.boolean(),
}) {}

View file

@ -0,0 +1,63 @@
import { ImportWorkflowFromUrlDto } from '../import-workflow-from-url.dto';
describe('ImportWorkflowFromUrlDto', () => {
describe('Valid requests', () => {
test('should validate $name', () => {
const result = ImportWorkflowFromUrlDto.safeParse({
url: 'https://example.com/workflow.json',
});
expect(result.success).toBe(true);
});
});
describe('Invalid requests', () => {
test.each([
{
name: 'invalid URL (not ending with .json)',
url: 'https://example.com/workflow',
expectedErrorPath: ['url'],
},
{
name: 'invalid URL (missing protocol)',
url: 'example.com/workflow.json',
expectedErrorPath: ['url'],
},
{
name: 'invalid URL (not a URL)',
url: 'not-a-url',
expectedErrorPath: ['url'],
},
{
name: 'missing URL',
url: undefined,
expectedErrorPath: ['url'],
},
{
name: 'null URL',
url: null,
expectedErrorPath: ['url'],
},
{
name: 'invalid URL (ends with .json but not a valid URL)',
url: 'not-a-url.json',
expectedErrorPath: ['url'],
},
{
name: 'valid URL with query parameters',
url: 'https://example.com/workflow.json?param=value',
},
{
name: 'valid URL with fragments',
url: 'https://example.com/workflow.json#section',
},
])('should fail validation for $name', ({ url, expectedErrorPath }) => {
const result = ImportWorkflowFromUrlDto.safeParse({ url });
expect(result.success).toBe(false);
if (expectedErrorPath) {
expect(result.error?.issues[0].path).toEqual(expectedErrorPath);
}
});
});
});

View file

@ -0,0 +1,6 @@
import { z } from 'zod';
import { Z } from 'zod-class';
export class ImportWorkflowFromUrlDto extends Z.class({
url: z.string().url().endsWith('.json'),
}) {}

View file

@ -0,0 +1,28 @@
import { nodeVersionSchema } from '../nodeVersion.schema';
describe('nodeVersionSchema', () => {
describe('valid versions', () => {
test.each([
[1, 'single digit'],
[2, 'single digit'],
[1.0, 'major.minor with zero minor'],
[1.2, 'major.minor'],
[10.5, 'major.minor with double digits'],
])('should accept %s as a valid version (%s)', (version) => {
const validated = nodeVersionSchema.parse(version);
expect(validated).toBe(version);
});
});
describe('invalid versions', () => {
test.each([
['not-a-number', 'non-number input'],
['1.2.3', 'more than two parts'],
['1.a', 'non-numeric characters'],
['1.2.3', 'more than two parts as string'],
])('should reject %s as an invalid version (%s)', (version) => {
const check = () => nodeVersionSchema.parse(version);
expect(check).toThrowError();
});
});
});

View file

@ -0,0 +1,3 @@
import { z } from 'zod';
export const booleanFromString = z.enum(['true', 'false']).transform((value) => value === 'true');

View file

@ -0,0 +1,17 @@
import { z } from 'zod';
export const nodeVersionSchema = z
.number()
.min(1)
.refine(
(val) => {
const parts = String(val).split('.');
return (
(parts.length === 1 && !isNaN(Number(parts[0]))) ||
(parts.length === 2 && !isNaN(Number(parts[0])) && !isNaN(Number(parts[1])))
);
},
{
message: 'Invalid node version. Must be in format: major.minor',
},
);

View file

@ -21,7 +21,7 @@
"dist/**/*"
],
"dependencies": {
"reflect-metadata": "0.2.2",
"typedi": "catalog:"
"@n8n/di": "workspace:*",
"reflect-metadata": "catalog:"
}
}

View file

@ -9,6 +9,7 @@ export const LOG_SCOPES = [
'multi-main-setup',
'pruning',
'pubsub',
'push',
'redis',
'scaling',
'waiting-executions',
@ -70,10 +71,13 @@ export class LoggingConfig {
* - `external-secrets`
* - `license`
* - `multi-main-setup`
* - `pruning`
* - `pubsub`
* - `push`
* - `redis`
* - `scaling`
* - `waiting-executions`
* - `task-runner`
*
* @example
* `N8N_LOG_SCOPES=license`

View file

@ -1,6 +1,6 @@
import 'reflect-metadata';
import { Container, Service } from '@n8n/di';
import { readFileSync } from 'fs';
import { Container, Service } from 'typedi';
// eslint-disable-next-line @typescript-eslint/ban-types
type Class = Function;
@ -35,7 +35,7 @@ export const Config: ClassDecorator = (ConfigClass: Class) => {
for (const [key, { type, envName }] of classMetadata) {
if (typeof type === 'function' && globalMetadata.has(type)) {
config[key] = Container.get(type);
config[key] = Container.get(type as Constructable);
} else if (envName) {
const value = readEnv(envName);
if (value === undefined) continue;

View file

@ -1,6 +1,6 @@
import { Container } from '@n8n/di';
import fs from 'fs';
import { mock } from 'jest-mock-extended';
import { Container } from 'typedi';
import { GlobalConfig } from '../src/index';

View file

@ -1,4 +1,4 @@
import { Container } from 'typedi';
import { Container } from '@n8n/di';
import { Config, Env } from '../src/decorators';

View file

@ -9,5 +9,6 @@
"baseUrl": "src",
"tsBuildInfoFile": "dist/typecheck.tsbuildinfo"
},
"include": ["src/**/*.ts", "test/**/*.ts"]
"include": ["src/**/*.ts", "test/**/*.ts"],
"references": [{ "path": "../di/tsconfig.build.json" }]
}

View file

@ -0,0 +1,7 @@
const sharedOptions = require('@n8n_io/eslint-config/shared');
/** @type {import('@types/eslint').ESLint.ConfigData} */
module.exports = {
extends: ['@n8n_io/eslint-config/base'],
...sharedOptions(__dirname),
};

View file

@ -0,0 +1,52 @@
## @n8n/di
`@n8n/di` is a dependency injection (DI) container library, based on [`typedi`](https://github.com/typestack/typedi).
n8n no longer uses `typedi` because:
- `typedi` is no longer officially maintained
- Need for future-proofing, e.g. stage-3 decorators
- Small enough that it is worth the maintenance burden
- Easier to customize, e.g. to simplify unit tests
### Usage
```typescript
// from https://github.com/typestack/typedi/blob/develop/README.md
import { Container, Service } from 'typedi';
@Service()
class ExampleInjectedService {
printMessage() {
console.log('I am alive!');
}
}
@Service()
class ExampleService {
constructor(
// because we annotated ExampleInjectedService with the @Service()
// decorator TypeDI will automatically inject an instance of
// ExampleInjectedService here when the ExampleService class is requested
// from TypeDI.
public injectedService: ExampleInjectedService
) {}
}
const serviceInstance = Container.get(ExampleService);
// we request an instance of ExampleService from TypeDI
serviceInstance.injectedService.printMessage();
// logs "I am alive!" to the console
```
Requires enabling these flags in `tsconfig.json`:
```json
{
"compilerOptions": {
"experimentalDecorators": true,
"emitDecoratorMetadata": true
}
}
```

View file

@ -0,0 +1,2 @@
/** @type {import('jest').Config} */
module.exports = require('../../../jest.config');

View file

@ -0,0 +1,26 @@
{
"name": "@n8n/di",
"version": "0.1.0",
"scripts": {
"clean": "rimraf dist .turbo",
"dev": "pnpm watch",
"typecheck": "tsc --noEmit",
"build": "tsc -p tsconfig.build.json",
"format": "biome format --write .",
"format:check": "biome ci .",
"lint": "eslint .",
"lintfix": "eslint . --fix",
"watch": "tsc -p tsconfig.build.json --watch",
"test": "jest",
"test:dev": "jest --watch"
},
"main": "dist/di.js",
"module": "src/di.ts",
"types": "dist/di.d.ts",
"files": [
"dist/**/*"
],
"dependencies": {
"reflect-metadata": "catalog:"
}
}

View file

@ -0,0 +1,287 @@
import { Container, Service } from '../di';
@Service()
class SimpleService {
getValue() {
return 'simple';
}
}
@Service()
class DependentService {
constructor(readonly simple: SimpleService) {}
getValue() {
return this.simple.getValue() + '-dependent';
}
}
class CustomFactory {
getValue() {
return 'factory-made';
}
}
@Service({ factory: () => new CustomFactory() })
class FactoryService {
getValue() {
return 'should-not-be-called';
}
}
abstract class AbstractService {
abstract getValue(): string;
}
@Service()
class ConcreteService extends AbstractService {
getValue(): string {
return 'concrete';
}
}
describe('DI Container', () => {
beforeEach(() => {
jest.clearAllMocks();
Container.reset();
});
describe('basic functionality', () => {
it('should create a simple instance', () => {
const instance = Container.get(SimpleService);
expect(instance).toBeInstanceOf(SimpleService);
expect(instance.getValue()).toBe('simple');
});
it('should return same instance on multiple gets', () => {
const instance1 = Container.get(SimpleService);
const instance2 = Container.get(SimpleService);
expect(instance1).toBe(instance2);
});
it('should handle classes with no dependencies (empty constructor)', () => {
@Service()
class EmptyConstructorService {}
const instance = Container.get(EmptyConstructorService);
expect(instance).toBeInstanceOf(EmptyConstructorService);
});
it('should throw when trying to resolve an undecorated class', () => {
class UnDecoratedService {}
expect(() => Container.get(UnDecoratedService)).toThrow();
});
});
describe('dependency injection', () => {
it('should inject dependencies correctly', () => {
const dependent = Container.get(DependentService);
expect(dependent).toBeInstanceOf(DependentService);
expect(dependent.getValue()).toBe('simple-dependent');
expect(dependent.simple).toBeInstanceOf(SimpleService);
});
it('should handle deep dependency chains', () => {
@Service()
class ServiceC {
getValue() {
return 'C';
}
}
@Service()
class ServiceB {
constructor(private c: ServiceC) {}
getValue() {
return this.c.getValue() + 'B';
}
}
@Service()
class ServiceA {
constructor(private b: ServiceB) {}
getValue() {
return this.b.getValue() + 'A';
}
}
const instance = Container.get(ServiceA);
expect(instance.getValue()).toBe('CBA');
});
it('should return undefined for non-decorated dependencies in resolution chain', () => {
class NonDecoratedDep {}
@Service()
class ServiceWithNonDecoratedDep {
constructor(readonly dep: NonDecoratedDep) {}
}
const instance = Container.get(ServiceWithNonDecoratedDep);
expect(instance).toBeInstanceOf(ServiceWithNonDecoratedDep);
expect(instance.dep).toBeUndefined();
});
});
describe('factory handling', () => {
it('should use factory when provided', () => {
const instance = Container.get(FactoryService);
expect(instance).toBeInstanceOf(CustomFactory);
expect(instance.getValue()).toBe('factory-made');
});
it('should preserve factory metadata when setting instance', () => {
const customInstance = new CustomFactory();
Container.set(FactoryService, customInstance);
const instance = Container.get(FactoryService);
expect(instance).toBe(customInstance);
});
it('should preserve factory when resetting container', () => {
const factoryInstance1 = Container.get(FactoryService);
Container.reset();
const factoryInstance2 = Container.get(FactoryService);
expect(factoryInstance1).not.toBe(factoryInstance2);
expect(factoryInstance2.getValue()).toBe('factory-made');
});
it('should throw error when factory throws', () => {
@Service({
factory: () => {
throw new Error('Factory error');
},
})
class ErrorFactoryService {}
expect(() => Container.get(ErrorFactoryService)).toThrow('Factory error');
});
});
describe('instance management', () => {
it('should allow manual instance setting', () => {
const customInstance = new SimpleService();
Container.set(SimpleService, customInstance);
const instance = Container.get(SimpleService);
expect(instance).toBe(customInstance);
});
});
describe('abstract classes', () => {
it('should throw when trying to instantiate an abstract class directly', () => {
@Service()
abstract class TestAbstractClass {
abstract doSomething(): void;
// Add a concrete method to make the class truly abstract at runtime
constructor() {
if (this.constructor === TestAbstractClass) {
throw new TypeError('Abstract class "TestAbstractClass" cannot be instantiated');
}
}
}
expect(() => Container.get(TestAbstractClass)).toThrow(
'[DI] TestAbstractClass is an abstract class, and cannot be instantiated',
);
});
it('should allow setting an implementation for an abstract class', () => {
const concrete = new ConcreteService();
Container.set(AbstractService, concrete);
const instance = Container.get(AbstractService);
expect(instance).toBe(concrete);
expect(instance.getValue()).toBe('concrete');
});
it('should allow factory for abstract class', () => {
@Service({ factory: () => new ConcreteService() })
abstract class FactoryAbstractService {
abstract getValue(): string;
}
const instance = Container.get(FactoryAbstractService);
expect(instance).toBeInstanceOf(ConcreteService);
expect(instance.getValue()).toBe('concrete');
});
});
describe('inheritance', () => {
it('should handle inheritance in injectable classes', () => {
@Service()
class BaseService {
getValue() {
return 'base';
}
}
@Service()
class DerivedService extends BaseService {
getValue() {
return 'derived-' + super.getValue();
}
}
const instance = Container.get(DerivedService);
expect(instance.getValue()).toBe('derived-base');
});
it('should maintain separate instances for base and derived classes', () => {
@Service()
class BaseService {
getValue() {
return 'base';
}
}
@Service()
class DerivedService extends BaseService {}
const baseInstance = Container.get(BaseService);
const derivedInstance = Container.get(DerivedService);
expect(baseInstance).not.toBe(derivedInstance);
expect(baseInstance).toBeInstanceOf(BaseService);
expect(derivedInstance).toBeInstanceOf(DerivedService);
});
});
describe('type registration checking', () => {
it('should return true for registered classes', () => {
expect(Container.has(SimpleService)).toBe(true);
});
it('should return false for unregistered classes', () => {
class UnregisteredService {}
expect(Container.has(UnregisteredService)).toBe(false);
});
it('should return true for abstract classes with implementations', () => {
const concrete = new ConcreteService();
Container.set(AbstractService, concrete);
expect(Container.has(AbstractService)).toBe(true);
});
it('should return true for factory-provided services before instantiation', () => {
expect(Container.has(FactoryService)).toBe(true);
});
it('should maintain registration after reset', () => {
expect(Container.has(SimpleService)).toBe(true);
Container.reset();
expect(Container.has(SimpleService)).toBe(true);
});
it('should return true after manual instance setting', () => {
class ManualService {}
expect(Container.has(ManualService)).toBe(false);
Container.set(ManualService, new ManualService());
expect(Container.has(ManualService)).toBe(true);
});
});
});

142
packages/@n8n/di/src/di.ts Normal file
View file

@ -0,0 +1,142 @@
import 'reflect-metadata';
/**
* Represents a class constructor type that can be instantiated with 'new'
* @template T The type of instance the constructor creates
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type Constructable<T = unknown> = new (...args: any[]) => T;
type AbstractConstructable<T = unknown> = abstract new (...args: unknown[]) => T;
type ServiceIdentifier<T = unknown> = Constructable<T> | AbstractConstructable<T>;
interface Metadata<T = unknown> {
instance?: T;
factory?: () => T;
}
interface Options<T> {
factory?: () => T;
}
const instances = new Map<ServiceIdentifier, Metadata>();
/**
* Decorator that marks a class as available for dependency injection.
* @param options Configuration options for the injectable class
* @param options.factory Optional factory function to create instances of this class
* @returns A class decorator to be applied to the target class
*/
// eslint-disable-next-line @typescript-eslint/ban-types
export function Service<T = unknown>(): Function;
// eslint-disable-next-line @typescript-eslint/ban-types
export function Service<T = unknown>(options: Options<T>): Function;
export function Service<T>({ factory }: Options<T> = {}) {
return function (target: Constructable<T>) {
instances.set(target, { factory });
return target;
};
}
class DIError extends Error {
constructor(message: string) {
super(`[DI] ${message}`);
}
}
class ContainerClass {
/** Stack to track types being resolved to detect circular dependencies */
private readonly resolutionStack: ServiceIdentifier[] = [];
/**
* Checks if a type is registered in the container
* @template T The type to check for
* @param type The constructor of the type to check
* @returns True if the type is registered (has metadata), false otherwise
*/
has<T>(type: ServiceIdentifier<T>): boolean {
return instances.has(type);
}
/**
* Retrieves or creates an instance of the specified type from the container
* @template T The type of instance to retrieve
* @param type The constructor of the type to retrieve
* @returns An instance of the specified type with all dependencies injected
* @throws {DIError} If circular dependencies are detected or if the type is not injectable
*/
get<T>(type: ServiceIdentifier<T>): T {
const { resolutionStack } = this;
const metadata = instances.get(type) as Metadata<T>;
if (!metadata) {
// Special case: Allow undefined returns for non-decorated constructor params
// when resolving a dependency chain (i.e., resolutionStack not empty)
if (resolutionStack.length) return undefined as T;
throw new DIError(`${type.name} is not decorated with ${Service.name}`);
}
if (metadata?.instance) return metadata.instance as T;
// Check for circular dependencies before proceeding with instantiation
if (resolutionStack.includes(type)) {
throw new DIError(
`Circular dependency detected. ${resolutionStack.map((t) => t.name).join(' -> ')}`,
);
}
// Add current type to resolution stack before resolving dependencies
resolutionStack.push(type);
try {
let instance: T;
if (metadata?.factory) {
instance = metadata.factory();
} else {
const paramTypes = (Reflect.getMetadata('design:paramtypes', type) ??
[]) as Constructable[];
const dependencies = paramTypes.map(<P>(paramType: Constructable<P>) =>
this.get(paramType),
);
// Create new instance with resolved dependencies
instance = new (type as Constructable)(...dependencies) as T;
}
instances.set(type, { ...metadata, instance });
return instance;
} catch (error) {
if (error instanceof TypeError && error.message.toLowerCase().includes('abstract')) {
throw new DIError(`${type.name} is an abstract class, and cannot be instantiated`);
}
throw error;
} finally {
resolutionStack.pop();
}
}
/**
* Manually sets an instance for a specific type in the container
* @template T The type of instance being set
* @param type The constructor of the type to set. This can also be an abstract class
* @param instance The instance to store in the container
*/
set<T>(type: ServiceIdentifier<T>, instance: T): void {
// Preserve any existing metadata (like factory) when setting new instance
const metadata = instances.get(type) ?? {};
instances.set(type, { ...metadata, instance });
}
/** Clears all instantiated instances from the container while preserving type registrations */
reset(): void {
for (const metadata of instances.values()) {
delete metadata.instance;
}
}
}
/**
* Global dependency injection container instance
* Used to retrieve and manage class instances and their dependencies
*/
export const Container = new ContainerClass();

View file

@ -0,0 +1,11 @@
{
"extends": ["./tsconfig.json", "../../../tsconfig.build.json"],
"compilerOptions": {
"composite": true,
"rootDir": "src",
"outDir": "dist",
"tsBuildInfoFile": "dist/build.tsbuildinfo"
},
"include": ["src/**/*.ts"],
"exclude": ["src/**/__tests__/**"]
}

View file

@ -0,0 +1,12 @@
{
"extends": "../../../tsconfig.json",
"compilerOptions": {
"rootDir": ".",
"types": ["node", "jest"],
"baseUrl": "src",
"tsBuildInfoFile": "dist/typecheck.tsbuildinfo",
"experimentalDecorators": true,
"emitDecoratorMetadata": true
},
"include": ["src/**/*.ts"]
}

View file

@ -15,15 +15,15 @@ import { getConnectionHintNoticeField } from '@utils/sharedFields';
export class ToolVectorStore implements INodeType {
description: INodeTypeDescription = {
displayName: 'Vector Store Tool',
displayName: 'Vector Store Question Answer Tool',
name: 'toolVectorStore',
icon: 'fa:database',
iconColor: 'black',
group: ['transform'],
version: [1],
description: 'Retrieve context from vector store',
description: 'Answer questions with a vector store',
defaults: {
name: 'Vector Store Tool',
name: 'Answer questions with a vector store',
},
codex: {
categories: ['AI'],
@ -60,20 +60,23 @@ export class ToolVectorStore implements INodeType {
properties: [
getConnectionHintNoticeField([NodeConnectionType.AiAgent]),
{
displayName: 'Name',
displayName: 'Data Name',
name: 'name',
type: 'string',
default: '',
placeholder: 'e.g. company_knowledge_base',
placeholder: 'e.g. users_info',
validateType: 'string-alphanumeric',
description: 'Name of the vector store',
description:
'Name of the data in vector store. This will be used to fill this tool description: Useful for when you need to answer questions about [name]. Whenever you need information about [data description], you should ALWAYS use this. Input should be a fully formed question.',
},
{
displayName: 'Description',
displayName: 'Description of Data',
name: 'description',
type: 'string',
default: '',
placeholder: 'Retrieves data about [insert information about your data here]...',
placeholder: "[Describe your data here, e.g. a user's name, email, etc.]",
description:
'Describe the data in vector store. This will be used to fill this tool description: Useful for when you need to answer questions about [name]. Whenever you need information about [data description], you should ALWAYS use this. Input should be a fully formed question.',
typeOptions: {
rows: 3,
},

View file

@ -228,7 +228,7 @@ export class VectorStorePGVector extends createVectorStoreNode({
testedBy: 'postgresConnectionTest',
},
],
operationModes: ['load', 'insert', 'retrieve'],
operationModes: ['load', 'insert', 'retrieve', 'retrieve-as-tool'],
},
sharedFields,
insertFields,

View file

@ -65,7 +65,7 @@ export class VectorStorePinecone extends createVectorStoreNode({
required: true,
},
],
operationModes: ['load', 'insert', 'retrieve', 'update'],
operationModes: ['load', 'insert', 'retrieve', 'update', 'retrieve-as-tool'],
},
methods: { listSearch: { pineconeIndexSearch } },
retrieveFields,

View file

@ -55,7 +55,7 @@ export class VectorStoreSupabase extends createVectorStoreNode({
required: true,
},
],
operationModes: ['load', 'insert', 'retrieve', 'update'],
operationModes: ['load', 'insert', 'retrieve', 'update', 'retrieve-as-tool'],
},
methods: {
listSearch: { supabaseTableNameSearch },

View file

@ -0,0 +1,161 @@
import type { DocumentInterface } from '@langchain/core/documents';
import type { Embeddings } from '@langchain/core/embeddings';
import type { VectorStore } from '@langchain/core/vectorstores';
import { mock } from 'jest-mock-extended';
import type { DynamicTool } from 'langchain/tools';
import type { ISupplyDataFunctions, NodeParameterValueType } from 'n8n-workflow';
import type { VectorStoreNodeConstructorArgs } from './createVectorStoreNode';
import { createVectorStoreNode } from './createVectorStoreNode';
jest.mock('@utils/logWrapper', () => ({
logWrapper: jest.fn().mockImplementation((val: DynamicTool) => ({ logWrapped: val })),
}));
const DEFAULT_PARAMETERS = {
options: {},
topK: 1,
};
const MOCK_DOCUMENTS: Array<[DocumentInterface, number]> = [
[
{
pageContent: 'first page',
metadata: {
id: 123,
},
},
0,
],
[
{
pageContent: 'second page',
metadata: {
id: 567,
},
},
0,
],
];
const MOCK_SEARCH_VALUE = 'search value';
const MOCK_EMBEDDED_SEARCH_VALUE = [1, 2, 3];
describe('createVectorStoreNode', () => {
const vectorStore = mock<VectorStore>({
similaritySearchVectorWithScore: jest.fn().mockResolvedValue(MOCK_DOCUMENTS),
});
const vectorStoreNodeArgs = mock<VectorStoreNodeConstructorArgs>({
sharedFields: [],
insertFields: [],
loadFields: [],
retrieveFields: [],
updateFields: [],
getVectorStoreClient: jest.fn().mockReturnValue(vectorStore),
});
const embeddings = mock<Embeddings>({
embedQuery: jest.fn().mockResolvedValue(MOCK_EMBEDDED_SEARCH_VALUE),
});
const context = mock<ISupplyDataFunctions>({
getNodeParameter: jest.fn(),
getInputConnectionData: jest.fn().mockReturnValue(embeddings),
});
describe('retrieve mode', () => {
it('supplies vector store as data', async () => {
// ARRANGE
const parameters: Record<string, NodeParameterValueType | object> = {
...DEFAULT_PARAMETERS,
mode: 'retrieve',
};
context.getNodeParameter.mockImplementation(
(parameterName: string): NodeParameterValueType | object => parameters[parameterName],
);
// ACT
const VectorStoreNodeType = createVectorStoreNode(vectorStoreNodeArgs);
const nodeType = new VectorStoreNodeType();
const data = await nodeType.supplyData.call(context, 1);
const wrappedVectorStore = (data.response as { logWrapped: VectorStore }).logWrapped;
// ASSERT
expect(wrappedVectorStore).toEqual(vectorStore);
expect(vectorStoreNodeArgs.getVectorStoreClient).toHaveBeenCalled();
});
});
describe('retrieve-as-tool mode', () => {
it('supplies DynamicTool that queries vector store and returns documents with metadata', async () => {
// ARRANGE
const parameters: Record<string, NodeParameterValueType | object> = {
...DEFAULT_PARAMETERS,
mode: 'retrieve-as-tool',
description: 'tool description',
toolName: 'tool name',
includeDocumentMetadata: true,
};
context.getNodeParameter.mockImplementation(
(parameterName: string): NodeParameterValueType | object => parameters[parameterName],
);
// ACT
const VectorStoreNodeType = createVectorStoreNode(vectorStoreNodeArgs);
const nodeType = new VectorStoreNodeType();
const data = await nodeType.supplyData.call(context, 1);
const tool = (data.response as { logWrapped: DynamicTool }).logWrapped;
const output = await tool?.func(MOCK_SEARCH_VALUE);
// ASSERT
expect(tool?.getName()).toEqual(parameters.toolName);
expect(tool?.description).toEqual(parameters.toolDescription);
expect(embeddings.embedQuery).toHaveBeenCalledWith(MOCK_SEARCH_VALUE);
expect(vectorStore.similaritySearchVectorWithScore).toHaveBeenCalledWith(
MOCK_EMBEDDED_SEARCH_VALUE,
parameters.topK,
parameters.filter,
);
expect(output).toEqual([
{ type: 'text', text: JSON.stringify(MOCK_DOCUMENTS[0][0]) },
{ type: 'text', text: JSON.stringify(MOCK_DOCUMENTS[1][0]) },
]);
});
it('supplies DynamicTool that queries vector store and returns documents without metadata', async () => {
// ARRANGE
const parameters: Record<string, NodeParameterValueType | object> = {
...DEFAULT_PARAMETERS,
mode: 'retrieve-as-tool',
description: 'tool description',
toolName: 'tool name',
includeDocumentMetadata: false,
};
context.getNodeParameter.mockImplementation(
(parameterName: string): NodeParameterValueType | object => parameters[parameterName],
);
// ACT
const VectorStoreNodeType = createVectorStoreNode(vectorStoreNodeArgs);
const nodeType = new VectorStoreNodeType();
const data = await nodeType.supplyData.call(context, 1);
const tool = (data.response as { logWrapped: DynamicTool }).logWrapped;
const output = await tool?.func(MOCK_SEARCH_VALUE);
// ASSERT
expect(tool?.getName()).toEqual(parameters.toolName);
expect(tool?.description).toEqual(parameters.toolDescription);
expect(embeddings.embedQuery).toHaveBeenCalledWith(MOCK_SEARCH_VALUE);
expect(vectorStore.similaritySearchVectorWithScore).toHaveBeenCalledWith(
MOCK_EMBEDDED_SEARCH_VALUE,
parameters.topK,
parameters.filter,
);
expect(output).toEqual([
{ type: 'text', text: JSON.stringify({ pageContent: MOCK_DOCUMENTS[0][0].pageContent }) },
{ type: 'text', text: JSON.stringify({ pageContent: MOCK_DOCUMENTS[1][0].pageContent }) },
]);
});
});
});

View file

@ -3,6 +3,7 @@
import type { Document } from '@langchain/core/documents';
import type { Embeddings } from '@langchain/core/embeddings';
import type { VectorStore } from '@langchain/core/vectorstores';
import { DynamicTool } from 'langchain/tools';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import type {
IExecuteFunctions,
@ -28,9 +29,14 @@ import { getConnectionHintNoticeField } from '@utils/sharedFields';
import { processDocument } from './processDocuments';
type NodeOperationMode = 'insert' | 'load' | 'retrieve' | 'update';
type NodeOperationMode = 'insert' | 'load' | 'retrieve' | 'update' | 'retrieve-as-tool';
const DEFAULT_OPERATION_MODES: NodeOperationMode[] = ['load', 'insert', 'retrieve'];
const DEFAULT_OPERATION_MODES: NodeOperationMode[] = [
'load',
'insert',
'retrieve',
'retrieve-as-tool',
];
interface NodeMeta {
displayName: string;
@ -43,7 +49,7 @@ interface NodeMeta {
operationModes?: NodeOperationMode[];
}
interface VectorStoreNodeConstructorArgs {
export interface VectorStoreNodeConstructorArgs {
meta: NodeMeta;
methods?: {
listSearch?: {
@ -102,10 +108,18 @@ function getOperationModeOptions(args: VectorStoreNodeConstructorArgs): INodePro
action: 'Add documents to vector store',
},
{
name: 'Retrieve Documents (For Agent/Chain)',
name: 'Retrieve Documents (As Vector Store for AI Agent)',
value: 'retrieve',
description: 'Retrieve documents from vector store to be used with AI nodes',
action: 'Retrieve documents for AI processing',
description: 'Retrieve documents from vector store to be used as vector store with AI nodes',
action: 'Retrieve documents for AI processing as Vector Store',
outputConnectionType: NodeConnectionType.AiVectorStore,
},
{
name: 'Retrieve Documents (As Tool for AI Agent)',
value: 'retrieve-as-tool',
description: 'Retrieve documents from vector store to be used as tool with AI nodes',
action: 'Retrieve documents for AI processing as Tool',
outputConnectionType: NodeConnectionType.AiTool,
},
{
name: 'Update Documents',
@ -136,7 +150,8 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
codex: {
categories: ['AI'],
subcategories: {
AI: ['Vector Stores', 'Root Nodes'],
AI: ['Vector Stores', 'Tools', 'Root Nodes'],
Tools: ['Other Tools'],
},
resources: {
primaryDocumentation: [
@ -153,6 +168,10 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
const mode = parameters?.mode;
const inputs = [{ displayName: "Embedding", type: "${NodeConnectionType.AiEmbedding}", required: true, maxConnections: 1}]
if (mode === 'retrieve-as-tool') {
return inputs;
}
if (['insert', 'load', 'update'].includes(mode)) {
inputs.push({ displayName: "", type: "${NodeConnectionType.Main}"})
}
@ -166,6 +185,11 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
outputs: `={{
((parameters) => {
const mode = parameters?.mode ?? 'retrieve';
if (mode === 'retrieve-as-tool') {
return [{ displayName: "Tool", type: "${NodeConnectionType.AiTool}"}]
}
if (mode === 'retrieve') {
return [{ displayName: "Vector Store", type: "${NodeConnectionType.AiVectorStore}"}]
}
@ -189,6 +213,37 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
},
},
},
{
displayName: 'Name',
name: 'toolName',
type: 'string',
default: '',
required: true,
description: 'Name of the vector store',
placeholder: 'e.g. company_knowledge_base',
validateType: 'string-alphanumeric',
displayOptions: {
show: {
mode: ['retrieve-as-tool'],
},
},
},
{
displayName: 'Description',
name: 'toolDescription',
type: 'string',
default: '',
required: true,
typeOptions: { rows: 2 },
description:
'Explain to the LLM what this tool does, a good, specific description would allow LLMs to produce expected results much more often',
placeholder: `e.g. ${args.meta.description}`,
displayOptions: {
show: {
mode: ['retrieve-as-tool'],
},
},
},
...args.sharedFields,
...transformDescriptionForOperationMode(args.insertFields ?? [], 'insert'),
// Prompt and topK are always used for the load operation
@ -214,7 +269,19 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
description: 'Number of top results to fetch from vector store',
displayOptions: {
show: {
mode: ['load'],
mode: ['load', 'retrieve-as-tool'],
},
},
},
{
displayName: 'Include Metadata',
name: 'includeDocumentMetadata',
type: 'boolean',
default: true,
description: 'Whether or not to include document metadata',
displayOptions: {
show: {
mode: ['load', 'retrieve-as-tool'],
},
},
},
@ -271,10 +338,16 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
filter,
);
const includeDocumentMetadata = this.getNodeParameter(
'includeDocumentMetadata',
itemIndex,
true,
) as boolean;
const serializedDocs = docs.map(([doc, score]) => {
const document = {
metadata: doc.metadata,
pageContent: doc.pageContent,
...(includeDocumentMetadata ? { metadata: doc.metadata } : {}),
};
return {
@ -381,12 +454,12 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
throw new NodeOperationError(
this.getNode(),
'Only the "load" and "insert" operation modes are supported with execute',
'Only the "load", "update" and "insert" operation modes are supported with execute',
);
}
async supplyData(this: ISupplyDataFunctions, itemIndex: number): Promise<SupplyData> {
const mode = this.getNodeParameter('mode', 0) as 'load' | 'insert' | 'retrieve';
const mode = this.getNodeParameter('mode', 0) as NodeOperationMode;
const filter = getMetadataFiltersValues(this, itemIndex);
const embeddings = (await this.getInputConnectionData(
NodeConnectionType.AiEmbedding,
@ -400,9 +473,54 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
};
}
if (mode === 'retrieve-as-tool') {
const toolDescription = this.getNodeParameter('toolDescription', itemIndex) as string;
const toolName = this.getNodeParameter('toolName', itemIndex) as string;
const topK = this.getNodeParameter('topK', itemIndex, 4) as number;
const includeDocumentMetadata = this.getNodeParameter(
'includeDocumentMetadata',
itemIndex,
true,
) as boolean;
const vectorStoreTool = new DynamicTool({
name: toolName,
description: toolDescription,
func: async (input) => {
const vectorStore = await args.getVectorStoreClient(
this,
filter,
embeddings,
itemIndex,
);
const embeddedPrompt = await embeddings.embedQuery(input);
const documents = await vectorStore.similaritySearchVectorWithScore(
embeddedPrompt,
topK,
filter,
);
return documents
.map((document) => {
if (includeDocumentMetadata) {
return { type: 'text', text: JSON.stringify(document[0]) };
}
return {
type: 'text',
text: JSON.stringify({ pageContent: document[0].pageContent }),
};
})
.filter((document) => !!document);
},
});
return {
response: logWrapper(vectorStoreTool, this),
};
}
throw new NodeOperationError(
this.getNode(),
'Only the "retrieve" operation mode is supported to supply data',
'Only the "retrieve" and "retrieve-as-tool" operation mode is supported to supply data',
);
}
};

View file

@ -18,14 +18,22 @@ export async function apiRequest(
endpoint: string,
parameters?: RequestParameters,
) {
const { body, qs, uri, option, headers } = parameters ?? {};
const { body, qs, option, headers } = parameters ?? {};
const credentials = await this.getCredentials('openAiApi');
let uri = `https://api.openai.com/v1${endpoint}`;
if (credentials.url) {
uri = `${credentials?.url}${endpoint}`;
}
const options = {
headers,
method,
body,
qs,
uri: uri ?? `https://api.openai.com/v1${endpoint}`,
uri,
json: true,
};

View file

@ -0,0 +1,62 @@
import type { IExecuteFunctions } from 'n8n-workflow';
import { apiRequest } from '../index';
const mockedExecutionContext = {
getCredentials: jest.fn(),
helpers: {
requestWithAuthentication: jest.fn(),
},
};
describe('apiRequest', () => {
beforeEach(() => {
jest.resetAllMocks();
});
it('should call requestWithAuthentication with credentials URL if one is provided', async () => {
mockedExecutionContext.getCredentials.mockResolvedValue({
url: 'http://www.test/url/v1',
});
// Act
await apiRequest.call(mockedExecutionContext as unknown as IExecuteFunctions, 'GET', '/test', {
headers: { 'Content-Type': 'application/json' },
});
// Assert
expect(mockedExecutionContext.getCredentials).toHaveBeenCalledWith('openAiApi');
expect(mockedExecutionContext.helpers.requestWithAuthentication).toHaveBeenCalledWith(
'openAiApi',
{
headers: { 'Content-Type': 'application/json' },
method: 'GET',
uri: 'http://www.test/url/v1/test',
json: true,
},
);
});
it('should call requestWithAuthentication with default URL if credentials URL is not provided', async () => {
mockedExecutionContext.getCredentials.mockResolvedValue({});
// Act
await apiRequest.call(mockedExecutionContext as unknown as IExecuteFunctions, 'GET', '/test', {
headers: { 'Content-Type': 'application/json' },
});
// Assert
expect(mockedExecutionContext.getCredentials).toHaveBeenCalledWith('openAiApi');
expect(mockedExecutionContext.helpers.requestWithAuthentication).toHaveBeenCalledWith(
'openAiApi',
{
headers: { 'Content-Type': 'application/json' },
method: 'GET',
uri: 'https://api.openai.com/v1/test',
json: true,
},
);
});
});

View file

@ -66,7 +66,7 @@ export const inputSchemaField: INodeProperties = {
};
export const promptTypeOptions: INodeProperties = {
displayName: 'Prompt Source (User Message)',
displayName: 'Source for Prompt (User Message)',
name: 'promptType',
type: 'options',
options: [
@ -98,7 +98,7 @@ export const textInput: INodeProperties = {
};
export const textFromPreviousNode: INodeProperties = {
displayName: 'Text From Previous Node',
displayName: 'Prompt (User Message)',
name: 'text',
type: 'string',
required: true,

View file

@ -35,6 +35,7 @@
},
"dependencies": {
"@n8n/config": "workspace:*",
"@n8n/di": "workspace:*",
"@sentry/node": "catalog:",
"acorn": "8.14.0",
"acorn-walk": "8.3.4",
@ -42,7 +43,6 @@
"n8n-core": "workspace:*",
"n8n-workflow": "workspace:*",
"nanoid": "catalog:",
"typedi": "catalog:",
"ws": "^8.18.0"
},
"devDependencies": {

View file

@ -1,4 +1,3 @@
import { mock } from 'jest-mock-extended';
import { DateTime } from 'luxon';
import type { IBinaryData } from 'n8n-workflow';
import { setGlobalState, type CodeExecutionMode, type IDataObject } from 'n8n-workflow';
@ -18,11 +17,12 @@ import {
type DataRequestResponse,
type InputDataChunkDefinition,
} from '@/runner-types';
import type { Task } from '@/task-runner';
import type { TaskParams } from '@/task-runner';
import {
newDataRequestResponse,
newTaskWithSettings,
newTaskParamsWithSettings,
newTaskState,
withPairedItem,
wrapIntoJson,
} from './test-data';
@ -64,12 +64,12 @@ describe('JsTaskRunner', () => {
taskData,
runner = defaultTaskRunner,
}: {
task: Task<JSExecSettings>;
task: TaskParams<JSExecSettings>;
taskData: DataRequestResponse;
runner?: JsTaskRunner;
}) => {
jest.spyOn(runner, 'requestData').mockResolvedValue(taskData);
return await runner.executeTask(task, mock<AbortSignal>());
return await runner.executeTask(task, new AbortController().signal);
};
afterEach(() => {
@ -88,7 +88,7 @@ describe('JsTaskRunner', () => {
runner?: JsTaskRunner;
}) => {
return await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code,
nodeMode: 'runOnceForAllItems',
...settings,
@ -112,7 +112,7 @@ describe('JsTaskRunner', () => {
chunk?: InputDataChunkDefinition;
}) => {
return await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code,
nodeMode: 'runOnceForEachItem',
chunk,
@ -128,7 +128,7 @@ describe('JsTaskRunner', () => {
'should make an rpc call for console log in %s mode',
async (nodeMode) => {
jest.spyOn(defaultTaskRunner, 'makeRpcCall').mockResolvedValue(undefined);
const task = newTaskWithSettings({
const task = newTaskParamsWithSettings({
code: "console.log('Hello', 'world!'); return {}",
nodeMode,
});
@ -139,13 +139,14 @@ describe('JsTaskRunner', () => {
});
expect(defaultTaskRunner.makeRpcCall).toHaveBeenCalledWith(task.taskId, 'logNodeOutput', [
'Hello world!',
"'Hello'",
"'world!'",
]);
},
);
it('should not throw when using unsupported console methods', async () => {
const task = newTaskWithSettings({
const task = newTaskParamsWithSettings({
code: `
console.warn('test');
console.error('test');
@ -173,6 +174,44 @@ describe('JsTaskRunner', () => {
}),
).resolves.toBeDefined();
});
it('should not throw when trying to log the context object', async () => {
const task = newTaskParamsWithSettings({
code: `
console.log(this);
return {json: {}}
`,
nodeMode: 'runOnceForAllItems',
});
await expect(
execTaskWithParams({
task,
taskData: newDataRequestResponse([wrapIntoJson({})]),
}),
).resolves.toBeDefined();
});
it('should log the context object as [[ExecutionContext]]', async () => {
const rpcCallSpy = jest.spyOn(defaultTaskRunner, 'makeRpcCall').mockResolvedValue(undefined);
const task = newTaskParamsWithSettings({
code: `
console.log(this);
return {json: {}}
`,
nodeMode: 'runOnceForAllItems',
});
await execTaskWithParams({
task,
taskData: newDataRequestResponse([wrapIntoJson({})]),
});
expect(rpcCallSpy).toHaveBeenCalledWith(task.taskId, 'logNodeOutput', [
'[[ExecutionContext]]',
]);
});
});
describe('built-in methods and variables available in the context', () => {
@ -297,7 +336,7 @@ describe('JsTaskRunner', () => {
describe('$env', () => {
it('should have the env available in context when access has not been blocked', async () => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: $env.VAR1 }',
nodeMode: 'runOnceForAllItems',
}),
@ -316,7 +355,7 @@ describe('JsTaskRunner', () => {
it('should be possible to access env if it has been blocked', async () => {
await expect(
execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: $env.VAR1 }',
nodeMode: 'runOnceForAllItems',
}),
@ -333,7 +372,7 @@ describe('JsTaskRunner', () => {
it('should not be possible to iterate $env', async () => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return Object.values($env).concat(Object.keys($env))',
nodeMode: 'runOnceForAllItems',
}),
@ -352,7 +391,7 @@ describe('JsTaskRunner', () => {
it("should not expose task runner's env variables even if no env state is received", async () => {
process.env.N8N_RUNNERS_TASK_BROKER_URI = 'http://127.0.0.1:5679';
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: $env.N8N_RUNNERS_TASK_BROKER_URI }',
nodeMode: 'runOnceForAllItems',
}),
@ -373,7 +412,7 @@ describe('JsTaskRunner', () => {
};
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: $now.toSeconds() }',
nodeMode: 'runOnceForAllItems',
}),
@ -390,7 +429,7 @@ describe('JsTaskRunner', () => {
});
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: $now.toSeconds() }',
nodeMode: 'runOnceForAllItems',
}),
@ -405,7 +444,7 @@ describe('JsTaskRunner', () => {
describe("$getWorkflowStaticData('global')", () => {
it('should have the global workflow static data available in runOnceForAllItems', async () => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: $getWorkflowStaticData("global") }',
nodeMode: 'runOnceForAllItems',
}),
@ -421,7 +460,7 @@ describe('JsTaskRunner', () => {
it('should have the global workflow static data available in runOnceForEachItem', async () => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: $getWorkflowStaticData("global") }',
nodeMode: 'runOnceForEachItem',
}),
@ -441,7 +480,7 @@ describe('JsTaskRunner', () => {
"does not return static data if it hasn't been modified in %s",
async (mode) => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: `
const staticData = $getWorkflowStaticData("global");
return { val: staticData };
@ -463,7 +502,7 @@ describe('JsTaskRunner', () => {
'returns the updated static data in %s',
async (mode) => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: `
const staticData = $getWorkflowStaticData("global");
staticData.newKey = 'newValue';
@ -502,7 +541,7 @@ describe('JsTaskRunner', () => {
it('should have the node workflow static data available in runOnceForAllItems', async () => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: $getWorkflowStaticData("node") }',
nodeMode: 'runOnceForAllItems',
}),
@ -514,7 +553,7 @@ describe('JsTaskRunner', () => {
it('should have the node workflow static data available in runOnceForEachItem', async () => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: $getWorkflowStaticData("node") }',
nodeMode: 'runOnceForEachItem',
}),
@ -530,7 +569,7 @@ describe('JsTaskRunner', () => {
"does not return static data if it hasn't been modified in %s",
async (mode) => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: `
const staticData = $getWorkflowStaticData("node");
return { val: staticData };
@ -548,7 +587,7 @@ describe('JsTaskRunner', () => {
'returns the updated static data in %s',
async (mode) => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: `
const staticData = $getWorkflowStaticData("node");
staticData.newKey = 'newValue';
@ -623,7 +662,7 @@ describe('JsTaskRunner', () => {
// Act
await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: `await ${group.invocation}; return []`,
nodeMode: 'runOnceForAllItems',
}),
@ -633,6 +672,7 @@ describe('JsTaskRunner', () => {
),
});
// Assert
expect(rpcCallSpy).toHaveBeenCalledWith('1', group.method, group.expectedParams);
});
@ -644,7 +684,7 @@ describe('JsTaskRunner', () => {
// Act
await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: `await ${group.invocation}; return {}`,
nodeMode: 'runOnceForEachItem',
}),
@ -661,26 +701,22 @@ describe('JsTaskRunner', () => {
describe('unsupported methods', () => {
for (const unsupportedFunction of UNSUPPORTED_HELPER_FUNCTIONS) {
it(`should throw an error if ${unsupportedFunction} is used in runOnceForAllItems`, async () => {
// Act
// Act & Assert
await expect(
async () =>
await executeForAllItems({
code: `${unsupportedFunction}()`,
inputItems,
}),
executeForAllItems({
code: `${unsupportedFunction}()`,
inputItems,
}),
).rejects.toThrow(UnsupportedFunctionError);
});
it(`should throw an error if ${unsupportedFunction} is used in runOnceForEachItem`, async () => {
// Act
// Act & Assert
await expect(
async () =>
await executeForEachItem({
code: `${unsupportedFunction}()`,
inputItems,
}),
executeForEachItem({
code: `${unsupportedFunction}()`,
inputItems,
}),
).rejects.toThrow(UnsupportedFunctionError);
});
}
@ -689,7 +725,7 @@ describe('JsTaskRunner', () => {
it('should allow access to Node.js Buffers', async () => {
const outcomeAll = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: Buffer.from("test-buffer").toString() }',
nodeMode: 'runOnceForAllItems',
}),
@ -701,7 +737,7 @@ describe('JsTaskRunner', () => {
expect(outcomeAll.result).toEqual([wrapIntoJson({ val: 'test-buffer' })]);
const outcomePer = await execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'return { val: Buffer.from("test-buffer").toString() }',
nodeMode: 'runOnceForEachItem',
}),
@ -1169,7 +1205,7 @@ describe('JsTaskRunner', () => {
async (nodeMode) => {
await expect(
execTaskWithParams({
task: newTaskWithSettings({
task: newTaskParamsWithSettings({
code: 'unknown',
nodeMode,
}),
@ -1182,12 +1218,13 @@ describe('JsTaskRunner', () => {
it('sends serializes an error correctly', async () => {
const runner = createRunnerWithOpts({});
const taskId = '1';
const task = newTaskWithSettings({
const task = newTaskState(taskId);
const taskSettings: JSExecSettings = {
code: 'unknown; return []',
nodeMode: 'runOnceForAllItems',
continueOnFail: false,
workflowMode: 'manual',
});
};
runner.runningTasks.set(taskId, task);
const sendSpy = jest.spyOn(runner.ws, 'send').mockImplementation(() => {});
@ -1196,7 +1233,7 @@ describe('JsTaskRunner', () => {
.spyOn(runner, 'requestData')
.mockResolvedValue(newDataRequestResponse([wrapIntoJson({ a: 1 })]));
await runner.receivedSettings(taskId, task.settings);
await runner.receivedSettings(taskId, taskSettings);
expect(sendSpy).toHaveBeenCalled();
const calledWith = sendSpy.mock.calls[0][0] as string;
@ -1268,11 +1305,7 @@ describe('JsTaskRunner', () => {
const emitSpy = jest.spyOn(runner, 'emit');
jest.spyOn(runner, 'executeTask').mockResolvedValue({ result: [] });
runner.runningTasks.set(taskId, {
taskId,
active: true,
cancelled: false,
});
runner.runningTasks.set(taskId, newTaskState(taskId));
jest.advanceTimersByTime(idleTimeout * 1000 - 100);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');
@ -1299,15 +1332,13 @@ describe('JsTaskRunner', () => {
const runner = createRunnerWithOpts({}, { idleTimeout });
const taskId = '123';
const emitSpy = jest.spyOn(runner, 'emit');
const task = newTaskState(taskId);
runner.runningTasks.set(taskId, {
taskId,
active: true,
cancelled: false,
});
runner.runningTasks.set(taskId, task);
jest.advanceTimersByTime(idleTimeout * 1000);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');
task.cleanup();
});
});
});

View file

@ -1,32 +1,47 @@
import { WebSocket } from 'ws';
import { TaskRunner } from '@/task-runner';
import { newTaskState } from '@/js-task-runner/__tests__/test-data';
import { TimeoutError } from '@/js-task-runner/errors/timeout-error';
import { TaskRunner, type TaskRunnerOpts } from '@/task-runner';
import type { TaskStatus } from '@/task-state';
class TestRunner extends TaskRunner {}
jest.mock('ws');
describe('TestRunner', () => {
let runner: TestRunner;
const newTestRunner = (opts: Partial<TaskRunnerOpts> = {}) =>
new TestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
taskBrokerUri: 'http://localhost:8080',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
...opts,
});
afterEach(() => {
runner?.clearIdleTimer();
});
describe('constructor', () => {
afterEach(() => {
jest.clearAllMocks();
});
it('should correctly construct WebSocket URI with provided taskBrokerUri', () => {
const runner = new TestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
runner = newTestRunner({
taskBrokerUri: 'http://localhost:8080',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
});
expect(WebSocket).toHaveBeenCalledWith(
@ -38,25 +53,11 @@ describe('TestRunner', () => {
maxPayload: 1024,
}),
);
runner.clearIdleTimer();
});
it('should handle different taskBrokerUri formats correctly', () => {
const runner = new TestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
runner = newTestRunner({
taskBrokerUri: 'https://example.com:3000/path',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
});
expect(WebSocket).toHaveBeenCalledWith(
@ -68,56 +69,175 @@ describe('TestRunner', () => {
maxPayload: 1024,
}),
);
runner.clearIdleTimer();
});
it('should throw an error if taskBrokerUri is invalid', () => {
expect(
() =>
new TestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
taskBrokerUri: 'not-a-valid-uri',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
}),
expect(() =>
newTestRunner({
taskBrokerUri: 'not-a-valid-uri',
}),
).toThrowError(/Invalid URL/);
});
});
describe('taskCancelled', () => {
it('should reject pending requests when task is cancelled', () => {
const runner = new TestRunner({
describe('sendOffers', () => {
beforeEach(() => {
jest.useFakeTimers();
});
afterEach(() => {
jest.clearAllTimers();
});
it('should not send offers if canSendOffers is false', () => {
runner = newTestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
taskBrokerUri: 'http://localhost:8080',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
maxConcurrency: 2,
});
const sendSpy = jest.spyOn(runner, 'send');
expect(runner.canSendOffers).toBe(false);
runner.sendOffers();
expect(sendSpy).toHaveBeenCalledTimes(0);
});
it('should enable sending of offer on runnerregistered message', () => {
runner = newTestRunner({
taskType: 'test-task',
maxConcurrency: 2,
});
runner.onMessage({
type: 'broker:runnerregistered',
});
const taskId = 'test-task';
runner.runningTasks.set(taskId, {
taskId,
active: false,
cancelled: false,
expect(runner.canSendOffers).toBe(true);
});
it('should send maxConcurrency offers when there are no offers', () => {
runner = newTestRunner({
taskType: 'test-task',
maxConcurrency: 2,
});
runner.onMessage({
type: 'broker:runnerregistered',
});
const sendSpy = jest.spyOn(runner, 'send');
runner.sendOffers();
runner.sendOffers();
expect(sendSpy).toHaveBeenCalledTimes(2);
expect(sendSpy.mock.calls).toEqual([
[
{
type: 'runner:taskoffer',
taskType: 'test-task',
offerId: expect.any(String),
validFor: expect.any(Number),
},
],
[
{
type: 'runner:taskoffer',
taskType: 'test-task',
offerId: expect.any(String),
validFor: expect.any(Number),
},
],
]);
});
it('should send up to maxConcurrency offers when there is a running task', () => {
runner = newTestRunner({
taskType: 'test-task',
maxConcurrency: 2,
});
runner.onMessage({
type: 'broker:runnerregistered',
});
const taskState = newTaskState('test-task');
runner.runningTasks.set('test-task', taskState);
const sendSpy = jest.spyOn(runner, 'send');
runner.sendOffers();
expect(sendSpy).toHaveBeenCalledTimes(1);
expect(sendSpy.mock.calls).toEqual([
[
{
type: 'runner:taskoffer',
taskType: 'test-task',
offerId: expect.any(String),
validFor: expect.any(Number),
},
],
]);
taskState.cleanup();
});
it('should delete stale offers and send new ones', () => {
runner = newTestRunner({
taskType: 'test-task',
maxConcurrency: 2,
});
runner.onMessage({
type: 'broker:runnerregistered',
});
const sendSpy = jest.spyOn(runner, 'send');
runner.sendOffers();
expect(sendSpy).toHaveBeenCalledTimes(2);
sendSpy.mockClear();
jest.advanceTimersByTime(6000);
runner.sendOffers();
expect(sendSpy).toHaveBeenCalledTimes(2);
});
});
describe('taskCancelled', () => {
test.each<[TaskStatus, string]>([
['aborting:cancelled', 'cancelled'],
['aborting:timeout', 'timeout'],
])('should not do anything if task status is %s', async (status, reason) => {
runner = newTestRunner();
const taskId = 'test-task';
const task = newTaskState(taskId);
task.status = status;
runner.runningTasks.set(taskId, task);
await runner.taskCancelled(taskId, reason);
expect(runner.runningTasks.size).toBe(1);
expect(task.status).toBe(status);
});
it('should delete task if task is waiting for settings when task is cancelled', async () => {
runner = newTestRunner();
const taskId = 'test-task';
const task = newTaskState(taskId);
const taskCleanupSpy = jest.spyOn(task, 'cleanup');
runner.runningTasks.set(taskId, task);
await runner.taskCancelled(taskId, 'test-reason');
expect(runner.runningTasks.size).toBe(0);
expect(taskCleanupSpy).toHaveBeenCalled();
});
it('should reject pending requests when task is cancelled', async () => {
runner = newTestRunner();
const taskId = 'test-task';
const task = newTaskState(taskId);
task.status = 'running';
runner.runningTasks.set(taskId, task);
const dataRequestReject = jest.fn();
const nodeTypesRequestReject = jest.fn();
@ -136,7 +256,71 @@ describe('TestRunner', () => {
reject: nodeTypesRequestReject,
});
runner.taskCancelled(taskId, 'test-reason');
await runner.taskCancelled(taskId, 'test-reason');
expect(dataRequestReject).toHaveBeenCalledWith(
expect.objectContaining({
message: 'Task cancelled: test-reason',
}),
);
expect(nodeTypesRequestReject).toHaveBeenCalledWith(
expect.objectContaining({
message: 'Task cancelled: test-reason',
}),
);
expect(runner.dataRequests.size).toBe(0);
expect(runner.nodeTypesRequests.size).toBe(0);
});
});
describe('taskTimedOut', () => {
it('should error task if task is waiting for settings', async () => {
runner = newTestRunner();
const taskId = 'test-task';
const task = newTaskState(taskId);
task.status = 'waitingForSettings';
runner.runningTasks.set(taskId, task);
const sendSpy = jest.spyOn(runner, 'send');
await runner.taskTimedOut(taskId);
expect(runner.runningTasks.size).toBe(0);
expect(sendSpy).toHaveBeenCalledWith({
type: 'runner:taskerror',
taskId,
error: expect.any(TimeoutError),
});
});
it('should reject pending requests when task is running', async () => {
runner = newTestRunner();
const taskId = 'test-task';
const task = newTaskState(taskId);
task.status = 'running';
runner.runningTasks.set(taskId, task);
const dataRequestReject = jest.fn();
const nodeTypesRequestReject = jest.fn();
runner.dataRequests.set('data-req', {
taskId,
requestId: 'data-req',
resolve: jest.fn(),
reject: dataRequestReject,
});
runner.nodeTypesRequests.set('node-req', {
taskId,
requestId: 'node-req',
resolve: jest.fn(),
reject: nodeTypesRequestReject,
});
await runner.taskCancelled(taskId, 'test-reason');
expect(dataRequestReject).toHaveBeenCalledWith(
expect.objectContaining({

View file

@ -4,22 +4,21 @@ import { nanoid } from 'nanoid';
import type { JSExecSettings } from '@/js-task-runner/js-task-runner';
import type { DataRequestResponse } from '@/runner-types';
import type { Task } from '@/task-runner';
import type { TaskParams } from '@/task-runner';
import { TaskState } from '@/task-state';
/**
* Creates a new task with the given settings
*/
export const newTaskWithSettings = (
export const newTaskParamsWithSettings = (
settings: Partial<JSExecSettings> & Pick<JSExecSettings, 'code' | 'nodeMode'>,
): Task<JSExecSettings> => ({
): TaskParams<JSExecSettings> => ({
taskId: '1',
settings: {
workflowMode: 'manual',
continueOnFail: false,
...settings,
},
active: true,
cancelled: false,
});
/**
@ -167,3 +166,13 @@ export const withPairedItem = (index: number, data: INodeExecutionData): INodeEx
item: index,
},
});
/**
* Creates a new task state with the given taskId
*/
export const newTaskState = (taskId: string) =>
new TaskState({
taskId,
timeoutInS: 60,
onTimeout: () => {},
});

View file

@ -15,21 +15,23 @@ import type {
EnvProviderState,
IExecuteData,
INodeTypeDescription,
IWorkflowDataProxyData,
} from 'n8n-workflow';
import * as a from 'node:assert';
import { inspect } from 'node:util';
import { runInNewContext, type Context } from 'node:vm';
import type { MainConfig } from '@/config/main-config';
import { UnsupportedFunctionError } from '@/js-task-runner/errors/unsupported-function.error';
import {
EXPOSED_RPC_METHODS,
UNSUPPORTED_HELPER_FUNCTIONS,
type DataRequestResponse,
type InputDataChunkDefinition,
type PartialAdditionalData,
type TaskResultData,
import { EXPOSED_RPC_METHODS, UNSUPPORTED_HELPER_FUNCTIONS } from '@/runner-types';
import type {
DataRequestResponse,
InputDataChunkDefinition,
PartialAdditionalData,
TaskResultData,
} from '@/runner-types';
import { type Task, TaskRunner } from '@/task-runner';
import type { TaskParams } from '@/task-runner';
import { noOp, TaskRunner } from '@/task-runner';
import { BuiltInsParser } from './built-ins-parser/built-ins-parser';
import { BuiltInsParserState } from './built-ins-parser/built-ins-parser-state';
@ -42,8 +44,8 @@ import { createRequireResolver } from './require-resolver';
import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation';
import { DataRequestResponseReconstruct } from '../data-request/data-request-response-reconstruct';
export interface RPCCallObject {
[name: string]: ((...args: unknown[]) => Promise<unknown>) | RPCCallObject;
export interface RpcCallObject {
[name: string]: ((...args: unknown[]) => Promise<unknown>) | RpcCallObject;
}
export interface JSExecSettings {
@ -103,8 +105,11 @@ export class JsTaskRunner extends TaskRunner {
});
}
async executeTask(task: Task<JSExecSettings>, signal: AbortSignal): Promise<TaskResultData> {
const settings = task.settings;
async executeTask(
taskParams: TaskParams<JSExecSettings>,
abortSignal: AbortSignal,
): Promise<TaskResultData> {
const { taskId, settings } = taskParams;
a.ok(settings, 'JS Code not sent to runner');
this.validateTaskSettings(settings);
@ -115,13 +120,13 @@ export class JsTaskRunner extends TaskRunner {
: BuiltInsParserState.newNeedsAllDataState();
const dataResponse = await this.requestData<DataRequestResponse>(
task.taskId,
taskId,
neededBuiltIns.toDataRequestParams(settings.chunk),
);
const data = this.reconstructTaskData(dataResponse, settings.chunk);
await this.requestNodeTypeIfNeeded(neededBuiltIns, data.workflow, task.taskId);
await this.requestNodeTypeIfNeeded(neededBuiltIns, data.workflow, taskId);
const workflowParams = data.workflow;
const workflow = new Workflow({
@ -129,29 +134,12 @@ export class JsTaskRunner extends TaskRunner {
nodeTypes: this.nodeTypes,
});
const noOp = () => {};
const customConsole = {
// all except `log` are dummy methods that disregard without throwing, following existing Code node behavior
...Object.keys(console).reduce<Record<string, () => void>>((acc, name) => {
acc[name] = noOp;
return acc;
}, {}),
// Send log output back to the main process. It will take care of forwarding
// it to the UI or printing to console.
log: (...args: unknown[]) => {
const logOutput = args
.map((arg) => (typeof arg === 'object' && arg !== null ? JSON.stringify(arg) : arg))
.join(' ');
void this.makeRpcCall(task.taskId, 'logNodeOutput', [logOutput]);
},
};
workflow.staticData = ObservableObject.create(workflow.staticData);
const result =
settings.nodeMode === 'runOnceForAllItems'
? await this.runForAllItems(task.taskId, settings, data, workflow, customConsole, signal)
: await this.runForEachItem(task.taskId, settings, data, workflow, customConsole, signal);
? await this.runForAllItems(taskId, settings, data, workflow, abortSignal)
: await this.runForEachItem(taskId, settings, data, workflow, abortSignal);
return {
result,
@ -200,22 +188,14 @@ export class JsTaskRunner extends TaskRunner {
settings: JSExecSettings,
data: JsTaskData,
workflow: Workflow,
customConsole: CustomConsole,
signal: AbortSignal,
): Promise<INodeExecutionData[]> {
const dataProxy = this.createDataProxy(data, workflow, data.itemIndex);
const inputItems = data.connectionInputData;
const context: Context = {
require: this.requireResolver,
module: {},
console: customConsole,
const context = this.buildContext(taskId, workflow, data.node, dataProxy, {
items: inputItems,
$getWorkflowStaticData: (type: 'global' | 'node') => workflow.getStaticData(type, data.node),
...this.getNativeVariables(),
...dataProxy,
...this.buildRpcCallObject(taskId),
};
});
try {
const result = await new Promise<TaskResultData['result']>((resolve, reject) => {
@ -264,7 +244,6 @@ export class JsTaskRunner extends TaskRunner {
settings: JSExecSettings,
data: JsTaskData,
workflow: Workflow,
customConsole: CustomConsole,
signal: AbortSignal,
): Promise<INodeExecutionData[]> {
const inputItems = data.connectionInputData;
@ -279,17 +258,7 @@ export class JsTaskRunner extends TaskRunner {
for (let index = chunkStartIdx; index < chunkEndIdx; index++) {
const item = inputItems[index];
const dataProxy = this.createDataProxy(data, workflow, index);
const context: Context = {
require: this.requireResolver,
module: {},
console: customConsole,
item,
$getWorkflowStaticData: (type: 'global' | 'node') =>
workflow.getStaticData(type, data.node),
...this.getNativeVariables(),
...dataProxy,
...this.buildRpcCallObject(taskId),
};
const context = this.buildContext(taskId, workflow, data.node, dataProxy, { item });
try {
let result = await new Promise<INodeExecutionData | undefined>((resolve, reject) => {
@ -449,7 +418,7 @@ export class JsTaskRunner extends TaskRunner {
}
private buildRpcCallObject(taskId: string) {
const rpcObject: RPCCallObject = {};
const rpcObject: RpcCallObject = {};
for (const rpcMethod of EXPOSED_RPC_METHODS) {
set(
@ -467,4 +436,52 @@ export class JsTaskRunner extends TaskRunner {
return rpcObject;
}
private buildCustomConsole(taskId: string): CustomConsole {
return {
// all except `log` are dummy methods that disregard without throwing, following existing Code node behavior
...Object.keys(console).reduce<Record<string, () => void>>((acc, name) => {
acc[name] = noOp;
return acc;
}, {}),
// Send log output back to the main process. It will take care of forwarding
// it to the UI or printing to console.
log: (...args: unknown[]) => {
const formattedLogArgs = args.map((arg) => inspect(arg));
void this.makeRpcCall(taskId, 'logNodeOutput', formattedLogArgs);
},
};
}
/**
* Builds the 'global' context object that is passed to the script
*
* @param taskId The ID of the task. Needed for RPC calls
* @param workflow The workflow that is being executed. Needed for static data
* @param node The node that is being executed. Needed for static data
* @param dataProxy The data proxy object that provides access to built-ins
* @param additionalProperties Additional properties to add to the context
*/
private buildContext(
taskId: string,
workflow: Workflow,
node: INode,
dataProxy: IWorkflowDataProxyData,
additionalProperties: Record<string, unknown> = {},
): Context {
const context: Context = {
[inspect.custom]: () => '[[ExecutionContext]]',
require: this.requireResolver,
module: {},
console: this.buildCustomConsole(taskId),
$getWorkflowStaticData: (type: 'global' | 'node') => workflow.getStaticData(type, node),
...this.getNativeVariables(),
...dataProxy,
...this.buildRpcCallObject(taskId),
...additionalProperties,
};
return context;
}
}

View file

@ -1,7 +1,7 @@
import './polyfills';
import { Container } from '@n8n/di';
import type { ErrorReporter } from 'n8n-core';
import { ensureError, setGlobalState } from 'n8n-workflow';
import Container from 'typedi';
import { MainConfig } from './config/main-config';
import type { HealthCheckServer } from './health-check-server';

View file

@ -5,19 +5,14 @@ import { EventEmitter } from 'node:events';
import { type MessageEvent, WebSocket } from 'ws';
import type { BaseRunnerConfig } from '@/config/base-runner-config';
import { TimeoutError } from '@/js-task-runner/errors/timeout-error';
import type { BrokerMessage, RunnerMessage } from '@/message-types';
import { TaskRunnerNodeTypes } from '@/node-types';
import type { TaskResultData } from '@/runner-types';
import { TaskState } from '@/task-state';
import { TaskCancelledError } from './js-task-runner/errors/task-cancelled-error';
export interface Task<T = unknown> {
taskId: string;
settings?: T;
active: boolean;
cancelled: boolean;
}
export interface TaskOffer {
offerId: string;
validUntil: bigint;
@ -49,6 +44,14 @@ const OFFER_VALID_EXTRA_MS = 100;
/** Converts milliseconds to nanoseconds */
const msToNs = (ms: number) => BigInt(ms * 1_000_000);
export const noOp = () => {};
/** Params the task receives when it is executed */
export interface TaskParams<T = unknown> {
taskId: string;
settings: T;
}
export interface TaskRunnerOpts extends BaseRunnerConfig {
taskType: string;
name?: string;
@ -61,7 +64,7 @@ export abstract class TaskRunner extends EventEmitter {
canSendOffers = false;
runningTasks: Map<Task['taskId'], Task> = new Map();
runningTasks: Map<TaskState['taskId'], TaskState> = new Map();
offerInterval: NodeJS.Timeout | undefined;
@ -89,10 +92,9 @@ export abstract class TaskRunner extends EventEmitter {
/** How long (in seconds) a runner may be idle for before exit. */
private readonly idleTimeout: number;
protected taskCancellations = new Map<Task['taskId'], AbortController>();
constructor(opts: TaskRunnerOpts) {
super();
this.taskType = opts.taskType;
this.name = opts.name ?? 'Node.js Task Runner SDK';
this.maxConcurrency = opts.maxConcurrency;
@ -174,9 +176,11 @@ export abstract class TaskRunner extends EventEmitter {
sendOffers() {
this.deleteStaleOffers();
const offersToSend =
this.maxConcurrency -
(Object.values(this.openOffers).length + Object.values(this.runningTasks).length);
if (!this.canSendOffers) {
return;
}
const offersToSend = this.maxConcurrency - (this.openOffers.size + this.runningTasks.size);
for (let i = 0; i < offersToSend; i++) {
// Add a bit of randomness so that not all offers expire at the same time
@ -217,7 +221,7 @@ export abstract class TaskRunner extends EventEmitter {
this.offerAccepted(message.offerId, message.taskId);
break;
case 'broker:taskcancel':
this.taskCancelled(message.taskId, message.reason);
void this.taskCancelled(message.taskId, message.reason);
break;
case 'broker:tasksettings':
void this.receivedSettings(message.taskId, message.settings);
@ -255,11 +259,12 @@ export abstract class TaskRunner extends EventEmitter {
}
hasOpenTasks() {
return Object.values(this.runningTasks).length < this.maxConcurrency;
return this.runningTasks.size < this.maxConcurrency;
}
offerAccepted(offerId: string, taskId: string) {
if (!this.hasOpenTasks()) {
this.openOffers.delete(offerId);
this.send({
type: 'runner:taskrejected',
taskId,
@ -267,6 +272,7 @@ export abstract class TaskRunner extends EventEmitter {
});
return;
}
const offer = this.openOffers.get(offerId);
if (!offer) {
this.send({
@ -280,11 +286,14 @@ export abstract class TaskRunner extends EventEmitter {
}
this.resetIdleTimer();
this.runningTasks.set(taskId, {
const taskState = new TaskState({
taskId,
active: false,
cancelled: false,
timeoutInS: this.taskTimeout,
onTimeout: () => {
void this.taskTimedOut(taskId);
},
});
this.runningTasks.set(taskId, taskState);
this.send({
type: 'runner:taskaccepted',
@ -292,99 +301,103 @@ export abstract class TaskRunner extends EventEmitter {
});
}
taskCancelled(taskId: string, reason: string) {
const task = this.runningTasks.get(taskId);
if (!task) {
async taskCancelled(taskId: string, reason: string) {
const taskState = this.runningTasks.get(taskId);
if (!taskState) {
return;
}
task.cancelled = true;
for (const [requestId, request] of this.dataRequests.entries()) {
if (request.taskId === taskId) {
request.reject(new TaskCancelledError(reason));
this.dataRequests.delete(requestId);
}
}
await taskState.caseOf({
// If the cancelled task hasn't received settings yet, we can finish it
waitingForSettings: () => this.finishTask(taskState),
for (const [requestId, request] of this.nodeTypesRequests.entries()) {
if (request.taskId === taskId) {
request.reject(new TaskCancelledError(reason));
this.nodeTypesRequests.delete(requestId);
}
}
// If the task has already timed out or is already cancelled, we can
// ignore the cancellation
'aborting:timeout': noOp,
'aborting:cancelled': noOp,
const controller = this.taskCancellations.get(taskId);
if (controller) {
controller.abort();
this.taskCancellations.delete(taskId);
}
if (!task.active) this.runningTasks.delete(taskId);
this.sendOffers();
running: () => {
taskState.status = 'aborting:cancelled';
taskState.abortController.abort('cancelled');
this.cancelTaskRequests(taskId, reason);
},
});
}
taskErrored(taskId: string, error: unknown) {
this.send({
type: 'runner:taskerror',
taskId,
error,
});
this.runningTasks.delete(taskId);
this.sendOffers();
}
async taskTimedOut(taskId: string) {
const taskState = this.runningTasks.get(taskId);
if (!taskState) {
return;
}
taskDone(taskId: string, data: RunnerMessage.ToBroker.TaskDone['data']) {
this.send({
type: 'runner:taskdone',
taskId,
data,
await taskState.caseOf({
// If we are still waiting for settings for the task, we can error the
// task immediately
waitingForSettings: () => {
try {
this.send({
type: 'runner:taskerror',
taskId,
error: new TimeoutError(this.taskTimeout),
});
} finally {
this.finishTask(taskState);
}
},
// This should never happen, the timeout timer should only fire once
'aborting:timeout': TaskState.throwUnexpectedTaskStatus,
// If we are currently executing the task, abort the execution and
// mark the task as timed out
running: () => {
taskState.status = 'aborting:timeout';
taskState.abortController.abort('timeout');
this.cancelTaskRequests(taskId, 'timeout');
},
// If the task is already cancelling, we can ignore the timeout
'aborting:cancelled': noOp,
});
this.runningTasks.delete(taskId);
this.sendOffers();
}
async receivedSettings(taskId: string, settings: unknown) {
const task = this.runningTasks.get(taskId);
if (!task) {
return;
}
if (task.cancelled) {
this.runningTasks.delete(taskId);
const taskState = this.runningTasks.get(taskId);
if (!taskState) {
return;
}
const controller = new AbortController();
this.taskCancellations.set(taskId, controller);
await taskState.caseOf({
// These states should never happen, as they are handled already in
// the other lifecycle methods and the task should be removed from the
// running tasks
'aborting:cancelled': TaskState.throwUnexpectedTaskStatus,
'aborting:timeout': TaskState.throwUnexpectedTaskStatus,
running: TaskState.throwUnexpectedTaskStatus,
const taskTimeout = setTimeout(() => {
if (!task.cancelled) {
controller.abort();
this.taskCancellations.delete(taskId);
}
}, this.taskTimeout * 1_000);
waitingForSettings: async () => {
taskState.status = 'running';
task.settings = settings;
task.active = true;
try {
const data = await this.executeTask(task, controller.signal);
this.taskDone(taskId, data);
} catch (error) {
if (!task.cancelled) this.taskErrored(taskId, error);
} finally {
clearTimeout(taskTimeout);
this.taskCancellations.delete(taskId);
this.resetIdleTimer();
}
await this.executeTask(
{
taskId,
settings,
},
taskState.abortController.signal,
)
.then(async (data) => await this.taskExecutionSucceeded(taskState, data))
.catch(async (error) => await this.taskExecutionFailed(taskState, error));
},
});
}
// eslint-disable-next-line @typescript-eslint/naming-convention
async executeTask(_task: Task, _signal: AbortSignal): Promise<TaskResultData> {
async executeTask(_taskParams: TaskParams, _signal: AbortSignal): Promise<TaskResultData> {
throw new ApplicationError('Unimplemented');
}
async requestNodeTypes<T = unknown>(
taskId: Task['taskId'],
taskId: TaskState['taskId'],
requestParams: RunnerMessage.ToBroker.NodeTypesRequest['requestParams'],
) {
const requestId = nanoid();
@ -413,12 +426,12 @@ export abstract class TaskRunner extends EventEmitter {
}
async requestData<T = unknown>(
taskId: Task['taskId'],
taskId: TaskState['taskId'],
requestParams: RunnerMessage.ToBroker.TaskDataRequest['requestParams'],
): Promise<T> {
const requestId = nanoid();
const p = new Promise<T>((resolve, reject) => {
const dataRequestPromise = new Promise<T>((resolve, reject) => {
this.dataRequests.set(requestId, {
requestId,
taskId,
@ -435,7 +448,7 @@ export abstract class TaskRunner extends EventEmitter {
});
try {
return await p;
return await dataRequestPromise;
} finally {
this.dataRequests.delete(requestId);
}
@ -452,15 +465,15 @@ export abstract class TaskRunner extends EventEmitter {
});
});
this.send({
type: 'runner:rpc',
callId,
taskId,
name,
params,
});
try {
this.send({
type: 'runner:rpc',
callId,
taskId,
name,
params,
});
const returnValue = await dataPromise;
return isSerializedBuffer(returnValue) ? toBuffer(returnValue) : returnValue;
@ -523,4 +536,86 @@ export abstract class TaskRunner extends EventEmitter {
await new Promise((resolve) => setTimeout(resolve, 100));
}
}
private async taskExecutionSucceeded(taskState: TaskState, data: TaskResultData) {
try {
const sendData = () => {
this.send({
type: 'runner:taskdone',
taskId: taskState.taskId,
data,
});
};
await taskState.caseOf({
waitingForSettings: TaskState.throwUnexpectedTaskStatus,
'aborting:cancelled': noOp,
// If the task timed out but we ended up reaching this point, we
// might as well send the data
'aborting:timeout': sendData,
running: sendData,
});
} finally {
this.finishTask(taskState);
}
}
private async taskExecutionFailed(taskState: TaskState, error: unknown) {
try {
const sendError = () => {
this.send({
type: 'runner:taskerror',
taskId: taskState.taskId,
error,
});
};
await taskState.caseOf({
waitingForSettings: TaskState.throwUnexpectedTaskStatus,
'aborting:cancelled': noOp,
'aborting:timeout': () => {
console.warn(`Task ${taskState.taskId} timed out`);
sendError();
},
running: sendError,
});
} finally {
this.finishTask(taskState);
}
}
/**
* Cancels all node type and data requests made by the given task
*/
private cancelTaskRequests(taskId: string, reason: string) {
for (const [requestId, request] of this.dataRequests.entries()) {
if (request.taskId === taskId) {
request.reject(new TaskCancelledError(reason));
this.dataRequests.delete(requestId);
}
}
for (const [requestId, request] of this.nodeTypesRequests.entries()) {
if (request.taskId === taskId) {
request.reject(new TaskCancelledError(reason));
this.nodeTypesRequests.delete(requestId);
}
}
}
/**
* Finishes task by removing it from the running tasks and sending new offers
*/
private finishTask(taskState: TaskState) {
taskState.cleanup();
this.runningTasks.delete(taskState.taskId);
this.sendOffers();
this.resetIdleTimer();
}
}

View file

@ -0,0 +1,118 @@
import * as a from 'node:assert';
export type TaskStatus =
| 'waitingForSettings'
| 'running'
| 'aborting:cancelled'
| 'aborting:timeout';
export type TaskStateOpts = {
taskId: string;
timeoutInS: number;
onTimeout: () => void;
};
/**
* The state of a task. The task can be in one of the following states:
* - waitingForSettings: The task is waiting for settings from the broker
* - running: The task is currently running
* - aborting:cancelled: The task was canceled by the broker and is being aborted
* - aborting:timeout: The task took too long to complete and is being aborted
*
* The task is discarded once it reaches an end state.
*
* The class only holds the state, and does not have any logic.
*
* The task has the following lifecycle:
*
*
*
*
* broker:taskofferaccept : create task state
*
*
* broker:taskcancel / timeout
* waitingForSettings
*
*
* broker:tasksettings
*
*
*
* running aborting:timeout
* timeout
* - execute task - fire abort signal
*
*
* broker:taskcancel
* Task execution Task execution
* resolves / rejects resolves / rejects
*
*
* aborting:cancelled
*
* - fire abort signal
*
* Task execution
* resolves / rejects
*
*
*
*
*
*/
export class TaskState {
status: TaskStatus = 'waitingForSettings';
readonly taskId: string;
/** Controller for aborting the execution of the task */
readonly abortController = new AbortController();
/** Timeout timer for the task */
private timeoutTimer: NodeJS.Timeout | undefined;
constructor(opts: TaskStateOpts) {
this.taskId = opts.taskId;
this.timeoutTimer = setTimeout(opts.onTimeout, opts.timeoutInS * 1000);
}
/** Cleans up any resources before the task can be removed */
cleanup() {
clearTimeout(this.timeoutTimer);
this.timeoutTimer = undefined;
}
/** Custom JSON serialization for the task state for logging purposes */
toJSON() {
return `[Task ${this.taskId} (${this.status})]`;
}
/**
* Executes the function matching the current task status
*
* @example
* ```ts
* taskState.caseOf({
* waitingForSettings: () => {...},
* running: () => {...},
* aborting:cancelled: () => {...},
* aborting:timeout: () => {...},
* });
* ```
*/
async caseOf(
conditions: Record<TaskStatus, (taskState: TaskState) => void | Promise<void> | never>,
) {
if (!conditions[this.status]) {
TaskState.throwUnexpectedTaskStatus(this);
}
return await conditions[this.status](this);
}
/** Throws an error that the task status is unexpected */
static throwUnexpectedTaskStatus = (taskState: TaskState) => {
a.fail(`Unexpected task status: ${JSON.stringify(taskState)}`);
};
}

View file

@ -89,6 +89,7 @@
"@n8n/api-types": "workspace:*",
"@n8n/client-oauth2": "workspace:*",
"@n8n/config": "workspace:*",
"@n8n/di": "workspace:*",
"@n8n/localtunnel": "3.0.0",
"@n8n/n8n-nodes-langchain": "workspace:*",
"@n8n/permissions": "workspace:*",
@ -154,7 +155,7 @@
"prom-client": "13.2.0",
"psl": "1.9.0",
"raw-body": "2.5.1",
"reflect-metadata": "0.2.2",
"reflect-metadata": "catalog:",
"replacestream": "4.0.3",
"samlify": "2.8.9",
"semver": "7.5.4",
@ -165,7 +166,6 @@
"sshpk": "1.17.0",
"swagger-ui-express": "5.0.1",
"syslog-client": "1.1.1",
"typedi": "catalog:",
"uuid": "catalog:",
"validator": "13.7.0",
"ws": "8.17.1",

View file

@ -1,3 +1,4 @@
import { Container } from '@n8n/di';
import { mock } from 'jest-mock-extended';
import type { IWorkflowBase } from 'n8n-workflow';
import type {
@ -8,7 +9,6 @@ import type {
INodeExecutionData,
} from 'n8n-workflow';
import type PCancelable from 'p-cancelable';
import Container from 'typedi';
import { ActiveExecutions } from '@/active-executions';
import { CredentialsHelper } from '@/credentials-helper';

View file

@ -1,3 +1,4 @@
import { Container } from '@n8n/di';
import { mock } from 'jest-mock-extended';
import { DirectedGraph, WorkflowExecute } from 'n8n-core';
import * as core from 'n8n-core';
@ -18,7 +19,6 @@ import {
type IWorkflowExecuteHooks,
} from 'n8n-workflow';
import PCancelable from 'p-cancelable';
import Container from 'typedi';
import { ActiveExecutions } from '@/active-executions';
import config from '@/config';

View file

@ -1,4 +1,5 @@
import { GlobalConfig } from '@n8n/config';
import { Container, Service } from '@n8n/di';
import compression from 'compression';
import express from 'express';
import { engine as expressHandlebars } from 'express-handlebars';
@ -6,7 +7,6 @@ import { readFile } from 'fs/promises';
import type { Server } from 'http';
import isbot from 'isbot';
import { Logger } from 'n8n-core';
import { Container, Service } from 'typedi';
import config from '@/config';
import { N8N_VERSION, TEMPLATES_DIR, inDevelopment, inTest } from '@/constants';

View file

@ -1,4 +1,4 @@
import { Service } from 'typedi';
import { Service } from '@n8n/di';
import { CacheService } from '@/services/cache/cache.service';

View file

@ -1,3 +1,4 @@
import { Service } from '@n8n/di';
import { Logger } from 'n8n-core';
import type {
IDeferredPromise,
@ -9,7 +10,6 @@ import type {
import { createDeferredPromise, ExecutionCancelledError, sleep } from 'n8n-workflow';
import { strict as assert } from 'node:assert';
import type PCancelable from 'p-cancelable';
import { Service } from 'typedi';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { ExecutionNotFoundError } from '@/errors/execution-not-found-error';

View file

@ -1,4 +1,5 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import { Service } from '@n8n/di';
import {
ActiveWorkflows,
ErrorReporter,
@ -28,7 +29,6 @@ import {
WebhookPathTakenError,
ApplicationError,
} from 'n8n-workflow';
import { Service } from 'typedi';
import { ActivationErrorsService } from '@/activation-errors.service';
import { ActiveExecutions } from '@/active-executions';

View file

@ -1,9 +1,9 @@
import { GlobalConfig } from '@n8n/config';
import { Container, Service } from '@n8n/di';
import { createHash } from 'crypto';
import type { NextFunction, Response } from 'express';
import { JsonWebTokenError, TokenExpiredError } from 'jsonwebtoken';
import { Logger } from 'n8n-core';
import Container, { Service } from 'typedi';
import config from '@/config';
import { AUTH_COOKIE_NAME, RESPONSE_ERROR_MESSAGES, Time } from '@/constants';

View file

@ -1,5 +1,5 @@
import { Container } from '@n8n/di';
import type { Response } from 'express';
import { Container } from 'typedi';
import type { User } from '@/databases/entities/user';

View file

@ -1,4 +1,4 @@
import { Container } from 'typedi';
import { Container } from '@n8n/di';
import type { User } from '@/databases/entities/user';
import { UserRepository } from '@/databases/repositories/user.repository';

View file

@ -1,4 +1,4 @@
import { Container } from 'typedi';
import { Container } from '@n8n/di';
import type { User } from '@/databases/entities/user';
import { EventService } from '@/events/event.service';

View file

@ -1,8 +1,8 @@
import type { PushPayload } from '@n8n/api-types';
import { Service } from '@n8n/di';
import { ErrorReporter } from 'n8n-core';
import type { Workflow } from 'n8n-workflow';
import { ApplicationError } from 'n8n-workflow';
import { Service } from 'typedi';
import { CollaborationState } from '@/collaboration/collaboration.state';
import type { User } from '@/databases/entities/user';

View file

@ -1,6 +1,6 @@
import type { Iso8601DateTimeString } from '@n8n/api-types';
import { Service } from '@n8n/di';
import type { Workflow } from 'n8n-workflow';
import { Service } from 'typedi';
import { Time } from '@/constants';
import type { User } from '@/databases/entities/user';

View file

@ -1,7 +1,7 @@
import { SecurityConfig } from '@n8n/config';
import { Container } from '@n8n/di';
import { Flags } from '@oclif/core';
import { ApplicationError } from 'n8n-workflow';
import { Container } from 'typedi';
import { RISK_CATEGORIES } from '@/security-audit/constants';
import { SecurityAuditService } from '@/security-audit/security-audit.service';

View file

@ -1,5 +1,6 @@
import 'reflect-metadata';
import { GlobalConfig } from '@n8n/config';
import { Container } from '@n8n/di';
import { Command, Errors } from '@oclif/core';
import {
BinaryDataService,
@ -10,7 +11,6 @@ import {
ErrorReporter,
} from 'n8n-core';
import { ApplicationError, ensureError, sleep } from 'n8n-workflow';
import { Container } from 'typedi';
import type { AbstractServer } from '@/abstract-server';
import config from '@/config';

View file

@ -1,10 +1,10 @@
import { Container } from '@n8n/di';
// eslint-disable-next-line n8n-local-rules/misplaced-n8n-typeorm-import
import type { DataSourceOptions as ConnectionOptions } from '@n8n/typeorm';
// eslint-disable-next-line n8n-local-rules/misplaced-n8n-typeorm-import
import { MigrationExecutor, DataSource as Connection } from '@n8n/typeorm';
import { Command, Flags } from '@oclif/core';
import { Logger } from 'n8n-core';
import { Container } from 'typedi';
import { getConnectionOptions } from '@/databases/config';
import type { Migration } from '@/databases/types';

View file

@ -1,4 +1,5 @@
/* eslint-disable @typescript-eslint/no-loop-func */
import { Container } from '@n8n/di';
import { Flags } from '@oclif/core';
import fs from 'fs';
import { diff } from 'json-diff';
@ -7,7 +8,6 @@ import type { IRun, ITaskData, IWorkflowExecutionDataProcess } from 'n8n-workflo
import { ApplicationError, jsonParse } from 'n8n-workflow';
import os from 'os';
import { sep } from 'path';
import { Container } from 'typedi';
import { ActiveExecutions } from '@/active-executions';
import type { User } from '@/databases/entities/user';

View file

@ -1,7 +1,7 @@
import { Container } from '@n8n/di';
import { Flags } from '@oclif/core';
import type { IWorkflowBase, IWorkflowExecutionDataProcess } from 'n8n-workflow';
import { ApplicationError, ExecutionBaseError } from 'n8n-workflow';
import { Container } from 'typedi';
import { ActiveExecutions } from '@/active-executions';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';

View file

@ -1,9 +1,9 @@
import { Container } from '@n8n/di';
import { Flags } from '@oclif/core';
import fs from 'fs';
import { Credentials } from 'n8n-core';
import { ApplicationError } from 'n8n-workflow';
import path from 'path';
import Container from 'typedi';
import { CredentialsRepository } from '@/databases/repositories/credentials.repository';
import type { ICredentialsDb, ICredentialsDecryptedDb } from '@/interfaces';

View file

@ -1,8 +1,8 @@
import { Container } from '@n8n/di';
import { Flags } from '@oclif/core';
import fs from 'fs';
import { ApplicationError } from 'n8n-workflow';
import path from 'path';
import Container from 'typedi';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';

View file

@ -1,3 +1,4 @@
import { Container } from '@n8n/di';
// eslint-disable-next-line n8n-local-rules/misplaced-n8n-typeorm-import
import type { EntityManager } from '@n8n/typeorm';
import { Flags } from '@oclif/core';
@ -6,7 +7,6 @@ import fs from 'fs';
import { Cipher } from 'n8n-core';
import type { ICredentialsEncrypted } from 'n8n-workflow';
import { ApplicationError, jsonParse } from 'n8n-workflow';
import { Container } from 'typedi';
import { UM_FIX_INSTRUCTION } from '@/constants';
import { CredentialsEntity } from '@/databases/entities/credentials-entity';

View file

@ -1,8 +1,8 @@
import { Container } from '@n8n/di';
import { Flags } from '@oclif/core';
import glob from 'fast-glob';
import fs from 'fs';
import { ApplicationError, jsonParse } from 'n8n-workflow';
import { Container } from 'typedi';
import { UM_FIX_INSTRUCTION } from '@/constants';
import type { WorkflowEntity } from '@/databases/entities/workflow-entity';

View file

@ -1,8 +1,8 @@
import { Container } from '@n8n/di';
// eslint-disable-next-line n8n-local-rules/misplaced-n8n-typeorm-import
import { In } from '@n8n/typeorm';
import { Flags } from '@oclif/core';
import { ApplicationError } from 'n8n-workflow';
import Container from 'typedi';
import { UM_FIX_INSTRUCTION } from '@/constants';
import { CredentialsService } from '@/credentials/credentials.service';

View file

@ -1,4 +1,4 @@
import { Container } from 'typedi';
import { Container } from '@n8n/di';
import { SETTINGS_LICENSE_CERT_KEY } from '@/constants';
import { SettingsRepository } from '@/databases/repositories/settings.repository';

View file

@ -1,4 +1,4 @@
import { Container } from 'typedi';
import { Container } from '@n8n/di';
import { License } from '@/license';

View file

@ -1,5 +1,5 @@
import { Container } from '@n8n/di';
import { Flags } from '@oclif/core';
import Container from 'typedi';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';

View file

@ -1,5 +1,5 @@
import { Container } from '@n8n/di';
import { Flags } from '@oclif/core';
import Container from 'typedi';
import { AuthUserRepository } from '@/databases/repositories/auth-user.repository';

View file

@ -1,6 +1,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import { GlobalConfig } from '@n8n/config';
import { Container } from '@n8n/di';
import { Flags } from '@oclif/core';
import glob from 'fast-glob';
import { createReadStream, createWriteStream, existsSync } from 'fs';
@ -9,7 +10,6 @@ import { jsonParse, randomString, type IWorkflowExecutionDataProcess } from 'n8n
import path from 'path';
import replaceStream from 'replacestream';
import { pipeline } from 'stream/promises';
import { Container } from 'typedi';
import { ActiveExecutions } from '@/active-executions';
import { ActiveWorkflowManager } from '@/active-workflow-manager';
@ -225,7 +225,7 @@ export class Start extends BaseCommand {
const { taskRunners: taskRunnerConfig } = this.globalConfig;
if (taskRunnerConfig.enabled) {
const { TaskRunnerModule } = await import('@/runners/task-runner-module');
const { TaskRunnerModule } = await import('@/task-runners/task-runner-module');
const taskRunnerModule = Container.get(TaskRunnerModule);
await taskRunnerModule.start();
}

View file

@ -1,5 +1,5 @@
import { Container } from '@n8n/di';
import { Flags } from '@oclif/core';
import { Container } from 'typedi';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';

View file

@ -1,4 +1,4 @@
import { Container } from 'typedi';
import { Container } from '@n8n/di';
import type { CredentialsEntity } from '@/databases/entities/credentials-entity';
import { User } from '@/databases/entities/user';

View file

@ -1,6 +1,6 @@
import { Container } from '@n8n/di';
import { Flags } from '@oclif/core';
import { ApplicationError } from 'n8n-workflow';
import { Container } from 'typedi';
import { ActiveExecutions } from '@/active-executions';
import config from '@/config';

View file

@ -1,5 +1,5 @@
import { Container } from '@n8n/di';
import { Flags, type Config } from '@oclif/core';
import { Container } from 'typedi';
import config from '@/config';
import { N8N_VERSION, inTest } from '@/constants';
@ -113,7 +113,7 @@ export class Worker extends BaseCommand {
const { taskRunners: taskRunnerConfig } = this.globalConfig;
if (taskRunnerConfig.enabled) {
const { TaskRunnerModule } = await import('@/runners/task-runner-module');
const { TaskRunnerModule } = await import('@/task-runners/task-runner-module');
const taskRunnerModule = Container.get(TaskRunnerModule);
await taskRunnerModule.start();
}

Some files were not shown because too many files have changed in this diff Show more