fix: Chat triggers don't work with the new partial execution flow (#11952)

This commit is contained in:
Danny Martini 2024-12-04 15:33:46 +01:00 committed by GitHub
parent 0e26f58ae6
commit 2b6a72f128
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 360 additions and 80 deletions

View file

@ -1,4 +1,23 @@
import { WorkflowHooks, type ExecutionError, type IWorkflowExecuteHooks } from 'n8n-workflow';
import { mock } from 'jest-mock-extended';
import { DirectedGraph, WorkflowExecute } from 'n8n-core';
import * as core from 'n8n-core';
import type {
IExecuteData,
INode,
IRun,
ITaskData,
IWaitingForExecution,
IWaitingForExecutionSource,
IWorkflowExecutionDataProcess,
StartNodeData,
} from 'n8n-workflow';
import {
Workflow,
WorkflowHooks,
type ExecutionError,
type IWorkflowExecuteHooks,
} from 'n8n-workflow';
import PCancelable from 'p-cancelable';
import Container from 'typedi';
import { ActiveExecutions } from '@/active-executions';
@ -6,6 +25,7 @@ import config from '@/config';
import type { User } from '@/databases/entities/user';
import { ExecutionNotFoundError } from '@/errors/execution-not-found-error';
import { Telemetry } from '@/telemetry';
import { PermissionChecker } from '@/user-management/permission-checker';
import { WorkflowRunner } from '@/workflow-runner';
import { mockInstance } from '@test/mocking';
import { createExecution } from '@test-integration/db/executions';
@ -43,9 +63,11 @@ afterAll(() => {
beforeEach(async () => {
await testDb.truncate(['Workflow', 'SharedWorkflow']);
jest.clearAllMocks();
});
test('processError should return early in Bull stalled edge case', async () => {
describe('processError', () => {
test('processError should return early in Bull stalled edge case', async () => {
const workflow = await createWorkflow({}, owner);
const execution = await createExecution(
{
@ -63,9 +85,9 @@ test('processError should return early in Bull stalled edge case', async () => {
new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow),
);
expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(0);
});
});
test('processError should return early if the error is `ExecutionNotFoundError`', async () => {
test('processError should return early if the error is `ExecutionNotFoundError`', async () => {
const workflow = await createWorkflow({}, owner);
const execution = await createExecution({ status: 'success', finished: true }, workflow);
await runner.processError(
@ -76,9 +98,9 @@ test('processError should return early if the error is `ExecutionNotFoundError`'
new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow),
);
expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(0);
});
});
test('processError should process error', async () => {
test('processError should process error', async () => {
const workflow = await createWorkflow({}, owner);
const execution = await createExecution(
{
@ -100,4 +122,79 @@ test('processError should process error', async () => {
new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow),
);
expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(1);
});
});
describe('run', () => {
it('uses recreateNodeExecutionStack to create a partial execution if a triggerToStartFrom with data is sent', async () => {
// ARRANGE
const activeExecutions = Container.get(ActiveExecutions);
jest.spyOn(activeExecutions, 'add').mockResolvedValue('1');
jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValueOnce();
const permissionChecker = Container.get(PermissionChecker);
jest.spyOn(permissionChecker, 'check').mockResolvedValueOnce();
jest.spyOn(WorkflowExecute.prototype, 'processRunExecutionData').mockReturnValueOnce(
new PCancelable(() => {
return mock<IRun>();
}),
);
jest.spyOn(Workflow.prototype, 'getNode').mockReturnValueOnce(mock<INode>());
jest.spyOn(DirectedGraph, 'fromWorkflow').mockReturnValueOnce(new DirectedGraph());
const recreateNodeExecutionStackSpy = jest
.spyOn(core, 'recreateNodeExecutionStack')
.mockReturnValueOnce({
nodeExecutionStack: mock<IExecuteData[]>(),
waitingExecution: mock<IWaitingForExecution>(),
waitingExecutionSource: mock<IWaitingForExecutionSource>(),
});
const data = mock<IWorkflowExecutionDataProcess>({
triggerToStartFrom: { name: 'trigger', data: mock<ITaskData>() },
workflowData: { nodes: [] },
executionData: undefined,
startNodes: [mock<StartNodeData>()],
destinationNode: undefined,
});
// ACT
await runner.run(data);
// ASSERT
expect(recreateNodeExecutionStackSpy).toHaveBeenCalled();
});
it('does not use recreateNodeExecutionStack to create a partial execution if a triggerToStartFrom without data is sent', async () => {
// ARRANGE
const activeExecutions = Container.get(ActiveExecutions);
jest.spyOn(activeExecutions, 'add').mockResolvedValue('1');
jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValueOnce();
const permissionChecker = Container.get(PermissionChecker);
jest.spyOn(permissionChecker, 'check').mockResolvedValueOnce();
jest.spyOn(WorkflowExecute.prototype, 'processRunExecutionData').mockReturnValueOnce(
new PCancelable(() => {
return mock<IRun>();
}),
);
const recreateNodeExecutionStackSpy = jest.spyOn(core, 'recreateNodeExecutionStack');
const data = mock<IWorkflowExecutionDataProcess>({
triggerToStartFrom: { name: 'trigger', data: undefined },
workflowData: { nodes: [] },
executionData: undefined,
startNodes: [mock<StartNodeData>()],
destinationNode: undefined,
});
// ACT
await runner.run(data);
// ASSERT
expect(recreateNodeExecutionStackSpy).not.toHaveBeenCalled();
});
});

View file

@ -1,6 +1,11 @@
import type * as express from 'express';
import { mock } from 'jest-mock-extended';
import type { IWebhookData, IWorkflowExecuteAdditionalData, Workflow } from 'n8n-workflow';
import type { ITaskData } from 'n8n-workflow';
import {
type IWebhookData,
type IWorkflowExecuteAdditionalData,
type Workflow,
} from 'n8n-workflow';
import { v4 as uuid } from 'uuid';
import { generateNanoId } from '@/databases/utils/generators';
@ -43,12 +48,16 @@ describe('TestWebhooks', () => {
jest.useFakeTimers();
});
beforeEach(() => {
jest.clearAllMocks();
});
describe('needsWebhook()', () => {
const args: Parameters<typeof testWebhooks.needsWebhook> = [
const args: Parameters<typeof testWebhooks.needsWebhook>[0] = {
userId,
workflowEntity,
mock<IWorkflowExecuteAdditionalData>(),
];
additionalData: mock<IWorkflowExecuteAdditionalData>(),
};
test('if webhook is needed, should register then create webhook and return true', async () => {
const workflow = mock<Workflow>();
@ -56,7 +65,7 @@ describe('TestWebhooks', () => {
jest.spyOn(testWebhooks, 'toWorkflow').mockReturnValueOnce(workflow);
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook]);
const needsWebhook = await testWebhooks.needsWebhook(...args);
const needsWebhook = await testWebhooks.needsWebhook(args);
const [registerOrder] = registrations.register.mock.invocationCallOrder;
const [createOrder] = workflow.createWebhookIfNotExists.mock.invocationCallOrder;
@ -72,7 +81,7 @@ describe('TestWebhooks', () => {
jest.spyOn(registrations, 'register').mockRejectedValueOnce(new Error(msg));
registrations.getAllRegistrations.mockResolvedValue([]);
const needsWebhook = testWebhooks.needsWebhook(...args);
const needsWebhook = testWebhooks.needsWebhook(args);
await expect(needsWebhook).rejects.toThrowError(msg);
});
@ -81,10 +90,55 @@ describe('TestWebhooks', () => {
webhook.webhookDescription.restartWebhook = true;
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook]);
const result = await testWebhooks.needsWebhook(...args);
const result = await testWebhooks.needsWebhook(args);
expect(result).toBe(false);
});
test('returns false if a triggerToStartFrom with triggerData is given', async () => {
const workflow = mock<Workflow>();
jest.spyOn(testWebhooks, 'toWorkflow').mockReturnValueOnce(workflow);
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook]);
const needsWebhook = await testWebhooks.needsWebhook({
...args,
triggerToStartFrom: {
name: 'trigger',
data: mock<ITaskData>(),
},
});
expect(needsWebhook).toBe(false);
});
test('returns true, registers and then creates webhook if triggerToStartFrom is given with no triggerData', async () => {
// ARRANGE
const workflow = mock<Workflow>();
const webhook2 = mock<IWebhookData>({
node: 'trigger',
httpMethod,
path,
workflowId: workflowEntity.id,
userId,
});
jest.spyOn(testWebhooks, 'toWorkflow').mockReturnValueOnce(workflow);
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook, webhook2]);
// ACT
const needsWebhook = await testWebhooks.needsWebhook({
...args,
triggerToStartFrom: { name: 'trigger' },
});
// ASSERT
const [registerOrder] = registrations.register.mock.invocationCallOrder;
const [createOrder] = workflow.createWebhookIfNotExists.mock.invocationCallOrder;
expect(registerOrder).toBeLessThan(createOrder);
expect(registrations.register.mock.calls[0][0].webhook.node).toBe(webhook2.node);
expect(workflow.createWebhookIfNotExists.mock.calls[0][0].node).toBe(webhook2.node);
expect(needsWebhook).toBe(true);
});
});
describe('executeWebhook()', () => {

View file

@ -23,6 +23,7 @@ import type { TestWebhookRegistration } from '@/webhooks/test-webhook-registrati
import { TestWebhookRegistrationsService } from '@/webhooks/test-webhook-registrations.service';
import * as WebhookHelpers from '@/webhooks/webhook-helpers';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
import type { WorkflowRequest } from '@/workflows/workflow.request';
import type {
IWebhookResponseCallbackData,
@ -218,25 +219,48 @@ export class TestWebhooks implements IWebhookManager {
* Return whether activating a workflow requires listening for webhook calls.
* For every webhook call to listen for, also activate the webhook.
*/
async needsWebhook(
userId: string,
workflowEntity: IWorkflowDb,
additionalData: IWorkflowExecuteAdditionalData,
runData?: IRunData,
pushRef?: string,
destinationNode?: string,
) {
async needsWebhook(options: {
userId: string;
workflowEntity: IWorkflowDb;
additionalData: IWorkflowExecuteAdditionalData;
runData?: IRunData;
pushRef?: string;
destinationNode?: string;
triggerToStartFrom?: WorkflowRequest.ManualRunPayload['triggerToStartFrom'];
}) {
const {
userId,
workflowEntity,
additionalData,
runData,
pushRef,
destinationNode,
triggerToStartFrom,
} = options;
if (!workflowEntity.id) throw new WorkflowMissingIdError(workflowEntity);
const workflow = this.toWorkflow(workflowEntity);
const webhooks = WebhookHelpers.getWorkflowWebhooks(
let webhooks = WebhookHelpers.getWorkflowWebhooks(
workflow,
additionalData,
destinationNode,
true,
);
// If we have a preferred trigger with data, we don't have to listen for a
// webhook.
if (triggerToStartFrom?.data) {
return false;
}
// If we have a preferred trigger without data we only want to listen for
// that trigger, not the other ones.
if (triggerToStartFrom) {
webhooks = webhooks.filter((w) => w.node === triggerToStartFrom.name);
}
if (!webhooks.some((w) => w.webhookDescription.restartWebhook !== true)) {
return false; // no webhooks found to start a workflow
}

View file

@ -2,7 +2,14 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-shadow */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { InstanceSettings, WorkflowExecute } from 'n8n-core';
import * as a from 'assert/strict';
import {
DirectedGraph,
InstanceSettings,
WorkflowExecute,
filterDisabledNodes,
recreateNodeExecutionStack,
} from 'n8n-core';
import type {
ExecutionError,
IDeferredPromise,
@ -12,6 +19,7 @@ import type {
WorkflowExecuteMode,
WorkflowHooks,
IWorkflowExecutionDataProcess,
IRunExecutionData,
} from 'n8n-workflow';
import {
ErrorReporterProxy as ErrorReporter,
@ -203,6 +211,7 @@ export class WorkflowRunner {
}
/** Run the workflow in current process */
// eslint-disable-next-line complexity
private async runMainProcess(
executionId: string,
data: IWorkflowExecutionDataProcess,
@ -286,12 +295,50 @@ export class WorkflowRunner {
data.executionData,
);
workflowExecution = workflowExecute.processRunExecutionData(workflow);
} else if (data.triggerToStartFrom?.data && data.startNodes && !data.destinationNode) {
this.logger.debug(
`Execution ID ${executionId} had triggerToStartFrom. Starting from that trigger.`,
{ executionId },
);
const startNodes = data.startNodes.map((data) => {
const node = workflow.getNode(data.name);
a.ok(node, `Could not find a node named "${data.name}" in the workflow.`);
return node;
});
const runData = { [data.triggerToStartFrom.name]: [data.triggerToStartFrom.data] };
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(
filterDisabledNodes(DirectedGraph.fromWorkflow(workflow)),
new Set(startNodes),
runData,
data.pinData ?? {},
);
const executionData: IRunExecutionData = {
resultData: { runData, pinData },
executionData: {
contextData: {},
metadata: {},
nodeExecutionStack,
waitingExecution,
waitingExecutionSource,
},
};
const workflowExecute = new WorkflowExecute(additionalData, 'manual', executionData);
workflowExecution = workflowExecute.processRunExecutionData(workflow);
} else if (
data.runData === undefined ||
data.startNodes === undefined ||
data.startNodes.length === 0
) {
// Full Execution
// TODO: When the old partial execution logic is removed this block can
// be removed and the previous one can be merged into
// `workflowExecute.runPartialWorkflow2`.
// Partial executions then require either a destination node from which
// everything else can be derived, or a triggerToStartFrom with
// triggerData.
this.logger.debug(`Execution ID ${executionId} will run executing all nodes.`, {
executionId,
});

View file

@ -95,6 +95,7 @@ export class WorkflowExecutionService {
startNodes,
destinationNode,
dirtyNodeNames,
triggerToStartFrom,
}: WorkflowRequest.ManualRunPayload,
user: User,
pushRef?: string,
@ -117,14 +118,15 @@ export class WorkflowExecutionService {
) {
const additionalData = await WorkflowExecuteAdditionalData.getBase(user.id);
const needsWebhook = await this.testWebhooks.needsWebhook(
user.id,
workflowData,
const needsWebhook = await this.testWebhooks.needsWebhook({
userId: user.id,
workflowEntity: workflowData,
additionalData,
runData,
pushRef,
destinationNode,
);
triggerToStartFrom,
});
if (needsWebhook) return { waitingForWebhook: true };
}
@ -144,6 +146,7 @@ export class WorkflowExecutionService {
userId: user.id,
partialExecutionVersion: partialExecutionVersion ?? '0',
dirtyNodeNames,
triggerToStartFrom,
};
const hasRunData = (node: INode) => runData !== undefined && !!runData[node.name];

View file

@ -1,4 +1,11 @@
import type { INode, IConnections, IWorkflowSettings, IRunData, StartNodeData } from 'n8n-workflow';
import type {
INode,
IConnections,
IWorkflowSettings,
IRunData,
StartNodeData,
ITaskData,
} from 'n8n-workflow';
import type { IWorkflowDb } from '@/interfaces';
import type { AuthenticatedRequest, ListQuery } from '@/requests';
@ -23,6 +30,10 @@ export declare namespace WorkflowRequest {
startNodes?: StartNodeData[];
destinationNode?: string;
dirtyNodeNames?: string[];
triggerToStartFrom?: {
name: string;
data?: ITaskData;
};
};
type Create = AuthenticatedRequest<{}, {}, CreateUpdatePayload>;

View file

@ -21,3 +21,4 @@ export { BinaryData } from './BinaryData/types';
export { isStoredMode as isValidNonDefaultMode } from './BinaryData/utils';
export * from './ExecutionMetadata';
export * from './node-execution-context';
export * from './PartialExecutionUtils';

View file

@ -46,6 +46,7 @@ import type {
StartNodeData,
IPersonalizationSurveyAnswersV4,
AnnotationVote,
ITaskData,
} from 'n8n-workflow';
import type {
@ -201,6 +202,10 @@ export interface IStartRunData {
destinationNode?: string;
runData?: IRunData;
dirtyNodeNames?: string[];
triggerToStartFrom?: {
name: string;
data?: ITaskData;
};
}
export interface ITableData {

View file

@ -8,6 +8,7 @@ import {
type IRunData,
type Workflow,
type IExecuteData,
type ITaskData,
} from 'n8n-workflow';
import { useRootStore } from '@/stores/root.store';
@ -20,6 +21,7 @@ import { useToast } from './useToast';
import { useI18n } from '@/composables/useI18n';
import { useLocalStorage } from '@vueuse/core';
import { ref } from 'vue';
import { mock } from 'vitest-mock-extended';
vi.mock('@/stores/workflows.store', () => ({
useWorkflowsStore: vi.fn().mockReturnValue({
@ -325,6 +327,34 @@ describe('useRunWorkflow({ router })', () => {
);
});
it('should send triggerToStartFrom if triggerNode and nodeData are passed in', async () => {
// ARRANGE
const composable = useRunWorkflow({ router });
const triggerNode = 'Chat Trigger';
const nodeData = mock<ITaskData>();
vi.mocked(workflowHelpers).getCurrentWorkflow.mockReturnValue(
mock<Workflow>({ getChildNodes: vi.fn().mockReturnValue([]) }),
);
vi.mocked(workflowHelpers).getWorkflowDataToSave.mockResolvedValue(
mock<IWorkflowData>({ nodes: [] }),
);
const { runWorkflow } = composable;
// ACT
await runWorkflow({ triggerNode, nodeData });
// ASSERT
expect(workflowsStore.runWorkflow).toHaveBeenCalledWith(
expect.objectContaining({
triggerToStartFrom: {
name: triggerNode,
data: nodeData,
},
}),
);
});
it('does not use the original run data if `PartialExecution.version` is set to 0', async () => {
// ARRANGE
const mockExecutionResponse = { executionId: '123' };

View file

@ -150,6 +150,7 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
let { runData: newRunData } = consolidatedData;
let executedNode: string | undefined;
let triggerToStartFrom: IStartRunData['triggerToStartFrom'];
if (
startNodeNames.length === 0 &&
'destinationNode' in options &&
@ -157,14 +158,16 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
) {
executedNode = options.destinationNode;
startNodeNames.push(options.destinationNode);
} else if ('triggerNode' in options && 'nodeData' in options) {
} else if (options.triggerNode && options.nodeData) {
startNodeNames.push(
...workflow.getChildNodes(options.triggerNode as string, NodeConnectionType.Main, 1),
...workflow.getChildNodes(options.triggerNode, NodeConnectionType.Main, 1),
);
newRunData = {
[options.triggerNode as string]: [options.nodeData],
} as IRunData;
newRunData = { [options.triggerNode]: [options.nodeData] };
executedNode = options.triggerNode;
triggerToStartFrom = {
name: options.triggerNode,
data: options.nodeData,
};
}
// If the destination node is specified, check if it is a chat node or has a chat parent
@ -258,6 +261,7 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
// data to use and what to ignore.
runData: partialExecutionVersion.value === 1 ? (runData ?? undefined) : newRunData,
startNodes,
triggerToStartFrom,
};
if ('destinationNode' in options) {
startRunData.destinationNode = options.destinationNode;

View file

@ -2284,6 +2284,10 @@ export interface IWorkflowExecutionDataProcess {
*/
partialExecutionVersion?: string;
dirtyNodeNames?: string[];
triggerToStartFrom?: {
name: string;
data?: ITaskData;
};
}
export interface ExecuteWorkflowOptions {