perf(core): Optimize workflow activation errors (#8242)

At https://github.com/n8n-io/n8n/pull/8213 we introduced Redis hashes
for workflow ownership and manual webhooks...

- to remove clutter from multiple related keys at the top level,
- to improve performance by preventing serializing-deserializing, and
- to guarantee atomicity during concurrent updates in multi-main setup.

Workflow activation errors can also benefit from this. Added test
coverage as well.

To test manually, create a workflow with a trigger with an invalid
credential, edit the workflow's `active` column to `true`, and restart.
The activation error should show as a red triangle on canvas and in the
workflow list.
This commit is contained in:
Iván Ovejero 2024-01-05 13:06:42 +01:00 committed by GitHub
parent 23a4ac96c0
commit f2939568cf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 102 additions and 30 deletions

View file

@ -1,10 +1,5 @@
import { Service } from 'typedi'; import { Service } from 'typedi';
import { CacheService } from '@/services/cache/cache.service'; import { CacheService } from '@/services/cache/cache.service';
import { jsonParse } from 'n8n-workflow';
type ActivationErrors = {
[workflowId: string]: string; // error message
};
@Service() @Service()
export class ActivationErrorsService { export class ActivationErrorsService {
@ -12,38 +7,28 @@ export class ActivationErrorsService {
constructor(private readonly cacheService: CacheService) {} constructor(private readonly cacheService: CacheService) {}
async set(workflowId: string, errorMessage: string) { async register(workflowId: string, errorMessage: string) {
const errors = await this.getAll(); await this.cacheService.setHash(this.cacheKey, { [workflowId]: errorMessage });
errors[workflowId] = errorMessage;
await this.cacheService.set(this.cacheKey, JSON.stringify(errors));
} }
async unset(workflowId: string) { async deregister(workflowId: string) {
const errors = await this.getAll(); await this.cacheService.deleteFromHash(this.cacheKey, workflowId);
if (Object.keys(errors).length === 0) return;
delete errors[workflowId];
await this.cacheService.set(this.cacheKey, JSON.stringify(errors));
} }
async get(workflowId: string) { async get(workflowId: string) {
const errors = await this.getAll(); const activationError = await this.cacheService.getHashValue<string>(this.cacheKey, workflowId);
if (Object.keys(errors).length === 0) return null; if (!activationError) return null;
return errors[workflowId]; return activationError;
} }
async getAll() { async getAll() {
const errors = await this.cacheService.get<string>(this.cacheKey); const activationErrors = await this.cacheService.getHash<string>(this.cacheKey);
if (!errors) return {}; if (!activationErrors) return {};
return jsonParse<ActivationErrors>(errors); return activationErrors;
} }
async clearAll() { async clearAll() {

View file

@ -425,7 +425,7 @@ export class ActiveWorkflowRunner {
void this.activeWorkflows.remove(workflowData.id); void this.activeWorkflows.remove(workflowData.id);
void this.activationErrorsService.set(workflowData.id, error.message); void this.activationErrorsService.register(workflowData.id, error.message);
// Run Error Workflow if defined // Run Error Workflow if defined
const activationError = new WorkflowActivationError( const activationError = new WorkflowActivationError(
@ -630,13 +630,13 @@ export class ActiveWorkflowRunner {
// Workflow got now successfully activated so make sure nothing is left in the queue // Workflow got now successfully activated so make sure nothing is left in the queue
this.removeQueuedWorkflowActivation(workflowId); this.removeQueuedWorkflowActivation(workflowId);
await this.activationErrorsService.unset(workflowId); await this.activationErrorsService.deregister(workflowId);
const triggerCount = this.countTriggers(workflow, additionalData); const triggerCount = this.countTriggers(workflow, additionalData);
await this.workflowRepository.updateWorkflowTriggerCount(workflow.id, triggerCount); await this.workflowRepository.updateWorkflowTriggerCount(workflow.id, triggerCount);
} catch (e) { } catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`); const error = e instanceof Error ? e : new Error(`${e}`);
await this.activationErrorsService.set(workflowId, error.message); await this.activationErrorsService.register(workflowId, error.message);
throw e; throw e;
} }
@ -757,7 +757,7 @@ export class ActiveWorkflowRunner {
); );
} }
await this.activationErrorsService.unset(workflowId); await this.activationErrorsService.deregister(workflowId);
if (this.queuedActivations[workflowId] !== undefined) { if (this.queuedActivations[workflowId] !== undefined) {
this.removeQueuedWorkflowActivation(workflowId); this.removeQueuedWorkflowActivation(workflowId);
@ -824,6 +824,6 @@ export class ActiveWorkflowRunner {
} }
async removeActivationError(workflowId: string) { async removeActivationError(workflowId: string) {
await this.activationErrorsService.unset(workflowId); await this.activationErrorsService.deregister(workflowId);
} }
} }

View file

@ -0,0 +1,87 @@
import { ActivationErrorsService } from '@/ActivationErrors.service';
import { CacheService } from '@/services/cache/cache.service';
describe('ActivationErrorsService', () => {
const cacheService = new CacheService();
const activationErrorsService = new ActivationErrorsService(cacheService);
const firstWorkflowId = 'GSG0etbfTA2CNPDX';
const secondWorkflowId = 'k2ORscMPO66K0Jk3';
const firstErrorMsg = 'Failed to activate';
const secondErrorMsg = 'Also failed to activate';
afterEach(async () => {
await activationErrorsService.clearAll();
});
describe('register', () => {
test('should register an activation error for a workflow', async () => {
await activationErrorsService.register(firstWorkflowId, firstErrorMsg);
const activationError = await activationErrorsService.get(firstWorkflowId);
expect(activationError).toBe(firstErrorMsg);
});
});
describe('deregister', () => {
test('should deregister an activation error for a workflow', async () => {
await activationErrorsService.register(firstWorkflowId, firstErrorMsg);
await activationErrorsService.deregister(firstWorkflowId);
const activationError = await activationErrorsService.get(firstWorkflowId);
expect(activationError).toBeNull();
});
});
describe('get', () => {
test('should retrieve an activation error for a workflow', async () => {
await activationErrorsService.register(firstWorkflowId, firstErrorMsg);
const activationError = await activationErrorsService.get(firstWorkflowId);
expect(activationError).toBe(firstErrorMsg);
});
test('should return `null` if no activation error found for a workflow', async () => {
const activationError = await activationErrorsService.get(firstWorkflowId);
expect(activationError).toBeNull();
});
});
describe('getAll', () => {
test('should retrieve all activation errors', async () => {
await activationErrorsService.register(firstWorkflowId, firstErrorMsg);
await activationErrorsService.register(secondWorkflowId, secondErrorMsg);
const allActivationErrors = await activationErrorsService.getAll();
expect(allActivationErrors).toEqual({
[firstWorkflowId]: firstErrorMsg,
[secondWorkflowId]: secondErrorMsg,
});
});
test('should return an empty object if no activation errors', async () => {
const allActivationErrors = await activationErrorsService.getAll();
expect(allActivationErrors).toEqual({});
});
});
describe('clearAll()', () => {
test('should clear activation errors', async () => {
await activationErrorsService.register(firstWorkflowId, firstErrorMsg);
await activationErrorsService.register(secondWorkflowId, secondErrorMsg);
await activationErrorsService.clearAll();
const allActivationErrors = await activationErrorsService.getAll();
expect(allActivationErrors).toEqual({});
});
});
});