// base_test.js v1.0 测试基础功能(低代码框架测试入口) import sandbox from 'apigo.cc/gojs/sandbox' import co from 'apigo.cc/gojs/console' import file from 'apigo.cc/gojs/file' import rt from 'apigo.cc/gojs/runtime' import u from 'apigo.cc/gojs/util' import tt from 'testTool' function runTest(codefile) { let baseConfig = { projectDir: "data", } let user_config = "{}" if (codefile.endsWith('.py')) { baseConfig.runtime = { language: "python", venv: "test", version: "3.12" } baseConfig.startArgs = ["-c", file.read(codefile)] file.write('data/requirements.txt', 'cowsay') user_config = file.read(codefile).match(/"""\s*TEST_CONFIG\s*([\s\S]*?)\s*"""/)[1] } else if (codefile.endsWith('.js')) { file.write('data/package.json', '{"dependencies":{"cowsay":"*"}}') baseConfig.runtime = { language: "nodejs", version: "24" } baseConfig.startArgs = ["-e", file.read(codefile)] user_config = file.read(codefile).match(/\/\*\s*TEST_CONFIG\s*([\s\S]*?)\s*\*\//)[1] } let config = { ...baseConfig, ...u.unJson(user_config) } if (rt.os() === 'darwin') { // Mac 系统清除不支持的配置 config.limits = {} } const name = config.name || codefile try { file.remove('data/stdout.log') file.remove('data/stderr.log') const sb = sandbox.start(config) if (err = getLastError()) throw new Error(`sandbox test ${name} start failed: ` + err) let st = sb.status() if (st.status !== 'running') throw new Error(`sandbox test ${name} status is not running: ` + st.status) sb.wait(10000) if (err = getLastError()) throw new Error(`sandbox test ${name} wait failed: ` + err) st = sb.status() if (st.status !== 'exited') throw new Error(`sandbox test ${name} status is not exited: ` + st.status) co.info(`sandbox test ${name} finished uptime ${st.uptime}`) er = file.read('data/stderr.log') if (er) throw new Error(er) r = file.load('data/stdout.log') if (!r || !r.testSuccess) throw new Error(`sandbox test ${name} failed`) co.info(r) return true } catch (e) { co.info(co.red(e.message), co.magenta(file.read('data/stderr.log'))) co.println(co.bCyan('stdout'), co.cyan(file.read('data/stdout.log'))) co.println(co.bMagenta('stderr'), co.magenta(file.read('data/stderr.log'))) return false } } const TEST_COUNT_PER_TEST = 100 const TEST_COUNT = 10 function main() { if (!runTest('testcase/base_allow.py')) return false if (!runTest('testcase/base_deny.py')) return false if (!runTest('testcase/secret.py')) return false if (!runTest('testcase/base_allow.js')) return false // 压力测试 let tasks = [] for (let i = 0; i < TEST_COUNT_PER_TEST; i++) { tasks.push([` import sandbox from 'apigo.cc/gojs/sandbox' import u from 'apigo.cc/gojs/util' import co from 'apigo.cc/gojs/console' const sb = sandbox.start({ noLog: true, startArgs: ["-c", "import time\\nprint(int(time.time() * 1000))"], runtime: { language: "python", venv: "test", version: "3.12" }, }) u.len({ a: 1, b: 2 }) if (err = getLastError()) throw new Error(err) return sb.wait(10000) `]) } let totalOkNum = 0 let totalFailNum = 0 let totalTotalTime = 0 for (let j = 0; j < TEST_COUNT; j++) { co.info(`Dispatching ${tasks.length} tasks...`) const startTime = Date.now() // 执行并发任务 let results = tt.runAll(tasks) const totalTime = Date.now() - startTime let countTotalTime = 0 let okNum = 0 let failNum = 0 for (let i = 0; i < results.length; i++) { const [data, err] = results[i] if (!err) { countTotalTime += Number(data.trim()) - startTime okNum++ } else { failNum++ co.info(co.red(`Task ${i} failed: ${err}`)) } } totalOkNum += okNum totalFailNum += failNum totalTotalTime += countTotalTime co.info(`All tasks finished in ${totalTime}ms, avg ${countTotalTime / okNum}ms, ${okNum} ok, ${failNum} fail`) file.remove('/tmp/sandbox_tests') } co.info(`Total: ${totalOkNum} ok, ${totalFailNum} fail, avg ${totalTotalTime / totalOkNum}ms`) return totalFailNum === 0 }