From 52dd2c76196c6895b47145c2b85a6895ce2874d4 Mon Sep 17 00:00:00 2001 From: Shireen Missi <94372015+ShireenMissi@users.noreply.github.com> Date: Thu, 10 Oct 2024 16:12:05 +0100 Subject: [PATCH] feat(core): Dedupe (#10101) Co-authored-by: Jan Oberhauser Co-authored-by: Giulio Andreini Co-authored-by: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Co-authored-by: Elias Meire --- packages/cli/src/commands/base-command.ts | 13 +- packages/cli/src/commands/execute-batch.ts | 1 + packages/cli/src/commands/execute.ts | 1 + packages/cli/src/commands/start.ts | 2 + packages/cli/src/commands/webhook.ts | 2 + packages/cli/src/commands/worker.ts | 2 + packages/cli/src/config/types.ts | 2 + packages/cli/src/databases/entities/index.ts | 2 + .../src/databases/entities/processed-data.ts | 22 + .../1726606152711-CreateProcessedDataTable.ts | 23 + .../src/databases/migrations/mysqldb/index.ts | 2 + .../databases/migrations/postgresdb/index.ts | 2 + .../src/databases/migrations/sqlite/index.ts | 2 + .../repositories/processed-data.repository.ts | 11 + .../src/deduplication/deduplication-helper.ts | 356 ++++++++++++ packages/cli/src/deduplication/index.ts | 7 + .../cli/src/errors/deduplication.error.ts | 7 + packages/cli/src/interfaces.ts | 16 + .../deduplication-helper.test.ts | 532 ++++++++++++++++++ .../cli/test/integration/shared/test-db.ts | 1 + packages/core/src/NodeExecuteFunctions.ts | 121 ++++ .../core/src/data-deduplication-service.ts | 124 ++++ packages/core/src/index.ts | 1 + packages/editor-ui/src/constants.ts | 6 +- .../RemoveDuplicates/RemoveDuplicates.node.ts | 249 +------- .../test/RemoveDuplicates.test.ts | 4 +- .../nodes/Transform/RemoveDuplicates/utils.ts | 129 ++++- .../v1/RemoveDuplicatesV1.node.ts | 122 ++++ .../v2/RemoveDuplicatesV2.description.ts | 278 +++++++++ .../v2/RemoveDuplicatesV2.node.ts | 277 +++++++++ .../v2/test/RemoveDuplicates.test.ts | 131 +++++ packages/workflow/src/Interfaces.ts | 83 +++ 32 files changed, 2298 insertions(+), 233 
deletions(-) create mode 100644 packages/cli/src/databases/entities/processed-data.ts create mode 100644 packages/cli/src/databases/migrations/common/1726606152711-CreateProcessedDataTable.ts create mode 100644 packages/cli/src/databases/repositories/processed-data.repository.ts create mode 100644 packages/cli/src/deduplication/deduplication-helper.ts create mode 100644 packages/cli/src/deduplication/index.ts create mode 100644 packages/cli/src/errors/deduplication.error.ts create mode 100644 packages/cli/test/integration/deduplication/deduplication-helper.test.ts create mode 100644 packages/core/src/data-deduplication-service.ts create mode 100644 packages/nodes-base/nodes/Transform/RemoveDuplicates/v1/RemoveDuplicatesV1.node.ts create mode 100644 packages/nodes-base/nodes/Transform/RemoveDuplicates/v2/RemoveDuplicatesV2.description.ts create mode 100644 packages/nodes-base/nodes/Transform/RemoveDuplicates/v2/RemoveDuplicatesV2.node.ts create mode 100644 packages/nodes-base/nodes/Transform/RemoveDuplicates/v2/test/RemoveDuplicates.test.ts diff --git a/packages/cli/src/commands/base-command.ts b/packages/cli/src/commands/base-command.ts index c33fe064c7..f4d97a6a05 100644 --- a/packages/cli/src/commands/base-command.ts +++ b/packages/cli/src/commands/base-command.ts @@ -1,7 +1,12 @@ import 'reflect-metadata'; import { GlobalConfig } from '@n8n/config'; import { Command, Errors } from '@oclif/core'; -import { BinaryDataService, InstanceSettings, ObjectStoreService } from 'n8n-core'; +import { + BinaryDataService, + InstanceSettings, + ObjectStoreService, + DataDeduplicationService, +} from 'n8n-core'; import { ApplicationError, ensureError, @@ -16,6 +21,7 @@ import { LICENSE_FEATURES, inDevelopment, inTest } from '@/constants'; import * as CrashJournal from '@/crash-journal'; import { generateHostInstanceId } from '@/databases/utils/generators'; import * as Db from '@/db'; +import { getDataDeduplicationService } from '@/deduplication'; import { initErrorHandling } 
from '@/error-reporting'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay'; @@ -266,6 +272,11 @@ export abstract class BaseCommand extends Command { await Container.get(BinaryDataService).init(binaryDataConfig); } + protected async initDataDeduplicationService() { + const dataDeduplicationService = getDataDeduplicationService(); + await DataDeduplicationService.init(dataDeduplicationService); + } + async initExternalHooks() { this.externalHooks = Container.get(ExternalHooks); await this.externalHooks.init(); diff --git a/packages/cli/src/commands/execute-batch.ts b/packages/cli/src/commands/execute-batch.ts index 71540952b5..fbbecd2cbb 100644 --- a/packages/cli/src/commands/execute-batch.ts +++ b/packages/cli/src/commands/execute-batch.ts @@ -167,6 +167,7 @@ export class ExecuteBatch extends BaseCommand { async init() { await super.init(); await this.initBinaryDataService(); + await this.initDataDeduplicationService(); await this.initExternalHooks(); } diff --git a/packages/cli/src/commands/execute.ts b/packages/cli/src/commands/execute.ts index 9a901fdfc5..fd49a2b619 100644 --- a/packages/cli/src/commands/execute.ts +++ b/packages/cli/src/commands/execute.ts @@ -31,6 +31,7 @@ export class Execute extends BaseCommand { async init() { await super.init(); await this.initBinaryDataService(); + await this.initDataDeduplicationService(); await this.initExternalHooks(); } diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 36e690c37e..070ba4f0cb 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -212,6 +212,8 @@ export class Start extends BaseCommand { this.logger.debug('Wait tracker init complete'); await this.initBinaryDataService(); this.logger.debug('Binary data service init complete'); + await this.initDataDeduplicationService(); + this.logger.debug('Data deduplication service 
init complete'); await this.initExternalHooks(); this.logger.debug('External hooks init complete'); await this.initExternalSecrets(); diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index 5a5d656c8c..d88fd3f5d4 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -82,6 +82,8 @@ export class Webhook extends BaseCommand { this.logger.debug('Orchestration init complete'); await this.initBinaryDataService(); this.logger.debug('Binary data service init complete'); + await this.initDataDeduplicationService(); + this.logger.debug('Data deduplication service init complete'); await this.initExternalHooks(); this.logger.debug('External hooks init complete'); await this.initExternalSecrets(); diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 6345db6763..546a6dcc77 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -93,6 +93,8 @@ export class Worker extends BaseCommand { this.logger.debug('License init complete'); await this.initBinaryDataService(); this.logger.debug('Binary data service init complete'); + await this.initDataDeduplicationService(); + this.logger.debug('Data deduplication service init complete'); await this.initExternalHooks(); this.logger.debug('External hooks init complete'); await this.initExternalSecrets(); diff --git a/packages/cli/src/config/types.ts b/packages/cli/src/config/types.ts index 0d3c5db2cb..78f2358f5d 100644 --- a/packages/cli/src/config/types.ts +++ b/packages/cli/src/config/types.ts @@ -1,5 +1,6 @@ import type { RedisOptions } from 'ioredis'; import type { BinaryData } from 'n8n-core'; +import type { IProcessedDataConfig } from 'n8n-workflow'; import type { schema } from './schema'; @@ -76,6 +77,7 @@ type ToReturnType = T extends NumericPath type ExceptionPaths = { 'queue.bull.redis': RedisOptions; binaryDataManager: BinaryData.Config; + processedDataManager: 
IProcessedDataConfig; 'userManagement.isInstanceOwnerSetUp': boolean; 'ui.banners.dismissed': string[] | undefined; }; diff --git a/packages/cli/src/databases/entities/index.ts b/packages/cli/src/databases/entities/index.ts index 1993d20a75..39f67b3252 100644 --- a/packages/cli/src/databases/entities/index.ts +++ b/packages/cli/src/databases/entities/index.ts @@ -13,6 +13,7 @@ import { ExecutionMetadata } from './execution-metadata'; import { InstalledNodes } from './installed-nodes'; import { InstalledPackages } from './installed-packages'; import { InvalidAuthToken } from './invalid-auth-token'; +import { ProcessedData } from './processed-data'; import { Project } from './project'; import { ProjectRelation } from './project-relation'; import { Settings } from './settings'; @@ -56,4 +57,5 @@ export const entities = { Project, ProjectRelation, ApiKey, + ProcessedData, }; diff --git a/packages/cli/src/databases/entities/processed-data.ts b/packages/cli/src/databases/entities/processed-data.ts new file mode 100644 index 0000000000..bd638fca95 --- /dev/null +++ b/packages/cli/src/databases/entities/processed-data.ts @@ -0,0 +1,22 @@ +import { Column, Entity, PrimaryColumn } from '@n8n/typeorm'; + +import type { IProcessedDataEntries, IProcessedDataLatest } from '@/interfaces'; + +import { jsonColumnType, WithTimestamps } from './abstract-entity'; +import { objectRetriever } from '../utils/transformers'; + +@Entity() +export class ProcessedData extends WithTimestamps { + @PrimaryColumn('varchar') + context: string; + + @PrimaryColumn() + workflowId: string; + + @Column({ + type: jsonColumnType, + nullable: true, + transformer: objectRetriever, + }) + value: IProcessedDataEntries | IProcessedDataLatest; +} diff --git a/packages/cli/src/databases/migrations/common/1726606152711-CreateProcessedDataTable.ts b/packages/cli/src/databases/migrations/common/1726606152711-CreateProcessedDataTable.ts new file mode 100644 index 0000000000..86992a0580 --- /dev/null +++ 
b/packages/cli/src/databases/migrations/common/1726606152711-CreateProcessedDataTable.ts @@ -0,0 +1,23 @@ +import type { MigrationContext, ReversibleMigration } from '@/databases/types'; + +const processedDataTableName = 'processed_data'; + +export class CreateProcessedDataTable1726606152711 implements ReversibleMigration { + async up({ schemaBuilder: { createTable, column } }: MigrationContext) { + await createTable(processedDataTableName) + .withColumns( + column('workflowId').varchar(36).notNull.primary, + column('value').varchar(255).notNull, + column('context').varchar(255).notNull.primary, + ) + .withForeignKey('workflowId', { + tableName: 'workflow_entity', + columnName: 'id', + onDelete: 'CASCADE', + }).withTimestamps; + } + + async down({ schemaBuilder: { dropTable } }: MigrationContext) { + await dropTable(processedDataTableName); + } +} diff --git a/packages/cli/src/databases/migrations/mysqldb/index.ts b/packages/cli/src/databases/migrations/mysqldb/index.ts index 07b910b949..1dcca1e592 100644 --- a/packages/cli/src/databases/migrations/mysqldb/index.ts +++ b/packages/cli/src/databases/migrations/mysqldb/index.ts @@ -64,6 +64,7 @@ import { CreateInvalidAuthTokenTable1723627610222 } from '../common/172362761022 import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices'; import { CreateAnnotationTables1724753530828 } from '../common/1724753530828-CreateExecutionAnnotationTables'; import { AddApiKeysTable1724951148974 } from '../common/1724951148974-AddApiKeysTable'; +import { CreateProcessedDataTable1726606152711 } from '../common/1726606152711-CreateProcessedDataTable'; import { SeparateExecutionCreationFromStart1727427440136 } from '../common/1727427440136-SeparateExecutionCreationFromStart'; export const mysqlMigrations: Migration[] = [ @@ -132,4 +133,5 @@ export const mysqlMigrations: Migration[] = [ CreateAnnotationTables1724753530828, AddApiKeysTable1724951148974, 
SeparateExecutionCreationFromStart1727427440136, + CreateProcessedDataTable1726606152711, ]; diff --git a/packages/cli/src/databases/migrations/postgresdb/index.ts b/packages/cli/src/databases/migrations/postgresdb/index.ts index 21b90e201d..eb0e2bd946 100644 --- a/packages/cli/src/databases/migrations/postgresdb/index.ts +++ b/packages/cli/src/databases/migrations/postgresdb/index.ts @@ -64,6 +64,7 @@ import { CreateInvalidAuthTokenTable1723627610222 } from '../common/172362761022 import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices'; import { CreateAnnotationTables1724753530828 } from '../common/1724753530828-CreateExecutionAnnotationTables'; import { AddApiKeysTable1724951148974 } from '../common/1724951148974-AddApiKeysTable'; +import { CreateProcessedDataTable1726606152711 } from '../common/1726606152711-CreateProcessedDataTable'; import { SeparateExecutionCreationFromStart1727427440136 } from '../common/1727427440136-SeparateExecutionCreationFromStart'; export const postgresMigrations: Migration[] = [ @@ -132,4 +133,5 @@ export const postgresMigrations: Migration[] = [ CreateAnnotationTables1724753530828, AddApiKeysTable1724951148974, SeparateExecutionCreationFromStart1727427440136, + CreateProcessedDataTable1726606152711, ]; diff --git a/packages/cli/src/databases/migrations/sqlite/index.ts b/packages/cli/src/databases/migrations/sqlite/index.ts index 2828bb3f59..797b26752c 100644 --- a/packages/cli/src/databases/migrations/sqlite/index.ts +++ b/packages/cli/src/databases/migrations/sqlite/index.ts @@ -61,6 +61,7 @@ import { AddConstraintToExecutionMetadata1720101653148 } from '../common/1720101 import { CreateInvalidAuthTokenTable1723627610222 } from '../common/1723627610222-CreateInvalidAuthTokenTable'; import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices'; import { CreateAnnotationTables1724753530828 } from 
'../common/1724753530828-CreateExecutionAnnotationTables'; +import { CreateProcessedDataTable1726606152711 } from '../common/1726606152711-CreateProcessedDataTable'; import { SeparateExecutionCreationFromStart1727427440136 } from '../common/1727427440136-SeparateExecutionCreationFromStart'; const sqliteMigrations: Migration[] = [ @@ -126,6 +127,7 @@ const sqliteMigrations: Migration[] = [ CreateAnnotationTables1724753530828, AddApiKeysTable1724951148974, SeparateExecutionCreationFromStart1727427440136, + CreateProcessedDataTable1726606152711, ]; export { sqliteMigrations }; diff --git a/packages/cli/src/databases/repositories/processed-data.repository.ts b/packages/cli/src/databases/repositories/processed-data.repository.ts new file mode 100644 index 0000000000..f02fbf270a --- /dev/null +++ b/packages/cli/src/databases/repositories/processed-data.repository.ts @@ -0,0 +1,11 @@ +import { DataSource, Repository } from '@n8n/typeorm'; +import { Service } from 'typedi'; + +import { ProcessedData } from '../entities/processed-data'; + +@Service() +export class ProcessedDataRepository extends Repository { + constructor(dataSource: DataSource) { + super(ProcessedData, dataSource.manager); + } +} diff --git a/packages/cli/src/deduplication/deduplication-helper.ts b/packages/cli/src/deduplication/deduplication-helper.ts new file mode 100644 index 0000000000..a913a21a8c --- /dev/null +++ b/packages/cli/src/deduplication/deduplication-helper.ts @@ -0,0 +1,356 @@ +import { createHash } from 'crypto'; +import { + type ICheckProcessedContextData, + type IDataDeduplicator, + type ICheckProcessedOptions, + type IDeduplicationOutput, + type DeduplicationScope, + type DeduplicationItemTypes, + type DeduplicationMode, + tryToParseDateTime, +} from 'n8n-workflow'; +import * as assert from 'node:assert/strict'; +import { Container } from 'typedi'; + +import type { ProcessedData } from '@/databases/entities/processed-data'; +import { ProcessedDataRepository } from 
'@/databases/repositories/processed-data.repository'; +import { DeduplicationError } from '@/errors/deduplication.error'; +import type { IProcessedDataEntries, IProcessedDataLatest } from '@/interfaces'; + +export class DeduplicationHelper implements IDataDeduplicator { + private static sortEntries( + items: DeduplicationItemTypes[], + mode: DeduplicationMode, + ): DeduplicationItemTypes[] { + return items.slice().sort((a, b) => DeduplicationHelper.compareValues(mode, a, b)); + } + /** + * Compares two values based on the provided mode ('latestIncrementalKey' or 'latestDate'). + * + * @param {DeduplicationMode} mode - The mode to determine the comparison logic. Can be either: + * - 'latestIncrementalKey': Compares numeric values and returns true if `value1` is greater than `value2`. + * - 'latestDate': Compares date strings and returns true if `value1` is a later date than `value2`. + * + * @param {DeduplicationItemTypes} value1 - The first value to compare. + * - If the mode is 'latestIncrementalKey', this should be a numeric value or a string that can be converted to a number. + * - If the mode is 'latestDate', this should be a valid date string. + * + * @param {DeduplicationItemTypes} value2 - The second value to compare. + * - If the mode is 'latestIncrementalKey', this should be a numeric value or a string that can be converted to a number. + * - If the mode is 'latestDate', this should be a valid date string. + * + * @returns {boolean} - Returns `true` if `value1` is greater than `value2` based on the comparison mode. + * - In 'latestIncrementalKey' mode, it returns `true` if `value1` is numerically greater than `value2`. + * - In 'latestDate' mode, it returns `true` if `value1` is a later date than `value2`. + * + * @throws {DeduplicationError} - Throws an error if: + * - The mode is 'latestIncrementalKey' and the values are not valid numbers. + * - The mode is 'latestDate' and the values are not valid date strings. + * - An unsupported mode is provided. 
+ */ + + private static compareValues( + mode: DeduplicationMode, + value1: DeduplicationItemTypes, + value2: DeduplicationItemTypes, + ): 1 | 0 | -1 { + if (mode === 'latestIncrementalKey') { + const num1 = Number(value1); + const num2 = Number(value2); + if (!isNaN(num1) && !isNaN(num2)) { + return num1 === num2 ? 0 : num1 > num2 ? 1 : -1; + } + throw new DeduplicationError( + 'Invalid value. Only numbers are supported in mode "latestIncrementalKey"', + ); + } else if (mode === 'latestDate') { + try { + const date1 = tryToParseDateTime(value1); + const date2 = tryToParseDateTime(value2); + + return date1 === date2 ? 0 : date1 > date2 ? 1 : -1; + } catch (error) { + throw new DeduplicationError( + 'Invalid value. Only valid dates are supported in mode "latestDate"', + ); + } + } else { + throw new DeduplicationError( + "Invalid mode. Only 'latestIncrementalKey' and 'latestDate' are supported.", + ); + } + } + + private static createContext( + scope: DeduplicationScope, + contextData: ICheckProcessedContextData, + ): string { + if (scope === 'node') { + if (!contextData.node) { + throw new DeduplicationError( + "No node information has been provided and so cannot use scope 'node'", + ); + } + // Use the node ID to make sure that the data can still be accessed and does not get deleted + // whenever the node gets renamed + return `n:${contextData.node.id}`; + } + return ''; + } + + private static createValueHash(value: DeduplicationItemTypes): string { + return createHash('md5').update(value.toString()).digest('base64'); + } + + private async findProcessedData( + scope: DeduplicationScope, + contextData: ICheckProcessedContextData, + ): Promise { + return await Container.get(ProcessedDataRepository).findOne({ + where: { + workflowId: contextData.workflow.id, + context: DeduplicationHelper.createContext(scope, contextData), + }, + }); + } + + private validateMode(processedData: ProcessedData | null, options: ICheckProcessedOptions) { + if (processedData && 
processedData.value.mode !== options.mode) { + throw new DeduplicationError( + 'Deduplication data was originally saved with an incompatible setting of the ‘Keep Items Where’ parameter. Try ’Clean Database’ operation to reset.', + ); + } + } + + private processedDataHasEntries( + data: IProcessedDataEntries | IProcessedDataLatest, + ): data is IProcessedDataEntries { + return Array.isArray(data.data); + } + + private processedDataIsLatest( + data: IProcessedDataEntries | IProcessedDataLatest, + ): data is IProcessedDataLatest { + return data && !Array.isArray(data.data); + } + + private async handleLatestModes( + items: DeduplicationItemTypes[], + contextData: ICheckProcessedContextData, + options: ICheckProcessedOptions, + processedData: ProcessedData | null, + dbContext: string, + ): Promise { + const incomingItems = DeduplicationHelper.sortEntries(items, options.mode); + + if (!processedData) { + // All items are new so add new entries + await Container.get(ProcessedDataRepository).insert({ + workflowId: contextData.workflow.id, + context: dbContext, + value: { + mode: options.mode, + data: incomingItems.pop(), + }, + }); + + return { + new: items, + processed: [], + }; + } + + const returnData: IDeduplicationOutput = { + new: [], + processed: [], + }; + + if (!this.processedDataIsLatest(processedData.value)) { + return returnData; + } + + let largestValue = processedData.value.data; + const processedDataValue = processedData.value; + + incomingItems.forEach((item) => { + if (DeduplicationHelper.compareValues(options.mode, item, processedDataValue.data) === 1) { + returnData.new.push(item); + if (DeduplicationHelper.compareValues(options.mode, item, largestValue) === 1) { + largestValue = item; + } + } else { + returnData.processed.push(item); + } + }); + + processedData.value.data = largestValue; + + await Container.get(ProcessedDataRepository).update( + { workflowId: processedData.workflowId, context: processedData.context }, + processedData, + ); + + return 
returnData; + } + + private async handleHashedItems( + items: DeduplicationItemTypes[], + contextData: ICheckProcessedContextData, + options: ICheckProcessedOptions, + processedData: ProcessedData | null, + dbContext: string, + ): Promise { + const hashedItems = items.map((item) => DeduplicationHelper.createValueHash(item)); + + if (!processedData) { + // All items are new so add new entries + if (options.maxEntries) { + hashedItems.splice(0, hashedItems.length - options.maxEntries); + } + await Container.get(ProcessedDataRepository).insert({ + workflowId: contextData.workflow.id, + context: dbContext, + value: { + mode: options.mode, + data: hashedItems, + }, + }); + + return { + new: items, + processed: [], + }; + } + + const returnData: IDeduplicationOutput = { + new: [], + processed: [], + }; + + if (!this.processedDataHasEntries(processedData.value)) { + return returnData; + } + + const processedDataValue = processedData.value; + const processedItemsSet = new Set(processedDataValue.data); + + hashedItems.forEach((item, index) => { + if (processedItemsSet.has(item)) { + returnData.processed.push(items[index]); + } else { + returnData.new.push(items[index]); + processedDataValue.data.push(item); + } + }); + + if (options.maxEntries) { + processedDataValue.data.splice(0, processedDataValue.data.length - options.maxEntries); + } + + await Container.get(ProcessedDataRepository).update( + { workflowId: processedData.workflowId, context: processedData.context }, + processedData, + ); + + return returnData; + } + + async checkProcessedAndRecord( + items: DeduplicationItemTypes[], + scope: DeduplicationScope, + contextData: ICheckProcessedContextData, + options: ICheckProcessedOptions, + ): Promise { + const dbContext = DeduplicationHelper.createContext(scope, contextData); + + assert.ok(contextData.workflow.id); + + const processedData = await this.findProcessedData(scope, contextData); + + this.validateMode(processedData, options); + + if (['latestIncrementalKey', 
'latestDate'].includes(options.mode)) { + return await this.handleLatestModes(items, contextData, options, processedData, dbContext); + } + //mode entries + return await this.handleHashedItems(items, contextData, options, processedData, dbContext); + } + + async removeProcessed( + items: DeduplicationItemTypes[], + scope: DeduplicationScope, + contextData: ICheckProcessedContextData, + options: ICheckProcessedOptions, + ): Promise { + if (['latestIncrementalKey', 'latestDate'].includes(options.mode)) { + throw new DeduplicationError('Removing processed data is not possible in mode "latest"'); + } + assert.ok(contextData.workflow.id); + + const processedData = await Container.get(ProcessedDataRepository).findOne({ + where: { + workflowId: contextData.workflow.id, + context: DeduplicationHelper.createContext(scope, contextData), + }, + }); + + if (!processedData) { + return; + } + + const hashedItems = items.map((item) => DeduplicationHelper.createValueHash(item)); + + if (!this.processedDataHasEntries(processedData.value)) { + return; + } + + const processedDataValue = processedData.value; + + hashedItems.forEach((item) => { + const index = processedDataValue.data.findIndex((value) => value === item); + if (index !== -1) { + processedDataValue.data.splice(index, 1); + } + }); + + await Container.get(ProcessedDataRepository).update( + { workflowId: processedData.workflowId, context: processedData.context }, + processedData, + ); + } + + async clearAllProcessedItems( + scope: DeduplicationScope, + contextData: ICheckProcessedContextData, + ): Promise { + await Container.get(ProcessedDataRepository).delete({ + workflowId: contextData.workflow.id, + context: DeduplicationHelper.createContext(scope, contextData), + }); + } + + async getProcessedDataCount( + scope: DeduplicationScope, + contextData: ICheckProcessedContextData, + options: ICheckProcessedOptions, + ): Promise { + const processedDataRepository = Container.get(ProcessedDataRepository); + + const processedData 
= await processedDataRepository.findOne({ + where: { + workflowId: contextData.workflow.id, + context: DeduplicationHelper.createContext(scope, contextData), + }, + }); + + if ( + options.mode === 'entries' && + processedData && + this.processedDataHasEntries(processedData.value) + ) { + return processedData.value.data.length; + } else { + return 0; + } + } +} diff --git a/packages/cli/src/deduplication/index.ts b/packages/cli/src/deduplication/index.ts new file mode 100644 index 0000000000..2cf2973d71 --- /dev/null +++ b/packages/cli/src/deduplication/index.ts @@ -0,0 +1,7 @@ +import { type IDataDeduplicator } from 'n8n-workflow'; + +import { DeduplicationHelper } from './deduplication-helper'; + +export function getDataDeduplicationService(): IDataDeduplicator { + return new DeduplicationHelper(); +} diff --git a/packages/cli/src/errors/deduplication.error.ts b/packages/cli/src/errors/deduplication.error.ts new file mode 100644 index 0000000000..8e9173abb9 --- /dev/null +++ b/packages/cli/src/errors/deduplication.error.ts @@ -0,0 +1,7 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class DeduplicationError extends ApplicationError { + constructor(message: string) { + super(`Deduplication Failed: ${message}`); + } +} diff --git a/packages/cli/src/interfaces.ts b/packages/cli/src/interfaces.ts index 5c29eea093..4d2cd9b2d9 100644 --- a/packages/cli/src/interfaces.ts +++ b/packages/cli/src/interfaces.ts @@ -22,6 +22,8 @@ import type { INodeProperties, IUserSettings, IWorkflowExecutionDataProcess, + DeduplicationMode, + DeduplicationItemTypes, } from 'n8n-workflow'; import type PCancelable from 'p-cancelable'; @@ -48,6 +50,20 @@ export interface ICredentialsOverwrite { [key: string]: ICredentialDataDecryptedObject; } +// ---------------------------------- +// ProcessedData +// ---------------------------------- + +export interface IProcessedDataLatest { + mode: DeduplicationMode; + data: DeduplicationItemTypes; +} + +export interface 
IProcessedDataEntries { + mode: DeduplicationMode; + data: DeduplicationItemTypes[]; +} + // ---------------------------------- // tags // ---------------------------------- diff --git a/packages/cli/test/integration/deduplication/deduplication-helper.test.ts b/packages/cli/test/integration/deduplication/deduplication-helper.test.ts new file mode 100644 index 0000000000..2859bb363c --- /dev/null +++ b/packages/cli/test/integration/deduplication/deduplication-helper.test.ts @@ -0,0 +1,532 @@ +import { DataDeduplicationService } from 'n8n-core'; +import type { ICheckProcessedContextData, INodeTypeData } from 'n8n-workflow'; +import type { IDeduplicationOutput, INode, DeduplicationItemTypes } from 'n8n-workflow'; +import { Workflow } from 'n8n-workflow'; + +import { getDataDeduplicationService } from '@/deduplication'; +import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials'; +import { NodeTypes } from '@/node-types'; +import { mockInstance } from '@test/mocking'; +import { createWorkflow } from '@test-integration/db/workflows'; + +import * as testDb from '../shared/test-db'; + +let workflow: Workflow; + +jest.mock('../../../src/telemetry'); + +const MOCK_NODE_TYPES_DATA = mockNodeTypesData(['set']); +mockInstance(LoadNodesAndCredentials, { + loaded: { + nodes: MOCK_NODE_TYPES_DATA, + credentials: {}, + }, +}); +function mockNodeTypesData( + nodeNames: string[], + options?: { + addTrigger?: boolean; + }, +) { + return nodeNames.reduce((acc, nodeName) => { + return ( + (acc[`n8n-nodes-base.${nodeName}`] = { + sourcePath: '', + type: { + description: { + displayName: nodeName, + name: nodeName, + group: [], + description: '', + version: 1, + defaults: {}, + inputs: [], + outputs: [], + properties: [], + }, + trigger: options?.addTrigger ? 
async () => undefined : undefined, + }, + }), + acc + ); + }, {}); +} +const node: INode = { + id: 'uuid-1234', + parameters: {}, + name: 'test', + type: 'test.set', + typeVersion: 1, + position: [0, 0], +}; + +beforeAll(async () => { + await testDb.init(); + + const nodeTypes = mockInstance(NodeTypes); + const workflowEntityOriginal = await createWorkflow(); + + workflow = new Workflow({ + id: workflowEntityOriginal.id, + nodes: [node], + connections: {}, + active: false, + nodeTypes, + }); + + const dataDeduplicationService = getDataDeduplicationService(); + await DataDeduplicationService.init(dataDeduplicationService); +}); + +beforeEach(async () => { + await testDb.truncate(['ProcessedData']); +}); + +afterAll(async () => { + await testDb.terminate(); +}); + +describe('Deduplication.DeduplicationHelper', () => { + test('Deduplication (mode: entries): DeduplicationHelper should record and check data correctly', async () => { + const context = 'node'; + const contextData: ICheckProcessedContextData = { + workflow, + node, + }; + + let processedData: IDeduplicationOutput; + + processedData = await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['a', 'b'], + context, + contextData, + { mode: 'entries' }, + ); + + // 'a' & 'b' got only checked before, so still has to be new + expect(processedData).toEqual({ new: ['a', 'b'], processed: [] }); + + processedData = await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['a', 'b', 'c', 'd'], + context, + contextData, + { mode: 'entries' }, + ); + + // 'a' & 'b' got recorded before, 'c' only checked before and 'd' has never been seen + expect(processedData).toEqual({ new: ['c', 'd'], processed: ['a', 'b'] }); + + await DataDeduplicationService.getInstance().removeProcessed(['b', 'd'], context, contextData, { + mode: 'entries', + }); + }); + + test('Deduplication (mode: entries): DeduplicationHelper different contexts should not interfere with each other', async () => { + const
contextData: ICheckProcessedContextData = { + workflow, + node, + }; + + let processedData: IDeduplicationOutput; + + // Add data with context "node" + processedData = await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['a', 'b'], + 'node', + contextData, + { mode: 'entries' }, + ); + + // No data exists yet for context "node" so has to be new + expect(processedData).toEqual({ new: ['a', 'b'], processed: [] }); + + // Add data with context "workflow" + processedData = await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['a', 'b', 'c'], + 'workflow', + contextData, + { mode: 'entries' }, + ); + + // No data exists yet for context 'workflow' so has to be new + expect(processedData).toEqual({ new: ['a', 'b', 'c'], processed: [] }); + + await DataDeduplicationService.getInstance().removeProcessed(['a'], 'node', contextData, { + mode: 'entries', + }); + + processedData = await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['a', 'b', 'c'], + 'node', + contextData, + { mode: 'entries' }, + ); + + // 'a' got removed for the context 'node' and 'c' never got saved, so only 'b' should be known + expect(processedData).toEqual({ new: ['a', 'c'], processed: ['b'] }); + + await DataDeduplicationService.getInstance().removeProcessed(['b'], 'workflow', contextData, { + mode: 'entries', + }); + + processedData = await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['a', 'b', 'c', 'd'], + 'workflow', + contextData, + { mode: 'entries' }, + ); + + // 'b' got removed for the context 'workflow' and 'd' never got saved, for that reason new + // 'a' and 'c' should be known + expect(processedData).toEqual({ new: ['b', 'd'], processed: ['a', 'c'] }); + }); + + test('Deduplication (mode: entries): DeduplicationHelper check maxEntries', async () => { + const contextData: ICheckProcessedContextData = { + workflow, + node, + }; + + let processedData: IDeduplicationOutput; + + processedData = await
DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['0', '1', '2', '3'], + 'node', + contextData, + { mode: 'entries', maxEntries: 5 }, + ); + + // All data should be new + expect(processedData).toEqual({ new: ['0', '1', '2', '3'], processed: [] }); + + // Add more data with context "node" + processedData = await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['4', '5', '6'], + 'node', + contextData, + { mode: 'entries', maxEntries: 5 }, + ); + + // All given data should be new + expect(processedData).toEqual({ new: ['4', '5', '6'], processed: [] }); + + // This should not make a difference, removing an item which does not exist + await DataDeduplicationService.getInstance().removeProcessed(['a'], 'node', contextData, { + mode: 'entries', + }); + + processedData = await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['0', '1', '2', '3', '4', '5', '6', '7'], + 'node', + contextData, + { mode: 'entries', maxEntries: 5 }, + ); + + // '7' should be new and '0' and '1' also because they got pruned as max 5 get saved + expect(processedData).toEqual({ new: ['0', '1', '7'], processed: ['2', '3', '4', '5', '6'] }); + }); + + describe('Deduplication (mode: latestIncrementalKey): DeduplicationHelper should record and check data correctly', () => { + const tests: Array<{ + description: string; + data: Array<{ + operation: 'checkProcessedAndRecord'; + input: DeduplicationItemTypes[]; + output: IDeduplicationOutput; + }>; + }> = [ + { + description: 'dates', + data: [ + { + operation: 'checkProcessedAndRecord', + input: [new Date('2022-01-02').toISOString(), new Date('2022-01-03').toISOString()], + output: { + new: [new Date('2022-01-02').toISOString(), new Date('2022-01-03').toISOString()], + processed: [], + }, + }, + { + operation: 'checkProcessedAndRecord', + input: [ + new Date('2022-01-02').toISOString(), + new Date('2022-01-03').toISOString(), + new Date('2022-01-04').toISOString(), + new
Date('2022-01-05').toISOString(), + ], + output: { + new: [new Date('2022-01-04').toISOString(), new Date('2022-01-05').toISOString()], + processed: [ + new Date('2022-01-02').toISOString(), + new Date('2022-01-03').toISOString(), + ], + }, + }, + ], + }, + { + description: 'numbers', + data: [ + { + operation: 'checkProcessedAndRecord', + input: [2, 3], + output: { new: [2, 3], processed: [] }, + }, + { + operation: 'checkProcessedAndRecord', + input: [2, 3, 4, 5], + output: { new: [4, 5], processed: [2, 3] }, + }, + ], + }, + ]; + + for (const testData of tests) { + test(testData.description, async () => { + const context = 'node'; + const contextData: ICheckProcessedContextData = { + workflow, + node, + }; + const mode = testData.description === 'dates' ? 'latestDate' : 'latestIncrementalKey'; + + let processedData: IDeduplicationOutput; + + for (const data of testData.data) { + processedData = await DataDeduplicationService.getInstance()[data.operation]( + data.input, + context, + contextData, + { mode }, + ); + + expect(processedData).toEqual(data.output); + } + }); + } + }); + + test('removeProcessed should throw error for latest modes', async () => { + const contextData: ICheckProcessedContextData = { + workflow, + node, + }; + + await expect( + DataDeduplicationService.getInstance().removeProcessed(['2022-01-01'], 'node', contextData, { + mode: 'latestDate', + }), + ).rejects.toThrow('Removing processed data is not possible in mode "latest"'); + + await expect( + DataDeduplicationService.getInstance().removeProcessed([1], 'node', contextData, { + mode: 'latestIncrementalKey', + }), + ).rejects.toThrow('Removing processed data is not possible in mode "latest"'); + }); + + test('clearAllProcessedItems should delete all processed items for workflow scope', async () => { + const contextData: ICheckProcessedContextData = { + workflow, + }; + + // First, add some data + await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['a', 'b', 'c'], + 
'workflow', + contextData, + { mode: 'entries' }, + ); + + // Clear all processed items + await DataDeduplicationService.getInstance().clearAllProcessedItems('workflow', contextData, { + mode: 'entries', + }); + + // Check that all items are now considered new + const processedData = await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['a', 'b', 'c'], + 'workflow', + contextData, + { mode: 'entries' }, + ); + + expect(processedData).toEqual({ new: ['a', 'b', 'c'], processed: [] }); + }); + + test('clearAllProcessedItems should delete all processed items for node scope', async () => { + const contextData: ICheckProcessedContextData = { + workflow, + node, + }; + + // First, add some data + await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['a', 'b', 'c'], + 'node', + contextData, + { mode: 'entries' }, + ); + + // Clear all processed items + await DataDeduplicationService.getInstance().clearAllProcessedItems('node', contextData, { + mode: 'entries', + }); + + // Check that all items are now considered new + const processedData = await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['a', 'b', 'c'], + 'node', + contextData, + { mode: 'entries' }, + ); + + expect(processedData).toEqual({ new: ['a', 'b', 'c'], processed: [] }); + }); + + test('clearAllProcessedItems should not clear workflow processed items when clearing node scope', async () => { + const contextDataWorkflow: ICheckProcessedContextData = { + workflow, + }; + + const contextDataNode: ICheckProcessedContextData = { + workflow, + node, + }; + + // Add data for workflow scope + await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['a', 'b', 'c'], + 'workflow', + contextDataWorkflow, + { mode: 'entries' }, + ); + + // Add data for node scope + await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['d', 'e', 'f'], + 'node', + contextDataNode, + { mode: 'entries' }, + ); + + // Clear all processed items for node 
scope + await DataDeduplicationService.getInstance().clearAllProcessedItems('node', contextDataNode, { + mode: 'entries', + }); + + // Ensure workflow processed items are still intact + const processedDataWorkflow = + await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['a', 'b', 'c'], + 'workflow', + contextDataWorkflow, + { mode: 'entries' }, + ); + + // Workflow items should still be considered processed + expect(processedDataWorkflow).toEqual({ new: [], processed: ['a', 'b', 'c'] }); + + // Ensure node processed items have been cleared + const processedDataNode = await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['d', 'e', 'f'], + 'node', + contextDataNode, + { mode: 'entries' }, + ); + + // Node items should be considered new + expect(processedDataNode).toEqual({ new: ['d', 'e', 'f'], processed: [] }); + }); + + test('clearAllProcessedItems should not clear node processed items when clearing workflow scope', async () => { + const contextDataWorkflow: ICheckProcessedContextData = { + workflow, + }; + + const contextDataNode: ICheckProcessedContextData = { + workflow, + node, + }; + + // Add data for workflow scope + await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['a', 'b', 'c'], + 'workflow', + contextDataWorkflow, + { mode: 'entries' }, + ); + + // Add data for node scope + await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['d', 'e', 'f'], + 'node', + contextDataNode, + { mode: 'entries' }, + ); + + // Clear all processed items for workflow scope + await DataDeduplicationService.getInstance().clearAllProcessedItems( + 'workflow', + contextDataWorkflow, + { + mode: 'entries', + }, + ); + + // Ensure node processed items are still intact + const processedDataNode = await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['d', 'e', 'f'], + 'node', + contextDataNode, + { mode: 'entries' }, + ); + + // Node items should still be considered processed + 
expect(processedDataNode).toEqual({ new: [], processed: ['d', 'e', 'f'] }); + + // Ensure workflow processed items have been cleared + const processedDataWorkflow = + await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['a', 'b', 'c'], + 'workflow', + contextDataWorkflow, + { mode: 'entries' }, + ); + + // Workflow items should be considered new + expect(processedDataWorkflow).toEqual({ new: ['a', 'b', 'c'], processed: [] }); + }); + + test('getProcessedDataCount should return correct count for different modes', async () => { + const contextData: ICheckProcessedContextData = { + workflow, + node, + }; + + // Test for 'entries' mode + await DataDeduplicationService.getInstance().checkProcessedAndRecord( + ['a', 'b', 'c'], + 'node', + contextData, + { mode: 'entries' }, + ); + + const entriesCount = await DataDeduplicationService.getInstance().getProcessedDataCount( + 'node', + contextData, + { mode: 'entries' }, + ); + + expect(entriesCount).toBe(3); + + // Test for other modes (should return 0) + const latestCount = await DataDeduplicationService.getInstance().getProcessedDataCount( + 'node', + contextData, + { mode: 'latestDate' }, + ); + + expect(latestCount).toBe(0); + }); +}); diff --git a/packages/cli/test/integration/shared/test-db.ts b/packages/cli/test/integration/shared/test-db.ts index 0d9b1672e1..7faaa3f6eb 100644 --- a/packages/cli/test/integration/shared/test-db.ts +++ b/packages/cli/test/integration/shared/test-db.ts @@ -67,6 +67,7 @@ const repositories = [ 'Project', 'ProjectRelation', 'Role', + 'ProcessedData', 'Project', 'ProjectRelation', 'Settings', diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index b672c9c8e8..6cbef1e1b8 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -102,6 +102,13 @@ import type { EnsureTypeOptions, SSHTunnelFunctions, SchedulingFunctions, + DeduplicationHelperFunctions, + IDeduplicationOutput, + 
IDeduplicationOutputItems, + ICheckProcessedOptions, + DeduplicationScope, + DeduplicationItemTypes, + ICheckProcessedContextData, AiEvent, } from 'n8n-workflow'; import { @@ -149,6 +156,7 @@ import { UM_EMAIL_TEMPLATES_PWRESET, } from './Constants'; import { createNodeAsTool } from './CreateNodeAsTool'; +import { DataDeduplicationService } from './data-deduplication-service'; import { getAllWorkflowExecutionMetadata, getWorkflowExecutionMetadata, @@ -1284,6 +1292,72 @@ async function prepareBinaryData( return await setBinaryDataBuffer(returnData, binaryData, workflowId, executionId); } +export async function checkProcessedAndRecord( + items: DeduplicationItemTypes[], + scope: DeduplicationScope, + contextData: ICheckProcessedContextData, + options: ICheckProcessedOptions, +): Promise { + return await DataDeduplicationService.getInstance().checkProcessedAndRecord( + items, + scope, + contextData, + options, + ); +} + +export async function checkProcessedItemsAndRecord( + key: string, + items: IDataObject[], + scope: DeduplicationScope, + contextData: ICheckProcessedContextData, + options: ICheckProcessedOptions, +): Promise { + return await DataDeduplicationService.getInstance().checkProcessedItemsAndRecord( + key, + items, + scope, + contextData, + options, + ); +} + +export async function removeProcessed( + items: DeduplicationItemTypes[], + scope: DeduplicationScope, + contextData: ICheckProcessedContextData, + options: ICheckProcessedOptions, +): Promise { + return await DataDeduplicationService.getInstance().removeProcessed( + items, + scope, + contextData, + options, + ); +} + +export async function clearAllProcessedItems( + scope: DeduplicationScope, + contextData: ICheckProcessedContextData, + options: ICheckProcessedOptions, +): Promise { + return await DataDeduplicationService.getInstance().clearAllProcessedItems( + scope, + contextData, + options, + ); +} +export async function getProcessedDataCount( + scope: DeduplicationScope, + contextData: 
ICheckProcessedContextData, + options: ICheckProcessedOptions, +): Promise { + return await DataDeduplicationService.getInstance().getProcessedDataCount( + scope, + contextData, + options, + ); +} function applyPaginationRequestData( requestData: IRequestOptions, paginationRequestData: PaginationOptions['request'], @@ -3453,6 +3527,52 @@ const getBinaryHelperFunctions = ( }, }); +const getCheckProcessedHelperFunctions = ( + workflow: Workflow, + node: INode, +): DeduplicationHelperFunctions => ({ + async checkProcessedAndRecord( + items: DeduplicationItemTypes[], + scope: DeduplicationScope, + options: ICheckProcessedOptions, + ): Promise { + return await checkProcessedAndRecord(items, scope, { node, workflow }, options); + }, + async checkProcessedItemsAndRecord( + propertyName: string, + items: IDataObject[], + scope: DeduplicationScope, + options: ICheckProcessedOptions, + ): Promise { + return await checkProcessedItemsAndRecord( + propertyName, + items, + scope, + { node, workflow }, + options, + ); + }, + async removeProcessed( + items: DeduplicationItemTypes[], + scope: DeduplicationScope, + options: ICheckProcessedOptions, + ): Promise { + return await removeProcessed(items, scope, { node, workflow }, options); + }, + async clearAllProcessedItems( + scope: DeduplicationScope, + options: ICheckProcessedOptions, + ): Promise { + return await clearAllProcessedItems(scope, { node, workflow }, options); + }, + async getProcessedDataCount( + scope: DeduplicationScope, + options: ICheckProcessedOptions, + ): Promise { + return await getProcessedDataCount(scope, { node, workflow }, options); + }, +}); + /** * Returns a copy of the items which only contains the json data and * of that only the defined properties @@ -3896,6 +4016,7 @@ export function getExecuteFunctions( ...getSSHTunnelFunctions(), ...getFileSystemHelperFunctions(node), ...getBinaryHelperFunctions(additionalData, workflow.id), + ...getCheckProcessedHelperFunctions(workflow, node), assertBinaryData: 
(itemIndex, propertyName) => assertBinaryData(inputData, node, itemIndex, propertyName, 0), getBinaryDataBuffer: async (itemIndex, propertyName) => diff --git a/packages/core/src/data-deduplication-service.ts b/packages/core/src/data-deduplication-service.ts new file mode 100644 index 0000000000..4b7a51fcc2 --- /dev/null +++ b/packages/core/src/data-deduplication-service.ts @@ -0,0 +1,124 @@ +import get from 'lodash/get'; +import type { + IDataDeduplicator, + ICheckProcessedOptions, + IDeduplicationOutput, + IDeduplicationOutputItems, + IDataObject, + DeduplicationScope, + DeduplicationItemTypes, + ICheckProcessedContextData, +} from 'n8n-workflow'; +import * as assert from 'node:assert/strict'; + +/** + * A singleton service responsible for data deduplication. + * This service wraps around the IDataDeduplicator interface and provides methods to handle + * deduplication-related operations such as checking, recording, and clearing processed data. + */ +export class DataDeduplicationService { + private static instance: DataDeduplicationService; + + private deduplicator: IDataDeduplicator; + + private constructor(deduplicator: IDataDeduplicator) { + this.deduplicator = deduplicator; + } + + private assertDeduplicator() { + assert.ok( + this.deduplicator, + 'Manager needs to initialized before use. Make sure to call init()', + ); + } + + private static assertInstance() { + assert.ok( + DataDeduplicationService.instance, + 'Instance needs to initialized before use. Make sure to call init()', + ); + } + + private static assertSingleInstance() { + assert.ok( + !DataDeduplicationService.instance, + 'Instance already initialized. 
Multiple initializations are not allowed.', + ); + } + + static async init(deduplicator: IDataDeduplicator): Promise { + this.assertSingleInstance(); + DataDeduplicationService.instance = new DataDeduplicationService(deduplicator); + } + + static getInstance(): DataDeduplicationService { + this.assertInstance(); + return DataDeduplicationService.instance; + } + + async checkProcessedItemsAndRecord( + propertyName: string, + items: IDataObject[], + scope: DeduplicationScope, + contextData: ICheckProcessedContextData, + options: ICheckProcessedOptions, + ): Promise { + this.assertDeduplicator(); + let value; + const itemLookup = items.reduce((acc, cur, index) => { + value = JSON.stringify(get(cur, propertyName)); + acc[value ? value.toString() : ''] = index; + return acc; + }, {}); + + const checkedItems = await this.deduplicator.checkProcessedAndRecord( + Object.keys(itemLookup), + scope, + contextData, + options, + ); + + return { + new: checkedItems.new.map((key) => items[itemLookup[key] as number]), + processed: checkedItems.processed.map((key) => items[itemLookup[key] as number]), + }; + } + + async checkProcessedAndRecord( + items: DeduplicationItemTypes[], + scope: DeduplicationScope, + contextData: ICheckProcessedContextData, + options: ICheckProcessedOptions, + ): Promise { + this.assertDeduplicator(); + return await this.deduplicator.checkProcessedAndRecord(items, scope, contextData, options); + } + + async removeProcessed( + items: DeduplicationItemTypes[], + scope: DeduplicationScope, + contextData: ICheckProcessedContextData, + options: ICheckProcessedOptions, + ): Promise { + this.assertDeduplicator(); + return await this.deduplicator.removeProcessed(items, scope, contextData, options); + } + + async clearAllProcessedItems( + scope: DeduplicationScope, + contextData: ICheckProcessedContextData, + options: ICheckProcessedOptions, + ): Promise { + this.assertDeduplicator(); + return await this.deduplicator.clearAllProcessedItems(scope, contextData, 
options); + } + + async getProcessedDataCount( + scope: DeduplicationScope, + contextData: ICheckProcessedContextData, + options: ICheckProcessedOptions, + ): Promise { + this.assertDeduplicator(); + return await this.deduplicator.getProcessedDataCount(scope, contextData, options); + } +} diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index c6b8450a4f..ebe240b51e 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -14,6 +14,7 @@ export { InstanceSettings, InstanceType } from './InstanceSettings'; export * from './NodeExecuteFunctions'; export * from './WorkflowExecute'; export { NodeExecuteFunctions }; +export * from './data-deduplication-service'; export * from './errors'; export { ObjectStoreService } from './ObjectStore/ObjectStore.service.ee'; export { BinaryData } from './BinaryData/types'; diff --git a/packages/editor-ui/src/constants.ts b/packages/editor-ui/src/constants.ts index b05ba79199..521c481e9e 100644 --- a/packages/editor-ui/src/constants.ts +++ b/packages/editor-ui/src/constants.ts @@ -714,7 +714,11 @@ export const MFA_AUTHENTICATION_TOKEN_INPUT_MAX_LENGTH = 6; export const MFA_AUTHENTICATION_RECOVERY_CODE_INPUT_MAX_LENGTH = 36; -export const NODE_TYPES_EXCLUDED_FROM_OUTPUT_NAME_APPEND = [FILTER_NODE_TYPE, SWITCH_NODE_TYPE]; +export const NODE_TYPES_EXCLUDED_FROM_OUTPUT_NAME_APPEND = [ + FILTER_NODE_TYPE, + SWITCH_NODE_TYPE, + REMOVE_DUPLICATES_NODE_TYPE, +]; type ClearOutgoingConnectonsEvents = { [nodeName: string]: { diff --git a/packages/nodes-base/nodes/Transform/RemoveDuplicates/RemoveDuplicates.node.ts b/packages/nodes-base/nodes/Transform/RemoveDuplicates/RemoveDuplicates.node.ts index 9a814f04fc..2af7fd7f2e 100644 --- a/packages/nodes-base/nodes/Transform/RemoveDuplicates/RemoveDuplicates.node.ts +++ b/packages/nodes-base/nodes/Transform/RemoveDuplicates/RemoveDuplicates.node.ts @@ -1,234 +1,25 @@ -import get from 'lodash/get'; -import isEqual from 'lodash/isEqual'; -import lt from 'lodash/lt'; 
-import pick from 'lodash/pick'; -import { - NodeOperationError, - NodeConnectionType, - type IExecuteFunctions, - type INodeExecutionData, - type INodeType, - type INodeTypeDescription, -} from 'n8n-workflow'; -import { prepareFieldsArray } from '../utils/utils'; -import { validateInputData } from './utils'; -import { compareItems, flattenKeys } from '@utils/utilities'; +import type { INodeTypeBaseDescription, IVersionedNodeType } from 'n8n-workflow'; +import { VersionedNodeType } from 'n8n-workflow'; -export class RemoveDuplicates implements INodeType { - description: INodeTypeDescription = { - displayName: 'Remove Duplicates', - name: 'removeDuplicates', - icon: 'file:removeDuplicates.svg', - group: ['transform'], - subtitle: '', - version: [1, 1.1], - description: 'Delete items with matching field values', - defaults: { - name: 'Remove Duplicates', - }, - inputs: [NodeConnectionType.Main], - outputs: [NodeConnectionType.Main], - properties: [ - { - displayName: 'Compare', - name: 'compare', - type: 'options', - options: [ - { - name: 'All Fields', - value: 'allFields', - }, - { - name: 'All Fields Except', - value: 'allFieldsExcept', - }, - { - name: 'Selected Fields', - value: 'selectedFields', - }, - ], - default: 'allFields', - description: 'The fields of the input items to compare to see if they are the same', - }, - { - displayName: 'Fields To Exclude', - name: 'fieldsToExclude', - type: 'string', - placeholder: 'e.g. email, name', - requiresDataPath: 'multiple', - description: 'Fields in the input to exclude from the comparison', - default: '', - displayOptions: { - show: { - compare: ['allFieldsExcept'], - }, - }, - }, - { - displayName: 'Fields To Compare', - name: 'fieldsToCompare', - type: 'string', - placeholder: 'e.g. 
email, name', - requiresDataPath: 'multiple', - description: 'Fields in the input to add to the comparison', - default: '', - displayOptions: { - show: { - compare: ['selectedFields'], - }, - }, - }, - { - displayName: 'Options', - name: 'options', - type: 'collection', - placeholder: 'Add Field', - default: {}, - displayOptions: { - show: { - compare: ['allFieldsExcept', 'selectedFields'], - }, - }, - options: [ - { - displayName: 'Disable Dot Notation', - name: 'disableDotNotation', - type: 'boolean', - default: false, - description: - 'Whether to disallow referencing child fields using `parent.child` in the field name', - }, - { - displayName: 'Remove Other Fields', - name: 'removeOtherFields', - type: 'boolean', - default: false, - description: - 'Whether to remove any fields that are not being compared. If disabled, will keep the values from the first of the duplicates.', - }, - ], - }, - ], - }; +import { RemoveDuplicatesV1 } from './v1/RemoveDuplicatesV1.node'; +import { RemoveDuplicatesV2 } from './v2/RemoveDuplicatesV2.node'; +export class RemoveDuplicates extends VersionedNodeType { + constructor() { + const baseDescription: INodeTypeBaseDescription = { + displayName: 'Remove Duplicates', + name: 'removeDuplicates', + icon: 'file:removeDuplicates.svg', + group: ['transform'], + defaultVersion: 2, + description: 'Delete items with matching field values', + }; - async execute(this: IExecuteFunctions): Promise { - const items = this.getInputData(); - const compare = this.getNodeParameter('compare', 0) as string; - const disableDotNotation = this.getNodeParameter( - 'options.disableDotNotation', - 0, - false, - ) as boolean; - const removeOtherFields = this.getNodeParameter( - 'options.removeOtherFields', - 0, - false, - ) as boolean; + const nodeVersions: IVersionedNodeType['nodeVersions'] = { + 1: new RemoveDuplicatesV1(baseDescription), + 1.1: new RemoveDuplicatesV1(baseDescription), + 2: new RemoveDuplicatesV2(baseDescription), + }; - let keys = 
disableDotNotation - ? Object.keys(items[0].json) - : Object.keys(flattenKeys(items[0].json)); - - for (const item of items) { - for (const key of disableDotNotation - ? Object.keys(item.json) - : Object.keys(flattenKeys(item.json))) { - if (!keys.includes(key)) { - keys.push(key); - } - } - } - - if (compare === 'allFieldsExcept') { - const fieldsToExclude = prepareFieldsArray( - this.getNodeParameter('fieldsToExclude', 0, '') as string, - 'Fields To Exclude', - ); - - if (!fieldsToExclude.length) { - throw new NodeOperationError( - this.getNode(), - 'No fields specified. Please add a field to exclude from comparison', - ); - } - if (!disableDotNotation) { - keys = Object.keys(flattenKeys(items[0].json)); - } - keys = keys.filter((key) => !fieldsToExclude.includes(key)); - } - if (compare === 'selectedFields') { - const fieldsToCompare = prepareFieldsArray( - this.getNodeParameter('fieldsToCompare', 0, '') as string, - 'Fields To Compare', - ); - if (!fieldsToCompare.length) { - throw new NodeOperationError( - this.getNode(), - 'No fields specified. Please add a field to compare on', - ); - } - if (!disableDotNotation) { - keys = Object.keys(flattenKeys(items[0].json)); - } - keys = fieldsToCompare.map((key) => key.trim()); - } - - // This solution is O(nlogn) - // add original index to the items - const newItems = items.map( - (item, index) => - ({ - json: { ...item.json, __INDEX: index }, - pairedItem: { item: index }, - }) as INodeExecutionData, - ); - //sort items using the compare keys - newItems.sort((a, b) => { - let result = 0; - - for (const key of keys) { - let equal; - if (!disableDotNotation) { - equal = isEqual(get(a.json, key), get(b.json, key)); - } else { - equal = isEqual(a.json[key], b.json[key]); - } - if (!equal) { - let lessThan; - if (!disableDotNotation) { - lessThan = lt(get(a.json, key), get(b.json, key)); - } else { - lessThan = lt(a.json[key], b.json[key]); - } - result = lessThan ? 
-1 : 1; - break; - } - } - return result; - }); - - validateInputData(this.getNode(), newItems, keys, disableDotNotation); - - // collect the original indexes of items to be removed - const removedIndexes: number[] = []; - let temp = newItems[0]; - for (let index = 1; index < newItems.length; index++) { - if (compareItems(newItems[index], temp, keys, disableDotNotation)) { - removedIndexes.push(newItems[index].json.__INDEX as unknown as number); - } else { - temp = newItems[index]; - } - } - - let returnData = items.filter((_, index) => !removedIndexes.includes(index)); - - if (removeOtherFields) { - returnData = returnData.map((item, index) => ({ - json: pick(item.json, ...keys), - pairedItem: { item: index }, - })); - } - - return [returnData]; + super(nodeVersions, baseDescription); } } diff --git a/packages/nodes-base/nodes/Transform/RemoveDuplicates/test/RemoveDuplicates.test.ts b/packages/nodes-base/nodes/Transform/RemoveDuplicates/test/RemoveDuplicates.test.ts index 4bf5f6b5b5..e063c46926 100644 --- a/packages/nodes-base/nodes/Transform/RemoveDuplicates/test/RemoveDuplicates.test.ts +++ b/packages/nodes-base/nodes/Transform/RemoveDuplicates/test/RemoveDuplicates.test.ts @@ -1,7 +1,9 @@ import type { INode } from 'n8n-workflow'; -import { validateInputData } from '../utils'; + import { testWorkflows, getWorkflowFilenames } from '@test/nodes/Helpers'; +import { validateInputData } from '../utils'; + const workflows = getWorkflowFilenames(__dirname); describe('Test Remove Duplicates Node', () => testWorkflows(workflows)); diff --git a/packages/nodes-base/nodes/Transform/RemoveDuplicates/utils.ts b/packages/nodes-base/nodes/Transform/RemoveDuplicates/utils.ts index 532ff4ec3a..4fda255aa1 100644 --- a/packages/nodes-base/nodes/Transform/RemoveDuplicates/utils.ts +++ b/packages/nodes-base/nodes/Transform/RemoveDuplicates/utils.ts @@ -1,5 +1,11 @@ +import { isEqual, lt, pick } from 'lodash'; import get from 'lodash/get'; -import { NodeOperationError, type INode, 
type INodeExecutionData } from 'n8n-workflow'; +import { NodeOperationError } from 'n8n-workflow'; +import type { IExecuteFunctions, INode, INodeExecutionData } from 'n8n-workflow'; + +import { compareItems, flattenKeys } from '@utils/utilities'; + +import { prepareFieldsArray } from '../utils/utils'; export const validateInputData = ( node: INode, @@ -39,3 +45,124 @@ export const validateInputData = ( } } }; + +export function removeDuplicateInputItems(context: IExecuteFunctions, items: INodeExecutionData[]) { + const compare = context.getNodeParameter('compare', 0) as string; + const disableDotNotation = context.getNodeParameter( + 'options.disableDotNotation', + 0, + false, + ) as boolean; + const removeOtherFields = context.getNodeParameter( + 'options.removeOtherFields', + 0, + false, + ) as boolean; + + let keys = disableDotNotation + ? Object.keys(items[0].json) + : Object.keys(flattenKeys(items[0].json)); + + for (const item of items) { + const itemKeys = disableDotNotation + ? Object.keys(item.json) + : Object.keys(flattenKeys(item.json)); + for (const key of itemKeys) { + if (!keys.includes(key)) { + keys.push(key); + } + } + } + + if (compare === 'allFieldsExcept') { + const fieldsToExclude = prepareFieldsArray( + context.getNodeParameter('fieldsToExclude', 0, '') as string, + 'Fields To Exclude', + ); + + if (!fieldsToExclude.length) { + throw new NodeOperationError( + context.getNode(), + 'No fields specified. Please add a field to exclude from comparison', + ); + } + if (!disableDotNotation) { + keys = Object.keys(flattenKeys(items[0].json)); + } + keys = keys.filter((key) => !fieldsToExclude.includes(key)); + } + if (compare === 'selectedFields') { + const fieldsToCompare = prepareFieldsArray( + context.getNodeParameter('fieldsToCompare', 0, '') as string, + 'Fields To Compare', + ); + if (!fieldsToCompare.length) { + throw new NodeOperationError( + context.getNode(), + 'No fields specified. 
Please add a field to compare on', + ); + } + if (!disableDotNotation) { + keys = Object.keys(flattenKeys(items[0].json)); + } + keys = fieldsToCompare.map((key) => key.trim()); + } + + // This solution is O(nlogn) + // add original index to the items + const newItems = items.map( + (item, index) => + ({ + json: { ...item.json, __INDEX: index }, + pairedItem: { item: index }, + }) as INodeExecutionData, + ); + //sort items using the compare keys + newItems.sort((a, b) => { + let result = 0; + + for (const key of keys) { + let equal; + if (!disableDotNotation) { + equal = isEqual(get(a.json, key), get(b.json, key)); + } else { + equal = isEqual(a.json[key], b.json[key]); + } + if (!equal) { + let lessThan; + if (!disableDotNotation) { + lessThan = lt(get(a.json, key), get(b.json, key)); + } else { + lessThan = lt(a.json[key], b.json[key]); + } + result = lessThan ? -1 : 1; + break; + } + } + return result; + }); + + validateInputData(context.getNode(), newItems, keys, disableDotNotation); + + // collect the original indexes of items to be removed + const removedIndexes: number[] = []; + let temp = newItems[0]; + for (let index = 1; index < newItems.length; index++) { + if (compareItems(newItems[index], temp, keys, disableDotNotation)) { + removedIndexes.push(newItems[index].json.__INDEX as unknown as number); + } else { + temp = newItems[index]; + } + } + let updatedItems: INodeExecutionData[] = items.filter( + (_, index) => !removedIndexes.includes(index), + ); + + if (removeOtherFields) { + updatedItems = updatedItems.map((item, index) => ({ + json: pick(item.json, ...keys), + pairedItem: { item: index }, + })); + } + return [updatedItems]; +} diff --git a/packages/nodes-base/nodes/Transform/RemoveDuplicates/v1/RemoveDuplicatesV1.node.ts b/packages/nodes-base/nodes/Transform/RemoveDuplicates/v1/RemoveDuplicatesV1.node.ts new file mode 100644 index 0000000000..f4e0289107 --- /dev/null +++ 
b/packages/nodes-base/nodes/Transform/RemoveDuplicates/v1/RemoveDuplicatesV1.node.ts @@ -0,0 +1,122 @@ +/* eslint-disable n8n-nodes-base/node-filename-against-convention */ +import { NodeConnectionType } from 'n8n-workflow'; +import type { + INodeTypeBaseDescription, + IExecuteFunctions, + INodeExecutionData, + INodeType, + INodeTypeDescription, +} from 'n8n-workflow'; + +import { removeDuplicateInputItems } from '../utils'; + +const versionDescription: INodeTypeDescription = { + displayName: 'Remove Duplicates', + name: 'removeDuplicates', + icon: 'file:removeDuplicates.svg', + group: ['transform'], + subtitle: '', + version: [1, 1.1], + description: 'Delete items with matching field values', + defaults: { + name: 'Remove Duplicates', + }, + inputs: [NodeConnectionType.Main], + outputs: [NodeConnectionType.Main], + properties: [ + { + displayName: 'Compare', + name: 'compare', + type: 'options', + options: [ + { + name: 'All Fields', + value: 'allFields', + }, + { + name: 'All Fields Except', + value: 'allFieldsExcept', + }, + { + name: 'Selected Fields', + value: 'selectedFields', + }, + ], + default: 'allFields', + description: 'The fields of the input items to compare to see if they are the same', + }, + { + displayName: 'Fields To Exclude', + name: 'fieldsToExclude', + type: 'string', + placeholder: 'e.g. email, name', + requiresDataPath: 'multiple', + description: 'Fields in the input to exclude from the comparison', + default: '', + displayOptions: { + show: { + compare: ['allFieldsExcept'], + }, + }, + }, + { + displayName: 'Fields To Compare', + name: 'fieldsToCompare', + type: 'string', + placeholder: 'e.g. 
email, name', + requiresDataPath: 'multiple', + description: 'Fields in the input to add to the comparison', + default: '', + displayOptions: { + show: { + compare: ['selectedFields'], + }, + }, + }, + { + displayName: 'Options', + name: 'options', + type: 'collection', + placeholder: 'Add Field', + default: {}, + displayOptions: { + show: { + compare: ['allFieldsExcept', 'selectedFields'], + }, + }, + options: [ + { + displayName: 'Disable Dot Notation', + name: 'disableDotNotation', + type: 'boolean', + default: false, + description: + 'Whether to disallow referencing child fields using `parent.child` in the field name', + }, + { + displayName: 'Remove Other Fields', + name: 'removeOtherFields', + type: 'boolean', + default: false, + description: + 'Whether to remove any fields that are not being compared. If disabled, will keep the values from the first of the duplicates.', + }, + ], + }, + ], +}; +export class RemoveDuplicatesV1 implements INodeType { + description: INodeTypeDescription; + + constructor(baseDescription: INodeTypeBaseDescription) { + this.description = { + ...baseDescription, + ...versionDescription, + }; + } + + async execute(this: IExecuteFunctions): Promise { + const items = this.getInputData(); + return removeDuplicateInputItems(this, items); + } +} diff --git a/packages/nodes-base/nodes/Transform/RemoveDuplicates/v2/RemoveDuplicatesV2.description.ts b/packages/nodes-base/nodes/Transform/RemoveDuplicates/v2/RemoveDuplicatesV2.description.ts new file mode 100644 index 0000000000..88ab5a9bc4 --- /dev/null +++ b/packages/nodes-base/nodes/Transform/RemoveDuplicates/v2/RemoveDuplicatesV2.description.ts @@ -0,0 +1,278 @@ +import type { INodeProperties } from 'n8n-workflow'; +const operationOptions = [ + { + name: 'Remove Items Repeated Within Current Input', + value: 'removeDuplicateInputItems', + description: 'Remove duplicates from incoming items', + action: 'Remove items repeated within current input', + }, + { + name: 'Remove Items Processed 
in Previous Executions', + value: 'removeItemsSeenInPreviousExecutions', + description: 'Deduplicate items already seen in previous executions', + action: 'Remove items processed in previous executions', + }, + { + name: 'Clear Deduplication History', + value: 'clearDeduplicationHistory', + description: 'Wipe the store of previous items', + action: 'Clear deduplication history', + }, +]; +const compareOptions = [ + { + name: 'All Fields', + value: 'allFields', + }, + { + name: 'All Fields Except', + value: 'allFieldsExcept', + }, + { + name: 'Selected Fields', + value: 'selectedFields', + }, +]; +const logicOptions = [ + { + name: 'Value Is New', + value: 'removeItemsWithAlreadySeenKeyValues', + description: 'Remove all input items with values matching those already processed', + }, + { + name: 'Value Is Higher than Any Previous Value', + value: 'removeItemsUpToStoredIncrementalKey', + description: + 'Works with incremental values, removes all input items with values up to the stored value', + }, + { + name: 'Value Is a Date Later than Any Previous Date', + value: 'removeItemsUpToStoredDate', + description: + 'Works with date values, removes all input items with values up to the stored date', + }, +]; +const manageDatabaseModeOptions = [ + { + name: 'Clean Database', + value: 'cleanDatabase', + description: 'Clear all values stored for a key in the database', + }, +]; + +export const removeDuplicatesNodeFields: INodeProperties[] = [ + { + displayName: 'Operation', + name: 'operation', + type: 'options', + noDataExpression: true, + options: operationOptions, + default: 'removeDuplicateInputItems', + }, + { + displayName: 'Compare', + name: 'compare', + type: 'options', + options: compareOptions, + default: 'allFields', + description: 'The fields of the input items to compare to see if they are the same', + displayOptions: { + show: { + operation: ['removeDuplicateInputItems'], + }, + }, + }, + { + displayName: 'Fields To Exclude', + name: 'fieldsToExclude', + type: 
'string', + placeholder: 'e.g. email, name', + requiresDataPath: 'multiple', + description: 'Fields in the input to exclude from the comparison', + default: '', + displayOptions: { + show: { + compare: ['allFieldsExcept'], + }, + }, + }, + { + displayName: 'Fields To Compare', + name: 'fieldsToCompare', + type: 'string', + placeholder: 'e.g. email, name', + requiresDataPath: 'multiple', + description: 'Fields in the input to add to the comparison', + default: '', + displayOptions: { + show: { + compare: ['selectedFields'], + }, + }, + }, + + // ---------------------------------- + { + displayName: 'Keep Items Where', + name: 'logic', + type: 'options', + noDataExpression: true, + options: logicOptions, + default: 'removeItemsWithAlreadySeenKeyValues', + description: + 'How to select input items to remove by comparing them with key values previously processed', + displayOptions: { + show: { + operation: ['removeItemsSeenInPreviousExecutions'], + }, + }, + }, + { + displayName: 'Value to Dedupe On', + name: 'dedupeValue', + type: 'string', + default: '', + description: 'Use an input field (or a combination of fields) that has a unique ID value', + hint: 'The input field value to compare between items', + placeholder: 'e.g. ID', + required: true, + displayOptions: { + show: { + logic: ['removeItemsWithAlreadySeenKeyValues'], + '/operation': ['removeItemsSeenInPreviousExecutions'], + }, + }, + }, + { + displayName: 'Value to Dedupe On', + name: 'incrementalDedupeValue', + type: 'number', + default: '', + description: 'Use an input field (or a combination of fields) that has an incremental value', + hint: 'The input field value to compare between items, an incremental value is expected', + placeholder: 'e.g. 
ID', + displayOptions: { + show: { + logic: ['removeItemsUpToStoredIncrementalKey'], + '/operation': ['removeItemsSeenInPreviousExecutions'], + }, + }, + }, + { + displayName: 'Value to Dedupe On', + name: 'dateDedupeValue', + type: 'dateTime', + default: '', + description: 'Use an input field that has a date value in ISO format', + hint: 'The input field value to compare between items, a date is expected', + placeholder: ' e.g. 2024-08-09T13:44:16Z', + displayOptions: { + show: { + logic: ['removeItemsUpToStoredDate'], + '/operation': ['removeItemsSeenInPreviousExecutions'], + }, + }, + }, + { + displayName: 'Mode', + name: 'mode', + type: 'options', + default: 'cleanDatabase', + description: + 'How you want to modify the key values stored on the database. None of these modes removes input items.', + displayOptions: { + show: { + operation: ['clearDeduplicationHistory'], + }, + }, + options: manageDatabaseModeOptions, + }, + { + displayName: 'Options', + name: 'options', + type: 'collection', + placeholder: 'Add Field', + default: {}, + displayOptions: { + show: { + operation: [ + 'removeDuplicateInputItems', + 'removeItemsSeenInPreviousExecutions', + 'clearDeduplicationHistory', + ], + }, + }, + options: [ + { + displayName: 'Disable Dot Notation', + name: 'disableDotNotation', + type: 'boolean', + default: false, + displayOptions: { + show: { + '/operation': ['removeDuplicateInputItems'], + }, + hide: { + '/compare': ['allFields'], + }, + }, + description: + 'Whether to disallow referencing child fields using `parent.child` in the field name', + }, + { + displayName: 'Remove Other Fields', + name: 'removeOtherFields', + type: 'boolean', + default: false, + displayOptions: { + show: { + '/operation': ['removeDuplicateInputItems'], + }, + hide: { + '/compare': ['allFields'], + }, + }, + description: + 'Whether to remove any fields that are not being compared. 
If disabled, will keep the values from the first of the duplicates.', + }, + { + displayName: 'Scope', + name: 'scope', + type: 'options', + default: 'node', + displayOptions: { + show: { + '/operation': ['clearDeduplicationHistory', 'removeItemsSeenInPreviousExecutions'], + }, + }, + description: + 'If set to ‘workflow,’ key values will be shared across all nodes in the workflow. If set to ‘node,’ key values will be specific to this node.', + options: [ + { + name: 'Workflow', + value: 'workflow', + description: 'Deduplication info will be shared by all the nodes in the workflow', + }, + { + name: 'Node', + value: 'node', + description: 'Deduplication info will be stored only for this node', + }, + ], + }, + { + displayName: 'History Size', + name: 'historySize', + type: 'number', + default: 10000, + hint: 'The max number of past items to store for deduplication', + displayOptions: { + show: { + '/logic': ['removeItemsWithAlreadySeenKeyValues'], + '/operation': ['removeItemsSeenInPreviousExecutions'], + }, + }, + }, + ], + }, +]; diff --git a/packages/nodes-base/nodes/Transform/RemoveDuplicates/v2/RemoveDuplicatesV2.node.ts b/packages/nodes-base/nodes/Transform/RemoveDuplicates/v2/RemoveDuplicatesV2.node.ts new file mode 100644 index 0000000000..a4fa34d997 --- /dev/null +++ b/packages/nodes-base/nodes/Transform/RemoveDuplicates/v2/RemoveDuplicatesV2.node.ts @@ -0,0 +1,277 @@ +import { + NodeConnectionType, + NodeExecutionOutput, + NodeOperationError, + tryToParseDateTime, +} from 'n8n-workflow'; +import type { + INodeTypeBaseDescription, + IExecuteFunctions, + INodeExecutionData, + INodeType, + INodeTypeDescription, + DeduplicationScope, +} from 'n8n-workflow'; + +import { removeDuplicatesNodeFields } from './RemoveDuplicatesV2.description'; +import { removeDuplicateInputItems } from '../utils'; + +const versionDescription: INodeTypeDescription = { + displayName: 'Remove Duplicates', + name: 'removeDuplicates', + icon: 'file:removeDuplicates.svg', + group: 
['transform'], + subtitle: '', + version: [2], + description: 'Delete items with matching field values', + defaults: { + name: 'Remove Duplicates', + }, + inputs: [NodeConnectionType.Main], + outputs: [NodeConnectionType.Main], + outputNames: ['Kept', 'Discarded'], + hints: [ + { + message: 'The dedupe key set in “Value to Dedupe On” has no value', + displayCondition: + '={{ $parameter["operation"] === "removeItemsSeenInPreviousExecutions" && ($parameter["logic"] === "removeItemsWithAlreadySeenKeyValues" && $parameter["dedupeValue"] === undefined) || ($parameter["logic"] === "removeItemsUpToStoredIncrementalKey" && $parameter["incrementalDedupeValue"] === undefined) || ($parameter["logic"] === "removeItemsUpToStoredDate" && $parameter["dateDedupeValue"] === undefined) }}', + whenToDisplay: 'beforeExecution', + location: 'outputPane', + }, + ], + properties: [...removeDuplicatesNodeFields], +}; +export class RemoveDuplicatesV2 implements INodeType { + description: INodeTypeDescription; + + constructor(baseDescription: INodeTypeBaseDescription) { + this.description = { + ...baseDescription, + ...versionDescription, + }; + } + + async execute(this: IExecuteFunctions): Promise { + const items = this.getInputData(); + const operation = this.getNodeParameter('operation', 0); + const returnData: INodeExecutionData[][] = []; + const DEFAULT_MAX_ENTRIES = 10000; + try { + switch (operation) { + case 'removeDuplicateInputItems': { + return removeDuplicateInputItems(this, items); + } + case 'removeItemsSeenInPreviousExecutions': { + const logic = this.getNodeParameter('logic', 0); + const scope = this.getNodeParameter('options.scope', 0, 'node') as DeduplicationScope; + + if (logic === 'removeItemsWithAlreadySeenKeyValues') { + if (!['node', 'workflow'].includes(scope)) { + throw new NodeOperationError( + this.getNode(), + `The scope '${scope}' is not supported. 
Please select either "node" or "workflow".`, + ); + } + + let checkValue: string; + const itemMapping: { + [key: string]: INodeExecutionData[]; + } = {}; + for (let itemIndex = 0; itemIndex < items.length; itemIndex++) { + checkValue = this.getNodeParameter('dedupeValue', itemIndex, '')?.toString() ?? ''; + if (itemMapping[checkValue]) { + itemMapping[checkValue].push(items[itemIndex]); + } else { + itemMapping[checkValue] = [items[itemIndex]]; + } + } + + const maxEntries = this.getNodeParameter( + 'options.historySize', + 0, + DEFAULT_MAX_ENTRIES, + ) as number; + const maxEntriesNum = Number(maxEntries); + + const currentProcessedDataCount = await this.helpers.getProcessedDataCount(scope, { + mode: 'entries', + maxEntries, + }); + if (currentProcessedDataCount + items.length > maxEntriesNum) { + throw new NodeOperationError( + this.getNode(), + 'The number of items to be processed exceeds the maximum history size. Please increase the history size or reduce the number of items to be processed.', + ); + } + const itemsProcessed = await this.helpers.checkProcessedAndRecord( + Object.keys(itemMapping), + scope, + { mode: 'entries', maxEntries }, + ); + const processedDataCount = await this.helpers.getProcessedDataCount(scope, { + mode: 'entries', + maxEntries, + }); + returnData.push( + itemsProcessed.new + .map((key) => { + return itemMapping[key]; + }) + .flat(), + itemsProcessed.processed + .map((key) => { + return itemMapping[key]; + }) + .flat(), + ); + + if (maxEntriesNum > 0 && processedDataCount / maxEntriesNum > 0.5) { + return new NodeExecutionOutput(returnData, [ + { + message: `Some duplicates may not be removed since you're approaching the maximum history size (${maxEntriesNum} items). 
You can raise this limit using the ‘history size’ option.`, + location: 'outputPane', + }, + ]); + } else return returnData; + } else if (logic === 'removeItemsUpToStoredIncrementalKey') { + if (!['node', 'workflow'].includes(scope)) { + throw new NodeOperationError( + this.getNode(), + `The scope '${scope}' is not supported. Please select either "node" or "workflow".`, + ); + } + + let parsedIncrementalKey: number; + const itemMapping: { + [key: string]: INodeExecutionData[]; + } = {}; + + for (let itemIndex = 0; itemIndex < items.length; itemIndex++) { + const incrementalKey = this.getNodeParameter('incrementalDedupeValue', itemIndex, ''); + if (!incrementalKey?.toString()) { + throw new NodeOperationError( + this.getNode(), + 'The `Value to Dedupe On` is empty. Please provide a value.', + ); + } + parsedIncrementalKey = Number(incrementalKey); + if (isNaN(parsedIncrementalKey)) { + throw new NodeOperationError( + this.getNode(), + `The value '${incrementalKey}' is not a number. Please provide a number.`, + ); + } + if (itemMapping[parsedIncrementalKey]) { + itemMapping[parsedIncrementalKey].push(items[itemIndex]); + } else { + itemMapping[parsedIncrementalKey] = [items[itemIndex]]; + } + } + + const itemsProcessed = await this.helpers.checkProcessedAndRecord( + Object.keys(itemMapping), + scope, + { mode: 'latestIncrementalKey' }, + ); + + returnData.push( + itemsProcessed.new + .map((key) => { + return itemMapping[key]; + }) + .flat(), + itemsProcessed.processed + .map((key) => { + return itemMapping[key]; + }) + .flat(), + ); + + return returnData; + } else if (logic === 'removeItemsUpToStoredDate') { + if (!['node', 'workflow'].includes(scope)) { + throw new NodeOperationError( + this.getNode(), + `The scope '${scope}' is not supported. 
Please select either "node" or "workflow".`, + ); + } + + let checkValue: string; + const itemMapping: { + [key: string]: INodeExecutionData[]; + } = {}; + + for (let itemIndex = 0; itemIndex < items.length; itemIndex++) { + checkValue = + this.getNodeParameter('dateDedupeValue', itemIndex, '')?.toString() ?? ''; + if (!checkValue) { + throw new NodeOperationError( + this.getNode(), + 'The `Value to Dedupe On` is empty. Please provide a value.', + ); + } + try { + tryToParseDateTime(checkValue); + } catch (error) { + throw new NodeOperationError( + this.getNode(), + `The value '${checkValue}' is not a valid date. Please provide a valid date.`, + ); + } + if (itemMapping[checkValue]) { + itemMapping[checkValue].push(items[itemIndex]); + } else { + itemMapping[checkValue] = [items[itemIndex]]; + } + } + const itemsProcessed = await this.helpers.checkProcessedAndRecord( + Object.keys(itemMapping), + scope, + { mode: 'latestDate' }, + ); + + returnData.push( + itemsProcessed.new + .map((key) => { + return itemMapping[key]; + }) + .flat(), + itemsProcessed.processed + .map((key) => { + return itemMapping[key]; + }) + .flat(), + ); + + return returnData; + } else { + return [items]; + } + } + case 'clearDeduplicationHistory': { + const mode = this.getNodeParameter('mode', 0) as string; + if (mode === 'updateKeyValuesInDatabase') { + } else if (mode === 'deleteKeyValuesFromDatabase') { + } else if (mode === 'cleanDatabase') { + const scope = this.getNodeParameter('options.scope', 0, 'node') as DeduplicationScope; + await this.helpers.clearAllProcessedItems(scope, { + mode: 'entries', + }); + } + + return [items]; + } + default: { + return [items]; + } + } + } catch (error) { + if (this.continueOnFail()) { + returnData.push([{ json: this.getInputData(0)[0].json, error }]); + } else { + throw error; + } + } + return returnData; + } +} diff --git a/packages/nodes-base/nodes/Transform/RemoveDuplicates/v2/test/RemoveDuplicates.test.ts 
b/packages/nodes-base/nodes/Transform/RemoveDuplicates/v2/test/RemoveDuplicates.test.ts new file mode 100644 index 0000000000..84994b34e6 --- /dev/null +++ b/packages/nodes-base/nodes/Transform/RemoveDuplicates/v2/test/RemoveDuplicates.test.ts @@ -0,0 +1,131 @@ +/* eslint-disable n8n-nodes-base/node-filename-against-convention */ +import { mock } from 'jest-mock-extended'; +import type { IExecuteFunctions, INodeExecutionData, INodeTypeBaseDescription } from 'n8n-workflow'; + +import { RemoveDuplicatesV2 } from '../RemoveDuplicatesV2.node'; + +describe('RemoveDuplicatesV2', () => { + let node: RemoveDuplicatesV2; + let executeFunctions: IExecuteFunctions; + + beforeEach(() => { + const baseDescription: INodeTypeBaseDescription = { + displayName: 'Remove Duplicates', + name: 'removeDuplicates', + icon: 'file:removeDuplicates.svg', + group: ['transform'], + description: 'Delete items with matching field values', + }; + node = new RemoveDuplicatesV2(baseDescription); + executeFunctions = mock(); + executeFunctions.helpers = { + checkProcessedAndRecord: jest.fn(), + clearAllProcessedItems: jest.fn(), + } as any; + executeFunctions.getInputData = jest.fn(); + executeFunctions.getNodeParameter = jest.fn(); + }); + + it('should Remove items repeated within current input based on all fields', async () => { + const items: INodeExecutionData[] = [ + { json: { id: 1, name: 'John' } }, + { json: { id: 2, name: 'Jane' } }, + { json: { id: 1, name: 'John' } }, + ]; + + (executeFunctions.getInputData as jest.Mock).mockReturnValue(items); + (executeFunctions.getNodeParameter as jest.Mock).mockImplementation( + (paramName: string) => { + if (paramName === 'operation') return 'removeDuplicateInputItems'; + if (paramName === 'compare') return 'allFields'; + return undefined; + }, + ); + + const result = await node.execute.call(executeFunctions); + expect(result).toHaveLength(1); + expect(result[0]).toHaveLength(2); + expect(result[0][0].json).toEqual({ id: 1, name: 'John' }); + 
expect(result[0][1].json).toEqual({ id: 2, name: 'Jane' }); + }); + + it('should Remove items repeated within current input based on selected fields', async () => { + const items: INodeExecutionData[] = [ + { json: { id: 1, name: 'John' } }, + { json: { id: 2, name: 'Jane' } }, + { json: { id: 1, name: 'Doe' } }, + ]; + + (executeFunctions.getInputData as jest.Mock).mockReturnValue(items); + (executeFunctions.getNodeParameter as jest.Mock).mockImplementation( + (paramName: string) => { + if (paramName === 'operation') return 'removeDuplicateInputItems'; + if (paramName === 'compare') return 'selectedFields'; + if (paramName === 'fieldsToCompare') return 'id'; + return undefined; + }, + ); + + const result = await node.execute.call(executeFunctions); + expect(result).toHaveLength(1); + expect(result[0]).toHaveLength(2); + expect(result[0][0].json).toEqual({ id: 1, name: 'John' }); + expect(result[0][1].json).toEqual({ id: 2, name: 'Jane' }); + }); + + it('should remove items seen in previous executions', async () => { + const items: INodeExecutionData[] = [ + { json: { id: 1, name: 'John' } }, + { json: { id: 2, name: 'Jane' } }, + { json: { id: 3, name: 'Doe' } }, + ]; + + (executeFunctions.getInputData as jest.Mock).mockReturnValue(items); + (executeFunctions.getNodeParameter as jest.Mock).mockImplementation( + (paramName: string, itemIndex: number) => { + if (paramName === 'operation') return 'removeItemsSeenInPreviousExecutions'; + if (paramName === 'logic') return 'removeItemsWithAlreadySeenKeyValues'; + if (paramName === 'dedupeValue' && itemIndex === 0) return 1; + if (paramName === 'dedupeValue' && itemIndex === 1) return 2; + if (paramName === 'dedupeValue' && itemIndex === 2) return 3; + if (paramName === 'options.scope') return 'node'; + if (paramName === 'options.historySize') return 10; + }, + ); + executeFunctions.helpers.getProcessedDataCount = jest.fn().mockReturnValue(3); + (executeFunctions.helpers.checkProcessedAndRecord as 
jest.Mock).mockReturnValue({ + new: [1, 3], + processed: [2], + }); + + const result = await node.execute.call(executeFunctions); + expect(result).toHaveLength(2); + expect(result[0]).toHaveLength(2); + expect(result[1]).toHaveLength(1); + expect(result[0][0].json).toEqual({ id: 1, name: 'John' }); + expect(result[0][1].json).toEqual({ id: 3, name: 'Doe' }); + }); + + it('should clean database when managing key values', async () => { + const items: INodeExecutionData[] = [ + { json: { id: 1, name: 'John' } }, + { json: { id: 2, name: 'Jane' } }, + ]; + + (executeFunctions.getInputData as jest.Mock).mockReturnValue(items); + (executeFunctions.getNodeParameter as jest.Mock).mockImplementation( + (paramName: string) => { + if (paramName === 'operation') return 'clearDeduplicationHistory'; + if (paramName === 'mode') return 'cleanDatabase'; + if (paramName === 'options.scope') return 'node'; + return undefined; + }, + ); + + const result = await node.execute.call(executeFunctions); + expect(result).toHaveLength(1); + expect(result[0]).toHaveLength(2); + expect(result[0][0].json).toEqual({ id: 1, name: 'John' }); + expect(result[0][1].json).toEqual({ id: 2, name: 'Jane' }); + }); +}); diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index d1f95ab51f..20710c79b0 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -766,6 +766,48 @@ export interface BinaryHelperFunctions { }>; } +export type DeduplicationScope = 'node' | 'workflow'; +export type DeduplicationItemTypes = string | number; +export type DeduplicationMode = 'entries' | 'latestIncrementalKey' | 'latestDate'; + +export interface IDeduplicationOutput { + new: DeduplicationItemTypes[]; + processed: DeduplicationItemTypes[]; +} + +export interface IDeduplicationOutputItems { + new: IDataObject[]; + processed: IDataObject[]; +} + +export interface ICheckProcessedOptions { + mode: DeduplicationMode; + maxEntries?: number; +} + +export 
interface DeduplicationHelperFunctions { + checkProcessedAndRecord( + items: DeduplicationItemTypes[], + scope: DeduplicationScope, + options: ICheckProcessedOptions, + ): Promise<IDeduplicationOutput>; + checkProcessedItemsAndRecord( + propertyName: string, + items: IDataObject[], + scope: DeduplicationScope, + options: ICheckProcessedOptions, + ): Promise<IDeduplicationOutputItems>; + removeProcessed( + items: DeduplicationItemTypes[], + scope: DeduplicationScope, + options: ICheckProcessedOptions, + ): Promise<void>; + clearAllProcessedItems(scope: DeduplicationScope, options: ICheckProcessedOptions): Promise<void>; + getProcessedDataCount( + scope: DeduplicationScope, + options: ICheckProcessedOptions, + ): Promise<number>; +} export interface NodeHelperFunctions { copyBinaryFile(filePath: string, fileName: string, mimeType?: string): Promise<IBinaryData>; } @@ -939,6 +981,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn & helpers: RequestHelperFunctions & BaseHelperFunctions & BinaryHelperFunctions & + DeduplicationHelperFunctions & FileSystemHelperFunctions & SSHTunnelFunctions & JsonHelperFunctions & { @@ -2645,6 +2688,46 @@ export interface IUserSettings { npsSurvey?: NpsSurveyState; } + +export interface IProcessedDataConfig { + availableModes: string; + mode: string; +} + +export interface IDataDeduplicator { + checkProcessedAndRecord( + items: DeduplicationItemTypes[], + context: DeduplicationScope, + contextData: ICheckProcessedContextData, + options: ICheckProcessedOptions, + ): Promise<IDeduplicationOutput>; + + removeProcessed( + items: DeduplicationItemTypes[], + context: DeduplicationScope, + contextData: ICheckProcessedContextData, + options: ICheckProcessedOptions, + ): Promise<void>; + + clearAllProcessedItems( + context: DeduplicationScope, + contextData: ICheckProcessedContextData, + options: ICheckProcessedOptions, + ): Promise<void>; + getProcessedDataCount( + context: DeduplicationScope, + contextData: ICheckProcessedContextData, + options: ICheckProcessedOptions, + ): Promise<number>; +} + +export interface ICheckProcessedContextData { 
node?: INode; + workflow: { + id: string; + active: boolean; + }; +} + export type ExpressionEvaluatorType = 'tmpl' | 'tournament'; export type N8nAIProviderType = 'openai' | 'unknown';