RL-PowerTracking-new/DOA_SAC_sim2real.py
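"""PyBullet simulation of a JAKA robot arm reaching a goal position while avoiding
a moving obstacle block, trained with Soft Actor-Critic (Stable Baselines3). The
script also carries optional sim2real hooks for the jkrc SDK, a filtered evaluation
loop, and an experimental multi-process training entry point."""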


import torch
import torch.nn as nn
from torch.multiprocessing import Process, Queue, set_start_method
import time
import gym
from gym import error, spaces, utils
from gym.utils import seeding
import os
import pybullet as p
import pybullet_data
import math
import numpy as np
import random
from scipy import signal
import sys
# The hardware module (jkrc) is imported dynamically so the script still loads in
# environments where the SDK is not installed.
try:
    import jkrc
    JAKA_AVAILABLE = True
except ImportError:
    jkrc = None
    JAKA_AVAILABLE = False

os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
MAX_EPISODE_LEN = 20 * 100
x = []
y = []
z = []
# Motion modes for the real robot (kept for reference):
# PI = 3.1415926
# ABS = 0   # absolute motion
# INCR = 1  # incremental motion
# Enable = True
# Disable = False
# robot = jkrc.RC("192.168.31.5")  # returns a robot object
# robot.login()                    # log in
# # robot.servo_move_use_none_filter()
# robot.power_on()                 # power on
# robot.enable_robot()
# # joint_pos = [0, 1.57, 1.57, 1.57, -1.57, 0]
# # robot.joint_move(joint_pos, ABS, True, 1)
# robot.servo_move_enable(Enable)  # enter servo position-control mode
def goal_distance(goal_a, goal_b):
    return np.linalg.norm(np.array(goal_a) - np.array(goal_b), axis=-1)
def is_collision(robotId, bodyA, bodyB):
    # True if the robot is in contact with either obstacle body.
    contacts_a = p.getContactPoints(robotId, bodyA)
    contacts_b = p.getContactPoints(robotId, bodyB)
    return bool(contacts_a or contacts_b)
class jakaEnv(gym.Env):
    metadata = {"render.modes": ["human"]}

    def __init__(self):
        self.step_counter = 0
        self.eposide = 0
        self.collision_frequency = 0
        self.success_frequency = 0
        self.overstep_frequency = 0
        self.out = 0
        self.reward_type = "dense"
        # Reward-function weights (tunable).
        self.alpha = 1.0  # distance-reward weight
        self.beta = 0.1   # action-penalty weight
        self.gamma = 200  # collision-penalty weight
        self.delta = 100  # boundary-penalty weight
        p.connect(
            p.GUI,
            # options="--background_color_red=0.0 --background_color_green=0.93 --background_color_blue=0.54",
        )
        p.resetDebugVisualizerCamera(
            cameraDistance=1.25,
            cameraYaw=45,
            cameraPitch=-30,
            cameraTargetPosition=[0.65, -0.0, 0.65],
        )
        # Workspace bounds for the end effector.
        self.x_low = 0.3
        self.x_high = 0.7
        self.y_low = -0.05
        self.y_high = 1.05
        self.z_low = 0.65
        self.z_high = 1.3
        # Candidate speeds for the moving obstacle block.
        self.speed = [0.3, 0.5, 0.8, 1.0]
        self.action_space = spaces.Box(np.array([-1] * 3), np.array([1] * 3))
        # 38-dimensional observation space.
        self.observation_space = spaces.Box(
            np.array([-1] * 38, np.float32),
            np.array([1] * 38, np.float32),
        )
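
    # Observation layout (38 dims):
    #   end-effector position (3) + goal position (3) + goal position relative to the
    #   end effector (3) + goal orientation quaternion (4) + goal linear velocity (3)
    #   + goal angular velocity (3) + obstacle-block linear velocity (3) + obstacle-block
    #   angular velocity (3) + candidate block speeds self.speed (4) + obstacle-block
    #   position (3) + obstacle-block position relative to the end effector (3)
    #   + static block position (3) = 38.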
    def compute_reward(self, achieved_goal, goal, action=None):
        d = goal_distance(achieved_goal, goal)
        # Normalized distance reward using tanh.
        distance_reward = -np.tanh(d)
        # Action-magnitude penalty (L2 norm).
        action_penalty = 0
        if action is not None:
            action_penalty = np.linalg.norm(action)
        # Collision penalty (hard constraint).
        collision_penalty = self.gamma if is_collision(self.jaka_id, self.blockId, self.blockId2) else 0
        # Boundary penalty (grows exponentially with the violation).
        boundary_penalty = 0
        if achieved_goal[0] > self.x_high or achieved_goal[0] < self.x_low:
            boundary_penalty += self.delta * np.exp(abs(achieved_goal[0] - self.x_high) + abs(achieved_goal[0] - self.x_low))
        if achieved_goal[1] > self.y_high or achieved_goal[1] < self.y_low:
            boundary_penalty += self.delta * np.exp(abs(achieved_goal[1] - self.y_high) + abs(achieved_goal[1] - self.y_low))
        if achieved_goal[2] > self.z_high or achieved_goal[2] <= self.z_low + 0.05:
            boundary_penalty += self.delta * np.exp(abs(achieved_goal[2] - self.z_high) + abs(achieved_goal[2] - self.z_low))
        # Combine the reward terms.
        reward = (self.alpha * distance_reward
                  - self.beta * action_penalty
                  - collision_penalty
                  - boundary_penalty)
        return reward
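
    # step() applies the policy action as a Cartesian delta on link 6 (at most
    # dv = 0.05 m per axis), solves inverse kinematics, advances the simulation, and
    # then assembles the 38-dim observation. An episode ends when the end effector
    # leaves the workspace box, collides with an obstacle, reaches the goal
    # (d < distance_threshold = 0.05 m), or exceeds MAX_EPISODE_LEN steps.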
    def step(self, action):
        p.configureDebugVisualizer(p.COV_ENABLE_SINGLE_STEP_RENDERING)
        orientation = p.getQuaternionFromEuler([-math.pi / 2, -math.pi, math.pi / 2.])
        dv = 0.05
        dx = action[0] * dv
        dy = action[1] * dv
        dz = action[2] * dv
        # print(action)
        currentPose = p.getLinkState(self.jaka_id, 6)
        currentPosition = currentPose[0]
        newPosition = [
            currentPosition[0] + dx,
            currentPosition[1] + dy,
            currentPosition[2] + dz,
        ]
        jointPoses = p.calculateInverseKinematics(
            self.jaka_id, 6, newPosition, orientation
        )[0:6]
        p.setJointMotorControlArray(
            self.jaka_id,
            list(range(1, 7)),
            p.POSITION_CONTROL,
            list(jointPoses),
        )
        print(jointPoses)
        p.resetBaseVelocity(self.blockId, linearVelocity=[0, random.choice(self.speed), 0])
        p.stepSimulation()
        # Sim2real hooks (enable when driving the real arm):
        # robot.servo_move_enable(True)
        # robot.servo_p(cartesian_pose=[dx * 100, dy * 100, dz * 50, 0, 0, 0], move_mode=1)
        # time.sleep(0.008)
        state_object, state_object_orienation = p.getBasePositionAndOrientation(self.objectId)
        twist_object, twist_object_orienation = p.getBaseVelocity(self.objectId)
        state_robot = p.getLinkState(self.jaka_id, 6)[0]
        block_speed, block_speed_orienation = p.getBaseVelocity(self.blockId)
        block_state, block_state_orienation = p.getBasePositionAndOrientation(self.blockId)
        block2_state, block2state_orienation = p.getBasePositionAndOrientation(self.blockId2)
        p.addUserDebugLine(lineFromXYZ=currentPosition, lineToXYZ=state_robot, lineColorRGB=[0, 1, 0], lineWidth=2)
        x.append(state_robot[0])
        y.append(state_robot[1])
        z.append(state_robot[2])
        self.distance_threshold = 0.05
        d = goal_distance(state_robot, state_object)
        # Shaped reward from the composite reward function.
        reward = self.compute_reward(state_robot, state_object, action)
        if (state_robot[0] > self.x_high or state_robot[0] < self.x_low
                or state_robot[1] > self.y_high or state_robot[1] < self.y_low
                or state_robot[2] > self.z_high or state_robot[2] <= self.z_low + 0.05):
            reward = -400
            self.eposide = self.eposide + 1
            self.out = self.out + 1
            print()
            print("=" * 50)
            print("\033[36mepisode {}: out of range\033[0m".format(self.eposide))
            print()
            print("\tcollisions: {}\tsuccesses: {}\ttimeouts: {}\tout of range: {}".format(
                self.collision_frequency,
                self.success_frequency,
                self.overstep_frequency, self.out))
            print("=" * 50)
            print()
            done = True
        elif is_collision(self.jaka_id, self.blockId, self.blockId2):
            reward = -400
            self.eposide = self.eposide + 1
            self.collision_frequency = self.collision_frequency + 1
            print()
            print("=" * 50)
            print("\033[33mepisode {}: collision\033[0m".format(self.eposide))
            print()
            print("\tcollisions: {}\tsuccesses: {}\ttimeouts: {}\tout of range: {}".format(
                self.collision_frequency,
                self.success_frequency,
                self.overstep_frequency, self.out))
            print("=" * 50)
            print()
            done = True
        elif d < self.distance_threshold:
            # Success: turn the (negative) shaped reward into a positive bonus.
            reward = -1 / self.compute_reward(state_robot, state_object, action)
            self.eposide = self.eposide + 1
            self.success_frequency = self.success_frequency + 1
            print()
            print("=" * 50)
            print("\033[31mepisode {}: success\033[0m".format(self.eposide))
            print()
            print("\tcollisions: {}\tsuccesses: {}\ttimeouts: {}\tout of range: {}".format(
                self.collision_frequency,
                self.success_frequency,
                self.overstep_frequency, self.out))
            print("=" * 50)
            print()
            done = True
        else:
            reward = self.compute_reward(state_robot, state_object, action)
            done = False
        self.step_counter += 1
        if self.step_counter > MAX_EPISODE_LEN:
            # reward = 0
            self.eposide = self.eposide + 1
            self.overstep_frequency = self.overstep_frequency + 1
            print()
            print("=" * 50)
            print("\033[34mepisode {}: overstep\033[0m".format(self.eposide))
            print()
            print("\tcollisions: {}\tsuccesses: {}\ttimeouts: {}\tout of range: {}".format(
                self.collision_frequency,
                self.success_frequency,
                self.overstep_frequency, self.out))
            print("=" * 50)
            print()
            done = True
        info = {"object_position": state_object}
        robot_pos = np.array(state_robot)
        object_pos = np.array(state_object)
        object_rel_pos = object_pos - robot_pos
        object_rot = np.array(state_object_orienation)
        object_velp = np.array(twist_object)
        object_velr = np.array(twist_object_orienation)
        block_velp = np.array(block_speed)
        block_velr = np.array(block_speed_orienation)
        block_vel_range = np.array(self.speed)
        block_loc = np.array(block_state)
        block_rel_pos = np.array(block_loc - robot_pos)
        block2_pos = np.array(block2_state)
        obs = np.concatenate(
            [
                robot_pos.ravel(),
                object_pos.ravel(),
                object_rel_pos.ravel(),
                object_rot.ravel(),
                object_velp.ravel(),
                object_velr.ravel(),
                block_velp.ravel(),
                block_velr.ravel(),
                block_vel_range.ravel(),
                block_loc.ravel(),
                block_rel_pos.ravel(),
                block2_pos.ravel(),
            ]
        )
        return obs.copy(), reward, done, info
    def reset(self):
        self.step_counter = 0
        p.resetSimulation()
        p.configureDebugVisualizer(p.COV_ENABLE_RENDERING, 0)
        urdfRootPath = pybullet_data.getDataPath()
        p.setGravity(0, 0, -10)
        planeId = p.loadURDF(os.path.join(urdfRootPath, "plane.urdf"))
        dir_path = os.path.dirname(os.path.realpath(__file__))
        self.tableId = p.loadURDF(
            os.path.join(dir_path, "urdf/table.urdf"),
            basePosition=[0.2, 0.5, 0],
            useFixedBase=True,
        )
        self.blockId = p.loadURDF(
            os.path.join(dir_path, "urdf/block4.urdf"),
            basePosition=[0.4, 0, 0.47],
            useFixedBase=True,
        )
        self.blockId2 = p.loadURDF(
            os.path.join(dir_path, "urdf/block.urdf"),
            basePosition=[0.6, 0.66, 0.43],
            useFixedBase=True,
        )
        state_object = [0.5, 0.2, 0.8]
        self.objectId = p.loadURDF(
            os.path.join(dir_path, "urdf/goal.urdf"),
            basePosition=state_object,
            useFixedBase=True,
        )
        reset_poses = [0, 0.8, 2.4, 1.3, 1, -1.57, 0]
        reset_realposes = [0.8, 2.4, 1.3, 1, -1.57, 0]
        # Change this path to the URDF of your robot arm.
        self.jaka_id = p.loadURDF(
            "urdf/jaka_description/urdf/jaka_description.urdf",
            basePosition=[0, 0.5, 0.65],
            baseOrientation=p.getQuaternionFromEuler([0, 0, 3.14]),
            useFixedBase=True,
        )
        for i in range(len(reset_poses)):
            p.resetJointState(self.jaka_id, i, reset_poses[i])
        # robot.joint_move(reset_realposes, ABS, True, 1)
        state_robot = p.getLinkState(self.jaka_id, 6)[0]
        self.plot_obs_boundary()
        p.configureDebugVisualizer(p.COV_ENABLE_RENDERING, 1)
        # time.sleep(10)
        robot_pos = np.array(state_robot)
        object_pos = np.array(state_object)
        object_rel_pos = object_pos - robot_pos
        object_rot = np.array([0, 0, 0, 1])  # identity quaternion
        object_velp = np.array([0, 0, 0])
        object_velr = np.array([0, 0, 0])
        block_velp = np.array([0, 0, 0])
        block_velr = np.array([0, 0, 0])
        block_vel_range = np.array(self.speed)
        block_loc = np.array([0, 0, 0])
        block_rel_pos = np.array([0, 0, 0])
        block2_pos = np.array([0.6, 0.66, 0.43])
        obs = np.concatenate(
            [
                robot_pos.ravel(),
                object_pos.ravel(),
                object_rel_pos.ravel(),
                object_rot.ravel(),
                object_velp.ravel(),
                object_velr.ravel(),
                block_velp.ravel(),
                block_velr.ravel(),
                block_vel_range.ravel(),
                block_loc.ravel(),
                block_rel_pos.ravel(),
                block2_pos.ravel(),
            ]
        )
        return obs.copy()
    def plot_obs_boundary(self):
        p.addUserDebugLine(
            lineFromXYZ=[self.x_low, self.y_low, self.z_low],
            lineToXYZ=[self.x_low, self.y_high, self.z_low])
        p.addUserDebugLine(
            lineFromXYZ=[self.x_low, self.y_low, self.z_low],
            lineToXYZ=[self.x_high, self.y_low, self.z_low])
        p.addUserDebugLine(
            lineFromXYZ=[self.x_low, self.y_low, self.z_low],
            lineToXYZ=[self.x_low, self.y_low, self.z_high])
        p.addUserDebugLine(
            lineFromXYZ=[self.x_high, self.y_high, self.z_low],
            lineToXYZ=[self.x_high, self.y_low, self.z_low])
        p.addUserDebugLine(
            lineFromXYZ=[self.x_high, self.y_high, self.z_low],
            lineToXYZ=[self.x_low, self.y_high, self.z_low])
        p.addUserDebugLine(
            lineFromXYZ=[self.x_high, self.y_high, self.z_low],
            lineToXYZ=[self.x_high, self.y_high, self.z_high])
        p.addUserDebugLine(
            lineFromXYZ=[self.x_high, self.y_low, self.z_low],
            lineToXYZ=[self.x_high, self.y_low, self.z_high])
        p.addUserDebugLine(
            lineFromXYZ=[self.x_low, self.y_high, self.z_low],
            lineToXYZ=[self.x_low, self.y_high, self.z_high])
        p.addUserDebugLine(
            lineFromXYZ=[self.x_low, self.y_low, self.z_high],
            lineToXYZ=[self.x_low, self.y_high, self.z_high])
        p.addUserDebugLine(
            lineFromXYZ=[self.x_low, self.y_low, self.z_high],
            lineToXYZ=[self.x_high, self.y_low, self.z_high])
        p.addUserDebugLine(
            lineFromXYZ=[self.x_high, self.y_high, self.z_high],
            lineToXYZ=[self.x_high, self.y_low, self.z_high])
        p.addUserDebugLine(
            lineFromXYZ=[self.x_high, self.y_high, self.z_high],
            lineToXYZ=[self.x_low, self.y_high, self.z_high])
    def render(self, mode="human"):
        view_matrix = p.computeViewMatrixFromYawPitchRoll(
            cameraTargetPosition=[0.7, 0, 0.65 + 0.05],
            distance=0.7,
            yaw=90,
            pitch=-50,
            roll=0,
            upAxisIndex=2,
        )
        proj_matrix = p.computeProjectionMatrixFOV(
            fov=60, aspect=float(960) / 720, nearVal=0.1, farVal=100.0
        )
        (_, _, px, _, _) = p.getCameraImage(
            width=960,
            height=720,
            viewMatrix=view_matrix,
            projectionMatrix=proj_matrix,
            renderer=p.ER_BULLET_HARDWARE_OPENGL,
        )
        rgb_array = np.array(px, dtype=np.uint8)
        rgb_array = np.reshape(rgb_array, (720, 960, 4))
        rgb_array = rgb_array[:, :, :3]
        return rgb_array
    def _get_state(self):
        return self.observation

    def close(self):
        p.disconnect()
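
# A hedged, optional helper (not wired into the original flow): the evaluation loop
# below low-pass filters each action sample with signal.lfilter but carries no filter
# state between steps, so most of the smoothing is lost. A stateful per-channel
# Butterworth filter could look like this sketch.
def make_action_filter(order=4, cutoff_hz=1.2, fs=2.5, n_channels=3):
    """Return a closure that low-pass filters actions while keeping per-channel state."""
    b, a = signal.butter(order, cutoff_hz, fs=fs, btype='low')
    zi = signal.lfilter_zi(b, a)
    state = [zi * 0.0 for _ in range(n_channels)]  # start each channel from rest

    def filt(action):
        smoothed = np.array(action, dtype=float)
        for ch in range(n_channels):
            # lfilter returns the filtered sample and the updated filter state.
            y, state[ch] = signal.lfilter(b, a, [smoothed[ch]], zi=state[ch])
            smoothed[ch] = y[0]
        return smoothed

    return filt
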
if __name__ == "__main__":
    from stable_baselines3 import SAC
    from stable_baselines3.common import results_plotter
    from stable_baselines3.common.monitor import Monitor
    from stable_baselines3.common.results_plotter import load_results, ts2xy, plot_results
    from stable_baselines3.common.noise import NormalActionNoise
    from stable_baselines3.common.callbacks import BaseCallback, EvalCallback
    class SaveOnBestTrainingRewardCallback(BaseCallback):
        """
        Callback for saving a model (the check is done every ``check_freq`` steps)
        based on the training reward (in practice, we recommend using ``EvalCallback``).

        :param check_freq: How often (in steps) to check the training reward.
        :param log_dir: Path to the folder where the model will be saved.
            It must contain the file created by the ``Monitor`` wrapper.
        :param verbose: Verbosity level.
        """

        def __init__(self, check_freq: int, log_dir: str, verbose: int = 1):
            super(SaveOnBestTrainingRewardCallback, self).__init__(verbose)
            self.check_freq = check_freq
            self.log_dir = log_dir
            self.save_path = os.path.join(log_dir, 'best_model')
            self.best_mean_reward = -np.inf

        def _init_callback(self) -> None:
            # Create folder if needed
            if self.save_path is not None:
                os.makedirs(self.save_path, exist_ok=True)

        def _on_step(self) -> bool:
            if self.n_calls % self.check_freq == 0:
                # Retrieve training reward
                x, y = ts2xy(load_results(self.log_dir), 'timesteps')
                if len(x) > 0:
                    # Mean training reward over the last 100 episodes
                    mean_reward = np.mean(y[-100:])
                    if self.verbose > 0:
                        print(f"Num timesteps: {self.num_timesteps}")
                        print(
                            f"Best mean reward: {self.best_mean_reward:.2f} - Last mean reward per episode: {mean_reward:.2f}")
                    # New best mean reward: save the agent
                    if mean_reward > self.best_mean_reward:
                        self.best_mean_reward = mean_reward
                        if self.verbose > 0:
                            print(f"Saving new best model to {self.save_path}")
                        self.model.save(self.save_path)
            return True
    class PeriodicModelCheckpointCallback(BaseCallback):
        """
        Callback that periodically saves the model at a fixed step interval,
        storing each checkpoint in its own subdirectory.
        """

        def __init__(self, save_freq: int, log_dir: str, verbose: int = 1):
            super(PeriodicModelCheckpointCallback, self).__init__(verbose)
            self.save_freq = save_freq
            self.log_dir = log_dir
            self.checkpoint_count = 0

        def _init_callback(self) -> None:
            # Create the base directory for checkpoints
            self.checkpoint_dir = os.path.join(self.log_dir, 'checkpoints')
            os.makedirs(self.checkpoint_dir, exist_ok=True)

        def _on_step(self) -> bool:
            if self.n_calls % self.save_freq == 0:
                # Create a new checkpoint directory
                checkpoint_path = os.path.join(self.checkpoint_dir, f'checkpoint_{self.n_calls}')
                os.makedirs(checkpoint_path, exist_ok=True)
                # Save the model
                if self.verbose > 0:
                    print(f'Saving model checkpoint to {checkpoint_path}')
                self.model.save(os.path.join(checkpoint_path, 'model'))
                self.checkpoint_count += 1
            return True
    tempt = 0  # 1 = train a new model; 0 = load a saved model and run evaluation
    log_dir = './tensorboard/DOA_SAC_callback/'
    os.makedirs(log_dir, exist_ok=True)
    env = jakaEnv()
    env = Monitor(env, log_dir)
    if tempt:
        # Check CUDA availability and pick the device automatically.
        device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Using device: {device}")
        # Detect the number of available GPUs.
        num_gpus = torch.cuda.device_count()
        print(f"Number of available GPUs: {num_gpus}")
        model = SAC('MlpPolicy', env=env, verbose=1, tensorboard_log=log_dir,
                    device=device,                      # automatically selected device
                    buffer_size=200000,                 # replay buffer of 200,000 transitions
                    batch_size=512 * max(1, num_gpus),  # scale batch size with GPU count
                    gamma=0.995,                        # discount factor
                    tau=0.001,                          # soft-update coefficient
                    ent_coef='auto_0.1',                # auto-tuned entropy coefficient, initial value 0.1
                    learning_rate=5e-4,
                    target_update_interval=1,           # update the target network every step
                    gradient_steps=4                    # gradient updates per environment step
                    )
        # With more than one GPU, wrap the policy in DataParallel.
        # (Note: Stable Baselines3 does not officially support this; treat it as experimental.)
        if num_gpus > 1:
            print("Using DataParallel for multi-GPU training")
            model.policy = nn.DataParallel(model.policy)
        # Callbacks: save the best model so far plus periodic checkpoints.
        callback_list = [
            SaveOnBestTrainingRewardCallback(check_freq=1000, log_dir=log_dir),
            PeriodicModelCheckpointCallback(save_freq=5000, log_dir=log_dir)
        ]
        model.learn(
            total_timesteps=4000000,
            callback=callback_list,
            tb_log_name="SAC_2",
            log_interval=50  # logging interval for monitoring training progress
        )
        model.save('model/DOA_SAC_ENV_callback')
        del model
    else:
        obs = env.reset()
        # Change this path to wherever your trained model is saved.
        model = SAC.load(r'tensorboard/DOA_SAC_callback/best_model.zip', env=env)
        for j in range(50):
            for i in range(2000):
                action, _state = model.predict(obs)
                # Butterworth low-pass filter to smooth the commanded action.
                order1 = 4
                cutoff_freq1 = 1.2
                b1, a1 = signal.butter(order1, cutoff_freq1, fs=2.5, btype='low')
                # Apply the filter to each action component.
                # NOTE: filtering a single sample with lfilter and no carried state has
                # little smoothing effect; see the make_action_filter() sketch above for
                # a stateful alternative.
                action[0] = signal.lfilter(b1, a1, [action[0]])
                action[1] = signal.lfilter(b1, a1, [action[1]])
                action[2] = signal.lfilter(b1, a1, [action[2]])
                obs, reward, done, info = env.step(action)
                # env.render()
                if done:
                    print('done')
                    # obs = env.reset()
                    time.sleep(30)
                    break
            break
    # 3D trajectory plot (requires matplotlib.pyplot as plt):
    # fig1 = plt.figure("robot arm trajectory")
    # ax = fig1.add_subplot(projection="3d")  # 3D axes
    # ax.plot(x, y, z, label="simulation")
    # # plt.xlabel("X")
    # # plt.ylabel("Y")
    # ax.set_xlabel("X")
    # ax.set_ylabel("Y")
    # ax.set_zlabel("Z")
    # ax.legend()
    # # plt.savefig('jaka_3d.png')
    # plt.show()
    #
    # # xy projection
    # fig2 = plt.figure()
    # plt.plot(x, y, label="xy-simulation", color="g")
    # plt.xlabel("X")
    # plt.ylabel("Y")
    # plt.savefig("jaka_xy.png")
    # plt.show()
    #
    # # yz projection
    # fig3 = plt.figure()
    # plt.plot(y, z, label="yz-simulation", color="g")
    # plt.xlabel("y")
    # plt.ylabel("z")
    # plt.savefig("jaka_yz.png")
    # plt.show()

def train_parallel(num_processes):
    """Multi-process parallel training: workers share one SAC policy."""
    set_start_method('spawn')
    # Shared model built with Stable Baselines3's SAC.
    env = jakaEnv()                                              # environment for the shared model
    model = SAC('MlpPolicy', env=env, verbose=1, device="cuda")  # CUDA-accelerated
    shared_model = model.policy.to(torch.device('cuda'))         # keep the policy on the GPU
    shared_model.share_memory()                                  # share policy parameters across processes
    # Launch the worker processes.
    processes = []
    for rank in range(num_processes):
        proc = Process(target=train_process, args=(rank, shared_model))
        proc.start()
        processes.append(proc)
    # Wait for all workers to finish.
    for proc in processes:
        proc.join()


def train_process(rank, shared_model):
    """Single training worker: trains a local copy and syncs parameters back."""
    # Assign each worker a GPU (round-robin over the available devices).
    device = torch.device(f'cuda:{rank % torch.cuda.device_count()}')
    # Each worker gets an independent environment instance.
    env = create_arm_environment()
    # Local model whose policy is initialised from the shared policy.
    local_model = SAC('MlpPolicy', env=env, device=device)
    local_model.policy.load_state_dict(shared_model.state_dict())
    # Replace the body of this loop with the actual per-worker training logic.
    while True:
        # ... train the local model for some steps ...
        # Sync the local policy parameters back into the shared policy.
        with torch.no_grad():
            for param, shared_param in zip(local_model.policy.parameters(), shared_model.parameters()):
                shared_param.copy_(param)


def create_arm_environment():
    """Create an instance of the robot-arm environment."""
    return jakaEnv()

if __name__ == '__main__':
    # Example entry point for parallel training with 4 worker processes.
    # (Runs after the main block above when the script is executed directly.)
    train_parallel(4)