perf(core): Cache webhooks (#6825)

* refactor: Initial setup

* Refactor for clarity

* Comments to clarify

* More replacements

* Simplify with `fullPath`

* Fix tests

* Implement remaining methods

* chore: Fix misresolved conflicts

* Simplify syntax

* Reduce diff

* Minor cleanup

* Fix lint

* Inject dependency

* Improve typings

* Remove unused method

* Restore method

* Add comment

* Rename in test

* Restore comments

* Clean up dynamic webhook handling

* Clean up tests

* Remove redundant `cache` prefix

* fix: Correct `uniquePath` for dynamic webhooks
This commit is contained in:
Iván Ovejero 2023-08-04 11:52:45 +02:00 committed by GitHub
parent 90e825f743
commit 0511458d41
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 394 additions and 77 deletions

View file

@ -18,7 +18,6 @@ import type {
IRunExecutionData,
IWorkflowBase,
IWorkflowExecuteAdditionalData as IWorkflowExecuteAdditionalDataWorkflow,
IHttpRequestMethods,
WorkflowActivateMode,
WorkflowExecuteMode,
INodeType,
@ -52,7 +51,6 @@ import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'
import config from '@/config';
import type { User } from '@db/entities/User';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { WebhookEntity } from '@db/entities/WebhookEntity';
import { ActiveExecutions } from '@/ActiveExecutions';
import { createErrorExecution } from '@/GenericHelpers';
import {
@ -67,7 +65,7 @@ import { whereClause } from './UserManagement/UserManagementHelper';
import { WorkflowsService } from './workflows/workflows.services';
import { webhookNotFoundErrorMessage } from './utils';
import { In } from 'typeorm';
import { WebhookRepository } from '@db/repositories';
import { WebhookService } from './services/webhook.service';
const WEBHOOK_PROD_UNREGISTERED_HINT =
"The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)";
@ -88,7 +86,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
private activeExecutions: ActiveExecutions,
private externalHooks: ExternalHooks,
private nodeTypes: NodeTypes,
private webhookRepository: WebhookRepository,
private webhookService: WebhookService,
) {}
async init() {
@ -111,7 +109,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
// This is not officially supported but there is no reason
// it should not work.
// Clear up active workflow table
await this.webhookRepository.clear();
await this.webhookService.deleteInstanceWebhooks();
}
if (workflowsData.length !== 0) {
@ -159,6 +157,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
}
await this.externalHooks.run('activeWorkflows.initialized', []);
await this.webhookService.populateCache();
}
/**
@ -200,59 +199,18 @@ export class ActiveWorkflowRunner implements IWebhookManager {
path = path.slice(0, -1);
}
let webhook = await this.webhookRepository.findOneBy({
webhookPath: path,
method: httpMethod,
});
let webhookId: string | undefined;
const webhook = await this.webhookService.findWebhook(httpMethod, path);
// check if path is dynamic
if (webhook === null) {
// check if a dynamic webhook path exists
const pathElements = path.split('/');
webhookId = pathElements.shift();
const dynamicWebhooks = await this.webhookRepository.findBy({
webhookId,
method: httpMethod,
pathLength: pathElements.length,
});
if (dynamicWebhooks === undefined || dynamicWebhooks.length === 0) {
// The requested webhook is not registered
throw new ResponseHelper.NotFoundError(
webhookNotFoundErrorMessage(path, httpMethod),
WEBHOOK_PROD_UNREGISTERED_HINT,
);
}
throw new ResponseHelper.NotFoundError(
webhookNotFoundErrorMessage(path, httpMethod),
WEBHOOK_PROD_UNREGISTERED_HINT,
);
}
let maxMatches = 0;
const pathElementsSet = new Set(pathElements);
// check if static elements match in path
// if more results have been returned choose the one with the most static-route matches
dynamicWebhooks.forEach((dynamicWebhook) => {
const staticElements = dynamicWebhook.webhookPath
.split('/')
.filter((ele) => !ele.startsWith(':'));
const allStaticExist = staticElements.every((staticEle) => pathElementsSet.has(staticEle));
if (webhook.isDynamic) {
const pathElements = path.split('/').slice(1);
if (allStaticExist && staticElements.length > maxMatches) {
maxMatches = staticElements.length;
webhook = dynamicWebhook;
}
// handle routes with no static elements
else if (staticElements.length === 0 && !webhook) {
webhook = dynamicWebhook;
}
});
if (webhook === null) {
throw new ResponseHelper.NotFoundError(
webhookNotFoundErrorMessage(path, httpMethod),
WEBHOOK_PROD_UNREGISTERED_HINT,
);
}
// @ts-ignore
path = webhook.webhookPath;
// extracting params from path
// @ts-ignore
webhook.webhookPath.split('/').forEach((ele, index) => {
@ -268,6 +226,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
where: { id: webhook.workflowId },
relations: ['shared', 'shared.user', 'shared.user.globalRole'],
});
if (workflowData === null) {
throw new ResponseHelper.NotFoundError(
`Could not find workflow with id "${webhook.workflowId}"`,
@ -293,7 +252,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
workflow,
workflow.getNode(webhook.node) as INode,
additionalData,
).find((w) => w.httpMethod === httpMethod && w.path === path) as IWebhookData;
).find((w) => w.httpMethod === httpMethod && w.path === webhook.webhookPath) as IWebhookData;
// Get the node which has the webhook defined to know where to start from and to
// get additional data
@ -329,14 +288,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
/**
* Gets all request methods associated with a single webhook
*/
async getWebhookMethods(path: string): Promise<IHttpRequestMethods[]> {
const webhooks = await this.webhookRepository.find({
select: ['method'],
where: { webhookPath: path },
});
// Gather all request methods in string array
return webhooks.map((webhook) => webhook.method);
async getWebhookMethods(path: string) {
return this.webhookService.getWebhookMethods(path);
}
/**
@ -417,12 +370,12 @@ export class ActiveWorkflowRunner implements IWebhookManager {
path = webhookData.path;
const webhook: WebhookEntity = {
const webhook = this.webhookService.createWebhook({
workflowId: webhookData.workflowId,
webhookPath: path,
node: node.name,
method: webhookData.httpMethod,
};
});
if (webhook.webhookPath.startsWith('/')) {
webhook.webhookPath = webhook.webhookPath.slice(1);
@ -438,7 +391,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
try {
// TODO: this should happen in a transaction, that way we don't need to manually remove this in `catch`
await this.webhookRepository.insert(webhook);
await this.webhookService.storeWebhook(webhook);
const webhookExists = await workflow.runWebhookMethod(
'checkExists',
webhookData,
@ -498,6 +451,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
throw error;
}
}
await this.webhookService.populateCache();
// Save static data!
await WorkflowHelpers.saveStaticData(workflow);
}
@ -547,9 +501,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
await WorkflowHelpers.saveStaticData(workflow);
await this.webhookRepository.delete({
workflowId: workflowData.id,
});
await this.webhookService.deleteWorkflowWebhooks(workflowId);
}
/**

View file

@ -21,4 +21,31 @@ export class WebhookEntity {
@Column({ nullable: true })
pathLength?: number;
/**
* Unique section of production webhook path, appended to `${instanceUrl}/webhook/`.
* - Example for static UUID webhook: `87dd035f-9606-47b7-b443-8b675fe25719`
* - Example for static user-defined webhook: `user/:id/posts`
* - Example for dynamic webhook: `7e0e2b2a-19ba-4a6c-b452-4b46c0e11749/user/:id/posts`
*/
private get uniquePath() {
return this.webhookPath.includes(':')
? [this.webhookId, this.webhookPath].join('/')
: this.webhookPath;
}
get cacheKey() {
return `webhook:${this.method}-${this.uniquePath}`;
}
get staticSegments() {
return this.webhookPath.split('/').filter((s) => !s.startsWith(':'));
}
/**
* Whether the webhook has at least one dynamic path segment, e.g. `:id` in `<uuid>/user/:id/posts`.
*/
get isDynamic() {
return this.webhookPath.split('/').some((s) => s.startsWith(':'));
}
}

View file

@ -245,9 +245,6 @@ export class CacheService {
return this.cache;
}
/**
* Delete all values from the cache, but leave the cache initialized.
*/
async reset(): Promise<void> {
await this.cache?.store.reset();
}

View file

@ -0,0 +1,127 @@
import { WebhookRepository } from '@/databases/repositories';
import { Service } from 'typedi';
import { CacheService } from './cache.service';
import type { WebhookEntity } from '@/databases/entities/WebhookEntity';
import type { IHttpRequestMethods } from 'n8n-workflow';
import type { DeepPartial } from 'typeorm';
type Method = NonNullable<IHttpRequestMethods>;
@Service()
export class WebhookService {
constructor(
private webhookRepository: WebhookRepository,
private cacheService: CacheService,
) {}
async populateCache() {
const allWebhooks = await this.webhookRepository.find();
if (!allWebhooks) return;
void this.cacheService.setMany(allWebhooks.map((w) => [w.cacheKey, w]));
}
private async findCached(method: Method, path: string) {
const cacheKey = `webhook:${method}-${path}`;
const cachedWebhook = await this.cacheService.get(cacheKey);
if (cachedWebhook) return this.webhookRepository.create(cachedWebhook);
let dbWebhook = await this.findStaticWebhook(method, path);
if (dbWebhook === null) {
dbWebhook = await this.findDynamicWebhook(method, path);
}
void this.cacheService.set(cacheKey, dbWebhook);
return dbWebhook;
}
/**
* Find a matching webhook with zero dynamic path segments, e.g. `<uuid>` or `user/profile`.
*/
private async findStaticWebhook(method: Method, path: string) {
return this.webhookRepository.findOneBy({ webhookPath: path, method });
}
/**
* Find a matching webhook with one or more dynamic path segments, e.g. `<uuid>/user/:id/posts`.
* It is mandatory for dynamic webhooks to have `<uuid>/` at the base.
*/
private async findDynamicWebhook(method: Method, path: string) {
const [uuidSegment, ...otherSegments] = path.split('/');
const dynamicWebhooks = await this.webhookRepository.findBy({
webhookId: uuidSegment,
method,
pathLength: otherSegments.length,
});
if (dynamicWebhooks.length === 0) return null;
const requestSegments = new Set(otherSegments);
const { webhook } = dynamicWebhooks.reduce<{
webhook: WebhookEntity | null;
maxMatches: number;
}>(
(acc, dw) => {
const allStaticSegmentsMatch = dw.staticSegments.every((s) => requestSegments.has(s));
if (allStaticSegmentsMatch && dw.staticSegments.length > acc.maxMatches) {
acc.maxMatches = dw.staticSegments.length;
acc.webhook = dw;
return acc;
} else if (dw.staticSegments.length === 0 && !acc.webhook) {
acc.webhook = dw; // edge case: if path is `:var`, match on anything
}
return acc;
},
{ webhook: null, maxMatches: 0 },
);
return webhook;
}
async findWebhook(method: Method, path: string) {
return this.findCached(method, path);
}
async storeWebhook(webhook: WebhookEntity) {
void this.cacheService.set(webhook.cacheKey, webhook);
return this.webhookRepository.insert(webhook);
}
createWebhook(data: DeepPartial<WebhookEntity>) {
return this.webhookRepository.create(data);
}
async deleteWorkflowWebhooks(workflowId: string) {
const webhooks = await this.webhookRepository.findBy({ workflowId });
return this.deleteWebhooks(webhooks);
}
async deleteInstanceWebhooks() {
const webhooks = await this.webhookRepository.find();
return this.deleteWebhooks(webhooks);
}
private async deleteWebhooks(webhooks: WebhookEntity[]) {
void this.cacheService.deleteMany(webhooks.map((w) => w.cacheKey));
return this.webhookRepository.remove(webhooks);
}
async getWebhookMethods(path: string) {
return this.webhookRepository
.find({ select: ['method'], where: { webhookPath: path } })
.then((rows) => rows.map((r) => r.method));
}
}

View file

@ -24,7 +24,7 @@ import { mockInstance } from '../integration/shared/utils/';
import { Push } from '@/push';
import { ActiveExecutions } from '@/ActiveExecutions';
import { NodeTypes } from '@/NodeTypes';
import type { WebhookRepository } from '@/databases/repositories';
import { WebhookService } from '@/services/webhook.service';
import { VariablesService } from '../../src/environments/variables/variables.service';
/**
@ -141,7 +141,7 @@ const workflowExecuteAdditionalDataExecuteErrorWorkflowSpy = jest.spyOn(
describe('ActiveWorkflowRunner', () => {
let externalHooks: ExternalHooks;
let activeWorkflowRunner: ActiveWorkflowRunner;
let webhookRepository = mock<WebhookRepository>();
const webhookService = mockInstance(WebhookService);
beforeAll(async () => {
LoggerProxy.init(getLogger());
@ -167,7 +167,7 @@ describe('ActiveWorkflowRunner', () => {
new ActiveExecutions(),
externalHooks,
Container.get(NodeTypes),
webhookRepository,
webhookService,
);
});
@ -182,7 +182,7 @@ describe('ActiveWorkflowRunner', () => {
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(0);
expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled();
expect(mocked(webhookRepository.clear)).toHaveBeenCalled();
expect(webhookService.deleteInstanceWebhooks).toHaveBeenCalled();
expect(externalHooks.run).toHaveBeenCalledTimes(1);
});
@ -193,7 +193,7 @@ describe('ActiveWorkflowRunner', () => {
databaseActiveWorkflowsCount,
);
expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled();
expect(mocked(webhookRepository.clear)).toHaveBeenCalled();
expect(webhookService.deleteInstanceWebhooks).toHaveBeenCalled();
expect(externalHooks.run).toHaveBeenCalled();
});

View file

@ -0,0 +1,214 @@
import { v4 as uuid } from 'uuid';
import config from '@/config';
import { mockInstance } from '../../integration/shared/utils/';
import { WebhookRepository } from '@/databases/repositories';
import { CacheService } from '@/services/cache.service';
import { WebhookService } from '@/services/webhook.service';
import { WebhookEntity } from '@/databases/entities/WebhookEntity';
const createWebhook = (method: string, path: string, webhookId?: string, pathSegments?: number) =>
Object.assign(new WebhookEntity(), {
method,
webhookPath: path,
webhookId,
pathSegments,
}) as WebhookEntity;
describe('WebhookService', () => {
const webhookRepository = mockInstance(WebhookRepository);
const cacheService = mockInstance(CacheService);
const webhookService = new WebhookService(webhookRepository, cacheService);
beforeEach(() => {
config.load(config.default);
jest.clearAllMocks();
});
[true, false].forEach((isCacheEnabled) => {
const tag = '[' + ['cache', isCacheEnabled ? 'enabled' : 'disabled'].join(' ') + ']';
describe(`findWebhook() - static case ${tag}`, () => {
test('should return the webhook if found', async () => {
const method = 'GET';
const path = 'user/profile';
const mockWebhook = createWebhook(method, path);
webhookRepository.findOneBy.mockResolvedValue(mockWebhook);
const returnedWebhook = await webhookService.findWebhook(method, path);
expect(returnedWebhook).toBe(mockWebhook);
});
test('should return null if not found', async () => {
webhookRepository.findOneBy.mockResolvedValue(null); // static
webhookRepository.findBy.mockResolvedValue([]);
const returnValue = await webhookService.findWebhook('GET', 'user/profile');
expect(returnValue).toBeNull();
});
});
describe(`findWebhook() - dynamic case ${tag}`, () => {
test('should return the webhook if found', async () => {
const method = 'GET';
const webhookId = uuid();
const path = 'user/:id/posts';
const mockWebhook = createWebhook(method, path, webhookId, 3);
webhookRepository.findOneBy.mockResolvedValue(null); // static
webhookRepository.findBy.mockResolvedValue([mockWebhook]); // dynamic
const returnedWebhook = await webhookService.findWebhook(
method,
[webhookId, 'user/123/posts'].join('/'),
);
expect(returnedWebhook).toBe(mockWebhook);
});
test('should handle subset dynamic path case', async () => {
const method1 = 'GET';
const webhookId1 = uuid();
const path1 = 'user/:id/posts';
const mockWebhook1 = createWebhook(method1, path1, webhookId1, 3);
const method2 = 'GET';
const webhookId2 = uuid();
const path2 = 'user/:id/posts/:postId/comments';
const mockWebhook2 = createWebhook(method2, path2, webhookId2, 3);
webhookRepository.findOneBy.mockResolvedValue(null); // static
webhookRepository.findBy.mockResolvedValue([mockWebhook1, mockWebhook2]); // dynamic
const fullPath1 = [webhookId1, 'user/123/posts'].join('/');
const returnedWebhook1 = await webhookService.findWebhook(method1, fullPath1);
const fullPath2 = [webhookId1, 'user/123/posts/456/comments'].join('/');
const returnedWebhook2 = await webhookService.findWebhook(method2, fullPath2);
expect(returnedWebhook1).toBe(mockWebhook1);
expect(returnedWebhook2).toBe(mockWebhook2);
});
test('should handle single-segment dynamic path case', async () => {
const method1 = 'GET';
const webhookId1 = uuid();
const path1 = ':var';
const mockWebhook1 = createWebhook(method1, path1, webhookId1, 3);
const method2 = 'GET';
const webhookId2 = uuid();
const path2 = 'user/:id/posts/:postId/comments';
const mockWebhook2 = createWebhook(method2, path2, webhookId2, 3);
webhookRepository.findOneBy.mockResolvedValue(null); // static
webhookRepository.findBy.mockResolvedValue([mockWebhook1, mockWebhook2]); // dynamic
const fullPath = [webhookId1, 'user/123/posts/456'].join('/');
const returnedWebhook = await webhookService.findWebhook(method1, fullPath);
expect(returnedWebhook).toBe(mockWebhook1);
});
test('should return null if not found', async () => {
const fullPath = [uuid(), 'user/:id/posts'].join('/');
webhookRepository.findOneBy.mockResolvedValue(null); // static
webhookRepository.findBy.mockResolvedValue([]); // dynamic
const returnValue = await webhookService.findWebhook('GET', fullPath);
expect(returnValue).toBeNull();
});
});
});
describe('getWebhookMethods()', () => {
test('should return all methods for webhook', async () => {
const path = 'user/profile';
webhookRepository.find.mockResolvedValue([
createWebhook('GET', path),
createWebhook('POST', path),
createWebhook('PUT', path),
createWebhook('PATCH', path),
]);
const returnedMethods = await webhookService.getWebhookMethods(path);
expect(returnedMethods).toEqual(['GET', 'POST', 'PUT', 'PATCH']);
});
test('should return empty array if no webhooks found', async () => {
webhookRepository.find.mockResolvedValue([]);
const returnedMethods = await webhookService.getWebhookMethods('user/profile');
expect(returnedMethods).toEqual([]);
});
});
describe('deleteInstanceWebhooks()', () => {
test('should delete all webhooks of the instance', async () => {
const mockInstanceWebhooks = [
createWebhook('PUT', 'users'),
createWebhook('GET', 'user/:id'),
createWebhook('POST', ':var'),
];
webhookRepository.find.mockResolvedValue(mockInstanceWebhooks);
await webhookService.deleteInstanceWebhooks();
expect(webhookRepository.remove).toHaveBeenCalledWith(mockInstanceWebhooks);
});
test('should not delete any webhooks if none found', async () => {
webhookRepository.find.mockResolvedValue([]);
await webhookService.deleteInstanceWebhooks();
expect(webhookRepository.remove).toHaveBeenCalledWith([]);
});
});
describe('deleteWorkflowWebhooks()', () => {
test('should delete all webhooks of the workflow', async () => {
const mockWorkflowWebhooks = [
createWebhook('PUT', 'users'),
createWebhook('GET', 'user/:id'),
createWebhook('POST', ':var'),
];
webhookRepository.findBy.mockResolvedValue(mockWorkflowWebhooks);
const workflowId = uuid();
await webhookService.deleteWorkflowWebhooks(workflowId);
expect(webhookRepository.remove).toHaveBeenCalledWith(mockWorkflowWebhooks);
});
test('should not delete any webhooks if none found', async () => {
webhookRepository.findBy.mockResolvedValue([]);
const workflowId = uuid();
await webhookService.deleteWorkflowWebhooks(workflowId);
expect(webhookRepository.remove).toHaveBeenCalledWith([]);
});
});
describe('createWebhook()', () => {
test('should create the webhook', async () => {
const mockWebhook = createWebhook('GET', 'user/:id');
await webhookService.storeWebhook(mockWebhook);
expect(webhookRepository.insert).toHaveBeenCalledWith(mockWebhook);
});
});
});