# rrt_a1_advance.py
import random

import cv2
import numpy as np
import PIL.Image
import ikpy.chain
import transformations as tf
from dm_control import mujoco

# Load the dm_control physics models
model = mujoco.Physics.from_xml_path('assets/banana.xml')
model_bgr = mujoco.Physics.from_xml_path('assets/banana_bgr.xml')  # loaded but unused below

# Load the robot arm chain from the URDF file
my_chain = ikpy.chain.Chain.from_urdf_file("assets/a1_right.urdf")
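# Optional sanity check (a sketch, not part of the original pipeline): ikpy
# prepends a fixed base link, so the chain has len(my_chain.links) entries and
# IK solutions carry one value per link, with index 0 unused.
#
#   fk = my_chain.forward_kinematics([0.0] * len(my_chain.links))
#   print("end effector at zero pose:", fk[:3, 3])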
random_position = True
posX_MinMax = [0.3, 0.4]
posY_MinMax = [0.4, 0.5]
x_pos = random.uniform(posX_MinMax[0], posX_MinMax[1])
y_pos = random.uniform(posY_MinMax[0], posY_MinMax[1])
z_pos = 0.05
if random_position:
    # Set the banana's initial position
    model.named.data.qpos['banana_joint'][:3] = [x_pos, y_pos, z_pos]
    model.forward()  # update the simulation state
class RRT:
    """Bidirectional RRT planner in joint space."""

    class Node:
        def __init__(self, q):
            self.q = q
            self.path_q = []
            self.parent = None

    def __init__(self, start, goal, joint_limits, expand_dis=0.05,
                 path_resolution=0.005, max_iter=2000,
                 goal_sample_rate=5):  # percent goal bias (default value assumed)
        self.start = self.Node(start)
        self.end = self.Node(goal)
        self.joint_limits = joint_limits
        self.expand_dis = expand_dis
        self.path_resolution = path_resolution
        self.max_iter = max_iter
        self.goal_sample_rate = goal_sample_rate
        self.start_tree = [self.start]
        self.end_tree = [self.end]

    def planning(self, model):
        for i in range(self.max_iter):
            # Extend the current start tree toward a random sample
            rnd_node = self.get_random_node()
            new_node = self.extend_tree(self.start_tree, rnd_node, model)
            if new_node:
                # Try to connect the other tree to the new node
                nearest_end = self.get_nearest_node(self.end_tree, new_node)
                connect_node = self.connect_tree(self.end_tree, new_node, nearest_end, model)
                if connect_node:
                    return self.generate_final_course(new_node, connect_node)
            # Swap the roles of the two trees (bidirectional RRT)
            self.start_tree, self.end_tree = self.end_tree, self.start_tree
        return None

    def get_nearest_node_index(self, node_list, rnd_node):
        """
        Find the index of the nearest node to the random node.

        Args:
            node_list: List of nodes in the RRT tree.
            rnd_node: Randomly generated node.
        Returns:
            Index of the nearest node in the node list.
        """
        dlist = [np.linalg.norm(np.array(node.q) - np.array(rnd_node.q)) for node in node_list]
        return dlist.index(min(dlist))

    def steer(self, from_node, to_node, extend_length=float("inf")):
        new_node = self.Node(np.array(from_node.q))
        distance = np.linalg.norm(np.array(to_node.q) - np.array(from_node.q))
        if distance == 0:  # already at the target; avoid dividing by zero below
            new_node.parent = from_node
            return new_node
        if extend_length > distance:
            extend_length = distance
        num_steps = int(extend_length / self.path_resolution)
        delta_q = (np.array(to_node.q) - np.array(from_node.q)) / distance
        for i in range(num_steps):
            new_q = new_node.q + delta_q * self.path_resolution
            new_node.q = np.clip(new_q,
                                 [lim[0] for lim in self.joint_limits],
                                 [lim[1] for lim in self.joint_limits])
            new_node.path_q.append(new_node.q)
        new_node.parent = from_node
        return new_node

    def extend_tree(self, tree, target_node, model):
        nearest_node = self.get_nearest_node(tree, target_node)
        new_node = self.steer(nearest_node, target_node, self.expand_dis)
        if self.check_collision(new_node, model):
            tree.append(new_node)
            return new_node
        return None

    def connect_tree(self, tree, target_node, nearest_node, model):
        new_node = nearest_node
        while True:
            next_node = self.steer(new_node, target_node, self.expand_dis)
            if not self.check_collision(next_node, model):
                return None
            tree.append(next_node)
            if self.calc_dist(next_node.q, target_node.q) < self.path_resolution:
                return next_node
            new_node = next_node

    def get_nearest_node(self, tree, rnd_node):
        dlist = [np.linalg.norm(np.array(node.q) - np.array(rnd_node.q)) for node in tree]
        return tree[dlist.index(min(dlist))]

    def calc_dist(self, q1, q2):
        return np.linalg.norm(np.array(q1) - np.array(q2))

    def get_random_node(self):
        # Goal-biased uniform sampling inside the joint limits
        if random.randint(0, 100) > self.goal_sample_rate:
            rand_q = [random.uniform(lo, hi) for lo, hi in self.joint_limits]
        else:
            rand_q = self.end.q
        return self.Node(rand_q)

    def check_collision(self, node, model):
        # Returns True when the configuration is collision-free (see helper below)
        return check_collision_with_dm_control(model, node.q)

    def generate_final_course(self, start_node, end_node):
        # Stitch the two branches together by walking from the connection
        # point back to each tree root.
        path = []
        node = start_node
        while node.parent is not None:
            path.append(node.q)
            node = node.parent
        path.append(node.q)  # root of the first branch
        path = path[::-1]
        node = end_node
        while node.parent is not None:
            path.append(node.q)
            node = node.parent
        path.append(node.q)  # root of the second branch
        # planning() swaps the trees every iteration, so the stitched path may
        # run goal-to-start; flip it so it always starts at the true start.
        if self.calc_dist(path[0], self.start.q) > self.calc_dist(path[-1], self.start.q):
            path = path[::-1]
        return path

    def calc_dist_to_goal(self, q):
        return np.linalg.norm(np.array(self.end.q) - np.array(q))
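# A minimal standalone sketch of the planner interface (hypothetical, not part
# of the original pipeline): the collision check is stubbed to always pass, so
# this exercises only the RRT bookkeeping, not the simulation.
#
#   toy = RRT(start=[0.0] * 6, goal=[0.5] * 6, joint_limits=[(-3.14, 3.14)] * 6)
#   toy.check_collision = lambda node, model: True  # stub: treat all states as free
#   toy_path = toy.planning(model=None)
#   print("toy path waypoints:", 0 if toy_path is None else len(toy_path))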
def get_depth(sim):
    # Depth is a float array, in meters.
    depth = sim.render(camera_id=0, height=480, width=640, depth=True)
    # Shift nearest values to the origin.
    depth -= depth.min()
    # Scale by 2 mean distances of near rays.
    depth /= 2 * depth[depth <= 1].mean()
    # Scale to [0, 255] and convert to an 8-bit grayscale image.
    pixels = 255 * np.clip(depth, 0, 1)
    return PIL.Image.fromarray(pixels.astype(np.uint8))
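# get_depth is not called anywhere in this script; a hypothetical use, saving a
# depth snapshot of the current scene, would look like:
#
#   get_depth(model).save("depth_snapshot.png")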
def check_collision_with_dm_control(model, joint_config):
    """
    Check whether a given joint configuration is collision-free, using
    dm_control's contact data.

    Args:
        model: dm_control MuJoCo physics object
        joint_config: list of joint angles to check
    Returns:
        True if there are no contacts, or if the gripper fingers are touching
        the banana (that contact is the point of grasping, so it is not
        treated as a collision); False otherwise.
    """
    model.data.qpos[0:6] = joint_config  # set joint positions
    model.forward()  # update the simulation state
    contacts = model.data.ncon  # number of active contacts
    return contacts == 0 or check_gripper_collision(model)
def check_gripper_collision(model):
    all_contact_pairs = []
    for i_contact in range(model.data.ncon):
        id_geom_1 = model.data.contact[i_contact].geom1
        id_geom_2 = model.data.contact[i_contact].geom2
        name_geom_1 = model.model.id2name(id_geom_1, 'geom')
        name_geom_2 = model.model.id2name(id_geom_2, 'geom')
        all_contact_pairs.append((name_geom_1, name_geom_2))
    touch_banana_right = ("a1_right/a1_8_gripper_finger_touch_right", "banana_collision") in all_contact_pairs
    touch_banana_left = ("a1_right/a1_8_gripper_finger_touch_left", "banana_collision") in all_contact_pairs
    return touch_banana_left or touch_banana_right
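# Debugging aid (hypothetical, not in the original script): when tuning the
# contact filter above, the active contact pairs can be inspected directly:
#
#   model.forward()
#   for i in range(model.data.ncon):
#       c = model.data.contact[i]
#       print(model.model.id2name(c.geom1, 'geom'), '<->',
#             model.model.id2name(c.geom2, 'geom'))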
def record_frame(model, out, width, height):
    # Render from both cameras, concatenate side by side, and write the frame
    # to the video (converted from RGB to BGR for OpenCV).
    frame_1 = model.render(camera_id=0, width=width, height=height)
    frame_2 = model.render(camera_id=1, width=width, height=height)
    frame_combined = np.concatenate((frame_1, frame_2), axis=1)
    out.write(cv2.cvtColor(frame_combined, cv2.COLOR_RGB2BGR))


def apply_rrt_path_to_dm_control(model, path, video_name="rrt_robot_motion_1.mp4"):
    """
    Apply the RRT-generated path (a list of joint configurations) to the
    dm_control simulation while recording the frames into a video, then move
    down to the grasp, close the gripper, and lift.

    Uses the module-level globals: start, target_position,
    target_orientation_euler, my_chain, and close_gripper().

    Args:
        model: dm_control MuJoCo physics object
        path: list of joint configurations generated by the RRT planner
        video_name: name of the output video file
    """
    # Set up video recording: two 640x480 camera views side by side
    width, height = 640, 480
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # mp4 codec
    out = cv2.VideoWriter(video_name, fourcc, 20.0, (1280, 480))

    # Set the initial joint angles
    model.data.qpos[0:6] = start
    model.forward()

    # Phase 1: follow the RRT path to the hover pose above the banana
    for q in path:
        model.data.ctrl[0:6] = q  # command the joint angles
        record_frame(model, out, width, height)
        model.step()
    # Extra steps to let the arm settle at the hover pose
    for i in range(30):
        record_frame(model, out, width, height)
        model.step()

    # Phase 2: move down to the grasping pose
    # Copy qpos so later simulation steps don't mutate our reference.
    start_joints_down = model.data.qpos[0:6].copy()
    # Copy target_position so the global list is not modified in place.
    target_position_down = list(target_position)
    # TODO adjust the downward path; this is the final grasping position
    target_position_down[2] = target_position[2] - 0.15
    target_orientation_down = tf.euler_matrix(*target_orientation_euler)[:3, :3]
    joint_angles_down = my_chain.inverse_kinematics(target_position_down, target_orientation_down, "all")
    # Interpolation factors; num is the number of interpolation steps
    num_interpolations = 18
    t_values = np.linspace(0, 1, num=num_interpolations)
    # Linearly interpolate the joint trajectory between start and goal; in
    # simulation there is no need to time-parameterize it.
    interpolated_lists_down = [
        (1 - t) * start_joints_down + t * joint_angles_down[1:]
        for t in t_values
    ]
    if interpolated_lists_down:
        print("down path found")
    for q in interpolated_lists_down:
        model.data.ctrl[0:6] = q  # command the joint angles
        record_frame(model, out, width, height)
        model.step()

    # Phase 3: the path has been found and rendered; now close the gripper
    # and let the grasp settle
    close_gripper()
    for i in range(30):
        record_frame(model, out, width, height)
        model.step()

    # Phase 4: lift the banana
    start_joints_up = model.data.qpos[0:6].copy()
    target_position_up = list(target_position_down)
    target_position_up[2] = target_position_down[2] + 0.2
    target_orientation_up = tf.euler_matrix(*target_orientation_euler)[:3, :3]
    joint_angles_up = my_chain.inverse_kinematics(target_position_up, target_orientation_up, "all")
    interpolated_lists_up = [
        (1 - t) * start_joints_up + t * joint_angles_up[1:]
        for t in t_values
    ]
    if interpolated_lists_up:
        print("up path found")
    for q in interpolated_lists_up:
        model.data.ctrl[0:6] = q  # command the joint angles
        record_frame(model, out, width, height)
        model.step()
    for i in range(60):
        record_frame(model, out, width, height)
        model.step()

    # Release the video writer
    out.release()
    print(f"Video saved as {video_name}")
def close_gripper():
    # Close the gripper (ctrl indices 6 and 7 drive the two fingers)
    model.data.ctrl[6] = -0.15
    model.data.ctrl[7] = 0.15


def open_gripper():
    # Open the gripper
    model.data.ctrl[6] = 0
    model.data.ctrl[7] = 0
# Example usage:
start = [0., 0., 0., 0., 0., 0.]  # start joint angles
# TODO: target in Cartesian x=0.3, y=0.4 ✔
target_position = [0.285, 0.4, 0.3]
target_orientation_euler = [0., 0., 0.]
target_orientation = tf.euler_matrix(*target_orientation_euler)[:3, :3]
if random_position:
    # Hover target above the sampled banana position, with a small -0.01 m x offset
    target_position = [x_pos - 0.01, y_pos, 0.3]
    target_orientation_euler = [0., 0., 0.]
    target_orientation = tf.euler_matrix(*target_orientation_euler)[:3, :3]

# Inverse kinematics for the Cartesian target
joint_angles = my_chain.inverse_kinematics(target_position, target_orientation, "all")

# Goal configuration and joint limits (ikpy index 0 is the fixed base link)
goal = joint_angles[1:]
print("goal", goal)
joint_limits = [(-3, 3)] * 6  # example joint limits
joint_limits[2] = (-3, 0)  # elbow
joint_limits[3] = (-1.5, 1.5)  # forearm_roll
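# Optional sanity check (a minimal sketch, not in the original script): RRT
# samples only inside joint_limits, so warn if the IK goal falls outside them.
for i, (lo, hi) in enumerate(joint_limits):
    if not (lo <= goal[i] <= hi):
        print(f"warning: goal joint {i} = {goal[i]:.3f} outside limits ({lo}, {hi})")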
# TODO: initialize the RRT planner and generate the RRT path ✔
rrt = RRT(start, goal, joint_limits)
rrt_path = rrt.planning(model)  # generate the RRT path

# Apply the path to the MuJoCo simulation and record a video
if rrt_path:
    print("Path found!")
    # TODO: open the gripper ✔
    open_gripper()
    # Apply the RRT path
    apply_rrt_path_to_dm_control(model, rrt_path, video_name="rrt_robot_motion_gripper.mp4")
else:
    print("No path found!")