n8n/packages/nodes-base/nodes/Transform/RemoveDuplicates/v2/RemoveDuplicatesV2.node.ts
Shireen Missi 52dd2c7619
Some checks are pending
Test Master / install-and-build (push) Waiting to run
Test Master / Unit tests (18.x) (push) Blocked by required conditions
Test Master / Unit tests (20.x) (push) Blocked by required conditions
Test Master / Unit tests (22.4) (push) Blocked by required conditions
Test Master / Lint (push) Blocked by required conditions
Test Master / Notify Slack on failure (push) Blocked by required conditions
Benchmark Docker Image CI / build (push) Waiting to run
feat(core): Dedupe (#10101)
Co-authored-by: Jan Oberhauser <jan@n8n.io>
Co-authored-by: Giulio Andreini <g.andreini@gmail.com>
Co-authored-by: Tomi Turtiainen <10324676+tomi@users.noreply.github.com>
Co-authored-by: Elias Meire <elias@meire.dev>
2024-10-10 16:12:05 +01:00

278 lines
8.5 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import {
NodeConnectionType,
NodeExecutionOutput,
NodeOperationError,
tryToParseDateTime,
} from 'n8n-workflow';
import type {
INodeTypeBaseDescription,
IExecuteFunctions,
INodeExecutionData,
INodeType,
INodeTypeDescription,
DeduplicationScope,
} from 'n8n-workflow';
import { removeDuplicatesNodeFields } from './RemoveDuplicatesV2.description';
import { removeDuplicateInputItems } from '../utils';
/**
 * Static description of version 2 of the Remove Duplicates node. It is spread into the
 * base description passed to the RemoveDuplicatesV2 constructor (version fields win).
 */
const versionDescription: INodeTypeDescription = {
	displayName: 'Remove Duplicates',
	name: 'removeDuplicates',
	icon: 'file:removeDuplicates.svg',
	group: ['transform'],
	subtitle: '',
	version: [2],
	description: 'Delete items with matching field values',
	defaults: {
		name: 'Remove Duplicates',
	},
	inputs: [NodeConnectionType.Main],
	// NOTE(review): only one Main output is declared, although two output names are given and
	// execute() returns a second ("Discarded") array for "removeItemsSeenInPreviousExecutions".
	// Confirm whether a second output connection is intended.
	outputs: [NodeConnectionType.Main],
	outputNames: ['Kept', 'Discarded'],
	hints: [
		{
			message: 'The dedupe key set in “Value to Dedupe On” has no value',
			// Shown only for the "removeItemsSeenInPreviousExecutions" operation, when the dedupe
			// field belonging to the selected logic is unset. The three logic checks are grouped
			// in one parenthesised OR so the operation guard applies to all of them (`&&` binds
			// tighter than `||`, so without the grouping only the first check was guarded).
			displayCondition:
				'={{ $parameter["operation"] === "removeItemsSeenInPreviousExecutions" && (($parameter["logic"] === "removeItemsWithAlreadySeenKeyValues" && $parameter["dedupeValue"] === undefined) || ($parameter["logic"] === "removeItemsUpToStoredIncrementalKey" && $parameter["incrementalDedupeValue"] === undefined) || ($parameter["logic"] === "removeItemsUpToStoredDate" && $parameter["dateDedupeValue"] === undefined)) }}',
			whenToDisplay: 'beforeExecution',
			location: 'outputPane',
		},
	],
	properties: [...removeDuplicatesNodeFields],
};
/** Scopes in which this node version can store deduplication history. */
const SUPPORTED_SCOPES: readonly string[] = ['node', 'workflow'];

/** History size used when the "options.historySize" parameter is not set. */
const DEFAULT_MAX_ENTRIES = 10000;

/** Error message used when a per-item dedupe value resolves to an empty value. */
const EMPTY_DEDUPE_VALUE_MESSAGE = 'The `Value to Dedupe On` is empty. Please provide a value.';

/** Maps a dedupe key (as a string) to every input item that produced it, in input order. */
type ItemsByKey = Record<string, INodeExecutionData[]>;

/**
 * Creates an empty key -> items lookup.
 *
 * The object has a null prototype, so dedupe values such as "constructor", "toString" or
 * "__proto__" are treated as ordinary keys. With a plain `{}` those resolved to inherited
 * Object.prototype members, which are truthy, so `.push` was called on a non-array and
 * the node threw. A plain object is used instead of a Map so that `Object.keys` returns
 * keys in the same order as before.
 */
function createItemsByKey(): ItemsByKey {
	return Object.create(null) as ItemsByKey;
}

/** Appends `item` to the bucket for `key`, creating the bucket on first use. */
function addToBucket(itemsByKey: ItemsByKey, key: string, item: INodeExecutionData): void {
	const bucket = itemsByKey[key];
	if (bucket) {
		bucket.push(item);
	} else {
		itemsByKey[key] = [item];
	}
}

/**
 * Builds the node outputs from the keys the deduplication helper classified.
 *
 * @returns `[keptItems, discardedItems]`. Kept items come from `result.new` and discarded
 *   items from `result.processed`. Items sharing a key stay adjacent, in input order.
 */
function splitByProcessedState(
	itemsByKey: ItemsByKey,
	result: { new: ReadonlyArray<string | number>; processed: ReadonlyArray<string | number> },
): INodeExecutionData[][] {
	const itemsFor = (keys: ReadonlyArray<string | number>) =>
		keys.flatMap((key) => itemsByKey[key]);
	return [itemsFor(result.new), itemsFor(result.processed)];
}

/**
 * Throws a NodeOperationError unless `scope` is "node" or "workflow".
 *
 * @throws NodeOperationError for any other scope.
 */
function assertSupportedScope(ctx: IExecuteFunctions, scope: string): void {
	if (!SUPPORTED_SCOPES.includes(scope)) {
		throw new NodeOperationError(
			ctx.getNode(),
			`The scope '${scope}' is not supported. Please select either "node" or "workflow".`,
		);
	}
}

/**
 * Logic "removeItemsWithAlreadySeenKeyValues": records each distinct `dedupeValue` and
 * discards items whose value was already recorded. History is capped at
 * `options.historySize` entries.
 *
 * @throws NodeOperationError when the stored history plus the incoming item count would
 *   exceed the configured history size.
 */
async function removeItemsWithAlreadySeenKeyValues(
	ctx: IExecuteFunctions,
	items: INodeExecutionData[],
	scope: DeduplicationScope,
): Promise<INodeExecutionData[][]> {
	const itemsByKey = createItemsByKey();
	items.forEach((item, itemIndex) => {
		const key = ctx.getNodeParameter('dedupeValue', itemIndex, '')?.toString() ?? '';
		addToBucket(itemsByKey, key, item);
	});

	// Normalised once, so the limit check, the helper options and the hint all use a number.
	const maxEntries = Number(
		ctx.getNodeParameter('options.historySize', 0, DEFAULT_MAX_ENTRIES),
	);

	const countBefore = await ctx.helpers.getProcessedDataCount(scope, {
		mode: 'entries',
		maxEntries,
	});
	if (countBefore + items.length > maxEntries) {
		throw new NodeOperationError(
			ctx.getNode(),
			'The number of items to be processed exceeds the maximum history size. Please increase the history size or reduce the number of items to be processed.',
		);
	}

	const result = await ctx.helpers.checkProcessedAndRecord(Object.keys(itemsByKey), scope, {
		mode: 'entries',
		maxEntries,
	});
	const countAfter = await ctx.helpers.getProcessedDataCount(scope, {
		mode: 'entries',
		maxEntries,
	});
	const outputs = splitByProcessedState(itemsByKey, result);

	// Attach a hint once more than half of the configured history is in use.
	if (maxEntries > 0 && countAfter / maxEntries > 0.5) {
		return new NodeExecutionOutput(outputs, [
			{
				message: `Some duplicates may not be removed since you're approaching the maximum history size (${maxEntries} items). You can raise this limit using the history size option.`,
				location: 'outputPane',
			},
		]);
	}
	return outputs;
}

/**
 * Logic "removeItemsUpToStoredIncrementalKey": each item's `incrementalDedupeValue` must be
 * numeric. Classification is delegated to the helper in "latestIncrementalKey" mode.
 *
 * @throws NodeOperationError when a value is empty or not a number.
 */
async function removeItemsUpToStoredIncrementalKey(
	ctx: IExecuteFunctions,
	items: INodeExecutionData[],
	scope: DeduplicationScope,
): Promise<INodeExecutionData[][]> {
	const itemsByKey = createItemsByKey();
	items.forEach((item, itemIndex) => {
		const incrementalKey = ctx.getNodeParameter('incrementalDedupeValue', itemIndex, '');
		if (!incrementalKey?.toString()) {
			throw new NodeOperationError(ctx.getNode(), EMPTY_DEDUPE_VALUE_MESSAGE);
		}
		const parsedKey = Number(incrementalKey);
		if (Number.isNaN(parsedKey)) {
			throw new NodeOperationError(
				ctx.getNode(),
				`The value '${incrementalKey}' is not a number. Please provide a number.`,
			);
		}
		// String(parsedKey) is the same property key a numeric object index produces.
		addToBucket(itemsByKey, String(parsedKey), item);
	});

	const result = await ctx.helpers.checkProcessedAndRecord(Object.keys(itemsByKey), scope, {
		mode: 'latestIncrementalKey',
	});
	return splitByProcessedState(itemsByKey, result);
}

/**
 * Logic "removeItemsUpToStoredDate": each item's `dateDedupeValue` must be parseable by
 * tryToParseDateTime. Classification is delegated to the helper in "latestDate" mode.
 *
 * @throws NodeOperationError when a value is empty or not a valid date.
 */
async function removeItemsUpToStoredDate(
	ctx: IExecuteFunctions,
	items: INodeExecutionData[],
	scope: DeduplicationScope,
): Promise<INodeExecutionData[][]> {
	const itemsByKey = createItemsByKey();
	items.forEach((item, itemIndex) => {
		const dateValue = ctx.getNodeParameter('dateDedupeValue', itemIndex, '')?.toString() ?? '';
		if (!dateValue) {
			throw new NodeOperationError(ctx.getNode(), EMPTY_DEDUPE_VALUE_MESSAGE);
		}
		try {
			tryToParseDateTime(dateValue);
		} catch {
			throw new NodeOperationError(
				ctx.getNode(),
				`The value '${dateValue}' is not a valid date. Please provide a valid date.`,
			);
		}
		addToBucket(itemsByKey, dateValue, item);
	});

	const result = await ctx.helpers.checkProcessedAndRecord(Object.keys(itemsByKey), scope, {
		mode: 'latestDate',
	});
	return splitByProcessedState(itemsByKey, result);
}

/**
 * Operation "removeItemsSeenInPreviousExecutions": dispatches on the `logic` parameter.
 * The scope is validated only for the three known logics. Any other logic value passes
 * the input through unchanged.
 */
async function removeItemsSeenInPreviousExecutions(
	ctx: IExecuteFunctions,
	items: INodeExecutionData[],
): Promise<INodeExecutionData[][]> {
	const logic = ctx.getNodeParameter('logic', 0);
	const scope = ctx.getNodeParameter('options.scope', 0, 'node') as DeduplicationScope;

	switch (logic) {
		case 'removeItemsWithAlreadySeenKeyValues':
			assertSupportedScope(ctx, scope);
			return await removeItemsWithAlreadySeenKeyValues(ctx, items, scope);
		case 'removeItemsUpToStoredIncrementalKey':
			assertSupportedScope(ctx, scope);
			return await removeItemsUpToStoredIncrementalKey(ctx, items, scope);
		case 'removeItemsUpToStoredDate':
			assertSupportedScope(ctx, scope);
			return await removeItemsUpToStoredDate(ctx, items, scope);
		default:
			return [items];
	}
}

/**
 * Operation "clearDeduplicationHistory": only mode "cleanDatabase" has an effect here. It
 * clears stored "entries"-mode history for the selected scope. Modes
 * "updateKeyValuesInDatabase" and "deleteKeyValuesFromDatabase" are currently no-ops in this
 * version. The input is always passed through unchanged.
 */
async function clearDeduplicationHistory(
	ctx: IExecuteFunctions,
	items: INodeExecutionData[],
): Promise<INodeExecutionData[][]> {
	const mode = ctx.getNodeParameter('mode', 0) as string;
	if (mode === 'cleanDatabase') {
		const scope = ctx.getNodeParameter('options.scope', 0, 'node') as DeduplicationScope;
		await ctx.helpers.clearAllProcessedItems(scope, {
			mode: 'entries',
		});
	}
	return [items];
}

/** Remove Duplicates node, version 2. */
export class RemoveDuplicatesV2 implements INodeType {
	description: INodeTypeDescription;

	constructor(baseDescription: INodeTypeBaseDescription) {
		this.description = {
			...baseDescription,
			...versionDescription,
		};
	}

	/**
	 * Runs the selected `operation`. Unknown operations pass the input through unchanged.
	 *
	 * When `continueOnFail()` is enabled, an error is returned as a single item carrying the
	 * first input item's JSON (or `{}` when there is no input). Otherwise the error is rethrown.
	 */
	async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
		const items = this.getInputData();
		const operation = this.getNodeParameter('operation', 0);

		try {
			switch (operation) {
				case 'removeDuplicateInputItems':
					return removeDuplicateInputItems(this, items);
				case 'removeItemsSeenInPreviousExecutions':
					// `return await` keeps rejections inside this try/catch.
					return await removeItemsSeenInPreviousExecutions(this, items);
				case 'clearDeduplicationHistory':
					return await clearDeduplicationHistory(this, items);
				default:
					return [items];
			}
		} catch (error) {
			if (this.continueOnFail()) {
				// Guard against empty input. Indexing `[0].json` there threw a TypeError that
				// hid the original error.
				return [[{ json: items[0]?.json ?? {}, error }]];
			}
			throw error;
		}
	}
}