fix(core): Revert all the context helpers changes (#11616)
This commit is contained in:
parent b92729df2b
commit 8a804b3b49
@@ -108,6 +108,7 @@ import type {
  AiEvent,
  ISupplyDataFunctions,
  WebhookType,
  SchedulingFunctions,
} from 'n8n-workflow';
import {
  NodeConnectionType,
@@ -172,6 +173,7 @@ import {
  TriggerContext,
  WebhookContext,
} from './node-execution-context';
import { ScheduledTaskManager } from './ScheduledTaskManager';
import { getSecretsProxy } from './Secrets';
import { SSHClientsManager } from './SSHClientsManager';
@@ -3023,7 +3025,7 @@ const executionCancellationFunctions = (
  },
});

const getRequestHelperFunctions = (
export const getRequestHelperFunctions = (
  workflow: Workflow,
  node: INode,
  additionalData: IWorkflowExecuteAdditionalData,
@@ -3343,11 +3345,19 @@ const getRequestHelperFunctions = (
  };
};

const getSSHTunnelFunctions = (): SSHTunnelFunctions => ({
export const getSSHTunnelFunctions = (): SSHTunnelFunctions => ({
  getSSHClient: async (credentials) =>
    await Container.get(SSHClientsManager).getClient(credentials),
});

export const getSchedulingFunctions = (workflow: Workflow): SchedulingFunctions => {
  const scheduledTaskManager = Container.get(ScheduledTaskManager);
  return {
    registerCron: (cronExpression, onTick) =>
      scheduledTaskManager.registerCron(workflow, cronExpression, onTick),
  };
};

const getAllowedPaths = () => {
  const restrictFileAccessTo = process.env[RESTRICT_FILE_ACCESS_TO];
  if (!restrictFileAccessTo) {
@@ -3414,7 +3424,7 @@ export function isFilePathBlocked(filePath: string): boolean {
  return false;
}

const getFileSystemHelperFunctions = (node: INode): FileSystemHelperFunctions => ({
export const getFileSystemHelperFunctions = (node: INode): FileSystemHelperFunctions => ({
  async createReadStream(filePath) {
    try {
      await fsAccess(filePath);
@@ -3450,7 +3460,7 @@ const getFileSystemHelperFunctions = (node: INode): FileSystemHelperFunctions =>
  },
});

const getNodeHelperFunctions = (
export const getNodeHelperFunctions = (
  { executionId }: IWorkflowExecuteAdditionalData,
  workflowId: string,
): NodeHelperFunctions => ({
@@ -3458,7 +3468,7 @@ const getNodeHelperFunctions = (
    await copyBinaryFile(workflowId, executionId!, filePath, fileName, mimeType),
});

const getBinaryHelperFunctions = (
export const getBinaryHelperFunctions = (
  { executionId }: IWorkflowExecuteAdditionalData,
  workflowId: string,
): BinaryHelperFunctions => ({
@@ -3476,7 +3486,7 @@ const getBinaryHelperFunctions = (
  },
});

const getCheckProcessedHelperFunctions = (
export const getCheckProcessedHelperFunctions = (
  workflow: Workflow,
  node: INode,
): DeduplicationHelperFunctions => ({
@@ -27,13 +27,13 @@ import {
  continueOnFail,
  getAdditionalKeys,
  getBinaryDataBuffer,
  getBinaryHelperFunctions,
  getCredentials,
  getNodeParameter,
  getRequestHelperFunctions,
  returnJsonArray,
} from '@/NodeExecuteFunctions';

import { BinaryHelpers } from './helpers/binary-helpers';
import { RequestHelpers } from './helpers/request-helpers';
import { NodeExecutionContext } from './node-execution-context';

export class ExecuteSingleContext extends NodeExecutionContext implements IExecuteSingleFunctions {
@@ -57,8 +57,14 @@ export class ExecuteSingleContext extends NodeExecutionContext implements IExecu
    this.helpers = {
      createDeferredPromise,
      returnJsonArray,
      ...new BinaryHelpers(workflow, additionalData).exported,
      ...new RequestHelpers(this, workflow, node, additionalData).exported,
      ...getRequestHelperFunctions(
        workflow,
        node,
        additionalData,
        runExecutionData,
        connectionInputData,
      ),
      ...getBinaryHelperFunctions(additionalData, workflow.id),

      assertBinaryData: (propertyName, inputIndex = 0) =>
        assertBinaryData(inputData, node, itemIndex, propertyName, inputIndex),
@@ -1,136 +0,0 @@
import FileType from 'file-type';
import { IncomingMessage, type ClientRequest } from 'http';
import { mock } from 'jest-mock-extended';
import type { Workflow, IWorkflowExecuteAdditionalData, IBinaryData } from 'n8n-workflow';
import type { Socket } from 'net';
import { Container } from 'typedi';

import { BinaryDataService } from '@/BinaryData/BinaryData.service';

import { BinaryHelpers } from '../binary-helpers';

jest.mock('file-type');

describe('BinaryHelpers', () => {
  let binaryDataService = mock<BinaryDataService>();
  Container.set(BinaryDataService, binaryDataService);
  const workflow = mock<Workflow>({ id: '123' });
  const additionalData = mock<IWorkflowExecuteAdditionalData>({ executionId: '456' });
  const binaryHelpers = new BinaryHelpers(workflow, additionalData);

  beforeEach(() => {
    jest.clearAllMocks();

    binaryDataService.store.mockImplementation(
      async (_workflowId, _executionId, _buffer, value) => value,
    );
  });

  describe('getBinaryPath', () => {
    it('should call getPath method of BinaryDataService', () => {
      binaryHelpers.getBinaryPath('mock-binary-data-id');
      expect(binaryDataService.getPath).toHaveBeenCalledWith('mock-binary-data-id');
    });
  });

  describe('getBinaryMetadata', () => {
    it('should call getMetadata method of BinaryDataService', async () => {
      await binaryHelpers.getBinaryMetadata('mock-binary-data-id');
      expect(binaryDataService.getMetadata).toHaveBeenCalledWith('mock-binary-data-id');
    });
  });

  describe('getBinaryStream', () => {
    it('should call getStream method of BinaryDataService', async () => {
      await binaryHelpers.getBinaryStream('mock-binary-data-id');
      expect(binaryDataService.getAsStream).toHaveBeenCalledWith('mock-binary-data-id', undefined);
    });
  });

  describe('prepareBinaryData', () => {
    it('should guess the mime type and file extension if not provided', async () => {
      const buffer = Buffer.from('test');
      const fileTypeData = { mime: 'application/pdf', ext: 'pdf' };
      (FileType.fromBuffer as jest.Mock).mockResolvedValue(fileTypeData);

      const binaryData = await binaryHelpers.prepareBinaryData(buffer);

      expect(binaryData.mimeType).toEqual('application/pdf');
      expect(binaryData.fileExtension).toEqual('pdf');
      expect(binaryData.fileType).toEqual('pdf');
      expect(binaryData.fileName).toBeUndefined();
      expect(binaryData.directory).toBeUndefined();
      expect(binaryDataService.store).toHaveBeenCalledWith(
        workflow.id,
        additionalData.executionId!,
        buffer,
        binaryData,
      );
    });

    it('should use the provided mime type and file extension if provided', async () => {
      const buffer = Buffer.from('test');
      const mimeType = 'application/octet-stream';

      const binaryData = await binaryHelpers.prepareBinaryData(buffer, undefined, mimeType);

      expect(binaryData.mimeType).toEqual(mimeType);
      expect(binaryData.fileExtension).toEqual('bin');
      expect(binaryData.fileType).toBeUndefined();
      expect(binaryData.fileName).toBeUndefined();
      expect(binaryData.directory).toBeUndefined();
      expect(binaryDataService.store).toHaveBeenCalledWith(
        workflow.id,
        additionalData.executionId!,
        buffer,
        binaryData,
      );
    });

    const mockSocket = mock<Socket>({ readableHighWaterMark: 0 });

    it('should use the contentDisposition.filename, responseUrl, and contentType properties to set the fileName, directory, and mimeType properties of the binaryData object', async () => {
      const incomingMessage = new IncomingMessage(mockSocket);
      incomingMessage.contentDisposition = { filename: 'test.txt', type: 'attachment' };
      incomingMessage.contentType = 'text/plain';
      incomingMessage.responseUrl = 'https://example.com/test.txt';

      const binaryData = await binaryHelpers.prepareBinaryData(incomingMessage);

      expect(binaryData.fileName).toEqual('test.txt');
      expect(binaryData.fileType).toEqual('text');
      expect(binaryData.directory).toBeUndefined();
      expect(binaryData.mimeType).toEqual('text/plain');
      expect(binaryData.fileExtension).toEqual('txt');
    });

    it('should use the req.path property to set the fileName property of the binaryData object if contentDisposition.filename and responseUrl are not provided', async () => {
      const incomingMessage = new IncomingMessage(mockSocket);
      incomingMessage.contentType = 'text/plain';
      incomingMessage.req = mock<ClientRequest>({ path: '/test.txt' });

      const binaryData = await binaryHelpers.prepareBinaryData(incomingMessage);

      expect(binaryData.fileName).toEqual('test.txt');
      expect(binaryData.directory).toBeUndefined();
      expect(binaryData.mimeType).toEqual('text/plain');
      expect(binaryData.fileExtension).toEqual('txt');
    });
  });

  describe('setBinaryDataBuffer', () => {
    it('should call store method of BinaryDataService', async () => {
      const binaryData = mock<IBinaryData>();
      const bufferOrStream = mock<Buffer>();

      await binaryHelpers.setBinaryDataBuffer(binaryData, bufferOrStream);

      expect(binaryDataService.store).toHaveBeenCalledWith(
        workflow.id,
        additionalData.executionId,
        bufferOrStream,
        binaryData,
      );
    });
  });
});
@@ -1,33 +0,0 @@
import { mock } from 'jest-mock-extended';
import type { Workflow } from 'n8n-workflow';
import { Container } from 'typedi';

import { ScheduledTaskManager } from '@/ScheduledTaskManager';

import { SchedulingHelpers } from '../scheduling-helpers';

describe('SchedulingHelpers', () => {
  const scheduledTaskManager = mock<ScheduledTaskManager>();
  Container.set(ScheduledTaskManager, scheduledTaskManager);
  const workflow = mock<Workflow>();
  const schedulingHelpers = new SchedulingHelpers(workflow);

  beforeEach(() => {
    jest.clearAllMocks();
  });

  describe('registerCron', () => {
    it('should call registerCron method of ScheduledTaskManager', () => {
      const cronExpression = '* * * * * *';
      const onTick = jest.fn();

      schedulingHelpers.registerCron(cronExpression, onTick);

      expect(scheduledTaskManager.registerCron).toHaveBeenCalledWith(
        workflow,
        cronExpression,
        onTick,
      );
    });
  });
});
@@ -1,32 +0,0 @@
import { mock } from 'jest-mock-extended';
import type { SSHCredentials } from 'n8n-workflow';
import type { Client } from 'ssh2';
import { Container } from 'typedi';

import { SSHClientsManager } from '@/SSHClientsManager';

import { SSHTunnelHelpers } from '../ssh-tunnel-helpers';

describe('SSHTunnelHelpers', () => {
  const sshClientsManager = mock<SSHClientsManager>();
  Container.set(SSHClientsManager, sshClientsManager);
  const sshTunnelHelpers = new SSHTunnelHelpers();

  beforeEach(() => {
    jest.clearAllMocks();
  });

  describe('getSSHClient', () => {
    const credentials = mock<SSHCredentials>();

    it('should call SSHClientsManager.getClient with the given credentials', async () => {
      const mockClient = mock<Client>();
      sshClientsManager.getClient.mockResolvedValue(mockClient);

      const client = await sshTunnelHelpers.getSSHClient(credentials);

      expect(sshClientsManager.getClient).toHaveBeenCalledWith(credentials);
      expect(client).toBe(mockClient);
    });
  });
});
@@ -1,148 +0,0 @@
import FileType from 'file-type';
import { IncomingMessage } from 'http';
import MimeTypes from 'mime-types';
import { ApplicationError, fileTypeFromMimeType } from 'n8n-workflow';
import type {
  BinaryHelperFunctions,
  IWorkflowExecuteAdditionalData,
  Workflow,
  IBinaryData,
} from 'n8n-workflow';
import path from 'path';
import type { Readable } from 'stream';
import Container from 'typedi';

import { BinaryDataService } from '@/BinaryData/BinaryData.service';
import { binaryToBuffer } from '@/BinaryData/utils';
// eslint-disable-next-line import/no-cycle
import { binaryToString } from '@/NodeExecuteFunctions';

export class BinaryHelpers {
  private readonly binaryDataService = Container.get(BinaryDataService);

  constructor(
    private readonly workflow: Workflow,
    private readonly additionalData: IWorkflowExecuteAdditionalData,
  ) {}

  get exported(): BinaryHelperFunctions {
    return {
      getBinaryPath: this.getBinaryPath.bind(this),
      getBinaryMetadata: this.getBinaryMetadata.bind(this),
      getBinaryStream: this.getBinaryStream.bind(this),
      binaryToBuffer,
      binaryToString,
      prepareBinaryData: this.prepareBinaryData.bind(this),
      setBinaryDataBuffer: this.setBinaryDataBuffer.bind(this),
      copyBinaryFile: this.copyBinaryFile.bind(this),
    };
  }

  getBinaryPath(binaryDataId: string) {
    return this.binaryDataService.getPath(binaryDataId);
  }

  async getBinaryMetadata(binaryDataId: string) {
    return await this.binaryDataService.getMetadata(binaryDataId);
  }

  async getBinaryStream(binaryDataId: string, chunkSize?: number) {
    return await this.binaryDataService.getAsStream(binaryDataId, chunkSize);
  }

  // eslint-disable-next-line complexity
  async prepareBinaryData(binaryData: Buffer | Readable, filePath?: string, mimeType?: string) {
    let fileExtension: string | undefined;
    if (binaryData instanceof IncomingMessage) {
      if (!filePath) {
        try {
          const { responseUrl } = binaryData;
          filePath =
            binaryData.contentDisposition?.filename ??
            ((responseUrl && new URL(responseUrl).pathname) ?? binaryData.req?.path)?.slice(1);
        } catch {}
      }
      if (!mimeType) {
        mimeType = binaryData.contentType;
      }
    }

    if (!mimeType) {
      // If no mime type is given figure it out

      if (filePath) {
        // Use file path to guess mime type
        const mimeTypeLookup = MimeTypes.lookup(filePath);
        if (mimeTypeLookup) {
          mimeType = mimeTypeLookup;
        }
      }

      if (!mimeType) {
        if (Buffer.isBuffer(binaryData)) {
          // Use buffer to guess mime type
          const fileTypeData = await FileType.fromBuffer(binaryData);
          if (fileTypeData) {
            mimeType = fileTypeData.mime;
            fileExtension = fileTypeData.ext;
          }
        } else if (binaryData instanceof IncomingMessage) {
          mimeType = binaryData.headers['content-type'];
        } else {
          // TODO: detect filetype from other kind of streams
        }
      }
    }

    if (!fileExtension && mimeType) {
      fileExtension = MimeTypes.extension(mimeType) || undefined;
    }

    if (!mimeType) {
      // Fall back to text
      mimeType = 'text/plain';
    }

    const returnData: IBinaryData = {
      mimeType,
      fileType: fileTypeFromMimeType(mimeType),
      fileExtension,
      data: '',
    };

    if (filePath) {
      if (filePath.includes('?')) {
        // Remove maybe present query parameters
        filePath = filePath.split('?').shift();
      }

      const filePathParts = path.parse(filePath as string);

      if (filePathParts.dir !== '') {
        returnData.directory = filePathParts.dir;
      }
      returnData.fileName = filePathParts.base;

      // Remove the dot
      const extractedFileExtension = filePathParts.ext.slice(1);
      if (extractedFileExtension) {
        returnData.fileExtension = extractedFileExtension;
      }
    }

    return await this.setBinaryDataBuffer(returnData, binaryData);
  }

  async setBinaryDataBuffer(binaryData: IBinaryData, bufferOrStream: Buffer | Readable) {
    return await this.binaryDataService.store(
      this.workflow.id,
      this.additionalData.executionId!,
      bufferOrStream,
      binaryData,
    );
  }

  async copyBinaryFile(): Promise<never> {
    throw new ApplicationError('`copyBinaryFile` has been removed. Please upgrade this node.');
  }
}
@@ -1,381 +0,0 @@
import { createHash } from 'crypto';
import { pick } from 'lodash';
import { jsonParse, NodeOperationError, sleep } from 'n8n-workflow';
import type {
  RequestHelperFunctions,
  IAdditionalCredentialOptions,
  IAllExecuteFunctions,
  IExecuteData,
  IHttpRequestOptions,
  IN8nHttpFullResponse,
  IN8nHttpResponse,
  INode,
  INodeExecutionData,
  IOAuth2Options,
  IRequestOptions,
  IRunExecutionData,
  IWorkflowDataProxyAdditionalKeys,
  IWorkflowExecuteAdditionalData,
  NodeParameterValueType,
  PaginationOptions,
  Workflow,
  WorkflowExecuteMode,
} from 'n8n-workflow';
import { Readable } from 'stream';

// eslint-disable-next-line import/no-cycle
import {
  applyPaginationRequestData,
  binaryToString,
  httpRequest,
  httpRequestWithAuthentication,
  proxyRequestToAxios,
  requestOAuth1,
  requestOAuth2,
  requestWithAuthentication,
  validateUrl,
} from '@/NodeExecuteFunctions';

export class RequestHelpers {
  constructor(
    private readonly context: IAllExecuteFunctions,
    private readonly workflow: Workflow,
    private readonly node: INode,
    private readonly additionalData: IWorkflowExecuteAdditionalData,
    private readonly runExecutionData: IRunExecutionData | null = null,
    private readonly connectionInputData: INodeExecutionData[] = [],
  ) {}

  get exported(): RequestHelperFunctions {
    return {
      httpRequest,
      httpRequestWithAuthentication: this.httpRequestWithAuthentication.bind(this),
      requestWithAuthenticationPaginated: this.requestWithAuthenticationPaginated.bind(this),
      request: this.request.bind(this),
      requestWithAuthentication: this.requestWithAuthentication.bind(this),
      requestOAuth1: this.requestOAuth1.bind(this),
      requestOAuth2: this.requestOAuth2.bind(this),
    };
  }

  get httpRequest() {
    return httpRequest;
  }

  async httpRequestWithAuthentication(
    credentialsType: string,
    requestOptions: IHttpRequestOptions,
    additionalCredentialOptions?: IAdditionalCredentialOptions,
  ) {
    // eslint-disable-next-line @typescript-eslint/no-unsafe-return
    return await httpRequestWithAuthentication.call(
      this.context,
      credentialsType,
      requestOptions,
      this.workflow,
      this.node,
      this.additionalData,
      additionalCredentialOptions,
    );
  }

  // eslint-disable-next-line complexity
  async requestWithAuthenticationPaginated(
    requestOptions: IRequestOptions,
    itemIndex: number,
    paginationOptions: PaginationOptions,
    credentialsType?: string,
    additionalCredentialOptions?: IAdditionalCredentialOptions,
  ): Promise<unknown[]> {
    const responseData = [];
    if (!requestOptions.qs) {
      requestOptions.qs = {};
    }
    requestOptions.resolveWithFullResponse = true;
    requestOptions.simple = false;

    let tempResponseData: IN8nHttpFullResponse;
    let makeAdditionalRequest: boolean;
    let paginateRequestData: PaginationOptions['request'];

    const runIndex = 0;

    const additionalKeys = {
      $request: requestOptions,
      $response: {} as IN8nHttpFullResponse,
      $version: this.node.typeVersion,
      $pageCount: 0,
    };

    const executeData: IExecuteData = {
      data: {},
      node: this.node,
      source: null,
    };

    const hashData = {
      identicalCount: 0,
      previousLength: 0,
      previousHash: '',
    };

    do {
      paginateRequestData = this.getResolvedValue(
        paginationOptions.request as unknown as NodeParameterValueType,
        itemIndex,
        runIndex,
        executeData,
        additionalKeys,
        false,
      ) as object as PaginationOptions['request'];

      const tempRequestOptions = applyPaginationRequestData(requestOptions, paginateRequestData);

      if (!validateUrl(tempRequestOptions.uri as string)) {
        throw new NodeOperationError(
          this.node,
          `'${paginateRequestData.url}' is not a valid URL.`,
          {
            itemIndex,
            runIndex,
            type: 'invalid_url',
          },
        );
      }

      if (credentialsType) {
        // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
        tempResponseData = await this.requestWithAuthentication(
          credentialsType,
          tempRequestOptions,
          additionalCredentialOptions,
        );
      } else {
        // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
        tempResponseData = await this.request(tempRequestOptions);
      }

      const newResponse: IN8nHttpFullResponse = Object.assign(
        {
          body: {},
          headers: {},
          statusCode: 0,
        },
        pick(tempResponseData, ['body', 'headers', 'statusCode']),
      );

      let contentBody: Exclude<IN8nHttpResponse, Buffer>;

      if (newResponse.body instanceof Readable && paginationOptions.binaryResult !== true) {
        // Keep the original string version that we can use it to hash if needed
        contentBody = await binaryToString(newResponse.body as Buffer | Readable);

        const responseContentType = newResponse.headers['content-type']?.toString() ?? '';
        if (responseContentType.includes('application/json')) {
          newResponse.body = jsonParse(contentBody, { fallbackValue: {} });
        } else {
          newResponse.body = contentBody;
        }
        tempResponseData.__bodyResolved = true;
        tempResponseData.body = newResponse.body;
      } else {
        contentBody = newResponse.body;
      }

      if (paginationOptions.binaryResult !== true || tempResponseData.headers.etag) {
        // If the data is not binary (and so not a stream), or an etag is present,
        // we check via etag or hash if identical data is received

        let contentLength = 0;
        if ('content-length' in tempResponseData.headers) {
          contentLength = parseInt(tempResponseData.headers['content-length'] as string) || 0;
        }

        if (hashData.previousLength === contentLength) {
          let hash: string;
          if (tempResponseData.headers.etag) {
            // If an etag is provided, we use it as "hash"
            hash = tempResponseData.headers.etag as string;
          } else {
            // If there is no etag, we calculate a hash from the data in the body
            if (typeof contentBody !== 'string') {
              contentBody = JSON.stringify(contentBody);
            }
            hash = createHash('md5').update(contentBody).digest('base64');
          }

          if (hashData.previousHash === hash) {
            hashData.identicalCount += 1;
            if (hashData.identicalCount > 2) {
              // Length was identical 5x and hash 3x
              throw new NodeOperationError(
                this.node,
                'The returned response was identical 5x, so requests got stopped',
                {
                  itemIndex,
                  description:
                    'Check if "Pagination Completed When" has been configured correctly.',
                },
              );
            }
          } else {
            hashData.identicalCount = 0;
          }
          hashData.previousHash = hash;
        } else {
          hashData.identicalCount = 0;
        }
        hashData.previousLength = contentLength;
      }

      responseData.push(tempResponseData);

      additionalKeys.$response = newResponse;
      additionalKeys.$pageCount = additionalKeys.$pageCount + 1;

      const maxRequests = this.getResolvedValue(
        paginationOptions.maxRequests,
        itemIndex,
        runIndex,
        executeData,
        additionalKeys,
        false,
      ) as number;

      if (maxRequests && additionalKeys.$pageCount >= maxRequests) {
        break;
      }

      makeAdditionalRequest = this.getResolvedValue(
        paginationOptions.continue,
        itemIndex,
        runIndex,
        executeData,
        additionalKeys,
        false,
      ) as boolean;

      if (makeAdditionalRequest) {
        if (paginationOptions.requestInterval) {
          const requestInterval = this.getResolvedValue(
            paginationOptions.requestInterval,
            itemIndex,
            runIndex,
            executeData,
            additionalKeys,
            false,
          ) as number;

          await sleep(requestInterval);
        }
        if (tempResponseData.statusCode < 200 || tempResponseData.statusCode >= 300) {
          // We have it configured to let all requests pass no matter the response code
          // via "requestOptions.simple = false" to not by default fail if it is for example
          // configured to stop on 404 response codes. For that reason we have to throw here
          // now an error manually if the response code is not a success one.
          let data = tempResponseData.body;
          if (data instanceof Readable && paginationOptions.binaryResult !== true) {
            data = await binaryToString(data as Buffer | Readable);
          } else if (typeof data === 'object') {
            data = JSON.stringify(data);
          }

          throw Object.assign(new Error(`${tempResponseData.statusCode} - "${data?.toString()}"`), {
            statusCode: tempResponseData.statusCode,
            error: data,
            isAxiosError: true,
            response: {
              headers: tempResponseData.headers,
              status: tempResponseData.statusCode,
              statusText: tempResponseData.statusMessage,
            },
          });
        }
      }
    } while (makeAdditionalRequest);

    return responseData;
  }

  async request(uriOrObject: string | IRequestOptions, options?: IRequestOptions) {
    // eslint-disable-next-line @typescript-eslint/no-unsafe-return
    return await proxyRequestToAxios(
      this.workflow,
      this.additionalData,
      this.node,
      uriOrObject,
      options,
    );
  }

  async requestWithAuthentication(
    credentialsType: string,
    requestOptions: IRequestOptions,
    additionalCredentialOptions?: IAdditionalCredentialOptions,
    itemIndex?: number,
  ) {
    // eslint-disable-next-line @typescript-eslint/no-unsafe-return
    return await requestWithAuthentication.call(
      this.context,
      credentialsType,
      requestOptions,
      this.workflow,
      this.node,
      this.additionalData,
      additionalCredentialOptions,
      itemIndex,
    );
  }

  async requestOAuth1(credentialsType: string, requestOptions: IRequestOptions) {
    // eslint-disable-next-line @typescript-eslint/no-unsafe-return
    return await requestOAuth1.call(this.context, credentialsType, requestOptions);
  }

  async requestOAuth2(
    credentialsType: string,
    requestOptions: IRequestOptions,
    oAuth2Options?: IOAuth2Options,
  ) {
    // eslint-disable-next-line @typescript-eslint/no-unsafe-return
    return await requestOAuth2.call(
      this.context,
      credentialsType,
      requestOptions,
      this.node,
      this.additionalData,
      oAuth2Options,
    );
  }

  private getResolvedValue(
    parameterValue: NodeParameterValueType,
    itemIndex: number,
    runIndex: number,
    executeData: IExecuteData,
    additionalKeys?: IWorkflowDataProxyAdditionalKeys,
    returnObjectAsString = false,
  ): NodeParameterValueType {
    const mode: WorkflowExecuteMode = 'internal';

    if (
      typeof parameterValue === 'object' ||
      (typeof parameterValue === 'string' && parameterValue.charAt(0) === '=')
    ) {
      return this.workflow.expression.getParameterValue(
        parameterValue,
        this.runExecutionData,
        runIndex,
        itemIndex,
        this.node.name,
        this.connectionInputData,
        mode,
        additionalKeys ?? {},
        executeData,
        returnObjectAsString,
      );
    }

    return parameterValue;
  }
}
@@ -1,20 +0,0 @@
import type { CronExpression, Workflow, SchedulingFunctions } from 'n8n-workflow';
import { Container } from 'typedi';

import { ScheduledTaskManager } from '@/ScheduledTaskManager';

export class SchedulingHelpers {
  private readonly scheduledTaskManager = Container.get(ScheduledTaskManager);

  constructor(private readonly workflow: Workflow) {}

  get exported(): SchedulingFunctions {
    return {
      registerCron: this.registerCron.bind(this),
    };
  }

  registerCron(cronExpression: CronExpression, onTick: () => void) {
    this.scheduledTaskManager.registerCron(this.workflow, cronExpression, onTick);
  }
}
@@ -1,18 +0,0 @@
import type { SSHCredentials, SSHTunnelFunctions } from 'n8n-workflow';
import { Container } from 'typedi';

import { SSHClientsManager } from '@/SSHClientsManager';

export class SSHTunnelHelpers {
  private readonly sshClientsManager = Container.get(SSHClientsManager);

  get exported(): SSHTunnelFunctions {
    return {
      getSSHClient: this.getSSHClient.bind(this),
    };
  }

  async getSSHClient(credentials: SSHCredentials) {
    return await this.sshClientsManager.getClient(credentials);
  }
}
@@ -21,10 +21,10 @@ import {
  getCredentials,
  getNodeParameter,
  getNodeWebhookUrl,
  getRequestHelperFunctions,
  getWebhookDescription,
} from '@/NodeExecuteFunctions';

import { RequestHelpers } from './helpers/request-helpers';
import { NodeExecutionContext } from './node-execution-context';

export class HookContext extends NodeExecutionContext implements IHookFunctions {
@@ -40,7 +40,7 @@ export class HookContext extends NodeExecutionContext implements IHookFunctions
  ) {
    super(workflow, node, additionalData, mode);

    this.helpers = new RequestHelpers(this, workflow, node, additionalData);
    this.helpers = getRequestHelperFunctions(workflow, node, additionalData);
  }

  getActivationMode() {
@@ -13,10 +13,14 @@ import type {

import { extractValue } from '@/ExtractValue';
// eslint-disable-next-line import/no-cycle
import { getAdditionalKeys, getCredentials, getNodeParameter } from '@/NodeExecuteFunctions';
import {
  getAdditionalKeys,
  getCredentials,
  getNodeParameter,
  getRequestHelperFunctions,
  getSSHTunnelFunctions,
} from '@/NodeExecuteFunctions';

import { RequestHelpers } from './helpers/request-helpers';
import { SSHTunnelHelpers } from './helpers/ssh-tunnel-helpers';
import { NodeExecutionContext } from './node-execution-context';

export class LoadOptionsContext extends NodeExecutionContext implements ILoadOptionsFunctions {
@@ -31,8 +35,8 @@ export class LoadOptionsContext extends NodeExecutionContext implements ILoadOpt
    super(workflow, node, additionalData, 'internal');

    this.helpers = {
      ...new RequestHelpers(this, workflow, node, additionalData).exported,
      ...new SSHTunnelHelpers().exported,
      ...getSSHTunnelFunctions(),
      ...getRequestHelperFunctions(workflow, node, additionalData),
    };
  }
@@ -16,14 +16,14 @@ import { ApplicationError, createDeferredPromise } from 'n8n-workflow';
// eslint-disable-next-line import/no-cycle
import {
  getAdditionalKeys,
  getBinaryHelperFunctions,
  getCredentials,
  getNodeParameter,
  getRequestHelperFunctions,
  getSchedulingFunctions,
  returnJsonArray,
} from '@/NodeExecuteFunctions';

import { BinaryHelpers } from './helpers/binary-helpers';
import { RequestHelpers } from './helpers/request-helpers';
import { SchedulingHelpers } from './helpers/scheduling-helpers';
import { NodeExecutionContext } from './node-execution-context';

const throwOnEmit = () => {
@@ -51,9 +51,9 @@ export class PollContext extends NodeExecutionContext implements IPollFunctions
    this.helpers = {
      createDeferredPromise,
      returnJsonArray,
      ...new BinaryHelpers(workflow, additionalData).exported,
      ...new RequestHelpers(this, workflow, node, additionalData).exported,
      ...new SchedulingHelpers(workflow).exported,
      ...getRequestHelperFunctions(workflow, node, additionalData),
      ...getBinaryHelperFunctions(additionalData, workflow.id),
      ...getSchedulingFunctions(workflow),
    };
  }
@@ -16,15 +16,15 @@ import { ApplicationError, createDeferredPromise } from 'n8n-workflow';
// eslint-disable-next-line import/no-cycle
import {
  getAdditionalKeys,
  getBinaryHelperFunctions,
  getCredentials,
  getNodeParameter,
  getRequestHelperFunctions,
  getSchedulingFunctions,
  getSSHTunnelFunctions,
  returnJsonArray,
} from '@/NodeExecuteFunctions';

import { BinaryHelpers } from './helpers/binary-helpers';
import { RequestHelpers } from './helpers/request-helpers';
import { SchedulingHelpers } from './helpers/scheduling-helpers';
import { SSHTunnelHelpers } from './helpers/ssh-tunnel-helpers';
import { NodeExecutionContext } from './node-execution-context';

const throwOnEmit = () => {
@@ -52,10 +52,10 @@ export class TriggerContext extends NodeExecutionContext implements ITriggerFunc
    this.helpers = {
      createDeferredPromise,
      returnJsonArray,
      ...new BinaryHelpers(workflow, additionalData).exported,
      ...new RequestHelpers(this, workflow, node, additionalData).exported,
      ...new SchedulingHelpers(workflow).exported,
      ...new SSHTunnelHelpers().exported,
      ...getSSHTunnelFunctions(),
      ...getRequestHelperFunctions(workflow, node, additionalData),
      ...getBinaryHelperFunctions(additionalData, workflow.id),
      ...getSchedulingFunctions(workflow),
    };
  }
@@ -24,15 +24,15 @@ import { ApplicationError, createDeferredPromise } from 'n8n-workflow';
import {
  copyBinaryFile,
  getAdditionalKeys,
  getBinaryHelperFunctions,
  getCredentials,
  getInputConnectionData,
  getNodeParameter,
  getNodeWebhookUrl,
  getRequestHelperFunctions,
  returnJsonArray,
} from '@/NodeExecuteFunctions';

import { BinaryHelpers } from './helpers/binary-helpers';
import { RequestHelpers } from './helpers/request-helpers';
import { NodeExecutionContext } from './node-execution-context';

export class WebhookContext extends NodeExecutionContext implements IWebhookFunctions {
@@ -54,8 +54,8 @@ export class WebhookContext extends NodeExecutionContext implements IWebhookFunc
    this.helpers = {
      createDeferredPromise,
      returnJsonArray,
      ...new BinaryHelpers(workflow, additionalData).exported,
      ...new RequestHelpers(this, workflow, node, additionalData).exported,
      ...getRequestHelperFunctions(workflow, node, additionalData),
      ...getBinaryHelperFunctions(additionalData, workflow.id),
    };

    this.nodeHelpers = {