refactor(core): Simplify createDeferredPromise, and add tests (no-changelog) (#10811)
Some checks failed
Test Master / install-and-build (push) Has been cancelled
Benchmark Docker Image CI / build (push) Has been cancelled
Test Master / Unit tests (18.x) (push) Has been cancelled
Test Master / Unit tests (20.x) (push) Has been cancelled
Test Master / Unit tests (22.4) (push) Has been cancelled
Test Master / Lint (push) Has been cancelled
Test Master / Notify Slack on failure (push) Has been cancelled

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-09-13 15:53:03 +02:00 committed by GitHub
parent d647ef41ac
commit cef64329a9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 66 additions and 43 deletions

View file

@ -85,12 +85,12 @@ describe('ActiveExecutions', () => {
test('Should attach and resolve response promise to existing execution', async () => { test('Should attach and resolve response promise to existing execution', async () => {
const newExecution = mockExecutionData(); const newExecution = mockExecutionData();
await activeExecutions.add(newExecution, FAKE_EXECUTION_ID); await activeExecutions.add(newExecution, FAKE_EXECUTION_ID);
const deferredPromise = await mockDeferredPromise(); const deferredPromise = mockDeferredPromise();
activeExecutions.attachResponsePromise(FAKE_EXECUTION_ID, deferredPromise); activeExecutions.attachResponsePromise(FAKE_EXECUTION_ID, deferredPromise);
const fakeResponse = { data: { resultData: { runData: {} } } }; const fakeResponse = { data: { resultData: { runData: {} } } };
activeExecutions.resolveResponsePromise(FAKE_EXECUTION_ID, fakeResponse); activeExecutions.resolveResponsePromise(FAKE_EXECUTION_ID, fakeResponse);
await expect(deferredPromise.promise()).resolves.toEqual(fakeResponse); await expect(deferredPromise.promise).resolves.toEqual(fakeResponse);
}); });
test('Should remove an existing execution', async () => { test('Should remove an existing execution', async () => {
@ -163,5 +163,5 @@ function mockFullRunData(): IRun {
// eslint-disable-next-line @typescript-eslint/promise-function-async // eslint-disable-next-line @typescript-eslint/promise-function-async
const mockCancelablePromise = () => new PCancelable<IRun>((resolve) => resolve()); const mockCancelablePromise = () => new PCancelable<IRun>((resolve) => resolve());
// eslint-disable-next-line @typescript-eslint/promise-function-async
const mockDeferredPromise = () => createDeferredPromise<IExecuteResponsePromiseData>(); const mockDeferredPromise = () => createDeferredPromise<IExecuteResponsePromiseData>();

View file

@ -184,9 +184,9 @@ export class ActiveExecutions {
*/ */
async getPostExecutePromise(executionId: string): Promise<IRun | undefined> { async getPostExecutePromise(executionId: string): Promise<IRun | undefined> {
// Create the promise which will be resolved when the execution finished // Create the promise which will be resolved when the execution finished
const waitPromise = await createDeferredPromise<IRun | undefined>(); const waitPromise = createDeferredPromise<IRun | undefined>();
this.getExecution(executionId).postExecutePromises.push(waitPromise); this.getExecution(executionId).postExecutePromises.push(waitPromise);
return await waitPromise.promise(); return await waitPromise.promise;
} }
/** /**

View file

@ -437,9 +437,8 @@ export async function executeWebhook(
let responsePromise: IDeferredPromise<IN8nHttpFullResponse> | undefined; let responsePromise: IDeferredPromise<IN8nHttpFullResponse> | undefined;
if (responseMode === 'responseNode') { if (responseMode === 'responseNode') {
responsePromise = await createDeferredPromise<IN8nHttpFullResponse>(); responsePromise = createDeferredPromise<IN8nHttpFullResponse>();
responsePromise responsePromise.promise
.promise()
.then(async (response: IN8nHttpFullResponse) => { .then(async (response: IN8nHttpFullResponse) => {
if (didSendResponse) { if (didSendResponse) {
return; return;
@ -550,7 +549,7 @@ export async function executeWebhook(
// in `responseNode` mode `responseCallback` is called by `responsePromise` // in `responseNode` mode `responseCallback` is called by `responsePromise`
if (responseMode === 'responseNode' && responsePromise) { if (responseMode === 'responseNode' && responsePromise) {
await Promise.allSettled([responsePromise.promise()]); await Promise.allSettled([responsePromise.promise]);
return undefined; return undefined;
} }

View file

@ -30,7 +30,7 @@ describe('WorkflowExecute', () => {
}, },
}); });
const waitPromise = await createDeferredPromise<IRun>(); const waitPromise = createDeferredPromise<IRun>();
const nodeExecutionOrder: string[] = []; const nodeExecutionOrder: string[] = [];
const additionalData = Helpers.WorkflowExecuteAdditionalData( const additionalData = Helpers.WorkflowExecuteAdditionalData(
waitPromise, waitPromise,
@ -41,7 +41,7 @@ describe('WorkflowExecute', () => {
const executionData = await workflowExecute.run(workflowInstance); const executionData = await workflowExecute.run(workflowInstance);
const result = await waitPromise.promise(); const result = await waitPromise.promise;
// Check if the data from WorkflowExecute is identical to data received // Check if the data from WorkflowExecute is identical to data received
// by the webhooks // by the webhooks
@ -93,7 +93,7 @@ describe('WorkflowExecute', () => {
}, },
}); });
const waitPromise = await createDeferredPromise<IRun>(); const waitPromise = createDeferredPromise<IRun>();
const nodeExecutionOrder: string[] = []; const nodeExecutionOrder: string[] = [];
const additionalData = Helpers.WorkflowExecuteAdditionalData( const additionalData = Helpers.WorkflowExecuteAdditionalData(
waitPromise, waitPromise,
@ -104,7 +104,7 @@ describe('WorkflowExecute', () => {
const executionData = await workflowExecute.run(workflowInstance); const executionData = await workflowExecute.run(workflowInstance);
const result = await waitPromise.promise(); const result = await waitPromise.promise;
// Check if the data from WorkflowExecute is identical to data received // Check if the data from WorkflowExecute is identical to data received
// by the webhooks // by the webhooks
@ -160,7 +160,7 @@ describe('WorkflowExecute', () => {
settings: testData.input.workflowData.settings, settings: testData.input.workflowData.settings,
}); });
const waitPromise = await createDeferredPromise<IRun>(); const waitPromise = createDeferredPromise<IRun>();
const nodeExecutionOrder: string[] = []; const nodeExecutionOrder: string[] = [];
const additionalData = Helpers.WorkflowExecuteAdditionalData( const additionalData = Helpers.WorkflowExecuteAdditionalData(
waitPromise, waitPromise,
@ -171,7 +171,7 @@ describe('WorkflowExecute', () => {
const executionData = await workflowExecute.run(workflowInstance); const executionData = await workflowExecute.run(workflowInstance);
const result = await waitPromise.promise(); const result = await waitPromise.promise;
// Check if the data from WorkflowExecute is identical to data received // Check if the data from WorkflowExecute is identical to data received
// by the webhooks // by the webhooks

View file

@ -208,11 +208,11 @@ export class AmqpTrigger implements INodeType {
let responsePromise: IDeferredPromise<IRun> | undefined = undefined; let responsePromise: IDeferredPromise<IRun> | undefined = undefined;
if (!parallelProcessing) { if (!parallelProcessing) {
responsePromise = await this.helpers.createDeferredPromise(); responsePromise = this.helpers.createDeferredPromise();
} }
if (responsePromise) { if (responsePromise) {
this.emit([this.helpers.returnJsonArray([data as any])], undefined, responsePromise); this.emit([this.helpers.returnJsonArray([data as any])], undefined, responsePromise);
await responsePromise.promise(); await responsePromise.promise;
} else { } else {
this.emit([this.helpers.returnJsonArray([data as any])]); this.emit([this.helpers.returnJsonArray([data as any])]);
} }

View file

@ -507,7 +507,7 @@ export class EmailReadImapV1 implements INodeType {
return newEmails; return newEmails;
}; };
const returnedPromise = await this.helpers.createDeferredPromise(); const returnedPromise = this.helpers.createDeferredPromise();
const establishConnection = async (): Promise<ImapSimple> => { const establishConnection = async (): Promise<ImapSimple> => {
let searchCriteria = ['UNSEEN'] as Array<string | string[]>; let searchCriteria = ['UNSEEN'] as Array<string | string[]>;
@ -560,7 +560,7 @@ export class EmailReadImapV1 implements INodeType {
}); });
// Wait with resolving till the returnedPromise got resolved, else n8n will be unhappy // Wait with resolving till the returnedPromise got resolved, else n8n will be unhappy
// if it receives an error before the workflow got activated // if it receives an error before the workflow got activated
await returnedPromise.promise().then(() => { await returnedPromise.promise.then(() => {
this.emitError(error as Error); this.emitError(error as Error);
}); });
} }

View file

@ -535,7 +535,7 @@ export class EmailReadImapV2 implements INodeType {
return newEmails; return newEmails;
}; };
const returnedPromise = await this.helpers.createDeferredPromise(); const returnedPromise = this.helpers.createDeferredPromise();
const establishConnection = async (): Promise<ImapSimple> => { const establishConnection = async (): Promise<ImapSimple> => {
let searchCriteria = ['UNSEEN'] as Array<string | string[]>; let searchCriteria = ['UNSEEN'] as Array<string | string[]>;
@ -590,7 +590,7 @@ export class EmailReadImapV2 implements INodeType {
}); });
// Wait with resolving till the returnedPromise got resolved, else n8n will be unhappy // Wait with resolving till the returnedPromise got resolved, else n8n will be unhappy
// if it receives an error before the workflow got activated // if it receives an error before the workflow got activated
await returnedPromise.promise().then(() => { await returnedPromise.promise.then(() => {
this.emitError(error as Error); this.emitError(error as Error);
}); });
} }

View file

@ -11,7 +11,7 @@ import type {
ITriggerResponse, ITriggerResponse,
IRun, IRun,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { createDeferredPromise, NodeConnectionType, NodeOperationError } from 'n8n-workflow'; import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
export class KafkaTrigger implements INodeType { export class KafkaTrigger implements INodeType {
description: INodeTypeDescription = { description: INodeTypeDescription = {
@ -281,13 +281,13 @@ export class KafkaTrigger implements INodeType {
} }
let responsePromise = undefined; let responsePromise = undefined;
if (!parallelProcessing && (options.nodeVersion as number) > 1) { if (!parallelProcessing && (options.nodeVersion as number) > 1) {
responsePromise = await createDeferredPromise<IRun>(); responsePromise = this.helpers.createDeferredPromise<IRun>();
this.emit([this.helpers.returnJsonArray([data])], undefined, responsePromise); this.emit([this.helpers.returnJsonArray([data])], undefined, responsePromise);
} else { } else {
this.emit([this.helpers.returnJsonArray([data])]); this.emit([this.helpers.returnJsonArray([data])]);
} }
if (responsePromise) { if (responsePromise) {
await responsePromise.promise(); await responsePromise.promise;
} }
}, },
}); });

View file

@ -140,11 +140,11 @@ export class MqttTrigger implements INodeType {
if (this.getMode() === 'trigger') { if (this.getMode() === 'trigger') {
const donePromise = !options.parallelProcessing const donePromise = !options.parallelProcessing
? await this.helpers.createDeferredPromise<IRun>() ? this.helpers.createDeferredPromise<IRun>()
: undefined; : undefined;
client.on('message', async (topic, payload) => { client.on('message', async (topic, payload) => {
this.emit(parsePayload(topic, payload), undefined, donePromise); this.emit(parsePayload(topic, payload), undefined, donePromise);
await donePromise?.promise(); await donePromise?.promise;
}); });
await client.subscribeAsync(topicsQoS); await client.subscribeAsync(topicsQoS);
} }

View file

@ -283,10 +283,9 @@ export class RabbitMQTrigger implements INodeType {
let responsePromiseHook: IDeferredPromise<IExecuteResponsePromiseData> | undefined = let responsePromiseHook: IDeferredPromise<IExecuteResponsePromiseData> | undefined =
undefined; undefined;
if (acknowledgeMode !== 'immediately' && acknowledgeMode !== 'laterMessageNode') { if (acknowledgeMode !== 'immediately' && acknowledgeMode !== 'laterMessageNode') {
responsePromise = await this.helpers.createDeferredPromise(); responsePromise = this.helpers.createDeferredPromise();
} else if (acknowledgeMode === 'laterMessageNode') { } else if (acknowledgeMode === 'laterMessageNode') {
responsePromiseHook = responsePromiseHook = this.helpers.createDeferredPromise<IExecuteResponsePromiseData>();
await this.helpers.createDeferredPromise<IExecuteResponsePromiseData>();
} }
if (responsePromiseHook) { if (responsePromiseHook) {
this.emit([[item]], responsePromiseHook, undefined); this.emit([[item]], responsePromiseHook, undefined);
@ -295,7 +294,7 @@ export class RabbitMQTrigger implements INodeType {
} }
if (responsePromise && acknowledgeMode !== 'laterMessageNode') { if (responsePromise && acknowledgeMode !== 'laterMessageNode') {
// Acknowledge message after the execution finished // Acknowledge message after the execution finished
await responsePromise.promise().then(async (data: IRun) => { await responsePromise.promise.then(async (data: IRun) => {
if (data.data.resultData.error) { if (data.data.resultData.error) {
// The execution did fail // The execution did fail
if (acknowledgeMode === 'executionFinishesSuccessfully') { if (acknowledgeMode === 'executionFinishesSuccessfully') {
@ -308,7 +307,7 @@ export class RabbitMQTrigger implements INodeType {
messageTracker.answered(message); messageTracker.answered(message);
}); });
} else if (responsePromiseHook && acknowledgeMode === 'laterMessageNode') { } else if (responsePromiseHook && acknowledgeMode === 'laterMessageNode') {
await responsePromiseHook.promise().then(() => { await responsePromiseHook.promise.then(() => {
channel.ack(message); channel.ack(message);
messageTracker.answered(message); messageTracker.answered(message);
}); });

View file

@ -22,7 +22,7 @@ export async function executeWorkflow(testData: WorkflowTestData, nodeTypes: INo
nodeTypes, nodeTypes,
settings: testData.input.workflowData.settings, settings: testData.input.workflowData.settings,
}); });
const waitPromise = await createDeferredPromise<IRun>(); const waitPromise = createDeferredPromise<IRun>();
const nodeExecutionOrder: string[] = []; const nodeExecutionOrder: string[] = [];
const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise, nodeExecutionOrder); const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise, nodeExecutionOrder);
@ -50,6 +50,6 @@ export async function executeWorkflow(testData: WorkflowTestData, nodeTypes: INo
const workflowExecute = new WorkflowExecute(additionalData, executionMode, runExecutionData); const workflowExecute = new WorkflowExecute(additionalData, executionMode, runExecutionData);
executionData = await workflowExecute.processRunExecutionData(workflowInstance); executionData = await workflowExecute.processRunExecutionData(workflowInstance);
const result = await waitPromise.promise(); const result = await waitPromise.promise;
return { executionData, result, nodeExecutionOrder }; return { executionData, result, nodeExecutionOrder };
} }

View file

@ -1,14 +1,17 @@
// From: https://gist.github.com/compulim/8b49b0a744a3eeb2205e2b9506201e50 type ResolveFn<T> = (result: T | PromiseLike<T>) => void;
type RejectFn = (error: Error) => void;
export interface IDeferredPromise<T> { export interface IDeferredPromise<T> {
promise: () => Promise<T>; promise: Promise<T>;
reject: (error: Error) => void; resolve: ResolveFn<T>;
resolve: (result: T) => void; reject: RejectFn;
} }
export async function createDeferredPromise<T = void>(): Promise<IDeferredPromise<T>> { export function createDeferredPromise<T = void>(): IDeferredPromise<T> {
return await new Promise<IDeferredPromise<T>>((resolveCreate) => { const deferred: Partial<IDeferredPromise<T>> = {};
const promise = new Promise<T>((resolve, reject) => { deferred.promise = new Promise<T>((resolve, reject) => {
resolveCreate({ promise: async () => await promise, resolve, reject }); deferred.resolve = resolve;
}); deferred.reject = reject;
}); });
return deferred as IDeferredPromise<T>;
} }

View file

@ -728,7 +728,7 @@ export interface ICredentialTestFunctions {
} }
interface BaseHelperFunctions { interface BaseHelperFunctions {
createDeferredPromise: <T = void>() => Promise<IDeferredPromise<T>>; createDeferredPromise: <T = void>() => IDeferredPromise<T>;
} }
interface JsonHelperFunctions { interface JsonHelperFunctions {

View file

@ -0,0 +1,22 @@
import { createDeferredPromise } from '@/DeferredPromise';
describe('DeferredPromise', () => {
it('should resolve the promise with the correct value', async () => {
let done = false;
const deferred = createDeferredPromise<string>();
void deferred.promise.finally(() => {
done = true;
});
expect(done).toBe(false);
deferred.resolve('test');
await expect(deferred.promise).resolves.toBe('test');
expect(done).toBe(true);
});
it('should reject the promise with the correct error', async () => {
const deferred = createDeferredPromise();
const error = new Error('test error');
deferred.reject(error);
await expect(deferred.promise).rejects.toThrow(error);
});
});