1 Star 0 Fork 9

lichongbing/wuhan_virus_notify

forked from Jemmy/COVID-19_notify
暂停
 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
persistence.py 5.21 KB
一键复制 编辑 原始数据 按行查看 历史
Jemmy 提交于 2020-02-06 02:45 . use crontab to start script
#!/usr/local/bin/python3
# _*_ coding: utf-8 _*_
# 持久化
import pymysql
import Config
import time
import json
from kafka import KafkaProducer
import redis
class PersistenceMysql():
    """MySQL-backed persistence for serialized items and infected-community rows.

    Connection settings and the target table name are read from
    ``Config.PERSISTENCE["mysql"]``. One connection is opened in the
    constructor and reused by every method.
    """

    # Keys read from an ``item`` dict, in the same order as the columns of
    # the ``inflected_block`` insert statement below.
    _INFECTED_BLOCK_FIELDS = (
        "id", "province", "city", "district", "community",
        "sourceurl", "citylong", "citylat", "communitylong", "communitylat",
    )

    def __init__(self):
        mysql_conf = Config.PERSISTENCE["mysql"]
        self.host = mysql_conf["host"]
        self.username = mysql_conf["user"]
        self.password = mysql_conf["password"]
        self.database = mysql_conf["database"]
        self.table_name = mysql_conf["table_name"]
        self.conn = pymysql.connect(host=self.host, user=self.username,
                                    password=self.password,
                                    database=self.database, charset="utf8")
        # pymysql substitutes already-escaped strings into the query, so %s
        # is the only valid placeholder; the former %d raised TypeError.
        self.insert_sql = "insert into " + self.table_name + \
            "(create_time,value) values(%s,%s)"
        self.block_sql = ""

    @staticmethod
    def pingpingping(self=None):
        """Placeholder for a table-existence check (not implemented yet).

        Still a static method so existing ``Class.pingpingping(x)`` calls
        keep working; ``self`` now defaults to ``None`` so that
        ``instance.pingpingping()`` no longer fails with a missing-argument
        ``TypeError``.
        """
        # TODO: check whether the table exists and create it if it does not.
        pass

    def item_serialize(self, item):
        """Return ``item`` encoded as a JSON string."""
        return json.dumps(item)

    def get_current_time(self):
        """Return the current Unix time truncated to whole seconds."""
        return int(time.time())

    def save_infected_blocks(self, item):
        """Insert one infected-community record into ``inflected_block``.

        Args:
            item: mapping that provides the keys listed in
                ``_INFECTED_BLOCK_FIELDS``; missing keys are sent as NULL.

        Returns:
            None. Failures are printed rather than raised (best effort).
        """
        # Values are passed as query parameters instead of being formatted
        # into the SQL text, so quotes in scraped names cannot break or
        # inject into the statement.
        sql = ("insert into inflected_block(id, p_name, c_name, county_name, "
               "block_name, source_url, city_lng, city_lat, block_lng, "
               "block_lat) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
        params = tuple(item.get(field) for field in self._INFECTED_BLOCK_FIELDS)
        cursor = self.conn.cursor()
        try:
            cursor.execute(sql, params)
            self.conn.commit()
        except Exception as e:
            print(e)
            # Report the values actually read from ``item`` (id, province,
            # city, district, community); the old message looked up column
            # names that are not keys of ``item`` and always printed None.
            print("插入小区数据 {}:{}:{}:{}:{} 失败".format(*params[:5]))
        finally:
            cursor.close()

    def get_infected_block_count(self):
        """Return the number of rows in ``inflected_block``."""
        sql = "select count(*) from inflected_block"
        cursor = self.conn.cursor()
        try:
            # The statement was previously never passed to execute().
            cursor.execute(sql)
            return cursor.fetchone()[0]
        finally:
            cursor.close()
class KafkaWork():
    """Synchronous publisher that sends UTF-8 messages to one Kafka topic.

    The topic and bootstrap server are read from
    ``Config.PERSISTENCE["kafka"]``.
    """

    def __init__(self):
        kafka_conf = Config.PERSISTENCE["kafka"]
        self.topic = kafka_conf["topic"]
        self.server = kafka_conf["server"]
        self.producer = KafkaProducer(bootstrap_servers=self.server)

    @staticmethod
    def _encode_key(key):
        """Return ``key`` as UTF-8 bytes; ``None`` and bytes pass through."""
        if key is None or isinstance(key, bytes):
            return key
        return bytes(key, encoding="utf8")

    def send(self, key=None, value=None):
        """Publish ``value`` to the configured topic and wait for the broker.

        Args:
            key: optional message key (str or bytes). It used to be accepted
                but silently dropped; it is now forwarded to the producer.
            value: message body as a str; it is encoded as UTF-8.

        Returns:
            True when the broker acknowledged the message within 10 seconds,
            False otherwise (the error is printed). Previously the success
            path returned None because the ``on_success`` result was dropped.
        """
        future = self.producer.send(
            self.topic, key=self._encode_key(key),
            value=bytes(value, encoding="utf8"))
        try:
            # Block until the send is acknowledged so failures surface here.
            meta = future.get(timeout=10)
        except Exception as e:
            print(e)
            print("kafka 通知失败")
            return False
        return self.on_success(meta)

    def on_success(self, meta):
        """Hook called with the record metadata after a successful send."""
        # print("topic:{} offset: {} 发送成功!".format(meta.topic,meta.offset))
        return True
class PersistenceRedis():
    """Redis-backed store for tips, per-country/city data and trend lists.

    Connection settings are read from ``Config.PERSISTENCE["redis"]``.
    List-like data is kept in sorted sets whose members are the
    JSON-encoded items.
    """

    def __init__(self):
        self.tip_key = "wxopensvr:tip"
        self.country_key = "wxopensvr:country"
        self.city_key = "wxopensvr:city"
        self.tc_total_key = "wxopensvr:tc:total"
        self.tc_add_key = "wxopensvr:tc:add"
        self.tc_day_key = "wxopensvr:tc:day_list"
        self.tc_day_add_key = "wxopensvr:tc:day_add_list"
        redis_conf = Config.PERSISTENCE["redis"]
        self.host = redis_conf["host"]
        self.port = redis_conf["port"]
        self.index = redis_conf["index"]
        self.password = redis_conf["password"]
        self.conn = redis.Redis(
            host=self.host, port=self.port, password=self.password,
            db=self.index)

    def _replace_sorted_set(self, key, items, score_of):
        """Rebuild the sorted set at ``key`` from ``items``.

        The delete and every insert are queued on one pipeline and sent with
        a single ``execute()``, so the old contents are not removed before
        the new ones are ready to be written.

        Args:
            key: Redis key of the sorted set.
            items: iterable of JSON-serializable dicts.
            score_of: callable mapping one item to its float score.
        """
        pipe = self.conn.pipeline()
        pipe.delete(key)
        for entry in items:
            pipe.zadd(key, {json.dumps(entry): score_of(entry)})
        pipe.execute()

    @staticmethod
    def _confirmed_score(entry):
        """Score an item by its ``confirmedCount`` field."""
        return float(entry.get("confirmedCount"))

    @staticmethod
    def _date_score(entry):
        """Score an item by its dotted ``date`` with the dots removed.

        For example ``"01.23"`` becomes ``123.0``.
        """
        return float(int("".join(entry.get("date").split("."))))

    def save_tips(self, tip):
        """Store the tip text."""
        self.conn.set(self.tip_key, tip)

    def save_countries(self, ll):
        """Replace the per-country data, scored by ``confirmedCount``.

        The delete now goes through the pipeline like the sibling methods;
        it used to run directly on the connection before the inserts were
        queued.
        """
        self._replace_sorted_set(self.country_key, ll, self._confirmed_score)

    def save_cities(self, ll):
        """Replace the per-city data, scored by ``confirmedCount``."""
        self._replace_sorted_set(self.city_key, ll, self._confirmed_score)

    def save_tc_total(self, item):
        """Store the Tencent News total figures."""
        self.conn.set(self.tc_total_key, item)

    def save_tc_add(self, item):
        """Store the Tencent News "new since yesterday" figures."""
        self.conn.set(self.tc_add_key, item)

    def save_tc_day_list_item(self, ll):
        """Replace the Tencent News "epidemic trend" list, scored by date."""
        self._replace_sorted_set(self.tc_day_key, ll, self._date_score)

    def save_tc_day_list_add_item(self, ll):
        """Replace the Tencent News "trend of new cases" list, scored by date."""
        self._replace_sorted_set(self.tc_day_add_key, ll, self._date_score)
if __name__ == "__main__":
    # Manual smoke test: publish two sample JSON messages through KafkaWork.
    worker = KafkaWork()
    for payload in (
        {"name": "标题", "content": "内容"},
        {"name": "title", "content": "content"},
    ):
        worker.send("央视新闻", json.dumps(payload))
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/lichongbing/wuhan_virus_notify.git
[email protected]:lichongbing/wuhan_virus_notify.git
lichongbing
wuhan_virus_notify
wuhan_virus_notify
master

搜索帮助