1 Star 0 Fork 0

陈柏居/destor-test

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
evaluationBatch.py 2.38 KB
一键复制 编辑 原始数据 按行查看 历史
陈柏居 提交于 2024-10-14 16:46 +08:00 . refactor testing utils
from utils import *
from concurrent.futures import ThreadPoolExecutor, wait
import subprocess
from eprogress import LineProgress
def executeCommand(cmd: str, taskid):
    """Run ``cmd`` through the shell and mirror its progress onto a progress bar.

    The child's combined stdout/stderr is read in chunks of up to 1024 bytes.
    Whenever the last line of a chunk contains ``"<digits> GB,"`` that number
    is passed to ``progress.update``. When the child exits with status 0 the
    bar is set to ``TOTAL_SIZE``.

    Args:
        cmd: Command line handed to ``subprocess.Popen`` with ``shell=True``.
        taskid: Key of the progress bar to update.

    Raises:
        Exception: If the command exits with a non-zero status. The message
            carries the last non-empty line the command printed.
    """
    # Last non-empty line seen so far; kept so the failure message is not
    # blank (the read loop only ends after an empty read).
    lastLine = ""
    # The context manager closes the stdin/stdout pipes when the block exits.
    with subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as p:
        while True:
            # read1 returns as soon as some data is available, so it does not
            # block waiting for a complete line the way readline would.
            # errors="replace": a 1024-byte chunk may end in the middle of a
            # multi-byte character, which must not abort the task.
            output = p.stdout.read1(1024).decode("utf8", errors="replace")
            if output == "":
                # An empty read is end-of-stream; p.wait() below blocks until
                # the child has exited instead of polling in a tight loop.
                break
            lines = output.split("\n")
            nonEmpty = [line for line in lines if line.strip()]
            if nonEmpty:
                lastLine = nonEmpty[-1]
            match = re.search(r"(\d+) GB,", lines[-1])
            if match is not None:
                progress.update(taskid, int(match.group(1)))
        p.wait()
    if p.returncode == 0:
        progress.update(taskid, TOTAL_SIZE)
    else:
        raise Exception(f"Task {taskid} failed: {lastLine}")
def runSingle(taskid: int, args: Arguments):
    """Execute one evaluation task and hand its pool log to ``renameAndReplace``.

    Calls ``progress.start`` for ``taskid``, appends the current thread id to
    the bar title, runs the shell command produced by ``args`` and finally
    passes the per-pool log path and ``./data/<log name>`` to
    ``renameAndReplace`` (presumably a move of the log file; confirm in utils).

    Args:
        taskid: Key of the progress bar belonging to this task.
        args: Provides ``getShellCommand()`` and ``getLogName()``.
    """
    progress.start(taskid)

    # Tag the bar title with the pool/thread that picked the task up.
    bar = progress._progress_dict.get(taskid)
    bar.title = bar.title + f" in pool {getThreadID()}"

    shellCommand = args.getShellCommand()
    executeCommand(shellCommand, taskid)

    poolLog = f"/data/cbj/destor/log/pool_{getThreadID()}.log"
    resultLog = f"./data/{args.getLogName()}"
    renameAndReplace(poolLog, resultLog)
def putProgress(taskid: int):
    """Register a ``LineProgress`` bar titled ``Task <taskid>`` under ``taskid``.

    The bar is 100 wide, has ``TOTAL_SIZE`` as its total and uses "GB" as unit.
    """
    bar = LineProgress(
        title=f"Task {taskid}",
        width=100,
        total=TOTAL_SIZE,
        progressUnit="GB",
    )
    progress.put(taskid, bar)
def runBasic(executor) -> list:
    """Submit the basic evaluation matrix (level x cache size x ratio).

    For every combination a progress bar is registered through ``putProgress``
    and ``runSingle`` is submitted to ``executor`` with the task id and an
    ``Arguments`` object, matching what ``runSingle(taskid, args)`` expects.

    Args:
        executor: Object with a ``submit(fn, *args)`` method, e.g. a
            ``ThreadPoolExecutor``.

    Returns:
        The futures returned by ``executor.submit``, in submission order.
    """
    futures = []
    cacheSizes = [10 * 256]
    ratios = [50, 60, 70]
    levels = [3, 4, 5]
    # NOTE(review): Arguments is called elsewhere in this file as
    # Arguments(level, cacheSize, external, 50); the last slot is assumed to
    # be the ratio and 0 is assumed to mean "no external cache" -- confirm
    # against the Arguments definition in utils.
    external = 0
    for level in levels:
        for cacheSize in cacheSizes:
            for ratio in ratios:
                taskid = len(futures)
                putProgress(taskid)
                args = Arguments(level, cacheSize, external, ratio)
                futures.append(executor.submit(runSingle, taskid, args))
    return futures
def runExternal(executor) -> list:
    """Submit one task per (level, external) pair with a fixed cache size.

    Each task gets a progress bar via ``putProgress`` and is submitted as
    ``runSingle(taskid, Arguments(level, 2000, external, 50))``.

    Args:
        executor: Object with a ``submit(fn, *args)`` method, e.g. a
            ``ThreadPoolExecutor``.

    Returns:
        The futures returned by ``executor.submit``, in submission order.
    """
    cacheSize = 2000
    levels = [3, 4, 5]
    externals = [1000, 2000, 4000]

    # Levels vary slowest, externals fastest.
    combos = [(level, external) for level in levels for external in externals]

    futures = []
    for level, external in combos:
        taskid = len(futures)
        putProgress(taskid)
        args = Arguments(level, cacheSize, external, 50)
        futures.append(executor.submit(runSingle, taskid, args))
    return futures
def main():
    """Run ``resetEvaluation.sh``, execute the basic batch and report failures.

    Raises:
        RuntimeError: If at least one submitted task raised. All tasks have
            finished by the time this is raised.
    """
    os.system("bash resetEvaluation.sh")
    # Thread pool holding at most 4 worker threads.
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Submit the tasks to the pool and block until all of them are done.
        futures = runBasic(executor)
        wait(futures)
    # wait() never re-raises task exceptions; inspect every future so that a
    # failed task is not silently dropped.
    failures = [error for error in (future.exception() for future in futures) if error is not None]
    if failures:
        details = "; ".join(str(error) for error in failures)
        raise RuntimeError(f"{len(failures)} of {len(futures)} tasks failed: {details}")
# Run the batch only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/chen-boju/destor-test.git
[email protected]:chen-boju/destor-test.git
chen-boju
destor-test
destor-test
master

搜索帮助