代码拉取完成,页面将自动刷新
'''
Author: duliang [email protected]
Date: 2024-06-05 19:59:59
LastEditors: duliang [email protected]
LastEditTime: 2024-07-20 00:00:47
FilePath: mqttser.py
Description: 这是默认设置,请设置`customMade`, 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE
'''
from paho.mqtt import client as mqtt
import time
# import sqlite3
from threading import Thread
class MqttClient(object):
def __init__(self, clientname="lljserver"):
self.MQTTHOST = "127.0.0.1" # MQTT服务器地址
self.MQTTPORT = 1883 # MQTT端口
self.clientname = clientname
self.mqttClient = self.mqtt_connect()
# self.on_subscribe()
def on_connect(self, client, userdata, flags, rc):
"""一旦连接成功, 回调此方法"""
rc_status = [
"连接成功", "协议版本不正确", "客户端标识符无效", "服务器不可用", "用户名或密码不正确", "未经授权"
]
print("mqtt connect:", rc_status[rc])
def on_message(self, client, userdata, msg):
"""一旦订阅到消息, 回调此方法"""
new_msg = str(msg.payload.decode('utf8'))
# print("主题:" + msg.topic + " 消息:" + new_msg)
def mqtt_connect(self):
"""连接MQTT服务器"""
# mqttClient = mqtt.Client(str(uuid.uuid4()))
mqttClient = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,
self.clientname)
mqttClient.on_connect = self.on_connect # 返回连接状态的回调函数
mqttClient.on_message = self.on_message # 返回订阅消息回调函数
mqttClient.username_pw_set("llj", "llj123") # mqtt服务器账号密码
mqttClient.connect(self.MQTTHOST, self.MQTTPORT, 60)
mqttClient.loop_start() # 启用线程连接
return mqttClient
def on_subscribe(self, subTopic_list):
"""订阅主题:gate subscribe([("my/topic", 0), ("another/topic", 2)])"""
self.mqttClient.subscribe(subTopic_list)
# mqttClient.loop_forever()
# while True:
# pass
def mqtt_publish(self, topic, content):
"""
判断发布的消息,并转发
"""
now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.mqttClient.publish(topic, f"{now_time}:{content}", 2)
if topic == 'warning':
if content == 'None':
# 为None的时候为调试
self.mqttClient.publish("get", now_time)
elif content == 'test':
# test
self.mqttClient.publish("test", now_time)
elif content == '0':
# 为0的时候为1站大门
self.mqttClient.publish("s1gate", 1)
elif content == '1':
# 为1的时候为2站大门
self.mqttClient.publish("s2gate", 1)
# print("publish:", text)
def mqtt_run(clientname, subTopic_list):
# global publish_flag
while 1:
try:
myMqttClient = MqttClient(clientname)
th = Thread(target=myMqttClient.on_subscribe,
args=(subTopic_list, ),
daemon=True)
th.start()
th.join()
break
except Exception as e:
print(e)
time.sleep(1)
if __name__ == '__main__':
'''
'''
mqtt_run("lljserver")
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。