1 Star 0 Fork 0

陈柏居/destor-test

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
utils.py 2.88 KB
一键复制 编辑 原始数据 按行查看 历史
陈柏居 提交于 2024-10-16 20:51 . feat: add parseFileContent
import os, re, time, shutil
import threading
from dataclasses import dataclass
from eprogress import MultiProgressManager
from config import *
KB = 1024
MB = 1024 * KB
GB = 1024 * MB
# 共享的进度信息
progress = MultiProgressManager()
lock = threading.Lock()
threadIDMap = {}
def getThreadID():
with lock:
tid = threading.get_ident()
if tid not in threadIDMap:
threadIDMap[tid] = len(threadIDMap)
return threadIDMap[tid]
def renameAndReplace(old, new):
if os.path.exists(new):
os.remove(new)
shutil.move(old, new)
def sampleTask(n, max_value):
# print(f"Task {getThreadID()} started")
progress.start(n)
progress._progress_dict.get(n).title += f" in pool {getThreadID()}"
for i in range(1, max_value + 1):
# 模拟耗时操作
time.sleep(.5)
progress.update(n, i) # 更新进度条
@dataclass
class Arguments:
level: int
cacheSize: int
externalCacheSize: int
ratio: int
def getLogName(self) -> str:
return f"{self.level}_{self.cacheSize}_{self.externalCacheSize}_{self.ratio}.log"
def getShellCommand(self) -> str:
return f"bash {SHELL_PATH} {SOURCE_PATH} {getThreadID()} {self.level} {self.cacheSize} {self.externalCacheSize} {self.ratio} A"
def handleFileNotFound(returnValue=0):
def outer_wrapper(func):
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except FileNotFoundError as e:
print(f"FileNotFound: {e}; args: {args}; kwargs: {kwargs}")
return returnValue
return wrapper
return outer_wrapper
def matchFirstInt(reStr: str, string: str):
"""
返回匹配字符串的第一个组
"""
pattern = re.compile(reStr)
match = pattern.search(string)
if match is None:
raise KeyError("match failed")
return int(match.group(1))
def matchUniqueInt(reStr: str, string: str):
"""
返回匹配字符串的唯一一个组
"""
pattern = re.compile(reStr)
match = pattern.findall(string)
if len(match) == 0:
raise KeyError("match failed")
if len(match) > 1:
raise KeyError("more than one match")
return int(match[0])
@handleFileNotFound()
def parseFileContent(filename, key):
content = ""
with open(filename, "r") as f:
content = f.read()
if type(key) == str:
return matchUniqueInt(rf"{key}: (\d+)", content)
else:
# key is a iterable
return (matchUniqueInt(rf"{k}: (\d+)", content) for k in key)
@handleFileNotFound()
def parseFileContentFirst(filename, key):
content = ""
with open(filename, "r") as f:
content = f.read()
if type(key) == str:
return matchFirstInt(rf"{key}: (\d+)", content)
else:
# key is a iterable
print(key)
return (matchFirstInt(rf"{k}: (\d+)", content) for k in key)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/chen-boju/destor-test.git
[email protected]:chen-boju/destor-test.git
chen-boju
destor-test
destor-test
master

搜索帮助