Module deepcomp.env.single_ue.base
Base mobile environment. Implemented and extended by subclasses.
Source code
"""Base mobile environment. Implemented and extended by sublcasses."""
import copy
import random
import logging
import gym
import gym.spaces
import structlog
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import cm
import matplotlib.patheffects as pe
from deepcomp.util.logs import config_logging
from deepcomp.env.util.utility import step_utility, log_utility
from deepcomp.env.entities.user import User
from deepcomp.env.util.movement import RandomWaypoint
class MobileEnv(gym.Env):
"""
Base environment class with moving UEs and stationary BS on a map. RLlib and OpenAI Gym-compatible.
No observation or action space implemented. This needs to be done in subclasses.
"""
metadata = {'render.modes': ['human']}
def __init__(self, env_config):
"""
Create a new environment object with an OpenAI Gym interface. Required fields in the env_config:
* episode_length: Total number of simulation time steps in one episode
* map: Map object representing the playground
* bs_list: List of base station objects in the environment
* ue_list: List of UE objects in the environment
* seed: Seed for the RNG; for reproducibility. May be None.
* new_ue_interval: Interval (in time steps) at which a new UE arrives; may be None for a fixed set of UEs
* rand_episodes: If False, the env is re-seeded on every reset so that episodes are reproducible
* log_metrics: Whether step() includes scalar/vector metrics in its info dict
:param env_config: Dict containing all configuration options for the environment. Required by RLlib.
"""
super().__init__()
self.time = 0
self.episode_length = env_config['episode_length']
self.map = env_config['map']
self.bs_list = env_config['bs_list']
self.ue_list = env_config['ue_list']
# keep a copy of the original list, so it can easily be restored in reset()
# shallow copy: plain assignment would alias the same list, while a deepcopy would duplicate and re-create the UE objects
self.original_ue_list = copy.copy(env_config['ue_list'])
self.new_ue_interval = env_config['new_ue_interval']
# seed the environment
self.env_seed = env_config['seed']
self.seed(env_config['seed'])
self.rand_episodes = env_config['rand_episodes']
self.log_metrics = env_config['log_metrics']
# current observation
self.obs = None
# observation and action space are defined in the subclass --> different variants
self.observation_space = None
self.action_space = None
# configure logging inside env to ensure it works in ray/rllib. https://github.com/ray-project/ray/issues/9030
config_logging()
self.log = structlog.get_logger()
self.log.info('Env init', env_config=env_config)
# call after initializing everything else (needs settings, log)
self.max_ues = self.get_max_num_ue()
@property
def num_bs(self):
return len(self.bs_list)
@property
def num_ue(self):
return len(self.ue_list)
@property
def total_dr(self):
return sum([ue.curr_dr for ue in self.ue_list])
@property
def total_utility(self):
return sum([ue.utility for ue in self.ue_list])
def seed(self, seed=None):
if seed is not None:
random.seed(seed)
# seed RNG of map (used to generate new UEs' arrival points)
self.map.seed(seed)
# seed the RNG of all UEs (which themselves seed their movement)
offset = 0
for ue in self.ue_list:
# add an offset to each UE's seed so that not all UEs get the same "random" pos and movement
# use a fixed, not random, offset here; otherwise it differs again between test & train
offset += 100
ue.seed(seed + offset)
def set_log_level(self, log_dict):
"""
Set logging levels for a set of given loggers. Needs to happen here, inside the env, for RLlib workers to work.
:param dict log_dict: Dict with logger name --> logging level (eg, logging.INFO)
"""
for logger_name, level in log_dict.items():
logging.getLogger(logger_name).setLevel(level)
def get_ue_obs(self, ue):
"""Return the an observation of the current world for a given UE"""
raise NotImplementedError('Implement in subclass')
def calc_reward(self, ue, penalty):
"""
Calculate and return reward for specific UE: The UE's utility (based on its data rate) + penalty
"""
# clip utility to -20, 20 to avoid -inf for 0 dr and cap at 100 dr
clip_util = np.clip(ue.utility, -20, 20)
# add penalty and clip again to stay in range -20, 20
return np.clip(clip_util + penalty, -20, 20) / 20
def reset(self):
"""Reset environment by resetting time and all UEs (pos & movement) and their connections"""
if not self.rand_episodes:
# seed again before every reset --> always run same episodes. agent behavior may still differ
self.seed(self.env_seed)
self.time = 0
# reset UE list to avoid growing it infinitely
old_list = self.ue_list
self.ue_list = copy.copy(self.original_ue_list)
# delete UEs that are no longer on the list
for ue in set(old_list) - set(self.ue_list):
del ue
del old_list
# reset all UEs and BS
for ue in self.ue_list:
ue.reset()
for bs in self.bs_list:
bs.reset()
return self.get_obs()
def get_max_num_ue(self):
"""Get the maximum number of UEs within an episode based on the new UE interval"""
max_ues = self.num_ue
if self.new_ue_interval is not None:
# calculate the max number of UEs if one new UE is added at a given interval
# eps_length - 1 because time is increased before checking done and t=eps_length is never reached
max_ues = self.num_ue + int((self.episode_length - 1) / self.new_ue_interval)
self.log.info('Num. UEs varies over time.', new_ue_interval=self.new_ue_interval, num_ues=self.num_ue,
max_ues=max_ues, episode_length=self.episode_length)
return max_ues
def get_ue_actions(self, action):
"""
Retrieve the action per UE from the RL agent's action and return it in the form of a dict.
Does not yet apply actions to env.
Here, in the single agent case, just get the action of a single active UE.
Overwritten in other child envs, eg, for multi or central agent.
:param action: Action that depends on the agent type (single, central, multi)
:return: Dict that consistently (indep. of agent type) maps UE (object) --> action
"""
assert self.action_space.contains(action), f"Action {action} does not fit action space {self.action_space}"
# default to noop action
action_dict = {ue: 0 for ue in self.ue_list}
# select active UE (to update in this step) using round robin and get corresponding action
ue = self.ue_list[self.time % self.num_ue]
action_dict[ue] = action
return action_dict
def apply_ue_actions(self, action_dict):
"""
Given a dict of UE actions, apply them to the environment and return a dict of penalties.
This function remains unchanged and is simply inherited by all envs.
Currently, no penalty is applied (all 0); alternative penalty schemes, eg, -3 per connect/disconnect, are kept as comments below.
:param action_dict: Dict of UE --> action
:return: Dict UE --> penalty
"""
penalties = {ue: 0 for ue in self.ue_list}
for ue, action in action_dict.items():
# apply action: try to connect to BS; or: 0 = no op
if action > 0:
bs = self.bs_list[action-1]
connected = ue.connect_to_bs(bs, disconnect=True, return_connected=True)
# penalty -5 for connecting to a BS that's already in use by other UEs
# if connected and bs.num_conn_ues >= 2:
# penalties[ue] = -5
# # and +5 for disconnecting from a BS that's used by others
# if not connected and bs.num_conn_ues >= 1:
# penalties[ue] = +5
# penalty -3 for any connect/disconnect (whether successful or not)
# penalties[ue] = -3
# # add a penalty for concurrent connections (overhead for joint transmission), ie, for any 2+ connections
# # tunable penalty weight representing the cost of concurrent connections
# weight = 0
# connections = len(ue.bs_dr)
# if connections > 1:
# penalty -= weight * (connections - 1)
return penalties
def test_ue_actions(self, action_dict):
"""
Test a given set of UE actions by temporarily applying them to the env (and reverting them afterwards) and return the rewards.
:param action_dict: Actions to apply. Dict of UE --> action
:return: Rewards for each UE when applying the action
"""
# for proportional-fair sharing, also keep track of EWMA dr as it affects the allocated resources
original_ewma_drs = {ue: ue.ewma_dr for ue in self.ue_list}
self.apply_ue_actions(action_dict)
# update ewma; need to update drs first for EWMA calculation to be correct
self.update_ue_drs_rewards(penalties=None, update_only=True)
for ue in self.ue_list:
ue.update_ewma_dr()
rewards = self.update_ue_drs_rewards(penalties=None)
# to revert the action, apply it again: toggles connection again between same UE-BS
self.apply_ue_actions(action_dict)
# revert the ewma
for ue in self.ue_list:
ue.ewma_dr = original_ewma_drs[ue]
self.update_ue_drs_rewards(penalties=None, update_only=True)
# simpler + can be parallelized: just create a copy of the env, apply actions, get reward, delete copy
# no, doesn't work: it's much slower (probably due to all the deep copies)
# test_env = copy.deepcopy(self)
# test_env.apply_ue_actions(action_dict)
# rewards = test_env.update_ue_drs_rewards(penalties=None)
# del test_env
return rewards
def update_ue_drs_rewards(self, penalties, update_only=False):
"""
Update cached data rates of all UE-BS connections.
Calculate and return corresponding rewards based on given penalties.
:param penalties: Dict of penalties for all UEs. Used for calculating rewards.
:return: Dict of rewards: UE --> reward (incl. penalty)
"""
rewards = dict()
for ue in self.ue_list:
ue.update_curr_dr()
# calc and return reward if needed
if update_only:
# only update drs and return reward 0
rewards[ue] = 0
else:
if penalties is None or ue not in penalties.keys():
rewards[ue] = self.calc_reward(ue, penalty=0)
else:
rewards[ue] = self.calc_reward(ue, penalties[ue])
return rewards
def move_ues(self):
"""
Move all UEs and return dict of penalties corresponding to number of lost connections.
:return: Dict with num lost connections: UE --> num. lost connections
"""
lost_conn = dict()
for ue in self.ue_list:
num_lost_conn = ue.move()
# add penalty of -1 for each lost connection through movement (rather than actively disconnected)
lost_conn[ue] = num_lost_conn
return lost_conn
def get_obs(self):
"""
Return the current observation. Called to get the next observation after a step.
Here, the obs for the next UE. Overwritten by env variants as needed.
:returns: Next observation
"""
next_ue = self.ue_list[self.time % self.num_ue]
return self.get_ue_obs(next_ue)
def step_reward(self, rewards):
"""
Return the overall reward for the step (called at the end of a step). Overwritten by variants.
:param rewards: Dict of rewards per UE (currently the pre-movement rewards)
:returns: Reward for the step (depends on the env variant; here just for one UE)
"""
# here: get reward for UE that applied the action (at time - 1)
ue = self.ue_list[(self.time-1) % self.num_ue]
return rewards[ue]
def done(self):
"""
Return whether the episode is done.
:return: Whether the current episode is done or not
"""
# return self.time >= self.episode_length
# always return done=None to indicate that there is no natural episode ending
# by default, the env is reset by RLlib and simulator when horizon=eps_length is reached
# for continuous training, the env is never reset during training
return None
def info(self):
"""
Return info dict that's returned after a step. Includes info about current time step and metrics of choice.
The metrics can be adjusted as required, but need to be structured into scalar_metrics and vector_metrics
to be handled automatically.
"""
if not self.log_metrics:
return {'time': self.time}
info_dict = {
'time': self.time,
# scalar metrics are metrics that consist of a single number/value per step
'scalar_metrics': {
# these are currently dicts; would need to aggregate them to a single number if needed
# 'unsucc_conn': unsucc_conn,
# 'lost_conn': lost_conn,
# num UEs without any connection
# 'num_ues_wo_conn': sum([1 if len(ue.bs_dr) == 0 else 0 for ue in self.ue_list]),
# 'avg_utility': np.mean([ue.utility for ue in self.ue_list]),
'sum_utility': sum([ue.utility for ue in self.ue_list])
},
# vector metrics are metrics that contain a dict of values for each step with UE --> metric
# currently not supported (needs some adjustments in simulator)
'vector_metrics': {
'dr': {f'UE {ue}': ue.curr_dr for ue in self.ue_list},
'utility': {f'UE {ue}': ue.utility for ue in self.ue_list},
}
}
return info_dict
def step(self, action):
"""
Environment step consisting of 1) Applying actions, 2) Updating data rates and rewards, 3) Moving UEs,
4) Updating data rates again after movement (averaging with the pre-movement reward is currently disabled), 5) Updating the observation
In the base env here, only one UE applies actions per time step. This is overwritten in other env variants.
:param action: Action to be applied. Here, for a single UE. In other env variants, for all UEs.
:return: Tuple of next observation, reward, done, info
"""
prev_obs = self.obs
# get & apply action
action_dict = self.get_ue_actions(action)
penalties = self.apply_ue_actions(action_dict)
# add new UE according to configured interval
# important to add it here, after the action is applied but before the reward is calculated
# after action is applied, otherwise a central approach could already apply a decision for the newly added UE
# before reward is calculated, otherwise the reward and observation keys would not match (required by RLlib)
if self.new_ue_interval is not None and self.time > 0 and self.time % self.new_ue_interval == 0:
self.add_new_ue()
# move UEs, update data rates and rewards in between; increment time
rewards_before = self.update_ue_drs_rewards(penalties=penalties)
lost_conn = self.move_ues()
# penalty of -1 for lost connections due to movement (rather than active disconnect)
# penalties = {ue: -1 * lost_conn[ue] for ue in self.ue_list}
# update of drs is needed even if we don't need the reward
rewards_after = self.update_ue_drs_rewards(penalties=None, update_only=True)
# rewards = {ue: np.mean([rewards_before[ue], rewards_after[ue]]) for ue in self.ue_list}
rewards = rewards_before
self.time += 1
# get and return next obs, reward, done, info
self.obs = self.get_obs()
reward = self.step_reward(rewards)
done = self.done()
# dummy unsucc_conn -1 since it's currently not of interest
# unsucc_conn = {ue: -1 for ue in self.ue_list}
info = self.info()
self.log.info("Step", time=self.time, prev_obs=prev_obs, action=action, reward=reward, next_obs=self.obs,
done=done)
return self.obs, reward, done, info
def render(self, mode='human'):
"""Plot and visualize the current status of the world. Return the patch of actors for animation."""
# list of matplotlib "artists", which can be used to create animations
patch = []
# limit to map borders
plt.xlim(0, self.map.width)
plt.ylim(0, self.map.height)
# users & connections
# show utility as red to yellow to green. use color map for [0,1) --> normalize utility first
colormap = cm.get_cmap('RdYlGn')
norm = plt.Normalize(-20, 20)
for ue in self.ue_list:
# plot connections to all BS
for bs, dr in ue.bs_dr.items():
color = colormap(norm(log_utility(dr)))
# add black background/borders for lines to make them better visible if the utility color is too light
patch.extend(plt.plot([ue.pos.x, bs.pos.x], [ue.pos.y, bs.pos.y], color=color,
path_effects=[pe.SimpleLineShadow(shadow_color='black'), pe.Normal()]))
# path_effects=[pe.Stroke(linewidth=2, foreground='black'), pe.Normal()]))
# plot UE
patch.extend(ue.plot())
# base stations
for bs in self.bs_list:
patch.extend(bs.plot())
# title isn't redrawn in animation (out of the box) --> static --> show time as text inside box, top-right corner
patch.append(plt.title(type(self).__name__))
# extra info: time step, total data rate & utility
patch.append(plt.text(0.9*self.map.width, 0.95*self.map.height, f"t={self.time}"))
patch.append(plt.text(0.9*self.map.width, 0.9*self.map.height, f"dr={self.total_dr:.2f}"))
patch.append(plt.text(0.9*self.map.width, 0.85*self.map.height, f"util={self.total_utility:.2f}"))
# legend doesn't change --> only draw once at the beginning
# if self.time == 0:
# plt.legend(loc='upper left')
return patch
def add_new_ue(self, velocity='slow'):
"""Simulate arrival of a new UE in the env"""
# choose ID based on last UE's ID; assuming it can be cast to int
id = int(self.ue_list[-1].id) + 1
# choose a random position on the border of the map for the UE to appear
pos = self.map.rand_border_point()
new_ue = User(str(id), self.map, pos.x, pos.y, movement=RandomWaypoint(self.map, velocity=velocity))
# seed with fixed but unique seed to have different movement
new_ue.seed(self.env_seed + id * 100)
# reset to ensure the new seed is applied, eg, to always select the same "random" waypoint with the same seed
new_ue.reset()
self.ue_list.append(new_ue)
Classes
class MobileEnv (env_config)
-
Base environment class with moving UEs and stationary BS on a map. RLlib and OpenAI Gym-compatible. No observation or action space implemented. This needs to be done in subclasses.
Create a new environment object with an OpenAI Gym interface. Required fields in the env_config:
- episode_length: Total number of simulation time steps in one episode
- map: Map object representing the playground
- bs_list: List of base station objects in the environment
- ue_list: List of UE objects in the environment
- seed: Seed for the RNG; for reproducibility. May be None.
- new_ue_interval: Interval (in time steps) at which a new UE arrives; may be None for a fixed set of UEs
- rand_episodes: If False, the env is re-seeded on every reset so that episodes are reproducible
- log_metrics: Whether step() includes scalar/vector metrics in its info dict
:param env_config: Dict containing all configuration options for the environment. Required by RLlib.
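A minimal construction sketch under these assumptions: the map, BS, and UE objects are built elsewhere (their constructors are not part of this module), and a concrete subclass with observation/action spaces is used in place of MobileEnv itself; all names below are placeholders.
env_config = {
    'episode_length': 100,        # simulation steps per episode
    'map': my_map,                # Map object (placeholder)
    'bs_list': my_bs_list,        # list of base station objects (placeholder)
    'ue_list': my_ue_list,        # list of User objects (placeholder)
    'seed': 42,                   # may be None
    'new_ue_interval': None,      # or eg 10 to add a new UE every 10 steps
    'rand_episodes': False,       # re-seed on reset --> reproducible episodes
    'log_metrics': True,          # include metrics in the info dict of step()
}
env = SomeMobileEnvSubclass(env_config)   # placeholder for a concrete subclass
obs = env.reset()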
Ancestors
- gym.core.Env
Subclasses
Class variables
var metadata
Instance variables
var num_bs
-
Number of base stations in the environment.
var num_ue
-
Number of UEs currently in the environment.
var total_dr
-
Sum of the current data rates of all UEs.
var total_utility
-
Sum of the current utilities of all UEs.
Methods
def add_new_ue(self, velocity='slow')
-
Simulate arrival of a new UE in the env
def apply_ue_actions(self, action_dict)
-
Given a dict of UE actions, apply them to the environment and return a dict of penalties. This function remains unchanged and is simply inherited by all envs.
Currently, no penalty is applied (all 0); alternative penalty schemes, eg, -3 per connect/disconnect, are kept as comments in the source.
:param action_dict: Dict of UE --> action
:return: Dict UE --> penalty
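As a sketch of the action encoding handled here (the helper below is purely illustrative and not part of the module): 0 is a no-op, and any i > 0 toggles the connection to bs_list[i - 1].
def decode_action(action, bs_list):
    """Hypothetical helper mirroring the branch in apply_ue_actions."""
    if action == 0:
        return 'noop: keep current connections'
    # connect_to_bs(..., disconnect=True) connects if the UE is disconnected, else disconnects
    return f'toggle connection to {bs_list[action - 1]}'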
def calc_reward(self, ue, penalty)
-
Calculate and return reward for specific UE: The UE's utility (based on its data rate) + penalty
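For intuition, a small sketch that mirrors the clipping and scaling used by this method (values chosen arbitrarily):
import numpy as np

def calc_reward_sketch(utility, penalty):
    # mirrors MobileEnv.calc_reward: clip utility, add penalty, clip again, scale to [-1, 1]
    clip_util = np.clip(utility, -20, 20)
    return np.clip(clip_util + penalty, -20, 20) / 20

calc_reward_sketch(15, -3)    # 0.6
calc_reward_sketch(-50, 0)    # -1.0: very low utility (eg, 0 data rate) is capped at -20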
def done(self)
-
Return whether the episode is done.
:return: Whether the current episode is done or not
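Since done() always returns None, episode termination has to be enforced by the training loop, eg, via RLlib's horizon setting as mentioned in the source comments. A hedged config sketch (exact keys depend on the RLlib version; the subclass name is a placeholder):
rllib_config = {
    'env': SomeMobileEnvSubclass,               # placeholder for a concrete subclass
    'env_config': env_config,
    'horizon': env_config['episode_length'],    # RLlib treats the episode as done after this many steps
}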
def get_max_num_ue(self)
-
Get the maximum number of UEs within an episode based on the new UE interval
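A quick worked example of this calculation (purely illustrative numbers):
# 2 initial UEs, episode_length=100, new_ue_interval=10:
max_ues = 2 + int((100 - 1) / 10)   # = 2 + 9 = 11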
def get_obs(self)
-
Return the current observation. Called to get the next observation after a step. Here, the obs for the next UE. Overwritten by env variants as needed.
:returns: Next observation
def get_ue_actions(self, action)
-
Retrieve the action per UE from the RL agent's action and return it in the form of a dict. Does not yet apply actions to the env.
Here, in the single agent case, just get the action of a single active UE. Overwritten in other child envs, eg, for multi or central agent.
:param action: Action that depends on the agent type (single, central, multi)
:return: Dict that consistently (indep. of agent type) maps UE (object) --> action
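A sketch of the resulting round-robin scheme (assuming a constructed env and an action from the agent); only the active UE receives the action, all others default to the no-op 0:
# time=0 --> ue_list[0] is active, time=1 --> ue_list[1], ..., wrapping around modulo num_ue
active_ue = env.ue_list[env.time % env.num_ue]
action_dict = env.get_ue_actions(action)    # {active_ue: action, every other UE: 0}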
def get_ue_obs(self, ue)
-
Return an observation of the current world for a given UE
def info(self)
-
Return info dict that's returned after a step. Includes info about current time step and metrics of choice. The metrics can be adjusted as required, but need to be structured into scalar_metrics and vector_metrics to be handled automatically.
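With log_metrics enabled, the returned dict has roughly the following shape (values and UE keys are illustrative; keys are formatted from each UE's string representation):
info = {
    'time': 42,
    'scalar_metrics': {'sum_utility': 23.5},
    'vector_metrics': {
        'dr': {'UE 1': 45.2, 'UE 2': 12.0},
        'utility': {'UE 1': 16.5, 'UE 2': 7.0},
    },
}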
def move_ues(self)
-
Move all UEs and return dict of penalties corresponding to number of lost connections.
:return: Dict with num lost connections: UE --> num. lost connections
def render(self, mode='human')
-
Plot and visualize the current status of the world. Return the patch of actors for animation.
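Because render() returns the matplotlib artists for one frame, the frames of an episode can be collected and turned into an animation with matplotlib's ArtistAnimation. A usage sketch, assuming a constructed concrete subclass env with a defined action_space:
import matplotlib.pyplot as plt
import matplotlib.animation as animation

fig = plt.figure()
frames = []
obs = env.reset()
for _ in range(env.episode_length):
    frames.append(env.render())              # list of artists for this frame
    action = env.action_space.sample()       # placeholder policy
    obs, reward, done, info = env.step(action)
anim = animation.ArtistAnimation(fig, frames, interval=100, repeat=False)
anim.save('episode.gif', writer='imagemagick')   # requires ImageMagick, or use another available writer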
def reset(self)
-
Reset environment by resetting time and all UEs (pos & movement) and their connections
def seed(self, seed=None)
-
Sets the seed for this env's random number generator(s).
Note
Some environments use multiple pseudorandom number generators. We want to capture all such seeds used in order to ensure that there aren't accidental correlations between multiple generators.
Returns
list
: Returns the list of seeds used in this env's random number generators. The first value in the list should be the "main" seed, or the value which a reproducer should pass to 'seed'. Often, the main seed equals the provided 'seed', but this won't be true if seed=None, for example.
Note: this override seeds Python's random module, the map RNG, and all UEs (each with a fixed offset so that their positions and movement differ); it does not return a seed list.
def set_log_level(self, log_dict)
-
Set logging levels for a set of given loggers. Needs to happen here, inside the env, for RLlib workers to work.
:param dict log_dict: Dict with logger name --> logging level (eg, logging.INFO)
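A usage sketch; the logger name below is a hypothetical example, any stdlib logger name works:
import logging

env.set_log_level({'deepcomp.env.single_ue.base': logging.WARNING})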
def step(self, action)
-
Environment step consisting of 1) Applying actions, 2) Updating data rates and rewards, 3) Moving UEs, 4) Updating data rates again after movement (averaging with the pre-movement reward is currently disabled), 5) Updating the observation
In the base env here, only one UE applies actions per time step. This is overwritten in other env variants.
:param action: Action to be applied. Here, for a single UE. In other env variants, for all UEs.
:return: Tuple of next observation, reward, done, info
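A minimal interaction loop (sketch; assumes a concrete subclass with observation_space and action_space, constructed as shown for the class above):
obs = env.reset()
for _ in range(env.episode_length):
    action = env.action_space.sample()          # replace with a trained agent's policy
    obs, reward, done, info = env.step(action)
    # done is None here; the episode length is enforced externally (eg, RLlib's horizon)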
def step_reward(self, rewards)
-
Return the overall reward for the step (called at the end of a step). Overwritten by variants.
:param rewards: Dict of rewards per UE (currently the pre-movement rewards)
:returns: Reward for the step (depends on the env variant; here just for one UE)
def test_ue_actions(self, action_dict)
-
Test a given set of UE actions by temporarily applying them to the env (and reverting them afterwards) and return the rewards.
:param action_dict: Actions to apply. Dict of UE --> action
:return: Rewards for each UE when applying the action
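A sketch of how a heuristic might use this to compare candidate actions for the currently active UE without committing to any of them (hypothetical usage, not part of the module):
ue = env.ue_list[env.time % env.num_ue]
best_action, best_reward = 0, float('-inf')
for action in range(env.num_bs + 1):            # 0 = noop, i > 0 = toggle BS i-1
    candidate = {u: 0 for u in env.ue_list}
    candidate[ue] = action
    rewards = env.test_ue_actions(candidate)    # applies, evaluates, and reverts the actions
    if rewards[ue] > best_reward:
        best_action, best_reward = action, rewards[ue]
# best_action now holds the most promising toggle for this UE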
def update_ue_drs_rewards(self, penalties, update_only=False)
-
Update cached data rates of all UE-BS connections. Calculate and return corresponding rewards based on given penalties.
:param penalties: Dict of penalties for all UEs. Used for calculating rewards.
:return: Dict of rewards: UE --> reward (incl. penalty)