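# [Gitee page notice captured with the file; not part of the module] Code pull complete; the page will refresh automatically.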
'''
Author: thinktanker [email protected]
Date: 2024-02-06 17:21:27
LastEditors: duliang [email protected]
LastEditTime: 2024-04-18 21:09:43
FilePath: \duls-wechat\getsq.py
Description: 这是默认设置,请设置`customMade`, 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE
'''
import sqlite3
import time
import locale
import matplotlib
matplotlib.use('Agg') # 设置matplotlib不显示图形,只保存图形
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import matplotlib.ticker as ticker
from threading import Thread
import queue
from PIL import Image
from io import BytesIO
locale.setlocale(locale.LC_CTYPE, 'chinese') # switch the character-type locale to Chinese; NOTE(review): 'chinese' looks like a Windows locale name - confirm on other platforms
db_path = r'd:\lljdata\swdb.db' # path of the SQLite database file
plt.rcParams['font.sans-serif'] = ['SimHei'] # sans-serif font used for body text; NOTE(review): presumably chosen so Chinese labels render - confirm the font is installed
plt.rcParams['axes.unicode_minus'] = False # keep the minus sign '-' from rendering as a box
results_queue = queue.Queue() # queue that collects results produced by worker threads
# db1_path = r'F:\gitee\control-net\data\hisdata.db'
# db2_path = r'F:\gitee\control-net\data\hisdata2.db'
# ename_list = ["35kV", "zhb", "sb", "6kV", "zb", "1", "2", "3", "4"]
# realname_list = [
# "35kV", "主变", "所变", "6 kV", "站变", "1# ", "2# ", "3# ", "4# "
# ]
# realname_list2 = {
# '1': '1#',
# '2': '2#',
# "3": "3#",
# '4': '4#',
# 'zhbg': '站变高压侧',
# 'zhbd': '站变低压侧',
# 'zb': '主变',
# '35kV': '35kV',
# }
def getsq(gcname, gcstart, gcend, dbpath=None):
    """Return up to four of the newest water-level rows for matching stations.

    Rows are read from the ``swdb`` table, newest ``time`` first, where
    ``name`` contains ``gcname``.  When ``gcstart`` is non-empty the rows are
    additionally restricted to ``time`` between the start of the ``gcstart``
    day and the end of the ``gcend`` day (or of the ``gcstart`` day when
    ``gcend`` is empty).  ``gcend`` is ignored when ``gcstart`` is empty.

    :param gcname: text that the station name must contain
    :param gcstart: first day as digits sliced ``[0:4]``, ``[4:6]``, ``[6:8]``
        into year, month and day (e.g. ``"20240206"``), or ``""`` for no
        date filter
    :param gcend: last day in the same digit layout, or ``""``
    :param dbpath: SQLite file to query; defaults to the module-level
        ``db_path``
    :return: one text paragraph per row (name, time, upstream and downstream
        level), concatenated; ``""`` when nothing matches
    """
    pattern = f"%{gcname}%"
    if gcstart != "":
        first_day = f"{gcstart[0:4]}-{gcstart[4:6]}-{gcstart[6:8]}"
        if gcend == "":
            last_day = first_day
        else:
            last_day = f"{gcend[0:4]}-{gcend[4:6]}-{gcend[6:8]}"
        sqltxt = ("select name,sysw,xysw,time from swdb "
                  "where name like ? and time between ? and ? "
                  "order by time desc limit 4")
        params = (pattern, f"{first_day} 00:00:00", f"{last_day} 23:59:59")
    else:
        sqltxt = ("select name,sysw,xysw,time from swdb "
                  "where name like ? order by time desc limit 4")
        params = (pattern, )
    print(sqltxt, params)

    # Values are bound as parameters rather than formatted into the SQL text,
    # so quotes in the caller's input cannot alter or break the statement.
    conn = sqlite3.connect(db_path if dbpath is None else dbpath)
    try:
        rows = conn.execute(sqltxt, params).fetchall()
    finally:
        # Close the connection even if the query fails.
        conn.close()

    return "".join(
        f"{name}\n{when}\n上游:{upstream} 下游:{downstream}\n\n"
        for name, upstream, downstream, when in rows)
def getnewsq(gcname, dbpath=None):
    """Return up to four of the newest water-level rows, fuzzy-matched by name.

    A ``%`` wildcard is placed between every character of ``gcname`` (and at
    both ends), so the characters only have to appear in that order somewhere
    in the station name.

    :param gcname: station name keyword
    :param dbpath: SQLite file to query; defaults to the module-level
        ``db_path``
    :return: one text paragraph per row (name, time, upstream and downstream
        level), concatenated; ``""`` when nothing matches
    """
    pattern = "%" + "%".join(gcname) + "%"
    sqltxt = ("select name,sysw,xysw,time from swdb "
              "where name like ? order by time desc limit 4")

    # The pattern is bound as a parameter rather than formatted into the SQL
    # text, so quotes in the keyword cannot alter or break the statement.
    conn = sqlite3.connect(db_path if dbpath is None else dbpath)
    try:
        rows = conn.execute(sqltxt, (pattern, )).fetchall()
    finally:
        # Close the connection even if the query fails.
        conn.close()

    return "".join(
        f"{name}\n{when}\n上游:{upstream} 下游:{downstream}\n\n"
        for name, upstream, downstream, when in rows)
def get_gcname(_name, dbpath=None):
    """Look up the distinct station names that fuzzy-match a keyword.

    A ``%`` wildcard is placed between every character of ``_name`` (and at
    both ends) before it is used in a ``LIKE`` comparison against the
    ``name`` column of ``swdb``.

    :param _name: fuzzy keyword for the station name
    :param dbpath: SQLite file to query; defaults to the module-level
        ``db_path``
    :return: list of at most ten one-element tuples ``(name,)`` as returned
        by ``fetchall()``
    """
    pattern = "%" + "%".join(_name) + "%"
    conn = sqlite3.connect(db_path if dbpath is None else dbpath)
    try:
        # Bound parameter instead of string concatenation: a quote in the
        # keyword can no longer break or alter the statement.
        cursor = conn.execute(
            "SELECT name FROM swdb where name like ? GROUP BY name limit 10",
            (pattern, ))
        return cursor.fetchall()
    finally:
        # Close the connection even if the query fails.
        conn.close()
def get_trend(_name, dbpath=None):
    """Plot the newest water-level readings of a station and queue the PNG.

    Reads at most 168 of the newest ``swdb`` rows whose ``name`` contains
    ``_name`` (the x-axis label calls this the last 7 days), draws the
    ``sysw`` and ``xysw`` series, annotates the current value, maximum and
    minimum of each series plus the current difference, saves the chart as
    ``<_name>.png`` in the working directory and puts the file's bytes on the
    module-level ``results_queue``.  Nothing is returned.

    :param _name: station name (matched as a substring with SQL ``LIKE``)
    :param dbpath: SQLite file to query; defaults to the module-level
        ``db_path``
    :raises ValueError: if fewer than two rows match, since the annotations
        need the second row
    """
    conn = sqlite3.connect(db_path if dbpath is None else dbpath)
    try:
        # Bound parameter instead of string concatenation, so quotes in the
        # name cannot alter or break the statement.
        query = ("SELECT name,sysw,xysw,time FROM swdb where name like ? "
                 "order by time desc limit 168")
        df = pd.read_sql_query(query, conn, params=(f"%{_name}%", ))
    finally:
        conn.close()

    # Fail before a figure exists; otherwise the later ``iloc[1]`` lookup
    # would raise with an open, never-closed figure left behind.
    if len(df) < 2:
        raise ValueError(
            f"need at least two readings to plot a trend for {_name!r}, "
            f"got {len(df)}")

    fig, ax = plt.subplots()
    try:
        # The series are plotted BEFORE the time column is converted below.
        # NOTE(review): presumably the column arrives as text, which makes
        # this a categorical x-axis - confirm against the table schema.
        ax.plot(df['time'], df['sysw'])
        ax.plot(df['time'], df['xysw'])
        xlims = ax.get_xlim()
        # Make sure df['time'] is datetime-typed so strftime can be used.
        if not isinstance(df['time'].dtype,
                          pd.core.dtypes.dtypes.DatetimeTZDtype):
            df['time'] = pd.to_datetime(df['time'])
        # Major x ticks only at the two ends of the axis.
        ax.xaxis.set_major_locator(ticker.FixedLocator([xlims[0], xlims[1]]))
        # NOTE(review): this is the time of the SECOND newest row, while the
        # "current" values below come from row 0 - confirm that is intended.
        start_time = df['time'].iloc[1].strftime('%Y-%m-%d %H:%M:%S')

        # Extremes of each series and the time at which they occurred.
        max_sysw = df['sysw'].max()
        max_s_index = df['sysw'].idxmax()
        max_s_time = df['time'].iloc[max_s_index].strftime('%Y-%m-%d %H:%M:%S')
        min_sysw = df['sysw'].min()
        min_s_index = df['sysw'].idxmin()
        min_s_time = df['time'].iloc[min_s_index].strftime('%Y-%m-%d %H:%M:%S')
        max_xysw = df['xysw'].max()
        max_x_index = df['xysw'].idxmax()
        max_x_time = df['time'].iloc[max_x_index].strftime('%Y-%m-%d %H:%M:%S')
        min_xysw = df['xysw'].min()
        min_x_index = df['xysw'].idxmin()
        min_x_time = df['time'].iloc[min_x_index].strftime('%Y-%m-%d %H:%M:%S')

        # Rows are newest-first, so running the axis from the last index to
        # the first puts the oldest reading on the left.
        ax.set_xlim(df['time'].index[-1], df['time'].index[0])
        ax.xaxis.set_ticks([df['time'].index[-1], df['time'].index[0]])

        # Annotations use the formatted time strings as x data coordinates.
        # Upstream series (sysw): current value, maximum, minimum.
        sysw_cur = df['sysw'].iloc[0]
        ax.annotate(text=f" {sysw_cur}",
                    xy=(start_time, sysw_cur),
                    xycoords='data',
                    xytext=(start_time, sysw_cur),
                    textcoords='data',
                    fontsize=10,
                    color='blue',
                    arrowprops=dict(facecolor='blue'))
        time_text = '\n'.join(max_s_time.split(' '))
        ax.annotate(text=f"Max:{max_sysw:.2f}\n{time_text}\n",
                    xy=(max_s_time, max_sysw),
                    xycoords='data',
                    xytext=(max_s_time, max_sysw - 1),
                    textcoords='data',
                    fontsize=10,
                    color='red',
                    arrowprops=dict(facecolor='red'))
        time_text = '\n'.join(min_s_time.split(' '))
        ax.annotate(text=f"Min:{min_sysw:.2f}\n{time_text}\n",
                    xy=(min_s_time, min_sysw),
                    xycoords='data',
                    xytext=(min_s_time, min_sysw - 1),
                    textcoords='data',
                    fontsize=10,
                    color='green',
                    arrowprops=dict(facecolor='green'))

        # Downstream series (xysw): current value, maximum, minimum.
        xysw_cur = df['xysw'].iloc[0]
        ax.annotate(text=f" {xysw_cur}",
                    xy=(start_time, xysw_cur),
                    xycoords='data',
                    xytext=(start_time, xysw_cur),
                    textcoords='data',
                    fontsize=10,
                    color='blue',
                    arrowprops=dict(facecolor='blue'))
        time_text = '\n'.join(max_x_time.split(' '))
        ax.annotate(text=f"Max:{max_xysw:.2f}\n{time_text}\n",
                    xy=(max_x_time, max_xysw),
                    xycoords='data',
                    xytext=(max_x_time, max_xysw + 0.5),
                    textcoords='data',
                    fontsize=10,
                    color='red',
                    arrowprops=dict(facecolor='red'))
        time_text = '\n'.join(min_x_time.split(' '))
        ax.annotate(text=f"Min:{min_xysw:.2f}\n{time_text}\n",
                    xy=(min_x_time, min_xysw),
                    xycoords='data',
                    xytext=(min_x_time, min_xysw + 0.5),
                    textcoords='data',
                    fontsize=10,
                    color='green',
                    arrowprops=dict(facecolor='green'))

        # Difference between the current upstream and downstream levels,
        # placed at the right edge of the axes.
        ax.annotate(
            text=f" Δ{sysw_cur - xysw_cur:.2f}",
            xy=(start_time, xysw_cur),
            xycoords='data',
            xytext=(1, 0.5),
            textcoords='axes fraction',
            fontsize=10,
            color='black',
        )
        ax.set_title(_name)
        ax.set_xlabel('近7天趋势图')
        fig.savefig(f'{_name}.png', dpi=100)
    finally:
        # Close the figure to release its resources, even when drawing fails.
        plt.close(fig)

    with open(f'{_name}.png', 'rb') as f:
        results_queue.put(f.read())
def merge_images(images, output_path, direction='horizontal'):
    """Paste several images side by side or stacked onto a white canvas.

    The merged image is saved to ``output_path`` and also returned as PNG
    bytes.  Images are pasted in the given order, each starting where the
    previous one ended; the canvas is as large as the biggest image in the
    other dimension.

    :param images: PIL images to merge
    :param output_path: file path the merged image is saved to
    :param direction: merge direction, ``'horizontal'`` or ``'vertical'``
    :return: the merged image encoded as PNG bytes
    :raises ValueError: if ``direction`` is not one of the two supported
        values, or if ``images`` is empty
    """
    # Reject unknown directions up front; previously they only failed later
    # with an unbound-variable error.
    if direction not in ('horizontal', 'vertical'):
        raise ValueError(
            f"direction must be 'horizontal' or 'vertical', got {direction!r}")
    images = list(images)
    if not images:
        raise ValueError("images must contain at least one image")

    widths, heights = zip(*(im.size for im in images))
    if direction == 'horizontal':
        canvas_size = (sum(widths), max(heights))
    else:
        canvas_size = (max(widths), sum(heights))
    new_im = Image.new('RGB', canvas_size, color='white')

    offset = 0
    for im in images:
        if direction == 'horizontal':
            new_im.paste(im, (offset, 0))
            offset += im.size[0]
        else:
            new_im.paste(im, (0, offset))
            offset += im.size[1]

    new_im.save(output_path)
    img_bytes = BytesIO()
    new_im.save(img_bytes, format='PNG')
    return img_bytes.getvalue()
def getqs(_name):
    """Build one stacked trend chart for every station matching a keyword.

    Resolves the keyword with ``get_gcname``, runs ``get_trend`` for each
    station name in its own daemon thread, waits for all threads, then merges
    the charts found on ``results_queue`` vertically into ``result.png``.

    :param _name: station name keyword
    :return: the merged chart as PNG bytes
    :raises ValueError: if no chart was produced (no matching station, or
        every worker failed)
    """
    name_list = [row[0] for row in get_gcname(_name)]
    th_list = []
    for name in name_list:
        print(name)
        th = Thread(target=get_trend, args=(name, ))
        th.daemon = True
        th.start()
        th_list.append(th)
    for th in th_list:
        th.join()

    # All workers have finished, so everything they produced is already
    # queued.  A worker that raised queued nothing, so a blocking get() per
    # thread would wait forever; take only what is there.
    results = []
    for _ in th_list:
        try:
            chart_bytes = results_queue.get_nowait()
        except queue.Empty:
            break
        results.append(Image.open(BytesIO(chart_bytes)))
    if not results:
        raise ValueError(f"no trend chart could be generated for {_name!r}")
    return merge_images(results, 'result.png', direction='vertical')
def getwx(_name, _type):
    """Return the bytes of a station's status image, or of the fallback image.

    When ``int(_type)`` equals 1 the file ``./<_name>.jpg`` is read and
    returned.  In every other case - an unsupported type, a type that cannot
    be converted to ``int``, or a file that cannot be read - the bytes of
    ``./error.jpg`` are returned instead, so the caller always receives image
    bytes.

    :param _name: station name; used as the image file name without extension
    :param _type: requested image type; only 1 (or a value ``int()`` turns
        into 1) is supported
    :return: image file content as bytes
    :raises OSError: if ``./error.jpg`` itself cannot be read
    """
    print(_name)
    try:
        if int(_type) == 1:
            with open(f'./{_name}.jpg', 'rb') as img:
                return img.read()
    except Exception as e:
        # Deliberately best-effort: report the problem and fall through to
        # the fallback image below.
        print(e)
    with open('./error.jpg', 'rb') as img:
        return img.read()
# def getgq_1zhan():
# result = []
# conn = sqlite3.connect(db1_path)
# c = conn.cursor()
# sqltxt = "SELECT sysw,xysw,sqsw,ssll,pjll,time FROM swll ORDER BY time DESC LIMIT 1"
# rs = c.execute(sqltxt)
# rf = rs.fetchall()
# if rf:
# result.append(
# time.strftime(r"%y年%m月%d日 %H:%M:%S", time.localtime(rf[0][5])) +
# "\n")
# result.append(f'上游:{rf[0][0]}')
# result.append(f'下游:{rf[0][1]}')
# result.append(f'栅前:{rf[0][2]}')
# result.append(f'瞬时流量:{rf[0][3]}\n')
# # result.append( f'平均流量:{rf[0][4]}')
# for index, ename in enumerate(ename_list):
# sqltxt = f"""SELECT Uab,Ubc,Uca,Ia,Ib,Ic,P,Q,cos,time FROM dianyadianliu WHERE ename="{ename}" ORDER BY time DESC LIMIT 1"""
# rs = c.execute(sqltxt)
# rf = rs.fetchall()
# if rf:
# result.append(realname_list[index])
# result.append(f'Uab:{rf[0][0]}')
# result.append(f'Ubc:{rf[0][1]}')
# result.append(f'Uca:{rf[0][2]}')
# result.append(f'Ia:{rf[0][3]}')
# result.append(f'Ib:{rf[0][4]}')
# result.append(f'Ic:{rf[0][5]}')
# result.append(f'P:{rf[0][6]}')
# result.append(f'Q:{rf[0][7]}')
# result.append(f'cos:{rf[0][8]}\n')
# # result.append(f'time:{rf[0][9]}')
# # result.append(time.strftime(r"%y年%m月%d日 %H:%M:%S",time.localtime(rf[0][9]))+"\n")
# c.close()
# conn.close()
# return ' '.join(result)
# def getgq_2zhan():
# result = []
# conn = sqlite3.connect(db2_path)
# c = conn.cursor()
# sqltxt = "SELECT sysw,sqsw,shsw,yc,ldsw,zbwd1,zbwd2,yxts,zll, time FROM qzcs ORDER BY time DESC LIMIT 1"
# rs = c.execute(sqltxt)
# rf = rs.fetchall()
# if rf:
# result.append(
# time.strftime(r"%y年%m月%d日 %H:%M:%S", time.localtime(rf[0][9])) +
# "\n")
# result.append(f'上游: {rf[0][0]}')
# result.append(f'栅前: {rf[0][1]}')
# result.append(f'栅后: {rf[0][2]}')
# result.append(f'扬程: {rf[0][3]}')
# result.append(f'廊道: {rf[0][4]}')
# result.append(f'主变温度1: {rf[0][5]} ℃')
# result.append(f'主变温度2: {rf[0][6]} ℃')
# result.append(f'运行台数: {rf[0][7]}')
# result.append(f'总流量: {rf[0][8]}\n')
# # result.append( f'平均流量:{rf[0][4]}')
# sqltxt = "SELECT * FROM dydlpqcos WHERE time=(SELECT time FROM dydlpqcos WHERE Ia<>0 ORDER BY id DESC LIMIT 1) and Ia<>0"
# rs = c.execute(sqltxt)
# rf = rs.fetchall()
# if rf:
# for r in rf:
# result.append(realname_list2[r[1]])
# result.append(f'Ia:{r[2]}')
# result.append(f'Ic:{r[3]}')
# result.append(f'Uab:{r[4]}')
# result.append(f'Ubc:{r[5]}')
# result.append(f'Uca:{r[6]}')
# result.append(f'P:{r[7]}')
# result.append(f'Q:{r[8]}')
# if r[10]:
# result.append(f'cosφ:{r[9]}')
# result.append(f'lc1:{r[10]}')
# result.append(f'lc2:{r[11]}\n')
# else:
# result.append(f'cosφ:{r[9]}\n')
# c.close()
# conn.close()
# return ' '.join(result)
# # result.append(realname_list2[r[1]], r[2])
# # for index, ename in enumerate(ename_list):
# # sqltxt = f"""SELECT Uab,Ubc,Uca,Ia,Ib,Ic,P,Q,cos,time FROM dianyadianliu WHERE ename="{ename}" ORDER BY time DESC LIMIT 1"""
# # rs = c.execute(sqltxt)
# # rf = rs.fetchall()
# # if rf:
# # result.append(realname_list[index])
# # result.append(f'Uab:{rf[0][0]}')
# # result.append(f'Ubc:{rf[0][1]}')
# # result.append(f'Uca:{rf[0][2]}')
# # result.append(f'Ia:{rf[0][3]}')
# # result.append(f'Ib:{rf[0][4]}')
# # result.append(f'Ic:{rf[0][5]}')
# # result.append(f'P:{rf[0][6]}')
# # result.append(f'Q:{rf[0][7]}')
# # result.append(f'cos:{rf[0][8]}\n')
# # result.append(f'time:{rf[0][9]}')
# # result.append(time.strftime(r"%y年%m月%d日 %H:%M:%S",time.localtime(rf[0][9]))+"\n")
# # return ' '.join(result)
if __name__ == "__main__":
    # Manual smoke run: build the merged chart for one keyword and print how
    # many seconds it took.
    started_at = time.time()
    getqs("刘涧")
    elapsed = time.time() - started_at
    print(elapsed)
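# [Gitee page notice captured with the file; not part of the module] This section may contain content that is unsuitable for display, so the page does not show it. You can review and revise it yourself using the relevant editing functions.
# If you confirm that the content involves no inappropriate language, pure advertising or traffic diversion, violence, vulgar or pornographic material, infringement, piracy, false or worthless content, or content that violates national laws and regulations, you may click Submit to appeal, and we will handle it as soon as possible.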