mirror of
https://github.com/n8n-io/n8n.git
synced 2024-11-15 00:54:06 -08:00
22a5f5258d
In a multi-main setup, we have the following issue. The user's client connects to main A and runs a test webhook, so main A starts listening for a webhook call. A third-party service sends a request to the test webhook URL. The request is forwarded by the load balancer to main B, who is not listening for this webhook call. Therefore, the webhook call is unhandled. To start addressing this, cache test webhook registrations, using Redis for queue mode and in-memory for regular mode. When the third-party service sends a request to the test webhook URL, the request is forwarded by the load balancer to main B, who fetches test webhooks from the cache and, if it finds a match, executes the test webhook. This should be transparent - test webhook behavior should remain the same as so far. Notes: - Test webhook timeouts are not cached. A timeout is only relevant to the process it was created in, so another process retrieving from Redis a "foreign" timeout will be unable to act on it. A timeout also has circular references, so `cache-manager-ioredis-yet` is unable to serialize it. - In a single-main scenario, the timeout remains in the single process and is cleared on test webhook expiration, successful execution, and manual cancellation - all as usual. - In a multi-main scenario, we will need to have the process who received the webhook call send a message to the process who created the webhook directing this originating process to clear the timeout. This will likely be implemented via execution lifecycle hooks and Redis channel messages checking session ID. This implementation is out of scope for this PR and will come next. - Additional data in test webhooks is not cached. From what I can tell, additional data is not needed for test webhooks to be executed. Additional data also has circular references, so `cache-manager-ioredis-yet` is unable to serialize it. Follow-up to: #8155
120 lines
3.6 KiB
TypeScript
120 lines
3.6 KiB
TypeScript
import { mock } from 'jest-mock-extended';
|
|
import { TestWebhooks } from '@/TestWebhooks';
|
|
import { WebhookNotFoundError } from '@/errors/response-errors/webhook-not-found.error';
|
|
import { v4 as uuid } from 'uuid';
|
|
import { generateNanoId } from '@/databases/utils/generators';
|
|
import { NotFoundError } from '@/errors/response-errors/not-found.error';
|
|
import * as WebhookHelpers from '@/WebhookHelpers';
|
|
|
|
import type { IWorkflowDb, WebhookRequest } from '@/Interfaces';
|
|
import type {
|
|
IWebhookData,
|
|
IWorkflowExecuteAdditionalData,
|
|
Workflow,
|
|
WorkflowActivateMode,
|
|
WorkflowExecuteMode,
|
|
} from 'n8n-workflow';
|
|
import type {
|
|
TestWebhookRegistrationsService,
|
|
TestWebhookRegistration,
|
|
} from '@/services/test-webhook-registrations.service';
|
|
|
|
describe('TestWebhooks', () => {
|
|
const registrations = mock<TestWebhookRegistrationsService>();
|
|
const testWebhooks = new TestWebhooks(mock(), mock(), registrations);
|
|
|
|
beforeAll(() => {
|
|
jest.useFakeTimers();
|
|
});
|
|
|
|
const httpMethod = 'GET';
|
|
const path = uuid();
|
|
const workflowId = generateNanoId();
|
|
|
|
const webhook = mock<IWebhookData>({
|
|
httpMethod,
|
|
path,
|
|
workflowId,
|
|
});
|
|
|
|
describe('needsWebhook()', () => {
|
|
type NeedsWebhookArgs = [
|
|
IWorkflowDb,
|
|
IWorkflowExecuteAdditionalData,
|
|
WorkflowExecuteMode,
|
|
WorkflowActivateMode,
|
|
];
|
|
|
|
const workflow = mock<Workflow>({ id: workflowId });
|
|
|
|
const args: NeedsWebhookArgs = [
|
|
mock<IWorkflowDb>({ id: workflowId, nodes: [] }),
|
|
mock<IWorkflowExecuteAdditionalData>(),
|
|
'manual',
|
|
'manual',
|
|
];
|
|
|
|
test('if webhook is needed, should return true and activate webhook', async () => {
|
|
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook]);
|
|
|
|
const needsWebhook = await testWebhooks.needsWebhook(...args);
|
|
|
|
expect(needsWebhook).toBe(true);
|
|
});
|
|
|
|
test('if webhook activation fails, should deactivate workflow webhooks', async () => {
|
|
const msg = 'Failed to add webhook to active webhooks';
|
|
|
|
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook]);
|
|
jest.spyOn(registrations, 'register').mockRejectedValueOnce(new Error(msg));
|
|
registrations.getAllRegistrations.mockResolvedValue([]);
|
|
|
|
const needsWebhook = testWebhooks.needsWebhook(...args);
|
|
|
|
await expect(needsWebhook).rejects.toThrowError(msg);
|
|
});
|
|
|
|
test('if no webhook is found to start workflow, should return false', async () => {
|
|
webhook.webhookDescription.restartWebhook = true;
|
|
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook]);
|
|
|
|
const result = await testWebhooks.needsWebhook(...args);
|
|
|
|
expect(result).toBe(false);
|
|
});
|
|
});
|
|
|
|
describe('executeWebhook()', () => {
|
|
test('if webhook is not registered, should throw', async () => {
|
|
jest.spyOn(testWebhooks, 'getActiveWebhook').mockResolvedValue(webhook);
|
|
jest.spyOn(testWebhooks, 'getWebhookMethods').mockResolvedValue([]);
|
|
|
|
const promise = testWebhooks.executeWebhook(
|
|
mock<WebhookRequest>({ params: { path } }),
|
|
mock(),
|
|
);
|
|
|
|
await expect(promise).rejects.toThrowError(WebhookNotFoundError);
|
|
});
|
|
|
|
test('if webhook is registered but missing from workflow, should throw', async () => {
|
|
jest.spyOn(testWebhooks, 'getActiveWebhook').mockResolvedValue(webhook);
|
|
jest.spyOn(testWebhooks, 'getWebhookMethods').mockResolvedValue([]);
|
|
|
|
const registration = mock<TestWebhookRegistration>({
|
|
sessionId: 'some-session-id',
|
|
workflowEntity: mock<IWorkflowDb>({}),
|
|
});
|
|
|
|
await registrations.register(registration);
|
|
|
|
const promise = testWebhooks.executeWebhook(
|
|
mock<WebhookRequest>({ params: { path } }),
|
|
mock(),
|
|
);
|
|
|
|
await expect(promise).rejects.toThrowError(NotFoundError);
|
|
});
|
|
});
|
|
});
|