1 Star 0 Fork 0

Linwangles/yolov5

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
main_exe_leave.py 5.23 KB
一键复制 编辑 原始数据 按行查看 历史
Linwangles 提交于 2023-10-27 11:36 . mmmiddleware version
import cv2
import time
import datetime
import base64
import json
import requests
import threading
from args import make_parser
from ByteTrack.yolox.tracking_utils.timer import Timer
from ByteTrack.yolox.tracker.byte_tracker import BYTETracker
from ByteTrack.yolox.utils.visualize import plot_tracking
from yolov5 import *
from flask import Flask, request, jsonify
# Flask application that serves the alarm control endpoint defined below.
app = Flask(__name__)
# Shared switch read by main_detect(): leave alarms are posted only while this
# is truthy. It is overwritten by the /leaveAlarmControl endpoint.
start_post = True
@app.route('/leaveAlarmControl', methods=['POST'])
def receive_json():
    """Enable or disable leave-alarm posting from a JSON control request.

    Expects a JSON object with a ``start_post`` field holding a boolean
    (``0``/``1`` are also accepted and normalised to ``bool``). On success the
    module-level ``start_post`` flag is updated and echoed back with
    ``status == "succ"``. On any failure the flag is left unchanged and the
    response carries ``status == "failed"`` plus the error text in ``code``.
    """
    global start_post
    try:
        data = request.get_json()
        if not isinstance(data, dict) or 'start_post' not in data:
            raise ValueError(
                "request body must be a JSON object with a 'start_post' field")
        requested = data['start_post']
        # Reject strings and other types: a value such as "false" is truthy
        # and would silently keep the alarm enabled.
        if not isinstance(requested, int) or requested not in (0, 1):
            raise ValueError("'start_post' must be a boolean")
        start_post = bool(requested)
        print("start post changed: ", start_post)
        return jsonify({"msg_head": "leave_alarm_control_response",
                        "status": "succ",
                        "start_post": start_post
                        })
    except Exception as e:
        # Endpoint boundary: report every failure to the caller instead of
        # letting Flask turn it into an HTML error page.
        return jsonify({"msg_head": "leave_alarm_control_response",
                        "status": "failed",
                        "start_post": start_post,
                        "code": str(e)})
def _zone_occupied(tlwhs, x1, y1, x2, y2):
    """Return True if any box's reference point lies strictly inside the zone.

    Each box is (left, top, width, height). The reference point is the
    horizontal centre of the box, one third of its height below the top edge.
    """
    for box in tlwhs:
        px = box[0] + box[2] / 2
        # NOTE(review): the vertical anchor is height / 3, not the box centre;
        # presumably intentional (upper body) -- confirm.
        py = box[1] + box[3] / 3
        if x1 < px < x2 and y1 < py < y2:
            return True
    return False


def _post_leave_alarm(zone_img, post_url):
    """Send a 'leave_area_warning' message with a JPEG snapshot of the zone.

    The crop is letterboxed to 320x320, JPEG-encoded, base64-wrapped as a
    data URI and POSTed as JSON to ``post_url``. Failures are printed and
    swallowed so that the detection loop keeps running.
    """
    if zone_img.size == 0:
        # The frame is smaller than the monitored rectangle; nothing to send.
        print("leave alarm skipped: monitored zone is outside the frame")
        return
    # letterbox is not defined in this file; presumably it comes from the
    # star import of the yolov5 module.
    corp_img, _, _ = letterbox(zone_img, (320, 320),)
    encoded_ok, img_byte_array = cv2.imencode(".jpg", corp_img)
    if not encoded_ok:
        print("leave alarm skipped: jpeg encoding failed")
        return
    b64img = base64.b64encode(img_byte_array).decode()
    formatted_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    data = {
        'msg_head': 'leave_area_warning',
        'area_id': 1,
        "area_name": "office",
        "leave_time": formatted_time,
        "b64_img": "data:image/jpeg;base64," + b64img
    }
    # NOTE(review): Content-Encoding claims a compressed body, but the payload
    # is sent as plain JSON. Kept as-is to preserve the wire format -- confirm
    # with the receiving service whether it can be dropped.
    headers = {"Content-Type": "application/json",
               "Content-Encoding": "gzip, deflate, br"}
    try:
        # A timeout is required: without it a stalled server would block the
        # detection loop indefinitely.
        requests.post(post_url, headers=headers, data=json.dumps(data),
                      timeout=10)
    except Exception as e:
        # Best effort: a failed alarm must not stop detection.
        print("post failed, error code: ", e)


def main_detect(args, source, post_url):
    """Watch a video source and raise an alarm when the monitored zone is empty.

    Runs forever. Each frame is passed to the detector and the detections to
    the tracker. If no tracked box has its reference point inside the fixed
    rectangle (300, 300)-(900, 900), the module-level ``start_post`` flag is
    truthy, and at least 20 seconds have passed since the previous alarm, a
    snapshot of the zone is POSTed to ``post_url``. The annotated frame is
    shown in a window named "leave_track".

    Args:
        args: parsed options passed to ``BYTETracker``; ``aspect_ratio_thresh``
            and ``min_box_area`` are also read here to filter tracks.
        source: anything accepted by ``cv2.VideoCapture`` (file path or URL).
        post_url: URL that receives the alarm JSON.
    """
    global start_post  # only read here; written by the Flask control endpoint

    tracker = BYTETracker(args, frame_rate=30)
    timer = Timer()
    frame_id = 0
    cap = cv2.VideoCapture(source)
    detector = yolov5("yolov5l.pt", classes=0)

    # Corners of the monitored rectangle, in pixels.
    x1, y1 = (300, 300)
    x2, y2 = (900, 900)
    alarm_interval = 20  # minimum number of seconds between two alarms
    timestamp = 0  # time of the last alarm attempt (0 = never)

    while True:
        t0 = time.time()
        diff = t0 - timestamp

        ret_val, frame = cap.read()
        if not ret_val:
            print("leave detecting read frame error, retring...")
            cv2.destroyAllWindows()
            # Release the dead capture before reconnecting, otherwise every
            # retry leaks a capture handle.
            cap.release()
            cap = cv2.VideoCapture(source)
            time.sleep(5)
            continue

        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), thickness=1)

        # Start timing here so that toc() below measures this frame only.
        timer.tic()
        dets = detector.run(frame)

        isEmpty = True
        if len(dets) > 0:
            # NOTE(review): the image size is hard-coded as 1080x1920 for both
            # arguments -- confirm that it matches the stream resolution.
            online_targets = tracker.update(dets, (1080, 1920), (1080, 1920))
            online_tlwhs = []
            online_ids = []
            online_cls = []
            for t in online_targets:
                tlwh = t.tlwh
                # Drop boxes that are too wide for their height or too small.
                vertical = tlwh[2] / tlwh[3] > args.aspect_ratio_thresh
                if tlwh[2] * tlwh[3] > args.min_box_area and not vertical:
                    online_tlwhs.append(tlwh)
                    online_ids.append(t.track_id)
                    online_cls.append(t.cls)
            if _zone_occupied(online_tlwhs, x1, y1, x2, y2):
                isEmpty = False
            timer.toc()
            online_im = plot_tracking(
                frame, online_tlwhs, online_ids, online_cls,
                frame_id=frame_id + 1,
                fps=1. / max(1e-5, timer.average_time))
        else:
            online_im = frame

        if isEmpty and abs(diff) >= alarm_interval and start_post:
            # Record the attempt first so that failures are throttled too.
            timestamp = int(time.time())
            _post_leave_alarm(frame[y1:y2, x1:x2], post_url)

        cv2.imshow("leave_track", online_im)
        cv2.waitKey(1)
        frame_id += 1
def run_flask(host="192.168.32.126", port=65004):
    """Run the Flask control server; blocks until the server stops.

    Args:
        host: interface address to bind to. The default is the address the
            script was originally deployed on.
        port: TCP port to listen on.
    """
    app.run(debug=False, host=host, port=port)
if __name__ == '__main__':
    args = make_parser().parse_args()
    # A local file such as "../ometer.mp4" can be used instead of the stream.
    source = "rtsp://117.119.65.140:1554/live/test7"
    post_url = "https://4e6t356834.eicp.vip/Xkan/LeavingAlarm"
    # The detection loop never returns, so run it as a daemon thread:
    # otherwise the process could not exit once the Flask server stops.
    work_thread = threading.Thread(
        target=main_detect, args=(args, source, post_url), daemon=True)
    work_thread.start()
    # Flask runs in the main thread and blocks until it is shut down.
    run_flask()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/linwangles/yolov5.git
[email protected]:linwangles/yolov5.git
linwangles
yolov5
yolov5
master

搜索帮助