Module deepcomp.env.entities.user
Expand source code
import random
import structlog
from shapely.geometry import Point
import matplotlib.pyplot as plt
from matplotlib import cm
from deepcomp.env.util.utility import step_utility, log_utility
from deepcomp.util.constants import FAIR_WEIGHT_ALPHA, FAIR_WEIGHT_BETA, EPSILON
class User:
    """
    A user/UE moving around in the world and requesting mobile services.

    Connections to BS are checked before connecting and after every move to see whether they are lost or
    still stable.
    """

    def __init__(self, id, map, pos_x, pos_y, movement, dr_req=1):
        """
        Create new UE object

        :param id: Unique ID of UE (string)
        :param map: Map object representing the playground/world
        :param pos_x: x-coord of starting position or 'random'
        :param pos_y: y-coord of starting position or 'random'
        :param movement: Movement utility object implementing the movement of the UE
        :param dr_req: Data rate requirement by UE for successful service
        """
        self.id = id
        self.map = map
        self.movement = movement
        self.dr_req = dr_req
        # dict of connected BS: BS (only connected BS are keys!) --> data rate of connection
        self.bs_dr = dict()
        # own RNG for reproducibility; global random shares state that's manipulated by RL during training
        self.rng = random.Random()
        self.init_pos_x = pos_x
        self.init_pos_y = pos_y
        self.pos = None
        self.reset_pos()
        self.movement.reset()
        # exponentially weighted moving average data rate
        self.ewma_dr = 0
        self.log = structlog.get_logger(id=self.id, pos=str(self.pos), ewma_dr=self.ewma_dr,
                                        conn_bs=list(self.bs_dr.keys()), dr_req=self.dr_req)
        self.log.info('UE init')

    def __repr__(self):
        return str(self.id)

    # compare and hash UEs based on their ID only
    def __eq__(self, other):
        if type(other) is type(self):
            return self.id == other.id
        return False

    def __hash__(self):
        return hash(self.id)

    @property
    def curr_dr(self):
        """Current data rate the UE gets through all its BS connections"""
        dr = sum(self.bs_dr.values())
        self.log.debug("Current data rate", curr_dr=dr)
        return dr

    @property
    def dr_req_satisfied(self):
        """Whether or not the UE's data rate requirement is satisfied by its current total data rate"""
        return self.curr_dr >= self.dr_req

    @property
    def utility(self):
        """Utility based on the current data rate and utility function"""
        # alternative utility function: step_utility(self.curr_dr, self.dr_req)
        return log_utility(self.curr_dr)

    def seed(self, seed=None):
        """
        Seed the UE's own RNG and its movement object with the same seed.

        :param seed: Seed value passed on unchanged to both `rng.seed` and `movement.seed`
        """
        self.rng.seed(seed)
        self.movement.seed(seed)

    def reset_pos(self):
        """(Re)set position based on initial position x and y as Point. Resolve 'random'."""
        # set pos_x; 'random' draws an integer coordinate within the map width (both ends included)
        pos_x = self.init_pos_x
        if pos_x == 'random':
            pos_x = self.rng.randint(0, int(self.map.width))
        # set pos_y; 'random' draws an integer coordinate within the map height (both ends included)
        pos_y = self.init_pos_y
        if pos_y == 'random':
            pos_y = self.rng.randint(0, int(self.map.height))
        # set pos as Point
        self.pos = Point(pos_x, pos_y)

    def reset(self):
        """
        Reset UE position, movement, and connections.

        Only this UE's own connection dict and EWMA data rate are cleared here; the `conn_ues` lists of the
        BS are not touched by this method.
        """
        self.reset_pos()
        self.movement.reset()
        self.bs_dr = dict()
        self.ewma_dr = 0

    def plot(self, radius=2):
        """
        Plot the UE as filled circle with a given radius and the ID. Color from red to green indicating the utility.

        :param radius: Radius of the circle
        :return: A list of created matplotlib artists
        """
        # show utility as red to yellow to green; the norm maps utility in [-20, 20] to the colormap input range
        # pyplot.get_cmap is used because matplotlib.cm.get_cmap was removed in Matplotlib 3.9
        colormap = plt.get_cmap('RdYlGn')
        norm = plt.Normalize(-20, 20)
        color = colormap(norm(self.utility))

        # outline of the circle around the UE position; computed once and used for both line and fill
        outline_xy = self.pos.buffer(radius).exterior.xy
        artists = plt.plot(*outline_xy, color=color)
        artists.extend(plt.fill(*outline_xy, color=color))
        artists.append(plt.annotate(self.id, xy=(self.pos.x, self.pos.y), ha='center', va='center'))

        # show curr data rate and utility below the UE
        artists.append(plt.annotate(f'dr: {self.curr_dr:.2f}', xy=(self.pos.x, self.pos.y - radius - 2),
                                    ha='center', va='center'))
        artists.append(plt.annotate(f'util: {self.utility:.2f}', xy=(self.pos.x, self.pos.y - radius - 6),
                                    ha='center', va='center'))
        return artists

    def update_curr_dr(self):
        """Update the current data rate of all BS connections according to the current situation (pos & assignment)"""
        # only values of existing keys are replaced, so iterating the dict while assigning is safe
        for bs in self.bs_dr:
            self.bs_dr[bs] = bs.data_rate(self)

    def update_ewma_dr(self, weight=0.9):
        """
        Update the exp. weighted moving avg. of this UE's current data rate:
        `EWMA(t) = weight * dr + (1-weight) * EWMA(t-1)`
        Used as historic avg. rate for proportional-fair sharing. Called after movement.

        :param weight: Weight for EWMA in [0, 1]. The higher, the more focus on new/current dr and less on previous.
        """
        self.ewma_dr = weight * self.curr_dr + (1 - weight) * self.ewma_dr
        self.log = self.log.bind(ewma_dr=self.ewma_dr)

    def move(self):
        """
        Do one step: Move according to own movement pattern. Check for lost connections. Update EWMA data rate.

        :return: Number of connections lost through movement
        """
        self.pos = self.movement.step(self.pos)
        num_lost_connections = self.check_bs_connection()
        self.log = self.log.bind(pos=str(self.pos))
        self.update_ewma_dr()
        self.log.debug("User move", lost_connections=num_lost_connections)
        return num_lost_connections

    def check_bs_connection(self):
        """
        Check if assigned BS connections are still stable (after move), else remove.

        :return: Number of removed/lost connections
        """
        # collect first and disconnect afterwards: disconnecting changes the dict that is iterated here
        remove_bs = []
        for bs in self.bs_dr:
            if not bs.can_connect(self.pos):
                self.log.info("Losing connection to BS", bs=bs)
                remove_bs.append(bs)
        # remove/disconnect bs
        for bs in remove_bs:
            self.disconnect_from_bs(bs)
        return len(remove_bs)

    def connect_to_bs(self, bs, disconnect=False, return_connected=False):
        """
        Try to connect to specified basestation. Return if successful.

        :param bs: Basestation to connect to
        :param disconnect: If True, disconnect from BS if it was previously connected.
        :param return_connected: If True, return whether the UE is now connected to the BS or not.
            Else, return if the (dis-)connect was successful.
        :return: True if (dis-)connected successfully. False if out of range. If return_connected, return if connected.
        """
        log = self.log.bind(bs=bs, disconnect=disconnect, conn_bs=list(self.bs_dr.keys()))
        # already connected
        if bs in self.bs_dr:
            if disconnect:
                self.disconnect_from_bs(bs)
                log.info("Disconnected")
                if return_connected:
                    return False
            else:
                log.info("Staying connected")
            return True

        # not yet connected
        if bs.can_connect(self.pos):
            # add BS to connections; important: initialize with data rate
            # also important: initialize before adding connection to bs.conn_ues; affects how data rate is calc
            self.bs_dr[bs] = bs.data_rate(self)
            bs.conn_ues.append(self)
            self.log = self.log.bind(conn_bs=list(self.bs_dr.keys()))
            log.info("Connected")
            return True
        # out of range: cannot connect
        return False

    def disconnect_from_bs(self, bs):
        """
        Disconnect from given BS. Assume BS is currently connected.

        Removes the BS from this UE's connections and this UE from the BS's `conn_ues` list.

        :param bs: Basestation to disconnect from
        """
        assert bs in self.bs_dr, "Not connected to BS --> Cannot disconnect"
        del self.bs_dr[bs]
        bs.conn_ues.remove(self)
        self.log = self.log.bind(conn_bs=list(self.bs_dr.keys()))

    def ues_at_same_bs(self):
        """Return set of UEs that are currently connected to any of the BS that this UE is connected to"""
        ue_set = set()
        for bs in self.bs_dr:
            ue_set.update(bs.conn_ues)
        self.log.debug('UEs at same BS', ue_set=ue_set)
        return ue_set
Classes
class User (id, map, pos_x, pos_y, movement, dr_req=1)
-
A user/UE moving around in the world and requesting mobile services. Connections to BS are checked before connecting and after every move to determine whether they are lost or still stable.
Create new UE object :param id: Unique ID of UE (string) :param map: Map object representing the playground/world :param pos_x: x-coord of starting position or 'random' :param pos_y: y-coord of starting position or 'random' :param movement: Movement utility object implementing the movement of the UE :param dr_req: Data rate requirement by UE for successful service
Expand source code
class User:
    """
    A user/UE moving around in the world and requesting mobile services.

    Connections to BS are checked before connecting and after every move to see whether they are lost or
    still stable.
    """

    def __init__(self, id, map, pos_x, pos_y, movement, dr_req=1):
        """
        Create new UE object

        :param id: Unique ID of UE (string)
        :param map: Map object representing the playground/world
        :param pos_x: x-coord of starting position or 'random'
        :param pos_y: y-coord of starting position or 'random'
        :param movement: Movement utility object implementing the movement of the UE
        :param dr_req: Data rate requirement by UE for successful service
        """
        self.id = id
        self.map = map
        self.movement = movement
        self.dr_req = dr_req
        # dict of connected BS: BS (only connected BS are keys!) --> data rate of connection
        self.bs_dr = dict()
        # own RNG for reproducibility; global random shares state that's manipulated by RL during training
        self.rng = random.Random()
        self.init_pos_x = pos_x
        self.init_pos_y = pos_y
        self.pos = None
        self.reset_pos()
        self.movement.reset()
        # exponentially weighted moving average data rate
        self.ewma_dr = 0
        self.log = structlog.get_logger(id=self.id, pos=str(self.pos), ewma_dr=self.ewma_dr,
                                        conn_bs=list(self.bs_dr.keys()), dr_req=self.dr_req)
        self.log.info('UE init')

    def __repr__(self):
        return str(self.id)

    # compare and hash UEs based on their ID only
    def __eq__(self, other):
        if type(other) is type(self):
            return self.id == other.id
        return False

    def __hash__(self):
        return hash(self.id)

    @property
    def curr_dr(self):
        """Current data rate the UE gets through all its BS connections"""
        dr = sum(self.bs_dr.values())
        self.log.debug("Current data rate", curr_dr=dr)
        return dr

    @property
    def dr_req_satisfied(self):
        """Whether or not the UE's data rate requirement is satisfied by its current total data rate"""
        return self.curr_dr >= self.dr_req

    @property
    def utility(self):
        """Utility based on the current data rate and utility function"""
        # alternative utility function: step_utility(self.curr_dr, self.dr_req)
        return log_utility(self.curr_dr)

    def seed(self, seed=None):
        """
        Seed the UE's own RNG and its movement object with the same seed.

        :param seed: Seed value passed on unchanged to both `rng.seed` and `movement.seed`
        """
        self.rng.seed(seed)
        self.movement.seed(seed)

    def reset_pos(self):
        """(Re)set position based on initial position x and y as Point. Resolve 'random'."""
        # set pos_x; 'random' draws an integer coordinate within the map width (both ends included)
        pos_x = self.init_pos_x
        if pos_x == 'random':
            pos_x = self.rng.randint(0, int(self.map.width))
        # set pos_y; 'random' draws an integer coordinate within the map height (both ends included)
        pos_y = self.init_pos_y
        if pos_y == 'random':
            pos_y = self.rng.randint(0, int(self.map.height))
        # set pos as Point
        self.pos = Point(pos_x, pos_y)

    def reset(self):
        """
        Reset UE position, movement, and connections.

        Only this UE's own connection dict and EWMA data rate are cleared here; the `conn_ues` lists of the
        BS are not touched by this method.
        """
        self.reset_pos()
        self.movement.reset()
        self.bs_dr = dict()
        self.ewma_dr = 0

    def plot(self, radius=2):
        """
        Plot the UE as filled circle with a given radius and the ID. Color from red to green indicating the utility.

        :param radius: Radius of the circle
        :return: A list of created matplotlib artists
        """
        # show utility as red to yellow to green; the norm maps utility in [-20, 20] to the colormap input range
        # pyplot.get_cmap is used because matplotlib.cm.get_cmap was removed in Matplotlib 3.9
        colormap = plt.get_cmap('RdYlGn')
        norm = plt.Normalize(-20, 20)
        color = colormap(norm(self.utility))

        # outline of the circle around the UE position; computed once and used for both line and fill
        outline_xy = self.pos.buffer(radius).exterior.xy
        artists = plt.plot(*outline_xy, color=color)
        artists.extend(plt.fill(*outline_xy, color=color))
        artists.append(plt.annotate(self.id, xy=(self.pos.x, self.pos.y), ha='center', va='center'))

        # show curr data rate and utility below the UE
        artists.append(plt.annotate(f'dr: {self.curr_dr:.2f}', xy=(self.pos.x, self.pos.y - radius - 2),
                                    ha='center', va='center'))
        artists.append(plt.annotate(f'util: {self.utility:.2f}', xy=(self.pos.x, self.pos.y - radius - 6),
                                    ha='center', va='center'))
        return artists

    def update_curr_dr(self):
        """Update the current data rate of all BS connections according to the current situation (pos & assignment)"""
        # only values of existing keys are replaced, so iterating the dict while assigning is safe
        for bs in self.bs_dr:
            self.bs_dr[bs] = bs.data_rate(self)

    def update_ewma_dr(self, weight=0.9):
        """
        Update the exp. weighted moving avg. of this UE's current data rate:
        `EWMA(t) = weight * dr + (1-weight) * EWMA(t-1)`
        Used as historic avg. rate for proportional-fair sharing. Called after movement.

        :param weight: Weight for EWMA in [0, 1]. The higher, the more focus on new/current dr and less on previous.
        """
        self.ewma_dr = weight * self.curr_dr + (1 - weight) * self.ewma_dr
        self.log = self.log.bind(ewma_dr=self.ewma_dr)

    def move(self):
        """
        Do one step: Move according to own movement pattern. Check for lost connections. Update EWMA data rate.

        :return: Number of connections lost through movement
        """
        self.pos = self.movement.step(self.pos)
        num_lost_connections = self.check_bs_connection()
        self.log = self.log.bind(pos=str(self.pos))
        self.update_ewma_dr()
        self.log.debug("User move", lost_connections=num_lost_connections)
        return num_lost_connections

    def check_bs_connection(self):
        """
        Check if assigned BS connections are still stable (after move), else remove.

        :return: Number of removed/lost connections
        """
        # collect first and disconnect afterwards: disconnecting changes the dict that is iterated here
        remove_bs = []
        for bs in self.bs_dr:
            if not bs.can_connect(self.pos):
                self.log.info("Losing connection to BS", bs=bs)
                remove_bs.append(bs)
        # remove/disconnect bs
        for bs in remove_bs:
            self.disconnect_from_bs(bs)
        return len(remove_bs)

    def connect_to_bs(self, bs, disconnect=False, return_connected=False):
        """
        Try to connect to specified basestation. Return if successful.

        :param bs: Basestation to connect to
        :param disconnect: If True, disconnect from BS if it was previously connected.
        :param return_connected: If True, return whether the UE is now connected to the BS or not.
            Else, return if the (dis-)connect was successful.
        :return: True if (dis-)connected successfully. False if out of range. If return_connected, return if connected.
        """
        log = self.log.bind(bs=bs, disconnect=disconnect, conn_bs=list(self.bs_dr.keys()))
        # already connected
        if bs in self.bs_dr:
            if disconnect:
                self.disconnect_from_bs(bs)
                log.info("Disconnected")
                if return_connected:
                    return False
            else:
                log.info("Staying connected")
            return True

        # not yet connected
        if bs.can_connect(self.pos):
            # add BS to connections; important: initialize with data rate
            # also important: initialize before adding connection to bs.conn_ues; affects how data rate is calc
            self.bs_dr[bs] = bs.data_rate(self)
            bs.conn_ues.append(self)
            self.log = self.log.bind(conn_bs=list(self.bs_dr.keys()))
            log.info("Connected")
            return True
        # out of range: cannot connect
        return False

    def disconnect_from_bs(self, bs):
        """
        Disconnect from given BS. Assume BS is currently connected.

        Removes the BS from this UE's connections and this UE from the BS's `conn_ues` list.

        :param bs: Basestation to disconnect from
        """
        assert bs in self.bs_dr, "Not connected to BS --> Cannot disconnect"
        del self.bs_dr[bs]
        bs.conn_ues.remove(self)
        self.log = self.log.bind(conn_bs=list(self.bs_dr.keys()))

    def ues_at_same_bs(self):
        """Return set of UEs that are currently connected to any of the BS that this UE is connected to"""
        ue_set = set()
        for bs in self.bs_dr:
            ue_set.update(bs.conn_ues)
        self.log.debug('UEs at same BS', ue_set=ue_set)
        return ue_set
Instance variables
var curr_dr
-
Current data rate the UE gets through all its BS connections
Expand source code
@property
def curr_dr(self):
    """Total data rate the UE currently receives, summed over all of its BS connections"""
    total = sum(self.bs_dr.values())
    self.log.debug("Current data rate", curr_dr=total)
    return total
var dr_req_satisfied
-
Whether or not the UE's data rate requirement is satisfied by its current total data rate
Expand source code
@property
def dr_req_satisfied(self):
    """True if the UE's current total data rate reaches (or exceeds) its data rate requirement"""
    achieved = self.curr_dr
    return achieved >= self.dr_req
var utility
-
Utility based on the current data rate and utility function
Expand source code
@property
def utility(self):
    """Utility of the UE, obtained by applying the utility function to its current data rate"""
    # alternative utility function: step_utility(self.curr_dr, self.dr_req)
    data_rate = self.curr_dr
    return log_utility(data_rate)
Methods
def check_bs_connection(self)
-
Check if assigned BS connections are still stable (after move), else remove. :return: Number of removed/lost connections
Expand source code
def check_bs_connection(self):
    """
    Drop every BS connection that cannot be kept at the UE's current position (e.g. after a move).

    :return: Number of removed/lost connections
    """
    # first pass only collects: disconnecting changes the dict that is being iterated
    lost = []
    for station in self.bs_dr:
        if station.can_connect(self.pos):
            continue
        self.log.info("Losing connection to BS", bs=station)
        lost.append(station)

    # second pass actually disconnects
    for station in lost:
        self.disconnect_from_bs(station)
    return len(lost)
def connect_to_bs(self, bs, disconnect=False, return_connected=False)
-
Try to connect to specified basestation. Return if successful.
:param bs: Basestation to connect to :param disconnect: If True, disconnect from BS if it was previously connected. :param return_connected: If True, return whether the UE is now connected to the BS or not. Else, return if the (dis-)connect was successful. :return: True if (dis-)connected successfully. False if out of range. If return_connected, return if connected.
Expand source code
def connect_to_bs(self, bs, disconnect=False, return_connected=False):
    """
    Try to connect to the given basestation, or (optionally) disconnect if already connected to it.

    :param bs: Basestation to connect to
    :param disconnect: If True, disconnect from BS if it was previously connected.
    :param return_connected: If True, return whether the UE is now connected to the BS or not.
        Else, return if the (dis-)connect was successful.
    :return: True if (dis-)connected successfully. False if out of range. If return_connected, return if connected.
    """
    log = self.log.bind(bs=bs, disconnect=disconnect, conn_bs=list(self.bs_dr.keys()))

    if bs not in self.bs_dr:
        # not yet connected: connecting only works if the BS accepts the current position
        if not bs.can_connect(self.pos):
            return False
        # store the data rate first and register at the BS afterwards; the order affects how the rate is calc
        self.bs_dr[bs] = bs.data_rate(self)
        bs.conn_ues.append(self)
        self.log = self.log.bind(conn_bs=list(self.bs_dr.keys()))
        log.info("Connected")
        return True

    # already connected
    if not disconnect:
        log.info("Staying connected")
        return True

    self.disconnect_from_bs(bs)
    log.info("Disconnected")
    # successful disconnect: report "not connected" if the caller asked for the connection state
    return not return_connected
def disconnect_from_bs(self, bs)
-
Disconnect from given BS. Assume BS is currently connected.
Expand source code
def disconnect_from_bs(self, bs):
    """
    Disconnect from the given BS, which is expected to be currently connected.

    Removes the BS from this UE's connections and this UE from the BS's `conn_ues` list.
    """
    assert bs in self.bs_dr, "Not connected to BS --> Cannot disconnect"
    self.bs_dr.pop(bs)
    bs.conn_ues.remove(self)
    remaining = list(self.bs_dr)
    self.log = self.log.bind(conn_bs=remaining)
def move(self)
-
Do one step: Move according to own movement pattern. Check for lost connections. Update EWMA data rate.
:return: Number of connections lost through movement
Expand source code
def move(self):
    """
    Perform one step: move according to the movement pattern, drop lost connections, update the EWMA data rate.

    :return: Number of connections lost through movement
    """
    new_pos = self.movement.step(self.pos)
    self.pos = new_pos

    lost = self.check_bs_connection()
    self.log = self.log.bind(pos=str(self.pos))
    self.update_ewma_dr()
    self.log.debug("User move", lost_connections=lost)
    return lost
def plot(self, radius=2)
-
Plot the UE as filled circle with a given radius and the ID. Color from red to green indicating the utility. :param radius: Radius of the circle :return: A list of created matplotlib artists
Expand source code
def plot(self, radius=2):
    """
    Plot the UE as filled circle with a given radius and the ID. Color from red to green indicating the utility.

    :param radius: Radius of the circle
    :return: A list of created matplotlib artists
    """
    # show utility as red to yellow to green; the norm maps utility in [-20, 20] to the colormap input range
    # pyplot.get_cmap is used because matplotlib.cm.get_cmap was removed in Matplotlib 3.9
    colormap = plt.get_cmap('RdYlGn')
    norm = plt.Normalize(-20, 20)
    color = colormap(norm(self.utility))

    # outline of the circle around the UE position; computed once and used for both line and fill
    outline_xy = self.pos.buffer(radius).exterior.xy
    artists = plt.plot(*outline_xy, color=color)
    artists.extend(plt.fill(*outline_xy, color=color))
    artists.append(plt.annotate(self.id, xy=(self.pos.x, self.pos.y), ha='center', va='center'))

    # show curr data rate and utility below the UE
    artists.append(plt.annotate(f'dr: {self.curr_dr:.2f}', xy=(self.pos.x, self.pos.y - radius - 2),
                                ha='center', va='center'))
    artists.append(plt.annotate(f'util: {self.utility:.2f}', xy=(self.pos.x, self.pos.y - radius - 6),
                                ha='center', va='center'))
    return artists
def reset(self)
-
Reset UE position, movement, and connections.
Expand source code
def reset(self):
    """Put the UE back into its initial state: position, movement, connections, and EWMA data rate."""
    self.reset_pos()
    self.movement.reset()
    # forget all connections and the data rate history
    self.bs_dr, self.ewma_dr = {}, 0
def reset_pos(self)
-
(Re)set position based on initial position x and y as Point. Resolve 'random'.
Expand source code
def reset_pos(self):
    """Set the position (as Point) from the initial x and y values, drawing a coordinate where 'random' is given."""
    x, y = self.init_pos_x, self.init_pos_y
    # x is drawn before y so that the sequence of RNG draws stays the same
    if x == 'random':
        x = self.rng.randint(0, int(self.map.width))
    if y == 'random':
        y = self.rng.randint(0, int(self.map.height))
    self.pos = Point(x, y)
def seed(self, seed=None)
-
Expand source code
def seed(self, seed=None):
    """
    Seed the UE's own RNG and its movement object with the same seed.

    :param seed: Seed value passed on unchanged to both `rng.seed` and `movement.seed`
    """
    for seedable in (self.rng, self.movement):
        seedable.seed(seed)
def ues_at_same_bs(self)
-
Return set of UEs that are currently connected to any of the BS that this UE is connected to
Expand source code
def ues_at_same_bs(self):
    """Return the set of all UEs connected to at least one of the BS that this UE is connected to"""
    ue_set = set().union(*(station.conn_ues for station in self.bs_dr))
    self.log.debug('UEs at same BS', ue_set=ue_set)
    return ue_set
def update_curr_dr(self)
-
Update the current data rate of all BS connections according to the current situation (pos & assignment)
Expand source code
def update_curr_dr(self):
    """Refresh the stored data rate of every BS connection by asking each connected BS for its current rate"""
    # updated one by one and in place, so each BS is queried with the rates refreshed so far
    for station in self.bs_dr:
        self.bs_dr[station] = station.data_rate(self)
def update_ewma_dr(self, weight=0.9)
-
Update the exp. weighted moving avg. of this UE's current data rate:
EWMA(t) = weight * dr + (1-weight) * EWMA(t-1)
Used as historic avg. rate for proportional-fair sharing. Called after movement. :param weight: Weight for EWMA in [0, 1]. The higher, the more focus on new/current dr and less on previous.
Expand source code
def update_ewma_dr(self, weight=0.9):
    """
    Update the exponentially weighted moving average of this UE's current data rate:
    `EWMA(t) = weight * dr + (1-weight) * EWMA(t-1)`
    Used as historic avg. rate for proportional-fair sharing. Called after movement.

    :param weight: Weight for EWMA in [0, 1]. The higher, the more focus on new/current dr and less on previous.
    """
    current_part = weight * self.curr_dr
    history_part = (1 - weight) * self.ewma_dr
    self.ewma_dr = current_part + history_part
    self.log = self.log.bind(ewma_dr=self.ewma_dr)