RL-PowerTracking-new/DOA_SAC_sim2real.py
fly6516 b219659545 feat(env): tune environment parameters and reward shaping to improve learning
- Add the jkrc module path and handle import failures
- Randomize environment boundaries and extend the dynamic obstacle speed range
- Adjust the observation space dimension and low-pass filter parameters
- Introduce distance-change tracking and direction-alignment rewards
- Strengthen the action-smoothness penalty and progressive obstacle penalty
- Tune the base-rotation reward and boundary-awareness parameters
- Improve reward computation, including progressive distance rewards and boundary penalties
- Randomize initial obstacle positions
- Adjust SAC model parameters for better learning performance
2025-05-28 00:20:28 +08:00


import time
import gym
from gym import error, spaces, utils
from gym.utils import seeding
import os
import pybullet as p
import pybullet_data
import math
import numpy as np
import random
from scipy import signal
import sys
# Add the jkrc module search path (the script's own directory)
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '.'))  # current directory
try:
import jkrc
except ImportError:
print("警告: 无法导入jkrc模块。请确保该模块存在于当前目录并且是Python可识别的格式。")
jkrc = None # 设置为None以便后续检查
import matplotlib as mpl
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
import pandas as pd
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
MAX_EPISODE_LEN = 20 * 100
x = []
y = []
z = []
# Motion modes
# PI = 3.1415926
# ABS = 0   # absolute motion
# INCR = 1  # incremental motion
# Enable = True
# Disable = False
# robot = jkrc.RC("192.168.31.5")  # returns a robot object
# robot.login()  # log in
# # robot.servo_move_use_none_filter()
# robot.power_on()  # power on
# robot.enable_robot()
# # joint_pos=[0, 1.57, 1.57, 1.57, -1.57, 0]
# # robot.joint_move(joint_pos,ABS,True,1)
# robot.servo_move_enable(Enable)  # enter servo position-control mode
def goal_distance(goal_a, goal_b):
return np.linalg.norm(np.array(goal_a) - np.array(goal_b), axis=-1)
def is_collision(robotId, bodyA, bodyB):
is_bool1 = p.getContactPoints(robotId, bodyA)
is_bool2 = p.getContactPoints(robotId, bodyB)
if is_bool1:
return True
elif is_bool2:
return True
else:
return False
class jakaEnv(gym.Env):
metadata = {"render.modes": ["human"]}
def __init__(self):
self.step_counter = 0
self.eposide = 0
self.collision_frequency = 0
self.success_frequency = 0
self.overstep_frequency = 0
self.out = 0
self.reward_type = "dense"
p.connect(
p.GUI,
# options="--background_color_red=0.0 --background_color_green=0.93--background_color_blue=0.54",
)
p.resetDebugVisualizerCamera(
cameraDistance=1.25,
cameraYaw=45,
cameraPitch=-30,
cameraTargetPosition=[0.65, -0.0, 0.65],
)
# Randomize the environment boundaries (based on RL environment-tuning experience)
self.x_low = 0.3 + random.uniform(-0.05, 0.05)  # perturb the initial boundaries
self.x_high = 0.7 + random.uniform(-0.05, 0.05)
self.y_low = -0.05 + random.uniform(-0.03, 0.03)
self.y_high = 1.05 + random.uniform(-0.03, 0.03)
self.z_low = 0.65 + random.uniform(-0.02, 0.02)
self.z_high = 1.3 + random.uniform(-0.02, 0.02)
# Extended speed range for the dynamic obstacle
self.speed = [0.2, 0.4, 0.6, 0.8, 1.0, 1.2]  # extended speed range
self.action_space = spaces.Box(np.array([-1] * 3), np.array([1] * 3))
# Observation space dimension corrected from 38 to 40
self.observation_space = spaces.Box(
np.array([-1] * 40, np.float32),
np.array([1] * 40, np.float32)
)
# Low-pass filter parameters for action smoothing
self.order = 4  # filter order
self.cutoff_freq = 1.2  # cutoff frequency; must stay below fs/2 (with fs=2.5, at most 1.25)
# Validate the filter parameters (boundary check)
if self.cutoff_freq >= 2.5 / 2:
raise ValueError("The cutoff frequency must be below fs/2 (fs=2.5, so at most 1.25).")
# Precompute the filter coefficients
self.b, self.a = signal.butter(self.order, self.cutoff_freq, fs=2.5, btype='low')
# Track distance changes between steps (reward-function tuning)
self.prev_distance = None  # distance measured on the previous step
self.distance_change_penalty_coeff = 1500  # penalty coefficient for moving away from the goal
# Direction-alignment reward parameters
self.direction_reward_weight = 0.6  # direction-alignment reward weight
# Stronger action-smoothness penalty
self.action_penalty_coeff = 0.8  # penalty coefficient on action magnitude
# Progressive obstacle penalty parameters
self.obstacle_safe_distance = 0.2  # safe distance to obstacles
self.obstacle_penalty_coeff = 3000  # obstacle penalty coefficient
# Base-rotation reward (note: both values are overwritten further below)
self.base_rotation_weight = 1.5  # base-rotation reward weight
self.min_base_rotation_angle = 0.05  # minimum rotation angle that earns a reward
# Environment-variable initialization
self.distance_threshold = 0.05  # success threshold, stored as an instance attribute
self.x_boundary = [0.3, 0.7]  # nominal boundary ranges
self.y_boundary = [-0.05, 1.05]
self.z_boundary = [0.65, 1.3]
# Joint-angle limits, tightened to match the physical limits in the URDF
self.joint_limits = {
# Adjusted to the JAKA arm's actual parameters (see the joint limits in the URDF)
"lower": np.array([-2.7925, -1.5708, -2.7925, -1.5708, -2.7925, -1.5708], dtype=np.float32),  # tighter lower limits
"upper": np.array([2.7925, 1.5708, 2.7925, 1.5708, 2.7925, 1.5708], dtype=np.float32)  # tighter upper limits
}
# Base-rotation parameters (these override the values set above)
self.base_rotation_weight = 1.2  # base-rotation reward weight
self.min_base_rotation_angle = 0.1  # minimum base rotation angle (radians)
# Boundary-penalty parameters
self.boundary_threshold = 0.3  # boundary-awareness range of 0.3 m
self.boundary_penalty_coeff = 4000  # boundary penalty coefficient
self.boundary_collision_penalty = -1500  # hard penalty on boundary collision
# Dimension of the simplified observation used by _get_observation()
self.observation_dim = 9  # 6 joint positions + 3 goal coordinates
# Number of joints (matching the actual arm configuration)
self.n_joints = 6  # the JAKA arm has 6 joints
def compute_reward(self, achieved_goal, goal):
d = goal_distance(achieved_goal, goal)
if self.reward_type == "sparse":
return -(d > self.distance_threshold).astype(np.float32)
else:
return -d
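# Example: with distance_threshold = 0.05 and d = 0.03, the sparse reward is
# -float(0.03 > 0.05) = -0.0 (goal reached), while the dense reward is simply -d = -0.03.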
def _get_observation(self):
"""Return the current (simplified) observation."""
# Placeholder observation (replace with the real environment interface):
# 6 joint positions followed by 3 goal coordinates.
joint_pos = np.random.rand(6)  # 6 random joint positions as a stand-in
target_pos = np.array([0.5, 0.5, 0.5])  # assumed goal position
return np.concatenate([joint_pos, target_pos])
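# A minimal sketch (as comments) of how the placeholder above could be replaced with real
# readings, assuming the joint indices 1..6 used elsewhere in this file and that
# self.objectId is the goal body; this wiring is an assumption, not part of the original code:
#   joint_states = p.getJointStates(self.jaka_id, list(range(1, 7)))
#   joint_pos = np.array([s[0] for s in joint_states])  # joint angles
#   target_pos = np.array(p.getBasePositionAndOrientation(self.objectId)[0])  # goal position
#   return np.concatenate([joint_pos, target_pos])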
def step(self, action):
p.configureDebugVisualizer(p.COV_ENABLE_SINGLE_STEP_RENDERING)
orientation = p.getQuaternionFromEuler([-math.pi / 2, -math.pi, math.pi / 2.])
dv = 0.05
# Smooth the action with the precomputed low-pass filter
filtered_action = [
float(signal.lfilter(self.b, self.a, [action[0]])[0]),
float(signal.lfilter(self.b, self.a, [action[1]])[0]),
float(signal.lfilter(self.b, self.a, [action[2]])[0])
]
# Use the filtered action
dx = filtered_action[0] * dv
dy = filtered_action[1] * dv
dz = filtered_action[2] * dv
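# Note: filtering a single sample per call carries no state between steps, so the lines
# above effectively scale each component by b[0]. A stateful variant is sketched below as
# comments (self.filter_zi is a hypothetical attribute, not defined in this file), which
# would let the Butterworth filter actually smooth the action trajectory over time:
#   in __init__:  self.filter_zi = [signal.lfilter_zi(self.b, self.a) * 0.0 for _ in range(3)]
#   in step():    y, self.filter_zi[i] = signal.lfilter(self.b, self.a, [action[i]], zi=self.filter_zi[i])
#                 filtered_action[i] = float(y[0])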
# print(action)
currentPose = p.getLinkState(self.jaka_id, 6)
currentPosition = currentPose[0]
newPosition = [
currentPosition[0] + dx,
currentPosition[1] + dy,
currentPosition[2] + dz,
]
jointPoses = p.calculateInverseKinematics(
self.jaka_id, 6, newPosition, orientation
)[0:6]
p.setJointMotorControlArray(
self.jaka_id,
list(range(1, 7)),
p.POSITION_CONTROL,
list(jointPoses),
)
print(jointPoses)
p.resetBaseVelocity(self.blockId, linearVelocity=[0, random.choice(self.speed), 0])
p.stepSimulation()
# robot.servo_move_enable(True)
# robot.servo_p(cartesian_pose=[dx*100, dy*100, dz*50, 0, 0, 0], move_mode=1)
# time.sleep(0.008)
state_object, state_object_orienation = p.getBasePositionAndOrientation(self.objectId)
twist_object, twist_object_orienation = p.getBaseVelocity(self.objectId)
state_robot = p.getLinkState(self.jaka_id, 6)[0]
block_speed, block_speed_orienation = p.getBaseVelocity(self.blockId)
block_state, block_state_orienation = p.getBasePositionAndOrientation(self.blockId)
block2_state, block2state_orienation = p.getBasePositionAndOrientation(self.blockId2)
p.addUserDebugLine(lineFromXYZ=currentPosition, lineToXYZ=state_robot, lineColorRGB=[0, 1, 0], lineWidth=2)
x.append(state_robot[0])
y.append(state_robot[1])
z.append(state_robot[2])
self.distance_threshold = 0.05
d = goal_distance(state_robot, state_object)
if (state_robot[0] > self.x_high or state_robot[0] < self.x_low
or state_robot[1] > self.y_high or state_robot[1] < self.y_low
or state_robot[2] > self.z_high or state_robot[2] <= self.z_low + 0.05):
reward = -800  # heavier out-of-range penalty
self.eposide = self.eposide + 1
self.out = self.out + 1
print()
print("=" * 50)
print("\033[36mepisode {}: out of range\033[0m".format(self.eposide))
print()
print("\tcollisions: {}\tsuccesses: {}\ttimeouts: {}\tout-of-range: {}".format(
self.collision_frequency,
self.success_frequency,
self.overstep_frequency, self.out))
print("=" * 50)
print()
done = True
elif is_collision(self.jaka_id, self.blockId, self.blockId2):
reward = -800  # heavier collision penalty
self.eposide = self.eposide + 1
self.collision_frequency = self.collision_frequency + 1
print()
print("=" * 50)
print("\033[33mepisode {}: collision\033[0m".format(self.eposide))
print()
print("\tcollisions: {}\tsuccesses: {}\ttimeouts: {}\tout-of-range: {}".format(
self.collision_frequency,
self.success_frequency,
self.overstep_frequency, self.out))
print("=" * 50)
print()
done = True
elif d < self.distance_threshold:
# Progressive distance reward
normalized_distance = max(0, 1 - d / (2 * self.distance_threshold))  # kept within [0, 1]
distance_reward = normalized_distance * 2.0  # up to 2.0, growing linearly as the distance shrinks
# Success bonus raised to 250
success_reward = 250
reward = success_reward + distance_reward
self.eposide = self.eposide + 1
self.success_frequency = self.success_frequency + 1
print()
print("=" * 50)
print("\033[31mepisode {}: success\033[0m".format(self.eposide))
print()
print("\tcollisions: {}\tsuccesses: {}\ttimeouts: {}\tout-of-range: {}".format(
self.collision_frequency,
self.success_frequency,
self.overstep_frequency, self.out))
print("=" * 50)
print()
done = True
else:
# Distance to the goal
state_robot = p.getLinkState(self.jaka_id, 6)[0]
state_object = p.getBasePositionAndOrientation(self.objectId)[0]
# Progressive distance reward
d_target = goal_distance(state_robot, state_object)
normalized_distance = max(0, 1 - d_target / (2 * self.distance_threshold))  # kept within [0, 1]
distance_reward = normalized_distance * 2.0  # up to 2.0, growing as the distance shrinks
# Penalty on moving away from the goal (distance-change tracking)
distance_change_penalty = 0
if self.prev_distance is not None:
distance_diff = d_target - self.prev_distance
if distance_diff > 0:  # the distance grew compared with the previous step
distance_change_penalty = -distance_diff * self.distance_change_penalty_coeff
# Update prev_distance
self.prev_distance = d_target
# Direction-alignment reward
# Unit vector from the current end-effector position to the goal
direction_to_goal = np.array(state_object) - np.array(state_robot)
direction_to_goal = direction_to_goal / np.linalg.norm(direction_to_goal) if np.linalg.norm(direction_to_goal) != 0 else direction_to_goal
# Similarity between the commanded action and the goal direction
direction_similarity = np.dot(direction_to_goal, action)
direction_reward = 0.6 * direction_similarity  # slightly reduced direction-reward weight
# Stronger action-smoothness penalty
action_penalty = -np.mean(np.abs(action)) * 0.7  # penalize action magnitude
# Progressive obstacle penalty (requires the obstacle position)
block_state, _ = p.getBasePositionAndOrientation(self.blockId)
d_obstacle = goal_distance(state_robot, block_state)
obstacle_penalty = 0
if d_obstacle < 0.2:  # start penalizing near the obstacle
obstacle_penalty = - (0.2 - d_obstacle) * 3000  # heavier penalty the closer it gets
# Current joint positions and base position from the (simplified) observation
obs = self._get_observation()
joint_pos = obs[self.observation_dim - 3 - self.n_joints: self.observation_dim - 3]
base_pos = obs[:3]  # assumes the first 3 observation entries are the base position
# Wrap the joint angles so they stay compatible with the earlier joint_poses format
joint_poses = [(pos,) for pos in joint_pos]
# Make sure prev_joint_pos is a numpy array
if not hasattr(self, 'prev_joint_pos') or isinstance(self.prev_joint_pos, tuple):
self.prev_joint_pos = np.array([pose[0] for pose in joint_poses])  # initialize prev_joint_pos
# Base-rotation reward (recomputed below with a boundary-avoidance direction term)
current_base_angle = np.array(joint_poses[0][0])
base_rotation = abs(current_base_angle - self.prev_joint_pos[0])
base_rotation_reward = self.base_rotation_weight * base_rotation if base_rotation > self.min_base_rotation_angle else 0
# Update prev_joint_pos as a numpy array
self.prev_joint_pos = np.array([pose[0] for pose in joint_poses])
# Distance to the nearest boundary
boundary_distance = self._get_boundary_distance(base_pos)
# Boundary-penalty mechanism
boundary_penalty = 0
# Two-stage penalty:
# 1. progressive penalty once closer than boundary_threshold
# 2. hard penalty once closer than boundary_threshold / 2
if boundary_distance < self.boundary_threshold:
boundary_penalty = - (self.boundary_threshold - boundary_distance) * self.boundary_penalty_coeff
if boundary_distance < self.boundary_threshold / 2:
boundary_penalty += self.boundary_collision_penalty  # add the hard penalty
# Base-rotation reward, weighted by how well the rotation points away from the boundary
base_rotation_reward = 0
if base_rotation > self.min_base_rotation_angle:
rotation_dir = np.sign(np.array(joint_pos)[0] - self.prev_joint_pos[0])
avoid_dir = self._get_boundary_avoid_direction(base_pos)
# Reward direction consistency to encourage avoidance behaviour
direction_consistency = max(0, rotation_dir * avoid_dir)
base_rotation_reward = self.base_rotation_weight * base_rotation * direction_consistency * 1.5  # extra direction weight
# Total reward (distance-change and boundary penalties included)
reward = distance_reward + direction_reward + action_penalty + base_rotation_reward + obstacle_penalty + distance_change_penalty + boundary_penalty
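# Rough scale of the shaping terms above, derived from the coefficients set in __init__
# and in this method: distance_reward in [0, 2] (nonzero only within 0.1 m of the goal),
# direction_reward roughly within +/-1, action_penalty in [-0.7, 0], base_rotation_reward >= 0,
# obstacle_penalty in [-600, 0] (within 0.2 m of the obstacle), distance_change_penalty <= 0
# (1500 x the per-step increase in distance), boundary_penalty from 0 down to about -2700 near the walls.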
done = False
self.step_counter += 1
if self.step_counter > MAX_EPISODE_LEN:
# reward = 0
self.eposide = self.eposide + 1
self.overstep_frequency = self.overstep_frequency + 1
print()
print("=" * 50)
print("\033[34mepisode {}: overstep\033[0m".format(self.eposide))
print()
print("\tcollisions: {}\tsuccesses: {}\ttimeouts: {}\tout-of-range: {}".format(
self.collision_frequency,
self.success_frequency,
self.overstep_frequency, self.out))
print("=" * 50)
print()
done = True
info = {"object_position": state_object}
robot_pos = np.array(state_robot)
object_pos = np.array(state_object)
object_rel_pos = object_pos - robot_pos
object_rot = np.array(state_object_orienation)
object_velp = np.array(twist_object)
object_velr = np.array(twist_object_orienation)
block_velp = np.array(block_speed)
block_velr = np.array(block_speed_orienation)
block_vel_range = np.array(self.speed)
block_loc = np.array(block_state)
block_rel_pos = np.array(block_loc - robot_pos)
block2_pos = np.array(block2_state)
obs = np.concatenate(
[
robot_pos.ravel(),
object_pos.ravel(),
object_rel_pos.ravel(),
object_rot.ravel(),
object_velp.ravel(),
object_velr.ravel(),
block_velp.ravel(),
block_velr.ravel(),
block_vel_range.ravel(),
block_loc.ravel(),
block_rel_pos.ravel(),
block2_pos.ravel(),
]
)
return obs.copy(), reward, done, info
# Obstacle positions are randomized in the reset method
def reset(self):
self.step_counter = 0
p.resetSimulation()
p.configureDebugVisualizer(
p.COV_ENABLE_RENDERING, 0
)
urdfRootPath = pybullet_data.getDataPath()
p.setGravity(0, 0, -10)
planeId = p.loadURDF(os.path.join(urdfRootPath, "plane.urdf"))
dir_path = os.path.dirname(os.path.realpath(__file__))
self.tableId = p.loadURDF(os.path.join(dir_path, "urdf/table.urdf"),
basePosition=[0.2, 0.5, 0],
useFixedBase=True,
)
# Randomize the dynamic obstacles' initial positions
block_x = random.uniform(0.35, 0.55)
block_y = random.uniform(0.1, 0.3)
self.blockId = p.loadURDF(os.path.join(dir_path, "urdf/block4.urdf"),
basePosition=[block_x, block_y, 0.47],
useFixedBase=True,
)
block2_x = random.uniform(0.55, 0.75)
block2_y = random.uniform(0.5, 0.7)
self.blockId2 = p.loadURDF(os.path.join(dir_path, "urdf/block.urdf"),
basePosition=[block2_x, block2_y, 0.43],
useFixedBase=True,
)
state_object = [0.5, 0.2, 0.8]
dir_path = os.path.dirname(os.path.realpath(__file__))
self.objectId = p.loadURDF(
os.path.join(dir_path, "urdf/goal.urdf"),
basePosition=state_object,
useFixedBase=True,
)
reset_poses = [0, 0.8, 2.4, 1.3, 1, -1.57, 0]
reset_realposes = [0.8, 2.4, 1.3, 1, -1.57, 0]
# Change this to the path of your robot arm's URDF file
self.jaka_id = p.loadURDF(
"urdf/jaka_description/urdf/jaka_description.urdf",
basePosition=[0, 0.5, 0.65],
baseOrientation=p.getQuaternionFromEuler([0, 0, 3.14]),
useFixedBase=True,
)
for i in range(len(reset_poses)):
p.resetJointState(self.jaka_id, i, reset_poses[i])
# robot.joint_move(reset_realposes, ABS, True, 1)
state_robot = p.getLinkState(self.jaka_id, 6)[0]
self.plot_obs_boundary()
p.configureDebugVisualizer(p.COV_ENABLE_RENDERING, 1)
# time.sleep(10)
robot_pos = np.array(state_robot)
object_pos = np.array(state_object)
object_rel_pos = object_pos - robot_pos
object_rot = np.array([0, 0, 0, 1]) # quaternions?
object_velp = np.array([0, 0, 0])
object_velr = np.array([0, 0, 0])
block_velp = np.array([0, 0, 0])
block_velr = np.array([0, 0, 0])
block_vel_range = np.array(self.speed)
block_loc = np.array([0, 0, 0])
block_rel_pos = np.array([0, 0, 0])
block2_pos = np.array([0.6, 0.66, 0.43])
obs = np.concatenate(
[
robot_pos.ravel(),
object_pos.ravel(),
object_rel_pos.ravel(),
object_rot.ravel(),
object_velp.ravel(),
object_velr.ravel(),
block_velp.ravel(),
block_velr.ravel(),
block_vel_range.ravel(),
block_loc.ravel(),
block_rel_pos.ravel(),
block2_pos.ravel(),
]
)
return obs.copy()
def plot_obs_boundary(self):
p.addUserDebugLine(
lineFromXYZ=[self.x_low, self.y_low, self.z_low],
lineToXYZ=[self.x_low, self.y_high, self.z_low])
p.addUserDebugLine(
lineFromXYZ=[self.x_low, self.y_low, self.z_low],
lineToXYZ=[self.x_high, self.y_low, self.z_low])
p.addUserDebugLine(
lineFromXYZ=[self.x_low, self.y_low, self.z_low],
lineToXYZ=[self.x_low, self.y_low, self.z_high])
p.addUserDebugLine(
lineFromXYZ=[self.x_high, self.y_high, self.z_low],
lineToXYZ=[self.x_high, self.y_low, self.z_low])
p.addUserDebugLine(
lineFromXYZ=[self.x_high, self.y_high, self.z_low],
lineToXYZ=[self.x_low, self.y_high, self.z_low])
p.addUserDebugLine(
lineFromXYZ=[self.x_high, self.y_high, self.z_low],
lineToXYZ=[self.x_high, self.y_high, self.z_high])
p.addUserDebugLine(
lineFromXYZ=[self.x_high, self.y_low, self.z_low],
lineToXYZ=[self.x_high, self.y_low, self.z_high])
p.addUserDebugLine(
lineFromXYZ=[self.x_low, self.y_high, self.z_low],
lineToXYZ=[self.x_low, self.y_high, self.z_high])
p.addUserDebugLine(
lineFromXYZ=[self.x_low, self.y_low, self.z_high],
lineToXYZ=[self.x_low, self.y_high, self.z_high])
p.addUserDebugLine(
lineFromXYZ=[self.x_low, self.y_low, self.z_high],
lineToXYZ=[self.x_high, self.y_low, self.z_high])
p.addUserDebugLine(
lineFromXYZ=[self.x_high, self.y_high, self.z_high],
lineToXYZ=[self.x_high, self.y_low, self.z_high])
p.addUserDebugLine(
lineFromXYZ=[self.x_high, self.y_high, self.z_high],
lineToXYZ=[self.x_low, self.y_high, self.z_high])
def render(self, mode="human"):
view_matrix = p.computeViewMatrixFromYawPitchRoll(
cameraTargetPosition=[0.7, 0, 0.65 + 0.05],
distance=0.7,
yaw=90,
pitch=-50,
roll=0,
upAxisIndex=2,
)
proj_matrix = p.computeProjectionMatrixFOV(
fov=60, aspect=float(960) / 720, nearVal=0.1, farVal=100.0
)
(_, _, px, _, _) = p.getCameraImage(
width=960,
height=720,
viewMatrix=view_matrix,
projectionMatrix=proj_matrix,
renderer=p.ER_BULLET_HARDWARE_OPENGL,
)
rgb_array = np.array(px, dtype=np.uint8)
rgb_array = np.reshape(rgb_array, (720, 960, 4))
rgb_array = rgb_array[:, :, :3]
return rgb_array
def _get_state(self):
return self.observation
def close(self):
p.disconnect()
def _get_boundary_distance(self, position):
"""Return the shortest distance from the given position to the environment boundary."""
x, y, z = position
# Per-axis distance to the nearer boundary
dx = min(x - self.x_low, self.x_high - x)
dy = min(y - self.y_low, self.y_high - y)
dz = min(z - self.z_low, self.z_high - z)
# Return the smallest of the three
return min(dx, dy, dz)
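# Worked example (using the nominal, un-randomized bounds x in [0.3, 0.7], y in [-0.05, 1.05],
# z in [0.65, 1.3]): for position (0.35, 0.5, 0.9) the per-axis distances are 0.05, 0.55 and 0.25,
# so the boundary distance is 0.05, which is inside the 0.3 threshold and therefore penalized.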
def _get_boundary_avoid_direction(self, position):
"""Return the preferred direction for moving away from the nearest boundary."""
x, y, z = position
avoid_dir = 0
# Check how close we are to the x boundaries
if position[0] - self.x_low < self.boundary_threshold:
avoid_dir = 1  # move towards +x
elif self.x_high - position[0] < self.boundary_threshold:
avoid_dir = -1  # move towards -x
# Otherwise check the y boundaries
elif position[1] - self.y_low < self.boundary_threshold:
avoid_dir = 2  # move towards +y
elif self.y_high - position[1] < self.boundary_threshold:
avoid_dir = -2  # move towards -y
return avoid_dir
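# Convention note: +/-1 encodes the x axis and +/-2 the y axis. In step() this value is
# multiplied by np.sign() of the base rotation and clipped at zero, so only its sign and the
# factor of 2 for the y axis affect the base-rotation reward.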
if __name__ == "__main__":
from stable_baselines3 import SAC
from stable_baselines3.common import results_plotter
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.results_plotter import load_results, ts2xy, plot_results
from stable_baselines3.common.noise import NormalActionNoise
from stable_baselines3.common.callbacks import BaseCallback
class SaveOnBestTrainingRewardCallback(BaseCallback):
"""
Callback for saving a model (the check is done every ``check_freq`` steps)
based on the training reward (in practice, we recommend using ``EvalCallback``).
:param check_freq:
:param log_dir: Path to the folder where the model will be saved.
It must contain the file created by the ``Monitor`` wrapper.
:param verbose: Verbosity level.
"""
def __init__(self, check_freq: int, log_dir: str, verbose: int = 1):
super(SaveOnBestTrainingRewardCallback, self).__init__(verbose)
self.check_freq = check_freq
self.log_dir = log_dir
self.save_path = os.path.join(log_dir, 'best_model')
self.best_mean_reward = -np.inf
def _init_callback(self) -> None:
# Create folder if needed
if self.save_path is not None:
os.makedirs(self.save_path, exist_ok=True)
def _on_step(self) -> bool:
if self.n_calls % self.check_freq == 0:
# Retrieve training reward
x, y = ts2xy(load_results(self.log_dir), 'timesteps')
if len(x) > 0:
# Mean training reward over the last 100 episodes
mean_reward = np.mean(y[-100:])
if self.verbose > 0:
print(f"Num timesteps: {self.num_timesteps}")
print(
f"Best mean reward: {self.best_mean_reward:.2f} - Last mean reward per episode: {mean_reward:.2f}")
# New best model, you could save the agent here
if mean_reward > self.best_mean_reward:
self.best_mean_reward = mean_reward
# Example for saving best model
if self.verbose > 0:
print(f"Saving new best model to {self.save_path}")
self.model.save(self.save_path)
return True
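# The docstring above recommends EvalCallback for model selection; a minimal sketch of that
# alternative is shown below as comments (eval_env and eval_log_dir are placeholders, not
# defined in this script):
#   from stable_baselines3.common.callbacks import EvalCallback
#   eval_env = Monitor(jakaEnv(), eval_log_dir)
#   eval_callback = EvalCallback(eval_env, best_model_save_path=eval_log_dir,
#                                log_path=eval_log_dir, eval_freq=10000, deterministic=True)
#   model.learn(total_timesteps=4000000, callback=eval_callback)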
tempt = 1
log_dir = './tensorboard/DOA_SAC_callback/'
os.makedirs(log_dir, exist_ok=True)
env = jakaEnv()
env = Monitor(env, log_dir)
if tempt:
model = SAC(
'MlpPolicy',
env=env,
verbose=1,
tensorboard_log=log_dir,
device="cuda",
learning_rate=5e-4,  # fine-tuned learning rate
batch_size=512,  # larger batch size
buffer_size=2000000,  # larger replay buffer
gamma=0.995,  # higher discount factor
tau=0.001,  # smaller soft-update coefficient
ent_coef='auto_0.1',  # automatic entropy coefficient with a 0.1 target
target_update_interval=2,  # more frequent target-network updates
gradient_steps=2,  # more gradient updates per environment step
use_sde=True,  # state-dependent exploration for better exploration
sde_sample_freq=32,  # resample the SDE noise more frequently
policy_kwargs=dict(net_arch=dict(pi=[256, 256], qf=[256, 256]))  # deeper networks
)
callback = SaveOnBestTrainingRewardCallback(check_freq=1000, log_dir=log_dir)
model.learn(total_timesteps=4000000,
callback=callback,
tb_log_name="SAC_2")  # experiment tag
model.save('model/DOA_SAC_ENV_callback')
del model
else:
obs = env.reset()
# Change this to the path of your saved model
model = SAC.load(r'best_model.zip', env=env)
for j in range(50):
for i in range(2000):
action, _state = model.predict(obs)
order1 = 4
cutoff_freq1 = 1.2
b1, a1 = signal.butter(order1, cutoff_freq1, fs=2.5, btype='low')
# Apply the filter to each action component
action[0] = signal.lfilter(b1, a1, [action[0]])
action[1] = signal.lfilter(b1, a1, [action[1]])
action[2] = signal.lfilter(b1, a1, [action[2]])
obs, reward, done, info = env.step(action)
# env.render()
if done:
print('done')
# obs = env.reset()
time.sleep(30)
break
break
# 3D trajectory
# fig1 = plt.figure("robot arm trajectory")
# ax = fig1.add_subplot(projection="3d")  # 3D axes
# ax.plot(x, y, z, label="simulation")
# # plt.xlabel("X")
# # plt.ylabel("Y")
# ax.set_xlabel("X")
# ax.set_ylabel("Y")
# ax.set_zlabel("Z")
# ax.legend()
# # plt.savefig('jaka_3d.png')
# plt.show()
#
# # xy plane
# fig2 = plt.figure()
# plt.plot(x, y, label="xy-simulation", color="g")
# plt.xlabel("X")
# plt.ylabel("Y")
# plt.savefig("jaka_xy.png")
# plt.show()
#
# # yz plane
# fig3 = plt.figure()
# plt.plot(y, z, label="yz-simulation", color="g")
# plt.xlabel("y")
# plt.ylabel("z")
# plt.savefig("jaka_yz.png")
# plt.show()