import time
import gym
from gym import error, spaces, utils
from gym.utils import seeding
import os
import pybullet as p
import pybullet_data
import math
import numpy as np
import random
from scipy import signal
import sys

# Make the jkrc module importable from the current directory.
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '.'))
try:
    import jkrc
except ImportError:
    print("Warning: could not import the jkrc module. Make sure it exists in the "
          "current directory and is in a format Python can load.")
    jkrc = None  # set to None so later code can check for it

import matplotlib as mpl
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
import pandas as pd

os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

MAX_EPISODE_LEN = 20 * 100

# End-effector trajectory buffers, filled in step() and optionally plotted
# by the commented-out code at the bottom of this file.
x = []
y = []
z = []

# Motion modes for the real JAKA controller (kept for reference):
# PI = 3.1415926
# ABS = 0      # absolute motion
# INCR = 1     # incremental motion
# Enable = True
# Disable = False
# robot = jkrc.RC("192.168.31.5")  # returns a robot handle
# robot.login()
# # robot.servo_move_use_none_filter()
# robot.power_on()
# robot.enable_robot()
# # joint_pos = [0, 1.57, 1.57, 1.57, -1.57, 0]
# # robot.joint_move(joint_pos, ABS, True, 1)
# robot.servo_move_enable(Enable)  # enter position-servo mode


def goal_distance(goal_a, goal_b):
    """Euclidean distance between two 3D points."""
    return np.linalg.norm(np.array(goal_a) - np.array(goal_b), axis=-1)


def is_collision(robotId, bodyA, bodyB):
    """True if the robot is in contact with either obstacle body."""
    return bool(p.getContactPoints(robotId, bodyA)) or bool(p.getContactPoints(robotId, bodyB))
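
# NOTE: the per-sample signal.lfilter() calls in jakaEnv.step() below restart
# the filter from zero state on every call, so consecutive actions are not
# actually smoothed against each other. A stateful wrapper, sketched here
# under the same 4th-order Butterworth design used in jakaEnv.__init__
# (illustrative only; not wired into the environment):
class _StatefulLowPass:
    """Carries Butterworth filter state across single-sample calls."""

    def __init__(self, order=4, cutoff=1.2, fs=2.5):
        self.b, self.a = signal.butter(order, cutoff, fs=fs, btype="low")
        self.zi = signal.lfilter_zi(self.b, self.a) * 0.0  # start from rest

    def __call__(self, sample):
        out, self.zi = signal.lfilter(self.b, self.a, [sample], zi=self.zi)
        return float(out[0])

# One instance per action axis would be needed, e.g.:
# lp_x, lp_y, lp_z = _StatefulLowPass(), _StatefulLowPass(), _StatefulLowPass()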
class jakaEnv(gym.Env):
    metadata = {"render.modes": ["human"]}

    def __init__(self):
        self.step_counter = 0
        self.episode = 0
        self.collision_frequency = 0
        self.success_frequency = 0
        self.overstep_frequency = 0
        self.out = 0
        self.reward_type = "dense"
        p.connect(
            p.GUI,
            # options="--background_color_red=0.0 --background_color_green=0.93 --background_color_blue=0.54",
        )
        p.resetDebugVisualizerCamera(
            cameraDistance=1.25,
            cameraYaw=45,
            cameraPitch=-30,
            cameraTargetPosition=[0.65, -0.0, 0.65],
        )

        # Randomized workspace boundaries (small perturbations for robustness).
        self.x_low = 0.3 + random.uniform(-0.05, 0.05)
        self.x_high = 0.7 + random.uniform(-0.05, 0.05)
        self.y_low = -0.05 + random.uniform(-0.03, 0.03)
        self.y_high = 1.05 + random.uniform(-0.03, 0.03)
        self.z_low = 0.65 + random.uniform(-0.02, 0.02)
        self.z_high = 1.3 + random.uniform(-0.02, 0.02)

        # Extended speed range for the dynamic obstacle.
        self.speed = [0.2, 0.4, 0.6, 0.8, 1.0, 1.2]

        self.action_space = spaces.Box(np.array([-1] * 3), np.array([1] * 3))
        # Observation space is 40-dimensional (was 38 before the speed range
        # was extended to six entries; see the layout comment below).
        self.observation_space = spaces.Box(
            np.array([-1] * 40, np.float32), np.array([1] * 40, np.float32)
        )

        # Low-pass filter parameters for action smoothing.
        self.order = 4          # filter order
        self.cutoff_freq = 1.2  # must stay below fs/2 (fs = 2.5, so at most 1.25)
        if self.cutoff_freq >= 2.5 / 2:
            raise ValueError("Cutoff frequency must be below fs/2 (fs = 2.5, so at most 1.25).")
        # Pre-compute the filter coefficients once.
        self.b, self.a = signal.butter(self.order, self.cutoff_freq, fs=2.5, btype='low')

        # Distance-change tracking for reward shaping.
        self.prev_distance = None                  # goal distance at the previous step
        self.distance_change_penalty_coeff = 1500  # penalty per unit of distance increase

        # Direction-alignment reward weight.
        self.direction_reward_weight = 0.6
        # Action-magnitude penalty coefficient.
        self.action_penalty_coeff = 0.8

        # Progressive obstacle penalty.
        self.obstacle_safe_distance = 0.2   # safe distance (m)
        self.obstacle_penalty_coeff = 3000  # obstacle penalty coefficient

        # Base-rotation reward shaping.
        self.base_rotation_weight = 1.2     # base rotation reward weight
        self.min_base_rotation_angle = 0.1  # minimum base rotation (rad) to be rewarded

        # Goal tolerance and nominal boundary extents.
        self.distance_threshold = 0.05
        self.x_boundary = [0.3, 0.7]
        self.y_boundary = [-0.05, 1.05]
        self.z_boundary = [0.65, 1.3]

        # Joint limits tightened to the physical limits in the JAKA URDF.
        self.joint_limits = {
            "lower": np.array([-2.7925, -1.5708, -2.7925, -1.5708, -2.7925, -1.5708],
                              dtype=np.float32),
            "upper": np.array([2.7925, 1.5708, 2.7925, 1.5708, 2.7925, 1.5708],
                              dtype=np.float32),
        }

        # Boundary-awareness penalty parameters.
        self.boundary_threshold = 0.3            # start penalizing within 0.3 m of a wall
        self.boundary_penalty_coeff = 4000       # progressive penalty coefficient
        self.boundary_collision_penalty = -1500  # extra penalty very close to a wall

        # Dimensions used by the simplified _get_observation():
        # 6 joint positions + 3 goal coordinates.
        self.observation_dim = 9
        self.n_joints = 6  # the JAKA arm has 6 joints

    def compute_reward(self, achieved_goal, goal):
        d = goal_distance(achieved_goal, goal)
        if self.reward_type == "sparse":
            return -(d > self.distance_threshold).astype(np.float32)
        else:
            return -d

    def _get_observation(self):
        """Simplified observation used only by the reward-shaping code in step().

        Placeholder: returns 6 random joint positions plus a fixed goal
        position. Replace with a real query of the environment state.
        """
        joint_pos = np.random.rand(6)           # 6 placeholder joint positions
        target_pos = np.array([0.5, 0.5, 0.5])  # placeholder goal position
        return np.concatenate([joint_pos, target_pos])
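
    # Observation layout (40 dims), matching the np.concatenate order used in
    # step() and reset():
    #   [0:3]   robot_pos          [3:6]   object_pos      [6:9]   object_rel_pos
    #   [9:13]  object_rot (quat)  [13:16] object_velp     [16:19] object_velr
    #   [19:22] block_velp         [22:25] block_velr      [25:31] block_vel_range
    #   [31:34] block_loc          [34:37] block_rel_pos   [37:40] block2_pos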
    def step(self, action):
        p.configureDebugVisualizer(p.COV_ENABLE_SINGLE_STEP_RENDERING)
        orientation = p.getQuaternionFromEuler([-math.pi / 2, -math.pi, math.pi / 2.])
        dv = 0.05

        # Action smoothing with the pre-computed low-pass filter. Filtering a
        # single sample carries no state between steps; see the
        # _StatefulLowPass sketch above for a stateful alternative.
        filtered_action = [
            float(signal.lfilter(self.b, self.a, [action[0]])[0]),
            float(signal.lfilter(self.b, self.a, [action[1]])[0]),
            float(signal.lfilter(self.b, self.a, [action[2]])[0]),
        ]
        dx = filtered_action[0] * dv
        dy = filtered_action[1] * dv
        dz = filtered_action[2] * dv
        # print(action)

        currentPose = p.getLinkState(self.jaka_id, 6)
        currentPosition = currentPose[0]
        newPosition = [
            currentPosition[0] + dx,
            currentPosition[1] + dy,
            currentPosition[2] + dz,
        ]
        jointPoses = p.calculateInverseKinematics(
            self.jaka_id, 6, newPosition, orientation
        )[0:6]
        p.setJointMotorControlArray(
            self.jaka_id,
            list(range(1, 7)),
            p.POSITION_CONTROL,
            list(jointPoses),
        )
        # print(jointPoses)  # debug: commanded joint targets
        p.resetBaseVelocity(self.blockId, linearVelocity=[0, random.choice(self.speed), 0])
        p.stepSimulation()

        # Real-robot servo streaming (kept for reference):
        # robot.servo_move_enable(True)
        # robot.servo_p(cartesian_pose=[dx*100, dy*100, dz*50, 0, 0, 0], move_mode=1)
        # time.sleep(0.008)

        state_object, state_object_orienation = p.getBasePositionAndOrientation(self.objectId)
        twist_object, twist_object_orienation = p.getBaseVelocity(self.objectId)
        state_robot = p.getLinkState(self.jaka_id, 6)[0]
        block_speed, block_speed_orienation = p.getBaseVelocity(self.blockId)
        block_state, block_state_orienation = p.getBasePositionAndOrientation(self.blockId)
        block2_state, block2state_orienation = p.getBasePositionAndOrientation(self.blockId2)
        p.addUserDebugLine(lineFromXYZ=currentPosition, lineToXYZ=state_robot,
                           lineColorRGB=[0, 1, 0], lineWidth=2)
        x.append(state_robot[0])
        y.append(state_robot[1])
        z.append(state_robot[2])

        d = goal_distance(state_robot, state_object)
        if (state_robot[0] > self.x_high or state_robot[0] < self.x_low
                or state_robot[1] > self.y_high or state_robot[1] < self.y_low
                or state_robot[2] > self.z_high or state_robot[2] <= self.z_low + 0.05):
            reward = -800  # heavier out-of-range penalty
            self.episode += 1
            self.out += 1
            print()
            print("=" * 50)
            print("\033[36mepisode {}: out of range\033[0m".format(self.episode))
            print()
            print("\tcollisions: {}\tsuccesses: {}\ttimeouts: {}\tout-of-range: {}".format(
                self.collision_frequency, self.success_frequency,
                self.overstep_frequency, self.out))
            print("=" * 50)
            print()
            done = True
        elif is_collision(self.jaka_id, self.blockId, self.blockId2):
            reward = -800  # heavier collision penalty
            self.episode += 1
            self.collision_frequency += 1
            print()
            print("=" * 50)
            print("\033[33mepisode {}: collision\033[0m".format(self.episode))
            print()
            print("\tcollisions: {}\tsuccesses: {}\ttimeouts: {}\tout-of-range: {}".format(
                self.collision_frequency, self.success_frequency,
                self.overstep_frequency, self.out))
            print("=" * 50)
            print()
            done = True
        elif d < self.distance_threshold:
            # Progressive distance reward, kept in [0, 2].
            normalized_distance = max(0, 1 - d / (2 * self.distance_threshold))
            distance_reward = normalized_distance * 2.0
            success_reward = 250  # raised success bonus
            reward = success_reward + distance_reward
            self.episode += 1
            self.success_frequency += 1
            print()
            print("=" * 50)
            print("\033[31mepisode {}: success\033[0m".format(self.episode))
            print()
            print("\tcollisions: {}\tsuccesses: {}\ttimeouts: {}\tout-of-range: {}".format(
                self.collision_frequency, self.success_frequency,
                self.overstep_frequency, self.out))
            print("=" * 50)
            print()
            done = True
        else:
            state_robot = p.getLinkState(self.jaka_id, 6)[0]
            state_object = p.getBasePositionAndOrientation(self.objectId)[0]

            # Progressive distance reward, kept in [0, 2].
            d_target = goal_distance(state_robot, state_object)
            normalized_distance = max(0, 1 - d_target / (2 * self.distance_threshold))
            distance_reward = normalized_distance * 2.0

            # Penalize moving away from the goal between consecutive steps.
            distance_change_penalty = 0
            if self.prev_distance is not None:
                distance_diff = d_target - self.prev_distance
                if distance_diff > 0:  # distance increased
                    distance_change_penalty = -distance_diff * self.distance_change_penalty_coeff
            self.prev_distance = d_target

            # Direction-alignment reward: unit vector from end effector to goal.
            direction_to_goal = np.array(state_object) - np.array(state_robot)
            norm = np.linalg.norm(direction_to_goal)
            if norm != 0:
                direction_to_goal = direction_to_goal / norm
            direction_similarity = np.dot(direction_to_goal, action)
            direction_reward = self.direction_reward_weight * direction_similarity

            # Action-magnitude penalty (note: 0.7 here, while __init__ sets
            # action_penalty_coeff = 0.8; kept as written).
            action_penalty = -np.mean(np.abs(action)) * 0.7

            # Progressive obstacle penalty near the moving block.
            block_state, _ = p.getBasePositionAndOrientation(self.blockId)
            d_obstacle = goal_distance(state_robot, block_state)
            obstacle_penalty = 0
            if d_obstacle < self.obstacle_safe_distance:
                obstacle_penalty = -(self.obstacle_safe_distance - d_obstacle) * self.obstacle_penalty_coeff

            # Joint and base positions from the simplified observation.
            obs = self._get_observation()
            joint_pos = obs[self.observation_dim - 3 - self.n_joints: self.observation_dim - 3]
            base_pos = obs[:3]  # assume the first 3 dimensions are the base position
            # Joint angles in a tuple-per-joint format, matching PyBullet's
            # getJointState-style output.
            joint_poses = [(pos,) for pos in joint_pos]

            # Make sure prev_joint_pos is a numpy array.
            if not hasattr(self, 'prev_joint_pos') or isinstance(self.prev_joint_pos, tuple):
                self.prev_joint_pos = np.array([pose[0] for pose in joint_poses])

            # Base-rotation magnitude since the previous step.
            current_base_angle = np.array(joint_poses[0][0])
            base_rotation = abs(current_base_angle - self.prev_joint_pos[0])

            # Distance to the workspace boundary and the tiered penalty:
            # progressive inside boundary_threshold, plus a hard penalty
            # inside boundary_threshold / 2.
            boundary_distance = self._get_boundary_distance(base_pos)
            boundary_penalty = 0
            if boundary_distance < self.boundary_threshold:
                boundary_penalty = -(self.boundary_threshold - boundary_distance) * self.boundary_penalty_coeff
                if boundary_distance < self.boundary_threshold / 2:
                    boundary_penalty += self.boundary_collision_penalty

            # Base-rotation reward, weighted towards boundary-avoiding spins.
            # The rotation direction must be computed before prev_joint_pos is
            # updated, otherwise it is always zero.
            base_rotation_reward = 0
            if base_rotation > self.min_base_rotation_angle:
                rotation_dir = np.sign(np.array(joint_pos)[0] - self.prev_joint_pos[0])
                avoid_dir = self._get_boundary_avoid_direction(base_pos)
                direction_consistency = max(0, rotation_dir * avoid_dir)
                base_rotation_reward = (self.base_rotation_weight * base_rotation
                                        * direction_consistency * 1.5)

            # Update prev_joint_pos for the next step.
            self.prev_joint_pos = np.array([pose[0] for pose in joint_poses])

            # Combined shaped reward (see the summary comment below).
            reward = (distance_reward + direction_reward + action_penalty
                      + base_rotation_reward + obstacle_penalty
                      + distance_change_penalty + boundary_penalty)
            done = False

        self.step_counter += 1
        if self.step_counter > MAX_EPISODE_LEN:
            # reward = 0
            self.episode += 1
            self.overstep_frequency += 1
            print()
            print("=" * 50)
            print("\033[34mepisode {}: overstep\033[0m".format(self.episode))
            print()
            print("\tcollisions: {}\tsuccesses: {}\ttimeouts: {}\tout-of-range: {}".format(
                self.collision_frequency, self.success_frequency,
                self.overstep_frequency, self.out))
            print("=" * 50)
            print()
            done = True

        info = {"object_position": state_object}
        robot_pos = np.array(state_robot)
        object_pos = np.array(state_object)
        object_rel_pos = object_pos - robot_pos
        object_rot = np.array(state_object_orienation)
        object_velp = np.array(twist_object)
        object_velr = np.array(twist_object_orienation)
        block_velp = np.array(block_speed)
        block_velr = np.array(block_speed_orienation)
        block_vel_range = np.array(self.speed)
        block_loc = np.array(block_state)
        block_rel_pos = np.array(block_loc - robot_pos)
        block2_pos = np.array(block2_state)
        obs = np.concatenate(
            [
                robot_pos.ravel(),
                object_pos.ravel(),
                object_rel_pos.ravel(),
                object_rot.ravel(),
                object_velp.ravel(),
                object_velr.ravel(),
                block_velp.ravel(),
                block_velr.ravel(),
                block_vel_range.ravel(),
                block_loc.ravel(),
                block_rel_pos.ravel(),
                block2_pos.ravel(),
            ]
        )
        return obs.copy(), reward, done, info
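
    # Shaped-reward summary for the non-terminal branch of step(), with the
    # coefficients set in __init__:
    #   reward = distance_reward           # progressive, in [0, 2]
    #          + direction_reward          # 0.6 * alignment of action with goal direction
    #          + action_penalty            # -0.7 * mean(|action|)
    #          + base_rotation_reward      # boundary-avoiding base spins
    #          + obstacle_penalty          # ramps up within 0.2 m of the moving block
    #          + distance_change_penalty   # -1500 * positive change in goal distance
    #          + boundary_penalty          # tiered, within 0.3 m of a workspace wall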
    # Obstacle positions are randomized in reset().
    def reset(self):
        self.step_counter = 0
        p.resetSimulation()
        p.configureDebugVisualizer(p.COV_ENABLE_RENDERING, 0)
        urdfRootPath = pybullet_data.getDataPath()
        p.setGravity(0, 0, -10)
        planeId = p.loadURDF(os.path.join(urdfRootPath, "plane.urdf"))
        dir_path = os.path.dirname(os.path.realpath(__file__))
        self.tableId = p.loadURDF(
            os.path.join(dir_path, "urdf/table.urdf"),
            basePosition=[0.2, 0.5, 0],
            useFixedBase=True,
        )

        # Randomized initial positions for the dynamic obstacles.
        block_x = random.uniform(0.35, 0.55)
        block_y = random.uniform(0.1, 0.3)
        self.blockId = p.loadURDF(
            os.path.join(dir_path, "urdf/block4.urdf"),
            basePosition=[block_x, block_y, 0.47],
            useFixedBase=True,
        )
        block2_x = random.uniform(0.55, 0.75)
        block2_y = random.uniform(0.5, 0.7)
        self.blockId2 = p.loadURDF(
            os.path.join(dir_path, "urdf/block.urdf"),
            basePosition=[block2_x, block2_y, 0.43],
            useFixedBase=True,
        )

        state_object = [0.5, 0.2, 0.8]
        self.objectId = p.loadURDF(
            os.path.join(dir_path, "urdf/goal.urdf"),
            basePosition=state_object,
            useFixedBase=True,
        )

        reset_poses = [0, 0.8, 2.4, 1.3, 1, -1.57, 0]
        reset_realposes = [0.8, 2.4, 1.3, 1, -1.57, 0]
        # Change this path to your own robot's URDF file.
        self.jaka_id = p.loadURDF(
            "urdf/jaka_description/urdf/jaka_description.urdf",
            basePosition=[0, 0.5, 0.65],
            baseOrientation=p.getQuaternionFromEuler([0, 0, 3.14]),
            useFixedBase=True,
        )
        for i in range(len(reset_poses)):
            p.resetJointState(self.jaka_id, i, reset_poses[i])
        # robot.joint_move(reset_realposes, ABS, True, 1)

        state_robot = p.getLinkState(self.jaka_id, 6)[0]
        self.plot_obs_boundary()
        p.configureDebugVisualizer(p.COV_ENABLE_RENDERING, 1)
        # time.sleep(10)

        robot_pos = np.array(state_robot)
        object_pos = np.array(state_object)
        object_rel_pos = object_pos - robot_pos
        object_rot = np.array([0, 0, 0, 1])  # identity quaternion
        object_velp = np.array([0, 0, 0])
        object_velr = np.array([0, 0, 0])
        block_velp = np.array([0, 0, 0])
        block_velr = np.array([0, 0, 0])
        block_vel_range = np.array(self.speed)
        block_loc = np.array([0, 0, 0])
        block_rel_pos = np.array([0, 0, 0])
        block2_pos = np.array([0.6, 0.66, 0.43])
        obs = np.concatenate(
            [
                robot_pos.ravel(),
                object_pos.ravel(),
                object_rel_pos.ravel(),
                object_rot.ravel(),
                object_velp.ravel(),
                object_velr.ravel(),
                block_velp.ravel(),
                block_velr.ravel(),
                block_vel_range.ravel(),
                block_loc.ravel(),
                block_rel_pos.ravel(),
                block2_pos.ravel(),
            ]
        )
        return obs.copy()
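
    # The gym.utils.seeding import at the top of this file is otherwise
    # unused; a minimal seed() method wired to it could look like this
    # (editor's sketch, assuming the pre-Gymnasium gym API):
    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        random.seed(seed)  # the randomized boundaries and obstacles use `random`
        return [seed]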
    def plot_obs_boundary(self):
        # Draw the 12 edges of the workspace bounding box.
        p.addUserDebugLine(lineFromXYZ=[self.x_low, self.y_low, self.z_low],
                           lineToXYZ=[self.x_low, self.y_high, self.z_low])
        p.addUserDebugLine(lineFromXYZ=[self.x_low, self.y_low, self.z_low],
                           lineToXYZ=[self.x_high, self.y_low, self.z_low])
        p.addUserDebugLine(lineFromXYZ=[self.x_low, self.y_low, self.z_low],
                           lineToXYZ=[self.x_low, self.y_low, self.z_high])
        p.addUserDebugLine(lineFromXYZ=[self.x_high, self.y_high, self.z_low],
                           lineToXYZ=[self.x_high, self.y_low, self.z_low])
        p.addUserDebugLine(lineFromXYZ=[self.x_high, self.y_high, self.z_low],
                           lineToXYZ=[self.x_low, self.y_high, self.z_low])
        p.addUserDebugLine(lineFromXYZ=[self.x_high, self.y_high, self.z_low],
                           lineToXYZ=[self.x_high, self.y_high, self.z_high])
        p.addUserDebugLine(lineFromXYZ=[self.x_high, self.y_low, self.z_low],
                           lineToXYZ=[self.x_high, self.y_low, self.z_high])
        p.addUserDebugLine(lineFromXYZ=[self.x_low, self.y_high, self.z_low],
                           lineToXYZ=[self.x_low, self.y_high, self.z_high])
        p.addUserDebugLine(lineFromXYZ=[self.x_low, self.y_low, self.z_high],
                           lineToXYZ=[self.x_low, self.y_high, self.z_high])
        p.addUserDebugLine(lineFromXYZ=[self.x_low, self.y_low, self.z_high],
                           lineToXYZ=[self.x_high, self.y_low, self.z_high])
        p.addUserDebugLine(lineFromXYZ=[self.x_high, self.y_high, self.z_high],
                           lineToXYZ=[self.x_high, self.y_low, self.z_high])
        p.addUserDebugLine(lineFromXYZ=[self.x_high, self.y_high, self.z_high],
                           lineToXYZ=[self.x_low, self.y_high, self.z_high])

    def render(self, mode="human"):
        view_matrix = p.computeViewMatrixFromYawPitchRoll(
            cameraTargetPosition=[0.7, 0, 0.65 + 0.05],
            distance=0.7,
            yaw=90,
            pitch=-50,
            roll=0,
            upAxisIndex=2,
        )
        proj_matrix = p.computeProjectionMatrixFOV(
            fov=60, aspect=float(960) / 720, nearVal=0.1, farVal=100.0
        )
        (_, _, px, _, _) = p.getCameraImage(
            width=960,
            height=720,
            viewMatrix=view_matrix,
            projectionMatrix=proj_matrix,
            renderer=p.ER_BULLET_HARDWARE_OPENGL,
        )
        rgb_array = np.array(px, dtype=np.uint8)
        rgb_array = np.reshape(rgb_array, (720, 960, 4))
        rgb_array = rgb_array[:, :, :3]
        return rgb_array

    def _get_state(self):
        return self.observation

    def close(self):
        p.disconnect()

    def _get_boundary_distance(self, position):
        """Shortest distance from the given position to any workspace wall."""
        x, y, z = position
        dx = min(x - self.x_low, self.x_high - x)
        dy = min(y - self.y_low, self.y_high - y)
        dz = min(z - self.z_low, self.z_high - z)
        # Minimum over the three axes.
        return min(dx, dy, dz)

    def _get_boundary_avoid_direction(self, position):
        """Direction code for steering away from the nearest boundary.

        Returns +/-1 for the x axis, +/-2 for the y axis, or 0 if no wall is
        within boundary_threshold.
        """
        avoid_dir = 0
        if position[0] - self.x_low < self.boundary_threshold:
            avoid_dir = 1    # move towards +x
        elif self.x_high - position[0] < self.boundary_threshold:
            avoid_dir = -1   # move towards -x
        elif position[1] - self.y_low < self.boundary_threshold:
            avoid_dir = 2    # move towards +y
        elif self.y_high - position[1] < self.boundary_threshold:
            avoid_dir = -2   # move towards -y
        return avoid_dir
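
# A minimal smoke test of the environment with random actions, assuming the
# URDF paths above resolve on this machine (editor's sketch; not invoked by
# the training entry point below):
def _smoke_test(n_steps=100):
    env = jakaEnv()
    obs = env.reset()
    for _ in range(n_steps):
        obs, reward, done, info = env.step(env.action_space.sample())
        if done:
            obs = env.reset()
    env.close()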
if __name__ == "__main__":
    from stable_baselines3 import SAC
    from stable_baselines3.common import results_plotter
    from stable_baselines3.common.monitor import Monitor
    from stable_baselines3.common.results_plotter import load_results, ts2xy, plot_results
    from stable_baselines3.common.noise import NormalActionNoise
    from stable_baselines3.common.callbacks import BaseCallback

    class SaveOnBestTrainingRewardCallback(BaseCallback):
        """
        Callback for saving a model (the check is done every ``check_freq`` steps)
        based on the training reward (in practice, we recommend using
        ``EvalCallback``).

        :param check_freq:
        :param log_dir: Path to the folder where the model will be saved.
            It must contain the file created by the ``Monitor`` wrapper.
        :param verbose: Verbosity level.
        """

        def __init__(self, check_freq: int, log_dir: str, verbose: int = 1):
            super(SaveOnBestTrainingRewardCallback, self).__init__(verbose)
            self.check_freq = check_freq
            self.log_dir = log_dir
            self.save_path = os.path.join(log_dir, 'best_model')
            self.best_mean_reward = -np.inf

        def _init_callback(self) -> None:
            # Create folder if needed
            if self.save_path is not None:
                os.makedirs(self.save_path, exist_ok=True)

        def _on_step(self) -> bool:
            if self.n_calls % self.check_freq == 0:
                # Retrieve training reward
                x, y = ts2xy(load_results(self.log_dir), 'timesteps')
                if len(x) > 0:
                    # Mean training reward over the last 100 episodes
                    mean_reward = np.mean(y[-100:])
                    if self.verbose > 0:
                        print(f"Num timesteps: {self.num_timesteps}")
                        print(f"Best mean reward: {self.best_mean_reward:.2f} - "
                              f"Last mean reward per episode: {mean_reward:.2f}")
                    # New best model: save the agent
                    if mean_reward > self.best_mean_reward:
                        self.best_mean_reward = mean_reward
                        if self.verbose > 0:
                            print(f"Saving new best model to {self.save_path}")
                        self.model.save(self.save_path)
            return True

    tempt = 1
    log_dir = './tensorboard/DOA_SAC_callback/'
    os.makedirs(log_dir, exist_ok=True)
    env = jakaEnv()
    env = Monitor(env, log_dir)
    if tempt:
        model = SAC(
            'MlpPolicy',
            env=env,
            verbose=1,
            tensorboard_log=log_dir,
            device="cuda",
            learning_rate=5e-4,        # fine-tuned learning rate
            batch_size=512,            # larger batch size
            buffer_size=2000000,       # larger replay buffer
            gamma=0.995,               # higher discount factor
            tau=0.001,                 # smaller soft-update coefficient
            ent_coef='auto_0.1',       # automatic entropy tuning with target 0.1
            target_update_interval=2,  # more frequent target-network updates
            gradient_steps=2,          # more gradient updates per environment step
            use_sde=True,              # state-dependent exploration
            sde_sample_freq=32,        # resample SDE noise more often
            policy_kwargs=dict(net_arch=dict(pi=[256, 256], qf=[256, 256])),  # deeper networks
        )
        callback = SaveOnBestTrainingRewardCallback(check_freq=1000, log_dir=log_dir)
        model.learn(total_timesteps=4000000, callback=callback,
                    tb_log_name="SAC_2")  # tb_log_name tags this run in TensorBoard
        model.save('model/DOA_SAC_ENV_callback')
        del model
    else:
        obs = env.reset()
        # Change this path to wherever your trained model is saved.
        model = SAC.load(r'best_model.zip', env=env)
        # Pre-compute the evaluation low-pass filter once; like the
        # training-time filter, it is applied per sample without state.
        b1, a1 = signal.butter(4, 1.2, fs=2.5, btype='low')
        for j in range(50):
            for i in range(2000):
                action, _state = model.predict(obs)
                action[0] = signal.lfilter(b1, a1, [action[0]])[0]
                action[1] = signal.lfilter(b1, a1, [action[1]])[0]
                action[2] = signal.lfilter(b1, a1, [action[2]])[0]
                obs, reward, done, info = env.step(action)
                # env.render()
                if done:
                    print('done')
                    # obs = env.reset()
                    time.sleep(30)
                    break
            break
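
# The commented-out plots below consume the module-level x/y/z trajectory
# buffers appended to in jakaEnv.step(); uncomment them after a rollout to
# visualize the end-effector path.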
# 3D trajectory
# fig1 = plt.figure("Robot arm trajectory")
# ax = fig1.add_subplot(projection="3d")
# ax.plot(x, y, z, label="simulation")
# ax.set_xlabel("X (m)")
# ax.set_ylabel("Y (m)")
# ax.set_zlabel("Z (m)")
# ax.legend()
# # plt.savefig('jaka_3d.png')
# plt.show()
#
# # XY projection
# fig2 = plt.figure()
# plt.plot(x, y, label="xy-simulation", color="g")
# plt.xlabel("X")
# plt.ylabel("Y")
# plt.savefig("jaka_xy.png")
# plt.show()
#
# # YZ projection
# fig3 = plt.figure()
# plt.plot(y, z, label="yz-simulation", color="g")
# plt.xlabel("y")
# plt.ylabel("z")
# plt.savefig("jaka_yz.png")
# plt.show()