代码拉取完成,页面将自动刷新
import os, re, time, shutil
import threading
from dataclasses import dataclass
from eprogress import MultiProgressManager
from config import *
# Binary size units, expressed in bytes (each is 1024 times the previous one).
KB = 1 << 10
MB = KB << 10
GB = MB << 10
# Shared progress information
progress = MultiProgressManager()
# Held by getThreadID while it reads or extends threadIDMap.
lock = threading.Lock()
# Maps threading.get_ident() values to the sequential index assigned by getThreadID.
threadIDMap = {}
def getThreadID():
    """Return a small sequential index identifying the calling thread.

    The first thread to call this gets 0, the next new thread gets 1, and so
    on. The index is remembered in ``threadIDMap`` under the value of
    ``threading.get_ident()``, so repeated calls from the same thread return
    the same number. The lookup-or-assign step runs while holding ``lock``.
    """
    ident = threading.get_ident()
    with lock:
        # setdefault only stores the new index when this ident is not yet known.
        return threadIDMap.setdefault(ident, len(threadIDMap))
def renameAndReplace(old, new):
    """Move ``old`` to ``new``, replacing any existing file at ``new``.

    Args:
        old: Path of the file to move.
        new: Destination path. An existing file there is removed first.

    Raises:
        FileNotFoundError: If ``old`` does not exist. The check happens before
            anything is deleted, so ``new`` is left untouched in that case.
    """
    # Verify the source first: otherwise a missing source would delete the
    # destination and only then fail inside shutil.move, losing the old file.
    # lexists (not exists) so a dangling symlink at ``old`` can still be moved.
    if not os.path.lexists(old):
        raise FileNotFoundError(f"source file does not exist: {old}")
    if os.path.exists(new):
        os.remove(new)
    shutil.move(old, new)
def sampleTask(n, max_value):
    """Demo task: drive progress bar ``n`` from 1 up to ``max_value``.

    Starts bar ``n`` on the shared ``progress`` manager, appends the calling
    thread's index (from ``getThreadID``) to the bar title, then advances the
    bar one step every half second.
    """
    progress.start(n)
    bar = progress._progress_dict.get(n)
    bar.title += f" in pool {getThreadID()}"
    for done in range(max_value):
        # Simulate a time-consuming operation.
        time.sleep(0.5)
        # Report the 1-based step count to the progress bar.
        progress.update(n, done + 1)
@dataclass
class Arguments:
    """One parameter combination passed to the shell script at ``SHELL_PATH``."""

    # The four fields are written, in this order, into both the log file name
    # and the shell command line.
    level: int
    cacheSize: int
    externalCacheSize: int
    ratio: int

    def getLogName(self) -> str:
        """Return the log file name: the four fields joined by ``_`` plus ``.log``."""
        return "{}_{}_{}_{}.log".format(
            self.level,
            self.cacheSize,
            self.externalCacheSize,
            self.ratio,
        )

    def getShellCommand(self) -> str:
        """Return the ``bash`` command line for this parameter combination.

        The command runs ``SHELL_PATH`` with ``SOURCE_PATH``, the calling
        thread's index from ``getThreadID()``, the four fields, and a trailing
        literal ``A`` as positional arguments.
        """
        return "bash {} {} {} {} {} {} {} A".format(
            SHELL_PATH,
            SOURCE_PATH,
            getThreadID(),
            self.level,
            self.cacheSize,
            self.externalCacheSize,
            self.ratio,
        )
def handleFileNotFound(returnValue=0):
    """Build a decorator that turns ``FileNotFoundError`` into a fallback value.

    Args:
        returnValue: Value the decorated function returns when it raises
            ``FileNotFoundError``. Defaults to 0.

    Returns:
        A decorator. The decorated function behaves like the original, except
        that a ``FileNotFoundError`` is caught, reported with ``print``
        (including the call's positional and keyword arguments), and replaced
        by ``returnValue``. All other exceptions propagate unchanged.
    """
    # Function-scope import so this block does not rely on the file's import list.
    import functools

    def outer_wrapper(func):
        # functools.wraps keeps the wrapped function's __name__, __doc__, etc.,
        # so decorated functions stay recognisable in tracebacks and help().
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except FileNotFoundError as e:
                print(f"FileNotFound: {e}; args: {args}; kwargs: {kwargs}")
                return returnValue

        return wrapper

    return outer_wrapper
def matchFirstInt(reStr: str, string: str):
    """
    Return group 1 of the first match of ``reStr`` in ``string``, as an int.

    Raises ``KeyError`` when the pattern does not match anywhere.
    """
    found = re.search(reStr, string)
    if found is not None:
        return int(found.group(1))
    raise KeyError("match failed")
def matchUniqueInt(reStr: str, string: str):
    """
    Return the captured group of the one and only match of ``reStr``, as an int.

    Raises ``KeyError`` when the pattern matches nowhere, and also when it
    matches more than once.
    """
    hits = re.findall(reStr, string)
    if not hits:
        raise KeyError("match failed")
    if len(hits) > 1:
        raise KeyError("more than one match")
    (only,) = hits
    return int(only)
@handleFileNotFound()
def parseFileContent(filename, key):
    """Read ``filename`` and extract the integer(s) written as ``<key>: <digits>``.

    Each key must match exactly once in the file (see ``matchUniqueInt``).

    Args:
        filename: Path of the text file to read.
        key: Either a single key string or an iterable of key strings. Keys
            are inserted into the regular expression verbatim, so regex
            metacharacters inside a key are interpreted as regex syntax.

    Returns:
        For a string key, the matched integer. For an iterable of keys, a lazy
        generator yielding one integer per key, in order. If the file does not
        exist, the ``handleFileNotFound()`` decorator prints a message and
        returns 0 instead.

    Raises:
        KeyError: If a key matches zero times or more than once. For an
            iterable of keys this is raised while the generator is consumed,
            not when this function returns.
    """
    with open(filename, "r") as f:
        content = f.read()
    # isinstance (not an exact type comparison) so that str subclasses count as
    # one key instead of being iterated character by character.
    if isinstance(key, str):
        return matchUniqueInt(rf"{key}: (\d+)", content)
    # key is an iterable of key strings
    return (matchUniqueInt(rf"{k}: (\d+)", content) for k in key)
@handleFileNotFound()
def parseFileContentFirst(filename, key):
    """Read ``filename`` and extract the first integer written as ``<key>: <digits>``.

    Unlike a unique match, later occurrences of the same key are ignored: only
    the first match is used (see ``matchFirstInt``).

    Args:
        filename: Path of the text file to read.
        key: Either a single key string or an iterable of key strings. Keys
            are inserted into the regular expression verbatim, so regex
            metacharacters inside a key are interpreted as regex syntax.

    Returns:
        For a string key, the first matched integer. For an iterable of keys,
        a lazy generator yielding one integer per key, in order. If the file
        does not exist, the ``handleFileNotFound()`` decorator prints a message
        and returns 0 instead.

    Raises:
        KeyError: If a key does not match at all. For an iterable of keys this
            is raised while the generator is consumed, not when this function
            returns.
    """
    with open(filename, "r") as f:
        content = f.read()
    # isinstance (not an exact type comparison) so that str subclasses count as
    # one key instead of being iterated character by character.
    if isinstance(key, str):
        return matchFirstInt(rf"{key}: (\d+)", content)
    # key is an iterable of key strings (the leftover debug print of the keys
    # was dropped so parsing no longer writes to stdout).
    return (matchFirstInt(rf"{k}: (\d+)", content) for k in key)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。