534 lines
19 KiB
Python
534 lines
19 KiB
Python
import time
|
||
import gym
|
||
from gym import error, spaces, utils
|
||
from gym.utils import seeding
|
||
import os
|
||
import pybullet as p
|
||
import pybullet_data
|
||
import math
|
||
import numpy as np
|
||
import random
|
||
from scipy import signal
|
||
import sys
|
||
# sys.path.append('D:\\vs2019ws\PythonCtt\PythonCtt')
|
||
import jkrc
|
||
|
||
import matplotlib as mpl
|
||
from mpl_toolkits.mplot3d import Axes3D
|
||
import matplotlib.pyplot as plt
|
||
import pandas as pd
|
||
|
||
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
|
||
|
||
MAX_EPISODE_LEN = 20 * 100
|
||
x = []
|
||
y = []
|
||
z = []
|
||
|
||
|
||
# 运动模式
|
||
# PI=3.1415926
|
||
# ABS = 0 # 绝对运动
|
||
# INCR = 1 # 增量运动
|
||
# Enable = True
|
||
# Disable = False
|
||
|
||
# robot = jkrc.RC("192.168.31.5")#返回一个机器人对象
|
||
# robot.login()#登录
|
||
# # robot.servo_move_use_none_filter()
|
||
# robot.power_on() #上电
|
||
# robot.enable_robot()
|
||
# # joint_pos=[0, 1.57, 1.57, 1.57, -1.57, 0]
|
||
# # robot.joint_move(joint_pos,ABS,True,1)
|
||
# robot.servo_move_enable(Enable) #进入位置控制模式
|
||
|
||
def goal_distance(goal_a, goal_b):
|
||
return np.linalg.norm(np.array(goal_a) - np.array(goal_b), axis=-1)
|
||
|
||
|
||
def is_collision(robotId, bodyA, bodyB):
|
||
is_bool1 = p.getContactPoints(robotId, bodyA)
|
||
is_bool2 = p.getContactPoints(robotId, bodyB)
|
||
if is_bool1:
|
||
return True
|
||
elif is_bool2:
|
||
return True
|
||
else:
|
||
return False
|
||
|
||
|
||
class jakaEnv(gym.Env):
|
||
metadata = {"render.modes": ["human"]}
|
||
|
||
def __init__(self):
|
||
self.step_counter = 0
|
||
self.eposide = 0
|
||
self.collision_frequency = 0
|
||
self.success_frequency = 0
|
||
self.overstep_frequency = 0
|
||
self.out = 0
|
||
self.reward_type = "dense"
|
||
p.connect(
|
||
p.GUI,
|
||
# options="--background_color_red=0.0 --background_color_green=0.93--background_color_blue=0.54",
|
||
)
|
||
p.resetDebugVisualizerCamera(
|
||
cameraDistance=1.25,
|
||
cameraYaw=45,
|
||
cameraPitch=-30,
|
||
cameraTargetPosition=[0.65, -0.0, 0.65],
|
||
)
|
||
self.x_low = 0.3
|
||
self.x_high = 0.7
|
||
self.y_low = -0.05
|
||
self.y_high = 1.05
|
||
self.z_low = 0.65
|
||
self.z_high = 1.3
|
||
self.speed = [0.3, 0.5, 0.8, 1.0]
|
||
self.action_space = spaces.Box(np.array([-1] * 3), np.array([1] * 3))
|
||
self.observation_space = spaces.Box(np.array([-1] * 38, np.float32), np.array([1] * 38, np.float32))
|
||
|
||
def compute_reward(self, achieved_goal, goal):
|
||
d = goal_distance(achieved_goal, goal)
|
||
if self.reward_type == "sparse":
|
||
return -(d > self.distance_threshold).astyp(np.float32)
|
||
else:
|
||
return -d
|
||
|
||
def step(self, action):
|
||
p.configureDebugVisualizer(p.COV_ENABLE_SINGLE_STEP_RENDERING)
|
||
orientation = p.getQuaternionFromEuler([-math.pi / 2, -math.pi, math.pi / 2.])
|
||
|
||
dv = 0.05
|
||
dx = action[0] * dv
|
||
dy = action[1] * dv
|
||
dz = action[2] * dv
|
||
# print(action)
|
||
currentPose = p.getLinkState(self.jaka_id, 6)
|
||
currentPosition = currentPose[0]
|
||
newPosition = [
|
||
currentPosition[0] + dx,
|
||
currentPosition[1] + dy,
|
||
currentPosition[2] + dz,
|
||
]
|
||
jointPoses = p.calculateInverseKinematics(
|
||
self.jaka_id, 6, newPosition, orientation
|
||
)[0:6]
|
||
|
||
p.setJointMotorControlArray(
|
||
self.jaka_id,
|
||
list(range(1, 7)),
|
||
p.POSITION_CONTROL,
|
||
list(jointPoses),
|
||
)
|
||
print(jointPoses)
|
||
p.resetBaseVelocity(self.blockId, linearVelocity=[0, random.choice(self.speed), 0])
|
||
p.stepSimulation()
|
||
# robot.servo_move_enable(True)
|
||
|
||
# robot.servo_p(cartesian_pose=[dx*100, dy*100, dz*50, 0, 0, 0], move_mode=1)
|
||
# time.sleep(0.008)
|
||
|
||
state_object, state_object_orienation = p.getBasePositionAndOrientation(self.objectId)
|
||
twist_object, twist_object_orienation = p.getBaseVelocity(self.objectId)
|
||
state_robot = p.getLinkState(self.jaka_id, 6)[0]
|
||
block_speed, block_speed_orienation = p.getBaseVelocity(self.blockId)
|
||
block_state, block_state_orienation = p.getBasePositionAndOrientation(self.blockId)
|
||
block2_state, block2state_orienation = p.getBasePositionAndOrientation(self.blockId2)
|
||
p.addUserDebugLine(lineFromXYZ=currentPosition, lineToXYZ=state_robot, lineColorRGB=[0, 1, 0], lineWidth=2)
|
||
x.append(state_robot[0])
|
||
y.append(state_robot[1])
|
||
z.append(state_robot[2])
|
||
|
||
self.distance_threshold = 0.05
|
||
d = goal_distance(state_robot, state_object)
|
||
if (state_robot[0] > self.x_high or state_robot[0] < self.x_low
|
||
or state_robot[1] > self.y_high or state_robot[1] < self.y_low
|
||
or state_robot[2] > self.z_high or state_robot[2] <= self.z_low + 0.05):
|
||
reward = -400
|
||
self.eposide = self.eposide + 1
|
||
self.out = self.out + 1
|
||
print()
|
||
print("=" * 50)
|
||
print("\033[36meposide{}:out of range\033[0m".format(self.eposide))
|
||
print()
|
||
print("\t当前碰撞次数:{}\t当前成功次数:{}\t当前超时次数:{}\t当前超出范围次数:{}".format(
|
||
self.collision_frequency,
|
||
self.success_frequency,
|
||
self.overstep_frequency, self.out))
|
||
print("=" * 50)
|
||
print()
|
||
done = True
|
||
|
||
elif is_collision(self.jaka_id, self.blockId, self.blockId2):
|
||
reward = -400
|
||
self.eposide = self.eposide + 1
|
||
self.collision_frequency = self.collision_frequency + 1
|
||
print()
|
||
print("=" * 50)
|
||
print("\033[33meposide{}:collision\033[0m".format(self.eposide))
|
||
print()
|
||
print("\t当前碰撞次数:{}\t当前成功次数:{}\t当前超时次数:{}\t当前超出范围次数:{}".format(
|
||
self.collision_frequency,
|
||
self.success_frequency,
|
||
self.overstep_frequency, self.out))
|
||
print("=" * 50)
|
||
print()
|
||
done = True
|
||
elif d < self.distance_threshold:
|
||
reward = -1 / self.compute_reward(state_robot, state_object)
|
||
self.eposide = self.eposide + 1
|
||
self.success_frequency = self.success_frequency + 1
|
||
print()
|
||
print("=" * 50)
|
||
print("\033[31eposide{}:success\033[0m".format(self.eposide))
|
||
|
||
print()
|
||
print("\t当前碰撞次数:{}\t当前成功次数:{}\t当前超时次数:{}\t当前超出范围次数:{}".format(
|
||
self.collision_frequency,
|
||
self.success_frequency,
|
||
self.overstep_frequency, self.out))
|
||
print("=" * 50)
|
||
print()
|
||
done = True
|
||
else:
|
||
reward = self.compute_reward(state_robot, state_object)
|
||
done = False
|
||
|
||
self.step_counter += 1
|
||
|
||
if self.step_counter > MAX_EPISODE_LEN:
|
||
# reward = 0
|
||
self.eposide = self.eposide + 1
|
||
self.overstep_frequency = self.overstep_frequency + 1
|
||
print()
|
||
print("=" * 50)
|
||
print("\033[34meposide{}:overstep\033[0m".format(self.eposide))
|
||
print()
|
||
print("\t当前碰撞次数:{}\t当前成功次数:{}\t当前超时次数:{}\t当前超出范围次数:{}".format(
|
||
self.collision_frequency,
|
||
self.success_frequency,
|
||
self.overstep_frequency, self.out))
|
||
print("=" * 50)
|
||
print()
|
||
done = True
|
||
|
||
info = {"object_position": state_object}
|
||
|
||
robot_pos = np.array(state_robot)
|
||
object_pos = np.array(state_object)
|
||
object_rel_pos = object_pos - robot_pos
|
||
object_rot = np.array(state_object_orienation)
|
||
object_velp = np.array(twist_object)
|
||
object_velr = np.array(twist_object_orienation)
|
||
block_velp = np.array(block_speed)
|
||
block_velr = np.array(block_speed_orienation)
|
||
block_vel_range = np.array(self.speed)
|
||
block_loc = np.array(block_state)
|
||
block_rel_pos = np.array(block_loc - robot_pos)
|
||
block2_pos = np.array(block2_state)
|
||
|
||
obs = np.concatenate(
|
||
[
|
||
robot_pos.ravel(),
|
||
object_pos.ravel(),
|
||
object_rel_pos.ravel(),
|
||
object_rot.ravel(),
|
||
object_velp.ravel(),
|
||
object_velr.ravel(),
|
||
block_velp.ravel(),
|
||
block_velr.ravel(),
|
||
block_vel_range.ravel(),
|
||
block_loc.ravel(),
|
||
block_rel_pos.ravel(),
|
||
block2_pos.ravel(),
|
||
]
|
||
)
|
||
|
||
return obs.copy(), reward, done, info
|
||
|
||
def reset(self):
|
||
self.step_counter = 0
|
||
p.resetSimulation()
|
||
p.configureDebugVisualizer(
|
||
p.COV_ENABLE_RENDERING, 0
|
||
)
|
||
urdfRootPath = pybullet_data.getDataPath()
|
||
p.setGravity(0, 0, -10)
|
||
planeId = p.loadURDF(os.path.join(urdfRootPath, "plane.urdf"))
|
||
dir_path = os.path.dirname(os.path.realpath(__file__))
|
||
|
||
self.tableId = p.loadURDF(os.path.join(dir_path, "urdf/table.urdf"),
|
||
basePosition=[0.2, 0.5, 0],
|
||
useFixedBase=True,
|
||
)
|
||
self.blockId = p.loadURDF(os.path.join(dir_path, "urdf/block4.urdf"),
|
||
basePosition=[0.4, 0, 0.47],
|
||
useFixedBase=True,
|
||
)
|
||
self.blockId2 = p.loadURDF(os.path.join(dir_path, "urdf/block.urdf"),
|
||
basePosition=[0.6, 0.66, 0.43],
|
||
useFixedBase=True,
|
||
)
|
||
state_object = [0.5, 0.2, 0.8]
|
||
dir_path = os.path.dirname(os.path.realpath(__file__))
|
||
self.objectId = p.loadURDF(
|
||
os.path.join(dir_path, "urdf/goal.urdf"),
|
||
basePosition=state_object,
|
||
useFixedBase=True,
|
||
)
|
||
|
||
reset_poses = [0, 0.8, 2.4, 1.3, 1, -1.57, 0]
|
||
reset_realposes = [0.8, 2.4, 1.3, 1, -1.57, 0]
|
||
# 改变路径为你机械臂的URDF文件路径
|
||
self.jaka_id = p.loadURDF(
|
||
"urdf/jaka_description/urdf/jaka_description.urdf",
|
||
basePosition=[0, 0.5, 0.65],
|
||
baseOrientation=p.getQuaternionFromEuler([0, 0, 3.14]),
|
||
useFixedBase=True,
|
||
)
|
||
|
||
for i in range(len(reset_poses)):
|
||
p.resetJointState(self.jaka_id, i, reset_poses[i])
|
||
# robot.joint_move(reset_realposes, ABS, True, 1)
|
||
|
||
state_robot = p.getLinkState(self.jaka_id, 6)[0]
|
||
self.plot_obs_boundary()
|
||
p.configureDebugVisualizer(p.COV_ENABLE_RENDERING, 1)
|
||
# time.sleep(10)
|
||
|
||
robot_pos = np.array(state_robot)
|
||
object_pos = np.array(state_object)
|
||
object_rel_pos = object_pos - robot_pos
|
||
object_rot = np.array([0, 0, 0, 1]) # quaternions?
|
||
object_velp = np.array([0, 0, 0])
|
||
object_velr = np.array([0, 0, 0])
|
||
block_velp = np.array([0, 0, 0])
|
||
block_velr = np.array([0, 0, 0])
|
||
block_vel_range = np.array(self.speed)
|
||
block_loc = np.array([0, 0, 0])
|
||
block_rel_pos = np.array([0, 0, 0])
|
||
block2_pos = np.array([0.6, 0.66, 0.43])
|
||
|
||
obs = np.concatenate(
|
||
[
|
||
robot_pos.ravel(),
|
||
object_pos.ravel(),
|
||
object_rel_pos.ravel(),
|
||
object_rot.ravel(),
|
||
object_velp.ravel(),
|
||
object_velr.ravel(),
|
||
block_velp.ravel(),
|
||
block_velr.ravel(),
|
||
block_vel_range.ravel(),
|
||
block_loc.ravel(),
|
||
block_rel_pos.ravel(),
|
||
block2_pos.ravel(),
|
||
|
||
]
|
||
)
|
||
|
||
return obs.copy()
|
||
|
||
def plot_obs_boundary(self):
|
||
|
||
p.addUserDebugLine(
|
||
lineFromXYZ=[self.x_low, self.y_low, self.z_low],
|
||
lineToXYZ=[self.x_low, self.y_high, self.z_low])
|
||
p.addUserDebugLine(
|
||
lineFromXYZ=[self.x_low, self.y_low, self.z_low],
|
||
lineToXYZ=[self.x_high, self.y_low, self.z_low])
|
||
p.addUserDebugLine(
|
||
lineFromXYZ=[self.x_low, self.y_low, self.z_low],
|
||
lineToXYZ=[self.x_low, self.y_low, self.z_high])
|
||
|
||
p.addUserDebugLine(
|
||
lineFromXYZ=[self.x_high, self.y_high, self.z_low],
|
||
lineToXYZ=[self.x_high, self.y_low, self.z_low])
|
||
p.addUserDebugLine(
|
||
lineFromXYZ=[self.x_high, self.y_high, self.z_low],
|
||
lineToXYZ=[self.x_low, self.y_high, self.z_low])
|
||
p.addUserDebugLine(
|
||
lineFromXYZ=[self.x_high, self.y_high, self.z_low],
|
||
lineToXYZ=[self.x_high, self.y_high, self.z_high])
|
||
|
||
p.addUserDebugLine(
|
||
lineFromXYZ=[self.x_high, self.y_low, self.z_low],
|
||
lineToXYZ=[self.x_high, self.y_low, self.z_high])
|
||
p.addUserDebugLine(
|
||
lineFromXYZ=[self.x_low, self.y_high, self.z_low],
|
||
lineToXYZ=[self.x_low, self.y_high, self.z_high])
|
||
|
||
p.addUserDebugLine(
|
||
lineFromXYZ=[self.x_low, self.y_low, self.z_high],
|
||
lineToXYZ=[self.x_low, self.y_high, self.z_high])
|
||
p.addUserDebugLine(
|
||
lineFromXYZ=[self.x_low, self.y_low, self.z_high],
|
||
lineToXYZ=[self.x_high, self.y_low, self.z_high])
|
||
p.addUserDebugLine(
|
||
lineFromXYZ=[self.x_high, self.y_high, self.z_high],
|
||
lineToXYZ=[self.x_high, self.y_low, self.z_high])
|
||
p.addUserDebugLine(
|
||
lineFromXYZ=[self.x_high, self.y_high, self.z_high],
|
||
lineToXYZ=[self.x_low, self.y_high, self.z_high])
|
||
|
||
def render(self, mode="human"):
|
||
view_matrix = p.computeViewMatrixFromYawPitchRoll(
|
||
cameraTargetPosition=[0.7, 0, 0.65 + 0.05],
|
||
distance=0.7,
|
||
yaw=90,
|
||
pitch=-50,
|
||
roll=0,
|
||
upAxisIndex=2,
|
||
)
|
||
proj_matrix = p.computeProjectionMatrixFOV(
|
||
fov=60, aspect=float(960) / 720, nearVal=0.1, farVal=100.0
|
||
)
|
||
(_, _, px, _, _) = p.getCameraImage(
|
||
width=960,
|
||
height=720,
|
||
viewMatrix=view_matrix,
|
||
projectionMatrix=proj_matrix,
|
||
renderer=p.ER_BULLET_HARDWARE_OPENGL,
|
||
)
|
||
|
||
rgb_array = np.array(px, dtype=np.uint8)
|
||
rgb_array = np.reshape(rgb_array, (720, 960, 4))
|
||
|
||
rgb_array = rgb_array[:, :, :3]
|
||
return rgb_array
|
||
|
||
def _get_state(self):
|
||
return self.observation
|
||
|
||
def close(self):
|
||
p.disconnect()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
|
||
from stable_baselines3 import SAC
|
||
from stable_baselines3.common import results_plotter
|
||
from stable_baselines3.common.monitor import Monitor
|
||
from stable_baselines3.common.results_plotter import load_results, ts2xy, plot_results
|
||
from stable_baselines3.common.noise import NormalActionNoise
|
||
from stable_baselines3.common.callbacks import BaseCallback
|
||
|
||
|
||
class SaveOnBestTrainingRewardCallback(BaseCallback):
|
||
"""
|
||
Callback for saving a model (the check is done every ``check_freq`` steps)
|
||
based on the training reward (in practice, we recommend using ``EvalCallback``).
|
||
|
||
:param check_freq:
|
||
:param log_dir: Path to the folder where the model will be saved.
|
||
It must contains the file created by the ``Monitor`` wrapper.
|
||
:param verbose: Verbosity level.
|
||
"""
|
||
|
||
def __init__(self, check_freq: int, log_dir: str, verbose: int = 1):
|
||
super(SaveOnBestTrainingRewardCallback, self).__init__(verbose)
|
||
self.check_freq = check_freq
|
||
self.log_dir = log_dir
|
||
self.save_path = os.path.join(log_dir, 'best_model')
|
||
self.best_mean_reward = -np.inf
|
||
|
||
def _init_callback(self) -> None:
|
||
# Create folder if needed
|
||
if self.save_path is not None:
|
||
os.makedirs(self.save_path, exist_ok=True)
|
||
|
||
def _on_step(self) -> bool:
|
||
if self.n_calls % self.check_freq == 0:
|
||
|
||
# Retrieve training reward
|
||
x, y = ts2xy(load_results(self.log_dir), 'timesteps')
|
||
if len(x) > 0:
|
||
# Mean training reward over the last 100 episodes
|
||
mean_reward = np.mean(y[-100:])
|
||
if self.verbose > 0:
|
||
print(f"Num timesteps: {self.num_timesteps}")
|
||
print(
|
||
f"Best mean reward: {self.best_mean_reward:.2f} - Last mean reward per episode: {mean_reward:.2f}")
|
||
|
||
# New best model, you could save the agent here
|
||
if mean_reward > self.best_mean_reward:
|
||
self.best_mean_reward = mean_reward
|
||
# Example for saving best model
|
||
if self.verbose > 0:
|
||
print(f"Saving new best model to {self.save_path}")
|
||
self.model.save(self.save_path)
|
||
|
||
return True
|
||
|
||
|
||
tempt = 1
|
||
log_dir = './tensorboard/DOA_SAC_callback/'
|
||
os.makedirs(log_dir, exist_ok=True)
|
||
env = jakaEnv()
|
||
env = Monitor(env, log_dir)
|
||
if tempt:
|
||
model = SAC('MlpPolicy', env=env, verbose=1, tensorboard_log=log_dir,
|
||
device="cuda"
|
||
)
|
||
callback = SaveOnBestTrainingRewardCallback(check_freq=1000, log_dir=log_dir)
|
||
model.learn(total_timesteps=4000000, callback=callback)
|
||
model.save('model/DOA_SAC_ENV_callback')
|
||
del model
|
||
|
||
else:
|
||
obs = env.reset()
|
||
# 改变路径为你保存模型的路径
|
||
model = SAC.load(r'best_model.zip', env=env)
|
||
|
||
for j in range(50):
|
||
for i in range(2000):
|
||
action, _state = model.predict(obs)
|
||
order1 = 4
|
||
cutoff_freq1 = 1.2
|
||
b1, a1 = signal.butter(order1, cutoff_freq1, fs=2.5, btype='low')
|
||
# 应用滤波器
|
||
action[0] = signal.lfilter(b1, a1, [action[0]])
|
||
action[1] = signal.lfilter(b1, a1, [action[1]])
|
||
action[2] = signal.lfilter(b1, a1, [action[2]])
|
||
obs, reward, done, info = env.step(action)
|
||
# env.render()
|
||
if done:
|
||
print('done')
|
||
# obs = env.reset()
|
||
time.sleep(30)
|
||
break
|
||
break
|
||
|
||
# 三维
|
||
# fig1 = plt.figure("机械臂运行轨迹")
|
||
# ax = fig1.add_subplot(projection="3d") # 三维图形
|
||
# ax.plot(x, y, z, label="simulation")
|
||
# # plt.xlabel("X")
|
||
# # plt.ylabel("Y")
|
||
# ax.set_xlabel("X坐标(米)")
|
||
# ax.set_ylabel("Y坐标(米)")
|
||
# ax.set_zlabel("Z坐标(米)")
|
||
# ax.legend()
|
||
# # plt.savefig('jaka_3d.png')
|
||
# plt.show()
|
||
#
|
||
# # xy方向
|
||
# fig2 = plt.figure()
|
||
# plt.plot(x, y, label="xy-simulation", color="g")
|
||
# plt.xlabel("X")
|
||
# plt.ylabel("Y")
|
||
# plt.savefig("jaka_xy.png")
|
||
# plt.show()
|
||
#
|
||
# # xy方向
|
||
# fig3 = plt.figure()
|
||
# plt.plot(y, z, label="yz-simulation", color="g")
|
||
# plt.xlabel("y")
|
||
# plt.ylabel("z")
|
||
# plt.savefig("jaka_yz.png")
|
||
# plt.show()
|
||
|
||
|
||
|