1 Star 0 Fork 6

Wuming/keentune_bench

forked from anolis/keentune_bench 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
test.py 4.02 KB
一键复制 编辑 原始数据 按行查看 历史
jiaominchao 提交于 2024-01-12 14:40 . add unit test cases and code comments
# Copyright (c) 2021-2023 Alibaba Cloud Computing Ltd.
# SPDX-License-Identifier: MulanPSL-2.0
import pytest
async def test_status():
    """Fetch the local ``/status`` endpoint with default fetch options.

    The response is awaited but not inspected; any exception raised by
    ``AsyncHTTPClient.fetch`` propagates to the caller.
    """
    from tornado.httpclient import AsyncHTTPClient

    client = AsyncHTTPClient()
    await client.fetch("http://localhost:9874/status")
async def test_sendfile():
    """POST a small Python script to the local ``/sendfile`` endpoint.

    The JSON body carries three keys: the target file name, the script
    text and the encoding name.  The response is awaited but not
    inspected; any exception raised by ``fetch`` propagates to the caller.
    """
    import json
    from tornado.httpclient import AsyncHTTPClient

    # Script text sent as the file body (adjacent literals concatenate
    # into a single string).
    script_text = (
        "#sendfile Test file\n"
        "import time\n"
        "time.sleep(2)\n"
        "print('Latency_90 = 3310.0, Latency_99 = 9530.0, Requests_sec = 358697.42, Transfer_sec = 291790000.0')"
    )
    payload = json.dumps({
        "file_name": "benchmark/test/sendfile_test.py",
        "body": script_text,
        "encode_type": "utf-8",
    })

    client = AsyncHTTPClient()
    await client.fetch(
        "http://localhost:9874/sendfile",
        method="POST",
        body=payload,
    )
async def test_benchmark():
    """POST a benchmark request, then run the same benchmark directly.

    First the command, reply address, reply port and benchmark id are
    sent as JSON to the local ``/benchmark`` endpoint.  Afterwards a
    ``BenchmarkProcess`` is built from the same four values, its
    ``runbenchmark()`` method is called, and the two returned values are
    printed.

    NOTE(review): ``runbenchmark()`` is called without ``await``; if it
    blocks, it stalls the event loop for its duration -- confirm.
    """
    import json
    from tornado.httpclient import AsyncHTTPClient
    from bench.controller.benchmark import BenchmarkProcess

    command = 'python3 /var/keentune/bench/scripts/benchmark/test/sendfile_test.py localhost'
    reply_host = 'localhost'
    reply_port = '9874'
    job_id = 1

    payload = json.dumps({
        "benchmark_cmd": command,
        "resp_ip": reply_host,
        "resp_port": reply_port,
        "bench_id": job_id,
    })
    client = AsyncHTTPClient()
    await client.fetch(
        "http://localhost:9874/benchmark",
        method="POST",
        body=payload,
    )

    process = BenchmarkProcess(command, reply_host, reply_port, job_id)
    outcome, details = process.runbenchmark()
    report = "[+] benchmark result:{res} response benchmark running result: {data}"
    print(report.format(res=outcome, data=details))
async def test_terminate():
    """Fetch the local ``/terminate`` endpoint with default fetch options.

    The response is awaited but not inspected; any exception raised by
    ``AsyncHTTPClient.fetch`` propagates to the caller.
    """
    from tornado.httpclient import AsyncHTTPClient

    client = AsyncHTTPClient()
    await client.fetch("http://localhost:9874/terminate")
async def test_avaliable():
    """POST an ``agent_address`` of ``localhost`` to ``/avaliable``.

    The response is awaited but not inspected; any exception raised by
    ``AsyncHTTPClient.fetch`` propagates to the caller.
    """
    import json
    from tornado.httpclient import AsyncHTTPClient

    payload = json.dumps({"agent_address": "localhost"})
    client = AsyncHTTPClient()
    await client.fetch(
        "http://localhost:9874/avaliable",
        method="POST",
        body=payload,
    )
async def test_err():
    """POST a body with an unexpected key to the local ``/sendfile`` endpoint.

    The JSON body contains only ``file_name_err`` instead of the
    ``file_name`` / ``body`` / ``encode_type`` keys that ``test_sendfile``
    sends.  The response is awaited but not inspected.

    NOTE(review): presumably exercises the handler's error path -- how
    the handler answers is not visible from this file.
    """
    import json
    from tornado.httpclient import AsyncHTTPClient

    payload = json.dumps({
        "file_name_err": "benchmark/test/sendfile_test.py",
    })
    client = AsyncHTTPClient()
    await client.fetch(
        "http://localhost:9874/sendfile",
        method="POST",
        body=payload,
    )
async def run_requests():
    """Run every request helper in sequence, then stop the current IOLoop.

    Order: available-address check, status, sendfile, benchmark, a
    five-second pause, a second benchmark request, terminate, and finally
    the malformed sendfile request.  Any exception raised by a helper
    propagates to the caller and skips the remaining steps, including the
    final ``stop()``.
    """
    import tornado.ioloop
    from tornado import gen

    await test_avaliable()
    await test_status()
    await test_sendfile()
    await test_benchmark()
    # Pause without blocking the event loop.  A plain time.sleep() here
    # would freeze the IOLoop this coroutine runs on, and with it any
    # server callbacks scheduled on the same loop.
    await gen.sleep(5)
    # kill benchmark process
    await test_benchmark()
    await test_terminate()
    # test error
    await test_err()
    tornado.ioloop.IOLoop.current().stop()
def test_all_handler():
    """Serve the bench handlers in-process and run the request sequence.

    Builds a Tornado application with the sendfile, benchmark, terminate,
    status and available-address handlers, listens on
    ``Config.BENCH_PORT``, and drives ``run_requests`` on the current
    IOLoop.  The listening server is stopped afterwards, even if the
    request sequence raises, so the port is released.

    NOTE(review): the request helpers in this file target port 9874
    directly; this assumes ``Config.BENCH_PORT`` is 9874 -- confirm.
    """
    from bench.common.config import Config
    # Import the submodules explicitly: a bare ``import tornado`` does not
    # guarantee that ``tornado.web`` / ``tornado.ioloop`` are loaded.
    import tornado.ioloop
    import tornado.web
    from bench.controller.sendfile import SendfileHandler
    from bench.controller.benchmark import BenchmarkHandler, BenchmarkTerminateHandler
    from bench.controller.status import StatusHandler, AvaliableAddressHandler

    app = tornado.web.Application(handlers=[
        (r"/sendfile", SendfileHandler),
        (r"/benchmark", BenchmarkHandler),
        (r"/terminate", BenchmarkTerminateHandler),
        (r"/status", StatusHandler),
        (r"/avaliable", AvaliableAddressHandler),
    ])
    server = app.listen(Config.BENCH_PORT)
    try:
        loop = tornado.ioloop.IOLoop.current()
        loop.run_sync(run_requests)
        # NOTE(review): run_sync() already runs the loop until
        # run_requests finishes; this extra start() is kept from the
        # original -- confirm whether it is still needed.
        loop.start()
    finally:
        # Stop listening so the port is not left bound after the test.
        server.stop()
# Script entry point: run the handler test when this file is executed
# directly rather than imported.
if __name__ == "__main__":
    test_all_handler()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/dingpengs/keentune_bench.git
[email protected]:dingpengs/keentune_bench.git
dingpengs
keentune_bench
keentune_bench
master

搜索帮助