1 Star 7 Fork 3

mo-shan/rpc_for_process_monitor

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
rpc.py 17.98 KB
一键复制 编辑 原始数据 按行查看 历史
mo-shan 提交于 2023-05-17 09:55 . update:打印错误日志
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author : moshan
# Version : 1.0
# Created Time: 2021-02-21 10:40:02
# Function : 主程序
#########################################################################
import sys, time, threading, json, random
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.client import ServerProxy
from optparse import OptionParser
from lib.Config import *
from lib.ConnectMySQL import *
from lib.Public import *
from lib.Monitor import *
from lib.globalVar import gloVar
def f_monitor(Alert_DB) :
alert_tmp = {}
alert_list = {}
alert_mark = 0
for d_type in alert_data_dir_rule.keys() :
Alert_DB, alert_tmp = f_monitor_data_dir(Alert_DB, alert_data_dir_rule[d_type], d_type)
alert_list.update(alert_tmp)
alert_mark = len(alert_tmp)
Alert_DB, alert_tmp = f_monitor_host(Alert_DB, alert_host_rule)
alert_list.update(alert_tmp)
if alert_mark == 0 : alert_mark = len(alert_tmp)
Alert_DB, alert_tmp = f_monitor_alert_ok(Alert_DB)
alert_list.update(alert_tmp)
if alert_mark == 0 : alert_mark = len(alert_tmp)
alert_info = ""
for alert_name in alert_list.keys() :
alert_info += ";" + alert_name + ":;"
for alert_info_tmp in alert_list[alert_name] :
alert_info += alert_info_tmp + ";"
alert_mark = 1
if alert_mark > 0 :
f_send_alert_to_bot(alert_info)
return Alert_DB
def f_manager_monitor():
"""监控-后台进程"""
Alert_DB = f_connect_mysql(60, 5)
while True :
sql = """insert into tb_monitor_alert_info(rshost,istate,a_time,n_time,remarks) values('f_manager_monitor',0,now(),now(),'管理监控的任务')
on duplicate key update a_time=now(),n_time = now();"""
status, data, Alert_DB = f_query_mysql(Alert_DB, "insert", {"sql" : sql})
start_time = int(str(time.time()).split(".")[0]) #记录结束时间
if alert == "true" : Alert_DB = f_monitor(Alert_DB)
end_time = int(str(time.time()).split(".")[0]) #记录结束时间
if int(t_interval) > (end_time - start_time) :
time.sleep(int(t_interval) * 2 - (end_time - start_time)) #根据采集周期进行sleep, 这里用了一个随机数是避免所有client同时上报
return 0
# 调用函数
def f_init_client(DB):
sql = """insert into tb_monitor_alert_info(rshost,istate,a_time,n_time,remarks) values('f_manager_client',0,now(),now(),'管理客户端的任务')
on duplicate key update a_time=now(),n_time = now();"""
status, data, DB = f_query_mysql(DB, "insert", {"sql" : sql})
DB = f_manager_node_status(DB,options)
DB = f_delete_monitor_data(DB)
return DB
def f_manager_client():
DB = f_connect_mysql()
sql = "select version from " + t_version_info + " order by id desc limit 1;"
status, data, DB = f_query_mysql(DB, "select", {"sql" : sql}) #如果表里存储的版本号跟配置文件的不一样就会更新client代码
update_mark = ""
if data is None or len(data) == 0:
update_mark = "true"
else :
for v in data :
if v[0] != version : update_mark = "true"
if update_mark == "true" : #如果需要升级就将所有监控client状态改为1, 且记录最新的版本号
f_write_log(log_opt = "INFO", log = "[ 即将进行版本升级 ]", log_file = log_file)
sql = {}
sql["update"] = "update " + t_host_config + " set istate = 1 where istate = 2;"
sql["insert"] = "insert into " + t_version_info + " (version,a_time) select '" + version + "', now();"
status, data, DB = f_query_mysql(DB, "insert", sql)
if (status != 0) : f_write_log(log_opt = "ERROR", log = "[ 版本号更新失败 ] [ " + "\n".join(sql.values()) + " ] ", log_file = log_file)
while True :
try :
DB = f_init_client(DB)
except Exception as e :
f_write_log(log_opt = "ERROR", log = "[ client节点状态巡检遇到错误 ] [ " + str(e) + " ] ", log_file = log_file)
time.sleep(int(t_interval) * 2)
def f_rpc_func(data):
"""
监控数据处理逻辑
"""
global DB
status = 0
insert_sql = {}
io_time = ""
if "HOST" in data.keys():
cpu_json = json.dumps(data["cpu"])
mem_json = json.dumps(data["mem"])
disk_json = json.dumps(data["disk"])
io_time = data["io"]["time"]
data["io"].pop("time")
io_json = json.dumps(data["io"])
net_time = data["net"]["time"]
data["net"].pop("time")
net_json = json.dumps(data["net"])
insert_sql["host"] = """insert into """ + t_host_info + """(id,rshost,cpu_info,mem_info,io_info,net,a_time)
value(0,'""" + data["HOST"] + """','""" + cpu_json + """','""" + mem_json + """','""" + io_json + """',
'""" + net_json + """','""" + net_time + """');"""
for tmp in data["disk"].keys():
disk_json = json.dumps(data["disk"][tmp])
insert_sql[tmp] = """insert into """ + t_disk_info + """(id,rshost,part,disk_info,a_time)
value(0,'""" + data["HOST"] + """','""" + tmp + """','""" + disk_json + """','""" + net_time + """');"""
if "util_qu" not in data.keys():
data["util_qu"] = {}
for qu_info in data["util_qu"].keys():
insert_sql["util_qu" + data["HOST"] + qu_info] = """insert into """ + t_util_qu_info + """(id,rshost,dev,avgqu,util,a_time)
value(0,'""" + data["HOST"] + """','""" + qu_info + """','""" + data["util_qu"][qu_info]["avgqu-sz"] + """',
'""" + data["util_qu"][qu_info]["util"] + """','""" + net_time + """');"""
for dir in data["dir"].keys() :
if dir == "time" : continue
for dir_tmp in data["dir"][dir].keys() :
insert_sql[dir_tmp] = """insert into """ + t_dir_size_info + """(id, rshost,dir, size, a_time)
value(0,'""" + data["HOST"] + """','""" + dir_tmp + """','""" + data["dir"][dir][dir_tmp] + """','""" + data["dir"]["time"] + """');"""
for iftop in data["iftop"].keys():
if iftop == "time" : continue
for remote_info in data["iftop"][iftop] :
remote = remote_info["remote"]
in_info = remote_info["in"]
out_info = remote_info["out"]
remote_info.pop("remote")
info_json = json.dumps(remote_info)
in_info = f_return_value(in_info)
out_info = f_return_value(out_info)
if in_info < r_net and out_info < r_net : continue #只记录大于10MB的流量
insert_sql[iftop+remote] = """insert into """ + t_port_net_info + """(id, rsinfo,remote, in_info, out_info, a_time)
value(0,'""" + iftop + """','""" + remote + """','""" + str(in_info) + """','""" + str(out_info) + """','""" + data["iftop"]["time"] + """');"""
tmp_index = 0
for port_info in data["port"].keys():
remarks = data["port"][port_info]["remarks"].strip()
cpu_info = data["port"][port_info]["cpu"]
mem_info = data["port"][port_info]["mem"]
io_r = data["port"][port_info]["io_r"]
io_w = data["port"][port_info]["io_w"]
io_r = f_return_value(io_r)
io_w = f_return_value(io_w)
mem_info = mem_info.split("GB")[0]
cpu_info = cpu_info.split("%")[0]
if (io_r < r_io and io_w < r_io) and float(mem_info) < r_mem and float(cpu_info) < r_cpu : continue #只记录大于10MB的, 或者内存大于10GB, CPU大于200%
data["port"][port_info].pop("remarks")
io_json = json.dumps(data["port"][port_info])
insert_sql[port_info] = """insert into """ + t_process_io_info + """(id,rshost,rsport,cpu,mem,io_r,io_w,a_time,md5_str,remarks)
value(0,'""" + data["HOST"] + """','""" + port_info + """','""" + cpu_info + """','""" + mem_info + """'
,'""" + str(io_r) + """','""" + str(io_w) + """','""" + io_time + """',MD5('""" + remarks + """'),'');"""
insert_sql[tmp_index] = """replace into """ + t_process_info + """(id,md5_str,remarks)
value(0,MD5('""" + remarks + """'),'""" + remarks + """');"""
tmp_index += 1
data_json = json.dumps(data, indent=4,ensure_ascii=False, sort_keys=False,separators=(',', ':'))
#print(data_json)
status, _, DB = f_query_mysql(DB, "insert", insert_sql)
if (status != 0) :
f_write_log(log_opt = "ERROR", log = "[ 监控数据写入失败 ] [ " + data["HOST"] + " ] [ " + "\n".join(insert_sql.values()) + " ] ", log_file = log_file)
data_json = json.dumps(insert_sql, indent=4,ensure_ascii=False, sort_keys=False,separators=(',', ':'))
#print(data_json)
insert_sql = {}
daemon_info = []
for item_rshost in daemon : #检查监控线程是否正常工作
sql = "select rshost,(select rshost from " + t_alert_info + " where rshost = '" + item_rshost + """' and istate = 0
and n_time < now()) from """ + t_alert_info + " where rshost = '" + item_rshost + "' and istate = 0 and a_time < date_add(now(), INTERVAL - 5 MINUTE);"
status, check_data, DB = f_query_mysql(DB, "select", {"sql" : sql})
if check_data is None or len(check_data) == 0 or check_data[0][1] is None :
pass
else :
daemon_info.append(item_rshost + "任务异常")
replace_sql = "replace into " + t_alert_info + "(rshost,istate,a_time,n_time) select '" + item_rshost + "',0,now(),date_add(now(), INTERVAL + 5 MINUTE);"
status, check_data, DB = f_query_mysql(DB, "insert", {"sql" : replace_sql})
if len(daemon_info) != 0 :
f_send_alert_to_bot("后台监控任务异常 : " + ",".join(daemon_info))
return status, version, data["dir"]
def f_rpc_server(port) :
server = SimpleXMLRPCServer(('0.0.0.0', int(port))) # 初始化
server.register_function(f_rpc_func, "f_rpc_func") # 注册get_string函数
f_write_log(log_opt = "INFO", log = "[ V" + version + " Listening for '0.0.0.0:" + str(port) + "' ]", log_file = log_file)
server.serve_forever() # 保持等待调用状态
def f_rpc_client(server_list, client, port) :
if not os.path.exists(script_dir + "/state/state.log"):
os.system(r"touch {}".format(script_dir + "/state/state.log"))
server_mark = 0 #server状态标志, 如果连续三次异常就退出程序
get_dir_size_mark = {} #采集目录大小标志变量, 每小时就采集一次
while True :
start_time = int(str(time.time()).split(".")[0]) #记录开始时间
gloVar.res_disk = {} #避免历史数据干扰, 每次清空
gloVar.res_mem = {}
gloVar.res_cpu = {}
gloVar.res_net = {}
gloVar.res_iftop = {}
gloVar.res_iotop = {}
gloVar.res_util_qu = {} #避免历史数据干扰, 每次清空
with open(script_dir + "/state/state.log", 'r') as f :
mark = f.readline()
if "stop" in mark :
sys.exit()
res = {}
now_time = f_get_time()
# 并发采集数据
thread_list = []
t1= threading.Thread(target=f_monitor_for_disk)
thread_list.append(t1)
t2= threading.Thread(target=f_monitor_for_mem)
thread_list.append(t2)
t3= threading.Thread(target=f_monitor_for_cpu)
thread_list.append(t3)
t4= threading.Thread(target=f_monitor_for_net, args=(client,))
thread_list.append(t4)
t5= threading.Thread(target=f_monitor_for_iftop)
thread_list.append(t5)
t6= threading.Thread(target=f_monitor_for_iotop)
thread_list.append(t6)
t7= threading.Thread(target=f_monitor_for_util_qu)
thread_list.append(t7)
if now_time["log"].split(":")[0] not in get_dir_size_mark.keys() :
get_dir_size_mark = {}
get_dir_size_mark[now_time["log"].split(":")[0]] = "true"
t8= threading.Thread(target=f_monitor_for_dir_size, args=(data_dir,))
t8.setDaemon(True) # 设置为守护线程,主线程结束一并回收
t8.start()
#thread_list.append(t8)
for t in thread_list:
t.setDaemon(True) # 设置为守护线程,主线程结束一并回收
t.start()
for t in thread_list:
t.join() # 子线程全部加入,主线程等所有子线程运行完毕
res = {
"HOST" : client,
"disk" : gloVar.res_disk,
"util_qu" : gloVar.res_util_qu,
"mem" : gloVar.res_mem,
"cpu" : gloVar.res_cpu,
"net" : gloVar.res_net,
"iftop" : {},
"io" : {},
"port" : gloVar.res_iotop,
"dir" : gloVar.res_dir_size
}
iftop = gloVar.res_iftop
res["iftop"]["time"] = iftop["time"]
for tmp in iftop["iftop"].split("\n") : #处理iftop结果
r = []
for i in tmp.split(" "):
if len(i) > 0 : r.append(i)
if len(r) == 0 : continue
if r[0] not in res["iftop"].keys(): res["iftop"][r[0]] = []
res["iftop"][r[0]].append({"remote":r[2],"out":r[1],"in":r[3]})
#d = json.dumps(res["iftop"], indent=4,ensure_ascii=False, sort_keys=False,separators=(',', ':'))
res["io"] = {
"Total" : res["port"]["Total"],
"Actual" : res["port"]["Actual"],
"time" : res["port"]["time"]
}
res["port"].pop("Total")
res["port"].pop("Actual")
res["port"].pop("time")
#data = json.dumps(res, indent=4,ensure_ascii=False, sort_keys=False,separators=(',', ':'))
#print(res["dir"])
mark = 1
for ser in server_list.split(",") : #多个sesrver的话会选择一个正常的进行上报
try :
server = ServerProxy("http://" + ser + ":" + str(port)) # 初始化服务器
status, ver, res_for_dir_size = server.f_rpc_func(res) #返回一个状态码, 版本号, 版本号是用来判断server端的版本跟当前版本是否一致, 不一致就自动升级
if (ver != version and status == 0) :
f_write_log(log_opt = "INFO", log = "[ 需要升级 : " + version + " -> " + ver + " ]", log_file = log_file)
sys.exit(-1)
if (status != 0) :
f_write_log(log_opt = "ERROR", log = "[ 监控入库失败 ]", log_file = log_file)
mark = 0
if len(res_for_dir_size) > 0 : gloVar.res_dir_size = {}
except Exception as e: #server都异常就报警, 并记录一下避免所有的client都同时告警
if server_mark < 3 : continue #server连续三次都异常就退出程序
sql = "select rshost from " + t_alert_info + " where istate = 1 and a_time > date_add(now(), INTERVAL - 5 MINUTE) and rshost = '" + ser + "'"
DB = f_connect_mysql()
status, data, DB = f_query_mysql(DB, "select", {"sql" : sql})
if data is None or len(data) == 0:
f_send_alert_to_bot(client + " 报告, 进程监控, server 端连接异常 : " + ser + ":" + str(port) + "[" + str(e) + "]")
replace_sql = "replace into " + t_alert_info + "(rshost,istate,a_time,remarks) select '" + ser + "', 1,now(),'server 端连接异常';"
status, data, DB = f_query_mysql(DB, "insert", {"sql" : replace_sql})
if (status != 0) :
f_write_log(log_opt = "ERROR", log = "[ 告警状态更新失败 ] [ " + "\n".join(replace_sql.values()) + " ] ", log_file = log_file)
f_close_connection(DB)
if mark == 0 :
server_mark = 0
else :
server_mark += 1
if server_mark > 3 : #server连续三次都异常就退出程序
sys.exit(-1)
end_time = int(str(time.time()).split(".")[0]) #记录结束时间
if int(t_interval) - 5 + random.randint(1,5) > (end_time - start_time) :
time.sleep(int(t_interval) - 5 + random.randint(1,5) - (end_time - start_time)) #根据采集周期进行sleep, 这里用了一个随机数是避免所有client同时上报
def _args_parse():
opt = OptionParser(usage = "it's usage tip.", version = "%prog " + version, description = "")
opt.add_option('-C', '--client', dest="client_host", type="string", help = u'rpc client的地址(本机地址)')
opt.add_option('-S', '--server', dest="server_host", type="string", help = u'rpc server的地址,多个server逗号隔开')
opt.add_option('-R', '--role', dest="rpc_role", type="string", help = u'rpc 角色(client|server)')
opt.add_option('-P', '--port', dest="rpc_port", type="int", help = u'rpc监听端口')
options,args = opt.parse_args()
mark = 0
if options.rpc_role is None :
mark = 1
elif options.rpc_port is None :
mark = 1
elif options.rpc_role == "client" and (options.client_host is None or options.server_host is None):
mark = 1
if (mark == 1) :
print (opt.print_help())
sys.exit(-1)
return options,args
if __name__ == '__main__':
options,args = _args_parse()
if options.rpc_role == "server" : #每次启动都会判断是否更新client的代码
DB = f_connect_mysql()
t_init = threading.Thread(target = f_manager_client)
t_init.setDaemon(True) # 设置为守护线程,主线程结束一并回收
t_init.start()
t_init = threading.Thread(target = f_manager_monitor)
t_init.setDaemon(True) # 设置为守护线程,主线程结束一并回收
t_init.start()
f_rpc_server(options.rpc_port)
elif options.rpc_role == "client" :
f_rpc_client(options.server_host, options.client_host, options.rpc_port)
else:
opt = OptionParser(usage = "it's usage tip.", version = "%prog " + version, description = "")
print (opt.print_help())
sys.exit(-1)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/mo-shan/rpc_for_process_monitor.git
[email protected]:mo-shan/rpc_for_process_monitor.git
mo-shan
rpc_for_process_monitor
rpc_for_process_monitor
master

搜索帮助