代码拉取完成,页面将自动刷新
同步操作将从 anolis/keentune_bench 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
# Copyright (c) 2021-2023 Alibaba Cloud Computing Ltd.
# SPDX-License-Identifier: MulanPSL-2.0
import pytest
async def test_status():
    """Send a GET request to the bench service's ``/status`` endpoint.

    The response is not inspected; any exception raised by ``fetch``
    propagates to the caller.
    """
    from tornado import httpclient

    client = httpclient.AsyncHTTPClient()
    await client.fetch("http://localhost:9874/status")
async def test_sendfile():
    """POST a small Python benchmark script to the ``/sendfile`` endpoint.

    The JSON body carries the target file name, the script text and the
    encoding name. The response is not inspected; any exception raised by
    ``fetch`` propagates to the caller.
    """
    import json
    from tornado import httpclient

    # Script text shipped to the service: it sleeps briefly and prints one
    # line of fixed metric values.
    script_text = "#sendfile Test file\nimport time\ntime.sleep(2)\nprint('Latency_90 = 3310.0, Latency_99 = 9530.0, Requests_sec = 358697.42, Transfer_sec = 291790000.0')"
    payload = json.dumps({
        "file_name": "benchmark/test/sendfile_test.py",
        "body": script_text,
        "encode_type": "utf-8",
    })

    client = httpclient.AsyncHTTPClient()
    await client.fetch(
        "http://localhost:9874/sendfile",
        method="POST",
        body=payload,
    )
async def test_benchmark():
    """Start a benchmark through ``/benchmark`` and also run it directly.

    First POSTs the benchmark command and reply address to the HTTP
    endpoint, then builds a ``BenchmarkProcess`` with the same arguments,
    calls ``runbenchmark()`` on it and prints the returned pair.
    """
    import json
    from tornado import httpclient
    from bench.controller.benchmark import BenchmarkProcess

    command = 'python3 /var/keentune/bench/scripts/benchmark/test/sendfile_test.py localhost'
    reply_host = 'localhost'
    reply_port = '9874'
    job_id = 1

    payload = json.dumps({
        "benchmark_cmd": command,
        "resp_ip": reply_host,
        "resp_port": reply_port,
        "bench_id": job_id,
    })
    client = httpclient.AsyncHTTPClient()
    await client.fetch(
        "http://localhost:9874/benchmark",
        method="POST",
        body=payload,
    )

    # Exercise the process wrapper directly with the same arguments.
    process = BenchmarkProcess(command, reply_host, reply_port, job_id)
    outcome, details = process.runbenchmark()
    report = "[+] benchmark result:{res} response benchmark running result: {data}"
    print(report.format(res=outcome, data=details))
async def test_terminate():
    """Send a GET request to the bench service's ``/terminate`` endpoint.

    The response is not inspected; any exception raised by ``fetch``
    propagates to the caller.
    """
    from tornado import httpclient

    client = httpclient.AsyncHTTPClient()
    await client.fetch("http://localhost:9874/terminate")
async def test_avaliable():
    """POST an agent address to the ``/avaliable`` endpoint.

    The JSON body contains a single ``agent_address`` entry. The response
    is not inspected; any exception raised by ``fetch`` propagates to the
    caller.
    """
    import json
    from tornado import httpclient

    payload = json.dumps({"agent_address": "localhost"})
    client = httpclient.AsyncHTTPClient()
    await client.fetch(
        "http://localhost:9874/avaliable",
        method="POST",
        body=payload,
    )
async def test_err():
    """POST a malformed request to the ``/sendfile`` endpoint.

    The JSON body carries only a ``file_name_err`` key instead of the
    ``file_name`` / ``body`` / ``encode_type`` keys that ``test_sendfile``
    sends. The response is not inspected; any exception raised by
    ``fetch`` propagates to the caller.
    """
    import json
    from tornado import httpclient

    payload = json.dumps({
        "file_name_err": "benchmark/test/sendfile_test.py",
    })
    client = httpclient.AsyncHTTPClient()
    await client.fetch(
        "http://localhost:9874/sendfile",
        method="POST",
        body=payload,
    )
async def run_requests():
    """Run every endpoint check in sequence, then stop the current IOLoop.

    Order: available-address, status, sendfile, benchmark, a five second
    wait, benchmark again, terminate, and finally the malformed sendfile
    request. Any exception raised by a step propagates and skips the rest.
    """
    # Import the submodule explicitly instead of relying on some other
    # module having loaded ``tornado.ioloop`` as a side effect.
    import tornado.ioloop
    from tornado import gen

    await test_avaliable()
    await test_status()
    await test_sendfile()
    await test_benchmark()
    # Wait without blocking: the server under test is served by this same
    # IOLoop, so a blocking time.sleep() would freeze it for the whole wait.
    await gen.sleep(5)
    # kill benchmark process
    await test_benchmark()
    await test_terminate()
    # test error
    await test_err()
    # The caller restarts the loop after run_sync(); stopping here is kept
    # so that sequence behaves as before.
    tornado.ioloop.IOLoop.current().stop()
def test_all_handler():
    """Serve the bench handlers in-process and run the request sequence.

    Builds a Tornado application with the sendfile, benchmark, terminate,
    status and available-address handlers, listens on
    ``Config.BENCH_PORT``, and drives ``run_requests`` on the current
    IOLoop. The listening server is stopped afterwards, even when a
    request fails, so the port is released.
    """
    from bench.common.config import Config
    # Import the submodules explicitly instead of relying on the handler
    # modules having loaded ``tornado.web`` / ``tornado.ioloop`` for us.
    import tornado.ioloop
    import tornado.web
    from bench.controller.sendfile import SendfileHandler
    from bench.controller.benchmark import BenchmarkHandler, BenchmarkTerminateHandler
    from bench.controller.status import StatusHandler, AvaliableAddressHandler

    app = tornado.web.Application(handlers=[
        (r"/sendfile", SendfileHandler),
        (r"/benchmark", BenchmarkHandler),
        (r"/terminate", BenchmarkTerminateHandler),
        (r"/status", StatusHandler),
        (r"/avaliable", AvaliableAddressHandler),
    ])
    server = app.listen(Config.BENCH_PORT)
    try:
        loop = tornado.ioloop.IOLoop.current()
        loop.run_sync(run_requests)
        # NOTE(review): run_requests stops the loop itself just before it
        # returns; this second start() is kept from the original and
        # presumably drains the stop callback run_sync left queued.
        # Confirm before removing either side of that pairing.
        loop.start()
    finally:
        # Stop accepting connections so the port does not stay bound.
        server.stop()
# Script entry point: run the end-to-end handler check directly when this
# file is executed rather than imported.
if __name__ == "__main__":
    test_all_handler()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。