Module deepcomp.env.util.movement
Utility classes for UE movement. All movement models must inherit from the abstract Movement class.
Expand source code
"""Utility functions for UE movement. Must inherit from abstract Movement class."""
import random
import structlog
import numpy as np
from shapely.geometry import Point
class Movement:
    """
    Abstract base for movement models. Every concrete movement model must subclass it
    and override :meth:`reset` and :meth:`step`.
    """

    def __init__(self, map):
        """
        :param map: Area in which the movement takes place; stored as ``self.map``
        """
        # Each object gets a private RNG for reproducibility: the state of the global ``random``
        # module is shared and manipulated by RL during training.
        self.rng = random.Random()
        self.map = map

    def seed(self, seed=None):
        """
        Seed the private RNG of this movement object.

        :param seed: Seed value that is passed on unchanged to ``self.rng.seed``
        """
        generator = self.rng
        generator.seed(seed)

    def reset(self):
        """Not implemented here; subclasses must override this method."""
        message = "This function must be implemented in the subclass"
        raise NotImplementedError(message)

    def step(self, curr_pos):
        """
        Not implemented here; subclasses must override this method.

        :param curr_pos: Current position (not used by this base implementation)
        """
        message = "This function must be implemented in the subclass"
        raise NotImplementedError(message)
class UniformMovement(Movement):
    """
    Uniformly move with the same speed in one direction. When a step would leave the map, turn around:
    both movement components are negated, so the entity heads back the way it came.
    """

    # Inclusive (min, max) range from which the per-step distance is drawn for each named speed preset
    _SPEED_PRESETS = {'slow': (1, 5), 'fast': (10, 20)}

    def __init__(self, map, move_x=0, move_y=0):
        """
        Create object for uniform movement into given direction. Instantiate new movement object for each UE!

        :param map: Map representing the area of movement
        :param move_x: How far to move in x direction per step. Number or 'slow'/'fast'.
        :param move_y: How far to move in y direction per step. Number or 'slow'/'fast'.
        """
        super().__init__(map)
        self.init_move_x = move_x
        self.init_move_y = move_y
        # actual per-step movement; set by reset() and negated by step() when turning around
        self.move_x = None
        self.move_y = None

    def __str__(self):
        return f"UniformMovement({self.move_x}, {self.move_y})"

    def _sample_move(self, init_move):
        """
        Translate a configured movement into the concrete distance to move per step.

        :param init_move: Number (used as is) or one of the preset names 'slow'/'fast'
        :return: The number itself or a random integer drawn with self.rng from the preset's range
        :raises ValueError: If init_move is a string that is not a known preset
        """
        if not isinstance(init_move, str):
            # assume init_move was a specific number for how to move
            return init_move
        if init_move not in self._SPEED_PRESETS:
            # fail here instead of with an obscure TypeError when the string is added to a coordinate
            raise ValueError(f"Unknown movement preset {init_move!r}; expected a number or one of "
                             f"{sorted(self._SPEED_PRESETS)}")
        low, high = self._SPEED_PRESETS[init_move]
        return self.rng.randint(low, high)

    def reset(self):
        """
        Reset to original movement direction (may change when hitting a map border).

        For the presets 'slow'/'fast', a new random distance is drawn (first for x, then for y).

        :raises ValueError: If a movement was configured as a string other than 'slow'/'fast'
        """
        self.move_x = self._sample_move(self.init_move_x)
        self.move_y = self._sample_move(self.init_move_y)

    def step(self, curr_pos):
        """
        Make a step from the current position into the current direction. Turn around at map borders.

        :param Point curr_pos: Current position
        :returns Point: New position after taking one step. If neither the current nor the reversed
            direction leads to a position within the map, curr_pos is returned unchanged.
        """
        # step() was called before reset(): no movement has been set yet, so initialise it now
        if self.move_x is None or self.move_y is None:
            self.reset()

        # seems like points are immutable --> replace by new point
        new_pos = Point(curr_pos.x + self.move_x, curr_pos.y + self.move_y)
        if new_pos.within(self.map.shape):
            return new_pos

        # reverse movement if otherwise moving out of map
        self.move_x = -self.move_x
        self.move_y = -self.move_y
        new_pos = Point(curr_pos.x + self.move_x, curr_pos.y + self.move_y)
        if not new_pos.within(self.map.shape):
            # the reversed step would leave the map as well (step larger than the map): stay put
            return curr_pos
        return new_pos
class RandomWaypoint(Movement):
    """
    Create random waypoints, move towards them, pause, and move towards new random waypoint
    with new random velocity (within given range)
    """

    # Inclusive (min, max) range from which the velocity is drawn for each named speed preset
    _VELOCITY_PRESETS = {'slow': (1, 3), 'fast': (5, 10)}

    def __init__(self, map, velocity, pause_duration=2, border_buffer=10):
        """
        Create random waypoint movement utility object. Instantiate new movement object for each UE!

        :param map: Map representing the area of movement that must not be left
        :param velocity: Distance to move within one step. Number or 'slow'/'fast'.
        :param pause_duration: Duration [in env steps] to pause after reaching each waypoint
        :param border_buffer: Buffer to the map border in which no waypoints are placed
        """
        super().__init__(map)
        self.init_velocity = velocity
        # velocity and waypoint are chosen in reset()
        self.velocity = None
        self.waypoint = None
        self.pause_duration = pause_duration
        self.pausing = False
        # number of steps already paused at the current waypoint
        self.curr_pause = 0
        assert border_buffer > 0, "Border Buffer must be >0 to avoid placing waypoints on or outside map borders."
        self.border_buffer = border_buffer
        self.log = structlog.get_logger()

    def __str__(self):
        return f"RandomWaypoint({self.init_velocity})"

    def reset(self):
        """
        Reset velocity and waypoint to new random values. Reset current pause.

        :raises ValueError: If the velocity was configured as a string other than 'slow'/'fast'
        """
        if isinstance(self.init_velocity, str):
            if self.init_velocity not in self._VELOCITY_PRESETS:
                # fail here instead of with an obscure TypeError when comparing a distance to a string
                raise ValueError(f"Unknown velocity preset {self.init_velocity!r}; expected a number or one of "
                                 f"{sorted(self._VELOCITY_PRESETS)}")
            low, high = self._VELOCITY_PRESETS[self.init_velocity]
            self.velocity = self.rng.randint(low, high)
        else:
            # a specific number was configured
            self.velocity = self.init_velocity
        self.waypoint = self.random_waypoint()
        self.pausing = False
        self.curr_pause = 0
        self.log.debug('Reset movement', new_velocity=self.velocity, new_waypoint=str(self.waypoint))

    def random_waypoint(self):
        """
        Return a new random waypoint inside the map

        :return: Point with integer coordinates that keep a distance of border_buffer to 0 and to map width/height
        :raises ValueError: If the map is too small to place a waypoint with the configured border_buffer
        """
        max_x = int(self.map.width - self.border_buffer)
        max_y = int(self.map.height - self.border_buffer)
        if max_x < self.border_buffer or max_y < self.border_buffer:
            # randint() would raise a ValueError about an empty range; name the actual cause instead
            raise ValueError(f"Map of size {self.map.width}x{self.map.height} is too small to place waypoints "
                             f"with border_buffer={self.border_buffer}")
        x = self.rng.randint(self.border_buffer, max_x)
        y = self.rng.randint(self.border_buffer, max_y)
        new_waypoint = Point(x, y)
        assert new_waypoint.within(self.map.shape), f"Waypoint {str(new_waypoint)} is outside the map!"
        return new_waypoint

    def step_towards_waypoint(self, curr_pos):
        """
        Take one step from the current position towards the current waypoint.

        :param Point curr_pos: Current position
        :return: New position after moving
        """
        # bind the context to a local logger only; overwriting self.log would attach this step's
        # prev_pos/waypoint to unrelated later messages (eg, 'Reset movement')
        log = self.log.bind(prev_pos=str(curr_pos), waypoint=str(self.waypoint))

        # if already close enough to waypoint, move directly onto waypoint (not past it)
        if curr_pos.distance(self.waypoint) <= self.velocity:
            log.debug('Waypoint reached')
            return self.waypoint

        # else move by self.velocity towards waypoint: https://math.stackexchange.com/a/175906/234077
        # convert points to np arrays/vectors for calculation
        np_curr = np.array([curr_pos.x, curr_pos.y])
        np_waypoint = np.array([self.waypoint.x, self.waypoint.y])
        v = np_waypoint - np_curr
        norm_v = v / np.linalg.norm(v)
        np_new_pos = np_curr + self.velocity * norm_v
        new_pos = Point(np_new_pos)
        log.debug('Step to waypoint', new_pos=str(new_pos))
        return new_pos

    def step(self, curr_pos):
        """
        Perform one movement step: Move towards waypoint, pause, select new waypoint & repeat.

        :param Point curr_pos: Current position of the moving entity (eg, UE)
        :return: New position after one step
        """
        assert curr_pos.within(self.map.shape) or curr_pos.touches(self.map.shape), \
            f"Current position {str(curr_pos)} is outside the map!"

        # step() was called before reset(): there is no waypoint to move to yet, so choose one now
        if self.waypoint is None:
            self.reset()

        # start pausing when reaching the waypoint
        if curr_pos == self.waypoint:
            self.pausing = True

        if self.pausing:
            # continue pausing if pause duration is not yet reached
            if self.curr_pause < self.pause_duration:
                self.curr_pause += 1
                return curr_pos
            # else stop pausing and choose a new waypoint --> reset()
            self.reset()

        # move towards (new) waypoint
        return self.step_towards_waypoint(curr_pos)
Classes
class Movement (map)
-
Abstract movement class that all subclasses must inherit from
Expand source code
class Movement:
    """
    Abstract base for movement models. Every concrete movement model must subclass it
    and override :meth:`reset` and :meth:`step`.
    """

    def __init__(self, map):
        """
        :param map: Area in which the movement takes place; stored as ``self.map``
        """
        # Each object gets a private RNG for reproducibility: the state of the global ``random``
        # module is shared and manipulated by RL during training.
        self.rng = random.Random()
        self.map = map

    def seed(self, seed=None):
        """
        Seed the private RNG of this movement object.

        :param seed: Seed value that is passed on unchanged to ``self.rng.seed``
        """
        generator = self.rng
        generator.seed(seed)

    def reset(self):
        """Not implemented here; subclasses must override this method."""
        message = "This function must be implemented in the subclass"
        raise NotImplementedError(message)

    def step(self, curr_pos):
        """
        Not implemented here; subclasses must override this method.

        :param curr_pos: Current position (not used by this base implementation)
        """
        message = "This function must be implemented in the subclass"
        raise NotImplementedError(message)
Subclasses
Methods
def reset(self)
-
Expand source code
def reset(self):
    """Not implemented in the abstract class; subclasses must override this method."""
    message = "This function must be implemented in the subclass"
    raise NotImplementedError(message)
def seed(self, seed=None)
-
Expand source code
def seed(self, seed=None):
    """
    Seed the private RNG of this movement object.

    :param seed: Seed value that is passed on unchanged to ``self.rng.seed``
    """
    generator = self.rng
    generator.seed(seed)
def step(self, curr_pos)
-
Expand source code
def step(self, curr_pos):
    """
    Not implemented in the abstract class; subclasses must override this method.

    :param curr_pos: Current position (not used by this base implementation)
    """
    message = "This function must be implemented in the subclass"
    raise NotImplementedError(message)
class RandomWaypoint (map, velocity, pause_duration=2, border_buffer=10)
-
Create random waypoints, move towards them, pause, and move towards new random waypoint with new random velocity (within given range)
Create random waypoint movement utility object. Instantiate new movement object for each UE!
:param map: Map representing the area of movement that must not be left :param velocity: Distance to move within one step. Number or 'slow'/'fast'. :param pause_duration: Duration [in env steps] to pause after reaching each waypoint :param border_buffer: Buffer to the map border in which no waypoints are placed
Expand source code
class RandomWaypoint(Movement):
    """
    Create random waypoints, move towards them, pause, and move towards new random waypoint
    with new random velocity (within given range)
    """

    # Inclusive (min, max) range from which the velocity is drawn for each named speed preset
    _VELOCITY_PRESETS = {'slow': (1, 3), 'fast': (5, 10)}

    def __init__(self, map, velocity, pause_duration=2, border_buffer=10):
        """
        Create random waypoint movement utility object. Instantiate new movement object for each UE!

        :param map: Map representing the area of movement that must not be left
        :param velocity: Distance to move within one step. Number or 'slow'/'fast'.
        :param pause_duration: Duration [in env steps] to pause after reaching each waypoint
        :param border_buffer: Buffer to the map border in which no waypoints are placed
        """
        super().__init__(map)
        self.init_velocity = velocity
        # velocity and waypoint are chosen in reset()
        self.velocity = None
        self.waypoint = None
        self.pause_duration = pause_duration
        self.pausing = False
        # number of steps already paused at the current waypoint
        self.curr_pause = 0
        assert border_buffer > 0, "Border Buffer must be >0 to avoid placing waypoints on or outside map borders."
        self.border_buffer = border_buffer
        self.log = structlog.get_logger()

    def __str__(self):
        return f"RandomWaypoint({self.init_velocity})"

    def reset(self):
        """
        Reset velocity and waypoint to new random values. Reset current pause.

        :raises ValueError: If the velocity was configured as a string other than 'slow'/'fast'
        """
        if isinstance(self.init_velocity, str):
            if self.init_velocity not in self._VELOCITY_PRESETS:
                # fail here instead of with an obscure TypeError when comparing a distance to a string
                raise ValueError(f"Unknown velocity preset {self.init_velocity!r}; expected a number or one of "
                                 f"{sorted(self._VELOCITY_PRESETS)}")
            low, high = self._VELOCITY_PRESETS[self.init_velocity]
            self.velocity = self.rng.randint(low, high)
        else:
            # a specific number was configured
            self.velocity = self.init_velocity
        self.waypoint = self.random_waypoint()
        self.pausing = False
        self.curr_pause = 0
        self.log.debug('Reset movement', new_velocity=self.velocity, new_waypoint=str(self.waypoint))

    def random_waypoint(self):
        """
        Return a new random waypoint inside the map

        :return: Point with integer coordinates that keep a distance of border_buffer to 0 and to map width/height
        :raises ValueError: If the map is too small to place a waypoint with the configured border_buffer
        """
        max_x = int(self.map.width - self.border_buffer)
        max_y = int(self.map.height - self.border_buffer)
        if max_x < self.border_buffer or max_y < self.border_buffer:
            # randint() would raise a ValueError about an empty range; name the actual cause instead
            raise ValueError(f"Map of size {self.map.width}x{self.map.height} is too small to place waypoints "
                             f"with border_buffer={self.border_buffer}")
        x = self.rng.randint(self.border_buffer, max_x)
        y = self.rng.randint(self.border_buffer, max_y)
        new_waypoint = Point(x, y)
        assert new_waypoint.within(self.map.shape), f"Waypoint {str(new_waypoint)} is outside the map!"
        return new_waypoint

    def step_towards_waypoint(self, curr_pos):
        """
        Take one step from the current position towards the current waypoint.

        :param Point curr_pos: Current position
        :return: New position after moving
        """
        # bind the context to a local logger only; overwriting self.log would attach this step's
        # prev_pos/waypoint to unrelated later messages (eg, 'Reset movement')
        log = self.log.bind(prev_pos=str(curr_pos), waypoint=str(self.waypoint))

        # if already close enough to waypoint, move directly onto waypoint (not past it)
        if curr_pos.distance(self.waypoint) <= self.velocity:
            log.debug('Waypoint reached')
            return self.waypoint

        # else move by self.velocity towards waypoint: https://math.stackexchange.com/a/175906/234077
        # convert points to np arrays/vectors for calculation
        np_curr = np.array([curr_pos.x, curr_pos.y])
        np_waypoint = np.array([self.waypoint.x, self.waypoint.y])
        v = np_waypoint - np_curr
        norm_v = v / np.linalg.norm(v)
        np_new_pos = np_curr + self.velocity * norm_v
        new_pos = Point(np_new_pos)
        log.debug('Step to waypoint', new_pos=str(new_pos))
        return new_pos

    def step(self, curr_pos):
        """
        Perform one movement step: Move towards waypoint, pause, select new waypoint & repeat.

        :param Point curr_pos: Current position of the moving entity (eg, UE)
        :return: New position after one step
        """
        assert curr_pos.within(self.map.shape) or curr_pos.touches(self.map.shape), \
            f"Current position {str(curr_pos)} is outside the map!"

        # step() was called before reset(): there is no waypoint to move to yet, so choose one now
        if self.waypoint is None:
            self.reset()

        # start pausing when reaching the waypoint
        if curr_pos == self.waypoint:
            self.pausing = True

        if self.pausing:
            # continue pausing if pause duration is not yet reached
            if self.curr_pause < self.pause_duration:
                self.curr_pause += 1
                return curr_pos
            # else stop pausing and choose a new waypoint --> reset()
            self.reset()

        # move towards (new) waypoint
        return self.step_towards_waypoint(curr_pos)
Ancestors
Methods
def random_waypoint(self)
-
Return a new random waypoint inside the map
Expand source code
def random_waypoint(self):
    """
    Return a new random waypoint inside the map

    :return: Point with integer coordinates that keep a distance of border_buffer to 0 and to map width/height
    :raises ValueError: If the map is too small to place a waypoint with the configured border_buffer
    """
    max_x = int(self.map.width - self.border_buffer)
    max_y = int(self.map.height - self.border_buffer)
    if max_x < self.border_buffer or max_y < self.border_buffer:
        # randint() would raise a ValueError about an empty range; name the actual cause instead
        raise ValueError(f"Map of size {self.map.width}x{self.map.height} is too small to place waypoints "
                         f"with border_buffer={self.border_buffer}")
    x = self.rng.randint(self.border_buffer, max_x)
    y = self.rng.randint(self.border_buffer, max_y)
    new_waypoint = Point(x, y)
    assert new_waypoint.within(self.map.shape), f"Waypoint {str(new_waypoint)} is outside the map!"
    return new_waypoint
def reset(self)
-
Reset velocity and waypoint to new random values. Reset current pause.
Expand source code
def reset(self):
    """
    Reset velocity and waypoint to new random values. Reset current pause.

    :raises ValueError: If the velocity was configured as a string other than 'slow'/'fast'
    """
    # inclusive (min, max) range from which the velocity is drawn for each named speed preset
    presets = {'slow': (1, 3), 'fast': (5, 10)}
    if isinstance(self.init_velocity, str):
        if self.init_velocity not in presets:
            # fail here instead of with an obscure TypeError when comparing a distance to a string
            raise ValueError(f"Unknown velocity preset {self.init_velocity!r}; expected a number or one of "
                             f"{sorted(presets)}")
        low, high = presets[self.init_velocity]
        self.velocity = self.rng.randint(low, high)
    else:
        # a specific number was configured
        self.velocity = self.init_velocity
    self.waypoint = self.random_waypoint()
    self.pausing = False
    self.curr_pause = 0
    self.log.debug('Reset movement', new_velocity=self.velocity, new_waypoint=str(self.waypoint))
def step(self, curr_pos)
-
Perform one movement step: Move towards waypoint, pause, select new waypoint & repeat.
:param Point curr_pos: Current position of the moving entity (eg, UE) :return: New position after one step
Expand source code
def step(self, curr_pos):
    """
    Perform one movement step: Move towards waypoint, pause, select new waypoint & repeat.

    :param Point curr_pos: Current position of the moving entity (eg, UE)
    :return: New position after one step
    """
    assert curr_pos.within(self.map.shape) or curr_pos.touches(self.map.shape), \
        f"Current position {str(curr_pos)} is outside the map!"

    # step() was called before reset(): there is no waypoint to move to yet, so choose one now
    if self.waypoint is None:
        self.reset()

    # start pausing when reaching the waypoint
    if curr_pos == self.waypoint:
        self.pausing = True

    if self.pausing:
        # continue pausing if pause duration is not yet reached
        if self.curr_pause < self.pause_duration:
            self.curr_pause += 1
            return curr_pos
        # else stop pausing and choose a new waypoint --> reset()
        self.reset()

    # move towards (new) waypoint
    return self.step_towards_waypoint(curr_pos)
def step_towards_waypoint(self, curr_pos)
-
Take one step from the current position towards the current waypoint.
:param Point curr_pos: Current position :return: New position after moving
Expand source code
def step_towards_waypoint(self, curr_pos):
    """
    Take one step from the current position towards the current waypoint.

    :param Point curr_pos: Current position
    :return: New position after moving
    """
    # bind the context to a local logger only; overwriting self.log would attach this step's
    # prev_pos/waypoint to unrelated later messages (eg, 'Reset movement')
    log = self.log.bind(prev_pos=str(curr_pos), waypoint=str(self.waypoint))

    # if already close enough to waypoint, move directly onto waypoint (not past it)
    if curr_pos.distance(self.waypoint) <= self.velocity:
        log.debug('Waypoint reached')
        return self.waypoint

    # else move by self.velocity towards waypoint: https://math.stackexchange.com/a/175906/234077
    # convert points to np arrays/vectors for calculation
    np_curr = np.array([curr_pos.x, curr_pos.y])
    np_waypoint = np.array([self.waypoint.x, self.waypoint.y])
    v = np_waypoint - np_curr
    norm_v = v / np.linalg.norm(v)
    np_new_pos = np_curr + self.velocity * norm_v
    new_pos = Point(np_new_pos)
    log.debug('Step to waypoint', new_pos=str(new_pos))
    return new_pos
class UniformMovement (map, move_x=0, move_y=0)
-
Uniformly move with the same speed in one direction. When a step would leave the map, the movement is reversed in both x and y direction, so the UE "bounces" straight back the way it came.
Create object for uniform movement into given direction. Instantiate new movement object for each UE!
:param map: Map representing the area of movement :param move_x: How far to move in x direction per step. Number or 'slow'/'fast'. :param move_y: How far to move in y direction per step. Number or 'slow'/'fast'.
Expand source code
class UniformMovement(Movement):
    """
    Uniformly move with the same speed in one direction. When a step would leave the map, turn around:
    both movement components are negated, so the entity heads back the way it came.
    """

    # Inclusive (min, max) range from which the per-step distance is drawn for each named speed preset
    _SPEED_PRESETS = {'slow': (1, 5), 'fast': (10, 20)}

    def __init__(self, map, move_x=0, move_y=0):
        """
        Create object for uniform movement into given direction. Instantiate new movement object for each UE!

        :param map: Map representing the area of movement
        :param move_x: How far to move in x direction per step. Number or 'slow'/'fast'.
        :param move_y: How far to move in y direction per step. Number or 'slow'/'fast'.
        """
        super().__init__(map)
        self.init_move_x = move_x
        self.init_move_y = move_y
        # actual per-step movement; set by reset() and negated by step() when turning around
        self.move_x = None
        self.move_y = None

    def __str__(self):
        return f"UniformMovement({self.move_x}, {self.move_y})"

    def _sample_move(self, init_move):
        """
        Translate a configured movement into the concrete distance to move per step.

        :param init_move: Number (used as is) or one of the preset names 'slow'/'fast'
        :return: The number itself or a random integer drawn with self.rng from the preset's range
        :raises ValueError: If init_move is a string that is not a known preset
        """
        if not isinstance(init_move, str):
            # assume init_move was a specific number for how to move
            return init_move
        if init_move not in self._SPEED_PRESETS:
            # fail here instead of with an obscure TypeError when the string is added to a coordinate
            raise ValueError(f"Unknown movement preset {init_move!r}; expected a number or one of "
                             f"{sorted(self._SPEED_PRESETS)}")
        low, high = self._SPEED_PRESETS[init_move]
        return self.rng.randint(low, high)

    def reset(self):
        """
        Reset to original movement direction (may change when hitting a map border).

        For the presets 'slow'/'fast', a new random distance is drawn (first for x, then for y).

        :raises ValueError: If a movement was configured as a string other than 'slow'/'fast'
        """
        self.move_x = self._sample_move(self.init_move_x)
        self.move_y = self._sample_move(self.init_move_y)

    def step(self, curr_pos):
        """
        Make a step from the current position into the current direction. Turn around at map borders.

        :param Point curr_pos: Current position
        :returns Point: New position after taking one step. If neither the current nor the reversed
            direction leads to a position within the map, curr_pos is returned unchanged.
        """
        # step() was called before reset(): no movement has been set yet, so initialise it now
        if self.move_x is None or self.move_y is None:
            self.reset()

        # seems like points are immutable --> replace by new point
        new_pos = Point(curr_pos.x + self.move_x, curr_pos.y + self.move_y)
        if new_pos.within(self.map.shape):
            return new_pos

        # reverse movement if otherwise moving out of map
        self.move_x = -self.move_x
        self.move_y = -self.move_y
        new_pos = Point(curr_pos.x + self.move_x, curr_pos.y + self.move_y)
        if not new_pos.within(self.map.shape):
            # the reversed step would leave the map as well (step larger than the map): stay put
            return curr_pos
        return new_pos
Ancestors
Methods
def reset(self)
-
Reset to original movement direction (may change when hitting a map border)
Expand source code
def reset(self):
    """
    Reset to original movement direction (may change when hitting a map border).

    For the presets 'slow'/'fast', a new random distance is drawn (first for x, then for y).

    :raises ValueError: If a movement was configured as a string other than 'slow'/'fast'
    """
    # inclusive (min, max) range from which the per-step distance is drawn for each named speed preset
    presets = {'slow': (1, 5), 'fast': (10, 20)}

    def sample(init_move):
        # translate a configured movement (number or preset name) into a concrete distance
        if not isinstance(init_move, str):
            # assume init_move was a specific number for how to move
            return init_move
        if init_move not in presets:
            # fail here instead of with an obscure TypeError when the string is added to a coordinate
            raise ValueError(f"Unknown movement preset {init_move!r}; expected a number or one of "
                             f"{sorted(presets)}")
        low, high = presets[init_move]
        return self.rng.randint(low, high)

    self.move_x = sample(self.init_move_x)
    self.move_y = sample(self.init_move_y)
def step(self, curr_pos)
-
Make a step from the current position into the given direction. Bounce off at map borders.
:param Point curr_pos: Current position :returns Point: New position after taking one step
Expand source code
def step(self, curr_pos):
    """
    Make a step from the current position into the current direction. Turn around at map borders.

    :param Point curr_pos: Current position
    :returns Point: New position after taking one step. If neither the current nor the reversed
        direction leads to a position within the map, curr_pos is returned unchanged.
    """
    # step() was called before reset(): no movement has been set yet, so initialise it now
    if self.move_x is None or self.move_y is None:
        self.reset()

    # seems like points are immutable --> replace by new point
    new_pos = Point(curr_pos.x + self.move_x, curr_pos.y + self.move_y)
    if new_pos.within(self.map.shape):
        return new_pos

    # reverse movement if otherwise moving out of map
    self.move_x = -self.move_x
    self.move_y = -self.move_y
    new_pos = Point(curr_pos.x + self.move_x, curr_pos.y + self.move_y)
    if not new_pos.within(self.map.shape):
        # the reversed step would leave the map as well (step larger than the map): stay put
        return curr_pos
    return new_pos