代码拉取完成,页面将自动刷新
同步操作将从 Jemmy/COVID-19_notify 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
#!/usr/local/bin/python3
# _*_ coding: utf-8 _*_
import Config
from util import ColorUtil,RedisConn
from notify import DingDing,ServerChan
import schedule
import requests
from bs4 import BeautifulSoup
import js2py
import time
class DingXiang():
def __init__(self):
self.source_url = "https://3g.dxy.cn/newh5/view/pneumonia?from=groupmessage&isappinstalled=0"
self.run_time = 0
self.schedule_minutes = 3 # 定时器每5分钟执行一次
self.sleep_time = 5
self.dingding = DingDing()
self.server_chan = ServerChan()
self.cache = RedisConn()
def get_source(self):
res = requests.get(self.source_url)
if res.status_code != 200:
print("{}获取源数据错误!{}".format(ColorUtil.get_red(),ColorUtil.get_reset()))
return None
res.encoding = "utf-8"
soup = BeautifulSoup(res.text,"lxml")
data = soup.find_all(id="getTimelineService")
if len(data) == 0:
print("{}没有实时播报信息{}".format(ColorUtil.get_red(),ColorUtil.get_reset()))
return None
data = data[0].text
resource = js2py.eval_js("var a = " + data.replace("try { window.getTimelineService =","").replace("}catch(e){}","").strip())
# resouce 是一个列表,每一个元素是包含实时播报信息字典
# response[0] = {
# 'createTime': 1580048835000,
# 'id': 356,
# 'infoSource': '人民日报',
# 'modifyTime': 1580048835000,
# 'provinceId': '42',
# 'provinceName': '湖北省',
# 'pubDate': 1580047711000,
# 'pubDateStr': '18分钟前',
# 'sourceUrl': 'http://m.weibo.cn/2803301701/4465106090076043',
# 'summary': '武汉市市长周先旺:截至今天凌晨,武汉累计报告618例,已经治愈出院40例,死亡45例,目前在院治疗533例,重症87例,危重53例,都在定点医院接受隔离治疗。',
# 'title': '武汉新型肺炎死亡病例已有45人,已经治愈出院40例'
#}
print("{}第{}次刷新,本次获取数据{}条{}\n".format(ColorUtil.get_green_with_back(),self.run_time,len(resource),ColorUtil.get_reset()))
return resource
# 定时器
def scheduler(self):
schedule.every(self.schedule_minutes).minutes.do(self.task)
while 1:
schedule.run_pending()
# 加入已读缓存
def set_is_read(self,id):
return self.cache.add(id) == 1
# 检查是否在已读缓存中
def check_is_read(self,id):
return self.cache.check_is_in(id)
# 发送通知
def notify(self,item):
if Config.NOTIFY_OPTION["dingding"]:
res1 = self.dingding.send(item)
if res1:
print("{}ID:{} Content:【{}】{} 钉钉通知成功".format(ColorUtil.get_green(),item.get("id"),item.get("title"),ColorUtil.get_reset()))
else:
print("{}ID:{} Content:【{}】{} 钉钉通知失败".format(ColorUtil.get_red(),item.get("id"),item.get("title"),ColorUtil.get_reset()))
if Config.NOTIFY_OPTION["server_chan"]:
res2 = self.server_chan.send(item)
if res2:
print("{}ID:{} Content:【{}】{} Server酱通知成功".format(ColorUtil.get_green(),item.get("id"),item.get("title"),ColorUtil.get_reset()))
else:
print("{}ID:{} Content:【{}】{} Server酱通知失败".format(ColorUtil.get_red(),item.get("id"),item.get("title"),ColorUtil.get_reset()))
return res1 and res2
# 定时任务
def task(self):
resource = self.get_source()
sorted(resource,key=lambda s:s["id"],reverse=True) # 根据ID倒序
for item in resource:
# 1. 检查是否在已读缓存中
if self.check_is_read(item.get("id")):
print("{}ID:{} Content:【{}】{} 已在缓存中\n".format(ColorUtil.get_blue_with_back(),item.get("id"),item.get("title"),ColorUtil.get_reset()))
break # 根据ID倒序,最新的已经被已读,那么旧的肯定也被已读
# 2. 发送通知,并且加入已读缓存
if self.notify(item):
if self.set_is_read(item.get("id")):
print("{}加入已读缓存成功".format(item.get("id")))
else:
print("{}{}加入缓存失败!{}".format(ColorUtil.get_red_with_back(),item.get("id"),ColorUtil.get_reset()))
time.sleep(self.sleep_time) # 同批每次通知之间间隔5秒钟
self.run_time += 1
# 入口
def run(self):
print("{}爬虫启动...{}".format(ColorUtil.get_green(),ColorUtil.get_reset()))
self.scheduler()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。