feat(Elasticsearch Node): Add bulk operations for Elasticsearch (#9940)

This commit is contained in:
pemontto 2024-08-07 10:42:07 +01:00 committed by GitHub
parent 34334651e0
commit bf8f848645
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 202 additions and 21 deletions

View file

@ -4,11 +4,16 @@ import type {
INodeExecutionData,
INodeType,
INodeTypeDescription,
JsonObject,
} from 'n8n-workflow';
import { jsonParse } from 'n8n-workflow';
import { jsonParse, NodeApiError } from 'n8n-workflow';
import omit from 'lodash/omit';
import { elasticsearchApiRequest, elasticsearchApiRequestAllItems } from './GenericFunctions';
import {
elasticsearchApiRequest,
elasticsearchApiRequestAllItems,
elasticsearchBulkApiRequest,
} from './GenericFunctions';
import { documentFields, documentOperations, indexFields, indexOperations } from './descriptions';
@ -68,12 +73,14 @@ export class Elasticsearch implements INodeType {
let responseData;
let bulkBody: IDataObject = {};
for (let i = 0; i < items.length; i++) {
const bulkOperation = this.getNodeParameter('options.bulkOperation', i, false);
if (resource === 'document') {
// **********************************************************************
// document
// **********************************************************************
if (operation === 'delete') {
// ----------------------------------------
// document: delete
@ -84,8 +91,17 @@ export class Elasticsearch implements INodeType {
const indexId = this.getNodeParameter('indexId', i);
const documentId = this.getNodeParameter('documentId', i);
const endpoint = `/${indexId}/_doc/${documentId}`;
responseData = await elasticsearchApiRequest.call(this, 'DELETE', endpoint);
if (bulkOperation) {
bulkBody[i] = JSON.stringify({
delete: {
_index: indexId,
_id: documentId,
},
});
} else {
const endpoint = `/${indexId}/_doc/${documentId}`;
responseData = await elasticsearchApiRequest.call(this, 'DELETE', endpoint);
}
} else if (operation === 'get') {
// ----------------------------------------
// document: get
@ -223,12 +239,22 @@ export class Elasticsearch implements INodeType {
const indexId = this.getNodeParameter('indexId', i);
const { documentId } = additionalFields;
if (documentId) {
const endpoint = `/${indexId}/_doc/${documentId}`;
responseData = await elasticsearchApiRequest.call(this, 'PUT', endpoint, body);
if (bulkOperation) {
bulkBody[i] = JSON.stringify({
index: {
_index: indexId,
_id: documentId,
},
});
bulkBody[i] += `\n${JSON.stringify(body)}`;
} else {
const endpoint = `/${indexId}/_doc`;
responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body);
if (documentId) {
const endpoint = `/${indexId}/_doc/${documentId}`;
responseData = await elasticsearchApiRequest.call(this, 'PUT', endpoint, body);
} else {
const endpoint = `/${indexId}/_doc`;
responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body);
}
}
} else if (operation === 'update') {
// ----------------------------------------
@ -261,7 +287,17 @@ export class Elasticsearch implements INodeType {
const documentId = this.getNodeParameter('documentId', i);
const endpoint = `/${indexId}/_update/${documentId}`;
responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body);
if (bulkOperation) {
bulkBody[i] = JSON.stringify({
update: {
_index: indexId,
_id: documentId,
},
});
bulkBody[i] += `\n${JSON.stringify(body)}`;
} else {
responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body);
}
}
} else if (resource === 'index') {
// **********************************************************************
@ -341,13 +377,80 @@ export class Elasticsearch implements INodeType {
}
}
}
const executionData = this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray(responseData as IDataObject[]),
{ itemData: { item: i } },
);
returnData.push(...executionData);
}
if (!bulkOperation) {
const executionData = this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray(responseData as IDataObject[]),
{ itemData: { item: i } },
);
returnData.push(...executionData);
}
if (Object.keys(bulkBody).length >= 50) {
responseData = (await elasticsearchBulkApiRequest.call(this, bulkBody)) as IDataObject[];
for (let j = 0; j < responseData.length; j++) {
const itemData = responseData[j];
if (itemData.error) {
const errorData = itemData.error as IDataObject;
const message = errorData.type as string;
const description = errorData.reason as string;
const itemIndex = parseInt(Object.keys(bulkBody)[j]);
if (this.continueOnFail()) {
returnData.push(
...this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray({ error: message, message: itemData.error }),
{ itemData: { item: itemIndex } },
),
);
continue;
} else {
throw new NodeApiError(this.getNode(), {
message,
description,
itemIndex,
} as JsonObject);
}
}
const executionData = this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray(itemData),
{ itemData: { item: parseInt(Object.keys(bulkBody)[j]) } },
);
returnData.push(...executionData);
}
bulkBody = {};
}
}
if (Object.keys(bulkBody).length) {
responseData = (await elasticsearchBulkApiRequest.call(this, bulkBody)) as IDataObject[];
for (let j = 0; j < responseData.length; j++) {
const itemData = responseData[j];
if (itemData.error) {
const errorData = itemData.error as IDataObject;
const message = errorData.type as string;
const description = errorData.reason as string;
const itemIndex = parseInt(Object.keys(bulkBody)[j]);
if (this.continueOnFail()) {
returnData.push(
...this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray({ error: message, message: itemData.error }),
{ itemData: { item: itemIndex } },
),
);
continue;
} else {
throw new NodeApiError(this.getNode(), {
message,
description,
itemIndex,
} as JsonObject);
}
}
const executionData = this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray(itemData),
{ itemData: { item: parseInt(Object.keys(bulkBody)[j]) } },
);
returnData.push(...executionData);
}
}
return [returnData];
}
}

View file

@ -2,13 +2,55 @@ import type {
IExecuteFunctions,
IDataObject,
JsonObject,
IRequestOptions,
IHttpRequestOptions,
IHttpRequestMethods,
} from 'n8n-workflow';
import { NodeApiError } from 'n8n-workflow';
import type { ElasticsearchApiCredentials } from './types';
export async function elasticsearchBulkApiRequest(this: IExecuteFunctions, body: IDataObject) {
const { baseUrl, ignoreSSLIssues } = (await this.getCredentials(
'elasticsearchApi',
)) as ElasticsearchApiCredentials;
const bulkBody = Object.values(body).flat().join('\n') + '\n';
const options: IHttpRequestOptions = {
method: 'POST',
headers: { 'Content-Type': 'application/x-ndjson' },
body: bulkBody,
url: `${baseUrl}/_bulk`,
skipSslCertificateValidation: ignoreSSLIssues,
returnFullResponse: true,
ignoreHttpStatusErrors: true,
};
const response = await this.helpers.httpRequestWithAuthentication.call(
this,
'elasticsearchApi',
options,
);
if (response.statusCode > 299) {
if (this.continueOnFail()) {
return Object.values(body).map((_) => ({ error: response.body.error }));
} else {
throw new NodeApiError(this.getNode(), { error: response.body.error } as JsonObject);
}
}
return response.body.items.map((item: IDataObject) => {
return {
...(item.index as IDataObject),
...(item.update as IDataObject),
...(item.create as IDataObject),
...(item.delete as IDataObject),
...(item.error as IDataObject),
};
});
}
export async function elasticsearchApiRequest(
this: IExecuteFunctions,
method: IHttpRequestMethods,
@ -20,13 +62,13 @@ export async function elasticsearchApiRequest(
'elasticsearchApi',
)) as ElasticsearchApiCredentials;
const options: IRequestOptions = {
const options: IHttpRequestOptions = {
method,
body,
qs,
uri: `${baseUrl}${endpoint}`,
url: `${baseUrl}${endpoint}`,
json: true,
rejectUnauthorized: !ignoreSSLIssues,
skipSslCertificateValidation: ignoreSSLIssues,
};
if (!Object.keys(body).length) {

View file

@ -81,6 +81,28 @@ export const documentFields: INodeProperties[] = [
},
},
},
{
displayName: 'Options',
name: 'options',
type: 'collection',
placeholder: 'Add Option',
default: {},
displayOptions: {
show: {
resource: ['document'],
operation: ['delete'],
},
},
options: [
{
displayName: 'Bulk Delete',
name: 'bulkOperation',
type: 'boolean',
default: false,
description: 'Whether to use the bulk operation to delete the document/s',
},
],
},
// ----------------------------------------
// document: get
@ -644,6 +666,13 @@ export const documentFields: INodeProperties[] = [
},
},
options: [
{
displayName: 'Bulk Create',
name: 'bulkOperation',
type: 'boolean',
default: false,
description: 'Whether to use the bulk operation to create the document/s',
},
{
displayName: 'Pipeline ID',
name: 'pipeline',
@ -802,6 +831,13 @@ export const documentFields: INodeProperties[] = [
},
},
options: [
{
displayName: 'Bulk Update',
name: 'bulkOperation',
type: 'boolean',
default: false,
description: 'Whether to use the bulk operation to update the document/s',
},
{
displayName: 'Refresh',
name: 'refresh',