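# NOTE: leftover notice from the code-hosting web page (not part of the program):
# "Code pull complete; the page will refresh automatically."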
# 多线程
import threading
import keyboard
# loop1 解析mjpeg http流 https://blog.csdn.net/Barry_J/article/details/101280263
import requests
import numpy as np
import builtins
import urllib.request
# loop3 modelscope框架神经网络 https://modelscope.cn/models/damo/cv_convnext-base_image-classification_garbage/summary
from modelscope.models import Model
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
import time
# loop2 opencv
import cv2
from vis import vis_det_img, cv2AddChineseText
def loop1():
    """Read the ESP32-CAM MJPEG HTTP stream and publish decoded frames.

    Requests ``url + 'cam.mjpeg'`` as a streamed response, accumulates the
    body in a byte buffer, cuts out one JPEG image at a time using its
    start (FF D8) and end (FF D9) markers, decodes it with OpenCV and stores
    the result in the module-level ``img``.

    The loop ends when the module-level ``exit_flag`` is true, when the
    stream ends, or when the HTTP request fails. Reads the module-level
    ``url`` and ``exit_flag``; writes the module-level ``img``.
    """
    global img
    try:
        # The timeout keeps an unreachable or stalled camera from blocking
        # this thread (and the join in the main thread) forever; the
        # context manager releases the connection on every exit path.
        with requests.get(url + 'cam.mjpeg', stream=True, timeout=(5, 10)) as r:
            if r.status_code == 200:
                buffer = b''
                for chunk in r.iter_content(chunk_size=1024):
                    buffer += chunk
                    start = buffer.find(b'\xff\xd8')
                    # Look for the end marker only after the start marker so
                    # a stray end marker earlier in the buffer cannot
                    # produce an empty or truncated slice.
                    end = buffer.find(b'\xff\xd9', start + 2) if start != -1 else -1
                    if start != -1 and end != -1:
                        jpg = buffer[start:end + 2]
                        buffer = buffer[end + 2:]
                        # np.frombuffer replaces the deprecated binary mode
                        # of np.fromstring.
                        frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
                        # imdecode yields None for corrupt data; keep the
                        # previous frame so readers of img never see None.
                        if frame is not None:
                            img = frame
                    if exit_flag:
                        break
            else:
                print("Received unexpected status code {}".format(r.status_code))
    except requests.exceptions.RequestException as e:
        print("loop1 stream error: {}".format(e))
    print('loop1 exit normally')
# Alternative loop1 (disabled): fetch single JPEG snapshots instead of the MJPEG stream.
# def loop1():
#     '''esp32cam image jpg'''
#     global img
#     while not exit_flag:
#         try:
#             imgResp=urllib.request.urlopen(url+'cam-hi.jpg')
#             imgNp=np.array(bytearray(imgResp.read()),dtype=np.uint8)
#             img=cv2.imdecode(imgNp,-1)
#         except urllib.error.URLError as e:
#             print(e.reason)
#             pass
#     print('loop1 exit normally')
def loop2():
    """Draw the current frame with the latest inference result and show it.

    On every iteration the module-level ``img`` is copied, a title for the
    selected model is drawn, and then either the classification labels and
    scores from ``result_c`` or the detection overlay from ``result_d`` is
    added. The frame is shown in the 'esp32cam-test' window.

    The loop ends when the module-level ``exit_flag`` is true or when 'q' is
    pressed in the OpenCV window. Reads the module-level ``img``,
    ``model_sel``, ``result_c``, ``result_d`` and ``exit_flag``.
    """
    classification_titles = {'g': '垃圾分类', 'd': '日常用品', 'r': '万物识别'}
    detection_titles = {'o': '通用目标检测', 'h': '人头检测', 'f': '口罩检测', 'p': '手机检测'}
    green = (0, 255, 0)
    while not exit_flag:
        # Snapshot the selection once per frame: the keyboard callback may
        # change model_sel at any time from another thread.
        mode = model_sel
        frame = img.copy()
        if mode in classification_titles:
            result = result_c.copy()
            frame = cv2AddChineseText(frame, classification_titles[mode], (4, 6), green, 20)
            # zip stops at the shorter list, so a label/score length
            # mismatch cannot raise IndexError and kill the display thread.
            for row, (label, score) in enumerate(zip(result['labels'], result['scores'])):
                text = str(label) + ' ' + str(score)
                frame = cv2AddChineseText(frame, text, (4, 26 + 20 * row), green, 20)
        elif mode in detection_titles:
            result = result_d.copy()
            frame = cv2AddChineseText(frame, detection_titles[mode], (4, 6), green, 20)
            try:
                frame = vis_det_img(frame, result)
            except cv2.error:
                # Best effort: if drawing the detections fails, show the
                # frame with only the title.
                pass
        cv2.imshow('esp32cam-test', frame)
        # Mask to the low byte: some platforms return key codes with extra
        # high bits set.
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cv2.destroyAllWindows()
    print('loop2 exit normally')
def loop3():
    """Run the selected ModelScope pipeline on the current frame, repeatedly.

    While the module-level ``exit_flag`` is false, the pipeline chosen by
    ``model_sel`` is called with the module-level ``img``. Classification
    results ('g', 'd', 'r') are stored in ``result_c``; detection results
    ('o', 'h', 'f', 'p') are stored in ``result_d``. Each result is printed.
    """
    global result_c, result_d
    classification_keys = ('g', 'd', 'r')
    detection_keys = ('o', 'h', 'f', 'p')
    while True:
        if exit_flag:
            break
        mode = model_sel
        if mode in classification_keys:
            # Image classification: pick the pipeline, then run it.
            if mode == 'g':
                runner = garbage_classification
            elif mode == 'd':
                runner = dailylife_classification
            else:
                runner = general_recognition
            result_c = runner(img)
            print(result_c)
        elif mode in detection_keys:
            # Object detection: pick the pipeline, then run it.
            if mode == 'o':
                runner = object_detection
            elif mode == 'h':
                runner = head_detection
            elif mode == 'f':
                runner = facemask_detection
            else:
                runner = phone_detection
            result_d = runner(img)
            print(result_d)
    print('loop3 exit normally')
def keyboard_callback(x):
    """Switch the active model according to the pressed key.

    Prints the event ``x``. If ``x.name`` is one of the model keys
    ('g', 'd', 'r', 'o', 'h', 'f', 'p'), prints the model's name and stores
    the key in the module-level ``model_sel``; any other key leaves
    ``model_sel`` unchanged.
    """
    global model_sel
    print(x)
    model_names = {
        'g': 'garbage_classification',
        'd': 'dailylife_classification',
        'r': 'general_recognition',
        'o': 'object_detection',
        'h': 'head_detection',
        'f': 'facemask_detection',
        'p': 'phone_detection',
    }
    pressed = x.name
    if pressed in model_names:
        print(model_names[pressed])
        model_sel = pressed
if __name__ == '__main__':
    # Script entry point: set up the shared module-level state, load the
    # ModelScope pipelines, start the three worker threads and wait for 'q'.
    # Everything assigned here is a module global because the worker
    # functions and the keyboard callback read and write these names.

    # --- loop1 initialization ---
    # Base URL of the ESP32-CAM. Change it to the LAN URL the ESP32-CAM
    # reports over serial, and check that the camera and this host are on
    # the same LAN segment.
    url='http://192.168.216.148/'
    # Latest captured frame in OpenCV format; starts as a black image.
    img = np.zeros((600, 800, 3), np.uint8)

    # --- loop3 initialization ---
    # Load the models and pipelines used by loop3.
    # Image classification
    garbage_classification = pipeline(Tasks.image_classification, model='damo/cv_convnext-base_image-classification_garbage')
    dailylife_classification = pipeline(Tasks.image_classification, model='damo/cv_vit-base_image-classification_Dailylife-labels')
    general_recognition = pipeline(Tasks.general_recognition, model='damo/cv_resnest101_general_recognition')
    # Object detection
    object_detection = pipeline(Tasks.image_object_detection,model='damo/cv_tinynas_object-detection_damoyolo')
    head_detection = pipeline(Tasks.domain_specific_object_detection, model='damo/cv_tinynas_head-detection_damoyolo')
    facemask_detection = pipeline(Tasks.domain_specific_object_detection, model='damo/cv_tinynas_object-detection_damoyolo_facemask')
    phone_detection = pipeline(Tasks.domain_specific_object_detection, model='damo/cv_tinynas_object-detection_damoyolo_phone')
    # Latest inference results: result_c for classification, result_d for
    # detection. The placeholders let loop2 draw before loop3 has produced
    # a first result.
    result_c = {'scores': [0,0,0,0,0], 'labels': ['','','','','']}
    result_d = {'scores': [], 'labels': [], 'boxes': []}

    # --- keyboard_callback initialization ---
    # Key of the currently selected model (see keyboard_callback).
    model_sel = 'g'
    # Call keyboard_callback with the keyboard event on every key press
    # (including repeats while a key is held down).
    keyboard.on_press(keyboard_callback)
    # Flag that tells the worker threads to leave their loops.
    exit_flag = False

    # Create the worker threads.
    t1 = threading.Thread(target=loop1)
    t2 = threading.Thread(target=loop2)
    t3 = threading.Thread(target=loop3)
    # Start the worker threads.
    t1.start()
    t2.start()
    t3.start()
    try:
        # Block until the user presses 'q'.
        keyboard.wait('q')
    finally:
        # Always signal the workers and wait for them, even if the wait is
        # interrupted (e.g. Ctrl+C); otherwise the non-daemon threads would
        # keep the process alive.
        exit_flag = True
        t1.join()
        t2.join()
        t3.join()
    print('main thread exit normally')
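# NOTE: leftover moderation notice from the code-hosting web page (not part of the program):
# "This section may contain content unsuitable for display and is not shown. You can review and
# modify it using the editing features. If you confirm the content does not involve inappropriate
# language, pure advertising, violence, vulgar or pornographic material, infringement, piracy,
# false or worthless content, or content violating national laws and regulations, you can submit
# an appeal and it will be handled as soon as possible."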