# NOTE: hosting-page residue, not part of the module: "Code pull complete; the page will refresh automatically."
import datetime
import logging
import os
import re
import sys
import time
from queue import Full, Queue
import config
from minio_handler import MinIOHandler
def _reset_logger(log):
    """Replace every handler on ``log`` with a fresh file and console handler.

    Existing handlers are closed and detached, propagation to ancestor
    loggers is switched off, and two handlers sharing one format are attached:
    a ``FileHandler`` writing to ``logs/<YYYY_MM_DD>.log`` next to this
    module, and a ``StreamHandler`` on ``sys.stdout``.

    Args:
        log: The ``logging.Logger`` to reconfigure in place.
    """
    # Iterate over a snapshot: removeHandler() mutates log.handlers, and
    # looping over the live list would skip every other handler, so the
    # skipped ones would be dropped below without ever being closed.
    for old_handler in list(log.handlers):
        old_handler.close()
        log.removeHandler(old_handler)
    log.handlers.clear()
    log.propagate = False

    formatter = logging.Formatter(
        "[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d] - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    console_handle = logging.StreamHandler(sys.stdout)
    console_handle.setFormatter(formatter)

    log_dir = os.path.join(os.path.dirname(__file__), 'logs')
    # FileHandler opens the file immediately and does not create missing
    # parent directories, so make sure the log directory exists first.
    os.makedirs(log_dir, exist_ok=True)
    log_filename = os.path.join(log_dir, time.strftime("%Y_%m_%d", time.localtime()) + '.log')
    file_handle = logging.FileHandler(log_filename, encoding="utf-8")
    file_handle.setFormatter(formatter)

    log.addHandler(file_handle)
    log.addHandler(console_handle)
def _get_logger():
    """Return the logger named ``"log"`` after resetting its handlers.

    The logger's handlers are rebuilt by ``_reset_logger`` and its level is
    set to ``logging.INFO``.
    """
    app_logger = logging.getLogger("log")
    _reset_logger(app_logger)
    app_logger.setLevel(logging.INFO)
    return app_logger
# Module-level logger handle
logger = _get_logger()
def get_appdata_dir():
    """Return the application data directory, creating it if it is missing.

    The location is read from ``config.APP_DATA_DIR``.

    Returns:
        The value of ``config.APP_DATA_DIR``.
    """
    data_path = config.APP_DATA_DIR
    if not os.path.exists(data_path):
        # Log through this module's configured logger; the visible code never
        # configures the root logger, so logging.info() would likely be dropped.
        logger.info("[INIT] data path not exists, create it: {}".format(data_path))
        # exist_ok tolerates the directory being created concurrently between
        # the existence check and this call.
        os.makedirs(data_path, exist_ok=True)
    return data_path
class Dequeue(Queue):
    """A ``queue.Queue`` that can also insert items at the head of the queue."""

    def putleft(self, item, block=True, timeout=None):
        """Insert ``item`` at the front of the queue.

        Mirrors the blocking contract of ``Queue.put``:

        Args:
            item: The object to enqueue.
            block: When false, raise ``Full`` at once if a bounded queue has
                no free slot.
            timeout: When blocking, the maximum number of seconds to wait for
                a free slot; ``None`` waits indefinitely.

        Raises:
            Full: No slot became available (non-blocking or timed out).
            ValueError: ``timeout`` is negative while blocking on a bounded
                queue.
        """
        with self.not_full:
            # Only bounded queues (maxsize > 0) ever have to wait for space.
            if self.maxsize > 0:
                self._wait_for_room(block, timeout)
            self._putleft(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def putleft_nowait(self, item):
        """Insert ``item`` at the front without blocking; raise ``Full`` if no room."""
        return self.putleft(item, block=False)

    def _putleft(self, item):
        """Push ``item`` onto the left end of the underlying container."""
        self.queue.appendleft(item)

    def _wait_for_room(self, block, timeout):
        """Wait until the bounded queue has a free slot.

        Must be called with ``self.not_full`` held.
        """
        if not block:
            if self._qsize() >= self.maxsize:
                raise Full
            return

        if timeout is None:
            while self._qsize() >= self.maxsize:
                self.not_full.wait()
            return

        if timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")

        deadline = time.monotonic() + timeout
        while self._qsize() >= self.maxsize:
            time_left = deadline - time.monotonic()
            if time_left <= 0.0:
                raise Full
            self.not_full.wait(time_left)
def remove_markdown_symbol(text: str):
    """Strip Markdown bold markers from ``text``.

    Only ``**...**`` pairs are handled for now: each pair is replaced by the
    text it encloses. Falsy input (``None`` or ``""``) is returned unchanged.
    """
    if text:
        bold_pattern = re.compile(r'\*\*(.*?)\*\*')
        return bold_pattern.sub(r'\1', text)
    return text
def check_prefix(content, prefix_list):
    """Return the first entry of ``prefix_list`` that ``content`` starts with.

    Returns ``None`` when ``prefix_list`` is empty/``None`` or when no entry
    matches.
    """
    if not prefix_list:
        return None
    matching = (candidate for candidate in prefix_list if content.startswith(candidate))
    return next(matching, None)
def check_contain(content, keyword_list):
    """Report whether ``content`` contains any entry of ``keyword_list``.

    Returns ``True`` on the first keyword found in ``content``; otherwise
    (including an empty or ``None`` ``keyword_list``) returns ``None``.
    """
    if keyword_list and any(content.find(keyword) != -1 for keyword in keyword_list):
        return True
    return None
def compose_attachment_path_name(is_group, actual_user_nickname, from_user_nickname, filename):
    """Build the local path for an attachment inside the temporary directory.

    The file name is ``<nickname>_<filename>``, where the nickname is
    ``actual_user_nickname`` when ``is_group`` is truthy and
    ``from_user_nickname`` otherwise. The directory comes from
    ``get_tmp_dir()``.
    """
    nickname = actual_user_nickname if is_group else from_user_nickname
    return os.path.join(get_tmp_dir(), f"{nickname}_{filename}")
def compose_minio_file_name(is_group, actual_user_nickname, from_user_nickname, filename):
    """Build the MinIO object name ``<date>/<nickname>_<filename>``.

    ``<date>`` is the current time formatted with
    ``config.DEFAULT_DATE_FORMAT``; before returning,
    ``MinIOHandler.create_directory`` is called for that date under
    ``config.MINIO_BUCKET_NAME``. The nickname is ``actual_user_nickname``
    when ``is_group`` is truthy and ``from_user_nickname`` otherwise.
    """
    date_folder = datetime.datetime.now().strftime(config.DEFAULT_DATE_FORMAT)
    MinIOHandler.create_directory(config.MINIO_BUCKET_NAME, date_folder)
    nickname = actual_user_nickname if is_group else from_user_nickname
    return f"{date_folder}/{nickname}_{filename}"
def get_tmp_dir():
    """Return the dated temporary-file directory, creating it when needed.

    The directory is ``config.TMP_FILE_ROOT_DIR`` joined with the current
    local time formatted using ``config.DEFAULT_DATE_FORMAT``.

    Returns:
        The path of the directory.
    """
    date_str = datetime.datetime.now().strftime(config.DEFAULT_DATE_FORMAT)
    tmp_path = os.path.join(config.TMP_FILE_ROOT_DIR, date_str)
    # exist_ok replaces the former check-then-create sequence, which could
    # raise FileExistsError when two callers raced to create the directory.
    os.makedirs(tmp_path, exist_ok=True)
    return tmp_path
def parse_file_reply(text: str) -> list:
    """Split a file-reply message into its parts.

    Args:
        text: The reply text to split.

    Returns:
        The list produced by splitting ``text`` on
        ``config.envConfig.FILE_MSG_SPLITER``. ``str.split`` returns a list,
        not a tuple, so the annotation reflects that.
    """
    return text.split(config.envConfig.FILE_MSG_SPLITER)
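# NOTE: hosting-page residue, not part of the module: "This section may contain content unsuitable for display, so the page does not show it. You can review and modify it yourself using the relevant editing functions."
# "If you confirm that the content does not involve inappropriate language / pure advertising / violence / vulgar or pornographic material / infringement / piracy / false information / worthless content, or content that violates relevant national laws and regulations, you may click submit to appeal, and we will process it as soon as possible."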