fix(core): Remove circular dependency in WorkflowService and ActiveWorkflowRunner (#8128)

## Summary
A circular dependency between `WorkflowService` and
`ActiveWorkflowRunner` is sometimes causing `this.activeWorkflowRunner`
to be `undefined` in `WorkflowService`.
Breaking this circular dependency should hopefully fix this issue.

## Related tickets and issues
#8122


## Review / Merge checklist
- [x] PR title and summary are descriptive
- [ ] Tests included
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2023-12-21 17:37:08 +01:00 committed by GitHub
parent e9c7fd7397
commit 21788d9153
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 98 additions and 86 deletions

View file

@ -1,8 +1,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-call */ /* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { Service } from 'typedi';
import { Container, Service } from 'typedi';
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core'; import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
import type { import type {
@ -59,7 +58,6 @@ import { NodeTypes } from '@/NodeTypes';
import { WorkflowRunner } from '@/WorkflowRunner'; import { WorkflowRunner } from '@/WorkflowRunner';
import { ExternalHooks } from '@/ExternalHooks'; import { ExternalHooks } from '@/ExternalHooks';
import { whereClause } from './UserManagement/UserManagementHelper'; import { whereClause } from './UserManagement/UserManagementHelper';
import { WorkflowService } from './workflows/workflow.service';
import { WebhookNotFoundError } from './errors/response-errors/webhook-not-found.error'; import { WebhookNotFoundError } from './errors/response-errors/webhook-not-found.error';
import { In } from 'typeorm'; import { In } from 'typeorm';
import { WebhookService } from './services/webhook.service'; import { WebhookService } from './services/webhook.service';
@ -70,6 +68,7 @@ import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'
import { ActivationErrorsService } from '@/ActivationErrors.service'; import { ActivationErrorsService } from '@/ActivationErrors.service';
import type { Scope } from '@n8n/permissions'; import type { Scope } from '@n8n/permissions';
import { NotFoundError } from './errors/response-errors/not-found.error'; import { NotFoundError } from './errors/response-errors/not-found.error';
import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service';
@Service() @Service()
export class ActiveWorkflowRunner implements IWebhookManager { export class ActiveWorkflowRunner implements IWebhookManager {
@ -95,6 +94,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
private readonly multiMainSetup: MultiMainSetup, private readonly multiMainSetup: MultiMainSetup,
private readonly activationErrorsService: ActivationErrorsService, private readonly activationErrorsService: ActivationErrorsService,
private readonly executionService: ExecutionsService, private readonly executionService: ExecutionsService,
private readonly workflowStaticDataService: WorkflowStaticDataService,
) {} ) {}
async init() { async init() {
@ -214,10 +214,12 @@ export class ActiveWorkflowRunner implements IWebhookManager {
undefined, undefined,
request, request,
response, response,
(error: Error | null, data: object) => { async (error: Error | null, data: object) => {
if (error !== null) { if (error !== null) {
return reject(error); return reject(error);
} }
// Save static data if it changed
await this.workflowStaticDataService.saveStaticData(workflow);
resolve(data); resolve(data);
}, },
); );
@ -413,7 +415,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
} }
await this.webhookService.populateCache(); await this.webhookService.populateCache();
await Container.get(WorkflowService).saveStaticData(workflow); await this.workflowStaticDataService.saveStaticData(workflow);
} }
/** /**
@ -452,7 +454,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
await workflow.deleteWebhook(webhookData, NodeExecuteFunctions, mode, 'update'); await workflow.deleteWebhook(webhookData, NodeExecuteFunctions, mode, 'update');
} }
await Container.get(WorkflowService).saveStaticData(workflow); await this.workflowStaticDataService.saveStaticData(workflow);
await this.webhookService.deleteWorkflowWebhooks(workflowId); await this.webhookService.deleteWorkflowWebhooks(workflowId);
} }
@ -525,7 +527,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
donePromise?: IDeferredPromise<IRun | undefined>, donePromise?: IDeferredPromise<IRun | undefined>,
): void => { ): void => {
this.logger.debug(`Received event to trigger execution for workflow "${workflow.name}"`); this.logger.debug(`Received event to trigger execution for workflow "${workflow.name}"`);
void Container.get(WorkflowService).saveStaticData(workflow); void this.workflowStaticDataService.saveStaticData(workflow);
const executePromise = this.runWorkflow( const executePromise = this.runWorkflow(
workflowData, workflowData,
node, node,
@ -582,7 +584,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
donePromise?: IDeferredPromise<IRun | undefined>, donePromise?: IDeferredPromise<IRun | undefined>,
): void => { ): void => {
this.logger.debug(`Received trigger for workflow "${workflow.name}"`); this.logger.debug(`Received trigger for workflow "${workflow.name}"`);
void Container.get(WorkflowService).saveStaticData(workflow); void this.workflowStaticDataService.saveStaticData(workflow);
const executePromise = this.runWorkflow( const executePromise = this.runWorkflow(
workflowData, workflowData,
@ -817,7 +819,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
await this.activationErrorsService.unset(workflowId); await this.activationErrorsService.unset(workflowId);
const triggerCount = this.countTriggers(workflow, additionalData); const triggerCount = this.countTriggers(workflow, additionalData);
await Container.get(WorkflowService).updateWorkflowTriggerCount(workflow.id, triggerCount); await this.workflowRepository.updateWorkflowTriggerCount(workflow.id, triggerCount);
} catch (e) { } catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`); const error = e instanceof Error ? e : new Error(`${e}`);
await this.activationErrorsService.set(workflowId, error.message); await this.activationErrorsService.set(workflowId, error.message);
@ -827,7 +829,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
// If for example webhooks get created it sometimes has to save the // If for example webhooks get created it sometimes has to save the
// id of them in the static data. So make sure that data gets persisted. // id of them in the static data. So make sure that data gets persisted.
await Container.get(WorkflowService).saveStaticData(workflow); await this.workflowStaticDataService.saveStaticData(workflow);
} }
/** /**

View file

@ -60,7 +60,6 @@ import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { EventsService } from '@/services/events.service'; import { EventsService } from '@/services/events.service';
import { OwnershipService } from './services/ownership.service'; import { OwnershipService } from './services/ownership.service';
import { parseBody } from './middlewares'; import { parseBody } from './middlewares';
import { WorkflowService } from './workflows/workflow.service';
import { Logger } from './Logger'; import { Logger } from './Logger';
import { NotFoundError } from './errors/response-errors/not-found.error'; import { NotFoundError } from './errors/response-errors/not-found.error';
import { InternalServerError } from './errors/response-errors/internal-server.error'; import { InternalServerError } from './errors/response-errors/internal-server.error';
@ -386,9 +385,6 @@ export async function executeWebhook(
}; };
} }
// Save static data if it changed
await Container.get(WorkflowService).saveStaticData(workflow);
const additionalKeys: IWorkflowDataProxyAdditionalKeys = { const additionalKeys: IWorkflowDataProxyAdditionalKeys = {
$executionId: executionId, $executionId: executionId,
}; };

View file

@ -52,7 +52,6 @@ import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowHelpers from '@/WorkflowHelpers'; import * as WorkflowHelpers from '@/WorkflowHelpers';
import { findSubworkflowStart, isWorkflowIdValid } from '@/utils'; import { findSubworkflowStart, isWorkflowIdValid } from '@/utils';
import { PermissionChecker } from './UserManagement/PermissionChecker'; import { PermissionChecker } from './UserManagement/PermissionChecker';
import { WorkflowService } from './workflows/workflow.service';
import { InternalHooks } from '@/InternalHooks'; import { InternalHooks } from '@/InternalHooks';
import { ExecutionRepository } from '@db/repositories/execution.repository'; import { ExecutionRepository } from '@db/repositories/execution.repository';
import { EventsService } from '@/services/events.service'; import { EventsService } from '@/services/events.service';
@ -67,6 +66,8 @@ import { restoreBinaryDataId } from './executionLifecycleHooks/restoreBinaryData
import { toSaveSettings } from './executionLifecycleHooks/toSaveSettings'; import { toSaveSettings } from './executionLifecycleHooks/toSaveSettings';
import { Logger } from './Logger'; import { Logger } from './Logger';
import { saveExecutionProgress } from './executionLifecycleHooks/saveExecutionProgress'; import { saveExecutionProgress } from './executionLifecycleHooks/saveExecutionProgress';
import { WorkflowStaticDataService } from './workflows/workflowStaticData.service';
import { WorkflowRepository } from './databases/repositories/workflow.repository';
const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType'); const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType');
@ -418,7 +419,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) { if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) {
// Workflow is saved so update in database // Workflow is saved so update in database
try { try {
await Container.get(WorkflowService).saveStaticDataById( await Container.get(WorkflowStaticDataService).saveStaticDataById(
this.workflowData.id as string, this.workflowData.id as string,
newStaticData, newStaticData,
); );
@ -564,7 +565,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
if (isWorkflowIdValid(this.workflowData.id) && newStaticData) { if (isWorkflowIdValid(this.workflowData.id) && newStaticData) {
// Workflow is saved so update in database // Workflow is saved so update in database
try { try {
await Container.get(WorkflowService).saveStaticDataById( await Container.get(WorkflowStaticDataService).saveStaticDataById(
this.workflowData.id as string, this.workflowData.id as string,
newStaticData, newStaticData,
); );
@ -714,7 +715,10 @@ export async function getWorkflowData(
if (workflowInfo.id !== undefined) { if (workflowInfo.id !== undefined) {
const relations = config.getEnv('workflowTagsDisabled') ? [] : ['tags']; const relations = config.getEnv('workflowTagsDisabled') ? [] : ['tags'];
workflowData = await Container.get(WorkflowService).get({ id: workflowInfo.id }, { relations }); workflowData = await Container.get(WorkflowRepository).get(
{ id: workflowInfo.id },
{ relations },
);
if (workflowData === undefined || workflowData === null) { if (workflowData === undefined || workflowData === null) {
throw new ApplicationError('Workflow does not exist.', { throw new ApplicationError('Workflow does not exist.', {

View file

@ -1,5 +1,6 @@
import { Service } from 'typedi'; import { Service } from 'typedi';
import { DataSource, Repository } from 'typeorm'; import { DataSource, Repository, type UpdateResult, type FindOptionsWhere } from 'typeorm';
import config from '@/config';
import { WorkflowEntity } from '../entities/WorkflowEntity'; import { WorkflowEntity } from '../entities/WorkflowEntity';
@Service() @Service()
@ -8,6 +9,13 @@ export class WorkflowRepository extends Repository<WorkflowEntity> {
super(WorkflowEntity, dataSource.manager); super(WorkflowEntity, dataSource.manager);
} }
async get(where: FindOptionsWhere<WorkflowEntity>, options?: { relations: string[] }) {
return this.findOne({
where,
relations: options?.relations,
});
}
async getAllActive() { async getAllActive() {
return this.find({ return this.find({
where: { active: true }, where: { active: true },
@ -28,4 +36,21 @@ export class WorkflowRepository extends Repository<WorkflowEntity> {
}); });
return totalTriggerCount ?? 0; return totalTriggerCount ?? 0;
} }
async updateWorkflowTriggerCount(id: string, triggerCount: number): Promise<UpdateResult> {
const qb = this.createQueryBuilder('workflow');
return qb
.update()
.set({
triggerCount,
updatedAt: () => {
if (['mysqldb', 'mariadb'].includes(config.getEnv('database.type'))) {
return 'updatedAt';
}
return '"updatedAt"';
},
})
.where('id = :id', { id })
.execute();
}
} }

View file

@ -6,7 +6,7 @@ import type { IExecutionResponse, IExecutionFlattedResponse } from '@/Interfaces
import { EnterpriseWorkflowService } from '../workflows/workflow.service.ee'; import { EnterpriseWorkflowService } from '../workflows/workflow.service.ee';
import type { WorkflowWithSharingsAndCredentials } from '@/workflows/workflows.types'; import type { WorkflowWithSharingsAndCredentials } from '@/workflows/workflows.types';
import Container from 'typedi'; import Container from 'typedi';
import { WorkflowService } from '@/workflows/workflow.service'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
export class EEExecutionsService extends ExecutionsService { export class EEExecutionsService extends ExecutionsService {
/** /**
@ -26,7 +26,7 @@ export class EEExecutionsService extends ExecutionsService {
const relations = ['shared', 'shared.user', 'shared.role']; const relations = ['shared', 'shared.user', 'shared.role'];
const workflow = (await Container.get(WorkflowService).get( const workflow = (await Container.get(WorkflowRepository).get(
{ id: execution.workflowId }, { id: execution.workflowId },
{ relations }, { relations },
)) as WorkflowWithSharingsAndCredentials; )) as WorkflowWithSharingsAndCredentials;

View file

@ -18,6 +18,7 @@ import type { CredentialsEntity } from '@db/entities/CredentialsEntity';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository'; import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { BadRequestError } from '@/errors/response-errors/bad-request.error'; import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error'; import { NotFoundError } from '@/errors/response-errors/not-found.error';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
@Service() @Service()
export class EnterpriseWorkflowService { export class EnterpriseWorkflowService {
@ -26,6 +27,7 @@ export class EnterpriseWorkflowService {
private readonly userService: UserService, private readonly userService: UserService,
private readonly roleService: RoleService, private readonly roleService: RoleService,
private readonly sharedWorkflowRepository: SharedWorkflowRepository, private readonly sharedWorkflowRepository: SharedWorkflowRepository,
private readonly workflowRepository: WorkflowRepository,
) {} ) {}
async isOwned( async isOwned(
@ -182,7 +184,7 @@ export class EnterpriseWorkflowService {
} }
async preventTampering(workflow: WorkflowEntity, workflowId: string, user: User) { async preventTampering(workflow: WorkflowEntity, workflowId: string, user: User) {
const previousVersion = await this.workflowService.get({ id: workflowId }); const previousVersion = await this.workflowRepository.get({ id: workflowId });
if (!previousVersion) { if (!previousVersion) {
throw new NotFoundError('Workflow not found'); throw new NotFoundError('Workflow not found');

View file

@ -1,7 +1,7 @@
import Container, { Service } from 'typedi'; import Container, { Service } from 'typedi';
import type { IDataObject, INode, IPinData } from 'n8n-workflow'; import type { INode, IPinData } from 'n8n-workflow';
import { NodeApiError, ErrorReporterProxy as ErrorReporter, Workflow } from 'n8n-workflow'; import { NodeApiError, Workflow } from 'n8n-workflow';
import type { FindManyOptions, FindOptionsSelect, FindOptionsWhere, UpdateResult } from 'typeorm'; import type { FindManyOptions, FindOptionsSelect, FindOptionsWhere } from 'typeorm';
import { In, Like } from 'typeorm'; import { In, Like } from 'typeorm';
import pick from 'lodash/pick'; import pick from 'lodash/pick';
import omit from 'lodash/omit'; import omit from 'lodash/omit';
@ -25,7 +25,7 @@ import { whereClause } from '@/UserManagement/UserManagementHelper';
import { InternalHooks } from '@/InternalHooks'; import { InternalHooks } from '@/InternalHooks';
import { WorkflowRepository } from '@db/repositories/workflow.repository'; import { WorkflowRepository } from '@db/repositories/workflow.repository';
import { OwnershipService } from '@/services/ownership.service'; import { OwnershipService } from '@/services/ownership.service';
import { isStringArray, isWorkflowIdValid } from '@/utils'; import { isStringArray } from '@/utils';
import { WorkflowHistoryService } from './workflowHistory/workflowHistory.service.ee'; import { WorkflowHistoryService } from './workflowHistory/workflowHistory.service.ee';
import { BinaryDataService } from 'n8n-core'; import { BinaryDataService } from 'n8n-core';
import type { Scope } from '@n8n/permissions'; import type { Scope } from '@n8n/permissions';
@ -120,13 +120,6 @@ export class WorkflowService {
return pinnedTriggers.find((pt) => pt.name === checkNodeName) ?? null; // partial execution return pinnedTriggers.find((pt) => pt.name === checkNodeName) ?? null; // partial execution
} }
async get(workflow: FindOptionsWhere<WorkflowEntity>, options?: { relations: string[] }) {
return this.workflowRepository.findOne({
where: workflow,
relations: options?.relations,
});
}
async getMany(sharedWorkflowIds: string[], options?: ListQuery.Options) { async getMany(sharedWorkflowIds: string[], options?: ListQuery.Options) {
if (sharedWorkflowIds.length === 0) return { workflows: [], count: 0 }; if (sharedWorkflowIds.length === 0) return { workflows: [], count: 0 };
@ -512,56 +505,4 @@ export class WorkflowService {
return sharedWorkflow.workflow; return sharedWorkflow.workflow;
} }
async updateWorkflowTriggerCount(id: string, triggerCount: number): Promise<UpdateResult> {
const qb = this.workflowRepository.createQueryBuilder('workflow');
return qb
.update()
.set({
triggerCount,
updatedAt: () => {
if (['mysqldb', 'mariadb'].includes(config.getEnv('database.type'))) {
return 'updatedAt';
}
return '"updatedAt"';
},
})
.where('id = :id', { id })
.execute();
}
/**
* Saves the static data if it changed
*/
async saveStaticData(workflow: Workflow): Promise<void> {
if (workflow.staticData.__dataChanged === true) {
// Static data of workflow changed and so has to be saved
if (isWorkflowIdValid(workflow.id)) {
// Workflow is saved so update in database
try {
await this.saveStaticDataById(workflow.id, workflow.staticData);
workflow.staticData.__dataChanged = false;
} catch (error) {
ErrorReporter.error(error);
this.logger.error(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
`There was a problem saving the workflow with id "${workflow.id}" to save changed Data: "${error.message}"`,
{ workflowId: workflow.id },
);
}
}
}
}
/**
* Saves the given static data on workflow
*
* @param {(string)} workflowId The id of the workflow to save data on
* @param {IDataObject} newStaticData The static data to save
*/
async saveStaticDataById(workflowId: string, newStaticData: IDataObject): Promise<void> {
await this.workflowRepository.update(workflowId, {
staticData: newStaticData,
});
}
} }

View file

@ -0,0 +1,41 @@
import { Service } from 'typedi';
import { type IDataObject, type Workflow, ErrorReporterProxy as ErrorReporter } from 'n8n-workflow';
import { Logger } from '@/Logger';
import { WorkflowRepository } from '@db/repositories/workflow.repository';
import { isWorkflowIdValid } from '@/utils';
@Service()
export class WorkflowStaticDataService {
constructor(
private readonly logger: Logger,
private readonly workflowRepository: WorkflowRepository,
) {}
/** Saves the static data if it changed */
async saveStaticData(workflow: Workflow): Promise<void> {
if (workflow.staticData.__dataChanged === true) {
// Static data of workflow changed and so has to be saved
if (isWorkflowIdValid(workflow.id)) {
// Workflow is saved so update in database
try {
await this.saveStaticDataById(workflow.id, workflow.staticData);
workflow.staticData.__dataChanged = false;
} catch (error) {
ErrorReporter.error(error);
this.logger.error(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
`There was a problem saving the workflow with id "${workflow.id}" to save changed Data: "${error.message}"`,
{ workflowId: workflow.id },
);
}
}
}
}
/** Saves the given static data on workflow */
async saveStaticDataById(workflowId: string, newStaticData: IDataObject): Promise<void> {
await this.workflowRepository.update(workflowId, {
staticData: newStaticData,
});
}
}

View file

@ -28,6 +28,7 @@ import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error'; import { NotFoundError } from '@/errors/response-errors/not-found.error';
import { InternalServerError } from '@/errors/response-errors/internal-server.error'; import { InternalServerError } from '@/errors/response-errors/internal-server.error';
import { WorkflowService } from './workflow.service'; import { WorkflowService } from './workflow.service';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
export const EEWorkflowController = express.Router(); export const EEWorkflowController = express.Router();
@ -129,7 +130,7 @@ EEWorkflowController.get(
relations.push('tags'); relations.push('tags');
} }
const workflow = await Container.get(WorkflowService).get({ id: workflowId }, { relations }); const workflow = await Container.get(WorkflowRepository).get({ id: workflowId }, { relations });
if (!workflow) { if (!workflow) {
throw new NotFoundError(`Workflow with ID "${workflowId}" does not exist`); throw new NotFoundError(`Workflow with ID "${workflowId}" does not exist`);