Module deepcomp.env.multi_ue.multi_agent
Source code
import numpy as np

from ray.rllib.env.multi_agent_env import MultiAgentEnv

from deepcomp.env.single_ue.variants import DatarateMobileEnv, NormDrMobileEnv, RelNormEnv, MaxNormEnv


class MultiAgentMobileEnv(RelNormEnv, MultiAgentEnv):
    """
    Multi-UE and multi-agent env.
    Inherits the parent env's (e.g., DatarateMobileEnv) constructor, step, and visualization
    & overrides MultiAgentEnv's reset and step.
    https://docs.ray.io/en/latest/rllib-env.html#multi-agent-and-hierarchical
    """
    def __init__(self, env_config):
        # this calls the parent env's __init__() since MultiAgentEnv doesn't have an __init__;
        # attributes, obs space, and action space are inherited from the parent env
        super().__init__(env_config)
        # how to aggregate rewards from multiple UEs ('sum' or 'min' utility)
        self.reward_agg = env_config['reward']

    def get_ue_actions(self, action):
        """
        Retrieve the action per UE from the RL agent's action and return it in the form of a dict.
        Does not yet apply actions to env.

        :param action: Action that depends on the agent type (single, central, multi)
        :return: Dict that consistently (indep. of agent type) maps UE (object) --> action
        """
        # get action for each UE based on ID; UEs without an entry in the action dict are skipped
        return {ue: action[ue.id] for ue in self.ue_list if ue.id in action}

    def get_obs(self):
        """Return next obs: Dict with UE ID --> obs"""
        obs = dict()
        for ue in self.ue_list:
            obs[ue.id] = self.get_ue_obs(ue)
        return obs

    def step_reward(self, rewards):
        """
        Return rewards as they are but use the UE ID as key instead of the UE itself.
        The reward key needs to be the same as the obs key & sortable, not just hashable.
        """
        # alternative variants (not used):
        # sum_rewards = sum(rewards.values())
        # return {ue.id: sum_rewards for ue in rewards.keys()}
        # return {ue.id: r for ue, r in rewards.items()}
        # variant used here: add aggregated utility of UEs at the same BS
        new_rewards = dict()
        for ue, r in rewards.items():
            # initialize to own utility in case the UE is not connected to any BS and has no neighbors
            agg_util = r
            # neighbors include the UE itself
            neighbors = ue.ues_at_same_bs()
            if len(neighbors) > 0:
                # aggregate utility of different UEs as configured
                if self.reward_agg == 'sum':
                    agg_util = sum(rewards[neighbor] for neighbor in neighbors)
                elif self.reward_agg == 'min':
                    agg_util = min(rewards[neighbor] for neighbor in neighbors)
                else:
                    raise NotImplementedError(f"Unexpected reward aggregation: {self.reward_agg}")
            new_rewards[ue.id] = agg_util
            self.log.debug('Reward', ue=ue, neighbors=neighbors, own_r=r, agg_util=agg_util)
        return new_rewards

    def done(self):
        """Return dict of dones: UE ID --> done?"""
        done = super().done()
        dones = {ue.id: done for ue in self.ue_list}
        dones['__all__'] = done
        return dones

    def info(self):
        """Return info for each UE as dict. Required by RLlib to be similar to obs."""
        info_dict = super().info()
        return {ue.id: info_dict for ue in self.ue_list}
class SeqMultiAgentMobileEnv(MultiAgentMobileEnv):
    """
    Multi-agent env where all agents observe and act sequentially rather than simultaneously.
    All agents act within a single time step before the UEs move and time increments.
    """
    def __init__(self, env_config):
        super().__init__(env_config)
        # order of UEs to make sequential decisions; for now identical to list order
        self.ue_order = self.ue_list
        self.ue_order_idx = 0
        self.curr_ue = self.ue_order[self.ue_order_idx]

    def get_obs(self):
        """Return only the obs for the current UE, such that only this UE acts"""
        return {self.curr_ue.id: self.get_ue_obs(self.curr_ue)}

    def step_reward(self, rewards):
        """Only reward the current UE. Calculated as before."""
        new_rewards = super().step_reward(rewards)
        return {self.curr_ue.id: new_rewards[self.curr_ue.id]}

    def done(self):
        """Return done for the current UE and under '__all__' (episode end)."""
        done = super().done()
        dones = {
            self.curr_ue.id: done,
            '__all__': done,
        }
        return dones

    def info(self):
        """Same for info: only for the current UE."""
        info_dict = super(MultiAgentMobileEnv, self).info()
        return {self.curr_ue.id: info_dict}

    def step(self, action):
        """Override step to take sequential per-agent steps, moving UEs and incrementing time only once per round"""
        # same as in the normal step
        prev_obs = self.obs
        action_dict = self.get_ue_actions(action)
        penalties = self.apply_ue_actions(action_dict)
        rewards = self.update_ue_drs_rewards(penalties=penalties)
        # increment the UE idx to handle the next user, but do not move UEs or increment time yet
        if self.ue_order_idx + 1 < len(self.ue_order):
            self.ue_order_idx += 1
        else:
            # reached the last UE in the order: move UEs, update drs, increment time
            self.ue_order_idx = 0
            self.move_ues()
            self.update_ue_drs_rewards(penalties=None, update_only=True)
            self.time += 1
        self.curr_ue = self.ue_order[self.ue_order_idx]
        self.obs = self.get_obs()
        reward = self.step_reward(rewards)
        done = self.done()
        info = self.info()
        self.log.info("Step", time=self.time, prev_obs=prev_obs, action=action, reward=reward,
                      next_obs=self.obs, done=done)
        return self.obs, reward, done, info
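The sketch below shows how this env is typically wired into RLlib for training. It is a minimal sketch, assuming the RLlib 1.x-style Trainer/tune API (consistent with the gym 4-tuple step() above) and one policy shared by all UEs; my_map, my_bs_list, and my_ue_list are hypothetical placeholders for the project's own Map, BS, and UE objects, not real constructors.

import ray
from ray import tune
from ray.tune.registry import register_env

from deepcomp.env.multi_ue.multi_agent import MultiAgentMobileEnv

# required env_config fields (see the class docs below); the map/BS/UE entries
# are hypothetical placeholders that must be built with the project's own classes
env_config = {
    'episode_length': 100,
    'map': my_map,          # placeholder: Map object representing the playground
    'bs_list': my_bs_list,  # placeholder: list of base station objects
    'ue_list': my_ue_list,  # placeholder: list of UE objects
    'seed': 42,
    'reward': 'min',        # reward aggregation: 'sum' or 'min'
}

ray.init()
register_env('multi_agent_mobile', lambda cfg: MultiAgentMobileEnv(cfg))

# one policy shared by all UEs; obs/action spaces are inherited from the parent env
dummy_env = MultiAgentMobileEnv(env_config)
config = {
    'env': 'multi_agent_mobile',
    'env_config': env_config,
    'multiagent': {
        'policies': {'shared': (None, dummy_env.observation_space, dummy_env.action_space, {})},
        'policy_mapping_fn': lambda agent_id: 'shared',
    },
}
tune.run('PPO', config=config, stop={'training_iteration': 10})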
Classes
class MultiAgentMobileEnv (env_config)
Multi-UE and multi-agent env. Inherits the parent env's (e.g., DatarateMobileEnv) constructor, step, and visualization & overrides MultiAgentEnv's reset and step. https://docs.ray.io/en/latest/rllib-env.html#multi-agent-and-hierarchical
Create a new environment object with an OpenAI Gym interface. Required fields in the env_config:
- episode_length: Total number of simulation time steps in one episode
- map: Map object representing the playground
- bs_list: List of base station objects in the environment
- ue_list: List of UE objects in the environment
- seed: Seed for the RNG; for reproducibility. May be None.
:param env_config: Dict containing all configuration options for the environment. Required by RLlib.
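A minimal manual interaction sketch under the same assumptions as the training example above (an env_config as described; reset() returning the obs dict and a per-UE action_space are assumed to be inherited from the parent env). It illustrates the dict structure: obs, reward, done, and info are keyed by UE ID, and done['__all__'] flags the episode end.

env = MultiAgentMobileEnv(env_config)
obs = env.reset()  # dict: UE ID --> observation
while True:
    # one (here random) action per observed UE, keyed by UE ID
    action = {ue_id: env.action_space.sample() for ue_id in obs}
    obs, reward, done, info = env.step(action)
    if done['__all__']:  # episode-level done
        break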
Ancestors
- RelNormEnv
- BinaryMobileEnv
- MobileEnv
- gym.core.Env
- ray.rllib.env.multi_agent_env.MultiAgentEnv
Subclasses
- SeqMultiAgentMobileEnv
Methods
def done(self)
Return dict of dones: UE ID --> done?
def get_obs(self)
Return next obs: Dict with UE ID --> obs
def get_ue_actions(self, action)
Retrieve the action per UE from the RL agent's action and return it in the form of a dict. Does not yet apply actions to env.
:param action: Action that depends on the agent type (single, central, multi) :return: Dict that consistently (indep. of agent type) maps UE (object) --> action
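A standalone illustration of this mapping with a hypothetical stub UE class (not part of the repo). Note that UEs whose ID is missing from the action dict are skipped:

class StubUE:
    """Hypothetical stand-in for a UE object with an `id` attribute."""
    def __init__(self, ue_id):
        self.id = ue_id

ue_list = [StubUE(0), StubUE(1), StubUE(2)]
action = {0: 2, 1: 0}  # no action for UE 2 in this step
ue_actions = {ue: action[ue.id] for ue in ue_list if ue.id in action}
assert {ue.id for ue in ue_actions} == {0, 1}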
def info(self)
Return info for each UE as dict. Required by RLlib to be similar to obs.
def step_reward(self, rewards)
Return rewards as they are but use the UE ID as key instead of the UE itself. The reward key needs to be the same as the obs key & sortable, not just hashable.
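A worked example of the two aggregation modes with hypothetical utilities: UEs 0 and 1 share a BS, UE 2 is alone, and each neighbor list includes the UE itself.

# own utilities per UE ID
rewards = {0: 3.0, 1: -1.0, 2: 2.0}
# UEs at the same BS, including the UE itself
same_bs = {0: [0, 1], 1: [0, 1], 2: [2]}

# 'sum': UEs 0 and 1 both get 3.0 + (-1.0) = 2.0; UE 2 keeps its own 2.0
agg_sum = {u: sum(rewards[n] for n in same_bs[u]) for u in rewards}
# 'min': UEs 0 and 1 both get min(3.0, -1.0) = -1.0; UE 2 keeps its own 2.0
agg_min = {u: min(rewards[n] for n in same_bs[u]) for u in rewards}

assert agg_sum == {0: 2.0, 1: 2.0, 2: 2.0}
assert agg_min == {0: -1.0, 1: -1.0, 2: 2.0}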
class SeqMultiAgentMobileEnv (env_config)
Multi-agent env where all agents observe and act sequentially rather than simultaneously. All agents act within a single time step before the UEs move and time increments.
Create a new environment object with an OpenAI Gym interface. Required fields in the env_config:
- episode_length: Total number of simulation time steps in one episode
- map: Map object representing the playground
- bs_list: List of base station objects in the environment
- ue_list: List of UE objects in the environment
- seed: Seed for the RNG; for reproducibility. May be None.
:param env_config: Dict containing all configuration options for the environment. Required by RLlib.
Ancestors
- MultiAgentMobileEnv
- RelNormEnv
- BinaryMobileEnv
- MobileEnv
- gym.core.Env
- ray.rllib.env.multi_agent_env.MultiAgentEnv
Methods
def done(self)
Return done for the current UE and under '__all__' (episode end).
def get_obs(self)
Return only the obs for the current UE, such that only this UE acts
def info(self)
Same for info: only for the current UE.
def step(self, action)
Override step to take sequential per-agent steps, moving UEs and incrementing time only once per round.
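A sketch of the resulting interaction pattern, under the same assumptions as the earlier examples (hypothetical env_config; reset() returning the obs dict): with n UEs, the driver calls step() n times per environment time step, and time advances only after the last UE in the order has acted.

env = SeqMultiAgentMobileEnv(env_config)
obs = env.reset()  # contains only the first UE in the order
while True:
    (ue_id,) = obs.keys()  # exactly one UE is observed, so exactly one UE acts
    action = {ue_id: env.action_space.sample()}
    obs, reward, done, info = env.step(action)
    # env.time has incremented only if ue_id was the last UE in the order
    if done['__all__']:
        break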
def step_reward(self, rewards)
Only reward the current UE. Calculated as before.