4 Star 23 Fork 6

babakara/controlPy

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
simulator_sac_head_tracking.py 5.11 KB
一键复制 编辑 原始数据 按行查看 历史
babakara 提交于 2024-08-26 07:28 . 123
# 仿真器入口 用于训练强化学习 跟踪算法用的简化版本仿真器
# 由于没有GUI界面
# 航控需要通过 位置偏差 和 航向偏差 等指标查看控制算法是否有效
from dynamics_simulation import motionSim
import time
import pandas
import numpy as np
import random
import matplotlib.pyplot as plt
import socket
import json
from navigation import gpsAndUtm
from collections import namedtuple, OrderedDict
import traceback
# 解析控制参数
def decode_data(data):
# load json
recv_obser = json.loads(data)
# print(recv_obser)
delta = recv_obser['rudl']
rsp = recv_obser['rspl']
return delta, rsp
# 打包船的状态返回
def encode_data(trans):
# 按照标准接口编写
data = json.dumps(trans, sort_keys=True, indent=4, separators=(',', ':'))
return data.encode('utf-8')
# 计算奖励
def get_reward(exp_angle, psi, dpsi, delta):
# DDPG高频大舵角 得分高 但是这是执行机构做不到的 因此增加累积舵角惩罚
score = -((0.01*(exp_angle - psi)**2) + 0.1 *(dpsi**2) + 0.01 *(delta**2))
if abs(exp_angle - psi) < 3:
score = score + 5
elif abs(exp_angle - psi) < 1:
score = score + 10
print(f'score is {score}')
return score
# test motion sim
if __name__ == '__main__':
boat_data = dict()
# 与航行控制软件连接TCP
LOCAL_HOST = "127.0.0.1"
LOCAL_PORT = 10905
socket_server = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
socket_server.bind((LOCAL_HOST, LOCAL_PORT))
socket_server.listen()
conn, address = socket_server.accept()
print(f"收到了客户端的连接,客户端信息是 {address}")
all_sim_time = 100
sim_time = 0
delta_h = 0.1
# 非线性kt参数
K=3.1429
T=18.348
deltam=6.8207e-2
alpha=6.8595e-3
# boat运动仿真类输入参数
my_mmg = motionSim.BoatMotionSim(delta_h=delta_h, K=K, T=T, deltam=deltam, alpha=alpha)
try:
exp_angle = 90
x = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0])# 初始无人艇位置
u = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0])# 初始无人艇速度
init_lon = 120.30754612708418 # 初始经度
init_lat = 31.49450233916176 # 初始纬度
while True:
sim_time += delta_h
# 接收转速和舵角
info, address = conn.recvfrom(1024)
# print(f' recv tcp client data: {info}')
delta, rsp = decode_data(info)
print(f'delta is {delta}, rsp is {rsp}')
# 计算更新 位置 速度
x, u = my_mmg.runKt(x=x, u=u, delta=delta, f=rsp)
local = gpsAndUtm.get_utm(init_lon, init_lat)
[lon, lat] = gpsAndUtm.get_wgs84(east=(local[0] + x[1]),
north=(local[1] + x[0]),
zone_number=local[2],
zone_letter=local[3])
# 更新状态
boat_data['boatID'] = 0
boat_data['boatType'] = 0
boat_data['lat'] = lat
boat_data['lon'] = lon
boat_data['height'] = x[2]
boat_data['yaw'] = x[5]
boat_data['collision'] = 0
boat_data['CollisionType'] = 0
boat_data['winda'] = my_mmg.wind.someWind.Vr
boat_data['winds'] = my_mmg.wind.someWind.beta_w
boat_data['wavea'] = my_mmg.wave.chi
boat_data['waveh'] = my_mmg.wave.wave_Hs
boat_data['currenta'] = 0.0
boat_data['currents'] = 0.0
boat_data['headDirection'] = x[5]
boat_data['pitch'] = x[4]
boat_data['roll'] = x[3]
boat_data['bearspeed'] = 0.0
boat_data['speedtowater'] = 0.0
boat_data['breartowater'] = 0.0
boat_data['speedtogrand'] = np.sqrt(u[0]**2 + u[1]**2)
boat_data['eastgrand'] = u[0]
boat_data['northgrand'] = u[1]
# 为强化学习用
observation=dict()
boat_data['reward'] = get_reward(exp_angle, x[5], u[5], delta)
observation['psi'] = x[5] - exp_angle
observation['dpsi'] = u[5]
observation['speed'] = np.sqrt(u[0]**2 + u[1]**2)
observation['rud'] = delta
if sim_time > all_sim_time:
exp_angle = random.randint(-90, 90)
x = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0])# 初始无人艇位置
u = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0])# 初始无人艇速度
rsp = 0
delta = 0
sim_time = 0
done = 1
else:
done = 0
boat_data['observation'] = observation
boat_data['terminated'] = done
boat_data['truncated'] = done
data = encode_data(trans=boat_data)
time.sleep(0.01) # sleep 10ms
# 发送数据
conn.send(data)
except Exception:
# 关闭连接
conn.close()
socket_server.close()
traceback.print_exc()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/babakara/control-py.git
[email protected]:babakara/control-py.git
babakara
control-py
controlPy
frenet

搜索帮助