#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import gymnasium as gym
import numpy as np
import random
import matplotlib.pyplot as plt
'''
This class implements a multi-agent environment for path following.
'''
class MultiAgentPathFollowingEnv(gym.Env):
    def __init__(self, num_agents=20, buffer_size=1000):
        super().__init__()
        self.num_agents = num_agents
        self.buffer_size = buffer_size
        self.replay_buffer = []

        self.action_space = gym.spaces.Box(
            low=np.array([[-0.5, 0.1]] * num_agents),
            high=np.array([[0.5, 1.0]] * num_agents),
            dtype=np.float32
        )
        self.observation_space = gym.spaces.Box(
            low=-np.inf,
            high=np.inf,
            shape=(num_agents, 4),
            dtype=np.float32
        )

        self.reset()
    def reset(self, seed=None, options=None):
        '''
        Reset the environment to the initial state.
        '''
        super().reset(seed=seed)
        self.positions = np.zeros((self.num_agents, 2))
        self.orientations = np.zeros(self.num_agents)
        self.path = self.generate_path()
        self.time = 0

        # Reward components and episode flags.
        self.r_speed = 0.0
        self.r_forward = 0.0
        self.r_distance = 0.0
        self.r_angle = 0.0
        self.failed = False
        self.success = False

        self.active_agents = np.ones(self.num_agents, dtype=bool)
        return self.get_state(), {}
    def get_state(self):
        '''
        Get the current state of the environment:
        (x, y, distance_to_path, angle_to_path) for each agent.
        '''
        distances, angles = self.calculate_path_metrics()
        return np.column_stack((self.positions, distances, angles))
    def step(self, actions):
        '''
        Step the environment forward using the given actions.
        '''
        actions = np.asarray(actions, dtype=np.float32).reshape((self.num_agents, 2))
        state = self.get_state()

        self.update_positions(actions)
        distances, angles = self.calculate_path_metrics()
        speeds = actions[:, 1]
        rewards = self.calculate_rewards(distances, angles, speeds)

        # Deactivate agents that drifted more than 5.0 units from the path;
        # the episode fails when no agent is left active.
        self.active_agents = self.active_agents & (distances <= 5.0)
        self.failed = not np.any(self.active_agents)

        # An agent reaches the goal when it is within 0.2 units of the final
        # path point; the episode succeeds when every agent has reached it.
        reached_goal = np.linalg.norm(self.positions - self.path[-1], axis=1) < 0.2
        self.success = bool(np.all(reached_goal))

        rewards = np.where(reached_goal, rewards + 100.0, rewards)
        if self.failed:
            rewards = rewards - 100.0

        next_state = self.get_state()
        # Store per-agent transitions (state, action, reward, next_state).
        for i in range(self.num_agents):
            self.replay_buffer.append((state[i], actions[i], rewards[i], next_state[i]))
            if len(self.replay_buffer) > self.buffer_size:
                self.replay_buffer.pop(0)

        self.time += 1
        # Gymnasium step API: (obs, reward, terminated, truncated, info).
        terminated = self.success or self.failed
        truncated = self.time > 100
        return next_state, rewards, terminated, truncated, {}
    def render(self, mode='human'):
        if not hasattr(self, 'fig'):
            # Create the plot
            self.fig, self.ax = plt.subplots()

            # Create the path line and one marker per agent
            self.path_plot, = self.ax.plot([], [], label='Path')
            self.agent_plots = [self.ax.plot([], [], 'o', label=f'Agent {i}')[0] for i in range(self.num_agents)]

            # Create front indicators (short red lines showing each agent's heading)
            self.agent_fronts = [self.ax.plot([], [], '-', color='red')[0] for _ in range(self.num_agents)]

            # Set plot limits
            self.ax.set_xlim(-10, 110)
            self.ax.set_ylim(-10, 10)
            self.ax.legend()

            plt.ion()
            plt.show(block=False)

        # Update path
        path_x, path_y = zip(*self.path)
        self.path_plot.set_data(path_x, path_y)

        # Update agent positions and front indicators
        for i, agent_plot in enumerate(self.agent_plots):
            if self.active_agents[i]:
                x, y = self.positions[i]
                agent_plot.set_data([x], [y])  # set_data expects sequences

                # Front indicator: a short line in the direction of the agent's orientation
                front_x = x + 0.5 * np.cos(self.orientations[i])
                front_y = y + 0.5 * np.sin(self.orientations[i])
                self.agent_fronts[i].set_data([x, front_x], [y, front_y])

        self.fig.canvas.draw_idle()
        self.fig.canvas.flush_events()
    def update_positions(self, actions):
        '''
        Update the positions of the agents based on the given actions.
        '''
        actions = actions.reshape((self.num_agents, 2))
        steerings, speeds = actions[:, 0], actions[:, 1]
        self.orientations += steerings * 0.1
        dx = speeds * np.cos(self.orientations)
        dy = speeds * np.sin(self.orientations)
        self.positions += np.column_stack((dx, dy))
    def calculate_path_metrics(self):
        '''
        Calculate the distance and angle between each agent and the path.
        Note: this first version only considers the closest point on the path.
        It may be necessary to use more points of the path to follow it better.
        '''
        # Euclidean distance between each agent and every point in the path
        distances = np.linalg.norm(self.path[:, None, :] - self.positions[None, :, :], axis=2)

        # Index and coordinates of the closest path point for each agent
        closest_indices = np.argmin(distances, axis=0)
        closest_points = self.path[closest_indices]

        # Distance to the closest point
        distance_to_path = distances[closest_indices, np.arange(self.num_agents)]

        # Angle between the direction to the closest point and the agent's orientation
        path_directions = np.arctan2(closest_points[:, 1] - self.positions[:, 1],
                                     closest_points[:, 0] - self.positions[:, 0])
        angle_to_path = path_directions - self.orientations
        # Wrap to [-pi, pi] so a small misalignment is not penalized as a full turn
        angle_to_path = np.arctan2(np.sin(angle_to_path), np.cos(angle_to_path))

        return distance_to_path, angle_to_path
    def calculate_rewards(self, distances, angles, speeds):
        '''
        Calculate the reward for each agent based on the distance and angle to the path.
        '''
        # Reward based on the distance to the path.
        # k_distance is the coefficient of the distance reward.
        k_distance = 0.5
        self.r_distance = k_distance * (-distances)

        # Reward based on the angle to the path.
        # k_angle is the coefficient of the angle reward.
        # Penalize the magnitude of the misalignment, regardless of its sign.
        k_angle = 0.3
        self.r_angle = k_angle * (-np.abs(angles))
        # # Bonus for being very close to the path
        # reward[distances < 0.1] += 1.0

        # Penalize agents that are almost stopped (speed below 0.05).
        self.r_speed = self.r_speed - 5.0 * (speeds < 0.05)

        '''
        Reward based on the progress made along the path.
        By: Path Following Optimization for an Underactuated
        USV Using Smoothly-Convergent Deep Reinforcement Learning
        '''
        # Maximum coefficient of the navigation reward
        k_N = 1.0
        # Full path length
        total_path_length = np.linalg.norm(self.path[0] - self.path[-1])
        # Distance traveled from the start of the path
        progress_param = np.linalg.norm(self.positions - self.path[0], axis=1)
        # Normalized progress
        navigational_reward = k_N * (progress_param / total_path_length)
        # Add to the accumulated forward reward
        self.r_forward += navigational_reward

        # print("-----")
        # print("r_distance: ", self.r_distance)
        # print("r_forward: ", self.r_forward)
        # print("r_speed: ", self.r_speed)
        # print("r_angle: ", self.r_angle)

        reward = self.r_distance + self.r_forward + self.r_speed + self.r_angle

        # # High reward for agents that reached the end of the path
        # final_position = self.path[-1]  # Last point of the path
        # reached_goal = np.linalg.norm(self.positions - final_position, axis=1) < 0.2  # Check if the agent got close to the end
        # reward[reached_goal] += 100.0  # High reward to encourage reaching the goal
        # reward = np.maximum(reward, 0.0)

        return reward
    def generate_path(self):
        # Straight horizontal path along the x-axis: points (0, 0) .. (99, 0).
        return np.array([[i, 0.0] for i in range(100)])

    def sample_from_replay_buffer(self, batch_size):
        return random.sample(self.replay_buffer, batch_size) if len(self.replay_buffer) >= batch_size else []
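

if __name__ == '__main__':
    # Minimal usage sketch (not part of the original module): roll out one episode
    # with random actions and render it. It assumes the Gymnasium-style step API
    # used above, i.e. step() returns (obs, reward, terminated, truncated, info);
    # num_agents=3 and batch_size=32 are arbitrary illustrative values.
    env = MultiAgentPathFollowingEnv(num_agents=3)
    obs, info = env.reset()
    terminated = truncated = False
    while not (terminated or truncated):
        actions = env.action_space.sample()
        obs, rewards, terminated, truncated, info = env.step(actions)
        env.render()

    # Once the buffer holds enough transitions, mini-batches can be drawn for training.
    batch = env.sample_from_replay_buffer(batch_size=32)
    print(f'episode finished at t={env.time}, sampled {len(batch)} transitions')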