This commit is contained in:
Nikita 2025-03-05 17:53:20 +03:00 committed by GitHub
commit ab21ef665f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 467 additions and 40 deletions

View file

@ -3,15 +3,13 @@ import type { PyodideInterface } from 'pyodide';
let pyodideInstance: PyodideInterface | undefined;
export async function LoadPyodide(packageCacheDir: string): Promise<PyodideInterface> {
if (pyodideInstance === undefined) {
const { loadPyodide } = await import('pyodide');
pyodideInstance = await loadPyodide({ packageCacheDir });
const { loadPyodide } = await import('pyodide');
pyodideInstance = await loadPyodide({ packageCacheDir });
await pyodideInstance.runPythonAsync(`
await pyodideInstance.runPythonAsync(`
from _pyodide_core import jsproxy_typedict
from js import Object
`);
}
return pyodideInstance;
}

View file

@ -1,7 +1,8 @@
import * as fs from 'fs';
import { ApplicationError, type IExecuteFunctions, type INodeExecutionData } from 'n8n-workflow';
import type { PyDict } from 'pyodide/ffi';
import * as path from 'path';
import { Worker } from 'worker_threads';
import { LoadPyodide } from './Pyodide';
import type { SandboxContext } from './Sandbox';
import { Sandbox } from './Sandbox';
@ -53,46 +54,173 @@ export class PythonSandbox extends Sandbox {
}
private async runCodeInPython<T>() {
const packageCacheDir = this.helpers.getStoragePath();
const pyodide = await LoadPyodide(packageCacheDir);
const workerFilePath = await this.createWorkerFile();
let executionResult;
try {
await pyodide.runPythonAsync('jsproxy_typedict[0] = type(Object.new().as_object_map())');
await pyodide.loadPackagesFromImports(this.pythonCode);
const dict = pyodide.globals.get('dict');
const globalsDict: PyDict = dict();
for (const key of Object.keys(this.context)) {
if ((key === '_env' && envAccessBlocked) || key === '_node') continue;
const value = this.context[key];
globalsDict.set(key, value);
}
pyodide.setStdout({ batched: (str) => this.emit('output', str) });
const runCode = `
async def __main():
${this.pythonCode
.split('\n')
.map((line) => ' ' + line)
.join('\n')}
await __main()`;
executionResult = await pyodide.runPythonAsync(runCode, { globals: globalsDict });
globalsDict.destroy();
return await this.executePythonInWorker<T>(workerFilePath);
} catch (error) {
throw this.getPrettyError(error as PyodideError);
} finally {
// Clean up the temporary worker file
try {
fs.unlinkSync(workerFilePath);
} catch (e) {
console.error('Failed to delete temporary worker file:', e);
}
}
}
/**
* Creates a temporary worker file for Python code execution
*/
private async createWorkerFile(): Promise<string> {
const packageCacheDir = this.helpers.getStoragePath();
const tempDir = path.join(packageCacheDir, 'workers');
if (!fs.existsSync(tempDir)) {
fs.mkdirSync(tempDir, { recursive: true });
}
if (executionResult?.toJs) {
return executionResult.toJs({
dict_converter: Object.fromEntries,
create_proxies: false,
}) as T;
}
const workerFilePath = path.join(tempDir, `python-worker-${Date.now()}.js`);
return executionResult as T;
fs.writeFileSync(workerFilePath, this.generateWorkerCode());
return workerFilePath;
}
/**
* Generates the worker thread code for Python execution
*/
private generateWorkerCode(): string {
return `
const { parentPort, workerData } = require('worker_threads');
const { pythonCode, context, packageCacheDir } = workerData;
async function runPython() {
try {
const { LoadPyodide } = require('${path.resolve(__dirname, './Pyodide.js')}');
const pyodide = await LoadPyodide(packageCacheDir);
await pyodide.runPythonAsync('jsproxy_typedict[0] = type(Object.new().as_object_map())');
await pyodide.loadPackagesFromImports(pythonCode);
const globalsDict = pyodide.globals.get('dict')();
for (const [key, value] of Object.entries(context)) {
if ((key === '_env' && ${envAccessBlocked}) || key === '_node') continue;
globalsDict.set(key, value);
}
const indentedCode = pythonCode.split('\\n').map(line => ' ' + line).join('\\n');
const result = await pyodide.runPythonAsync(
\`async def __main():\n\${indentedCode}\nawait __main()\`,
{ globals: globalsDict }
);
const jsResult = result?.toJs ?
result.toJs({ dict_converter: Object.fromEntries, create_proxies: false }) :
result;
// Clean up
globalsDict.destroy();
// Send result back to main thread
parentPort.postMessage({ success: true, result: jsResult });
} catch (error) {
// Send error back to main thread
parentPort.postMessage({
success: false,
error: error.message,
type: error.type || 'Error'
});
}
}
runPython();
`;
}
private async executePythonInWorker<T>(workerFilePath: string): Promise<T> {
return await new Promise((resolve, reject) => {
try {
function sanitizeForWorker(input: any, seen = new WeakMap()) {
if (typeof input !== 'object' || input === null) {
if (typeof input === 'function' || typeof input === 'symbol') {
return undefined;
}
return input;
}
// Handle circular references: if we've seen this object, return the same reference
if (seen.has(input)) {
return seen.get(input);
}
let output: any;
if (Array.isArray(input)) {
output = [];
// Mark the object as seen before recursing
seen.set(input, output);
for (const item of input) {
const sanitizedItem = sanitizeForWorker(item, seen);
if (sanitizedItem !== undefined) {
output.push(sanitizedItem);
}
}
return output;
}
output = {};
seen.set(input, output);
for (const key in input) {
if (Object.prototype.hasOwnProperty.call(input, key)) {
const value = input[key];
// If the value is non-cloneable, skip it
if (typeof value === 'function' || typeof value === 'symbol') {
continue;
}
const sanitizedValue = sanitizeForWorker(value, seen);
if (sanitizedValue !== undefined) {
output[key] = sanitizedValue;
}
}
}
return output;
}
const worker = new Worker(workerFilePath, {
workerData: {
pythonCode: this.pythonCode,
context: sanitizeForWorker(this.context),
packageCacheDir: this.helpers.getStoragePath(),
},
});
console.log('before on message');
worker.on('message', (data) => {
console.log('on message', data);
if (data.success) {
resolve(data.result as T);
} else {
const error = new Error(data.error);
(error as PyodideError).type = data.type;
reject(error);
}
});
worker.on('error', (error) => {
reject(error);
});
worker.on('exit', (code) => {
if (code !== 0 && code !== null) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
} catch (error) {
reject(error);
}
});
}
private getPrettyError(error: PyodideError): Error {

View file

@ -0,0 +1,301 @@
import * as fs from 'fs';
import type { IExecuteFunctions } from 'n8n-workflow';
import * as path from 'path';
import { Worker } from 'worker_threads';
import { PythonSandbox } from '../PythonSandbox';
jest.mock('fs');
jest.mock('worker_threads');
describe('PythonSandbox', () => {
let sandbox: PythonSandbox;
let mockHelpers: IExecuteFunctions['helpers'];
let mockWorker: { on: jest.Mock; postMessage: jest.Mock };
const mockStoragePath = '/mock/storage/path';
const mockPythonCode = 'print("Hello World")';
beforeEach(() => {
jest.clearAllMocks();
// Mock helpers
mockHelpers = {
getStoragePath: jest.fn().mockReturnValue(mockStoragePath),
} as unknown as IExecuteFunctions['helpers'];
// Mock fs functions
(fs.existsSync as jest.Mock).mockReturnValue(true);
(fs.mkdirSync as jest.Mock).mockImplementation(() => undefined);
(fs.writeFileSync as jest.Mock).mockImplementation(() => undefined);
(fs.unlinkSync as jest.Mock).mockImplementation(() => undefined);
// Mock Worker
mockWorker = {
on: jest.fn(),
postMessage: jest.fn(),
};
(Worker as unknown as jest.Mock).mockImplementation(() => mockWorker);
// Setup default worker behavior
mockWorker.on.mockImplementation((event, callback) => {
if (event === 'message') {
callback({ success: true, result: {} });
}
return mockWorker;
});
// Create sandbox instance
sandbox = new PythonSandbox({} as any, mockPythonCode, mockHelpers);
});
describe('worker file management', () => {
it('should create worker file in the correct directory', async () => {
// Mock Date.now() to get consistent file names
const mockTimestamp = 1234567890;
jest.spyOn(Date, 'now').mockReturnValue(mockTimestamp);
// Access the private method
const createWorkerFile = (sandbox as any).createWorkerFile.bind(sandbox);
const workerFilePath = await createWorkerFile();
// Assertions
const expectedPath = path.join(mockStoragePath, 'workers', `python-worker-${mockTimestamp}.js`);
expect(workerFilePath).toBe(expectedPath);
expect(fs.writeFileSync).toHaveBeenCalledWith(expectedPath, expect.any(String));
});
it('should create the workers directory if it does not exist', async () => {
// Setup
(fs.existsSync as jest.Mock).mockReturnValue(false);
// Execute
const createWorkerFile = (sandbox as any).createWorkerFile.bind(sandbox);
await createWorkerFile();
// Assert
expect(fs.mkdirSync).toHaveBeenCalledWith(
path.join(mockStoragePath, 'workers'),
{ recursive: true }
);
});
it('should generate worker code with correct absolute paths', async () => {
// Execute
const generateWorkerCode = (sandbox as any).generateWorkerCode.bind(sandbox);
const workerCode = generateWorkerCode();
// Assert
const pyodidePath = path.resolve(__dirname, '../Pyodide.js');
expect(workerCode).toContain(`require('${pyodidePath}')`);
});
it('should clean up worker file after execution', async () => {
// Setup
jest.spyOn(sandbox as any, 'executePythonInWorker').mockResolvedValue({});
// Execute
const runCodeInPython = (sandbox as any).runCodeInPython.bind(sandbox);
await runCodeInPython();
// Assert
expect(fs.unlinkSync).toHaveBeenCalled();
});
});
describe('worker communication', () => {
it('should pass the correct packageCacheDir to the worker', async () => {
// Setup a spy to capture worker options
let capturedOptions: any;
(Worker as unknown as jest.Mock).mockImplementation((_, options) => {
capturedOptions = options;
return mockWorker;
});
// Execute
const executePythonInWorker = (sandbox as any).executePythonInWorker.bind(sandbox);
await executePythonInWorker('/mock/worker/file.js');
// Assert
expect(capturedOptions.workerData.packageCacheDir).toBe(mockStoragePath);
});
it('should handle worker exit with non-zero code as error', async () => {
// Setup worker to emit exit event with error code
mockWorker.on.mockImplementation((event, callback) => {
if (event === 'exit') {
callback(1); // Non-zero exit code
}
return mockWorker;
});
// Execute and assert
const executePythonInWorker = (sandbox as any).executePythonInWorker.bind(sandbox);
await expect(executePythonInWorker('/mock/worker/file.js')).rejects.toThrow('Worker stopped with exit code 1');
});
});
describe('Python code execution', () => {
it('should execute code and return the result', async () => {
// Setup worker to return a specific result
mockWorker.on.mockImplementation((event, callback) => {
if (event === 'message') {
callback({ success: true, result: { hello: 'world' } });
}
return mockWorker;
});
// Create sandbox with specific Python code
const pythonCode = 'result = {"hello": "world"}\nreturn result';
const testSandbox = new PythonSandbox({} as any, pythonCode, mockHelpers);
// Execute and assert
const result = await testSandbox.runCode();
expect(result).toEqual({ hello: 'world' });
// Verify worker was created with correct code
expect(Worker).toHaveBeenCalledWith(
expect.any(String),
expect.objectContaining({
workerData: expect.objectContaining({ pythonCode })
})
);
});
it('should propagate Python execution errors', async () => {
// Setup worker to return an error
mockWorker.on.mockImplementation((event, callback) => {
if (event === 'message') {
callback({
success: false,
error: 'NameError: name "undefined_variable" is not defined',
type: 'NameError'
});
}
return mockWorker;
});
// Execute and assert
await expect(sandbox.runCode()).rejects.toThrow();
});
it('should format Python errors for better readability', async () => {
// Setup
const error = new Error('Python Error: NameError: name is not defined');
(error as any).type = 'NameError';
// Execute
const getPrettyError = (sandbox as any).getPrettyError.bind(sandbox);
const prettyError = getPrettyError(error);
// Assert
expect(prettyError.message).toContain('NameError');
});
});
describe('context handling', () => {
it('should convert $ prefixed variables to _ prefixed for Python compatibility', () => {
// Setup context with $ prefixed variables
const context = {
$input: { item: { json: {} } },
$json: { data: 123 }
};
// Create sandbox with this context
const testSandbox = new PythonSandbox(context as any, mockPythonCode, mockHelpers);
// Access the private context property
const pythonContext = (testSandbox as any).context;
// Assert
expect(pythonContext).toHaveProperty('_input');
expect(pythonContext).toHaveProperty('_json');
expect(pythonContext).not.toHaveProperty('$input');
expect(pythonContext).not.toHaveProperty('$json');
});
it('should pass sanitized context to the worker', async () => {
// Setup
let capturedWorkerData: any;
(Worker as unknown as jest.Mock).mockImplementation((_, options) => {
capturedWorkerData = options.workerData;
return mockWorker;
});
// Create context with test data
const testContext = {
$input: { item: { json: { testData: 123 } } },
$json: { testData: 123 }
};
// Create sandbox with this context
const testSandbox = new PythonSandbox(testContext as any, mockPythonCode, mockHelpers);
// Execute
await testSandbox.runCode();
// Assert
expect(capturedWorkerData.context).toHaveProperty('_json');
expect(capturedWorkerData.context._json).toHaveProperty('testData', 123);
});
it('should handle circular references in context', async () => {
// Setup
let capturedWorkerData: any;
(Worker as unknown as jest.Mock).mockImplementation((_, options) => {
capturedWorkerData = options.workerData;
return mockWorker;
});
// Create circular reference
const circularObj: any = { name: 'circular' };
circularObj.self = circularObj;
const testContext = {
$input: { item: { json: { circular: circularObj } } }
};
// Create sandbox and execute
const testSandbox = new PythonSandbox(testContext as any, mockPythonCode, mockHelpers);
await testSandbox.runCode();
// Assert
expect(capturedWorkerData.context._input.item.json.circular).toBeDefined();
expect(capturedWorkerData.context._input.item.json.circular.name).toBe('circular');
expect(capturedWorkerData.context._input.item.json.circular.self).toBeDefined();
});
it('should filter out non-serializable values from context', async () => {
// Setup
let capturedWorkerData: any;
(Worker as unknown as jest.Mock).mockImplementation((_, options) => {
capturedWorkerData = options.workerData;
return mockWorker;
});
// Create context with function and symbol
const testContext = {
$input: {
item: {
json: {
fn: () => console.log('test'),
sym: Symbol('test'),
valid: 'data'
}
}
}
};
// Create sandbox and execute
const testSandbox = new PythonSandbox(testContext as any, mockPythonCode, mockHelpers);
await testSandbox.runCode();
// Assert
expect(capturedWorkerData.context._input.item.json.valid).toBe('data');
expect(capturedWorkerData.context._input.item.json.fn).toBeUndefined();
expect(capturedWorkerData.context._input.item.json.sym).toBeUndefined();
});
});
});