RL-PowerTracking-new/DOA_SAC_sim2real.py
fly6516 e3b17f5eb2 refactor(env): 优化 DOA_SAC_sim2real 脚本
- 修改了 URDF 文件路径
- 删除了冗余代码和注释
- 调整了代码格式和缩进,提高了可读性
- 更新了模型保存路径
2025-05-27 21:55:49 +08:00

534 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import gym
from gym import error, spaces, utils
from gym.utils import seeding
import os
import pybullet as p
import pybullet_data
import math
import numpy as np
import random
from scipy import signal
import sys
# sys.path.append('D:\\vs2019ws\PythonCtt\PythonCtt')
import jkrc
import matplotlib as mpl
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
import pandas as pd
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
MAX_EPISODE_LEN = 20 * 100
x = []
y = []
z = []
# 运动模式
# PI=3.1415926
# ABS = 0 # 绝对运动
# INCR = 1 # 增量运动
# Enable = True
# Disable = False
# robot = jkrc.RC("192.168.31.5")#返回一个机器人对象
# robot.login()#登录
# # robot.servo_move_use_none_filter()
# robot.power_on() #上电
# robot.enable_robot()
# # joint_pos=[0, 1.57, 1.57, 1.57, -1.57, 0]
# # robot.joint_move(joint_pos,ABS,True,1)
# robot.servo_move_enable(Enable) #进入位置控制模式
def goal_distance(goal_a, goal_b):
return np.linalg.norm(np.array(goal_a) - np.array(goal_b), axis=-1)
def is_collision(robotId, bodyA, bodyB):
is_bool1 = p.getContactPoints(robotId, bodyA)
is_bool2 = p.getContactPoints(robotId, bodyB)
if is_bool1:
return True
elif is_bool2:
return True
else:
return False
class jakaEnv(gym.Env):
metadata = {"render.modes": ["human"]}
def __init__(self):
self.step_counter = 0
self.eposide = 0
self.collision_frequency = 0
self.success_frequency = 0
self.overstep_frequency = 0
self.out = 0
self.reward_type = "dense"
p.connect(
p.GUI,
# options="--background_color_red=0.0 --background_color_green=0.93--background_color_blue=0.54",
)
p.resetDebugVisualizerCamera(
cameraDistance=1.25,
cameraYaw=45,
cameraPitch=-30,
cameraTargetPosition=[0.65, -0.0, 0.65],
)
self.x_low = 0.3
self.x_high = 0.7
self.y_low = -0.05
self.y_high = 1.05
self.z_low = 0.65
self.z_high = 1.3
self.speed = [0.3, 0.5, 0.8, 1.0]
self.action_space = spaces.Box(np.array([-1] * 3), np.array([1] * 3))
self.observation_space = spaces.Box(np.array([-1] * 38, np.float32), np.array([1] * 38, np.float32))
def compute_reward(self, achieved_goal, goal):
d = goal_distance(achieved_goal, goal)
if self.reward_type == "sparse":
return -(d > self.distance_threshold).astyp(np.float32)
else:
return -d
def step(self, action):
p.configureDebugVisualizer(p.COV_ENABLE_SINGLE_STEP_RENDERING)
orientation = p.getQuaternionFromEuler([-math.pi / 2, -math.pi, math.pi / 2.])
dv = 0.05
dx = action[0] * dv
dy = action[1] * dv
dz = action[2] * dv
# print(action)
currentPose = p.getLinkState(self.jaka_id, 6)
currentPosition = currentPose[0]
newPosition = [
currentPosition[0] + dx,
currentPosition[1] + dy,
currentPosition[2] + dz,
]
jointPoses = p.calculateInverseKinematics(
self.jaka_id, 6, newPosition, orientation
)[0:6]
p.setJointMotorControlArray(
self.jaka_id,
list(range(1, 7)),
p.POSITION_CONTROL,
list(jointPoses),
)
print(jointPoses)
p.resetBaseVelocity(self.blockId, linearVelocity=[0, random.choice(self.speed), 0])
p.stepSimulation()
# robot.servo_move_enable(True)
# robot.servo_p(cartesian_pose=[dx*100, dy*100, dz*50, 0, 0, 0], move_mode=1)
# time.sleep(0.008)
state_object, state_object_orienation = p.getBasePositionAndOrientation(self.objectId)
twist_object, twist_object_orienation = p.getBaseVelocity(self.objectId)
state_robot = p.getLinkState(self.jaka_id, 6)[0]
block_speed, block_speed_orienation = p.getBaseVelocity(self.blockId)
block_state, block_state_orienation = p.getBasePositionAndOrientation(self.blockId)
block2_state, block2state_orienation = p.getBasePositionAndOrientation(self.blockId2)
p.addUserDebugLine(lineFromXYZ=currentPosition, lineToXYZ=state_robot, lineColorRGB=[0, 1, 0], lineWidth=2)
x.append(state_robot[0])
y.append(state_robot[1])
z.append(state_robot[2])
self.distance_threshold = 0.05
d = goal_distance(state_robot, state_object)
if (state_robot[0] > self.x_high or state_robot[0] < self.x_low
or state_robot[1] > self.y_high or state_robot[1] < self.y_low
or state_robot[2] > self.z_high or state_robot[2] <= self.z_low + 0.05):
reward = -400
self.eposide = self.eposide + 1
self.out = self.out + 1
print()
print("=" * 50)
print("\033[36meposide{}:out of range\033[0m".format(self.eposide))
print()
print("\t当前碰撞次数:{}\t当前成功次数:{}\t当前超时次数:{}\t当前超出范围次数:{}".format(
self.collision_frequency,
self.success_frequency,
self.overstep_frequency, self.out))
print("=" * 50)
print()
done = True
elif is_collision(self.jaka_id, self.blockId, self.blockId2):
reward = -400
self.eposide = self.eposide + 1
self.collision_frequency = self.collision_frequency + 1
print()
print("=" * 50)
print("\033[33meposide{}:collision\033[0m".format(self.eposide))
print()
print("\t当前碰撞次数:{}\t当前成功次数:{}\t当前超时次数:{}\t当前超出范围次数:{}".format(
self.collision_frequency,
self.success_frequency,
self.overstep_frequency, self.out))
print("=" * 50)
print()
done = True
elif d < self.distance_threshold:
reward = -1 / self.compute_reward(state_robot, state_object)
self.eposide = self.eposide + 1
self.success_frequency = self.success_frequency + 1
print()
print("=" * 50)
print("\033[31eposide{}:success\033[0m".format(self.eposide))
print()
print("\t当前碰撞次数:{}\t当前成功次数:{}\t当前超时次数:{}\t当前超出范围次数:{}".format(
self.collision_frequency,
self.success_frequency,
self.overstep_frequency, self.out))
print("=" * 50)
print()
done = True
else:
reward = self.compute_reward(state_robot, state_object)
done = False
self.step_counter += 1
if self.step_counter > MAX_EPISODE_LEN:
# reward = 0
self.eposide = self.eposide + 1
self.overstep_frequency = self.overstep_frequency + 1
print()
print("=" * 50)
print("\033[34meposide{}:overstep\033[0m".format(self.eposide))
print()
print("\t当前碰撞次数:{}\t当前成功次数:{}\t当前超时次数:{}\t当前超出范围次数:{}".format(
self.collision_frequency,
self.success_frequency,
self.overstep_frequency, self.out))
print("=" * 50)
print()
done = True
info = {"object_position": state_object}
robot_pos = np.array(state_robot)
object_pos = np.array(state_object)
object_rel_pos = object_pos - robot_pos
object_rot = np.array(state_object_orienation)
object_velp = np.array(twist_object)
object_velr = np.array(twist_object_orienation)
block_velp = np.array(block_speed)
block_velr = np.array(block_speed_orienation)
block_vel_range = np.array(self.speed)
block_loc = np.array(block_state)
block_rel_pos = np.array(block_loc - robot_pos)
block2_pos = np.array(block2_state)
obs = np.concatenate(
[
robot_pos.ravel(),
object_pos.ravel(),
object_rel_pos.ravel(),
object_rot.ravel(),
object_velp.ravel(),
object_velr.ravel(),
block_velp.ravel(),
block_velr.ravel(),
block_vel_range.ravel(),
block_loc.ravel(),
block_rel_pos.ravel(),
block2_pos.ravel(),
]
)
return obs.copy(), reward, done, info
def reset(self):
self.step_counter = 0
p.resetSimulation()
p.configureDebugVisualizer(
p.COV_ENABLE_RENDERING, 0
)
urdfRootPath = pybullet_data.getDataPath()
p.setGravity(0, 0, -10)
planeId = p.loadURDF(os.path.join(urdfRootPath, "plane.urdf"))
dir_path = os.path.dirname(os.path.realpath(__file__))
self.tableId = p.loadURDF(os.path.join(dir_path, "urdf/table.urdf"),
basePosition=[0.2, 0.5, 0],
useFixedBase=True,
)
self.blockId = p.loadURDF(os.path.join(dir_path, "urdf/block4.urdf"),
basePosition=[0.4, 0, 0.47],
useFixedBase=True,
)
self.blockId2 = p.loadURDF(os.path.join(dir_path, "urdf/block.urdf"),
basePosition=[0.6, 0.66, 0.43],
useFixedBase=True,
)
state_object = [0.5, 0.2, 0.8]
dir_path = os.path.dirname(os.path.realpath(__file__))
self.objectId = p.loadURDF(
os.path.join(dir_path, "urdf/goal.urdf"),
basePosition=state_object,
useFixedBase=True,
)
reset_poses = [0, 0.8, 2.4, 1.3, 1, -1.57, 0]
reset_realposes = [0.8, 2.4, 1.3, 1, -1.57, 0]
# 改变路径为你机械臂的URDF文件路径
self.jaka_id = p.loadURDF(
"urdf/jaka_description/urdf/jaka_description.urdf",
basePosition=[0, 0.5, 0.65],
baseOrientation=p.getQuaternionFromEuler([0, 0, 3.14]),
useFixedBase=True,
)
for i in range(len(reset_poses)):
p.resetJointState(self.jaka_id, i, reset_poses[i])
# robot.joint_move(reset_realposes, ABS, True, 1)
state_robot = p.getLinkState(self.jaka_id, 6)[0]
self.plot_obs_boundary()
p.configureDebugVisualizer(p.COV_ENABLE_RENDERING, 1)
# time.sleep(10)
robot_pos = np.array(state_robot)
object_pos = np.array(state_object)
object_rel_pos = object_pos - robot_pos
object_rot = np.array([0, 0, 0, 1]) # quaternions?
object_velp = np.array([0, 0, 0])
object_velr = np.array([0, 0, 0])
block_velp = np.array([0, 0, 0])
block_velr = np.array([0, 0, 0])
block_vel_range = np.array(self.speed)
block_loc = np.array([0, 0, 0])
block_rel_pos = np.array([0, 0, 0])
block2_pos = np.array([0.6, 0.66, 0.43])
obs = np.concatenate(
[
robot_pos.ravel(),
object_pos.ravel(),
object_rel_pos.ravel(),
object_rot.ravel(),
object_velp.ravel(),
object_velr.ravel(),
block_velp.ravel(),
block_velr.ravel(),
block_vel_range.ravel(),
block_loc.ravel(),
block_rel_pos.ravel(),
block2_pos.ravel(),
]
)
return obs.copy()
def plot_obs_boundary(self):
p.addUserDebugLine(
lineFromXYZ=[self.x_low, self.y_low, self.z_low],
lineToXYZ=[self.x_low, self.y_high, self.z_low])
p.addUserDebugLine(
lineFromXYZ=[self.x_low, self.y_low, self.z_low],
lineToXYZ=[self.x_high, self.y_low, self.z_low])
p.addUserDebugLine(
lineFromXYZ=[self.x_low, self.y_low, self.z_low],
lineToXYZ=[self.x_low, self.y_low, self.z_high])
p.addUserDebugLine(
lineFromXYZ=[self.x_high, self.y_high, self.z_low],
lineToXYZ=[self.x_high, self.y_low, self.z_low])
p.addUserDebugLine(
lineFromXYZ=[self.x_high, self.y_high, self.z_low],
lineToXYZ=[self.x_low, self.y_high, self.z_low])
p.addUserDebugLine(
lineFromXYZ=[self.x_high, self.y_high, self.z_low],
lineToXYZ=[self.x_high, self.y_high, self.z_high])
p.addUserDebugLine(
lineFromXYZ=[self.x_high, self.y_low, self.z_low],
lineToXYZ=[self.x_high, self.y_low, self.z_high])
p.addUserDebugLine(
lineFromXYZ=[self.x_low, self.y_high, self.z_low],
lineToXYZ=[self.x_low, self.y_high, self.z_high])
p.addUserDebugLine(
lineFromXYZ=[self.x_low, self.y_low, self.z_high],
lineToXYZ=[self.x_low, self.y_high, self.z_high])
p.addUserDebugLine(
lineFromXYZ=[self.x_low, self.y_low, self.z_high],
lineToXYZ=[self.x_high, self.y_low, self.z_high])
p.addUserDebugLine(
lineFromXYZ=[self.x_high, self.y_high, self.z_high],
lineToXYZ=[self.x_high, self.y_low, self.z_high])
p.addUserDebugLine(
lineFromXYZ=[self.x_high, self.y_high, self.z_high],
lineToXYZ=[self.x_low, self.y_high, self.z_high])
def render(self, mode="human"):
view_matrix = p.computeViewMatrixFromYawPitchRoll(
cameraTargetPosition=[0.7, 0, 0.65 + 0.05],
distance=0.7,
yaw=90,
pitch=-50,
roll=0,
upAxisIndex=2,
)
proj_matrix = p.computeProjectionMatrixFOV(
fov=60, aspect=float(960) / 720, nearVal=0.1, farVal=100.0
)
(_, _, px, _, _) = p.getCameraImage(
width=960,
height=720,
viewMatrix=view_matrix,
projectionMatrix=proj_matrix,
renderer=p.ER_BULLET_HARDWARE_OPENGL,
)
rgb_array = np.array(px, dtype=np.uint8)
rgb_array = np.reshape(rgb_array, (720, 960, 4))
rgb_array = rgb_array[:, :, :3]
return rgb_array
def _get_state(self):
return self.observation
def close(self):
p.disconnect()
if __name__ == "__main__":
from stable_baselines3 import SAC
from stable_baselines3.common import results_plotter
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.results_plotter import load_results, ts2xy, plot_results
from stable_baselines3.common.noise import NormalActionNoise
from stable_baselines3.common.callbacks import BaseCallback
class SaveOnBestTrainingRewardCallback(BaseCallback):
"""
Callback for saving a model (the check is done every ``check_freq`` steps)
based on the training reward (in practice, we recommend using ``EvalCallback``).
:param check_freq:
:param log_dir: Path to the folder where the model will be saved.
It must contains the file created by the ``Monitor`` wrapper.
:param verbose: Verbosity level.
"""
def __init__(self, check_freq: int, log_dir: str, verbose: int = 1):
super(SaveOnBestTrainingRewardCallback, self).__init__(verbose)
self.check_freq = check_freq
self.log_dir = log_dir
self.save_path = os.path.join(log_dir, 'best_model')
self.best_mean_reward = -np.inf
def _init_callback(self) -> None:
# Create folder if needed
if self.save_path is not None:
os.makedirs(self.save_path, exist_ok=True)
def _on_step(self) -> bool:
if self.n_calls % self.check_freq == 0:
# Retrieve training reward
x, y = ts2xy(load_results(self.log_dir), 'timesteps')
if len(x) > 0:
# Mean training reward over the last 100 episodes
mean_reward = np.mean(y[-100:])
if self.verbose > 0:
print(f"Num timesteps: {self.num_timesteps}")
print(
f"Best mean reward: {self.best_mean_reward:.2f} - Last mean reward per episode: {mean_reward:.2f}")
# New best model, you could save the agent here
if mean_reward > self.best_mean_reward:
self.best_mean_reward = mean_reward
# Example for saving best model
if self.verbose > 0:
print(f"Saving new best model to {self.save_path}")
self.model.save(self.save_path)
return True
tempt = 1
log_dir = './tensorboard/DOA_SAC_callback/'
os.makedirs(log_dir, exist_ok=True)
env = jakaEnv()
env = Monitor(env, log_dir)
if tempt:
model = SAC('MlpPolicy', env=env, verbose=1, tensorboard_log=log_dir,
device="cuda"
)
callback = SaveOnBestTrainingRewardCallback(check_freq=1000, log_dir=log_dir)
model.learn(total_timesteps=4000000, callback=callback)
model.save('model/DOA_SAC_ENV_callback')
del model
else:
obs = env.reset()
# 改变路径为你保存模型的路径
model = SAC.load(r'best_model.zip', env=env)
for j in range(50):
for i in range(2000):
action, _state = model.predict(obs)
order1 = 4
cutoff_freq1 = 1.2
b1, a1 = signal.butter(order1, cutoff_freq1, fs=2.5, btype='low')
# 应用滤波器
action[0] = signal.lfilter(b1, a1, [action[0]])
action[1] = signal.lfilter(b1, a1, [action[1]])
action[2] = signal.lfilter(b1, a1, [action[2]])
obs, reward, done, info = env.step(action)
# env.render()
if done:
print('done')
# obs = env.reset()
time.sleep(30)
break
break
# 三维
# fig1 = plt.figure("机械臂运行轨迹")
# ax = fig1.add_subplot(projection="3d") # 三维图形
# ax.plot(x, y, z, label="simulation")
# # plt.xlabel("X")
# # plt.ylabel("Y")
# ax.set_xlabel("X坐标")
# ax.set_ylabel("Y坐标")
# ax.set_zlabel("Z坐标")
# ax.legend()
# # plt.savefig('jaka_3d.png')
# plt.show()
#
# # xy方向
# fig2 = plt.figure()
# plt.plot(x, y, label="xy-simulation", color="g")
# plt.xlabel("X")
# plt.ylabel("Y")
# plt.savefig("jaka_xy.png")
# plt.show()
#
# # xy方向
# fig3 = plt.figure()
# plt.plot(y, z, label="yz-simulation", color="g")
# plt.xlabel("y")
# plt.ylabel("z")
# plt.savefig("jaka_yz.png")
# plt.show()