diff --git a/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts b/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts index 67c8585226..0c002fb08e 100644 --- a/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts +++ b/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts @@ -1,21 +1,12 @@ import { IExecuteFunctions } from 'n8n-core'; -import { - ICredentialsDecrypted, - ICredentialTestFunctions, - IDataObject, - INodeCredentialTestResult, - INodeExecutionData, - INodeType, - INodeTypeDescription, - JsonObject, -} from 'n8n-workflow'; +import { IDataObject, INodeExecutionData, INodeType, INodeTypeDescription } from 'n8n-workflow'; -import { elasticsearchApiRequest } from './GenericFunctions'; +import { elasticsearchApiRequest, elasticsearchApiRequestAllItems } from './GenericFunctions'; import { documentFields, documentOperations, indexFields, indexOperations } from './descriptions'; -import { DocumentGetAllOptions, ElasticsearchApiCredentials, FieldsUiValues } from './types'; +import { DocumentGetAllOptions, FieldsUiValues } from './types'; import { omit } from 'lodash'; @@ -79,8 +70,6 @@ export class Elasticsearch implements INodeType { // document // ********************************************************************** - // https://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html - if (operation === 'delete') { // ---------------------------------------- // document: delete @@ -134,6 +123,7 @@ export class Elasticsearch implements INodeType { const body = {} as IDataObject; const qs = {} as IDataObject; const options = this.getNodeParameter('options', i) as DocumentGetAllOptions; + // const paginate = this.getNodeParameter('paginate', i) as boolean; if (Object.keys(options).length) { const { query, ...rest } = options; @@ -144,18 +134,38 @@ export class Elasticsearch implements INodeType { const returnAll = this.getNodeParameter('returnAll', 0); - if (!returnAll) { + if (returnAll) { + //Defines the number of hits to return. Defaults to 10. By default, you cannot page through more than 10,000 hits + qs.size = 10000; + if (qs.sort) { + responseData = await elasticsearchApiRequestAllItems.call( + this, + indexId as string, + body, + qs, + ); + } else { + responseData = await elasticsearchApiRequest.call( + this, + 'GET', + `/${indexId}/_search`, + body, + qs, + ); + responseData = responseData.hits.hits; + } + } else { qs.size = this.getNodeParameter('limit', 0); - } - responseData = await elasticsearchApiRequest.call( - this, - 'GET', - `/${indexId}/_search`, - body, - qs, - ); - responseData = responseData.hits.hits; + responseData = await elasticsearchApiRequest.call( + this, + 'GET', + `/${indexId}/_search`, + body, + qs, + ); + responseData = responseData.hits.hits; + } const simple = this.getNodeParameter('simple', 0) as IDataObject; @@ -196,23 +206,20 @@ export class Elasticsearch implements INodeType { const qs = {} as IDataObject; const additionalFields = this.getNodeParameter('additionalFields', i) as IDataObject; - const options = this.getNodeParameter('options', i, {}) as IDataObject; if (Object.keys(additionalFields).length) { Object.assign(qs, omit(additionalFields, ['documentId'])); } - Object.assign(qs, options); - const indexId = this.getNodeParameter('indexId', i); const { documentId } = additionalFields; if (documentId) { const endpoint = `/${indexId}/_doc/${documentId}`; - responseData = await elasticsearchApiRequest.call(this, 'PUT', endpoint, body, qs); + responseData = await elasticsearchApiRequest.call(this, 'PUT', endpoint, body); } else { const endpoint = `/${indexId}/_doc`; - responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body, qs); + responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body); } } else if (operation === 'update') { // ---------------------------------------- @@ -243,14 +250,9 @@ export class Elasticsearch implements INodeType { const indexId = this.getNodeParameter('indexId', i); const documentId = this.getNodeParameter('documentId', i); - const options = this.getNodeParameter('options', i, {}) as IDataObject; - - const qs = { - ...options, - }; const endpoint = `/${indexId}/_update/${documentId}`; - responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body, qs); + responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body); } } else if (resource === 'index') { // ********************************************************************** @@ -328,7 +330,6 @@ export class Elasticsearch implements INodeType { } } } - const executionData = this.helpers.constructExecutionMetaData( this.helpers.returnJsonArray(responseData), { itemData: { item: i } }, diff --git a/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts b/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts index eef6ba2362..ff3e5a9eef 100644 --- a/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts +++ b/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts @@ -36,6 +36,69 @@ export async function elasticsearchApiRequest( try { return await this.helpers.requestWithAuthentication.call(this, 'elasticsearchApi', options); + } catch (error) { + throw new NodeApiError(this.getNode(), error); + } +} + +export async function elasticsearchApiRequestAllItems( + this: IExecuteFunctions, + indexId: string, + body: IDataObject = {}, + qs: IDataObject = {}, +): Promise { //tslint:disable-line:no-any + //https://www.elastic.co/guide/en/elasticsearch/reference/7.16/paginate-search-results.html#search-after + try { + //create a point in time (PIT) to preserve the current index state over your searches + let pit = ( + await elasticsearchApiRequest.call(this, 'POST', `/${indexId}/_pit`, {}, { keep_alive: '1m' }) + )?.id as string; + + let returnData: IDataObject[] = []; + let responseData; + let searchAfter: string[] = []; + + const requestBody: IDataObject = { + ...body, + size: 10000, + pit: { + id: pit, + keep_alive: '1m', + }, + track_total_hits: false, //Disable the tracking of total hits to speed up pagination + }; + + responseData = await elasticsearchApiRequest.call(this, 'GET', `/_search`, requestBody, qs); + if (responseData?.hits?.hits) { + returnData = returnData.concat(responseData.hits.hits); + const lastHitIndex = responseData.hits.hits.length - 1; + //Sort values for the last returned hit with the tiebreaker value + searchAfter = responseData.hits.hits[lastHitIndex].sort; + //Update id for the point in time + pit = responseData.pit_id; + } else { + return []; + } + + while (true) { + requestBody.search_after = searchAfter; + requestBody.pit = { id: pit, keep_alive: '1m' }; + + responseData = await elasticsearchApiRequest.call(this, 'GET', `/_search`, requestBody, qs); + + if (responseData?.hits?.hits?.length) { + returnData = returnData.concat(responseData.hits.hits); + const lastHitIndex = responseData.hits.hits.length - 1; + searchAfter = responseData.hits.hits[lastHitIndex].sort; + pit = responseData.pit_id; + } else { + break; + } + } + + await elasticsearchApiRequest.call(this, 'DELETE', `/_pit`, { id: pit }); + + return returnData; } catch (error) { throw new NodeApiError(this.getNode(), error as JsonObject); } diff --git a/packages/nodes-base/nodes/Elastic/Elasticsearch/descriptions/DocumentDescription.ts b/packages/nodes-base/nodes/Elastic/Elasticsearch/descriptions/DocumentDescription.ts index cb72aeb7d7..6a622e2386 100644 --- a/packages/nodes-base/nodes/Elastic/Elasticsearch/descriptions/DocumentDescription.ts +++ b/packages/nodes-base/nodes/Elastic/Elasticsearch/descriptions/DocumentDescription.ts @@ -194,6 +194,25 @@ export const documentFields: INodeProperties[] = [ }, }, }, + { + displayName: 'By default, you cannot page through more than 10,000 hits. To page through more hits, add "Sort" from options.', + name: 'paginateNotice', + type: 'notice', + default: '', + displayOptions: { + show: { + resource: [ + 'document', + ], + operation: [ + 'getAll', + ], + returnAll: [ + true, + ], + }, + }, + }, { displayName: 'Limit', name: 'limit',