1 Star 0 Fork 0

杜亮/dul文字识别

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
mqttser.py 3.41 KB
一键复制 编辑 原始数据 按行查看 历史
杜亮 提交于 2024-07-20 00:02 . 11
'''
Author: duliang [email protected]
Date: 2024-06-05 19:59:59
LastEditors: duliang [email protected]
LastEditTime: 2024-07-20 00:00:47
FilePath: mqttser.py
Description: 这是默认设置,请设置`customMade`, 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE
'''
from paho.mqtt import client as mqtt
import time
# import sqlite3
from threading import Thread
class MqttClient(object):
def __init__(self, clientname="lljserver"):
self.MQTTHOST = "127.0.0.1" # MQTT服务器地址
self.MQTTPORT = 1883 # MQTT端口
self.clientname = clientname
self.mqttClient = self.mqtt_connect()
# self.on_subscribe()
def on_connect(self, client, userdata, flags, rc):
"""一旦连接成功, 回调此方法"""
rc_status = [
"连接成功", "协议版本不正确", "客户端标识符无效", "服务器不可用", "用户名或密码不正确", "未经授权"
]
print("mqtt connect:", rc_status[rc])
def on_message(self, client, userdata, msg):
"""一旦订阅到消息, 回调此方法"""
new_msg = str(msg.payload.decode('utf8'))
# print("主题:" + msg.topic + " 消息:" + new_msg)
def mqtt_connect(self):
"""连接MQTT服务器"""
# mqttClient = mqtt.Client(str(uuid.uuid4()))
mqttClient = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,
self.clientname)
mqttClient.on_connect = self.on_connect # 返回连接状态的回调函数
mqttClient.on_message = self.on_message # 返回订阅消息回调函数
mqttClient.username_pw_set("llj", "llj123") # mqtt服务器账号密码
mqttClient.connect(self.MQTTHOST, self.MQTTPORT, 60)
mqttClient.loop_start() # 启用线程连接
return mqttClient
def on_subscribe(self, subTopic_list):
"""订阅主题:gate subscribe([("my/topic", 0), ("another/topic", 2)])"""
self.mqttClient.subscribe(subTopic_list)
# mqttClient.loop_forever()
# while True:
# pass
def mqtt_publish(self, topic, content):
"""
判断发布的消息,并转发
"""
now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.mqttClient.publish(topic, f"{now_time}:{content}", 2)
if topic == 'warning':
if content == 'None':
# 为None的时候为调试
self.mqttClient.publish("get", now_time)
elif content == 'test':
# test
self.mqttClient.publish("test", now_time)
elif content == '0':
# 为0的时候为1站大门
self.mqttClient.publish("s1gate", 1)
elif content == '1':
# 为1的时候为2站大门
self.mqttClient.publish("s2gate", 1)
# print("publish:", text)
def mqtt_run(clientname, subTopic_list):
# global publish_flag
while 1:
try:
myMqttClient = MqttClient(clientname)
th = Thread(target=myMqttClient.on_subscribe,
args=(subTopic_list, ),
daemon=True)
th.start()
th.join()
break
except Exception as e:
print(e)
time.sleep(1)
if __name__ == '__main__':
'''
'''
mqtt_run("lljserver")
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/dulht/dul-text-recognition.git
[email protected]:dulht/dul-text-recognition.git
dulht
dul-text-recognition
dul文字识别
master

搜索帮助