mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
wip: workflow execution filtering
This commit is contained in:
parent
4dc458eca5
commit
80cf38e203
|
@ -169,6 +169,8 @@ export async function init(
|
||||||
collections.InstalledPackages = linkRepository(entities.InstalledPackages);
|
collections.InstalledPackages = linkRepository(entities.InstalledPackages);
|
||||||
collections.InstalledNodes = linkRepository(entities.InstalledNodes);
|
collections.InstalledNodes = linkRepository(entities.InstalledNodes);
|
||||||
collections.WorkflowStatistics = linkRepository(entities.WorkflowStatistics);
|
collections.WorkflowStatistics = linkRepository(entities.WorkflowStatistics);
|
||||||
|
collections.ExecutionMetadata = linkRepository(entities.ExecutionMetadata);
|
||||||
|
|
||||||
collections.EventDestinations = linkRepository(entities.EventDestinations);
|
collections.EventDestinations = linkRepository(entities.EventDestinations);
|
||||||
|
|
||||||
isInitialized = true;
|
isInitialized = true;
|
||||||
|
|
|
@ -45,6 +45,7 @@ import type { WebhookEntity } from '@db/entities/WebhookEntity';
|
||||||
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
|
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
|
||||||
import type { WorkflowStatistics } from '@db/entities/WorkflowStatistics';
|
import type { WorkflowStatistics } from '@db/entities/WorkflowStatistics';
|
||||||
import type { EventDestinations } from '@db/entities/MessageEventBusDestinationEntity';
|
import type { EventDestinations } from '@db/entities/MessageEventBusDestinationEntity';
|
||||||
|
import type { ExecutionMetadata } from './databases/entities/ExecutionMetadata';
|
||||||
|
|
||||||
export interface IActivationError {
|
export interface IActivationError {
|
||||||
time: number;
|
time: number;
|
||||||
|
@ -85,6 +86,7 @@ export interface IDatabaseCollections {
|
||||||
InstalledNodes: Repository<InstalledNodes>;
|
InstalledNodes: Repository<InstalledNodes>;
|
||||||
WorkflowStatistics: Repository<WorkflowStatistics>;
|
WorkflowStatistics: Repository<WorkflowStatistics>;
|
||||||
EventDestinations: Repository<EventDestinations>;
|
EventDestinations: Repository<EventDestinations>;
|
||||||
|
ExecutionMetadata: Repository<ExecutionMetadata>;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ----------------------------------
|
// ----------------------------------
|
||||||
|
|
|
@ -619,6 +619,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
|
||||||
stoppedAt: fullExecutionData.stoppedAt,
|
stoppedAt: fullExecutionData.stoppedAt,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
console.log('got metadata2:', fullRunData.data.resultData.metadata);
|
||||||
const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);
|
const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);
|
||||||
|
|
||||||
// Save the Execution in DB
|
// Save the Execution in DB
|
||||||
|
@ -627,6 +628,22 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
|
||||||
executionData as IExecutionFlattedDb,
|
executionData as IExecutionFlattedDb,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (fullRunData.data.resultData.metadata) {
|
||||||
|
for (const [key, value] of Object.entries(fullRunData.data.resultData.metadata)) {
|
||||||
|
console.log(this.executionId, parseInt(this.executionId));
|
||||||
|
await Db.collections.ExecutionMetadata.save({
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
|
execution: this.executionId as any,
|
||||||
|
key,
|
||||||
|
value,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
console.error(e);
|
||||||
|
}
|
||||||
|
|
||||||
if (fullRunData.finished === true && this.retryOf !== undefined) {
|
if (fullRunData.finished === true && this.retryOf !== undefined) {
|
||||||
// If the retry was successful save the reference it on the original execution
|
// If the retry was successful save the reference it on the original execution
|
||||||
// await Db.collections.Execution.save(executionData as IExecutionFlattedDb);
|
// await Db.collections.Execution.save(executionData as IExecutionFlattedDb);
|
||||||
|
@ -741,6 +758,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
|
||||||
fullExecutionData.workflowId = workflowId;
|
fullExecutionData.workflowId = workflowId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
console.log('got metadata:', fullRunData.data.resultData.metadata);
|
||||||
const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);
|
const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);
|
||||||
|
|
||||||
// Save the Execution in DB
|
// Save the Execution in DB
|
||||||
|
@ -1012,6 +1030,7 @@ async function executeWorkflow(
|
||||||
fullExecutionData.workflowId = workflowData.id;
|
fullExecutionData.workflowId = workflowData.id;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
console.log('got metadata1:', fullRunData.data.resultData.metadata);
|
||||||
const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);
|
const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);
|
||||||
|
|
||||||
await Db.collections.Execution.update(executionId, executionData as IExecutionFlattedDb);
|
await Db.collections.Execution.update(executionId, executionData as IExecutionFlattedDb);
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
import { WorkflowExecuteMode } from 'n8n-workflow';
|
import { WorkflowExecuteMode } from 'n8n-workflow';
|
||||||
import { Column, Entity, Generated, Index, PrimaryColumn } from 'typeorm';
|
import { Column, Entity, Generated, Index, OneToMany, PrimaryColumn } from 'typeorm';
|
||||||
import { datetimeColumnType, jsonColumnType } from './AbstractEntity';
|
import { datetimeColumnType, jsonColumnType } from './AbstractEntity';
|
||||||
import { IWorkflowDb } from '@/Interfaces';
|
import { IWorkflowDb } from '@/Interfaces';
|
||||||
import type { IExecutionFlattedDb } from '@/Interfaces';
|
import type { IExecutionFlattedDb } from '@/Interfaces';
|
||||||
import { idStringifier } from '../utils/transformers';
|
import { idStringifier } from '../utils/transformers';
|
||||||
|
import type { ExecutionMetadata } from './ExecutionMetadata';
|
||||||
|
|
||||||
@Entity()
|
@Entity()
|
||||||
@Index(['workflowId', 'id'])
|
@Index(['workflowId', 'id'])
|
||||||
|
@ -46,4 +47,7 @@ export class ExecutionEntity implements IExecutionFlattedDb {
|
||||||
|
|
||||||
@Column({ type: datetimeColumnType, nullable: true })
|
@Column({ type: datetimeColumnType, nullable: true })
|
||||||
waitTill: Date;
|
waitTill: Date;
|
||||||
|
|
||||||
|
@OneToMany('ExecutionMetadata', 'execution')
|
||||||
|
metadata: ExecutionMetadata[];
|
||||||
}
|
}
|
||||||
|
|
22
packages/cli/src/databases/entities/ExecutionMetadata.ts
Normal file
22
packages/cli/src/databases/entities/ExecutionMetadata.ts
Normal file
|
@ -0,0 +1,22 @@
|
||||||
|
import { Column, Entity, ManyToOne, PrimaryGeneratedColumn, RelationId } from 'typeorm';
|
||||||
|
import type { ExecutionEntity } from './ExecutionEntity';
|
||||||
|
|
||||||
|
@Entity()
|
||||||
|
export class ExecutionMetadata {
|
||||||
|
@PrimaryGeneratedColumn()
|
||||||
|
id: number;
|
||||||
|
|
||||||
|
@ManyToOne('ExecutionEntity', 'metadata', {
|
||||||
|
onDelete: 'CASCADE',
|
||||||
|
})
|
||||||
|
execution: ExecutionEntity;
|
||||||
|
|
||||||
|
@RelationId((executionMetadata: ExecutionMetadata) => executionMetadata.execution)
|
||||||
|
executionId: number;
|
||||||
|
|
||||||
|
@Column('text')
|
||||||
|
key: string;
|
||||||
|
|
||||||
|
@Column('text')
|
||||||
|
value: string;
|
||||||
|
}
|
|
@ -15,6 +15,7 @@ import { User } from './User';
|
||||||
import { WebhookEntity } from './WebhookEntity';
|
import { WebhookEntity } from './WebhookEntity';
|
||||||
import { WorkflowEntity } from './WorkflowEntity';
|
import { WorkflowEntity } from './WorkflowEntity';
|
||||||
import { WorkflowStatistics } from './WorkflowStatistics';
|
import { WorkflowStatistics } from './WorkflowStatistics';
|
||||||
|
import { ExecutionMetadata } from './ExecutionMetadata';
|
||||||
|
|
||||||
export const entities = {
|
export const entities = {
|
||||||
AuthIdentity,
|
AuthIdentity,
|
||||||
|
@ -33,4 +34,5 @@ export const entities = {
|
||||||
WebhookEntity,
|
WebhookEntity,
|
||||||
WorkflowEntity,
|
WorkflowEntity,
|
||||||
WorkflowStatistics,
|
WorkflowStatistics,
|
||||||
|
ExecutionMetadata,
|
||||||
};
|
};
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
import { MigrationInterface, QueryRunner, Table } from 'typeorm';
|
||||||
|
import { logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';
|
||||||
|
import config from '@/config';
|
||||||
|
|
||||||
|
export class CreateExecutionMetadataTable1674133106777 implements MigrationInterface {
|
||||||
|
name = 'CreateExecutionMetadataTable1674133106777';
|
||||||
|
|
||||||
|
public async up(queryRunner: QueryRunner): Promise<void> {
|
||||||
|
logMigrationStart(this.name);
|
||||||
|
const tablePrefix = config.getEnv('database.tablePrefix');
|
||||||
|
|
||||||
|
await queryRunner.query(
|
||||||
|
`CREATE TABLE "${tablePrefix}execution_metadata" (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
executionId INTEGER NOT NULL,
|
||||||
|
"key" TEXT,
|
||||||
|
value TEXT,
|
||||||
|
CONSTRAINT ${tablePrefix}execution_metadata_entity_FK FOREIGN KEY (executionId) REFERENCES ${tablePrefix}execution_entity(id) ON DELETE CASCADE
|
||||||
|
)`,
|
||||||
|
);
|
||||||
|
|
||||||
|
logMigrationEnd(this.name);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async down(queryRunner: QueryRunner): Promise<void> {
|
||||||
|
const tablePrefix = config.getEnv('database.tablePrefix');
|
||||||
|
|
||||||
|
await queryRunner.query(`DROP TABLE "${tablePrefix}execution_metadata"`);
|
||||||
|
}
|
||||||
|
}
|
|
@ -28,6 +28,7 @@ import { MessageEventBusDestinations1671535397530 } from './1671535397530-Messag
|
||||||
import { DeleteExecutionsWithWorkflows1673268682475 } from './1673268682475-DeleteExecutionsWithWorkflows';
|
import { DeleteExecutionsWithWorkflows1673268682475 } from './1673268682475-DeleteExecutionsWithWorkflows';
|
||||||
import { CreateLdapEntities1674509946020 } from './1674509946020-CreateLdapEntities';
|
import { CreateLdapEntities1674509946020 } from './1674509946020-CreateLdapEntities';
|
||||||
import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-PurgeInvalidWorkflowConnections';
|
import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-PurgeInvalidWorkflowConnections';
|
||||||
|
import { CreateExecutionMetadataTable1674133106777 } from './1674133106777-CreateExecutionMetadataTable';
|
||||||
|
|
||||||
const sqliteMigrations = [
|
const sqliteMigrations = [
|
||||||
InitialMigration1588102412422,
|
InitialMigration1588102412422,
|
||||||
|
@ -60,6 +61,7 @@ const sqliteMigrations = [
|
||||||
DeleteExecutionsWithWorkflows1673268682475,
|
DeleteExecutionsWithWorkflows1673268682475,
|
||||||
CreateLdapEntities1674509946020,
|
CreateLdapEntities1674509946020,
|
||||||
PurgeInvalidWorkflowConnections1675940580449,
|
PurgeInvalidWorkflowConnections1675940580449,
|
||||||
|
CreateExecutionMetadataTable1674133106777,
|
||||||
];
|
];
|
||||||
|
|
||||||
export { sqliteMigrations };
|
export { sqliteMigrations };
|
||||||
|
|
|
@ -5,8 +5,8 @@ import { validate as jsonSchemaValidate } from 'jsonschema';
|
||||||
import { BinaryDataManager } from 'n8n-core';
|
import { BinaryDataManager } from 'n8n-core';
|
||||||
import type { IDataObject, IWorkflowBase, JsonObject } from 'n8n-workflow';
|
import type { IDataObject, IWorkflowBase, JsonObject } from 'n8n-workflow';
|
||||||
import { deepCopy, LoggerProxy, jsonParse, Workflow } from 'n8n-workflow';
|
import { deepCopy, LoggerProxy, jsonParse, Workflow } from 'n8n-workflow';
|
||||||
import type { FindOperator, FindOptionsWhere } from 'typeorm';
|
import type { FindOptionsWhere, FindOperator } from 'typeorm';
|
||||||
import { In, IsNull, LessThanOrEqual, Not, Raw } from 'typeorm';
|
import { In, IsNull, LessThanOrEqual, MoreThanOrEqual, Not, Raw } from 'typeorm';
|
||||||
import * as ActiveExecutions from '@/ActiveExecutions';
|
import * as ActiveExecutions from '@/ActiveExecutions';
|
||||||
import config from '@/config';
|
import config from '@/config';
|
||||||
import type { User } from '@db/entities/User';
|
import type { User } from '@db/entities/User';
|
||||||
|
@ -25,9 +25,11 @@ import { getSharedWorkflowIds } from '@/WorkflowHelpers';
|
||||||
import { WorkflowRunner } from '@/WorkflowRunner';
|
import { WorkflowRunner } from '@/WorkflowRunner';
|
||||||
import * as Db from '@/Db';
|
import * as Db from '@/Db';
|
||||||
import * as GenericHelpers from '@/GenericHelpers';
|
import * as GenericHelpers from '@/GenericHelpers';
|
||||||
|
import { ExecutionMetadata } from '@/databases/entities/ExecutionMetadata';
|
||||||
|
import { DateUtils } from 'typeorm/util/DateUtils';
|
||||||
|
|
||||||
interface IGetExecutionsQueryFilter {
|
interface IGetExecutionsQueryFilter {
|
||||||
id?: FindOperator<string>;
|
id?: FindOperator<string> | string;
|
||||||
finished?: boolean;
|
finished?: boolean;
|
||||||
mode?: string;
|
mode?: string;
|
||||||
retryOf?: string;
|
retryOf?: string;
|
||||||
|
@ -35,18 +37,35 @@ interface IGetExecutionsQueryFilter {
|
||||||
workflowId?: string;
|
workflowId?: string;
|
||||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
waitTill?: FindOperator<any> | boolean;
|
waitTill?: FindOperator<any> | boolean;
|
||||||
|
metadata?: Array<{ key: string; value: string }>;
|
||||||
|
startedAfter?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
const schemaGetExecutionsQueryFilter = {
|
const schemaGetExecutionsQueryFilter = {
|
||||||
$id: '/IGetExecutionsQueryFilter',
|
$id: '/IGetExecutionsQueryFilter',
|
||||||
type: 'object',
|
type: 'object',
|
||||||
properties: {
|
properties: {
|
||||||
|
id: { type: 'string' },
|
||||||
finished: { type: 'boolean' },
|
finished: { type: 'boolean' },
|
||||||
mode: { type: 'string' },
|
mode: { type: 'string' },
|
||||||
retryOf: { type: 'string' },
|
retryOf: { type: 'string' },
|
||||||
retrySuccessId: { type: 'string' },
|
retrySuccessId: { type: 'string' },
|
||||||
waitTill: { type: 'boolean' },
|
waitTill: { type: 'boolean' },
|
||||||
workflowId: { anyOf: [{ type: 'integer' }, { type: 'string' }] },
|
workflowId: { anyOf: [{ type: 'integer' }, { type: 'string' }] },
|
||||||
|
metadata: { type: 'array', items: { $ref: '#/$defs/metadata' } },
|
||||||
|
startedAfter: { type: 'date-time' },
|
||||||
|
},
|
||||||
|
$defs: {
|
||||||
|
metadata: {
|
||||||
|
type: 'object',
|
||||||
|
required: ['key', 'value'],
|
||||||
|
properties: {
|
||||||
|
key: {
|
||||||
|
type: 'string',
|
||||||
|
},
|
||||||
|
value: { type: 'string' },
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -68,17 +87,40 @@ export class ExecutionsService {
|
||||||
static async getExecutionsCount(
|
static async getExecutionsCount(
|
||||||
countFilter: IDataObject,
|
countFilter: IDataObject,
|
||||||
user: User,
|
user: User,
|
||||||
|
metadata?: Array<{ key: string; value: string }>,
|
||||||
): Promise<{ count: number; estimated: boolean }> {
|
): Promise<{ count: number; estimated: boolean }> {
|
||||||
const dbType = config.getEnv('database.type');
|
const dbType = config.getEnv('database.type');
|
||||||
const filteredFields = Object.keys(countFilter).filter((field) => field !== 'id');
|
const filteredFields = Object.keys(countFilter).filter((field) => field !== 'id');
|
||||||
|
|
||||||
// For databases other than Postgres, do a regular count
|
// For databases other than Postgres, do a regular count
|
||||||
// when filtering based on `workflowId` or `finished` fields.
|
// when filtering based on `workflowId` or `finished` fields.
|
||||||
if (dbType !== 'postgresdb' || filteredFields.length > 0 || user.globalRole.name !== 'owner') {
|
if (
|
||||||
|
dbType !== 'postgresdb' ||
|
||||||
|
metadata?.length ||
|
||||||
|
filteredFields.length > 0 ||
|
||||||
|
user.globalRole.name !== 'owner'
|
||||||
|
) {
|
||||||
const sharedWorkflowIds = await this.getWorkflowIdsForUser(user);
|
const sharedWorkflowIds = await this.getWorkflowIdsForUser(user);
|
||||||
|
|
||||||
const countParams = { where: { workflowId: In(sharedWorkflowIds), ...countFilter } };
|
let query = Db.collections.Execution.createQueryBuilder('execution')
|
||||||
const count = await Db.collections.Execution.count(countParams);
|
.select()
|
||||||
|
.orderBy('execution.id', 'DESC')
|
||||||
|
.where({ workflowId: In(sharedWorkflowIds) });
|
||||||
|
|
||||||
|
if (metadata?.length) {
|
||||||
|
query = query.leftJoinAndSelect(ExecutionMetadata, 'md', 'md.executionId = execution.id');
|
||||||
|
for (const md of metadata) {
|
||||||
|
query = query.andWhere('md.key = :key AND md.value = :value', md);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (filteredFields.length > 0) {
|
||||||
|
query = query.andWhere(countFilter);
|
||||||
|
}
|
||||||
|
|
||||||
|
// const countParams = { where: { workflowId: In(sharedWorkflowIds), ...countFilter } };
|
||||||
|
// const count = await Db.collections.Execution.count(countParams);
|
||||||
|
const count = await query.getCount();
|
||||||
return { count, estimated: false };
|
return { count, estimated: false };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -122,6 +164,14 @@ export class ExecutionsService {
|
||||||
} else {
|
} else {
|
||||||
delete filter.waitTill;
|
delete filter.waitTill;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (Array.isArray(filter.metadata)) {
|
||||||
|
delete filter.metadata;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ('startedAfter' in filter) {
|
||||||
|
delete filter.startedAfter;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -236,11 +286,27 @@ export class ExecutionsService {
|
||||||
'execution.stoppedAt',
|
'execution.stoppedAt',
|
||||||
'execution.workflowData',
|
'execution.workflowData',
|
||||||
])
|
])
|
||||||
.orderBy('id', 'DESC')
|
.orderBy('execution.id', 'DESC')
|
||||||
.take(limit)
|
.take(limit)
|
||||||
.where(findWhere);
|
.where(findWhere);
|
||||||
|
|
||||||
const countFilter = deepCopy(filter ?? {});
|
const countFilter = deepCopy(filter ?? {});
|
||||||
|
const metadata = filter?.metadata;
|
||||||
|
|
||||||
|
if (metadata?.length) {
|
||||||
|
query = query.leftJoin(ExecutionMetadata, 'md', 'md.executionId = execution.id');
|
||||||
|
for (const md of metadata) {
|
||||||
|
query = query.andWhere('md.key = :key AND md.value = :value', md);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (filter?.startedAfter) {
|
||||||
|
query = query.andWhere({
|
||||||
|
startedAt: MoreThanOrEqual(
|
||||||
|
DateUtils.mixedDateToUtcDatetimeString(new Date(filter.startedAfter)),
|
||||||
|
),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
if (filter) {
|
if (filter) {
|
||||||
this.massageFilters(filter as IDataObject);
|
this.massageFilters(filter as IDataObject);
|
||||||
|
@ -255,6 +321,7 @@ export class ExecutionsService {
|
||||||
const { count, estimated } = await this.getExecutionsCount(
|
const { count, estimated } = await this.getExecutionsCount(
|
||||||
countFilter as IDataObject,
|
countFilter as IDataObject,
|
||||||
req.user,
|
req.user,
|
||||||
|
metadata,
|
||||||
);
|
);
|
||||||
|
|
||||||
const formattedExecutions = executions.map((execution) => {
|
const formattedExecutions = executions.map((execution) => {
|
||||||
|
|
|
@ -108,6 +108,12 @@ import type { IResponseError, IWorkflowSettings } from './Interfaces';
|
||||||
import { extractValue } from './ExtractValue';
|
import { extractValue } from './ExtractValue';
|
||||||
import { getClientCredentialsToken } from './OAuth2Helper';
|
import { getClientCredentialsToken } from './OAuth2Helper';
|
||||||
import { PLACEHOLDER_EMPTY_EXECUTION_ID } from './Constants';
|
import { PLACEHOLDER_EMPTY_EXECUTION_ID } from './Constants';
|
||||||
|
import {
|
||||||
|
getAllWorkflowExecutionMetadata,
|
||||||
|
getWorkflowExecutionMetadata,
|
||||||
|
setAllWorkflowExecutionMetadata,
|
||||||
|
setWorkflowExecutionMetadata,
|
||||||
|
} from './WorkflowExecutionMetadata';
|
||||||
|
|
||||||
axios.defaults.timeout = 300000;
|
axios.defaults.timeout = 300000;
|
||||||
// Prevent axios from adding x-form-www-urlencoded headers by default
|
// Prevent axios from adding x-form-www-urlencoded headers by default
|
||||||
|
@ -1589,6 +1595,7 @@ export async function requestWithAuthentication(
|
||||||
export function getAdditionalKeys(
|
export function getAdditionalKeys(
|
||||||
additionalData: IWorkflowExecuteAdditionalData,
|
additionalData: IWorkflowExecuteAdditionalData,
|
||||||
mode: WorkflowExecuteMode,
|
mode: WorkflowExecuteMode,
|
||||||
|
runExecutionData: IRunExecutionData | null,
|
||||||
): IWorkflowDataProxyAdditionalKeys {
|
): IWorkflowDataProxyAdditionalKeys {
|
||||||
const executionId = additionalData.executionId || PLACEHOLDER_EMPTY_EXECUTION_ID;
|
const executionId = additionalData.executionId || PLACEHOLDER_EMPTY_EXECUTION_ID;
|
||||||
const resumeUrl = `${additionalData.webhookWaitingBaseUrl}/${executionId}`;
|
const resumeUrl = `${additionalData.webhookWaitingBaseUrl}/${executionId}`;
|
||||||
|
@ -1597,6 +1604,22 @@ export function getAdditionalKeys(
|
||||||
id: executionId,
|
id: executionId,
|
||||||
mode: mode === 'manual' ? 'test' : 'production',
|
mode: mode === 'manual' ? 'test' : 'production',
|
||||||
resumeUrl,
|
resumeUrl,
|
||||||
|
customData: runExecutionData
|
||||||
|
? {
|
||||||
|
set(key: string, value: string): void {
|
||||||
|
setWorkflowExecutionMetadata(runExecutionData, key, value);
|
||||||
|
},
|
||||||
|
setAll(obj: Record<string, string>): void {
|
||||||
|
setAllWorkflowExecutionMetadata(runExecutionData, obj);
|
||||||
|
},
|
||||||
|
get(key: string): string {
|
||||||
|
return getWorkflowExecutionMetadata(runExecutionData, key);
|
||||||
|
},
|
||||||
|
getAll(): Record<string, string> {
|
||||||
|
return getAllWorkflowExecutionMetadata(runExecutionData);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
: undefined,
|
||||||
},
|
},
|
||||||
|
|
||||||
// deprecated
|
// deprecated
|
||||||
|
@ -2085,7 +2108,7 @@ export function getExecutePollFunctions(
|
||||||
itemIndex,
|
itemIndex,
|
||||||
mode,
|
mode,
|
||||||
additionalData.timezone,
|
additionalData.timezone,
|
||||||
getAdditionalKeys(additionalData, mode),
|
getAdditionalKeys(additionalData, mode, runExecutionData),
|
||||||
undefined,
|
undefined,
|
||||||
fallbackValue,
|
fallbackValue,
|
||||||
options,
|
options,
|
||||||
|
@ -2143,7 +2166,7 @@ export function getExecuteTriggerFunctions(
|
||||||
itemIndex,
|
itemIndex,
|
||||||
mode,
|
mode,
|
||||||
additionalData.timezone,
|
additionalData.timezone,
|
||||||
getAdditionalKeys(additionalData, mode),
|
getAdditionalKeys(additionalData, mode, runExecutionData),
|
||||||
undefined,
|
undefined,
|
||||||
fallbackValue,
|
fallbackValue,
|
||||||
options,
|
options,
|
||||||
|
@ -2201,7 +2224,7 @@ export function getExecuteFunctions(
|
||||||
connectionInputData,
|
connectionInputData,
|
||||||
mode,
|
mode,
|
||||||
additionalData.timezone,
|
additionalData.timezone,
|
||||||
getAdditionalKeys(additionalData, mode),
|
getAdditionalKeys(additionalData, mode, runExecutionData),
|
||||||
executeData,
|
executeData,
|
||||||
);
|
);
|
||||||
},
|
},
|
||||||
|
@ -2267,7 +2290,7 @@ export function getExecuteFunctions(
|
||||||
itemIndex,
|
itemIndex,
|
||||||
mode,
|
mode,
|
||||||
additionalData.timezone,
|
additionalData.timezone,
|
||||||
getAdditionalKeys(additionalData, mode),
|
getAdditionalKeys(additionalData, mode, runExecutionData),
|
||||||
executeData,
|
executeData,
|
||||||
fallbackValue,
|
fallbackValue,
|
||||||
options,
|
options,
|
||||||
|
@ -2284,7 +2307,7 @@ export function getExecuteFunctions(
|
||||||
{},
|
{},
|
||||||
mode,
|
mode,
|
||||||
additionalData.timezone,
|
additionalData.timezone,
|
||||||
getAdditionalKeys(additionalData, mode),
|
getAdditionalKeys(additionalData, mode, runExecutionData),
|
||||||
executeData,
|
executeData,
|
||||||
);
|
);
|
||||||
return dataProxy.getDataProxy();
|
return dataProxy.getDataProxy();
|
||||||
|
@ -2365,7 +2388,7 @@ export function getExecuteSingleFunctions(
|
||||||
connectionInputData,
|
connectionInputData,
|
||||||
mode,
|
mode,
|
||||||
additionalData.timezone,
|
additionalData.timezone,
|
||||||
getAdditionalKeys(additionalData, mode),
|
getAdditionalKeys(additionalData, mode, runExecutionData),
|
||||||
executeData,
|
executeData,
|
||||||
);
|
);
|
||||||
},
|
},
|
||||||
|
@ -2436,7 +2459,7 @@ export function getExecuteSingleFunctions(
|
||||||
itemIndex,
|
itemIndex,
|
||||||
mode,
|
mode,
|
||||||
additionalData.timezone,
|
additionalData.timezone,
|
||||||
getAdditionalKeys(additionalData, mode),
|
getAdditionalKeys(additionalData, mode, runExecutionData),
|
||||||
executeData,
|
executeData,
|
||||||
fallbackValue,
|
fallbackValue,
|
||||||
options,
|
options,
|
||||||
|
@ -2453,7 +2476,7 @@ export function getExecuteSingleFunctions(
|
||||||
{},
|
{},
|
||||||
mode,
|
mode,
|
||||||
additionalData.timezone,
|
additionalData.timezone,
|
||||||
getAdditionalKeys(additionalData, mode),
|
getAdditionalKeys(additionalData, mode, runExecutionData),
|
||||||
executeData,
|
executeData,
|
||||||
);
|
);
|
||||||
return dataProxy.getDataProxy();
|
return dataProxy.getDataProxy();
|
||||||
|
@ -2543,7 +2566,7 @@ export function getLoadOptionsFunctions(
|
||||||
itemIndex,
|
itemIndex,
|
||||||
mode,
|
mode,
|
||||||
additionalData.timezone,
|
additionalData.timezone,
|
||||||
getAdditionalKeys(additionalData, mode),
|
getAdditionalKeys(additionalData, mode, runExecutionData),
|
||||||
undefined,
|
undefined,
|
||||||
fallbackValue,
|
fallbackValue,
|
||||||
options,
|
options,
|
||||||
|
@ -2592,7 +2615,7 @@ export function getExecuteHookFunctions(
|
||||||
itemIndex,
|
itemIndex,
|
||||||
mode,
|
mode,
|
||||||
additionalData.timezone,
|
additionalData.timezone,
|
||||||
getAdditionalKeys(additionalData, mode),
|
getAdditionalKeys(additionalData, mode, runExecutionData),
|
||||||
undefined,
|
undefined,
|
||||||
fallbackValue,
|
fallbackValue,
|
||||||
options,
|
options,
|
||||||
|
@ -2606,7 +2629,7 @@ export function getExecuteHookFunctions(
|
||||||
additionalData,
|
additionalData,
|
||||||
mode,
|
mode,
|
||||||
additionalData.timezone,
|
additionalData.timezone,
|
||||||
getAdditionalKeys(additionalData, mode),
|
getAdditionalKeys(additionalData, mode, null),
|
||||||
isTest,
|
isTest,
|
||||||
);
|
);
|
||||||
},
|
},
|
||||||
|
@ -2669,7 +2692,7 @@ export function getExecuteWebhookFunctions(
|
||||||
itemIndex,
|
itemIndex,
|
||||||
mode,
|
mode,
|
||||||
additionalData.timezone,
|
additionalData.timezone,
|
||||||
getAdditionalKeys(additionalData, mode),
|
getAdditionalKeys(additionalData, mode, null),
|
||||||
undefined,
|
undefined,
|
||||||
fallbackValue,
|
fallbackValue,
|
||||||
options,
|
options,
|
||||||
|
@ -2707,7 +2730,7 @@ export function getExecuteWebhookFunctions(
|
||||||
additionalData,
|
additionalData,
|
||||||
mode,
|
mode,
|
||||||
additionalData.timezone,
|
additionalData.timezone,
|
||||||
getAdditionalKeys(additionalData, mode),
|
getAdditionalKeys(additionalData, mode, null),
|
||||||
),
|
),
|
||||||
getWebhookName: () => webhookData.webhookDescription.name,
|
getWebhookName: () => webhookData.webhookDescription.name,
|
||||||
prepareOutputData: NodeHelpers.prepareOutputData,
|
prepareOutputData: NodeHelpers.prepareOutputData,
|
||||||
|
|
34
packages/core/src/WorkflowExecutionMetadata.ts
Normal file
34
packages/core/src/WorkflowExecutionMetadata.ts
Normal file
|
@ -0,0 +1,34 @@
|
||||||
|
import type { IRunExecutionData } from 'n8n-workflow';
|
||||||
|
|
||||||
|
export function setWorkflowExecutionMetadata(
|
||||||
|
executionData: IRunExecutionData,
|
||||||
|
key: string,
|
||||||
|
value: unknown,
|
||||||
|
) {
|
||||||
|
if (!executionData.resultData.metadata) {
|
||||||
|
executionData.resultData.metadata = {};
|
||||||
|
}
|
||||||
|
executionData.resultData.metadata[String(key)] = String(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function setAllWorkflowExecutionMetadata(
|
||||||
|
executionData: IRunExecutionData,
|
||||||
|
obj: Record<string, string>,
|
||||||
|
) {
|
||||||
|
Object.entries(obj).forEach(([key, value]) =>
|
||||||
|
setWorkflowExecutionMetadata(executionData, key, value),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getAllWorkflowExecutionMetadata(
|
||||||
|
executionData: IRunExecutionData,
|
||||||
|
): Record<string, string> {
|
||||||
|
return executionData.resultData.metadata ?? {};
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getWorkflowExecutionMetadata(
|
||||||
|
executionData: IRunExecutionData,
|
||||||
|
key: string,
|
||||||
|
): string {
|
||||||
|
return getAllWorkflowExecutionMetadata(executionData)[String(key)];
|
||||||
|
}
|
|
@ -1548,6 +1548,7 @@ export interface IRunExecutionData {
|
||||||
runData: IRunData;
|
runData: IRunData;
|
||||||
pinData?: IPinData;
|
pinData?: IPinData;
|
||||||
lastNodeExecuted?: string;
|
lastNodeExecuted?: string;
|
||||||
|
metadata?: Record<string, string>;
|
||||||
};
|
};
|
||||||
executionData?: {
|
executionData?: {
|
||||||
contextData: IExecuteContextData;
|
contextData: IExecuteContextData;
|
||||||
|
|
Loading…
Reference in a new issue