n8n/packages/@n8n/nodes-langchain/utils/N8nBinaryLoader.ts

import { pipeline } from 'stream/promises';
import { createWriteStream } from 'fs';
import type {
	IBinaryData,
	IExecuteFunctions,
	INodeExecutionData,
	ISupplyDataFunctions,
} from 'n8n-workflow';
import { NodeOperationError, BINARY_ENCODING } from 'n8n-workflow';
import type { TextSplitter } from '@langchain/textsplitters';
import type { Document } from '@langchain/core/documents';
import { CSVLoader } from '@langchain/community/document_loaders/fs/csv';
import { DocxLoader } from '@langchain/community/document_loaders/fs/docx';
import { JSONLoader } from 'langchain/document_loaders/fs/json';
import { PDFLoader } from '@langchain/community/document_loaders/fs/pdf';
import { TextLoader } from 'langchain/document_loaders/fs/text';
import { EPubLoader } from '@langchain/community/document_loaders/fs/epub';
import { file as tmpFile, type DirectoryResult } from 'tmp-promise';
import { getMetadataFiltersValues } from './helpers';
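
// Maps each selectable loader type to the MIME types it accepts; 'auto' infers the loader from the item's MIME type.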
const SUPPORTED_MIME_TYPES = {
	auto: ['*/*'],
	pdfLoader: ['application/pdf'],
	csvLoader: ['text/csv'],
	epubLoader: ['application/epub+zip'],
	docxLoader: ['application/vnd.openxmlformats-officedocument.wordprocessingml.document'],
	textLoader: ['text/plain', 'text/mdx', 'text/md'],
	jsonLoader: ['application/json'],
};
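
/**
 * Loads binary data from n8n input items into LangChain `Document`s, choosing
 * a document loader from the configured loader type and the item's MIME type.
 *
 * @example
 * // Illustrative sketch, not code from this file: inside a node's execute(),
 * // where `this` is the IExecuteFunctions context. The options prefix and
 * // binary key shown here are hypothetical values.
 * const loader = new N8nBinaryLoader(this, 'options.', 'data');
 * const docs = await loader.processAll(this.getInputData());
 */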
export class N8nBinaryLoader {
	constructor(
		private context: IExecuteFunctions | ISupplyDataFunctions,
		private optionsPrefix = '',
		private binaryDataKey = '',
		private textSplitter?: TextSplitter,
	) {}
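
	/**
	 * Processes all given items and returns the concatenated documents;
	 * returns an empty array when called without items.
	 */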
	async processAll(items?: INodeExecutionData[]): Promise<Document[]> {
		const docs: Document[] = [];

		if (!items) return [];

		for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
			const processedDocuments = await this.processItem(items[itemIndex], itemIndex);
			docs.push(...processedDocuments);
		}

		return docs;
	}
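
	/**
	 * Throws a NodeOperationError when the MIME type is unsupported or does
	 * not match the selected loader, suggesting the correct loader when one
	 * exists.
	 */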
	private async validateMimeType(
		mimeType: string,
		selectedLoader: keyof typeof SUPPORTED_MIME_TYPES,
	): Promise<void> {
		// Check if the selected loader matches the MIME type of the data
		if (selectedLoader !== 'auto' && !SUPPORTED_MIME_TYPES[selectedLoader].includes(mimeType)) {
			const neededLoader = Object.keys(SUPPORTED_MIME_TYPES).find((loader) =>
				SUPPORTED_MIME_TYPES[loader as keyof typeof SUPPORTED_MIME_TYPES].includes(mimeType),
			);

			throw new NodeOperationError(
				this.context.getNode(),
				`Mime type doesn't match selected loader. Please select under "Loader Type": ${neededLoader}`,
			);
		}

		// Reject MIME types that no loader supports
		if (!Object.values(SUPPORTED_MIME_TYPES).flat().includes(mimeType)) {
			throw new NodeOperationError(this.context.getNode(), `Unsupported mime type: ${mimeType}`);
		}

		if (
			!SUPPORTED_MIME_TYPES[selectedLoader].includes(mimeType) &&
			selectedLoader !== 'textLoader' &&
			selectedLoader !== 'auto'
		) {
			throw new NodeOperationError(
				this.context.getNode(),
				`Unsupported mime type: ${mimeType} for selected loader: ${selectedLoader}`,
			);
		}
	}
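
	/**
	 * Resolves the item's binary payload to a Blob: data stored by id is
	 * streamed from the binary data manager, inline data is decoded from its
	 * base64 representation.
	 */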
	private async getFilePathOrBlob(
		binaryData: IBinaryData,
		mimeType: string,
	): Promise<string | Blob> {
		if (binaryData.id) {
			const binaryBuffer = await this.context.helpers.binaryToBuffer(
				await this.context.helpers.getBinaryStream(binaryData.id),
			);

			return new Blob([binaryBuffer], {
				type: mimeType,
			});
		} else {
			return new Blob([Buffer.from(binaryData.data, BINARY_ENCODING)], {
				type: mimeType,
			});
		}
	}
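
	/**
	 * Picks the LangChain document loader for the given MIME type, reading
	 * loader-specific options (PDF page splitting, CSV column/separator,
	 * JSON pointers) from the node parameters. Unknown types fall back to
	 * TextLoader.
	 */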
	private async getLoader(
		mimeType: string,
		filePathOrBlob: string | Blob,
		itemIndex: number,
	): Promise<PDFLoader | CSVLoader | EPubLoader | DocxLoader | TextLoader | JSONLoader> {
		switch (mimeType) {
			case 'application/pdf': {
				const splitPages = this.context.getNodeParameter(
					`${this.optionsPrefix}splitPages`,
					itemIndex,
					false,
				) as boolean;
				return new PDFLoader(filePathOrBlob, { splitPages });
			}
			case 'text/csv': {
				const column = this.context.getNodeParameter(
					`${this.optionsPrefix}column`,
					itemIndex,
					null,
				) as string;
				const separator = this.context.getNodeParameter(
					`${this.optionsPrefix}separator`,
					itemIndex,
					',',
				) as string;

				return new CSVLoader(filePathOrBlob, { column: column ?? undefined, separator });
			}
			case 'application/epub+zip': {
				// EPubLoader currently does not accept Blobs (https://github.com/langchain-ai/langchainjs/issues/1623),
				// so write the Blob to a temporary file and load it from disk instead.
				if (filePathOrBlob instanceof Blob) {
					const tmpFileData = await tmpFile({ prefix: 'epub-loader-' });
					const bufferData = await filePathOrBlob.arrayBuffer();
					await pipeline([new Uint8Array(bufferData)], createWriteStream(tmpFileData.path));
					return new EPubLoader(tmpFileData.path);
				}
				return new EPubLoader(filePathOrBlob);
			}
			case 'application/vnd.openxmlformats-officedocument.wordprocessingml.document':
				return new DocxLoader(filePathOrBlob);
			case 'text/plain':
				return new TextLoader(filePathOrBlob);
			case 'application/json': {
				const pointers = this.context.getNodeParameter(
					`${this.optionsPrefix}pointers`,
					itemIndex,
					'',
				) as string;
				const pointersArray = pointers.split(',').map((pointer) => pointer.trim());
				return new JSONLoader(filePathOrBlob, pointersArray);
			}
			default:
				return new TextLoader(filePathOrBlob);
		}
	}
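
	/**
	 * Loads documents from the given loader, splitting them with the
	 * configured text splitter when one was provided.
	 */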
	private async loadDocuments(
		loader: PDFLoader | CSVLoader | EPubLoader | DocxLoader | TextLoader | JSONLoader,
	): Promise<Document[]> {
		return this.textSplitter
			? await this.textSplitter.splitDocuments(await loader.load())
			: await loader.load();
	}
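
	/**
	 * Invokes the given tmp-file cleanup callback, if one was created.
	 */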
	private async cleanupTmpFileIfNeeded(
		cleanupTmpFile: DirectoryResult['cleanup'] | undefined,
	): Promise<void> {
		if (cleanupTmpFile) {
			await cleanupTmpFile();
		}
	}
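
	/**
	 * Processes a single input item. In 'allInputData' mode every binary
	 * property of every input item is loaded; otherwise only the configured
	 * binary data key of this item is used.
	 *
	 * @example
	 * // Illustrative sketch, assuming the node's binaryMode parameter is set
	 * // to a value other than 'allInputData':
	 * const docs = await loader.processItem(items[0], 0);
	 */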
	async processItem(item: INodeExecutionData, itemIndex: number): Promise<Document[]> {
		const docs: Document[] = [];

		const binaryMode = this.context.getNodeParameter('binaryMode', itemIndex, 'allInputData');
		if (binaryMode === 'allInputData') {
			// Load every binary property of every input item
			const binaryData = this.context.getInputData();

			for (const data of binaryData) {
				if (data.binary) {
					const binaryDataKeys = Object.keys(data.binary);

					for (const fileKey of binaryDataKeys) {
						const processedDocuments = await this.processItemByKey(item, itemIndex, fileKey);
						docs.push(...processedDocuments);
					}
				}
			}
		} else {
			const processedDocuments = await this.processItemByKey(item, itemIndex, this.binaryDataKey);
			docs.push(...processedDocuments);
		}

		return docs;
	}
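
	/**
	 * Loads documents from one binary property of the item: validates the
	 * MIME type against the selected loader, builds the matching document
	 * loader, and merges any configured metadata filters into each document.
	 */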
	async processItemByKey(
		item: INodeExecutionData,
		itemIndex: number,
		binaryKey: string,
	): Promise<Document[]> {
		const selectedLoader: keyof typeof SUPPORTED_MIME_TYPES = this.context.getNodeParameter(
			'loader',
			itemIndex,
			'auto',
		) as keyof typeof SUPPORTED_MIME_TYPES;

		const docs: Document[] = [];
		const metadata = getMetadataFiltersValues(this.context, itemIndex);

		if (!item) return [];

		const binaryData = this.context.helpers.assertBinaryData(itemIndex, binaryKey);
		const { mimeType } = binaryData;

		await this.validateMimeType(mimeType, selectedLoader);

		const filePathOrBlob = await this.getFilePathOrBlob(binaryData, mimeType);
		// Placeholder: no tmp-file cleanup handle is tracked in this version
		// (see the epub branch of getLoader)
		const cleanupTmpFile: DirectoryResult['cleanup'] | undefined = undefined;
		const loader = await this.getLoader(mimeType, filePathOrBlob, itemIndex);
		const loadedDoc = await this.loadDocuments(loader);
		docs.push(...loadedDoc);

		if (metadata) {
			docs.forEach((document) => {
				document.metadata = {
					...document.metadata,
					...metadata,
				};
			});
		}

		await this.cleanupTmpFileIfNeeded(cleanupTmpFile);

		return docs;
	}
}