代码拉取完成,页面将自动刷新
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on 2017/7/21
edited by Boju Chen
"""
__author__ = 'HomgWu'
import sys, re, abc, time, threading
# ANSI escape sequence (CSI K): erase from the cursor to the end of the line.
CLEAR_TO_END = "\033[K"
# ANSI escape sequence (CSI F): move the cursor to the start of the previous line.
UP_ONE_LINE = "\033[F"
def format_interval(t):
    """
    Render a duration given in seconds as a clock string.

    Parameters
    ----------
    t : int
        Number of seconds (truncated with ``int()`` before formatting).

    Returns
    -------
    out : str
        ``H:MM:SS`` when the hour component is non-zero, otherwise ``MM:SS``.
    """
    total_minutes, seconds = divmod(int(t), 60)
    hours, minutes = divmod(total_minutes, 60)
    # Guard clause: durations under one hour omit the hour field entirely.
    if not hours:
        return f'{minutes:02d}:{seconds:02d}'
    return f'{hours:d}:{minutes:02d}:{seconds:02d}'
class ProgressBar(metaclass=abc.ABCMeta):
    """Abstract base class for console progress indicators.

    Holds the display width, a sanitized title, a lock exposed through the
    ``lock`` property, and the timestamp recorded by ``start``. Subclasses
    must implement ``update``.
    """

    def __init__(self, width=25, title=''):
        """
        @param width : display length of the progress indicator
        @param title : text shown in front of the indicator (control
                       characters are stripped via ``filter_str``)
        """
        self.startTime = 0
        self._lock = threading.Lock()
        self.title = ProgressBar.filter_str(title)
        self.width = width

    @staticmethod
    def filter_str(pending_str):
        """Return the string with carriage returns, tabs and newlines removed."""
        control_chars = re.compile(r'\r|\t|\n')
        return control_chars.sub('', pending_str)

    @property
    def lock(self):
        """The lock created for this instance in ``__init__``."""
        return self._lock

    def start(self):
        """Record the current wall-clock time as the start time."""
        self.startTime = time.time()

    @abc.abstractmethod
    def update(self, progress=0):
        """
        @param progress : current progress value; meaning is defined by the subclass
        """
        pass
class CircleProgress(ProgressBar):
    """Spinner-style indicator that cycles through the characters - \\ | /."""

    def __init__(self, width=10, title=''):
        """
        @param width : display length of the indicator
        @param title : text shown in front of the indicator
        """
        super().__init__(width=width, title=title)
        # Empty until the first positive update advances the spinner.
        self._current_char = ''

    def _get_next_circle_char(self, current_char):
        """Return the spinner character that follows ``current_char``.

        Characters outside the cycle are returned unchanged.
        """
        successors = {
            '': '-',
            '-': '\\',
            '\\': '|',
            '|': '/',
            '/': '-',
        }
        return successors.get(current_char, current_char)

    def update(self, progress=0):
        """
        @param progress : current progress value; the spinner advances one
                          step only when this is greater than 0
        """
        with self.lock:
            if progress > 0:
                self._current_char = self._get_next_circle_char(self._current_char)
            # Wipe the current line, then redraw title and spinner in place.
            # No explicit flush is issued here.
            sys.stdout.write('\r' + CLEAR_TO_END)
            sys.stdout.write("\r%s:[%s]" % (self.title, self._current_char))
class LineProgress(ProgressBar):
    """Horizontal progress bar showing count, elapsed/remaining time and rate."""

    def __init__(self, total=100, symbol='#', width=25, title='', progressUnit="it"):
        """
        @param total : total amount of work
        @param symbol : character used to draw the completed part of the bar
        @param width : display length of the bar (number of cells)
        @param title : text shown in front of the bar
        @param progressUnit : unit label used in the rate display
        """
        super(LineProgress, self).__init__(width=width, title=title)
        self.total = total
        self.symbol = symbol
        self._current_progress = 0
        # Once progress reaches ``total`` the rendered line is frozen here and
        # re-emitted verbatim on later updates.
        self.finishedBar = None
        self.progressUnit = progressUnit

    def update(self, progress=0):
        """
        Redraw the bar on the current line.

        @param progress : current progress value; values <= 0 redraw the bar
                          without changing the stored progress
        """
        with self.lock:
            if progress > 0:
                self._current_progress = progress
            sys.stdout.write('\r' + CLEAR_TO_END)
            if self.finishedBar is not None:
                sys.stdout.write(self.finishedBar)
                return

            done = self._current_progress
            # A non-positive total has nothing left to do: treat it as complete
            # instead of dividing by zero. Clamp so overshoot cannot overflow
            # the bar.
            fraction = min(done / self.total, 1.0) if self.total > 0 else 1.0
            filled = int(fraction * self.width)
            # Honor the configured symbol (previously hard-coded to '#').
            hashes = self.symbol * filled
            spaces = ' ' * (self.width - filled)

            zeroSpeedStr = f"0{self.progressUnit}/s"
            if done > 0:
                spentTime = time.time() - self.startTime
                totalTime = spentTime / done * self.total
                # Never report a negative remaining time when done > total.
                resTime = max(totalTime - spentTime, 0)
                if spentTime > 0:
                    speed = done / spentTime
                    if speed > 1:
                        speedStr = f"{speed:.2f}{self.progressUnit}/s"
                    else:
                        speedStr = f"{1/speed:.2f}s/{self.progressUnit}"
                else:
                    # Clock has not advanced yet; a rate cannot be computed.
                    speedStr = zeroSpeedStr
            else:
                spentTime = 0
                resTime = 0
                speedStr = zeroSpeedStr

            timeBar = f"[{format_interval(spentTime)}<{format_interval(resTime)}, {speedStr}>]"
            bar = f"\r{self.title}:[{hashes + spaces}] {done}/{self.total} {timeBar}"
            sys.stdout.write(bar)
            if done >= self.total:
                self.finishedBar = bar
class MultiProgressManager(object):
    """Singleton that renders several progress bars, one per terminal line."""

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if not hasattr(cls, '_instance'):
            cls._instance = super(MultiProgressManager, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        # Python calls __init__ on every ``MultiProgressManager()`` expression,
        # even though __new__ hands back the existing instance. Without this
        # guard each call would discard the registered bars and replace the
        # lock.
        if getattr(self, '_initialized', False):
            return
        self._progress_dict = {}
        self._lock = threading.Lock()
        self._initialized = True

    def put(self, key, progress_bar):
        """
        Register a progress bar under ``key``.

        @param key : identifier of the progress bar; ``None`` is ignored
        @param progress_bar : the bar to register; falsy values are ignored
        """
        with self._lock:
            if key is None or not progress_bar:
                return
            is_new = key not in self._progress_dict
            self._progress_dict[key] = progress_bar
            if is_new:
                progress_bar.index = len(self._progress_dict) - 1
                # Reserve one terminal line for the newly added bar.
                print()
            else:
                # Replacing a bar keeps its line: no extra newline, and the
                # index is the key's existing position.
                progress_bar.index = list(self._progress_dict).index(key)

    def clear(self):
        """Remove every registered progress bar."""
        with self._lock:
            self._progress_dict.clear()

    def update(self, key, progress):
        """
        Redraw all bars, applying ``progress`` to the bar registered as ``key``.

        @param key : identifier of the bar to update; ``None`` does nothing
        @param progress : current progress value
        """
        with self._lock:
            if key is None:
                return
            line_count = len(self._progress_dict)
            if line_count > 0:
                # Move the cursor back up to the first bar's line.
                sys.stdout.write(UP_ONE_LINE * line_count)
            for bar_key, progress_bar in self._progress_dict.items():
                # Other bars get 0, i.e. "redraw without changing progress".
                progress_bar.update(progress if key == bar_key else 0)
                sys.stdout.write('\n')

    def start(self, key):
        """
        Start the bar registered under ``key``, if there is one.

        @param key : identifier of the bar to start
        """
        with self._lock:
            progress_bar = self._progress_dict.get(key)
            if progress_bar:
                progress_bar.start()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。