Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added trajectory splitting functions. #290

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 90 additions & 3 deletions src/prpy/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,92 @@ def CopyTrajectory(traj, env=None):
return copy_traj


def GetTrajectoryHead(trajectory, t_split):
"""
Split the given trajectory at the given time and return the head.

The head trajectory is a partial copy of the original.

@param trajectory the trajectory that will be split
@param t_split the time at which to split the trajectory
@return a trajectory that goes from t=0 to t=split
"""
if not IsTimedTrajectory(trajectory):
raise ValueError("Cannot split untimed trajectory at a time.")

if not (0. <= t_split <= trajectory.GetDuration()):
raise ValueError("Cannot split trajectory of duration {:f} at t={:f}."
.format(trajectory.GetDuration(), t_split))

# Find the intermediate point and index of the split time.
split_waypoint = trajectory.Sample(t_split)
split_idx = trajectory.GetFirstWaypointIndexAfterTime(t_split)

# Create a trajectory from the beginning to the split.
head = openravepy.RaveCreateTrajectory(trajectory.GetEnv(),
trajectory.GetXMLId())
head.Init(trajectory.GetConfigurationSpecification())
head.Insert(0, trajectory.GetWaypoints(0, split_idx))
head.Insert(head.GetNumWaypoints(), split_waypoint)
return head


def GetTrajectoryTail(trajectory, t_split):
"""
Split the given trajectory at the given time and returns the tail.

The tail trajectory is a partial copy of the original.

@param trajectory the trajectory that will be split
@param t_split the time at which to split the trajectory
@return a trajectory that goes from t=split to t=end
"""
if not IsTimedTrajectory(trajectory):
raise ValueError("Cannot split untimed trajectory at a time.")

if not (0. <= t_split <= trajectory.GetDuration()):
raise ValueError("Cannot split trajectory of duration {:f} at t={:f}."
.format(trajectory.GetDuration(), t_split))

# Find the intermediate point and index of the split time.
split_waypoint = trajectory.Sample(t_split)
split_idx = trajectory.GetFirstWaypointIndexAfterTime(t_split)

# Correct for the delta-time offset of the split waypoint.
cspec = trajectory.GetConfigurationSpecification()
split_delta = cspec.ExtractDeltaTime(split_waypoint)
cspec.InsertDeltaTime(split_waypoint, 0.)

# Create a trajectory from the split to the end.
tail = openravepy.RaveCreateTrajectory(trajectory.GetEnv(),
trajectory.GetXMLId())
tail.Init(trajectory.GetConfigurationSpecification())
tail.Insert(0, split_waypoint)
tail.Insert(1, trajectory.GetWaypoints(split_idx,
trajectory.GetNumWaypoints()))

# Correct the delta-time of the first waypoint in the tail.
waypoint = tail.GetWaypoint(1)
waypoint_delta = cspec.ExtractDeltaTime(waypoint)
cspec.InsertDeltaTime(waypoint, waypoint_delta - split_delta)
tail.Insert(1, waypoint, True)
return tail


def SplitTrajectory(trajectory, t_split):
"""
Split the given trajectory at the given time and returns the head and tail.

The head and tail trajectories are copies of the original.

@param trajectory the trajectory that will be split
@param t_split the time at which to split the trajectory
@return a trajectory tuple of (head, tail)
"""
return (GetTrajectoryHead(trajectory, t_split),
GetTrajectoryTail(trajectory, t_split))


def GetTrajectoryTags(traj):
""" Read key/value pairs from a trajectory.

Expand Down Expand Up @@ -1137,6 +1223,7 @@ def ComputeEnabledAABB(kinbody):
half_extents = (max_corner - min_corner) / 2.
return AABB(center, half_extents)


def UntimeTrajectory(trajectory, env=None):
"""
Returns an untimed copy of the provided trajectory.
Expand Down Expand Up @@ -1617,7 +1704,7 @@ def GetLinearCollisionCheckPts(robot, traj, norm_order=2, sampling_func=None):
inf ==> The L_infinity norm
@param generator sampling_func A function that returns a sequence of
sample times.
e.g. SampleTimeGenerator()
e.g. SampleTimeGenerator()
or
VanDerCorputSampleGenerator()

Expand Down Expand Up @@ -1684,7 +1771,7 @@ def GetLinearCollisionCheckPts(robot, traj, norm_order=2, sampling_func=None):
q1 = traj_cspec.ExtractJointValues(waypoint, robot, dof_indices)
dq = numpy.abs(q1 - q0)
max_diff_float = numpy.max( numpy.abs(q1 - q0) / q_resolutions)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pre-emptive explanation: this variable was totally unused and my auto-linter compelled me to destroy it.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My auto-linter often compels me to destroy things gently muttering "Now I am become Death, the destroyer of worlds."

On Apr 4, 2016, at 11:05 AM, Pras Velagapudi [email protected] wrote:

In src/prpy/util.py:

@@ -1683,8 +1770,7 @@ def GetLinearCollisionCheckPts(robot, traj, norm_order=2, sampling_func=None):
waypoint = traj.GetWaypoint(i)
q1 = traj_cspec.ExtractJointValues(waypoint, robot, dof_indices)
dq = numpy.abs(q1 - q0)

- max_diff_float = numpy.max( numpy.abs(q1 - q0) / q_resolutions)

Pre-emptive explanation: this variable was totally unused and my auto-linter compelled me to destroy it.


You are receiving this because you are subscribed to this thread.
Reply to this email directly or view it on GitHub

# Get the number of steps (as a float) required for
# each joint at DOF resolution
num_steps = dq / q_resolutions
Expand Down Expand Up @@ -1717,7 +1804,7 @@ def GetLinearCollisionCheckPts(robot, traj, norm_order=2, sampling_func=None):

# Sample the trajectory using the specified sample generator
seq = None
if sampling_func == None:
if sampling_func is None:
# (default) Linear sequence, from start to end
seq = SampleTimeGenerator(0, traj_duration, step=2)
else:
Expand Down