From 663dfb48defd944f88f0ecc4f3347ea4f8a7c831 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Mon, 27 Jan 2025 11:52:18 +0100 Subject: [PATCH] fix(Postgres PGVector Store Node): Release postgres connections back to the pool (#12723) Co-authored-by: Oleg Ivaniv --- .../VectorStoreInMemory.node.ts | 3 +- .../VectorStorePGVector.node.ts | 10 +- .../VectorStorePinecone.node.ts | 2 +- .../VectorStoreQdrant.node.ts | 2 +- .../VectorStoreSupabase.node.ts | 2 +- .../VectorStoreZep/VectorStoreZep.node.ts | 2 +- .../shared/createVectorStoreNode.ts | 153 ++++++++++-------- 7 files changed, 102 insertions(+), 72 deletions(-) diff --git a/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStoreInMemory/VectorStoreInMemory.node.ts b/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStoreInMemory/VectorStoreInMemory.node.ts index 0323478ee8..d08bc2bab2 100644 --- a/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStoreInMemory/VectorStoreInMemory.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStoreInMemory/VectorStoreInMemory.node.ts @@ -1,3 +1,4 @@ +import type { MemoryVectorStore } from 'langchain/vectorstores/memory'; import type { INodeProperties } from 'n8n-workflow'; import { createVectorStoreNode } from '../shared/createVectorStoreNode'; @@ -20,7 +21,7 @@ const insertFields: INodeProperties[] = [ }, ]; -export class VectorStoreInMemory extends createVectorStoreNode({ +export class VectorStoreInMemory extends createVectorStoreNode({ meta: { displayName: 'In-Memory Vector Store', name: 'vectorStoreInMemory', diff --git a/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStorePGVector/VectorStorePGVector.node.ts b/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStorePGVector/VectorStorePGVector.node.ts index 852453b622..7b2ab7664d 100644 --- a/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStorePGVector/VectorStorePGVector.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStorePGVector/VectorStorePGVector.node.ts @@ -213,7 +213,7 @@ class ExtendedPGVectorStore extends PGVectorStore { } } -export class VectorStorePGVector extends createVectorStoreNode({ +export class VectorStorePGVector extends createVectorStoreNode({ meta: { description: 'Work with your data in Postgresql with the PGVector extension', icon: 'file:postgres.svg', @@ -274,6 +274,7 @@ export class VectorStorePGVector extends createVectorStoreNode({ return await ExtendedPGVectorStore.initialize(embeddings, config); }, + async populateVectorStore(context, embeddings, documents, itemIndex) { // NOTE: if you are to create the HNSW index before use, you need to consider moving the distanceStrategy field to // shared fields, because you need that strategy when creating the index. @@ -307,6 +308,11 @@ export class VectorStorePGVector extends createVectorStoreNode({ metadataColumnName: 'metadata', }) as ColumnOptions; - await PGVectorStore.fromDocuments(documents, embeddings, config); + const vectorStore = await PGVectorStore.fromDocuments(documents, embeddings, config); + vectorStore.client?.release(); + }, + + releaseVectorStoreClient(vectorStore) { + vectorStore.client?.release(); }, }) {} diff --git a/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStorePinecone/VectorStorePinecone.node.ts b/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStorePinecone/VectorStorePinecone.node.ts index 5a11acea24..61761a54ec 100644 --- a/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStorePinecone/VectorStorePinecone.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStorePinecone/VectorStorePinecone.node.ts @@ -51,7 +51,7 @@ const insertFields: INodeProperties[] = [ }, ]; -export class VectorStorePinecone extends createVectorStoreNode({ +export class VectorStorePinecone extends createVectorStoreNode({ meta: { displayName: 'Pinecone Vector Store', name: 'vectorStorePinecone', diff --git a/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStoreQdrant/VectorStoreQdrant.node.ts b/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStoreQdrant/VectorStoreQdrant.node.ts index 988f607ad7..e18cc4988e 100644 --- a/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStoreQdrant/VectorStoreQdrant.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStoreQdrant/VectorStoreQdrant.node.ts @@ -79,7 +79,7 @@ const retrieveFields: INodeProperties[] = [ }, ]; -export class VectorStoreQdrant extends createVectorStoreNode({ +export class VectorStoreQdrant extends createVectorStoreNode({ meta: { displayName: 'Qdrant Vector Store', name: 'vectorStoreQdrant', diff --git a/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStoreSupabase/VectorStoreSupabase.node.ts b/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStoreSupabase/VectorStoreSupabase.node.ts index a462ff8cf6..6ec3975ebd 100644 --- a/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStoreSupabase/VectorStoreSupabase.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStoreSupabase/VectorStoreSupabase.node.ts @@ -41,7 +41,7 @@ const retrieveFields: INodeProperties[] = [ const updateFields: INodeProperties[] = [...insertFields]; -export class VectorStoreSupabase extends createVectorStoreNode({ +export class VectorStoreSupabase extends createVectorStoreNode({ meta: { description: 'Work with your data in Supabase Vector Store', icon: 'file:supabase.svg', diff --git a/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStoreZep/VectorStoreZep.node.ts b/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStoreZep/VectorStoreZep.node.ts index 1372d54f6e..5c973002b2 100644 --- a/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStoreZep/VectorStoreZep.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/vector_store/VectorStoreZep/VectorStoreZep.node.ts @@ -46,7 +46,7 @@ const retrieveFields: INodeProperties[] = [ }, ]; -export class VectorStoreZep extends createVectorStoreNode({ +export class VectorStoreZep extends createVectorStoreNode({ meta: { displayName: 'Zep Vector Store', name: 'vectorStoreZep', diff --git a/packages/@n8n/nodes-langchain/nodes/vector_store/shared/createVectorStoreNode.ts b/packages/@n8n/nodes-langchain/nodes/vector_store/shared/createVectorStoreNode.ts index e393d32e55..ecf2e64a81 100644 --- a/packages/@n8n/nodes-langchain/nodes/vector_store/shared/createVectorStoreNode.ts +++ b/packages/@n8n/nodes-langchain/nodes/vector_store/shared/createVectorStoreNode.ts @@ -49,7 +49,7 @@ interface NodeMeta { operationModes?: NodeOperationMode[]; } -export interface VectorStoreNodeConstructorArgs { +export interface VectorStoreNodeConstructorArgs { meta: NodeMeta; methods?: { listSearch?: { @@ -77,7 +77,8 @@ export interface VectorStoreNodeConstructorArgs { filter: Record | undefined, embeddings: Embeddings, itemIndex: number, - ) => Promise; + ) => Promise; + releaseVectorStoreClient?: (vectorStore: T) => void; } function transformDescriptionForOperationMode( @@ -90,11 +91,15 @@ function transformDescriptionForOperationMode( })); } -function isUpdateSupported(args: VectorStoreNodeConstructorArgs): boolean { +function isUpdateSupported( + args: VectorStoreNodeConstructorArgs, +): boolean { return args.meta.operationModes?.includes('update') ?? false; } -function getOperationModeOptions(args: VectorStoreNodeConstructorArgs): INodePropertyOptions[] { +function getOperationModeOptions( + args: VectorStoreNodeConstructorArgs, +): INodePropertyOptions[] { const enabledOperationModes = args.meta.operationModes ?? DEFAULT_OPERATION_MODES; const allOptions = [ @@ -137,7 +142,9 @@ function getOperationModeOptions(args: VectorStoreNodeConstructorArgs): INodePro ); } -export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) => +export const createVectorStoreNode = ( + args: VectorStoreNodeConstructorArgs, +) => class VectorStoreNodeType implements INodeType { description: INodeTypeDescription = { displayName: args.meta.displayName, @@ -334,38 +341,42 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) => embeddings, itemIndex, ); - const prompt = this.getNodeParameter('prompt', itemIndex) as string; - const topK = this.getNodeParameter('topK', itemIndex, 4) as number; + try { + const prompt = this.getNodeParameter('prompt', itemIndex) as string; + const topK = this.getNodeParameter('topK', itemIndex, 4) as number; - const embeddedPrompt = await embeddings.embedQuery(prompt); - const docs = await vectorStore.similaritySearchVectorWithScore( - embeddedPrompt, - topK, - filter, - ); + const embeddedPrompt = await embeddings.embedQuery(prompt); + const docs = await vectorStore.similaritySearchVectorWithScore( + embeddedPrompt, + topK, + filter, + ); - const includeDocumentMetadata = this.getNodeParameter( - 'includeDocumentMetadata', - itemIndex, - true, - ) as boolean; + const includeDocumentMetadata = this.getNodeParameter( + 'includeDocumentMetadata', + itemIndex, + true, + ) as boolean; - const serializedDocs = docs.map(([doc, score]) => { - const document = { - pageContent: doc.pageContent, - ...(includeDocumentMetadata ? { metadata: doc.metadata } : {}), - }; + const serializedDocs = docs.map(([doc, score]) => { + const document = { + pageContent: doc.pageContent, + ...(includeDocumentMetadata ? { metadata: doc.metadata } : {}), + }; - return { - json: { document, score }, - pairedItem: { - item: itemIndex, - }, - }; - }); + return { + json: { document, score }, + pairedItem: { + item: itemIndex, + }, + }; + }); - resultData.push(...serializedDocs); - logAiEvent(this, 'ai-vector-store-searched', { query: prompt }); + resultData.push(...serializedDocs); + logAiEvent(this, 'ai-vector-store-searched', { query: prompt }); + } finally { + args.releaseVectorStoreClient?.(vectorStore); + } } return [resultData]; @@ -427,24 +438,28 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) => itemIndex, ); - const { processedDocuments, serializedDocuments } = await processDocument( - loader, - itemData, - itemIndex, - ); + try { + const { processedDocuments, serializedDocuments } = await processDocument( + loader, + itemData, + itemIndex, + ); - if (processedDocuments?.length !== 1) { - throw new NodeOperationError(this.getNode(), 'Single document per item expected'); + if (processedDocuments?.length !== 1) { + throw new NodeOperationError(this.getNode(), 'Single document per item expected'); + } + + resultData.push(...serializedDocuments); + + // Use ids option to upsert instead of insert + await vectorStore.addDocuments(processedDocuments, { + ids: [documentId], + }); + + logAiEvent(this, 'ai-vector-store-updated'); + } finally { + args.releaseVectorStoreClient?.(vectorStore); } - - resultData.push(...serializedDocuments); - - // Use ids option to upsert instead of insert - await vectorStore.addDocuments(processedDocuments, { - ids: [documentId], - }); - - logAiEvent(this, 'ai-vector-store-updated'); } return [resultData]; @@ -468,6 +483,9 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) => const vectorStore = await args.getVectorStoreClient(this, filter, embeddings, itemIndex); return { response: logWrapper(vectorStore, this), + closeFunction: async () => { + args.releaseVectorStoreClient?.(vectorStore); + }, }; } @@ -491,23 +509,28 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) => embeddings, itemIndex, ); - const embeddedPrompt = await embeddings.embedQuery(input); - const documents = await vectorStore.similaritySearchVectorWithScore( - embeddedPrompt, - topK, - filter, - ); - return documents - .map((document) => { - if (includeDocumentMetadata) { - return { type: 'text', text: JSON.stringify(document[0]) }; - } - return { - type: 'text', - text: JSON.stringify({ pageContent: document[0].pageContent }), - }; - }) - .filter((document) => !!document); + + try { + const embeddedPrompt = await embeddings.embedQuery(input); + const documents = await vectorStore.similaritySearchVectorWithScore( + embeddedPrompt, + topK, + filter, + ); + return documents + .map((document) => { + if (includeDocumentMetadata) { + return { type: 'text', text: JSON.stringify(document[0]) }; + } + return { + type: 'text', + text: JSON.stringify({ pageContent: document[0].pageContent }), + }; + }) + .filter((document) => !!document); + } finally { + args.releaseVectorStoreClient?.(vectorStore); + } }, });