merge in Jan's branch to master, handle conflicts

This commit is contained in:
Mutasem Aldmour 2024-11-06 17:39:59 +01:00
parent 973e48ffea
commit c49c4bb55a
9 changed files with 228 additions and 59 deletions

View file

@ -17,6 +17,7 @@ import { Document } from '@langchain/core/documents';
import type { SetField, SetNodeOptions } from 'n8n-nodes-base/dist/nodes/Set/v2/helpers/interfaces';
import * as manual from 'n8n-nodes-base/dist/nodes/Set/v2/manual.mode';
import get from 'lodash/get';
import type { CallbackManagerForRetrieverRun } from '@langchain/core/callbacks/manager';
import { logWrapper } from '../../../utils/logWrapper';
@ -293,6 +294,8 @@ export class RetrieverWorkflow implements INodeType {
};
async supplyData(this: ISupplyDataFunctions, itemIndex: number): Promise<SupplyData> {
const workflowProxy = this.getWorkflowDataProxy(0);
class WorkflowRetriever extends BaseRetriever {
lc_namespace = ['n8n-nodes-langchain', 'retrievers', 'workflow'];
@ -384,21 +387,30 @@ export class RetrieverWorkflow implements INodeType {
const items = [newItem] as INodeExecutionData[];
let receivedItems: INodeExecutionData[][];
let receivedData;
try {
receivedItems = (await this.executeFunctions.executeWorkflow(
receivedData = await this.executeFunctions.executeWorkflow(
workflowInfo,
items,
config?.getChild(),
)) as INodeExecutionData[][];
{
startMetadata: {
executionId: workflowProxy.$execution.id,
workflowId: workflowProxy.$workflow.id,
},
},
);
} catch (error) {
// Make sure a valid error gets returned that can by json-serialized else it will
// not show up in the frontend
throw new NodeOperationError(this.executeFunctions.getNode(), error as Error);
}
// eslint-disable-next-line lodash/path-style
const receivedItems = get(receivedData, ['data', 0]) ?? [];
const returnData: Document[] = [];
for (const [index, itemData] of receivedItems[0].entries()) {
for (const [index, itemData] of receivedItems.entries()) {
const pageContent = objectToString(itemData.json);
returnData.push(
new Document({

View file

@ -14,8 +14,10 @@ import type {
ISupplyDataFunctions,
SupplyData,
ExecutionError,
ExecuteWorkflowData,
IDataObject,
INodeParameterResourceLocator,
ITaskMetadata,
} from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError, jsonParse } from 'n8n-workflow';
@ -360,6 +362,7 @@ export class ToolWorkflow implements INodeType {
async supplyData(this: ISupplyDataFunctions, itemIndex: number): Promise<SupplyData> {
const name = this.getNodeParameter('name', itemIndex) as string;
const description = this.getNodeParameter('description', itemIndex) as string;
let executionId: string | undefined = undefined;
const useSchema = this.getNodeParameter('specifyInputSchema', itemIndex) as boolean;
let tool: DynamicTool | DynamicStructuredTool | undefined = undefined;
@ -440,13 +443,17 @@ export class ToolWorkflow implements INodeType {
const items = [newItem] as INodeExecutionData[];
let receivedData: INodeExecutionData;
const workflowProxy = this.getWorkflowDataProxy(0);
let receivedData: ExecuteWorkflowData;
try {
receivedData = (await this.executeWorkflow(
workflowInfo,
items,
runManager?.getChild(),
)) as INodeExecutionData;
receivedData = await this.executeWorkflow(workflowInfo, items, runManager?.getChild(), {
startMetadata: {
executionId: workflowProxy.$execution.id,
workflowId: workflowProxy.$workflow.id,
},
});
executionId = receivedData.executionId;
} catch (error) {
// Make sure a valid error gets returned that can by json-serialized else it will
// not show up in the frontend
@ -454,6 +461,7 @@ export class ToolWorkflow implements INodeType {
}
const response: string | undefined = get(receivedData, [
'data',
0,
0,
'json',
@ -503,10 +511,22 @@ export class ToolWorkflow implements INodeType {
response = `There was an error: "${executionError.message}"`;
}
let metadata: ITaskMetadata | undefined;
if (executionId) {
metadata = {
executionId,
};
}
if (executionError) {
void this.addOutputData(NodeConnectionType.AiTool, index, executionError);
void this.addOutputData(NodeConnectionType.AiTool, index, executionError, metadata);
} else {
void this.addOutputData(NodeConnectionType.AiTool, index, [[{ json: { response } }]]);
void this.addOutputData(
NodeConnectionType.AiTool,
index,
[[{ json: { response } }]],
metadata,
);
}
return response;
};

View file

@ -1,6 +1,5 @@
import type { Scope } from '@n8n/permissions';
import type { Application } from 'express';
import type { WorkflowExecute } from 'n8n-core';
import type {
ExecutionError,
ICredentialDataDecryptedObject,
@ -14,7 +13,6 @@ import type {
ITelemetryTrackProperties,
IWorkflowBase,
CredentialLoadingDetails,
Workflow,
WorkflowExecuteMode,
ExecutionStatus,
ExecutionSummary,
@ -296,12 +294,6 @@ export interface IWorkflowErrorData {
};
}
export interface IWorkflowExecuteProcess {
startedAt: Date;
workflow: Workflow;
workflowExecute: WorkflowExecute;
}
export interface IWorkflowStatisticsDataLoaded {
dataLoaded: boolean;
}

View file

@ -36,6 +36,8 @@ import type {
ExecuteWorkflowOptions,
IWorkflowExecutionDataProcess,
EnvProviderState,
ITaskMetadata,
ExecuteWorkflowData,
} from 'n8n-workflow';
import { Container } from 'typedi';
@ -45,11 +47,7 @@ import { CredentialsHelper } from '@/credentials-helper';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { AiEventMap, AiEventPayload } from '@/events/maps/ai.event-map';
import { ExternalHooks } from '@/external-hooks';
import type {
IWorkflowExecuteProcess,
IWorkflowErrorData,
UpdateExecutionPayload,
} from '@/interfaces';
import type { IWorkflowErrorData, UpdateExecutionPayload } from '@/interfaces';
import { NodeTypes } from '@/node-types';
import { Push } from '@/push';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
@ -681,6 +679,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
export async function getRunData(
workflowData: IWorkflowBase,
inputData?: INodeExecutionData[],
metadata?: ITaskMetadata,
): Promise<IWorkflowExecutionDataProcess> {
const mode = 'integrated';
@ -700,6 +699,7 @@ export async function getRunData(
data: {
main: [inputData],
},
metadata,
source: null,
});
@ -771,14 +771,12 @@ export async function executeWorkflow(
workflowInfo: IExecuteWorkflowInfo,
additionalData: IWorkflowExecuteAdditionalData,
options: ExecuteWorkflowOptions,
): Promise<Array<INodeExecutionData[] | null> | IWorkflowExecuteProcess> {
): Promise<ExecuteWorkflowData> {
const externalHooks = Container.get(ExternalHooks);
await externalHooks.init();
const nodeTypes = Container.get(NodeTypes);
const activeExecutions = Container.get(ActiveExecutions);
const eventService = Container.get(EventService);
const executionRepository = Container.get(ExecutionRepository);
const workflowData =
options.loadedWorkflowData ??
@ -796,10 +794,44 @@ export async function executeWorkflow(
settings: workflowData.settings,
});
const runData = options.loadedRunData ?? (await getRunData(workflowData, options.inputData));
const runData =
options.loadedRunData ??
(await getRunData(workflowData, options.inputData, options.startMetadata));
const executionId = await activeExecutions.add(runData);
// We wrap it in another promise that we can depending on the setting return
// the execution ID before the execution is finished
const executionPromise = startExecution(
additionalData,
options,
workflow,
executionId,
runData,
workflowData,
externalHooks,
);
if (options.doNotWaitToFinish) {
return { executionId, data: [null] };
}
return await executionPromise;
}
async function startExecution(
additionalData: IWorkflowExecuteAdditionalData,
options: ExecuteWorkflowOptions,
workflow: Workflow,
executionId: string,
runData: IWorkflowExecutionDataProcess,
workflowData: IWorkflowBase,
externalHooks: ExternalHooks,
): Promise<ExecuteWorkflowData> {
const eventService = Container.get(EventService);
const activeExecutions = Container.get(ActiveExecutions);
const executionRepository = Container.get(ExecutionRepository);
/**
* A subworkflow execution in queue mode is not enqueued, but rather runs in the
* same worker process as the parent execution. Hence ensure the subworkflow
@ -921,7 +953,10 @@ export async function executeWorkflow(
activeExecutions.finalizeExecution(executionId, data);
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
return returnData!.data!.main;
return {
executionId,
data: returnData!.data!.main,
};
}
activeExecutions.finalizeExecution(executionId, data);

View file

@ -33,12 +33,14 @@ import get from 'lodash/get';
import isEmpty from 'lodash/isEmpty';
import merge from 'lodash/merge';
import pick from 'lodash/pick';
import set from 'lodash/set';
import { DateTime } from 'luxon';
import { extension, lookup } from 'mime-types';
import type {
BinaryHelperFunctions,
CloseFunction,
ContextType,
ExecuteWorkflowData,
FieldType,
FileSystemHelperFunctions,
FunctionsBase,
@ -78,6 +80,7 @@ import type {
IRunExecutionData,
ITaskData,
ITaskDataConnections,
ITaskMetadata,
ITriggerFunctions,
IWebhookData,
IWebhookDescription,
@ -2710,6 +2713,7 @@ const addExecutionDataFunctions = async (
sourceNodeName: string,
sourceNodeRunIndex: number,
currentNodeRunIndex: number,
metadata?: ITaskMetadata,
): Promise<void> => {
if (connectionType === NodeConnectionType.Main) {
throw new ApplicationError('Setting type is not supported for main connection', {
@ -2735,6 +2739,7 @@ const addExecutionDataFunctions = async (
if (taskData === undefined) {
return;
}
taskData.metadata = metadata;
}
taskData = taskData!;
@ -3568,6 +3573,13 @@ export function getExecuteTriggerFunctions(
return new TriggerContext(workflow, node, additionalData, mode, activation);
}
function setMetadata(executeData: IExecuteData, key: string, value: string) {
if (!executeData.metadata) {
executeData.metadata = {};
}
set(executeData.metadata, key, value);
}
/**
* Returns the execute functions regular nodes have access to.
*/
@ -3603,6 +3615,9 @@ export function getExecuteFunctions(
itemIndex,
),
getExecuteData: () => executeData,
setMetadata: (key: string, value: string): void => {
return setMetadata(executeData, key, value);
},
continueOnFail: () => {
return continueOnFail(node);
},
@ -3624,23 +3639,28 @@ export function getExecuteFunctions(
workflowInfo: IExecuteWorkflowInfo,
inputData?: INodeExecutionData[],
parentCallbackManager?: CallbackManager,
): Promise<any> {
options?: {
doNotWaitToFinish?: boolean;
startMetadata?: ITaskMetadata;
},
): Promise<ExecuteWorkflowData> {
return await additionalData
.executeWorkflow(workflowInfo, additionalData, {
...options,
parentWorkflowId: workflow.id?.toString(),
inputData,
parentWorkflowSettings: workflow.settings,
node,
parentCallbackManager,
})
.then(
async (result) =>
await Container.get(BinaryDataService).duplicateBinaryData(
.then(async (result) => {
const data = await Container.get(BinaryDataService).duplicateBinaryData(
workflow.id,
additionalData.executionId!,
result,
),
result.data,
);
return { ...result, data };
});
},
getContext(type: ContextType): IContextObject {
return NodeHelpers.getContext(runExecutionData, type, node);
@ -3834,6 +3854,7 @@ export function getExecuteFunctions(
connectionType: NodeConnectionType,
currentNodeRunIndex: number,
data: INodeExecutionData[][] | ExecutionBaseError,
metadata?: ITaskMetadata,
): void {
addExecutionDataFunctions(
'output',
@ -3845,6 +3866,7 @@ export function getExecuteFunctions(
node.name,
runIndex,
currentNodeRunIndex,
metadata,
).catch((error) => {
Logger.warn(
`There was a problem logging output data of node "${this.getNode().name}": ${
@ -3953,7 +3975,11 @@ export function getSupplyDataFunctions(
workflowInfo: IExecuteWorkflowInfo,
inputData?: INodeExecutionData[],
parentCallbackManager?: CallbackManager,
) =>
options?: {
doNotWaitToFinish?: boolean;
startMetadata?: ITaskMetadata;
},
): Promise<ExecuteWorkflowData> =>
await additionalData
.executeWorkflow(workflowInfo, additionalData, {
parentWorkflowId: workflow.id?.toString(),
@ -3961,15 +3987,16 @@ export function getSupplyDataFunctions(
parentWorkflowSettings: workflow.settings,
node,
parentCallbackManager,
...options,
})
.then(
async (result) =>
await Container.get(BinaryDataService).duplicateBinaryData(
.then(async (result) => {
const data = await Container.get(BinaryDataService).duplicateBinaryData(
workflow.id,
additionalData.executionId!,
result,
),
),
result.data,
);
return { ...result, data };
}),
getNodeOutputs() {
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
return NodeHelpers.getNodeOutputs(workflow, node, nodeType.description).map((output) => {

View file

@ -1448,6 +1448,7 @@ export class WorkflowExecute {
startTime,
executionTime: new Date().getTime() - startTime,
source: !executionData.source ? [] : executionData.source.main,
metadata: executionData.metadata,
executionStatus: 'success',
};

View file

@ -1,3 +1,4 @@
import set from 'lodash/set';
import type {
ICredentialDataDecryptedObject,
IGetNodeParameterOptions,
@ -36,6 +37,13 @@ import { BinaryHelpers } from './helpers/binary-helpers';
import { RequestHelpers } from './helpers/request-helpers';
import { NodeExecutionContext } from './node-execution-context';
function setMetadata(executeData: IExecuteData, key: string, value: string) {
if (!executeData.metadata) {
executeData.metadata = {};
}
set(executeData.metadata, key, value);
}
export class ExecuteSingleContext extends NodeExecutionContext implements IExecuteSingleFunctions {
readonly helpers: IExecuteSingleFunctions['helpers'];
@ -79,6 +87,10 @@ export class ExecuteSingleContext extends NodeExecutionContext implements IExecu
this.abortSignal?.addEventListener('abort', fn);
}
setMetadata(key: string, value: string): void {
return setMetadata(this.executeData, key, value);
}
continueOnFail() {
return continueOnFail(this.node);
}

View file

@ -1,5 +1,6 @@
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import type {
ExecuteWorkflowData,
IExecuteFunctions,
INodeExecutionData,
INodeType,
@ -209,8 +210,10 @@ export class ExecuteWorkflow implements INodeType {
const mode = this.getNodeParameter('mode', 0, false) as string;
const items = this.getInputData();
const workflowProxy = this.getWorkflowDataProxy(0);
if (mode === 'each') {
let returnData: INodeExecutionData[][] = [];
const returnData: INodeExecutionData[][] = [];
for (let i = 0; i < items.length; i++) {
try {
@ -222,14 +225,26 @@ export class ExecuteWorkflow implements INodeType {
const workflowInfo = await getWorkflowInfo.call(this, source, i);
if (waitForSubWorkflow) {
const workflowResult: INodeExecutionData[][] = await this.executeWorkflow(
const executionResult: ExecuteWorkflowData = await this.executeWorkflow(
workflowInfo,
[items[i]],
undefined,
{
startMetadata: {
executionId: workflowProxy.$execution.id,
workflowId: workflowProxy.$workflow.id,
},
},
);
const workflowResult = executionResult.data as INodeExecutionData[][];
for (const [outputIndex, outputData] of workflowResult.entries()) {
for (const item of outputData) {
item.pairedItem = { item: i };
item.metadata = {
executionId: executionResult.executionId,
workflowId: workflowInfo.id,
};
}
if (returnData[outputIndex] === undefined) {
@ -239,8 +254,29 @@ export class ExecuteWorkflow implements INodeType {
returnData[outputIndex].push(...outputData);
}
} else {
void this.executeWorkflow(workflowInfo, [items[i]]);
returnData = [items];
const executionResult: ExecuteWorkflowData = await this.executeWorkflow(
workflowInfo,
[items[i]],
undefined,
{
doNotWaitToFinish: true,
startMetadata: {
executionId: workflowProxy.$execution.id,
workflowId: workflowProxy.$workflow.id,
},
},
);
if (returnData.length === 0) {
returnData.push([]);
}
returnData[0].push({
...items[i],
metadata: {
executionId: executionResult.executionId,
},
});
}
} catch (error) {
if (this.continueOnFail()) {
@ -268,15 +304,29 @@ export class ExecuteWorkflow implements INodeType {
) as boolean;
const workflowInfo = await getWorkflowInfo.call(this, source);
const executionResult: ExecuteWorkflowData = await this.executeWorkflow(
workflowInfo,
items,
undefined,
{
doNotWaitToFinish: !waitForSubWorkflow,
startMetadata: {
executionId: workflowProxy.$execution.id,
workflowId: workflowProxy.$workflow.id,
},
},
);
this.setMetadata('executionId', executionResult.executionId);
if (workflowInfo.id !== undefined) {
this.setMetadata('workflowId', workflowInfo.id);
}
if (!waitForSubWorkflow) {
void this.executeWorkflow(workflowInfo, items);
return [items];
}
const workflowResult: INodeExecutionData[][] = await this.executeWorkflow(
workflowInfo,
items,
);
const workflowResult = executionResult.data as INodeExecutionData[][];
const fallbackPairedItemData = generatePairedItemData(items.length);

View file

@ -484,6 +484,7 @@ export interface ISourceDataConnections {
export interface IExecuteData {
data: ITaskDataConnections;
metadata?: ITaskMetadata;
node: INode;
source: ITaskDataConnectionsSource | null;
}
@ -935,6 +936,7 @@ export type ContextType = 'flow' | 'node';
type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & {
continueOnFail(): boolean;
setMetadata(key: string, value: string): void;
evaluateExpression(expression: string, itemIndex: number): NodeParameterValueType;
getContext(type: ContextType): IContextObject;
getExecuteData(): IExecuteData;
@ -952,7 +954,11 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
workflowInfo: IExecuteWorkflowInfo,
inputData?: INodeExecutionData[],
parentCallbackManager?: CallbackManager,
): Promise<any>;
options?: {
doNotWaitToFinish?: boolean;
startMetadata?: ITaskMetadata;
},
): Promise<ExecuteWorkflowData>;
getInputConnectionData(
inputName: NodeConnectionType,
itemIndex: number,
@ -975,6 +981,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
connectionType: NodeConnectionType,
currentNodeRunIndex: number,
data: INodeExecutionData[][] | ExecutionError,
metadata?: ITaskMetadata,
): void;
nodeHelpers: NodeHelperFunctions;
@ -1207,6 +1214,10 @@ export interface INodeExecutionData {
binary?: IBinaryKeyData;
error?: NodeApiError | NodeOperationError;
pairedItem?: IPairedItemData | IPairedItemData[] | number;
metadata?: {
executionId?: string;
workflowId?: string;
};
index?: number;
}
@ -1543,6 +1554,11 @@ export interface ITriggerResponse {
manualTriggerResponse?: Promise<INodeExecutionData[][]>;
}
export interface ExecuteWorkflowData {
executionId: string;
data: Array<INodeExecutionData[] | null>;
}
export type WebhookSetupMethodNames = 'checkExists' | 'create' | 'delete';
export namespace MultiPartFormData {
@ -2127,6 +2143,8 @@ export interface ITaskSubRunMetadata {
}
export interface ITaskMetadata {
executionId?: string;
workflowId?: string;
subRun?: ITaskSubRunMetadata[];
}
@ -2254,6 +2272,8 @@ export interface ExecuteWorkflowOptions {
loadedRunData?: IWorkflowExecutionDataProcess;
parentWorkflowSettings?: IWorkflowSettings;
parentCallbackManager?: CallbackManager;
startMetadata?: ITaskMetadata;
doNotWaitToFinish?: boolean;
}
export type AiEvent =
@ -2287,7 +2307,7 @@ export interface IWorkflowExecuteAdditionalData {
workflowInfo: IExecuteWorkflowInfo,
additionalData: IWorkflowExecuteAdditionalData,
options: ExecuteWorkflowOptions,
) => Promise<any>;
) => Promise<ExecuteWorkflowData>;
executionId?: string;
restartExecutionId?: string;
hooks?: WorkflowHooks;