Merge branch 'master' of github.com:n8n-io/n8n into feature-sub-workflow-inputs

This commit is contained in:
Ivan Atanasov 2024-12-02 10:02:57 +01:00
commit 75a571e02b
No known key found for this signature in database
344 changed files with 15959 additions and 4856 deletions

View file

@ -7,6 +7,8 @@ services:
- MARIADB_MYSQL_LOCALHOST_USER=true
ports:
- 3306:3306
tmpfs:
- /var/lib/mysql
postgres:
image: postgres:16
@ -17,3 +19,5 @@ services:
- POSTGRES_PASSWORD=password
ports:
- 5432:5432
tmpfs:
- /var/lib/postgresql/data

View file

@ -74,6 +74,8 @@ jobs:
N8N_ENCRYPTION_KEY: ${{secrets.ENCRYPTION_KEY}}
SKIP_STATISTICS_EVENTS: true
DB_SQLITE_POOL_SIZE: 4
N8N_SENTRY_DSN: ${{secrets.CI_SENTRY_DSN}}
# -
# name: Export credentials
# if: always()
@ -93,7 +95,7 @@ jobs:
- name: Notify Slack on failure
uses: act10ns/slack@v2.0.0
if: failure()
if: failure() && github.ref == 'refs/heads/master'
with:
status: ${{ job.status }}
channel: '#alerts-build'

View file

@ -1,3 +1,89 @@
# [1.70.0](https://github.com/n8n-io/n8n/compare/n8n@1.69.0...n8n@1.70.0) (2024-11-27)
### Bug Fixes
* **AI Agent Node:** Add binary message before scratchpad to prevent tool calling loops ([#11845](https://github.com/n8n-io/n8n/issues/11845)) ([5c80cb5](https://github.com/n8n-io/n8n/commit/5c80cb57cf709a1097a38e0394aad6fce5330eba))
* CodeNodeEditor walk cannot read properties of null ([#11129](https://github.com/n8n-io/n8n/issues/11129)) ([d99e0a7](https://github.com/n8n-io/n8n/commit/d99e0a7c979a1ee96b2eea1b9011d5bce375289a))
* **core:** Bring back execution data on the `executionFinished` push message ([#11821](https://github.com/n8n-io/n8n/issues/11821)) ([0313570](https://github.com/n8n-io/n8n/commit/03135702f18e750ba44840dccfec042270629a2b))
* **core:** Correct invalid WS status code on removing connection ([#11901](https://github.com/n8n-io/n8n/issues/11901)) ([1d80225](https://github.com/n8n-io/n8n/commit/1d80225d26ba01f78934a455acdcca7b83be7205))
* **core:** Don't use unbound context methods in code sandboxes ([#11914](https://github.com/n8n-io/n8n/issues/11914)) ([f6c0d04](https://github.com/n8n-io/n8n/commit/f6c0d045e9683cd04ee849f37b96697097c5b41d))
* **core:** Fix broken execution query when using projectId ([#11852](https://github.com/n8n-io/n8n/issues/11852)) ([a061dbc](https://github.com/n8n-io/n8n/commit/a061dbca07ad686c563e85c56081bc1a7830259b))
* **core:** Fix validation of items returned in the task runner ([#11897](https://github.com/n8n-io/n8n/issues/11897)) ([a535e88](https://github.com/n8n-io/n8n/commit/a535e88f1aec8fbbf2eb9397d38748f49773de2d))
* **editor:** Add missing trigger waiting tooltip on new canvas ([#11918](https://github.com/n8n-io/n8n/issues/11918)) ([a8df221](https://github.com/n8n-io/n8n/commit/a8df221bfbb5428d93d03f539bcfdaf29ee20c21))
* **editor:** Don't re-render input panel after node finishes executing ([#11813](https://github.com/n8n-io/n8n/issues/11813)) ([b3a99a2](https://github.com/n8n-io/n8n/commit/b3a99a2351079c37ed6d83f43920ba80f3832234))
* **editor:** Fix AI assistant loading message layout ([#11819](https://github.com/n8n-io/n8n/issues/11819)) ([89b4807](https://github.com/n8n-io/n8n/commit/89b48072432753137b498c338af7777036fdde7a))
* **editor:** Fix new canvas discovery tooltip position after adding github stars button ([#11898](https://github.com/n8n-io/n8n/issues/11898)) ([f4ab5c7](https://github.com/n8n-io/n8n/commit/f4ab5c7b9244b8fdde427c12c1a152fbaaba0c34))
* **editor:** Fix node position not getting set when dragging selection on new canvas ([#11871](https://github.com/n8n-io/n8n/issues/11871)) ([595de81](https://github.com/n8n-io/n8n/commit/595de81c03b3e488ab41fb8d1d316c3db6a8372a))
* **editor:** Restore workers view ([#11876](https://github.com/n8n-io/n8n/issues/11876)) ([3aa72f6](https://github.com/n8n-io/n8n/commit/3aa72f613f64c16d7dff67ffe66037894e45aa7c))
* **editor:** Turn NPS survey into a modal and make sure it shows above the Ask AI button ([#11814](https://github.com/n8n-io/n8n/issues/11814)) ([ca169f3](https://github.com/n8n-io/n8n/commit/ca169f3f3455fa39ce9120b30d7b409bade6561e))
* **editor:** Use `crypto.randomUUID()` to initialize node id if missing on new canvas ([#11873](https://github.com/n8n-io/n8n/issues/11873)) ([bc4857a](https://github.com/n8n-io/n8n/commit/bc4857a1b3d6ea389f11fb8246a1cee33b8a008e))
* **n8n Form Node:** Duplicate popup in manual mode ([#11925](https://github.com/n8n-io/n8n/issues/11925)) ([2c34bf4](https://github.com/n8n-io/n8n/commit/2c34bf4ea6137fb0fb321969684ffa621da20fa3))
* **n8n Form Node:** Redirect if completion page to trigger ([#11822](https://github.com/n8n-io/n8n/issues/11822)) ([1a8fb7b](https://github.com/n8n-io/n8n/commit/1a8fb7bdc428c6a23c8708e2dcf924f1f10b47a9))
* **OpenAI Node:** Remove preview chatInput parameter for `Assistant:Messsage` operation ([#11825](https://github.com/n8n-io/n8n/issues/11825)) ([4dde287](https://github.com/n8n-io/n8n/commit/4dde287cde3af7c9c0e57248e96b8f1270da9332))
* Retain execution data between partial executions (new flow) ([#11828](https://github.com/n8n-io/n8n/issues/11828)) ([3320436](https://github.com/n8n-io/n8n/commit/3320436a6fdf8472b3843b9fe8d4de7af7f5ef5c))
### Features
* Add SharePoint credentials ([#11570](https://github.com/n8n-io/n8n/issues/11570)) ([05c6109](https://github.com/n8n-io/n8n/commit/05c61091db9bdd62fdcca910ead50d0bd512966a))
* Add Zabbix credential only node ([#11489](https://github.com/n8n-io/n8n/issues/11489)) ([fbd1ecf](https://github.com/n8n-io/n8n/commit/fbd1ecfb29461fee393914bc200ec72c654d8944))
* **AI Transform Node:** Support for drag and drop ([#11276](https://github.com/n8n-io/n8n/issues/11276)) ([2c252b0](https://github.com/n8n-io/n8n/commit/2c252b0b2d5282f4a87bce76f93c4c02dd8ff5e3))
* **editor:** Drop `response` wrapper requirement from Subworkflow Tool output ([#11785](https://github.com/n8n-io/n8n/issues/11785)) ([cd3598a](https://github.com/n8n-io/n8n/commit/cd3598aaab6cefe58a4cb9df7d93fb501415e9d3))
* **editor:** Improve node and edge bring-to-front mechanism on new canvas ([#11793](https://github.com/n8n-io/n8n/issues/11793)) ([b89ca9d](https://github.com/n8n-io/n8n/commit/b89ca9d482faa5cb542898f3973fb6e7c9a8437a))
* **editor:** Make new canvas connections go underneath node when looping backwards ([#11833](https://github.com/n8n-io/n8n/issues/11833)) ([91d1bd8](https://github.com/n8n-io/n8n/commit/91d1bd8d333454f3971605df73c3703102d2a9e9))
* **editor:** Make the left sidebar in Expressions editor draggable ([#11838](https://github.com/n8n-io/n8n/issues/11838)) ([a713b3e](https://github.com/n8n-io/n8n/commit/a713b3ed25feb1790412fc320cf41a0967635263))
* **editor:** Migrate existing users to new canvas and set new canvas as default ([#11896](https://github.com/n8n-io/n8n/issues/11896)) ([caa7447](https://github.com/n8n-io/n8n/commit/caa744785a2cc5063a5fb9d269c0ea53ea432298))
* **Slack Node:** Update wait for approval to use markdown ([#11754](https://github.com/n8n-io/n8n/issues/11754)) ([40dd02f](https://github.com/n8n-io/n8n/commit/40dd02f360d0d8752fe89c4304c18cac9858c530))
# [1.69.0](https://github.com/n8n-io/n8n/compare/n8n@1.68.0...n8n@1.69.0) (2024-11-20)
### Bug Fixes
* Add supported versions warning to Zep memory node ([#11803](https://github.com/n8n-io/n8n/issues/11803)) ([9cc5bc1](https://github.com/n8n-io/n8n/commit/9cc5bc1aef974fe6c2511c1597b90c8b54ba6b9c))
* **AI Agent Node:** Escape curly brackets in tools description for non Tool agents ([#11772](https://github.com/n8n-io/n8n/issues/11772)) ([83abdfa](https://github.com/n8n-io/n8n/commit/83abdfaf027a0533824a3ac3e4bab3cad971821a))
* **Anthropic Chat Model Node:** Update credentials test endpoint ([#11756](https://github.com/n8n-io/n8n/issues/11756)) ([6cf0aba](https://github.com/n8n-io/n8n/commit/6cf0abab5bcddb407571271b9f174e66bb209790))
* **core:** Add missing env vars to task runner config ([#11810](https://github.com/n8n-io/n8n/issues/11810)) ([870c576](https://github.com/n8n-io/n8n/commit/870c576ed9d7ce4ef005db9c8bedd78e91084c9c))
* **core:** Allow Azure's SAML metadata XML containing WS-Federation nodes to pass validation ([#11724](https://github.com/n8n-io/n8n/issues/11724)) ([3b62bd5](https://github.com/n8n-io/n8n/commit/3b62bd58c264be0225a74ae0eb35c4761c419b79))
* **core:** Delete binary data parent folder when pruning executions ([#11790](https://github.com/n8n-io/n8n/issues/11790)) ([17ef2c6](https://github.com/n8n-io/n8n/commit/17ef2c63f69b811bdd28006df3b6edd446837971))
* **core:** Fix `diagnostics.enabled` default value ([#11809](https://github.com/n8n-io/n8n/issues/11809)) ([5fa72b0](https://github.com/n8n-io/n8n/commit/5fa72b0512b00bdc6a1065b7b604c9640f469454))
* **core:** Improve the security on OAuth callback endpoints ([#11593](https://github.com/n8n-io/n8n/issues/11593)) ([274fcf4](https://github.com/n8n-io/n8n/commit/274fcf45d393d8db1d2fb5ae1e774a4c9198a178))
* **core:** Restore old names for pruning config keys ([#11782](https://github.com/n8n-io/n8n/issues/11782)) ([d15b8d0](https://github.com/n8n-io/n8n/commit/d15b8d05092d2ed9dd45fcfa34b4177f60469ebd))
* **core:** Unload any existing version of a community nodes package before upgrading it ([#11727](https://github.com/n8n-io/n8n/issues/11727)) ([1d8fd13](https://github.com/n8n-io/n8n/commit/1d8fd13d841b73466ba5f8044d17d7199da7e856))
* **editor:** Add documentation link to insufficient quota message ([#11777](https://github.com/n8n-io/n8n/issues/11777)) ([1987363](https://github.com/n8n-io/n8n/commit/1987363f7941285c51fda849a4ac92832368b25a))
* **editor:** Add project header subtitle ([#11797](https://github.com/n8n-io/n8n/issues/11797)) ([ff4261c](https://github.com/n8n-io/n8n/commit/ff4261c16845c7de1790fdf0eaa9f57b37822289))
* **editor:** Change Home label to Overview ([#11736](https://github.com/n8n-io/n8n/issues/11736)) ([1a78360](https://github.com/n8n-io/n8n/commit/1a783606b4ef22d85e173a2a780d5c49ff208932))
* **editor:** Fix executions sorting ([#11808](https://github.com/n8n-io/n8n/issues/11808)) ([cd5ad65](https://github.com/n8n-io/n8n/commit/cd5ad65e90a3be4d67b51521772e0fceb7f4abc7))
* **editor:** Fix partial executions not working due to broken push message queue and race conditions ([#11798](https://github.com/n8n-io/n8n/issues/11798)) ([b05d435](https://github.com/n8n-io/n8n/commit/b05d43519994abdd34a65462d14184c779d0b667))
* **editor:** Fix reordered switch connections when copying nodes on new canvas ([#11788](https://github.com/n8n-io/n8n/issues/11788)) ([6c2dad7](https://github.com/n8n-io/n8n/commit/6c2dad79143f5b0c255ab8c97c3255314834c458))
* **editor:** Fix the issue with RMC Values to Send collection disappears ([#11710](https://github.com/n8n-io/n8n/issues/11710)) ([7bb9002](https://github.com/n8n-io/n8n/commit/7bb9002cbc10cf58550f53a30c6fd7151f8e7355))
* **editor:** Improve formatting of expired trial error message ([#11708](https://github.com/n8n-io/n8n/issues/11708)) ([8a0ad0f](https://github.com/n8n-io/n8n/commit/8a0ad0f910feeada6d0c63e81c3e97a1a6e44de7))
* **editor:** Optimize application layout ([#11769](https://github.com/n8n-io/n8n/issues/11769)) ([91f9390](https://github.com/n8n-io/n8n/commit/91f9390b90a68d064ea00d10505bf3767ddec1d4))
* **Google Sheets Trigger Node:** Fix issue with regex showing correct sheet as invalid ([#11770](https://github.com/n8n-io/n8n/issues/11770)) ([d5ba1a0](https://github.com/n8n-io/n8n/commit/d5ba1a059b7a67154f17f8ad3fcfe66c5c031059))
* **HTTP Request Node:** Continue using error ([#11733](https://github.com/n8n-io/n8n/issues/11733)) ([d1bae1a](https://github.com/n8n-io/n8n/commit/d1bae1ace062dd5b64087e0313e78599b5994355))
* **n8n Form Node:** Support expressions in completion page ([#11781](https://github.com/n8n-io/n8n/issues/11781)) ([1099167](https://github.com/n8n-io/n8n/commit/10991675fe2e6913e8f03d565b670257941f18e5))
* Prevent workflow to run if active and single webhook service ([#11752](https://github.com/n8n-io/n8n/issues/11752)) ([bcb9a20](https://github.com/n8n-io/n8n/commit/bcb9a2078186ff80e03ca3b8532d3585c307d86b))
* **Read/Write Files from Disk Node:** Escape parenthesis when reading file ([#11753](https://github.com/n8n-io/n8n/issues/11753)) ([285534e](https://github.com/n8n-io/n8n/commit/285534e6d0ceb60290bd0a928054e494252148fe))
* **YouTube Node:** Issue in published before and after dates filters ([#11741](https://github.com/n8n-io/n8n/issues/11741)) ([7381c28](https://github.com/n8n-io/n8n/commit/7381c28af00148b329690021b921267a48a6eaa3))
### Features
* **core:** Improve debugging of sub-workflows ([#11602](https://github.com/n8n-io/n8n/issues/11602)) ([fd3254d](https://github.com/n8n-io/n8n/commit/fd3254d5874a03b57421246b77a519787536a93e))
* **core:** Improve handling of manual executions with wait nodes ([#11750](https://github.com/n8n-io/n8n/issues/11750)) ([61696c3](https://github.com/n8n-io/n8n/commit/61696c3db313cdc97925af728ff5c68415f9b6b2))
* **editor:** Add Info Note to NDV Output Panel if no existing Tools were used during Execution ([#11672](https://github.com/n8n-io/n8n/issues/11672)) ([de0e861](https://github.com/n8n-io/n8n/commit/de0e86150f4d0615481e5ec3869465cfd1ce822f))
* **editor:** Add option to create sub workflow from workflows list in `Execute Workflow` node ([#11706](https://github.com/n8n-io/n8n/issues/11706)) ([c265d44](https://github.com/n8n-io/n8n/commit/c265d44841eb147115563ce24c56666b1e321433))
* **editor:** Add selection navigation using the keyboard on new canvas ([#11679](https://github.com/n8n-io/n8n/issues/11679)) ([6cd9b99](https://github.com/n8n-io/n8n/commit/6cd9b996af0406caf65941503276524de9e2add4))
* **editor:** Add universal Create Resource Menu ([#11564](https://github.com/n8n-io/n8n/issues/11564)) ([b38ce14](https://github.com/n8n-io/n8n/commit/b38ce14ec94d74aa1c9780a0572804ff6266588d))
* **Embeddings Azure OpenAI Node, Azure OpenAI Chat Model Node:** Add support for basePath url in Azure Open AI nodes ([#11784](https://github.com/n8n-io/n8n/issues/11784)) ([e298ebe](https://github.com/n8n-io/n8n/commit/e298ebe90d69f466ee897855472eaa7be1d96aba))
* **Embeddings OpenAI Node, Embeddings Azure OpenAI Node:** Add dimensions option ([#11773](https://github.com/n8n-io/n8n/issues/11773)) ([de01a8a](https://github.com/n8n-io/n8n/commit/de01a8a01d37f33ab8bcbc65588cafebda969922))
* GitHub stars dismiss button ([#11794](https://github.com/n8n-io/n8n/issues/11794)) ([8fbad74](https://github.com/n8n-io/n8n/commit/8fbad74ab685c2ba0395c30cee0ddf9498fb8984))
# [1.68.0](https://github.com/n8n-io/n8n/compare/n8n@1.67.0...n8n@1.68.0) (2024-11-13)

View file

@ -35,7 +35,11 @@ export function setCredentialConnectionParameterInputByName(name: string, value:
}
export function saveCredential() {
getCredentialSaveButton().click({ force: true });
getCredentialSaveButton()
.click({ force: true })
.within(() => {
cy.get('button').should('not.exist');
});
}
export function closeCredentialModal() {

View file

@ -88,7 +88,7 @@ describe('Sharing', { disableAutoLogin: true }, () => {
cy.visit(workflowsPage.url);
workflowsPage.getters.workflowCards().should('have.length', 1);
workflowsPage.getters.workflowCard('Workflow W1').click();
workflowsPage.getters.workflowCardContent('Workflow W1').click();
workflowPage.actions.addNodeToCanvas('Airtable', true, true);
ndv.getters.credentialInput().find('input').should('have.value', 'Credential C2');
ndv.actions.close();
@ -104,7 +104,7 @@ describe('Sharing', { disableAutoLogin: true }, () => {
cy.visit(workflowsPage.url);
workflowsPage.getters.workflowCards().should('have.length', 2);
workflowsPage.getters.workflowCard('Workflow W1').click();
workflowsPage.getters.workflowCardContent('Workflow W1').click();
workflowPage.actions.addNodeToCanvas('Airtable', true, true);
ndv.getters.credentialInput().find('input').should('have.value', 'Credential C2');
ndv.actions.close();
@ -133,7 +133,7 @@ describe('Sharing', { disableAutoLogin: true }, () => {
cy.visit(workflowsPage.url);
workflowsPage.getters.workflowCards().should('have.length', 2);
workflowsPage.getters.workflowCard('Workflow W1').click();
workflowsPage.getters.workflowCardContent('Workflow W1').click();
workflowPage.actions.openNode('Notion');
ndv.getters
.credentialInput()
@ -144,7 +144,7 @@ describe('Sharing', { disableAutoLogin: true }, () => {
cy.waitForLoad();
cy.visit(workflowsPage.url);
workflowsPage.getters.workflowCard('Workflow W2').click('top');
workflowsPage.getters.workflowCardContent('Workflow W2').click('top');
workflowPage.actions.executeWorkflow();
});
@ -353,7 +353,7 @@ describe('Credential Usage in Cross Shared Workflows', () => {
credentialsPage.getters.emptyListCreateCredentialButton().click();
credentialsModal.actions.createNewCredential('Notion API');
cy.visit(workflowsPage.url);
workflowsPage.getters.workflowCard(workflowName).click();
workflowsPage.getters.workflowCardContent(workflowName).click();
workflowPage.actions.addNodeToCanvas(NOTION_NODE_NAME, true, true);
// Only the own credential the shared one (+ the 'Create new' option)
@ -398,7 +398,7 @@ describe('Credential Usage in Cross Shared Workflows', () => {
credentialsPage.getters.createCredentialButton().click();
credentialsModal.actions.createNewCredential('Notion API');
cy.visit(workflowsPage.url);
workflowsPage.getters.workflowCard(workflowName).click();
workflowsPage.getters.workflowCardContent(workflowName).click();
workflowPage.actions.addNodeToCanvas(NOTION_NODE_NAME, true, true);
// Only the personal credentials of the workflow owner and the global owner

View file

@ -186,7 +186,7 @@ describe('Projects', { disableAutoLogin: true }, () => {
menuItems.filter(':contains("Development")[class*=active_]').should('exist');
cy.intercept('GET', '/rest/workflows/*').as('loadWorkflow');
workflowsPage.getters.workflowCards().first().click();
workflowsPage.getters.workflowCards().first().findChildByTestId('card-content').click();
cy.wait('@loadWorkflow');
menuItems = cy.getByTestId('menu-item');
@ -747,7 +747,7 @@ describe('Projects', { disableAutoLogin: true }, () => {
// Open the moved workflow
workflowsPage.getters.workflowCards().should('have.length', 1);
workflowsPage.getters.workflowCards().first().click();
workflowsPage.getters.workflowCards().first().findChildByTestId('card-content').click();
// Check if the credential can be changed
workflowPage.getters.canvasNodeByName(NOTION_NODE_NAME).should('be.visible').dblclick();

View file

@ -56,6 +56,8 @@ export class WorkflowsPage extends BasePage {
.parents('[data-test-id="resources-list-item"]'),
workflowTags: (workflowName: string) =>
this.getters.workflowCard(workflowName).findChildByTestId('workflow-card-tags'),
workflowCardContent: (workflowName: string) =>
this.getters.workflowCard(workflowName).findChildByTestId('card-content'),
workflowActivator: (workflowName: string) =>
this.getters.workflowCard(workflowName).findChildByTestId('workflow-card-activator'),
workflowActivatorStatus: (workflowName: string) =>

View file

@ -75,8 +75,13 @@ Cypress.Commands.add('signin', ({ email, password }) => {
.then((response) => {
Cypress.env('currentUserId', response.body.data.id);
// @TODO Remove this once the switcher is removed
cy.window().then((win) => {
win.localStorage.setItem('NodeView.switcher.discovered', 'true'); // @TODO Remove this once the switcher is removed
win.localStorage.setItem('NodeView.migrated', 'true');
win.localStorage.setItem('NodeView.switcher.discovered.beta', 'true');
const nodeViewVersion = Cypress.env('NODE_VIEW_VERSION');
win.localStorage.setItem('NodeView.version', nodeViewVersion ?? '1');
});
});
});

View file

@ -20,11 +20,6 @@ beforeEach(() => {
win.localStorage.setItem('N8N_THEME', 'light');
win.localStorage.setItem('N8N_AUTOCOMPLETE_ONBOARDED', 'true');
win.localStorage.setItem('N8N_MAPPING_ONBOARDED', 'true');
const nodeViewVersion = Cypress.env('NODE_VIEW_VERSION');
if (nodeViewVersion) {
win.localStorage.setItem('NodeView.version', nodeViewVersion);
}
});
cy.intercept('GET', '/rest/settings', (req) => {

View file

@ -1,5 +1,6 @@
import { stringify } from 'flatted';
import type { IDataObject, IPinData, ITaskData, ITaskDataConnections } from 'n8n-workflow';
import type { IDataObject, ITaskData, ITaskDataConnections } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { clickExecuteWorkflowButton } from '../composables/workflow';
@ -39,38 +40,6 @@ export function createMockNodeExecutionData(
};
}
function createMockWorkflowExecutionData({
runData,
lastNodeExecuted,
}: {
runData: Record<string, ITaskData | ITaskData[]>;
pinData?: IPinData;
lastNodeExecuted: string;
}) {
return {
data: stringify({
startData: {},
resultData: {
runData,
pinData: {},
lastNodeExecuted,
},
executionData: {
contextData: {},
nodeExecutionStack: [],
metadata: {},
waitingExecution: {},
waitingExecutionSource: {},
},
}),
mode: 'manual',
startedAt: new Date().toISOString(),
stoppedAt: new Date().toISOString(),
status: 'success',
finished: true,
};
}
export function runMockWorkflowExecution({
trigger,
lastNodeExecuted,
@ -80,6 +49,7 @@ export function runMockWorkflowExecution({
lastNodeExecuted: string;
runData: Array<ReturnType<typeof createMockNodeExecutionData>>;
}) {
const workflowId = nanoid();
const executionId = Math.floor(Math.random() * 1_000_000).toString();
cy.intercept('POST', '/rest/workflows/**/run?**', {
@ -117,17 +87,24 @@ export function runMockWorkflowExecution({
resolvedRunData[nodeName] = nodeExecution[nodeName];
});
cy.intercept('GET', `/rest/executions/${executionId}`, {
statusCode: 200,
body: {
data: createMockWorkflowExecutionData({
cy.push('executionFinished', {
executionId,
workflowId,
status: 'success',
rawData: stringify({
startData: {},
resultData: {
runData,
pinData: {},
lastNodeExecuted,
runData: resolvedRunData,
}),
},
}).as('getExecution');
cy.push('executionFinished', { executionId });
cy.wait('@getExecution');
},
executionData: {
contextData: {},
nodeExecutionStack: [],
metadata: {},
waitingExecution: {},
waitingExecutionSource: {},
},
}),
});
}

View file

@ -33,7 +33,7 @@ COPY docker/images/n8n/docker-entrypoint.sh /
# Setup the Task Runner Launcher
ARG TARGETPLATFORM
ARG LAUNCHER_VERSION=0.3.0-rc
ARG LAUNCHER_VERSION=0.6.0-rc
COPY docker/images/n8n/n8n-task-runners.json /etc/n8n-task-runners.json
# Download, verify, then extract the launcher binary
RUN \

View file

@ -24,7 +24,7 @@ RUN set -eux; \
# Setup the Task Runner Launcher
ARG TARGETPLATFORM
ARG LAUNCHER_VERSION=0.3.0-rc
ARG LAUNCHER_VERSION=0.6.0-rc
COPY n8n-task-runners.json /etc/n8n-task-runners.json
# Download, verify, then extract the launcher binary
RUN \

View file

@ -1,6 +1,6 @@
{
"name": "n8n-monorepo",
"version": "1.68.0",
"version": "1.70.0",
"private": true,
"engines": {
"node": ">=20.15",
@ -62,7 +62,7 @@
"ts-jest": "^29.1.1",
"tsc-alias": "^1.8.10",
"tsc-watch": "^6.2.0",
"turbo": "2.1.2",
"turbo": "2.3.3",
"typescript": "*",
"zx": "^8.1.4"
},
@ -79,7 +79,7 @@
"semver": "^7.5.4",
"tslib": "^2.6.2",
"tsconfig-paths": "^4.2.0",
"typescript": "^5.6.2",
"typescript": "^5.7.2",
"vue-tsc": "^2.1.6",
"ws": ">=8.17.1"
},
@ -89,7 +89,8 @@
"pyodide@0.23.4": "patches/pyodide@0.23.4.patch",
"@types/express-serve-static-core@4.17.43": "patches/@types__express-serve-static-core@4.17.43.patch",
"@types/ws@8.5.4": "patches/@types__ws@8.5.4.patch",
"@types/uuencode@0.0.3": "patches/@types__uuencode@0.0.3.patch"
"@types/uuencode@0.0.3": "patches/@types__uuencode@0.0.3.patch",
"vue-tsc@2.1.6": "patches/vue-tsc@2.1.6.patch"
}
}
}

View file

@ -1,6 +1,6 @@
{
"name": "@n8n/api-types",
"version": "0.6.0",
"version": "0.8.0",
"scripts": {
"clean": "rimraf dist .turbo",
"dev": "pnpm watch",

View file

@ -172,4 +172,5 @@ export interface FrontendSettings {
blockFileAccessToN8nFiles: boolean;
};
betaFeatures: FrontendBetaFeatures[];
virtualSchemaView: boolean;
}

View file

@ -1,4 +1,4 @@
import type { ITaskData, WorkflowExecuteMode } from 'n8n-workflow';
import type { ExecutionStatus, ITaskData, WorkflowExecuteMode } from 'n8n-workflow';
type ExecutionStarted = {
type: 'executionStarted';
@ -9,6 +9,7 @@ type ExecutionStarted = {
workflowId: string;
workflowName?: string;
retryOf?: string;
flattedRunData: string;
};
};
@ -23,6 +24,10 @@ type ExecutionFinished = {
type: 'executionFinished';
data: {
executionId: string;
workflowId: string;
status: ExecutionStatus;
/** @deprecated: Please construct execution data in the frontend from the data pushed in previous messages, instead of depending on this additional payload serialization */
rawData?: string;
};
};

View file

@ -38,12 +38,12 @@ const isSubmitting = ref(false);
const resizeObserver = ref<ResizeObserver | null>(null);
const isSubmitDisabled = computed(() => {
return input.value === '' || waitingForResponse.value || options.disabled?.value === true;
return input.value === '' || unref(waitingForResponse) || options.disabled?.value === true;
});
const isInputDisabled = computed(() => options.disabled?.value === true);
const isFileUploadDisabled = computed(
() => isFileUploadAllowed.value && waitingForResponse.value && !options.disabled?.value,
() => isFileUploadAllowed.value && unref(waitingForResponse) && !options.disabled?.value,
);
const isFileUploadAllowed = computed(() => unref(options.allowFileUploads) === true);
const allowedFileTypes = computed(() => unref(options.allowedFilesMimeTypes));
@ -194,10 +194,13 @@ function adjustHeight(event: Event) {
<template>
<div class="chat-input" :style="styleVars" @keydown.stop="onKeyDown">
<div class="chat-inputs">
<div v-if="$slots.leftPanel" class="chat-input-left-panel">
<slot name="leftPanel" />
</div>
<textarea
ref="chatTextArea"
data-test-id="chat-input"
v-model="input"
data-test-id="chat-input"
:disabled="isInputDisabled"
:placeholder="t(props.placeholder)"
@keydown.enter="onSubmitKeydown"
@ -251,7 +254,7 @@ function adjustHeight(event: Event) {
width: 100%;
display: flex;
justify-content: center;
align-items: center;
align-items: flex-end;
textarea {
font-family: inherit;
@ -259,8 +262,7 @@ function adjustHeight(event: Event) {
width: 100%;
border: var(--chat--input--border, 0);
border-radius: var(--chat--input--border-radius, 0);
padding: 0.8rem;
padding-right: calc(0.8rem + (var(--controls-count, 1) * var(--chat--textarea--height)));
padding: var(--chat--input--padding, 0.8rem);
min-height: var(--chat--textarea--height, 2.5rem); // Set a smaller initial height
max-height: var(--chat--textarea--max-height, 30rem);
height: var(--chat--textarea--height, 2.5rem); // Set initial height same as min-height
@ -271,6 +273,9 @@ function adjustHeight(event: Event) {
outline: none;
line-height: var(--chat--input--line-height, 1.5);
&::placeholder {
font-size: var(--chat--input--placeholder--font-size, var(--chat--input--font-size, inherit));
}
&:focus,
&:hover {
border-color: var(--chat--input--border-active, 0);
@ -279,9 +284,6 @@ function adjustHeight(event: Event) {
}
.chat-inputs-controls {
display: flex;
position: absolute;
right: 0.5rem;
bottom: 0;
}
.chat-input-send-button,
.chat-input-file-button {
@ -340,4 +342,9 @@ function adjustHeight(event: Event) {
gap: 0.5rem;
padding: var(--chat--files-spacing, 0.25rem);
}
.chat-input-left-panel {
width: var(--chat--input--left--panel--width, 2rem);
margin-left: 0.4rem;
}
</style>

View file

@ -136,7 +136,8 @@ onMounted(async () => {
font-size: var(--chat--message--font-size, 1rem);
padding: var(--chat--message--padding, var(--chat--spacing));
border-radius: var(--chat--message--border-radius, var(--chat--border-radius));
scroll-margin: 100px;
scroll-margin: 3rem;
.chat-message-actions {
position: absolute;
bottom: calc(100% - 0.5rem);
@ -151,9 +152,6 @@ onMounted(async () => {
left: auto;
right: 0;
}
&.chat-message-from-bot .chat-message-actions {
bottom: calc(100% - 1rem);
}
&:hover {
.chat-message-actions {

View file

@ -37,8 +37,7 @@ body {
4. Prevent font size adjustment after orientation changes (IE, iOS)
5. Prevent overflow from long words (all)
*/
font-size: 110%; /* 2 */
line-height: 1.6; /* 3 */
line-height: 1.4; /* 3 */
-webkit-text-size-adjust: 100%; /* 4 */
word-break: break-word; /* 5 */
@ -407,7 +406,7 @@ body {
h4,
h5,
h6 {
margin: 3.2rem 0 0.8em;
margin: 2rem 0 0.8em;
}
/*
@ -641,4 +640,15 @@ body {
body > a:first-child:focus {
top: 1rem;
}
// Lists
ul,
ol {
padding-left: 1.5rem;
margin-bottom: 1rem;
li {
margin-bottom: 0.5rem;
}
}
}

View file

@ -1,6 +1,6 @@
{
"name": "@n8n/config",
"version": "1.18.0",
"version": "1.20.0",
"scripts": {
"clean": "rimraf dist .turbo",
"dev": "pnpm watch",

View file

@ -43,10 +43,6 @@ export class TaskRunnersConfig {
@Env('N8N_RUNNERS_MAX_CONCURRENCY')
maxConcurrency: number = 5;
/** Should the output of deduplication be asserted for correctness */
@Env('N8N_RUNNERS_ASSERT_DEDUPLICATION_OUTPUT')
assertDeduplicationOutput: boolean = false;
/** How long (in seconds) a task is allowed to take for completion, else the task will be aborted and the runner restarted. Must be greater than 0. */
@Env('N8N_RUNNERS_TASK_TIMEOUT')
taskTimeout: number = 60;

View file

@ -231,7 +231,6 @@ describe('GlobalConfig', () => {
port: 5679,
maxOldSpaceSize: '',
maxConcurrency: 5,
assertDeduplicationOutput: false,
taskTimeout: 60,
heartbeatInterval: 30,
},

View file

@ -20,7 +20,7 @@
"dist/**/*"
],
"dependencies": {
"iconv-lite": "0.6.3",
"iconv-lite": "catalog:",
"imap": "0.8.19",
"quoted-printable": "1.0.1",
"utf8": "3.0.0",

View file

@ -258,7 +258,6 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeE
['system', `{system_message}${outputParser ? '\n\n{formatting_instructions}' : ''}`],
['placeholder', '{chat_history}'],
['human', '{input}'],
['placeholder', '{agent_scratchpad}'],
];
const hasBinaryData = this.getInputData(0, 'main')?.[0]?.binary !== undefined;
@ -266,6 +265,9 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeE
const binaryMessage = await extractBinaryMessages(this);
messages.push(binaryMessage);
}
// We add the agent scratchpad last, so that the agent will not run in loops
// by adding binary messages between each interaction
messages.push(['placeholder', '{agent_scratchpad}']);
const prompt = ChatPromptTemplate.fromMessages(messages);
const agent = createToolCallingAgent({

View file

@ -81,31 +81,20 @@ function getSandbox(
const workflowMode = this.getMode();
const context = getSandboxContext.call(this, itemIndex);
// eslint-disable-next-line @typescript-eslint/unbound-method
context.addInputData = this.addInputData;
// eslint-disable-next-line @typescript-eslint/unbound-method
context.addOutputData = this.addOutputData;
// eslint-disable-next-line @typescript-eslint/unbound-method
context.getInputConnectionData = this.getInputConnectionData;
// eslint-disable-next-line @typescript-eslint/unbound-method
context.getInputData = this.getInputData;
// eslint-disable-next-line @typescript-eslint/unbound-method
context.getNode = this.getNode;
// eslint-disable-next-line @typescript-eslint/unbound-method
context.getExecutionCancelSignal = this.getExecutionCancelSignal;
// eslint-disable-next-line @typescript-eslint/unbound-method
context.getNodeOutputs = this.getNodeOutputs;
// eslint-disable-next-line @typescript-eslint/unbound-method
context.executeWorkflow = this.executeWorkflow;
// eslint-disable-next-line @typescript-eslint/unbound-method
context.getWorkflowDataProxy = this.getWorkflowDataProxy;
// eslint-disable-next-line @typescript-eslint/unbound-method
context.addInputData = this.addInputData.bind(this);
context.addOutputData = this.addOutputData.bind(this);
context.getInputConnectionData = this.getInputConnectionData.bind(this);
context.getInputData = this.getInputData.bind(this);
context.getNode = this.getNode.bind(this);
context.getExecutionCancelSignal = this.getExecutionCancelSignal.bind(this);
context.getNodeOutputs = this.getNodeOutputs.bind(this);
context.executeWorkflow = this.executeWorkflow.bind(this);
context.getWorkflowDataProxy = this.getWorkflowDataProxy.bind(this);
context.logger = this.logger;
if (options?.addItems) {
context.items = context.$input.all();
}
// eslint-disable-next-line @typescript-eslint/unbound-method
const sandbox = new JavaScriptSandbox(context, code, this.helpers, {
resolver: vmResolver,

View file

@ -94,7 +94,7 @@ export class DocumentGithubLoader implements INodeType {
};
async supplyData(this: ISupplyDataFunctions, itemIndex: number): Promise<SupplyData> {
console.log('Supplying data for Github Document Loader');
this.logger.debug('Supplying data for Github Document Loader');
const repository = this.getNodeParameter('repository', itemIndex) as string;
const branch = this.getNodeParameter('branch', itemIndex) as string;

View file

@ -36,7 +36,7 @@ export class ToolWorkflow implements INodeType {
name: 'toolWorkflow',
icon: 'fa:network-wired',
group: ['transform'],
version: [1, 1.1, 1.2],
version: [1, 1.1, 1.2, 1.3],
description: 'Uses another n8n workflow as a tool. Allows packaging any n8n node(s) as a tool.',
defaults: {
name: 'Call n8n Workflow Tool',
@ -200,6 +200,11 @@ export class ToolWorkflow implements INodeType {
hint: 'The field in the last-executed node of the workflow that contains the response',
description:
'Where to find the data that this tool should return. n8n will look in the output of the last-executed node of the workflow for a field with this name, and return its value.',
displayOptions: {
show: {
'@version': [{ _cnd: { lt: 1.3 } }],
},
},
},
{
displayName: 'Extra Workflow Inputs',
@ -376,19 +381,6 @@ export class ToolWorkflow implements INodeType {
runManager?: CallbackManagerForToolRun,
): Promise<string> => {
const source = this.getNodeParameter('source', itemIndex) as string;
const responsePropertyName = this.getNodeParameter(
'responsePropertyName',
itemIndex,
) as string;
if (!responsePropertyName) {
throw new NodeOperationError(this.getNode(), "Field to return can't be empty", {
itemIndex,
description:
'Enter the name of a field in the last node of the workflow that contains the response to return',
});
}
const workflowInfo: IExecuteWorkflowInfo = {};
if (source === 'database') {
// Read workflow from database
@ -467,17 +459,13 @@ export class ToolWorkflow implements INodeType {
throw new NodeOperationError(this.getNode(), error as Error);
}
const response: string | undefined = get(receivedData, [
'data',
0,
0,
'json',
responsePropertyName,
]) as string | undefined;
const response: string | undefined = get(receivedData, 'data[0][0].json') as
| string
| undefined;
if (response === undefined) {
throw new NodeOperationError(
this.getNode(),
`There was an error: "The workflow did not return an item with the property '${responsePropertyName}'"`,
'There was an error: "The workflow did not return a response"',
);
}
@ -531,12 +519,10 @@ export class ToolWorkflow implements INodeType {
if (executionError) {
void this.addOutputData(NodeConnectionType.AiTool, index, executionError, metadata);
} else {
void this.addOutputData(
NodeConnectionType.AiTool,
index,
[[{ json: { response } }]],
metadata,
);
// Output always needs to be an object
// so we try to parse the response as JSON and if it fails we just return the string wrapped in an object
const json = jsonParse<IDataObject>(response, { fallbackValue: { response } });
void this.addOutputData(NodeConnectionType.AiTool, index, [[{ json }]], metadata);
}
return response;
};

View file

@ -18,7 +18,7 @@ import {
} from 'n8n-workflow';
import { OpenAI as OpenAIClient } from 'openai';
import { promptTypeOptions, textFromPreviousNode } from '../../../../../utils/descriptions';
import { promptTypeOptions } from '../../../../../utils/descriptions';
import { getConnectedTools } from '../../../../../utils/helpers';
import { getTracingConfig } from '../../../../../utils/tracing';
import { formatToOpenAIAssistantTool } from '../../helpers/utils';
@ -30,16 +30,6 @@ const properties: INodeProperties[] = [
...promptTypeOptions,
name: 'prompt',
},
{
...textFromPreviousNode,
disabledOptions: { show: { prompt: ['auto'] } },
displayOptions: {
show: {
prompt: ['auto'],
'@version': [{ _cnd: { gte: 1.7 } }],
},
},
},
{
displayName: 'Text',
name: 'text',

View file

@ -1,6 +1,6 @@
{
"name": "@n8n/n8n-nodes-langchain",
"version": "1.68.0",
"version": "1.70.0",
"description": "",
"main": "index.js",
"scripts": {

View file

@ -66,7 +66,7 @@ export const inputSchemaField: INodeProperties = {
};
export const promptTypeOptions: INodeProperties = {
displayName: 'Prompt Source',
displayName: 'Prompt Source (User Message)',
name: 'promptType',
type: 'options',
options: [

View file

@ -1,6 +1,6 @@
{
"name": "@n8n/task-runner",
"version": "1.6.0",
"version": "1.8.0",
"scripts": {
"clean": "rimraf dist .turbo",
"start": "node dist/start.js",

View file

@ -9,7 +9,7 @@ class HealthcheckServerConfig {
host: string = '127.0.0.1';
@Env('N8N_RUNNERS_SERVER_PORT')
port: number = 5680;
port: number = 5681;
}
@Config
@ -26,6 +26,14 @@ export class BaseRunnerConfig {
@Env('N8N_RUNNERS_MAX_CONCURRENCY')
maxConcurrency: number = 5;
/**
* How long (in seconds) a runner may be idle for before exit. Intended
* for use in `external` mode - launcher must pass the env var when launching
* the runner. Disabled with `0` on `internal` mode.
*/
@Env('N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT')
idleTimeout: number = 0;
@Nested
healthcheckServer!: HealthcheckServerConfig;
}

View file

@ -0,0 +1,91 @@
import { mock } from 'jest-mock-extended';
import type {
IExecuteData,
INode,
INodeExecutionData,
ITaskDataConnectionsSource,
} from 'n8n-workflow';
import type { DataRequestResponse, InputDataChunkDefinition } from '@/runner-types';
import { DataRequestResponseReconstruct } from '../data-request-response-reconstruct';
describe('DataRequestResponseReconstruct', () => {
const reconstruct = new DataRequestResponseReconstruct();
describe('reconstructConnectionInputItems', () => {
it('should return all input items if no chunk is provided', () => {
const inputData: DataRequestResponse['inputData'] = {
main: [[{ json: { key: 'value' } }]],
};
const result = reconstruct.reconstructConnectionInputItems(inputData);
expect(result).toEqual([{ json: { key: 'value' } }]);
});
it('should reconstruct sparse array when chunk is provided', () => {
const inputData: DataRequestResponse['inputData'] = {
main: [[{ json: { key: 'chunked' } }]],
};
const chunk: InputDataChunkDefinition = { startIndex: 2, count: 1 };
const result = reconstruct.reconstructConnectionInputItems(inputData, chunk);
expect(result).toEqual([undefined, undefined, { json: { key: 'chunked' } }, undefined]);
});
it('should handle empty input data gracefully', () => {
const inputData: DataRequestResponse['inputData'] = { main: [[]] };
const chunk: InputDataChunkDefinition = { startIndex: 1, count: 1 };
const result = reconstruct.reconstructConnectionInputItems(inputData, chunk);
expect(result).toEqual([undefined]);
});
});
describe('reconstructExecuteData', () => {
it('should reconstruct execute data with the provided input items', () => {
const node = mock<INode>();
const connectionInputSource = mock<ITaskDataConnectionsSource>();
const response = mock<DataRequestResponse>({
inputData: { main: [[]] },
node,
connectionInputSource,
});
const inputItems: INodeExecutionData[] = [{ json: { key: 'reconstructed' } }];
const result = reconstruct.reconstructExecuteData(response, inputItems);
expect(result).toEqual<IExecuteData>({
data: {
main: [inputItems],
},
node: response.node,
source: response.connectionInputSource,
});
});
it('should handle empty input items gracefully', () => {
const node = mock<INode>();
const connectionInputSource = mock<ITaskDataConnectionsSource>();
const inputItems: INodeExecutionData[] = [];
const response = mock<DataRequestResponse>({
inputData: { main: [[{ json: { key: 'value' } }]] },
node,
connectionInputSource,
});
const result = reconstruct.reconstructExecuteData(response, inputItems);
expect(result).toEqual<IExecuteData>({
data: {
main: [inputItems],
},
node: response.node,
source: response.connectionInputSource,
});
});
});
});

View file

@ -1,6 +1,6 @@
import type { IExecuteData, INodeExecutionData } from 'n8n-workflow';
import type { IExecuteData, INodeExecutionData, ITaskDataConnections } from 'n8n-workflow';
import type { DataRequestResponse } from '@/runner-types';
import type { DataRequestResponse, InputDataChunkDefinition } from '@/runner-types';
/**
* Reconstructs data from a DataRequestResponse to the initial
@ -8,20 +8,43 @@ import type { DataRequestResponse } from '@/runner-types';
*/
export class DataRequestResponseReconstruct {
/**
* Reconstructs `connectionInputData` from a DataRequestResponse
* Reconstructs `inputData` from a DataRequestResponse
*/
reconstructConnectionInputData(
reconstructConnectionInputItems(
inputData: DataRequestResponse['inputData'],
): INodeExecutionData[] {
return inputData?.main?.[0] ?? [];
chunk?: InputDataChunkDefinition,
): Array<INodeExecutionData | undefined> {
const inputItems = inputData?.main?.[0] ?? [];
if (!chunk) {
return inputItems;
}
// Only a chunk of the input items was requested. We reconstruct
// the array by filling in the missing items with `undefined`.
let sparseInputItems: Array<INodeExecutionData | undefined> = [];
sparseInputItems = sparseInputItems
.concat(Array.from({ length: chunk.startIndex }))
.concat(inputItems)
.concat(Array.from({ length: inputItems.length - chunk.startIndex - chunk.count }));
return sparseInputItems;
}
/**
* Reconstruct `executeData` from a DataRequestResponse
*/
reconstructExecuteData(response: DataRequestResponse): IExecuteData {
reconstructExecuteData(
response: DataRequestResponse,
inputItems: INodeExecutionData[],
): IExecuteData {
const inputData: ITaskDataConnections = {
...response.inputData,
main: [inputItems],
};
return {
data: response.inputData,
data: inputData,
node: response.node,
source: response.connectionInputSource,
};

View file

@ -3,13 +3,14 @@ import type { CodeExecutionMode, IDataObject } from 'n8n-workflow';
import fs from 'node:fs';
import { builtinModules } from 'node:module';
import type { BaseRunnerConfig } from '@/config/base-runner-config';
import type { JsRunnerConfig } from '@/config/js-runner-config';
import { MainConfig } from '@/config/main-config';
import { ExecutionError } from '@/js-task-runner/errors/execution-error';
import { ValidationError } from '@/js-task-runner/errors/validation-error';
import type { JSExecSettings } from '@/js-task-runner/js-task-runner';
import { JsTaskRunner } from '@/js-task-runner/js-task-runner';
import type { DataRequestResponse } from '@/runner-types';
import type { DataRequestResponse, InputDataChunkDefinition } from '@/runner-types';
import type { Task } from '@/task-runner';
import {
@ -24,17 +25,21 @@ jest.mock('ws');
const defaultConfig = new MainConfig();
describe('JsTaskRunner', () => {
const createRunnerWithOpts = (opts: Partial<JsRunnerConfig> = {}) =>
const createRunnerWithOpts = (
jsRunnerOpts: Partial<JsRunnerConfig> = {},
baseRunnerOpts: Partial<BaseRunnerConfig> = {},
) =>
new JsTaskRunner({
baseRunnerConfig: {
...defaultConfig.baseRunnerConfig,
grantToken: 'grantToken',
maxConcurrency: 1,
n8nUri: 'localhost',
...baseRunnerOpts,
},
jsRunnerConfig: {
...defaultConfig.jsRunnerConfig,
...opts,
...jsRunnerOpts,
},
sentryConfig: {
sentryDsn: '',
@ -90,17 +95,19 @@ describe('JsTaskRunner', () => {
inputItems,
settings,
runner,
chunk,
}: {
code: string;
inputItems: IDataObject[];
settings?: Partial<JSExecSettings>;
runner?: JsTaskRunner;
chunk?: InputDataChunkDefinition;
}) => {
return await execTaskWithParams({
task: newTaskWithSettings({
code,
nodeMode: 'runOnceForEachItem',
chunk,
...settings,
}),
taskData: newDataRequestResponse(inputItems.map(wrapIntoJson)),
@ -504,6 +511,28 @@ describe('JsTaskRunner', () => {
);
});
describe('chunked execution', () => {
it('should use correct index for each item', async () => {
const outcome = await executeForEachItem({
code: 'return { ...$json, idx: $itemIndex }',
inputItems: [{ a: 1 }, { b: 2 }, { c: 3 }],
chunk: {
startIndex: 100,
count: 3,
},
});
expect(outcome).toEqual({
result: [
withPairedItem(100, wrapIntoJson({ a: 1, idx: 100 })),
withPairedItem(101, wrapIntoJson({ b: 2, idx: 101 })),
withPairedItem(102, wrapIntoJson({ c: 3, idx: 102 })),
],
customData: undefined,
});
});
});
it('should return static items', async () => {
const outcome = await executeForEachItem({
code: 'return {json: {b: 1}}',
@ -796,7 +825,6 @@ describe('JsTaskRunner', () => {
code: 'unknown; return []',
nodeMode: 'runOnceForAllItems',
continueOnFail: false,
mode: 'manual',
workflowMode: 'manual',
});
runner.runningTasks.set(taskId, task);
@ -825,4 +853,100 @@ describe('JsTaskRunner', () => {
});
});
});
describe('idle timeout', () => {
beforeEach(() => {
jest.useFakeTimers();
});
afterEach(() => {
jest.useRealTimers();
});
it('should set idle timer when instantiated', () => {
const idleTimeout = 5;
const runner = createRunnerWithOpts({}, { idleTimeout });
const emitSpy = jest.spyOn(runner, 'emit');
jest.advanceTimersByTime(idleTimeout * 1000 - 100);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');
jest.advanceTimersByTime(idleTimeout * 1000);
expect(emitSpy).toHaveBeenCalledWith('runner:reached-idle-timeout');
});
it('should reset idle timer when accepting a task', () => {
const idleTimeout = 5;
const runner = createRunnerWithOpts({}, { idleTimeout });
const taskId = '123';
const offerId = 'offer123';
const emitSpy = jest.spyOn(runner, 'emit');
jest.advanceTimersByTime(idleTimeout * 1000 - 100);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');
runner.openOffers.set(offerId, {
offerId,
validUntil: process.hrtime.bigint() + BigInt(idleTimeout * 1000 * 1_000_000),
});
runner.offerAccepted(offerId, taskId);
jest.advanceTimersByTime(200);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout'); // because timer was reset
runner.runningTasks.clear();
jest.advanceTimersByTime(idleTimeout * 1000);
expect(emitSpy).toHaveBeenCalledWith('runner:reached-idle-timeout');
});
it('should reset idle timer when finishing a task', async () => {
const idleTimeout = 5;
const runner = createRunnerWithOpts({}, { idleTimeout });
const taskId = '123';
const emitSpy = jest.spyOn(runner, 'emit');
jest.spyOn(runner, 'executeTask').mockResolvedValue({ result: [] });
runner.runningTasks.set(taskId, {
taskId,
active: true,
cancelled: false,
});
jest.advanceTimersByTime(idleTimeout * 1000 - 100);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');
await runner.receivedSettings(taskId, {});
jest.advanceTimersByTime(200);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout'); // because timer was reset
jest.advanceTimersByTime(idleTimeout * 1000);
expect(emitSpy).toHaveBeenCalledWith('runner:reached-idle-timeout');
});
it('should never reach idle timeout if idle timeout is set to 0', () => {
const runner = createRunnerWithOpts({}, { idleTimeout: 0 });
const emitSpy = jest.spyOn(runner, 'emit');
jest.advanceTimersByTime(999999);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');
});
it('should not reach idle timeout if there are running tasks', () => {
const idleTimeout = 5;
const runner = createRunnerWithOpts({}, { idleTimeout });
const taskId = '123';
const emitSpy = jest.spyOn(runner, 'emit');
runner.runningTasks.set(taskId, {
taskId,
active: true,
cancelled: false,
});
jest.advanceTimersByTime(idleTimeout * 1000);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');
});
});
});

View file

@ -0,0 +1,110 @@
import { ValidationError } from '@/js-task-runner/errors/validation-error';
import {
validateRunForAllItemsOutput,
validateRunForEachItemOutput,
} from '@/js-task-runner/result-validation';
describe('result validation', () => {
describe('validateRunForAllItemsOutput', () => {
it('should throw an error if the output is not an object', () => {
expect(() => {
validateRunForAllItemsOutput(undefined);
}).toThrowError(ValidationError);
});
it('should throw an error if the output is an array and at least one item has a non-n8n key', () => {
expect(() => {
validateRunForAllItemsOutput([{ json: {} }, { json: {}, unknownKey: {} }]);
}).toThrowError(ValidationError);
});
it('should not throw an error if the output is an array and all items are json wrapped', () => {
expect(() => {
validateRunForAllItemsOutput([{ json: {} }, { json: {} }, { json: {} }]);
}).not.toThrow();
});
test.each([
['binary', {}],
['pairedItem', {}],
['error', {}],
])(
'should not throw an error if the output item has %s key in addition to json',
(key, value) => {
expect(() => {
validateRunForAllItemsOutput([{ json: {} }, { json: {}, [key]: value }]);
}).not.toThrow();
},
);
it('should not throw an error if the output is an array and all items are not json wrapped', () => {
expect(() => {
validateRunForAllItemsOutput([
{
id: 1,
name: 'test3',
},
{
id: 2,
name: 'test4',
},
{
id: 3,
name: 'test5',
},
// eslint-disable-next-line @typescript-eslint/no-explicit-any
] as any);
}).not.toThrow();
});
it('should throw if json is not an object', () => {
expect(() => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
validateRunForAllItemsOutput([{ json: 1 } as any]);
}).toThrowError(ValidationError);
});
});
describe('validateRunForEachItemOutput', () => {
const index = 0;
it('should throw an error if the output is not an object', () => {
expect(() => {
validateRunForEachItemOutput(undefined, index);
}).toThrowError(ValidationError);
});
it('should throw an error if the output is an array', () => {
expect(() => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
validateRunForEachItemOutput([] as any, index);
}).toThrowError(ValidationError);
});
it('should throw if json is not an object', () => {
expect(() => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
validateRunForEachItemOutput({ json: 1 } as any, index);
}).toThrowError(ValidationError);
});
it('should throw an error if the output is an array and at least one item has a non-n8n key', () => {
expect(() => {
validateRunForEachItemOutput({ json: {}, unknownKey: {} }, index);
}).toThrowError(ValidationError);
});
test.each([
['binary', {}],
['pairedItem', {}],
['error', {}],
])(
'should not throw an error if the output item has %s key in addition to json',
(key, value) => {
expect(() => {
validateRunForEachItemOutput({ json: {}, [key]: value }, index);
}).not.toThrow();
},
);
});
});

View file

@ -16,7 +16,6 @@ export const newTaskWithSettings = (
settings: {
workflowMode: 'manual',
continueOnFail: false,
mode: 'manual',
...settings,
},
active: true,

View file

@ -1,14 +1,17 @@
import { BuiltInsParserState } from '../built-ins-parser-state';
describe('BuiltInsParserState', () => {
describe('toDataRequestSpecification', () => {
describe('toDataRequestParams', () => {
it('should return empty array when no properties are marked as needed', () => {
const state = new BuiltInsParserState();
expect(state.toDataRequestParams()).toEqual({
dataOfNodes: [],
env: false,
input: false,
input: {
chunk: undefined,
include: false,
},
prevNode: false,
});
});
@ -20,7 +23,10 @@ describe('BuiltInsParserState', () => {
expect(state.toDataRequestParams()).toEqual({
dataOfNodes: 'all',
env: false,
input: true,
input: {
chunk: undefined,
include: true,
},
prevNode: false,
});
});
@ -33,7 +39,10 @@ describe('BuiltInsParserState', () => {
expect(state.toDataRequestParams()).toEqual({
dataOfNodes: ['Node1', 'Node2'],
env: false,
input: false,
input: {
chunk: undefined,
include: false,
},
prevNode: false,
});
});
@ -47,7 +56,10 @@ describe('BuiltInsParserState', () => {
expect(state.toDataRequestParams()).toEqual({
dataOfNodes: 'all',
env: false,
input: true,
input: {
chunk: undefined,
include: true,
},
prevNode: false,
});
});
@ -59,7 +71,10 @@ describe('BuiltInsParserState', () => {
expect(state.toDataRequestParams()).toEqual({
dataOfNodes: [],
env: true,
input: false,
input: {
chunk: undefined,
include: false,
},
prevNode: false,
});
});
@ -71,7 +86,33 @@ describe('BuiltInsParserState', () => {
expect(state.toDataRequestParams()).toEqual({
dataOfNodes: [],
env: false,
input: true,
input: {
chunk: undefined,
include: true,
},
prevNode: false,
});
});
it('should use the given chunk', () => {
const state = new BuiltInsParserState();
state.markInputAsNeeded();
expect(
state.toDataRequestParams({
count: 10,
startIndex: 5,
}),
).toEqual({
dataOfNodes: [],
env: false,
input: {
chunk: {
count: 10,
startIndex: 5,
},
include: true,
},
prevNode: false,
});
});
@ -83,7 +124,10 @@ describe('BuiltInsParserState', () => {
expect(state.toDataRequestParams()).toEqual({
dataOfNodes: [],
env: false,
input: false,
input: {
chunk: undefined,
include: false,
},
prevNode: true,
});
});
@ -98,7 +142,10 @@ describe('BuiltInsParserState', () => {
expect(state.toDataRequestParams()).toEqual({
dataOfNodes: 'all',
env: true,
input: true,
input: {
chunk: undefined,
include: true,
},
prevNode: true,
});
});
@ -109,7 +156,10 @@ describe('BuiltInsParserState', () => {
expect(state.toDataRequestParams()).toEqual({
dataOfNodes: 'all',
env: true,
input: true,
input: {
chunk: undefined,
include: true,
},
prevNode: true,
});
});

View file

@ -1,4 +1,5 @@
import type { BrokerMessage } from '@/message-types';
import type { InputDataChunkDefinition } from '@/runner-types';
/**
* Class to keep track of which built-in variables are accessed in the code
@ -53,11 +54,16 @@ export class BuiltInsParserState {
this.needs$prevNode = true;
}
toDataRequestParams(): BrokerMessage.ToRequester.TaskDataRequest['requestParams'] {
toDataRequestParams(
chunk?: InputDataChunkDefinition,
): BrokerMessage.ToRequester.TaskDataRequest['requestParams'] {
return {
dataOfNodes: this.needsAllNodes ? 'all' : Array.from(this.neededNodeNames),
env: this.needs$env,
input: this.needs$input,
input: {
include: this.needs$input,
chunk,
},
prevNode: this.needs$prevNode,
};
}

View file

@ -19,7 +19,12 @@ import * as a from 'node:assert';
import { runInNewContext, type Context } from 'node:vm';
import type { MainConfig } from '@/config/main-config';
import type { DataRequestResponse, PartialAdditionalData, TaskResultData } from '@/runner-types';
import type {
DataRequestResponse,
InputDataChunkDefinition,
PartialAdditionalData,
TaskResultData,
} from '@/runner-types';
import { type Task, TaskRunner } from '@/task-runner';
import { BuiltInsParser } from './built-ins-parser/built-ins-parser';
@ -37,9 +42,8 @@ export interface JSExecSettings {
nodeMode: CodeExecutionMode;
workflowMode: WorkflowExecuteMode;
continueOnFail: boolean;
// For workflow data proxy
mode: WorkflowExecuteMode;
// For executing partial input data
chunk?: InputDataChunkDefinition;
}
export interface JsTaskData {
@ -94,6 +98,8 @@ export class JsTaskRunner extends TaskRunner {
const settings = task.settings;
a.ok(settings, 'JS Code not sent to runner');
this.validateTaskSettings(settings);
const neededBuiltInsResult = this.builtInsParser.parseUsedBuiltIns(settings.code);
const neededBuiltIns = neededBuiltInsResult.ok
? neededBuiltInsResult.result
@ -101,10 +107,10 @@ export class JsTaskRunner extends TaskRunner {
const dataResponse = await this.requestData<DataRequestResponse>(
task.taskId,
neededBuiltIns.toDataRequestParams(),
neededBuiltIns.toDataRequestParams(settings.chunk),
);
const data = this.reconstructTaskData(dataResponse);
const data = this.reconstructTaskData(dataResponse, settings.chunk);
await this.requestNodeTypeIfNeeded(neededBuiltIns, data.workflow, task.taskId);
@ -136,6 +142,14 @@ export class JsTaskRunner extends TaskRunner {
};
}
private validateTaskSettings(settings: JSExecSettings) {
a.ok(settings.code, 'No code to execute');
if (settings.nodeMode === 'runOnceForAllItems') {
a.ok(settings.chunk === undefined, 'Chunking is not supported for runOnceForAllItems');
}
}
private getNativeVariables() {
return {
// Exposed Node.js globals in vm2
@ -220,7 +234,13 @@ export class JsTaskRunner extends TaskRunner {
const inputItems = data.connectionInputData;
const returnData: INodeExecutionData[] = [];
for (let index = 0; index < inputItems.length; index++) {
// If a chunk was requested, only process the items in the chunk
const chunkStartIdx = settings.chunk ? settings.chunk.startIndex : 0;
const chunkEndIdx = settings.chunk
? settings.chunk.startIndex + settings.chunk.count
: inputItems.length;
for (let index = chunkStartIdx; index < chunkEndIdx; index++) {
const item = inputItems[index];
const dataProxy = this.createDataProxy(data, workflow, index);
const context: Context = {
@ -325,13 +345,24 @@ export class JsTaskRunner extends TaskRunner {
return new ExecutionError({ message: JSON.stringify(error) });
}
private reconstructTaskData(response: DataRequestResponse): JsTaskData {
private reconstructTaskData(
response: DataRequestResponse,
chunk?: InputDataChunkDefinition,
): JsTaskData {
const inputData = this.taskDataReconstruct.reconstructConnectionInputItems(
response.inputData,
chunk,
// This type assertion is intentional. Chunking is only supported in
// runOnceForEachItem mode and if a chunk was requested, we intentionally
// fill the array with undefined values for the items outside the chunk.
// We only iterate over the chunk items but WorkflowDataProxy expects
// the full array of items.
) as INodeExecutionData[];
return {
...response,
connectionInputData: this.taskDataReconstruct.reconstructConnectionInputData(
response.inputData,
),
executeData: this.taskDataReconstruct.reconstructExecuteData(response),
connectionInputData: inputData,
executeData: this.taskDataReconstruct.reconstructExecuteData(response, inputData),
};
}

View file

@ -9,7 +9,7 @@ export const REQUIRED_N8N_ITEM_KEYS = new Set(['json', 'binary', 'pairedItem', '
function validateTopLevelKeys(item: INodeExecutionData, itemIndex: number) {
for (const key in item) {
if (Object.prototype.hasOwnProperty.call(item, key)) {
if (REQUIRED_N8N_ITEM_KEYS.has(key)) return;
if (REQUIRED_N8N_ITEM_KEYS.has(key)) continue;
throw new ValidationError({
message: `Unknown top-level item key: ${key}`,

View file

@ -15,6 +15,18 @@ import type {
WorkflowParameters,
} from 'n8n-workflow';
export interface InputDataChunkDefinition {
startIndex: number;
count: number;
}
export interface InputDataRequestParams {
/** Whether to include the input data in the response */
include: boolean;
/** Optionally request only a specific chunk of data instead of all input data */
chunk?: InputDataChunkDefinition;
}
/**
* Specifies what data should be included for a task data request.
*/
@ -22,7 +34,7 @@ export interface TaskDataRequestParams {
dataOfNodes: string[] | 'all';
prevNode: boolean;
/** Whether input data for the node should be included */
input: boolean;
input: InputDataRequestParams;
/** Whether env provider's state should be included */
env: boolean;
}

View file

@ -51,6 +51,9 @@ void (async function start() {
}
runner = new JsTaskRunner(config);
runner.on('runner:reached-idle-timeout', () => {
void createSignalHandler('IDLE_TIMEOUT')();
});
const { enabled, host, port } = config.baseRunnerConfig.healthcheckServer;

View file

@ -1,5 +1,6 @@
import { ApplicationError, ensureError } from 'n8n-workflow';
import { ApplicationError, ensureError, randomInt } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { EventEmitter } from 'node:events';
import { type MessageEvent, WebSocket } from 'ws';
import type { BaseRunnerConfig } from '@/config/base-runner-config';
@ -41,15 +42,18 @@ export interface RPCCallObject {
[name: string]: ((...args: unknown[]) => Promise<unknown>) | RPCCallObject;
}
const VALID_TIME_MS = 1000;
const VALID_EXTRA_MS = 100;
const OFFER_VALID_TIME_MS = 5000;
const OFFER_VALID_EXTRA_MS = 100;
/** Converts milliseconds to nanoseconds */
const msToNs = (ms: number) => BigInt(ms * 1_000_000);
export interface TaskRunnerOpts extends BaseRunnerConfig {
taskType: string;
name?: string;
}
export abstract class TaskRunner {
export abstract class TaskRunner extends EventEmitter {
id: string = nanoid();
ws: WebSocket;
@ -76,10 +80,17 @@ export abstract class TaskRunner {
name: string;
private idleTimer: NodeJS.Timeout | undefined;
/** How long (in seconds) a runner may be idle for before exit. */
private readonly idleTimeout: number;
constructor(opts: TaskRunnerOpts) {
super();
this.taskType = opts.taskType;
this.name = opts.name ?? 'Node.js Task Runner SDK';
this.maxConcurrency = opts.maxConcurrency;
this.idleTimeout = opts.idleTimeout;
const wsUrl = `ws://${opts.n8nUri}/runners/_ws?id=${this.id}`;
this.ws = new WebSocket(wsUrl, {
@ -108,6 +119,17 @@ export abstract class TaskRunner {
});
this.ws.addEventListener('message', this.receiveMessage);
this.ws.addEventListener('close', this.stopTaskOffers);
this.resetIdleTimer();
}
private resetIdleTimer() {
if (this.idleTimeout === 0) return;
this.clearIdleTimer();
this.idleTimer = setTimeout(() => {
if (this.runningTasks.size === 0) this.emit('runner:reached-idle-timeout');
}, this.idleTimeout * 1000);
}
private receiveMessage = (message: MessageEvent) => {
@ -148,16 +170,20 @@ export abstract class TaskRunner {
(Object.values(this.openOffers).length + Object.values(this.runningTasks).length);
for (let i = 0; i < offersToSend; i++) {
// Add a bit of randomness so that not all offers expire at the same time
const validForInMs = OFFER_VALID_TIME_MS + randomInt(500);
// Add a little extra time to account for latency
const validUntil = process.hrtime.bigint() + msToNs(validForInMs + OFFER_VALID_EXTRA_MS);
const offer: TaskOffer = {
offerId: nanoid(),
validUntil: process.hrtime.bigint() + BigInt((VALID_TIME_MS + VALID_EXTRA_MS) * 1_000_000), // Adding a little extra time to account for latency
validUntil,
};
this.openOffers.set(offer.offerId, offer);
this.send({
type: 'runner:taskoffer',
taskType: this.taskType,
offerId: offer.offerId,
validFor: VALID_TIME_MS,
validFor: validForInMs,
});
}
}
@ -244,6 +270,7 @@ export abstract class TaskRunner {
this.openOffers.delete(offerId);
}
this.resetIdleTimer();
this.runningTasks.set(taskId, {
taskId,
active: false,
@ -306,6 +333,8 @@ export abstract class TaskRunner {
this.taskDone(taskId, data);
} catch (error) {
this.taskErrored(taskId, error);
} finally {
this.resetIdleTimer();
}
}
@ -432,6 +461,8 @@ export abstract class TaskRunner {
/** Close the connection gracefully and wait until has been closed */
async stop() {
this.clearIdleTimer();
this.stopTaskOffers();
await this.waitUntilAllTasksAreDone();
@ -439,6 +470,11 @@ export abstract class TaskRunner {
await this.closeConnection();
}
clearIdleTimer() {
if (this.idleTimer) clearTimeout(this.idleTimer);
this.idleTimer = undefined;
}
private async closeConnection() {
// 1000 is the standard close code
// https://www.rfc-editor.org/rfc/rfc6455.html#section-7.1.5

View file

@ -1,6 +1,6 @@
{
"name": "n8n",
"version": "1.68.0",
"version": "1.70.0",
"description": "n8n Workflow Automation Tool",
"main": "dist/index",
"types": "dist/index.d.ts",

View file

@ -1,33 +1,46 @@
import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core';
import type { IWorkflowBase } from 'n8n-workflow';
import type { Project } from '@/databases/entities/project';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutionResponse } from '@/interfaces';
import type { MultiMainSetup } from '@/scaling/multi-main-setup.ee';
import { OrchestrationService } from '@/services/orchestration.service';
import type { OwnershipService } from '@/services/ownership.service';
import { WaitTracker } from '@/wait-tracker';
import type { WorkflowRunner } from '@/workflow-runner';
import { mockLogger } from '@test/mocking';
jest.useFakeTimers();
describe('WaitTracker', () => {
const ownershipService = mock<OwnershipService>();
const workflowRunner = mock<WorkflowRunner>();
const executionRepository = mock<ExecutionRepository>();
const multiMainSetup = mock<MultiMainSetup>();
const orchestrationService = new OrchestrationService(mock(), multiMainSetup, mock());
const instanceSettings = mock<InstanceSettings>({ isLeader: true });
const project = mock<Project>({ id: 'projectId' });
const execution = mock<IExecutionResponse>({
id: '123',
finished: false,
waitTill: new Date(Date.now() + 1000),
mode: 'manual',
data: mock({
pushRef: 'push_ref',
}),
});
execution.workflowData = mock<IWorkflowBase>({ id: 'abcd' });
let waitTracker: WaitTracker;
beforeEach(() => {
waitTracker = new WaitTracker(
mockLogger(),
executionRepository,
mock(),
mock(),
ownershipService,
workflowRunner,
orchestrationService,
instanceSettings,
);
@ -64,29 +77,31 @@ describe('WaitTracker', () => {
});
describe('if execution to start', () => {
it('if not enough time passed, should not start execution', async () => {
let startExecutionSpy: jest.SpyInstance<Promise<void>, [executionId: string]>;
beforeEach(() => {
executionRepository.findSingleExecution.mockResolvedValue(execution);
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
ownershipService.getWorkflowProjectCached.mockResolvedValue(project);
startExecutionSpy = jest
.spyOn(waitTracker, 'startExecution')
.mockImplementation(async () => {});
waitTracker.init();
});
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
it('if not enough time passed, should not start execution', async () => {
await waitTracker.getWaitingExecutions();
const startExecutionSpy = jest.spyOn(waitTracker, 'startExecution');
jest.advanceTimersByTime(100);
expect(startExecutionSpy).not.toHaveBeenCalled();
});
it('if enough time passed, should start execution', async () => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);
waitTracker.init();
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
await waitTracker.getWaitingExecutions();
const startExecutionSpy = jest.spyOn(waitTracker, 'startExecution');
jest.advanceTimersByTime(2_000);
expect(startExecutionSpy).toHaveBeenCalledWith(execution.id);
@ -100,13 +115,27 @@ describe('WaitTracker', () => {
waitTracker.init();
executionRepository.findSingleExecution.mockResolvedValue(execution);
waitTracker.startExecution(execution.id);
jest.advanceTimersByTime(5);
ownershipService.getWorkflowProjectCached.mockResolvedValue(project);
await waitTracker.startExecution(execution.id);
expect(executionRepository.findSingleExecution).toHaveBeenCalledWith(execution.id, {
includeData: true,
unflattenData: true,
});
expect(workflowRunner.run).toHaveBeenCalledWith(
{
executionMode: execution.mode,
executionData: execution.data,
workflowData: execution.workflowData,
projectId: project.id,
pushRef: execution.data.pushRef,
},
false,
false,
execution.id,
);
});
});
@ -135,8 +164,8 @@ describe('WaitTracker', () => {
const waitTracker = new WaitTracker(
mockLogger(),
executionRepository,
mock(),
mock(),
ownershipService,
workflowRunner,
orchestrationService,
mock<InstanceSettings>({ isLeader: false }),
);

View file

@ -21,6 +21,7 @@ import { LICENSE_FEATURES, inDevelopment, inTest } from '@/constants';
import * as CrashJournal from '@/crash-journal';
import * as Db from '@/db';
import { getDataDeduplicationService } from '@/deduplication';
import { DeprecationService } from '@/deprecation/deprecation.service';
import { initErrorHandling } from '@/error-reporting';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay';
@ -88,33 +89,14 @@ export abstract class BaseCommand extends Command {
await this.exitWithCrash('There was an error running database migrations', error),
);
const { type: dbType } = this.globalConfig.database;
if (['mysqldb', 'mariadb'].includes(dbType)) {
this.logger.warn(
'Support for MySQL/MariaDB has been deprecated and will be removed with an upcoming version of n8n. Please migrate to PostgreSQL.',
);
}
if (process.env.N8N_SKIP_WEBHOOK_DEREGISTRATION_SHUTDOWN) {
this.logger.warn(
'The flag to skip webhook deregistration N8N_SKIP_WEBHOOK_DEREGISTRATION_SHUTDOWN has been removed. n8n no longer deregisters webhooks at startup and shutdown, in main and queue mode.',
);
}
if (config.getEnv('executions.mode') === 'queue' && dbType === 'sqlite') {
this.logger.warn(
'Queue mode is not officially supported with sqlite. Please switch to PostgreSQL.',
);
}
Container.get(DeprecationService).warn();
if (
process.env.N8N_BINARY_DATA_TTL ??
process.env.N8N_PERSISTED_BINARY_DATA_TTL ??
process.env.EXECUTIONS_DATA_PRUNE_TIMEOUT
config.getEnv('executions.mode') === 'queue' &&
this.globalConfig.database.type === 'sqlite'
) {
this.logger.warn(
'The env vars N8N_BINARY_DATA_TTL and N8N_PERSISTED_BINARY_DATA_TTL and EXECUTIONS_DATA_PRUNE_TIMEOUT no longer have any effect and can be safely removed. Instead of relying on a TTL system for binary data, n8n currently cleans up binary data together with executions during pruning.',
'Scaling mode is not officially supported with sqlite. Please use PostgreSQL instead.',
);
}

View file

@ -4,7 +4,7 @@ import fs from 'fs';
import { diff } from 'json-diff';
import pick from 'lodash/pick';
import type { IRun, ITaskData, IWorkflowExecutionDataProcess } from 'n8n-workflow';
import { ApplicationError, jsonParse } from 'n8n-workflow';
import { ApplicationError, jsonParse, ErrorReporterProxy } from 'n8n-workflow';
import os from 'os';
import { sep } from 'path';
import { Container } from 'typedi';
@ -822,6 +822,11 @@ export class ExecuteBatch extends BaseCommand {
}
}
} catch (e) {
ErrorReporterProxy.error(e, {
extra: {
workflowId: workflowData.id,
},
});
executionResult.error = `Workflow failed to execute: ${(e as Error).message}`;
executionResult.executionStatus = 'error';
}

View file

@ -405,4 +405,11 @@ export const schema = {
doc: 'Set this to 1 to enable the new partial execution logic by default.',
},
},
virtualSchemaView: {
doc: 'Whether to display the virtualized schema view',
format: Boolean,
default: false,
env: 'N8N_VIRTUAL_SCHEMA_VIEW',
},
};

View file

@ -164,3 +164,13 @@ export const LOWEST_SHUTDOWN_PRIORITY = 0;
export const DEFAULT_SHUTDOWN_PRIORITY = 100;
/** Highest priority, meaning shut down happens before all other groups */
export const HIGHEST_SHUTDOWN_PRIORITY = 200;
export const WsStatusCodes = {
CloseNormal: 1000,
CloseGoingAway: 1001,
CloseProtocolError: 1002,
CloseUnsupportedData: 1003,
CloseNoStatus: 1005,
CloseAbnormal: 1006,
CloseInvalidData: 1007,
} as const;

View file

@ -4,7 +4,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-return */
// eslint-disable-next-line n8n-local-rules/misplaced-n8n-typeorm-import
import { In } from '@n8n/typeorm';
import { Credentials, NodeExecuteFunctions } from 'n8n-core';
import { Credentials, getAdditionalKeys } from 'n8n-core';
import type {
ICredentialDataDecryptedObject,
ICredentialsExpressionResolveValues,
@ -379,7 +379,7 @@ export class CredentialsHelper extends ICredentialsHelper {
decryptedData.oauthTokenData = decryptedDataOriginal.oauthTokenData;
}
const additionalKeys = NodeExecuteFunctions.getAdditionalKeys(additionalData, mode, null, {
const additionalKeys = getAdditionalKeys(additionalData, mode, null, {
secretsEnabled: canUseSecrets,
});

View file

@ -21,6 +21,8 @@ import { SharedCredentials } from './shared-credentials';
import { SharedWorkflow } from './shared-workflow';
import { TagEntity } from './tag-entity';
import { TestDefinition } from './test-definition.ee';
import { TestMetric } from './test-metric.ee';
import { TestRun } from './test-run.ee';
import { User } from './user';
import { Variables } from './variables';
import { WebhookEntity } from './webhook-entity';
@ -60,4 +62,6 @@ export const entities = {
ApiKey,
ProcessedData,
TestDefinition,
TestMetric,
TestRun,
};

View file

@ -1,7 +1,8 @@
import { Column, Entity, Index, ManyToOne, RelationId } from '@n8n/typeorm';
import { Column, Entity, Index, ManyToOne, OneToMany, RelationId } from '@n8n/typeorm';
import { Length } from 'class-validator';
import { AnnotationTagEntity } from '@/databases/entities/annotation-tag-entity.ee';
import type { TestMetric } from '@/databases/entities/test-metric.ee';
import { WorkflowEntity } from '@/databases/entities/workflow-entity';
import { WithTimestampsAndStringId } from './abstract-entity';
@ -53,4 +54,7 @@ export class TestDefinition extends WithTimestampsAndStringId {
@RelationId((test: TestDefinition) => test.annotationTag)
annotationTagId: string;
@OneToMany('TestMetric', 'testDefinition')
metrics: TestMetric[];
}

View file

@ -0,0 +1,29 @@
import { Column, Entity, Index, ManyToOne } from '@n8n/typeorm';
import { Length } from 'class-validator';
import { WithTimestampsAndStringId } from '@/databases/entities/abstract-entity';
import { TestDefinition } from '@/databases/entities/test-definition.ee';
/**
* Entity representing a Test Metric
* It represents a single metric that can be retrieved from evaluation workflow execution result
*/
@Entity()
@Index(['testDefinition'])
export class TestMetric extends WithTimestampsAndStringId {
/**
* Name of the metric.
* This will be used as a property name to extract metric value from the evaluation workflow execution result object
*/
@Column({ length: 255 })
@Length(1, 255, {
message: 'Metric name must be $constraint1 to $constraint2 characters long.',
})
name: string;
/**
* Relation to test definition
*/
@ManyToOne('TestDefinition', 'metrics')
testDefinition: TestDefinition;
}

View file

@ -0,0 +1,38 @@
import { Column, Entity, Index, ManyToOne, RelationId } from '@n8n/typeorm';
import {
datetimeColumnType,
jsonColumnType,
WithTimestampsAndStringId,
} from '@/databases/entities/abstract-entity';
import { TestDefinition } from '@/databases/entities/test-definition.ee';
type TestRunStatus = 'new' | 'running' | 'completed' | 'error';
export type AggregatedTestRunMetrics = Record<string, number | boolean>;
/**
* Entity representing a Test Run.
* It stores info about a specific run of a test, combining the test definition with the status and collected metrics
*/
@Entity()
@Index(['testDefinition'])
export class TestRun extends WithTimestampsAndStringId {
@ManyToOne('TestDefinition', 'runs')
testDefinition: TestDefinition;
@RelationId((testRun: TestRun) => testRun.testDefinition)
testDefinitionId: string;
@Column('varchar')
status: TestRunStatus;
@Column({ type: datetimeColumnType, nullable: true })
runAt: Date | null;
@Column({ type: datetimeColumnType, nullable: true })
completedAt: Date | null;
@Column(jsonColumnType, { nullable: true })
metrics: AggregatedTestRunMetrics;
}

View file

@ -0,0 +1,24 @@
import type { MigrationContext, ReversibleMigration } from '@/databases/types';
const testMetricEntityTableName = 'test_metric';
export class CreateTestMetricTable1732271325258 implements ReversibleMigration {
async up({ schemaBuilder: { createTable, column } }: MigrationContext) {
await createTable(testMetricEntityTableName)
.withColumns(
column('id').varchar(36).primary.notNull,
column('name').varchar(255).notNull,
column('testDefinitionId').varchar(36).notNull,
)
.withIndexOn('testDefinitionId')
.withForeignKey('testDefinitionId', {
tableName: 'test_definition',
columnName: 'id',
onDelete: 'CASCADE',
}).withTimestamps;
}
async down({ schemaBuilder: { dropTable } }: MigrationContext) {
await dropTable(testMetricEntityTableName);
}
}

View file

@ -0,0 +1,27 @@
import type { MigrationContext, ReversibleMigration } from '@/databases/types';
const testRunTableName = 'test_run';
export class CreateTestRun1732549866705 implements ReversibleMigration {
async up({ schemaBuilder: { createTable, column } }: MigrationContext) {
await createTable(testRunTableName)
.withColumns(
column('id').varchar(36).primary.notNull,
column('testDefinitionId').varchar(36).notNull,
column('status').varchar().notNull,
column('runAt').timestamp(),
column('completedAt').timestamp(),
column('metrics').json,
)
.withIndexOn('testDefinitionId')
.withForeignKey('testDefinitionId', {
tableName: 'test_definition',
columnName: 'id',
onDelete: 'CASCADE',
}).withTimestamps;
}
async down({ schemaBuilder: { dropTable } }: MigrationContext) {
await dropTable(testRunTableName);
}
}

View file

@ -71,6 +71,8 @@ import { AddMissingPrimaryKeyOnAnnotationTagMapping1728659839644 } from '../comm
import { UpdateProcessedDataValueColumnToText1729607673464 } from '../common/1729607673464-UpdateProcessedDataValueColumnToText';
import { CreateTestDefinitionTable1730386903556 } from '../common/1730386903556-CreateTestDefinitionTable';
import { AddDescriptionToTestDefinition1731404028106 } from '../common/1731404028106-AddDescriptionToTestDefinition';
import { CreateTestMetricTable1732271325258 } from '../common/1732271325258-CreateTestMetricTable';
import { CreateTestRun1732549866705 } from '../common/1732549866705-CreateTestRunTable';
export const mysqlMigrations: Migration[] = [
InitialMigration1588157391238,
@ -144,4 +146,6 @@ export const mysqlMigrations: Migration[] = [
CreateTestDefinitionTable1730386903556,
AddDescriptionToTestDefinition1731404028106,
MigrateTestDefinitionKeyToString1731582748663,
CreateTestMetricTable1732271325258,
CreateTestRun1732549866705,
];

View file

@ -71,6 +71,8 @@ import { AddMissingPrimaryKeyOnAnnotationTagMapping1728659839644 } from '../comm
import { UpdateProcessedDataValueColumnToText1729607673464 } from '../common/1729607673464-UpdateProcessedDataValueColumnToText';
import { CreateTestDefinitionTable1730386903556 } from '../common/1730386903556-CreateTestDefinitionTable';
import { AddDescriptionToTestDefinition1731404028106 } from '../common/1731404028106-AddDescriptionToTestDefinition';
import { CreateTestMetricTable1732271325258 } from '../common/1732271325258-CreateTestMetricTable';
import { CreateTestRun1732549866705 } from '../common/1732549866705-CreateTestRunTable';
export const postgresMigrations: Migration[] = [
InitialMigration1587669153312,
@ -144,4 +146,6 @@ export const postgresMigrations: Migration[] = [
CreateTestDefinitionTable1730386903556,
AddDescriptionToTestDefinition1731404028106,
MigrateTestDefinitionKeyToString1731582748663,
CreateTestMetricTable1732271325258,
CreateTestRun1732549866705,
];

View file

@ -68,6 +68,8 @@ import { CreateProcessedDataTable1726606152711 } from '../common/1726606152711-C
import { SeparateExecutionCreationFromStart1727427440136 } from '../common/1727427440136-SeparateExecutionCreationFromStart';
import { UpdateProcessedDataValueColumnToText1729607673464 } from '../common/1729607673464-UpdateProcessedDataValueColumnToText';
import { CreateTestDefinitionTable1730386903556 } from '../common/1730386903556-CreateTestDefinitionTable';
import { CreateTestMetricTable1732271325258 } from '../common/1732271325258-CreateTestMetricTable';
import { CreateTestRun1732549866705 } from '../common/1732549866705-CreateTestRunTable';
const sqliteMigrations: Migration[] = [
InitialMigration1588102412422,
@ -138,6 +140,8 @@ const sqliteMigrations: Migration[] = [
CreateTestDefinitionTable1730386903556,
AddDescriptionToTestDefinition1731404028106,
MigrateTestDefinitionKeyToString1731582748663,
CreateTestMetricTable1732271325258,
CreateTestRun1732549866705,
];
export { sqliteMigrations };

View file

@ -163,7 +163,13 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
if (!queryParams.relations) {
queryParams.relations = [];
}
(queryParams.relations as string[]).push('executionData', 'metadata');
if (Array.isArray(queryParams.relations)) {
queryParams.relations.push('executionData', 'metadata');
} else {
queryParams.relations.executionData = true;
queryParams.relations.metadata = true;
}
}
const executions = await this.find(queryParams);
@ -981,7 +987,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
if (projectId) {
qb.innerJoin(WorkflowEntity, 'w', 'w.id = execution.workflowId')
.innerJoin(SharedWorkflow, 'sw', 'sw.workflowId = w.id')
.where('sw.projectId = :projectId', { projectId });
.andWhere('sw.projectId = :projectId', { projectId });
}
return qb;

View file

@ -3,6 +3,7 @@ import { DataSource, In, Repository } from '@n8n/typeorm';
import { Service } from 'typedi';
import { TestDefinition } from '@/databases/entities/test-definition.ee';
import { ForbiddenError } from '@/errors/response-errors/forbidden.error';
import type { ListQuery } from '@/requests';
@Service()
@ -14,12 +15,21 @@ export class TestDefinitionRepository extends Repository<TestDefinition> {
async getMany(accessibleWorkflowIds: string[], options?: ListQuery.Options) {
if (accessibleWorkflowIds.length === 0) return { tests: [], count: 0 };
const where: FindOptionsWhere<TestDefinition> = {
...options?.filter,
workflow: {
const where: FindOptionsWhere<TestDefinition> = {};
if (options?.filter?.workflowId) {
if (!accessibleWorkflowIds.includes(options.filter.workflowId as string)) {
throw new ForbiddenError('User does not have access to the workflow');
}
where.workflow = {
id: options.filter.workflowId as string,
};
} else {
where.workflow = {
id: In(accessibleWorkflowIds),
},
};
};
}
const findManyOptions: FindManyOptions<TestDefinition> = {
where,
@ -45,7 +55,7 @@ export class TestDefinitionRepository extends Repository<TestDefinition> {
id: In(accessibleWorkflowIds),
},
},
relations: ['annotationTag'],
relations: ['annotationTag', 'metrics'],
});
}

View file

@ -0,0 +1,11 @@
import { DataSource, Repository } from '@n8n/typeorm';
import { Service } from 'typedi';
import { TestMetric } from '@/databases/entities/test-metric.ee';
@Service()
export class TestMetricRepository extends Repository<TestMetric> {
constructor(dataSource: DataSource) {
super(TestMetric, dataSource.manager);
}
}

View file

@ -0,0 +1,29 @@
import { DataSource, Repository } from '@n8n/typeorm';
import { Service } from 'typedi';
import type { AggregatedTestRunMetrics } from '@/databases/entities/test-run.ee';
import { TestRun } from '@/databases/entities/test-run.ee';
@Service()
export class TestRunRepository extends Repository<TestRun> {
constructor(dataSource: DataSource) {
super(TestRun, dataSource.manager);
}
public async createTestRun(testDefinitionId: string) {
const testRun = this.create({
status: 'new',
testDefinition: { id: testDefinitionId },
});
return await this.save(testRun);
}
public async markAsRunning(id: string) {
return await this.update(id, { status: 'running', runAt: new Date() });
}
public async markAsCompleted(id: string, metrics: AggregatedTestRunMetrics) {
return await this.update(id, { status: 'completed', completedAt: new Date(), metrics });
}
}

View file

@ -0,0 +1,41 @@
import { mockLogger } from '@test/mocking';
import { DeprecationService } from '../deprecation.service';
describe('DeprecationService', () => {
const toTest = (envVar: string, value: string, inUse: boolean) => {
process.env[envVar] = value;
const deprecationService = new DeprecationService(mockLogger());
deprecationService.warn();
expect(deprecationService.isInUse(envVar)).toBe(inUse);
};
test.each([
['N8N_BINARY_DATA_TTL', '1', true],
['N8N_PERSISTED_BINARY_DATA_TTL', '1', true],
['EXECUTIONS_DATA_PRUNE_TIMEOUT', '1', true],
['N8N_CONFIG_FILES', '1', true],
['N8N_SKIP_WEBHOOK_DEREGISTRATION_SHUTDOWN', '1', true],
])('should detect when %s is in use', (envVar, value, inUse) => {
toTest(envVar, value, inUse);
});
test.each([
['default', true],
['filesystem', false],
['s3', false],
])('should handle N8N_BINARY_DATA_MODE as %s', (mode, inUse) => {
toTest('N8N_BINARY_DATA_MODE', mode, inUse);
});
test.each([
['sqlite', false],
['postgresdb', false],
['mysqldb', true],
['mariadb', true],
])('should handle DB_TYPE as %s', (dbType, inUse) => {
toTest('DB_TYPE', dbType, inUse);
});
});

View file

@ -0,0 +1,85 @@
import { ApplicationError } from 'n8n-workflow';
import { Service } from 'typedi';
import { Logger } from '@/logging/logger.service';
type EnvVarName = string;
type Deprecation = {
/** Name of the deprecated env var. */
envVar: EnvVarName;
/** Message to display when the deprecated env var is currently in use. */
message: string;
/** Function to identify the specific value in the env var that is deprecated. */
checkValue?: (value: string) => boolean;
};
const SAFE_TO_REMOVE = 'Remove this environment variable; it is no longer needed.';
/** Responsible for warning about use of deprecated env vars. */
@Service()
export class DeprecationService {
private readonly deprecations: Deprecation[] = [
{ envVar: 'N8N_BINARY_DATA_TTL', message: SAFE_TO_REMOVE },
{ envVar: 'N8N_PERSISTED_BINARY_DATA_TTL', message: SAFE_TO_REMOVE },
{ envVar: 'EXECUTIONS_DATA_PRUNE_TIMEOUT', message: SAFE_TO_REMOVE },
{
envVar: 'N8N_BINARY_DATA_MODE',
message: '`default` is deprecated. Please switch to `filesystem` mode.',
checkValue: (value: string) => value === 'default',
},
{ envVar: 'N8N_CONFIG_FILES', message: 'Please use .env files or *_FILE env vars instead.' },
{
envVar: 'DB_TYPE',
message: 'MySQL and MariaDB are deprecated. Please migrate to PostgreSQL.',
checkValue: (value: string) => ['mysqldb', 'mariadb'].includes(value),
},
{
envVar: 'N8N_SKIP_WEBHOOK_DEREGISTRATION_SHUTDOWN',
message: `n8n no longer deregisters webhooks at startup and shutdown. ${SAFE_TO_REMOVE}`,
},
];
/** Runtime state of deprecated env vars. */
private readonly state: Record<EnvVarName, { inUse: boolean }> = {};
constructor(private readonly logger: Logger) {}
warn() {
this.deprecations.forEach((d) => {
const envValue = process.env[d.envVar];
this.state[d.envVar] = {
inUse: d.checkValue
? envValue !== undefined && d.checkValue(envValue)
: envValue !== undefined,
};
});
const inUse = Object.entries(this.state)
.filter(([, d]) => d.inUse)
.map(([envVar]) => {
const deprecation = this.deprecations.find((d) => d.envVar === envVar);
if (!deprecation) {
throw new ApplicationError(`Deprecation not found for env var: ${envVar}`);
}
return deprecation;
});
if (inUse.length === 0) return;
const header = `The following environment variable${
inUse.length === 1 ? ' is' : 's are'
} deprecated and will be removed in an upcoming version of n8n. Please take the recommended actions to update your configuration`;
const deprecations = inUse
.map(({ envVar, message }) => ` - ${envVar} -> ${message}\n`)
.join('');
this.logger.warn(`\n${header}:\n${deprecations}`);
}
isInUse(envVar: string) {
return this.state[envVar]?.inUse ?? false;
}
}

View file

@ -0,0 +1,13 @@
import { z } from 'zod';
export const testMetricCreateRequestBodySchema = z
.object({
name: z.string().min(1).max(255),
})
.strict();
export const testMetricPatchRequestBodySchema = z
.object({
name: z.string().min(1).max(255),
})
.strict();

View file

@ -0,0 +1,130 @@
import express from 'express';
import { TestMetricRepository } from '@/databases/repositories/test-metric.repository.ee';
import { Delete, Get, Patch, Post, RestController } from '@/decorators';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import {
testMetricCreateRequestBodySchema,
testMetricPatchRequestBodySchema,
} from '@/evaluation/metric.schema';
import { getSharedWorkflowIds } from '@/public-api/v1/handlers/workflows/workflows.service';
import { TestDefinitionService } from './test-definition.service.ee';
import { TestMetricsRequest } from './test-definitions.types.ee';
@RestController('/evaluation/test-definitions')
export class TestMetricsController {
constructor(
private readonly testDefinitionService: TestDefinitionService,
private readonly testMetricRepository: TestMetricRepository,
) {}
// This method is used in multiple places in the controller to get the test definition
// (or just check that it exists and the user has access to it).
private async getTestDefinition(
req:
| TestMetricsRequest.GetOne
| TestMetricsRequest.GetMany
| TestMetricsRequest.Patch
| TestMetricsRequest.Delete
| TestMetricsRequest.Create,
) {
const { testDefinitionId } = req.params;
const userAccessibleWorkflowIds = await getSharedWorkflowIds(req.user, ['workflow:read']);
const testDefinition = await this.testDefinitionService.findOne(
testDefinitionId,
userAccessibleWorkflowIds,
);
if (!testDefinition) throw new NotFoundError('Test definition not found');
return testDefinition;
}
@Get('/:testDefinitionId/metrics')
async getMany(req: TestMetricsRequest.GetMany) {
const { testDefinitionId } = req.params;
await this.getTestDefinition(req);
return await this.testMetricRepository.find({
where: { testDefinition: { id: testDefinitionId } },
});
}
@Get('/:testDefinitionId/metrics/:id')
async getOne(req: TestMetricsRequest.GetOne) {
const { id: metricId, testDefinitionId } = req.params;
await this.getTestDefinition(req);
const metric = await this.testMetricRepository.findOne({
where: { id: metricId, testDefinition: { id: testDefinitionId } },
});
if (!metric) throw new NotFoundError('Metric not found');
return metric;
}
@Post('/:testDefinitionId/metrics')
async create(req: TestMetricsRequest.Create, res: express.Response) {
const bodyParseResult = testMetricCreateRequestBodySchema.safeParse(req.body);
if (!bodyParseResult.success) {
res.status(400).json({ errors: bodyParseResult.error.errors });
return;
}
const testDefinition = await this.getTestDefinition(req);
const metric = this.testMetricRepository.create({
...req.body,
testDefinition,
});
return await this.testMetricRepository.save(metric);
}
@Patch('/:testDefinitionId/metrics/:id')
async patch(req: TestMetricsRequest.Patch, res: express.Response) {
const { id: metricId, testDefinitionId } = req.params;
const bodyParseResult = testMetricPatchRequestBodySchema.safeParse(req.body);
if (!bodyParseResult.success) {
res.status(400).json({ errors: bodyParseResult.error.errors });
return;
}
await this.getTestDefinition(req);
const metric = await this.testMetricRepository.findOne({
where: { id: metricId, testDefinition: { id: testDefinitionId } },
});
if (!metric) throw new NotFoundError('Metric not found');
await this.testMetricRepository.update(metricId, bodyParseResult.data);
// Respond with the updated metric
return await this.testMetricRepository.findOneBy({ id: metricId });
}
@Delete('/:testDefinitionId/metrics/:id')
async delete(req: TestMetricsRequest.GetOne) {
const { id: metricId, testDefinitionId } = req.params;
await this.getTestDefinition(req);
const metric = await this.testMetricRepository.findOne({
where: { id: metricId, testDefinition: { id: testDefinitionId } },
});
if (!metric) throw new NotFoundError('Metric not found');
await this.testMetricRepository.delete(metricId);
return { success: true };
}
}

View file

@ -10,7 +10,7 @@ import type { ListQuery } from '@/requests';
type TestDefinitionLike = Omit<
Partial<TestDefinition>,
'workflow' | 'evaluationWorkflow' | 'annotationTag'
'workflow' | 'evaluationWorkflow' | 'annotationTag' | 'metrics'
> & {
workflow?: { id: string };
evaluationWorkflow?: { id: string };

View file

@ -8,6 +8,7 @@ import {
testDefinitionCreateRequestBodySchema,
testDefinitionPatchRequestBodySchema,
} from '@/evaluation/test-definition.schema';
import { TestRunnerService } from '@/evaluation/test-runner/test-runner.service.ee';
import { listQueryMiddleware } from '@/middlewares';
import { getSharedWorkflowIds } from '@/public-api/v1/handlers/workflows/workflows.service';
@ -16,7 +17,10 @@ import { TestDefinitionsRequest } from './test-definitions.types.ee';
@RestController('/evaluation/test-definitions')
export class TestDefinitionsController {
constructor(private readonly testDefinitionService: TestDefinitionService) {}
constructor(
private readonly testDefinitionService: TestDefinitionService,
private readonly testRunnerService: TestRunnerService,
) {}
@Get('/', { middlewares: listQueryMiddleware })
async getMany(req: TestDefinitionsRequest.GetMany) {
@ -125,4 +129,20 @@ export class TestDefinitionsController {
return testDefinition;
}
@Post('/:id/run')
async runTest(req: TestDefinitionsRequest.Run, res: express.Response) {
const { id: testDefinitionId } = req.params;
const workflowIds = await getSharedWorkflowIds(req.user, ['workflow:read']);
// Check test definition exists
const testDefinition = await this.testDefinitionService.findOne(testDefinitionId, workflowIds);
if (!testDefinition) throw new NotFoundError('Test definition not found');
// We do not await for the test run to complete
void this.testRunnerService.runTest(req.user, testDefinition);
res.status(202).json({ success: true });
}
}

View file

@ -30,4 +30,36 @@ export declare namespace TestDefinitionsRequest {
>;
type Delete = AuthenticatedRequest<RouteParams.TestId>;
type Run = AuthenticatedRequest<RouteParams.TestId>;
}
// ----------------------------------
// /test-definitions/:testDefinitionId/metrics
// ----------------------------------
export declare namespace TestMetricsRequest {
namespace RouteParams {
type TestDefinitionId = {
testDefinitionId: string;
};
type TestMetricId = {
id: string;
};
}
type GetOne = AuthenticatedRequest<RouteParams.TestDefinitionId & RouteParams.TestMetricId>;
type GetMany = AuthenticatedRequest<RouteParams.TestDefinitionId>;
type Create = AuthenticatedRequest<RouteParams.TestDefinitionId, {}, { name: string }>;
type Patch = AuthenticatedRequest<
RouteParams.TestDefinitionId & RouteParams.TestMetricId,
{},
{ name: string }
>;
type Delete = AuthenticatedRequest<RouteParams.TestDefinitionId & RouteParams.TestMetricId>;
}

View file

@ -0,0 +1,171 @@
{
"startData": {},
"resultData": {
"runData": {
"When clicking Test workflow": [
{
"hints": [],
"startTime": 1731079118048,
"executionTime": 0,
"source": [],
"executionStatus": "success",
"data": {
"main": [
[
{
"json": {
"query": "First item"
},
"pairedItem": {
"item": 0
}
},
{
"json": {
"query": "Second item"
},
"pairedItem": {
"item": 0
}
},
{
"json": {
"query": "Third item"
},
"pairedItem": {
"item": 0
}
}
]
]
}
}
],
"Edit Fields": [
{
"hints": [],
"startTime": 1731079118049,
"executionTime": 0,
"source": [
{
"previousNode": "When clicking Test workflow"
}
],
"executionStatus": "success",
"data": {
"main": [
[
{
"json": {
"foo": "bar"
},
"pairedItem": {
"item": 0
}
},
{
"json": {
"foo": "bar"
},
"pairedItem": {
"item": 1
}
},
{
"json": {
"foo": "bar"
},
"pairedItem": {
"item": 2
}
}
]
]
}
}
],
"Code": [
{
"hints": [],
"startTime": 1731079118049,
"executionTime": 3,
"source": [
{
"previousNode": "Edit Fields"
}
],
"executionStatus": "success",
"data": {
"main": [
[
{
"json": {
"foo": "bar",
"random": 0.6315509336851373
},
"pairedItem": {
"item": 0
}
},
{
"json": {
"foo": "bar",
"random": 0.3336315687359024
},
"pairedItem": {
"item": 1
}
},
{
"json": {
"foo": "bar",
"random": 0.4241870158917733
},
"pairedItem": {
"item": 2
}
}
]
]
}
}
]
},
"pinData": {
"When clicking Test workflow": [
{
"json": {
"query": "First item"
},
"pairedItem": {
"item": 0
}
},
{
"json": {
"query": "Second item"
},
"pairedItem": {
"item": 0
}
},
{
"json": {
"query": "Third item"
},
"pairedItem": {
"item": 0
}
}
]
},
"lastNodeExecuted": "Code"
},
"executionData": {
"contextData": {},
"nodeExecutionStack": [],
"metadata": {},
"waitingExecution": {},
"waitingExecutionSource": {}
}
}

View file

@ -0,0 +1,124 @@
{
"name": "Evaluation Workflow",
"nodes": [
{
"parameters": {},
"id": "285ac92b-256f-4bb2-a450-6486b01593cb",
"name": "Execute Workflow Trigger",
"type": "n8n-nodes-base.executeWorkflowTrigger",
"typeVersion": 1,
"position": [520, 340]
},
{
"parameters": {
"conditions": {
"options": {
"caseSensitive": true,
"leftValue": "",
"typeValidation": "strict",
"version": 2
},
"conditions": [
{
"id": "9d3abc8d-3270-4bec-9a59-82622d5dbb5a",
"leftValue": "={{ $json.newExecution.Code[0].data.main[0].length }}",
"rightValue": 3,
"operator": {
"type": "number",
"operation": "gte"
}
},
{
"id": "894ce84b-13a4-4415-99c0-0c25182903bb",
"leftValue": "={{ $json.newExecution.Code[0].data.main[0][0].json.random }}",
"rightValue": 0.7,
"operator": {
"type": "number",
"operation": "lt"
}
}
],
"combinator": "and"
},
"options": {}
},
"id": "320b0355-3886-41df-b039-4666bf28e47b",
"name": "If",
"type": "n8n-nodes-base.if",
"typeVersion": 2.2,
"position": [740, 340]
},
{
"parameters": {
"assignments": {
"assignments": [
{
"id": "3b65d55a-158f-40c6-9853-a1c44b7ba1e5",
"name": "success",
"value": true,
"type": "boolean"
}
]
},
"options": {}
},
"id": "0c7a1ee8-0cf0-4d7f-99a3-186bbcd8815a",
"name": "Success",
"type": "n8n-nodes-base.set",
"typeVersion": 3.4,
"position": [980, 220]
},
{
"parameters": {
"assignments": {
"assignments": [
{
"id": "6cc8b402-4a30-4873-b825-963a1f1b8b82",
"name": "success",
"value": false,
"type": "boolean"
}
]
},
"options": {}
},
"id": "50d3f84a-d99f-4e04-bdbd-3e8c2668e708",
"name": "Fail",
"type": "n8n-nodes-base.set",
"typeVersion": 3.4,
"position": [980, 420]
}
],
"connections": {
"Execute Workflow Trigger": {
"main": [
[
{
"node": "If",
"type": "main",
"index": 0
}
]
]
},
"If": {
"main": [
[
{
"node": "Success",
"type": "main",
"index": 0
}
],
[
{
"node": "Fail",
"type": "main",
"index": 0
}
]
]
}
},
"pinData": {}
}

View file

@ -0,0 +1,78 @@
{
"name": "Workflow Under Test",
"nodes": [
{
"parameters": {},
"type": "n8n-nodes-base.manualTrigger",
"typeVersion": 1,
"position": [-80, 0],
"id": "72256d90-3a67-4e29-b032-47df4e5768af",
"name": "When clicking Test workflow"
},
{
"parameters": {
"assignments": {
"assignments": [
{
"id": "acfeecbe-443c-4220-b63b-d44d69216902",
"name": "foo",
"value": "bar",
"type": "string"
}
]
},
"options": {}
},
"type": "n8n-nodes-base.set",
"typeVersion": 3.4,
"position": [140, 0],
"id": "319f29bc-1dd4-4122-b223-c584752151a4",
"name": "Edit Fields"
},
{
"parameters": {
"jsCode": "for (const item of $input.all()) {\n item.json.random = Math.random();\n}\n\nreturn $input.all();"
},
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [380, 0],
"id": "d2474215-63af-40a4-a51e-0ea30d762621",
"name": "Code"
}
],
"connections": {
"When clicking Test workflow": {
"main": [
[
{
"node": "Edit Fields",
"type": "main",
"index": 0
}
]
]
},
"Edit Fields": {
"main": [
[
{
"node": "Wait",
"type": "main",
"index": 0
}
]
]
},
"Wait": {
"main": [
[
{
"node": "Code",
"type": "main",
"index": 0
}
]
]
}
}
}

View file

@ -0,0 +1,231 @@
import type { SelectQueryBuilder } from '@n8n/typeorm';
import { stringify } from 'flatted';
import { readFileSync } from 'fs';
import { mock, mockDeep } from 'jest-mock-extended';
import type { IRun } from 'n8n-workflow';
import path from 'path';
import type { ActiveExecutions } from '@/active-executions';
import type { ExecutionEntity } from '@/databases/entities/execution-entity';
import type { TestDefinition } from '@/databases/entities/test-definition.ee';
import type { TestRun } from '@/databases/entities/test-run.ee';
import type { User } from '@/databases/entities/user';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { TestRunRepository } from '@/databases/repositories/test-run.repository.ee';
import type { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import type { WorkflowRunner } from '@/workflow-runner';
import { TestRunnerService } from '../test-runner.service.ee';
const wfUnderTestJson = JSON.parse(
readFileSync(path.join(__dirname, './mock-data/workflow.under-test.json'), { encoding: 'utf-8' }),
);
const wfEvaluationJson = JSON.parse(
readFileSync(path.join(__dirname, './mock-data/workflow.evaluation.json'), { encoding: 'utf-8' }),
);
const executionDataJson = JSON.parse(
readFileSync(path.join(__dirname, './mock-data/execution-data.json'), { encoding: 'utf-8' }),
);
const executionMocks = [
mock<ExecutionEntity>({
id: 'some-execution-id',
workflowId: 'workflow-under-test-id',
status: 'success',
executionData: {
data: stringify(executionDataJson),
},
}),
mock<ExecutionEntity>({
id: 'some-execution-id-2',
workflowId: 'workflow-under-test-id',
status: 'success',
executionData: {
data: stringify(executionDataJson),
},
}),
];
function mockExecutionData() {
return mock<IRun>({
data: {
resultData: {
runData: {},
},
},
});
}
describe('TestRunnerService', () => {
const executionRepository = mock<ExecutionRepository>();
const workflowRepository = mock<WorkflowRepository>();
const workflowRunner = mock<WorkflowRunner>();
const activeExecutions = mock<ActiveExecutions>();
const testRunRepository = mock<TestRunRepository>();
beforeEach(() => {
const executionsQbMock = mockDeep<SelectQueryBuilder<ExecutionEntity>>({
fallbackMockImplementation: jest.fn().mockReturnThis(),
});
executionsQbMock.getMany.mockResolvedValueOnce(executionMocks);
executionRepository.createQueryBuilder.mockReturnValueOnce(executionsQbMock);
executionRepository.findOne
.calledWith(expect.objectContaining({ where: { id: 'some-execution-id' } }))
.mockResolvedValueOnce(executionMocks[0]);
executionRepository.findOne
.calledWith(expect.objectContaining({ where: { id: 'some-execution-id-2' } }))
.mockResolvedValueOnce(executionMocks[1]);
testRunRepository.createTestRun.mockResolvedValue(mock<TestRun>({ id: 'test-run-id' }));
});
afterEach(() => {
activeExecutions.getPostExecutePromise.mockClear();
workflowRunner.run.mockClear();
testRunRepository.createTestRun.mockClear();
testRunRepository.markAsRunning.mockClear();
testRunRepository.markAsCompleted.mockClear();
});
test('should create an instance of TestRunnerService', async () => {
const testRunnerService = new TestRunnerService(
workflowRepository,
workflowRunner,
executionRepository,
activeExecutions,
testRunRepository,
);
expect(testRunnerService).toBeInstanceOf(TestRunnerService);
});
test('should create and run test cases from past executions', async () => {
const testRunnerService = new TestRunnerService(
workflowRepository,
workflowRunner,
executionRepository,
activeExecutions,
testRunRepository,
);
workflowRepository.findById.calledWith('workflow-under-test-id').mockResolvedValueOnce({
id: 'workflow-under-test-id',
...wfUnderTestJson,
});
workflowRepository.findById.calledWith('evaluation-workflow-id').mockResolvedValueOnce({
id: 'evaluation-workflow-id',
...wfEvaluationJson,
});
workflowRunner.run.mockResolvedValue('test-execution-id');
await testRunnerService.runTest(
mock<User>(),
mock<TestDefinition>({
workflowId: 'workflow-under-test-id',
evaluationWorkflowId: 'evaluation-workflow-id',
}),
);
expect(executionRepository.createQueryBuilder).toHaveBeenCalledTimes(1);
expect(executionRepository.findOne).toHaveBeenCalledTimes(2);
expect(workflowRunner.run).toHaveBeenCalledTimes(2);
});
test('should run both workflow under test and evaluation workflow', async () => {
const testRunnerService = new TestRunnerService(
workflowRepository,
workflowRunner,
executionRepository,
activeExecutions,
testRunRepository,
);
workflowRepository.findById.calledWith('workflow-under-test-id').mockResolvedValueOnce({
id: 'workflow-under-test-id',
...wfUnderTestJson,
});
workflowRepository.findById.calledWith('evaluation-workflow-id').mockResolvedValueOnce({
id: 'evaluation-workflow-id',
...wfEvaluationJson,
});
workflowRunner.run.mockResolvedValueOnce('some-execution-id');
workflowRunner.run.mockResolvedValueOnce('some-execution-id-2');
workflowRunner.run.mockResolvedValueOnce('some-execution-id-3');
workflowRunner.run.mockResolvedValueOnce('some-execution-id-4');
// Mock executions of workflow under test
activeExecutions.getPostExecutePromise
.calledWith('some-execution-id')
.mockResolvedValue(mockExecutionData());
activeExecutions.getPostExecutePromise
.calledWith('some-execution-id-2')
.mockResolvedValue(mockExecutionData());
// Mock executions of evaluation workflow
activeExecutions.getPostExecutePromise
.calledWith('some-execution-id-3')
.mockResolvedValue(mockExecutionData());
activeExecutions.getPostExecutePromise
.calledWith('some-execution-id-4')
.mockResolvedValue(mockExecutionData());
await testRunnerService.runTest(
mock<User>(),
mock<TestDefinition>({
workflowId: 'workflow-under-test-id',
evaluationWorkflowId: 'evaluation-workflow-id',
}),
);
expect(workflowRunner.run).toHaveBeenCalledTimes(4);
// Check workflow under test was executed
expect(workflowRunner.run).toHaveBeenCalledWith(
expect.objectContaining({
executionMode: 'evaluation',
pinData: {
'When clicking Test workflow':
executionDataJson.resultData.runData['When clicking Test workflow'][0].data.main[0],
},
workflowData: expect.objectContaining({
id: 'workflow-under-test-id',
}),
}),
);
// Check evaluation workflow was executed
expect(workflowRunner.run).toHaveBeenCalledWith(
expect.objectContaining({
executionMode: 'evaluation',
executionData: expect.objectContaining({
executionData: expect.objectContaining({
nodeExecutionStack: expect.arrayContaining([
expect.objectContaining({ data: expect.anything() }),
]),
}),
}),
workflowData: expect.objectContaining({
id: 'evaluation-workflow-id',
}),
}),
);
// Check Test Run status was updated correctly
expect(testRunRepository.createTestRun).toHaveBeenCalledTimes(1);
expect(testRunRepository.markAsRunning).toHaveBeenCalledTimes(1);
expect(testRunRepository.markAsRunning).toHaveBeenCalledWith('test-run-id');
expect(testRunRepository.markAsCompleted).toHaveBeenCalledTimes(1);
expect(testRunRepository.markAsCompleted).toHaveBeenCalledWith('test-run-id', {
success: false,
});
});
});

View file

@ -0,0 +1,217 @@
import { parse } from 'flatted';
import type {
IDataObject,
IPinData,
IRun,
IRunData,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import assert from 'node:assert';
import { Service } from 'typedi';
import { ActiveExecutions } from '@/active-executions';
import type { ExecutionEntity } from '@/databases/entities/execution-entity';
import type { TestDefinition } from '@/databases/entities/test-definition.ee';
import type { User } from '@/databases/entities/user';
import type { WorkflowEntity } from '@/databases/entities/workflow-entity';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { TestRunRepository } from '@/databases/repositories/test-run.repository.ee';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import type { IExecutionResponse } from '@/interfaces';
import { getRunData } from '@/workflow-execute-additional-data';
import { WorkflowRunner } from '@/workflow-runner';
/**
* This service orchestrates the running of test cases.
* It uses the test definitions to find
* past executions, creates pin data from them,
* and runs the workflow-under-test with the pin data.
* After the workflow-under-test finishes, it runs the evaluation workflow
* with the original and new run data.
* TODO: Node pinning
* TODO: Collect metrics
*/
@Service()
export class TestRunnerService {
constructor(
private readonly workflowRepository: WorkflowRepository,
private readonly workflowRunner: WorkflowRunner,
private readonly executionRepository: ExecutionRepository,
private readonly activeExecutions: ActiveExecutions,
private readonly testRunRepository: TestRunRepository,
) {}
/**
* Extracts the execution data from the past execution.
* Creates a pin data object from the past execution data
* for the given workflow.
* For now, it only pins trigger nodes.
*/
private createTestDataFromExecution(workflow: WorkflowEntity, execution: ExecutionEntity) {
const executionData = parse(execution.executionData.data) as IExecutionResponse['data'];
const triggerNodes = workflow.nodes.filter((node) => /trigger$/i.test(node.type));
const pinData = {} as IPinData;
for (const triggerNode of triggerNodes) {
const triggerData = executionData.resultData.runData[triggerNode.name];
if (triggerData?.[0]?.data?.main?.[0]) {
pinData[triggerNode.name] = triggerData[0]?.data?.main?.[0];
}
}
return { pinData, executionData };
}
/**
* Runs a test case with the given pin data.
* Waits for the workflow under test to finish execution.
*/
private async runTestCase(
workflow: WorkflowEntity,
testCasePinData: IPinData,
userId: string,
): Promise<IRun | undefined> {
// Prepare the data to run the workflow
const data: IWorkflowExecutionDataProcess = {
executionMode: 'evaluation',
runData: {},
pinData: testCasePinData,
workflowData: workflow,
partialExecutionVersion: '-1',
userId,
};
// Trigger the workflow under test with mocked data
const executionId = await this.workflowRunner.run(data);
assert(executionId);
// Wait for the execution to finish
const executePromise = this.activeExecutions.getPostExecutePromise(executionId);
return await executePromise;
}
/**
* Run the evaluation workflow with the expected and actual run data.
*/
private async runTestCaseEvaluation(
evaluationWorkflow: WorkflowEntity,
expectedData: IRunData,
actualData: IRunData,
) {
// Prepare the evaluation wf input data.
// Provide both the expected data and the actual data
const evaluationInputData = {
json: {
originalExecution: expectedData,
newExecution: actualData,
},
};
// Prepare the data to run the evaluation workflow
const data = await getRunData(evaluationWorkflow, [evaluationInputData]);
data.executionMode = 'evaluation';
// Trigger the evaluation workflow
const executionId = await this.workflowRunner.run(data);
assert(executionId);
// Wait for the execution to finish
const executePromise = this.activeExecutions.getPostExecutePromise(executionId);
return await executePromise;
}
private extractEvaluationResult(execution: IRun): IDataObject {
const lastNodeExecuted = execution.data.resultData.lastNodeExecuted;
assert(lastNodeExecuted, 'Could not find the last node executed in evaluation workflow');
// Extract the output of the last node executed in the evaluation workflow
// We use only the first item of a first main output
const lastNodeTaskData = execution.data.resultData.runData[lastNodeExecuted]?.[0];
const mainConnectionData = lastNodeTaskData?.data?.main?.[0];
return mainConnectionData?.[0]?.json ?? {};
}
/**
* Creates a new test run for the given test definition.
*/
public async runTest(user: User, test: TestDefinition): Promise<void> {
const workflow = await this.workflowRepository.findById(test.workflowId);
assert(workflow, 'Workflow not found');
const evaluationWorkflow = await this.workflowRepository.findById(test.evaluationWorkflowId);
assert(evaluationWorkflow, 'Evaluation workflow not found');
// 0. Create new Test Run
const testRun = await this.testRunRepository.createTestRun(test.id);
assert(testRun, 'Unable to create a test run');
// 1. Make test cases from previous executions
// Select executions with the annotation tag and workflow ID of the test.
// Fetch only ids to reduce the data transfer.
const pastExecutions: ReadonlyArray<Pick<ExecutionEntity, 'id'>> =
await this.executionRepository
.createQueryBuilder('execution')
.select('execution.id')
.leftJoin('execution.annotation', 'annotation')
.leftJoin('annotation.tags', 'annotationTag')
.where('annotationTag.id = :tagId', { tagId: test.annotationTagId })
.andWhere('execution.workflowId = :workflowId', { workflowId: test.workflowId })
.getMany();
// 2. Run over all the test cases
await this.testRunRepository.markAsRunning(testRun.id);
const metrics = [];
for (const { id: pastExecutionId } of pastExecutions) {
// Fetch past execution with data
const pastExecution = await this.executionRepository.findOne({
where: { id: pastExecutionId },
relations: ['executionData', 'metadata'],
});
assert(pastExecution, 'Execution not found');
const testData = this.createTestDataFromExecution(workflow, pastExecution);
const { pinData, executionData } = testData;
// Run the test case and wait for it to finish
const testCaseExecution = await this.runTestCase(workflow, pinData, user.id);
// In case of a permission check issue, the test case execution will be undefined.
// Skip them and continue with the next test case
if (!testCaseExecution) {
continue;
}
// Collect the results of the test case execution
const testCaseRunData = testCaseExecution.data.resultData.runData;
// Get the original runData from the test case execution data
const originalRunData = executionData.resultData.runData;
// Run the evaluation workflow with the original and new run data
const evalExecution = await this.runTestCaseEvaluation(
evaluationWorkflow,
originalRunData,
testCaseRunData,
);
assert(evalExecution);
// Extract the output of the last node executed in the evaluation workflow
metrics.push(this.extractEvaluationResult(evalExecution));
}
// TODO: 3. Aggregate the results
// Now we just set success to true if all the test cases passed
const aggregatedMetrics = { success: metrics.every((metric) => metric.success) };
await this.testRunRepository.markAsCompleted(testRun.id, aggregatedMetrics);
}
}

View file

@ -0,0 +1,37 @@
import { mock } from 'jest-mock-extended';
import type { IRun } from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';
import { determineFinalExecutionStatus } from '../shared-hook-functions';
describe('determineFinalExecutionStatus', () => {
describe('When waitTill is not set', () => {
test.each(['canceled', 'crashed', 'error', 'success'])('should return "%s"', (status) => {
const runData = { status, data: {} } as IRun;
expect(determineFinalExecutionStatus(runData)).toBe(status);
});
});
it('should return "error" when resultData.error exists', () => {
const runData = {
status: 'running',
data: {
resultData: {
error: new NodeOperationError(mock(), 'An error occurred'),
},
},
} as IRun;
expect(determineFinalExecutionStatus(runData)).toBe('error');
});
it('should return "waiting" when waitTill is defined', () => {
const runData = {
status: 'running',
data: {},
waitTill: new Date('2022-01-01T00:00:00'),
} as IRun;
expect(determineFinalExecutionStatus(runData)).toBe('waiting');
});
});

View file

@ -0,0 +1,15 @@
import { Expose } from 'class-transformer';
import { IsOptional, IsString } from 'class-validator';
import { BaseFilter } from './base.filter.dto';
export class TestDefinitionsFilter extends BaseFilter {
@IsString()
@IsOptional()
@Expose()
workflowId?: string;
static async fromString(rawFilter: string) {
return await this.toFilter(rawFilter, TestDefinitionsFilter);
}
}

View file

@ -5,6 +5,7 @@ import * as ResponseHelper from '@/response-helper';
import { toError } from '@/utils';
import { CredentialsFilter } from './dtos/credentials.filter.dto';
import { TestDefinitionsFilter } from './dtos/test-definitions.filter.dto';
import { UserFilter } from './dtos/user.filter.dto';
import { WorkflowFilter } from './dtos/workflow.filter.dto';
@ -25,6 +26,8 @@ export const filterListQueryMiddleware = async (
Filter = CredentialsFilter;
} else if (req.baseUrl.endsWith('users')) {
Filter = UserFilter;
} else if (req.baseUrl.endsWith('test-definitions')) {
Filter = TestDefinitionsFilter;
} else {
return next();
}

View file

@ -524,7 +524,9 @@ describe('TaskBroker', () => {
const requestParams: RunnerMessage.ToBroker.TaskDataRequest['requestParams'] = {
dataOfNodes: 'all',
env: true,
input: true,
input: {
include: true,
},
prevNode: true,
};

View file

@ -1,10 +1,23 @@
import type { TaskRunnersConfig } from '@n8n/config';
import { mock } from 'jest-mock-extended';
import type WebSocket from 'ws';
import { Time } from '@/constants';
import { Time, WsStatusCodes } from '@/constants';
import { TaskRunnerWsServer } from '@/runners/runner-ws-server';
describe('TaskRunnerWsServer', () => {
describe('removeConnection', () => {
it('should close with 1000 status code by default', async () => {
const server = new TaskRunnerWsServer(mock(), mock(), mock(), mock(), mock());
const ws = mock<WebSocket>();
server.runnerConnections.set('test-runner', ws);
await server.removeConnection('test-runner');
expect(ws.close).toHaveBeenCalledWith(WsStatusCodes.CloseNormal);
});
});
describe('heartbeat timer', () => {
it('should set up heartbeat timer on server start', async () => {
const setIntervalSpy = jest.spyOn(global, 'setInterval');

View file

@ -12,6 +12,10 @@ import type { DisconnectAnalyzer, DisconnectErrorOptions } from './runner-types'
*/
@Service()
export class DefaultTaskRunnerDisconnectAnalyzer implements DisconnectAnalyzer {
get isCloudDeployment() {
return config.get('deployment.type') === 'cloud';
}
async toDisconnectError(opts: DisconnectErrorOptions): Promise<Error> {
const { reason, heartbeatInterval } = opts;
@ -22,6 +26,9 @@ export class DefaultTaskRunnerDisconnectAnalyzer implements DisconnectAnalyzer {
);
}
return new TaskRunnerDisconnectedError(opts.runnerId ?? 'Unknown runner ID');
return new TaskRunnerDisconnectedError(
opts.runnerId ?? 'Unknown runner ID',
this.isCloudDeployment,
);
}
}

View file

@ -0,0 +1,49 @@
import { TaskRunnerDisconnectedError } from '../task-runner-disconnected-error';
describe('TaskRunnerDisconnectedError', () => {
it('should have the correct default error message', () => {
const error = new TaskRunnerDisconnectedError('test-runner-id', false);
expect(error.message).toBe('Node execution failed');
});
it('should have the error level set to "error"', () => {
const error = new TaskRunnerDisconnectedError('test-runner-id', false);
expect(error.level).toBe('error');
});
it('should set the correct description for non-cloud deployments', () => {
const error = new TaskRunnerDisconnectedError('test-runner-id', false);
expect(error.description).toContain(
'This can happen for various reasons. Please try executing the node again.',
);
expect(error.description).toContain(
'1. Reduce the number of items processed at a time, by batching them using a loop node',
);
expect(error.description).toContain(
"2. Increase the memory available to the task runner with 'N8N_RUNNERS_MAX_OLD_SPACE_SIZE' environment variable",
);
expect(error.description).not.toContain(
'Upgrade your cloud plan to increase the available memory',
);
});
it('should set the correct description for cloud deployments', () => {
const error = new TaskRunnerDisconnectedError('test-runner-id', true);
expect(error.description).toContain(
'This can happen for various reasons. Please try executing the node again.',
);
expect(error.description).toContain(
'1. Reduce the number of items processed at a time, by batching them using a loop node',
);
expect(error.description).toContain(
'2. Upgrade your cloud plan to increase the available memory',
);
expect(error.description).not.toContain(
"Increase the memory available to the task runner with 'N8N_RUNNERS_MAX_OLD_SPACE_SIZE' environment variable",
);
});
});

View file

@ -1,7 +1,34 @@
import type { TaskRunner } from '@n8n/task-runner';
import { ApplicationError } from 'n8n-workflow';
export class TaskRunnerDisconnectedError extends ApplicationError {
constructor(runnerId: string) {
super(`Task runner (${runnerId}) disconnected`);
public description: string;
constructor(
public readonly runnerId: TaskRunner['id'],
isCloudDeployment: boolean,
) {
super('Node execution failed');
const fixSuggestions = {
reduceItems:
'Reduce the number of items processed at a time, by batching them using a loop node',
increaseMemory:
"Increase the memory available to the task runner with 'N8N_RUNNERS_MAX_OLD_SPACE_SIZE' environment variable",
upgradePlan: 'Upgrade your cloud plan to increase the available memory',
};
const subtitle =
'This can happen for various reasons. Please try executing the node again. If the problem persists, you can try the following:';
const suggestions = isCloudDeployment
? [fixSuggestions.reduceItems, fixSuggestions.upgradePlan]
: [fixSuggestions.reduceItems, fixSuggestions.increaseMemory];
const suggestionsText = suggestions
.map((suggestion, index) => `${index + 1}. ${suggestion}`)
.join('<br/>');
const description = `${subtitle}<br/><br/>${suggestionsText}`;
this.description = description;
}
}

View file

@ -1,8 +1,6 @@
import { TaskRunnersConfig } from '@n8n/config';
import { Service } from 'typedi';
import config from '@/config';
import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer';
import { TaskRunnerOomError } from './errors/task-runner-oom-error';
import type { DisconnectErrorOptions } from './runner-types';
@ -16,10 +14,6 @@ import { TaskRunnerProcess } from './task-runner-process';
*/
@Service()
export class InternalTaskRunnerDisconnectAnalyzer extends DefaultTaskRunnerDisconnectAnalyzer {
private get isCloudDeployment() {
return config.get('deployment.type') === 'cloud';
}
private readonly exitReasonSignal: SlidingWindowSignal<TaskRunnerProcessEventMap, 'exit'>;
constructor(

View file

@ -6,6 +6,8 @@ import type { TaskRunner } from './task-broker.service';
import type { AuthlessRequest } from '../requests';
export interface DisconnectAnalyzer {
isCloudDeployment: boolean;
toDisconnectError(opts: DisconnectErrorOptions): Promise<Error>;
}

View file

@ -4,7 +4,7 @@ import { ApplicationError } from 'n8n-workflow';
import { Service } from 'typedi';
import type WebSocket from 'ws';
import { Time } from '@/constants';
import { Time, WsStatusCodes } from '@/constants';
import { Logger } from '@/logging/logger.service';
import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer';
@ -21,15 +21,7 @@ function heartbeat(this: WebSocket) {
this.isAlive = true;
}
const enum WsStatusCode {
CloseNormal = 1000,
CloseGoingAway = 1001,
CloseProtocolError = 1002,
CloseUnsupportedData = 1003,
CloseNoStatus = 1005,
CloseAbnormal = 1006,
CloseInvalidData = 1007,
}
type WsStatusCode = (typeof WsStatusCodes)[keyof typeof WsStatusCodes];
@Service()
export class TaskRunnerWsServer {
@ -62,7 +54,7 @@ export class TaskRunnerWsServer {
void this.removeConnection(
runnerId,
'failed-heartbeat-check',
WsStatusCode.CloseNoStatus,
WsStatusCodes.CloseNoStatus,
);
this.runnerLifecycleEvents.emit('runner:failed-heartbeat-check');
return;
@ -126,7 +118,7 @@ export class TaskRunnerWsServer {
this.sendMessage.bind(this, id) as MessageCallback,
);
this.logger.info(`Runner "${message.name}" (${id}) has been registered`);
this.logger.info(`Registered runner "${message.name}" (${id}) `);
return;
}
@ -156,7 +148,7 @@ export class TaskRunnerWsServer {
async removeConnection(
id: TaskRunner['id'],
reason: DisconnectReason = 'unknown',
code?: WsStatusCode,
code: WsStatusCode = WsStatusCodes.CloseNormal,
) {
const connection = this.runnerConnections.get(id);
if (connection) {
@ -166,6 +158,7 @@ export class TaskRunnerWsServer {
heartbeatInterval: this.taskTunnersConfig.heartbeatInterval,
});
this.taskBroker.deregisterRunner(id, disconnectError);
this.logger.debug(`Deregistered runner "${id}"`);
connection.close(code);
this.runnerConnections.delete(id);
}
@ -180,7 +173,8 @@ export class TaskRunnerWsServer {
// shutting them down
await Promise.all(
Array.from(this.runnerConnections.keys()).map(
async (id) => await this.removeConnection(id, 'shutting-down', WsStatusCode.CloseGoingAway),
async (id) =>
await this.removeConnection(id, 'shutting-down', WsStatusCodes.CloseGoingAway),
),
);
}

View file

@ -525,7 +525,7 @@ export class TaskBroker {
return;
}
if (e instanceof TaskDeferredError) {
this.logger.info(`Task (${taskId}) deferred until runner is ready`);
this.logger.debug(`Task (${taskId}) deferred until runner is ready`);
this.pendingTaskRequests.push(request); // will settle on receiving task offer from runner
return;
}

View file

@ -18,6 +18,7 @@ const workflow: DataRequestResponse['workflow'] = mock<DataRequestResponse['work
const debugHelperNodeOutItems: INodeExecutionData[] = [
{
json: {
idx: 0,
uid: 'abb74fd4-bef2-4fae-9d53-ea24e9eb3032',
email: 'Dan.Schmidt31@yahoo.com',
firstname: 'Toni',
@ -28,6 +29,31 @@ const debugHelperNodeOutItems: INodeExecutionData[] = [
item: 0,
},
},
{
json: {
idx: 1,
uid: '4620e4-c1b4-dd8b-9a45-d0f9a29a3b7f',
email: 'bob.johnson@domain.com',
firstName: 'Bob',
lastName: 'Johnson',
password: '6e41b5ecf',
},
pairedItem: {
item: 1,
},
},
{
json: {
idx: 2,
email: '4358d3-418b-a8b3-49cb-076b1180f402',
firstName: 'Eve',
lastName: 'Johnson',
password: 'e2414620e',
},
pairedItem: {
item: 2,
},
},
];
const codeNodeInputItems: INodeExecutionData[] = debugHelperNodeOutItems;
const envProviderState: DataRequestResponse['envProviderState'] = mock<
@ -137,7 +163,9 @@ describe('DataRequestResponseStripper', () => {
const allDataParam: TaskDataRequestParams = {
dataOfNodes: 'all',
env: true,
input: true,
input: {
include: true,
},
prevNode: true,
};
@ -177,7 +205,9 @@ describe('DataRequestResponseStripper', () => {
describe('input data', () => {
const allExceptInputParam = newRequestParam({
input: false,
input: {
include: false,
},
});
it('drops input data from result', () => {
@ -186,10 +216,23 @@ describe('DataRequestResponseStripper', () => {
expect(result.inputData).toStrictEqual({});
});
it('drops input data from result', () => {
const result = new DataRequestResponseStripper(taskData, allExceptInputParam).strip();
it('returns only chunked data', () => {
const result = new DataRequestResponseStripper(
taskData,
newRequestParam({
input: {
include: true,
chunk: {
startIndex: 1,
count: 1,
},
},
}),
).strip();
expect(result.inputData).toStrictEqual({});
expect(result.inputData).toStrictEqual({
main: [debugHelperNodeOutItems.slice(1, 2)],
});
});
});

View file

@ -6,6 +6,7 @@ import type {
IRunExecutionData,
ITaskDataConnections,
} from 'n8n-workflow';
import * as a from 'node:assert/strict';
/**
* Strips data from data request response based on the specified parameters
@ -81,11 +82,31 @@ export class DataRequestResponseStripper {
}
private stripInputData(inputData: ITaskDataConnections): ITaskDataConnections {
if (this.stripParams.input) {
if (!this.stripParams.input.include) {
return {};
}
return this.stripParams.input.chunk ? this.stripChunkedInputData(inputData) : inputData;
}
private stripChunkedInputData(inputData: ITaskDataConnections): ITaskDataConnections {
const { chunk } = this.stripParams.input;
a.ok(chunk);
const inputItems = inputData.main?.[0];
if (!inputItems) {
return inputData;
}
return {};
// If a chunk of the input data is requested, we only return that chunk
// It is the responsibility of the requester to rebuild the input data
const chunkInputItems = inputItems.slice(chunk.startIndex, chunk.startIndex + chunk.count);
const chunkedInputData: ITaskDataConnections = {
...inputData,
main: [chunkInputItems],
};
return chunkedInputData;
}
/**

View file

@ -1,6 +1,5 @@
import { TaskRunnersConfig } from '@n8n/config';
import type { TaskResultData, RequesterMessage, BrokerMessage, TaskData } from '@n8n/task-runner';
import { DataRequestResponseReconstruct, RPC_ALLOW_LIST } from '@n8n/task-runner';
import { RPC_ALLOW_LIST } from '@n8n/task-runner';
import type {
EnvProviderState,
IExecuteFunctions,
@ -18,8 +17,7 @@ import type {
} from 'n8n-workflow';
import { createResultOk, createResultError } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import * as a from 'node:assert/strict';
import Container, { Service } from 'typedi';
import { Service } from 'typedi';
import { NodeTypes } from '@/node-types';
@ -59,8 +57,6 @@ export abstract class TaskManager {
tasks: Map<string, Task> = new Map();
private readonly runnerConfig = Container.get(TaskRunnersConfig);
private readonly dataResponseBuilder = new DataRequestResponseBuilder();
constructor(private readonly nodeTypes: NodeTypes) {}
@ -246,18 +242,6 @@ export abstract class TaskManager {
const dataRequestResponse = this.dataResponseBuilder.buildFromTaskData(job.data);
if (this.runnerConfig.assertDeduplicationOutput) {
const reconstruct = new DataRequestResponseReconstruct();
a.deepStrictEqual(
reconstruct.reconstructConnectionInputData(dataRequestResponse.inputData),
job.data.connectionInputData,
);
a.deepStrictEqual(
reconstruct.reconstructExecuteData(dataRequestResponse),
job.data.executeData,
);
}
const strippedData = new DataRequestResponseStripper(
dataRequestResponse,
requestParams,

View file

@ -163,6 +163,8 @@ export class TaskRunnerServer {
authEndpoint,
send(async (req) => await this.taskRunnerAuthController.createGrantToken(req)),
);
this.app.get('/healthz', (_, res) => res.send({ status: 'ok' }));
}
private handleUpgradeRequest = (

View file

@ -64,6 +64,7 @@ import '@/executions/executions.controller';
import '@/external-secrets/external-secrets.controller.ee';
import '@/license/license.controller';
import '@/evaluation/test-definitions.controller.ee';
import '@/evaluation/metrics.controller';
import '@/workflows/workflow-history/workflow-history.controller.ee';
import '@/workflows/workflows.controller';

View file

@ -54,7 +54,7 @@ export class DynamicNodeParametersService {
): Promise<INodePropertyOptions[]> {
const nodeType = this.getNodeType(nodeTypeAndVersion);
if (!nodeType.description.requestDefaults?.baseURL) {
// This in in here for now for security reasons.
// This is in here for now for security reasons.
// Background: As the full data for the request to make does get send, and the auth data
// will then be applied, would it be possible to retrieve that data like that. By at least
// requiring a baseURL to be defined can at least not a random server be called.

View file

@ -231,6 +231,7 @@ export class FrontendService {
blockFileAccessToN8nFiles: this.securityConfig.blockFileAccessToN8nFiles,
},
betaFeatures: this.frontendConfig.betaFeatures,
virtualSchemaView: config.getEnv('virtualSchemaView'),
};
}

View file

@ -1,9 +1,5 @@
import { InstanceSettings } from 'n8n-core';
import {
ApplicationError,
ErrorReporterProxy as ErrorReporter,
type IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import { ApplicationError, type IWorkflowExecutionDataProcess } from 'n8n-workflow';
import { Service } from 'typedi';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
@ -88,7 +84,7 @@ export class WaitTracker {
this.waitingExecutions[executionId] = {
executionId,
timer: setTimeout(() => {
this.startExecution(executionId);
void this.startExecution(executionId);
}, triggerTime),
};
}
@ -103,46 +99,40 @@ export class WaitTracker {
delete this.waitingExecutions[executionId];
}
startExecution(executionId: string) {
async startExecution(executionId: string) {
this.logger.debug(`Resuming execution ${executionId}`, { executionId });
delete this.waitingExecutions[executionId];
(async () => {
// Get the data to execute
const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, {
includeData: true,
unflattenData: true,
});
if (!fullExecutionData) {
throw new ApplicationError('Execution does not exist.', { extra: { executionId } });
}
if (fullExecutionData.finished) {
throw new ApplicationError('The execution did succeed and can so not be started again.');
}
if (!fullExecutionData.workflowData.id) {
throw new ApplicationError('Only saved workflows can be resumed.');
}
const workflowId = fullExecutionData.workflowData.id;
const project = await this.ownershipService.getWorkflowProjectCached(workflowId);
const data: IWorkflowExecutionDataProcess = {
executionMode: fullExecutionData.mode,
executionData: fullExecutionData.data,
workflowData: fullExecutionData.workflowData,
projectId: project.id,
};
// Start the execution again
await this.workflowRunner.run(data, false, false, executionId);
})().catch((error: Error) => {
ErrorReporter.error(error);
this.logger.error(
`There was a problem starting the waiting execution with id "${executionId}": "${error.message}"`,
{ executionId },
);
// Get the data to execute
const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, {
includeData: true,
unflattenData: true,
});
if (!fullExecutionData) {
throw new ApplicationError('Execution does not exist.', { extra: { executionId } });
}
if (fullExecutionData.finished) {
throw new ApplicationError('The execution did succeed and can so not be started again.');
}
if (!fullExecutionData.workflowData.id) {
throw new ApplicationError('Only saved workflows can be resumed.');
}
const workflowId = fullExecutionData.workflowData.id;
const project = await this.ownershipService.getWorkflowProjectCached(workflowId);
const data: IWorkflowExecutionDataProcess = {
executionMode: fullExecutionData.mode,
executionData: fullExecutionData.data,
workflowData: fullExecutionData.workflowData,
projectId: project.id,
pushRef: fullExecutionData.data.pushRef,
};
// Start the execution again
await this.workflowRunner.run(data, false, false, executionId);
}
stopTracking() {

View file

@ -5,6 +5,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import type { PushType } from '@n8n/api-types';
import { GlobalConfig } from '@n8n/config';
import { stringify } from 'flatted';
import { WorkflowExecute } from 'n8n-core';
import {
ApplicationError,
@ -280,7 +281,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
},
],
workflowExecuteBefore: [
async function (this: WorkflowHooks): Promise<void> {
async function (this: WorkflowHooks, _workflow, data): Promise<void> {
const { pushRef, executionId } = this;
const { id: workflowId, name: workflowName } = this.workflowData;
logger.debug('Executing hook (hookFunctionsPush)', {
@ -301,6 +302,9 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
retryOf: this.retryOf,
workflowId,
workflowName,
flattedRunData: data?.resultData.runData
? stringify(data.resultData.runData)
: stringify({}),
},
pushRef,
);
@ -318,9 +322,17 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
workflowId,
});
const pushType =
fullRunData.status === 'waiting' ? 'executionWaiting' : 'executionFinished';
pushInstance.send(pushType, { executionId }, pushRef);
const { status } = fullRunData;
if (status === 'waiting') {
pushInstance.send('executionWaiting', { executionId }, pushRef);
} else {
const rawData = stringify(fullRunData.data);
pushInstance.send(
'executionFinished',
{ executionId, workflowId, status, rawData },
pushRef,
);
}
},
],
};
@ -410,6 +422,9 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
}
}
const executionStatus = determineFinalExecutionStatus(fullRunData);
fullRunData.status = executionStatus;
const saveSettings = toSaveSettings(this.workflowData.settings);
if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) {
@ -427,7 +442,6 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
return;
}
const executionStatus = determineFinalExecutionStatus(fullRunData);
const shouldNotSave =
(executionStatus === 'success' && !saveSettings.success) ||
(executionStatus !== 'success' && !saveSettings.error);
@ -570,6 +584,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
}
const workflowStatusFinal = determineFinalExecutionStatus(fullRunData);
fullRunData.status = workflowStatusFinal;
if (workflowStatusFinal !== 'success' && workflowStatusFinal !== 'waiting') {
executeErrorWorkflow(
@ -1115,6 +1130,8 @@ export function getWorkflowHooksWorkerMain(
if (!fullRunData.finished) return;
const executionStatus = determineFinalExecutionStatus(fullRunData);
fullRunData.status = executionStatus;
const saveSettings = toSaveSettings(this.workflowData.settings);
const shouldNotSave =

View file

@ -137,7 +137,10 @@ export class WorkflowRunner {
// Create a failed execution with the data for the node, save it and abort execution
const runData = generateFailedExecutionFromError(data.executionMode, error, error.node);
const workflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId);
await workflowHooks.executeHookFunctions('workflowExecuteBefore', []);
await workflowHooks.executeHookFunctions('workflowExecuteBefore', [
undefined,
data.executionData,
]);
await workflowHooks.executeHookFunctions('workflowExecuteAfter', [runData]);
responsePromise?.reject(error);
this.activeExecutions.finalizeExecution(executionId);
@ -314,8 +317,9 @@ export class WorkflowRunner {
workflowExecution = workflowExecute.runPartialWorkflow2(
workflow,
data.runData,
data.destinationNode,
data.pinData,
data.dirtyNodeNames,
data.destinationNode,
);
} else {
workflowExecution = workflowExecute.runPartialWorkflow(
@ -401,7 +405,7 @@ export class WorkflowRunner {
// Normally also workflow should be supplied here but as it only used for sending
// data to editor-UI is not needed.
await hooks.executeHookFunctions('workflowExecuteBefore', []);
await hooks.executeHookFunctions('workflowExecuteBefore', [undefined, data.executionData]);
} catch (error) {
// We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the
// "workflowExecuteAfter" which we require.

View file

@ -89,7 +89,13 @@ export class WorkflowExecutionService {
}
async executeManually(
{ workflowData, runData, startNodes, destinationNode }: WorkflowRequest.ManualRunPayload,
{
workflowData,
runData,
startNodes,
destinationNode,
dirtyNodeNames,
}: WorkflowRequest.ManualRunPayload,
user: User,
pushRef?: string,
partialExecutionVersion?: string,
@ -137,6 +143,7 @@ export class WorkflowExecutionService {
workflowData,
userId: user.id,
partialExecutionVersion: partialExecutionVersion ?? '0',
dirtyNodeNames,
};
const hasRunData = (node: INode) => runData !== undefined && !!runData[node.name];

View file

@ -22,6 +22,7 @@ export declare namespace WorkflowRequest {
runData: IRunData;
startNodes?: StartNodeData[];
destinationNode?: string;
dirtyNodeNames?: string[];
};
type Create = AuthenticatedRequest<{}, {}, CreateUpdatePayload>;

Some files were not shown because too many files have changed in this diff Show more