代码拉取完成,页面将自动刷新
同步操作将从 二毛/抖音爬虫 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
import os
import sys
import threading
import time
from typing import List
import schedule
from PIL import Image
from pystray import Icon, MenuItem
from browser import Browser
from spider import Douyin
class API(Douyin):
def download(self):
"""
采集完成后,统一下载已采集的结果
"""
if os.path.exists(self.aria2_conf):
command = f"aria2c -c --console-log-level warn -d {self.down_path} -i {self.aria2_conf}"
# os.system(command) # system有输出,阻塞
os.popen(command) # popen无输出,不阻塞
def get_path(relative_path):
try:
# PyInstaller
base_path = sys._MEIPASS
except Exception:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
def start() -> None:
print('任务开始')
edge = Browser()
with open('url.txt', 'r', encoding='utf-8') as f:
lines: List[str] = f.readlines()
for url in lines:
a = API(edge.context, url)
a.run()
if a.results:
if download:
icon.notify(f"发现{len(a.results)}条新内容,开始下载", "发现新内容")
a.download()
else:
icon.notify(f"发现{len(a.results)}条新内容", "发现新内容")
else:
# icon.notify("未发现新内容")
pass
edge.stop()
print('任务结束')
def click_download(icon: Icon, item: MenuItem):
global download
download = not item.checked
def click_start(icon: Icon, item: MenuItem):
global running
running = not item.checked
set_schedule(icon)
def click_exit(icon: Icon, item: MenuItem):
global state
state = False
schedule.clear()
icon.stop()
def set_schedule(icon: Icon):
if running:
schedule.every(10).minutes.do(start)
schedule.run_all()
if icon.HAS_NOTIFICATION:
icon.title = '监控中'
icon.notify(f'{int(schedule.idle_seconds())}秒后执行下一次任务', '抖音监控')
else:
schedule.clear()
if icon.HAS_NOTIFICATION:
icon.title = '未启动'
icon.notify("停止监控", '抖音监控')
def on_start():
set_schedule(icon)
while state:
schedule.run_pending()
time.sleep(1)
state = True
running = True
download = False
menu = (
MenuItem(text='开始', action=click_start, checked=lambda item: running, default=True),
MenuItem(text='下载', action=click_download, checked=lambda item: download, default=False),
MenuItem(text='退出', action=click_exit),
)
image: Image = Image.open(get_path("ico.ico"))
icon: Icon = Icon("监控", image, "监控中" if running else "未启动", menu)
# 使用icon.run(setup=on_start)时无法创建图标,只能单独跑一个线程
threading.Thread(target=on_start, daemon=True).start()
icon.run()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。