Merge remote-tracking branch 'origin/master' into sec-143-cross-site-scripting-cwe-79

This commit is contained in:
Valya Bullions 2024-10-15 16:39:20 +01:00
commit b677b9a522
No known key found for this signature in database
279 changed files with 9489 additions and 3420 deletions

View file

@ -1,3 +1,50 @@
# [1.63.0](https://github.com/n8n-io/n8n/compare/n8n@1.62.1...n8n@1.63.0) (2024-10-09)
### Bug Fixes
* **Convert to File Node:** Convert to ICS start date defaults to now ([#11114](https://github.com/n8n-io/n8n/issues/11114)) ([1146c4e](https://github.com/n8n-io/n8n/commit/1146c4e98d8c85c15ac67fa1c3bfb731234531e3))
* **core:** Allow loading nodes from multiple custom directories ([#11130](https://github.com/n8n-io/n8n/issues/11130)) ([1b84b0e](https://github.com/n8n-io/n8n/commit/1b84b0e5e7485d9f99d61a8ae3df49efadca0745))
* **core:** Always set `startedAt` when executions start running ([#11098](https://github.com/n8n-io/n8n/issues/11098)) ([722f4a8](https://github.com/n8n-io/n8n/commit/722f4a8b771058800b992a482ad5f644b650960d))
* **core:** Fix AI nodes not working with new partial execution flow ([#11055](https://github.com/n8n-io/n8n/issues/11055)) ([0eee5df](https://github.com/n8n-io/n8n/commit/0eee5dfd597817819dbe0463a63f671fde53432f))
* **core:** Print errors that happen before the execution starts on the worker instead of just on the main instance ([#11099](https://github.com/n8n-io/n8n/issues/11099)) ([1d14557](https://github.com/n8n-io/n8n/commit/1d145574611661ecd9ab1a39d815c0ea915b9a1c))
* **core:** Separate error handlers for main and worker ([#11091](https://github.com/n8n-io/n8n/issues/11091)) ([bb59cc7](https://github.com/n8n-io/n8n/commit/bb59cc71acc9e494e54abc8402d58db39e5a664e))
* **editor:** Shorten overflowing Node Label in InputLabels on hover and focus ([#11110](https://github.com/n8n-io/n8n/issues/11110)) ([87a0b68](https://github.com/n8n-io/n8n/commit/87a0b68f9009c1c776d937c6ca62096e88c95ed6))
* **editor:** Add safety to prevent undefined errors ([#11104](https://github.com/n8n-io/n8n/issues/11104)) ([565b117](https://github.com/n8n-io/n8n/commit/565b117a52f8eac9202a1a62c43daf78b293dcf8))
* **editor:** Fix design system form element sizing ([#11040](https://github.com/n8n-io/n8n/issues/11040)) ([67c3453](https://github.com/n8n-io/n8n/commit/67c3453885bc619fedc8338a6dd0d8d66dead931))
* **editor:** Fix getInitials when Intl.Segmenter is not supported ([#11103](https://github.com/n8n-io/n8n/issues/11103)) ([7e8955b](https://github.com/n8n-io/n8n/commit/7e8955b322b1d2c84c0f479a5977484d8d5e3135))
* **editor:** Fix schema view in AI tools ([#11089](https://github.com/n8n-io/n8n/issues/11089)) ([09cfdbd](https://github.com/n8n-io/n8n/commit/09cfdbd1817eba46c935308880fe9f95ded252b0))
* **editor:** Respect tag querystring filter when listing workflows ([#11029](https://github.com/n8n-io/n8n/issues/11029)) ([59c5ff6](https://github.com/n8n-io/n8n/commit/59c5ff61354302562ba5a2340c66811afdd1523b))
* **editor:** Show previous nodes autocomplete in AI tool nodes ([#11111](https://github.com/n8n-io/n8n/issues/11111)) ([8566b3a](https://github.com/n8n-io/n8n/commit/8566b3a99939f45ac263830eee30d0d4ade9305c))
* **editor:** Update Usage page for Community+ edition ([#11074](https://github.com/n8n-io/n8n/issues/11074)) ([3974981](https://github.com/n8n-io/n8n/commit/3974981ea5c67f6f2bbb90a96b405d9d0cfa21af))
* Fix transaction handling for 'revert' command ([#11145](https://github.com/n8n-io/n8n/issues/11145)) ([a782336](https://github.com/n8n-io/n8n/commit/a7823367f13c3dba0c339eaafaad0199bd524b13))
* Forbid access to files outside source control work directory ([#11152](https://github.com/n8n-io/n8n/issues/11152)) ([606eedb](https://github.com/n8n-io/n8n/commit/606eedbf1b302e153bd13b7cef80847711e3a9ee))
* **Gitlab Node:** Author name and email not being set ([#11077](https://github.com/n8n-io/n8n/issues/11077)) ([fce1233](https://github.com/n8n-io/n8n/commit/fce1233b58624d502c9c68f4b32a4bb7d76f1814))
* Incorrect error message on calling wrong webhook method ([#11093](https://github.com/n8n-io/n8n/issues/11093)) ([d974b01](https://github.com/n8n-io/n8n/commit/d974b015d030c608158ff0c3fa3b7f4cbb8eadd3))
* **n8n Form Trigger Node:** When clicking on a multiple choice label, the wrong one is selected ([#11059](https://github.com/n8n-io/n8n/issues/11059)) ([948edd1](https://github.com/n8n-io/n8n/commit/948edd1a047cf3dbddb3b0e9ec5de4bac3e97b9f))
* **NASA Node:** Astronomy-Picture-Of-The-Day fails when it's YouTube video ([#11046](https://github.com/n8n-io/n8n/issues/11046)) ([c70969d](https://github.com/n8n-io/n8n/commit/c70969da2bcabeb33394073a69ccef208311461b))
* **Postgres PGVector Store Node:** Fix filtering in retriever mode ([#11075](https://github.com/n8n-io/n8n/issues/11075)) ([dbd2ae1](https://github.com/n8n-io/n8n/commit/dbd2ae199506a24c2df4c983111a56f2adf63eee))
* Show result of waiting execution on canvas after execution complete ([#10815](https://github.com/n8n-io/n8n/issues/10815)) ([90b4bfc](https://github.com/n8n-io/n8n/commit/90b4bfc472ef132d2280b175ae7410dfb8e549b2))
* **Slack Node:** User id not sent correctly to API when updating user profile ([#11153](https://github.com/n8n-io/n8n/issues/11153)) ([ed9e61c](https://github.com/n8n-io/n8n/commit/ed9e61c46055d8e636a70c9c175d7d4ba596dd48))
### Features
* **core:** Introduce scoped logging ([#11127](https://github.com/n8n-io/n8n/issues/11127)) ([c68782c](https://github.com/n8n-io/n8n/commit/c68782c633b7ef6253ea705c5a222d4536491fd5))
* **editor:** Add navigation dropdown component ([#11047](https://github.com/n8n-io/n8n/issues/11047)) ([e081fd1](https://github.com/n8n-io/n8n/commit/e081fd1f0b5a0700017a8dc92f013f0abdbad319))
* **editor:** Add route for create / edit / share credentials ([#11134](https://github.com/n8n-io/n8n/issues/11134)) ([5697de4](https://github.com/n8n-io/n8n/commit/5697de4429c5d94f25ce1bd14c84fb4266ea47a7))
* **editor:** Community+ enrollment ([#10776](https://github.com/n8n-io/n8n/issues/10776)) ([92cf860](https://github.com/n8n-io/n8n/commit/92cf860f9f2994442facfddc758bc60f5cbec520))
* Human in the loop ([#10675](https://github.com/n8n-io/n8n/issues/10675)) ([41228b4](https://github.com/n8n-io/n8n/commit/41228b472de11affc8cd0821284427c2c9e8b421))
* **OpenAI Node:** Allow to specify thread ID for Assistant -> Message operation ([#11080](https://github.com/n8n-io/n8n/issues/11080)) ([6a2f9e7](https://github.com/n8n-io/n8n/commit/6a2f9e72959fb0e89006b69c31fbcee1ead1cde9))
* Opt in to additional features on community for existing users ([#11166](https://github.com/n8n-io/n8n/issues/11166)) ([c2adfc8](https://github.com/n8n-io/n8n/commit/c2adfc85451c5103eaad068f882066fd36c4aebe))
### Performance Improvements
* **core:** Optimize worker healthchecks ([#11092](https://github.com/n8n-io/n8n/issues/11092)) ([19fb728](https://github.com/n8n-io/n8n/commit/19fb728da0839c57603e55da4e407715e6c5b081))
## [1.62.1](https://github.com/n8n-io/n8n/compare/n8n@1.61.0...n8n@1.62.1) (2024-10-02)

View file

@ -59,7 +59,7 @@ export function setCredentialByName(name: string) {
export function clickCreateNewCredential() {
openCredentialSelect();
getCreateNewCredentialOption().click();
getCreateNewCredentialOption().click({ force: true });
}
export function clickGetBackToCanvas() {

View file

@ -1,3 +1,6 @@
import { nanoid } from 'nanoid';
import { simpleWebhookCall, waitForWebhook } from './16-webhook-node.cy';
import {
HTTP_REQUEST_NODE_NAME,
MANUAL_TRIGGER_NODE_NAME,
@ -7,6 +10,7 @@ import {
} from '../constants';
import { WorkflowPage, NDV } from '../pages';
import { errorToast } from '../pages/notifications';
import { getVisiblePopper } from '../utils';
const workflowPage = new WorkflowPage();
const ndv = new NDV();
@ -212,6 +216,42 @@ describe('Data pinning', () => {
},
);
});
it('should show pinned data tooltip', () => {
const { callEndpoint } = simpleWebhookCall({
method: 'GET',
webhookPath: nanoid(),
executeNow: false,
});
ndv.actions.close();
workflowPage.actions.executeWorkflow();
cy.wait(waitForWebhook);
// hide other visible popper on workflow execute button
workflowPage.getters.canvasNodes().eq(0).click();
callEndpoint((response) => {
expect(response.status).to.eq(200);
getVisiblePopper().should('have.length', 1);
getVisiblePopper()
.eq(0)
.should(
'have.text',
'You can pin this output instead of waiting for a test event. Open node to do so.',
);
});
});
it('should not show pinned data tooltip', () => {
cy.createFixtureWorkflow('Pinned_webhook_node.json', 'Test');
workflowPage.actions.executeWorkflow();
// hide other visible popper on workflow execute button
workflowPage.getters.canvasNodes().eq(0).click();
getVisiblePopper().should('have.length', 0);
});
});
function setExpressionOnStringValueInSet(expression: string) {

View file

@ -9,7 +9,7 @@ const workflowPage = new WorkflowPage();
const ndv = new NDV();
const credentialsModal = new CredentialsModal();
const waitForWebhook = 500;
export const waitForWebhook = 500;
interface SimpleWebhookCallOptions {
method: string;
@ -21,7 +21,7 @@ interface SimpleWebhookCallOptions {
authentication?: string;
}
const simpleWebhookCall = (options: SimpleWebhookCallOptions) => {
export const simpleWebhookCall = (options: SimpleWebhookCallOptions) => {
const {
authentication,
method,
@ -65,15 +65,23 @@ const simpleWebhookCall = (options: SimpleWebhookCallOptions) => {
getVisibleSelect().find('.option-headline').contains(responseData).click();
}
const callEndpoint = (cb: (response: Cypress.Response<unknown>) => void) => {
cy.request(method, `${BACKEND_BASE_URL}/webhook-test/${webhookPath}`).then(cb);
};
if (executeNow) {
ndv.actions.execute();
cy.wait(waitForWebhook);
cy.request(method, `${BACKEND_BASE_URL}/webhook-test/${webhookPath}`).then((response) => {
callEndpoint((response) => {
expect(response.status).to.eq(200);
ndv.getters.outputPanel().contains('headers');
});
}
return {
callEndpoint,
};
};
describe('Webhook Trigger node', () => {

View file

@ -78,11 +78,11 @@ describe('AI Assistant::enabled', () => {
});
it('should start chat session from node error view', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/simple_message_response.json',
fixture: 'aiAssistant/responses/simple_message_response.json',
}).as('chatRequest');
cy.createFixtureWorkflow('aiAssistant/test_workflow.json');
cy.createFixtureWorkflow('aiAssistant/workflows/test_workflow.json');
wf.actions.openNode('Stop and Error');
ndv.getters.nodeExecuteButton().click();
aiAssistant.getters.nodeErrorViewAssistantButton().click();
@ -96,11 +96,11 @@ describe('AI Assistant::enabled', () => {
});
it('should render chat input correctly', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/simple_message_response.json',
fixture: 'aiAssistant/responses/simple_message_response.json',
}).as('chatRequest');
cy.createFixtureWorkflow('aiAssistant/test_workflow.json');
cy.createFixtureWorkflow('aiAssistant/workflows/test_workflow.json');
wf.actions.openNode('Stop and Error');
ndv.getters.nodeExecuteButton().click();
aiAssistant.getters.nodeErrorViewAssistantButton().click();
@ -129,11 +129,11 @@ describe('AI Assistant::enabled', () => {
});
it('should render and handle quick replies', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/quick_reply_message_response.json',
fixture: 'aiAssistant/responses/quick_reply_message_response.json',
}).as('chatRequest');
cy.createFixtureWorkflow('aiAssistant/test_workflow.json');
cy.createFixtureWorkflow('aiAssistant/workflows/test_workflow.json');
wf.actions.openNode('Stop and Error');
ndv.getters.nodeExecuteButton().click();
aiAssistant.getters.nodeErrorViewAssistantButton().click();
@ -145,43 +145,12 @@ describe('AI Assistant::enabled', () => {
aiAssistant.getters.chatMessagesUser().eq(0).should('contain.text', "Sure, let's do it");
});
it('should show quick replies when node is executed after new suggestion', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', (req) => {
req.reply((res) => {
if (['init-error-helper', 'message'].includes(req.body.payload.type)) {
res.send({ statusCode: 200, fixture: 'aiAssistant/simple_message_response.json' });
} else if (req.body.payload.type === 'event') {
res.send({ statusCode: 200, fixture: 'aiAssistant/node_execution_error_response.json' });
} else {
res.send({ statusCode: 500 });
}
});
}).as('chatRequest');
cy.createFixtureWorkflow('aiAssistant/test_workflow.json');
wf.actions.openNode('Edit Fields');
ndv.getters.nodeExecuteButton().click();
aiAssistant.getters.nodeErrorViewAssistantButton().click();
cy.wait('@chatRequest');
aiAssistant.getters.chatMessagesAssistant().should('have.length', 1);
ndv.getters.nodeExecuteButton().click();
cy.wait('@chatRequest');
// Respond 'Yes' to the quick reply (request new suggestion)
aiAssistant.getters.quickReplies().contains('Yes').click();
cy.wait('@chatRequest');
// No quick replies at this point
aiAssistant.getters.quickReplies().should('not.exist');
ndv.getters.nodeExecuteButton().click();
// But after executing the node again, quick replies should be shown
aiAssistant.getters.chatMessagesAssistant().should('have.length', 4);
aiAssistant.getters.quickReplies().should('have.length', 2);
});
it('should warn before starting a new session', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/simple_message_response.json',
fixture: 'aiAssistant/responses/simple_message_response.json',
}).as('chatRequest');
cy.createFixtureWorkflow('aiAssistant/test_workflow.json');
cy.createFixtureWorkflow('aiAssistant/workflows/test_workflow.json');
wf.actions.openNode('Edit Fields');
ndv.getters.nodeExecuteButton().click();
aiAssistant.getters.nodeErrorViewAssistantButton().click({ force: true });
@ -204,15 +173,15 @@ describe('AI Assistant::enabled', () => {
});
it('should apply code diff to code node', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/code_diff_suggestion_response.json',
fixture: 'aiAssistant/responses/code_diff_suggestion_response.json',
}).as('chatRequest');
cy.intercept('POST', '/rest/ai-assistant/chat/apply-suggestion', {
cy.intercept('POST', '/rest/ai/chat/apply-suggestion', {
statusCode: 200,
fixture: 'aiAssistant/apply_code_diff_response.json',
fixture: 'aiAssistant/responses/apply_code_diff_response.json',
}).as('applySuggestion');
cy.createFixtureWorkflow('aiAssistant/test_workflow.json');
cy.createFixtureWorkflow('aiAssistant/workflows/test_workflow.json');
wf.actions.openNode('Code');
ndv.getters.nodeExecuteButton().click();
aiAssistant.getters.nodeErrorViewAssistantButton().click({ force: true });
@ -254,11 +223,11 @@ describe('AI Assistant::enabled', () => {
});
it('should end chat session when `end_session` event is received', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/end_session_response.json',
fixture: 'aiAssistant/responses/end_session_response.json',
}).as('chatRequest');
cy.createFixtureWorkflow('aiAssistant/test_workflow.json');
cy.createFixtureWorkflow('aiAssistant/workflows/test_workflow.json');
wf.actions.openNode('Stop and Error');
ndv.getters.nodeExecuteButton().click();
aiAssistant.getters.nodeErrorViewAssistantButton().click();
@ -268,12 +237,15 @@ describe('AI Assistant::enabled', () => {
});
it('should reset session after it ended and sidebar is closed', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', (req) => {
cy.intercept('POST', '/rest/ai/chat', (req) => {
req.reply((res) => {
if (['init-support-chat'].includes(req.body.payload.type)) {
res.send({ statusCode: 200, fixture: 'aiAssistant/simple_message_response.json' });
res.send({
statusCode: 200,
fixture: 'aiAssistant/responses/simple_message_response.json',
});
} else {
res.send({ statusCode: 200, fixture: 'aiAssistant/end_session_response.json' });
res.send({ statusCode: 200, fixture: 'aiAssistant/responses/end_session_response.json' });
}
});
}).as('chatRequest');
@ -296,9 +268,9 @@ describe('AI Assistant::enabled', () => {
});
it('Should not reset assistant session when workflow is saved', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/simple_message_response.json',
fixture: 'aiAssistant/responses/simple_message_response.json',
}).as('chatRequest');
wf.actions.addInitialNodeToCanvas(SCHEDULE_TRIGGER_NODE_NAME);
aiAssistant.actions.openChat();
@ -321,9 +293,9 @@ describe('AI Assistant Credential Help', () => {
});
it('should start credential help from node credential', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/simple_message_response.json',
fixture: 'aiAssistant/responses/simple_message_response.json',
}).as('chatRequest');
wf.actions.addNodeToCanvas(SCHEDULE_TRIGGER_NODE_NAME);
wf.actions.addNodeToCanvas(GMAIL_NODE_NAME);
@ -347,9 +319,9 @@ describe('AI Assistant Credential Help', () => {
});
it('should start credential help from credential list', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/simple_message_response.json',
fixture: 'aiAssistant/responses/simple_message_response.json',
}).as('chatRequest');
cy.visit(credentialsPage.url);
@ -446,9 +418,9 @@ describe('General help', () => {
});
it('assistant returns code snippet', () => {
cy.intercept('POST', '/rest/ai-assistant/chat', {
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/code_snippet_response.json',
fixture: 'aiAssistant/responses/code_snippet_response.json',
}).as('chatRequest');
aiAssistant.getters.askAssistantFloatingButton().should('be.visible');
@ -492,4 +464,65 @@ describe('General help', () => {
);
aiAssistant.getters.codeSnippet().should('have.text', '{{$json.body.city}}');
});
it('should send current context to support chat', () => {
cy.createFixtureWorkflow('aiAssistant/workflows/simple_http_request_workflow.json');
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/responses/simple_message_response.json',
}).as('chatRequest');
aiAssistant.getters.askAssistantFloatingButton().click();
aiAssistant.actions.sendMessage('What is wrong with this workflow?');
cy.wait('@chatRequest').then((interception) => {
const { body } = interception.request;
// Body should contain the current workflow context
expect(body.payload).to.have.property('context');
expect(body.payload.context).to.have.property('currentView');
expect(body.payload.context.currentView.name).to.equal('NodeViewExisting');
expect(body.payload.context).to.have.property('currentWorkflow');
});
});
it('should not send workflow context if nothing changed', () => {
cy.createFixtureWorkflow('aiAssistant/workflows/simple_http_request_workflow.json');
cy.intercept('POST', '/rest/ai/chat', {
statusCode: 200,
fixture: 'aiAssistant/responses/simple_message_response.json',
}).as('chatRequest');
aiAssistant.getters.askAssistantFloatingButton().click();
aiAssistant.actions.sendMessage('What is wrong with this workflow?');
cy.wait('@chatRequest');
// Send another message without changing workflow or executing any node
aiAssistant.actions.sendMessage('And now?');
cy.wait('@chatRequest').then((interception) => {
const { body } = interception.request;
// Workflow context should be empty
expect(body.payload).to.have.property('context');
expect(body.payload.context).not.to.have.property('currentWorkflow');
});
// Update http request node url
wf.actions.openNode('HTTP Request');
ndv.actions.typeIntoParameterInput('url', 'https://example.com');
ndv.actions.close();
// Also execute the workflow
wf.actions.executeWorkflow();
// Send another message
aiAssistant.actions.sendMessage('What about now?');
cy.wait('@chatRequest').then((interception) => {
const { body } = interception.request;
// Both workflow and execution context should be sent
expect(body.payload).to.have.property('context');
expect(body.payload.context).to.have.property('currentWorkflow');
expect(body.payload.context.currentWorkflow).not.to.be.empty;
expect(body.payload.context).to.have.property('executionData');
expect(body.payload.context.executionData).not.to.be.empty;
});
});
});

View file

@ -91,28 +91,12 @@ return []
});
describe('Ask AI', () => {
it('tab should display based on experiment', () => {
WorkflowPage.actions.visit();
cy.window().then((win) => {
win.featureFlags.override('011_ask_AI', 'control');
WorkflowPage.actions.addInitialNodeToCanvas('Manual');
WorkflowPage.actions.addNodeToCanvas('Code');
WorkflowPage.actions.openNode('Code');
cy.getByTestId('code-node-tab-ai').should('not.exist');
ndv.actions.close();
win.featureFlags.override('011_ask_AI', undefined);
WorkflowPage.actions.openNode('Code');
cy.getByTestId('code-node-tab-ai').should('not.exist');
});
});
describe('Enabled', () => {
beforeEach(() => {
cy.enableFeature('askAi');
WorkflowPage.actions.visit();
cy.window().then((win) => {
win.featureFlags.override('011_ask_AI', 'gpt3');
cy.window().then(() => {
WorkflowPage.actions.addInitialNodeToCanvas('Manual');
WorkflowPage.actions.addNodeToCanvas('Code', true, true);
});
@ -157,7 +141,7 @@ return []
cy.getByTestId('ask-ai-prompt-input').type(prompt);
cy.intercept('POST', '/rest/ask-ai', {
cy.intercept('POST', '/rest/ai/ask-ai', {
statusCode: 200,
body: {
data: {
@ -169,9 +153,7 @@ return []
cy.getByTestId('ask-ai-cta').click();
const askAiReq = cy.wait('@ask-ai');
askAiReq
.its('request.body')
.should('have.keys', ['question', 'model', 'context', 'n8nVersion']);
askAiReq.its('request.body').should('have.keys', ['question', 'context', 'forNode']);
askAiReq.its('context').should('have.keys', ['schema', 'ndvPushRef', 'pushRef']);
@ -195,7 +177,7 @@ return []
];
handledCodes.forEach(({ code, message }) => {
cy.intercept('POST', '/rest/ask-ai', {
cy.intercept('POST', '/rest/ai/ask-ai', {
statusCode: code,
status: code,
}).as('ask-ai');

View file

@ -0,0 +1,39 @@
{
"nodes": [
{
"parameters": {
"path": "FwrbSiaua2Xmvn6-Z-7CQ",
"options": {}
},
"id": "8fcc7e5f-2cef-4938-9564-eea504c20aa0",
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"typeVersion": 2,
"position": [
360,
220
],
"webhookId": "9c778f2a-e882-46ed-a0e4-c8e2f76ccd65"
}
],
"connections": {},
"pinData": {
"Webhook": [
{
"headers": {
"connection": "keep-alive",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
"accept": "*/*",
"cookie": "n8n-auth=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6IjNiM2FhOTE5LWRhZDgtNDE5MS1hZWZiLTlhZDIwZTZkMjJjNiIsImhhc2giOiJ1ZVAxR1F3U2paIiwiaWF0IjoxNzI4OTE1NTQyLCJleHAiOjE3Mjk1MjAzNDJ9.fV02gpUnSiUoMxHwfB0npBjcjct7Mv9vGfj-jRTT3-I",
"host": "localhost:5678",
"accept-encoding": "gzip, deflate"
},
"params": {},
"query": {},
"body": {},
"webhookUrl": "http://localhost:5678/webhook-test/FwrbSiaua2Xmvn6-Z-7CQ",
"executionMode": "test"
}
]
}
}

View file

@ -0,0 +1,35 @@
{
"nodes": [
{
"parameters": {},
"id": "298d3dc9-5e99-4b3f-919e-05fdcdfbe2d0",
"name": "When clicking Test workflow",
"type": "n8n-nodes-base.manualTrigger",
"typeVersion": 1,
"position": [360, 220]
},
{
"parameters": {
"options": {}
},
"id": "65c32346-e939-4ec7-88a9-1f9184e2258d",
"name": "HTTP Request",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4.2,
"position": [580, 220]
}
],
"connections": {
"When clicking Test workflow": {
"main": [
[
{
"node": "HTTP Request",
"type": "main",
"index": 0
}
]
]
}
}
}

View file

@ -156,7 +156,7 @@ export class NDV extends BasePage {
this.getters.nodeExecuteButton().first().click();
},
close: () => {
this.getters.backToCanvas().click();
this.getters.backToCanvas().click({ force: true });
},
openInlineExpressionEditor: () => {
cy.contains('Expression').invoke('show').click();

View file

@ -31,6 +31,30 @@ WORKDIR /home/node
COPY --from=builder /compiled /usr/local/lib/node_modules/n8n
COPY docker/images/n8n/docker-entrypoint.sh /
# Setup the Task Runner Launcher
ARG TARGETPLATFORM
ARG LAUNCHER_VERSION=0.1.1
ENV N8N_RUNNERS_USE_LAUNCHER=true \
N8N_RUNNERS_LAUNCHER_PATH=/usr/local/bin/task-runner-launcher
COPY docker/images/n8n/n8n-task-runners.json /etc/n8n-task-runners.json
# First, download, verify, then extract the launcher binary
# Second, chmod with 4555 to allow the use of setuid
# Third, create a new user and group to execute the Task Runners under
RUN \
if [[ "$TARGETPLATFORM" = "linux/amd64" ]]; then export ARCH_NAME="x86_64"; \
elif [[ "$TARGETPLATFORM" = "linux/arm64" ]]; then export ARCH_NAME="aarch64"; fi; \
mkdir /launcher-temp && \
cd /launcher-temp && \
wget https://github.com/n8n-io/task-runner-launcher/releases/download/${LAUNCHER_VERSION}/task-runner-launcher-$ARCH_NAME-unknown-linux-musl.zip && \
wget https://github.com/n8n-io/task-runner-launcher/releases/download/${LAUNCHER_VERSION}/task-runner-launcher-$ARCH_NAME-unknown-linux-musl.sha256 && \
sha256sum -c task-runner-launcher-$ARCH_NAME-unknown-linux-musl.sha256 && \
unzip -d $(dirname ${N8N_RUNNERS_LAUNCHER_PATH}) task-runner-launcher-$ARCH_NAME-unknown-linux-musl.zip task-runner-launcher && \
cd - && \
rm -r /launcher-temp && \
chmod 4555 ${N8N_RUNNERS_LAUNCHER_PATH} && \
addgroup -g 2000 task-runner && \
adduser -D -u 2000 -g "Task Runner User" -G task-runner task-runner
RUN \
cd /usr/local/lib/node_modules/n8n && \
npm rebuild sqlite3 && \

View file

@ -22,6 +22,30 @@ RUN set -eux; \
find /usr/local/lib/node_modules/n8n -type f -name "*.ts" -o -name "*.js.map" -o -name "*.vue" | xargs rm -f && \
rm -rf /root/.npm
# Setup the Task Runner Launcher
ARG TARGETPLATFORM
ARG LAUNCHER_VERSION=0.1.1
ENV N8N_RUNNERS_USE_LAUNCHER=true \
N8N_RUNNERS_LAUNCHER_PATH=/usr/local/bin/task-runner-launcher
COPY n8n-task-runners.json /etc/n8n-task-runners.json
# First, download, verify, then extract the launcher binary
# Second, chmod with 4555 to allow the use of setuid
# Third, create a new user and group to execute the Task Runners under
RUN \
if [[ "$TARGETPLATFORM" = "linux/amd64" ]]; then export ARCH_NAME="x86_64"; \
elif [[ "$TARGETPLATFORM" = "linux/arm64" ]]; then export ARCH_NAME="aarch64"; fi; \
mkdir /launcher-temp && \
cd /launcher-temp && \
wget https://github.com/n8n-io/task-runner-launcher/releases/download/${LAUNCHER_VERSION}/task-runner-launcher-$ARCH_NAME-unknown-linux-musl.zip && \
wget https://github.com/n8n-io/task-runner-launcher/releases/download/${LAUNCHER_VERSION}/task-runner-launcher-$ARCH_NAME-unknown-linux-musl.sha256 && \
sha256sum -c task-runner-launcher-$ARCH_NAME-unknown-linux-musl.sha256 && \
unzip -d $(dirname ${N8N_RUNNERS_LAUNCHER_PATH}) task-runner-launcher-$ARCH_NAME-unknown-linux-musl.zip task-runner-launcher && \
cd - && \
rm -r /launcher-temp && \
chmod 4555 ${N8N_RUNNERS_LAUNCHER_PATH} && \
addgroup -g 2000 task-runner && \
adduser -D -u 2000 -g "Task Runner User" -G task-runner task-runner
COPY docker-entrypoint.sh /
RUN \

View file

@ -0,0 +1,19 @@
{
"task-runners": [
{
"runner-type": "javascript",
"workdir": "/home/task-runner",
"command": "/usr/local/bin/node",
"args": ["/usr/local/lib/node_modules/n8n/node_modules/@n8n/task-runner/dist/start.js"],
"allowed-env": [
"PATH",
"N8N_RUNNERS_GRANT_TOKEN",
"N8N_RUNNERS_N8N_URI",
"NODE_FUNCTION_ALLOW_BUILTIN",
"NODE_FUNCTION_ALLOW_EXTERNAL"
],
"uid": 2000,
"gid": 2000
}
]
}

View file

@ -1,7 +0,0 @@
{
"folders": [
{
"path": "."
}
]
}

View file

@ -1,6 +1,6 @@
{
"name": "n8n-monorepo",
"version": "1.62.1",
"version": "1.63.0",
"private": true,
"engines": {
"node": ">=20.15",
@ -69,14 +69,15 @@
],
"overrides": {
"@types/node": "^18.16.16",
"chokidar": "3.5.2",
"esbuild": "^0.20.2",
"chokidar": "^4.0.1",
"esbuild": "^0.24.0",
"formidable": "3.5.1",
"pug": "^3.0.3",
"semver": "^7.5.4",
"tslib": "^2.6.2",
"tsconfig-paths": "^4.2.0",
"typescript": "^5.6.2",
"vue-tsc": "^2.1.6",
"ws": ">=8.17.1"
},
"patchedDependencies": {

View file

@ -1,6 +1,6 @@
{
"name": "@n8n/api-types",
"version": "0.3.0",
"version": "0.4.0",
"scripts": {
"clean": "rimraf dist .turbo",
"dev": "pnpm watch",

View file

@ -107,6 +107,9 @@ export interface FrontendSettings {
aiAssistant: {
enabled: boolean;
};
askAi: {
enabled: boolean;
};
deployment: {
type: string;
};
@ -154,9 +157,6 @@ export interface FrontendSettings {
banners: {
dismissed: string[];
};
ai: {
enabled: boolean;
};
workflowHistory: {
pruneTime: number;
licensePruneTime: number;

View file

@ -11,7 +11,7 @@ export type RunningJobSummary = {
};
export type WorkerStatus = {
workerId: string;
senderId: string;
runningJobsSummary: RunningJobSummary[];
freeMem: number;
totalMem: number;

View file

@ -1,6 +1,6 @@
{
"name": "@n8n/n8n-benchmark",
"version": "1.6.1",
"version": "1.7.0",
"description": "Cli for running benchmark tests for n8n",
"main": "dist/index",
"scripts": {

View file

@ -1,6 +1,6 @@
{
"name": "@n8n/chat",
"version": "0.27.1",
"version": "0.28.0",
"scripts": {
"dev": "pnpm run storybook",
"build": "pnpm build:vite && pnpm build:bundle",
@ -50,7 +50,7 @@
"unplugin-icons": "^0.19.0",
"vite": "catalog:frontend",
"vitest": "catalog:frontend",
"vite-plugin-dts": "^3.9.1",
"vite-plugin-dts": "^4.2.3",
"vue-tsc": "catalog:frontend"
},
"files": [

View file

@ -1,6 +1,6 @@
{
"name": "@n8n/config",
"version": "1.12.1",
"version": "1.13.0",
"scripts": {
"clean": "rimraf dist .turbo",
"dev": "pnpm watch",

View file

@ -1,6 +1,17 @@
import { Config, Env, Nested } from '../decorators';
import { StringArray } from '../utils';
/**
* Scopes (areas of functionality) to filter logs by.
*
* `executions` -> execution lifecycle
* `license` -> license SDK
* `scaling` -> scaling mode
*/
export const LOG_SCOPES = ['executions', 'license', 'scaling'] as const;
export type LogScope = (typeof LOG_SCOPES)[number];
@Config
class FileLoggingConfig {
/**
@ -44,4 +55,19 @@ export class LoggingConfig {
@Nested
file: FileLoggingConfig;
/**
* Scopes to filter logs by. Nothing is filtered by default.
*
* Currently supported log scopes:
* - `executions`
* - `license`
* - `scaling`
*
* @example
* `N8N_LOG_SCOPES=license`
* `N8N_LOG_SCOPES=license,executions`
*/
@Env('N8N_LOG_SCOPES')
scopes: StringArray<LogScope> = [];
}

View file

@ -19,4 +19,14 @@ export class TaskRunnersConfig {
/** IP address task runners server should listen on */
@Env('N8N_RUNNERS_SERVER_LISTEN_ADDRESS')
listen_address: string = '127.0.0.1';
@Env('N8N_RUNNERS_USE_LAUNCHER')
useLauncher: boolean = false;
@Env('N8N_RUNNERS_LAUNCHER_PATH')
launcherPath: string = '';
/** Which task runner to launch from the config */
@Env('N8N_RUNNERS_LAUNCHER_RUNNER')
launcherRunner: string = 'javascript';
}

View file

@ -18,6 +18,9 @@ import { VersionNotificationsConfig } from './configs/version-notifications.conf
import { WorkflowsConfig } from './configs/workflows.config';
import { Config, Env, Nested } from './decorators';
export { LOG_SCOPES } from './configs/logging.config';
export type { LogScope } from './configs/logging.config';
@Config
export class GlobalConfig {
@Nested

View file

@ -228,6 +228,9 @@ describe('GlobalConfig', () => {
authToken: '',
listen_address: '127.0.0.1',
port: 5679,
useLauncher: false,
launcherPath: '',
launcherRunner: 'javascript',
},
sentry: {
backendDsn: '',
@ -241,13 +244,13 @@ describe('GlobalConfig', () => {
fileSizeMax: 16,
location: 'logs/n8n.log',
},
scopes: [],
},
};
it('should use all default values when no env variables are defined', () => {
process.env = {};
const config = Container.get(GlobalConfig);
expect(deepCopy(config)).toEqual(defaultConfig);
expect(mockFs.readFileSync).not.toHaveBeenCalled();
});

View file

@ -281,6 +281,7 @@ export class ToolHttpRequest implements INodeType {
'User-Agent': undefined,
},
body: {},
returnFullResponse: true,
};
const authentication = this.getNodeParameter('authentication', itemIndex, 'none') as

View file

@ -0,0 +1,165 @@
import get from 'lodash/get';
import type { IDataObject, IExecuteFunctions } from 'n8n-workflow';
import { jsonParse } from 'n8n-workflow';
import type { N8nTool } from '../../../../utils/N8nTool';
import { ToolHttpRequest } from '../ToolHttpRequest.node';
const createExecuteFunctionsMock = (parameters: IDataObject, requestMock: any) => {
const nodeParameters = parameters;
return {
getNodeParameter(parameter: string) {
return get(nodeParameters, parameter);
},
getNode() {
return {
name: 'HTTP Request',
};
},
getInputData() {
return [{ json: {} }];
},
getWorkflow() {
return {
name: 'Test Workflow',
};
},
continueOnFail() {
return false;
},
addInputData() {
return { index: 0 };
},
addOutputData() {
return;
},
helpers: {
httpRequest: requestMock,
},
} as unknown as IExecuteFunctions;
};
describe('ToolHttpRequest', () => {
let httpTool: ToolHttpRequest;
let mockRequest: jest.Mock;
describe('Binary response', () => {
beforeEach(() => {
httpTool = new ToolHttpRequest();
mockRequest = jest.fn();
});
it('should return the error when receiving a binary response', async () => {
mockRequest.mockResolvedValue({
body: Buffer.from(''),
headers: {
'content-type': 'image/jpeg',
},
});
const { response } = await httpTool.supplyData.call(
createExecuteFunctionsMock(
{
method: 'GET',
url: 'https://httpbin.org/image/jpeg',
options: {},
placeholderDefinitions: {
values: [],
},
},
mockRequest,
),
0,
);
const res = await (response as N8nTool).invoke('');
expect(res).toContain('error');
expect(res).toContain('Binary data is not supported');
});
it('should return the response text when receiving a text response', async () => {
mockRequest.mockResolvedValue({
body: 'Hello World',
headers: {
'content-type': 'text/plain',
},
});
const { response } = await httpTool.supplyData.call(
createExecuteFunctionsMock(
{
method: 'GET',
url: 'https://httpbin.org/text/plain',
options: {},
placeholderDefinitions: {
values: [],
},
},
mockRequest,
),
0,
);
const res = await (response as N8nTool).invoke('');
expect(res).toEqual('Hello World');
});
it('should return the response text when receiving a text response with a charset', async () => {
mockRequest.mockResolvedValue({
body: 'こんにちは世界',
headers: {
'content-type': 'text/plain; charset=iso-2022-jp',
},
});
const { response } = await httpTool.supplyData.call(
createExecuteFunctionsMock(
{
method: 'GET',
url: 'https://httpbin.org/text/plain',
options: {},
placeholderDefinitions: {
values: [],
},
},
mockRequest,
),
0,
);
const res = await (response as N8nTool).invoke('');
expect(res).toEqual('こんにちは世界');
});
it('should return the response object when receiving a JSON response', async () => {
const mockJson = { hello: 'world' };
mockRequest.mockResolvedValue({
body: mockJson,
headers: {
'content-type': 'application/json',
},
});
const { response } = await httpTool.supplyData.call(
createExecuteFunctionsMock(
{
method: 'GET',
url: 'https://httpbin.org/json',
options: {},
placeholderDefinitions: {
values: [],
},
},
mockRequest,
),
0,
);
const res = await (response as N8nTool).invoke('');
expect(jsonParse(res)).toEqual(mockJson);
});
});
});

View file

@ -1,3 +1,12 @@
import { Readability } from '@mozilla/readability';
import cheerio from 'cheerio';
import { convert } from 'html-to-text';
import { JSDOM } from 'jsdom';
import get from 'lodash/get';
import set from 'lodash/set';
import unset from 'lodash/unset';
import * as mime from 'mime-types';
import { getOAuth2AdditionalParameters } from 'n8n-nodes-base/dist/nodes/HttpRequest/GenericFunctions';
import type {
IExecuteFunctions,
IDataObject,
@ -7,20 +16,8 @@ import type {
NodeApiError,
} from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError, jsonParse } from 'n8n-workflow';
import { getOAuth2AdditionalParameters } from 'n8n-nodes-base/dist/nodes/HttpRequest/GenericFunctions';
import set from 'lodash/set';
import get from 'lodash/get';
import unset from 'lodash/unset';
import cheerio from 'cheerio';
import { convert } from 'html-to-text';
import { Readability } from '@mozilla/readability';
import { JSDOM } from 'jsdom';
import { z } from 'zod';
import type { DynamicZodObject } from '../../../types/zod.types';
import type {
ParameterInputType,
ParametersValues,
@ -29,6 +26,7 @@ import type {
SendIn,
ToolParameter,
} from './interfaces';
import type { DynamicZodObject } from '../../../types/zod.types';
const genericCredentialRequest = async (ctx: IExecuteFunctions, itemIndex: number) => {
const genericType = ctx.getNodeParameter('genericAuthType', itemIndex) as string;
@ -176,6 +174,7 @@ const htmlOptimizer = (ctx: IExecuteFunctions, itemIndex: number, maxLength: num
);
}
const returnData: string[] = [];
const html = cheerio.load(response);
const htmlElements = html(cssSelector);
@ -574,6 +573,7 @@ export const configureToolFunction = (
// Clone options and rawRequestOptions to avoid mutating the original objects
const options: IHttpRequestOptions | null = structuredClone(requestOptions);
const clonedRawRequestOptions: { [key: string]: string } = structuredClone(rawRequestOptions);
let fullResponse: any;
let response: string = '';
let executionError: Error | undefined = undefined;
@ -732,8 +732,6 @@ export const configureToolFunction = (
}
}
} catch (error) {
console.error(error);
const errorMessage = 'Input provided by model is not valid';
if (error instanceof NodeOperationError) {
@ -749,11 +747,29 @@ export const configureToolFunction = (
if (options) {
try {
response = optimizeResponse(await httpRequest(options));
fullResponse = await httpRequest(options);
} catch (error) {
const httpCode = (error as NodeApiError).httpCode;
response = `${httpCode ? `HTTP ${httpCode} ` : ''}There was an error: "${error.message}"`;
}
if (!response) {
try {
// Check if the response is binary data
if (fullResponse?.headers?.['content-type']) {
const contentType = fullResponse.headers['content-type'] as string;
const mimeType = contentType.split(';')[0].trim();
if (mime.charset(mimeType) !== 'UTF-8') {
throw new NodeOperationError(ctx.getNode(), 'Binary data is not supported');
}
}
response = optimizeResponse(fullResponse.body);
} catch (error) {
response = `There was an error: "${error.message}"`;
}
}
}
if (typeof response !== 'string') {

View file

@ -278,7 +278,7 @@ export async function execute(this: IExecuteFunctions, i: number): Promise<INode
for (const tool of externalTools ?? []) {
if (tool.name === functionName) {
const parsedArgs: { input: string } = jsonParse(functionArgs);
const functionInput = parsedArgs.input ?? functionArgs;
const functionInput = parsedArgs.input ?? parsedArgs ?? functionArgs;
functionResponse = await tool.invoke(functionInput);
}
}

View file

@ -1,6 +1,6 @@
{
"name": "@n8n/n8n-nodes-langchain",
"version": "1.62.1",
"version": "1.63.0",
"description": "",
"main": "index.js",
"scripts": {
@ -124,6 +124,7 @@
"@types/cheerio": "^0.22.15",
"@types/html-to-text": "^9.0.1",
"@types/json-schema": "^7.0.15",
"@types/mime-types": "^2.1.0",
"@types/pg": "^8.11.6",
"@types/temp": "^0.9.1",
"n8n-core": "workspace:*"
@ -171,6 +172,7 @@
"langchain": "0.3.2",
"lodash": "catalog:",
"mammoth": "1.7.2",
"mime-types": "2.1.35",
"n8n-nodes-base": "workspace:*",
"n8n-workflow": "workspace:*",
"openai": "4.63.0",

View file

@ -4,18 +4,18 @@
"version": "0.0.1",
"devDependencies": {
"@chromatic-com/storybook": "^2.0.2",
"@storybook/addon-a11y": "^8.3.1",
"@storybook/addon-actions": "^8.3.1",
"@storybook/addon-docs": "^8.3.1",
"@storybook/addon-essentials": "^8.3.1",
"@storybook/addon-interactions": "^8.3.1",
"@storybook/addon-links": "^8.3.1",
"@storybook/addon-themes": "^8.3.1",
"@storybook/blocks": "^8.3.1",
"@storybook/test": "^8.3.1",
"@storybook/vue3": "^8.3.1",
"@storybook/vue3-vite": "^8.3.1",
"@storybook/addon-a11y": "^8.3.5",
"@storybook/addon-actions": "^8.3.5",
"@storybook/addon-docs": "^8.3.5",
"@storybook/addon-essentials": "^8.3.5",
"@storybook/addon-interactions": "^8.3.5",
"@storybook/addon-links": "^8.3.5",
"@storybook/addon-themes": "^8.3.5",
"@storybook/blocks": "^8.3.5",
"@storybook/test": "^8.3.5",
"@storybook/vue3": "^8.3.5",
"@storybook/vue3-vite": "^8.3.5",
"chromatic": "^11.10.2",
"storybook": "^8.3.1"
"storybook": "^8.3.5"
}
}

View file

@ -1,6 +1,6 @@
{
"name": "@n8n/task-runner",
"version": "1.0.1",
"version": "1.1.0",
"scripts": {
"clean": "rimraf dist .turbo",
"start": "node dist/start.js",
@ -26,5 +26,8 @@
"n8n-core": "workspace:*",
"nanoid": "^3.3.6",
"ws": "^8.18.0"
},
"devDependencies": {
"luxon": "catalog:"
}
}

View file

@ -1,6 +1,10 @@
import { DateTime } from 'luxon';
import type { CodeExecutionMode, IDataObject } from 'n8n-workflow';
import fs from 'node:fs';
import { builtinModules } from 'node:module';
import { ValidationError } from '@/js-task-runner/errors/validation-error';
import type { JsTaskRunnerOpts } from '@/js-task-runner/js-task-runner';
import {
JsTaskRunner,
type AllCodeTaskData,
@ -9,32 +13,88 @@ import {
import type { Task } from '@/task-runner';
import { newAllCodeTaskData, newTaskWithSettings, withPairedItem, wrapIntoJson } from './test-data';
import { ExecutionError } from '../errors/execution-error';
jest.mock('ws');
describe('JsTaskRunner', () => {
const jsTaskRunner = new JsTaskRunner('taskType', 'ws://localhost', 'grantToken', 1);
const createRunnerWithOpts = (opts: Partial<JsTaskRunnerOpts> = {}) =>
new JsTaskRunner({
wsUrl: 'ws://localhost',
grantToken: 'grantToken',
maxConcurrency: 1,
...opts,
});
const defaultTaskRunner = createRunnerWithOpts();
const execTaskWithParams = async ({
task,
taskData,
runner = defaultTaskRunner,
}: {
task: Task<JSExecSettings>;
taskData: AllCodeTaskData;
runner?: JsTaskRunner;
}) => {
jest.spyOn(jsTaskRunner, 'requestData').mockResolvedValue(taskData);
return await jsTaskRunner.executeTask(task);
jest.spyOn(runner, 'requestData').mockResolvedValue(taskData);
return await runner.executeTask(task);
};
afterEach(() => {
jest.restoreAllMocks();
});
const executeForAllItems = async ({
code,
inputItems,
settings,
runner,
}: {
code: string;
inputItems: IDataObject[];
settings?: Partial<JSExecSettings>;
runner?: JsTaskRunner;
}) => {
return await execTaskWithParams({
task: newTaskWithSettings({
code,
nodeMode: 'runOnceForAllItems',
...settings,
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson)),
runner,
});
};
const executeForEachItem = async ({
code,
inputItems,
settings,
runner,
}: {
code: string;
inputItems: IDataObject[];
settings?: Partial<JSExecSettings>;
runner?: JsTaskRunner;
}) => {
return await execTaskWithParams({
task: newTaskWithSettings({
code,
nodeMode: 'runOnceForEachItem',
...settings,
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson)),
runner,
});
};
describe('console', () => {
test.each<[CodeExecutionMode]>([['runOnceForAllItems'], ['runOnceForEachItem']])(
'should make an rpc call for console log in %s mode',
async (nodeMode) => {
jest.spyOn(jsTaskRunner, 'makeRpcCall').mockResolvedValue(undefined);
jest.spyOn(defaultTaskRunner, 'makeRpcCall').mockResolvedValue(undefined);
const task = newTaskWithSettings({
code: "console.log('Hello', 'world!'); return {}",
nodeMode,
@ -45,29 +105,185 @@ describe('JsTaskRunner', () => {
taskData: newAllCodeTaskData([wrapIntoJson({})]),
});
expect(jsTaskRunner.makeRpcCall).toHaveBeenCalledWith(task.taskId, 'logNodeOutput', [
expect(defaultTaskRunner.makeRpcCall).toHaveBeenCalledWith(task.taskId, 'logNodeOutput', [
'Hello world!',
]);
},
);
});
describe('runOnceForAllItems', () => {
const executeForAllItems = async ({
code,
inputItems,
settings,
}: { code: string; inputItems: IDataObject[]; settings?: Partial<JSExecSettings> }) => {
return await execTaskWithParams({
task: newTaskWithSettings({
code,
nodeMode: 'runOnceForAllItems',
...settings,
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson)),
describe('built-in methods and variables available in the context', () => {
const inputItems = [{ a: 1 }];
const testExpressionForAllItems = async (
expression: string,
expected: IDataObject | string | number | boolean,
) => {
const needsWrapping = typeof expected !== 'object';
const outcome = await executeForAllItems({
code: needsWrapping ? `return { val: ${expression} }` : `return ${expression}`,
inputItems,
});
expect(outcome.result).toEqual([wrapIntoJson(needsWrapping ? { val: expected } : expected)]);
};
const testExpressionForEachItem = async (
expression: string,
expected: IDataObject | string | number | boolean,
) => {
const needsWrapping = typeof expected !== 'object';
const outcome = await executeForEachItem({
code: needsWrapping ? `return { val: ${expression} }` : `return ${expression}`,
inputItems,
});
expect(outcome.result).toEqual([
withPairedItem(0, wrapIntoJson(needsWrapping ? { val: expected } : expected)),
]);
};
const testGroups = {
// https://docs.n8n.io/code/builtin/current-node-input/
'current node input': [
['$input.first()', inputItems[0]],
['$input.last()', inputItems[inputItems.length - 1]],
['$input.params', { manualTriggerParam: 'empty' }],
],
// https://docs.n8n.io/code/builtin/output-other-nodes/
'output of other nodes': [
['$("Trigger").first()', inputItems[0]],
['$("Trigger").last()', inputItems[inputItems.length - 1]],
['$("Trigger").params', { manualTriggerParam: 'empty' }],
],
// https://docs.n8n.io/code/builtin/date-time/
'date and time': [
['$now', expect.any(DateTime)],
['$today', expect.any(DateTime)],
['{dt: DateTime}', { dt: expect.any(Function) }],
],
// https://docs.n8n.io/code/builtin/jmespath/
JMESPath: [['{ val: $jmespath([{ f: 1 },{ f: 2 }], "[*].f") }', { val: [1, 2] }]],
// https://docs.n8n.io/code/builtin/n8n-metadata/
'n8n metadata': [
[
'$execution',
{
id: 'exec-id',
mode: 'test',
resumeFormUrl: 'http://formWaitingBaseUrl/exec-id',
resumeUrl: 'http://webhookWaitingBaseUrl/exec-id',
customData: {
get: expect.any(Function),
getAll: expect.any(Function),
set: expect.any(Function),
setAll: expect.any(Function),
},
},
],
['$("Trigger").isExecuted', true],
['$nodeVersion', 2],
['$prevNode.name', 'Trigger'],
['$prevNode.outputIndex', 0],
['$runIndex', 0],
['{ wf: $workflow }', { wf: { active: true, id: '1', name: 'Test Workflow' } }],
['$vars', { var: 'value' }],
],
};
for (const [groupName, tests] of Object.entries(testGroups)) {
describe(`${groupName} runOnceForAllItems`, () => {
test.each(tests)(
'should have the %s available in the context',
async (expression, expected) => {
await testExpressionForAllItems(expression, expected);
},
);
});
describe(`${groupName} runOnceForEachItem`, () => {
test.each(tests)(
'should have the %s available in the context',
async (expression, expected) => {
await testExpressionForEachItem(expression, expected);
},
);
});
}
describe('$env', () => {
it('should have the env available in context when access has not been blocked', async () => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
code: 'return { val: $env.VAR1 }',
nodeMode: 'runOnceForAllItems',
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson), {
envProviderState: {
isEnvAccessBlocked: false,
isProcessAvailable: true,
env: { VAR1: 'value' },
},
}),
});
expect(outcome.result).toEqual([wrapIntoJson({ val: 'value' })]);
});
it('should be possible to access env if it has been blocked', async () => {
await expect(
execTaskWithParams({
task: newTaskWithSettings({
code: 'return { val: $env.VAR1 }',
nodeMode: 'runOnceForAllItems',
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson), {
envProviderState: {
isEnvAccessBlocked: true,
isProcessAvailable: true,
env: { VAR1: 'value' },
},
}),
}),
).rejects.toThrow('access to env vars denied');
});
it('should not be possible to iterate $env', async () => {
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
code: 'return Object.values($env).concat(Object.keys($env))',
nodeMode: 'runOnceForAllItems',
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson), {
envProviderState: {
isEnvAccessBlocked: false,
isProcessAvailable: true,
env: { VAR1: '1', VAR2: '2', VAR3: '3' },
},
}),
});
expect(outcome.result).toEqual([]);
});
it("should not expose task runner's env variables even if no env state is received", async () => {
process.env.N8N_RUNNERS_N8N_URI = 'http://127.0.0.1:5679';
const outcome = await execTaskWithParams({
task: newTaskWithSettings({
code: 'return { val: $env.N8N_RUNNERS_N8N_URI }',
nodeMode: 'runOnceForAllItems',
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson), {
envProviderState: undefined,
}),
});
expect(outcome.result).toEqual([wrapIntoJson({ val: undefined })]);
});
});
});
describe('runOnceForAllItems', () => {
describe('continue on fail', () => {
it('should return an item with the error if continueOnFail is true', async () => {
const outcome = await executeForAllItems({
@ -77,7 +293,7 @@ describe('JsTaskRunner', () => {
});
expect(outcome).toEqual({
result: [wrapIntoJson({ error: 'Error message' })],
result: [wrapIntoJson({ error: 'Error message [line 1]' })],
customData: undefined,
});
});
@ -181,21 +397,6 @@ describe('JsTaskRunner', () => {
});
describe('runForEachItem', () => {
const executeForEachItem = async ({
code,
inputItems,
settings,
}: { code: string; inputItems: IDataObject[]; settings?: Partial<JSExecSettings> }) => {
return await execTaskWithParams({
task: newTaskWithSettings({
code,
nodeMode: 'runOnceForEachItem',
...settings,
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson)),
});
};
describe('continue on fail', () => {
it('should return an item with the error if continueOnFail is true', async () => {
const outcome = await executeForEachItem({
@ -206,8 +407,8 @@ describe('JsTaskRunner', () => {
expect(outcome).toEqual({
result: [
withPairedItem(0, wrapIntoJson({ error: 'Error message' })),
withPairedItem(1, wrapIntoJson({ error: 'Error message' })),
withPairedItem(0, wrapIntoJson({ error: 'Error message [line 1]' })),
withPairedItem(1, wrapIntoJson({ error: 'Error message [line 1]' })),
],
customData: undefined,
});
@ -280,4 +481,282 @@ describe('JsTaskRunner', () => {
},
);
});
describe('require', () => {
const inputItems = [{ a: 1 }];
const packageJson = JSON.parse(fs.readFileSync('package.json', 'utf8'));
describe('blocked by default', () => {
const testCases = [...builtinModules, ...Object.keys(packageJson.dependencies)];
test.each(testCases)(
'should throw an error when requiring %s in runOnceForAllItems mode',
async (module) => {
await expect(
executeForAllItems({
code: `return require('${module}')`,
inputItems,
}),
).rejects.toThrow(`Cannot find module '${module}'`);
},
);
test.each(testCases)(
'should throw an error when requiring %s in runOnceForEachItem mode',
async (module) => {
await expect(
executeForEachItem({
code: `return require('${module}')`,
inputItems,
}),
).rejects.toThrow(`Cannot find module '${module}'`);
},
);
});
describe('all built-ins allowed with *', () => {
const testCases = builtinModules;
const runner = createRunnerWithOpts({
allowedBuiltInModules: '*',
});
test.each(testCases)(
'should be able to require %s in runOnceForAllItems mode',
async (module) => {
await expect(
executeForAllItems({
code: `return { val: require('${module}') }`,
inputItems,
runner,
}),
).resolves.toBeDefined();
},
);
test.each(testCases)(
'should be able to require %s in runOnceForEachItem mode',
async (module) => {
await expect(
executeForEachItem({
code: `return { val: require('${module}') }`,
inputItems,
runner,
}),
).resolves.toBeDefined();
},
);
});
describe('all external modules allowed with *', () => {
const testCases = Object.keys(packageJson.dependencies);
const runner = createRunnerWithOpts({
allowedExternalModules: '*',
});
test.each(testCases)(
'should be able to require %s in runOnceForAllItems mode',
async (module) => {
await expect(
executeForAllItems({
code: `return { val: require('${module}') }`,
inputItems,
runner,
}),
).resolves.toBeDefined();
},
);
test.each(testCases)(
'should be able to require %s in runOnceForEachItem mode',
async (module) => {
await expect(
executeForEachItem({
code: `return { val: require('${module}') }`,
inputItems,
runner,
}),
).resolves.toBeDefined();
},
);
});
describe('specifically allowed built-in modules', () => {
const runner = createRunnerWithOpts({
allowedBuiltInModules: 'crypto,path',
});
const allowedCases = [
['crypto', 'require("crypto").randomBytes(16).toString("hex")', expect.any(String)],
['path', 'require("path").normalize("/root/./dir")', '/root/dir'],
];
const blockedCases = [['http'], ['process']];
test.each(allowedCases)(
'should allow requiring %s in runOnceForAllItems mode',
async (_moduleName, expression, expected) => {
const outcome = await executeForAllItems({
code: `return { val: ${expression} }`,
inputItems,
runner,
});
expect(outcome.result).toEqual([wrapIntoJson({ val: expected })]);
},
);
test.each(allowedCases)(
'should allow requiring %s in runOnceForEachItem mode',
async (_moduleName, expression, expected) => {
const outcome = await executeForEachItem({
code: `return { val: ${expression} }`,
inputItems,
runner,
});
expect(outcome.result).toEqual([withPairedItem(0, wrapIntoJson({ val: expected }))]);
},
);
test.each(blockedCases)(
'should throw when trying to require %s in runOnceForAllItems mode',
async (moduleName) => {
await expect(
executeForAllItems({
code: `require("${moduleName}")`,
inputItems,
runner,
}),
).rejects.toThrow(`Cannot find module '${moduleName}'`);
},
);
test.each(blockedCases)(
'should throw when trying to require %s in runOnceForEachItem mode',
async (moduleName) => {
await expect(
executeForEachItem({
code: `require("${moduleName}")`,
inputItems,
runner,
}),
).rejects.toThrow(`Cannot find module '${moduleName}'`);
},
);
});
describe('specifically allowed external modules', () => {
const runner = createRunnerWithOpts({
allowedExternalModules: 'nanoid',
});
const allowedCases = [['nanoid', 'require("nanoid").nanoid()', expect.any(String)]];
const blockedCases = [['n8n-core']];
test.each(allowedCases)(
'should allow requiring %s in runOnceForAllItems mode',
async (_moduleName, expression, expected) => {
const outcome = await executeForAllItems({
code: `return { val: ${expression} }`,
inputItems,
runner,
});
expect(outcome.result).toEqual([wrapIntoJson({ val: expected })]);
},
);
test.each(allowedCases)(
'should allow requiring %s in runOnceForEachItem mode',
async (_moduleName, expression, expected) => {
const outcome = await executeForEachItem({
code: `return { val: ${expression} }`,
inputItems,
runner,
});
expect(outcome.result).toEqual([withPairedItem(0, wrapIntoJson({ val: expected }))]);
},
);
test.each(blockedCases)(
'should throw when trying to require %s in runOnceForAllItems mode',
async (moduleName) => {
await expect(
executeForAllItems({
code: `require("${moduleName}")`,
inputItems,
runner,
}),
).rejects.toThrow(`Cannot find module '${moduleName}'`);
},
);
test.each(blockedCases)(
'should throw when trying to require %s in runOnceForEachItem mode',
async (moduleName) => {
await expect(
executeForEachItem({
code: `require("${moduleName}")`,
inputItems,
runner,
}),
).rejects.toThrow(`Cannot find module '${moduleName}'`);
},
);
});
});
describe('errors', () => {
test.each<[CodeExecutionMode]>([['runOnceForAllItems'], ['runOnceForEachItem']])(
'should throw an ExecutionError if the code is invalid in %s mode',
async (nodeMode) => {
await expect(
execTaskWithParams({
task: newTaskWithSettings({
code: 'unknown',
nodeMode,
}),
taskData: newAllCodeTaskData([wrapIntoJson({ a: 1 })]),
}),
).rejects.toThrow(ExecutionError);
},
);
it('sends serializes an error correctly', async () => {
const runner = createRunnerWithOpts({});
const taskId = '1';
const task = newTaskWithSettings({
code: 'unknown; return []',
nodeMode: 'runOnceForAllItems',
continueOnFail: false,
mode: 'manual',
workflowMode: 'manual',
});
runner.runningTasks.set(taskId, task);
const sendSpy = jest.spyOn(runner.ws, 'send').mockImplementation(() => {});
jest.spyOn(runner, 'sendOffers').mockImplementation(() => {});
jest
.spyOn(runner, 'requestData')
.mockResolvedValue(newAllCodeTaskData([wrapIntoJson({ a: 1 })]));
await runner.receivedSettings(taskId, task.settings);
expect(sendSpy).toHaveBeenCalledWith(
JSON.stringify({
type: 'runner:taskerror',
taskId,
error: {
message: 'unknown is not defined [line 1]',
description: 'ReferenceError',
lineNumber: 1,
},
}),
);
console.log('DONE');
}, 1000);
});
});

View file

@ -65,6 +65,9 @@ export const newAllCodeTaskData = (
const manualTriggerNode = newNode({
name: 'Trigger',
type: 'n8n-nodes-base.manualTrigger',
parameters: {
manualTriggerParam: 'empty',
},
});
return {
@ -116,15 +119,32 @@ export const newAllCodeTaskData = (
siblingParameters: {},
mode: 'manual',
selfData: {},
envProviderState: {
env: {},
isEnvAccessBlocked: true,
isProcessAvailable: true,
},
additionalData: {
formWaitingBaseUrl: '',
executionId: 'exec-id',
instanceBaseUrl: '',
restartExecutionId: '',
restApiUrl: '',
webhookBaseUrl: '',
webhookTestBaseUrl: '',
webhookWaitingBaseUrl: '',
variables: {},
formWaitingBaseUrl: 'http://formWaitingBaseUrl',
webhookBaseUrl: 'http://webhookBaseUrl',
webhookTestBaseUrl: 'http://webhookTestBaseUrl',
webhookWaitingBaseUrl: 'http://webhookWaitingBaseUrl',
variables: {
var: 'value',
},
},
executeData: {
node: codeNode,
data: {
main: [codeNodeInputData],
},
source: {
main: [{ previousNode: manualTriggerNode.name }],
},
},
...opts,
};

View file

@ -0,0 +1,53 @@
import { ExecutionError } from '../execution-error';
describe('ExecutionError', () => {
const defaultStack = `TypeError: a.unknown is not a function
at VmCodeWrapper (evalmachine.<anonymous>:2:3)
at evalmachine.<anonymous>:7:2
at Script.runInContext (node:vm:148:12)
at Script.runInNewContext (node:vm:153:17)
at runInNewContext (node:vm:309:38)
at JsTaskRunner.runForAllItems (/n8n/packages/@n8n/task-runner/dist/js-task-runner/js-task-runner.js:90:65)
at JsTaskRunner.executeTask (/n8n/packages/@n8n/task-runner/dist/js-task-runner/js-task-runner.js:71:26)
at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
at async JsTaskRunner.receivedSettings (/n8n/packages/@n8n/task-runner/dist/task-runner.js:190:26)`;
it('should parse error details from stack trace without itemIndex', () => {
const error = new Error('a.unknown is not a function');
error.stack = defaultStack;
const executionError = new ExecutionError(error);
expect(executionError.message).toBe('a.unknown is not a function [line 2]');
expect(executionError.lineNumber).toBe(2);
expect(executionError.description).toBe('TypeError');
expect(executionError.context).toBeUndefined();
});
it('should parse error details from stack trace with itemIndex', () => {
const error = new Error('a.unknown is not a function');
error.stack = defaultStack;
const executionError = new ExecutionError(error, 1);
expect(executionError.message).toBe('a.unknown is not a function [line 2, for item 1]');
expect(executionError.lineNumber).toBe(2);
expect(executionError.description).toBe('TypeError');
expect(executionError.context).toEqual({ itemIndex: 1 });
});
it('should serialize correctly', () => {
const error = new Error('a.unknown is not a function');
error.stack = defaultStack;
const executionError = new ExecutionError(error, 1);
expect(JSON.stringify(executionError)).toBe(
JSON.stringify({
message: 'a.unknown is not a function [line 2, for item 1]',
description: 'TypeError',
itemIndex: 1,
context: { itemIndex: 1 },
lineNumber: 2,
}),
);
});
});

View file

@ -0,0 +1,12 @@
export interface ErrorLike {
message: string;
stack?: string;
}
export function isErrorLike(value: unknown): value is ErrorLike {
if (typeof value !== 'object' || value === null) return false;
const errorLike = value as ErrorLike;
return typeof errorLike.message === 'string';
}

View file

@ -1,6 +1,9 @@
import { ApplicationError } from 'n8n-workflow';
import type { ErrorLike } from './error-like';
import { SerializableError } from './serializable-error';
export class ExecutionError extends ApplicationError {
const VM_WRAPPER_FN_NAME = 'VmCodeWrapper';
export class ExecutionError extends SerializableError {
description: string | null = null;
itemIndex: number | undefined = undefined;
@ -11,7 +14,7 @@ export class ExecutionError extends ApplicationError {
lineNumber: number | undefined = undefined;
constructor(error: Error & { stack?: string }, itemIndex?: number) {
constructor(error: ErrorLike, itemIndex?: number) {
super(error.message);
this.itemIndex = itemIndex;
@ -32,10 +35,11 @@ export class ExecutionError extends ApplicationError {
if (stackRows.length === 0) {
this.message = 'Unknown error';
return;
}
const messageRow = stackRows.find((line) => line.includes('Error:'));
const lineNumberRow = stackRows.find((line) => line.includes('Code:'));
const lineNumberRow = stackRows.find((line) => line.includes(`at ${VM_WRAPPER_FN_NAME} `));
const lineNumberDisplay = this.toLineNumberDisplay(lineNumberRow);
if (!messageRow) {
@ -56,16 +60,22 @@ export class ExecutionError extends ApplicationError {
}
private toLineNumberDisplay(lineNumberRow?: string) {
const errorLineNumberMatch = lineNumberRow?.match(/Code:(?<lineNumber>\d+)/);
if (!lineNumberRow) return '';
// TODO: This doesn't work if there is a function definition in the code
// and the error is thrown from that function.
const regex = new RegExp(
`at ${VM_WRAPPER_FN_NAME} \\(evalmachine\\.<anonymous>:(?<lineNumber>\\d+):`,
);
const errorLineNumberMatch = lineNumberRow.match(regex);
if (!errorLineNumberMatch?.groups?.lineNumber) return null;
const lineNumber = errorLineNumberMatch.groups.lineNumber;
if (!lineNumber) return '';
this.lineNumber = Number(lineNumber);
if (!lineNumber) return '';
return this.itemIndex === undefined
? `[line ${lineNumber}]`
: `[line ${lineNumber}, for item ${this.itemIndex}]`;

View file

@ -0,0 +1,21 @@
/**
* Error that has its message property serialized as well. Used to transport
* errors over the wire.
*/
export abstract class SerializableError extends Error {
constructor(message: string) {
super(message);
// So it is serialized as well
this.makeMessageEnumerable();
}
private makeMessageEnumerable() {
Object.defineProperty(this, 'message', {
value: this.message,
enumerable: true, // This makes the message property enumerable
writable: true,
configurable: true,
});
}
}

View file

@ -1,6 +1,6 @@
import { ApplicationError } from 'n8n-workflow';
import { SerializableError } from './serializable-error';
export class ValidationError extends ApplicationError {
export class ValidationError extends SerializableError {
description = '';
itemIndex: number | undefined = undefined;

View file

@ -17,6 +17,7 @@ import type {
INodeParameters,
IRunExecutionData,
WorkflowExecuteMode,
EnvProviderState,
} from 'n8n-workflow';
import * as a from 'node:assert';
import { runInNewContext, type Context } from 'node:vm';
@ -24,6 +25,10 @@ import { runInNewContext, type Context } from 'node:vm';
import type { TaskResultData } from '@/runner-types';
import { type Task, TaskRunner } from '@/task-runner';
import { isErrorLike } from './errors/error-like';
import { ExecutionError } from './errors/execution-error';
import type { RequireResolver } from './require-resolver';
import { createRequireResolver } from './require-resolver';
import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation';
export interface JSExecSettings {
@ -63,6 +68,7 @@ export interface AllCodeTaskData {
connectionInputData: INodeExecutionData[];
siblingParameters: INodeParameters;
mode: WorkflowExecuteMode;
envProviderState?: EnvProviderState;
executeData?: IExecuteData;
defaultReturnRunIndex: number;
selfData: IDataObject;
@ -70,19 +76,47 @@ export interface AllCodeTaskData {
additionalData: PartialAdditionalData;
}
export interface JsTaskRunnerOpts {
wsUrl: string;
grantToken: string;
maxConcurrency: number;
name?: string;
/**
* List of built-in nodejs modules that are allowed to be required in the
* execution sandbox. Asterisk (*) can be used to allow all.
*/
allowedBuiltInModules?: string;
/**
* List of npm modules that are allowed to be required in the execution
* sandbox. Asterisk (*) can be used to allow all.
*/
allowedExternalModules?: string;
}
type CustomConsole = {
log: (...args: unknown[]) => void;
};
export class JsTaskRunner extends TaskRunner {
constructor(
taskType: string,
wsUrl: string,
grantToken: string,
maxConcurrency: number,
name?: string,
) {
super(taskType, wsUrl, grantToken, maxConcurrency, name ?? 'JS Task Runner');
private readonly requireResolver: RequireResolver;
constructor({
grantToken,
maxConcurrency,
wsUrl,
name = 'JS Task Runner',
allowedBuiltInModules,
allowedExternalModules,
}: JsTaskRunnerOpts) {
super('javascript', wsUrl, grantToken, maxConcurrency, name);
const parseModuleAllowList = (moduleList: string) =>
moduleList === '*' ? null : new Set(moduleList.split(',').map((x) => x.trim()));
this.requireResolver = createRequireResolver({
allowedBuiltInModules: parseModuleAllowList(allowedBuiltInModules ?? ''),
allowedExternalModules: parseModuleAllowList(allowedExternalModules ?? ''),
});
}
async executeTask(task: Task<JSExecSettings>): Promise<TaskResultData> {
@ -143,7 +177,7 @@ export class JsTaskRunner extends TaskRunner {
const inputItems = allData.connectionInputData;
const context: Context = {
require,
require: this.requireResolver,
module: {},
console: customConsole,
@ -154,7 +188,7 @@ export class JsTaskRunner extends TaskRunner {
try {
const result = (await runInNewContext(
`module.exports = async function() {${settings.code}\n}()`,
`module.exports = async function VmCodeWrapper() {${settings.code}\n}()`,
context,
)) as TaskResultData['result'];
@ -163,12 +197,14 @@ export class JsTaskRunner extends TaskRunner {
}
return validateRunForAllItemsOutput(result);
} catch (error) {
} catch (e) {
// Errors thrown by the VM are not instances of Error, so map them to an ExecutionError
const error = this.toExecutionErrorIfNeeded(e);
if (settings.continueOnFail) {
return [{ json: { error: this.getErrorMessageFromVmError(error) } }];
return [{ json: { error: error.message } }];
}
(error as Record<string, unknown>).node = allData.node;
throw error;
}
}
@ -190,7 +226,7 @@ export class JsTaskRunner extends TaskRunner {
const item = inputItems[index];
const dataProxy = this.createDataProxy(allData, workflow, index);
const context: Context = {
require,
require: this.requireResolver,
module: {},
console: customConsole,
item,
@ -201,7 +237,7 @@ export class JsTaskRunner extends TaskRunner {
try {
let result = (await runInNewContext(
`module.exports = async function() {${settings.code}\n}()`,
`module.exports = async function VmCodeWrapper() {${settings.code}\n}()`,
context,
)) as INodeExecutionData | undefined;
@ -225,14 +261,16 @@ export class JsTaskRunner extends TaskRunner {
},
);
}
} catch (error) {
} catch (e) {
// Errors thrown by the VM are not instances of Error, so map them to an ExecutionError
const error = this.toExecutionErrorIfNeeded(e);
if (!settings.continueOnFail) {
(error as Record<string, unknown>).node = allData.node;
throw error;
}
returnData.push({
json: { error: this.getErrorMessageFromVmError(error) },
json: { error: error.message },
pairedItem: {
item: index,
},
@ -262,14 +300,25 @@ export class JsTaskRunner extends TaskRunner {
allData.defaultReturnRunIndex,
allData.selfData,
allData.contextNodeName,
// Make sure that even if we don't receive the envProviderState for
// whatever reason, we don't expose the task runner's env to the code
allData.envProviderState ?? {
env: {},
isEnvAccessBlocked: false,
isProcessAvailable: true,
},
).getDataProxy();
}
private getErrorMessageFromVmError(error: unknown): string {
if (typeof error === 'object' && !!error && 'message' in error) {
return error.message as string;
private toExecutionErrorIfNeeded(error: unknown): Error {
if (error instanceof Error) {
return error;
}
return JSON.stringify(error);
if (isErrorLike(error)) {
return new ExecutionError(error);
}
return new ExecutionError({ message: JSON.stringify(error) });
}
}

View file

@ -0,0 +1,43 @@
import { ApplicationError } from 'n8n-workflow';
import { isBuiltin } from 'node:module';
import { ExecutionError } from './errors/execution-error';
export type RequireResolverOpts = {
/**
* List of built-in nodejs modules that are allowed to be required in the
* execution sandbox. `null` means all are allowed.
*/
allowedBuiltInModules: Set<string> | null;
/**
* List of external modules that are allowed to be required in the
* execution sandbox. `null` means all are allowed.
*/
allowedExternalModules: Set<string> | null;
};
export type RequireResolver = (request: string) => unknown;
export function createRequireResolver({
allowedBuiltInModules,
allowedExternalModules,
}: RequireResolverOpts) {
return (request: string) => {
const checkIsAllowed = (allowList: Set<string> | null, moduleName: string) => {
return allowList ? allowList.has(moduleName) : true;
};
const isAllowed = isBuiltin(request)
? checkIsAllowed(allowedBuiltInModules, request)
: checkIsAllowed(allowedExternalModules, request);
if (!isAllowed) {
const error = new ApplicationError(`Cannot find module '${request}'`);
throw new ExecutionError(error);
}
// eslint-disable-next-line @typescript-eslint/no-var-requires
return require(request) as unknown;
};
}

View file

@ -66,7 +66,13 @@ void (async function start() {
}
const wsUrl = `ws://${config.n8nUri}/runners/_ws`;
runner = new JsTaskRunner('javascript', wsUrl, grantToken, 5);
runner = new JsTaskRunner({
wsUrl,
grantToken,
maxConcurrency: 5,
allowedBuiltInModules: process.env.NODE_FUNCTION_ALLOW_BUILTIN,
allowedExternalModules: process.env.NODE_FUNCTION_ALLOW_EXTERNAL,
});
process.on('SIGINT', createSignalHandler('SIGINT'));
process.on('SIGTERM', createSignalHandler('SIGTERM'));

View file

@ -1,4 +1,4 @@
import { ApplicationError, ensureError } from 'n8n-workflow';
import { ApplicationError } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { URL } from 'node:url';
import { type MessageEvent, WebSocket } from 'ws';
@ -256,8 +256,7 @@ export abstract class TaskRunner {
try {
const data = await this.executeTask(task);
this.taskDone(taskId, data);
} catch (e) {
const error = ensureError(e);
} catch (error) {
this.taskErrored(taskId, error);
}
}

View file

@ -1,6 +1,6 @@
{
"name": "n8n",
"version": "1.62.1",
"version": "1.63.0",
"description": "n8n Workflow Automation Tool",
"main": "dist/index",
"types": "dist/index.d.ts",
@ -51,12 +51,12 @@
"!dist/**/e2e.*"
],
"devDependencies": {
"@redocly/cli": "^1.6.0",
"@redocly/cli": "^1.25.5",
"@types/aws4": "^1.5.1",
"@types/bcryptjs": "^2.4.2",
"@types/compression": "1.0.1",
"@types/convict": "^6.1.1",
"@types/cookie-parser": "^1.4.2",
"@types/cookie-parser": "^1.4.7",
"@types/express": "catalog:",
"@types/flat": "^5.0.5",
"@types/formidable": "^3.4.5",
@ -76,7 +76,6 @@
"@types/xml2js": "catalog:",
"@types/yamljs": "^0.2.31",
"@vvo/tzdb": "^6.141.0",
"chokidar": "^3.5.2",
"concurrently": "^8.2.0",
"ioredis-mock": "^8.8.1",
"mjml": "^4.15.3",
@ -94,7 +93,7 @@
"@n8n/permissions": "workspace:*",
"@n8n/task-runner": "workspace:*",
"@n8n/typeorm": "0.3.20-12",
"@n8n_io/ai-assistant-sdk": "1.9.4",
"@n8n_io/ai-assistant-sdk": "1.10.3",
"@n8n_io/license-sdk": "2.13.1",
"@oclif/core": "4.0.7",
"@rudderstack/rudder-sdk-node": "2.0.9",
@ -111,14 +110,14 @@
"class-validator": "0.14.0",
"compression": "1.7.4",
"convict": "6.2.4",
"cookie-parser": "1.4.6",
"cookie-parser": "1.4.7",
"csrf": "3.1.0",
"curlconverter": "3.21.0",
"dotenv": "8.6.0",
"express": "4.21.0",
"express": "4.21.1",
"express-async-errors": "3.1.1",
"express-handlebars": "7.1.2",
"express-openapi-validator": "5.3.3",
"express-openapi-validator": "5.3.7",
"express-prom-bundle": "6.6.0",
"express-rate-limit": "7.2.0",
"fast-glob": "catalog:",
@ -145,7 +144,7 @@
"nodemailer": "6.9.9",
"oauth-1.0a": "2.2.6",
"open": "7.4.2",
"openapi-types": "10.0.0",
"openapi-types": "12.1.3",
"otpauth": "9.1.1",
"p-cancelable": "2.1.1",
"p-lazy": "3.1.0",
@ -164,9 +163,8 @@
"simple-git": "3.17.0",
"source-map-support": "0.5.21",
"sqlite3": "5.1.7",
"sse-channel": "4.0.0",
"sshpk": "1.17.0",
"swagger-ui-express": "5.0.0",
"swagger-ui-express": "5.0.1",
"syslog-client": "1.1.1",
"typedi": "catalog:",
"uuid": "catalog:",

View file

@ -5,7 +5,7 @@ import type { InstanceSettings } from 'n8n-core';
import config from '@/config';
import { N8N_VERSION } from '@/constants';
import { License } from '@/license';
import type { Logger } from '@/logging/logger.service';
import { mockLogger } from '@test/mocking';
jest.mock('@n8n_io/license-sdk');
@ -25,37 +25,39 @@ describe('License', () => {
});
let license: License;
const logger = mock<Logger>();
const instanceSettings = mock<InstanceSettings>({
instanceId: MOCK_INSTANCE_ID,
instanceType: 'main',
});
beforeEach(async () => {
license = new License(logger, instanceSettings, mock(), mock(), mock());
license = new License(mockLogger(), instanceSettings, mock(), mock(), mock());
await license.init();
});
test('initializes license manager', async () => {
expect(LicenseManager).toHaveBeenCalledWith({
autoRenewEnabled: true,
autoRenewOffset: MOCK_RENEW_OFFSET,
offlineMode: false,
renewOnInit: true,
deviceFingerprint: expect.any(Function),
productIdentifier: `n8n-${N8N_VERSION}`,
logger,
loadCertStr: expect.any(Function),
saveCertStr: expect.any(Function),
onFeatureChange: expect.any(Function),
collectUsageMetrics: expect.any(Function),
collectPassthroughData: expect.any(Function),
server: MOCK_SERVER_URL,
tenantId: 1,
});
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({
autoRenewEnabled: true,
autoRenewOffset: MOCK_RENEW_OFFSET,
offlineMode: false,
renewOnInit: true,
deviceFingerprint: expect.any(Function),
productIdentifier: `n8n-${N8N_VERSION}`,
loadCertStr: expect.any(Function),
saveCertStr: expect.any(Function),
onFeatureChange: expect.any(Function),
collectUsageMetrics: expect.any(Function),
collectPassthroughData: expect.any(Function),
server: MOCK_SERVER_URL,
tenantId: 1,
}),
);
});
test('initializes license manager for worker', async () => {
const logger = mockLogger();
license = new License(
logger,
mock<InstanceSettings>({ instanceType: 'worker' }),
@ -64,22 +66,23 @@ describe('License', () => {
mock(),
);
await license.init();
expect(LicenseManager).toHaveBeenCalledWith({
autoRenewEnabled: false,
autoRenewOffset: MOCK_RENEW_OFFSET,
offlineMode: true,
renewOnInit: false,
deviceFingerprint: expect.any(Function),
productIdentifier: `n8n-${N8N_VERSION}`,
logger,
loadCertStr: expect.any(Function),
saveCertStr: expect.any(Function),
onFeatureChange: expect.any(Function),
collectUsageMetrics: expect.any(Function),
collectPassthroughData: expect.any(Function),
server: MOCK_SERVER_URL,
tenantId: 1,
});
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({
autoRenewEnabled: false,
autoRenewOffset: MOCK_RENEW_OFFSET,
offlineMode: true,
renewOnInit: false,
deviceFingerprint: expect.any(Function),
productIdentifier: `n8n-${N8N_VERSION}`,
loadCertStr: expect.any(Function),
saveCertStr: expect.any(Function),
onFeatureChange: expect.any(Function),
collectUsageMetrics: expect.any(Function),
collectPassthroughData: expect.any(Function),
server: MOCK_SERVER_URL,
tenantId: 1,
}),
);
});
test('attempts to activate license with provided key', async () => {
@ -196,7 +199,7 @@ describe('License', () => {
it('should enable renewal', async () => {
config.set('multiMainSetup.enabled', false);
await new License(mock(), mock(), mock(), mock(), mock()).init();
await new License(mockLogger(), mock(), mock(), mock(), mock()).init();
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: true, renewOnInit: true }),
@ -208,7 +211,7 @@ describe('License', () => {
it('should disable renewal', async () => {
config.set('license.autoRenewEnabled', false);
await new License(mock(), mock(), mock(), mock(), mock()).init();
await new License(mockLogger(), mock(), mock(), mock(), mock()).init();
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }),
@ -226,7 +229,7 @@ describe('License', () => {
config.set('multiMainSetup.instanceType', status);
config.set('license.autoRenewEnabled', false);
await new License(mock(), mock(), mock(), mock(), mock()).init();
await new License(mockLogger(), mock(), mock(), mock(), mock()).init();
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }),
@ -241,7 +244,7 @@ describe('License', () => {
config.set('multiMainSetup.instanceType', status);
config.set('license.autoRenewEnabled', false);
await new License(mock(), mock(), mock(), mock(), mock()).init();
await new License(mockLogger(), mock(), mock(), mock(), mock()).init();
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: false, renewOnInit: false }),
@ -252,7 +255,7 @@ describe('License', () => {
config.set('multiMainSetup.enabled', true);
config.set('multiMainSetup.instanceType', 'leader');
await new License(mock(), mock(), mock(), mock(), mock()).init();
await new License(mockLogger(), mock(), mock(), mock(), mock()).init();
expect(LicenseManager).toHaveBeenCalledWith(
expect.objectContaining({ autoRenewEnabled: true, renewOnInit: true }),
@ -264,7 +267,7 @@ describe('License', () => {
describe('reinit', () => {
it('should reinitialize license manager', async () => {
const license = new License(mock(), mock(), mock(), mock(), mock());
const license = new License(mockLogger(), mock(), mock(), mock(), mock());
await license.init();
const initSpy = jest.spyOn(license, 'init');

View file

@ -0,0 +1,100 @@
import { mock } from 'jest-mock-extended';
import type { INodeType, IVersionedNodeType } from 'n8n-workflow';
import type { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
import { NodeTypes } from '../node-types';
describe('NodeTypes', () => {
let nodeTypes: NodeTypes;
const loadNodesAndCredentials = mock<LoadNodesAndCredentials>();
beforeEach(() => {
jest.clearAllMocks();
nodeTypes = new NodeTypes(loadNodesAndCredentials);
});
describe('getByNameAndVersion', () => {
const nodeTypeName = 'n8n-nodes-base.testNode';
it('should throw an error if the node-type does not exist', () => {
const nodeTypeName = 'unknownNode';
// @ts-expect-error overwriting a readonly property
loadNodesAndCredentials.loadedNodes = {};
// @ts-expect-error overwriting a readonly property
loadNodesAndCredentials.knownNodes = {};
expect(() => nodeTypes.getByNameAndVersion(nodeTypeName)).toThrow(
'Unrecognized node type: unknownNode',
);
});
it('should return a regular node-type without version', () => {
const nodeType = mock<INodeType>();
// @ts-expect-error overwriting a readonly property
loadNodesAndCredentials.loadedNodes = {
[nodeTypeName]: { type: nodeType },
};
const result = nodeTypes.getByNameAndVersion(nodeTypeName);
expect(result).toEqual(nodeType);
});
it('should return a regular node-type with version', () => {
const nodeTypeV1 = mock<INodeType>();
const nodeType = mock<IVersionedNodeType>({
nodeVersions: { 1: nodeTypeV1 },
getNodeType: () => nodeTypeV1,
});
// @ts-expect-error overwriting a readonly property
loadNodesAndCredentials.loadedNodes = {
[nodeTypeName]: { type: nodeType },
};
const result = nodeTypes.getByNameAndVersion(nodeTypeName);
expect(result).toEqual(nodeTypeV1);
});
it('should throw when a node-type is requested as tool, but does not support being used as one', () => {
const nodeType = mock<INodeType>();
// @ts-expect-error overwriting a readonly property
loadNodesAndCredentials.loadedNodes = {
[nodeTypeName]: { type: nodeType },
};
expect(() => nodeTypes.getByNameAndVersion(`${nodeTypeName}Tool`)).toThrow(
'Node cannot be used as a tool',
);
});
it('should return the tool node-type when requested as tool', () => {
const nodeType = mock<INodeType>();
// @ts-expect-error can't use a mock here
nodeType.description = {
name: nodeTypeName,
displayName: 'TestNode',
usableAsTool: true,
properties: [],
};
// @ts-expect-error overwriting a readonly property
loadNodesAndCredentials.loadedNodes = {
[nodeTypeName]: { type: nodeType },
};
const result = nodeTypes.getByNameAndVersion(`${nodeTypeName}Tool`);
expect(result).not.toEqual(nodeType);
expect(result.description.name).toEqual('n8n-nodes-base.testNodeTool');
expect(result.description.displayName).toEqual('TestNode Tool');
expect(result.description.codex?.categories).toContain('AI');
expect(result.description.inputs).toEqual([]);
expect(result.description.outputs).toEqual(['ai_tool']);
});
});
});

View file

@ -1,10 +1,12 @@
import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutionResponse } from '@/interfaces';
import type { MultiMainSetup } from '@/services/orchestration/main/multi-main-setup.ee';
import { OrchestrationService } from '@/services/orchestration.service';
import { WaitTracker } from '@/wait-tracker';
import { mockLogger } from '@test/mocking';
jest.useFakeTimers();
@ -12,6 +14,7 @@ describe('WaitTracker', () => {
const executionRepository = mock<ExecutionRepository>();
const multiMainSetup = mock<MultiMainSetup>();
const orchestrationService = new OrchestrationService(mock(), mock(), multiMainSetup);
const instanceSettings = mock<InstanceSettings>({ isLeader: true });
const execution = mock<IExecutionResponse>({
id: '123',
@ -21,11 +24,12 @@ describe('WaitTracker', () => {
let waitTracker: WaitTracker;
beforeEach(() => {
waitTracker = new WaitTracker(
mock(),
mockLogger(),
executionRepository,
mock(),
mock(),
orchestrationService,
instanceSettings,
);
multiMainSetup.on.mockReturnThis();
});
@ -36,7 +40,6 @@ describe('WaitTracker', () => {
describe('init()', () => {
it('should query DB for waiting executions if leader', async () => {
jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(true);
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
waitTracker.init();
@ -119,7 +122,6 @@ describe('WaitTracker', () => {
describe('multi-main setup', () => {
it('should start tracking if leader', () => {
jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(true);
jest.spyOn(orchestrationService, 'isSingleMainSetup', 'get').mockReturnValue(false);
executionRepository.getWaitingExecutions.mockResolvedValue([]);
@ -130,7 +132,14 @@ describe('WaitTracker', () => {
});
it('should not start tracking if follower', () => {
jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(false);
const waitTracker = new WaitTracker(
mockLogger(),
executionRepository,
mock(),
mock(),
orchestrationService,
mock<InstanceSettings>({ isLeader: false }),
);
jest.spyOn(orchestrationService, 'isSingleMainSetup', 'get').mockReturnValue(false);
executionRepository.getWaitingExecutions.mockResolvedValue([]);

View file

@ -5,7 +5,6 @@ import { engine as expressHandlebars } from 'express-handlebars';
import { readFile } from 'fs/promises';
import type { Server } from 'http';
import isbot from 'isbot';
import type { InstanceType } from 'n8n-core';
import { Container, Service } from 'typedi';
import config from '@/config';
@ -22,7 +21,6 @@ import { TestWebhooks } from '@/webhooks/test-webhooks';
import { WaitingWebhooks } from '@/webhooks/waiting-webhooks';
import { createWebhookHandlerFor } from '@/webhooks/webhook-request-handler';
import { generateHostInstanceId } from './databases/utils/generators';
import { ServiceUnavailableError } from './errors/response-errors/service-unavailable.error';
@Service()
@ -61,7 +59,7 @@ export abstract class AbstractServer {
readonly uniqueInstanceId: string;
constructor(instanceType: Exclude<InstanceType, 'worker'>) {
constructor() {
this.app = express();
this.app.disable('x-powered-by');
@ -85,8 +83,6 @@ export abstract class AbstractServer {
this.endpointWebhookTest = this.globalConfig.endpoints.webhookTest;
this.endpointWebhookWaiting = this.globalConfig.endpoints.webhookWaiting;
this.uniqueInstanceId = generateHostInstanceId(instanceType);
this.logger = Container.get(Logger);
}

View file

@ -1,6 +1,6 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
import { ActiveWorkflows, InstanceSettings, NodeExecuteFunctions } from 'n8n-core';
import type {
ExecutionError,
IDeferredPromise,
@ -74,6 +74,7 @@ export class ActiveWorkflowManager {
private readonly workflowStaticDataService: WorkflowStaticDataService,
private readonly activeWorkflowsService: ActiveWorkflowsService,
private readonly workflowExecutionService: WorkflowExecutionService,
private readonly instanceSettings: InstanceSettings,
) {}
async init() {
@ -423,7 +424,7 @@ export class ActiveWorkflowManager {
if (dbWorkflows.length === 0) return;
if (this.orchestrationService.isLeader) {
if (this.instanceSettings.isLeader) {
this.logger.info(' ================================');
this.logger.info(' Start Active Workflows:');
this.logger.info(' ================================');

View file

@ -1,16 +1,26 @@
import 'reflect-metadata';
import { GlobalConfig } from '@n8n/config';
import { Command, Errors } from '@oclif/core';
import { BinaryDataService, InstanceSettings, ObjectStoreService } from 'n8n-core';
import { ApplicationError, ErrorReporterProxy as ErrorReporter, sleep } from 'n8n-workflow';
import {
BinaryDataService,
InstanceSettings,
ObjectStoreService,
DataDeduplicationService,
} from 'n8n-core';
import {
ApplicationError,
ensureError,
ErrorReporterProxy as ErrorReporter,
sleep,
} from 'n8n-workflow';
import { Container } from 'typedi';
import type { AbstractServer } from '@/abstract-server';
import config from '@/config';
import { LICENSE_FEATURES, inDevelopment, inTest } from '@/constants';
import * as CrashJournal from '@/crash-journal';
import { generateHostInstanceId } from '@/databases/utils/generators';
import * as Db from '@/db';
import { getDataDeduplicationService } from '@/deduplication';
import { initErrorHandling } from '@/error-reporting';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay';
@ -34,8 +44,6 @@ export abstract class BaseCommand extends Command {
protected instanceSettings: InstanceSettings = Container.get(InstanceSettings);
queueModeId: string;
protected server?: AbstractServer;
protected shutdownService: ShutdownService = Container.get(ShutdownService);
@ -122,16 +130,6 @@ export abstract class BaseCommand extends Command {
await Container.get(TelemetryEventRelay).init();
}
protected setInstanceQueueModeId() {
if (config.get('redis.queueModeId')) {
this.queueModeId = config.get('redis.queueModeId');
return;
}
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
this.queueModeId = generateHostInstanceId(this.instanceSettings.instanceType!);
config.set('redis.queueModeId', this.queueModeId);
}
protected async stopProcess() {
// This needs to be overridden
}
@ -261,6 +259,11 @@ export abstract class BaseCommand extends Command {
await Container.get(BinaryDataService).init(binaryDataConfig);
}
protected async initDataDeduplicationService() {
const dataDeduplicationService = getDataDeduplicationService();
await DataDeduplicationService.init(dataDeduplicationService);
}
async initExternalHooks() {
this.externalHooks = Container.get(ExternalHooks);
await this.externalHooks.init();
@ -283,8 +286,9 @@ export abstract class BaseCommand extends Command {
this.logger.debug('Attempting license activation');
await this.license.activate(activationKey);
this.logger.debug('License init complete');
} catch (e) {
this.logger.error('Could not activate license', e as Error);
} catch (e: unknown) {
const error = ensureError(e);
this.logger.error('Could not activate license', { error });
}
}
}

View file

@ -167,6 +167,7 @@ export class ExecuteBatch extends BaseCommand {
async init() {
await super.init();
await this.initBinaryDataService();
await this.initDataDeduplicationService();
await this.initExternalHooks();
}

View file

@ -31,6 +31,7 @@ export class Execute extends BaseCommand {
async init() {
await super.init();
await this.initBinaryDataService();
await this.initDataDeduplicationService();
await this.initExternalHooks();
}

View file

@ -1,6 +1,6 @@
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import { Flags, type Config } from '@oclif/core';
import { Flags } from '@oclif/core';
import glob from 'fast-glob';
import { createReadStream, createWriteStream, existsSync } from 'fs';
import { mkdir } from 'fs/promises';
@ -21,11 +21,11 @@ import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'
import { EventService } from '@/events/event.service';
import { ExecutionService } from '@/executions/execution.service';
import { License } from '@/license';
import { SingleMainTaskManager } from '@/runners/task-managers/single-main-task-manager';
import { LocalTaskManager } from '@/runners/task-managers/local-task-manager';
import { TaskManager } from '@/runners/task-managers/task-manager';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { Server } from '@/server';
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
import { OrchestrationService } from '@/services/orchestration.service';
import { OwnershipService } from '@/services/ownership.service';
import { PruningService } from '@/services/pruning.service';
@ -70,11 +70,6 @@ export class Start extends BaseCommand {
override needsCommunityPackages = true;
constructor(argv: string[], cmdConfig: Config) {
super(argv, cmdConfig);
this.setInstanceQueueModeId();
}
/**
* Opens the UI in browser
*/
@ -174,8 +169,9 @@ export class Start extends BaseCommand {
this.logger.info('Initializing n8n process');
if (config.getEnv('executions.mode') === 'queue') {
this.logger.debug('Main Instance running in queue mode');
this.logger.debug(`Queue mode id: ${this.queueModeId}`);
const scopedLogger = this.logger.withScope('scaling');
scopedLogger.debug('Starting main instance in scaling mode');
scopedLogger.debug(`Host ID: ${this.instanceSettings.hostId}`);
}
const { flags } = await this.parse(Start);
@ -212,6 +208,8 @@ export class Start extends BaseCommand {
this.logger.debug('Wait tracker init complete');
await this.initBinaryDataService();
this.logger.debug('Binary data service init complete');
await this.initDataDeduplicationService();
this.logger.debug('Data deduplication service init complete');
await this.initExternalHooks();
this.logger.debug('External hooks init complete');
await this.initExternalSecrets();
@ -224,7 +222,7 @@ export class Start extends BaseCommand {
}
if (!this.globalConfig.taskRunners.disabled) {
Container.set(TaskManager, new SingleMainTaskManager());
Container.set(TaskManager, new LocalTaskManager());
const { TaskRunnerServer } = await import('@/runners/task-runner-server');
const taskRunnerServer = Container.get(TaskRunnerServer);
await taskRunnerServer.start();
@ -252,10 +250,13 @@ export class Start extends BaseCommand {
await orchestrationService.init();
await Container.get(OrchestrationHandlerMainService).initWithOptions({
queueModeId: this.queueModeId,
publisher: Container.get(Publisher),
});
Container.get(PubSubHandler).init();
const subscriber = Container.get(Subscriber);
await subscriber.subscribe('n8n.commands');
await subscriber.subscribe('n8n.worker-response');
this.logger.withScope('scaling').debug('Pubsub setup completed');
if (!orchestrationService.isMultiMainSetupEnabled) return;

View file

@ -1,4 +1,4 @@
import { Flags, type Config } from '@oclif/core';
import { Flags } from '@oclif/core';
import { ApplicationError } from 'n8n-workflow';
import { Container } from 'typedi';
@ -6,7 +6,7 @@ import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { OrchestrationWebhookService } from '@/services/orchestration/webhook/orchestration.webhook.service';
import { OrchestrationService } from '@/services/orchestration.service';
import { WebhookServer } from '@/webhooks/webhook-server';
import { BaseCommand } from './base-command';
@ -24,14 +24,6 @@ export class Webhook extends BaseCommand {
override needsCommunityPackages = true;
constructor(argv: string[], cmdConfig: Config) {
super(argv, cmdConfig);
if (this.queueModeId) {
this.logger.debug(`Webhook Instance queue mode id: ${this.queueModeId}`);
}
this.setInstanceQueueModeId();
}
/**
* Stops n8n in a graceful way.
* Make for example sure that all the webhooks from third party services
@ -71,8 +63,8 @@ export class Webhook extends BaseCommand {
await this.initCrashJournal();
this.logger.debug('Crash journal initialized');
this.logger.info('Initializing n8n webhook process');
this.logger.debug(`Queue mode id: ${this.queueModeId}`);
this.logger.info('Starting n8n webhook process...');
this.logger.debug(`Host ID: ${this.instanceSettings.hostId}`);
await super.init();
@ -82,6 +74,8 @@ export class Webhook extends BaseCommand {
this.logger.debug('Orchestration init complete');
await this.initBinaryDataService();
this.logger.debug('Binary data service init complete');
await this.initDataDeduplicationService();
this.logger.debug('Data deduplication service init complete');
await this.initExternalHooks();
this.logger.debug('External hooks init complete');
await this.initExternalSecrets();
@ -98,7 +92,6 @@ export class Webhook extends BaseCommand {
const { ScalingService } = await import('@/scaling/scaling.service');
await Container.get(ScalingService).setupQueue();
await this.server.start();
this.logger.debug(`Webhook listener ID: ${this.server.uniqueInstanceId}`);
this.logger.info('Webhook listener waiting for requests.');
// Make sure that the process does not close
@ -110,11 +103,9 @@ export class Webhook extends BaseCommand {
}
async initOrchestration() {
await Container.get(OrchestrationWebhookService).init();
await Container.get(OrchestrationService).init();
Container.get(PubSubHandler).init();
const subscriber = Container.get(Subscriber);
await subscriber.subscribe('n8n.commands');
subscriber.setCommandMessageHandler();
await Container.get(Subscriber).subscribe('n8n.commands');
}
}

View file

@ -1,18 +1,20 @@
import { Flags, type Config } from '@oclif/core';
import { ApplicationError } from 'n8n-workflow';
import { Container } from 'typedi';
import config from '@/config';
import { N8N_VERSION, inTest } from '@/constants';
import { WorkerMissingEncryptionKey } from '@/errors/worker-missing-encryption-key.error';
import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-message-generic';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay';
import { JobProcessor } from '@/scaling/job-processor';
import { Logger } from '@/logging/logger.service';
import { LocalTaskManager } from '@/runners/task-managers/local-task-manager';
import { TaskManager } from '@/runners/task-managers/task-manager';
import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import type { ScalingService } from '@/scaling/scaling.service';
import type { WorkerServerEndpointsConfig } from '@/scaling/worker-server';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
import { OrchestrationService } from '@/services/orchestration.service';
import { BaseCommand } from './base-command';
@ -39,8 +41,6 @@ export class Worker extends BaseCommand {
scalingService: ScalingService;
jobProcessor: JobProcessor;
override needsCommunityPackages = true;
/**
@ -49,27 +49,27 @@ export class Worker extends BaseCommand {
* get removed.
*/
async stopProcess() {
this.logger.info('Stopping n8n...');
this.logger.info('Stopping worker...');
try {
await this.externalHooks?.run('n8n.stop', []);
} catch (error) {
await this.exitWithCrash('There was an error shutting down n8n.', error);
await this.exitWithCrash('Error shutting down worker', error);
}
await this.exitSuccessFully();
}
constructor(argv: string[], cmdConfig: Config) {
super(argv, cmdConfig);
if (!process.env.N8N_ENCRYPTION_KEY) throw new WorkerMissingEncryptionKey();
if (!process.env.N8N_ENCRYPTION_KEY) {
throw new ApplicationError(
'Missing encryption key. Worker started without the required N8N_ENCRYPTION_KEY env var. More information: https://docs.n8n.io/hosting/configuration/configuration-examples/encryption-key/',
);
if (config.getEnv('executions.mode') !== 'queue') {
config.set('executions.mode', 'queue');
}
this.setInstanceQueueModeId();
super(argv, cmdConfig);
this.logger = Container.get(Logger).withScope('scaling');
}
async init() {
@ -84,7 +84,7 @@ export class Worker extends BaseCommand {
await this.initCrashJournal();
this.logger.debug('Starting n8n worker...');
this.logger.debug(`Queue mode id: ${this.queueModeId}`);
this.logger.debug(`Host ID: ${this.instanceSettings.hostId}`);
await this.setConcurrency();
await super.init();
@ -93,6 +93,8 @@ export class Worker extends BaseCommand {
this.logger.debug('License init complete');
await this.initBinaryDataService();
this.logger.debug('Binary data service init complete');
await this.initDataDeduplicationService();
this.logger.debug('Data deduplication service init complete');
await this.initExternalHooks();
this.logger.debug('External hooks init complete');
await this.initExternalSecrets();
@ -107,15 +109,26 @@ export class Worker extends BaseCommand {
new EventMessageGeneric({
eventName: 'n8n.worker.started',
payload: {
workerId: this.queueModeId,
workerId: this.instanceSettings.hostId,
},
}),
);
if (!this.globalConfig.taskRunners.disabled) {
Container.set(TaskManager, new LocalTaskManager());
const { TaskRunnerServer } = await import('@/runners/task-runner-server');
const taskRunnerServer = Container.get(TaskRunnerServer);
await taskRunnerServer.start();
const { TaskRunnerProcess } = await import('@/runners/task-runner-process');
const runnerProcess = Container.get(TaskRunnerProcess);
await runnerProcess.start();
}
}
async initEventBus() {
await Container.get(MessageEventBus).initialize({
workerId: this.queueModeId,
workerId: this.instanceSettings.hostId,
});
Container.get(LogStreamingEventRelay).init();
}
@ -127,12 +140,12 @@ export class Worker extends BaseCommand {
* The subscription connection adds a handler to handle the command messages
*/
async initOrchestration() {
await Container.get(OrchestrationWorkerService).init();
await Container.get(OrchestrationService).init();
Container.get(PubSubHandler).init();
const subscriber = Container.get(Subscriber);
await subscriber.subscribe('n8n.commands');
subscriber.setCommandMessageHandler();
await Container.get(Subscriber).subscribe('n8n.commands');
this.logger.withScope('scaling').debug('Pubsub setup ready');
}
async setConcurrency() {
@ -150,8 +163,6 @@ export class Worker extends BaseCommand {
await this.scalingService.setupQueue();
this.scalingService.setupWorker(this.concurrency);
this.jobProcessor = Container.get(JobProcessor);
}
async run() {

View file

@ -11,13 +11,13 @@ import type { ExecutionRepository } from '@/databases/repositories/execution.rep
import { InvalidConcurrencyLimitError } from '@/errors/invalid-concurrency-limit.error';
import type { EventService } from '@/events/event.service';
import type { IExecutingWorkflowData } from '@/interfaces';
import type { Logger } from '@/logging/logger.service';
import type { Telemetry } from '@/telemetry';
import { mockLogger } from '@test/mocking';
import { ConcurrencyQueue } from '../concurrency-queue';
describe('ConcurrencyControlService', () => {
const logger = mock<Logger>();
const logger = mockLogger();
const executionRepository = mock<ExecutionRepository>();
const telemetry = mock<Telemetry>();
const eventService = mock<EventService>();

View file

@ -8,7 +8,6 @@ import { UnknownExecutionModeError } from '@/errors/unknown-execution-mode.error
import { EventService } from '@/events/event.service';
import type { IExecutingWorkflowData } from '@/interfaces';
import { Logger } from '@/logging/logger.service';
import type { LogMetadata } from '@/logging/types';
import { Telemetry } from '@/telemetry';
import { ConcurrencyQueue } from './concurrency-queue';
@ -34,6 +33,8 @@ export class ConcurrencyControlService {
private readonly telemetry: Telemetry,
private readonly eventService: EventService,
) {
this.logger = this.logger.withScope('executions');
this.productionLimit = config.getEnv('executions.concurrency.productionLimit');
if (this.productionLimit === 0) {
@ -46,7 +47,6 @@ export class ConcurrencyControlService {
if (this.productionLimit === -1 || config.getEnv('executions.mode') === 'queue') {
this.isEnabled = false;
this.log('Service disabled');
return;
}
@ -65,12 +65,12 @@ export class ConcurrencyControlService {
});
this.productionQueue.on('execution-throttled', ({ executionId }) => {
this.log('Execution throttled', { executionId });
this.logger.debug('Execution throttled', { executionId });
this.eventService.emit('execution-throttled', { executionId });
});
this.productionQueue.on('execution-released', async (executionId) => {
this.log('Execution released', { executionId });
this.logger.debug('Execution released', { executionId });
});
}
@ -144,9 +144,9 @@ export class ConcurrencyControlService {
// ----------------------------------
private logInit() {
this.log('Enabled');
this.logger.debug('Enabled');
this.log(
this.logger.debug(
[
'Production execution concurrency is',
this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.productionLimit.toString(),
@ -171,10 +171,6 @@ export class ConcurrencyControlService {
throw new UnknownExecutionModeError(mode);
}
private log(message: string, metadata?: LogMetadata) {
this.logger.debug(['[Concurrency Control]', message].join(' '), metadata);
}
private shouldReport(capacity: number) {
return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity);
}

View file

@ -491,11 +491,6 @@ export const schema = {
default: 'n8n',
env: 'N8N_REDIS_KEY_PREFIX',
},
queueModeId: {
doc: 'Unique ID for this n8n instance, is usually set automatically by n8n during startup',
format: String,
default: '',
},
},
/**

View file

@ -1,5 +1,6 @@
import type { RedisOptions } from 'ioredis';
import type { BinaryData } from 'n8n-core';
import type { IProcessedDataConfig } from 'n8n-workflow';
import type { schema } from './schema';
@ -76,6 +77,7 @@ type ToReturnType<T extends ConfigOptionPath> = T extends NumericPath
type ExceptionPaths = {
'queue.bull.redis': RedisOptions;
binaryDataManager: BinaryData.Config;
processedDataManager: IProcessedDataConfig;
'userManagement.isInstanceOwnerSetUp': boolean;
'ui.banners.dismissed': string[] | undefined;
};

View file

@ -91,6 +91,7 @@ export const LICENSE_FEATURES = {
PROJECT_ROLE_EDITOR: 'feat:projectRole:editor',
PROJECT_ROLE_VIEWER: 'feat:projectRole:viewer',
AI_ASSISTANT: 'feat:aiAssistant',
ASK_AI: 'feat:askAi',
COMMUNITY_NODES_CUSTOM_REGISTRY: 'feat:communityNodes:customRegistry',
} as const;

View file

@ -7,18 +7,18 @@ import { WritableStream } from 'node:stream/web';
import { Post, RestController } from '@/decorators';
import { InternalServerError } from '@/errors/response-errors/internal-server.error';
import { AiAssistantRequest } from '@/requests';
import { AiAssistantService } from '@/services/ai-assistant.service';
import { AiService } from '@/services/ai.service';
type FlushableResponse = Response & { flush: () => void };
@RestController('/ai-assistant')
export class AiAssistantController {
constructor(private readonly aiAssistantService: AiAssistantService) {}
@RestController('/ai')
export class AiController {
constructor(private readonly aiService: AiService) {}
@Post('/chat', { rateLimit: { limit: 100 } })
async chat(req: AiAssistantRequest.Chat, res: FlushableResponse) {
try {
const aiResponse = await this.aiAssistantService.chat(req.body, req.user);
const aiResponse = await this.aiService.chat(req.body, req.user);
if (aiResponse.body) {
res.header('Content-type', 'application/json-lines').flush();
await aiResponse.body.pipeTo(
@ -40,10 +40,21 @@ export class AiAssistantController {
@Post('/chat/apply-suggestion')
async applySuggestion(
req: AiAssistantRequest.ApplySuggestion,
req: AiAssistantRequest.ApplySuggestionPayload,
): Promise<AiAssistantSDK.ApplySuggestionResponse> {
try {
return await this.aiAssistantService.applySuggestion(req.body, req.user);
return await this.aiService.applySuggestion(req.body, req.user);
} catch (e) {
assert(e instanceof Error);
ErrorReporterProxy.error(e);
throw new InternalServerError(`Something went wrong: ${e.message}`);
}
}
@Post('/ask-ai')
async askAi(req: AiAssistantRequest.AskAiPayload): Promise<AiAssistantSDK.AskAiResponsePayload> {
try {
return await this.aiService.askAi(req.body, req.user);
} catch (e) {
assert(e instanceof Error);
ErrorReporterProxy.error(e);

View file

@ -1,3 +1,5 @@
import { InstanceSettings } from 'n8n-core';
import { ActiveWorkflowManager } from '@/active-workflow-manager';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { Get, RestController } from '@/decorators';
@ -9,6 +11,7 @@ export class DebugController {
private readonly orchestrationService: OrchestrationService,
private readonly activeWorkflowManager: ActiveWorkflowManager,
private readonly workflowRepository: WorkflowRepository,
private readonly instanceSettings: InstanceSettings,
) {}
@Get('/multi-main-setup', { skipAuth: true })
@ -24,9 +27,9 @@ export class DebugController {
const activationErrors = await this.activeWorkflowManager.getAllWorkflowActivationErrors();
return {
instanceId: this.orchestrationService.instanceId,
instanceId: this.instanceSettings.instanceId,
leaderKey,
isLeader: this.orchestrationService.isLeader,
isLeader: this.instanceSettings.isLeader,
activeWorkflows: {
webhooks, // webhook-based active workflows
triggersAndPollers, // poller- and trigger-based active workflows

View file

@ -92,6 +92,7 @@ export class E2EController {
[LICENSE_FEATURES.PROJECT_ROLE_VIEWER]: false,
[LICENSE_FEATURES.AI_ASSISTANT]: false,
[LICENSE_FEATURES.COMMUNITY_NODES_CUSTOM_REGISTRY]: false,
[LICENSE_FEATURES.ASK_AI]: false,
};
private numericFeatures: Record<NumericLicenseFeature, number> = {

View file

@ -28,11 +28,4 @@ export class OrchestrationController {
if (!this.licenseService.isWorkerViewLicensed()) return;
return await this.orchestrationService.getWorkerStatus();
}
@GlobalScope('orchestration:list')
@Post('/worker/ids')
async getWorkerIdsAll() {
if (!this.licenseService.isWorkerViewLicensed()) return;
return await this.orchestrationService.getWorkerIds();
}
}

View file

@ -13,6 +13,7 @@ import { ExecutionMetadata } from './execution-metadata';
import { InstalledNodes } from './installed-nodes';
import { InstalledPackages } from './installed-packages';
import { InvalidAuthToken } from './invalid-auth-token';
import { ProcessedData } from './processed-data';
import { Project } from './project';
import { ProjectRelation } from './project-relation';
import { Settings } from './settings';
@ -56,4 +57,5 @@ export const entities = {
Project,
ProjectRelation,
ApiKey,
ProcessedData,
};

View file

@ -0,0 +1,22 @@
import { Column, Entity, PrimaryColumn } from '@n8n/typeorm';
import type { IProcessedDataEntries, IProcessedDataLatest } from '@/interfaces';
import { jsonColumnType, WithTimestamps } from './abstract-entity';
import { objectRetriever } from '../utils/transformers';
@Entity()
export class ProcessedData extends WithTimestamps {
@PrimaryColumn('varchar')
context: string;
@PrimaryColumn()
workflowId: string;
@Column({
type: jsonColumnType,
nullable: true,
transformer: objectRetriever,
})
value: IProcessedDataEntries | IProcessedDataLatest;
}

View file

@ -0,0 +1,23 @@
import type { MigrationContext, ReversibleMigration } from '@/databases/types';
const processedDataTableName = 'processed_data';
export class CreateProcessedDataTable1726606152711 implements ReversibleMigration {
async up({ schemaBuilder: { createTable, column } }: MigrationContext) {
await createTable(processedDataTableName)
.withColumns(
column('workflowId').varchar(36).notNull.primary,
column('value').varchar(255).notNull,
column('context').varchar(255).notNull.primary,
)
.withForeignKey('workflowId', {
tableName: 'workflow_entity',
columnName: 'id',
onDelete: 'CASCADE',
}).withTimestamps;
}
async down({ schemaBuilder: { dropTable } }: MigrationContext) {
await dropTable(processedDataTableName);
}
}

View file

@ -64,6 +64,7 @@ import { CreateInvalidAuthTokenTable1723627610222 } from '../common/172362761022
import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices';
import { CreateAnnotationTables1724753530828 } from '../common/1724753530828-CreateExecutionAnnotationTables';
import { AddApiKeysTable1724951148974 } from '../common/1724951148974-AddApiKeysTable';
import { CreateProcessedDataTable1726606152711 } from '../common/1726606152711-CreateProcessedDataTable';
import { SeparateExecutionCreationFromStart1727427440136 } from '../common/1727427440136-SeparateExecutionCreationFromStart';
export const mysqlMigrations: Migration[] = [
@ -132,4 +133,5 @@ export const mysqlMigrations: Migration[] = [
CreateAnnotationTables1724753530828,
AddApiKeysTable1724951148974,
SeparateExecutionCreationFromStart1727427440136,
CreateProcessedDataTable1726606152711,
];

View file

@ -64,6 +64,7 @@ import { CreateInvalidAuthTokenTable1723627610222 } from '../common/172362761022
import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices';
import { CreateAnnotationTables1724753530828 } from '../common/1724753530828-CreateExecutionAnnotationTables';
import { AddApiKeysTable1724951148974 } from '../common/1724951148974-AddApiKeysTable';
import { CreateProcessedDataTable1726606152711 } from '../common/1726606152711-CreateProcessedDataTable';
import { SeparateExecutionCreationFromStart1727427440136 } from '../common/1727427440136-SeparateExecutionCreationFromStart';
export const postgresMigrations: Migration[] = [
@ -132,4 +133,5 @@ export const postgresMigrations: Migration[] = [
CreateAnnotationTables1724753530828,
AddApiKeysTable1724951148974,
SeparateExecutionCreationFromStart1727427440136,
CreateProcessedDataTable1726606152711,
];

View file

@ -61,6 +61,7 @@ import { AddConstraintToExecutionMetadata1720101653148 } from '../common/1720101
import { CreateInvalidAuthTokenTable1723627610222 } from '../common/1723627610222-CreateInvalidAuthTokenTable';
import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices';
import { CreateAnnotationTables1724753530828 } from '../common/1724753530828-CreateExecutionAnnotationTables';
import { CreateProcessedDataTable1726606152711 } from '../common/1726606152711-CreateProcessedDataTable';
import { SeparateExecutionCreationFromStart1727427440136 } from '../common/1727427440136-SeparateExecutionCreationFromStart';
const sqliteMigrations: Migration[] = [
@ -126,6 +127,7 @@ const sqliteMigrations: Migration[] = [
CreateAnnotationTables1724753530828,
AddApiKeysTable1724951148974,
SeparateExecutionCreationFromStart1727427440136,
CreateProcessedDataTable1726606152711,
];
export { sqliteMigrations };

View file

@ -12,7 +12,9 @@ import { mockInstance, mockEntityManager } from '@test/mocking';
describe('ExecutionRepository', () => {
const entityManager = mockEntityManager(ExecutionEntity);
const globalConfig = mockInstance(GlobalConfig, { logging: { outputs: ['console'] } });
const globalConfig = mockInstance(GlobalConfig, {
logging: { outputs: ['console'], scopes: [] },
});
const binaryDataService = mockInstance(BinaryDataService);
const executionRepository = Container.get(ExecutionRepository);
const mockDate = new Date('2023-12-28 12:34:56.789Z');

View file

@ -0,0 +1,11 @@
import { DataSource, Repository } from '@n8n/typeorm';
import { Service } from 'typedi';
import { ProcessedData } from '../entities/processed-data';
@Service()
export class ProcessedDataRepository extends Repository<ProcessedData> {
constructor(dataSource: DataSource) {
super(ProcessedData, dataSource.manager);
}
}

View file

@ -0,0 +1,356 @@
import { createHash } from 'crypto';
import {
type ICheckProcessedContextData,
type IDataDeduplicator,
type ICheckProcessedOptions,
type IDeduplicationOutput,
type DeduplicationScope,
type DeduplicationItemTypes,
type DeduplicationMode,
tryToParseDateTime,
} from 'n8n-workflow';
import * as assert from 'node:assert/strict';
import { Container } from 'typedi';
import type { ProcessedData } from '@/databases/entities/processed-data';
import { ProcessedDataRepository } from '@/databases/repositories/processed-data.repository';
import { DeduplicationError } from '@/errors/deduplication.error';
import type { IProcessedDataEntries, IProcessedDataLatest } from '@/interfaces';
export class DeduplicationHelper implements IDataDeduplicator {
private static sortEntries(
items: DeduplicationItemTypes[],
mode: DeduplicationMode,
): DeduplicationItemTypes[] {
return items.slice().sort((a, b) => DeduplicationHelper.compareValues(mode, a, b));
}
/**
* Compares two values based on the provided mode ('latestIncrementalKey' or 'latestDate').
*
* @param {DeduplicationMode} mode - The mode to determine the comparison logic. Can be either:
* - 'latestIncrementalKey': Compares numeric values and returns true if `value1` is greater than `value2`.
* - 'latestDate': Compares date strings and returns true if `value1` is a later date than `value2`.
*
* @param {DeduplicationItemTypes} value1 - The first value to compare.
* - If the mode is 'latestIncrementalKey', this should be a numeric value or a string that can be converted to a number.
* - If the mode is 'latestDate', this should be a valid date string.
*
* @param {DeduplicationItemTypes} value2 - The second value to compare.
* - If the mode is 'latestIncrementalKey', this should be a numeric value or a string that can be converted to a number.
* - If the mode is 'latestDate', this should be a valid date string.
*
* @returns {boolean} - Returns `true` if `value1` is greater than `value2` based on the comparison mode.
* - In 'latestIncrementalKey' mode, it returns `true` if `value1` is numerically greater than `value2`.
* - In 'latestDate' mode, it returns `true` if `value1` is a later date than `value2`.
*
* @throws {DeduplicationError} - Throws an error if:
* - The mode is 'latestIncrementalKey' and the values are not valid numbers.
* - The mode is 'latestDate' and the values are not valid date strings.
* - An unsupported mode is provided.
*/
private static compareValues(
mode: DeduplicationMode,
value1: DeduplicationItemTypes,
value2: DeduplicationItemTypes,
): 1 | 0 | -1 {
if (mode === 'latestIncrementalKey') {
const num1 = Number(value1);
const num2 = Number(value2);
if (!isNaN(num1) && !isNaN(num2)) {
return num1 === num2 ? 0 : num1 > num2 ? 1 : -1;
}
throw new DeduplicationError(
'Invalid value. Only numbers are supported in mode "latestIncrementalKey"',
);
} else if (mode === 'latestDate') {
try {
const date1 = tryToParseDateTime(value1);
const date2 = tryToParseDateTime(value2);
return date1 === date2 ? 0 : date1 > date2 ? 1 : -1;
} catch (error) {
throw new DeduplicationError(
'Invalid value. Only valid dates are supported in mode "latestDate"',
);
}
} else {
throw new DeduplicationError(
"Invalid mode. Only 'latestIncrementalKey' and 'latestDate' are supported.",
);
}
}
private static createContext(
scope: DeduplicationScope,
contextData: ICheckProcessedContextData,
): string {
if (scope === 'node') {
if (!contextData.node) {
throw new DeduplicationError(
"No node information has been provided and so cannot use scope 'node'",
);
}
// Use the node ID to make sure that the data can still be accessed and does not get deleted
// whenever the node gets renamed
return `n:${contextData.node.id}`;
}
return '';
}
private static createValueHash(value: DeduplicationItemTypes): string {
return createHash('md5').update(value.toString()).digest('base64');
}
private async findProcessedData(
scope: DeduplicationScope,
contextData: ICheckProcessedContextData,
): Promise<ProcessedData | null> {
return await Container.get(ProcessedDataRepository).findOne({
where: {
workflowId: contextData.workflow.id,
context: DeduplicationHelper.createContext(scope, contextData),
},
});
}
private validateMode(processedData: ProcessedData | null, options: ICheckProcessedOptions) {
if (processedData && processedData.value.mode !== options.mode) {
throw new DeduplicationError(
'Deduplication data was originally saved with an incompatible setting of the Keep Items Where parameter. Try Clean Database operation to reset.',
);
}
}
private processedDataHasEntries(
data: IProcessedDataEntries | IProcessedDataLatest,
): data is IProcessedDataEntries {
return Array.isArray(data.data);
}
private processedDataIsLatest(
data: IProcessedDataEntries | IProcessedDataLatest,
): data is IProcessedDataLatest {
return data && !Array.isArray(data.data);
}
private async handleLatestModes(
items: DeduplicationItemTypes[],
contextData: ICheckProcessedContextData,
options: ICheckProcessedOptions,
processedData: ProcessedData | null,
dbContext: string,
): Promise<IDeduplicationOutput> {
const incomingItems = DeduplicationHelper.sortEntries(items, options.mode);
if (!processedData) {
// All items are new so add new entries
await Container.get(ProcessedDataRepository).insert({
workflowId: contextData.workflow.id,
context: dbContext,
value: {
mode: options.mode,
data: incomingItems.pop(),
},
});
return {
new: items,
processed: [],
};
}
const returnData: IDeduplicationOutput = {
new: [],
processed: [],
};
if (!this.processedDataIsLatest(processedData.value)) {
return returnData;
}
let largestValue = processedData.value.data;
const processedDataValue = processedData.value;
incomingItems.forEach((item) => {
if (DeduplicationHelper.compareValues(options.mode, item, processedDataValue.data) === 1) {
returnData.new.push(item);
if (DeduplicationHelper.compareValues(options.mode, item, largestValue) === 1) {
largestValue = item;
}
} else {
returnData.processed.push(item);
}
});
processedData.value.data = largestValue;
await Container.get(ProcessedDataRepository).update(
{ workflowId: processedData.workflowId, context: processedData.context },
processedData,
);
return returnData;
}
private async handleHashedItems(
items: DeduplicationItemTypes[],
contextData: ICheckProcessedContextData,
options: ICheckProcessedOptions,
processedData: ProcessedData | null,
dbContext: string,
): Promise<IDeduplicationOutput> {
const hashedItems = items.map((item) => DeduplicationHelper.createValueHash(item));
if (!processedData) {
// All items are new so add new entries
if (options.maxEntries) {
hashedItems.splice(0, hashedItems.length - options.maxEntries);
}
await Container.get(ProcessedDataRepository).insert({
workflowId: contextData.workflow.id,
context: dbContext,
value: {
mode: options.mode,
data: hashedItems,
},
});
return {
new: items,
processed: [],
};
}
const returnData: IDeduplicationOutput = {
new: [],
processed: [],
};
if (!this.processedDataHasEntries(processedData.value)) {
return returnData;
}
const processedDataValue = processedData.value;
const processedItemsSet = new Set(processedDataValue.data);
hashedItems.forEach((item, index) => {
if (processedItemsSet.has(item)) {
returnData.processed.push(items[index]);
} else {
returnData.new.push(items[index]);
processedDataValue.data.push(item);
}
});
if (options.maxEntries) {
processedDataValue.data.splice(0, processedDataValue.data.length - options.maxEntries);
}
await Container.get(ProcessedDataRepository).update(
{ workflowId: processedData.workflowId, context: processedData.context },
processedData,
);
return returnData;
}
async checkProcessedAndRecord(
items: DeduplicationItemTypes[],
scope: DeduplicationScope,
contextData: ICheckProcessedContextData,
options: ICheckProcessedOptions,
): Promise<IDeduplicationOutput> {
const dbContext = DeduplicationHelper.createContext(scope, contextData);
assert.ok(contextData.workflow.id);
const processedData = await this.findProcessedData(scope, contextData);
this.validateMode(processedData, options);
if (['latestIncrementalKey', 'latestDate'].includes(options.mode)) {
return await this.handleLatestModes(items, contextData, options, processedData, dbContext);
}
//mode entries
return await this.handleHashedItems(items, contextData, options, processedData, dbContext);
}
async removeProcessed(
items: DeduplicationItemTypes[],
scope: DeduplicationScope,
contextData: ICheckProcessedContextData,
options: ICheckProcessedOptions,
): Promise<void> {
if (['latestIncrementalKey', 'latestDate'].includes(options.mode)) {
throw new DeduplicationError('Removing processed data is not possible in mode "latest"');
}
assert.ok(contextData.workflow.id);
const processedData = await Container.get(ProcessedDataRepository).findOne({
where: {
workflowId: contextData.workflow.id,
context: DeduplicationHelper.createContext(scope, contextData),
},
});
if (!processedData) {
return;
}
const hashedItems = items.map((item) => DeduplicationHelper.createValueHash(item));
if (!this.processedDataHasEntries(processedData.value)) {
return;
}
const processedDataValue = processedData.value;
hashedItems.forEach((item) => {
const index = processedDataValue.data.findIndex((value) => value === item);
if (index !== -1) {
processedDataValue.data.splice(index, 1);
}
});
await Container.get(ProcessedDataRepository).update(
{ workflowId: processedData.workflowId, context: processedData.context },
processedData,
);
}
async clearAllProcessedItems(
scope: DeduplicationScope,
contextData: ICheckProcessedContextData,
): Promise<void> {
await Container.get(ProcessedDataRepository).delete({
workflowId: contextData.workflow.id,
context: DeduplicationHelper.createContext(scope, contextData),
});
}
async getProcessedDataCount(
scope: DeduplicationScope,
contextData: ICheckProcessedContextData,
options: ICheckProcessedOptions,
): Promise<number> {
const processedDataRepository = Container.get(ProcessedDataRepository);
const processedData = await processedDataRepository.findOne({
where: {
workflowId: contextData.workflow.id,
context: DeduplicationHelper.createContext(scope, contextData),
},
});
if (
options.mode === 'entries' &&
processedData &&
this.processedDataHasEntries(processedData.value)
) {
return processedData.value.data.length;
} else {
return 0;
}
}
}

View file

@ -0,0 +1,7 @@
import { type IDataDeduplicator } from 'n8n-workflow';
import { DeduplicationHelper } from './deduplication-helper';
export function getDataDeduplicationService(): IDataDeduplicator {
return new DeduplicationHelper();
}

View file

@ -2,7 +2,12 @@
import { In } from '@n8n/typeorm';
import glob from 'fast-glob';
import { Credentials, InstanceSettings } from 'n8n-core';
import { ApplicationError, jsonParse, ErrorReporterProxy as ErrorReporter } from 'n8n-workflow';
import {
ApplicationError,
jsonParse,
ErrorReporterProxy as ErrorReporter,
ensureError,
} from 'n8n-workflow';
import { readFile as fsReadFile } from 'node:fs/promises';
import path from 'path';
import { Container, Service } from 'typedi';
@ -274,8 +279,9 @@ export class SourceControlImportService {
this.logger.debug(`Reactivating workflow id ${existingWorkflow.id}`);
await workflowManager.add(existingWorkflow.id, 'activate');
// update the versionId of the workflow to match the imported workflow
} catch (error) {
this.logger.error(`Failed to activate workflow ${existingWorkflow.id}`, error as Error);
} catch (e) {
const error = ensureError(e);
this.logger.error(`Failed to activate workflow ${existingWorkflow.id}`, { error });
} finally {
await Container.get(WorkflowRepository).update(
{ id: existingWorkflow.id },
@ -377,8 +383,9 @@ export class SourceControlImportService {
await fsReadFile(candidate.file, { encoding: 'utf8' }),
{ fallbackValue: { tags: [], mappings: [] } },
);
} catch (error) {
this.logger.error(`Failed to import tags from file ${candidate.file}`, error as Error);
} catch (e) {
const error = ensureError(e);
this.logger.error(`Failed to import tags from file ${candidate.file}`, { error });
return;
}
@ -444,8 +451,8 @@ export class SourceControlImportService {
await fsReadFile(candidate.file, { encoding: 'utf8' }),
{ fallbackValue: [] },
);
} catch (error) {
this.logger.error(`Failed to import tags from file ${candidate.file}`, error as Error);
} catch (e) {
this.logger.error(`Failed to import tags from file ${candidate.file}`, { error: e });
return;
}
const overriddenKeys = Object.keys(valueOverrides ?? {});

View file

@ -0,0 +1,7 @@
import { ApplicationError } from 'n8n-workflow';
export class DeduplicationError extends ApplicationError {
constructor(message: string) {
super(`Deduplication Failed: ${message}`);
}
}

View file

@ -0,0 +1,14 @@
import { ApplicationError } from 'n8n-workflow';
export class WorkerMissingEncryptionKey extends ApplicationError {
constructor() {
super(
[
'Failed to start worker because of missing encryption key.',
'Please set the `N8N_ENCRYPTION_KEY` env var when starting the worker.',
'See: https://docs.n8n.io/hosting/configuration/configuration-examples/encryption-key/',
].join(' '),
{ level: 'warning' },
);
}
}

View file

@ -180,7 +180,7 @@ export class MessageEventBusDestinationWebhook
try {
JSON.parse(this.jsonQuery);
} catch {
this.logger.error('JSON parameter need to be an valid JSON');
this.logger.error('JSON parameter needs to be valid JSON');
}
this.axiosRequestOptions.params = jsonParse(this.jsonQuery);
}
@ -198,7 +198,7 @@ export class MessageEventBusDestinationWebhook
try {
JSON.parse(this.jsonHeaders);
} catch {
this.logger.error('JSON parameter need to be an valid JSON');
this.logger.error('JSON parameter needs to be valid JSON');
}
this.axiosRequestOptions.headers = jsonParse(this.jsonHeaders);
}

View file

@ -149,7 +149,7 @@ export class MessageEventBusLogWriter {
this._worker = new Worker(workerFileName);
if (this.worker) {
this.worker.on('messageerror', async (error) => {
this.logger.error('Event Bus Log Writer thread error, attempting to restart...', error);
this.logger.error('Event Bus Log Writer thread error, attempting to restart...', { error });
await MessageEventBusLogWriter.instance.startThread();
});
return true;

View file

@ -1,4 +1,4 @@
import type { WorkerStatus, PushType } from '@n8n/api-types';
import type { PushType, WorkerStatus } from '@n8n/api-types';
import type { IWorkflowDb } from '@/interfaces';
@ -80,25 +80,5 @@ export type PubSubCommandMap = {
};
export type PubSubWorkerResponseMap = {
// #region Lifecycle
'restart-event-bus': {
result: 'success' | 'error';
error?: string;
};
'reload-external-secrets-providers': {
result: 'success' | 'error';
error?: string;
};
// #endregion
// #region Worker view
'get-worker-id': never;
'get-worker-status': WorkerStatus;
// #endregion
'response-to-get-worker-status': WorkerStatus;
};

View file

@ -651,7 +651,9 @@ export class TelemetryEventRelay extends EventRelay {
}
if (telemetryProperties.is_manual) {
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes, {
runData: runData.data.resultData?.runData,
});
telemetryProperties.node_graph = nodeGraphResult.nodeGraph;
telemetryProperties.node_graph_string = JSON.stringify(nodeGraphResult.nodeGraph);
@ -663,7 +665,9 @@ export class TelemetryEventRelay extends EventRelay {
if (telemetryProperties.is_manual) {
if (!nodeGraphResult) {
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes, {
runData: runData.data.resultData?.runData,
});
}
let userRole: 'owner' | 'sharee' | undefined = undefined;
@ -688,7 +692,9 @@ export class TelemetryEventRelay extends EventRelay {
};
if (!manualExecEventProperties.node_graph_string) {
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes, {
runData: runData.data.resultData?.runData,
});
manualExecEventProperties.node_graph_string = JSON.stringify(nodeGraphResult.nodeGraph);
}

View file

@ -1,5 +1,5 @@
import pick from 'lodash/pick';
import type { ExecutionStatus, IRun, IWorkflowBase } from 'n8n-workflow';
import { ensureError, type ExecutionStatus, type IRun, type IWorkflowBase } from 'n8n-workflow';
import { Container } from 'typedi';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
@ -95,7 +95,8 @@ export async function updateExistingExecution(parameters: {
);
}
} catch (e) {
logger.error(`Failed to save metadata for execution ID ${executionId}`, e as Error);
const error = ensureError(e);
logger.error(`Failed to save metadata for execution ID ${executionId}`, { error });
}
if (executionData.finished === true && executionData.retryOf !== undefined) {

View file

@ -22,6 +22,8 @@ import type {
INodeProperties,
IUserSettings,
IWorkflowExecutionDataProcess,
DeduplicationMode,
DeduplicationItemTypes,
} from 'n8n-workflow';
import type PCancelable from 'p-cancelable';
@ -48,6 +50,20 @@ export interface ICredentialsOverwrite {
[key: string]: ICredentialDataDecryptedObject;
}
// ----------------------------------
// ProcessedData
// ----------------------------------
export interface IProcessedDataLatest {
mode: DeduplicationMode;
data: DeduplicationItemTypes;
}
export interface IProcessedDataEntries {
mode: DeduplicationMode;
data: DeduplicationItemTypes[];
}
// ----------------------------------
// tags
// ----------------------------------

View file

@ -37,7 +37,9 @@ export class License {
private readonly orchestrationService: OrchestrationService,
private readonly settingsRepository: SettingsRepository,
private readonly licenseMetricsService: LicenseMetricsService,
) {}
) {
this.logger = this.logger.withScope('license');
}
/**
* Whether this instance should renew the license - on init and periodically.
@ -109,9 +111,9 @@ export class License {
await this.manager.initialize();
this.logger.debug('License initialized');
} catch (e: unknown) {
if (e instanceof Error) {
this.logger.error('Could not initialize license manager sdk', e);
} catch (error: unknown) {
if (error instanceof Error) {
this.logger.error('Could not initialize license manager sdk', { error });
}
}
}
@ -141,10 +143,7 @@ export class License {
this.orchestrationService.setMultiMainSetupLicensed(isMultiMainLicensed ?? false);
if (
this.orchestrationService.isMultiMainSetupEnabled &&
this.orchestrationService.isFollower
) {
if (this.orchestrationService.isMultiMainSetupEnabled && this.instanceSettings.isFollower) {
this.logger.debug(
'[Multi-main setup] Instance is follower, skipping sending of "reload-license" command...',
);
@ -253,6 +252,10 @@ export class License {
return this.isFeatureEnabled(LICENSE_FEATURES.AI_ASSISTANT);
}
isAskAiEnabled() {
return this.isFeatureEnabled(LICENSE_FEATURES.ASK_AI);
}
isAdvancedExecutionFiltersEnabled() {
return this.isFeatureEnabled(LICENSE_FEATURES.ADVANCED_EXECUTION_FILTERS);
}

View file

@ -14,8 +14,7 @@ type LicenseError = Error & { errorId?: keyof typeof LicenseErrors };
export const LicenseErrors = {
SCHEMA_VALIDATION: 'Activation key is in the wrong format',
RESERVATION_EXHAUSTED:
'Activation key has been used too many times. Please contact sales@n8n.io if you would like to extend it',
RESERVATION_EXHAUSTED: 'Activation key has been used too many times',
RESERVATION_EXPIRED: 'Activation key has expired',
NOT_FOUND: 'Activation key not found',
RESERVATION_CONFLICT: 'Activation key not found',

View file

@ -11,6 +11,7 @@ describe('Logger', () => {
logging: {
level: 'info',
outputs: ['console'],
scopes: [],
},
});
@ -30,6 +31,7 @@ describe('Logger', () => {
logging: {
level: 'info',
outputs: ['file'],
scopes: [],
file: {
fileSizeMax: 100,
fileCountMax: 16,
@ -56,6 +58,7 @@ describe('Logger', () => {
logging: {
level: 'error',
outputs: ['console'],
scopes: [],
},
});
@ -74,6 +77,7 @@ describe('Logger', () => {
logging: {
level: 'warn',
outputs: ['console'],
scopes: [],
},
});
@ -92,6 +96,7 @@ describe('Logger', () => {
logging: {
level: 'info',
outputs: ['console'],
scopes: [],
},
});
@ -110,6 +115,7 @@ describe('Logger', () => {
logging: {
level: 'debug',
outputs: ['console'],
scopes: [],
},
});
@ -128,6 +134,7 @@ describe('Logger', () => {
logging: {
level: 'silent',
outputs: ['console'],
scopes: [],
},
});

View file

@ -1,5 +1,7 @@
import type { LogScope } from '@n8n/config';
import { GlobalConfig } from '@n8n/config';
import callsites from 'callsites';
import type { TransformableInfo } from 'logform';
import { InstanceSettings } from 'n8n-core';
import { LoggerProxy, LOG_LEVELS } from 'n8n-workflow';
import path, { basename } from 'node:path';
@ -15,10 +17,16 @@ import type { LogLocationMetadata, LogLevel, LogMetadata } from './types';
@Service()
export class Logger {
private readonly internalLogger: winston.Logger;
private internalLogger: winston.Logger;
private readonly level: LogLevel;
private readonly scopes: Set<LogScope>;
private get isScopingEnabled() {
return this.scopes.size > 0;
}
constructor(
private readonly globalConfig: GlobalConfig,
private readonly instanceSettings: InstanceSettings,
@ -35,15 +43,30 @@ export class Logger {
if (!isSilent) {
this.setLevel();
const { outputs } = this.globalConfig.logging;
const { outputs, scopes } = this.globalConfig.logging;
if (outputs.includes('console')) this.setConsoleTransport();
if (outputs.includes('file')) this.setFileTransport();
this.scopes = new Set(scopes);
}
LoggerProxy.init(this);
}
private setInternalLogger(internalLogger: winston.Logger) {
this.internalLogger = internalLogger;
}
withScope(scope: LogScope) {
const scopedLogger = new Logger(this.globalConfig, this.instanceSettings);
const childLogger = this.internalLogger.child({ scope });
scopedLogger.setInternalLogger(childLogger);
return scopedLogger;
}
private log(level: LogLevel, message: string, metadata: LogMetadata) {
const location: LogLocationMetadata = {};
@ -81,11 +104,22 @@ export class Logger {
this.internalLogger.add(new winston.transports.Console({ format }));
}
private scopeFilter() {
return winston.format((info: TransformableInfo & { metadata: LogMetadata }) => {
const shouldIncludeScope = info.metadata.scope && this.scopes.has(info.metadata.scope);
if (this.isScopingEnabled && !shouldIncludeScope) return false;
return info;
})();
}
private debugDevConsoleFormat() {
return winston.format.combine(
winston.format.metadata(),
winston.format.timestamp({ format: () => this.devTsFormat() }),
winston.format.colorize({ all: true }),
this.scopeFilter(),
winston.format.printf(({ level: _level, message, timestamp, metadata: _metadata }) => {
const SEPARATOR = ' '.repeat(3);
const LOG_LEVEL_COLUMN_WIDTH = 15; // 5 columns + ANSI color codes
@ -100,6 +134,7 @@ export class Logger {
return winston.format.combine(
winston.format.metadata(),
winston.format.timestamp(),
this.scopeFilter(),
winston.format.printf(({ level, message, timestamp, metadata }) => {
const _metadata = this.toPrintable(metadata);
return `${timestamp} | ${level.padEnd(5)} | ${message}${_metadata ? ' ' + _metadata : ''}`;

View file

@ -1,7 +1,14 @@
import type { LogScope } from '@n8n/config';
import type { LOG_LEVELS } from './constants';
export type LogLevel = (typeof LOG_LEVELS)[number];
export type LogLocationMetadata = Partial<{ file: string; function: string }>;
export type LogMetadata = {
[key: string]: unknown;
scope?: LogScope;
file?: string;
function?: string;
};
export type LogMetadata = Record<string, unknown> | Error;
export type LogLocationMetadata = Pick<LogMetadata, 'file' | 'function'>;

View file

@ -44,15 +44,38 @@ export class NodeTypes implements INodeTypes {
}
getByNameAndVersion(nodeType: string, version?: number): INodeType {
const versionedNodeType = NodeHelpers.getVersionedNodeType(
this.getNode(nodeType).type,
version,
);
if (versionedNodeType.description.usableAsTool) {
return NodeHelpers.convertNodeToAiTool(versionedNodeType);
const origType = nodeType;
const toolRequested = nodeType.startsWith('n8n-nodes-base') && nodeType.endsWith('Tool');
// Make sure the nodeType to actually get from disk is the un-wrapped type
if (toolRequested) {
nodeType = nodeType.replace(/Tool$/, '');
}
return versionedNodeType;
const node = this.getNode(nodeType);
const versionedNodeType = NodeHelpers.getVersionedNodeType(node.type, version);
if (!toolRequested) return versionedNodeType;
if (!versionedNodeType.description.usableAsTool)
throw new ApplicationError('Node cannot be used as a tool', { extra: { nodeType } });
const { loadedNodes } = this.loadNodesAndCredentials;
if (origType in loadedNodes) {
return loadedNodes[origType].type as INodeType;
}
// Instead of modifying the existing type, we extend it into a new type object
const clonedProperties = Object.create(
versionedNodeType.description.properties,
) as INodeTypeDescription['properties'];
const clonedDescription = Object.create(versionedNodeType.description, {
properties: { value: clonedProperties },
}) as INodeTypeDescription;
const clonedNode = Object.create(versionedNodeType, {
description: { value: clonedDescription },
}) as INodeType;
const tool = NodeHelpers.convertNodeToAiTool(clonedNode);
loadedNodes[nodeType + 'Tool'] = { sourcePath: '', type: tool };
return tool;
}
/* Some nodeTypes need to get special parameters applied like the polling nodes the polling times */

View file

@ -38,7 +38,6 @@ export const GLOBAL_OWNER_SCOPES: Scope[] = [
'license:manage',
'logStreaming:manage',
'orchestration:read',
'orchestration:list',
'saml:manage',
'securityAudit:generate',
'sourceControl:pull',

View file

@ -1,8 +1,9 @@
import type { PushPayload, PushType } from '@n8n/api-types';
import { assert, jsonStringify } from 'n8n-workflow';
import { Service } from 'typedi';
import type { User } from '@/databases/entities/user';
import type { Logger } from '@/logging/logger.service';
import { Logger } from '@/logging/logger.service';
import type { OnPushMessage } from '@/push/types';
import { TypedEmitter } from '@/typed-emitter';
@ -16,6 +17,7 @@ export interface AbstractPushEvents {
*
* @emits message when a message is received from a client
*/
@Service()
export abstract class AbstractPush<Connection> extends TypedEmitter<AbstractPushEvents> {
protected connections: Record<string, Connection> = {};
@ -23,9 +25,12 @@ export abstract class AbstractPush<Connection> extends TypedEmitter<AbstractPush
protected abstract close(connection: Connection): void;
protected abstract sendToOneConnection(connection: Connection, data: string): void;
protected abstract ping(connection: Connection): void;
constructor(protected readonly logger: Logger) {
super();
// Ping all connected clients every 60 seconds
setInterval(() => this.pingAll(), 60 * 1000);
}
protected add(pushRef: string, userId: User['id'], connection: Connection) {
@ -75,6 +80,12 @@ export abstract class AbstractPush<Connection> extends TypedEmitter<AbstractPush
}
}
private pingAll() {
for (const pushRef in this.connections) {
this.ping(this.connections[pushRef]);
}
}
sendToAll<Type extends PushType>(type: Type, data: PushPayload<Type>) {
this.sendTo(type, data, Object.keys(this.connections));
}

View file

@ -1,8 +1,6 @@
import { Service } from 'typedi';
import type { User } from '@/databases/entities/user';
import { Logger } from '@/logging/logger.service';
import SSEChannel from 'sse-channel';
import { AbstractPush } from './abstract.push';
import type { PushRequest, PushResponse } from './types';
@ -11,29 +9,41 @@ type Connection = { req: PushRequest; res: PushResponse };
@Service()
export class SSEPush extends AbstractPush<Connection> {
readonly channel = new SSEChannel();
readonly connections: Record<string, Connection> = {};
constructor(logger: Logger) {
super(logger);
this.channel.on('disconnect', (_, { req }) => {
this.remove(req?.query?.pushRef);
});
}
add(pushRef: string, userId: User['id'], connection: Connection) {
const { req, res } = connection;
// Initialize the connection
req.socket.setTimeout(0);
req.socket.setNoDelay(true);
req.socket.setKeepAlive(true);
res.setHeader('Content-Type', 'text/event-stream; charset=UTF-8');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.writeHead(200);
res.write(':ok\n\n');
res.flush();
super.add(pushRef, userId, connection);
this.channel.addClient(connection.req, connection.res);
// When the client disconnects, remove the client
const removeClient = () => this.remove(pushRef);
req.once('end', removeClient);
req.once('close', removeClient);
res.once('finish', removeClient);
}
protected close({ res }: Connection) {
res.end();
this.channel.removeClient(res);
}
protected sendToOneConnection(connection: Connection, data: string) {
this.channel.send(data, [connection.res]);
const { res } = connection;
res.write('data: ' + data + '\n\n');
res.flush();
}
protected ping({ res }: Connection) {
res.write(':ping\n\n');
res.flush();
}
}

View file

@ -11,7 +11,15 @@ export type PushRequest = AuthenticatedRequest<{}, {}, {}, { pushRef: string }>;
export type SSEPushRequest = PushRequest & { ws: undefined };
export type WebSocketPushRequest = PushRequest & { ws: WebSocket };
export type PushResponse = Response & { req: PushRequest };
export type PushResponse = Response & {
req: PushRequest;
/**
* `flush()` is defined in the compression middleware.
* This is necessary because the compression middleware sometimes waits
* for a certain amount of data before sending the data to the client
*/
flush: () => void;
};
export interface OnPushMessage {
pushRef: string;

Some files were not shown because too many files have changed in this diff Show more