Module deepcomp.util.simulation
import os
import time
import random
from datetime import datetime
from collections import defaultdict
import pandas as pd
import structlog
import matplotlib.pyplot as plt
import matplotlib.animation
import numpy as np
from tqdm import tqdm
from joblib import Parallel, delayed
import ray
import ray.tune
from ray.rllib.agents.ppo import PPOTrainer
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from deepcomp.util.constants import SUPPORTED_ALGS, SUPPORTED_RENDER, RESULT_DIR, TRAIN_DIR, TEST_DIR, VIDEO_DIR
from deepcomp.agent.dummy import RandomAgent, FixedAgent
from deepcomp.agent.heuristics import GreedyBestSelection, GreedyAllSelection, DynamicSelection
from deepcomp.agent.brute_force import BruteForceAgent
from deepcomp.util.logs import config_logging
class Simulation:
"""Simulation class for training and testing agents."""
def __init__(self, config, agent_name, cli_args, debug=False):
"""
Create a new simulation object to hold the agent and environment and to train, test, and visualize them.
:param config: RLlib agent config
:param agent_name: String identifying the agent. Supported (see SUPPORTED_ALGS): 'ppo', 'greedy-best', 'greedy-all', 'dynamic', 'brute-force', 'random', 'fixed'
:param cli_args: Parsed CLI args (argparse Namespace or similar)
:param debug: Whether or not to enable ray's local_mode for debugging
"""
# config and env
self.config = config
self.env_class = config['env']
self.env_name = config['env'].__name__
self.env_config = config['env_config']
self.episode_length = self.env_config['episode_length']
# detect automatically if the env is a multi-agent env by checking all (not just immediate) ancestors
self.multi_agent_env = MultiAgentEnv in self.env_class.__mro__
# num workers for parallel execution of eval episodes
self.num_workers = config['num_workers']
self.cli_args = cli_args
# agent
assert agent_name in SUPPORTED_ALGS, f"Agent {agent_name} not supported. Supported agents: {SUPPORTED_ALGS}"
self.agent_name = agent_name
self.agent = None
# only init ray if necessary --> lower overhead for dummy agents
if self.agent_name == 'ppo':
ray.init(local_mode=debug)
self.agent_path = None
# filename for saving is set when loading the agent
self.result_filename = None
self.log = structlog.get_logger()
self.log.debug('Simulation init', env=self.env_name, eps_length=self.episode_length, agent=self.agent_name,
multi_agent=self.multi_agent_env, num_workers=self.num_workers)
@staticmethod
def extract_agent_id(agent_path):
"""Extract and return agent ID from path. Eg, 'PPO_MultiAgentMobileEnv_14c68_00000_0_2020-10-22_10-03-33'"""
if agent_path is not None:
# walk through parts of path and return the one starting with 'PPO_'
parts = os.path.normpath(agent_path).split(os.sep)
for p in parts:
if p.startswith('PPO_'):
return p
return None
@property
def metadata(self):
"""Dict with metadata about the simulation"""
# distinguish multi-agent RL with separate NNs rather than a shared NN for all agents
agent_str = self.cli_args.agent
if agent_str == 'multi' and self.cli_args.separate_agent_nns:
agent_str = 'multi-sep-nns'
data = {
'alg': self.cli_args.alg,
'agent': agent_str,
'agent_path': self.agent_path,
'agent_id': self.extract_agent_id(self.agent_path),
'env': self.env_name,
'env_size': self.cli_args.env,
'eps_length': self.episode_length,
'num_bs': len(self.env_config['bs_list']),
'sharing_model': self.cli_args.sharing,
'num_ue_static': self.cli_args.static_ues,
'num_ue_slow': self.cli_args.slow_ues,
'num_ue_fast': self.cli_args.fast_ues,
'result_filename': self.result_filename,
}
# add training iteration
if data['alg'] == 'ppo':
data['train_iteration'] = self.agent.iteration
# not sure how to access the actual training steps or whether that's even possible
return data
def train(self, stop_criteria, restore_path=None, scheduler=None):
"""
Train an RLlib agent using tune until any of the configured stopping criteria is met.
:param stop_criteria: Dict with stopping criteria.
See https://docs.ray.io/en/latest/tune/api_docs/execution.html#tune-run
:param restore_path: Path to trained agent to continue training (if any)
The agent's latest checkpoint is loaded automatically
The trained agent needs to have the same settings and scenario for continuing training
When continuing training, the number of training steps continues too, ie, is not reset to 0 after restoring
:return: Return the path to the saved agent (checkpoint) and tune's ExperimentAnalysis object
See https://docs.ray.io/en/latest/tune/api_docs/analysis.html#experimentanalysis-tune-experimentanalysis
"""
# load latest checkpoint within the given agent's directory
if restore_path is not None:
restore_path = self.get_last_checkpoint_path(restore_path)
analysis = ray.tune.run(PPOTrainer, config=self.config, local_dir=RESULT_DIR, stop=stop_criteria,
# checkpoint every 10 iterations and at the end; keep the best 10 checkpoints
checkpoint_at_end=True, checkpoint_freq=10, keep_checkpoints_num=10,
checkpoint_score_attr='episode_reward_mean', restore=restore_path,
scheduler=scheduler)
analysis.default_metric = 'episode_reward_mean'
analysis.default_mode = 'max'
# tune returns an ExperimentAnalysis that can be cast to a Pandas data frame
# object https://docs.ray.io/en/latest/tune/api_docs/analysis.html#experimentanalysis
df = analysis.dataframe()
checkpoint_path = analysis.get_best_checkpoint(trial=analysis.get_best_trial())
self.log.info('Training done', timesteps_total=int(df['timesteps_total']),
episodes_total=int(df['episodes_total']), episode_reward_mean=float(df['episode_reward_mean']),
num_steps_sampled=int(df['info/num_steps_sampled']),
num_steps_trained=int(df['info/num_steps_trained']),
log_dir=analysis.get_best_logdir())
# plot results
# this only contains (and plots) the last 100 episodes --> not useful
# --> use tensorboard instead; or read and plot progress.csv
# eps_results = df['hist_stats']
# self.plot_learning_curve(eps_results['episode_lengths'], eps_results['episode_reward'])
return checkpoint_path, analysis
@staticmethod
def get_specific_checkpoint(rllib_dir):
"""
Return path to checkpoint file if rllib_dir points to a specific checkpoint folder (or file).
Else return None.
"""
if 'checkpoint' not in rllib_dir:
return None
# if it directly points to the checkpoint file, just return it
if os.path.isfile(rllib_dir):
return rllib_dir
# if it only points to the checkpoint folder, derive the checkpoint file and return it
checkpoint_number = rllib_dir.split('_')[-1]
return os.path.join(rllib_dir, f'checkpoint-{checkpoint_number}')
@staticmethod
def get_last_checkpoint_path(rllib_dir):
"""Given an RLlib training dir, return the full path to the last checkpoint"""
# check if rllib_dir is really already a pointer to a specific checkpoint; in that case, just return it
if 'checkpoint' in rllib_dir:
return Simulation.get_specific_checkpoint(rllib_dir)
rllib_dir = os.path.abspath(rllib_dir)
checkpoints = [f for f in os.listdir(rllib_dir) if f.startswith('checkpoint')]
# sort according to checkpoint number after '_'
sorted_checkpoints = sorted(checkpoints, key=lambda cp: int(cp.split('_')[-1]))
last_checkpoint_dir = os.path.join(rllib_dir, sorted_checkpoints[-1])
# eg, retrieve '10' from '...PPO_MultiAgentMobileEnv_0_2020-07-14_17-28-33je5r1lov/checkpoint_10'
last_checkpoint_no = last_checkpoint_dir.split('_')[-1]
# construct full checkpoint path, eg, '...r1lov/checkpoint_10/checkpoint-10'
last_checkpoint_path = os.path.join(last_checkpoint_dir, f'checkpoint-{last_checkpoint_no}')
return last_checkpoint_path
@staticmethod
def get_best_checkpoint_path(rllib_dir):
"""Given an RLlib training dir, return the full path of the best checkpoint"""
# check if rllib_dir is really already a pointer to a specific checkpoint; in that case, just return it
if 'checkpoint' in rllib_dir:
return Simulation.get_specific_checkpoint(rllib_dir)
rllib_dir = os.path.abspath(rllib_dir)
analysis = ray.tune.Analysis(rllib_dir)
analysis.default_metric = 'episode_reward_mean'
# analysis.default_metric = 'custom_metrics/sum_utility_mean'
analysis.default_mode = 'max'
checkpoint = analysis.get_best_checkpoint(analysis._get_trial_paths()[0])
return os.path.abspath(checkpoint)
def load_agent(self, rllib_dir=None, rand_seed=None, fixed_action=1, explore=False):
"""
Load a trained RLlib agent from the specified rllib_dir, or instantiate one of the non-RL baseline agents. Call this before testing an agent.
:param rllib_dir: Path pointing to the agent's training dir (only used for RLlib agents)
:param rand_seed: RNG seed used by the random agent (ignored by other agents)
:param fixed_action: Fixed action performed by the fixed agent (ignored by the others)
:param explore: Whether to keep exploration enabled. Set to False when testing an RLlib agent.
True for continuing training.
"""
if self.agent_name == 'ppo':
# turn off exploration for testing the loaded agent
self.config['explore'] = explore
self.agent = PPOTrainer(config=self.config, env=self.env_class)
self.agent_path = self.get_best_checkpoint_path(rllib_dir)
self.log.info('Loading PPO agent', checkpoint=self.agent_path)
self.agent.restore(self.agent_path)
if self.agent_name == 'greedy-best':
self.agent = GreedyBestSelection()
if self.agent_name == 'greedy-all':
self.agent = GreedyAllSelection()
if self.agent_name == 'dynamic':
self.agent = DynamicSelection(epsilon=0.8)
if self.agent_name == 'brute-force':
self.agent = BruteForceAgent(self.num_workers)
if self.agent_name == 'random':
# instantiate the environment to get the action space
env = self.env_class(self.env_config)
self.agent = RandomAgent(env.action_space, seed=rand_seed)
if self.agent_name == 'fixed':
self.agent = FixedAgent(action=fixed_action, noop_interval=100)
self.log.info('Agent loaded', agent=type(self.agent).__name__, rllib_dir=rllib_dir, checkpoint=self.agent_path)
# set a suitable filename for saving testing videos and results later
self.set_result_filename()
def set_result_filename(self):
"""Return a suitable filename (without file ending) in the format 'agent_env-class_env-size_num-ues_time'"""
assert self.agent is not None, "Set the filename after loading the agent"
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
agent_name = type(self.agent).__name__
env_size = self.cli_args.env
num_ues = self.cli_args.static_ues + self.cli_args.slow_ues + self.cli_args.fast_ues
train = 'rand' if self.cli_args.rand_train else 'fixed'
test = 'rand' if self.cli_args.rand_test else 'fixed'
seed = self.cli_args.seed
self.result_filename = \
f'{agent_name}_{self.env_name}_{env_size}_{self.cli_args.sharing}_{num_ues}UEs-{self.cli_args.reward}' \
f'_{train}-{test}_{seed}_{timestamp}'
def save_animation(self, fig, patches, mode):
"""
Create and save matplotlib animation
:param fig: Matplotlib figure
:param patches: List of patches to draw for each step in the animation
:param mode: How to save the animation. Options: 'html' (HTML5 video), 'gif' (requires ImageMagick), or 'both'
"""
render_modes = SUPPORTED_RENDER - {None}
assert mode in render_modes, f"Render mode {mode} not in {render_modes}"
anim = matplotlib.animation.ArtistAnimation(fig, patches, repeat=False)
# save html5 video
if mode == 'html' or mode == 'both':
html = anim.to_html5_video()
with open(f'{VIDEO_DIR}/{self.result_filename}.html', 'w') as f:
f.write(html)
self.log.info('Video saved', path=f'{VIDEO_DIR}/{self.result_filename}.html')
# save gif; requires external dependency ImageMagick
if mode == 'gif' or mode == 'both':
try:
anim.save(f'{VIDEO_DIR}/{self.result_filename}.gif', writer='imagemagick')
self.log.info('Gif saved', path=f'{VIDEO_DIR}/{self.result_filename}.gif')
except TypeError:
self.log.error('ImageMagick needs to be installed for saving gifs.')
def apply_action_single_agent(self, obs, env, state=None):
"""
For the given observation and a trained/loaded agent, get and apply the next action. Only single-agent envs.
:param obs: Observation of the single-agent environment
:param env: The environment to apply the action to
:param state: Optional state of the RNN/LSTM if used
:returns: tuple (obs, r, done, info, state) WHERE
obs is the next observation
r is the immediate reward
done indicates whether the episode is done
"""
assert not self.multi_agent_env, "Use apply_action_multi_agent for multi-agent envs"
assert self.agent is not None, "Train or load an agent before running the simulation"
# normal MLP NN
if state is None:
action = self.agent.compute_action(obs)
# RNN/LSTM, which requires state
else:
action, state, logits = self.agent.compute_action(obs, state=state)
next_obs, reward, done, info = env.step(action)
self.log.debug("Step", t=info['time'], obs=obs, action=action, reward=reward, next_obs=next_obs, done=done)
return next_obs, reward, done, info, state
def apply_action_multi_agent(self, obs, env, state=None):
"""
Same as apply_action_single_agent, but for multi-agent envs. For each agent, unpack obs & choose action,
before applying it to the env.
:param dict obs: Dict of observations for all agents
:param env: The environment to apply the actions to
:param state: Optional state of the RNN/LSTM if used
:returns: tuple (obs, r, done, info, state) WHERE
obs is the next observation
r is the summed up immediate reward for all agents
done is done['__all__'] indicating if all agents are done
"""
assert self.multi_agent_env, "Use apply_action_single_agent for single-agent envs"
assert self.agent is not None, "Train or load an agent before running the simulation"
action = {}
for agent_id, agent_obs in obs.items():
policy_id = self.config['multiagent']['policy_mapping_fn'](agent_id)
# normal MLP NN
if state is None:
action[agent_id] = self.agent.compute_action(agent_obs, policy_id=policy_id)
# RNN/LSTM, which requires state
else:
action[agent_id], state, logits = self.agent.compute_action(agent_obs, policy_id=policy_id, state=state)
next_obs, reward, done, info = env.step(action)
# info is currently the same for all agents; just get the first one
info = list(info.values())[0]
self.log.debug("Step", t=info['time'], obs=obs, action=action, reward=reward, next_obs=next_obs,
done=done['__all__'])
return next_obs, sum(reward.values()), done['__all__'], info, state
def run_episode(self, env, render=None, log_dict=None):
"""
Run a single episode on the given environment and return its duration, step rewards, and per-step metrics.
:param env: Instance of the environment to use (each joblib iteration will still use its own instance)
:param render: Whether/How to render the episode
:param log_dict: Dict with logging levels to set
:return: Tuple of eps_duration (scalar), step rewards (list), scalar metrics per step (list of dicts), vector metrics per step (list of dicts)
"""
# list of rewards and metrics (which are a dict) for each time step
rewards = []
scalar_metrics = []
vector_metrics = []
# no need to instantiate new env since each joblib iteration has its own copy
# that's why we need to set the logging level again for each iteration
config_logging()
if log_dict is not None:
env.set_log_level(log_dict)
eps_start = time.time()
if render is not None:
fig = plt.figure(figsize=env.map.figsize)
# equal aspect ratio to avoid distortions
plt.gca().set_aspect('equal')
# run until episode ends
patches = []
t = 0
done = False
obs = env.reset()
# if using brute-force agent, pass the environment
if self.agent_name == 'brute-force':
self.agent.env = env
# init state for LSTM: https://github.com/ray-project/ray/issues/9220#issuecomment-652146377
state = None
if self.config['model']['use_lstm']:
cell_size = self.config['model']['lstm_cell_size']
state = [np.zeros(cell_size), np.zeros(cell_size)]
# for continuous problems, stop evaluation after fixed eps length
while (done is None or not done) and t < self.episode_length:
if render is not None:
patches.append(env.render())
if render == 'plot':
plt.show()
# get and apply action
if self.multi_agent_env:
obs, reward, done, info, state = self.apply_action_multi_agent(obs, env, state)
else:
obs, reward, done, info, state = self.apply_action_single_agent(obs, env, state)
t = info['time']
# save reward and metrics
rewards.append(reward)
scalar_metrics.append(info['scalar_metrics'])
vector_metrics.append(info['vector_metrics'])
# create the animation
if render is not None:
fig.tight_layout()
self.save_animation(fig, patches, render)
# episode time in seconds (to measure simulation efficiency)
eps_duration = time.time() - eps_start
self.log.debug('Episode complete', eps_duration=eps_duration, avg_step_reward=np.mean(rewards),
scalar_metrics=list(scalar_metrics[0].keys()), vector_metrics=list(vector_metrics[0].keys()))
return eps_duration, rewards, scalar_metrics, vector_metrics
@staticmethod
def summarize_scalar_results(eps_duration, rewards, scalar_metrics):
"""
Summarize given results into a single result dict containing everything that should be logged and written to file.
:param eps_duration: List of episode durations (in s)
:param rewards: List of lists with rewards per step per episode
:param scalar_metrics: List of lists, containing a dict of metric --> value for each episode for each time step
:returns: Dict of result name --> whatever should be logged and saved (eg, mean, std, etc)
"""
results = defaultdict(list)
num_episodes = len(eps_duration)
# get metric names from first metric dict (first episode, first step); it's the same for all steps and eps
metric_names = list(scalar_metrics[0][0].keys())
# iterate over all episodes and aggregate the results per episode
for e in range(num_episodes):
# add episode, eps_duration and rewards
results['episode'].append(e)
# each episode has exactly one duration, so the mean and std columns both hold that single value
results['eps_duration_mean'].append(eps_duration[e])
results['eps_duration_std'].append(eps_duration[e])
results['step_reward_mean'].append(np.mean(rewards[e]))
results['step_reward_std'].append(np.std(rewards[e]))
# calc mean and std per metric and episode
for metric in metric_names:
metric_values = [scalar_metrics[e][t][metric] for t in range(len(scalar_metrics[e]))]
results[f'{metric}_mean'].append(np.mean(metric_values))
results[f'{metric}_std'].append(np.std(metric_values))
# convert defaultdict to normal dict
return dict(results)
def write_scalar_results(self, scalar_results):
"""Write experiment results to CSV file. Include all relevant info."""
result_file = f'{TEST_DIR}/{self.result_filename}.csv'
self.log.info("Writing scalar results", file=result_file)
data = self.metadata
# training data for PPO
if self.agent_name == 'ppo':
data.update({
'train_steps': self.cli_args.train_steps,
'train-iter': self.cli_args.train_iter,
'target_reward': self.cli_args.target_reward,
'target-utility': self.cli_args.target_utility,
})
# add actual results and save to file
data.update(scalar_results)
df = pd.DataFrame(data=data)
df.to_csv(result_file)
def write_vector_results(self, vector_metrics):
"""
Write vector metrics into data frames and save them to pickle, incl. metadata/attributes.
One data frame and pickle file per metric.
Vector metrics contain measurements per UE per time step (per evaluation episode).
:param vector_metrics: List of lists of dicts of dicts: One list per episode with dicts per time step.
Each dict maps metric name to another dict, which again maps UE ID to the metric value.
:return: list of result dicts
"""
# in case there are no vector metrics
if len(vector_metrics) == 0 or len(vector_metrics[0]) == 0:
return []
# construct separate dfs per metric
dfs = []
metrics = list(vector_metrics[0][0].keys())
for metric in metrics:
# init dict with empty lists
data = {'episode': [], 'time_step': []}
ues = list(vector_metrics[-1][-1][metric].keys())
for ue in ues:
data[ue] = []
# fill dict with values from vector_metrics
for eps, eps_dict in enumerate(vector_metrics):
for step, step_dict in enumerate(eps_dict):
data['episode'].append(eps)
data['time_step'].append(step)
metric_dict = step_dict[metric]
for ue in ues:
if ue in metric_dict:
data[ue].append(metric_dict[ue])
else:
data[ue].append(None)
# create and write data frame
df = pd.DataFrame(data)
df.attrs = self.metadata
df.attrs['metric'] = metric
df.attrs['num_episodes'] = len(vector_metrics)
df.attrs['env_config'] = self.env_config
df.attrs['cli_args'] = vars(self.cli_args)
dfs.append(df)
result_file = f'{TEST_DIR}/{self.result_filename}_{metric}.pkl'
self.log.info('Writing vector results', metric=metric, file=result_file)
df.to_pickle(result_file)
return dfs
def run(self, num_episodes=1, render=None, log_dict=None, write_results=False):
"""
Run one or more simulation episodes. Render situation at beginning of each time step. Return episode rewards.
:param int num_episodes: Number of episodes to run
:param str render: If and how to render the simulation. Options: None, 'plot', 'html', 'gif', 'both' (see SUPPORTED_RENDER)
:param dict log_dict: Dict of logger names --> logging level used to configure logging in the environment
:param bool write_results: Whether or not to write experiment results to file
:return list: Return list of lists with step rewards for all episodes
"""
assert self.agent is not None, "Train or load an agent before running the simulation"
assert (num_episodes == 1) or (render is None), "Turn off rendering when running for multiple episodes"
if self.num_workers > 1:
# parallel evaluation doesn't work for PPO and brute force; the heuristics are fast anyways
self.log.warning("Evaluating with a single worker for reproducibility and compatibility.")
self.num_workers = 1
assert self.num_workers == 1, "Evaluation needs to be done with a single worker"
# enable metrics logging, configure episode randomization, instantiate env, and set logging level
self.env_config['log_metrics'] = True
self.env_config['rand_episodes'] = self.cli_args.rand_test
env = self.env_class(self.env_config)
if log_dict is not None:
env.set_log_level(log_dict)
# simulate episodes; show progress with tqdm when running more than one episode
self.log.info('Starting evaluation', num_episodes=num_episodes, num_workers=self.num_workers,
static_ues=self.cli_args.static_ues, slow_ues=self.cli_args.slow_ues,
fast_ues=self.cli_args.fast_ues)
# run episodes via joblib; num_workers is forced to 1 above, so there is currently no actual parallelization
zipped_results = Parallel(n_jobs=self.num_workers)(
delayed(self.run_episode)(env, render, log_dict)
for _ in tqdm(range(num_episodes), disable=(num_episodes == 1))
)
# results consisting of list of tuples with (eps_duration, rewards, scalar_metrics) for each episode
# unzip to separate lists with entries for each episode (rewards and metrics are lists of lists; for each step)
eps_duration, rewards, scalar_metrics, vector_metrics = map(list, zip(*zipped_results))
# summarize results
scalar_results = self.summarize_scalar_results(eps_duration, rewards, scalar_metrics)
mean_results = {metric: np.mean(results) for metric, results in scalar_results.items()}
self.log.info('Scalar results', results=scalar_results)
self.log.info('Mean results', results=mean_results)
self.log.info("Simulation complete", num_episodes=num_episodes, eps_length=self.episode_length,
step_reward_mean=np.mean(scalar_results['step_reward_mean']),
step_reward_std=np.std(scalar_results['step_reward_std']),
avg_eps_reward=self.episode_length * np.mean(scalar_results['step_reward_mean']))
# write results to file
if write_results:
self.write_scalar_results(scalar_results)
dfs = self.write_vector_results(vector_metrics)
return rewards
Classes
class Simulation (config, agent_name, cli_args, debug=False)
Simulation class for training and testing agents.
Create a new simulation object to hold the agent and environment and to train, test, and visualize them.
:param config: RLlib agent config
:param agent_name: String identifying the agent. Supported (see SUPPORTED_ALGS): 'ppo', 'greedy-best', 'greedy-all', 'dynamic', 'brute-force', 'random', 'fixed'
:param cli_args: Parsed CLI args (argparse Namespace or similar)
:param debug: Whether or not to enable ray's local_mode for debugging
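For orientation, a minimal usage sketch. Only the config keys that Simulation itself reads are shown ('env', 'env_config', 'num_workers', 'model', plus 'multiagent' for multi-agent setups); MobileEnv and cli_args are placeholders for the real deepcomp environment class and the parsed CLI arguments, not part of this module.

# hypothetical setup; MobileEnv and cli_args are placeholders
config = {
    'env': MobileEnv,                                      # env class, not an instance
    'env_config': {'episode_length': 100, 'bs_list': []},  # plus whatever the env itself needs
    'num_workers': 1,
    'model': {'use_lstm': False, 'lstm_cell_size': 256},
    'explore': False,
}
sim = Simulation(config, agent_name='greedy-best', cli_args=cli_args)
sim.load_agent()
rewards = sim.run(num_episodes=10, write_results=True)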
Static methods
def extract_agent_id(agent_path)
Extract and return agent ID from path. Eg, 'PPO_MultiAgentMobileEnv_14c68_00000_0_2020-10-22_10-03-33'
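For illustration (the surrounding directories are made up), the method simply returns the first path component that starts with 'PPO_':

p = 'results/PPO/PPO_MultiAgentMobileEnv_14c68_00000_0_2020-10-22_10-03-33/checkpoint_10/checkpoint-10'
Simulation.extract_agent_id(p)
# --> 'PPO_MultiAgentMobileEnv_14c68_00000_0_2020-10-22_10-03-33'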
def get_best_checkpoint_path(rllib_dir)
Given an RLlib training dir, return the full path of the best checkpoint
def get_last_checkpoint_path(rllib_dir)
Given an RLlib training dir, return the full path to the last checkpoint
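A sketch of how the path is derived, with hypothetical directory names: the 'checkpoint_<n>' folder with the highest number is picked and the 'checkpoint-<n>' file inside it is returned; passing a specific checkpoint folder short-circuits to get_specific_checkpoint.

train_dir = 'results/PPO/PPO_MultiAgentMobileEnv_0_2020-07-14_17-28-33je5r1lov'
# assuming train_dir contains checkpoint_10 and checkpoint_20:
Simulation.get_last_checkpoint_path(train_dir)
# --> '.../checkpoint_20/checkpoint-20'
Simulation.get_last_checkpoint_path(train_dir + '/checkpoint_10')
# --> '.../checkpoint_10/checkpoint-10'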
def get_specific_checkpoint(rllib_dir)
Return path to checkpoint file if rllib_dir points to a specific checkpoint folder (or file). Else return None.
def summarize_scalar_results(eps_duration, rewards, scalar_metrics)
Summarize given results into a single result dict containing everything that should be logged and written to file.
:param eps_duration: List of episode durations (in s)
:param rewards: List of lists with rewards per step per episode
:param scalar_metrics: List of lists, containing a dict of metric --> value for each episode for each time step
:returns: Dict of result name --> whatever should be logged and saved (eg, mean, std, etc)
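A toy call showing the expected input shapes (two episodes with two steps each; 'utility' stands in for whatever scalar metrics the environment reports):

eps_duration = [1.2, 1.1]
rewards = [[0.5, 0.7], [0.6, 0.4]]
scalar_metrics = [[{'utility': 3.0}, {'utility': 5.0}],
                  [{'utility': 4.0}, {'utility': 4.0}]]
Simulation.summarize_scalar_results(eps_duration, rewards, scalar_metrics)
# --> {'episode': [0, 1], 'eps_duration_mean': [1.2, 1.1], 'eps_duration_std': [1.2, 1.1],
#      'step_reward_mean': [0.6, 0.5], 'step_reward_std': [0.1, 0.1],
#      'utility_mean': [4.0, 4.0], 'utility_std': [1.0, 0.0]}   (values up to float rounding)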
Instance variables
var metadata
Dict with metadata about the simulation
Methods
def apply_action_multi_agent(self, obs, env, state=None)
Same as apply_action_single_agent, but for multi-agent envs. For each agent, unpack the obs and choose an action before applying the joint action to the env.
:param dict obs: Dict of observations for all agents
:param env: The environment to apply the actions to
:param state: Optional state of the RNN/LSTM if used
:returns: tuple (obs, r, done, info, state) WHERE obs is the next observation, r is the summed-up immediate reward for all agents, and done is done['__all__'], indicating whether all agents are done
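A rough per-episode sketch, assuming a loaded agent, a multi-agent env instance, and a Simulation object called sim (all names illustrative); internally one action per agent is computed with the policy that config['multiagent']['policy_mapping_fn'] assigns to it:

obs = env.reset()
done = False
while not done:
    # returns the summed reward over all agents and done['__all__']
    obs, step_reward, done, info, _ = sim.apply_action_multi_agent(obs, env)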
def apply_action_single_agent(self, obs, env, state=None)
For the given observation and a trained/loaded agent, get and apply the next action. Only single-agent envs.
:param obs: Observation of the single-agent environment
:param env: The environment to apply the action to
:param state: Optional state of the RNN/LSTM if used
:returns: tuple (obs, r, done, info, state) WHERE obs is the next observation, r is the immediate reward, and done indicates whether the episode is done
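When the model uses an LSTM, the recurrent state has to be threaded through the calls; a sketch assuming config['model']['use_lstm'] is set and sim/env exist as above (this mirrors what run_episode does internally):

import numpy as np

cell_size = sim.config['model']['lstm_cell_size']
state = [np.zeros(cell_size), np.zeros(cell_size)]  # initial hidden and cell state
obs = env.reset()
obs, reward, done, info, state = sim.apply_action_single_agent(obs, env, state=state)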
def load_agent(self, rllib_dir=None, rand_seed=None, fixed_action=1, explore=False)
Load a trained RLlib agent from the specified rllib_dir, or instantiate one of the non-RL baseline agents. Call this before testing an agent.
:param rllib_dir: Path pointing to the agent's training dir (only used for RLlib agents)
:param rand_seed: RNG seed used by the random agent (ignored by other agents)
:param fixed_action: Fixed action performed by the fixed agent (ignored by the others)
:param explore: Whether to keep exploration enabled. Set to False when testing an RLlib agent; True for continuing training.
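Two illustrative calls (paths and seeds are placeholders): loading the best PPO checkpoint from an RLlib training dir, and loading a baseline agent that needs no checkpoint.

# PPO: picks the best checkpoint inside the given RLlib training dir
sim.load_agent(rllib_dir='results/PPO/PPO_MultiAgentMobileEnv_.../', explore=False)
# random baseline (assumes the Simulation was created with agent_name='random')
sim.load_agent(rand_seed=42)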
def run(self, num_episodes=1, render=None, log_dict=None, write_results=False)
-
Run one or more simulation episodes. Render the situation at the beginning of each time step. Return episode rewards.
:param int num_episodes: Number of episodes to run
:param str render: If and how to render the simulation. Options: None, 'plot', 'html' (=html5 video), 'gif', 'both'
:param dict log_dict: Dict of logger names --> logging level, used to configure logging in the environment
:param bool write_results: Whether or not to write experiment results to file
:return list: List of lists with step rewards for all episodes
Expand source code
def run(self, num_episodes=1, render=None, log_dict=None, write_results=False):
    """
    Run one or more simulation episodes. Render the situation at the beginning of each time step. Return episode rewards.

    :param int num_episodes: Number of episodes to run
    :param str render: If and how to render the simulation. Options: None, 'plot', 'html' (=html5 video), 'gif', 'both'
    :param dict log_dict: Dict of logger names --> logging level, used to configure logging in the environment
    :param bool write_results: Whether or not to write experiment results to file
    :return list: List of lists with step rewards for all episodes
    """
    assert self.agent is not None, "Train or load an agent before running the simulation"
    assert (num_episodes == 1) or (render is None), "Turn off rendering when running for multiple episodes"
    if self.num_workers > 1:
        # parallel evaluation doesn't work for PPO and brute force; the heuristics are fast anyways
        self.log.warning("Evaluating with a single worker for reproducibility and compatibility.")
        self.num_workers = 1
    assert self.num_workers == 1, "Evaluation needs to be done with a single worker"

    # enable metrics logging, configure episode randomization, instantiate env, and set logging level
    self.env_config['log_metrics'] = True
    self.env_config['rand_episodes'] = self.cli_args.rand_test
    env = self.env_class(self.env_config)
    if log_dict is not None:
        env.set_log_level(log_dict)

    # simulate episodes; show progress with tqdm if running for more than one episode
    self.log.info('Starting evaluation', num_episodes=num_episodes, num_workers=self.num_workers,
                  static_ues=self.cli_args.static_ues, slow_ues=self.cli_args.slow_ues,
                  fast_ues=self.cli_args.fast_ues)
    # episodes are run via joblib; since num_workers is forced to 1 above, they currently run sequentially
    zipped_results = Parallel(n_jobs=self.num_workers)(
        delayed(self.run_episode)(env, render, log_dict)
        for _ in tqdm(range(num_episodes), disable=(num_episodes == 1))
    )
    # results consist of tuples (eps_duration, rewards, scalar_metrics, vector_metrics) for each episode
    # unzip into separate lists with one entry per episode (rewards and metrics are lists of per-step values)
    eps_duration, rewards, scalar_metrics, vector_metrics = map(list, zip(*zipped_results))

    # summarize results
    scalar_results = self.summarize_scalar_results(eps_duration, rewards, scalar_metrics)
    mean_results = {metric: np.mean(results) for metric, results in scalar_results.items()}
    self.log.info('Scalar results', results=scalar_results)
    self.log.info('Mean results', results=mean_results)
    self.log.info("Simulation complete", num_episodes=num_episodes, eps_length=self.episode_length,
                  step_reward_mean=np.mean(scalar_results['step_reward_mean']),
                  step_reward_std=np.std(scalar_results['step_reward_std']),
                  avg_eps_reward=self.episode_length * np.mean(scalar_results['step_reward_mean']))

    # write results to file
    if write_results:
        self.write_scalar_results(scalar_results)
        self.write_vector_results(vector_metrics)

    return rewards
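A short sketch of a typical evaluation call, assuming an agent was loaded as above; the episode count is a placeholder value.

# Hedged sketch: evaluate over several episodes and aggregate the per-step rewards.
import numpy as np
episode_rewards = sim.run(num_episodes=10, render=None, write_results=True)
mean_step_reward = np.mean([np.mean(eps) for eps in episode_rewards])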
def run_episode(self, env, render=None, log_dict=None)
-
Run a single episode on the given environment. Return the episode's execution time, step rewards, and per-step metrics.
:param env: Instance of the environment to use (each joblib iteration will still use its own instance)
:param render: Whether/how to render the episode
:param log_dict: Dict with logging levels to set
:return: Tuple of eps_duration (scalar), step rewards (list), scalar metrics per step (list of dicts), and vector metrics per step (list of dicts)
Expand source code
def run_episode(self, env, render=None, log_dict=None):
    """
    Run a single episode on the given environment. Return the episode's execution time, step rewards, and per-step metrics.

    :param env: Instance of the environment to use (each joblib iteration will still use its own instance)
    :param render: Whether/how to render the episode
    :param log_dict: Dict with logging levels to set
    :return: Tuple of eps_duration (scalar), step rewards (list), scalar metrics per step (list of dicts), and vector metrics per step (list of dicts)
    """
    # list of rewards and metrics (which are dicts) for each time step
    rewards = []
    scalar_metrics = []
    vector_metrics = []

    # no need to instantiate a new env since each joblib iteration has its own copy;
    # that's why we need to set the logging level again for each iteration
    config_logging()
    if log_dict is not None:
        env.set_log_level(log_dict)

    eps_start = time.time()
    if render is not None:
        fig = plt.figure(figsize=env.map.figsize)
        # equal aspect ratio to avoid distortions
        plt.gca().set_aspect('equal')

    # run until the episode ends
    patches = []
    t = 0
    done = False
    obs = env.reset()
    # if using the brute-force agent, pass the environment
    if self.agent_name == 'brute-force':
        self.agent.env = env
    # init state for LSTM: https://github.com/ray-project/ray/issues/9220#issuecomment-652146377
    state = None
    if self.config['model']['use_lstm']:
        cell_size = self.config['model']['lstm_cell_size']
        state = [np.zeros(cell_size), np.zeros(cell_size)]

    # for continuous problems, stop evaluation after a fixed episode length
    while (done is None or not done) and t < self.episode_length:
        if render is not None:
            patches.append(env.render())
            if render == 'plot':
                plt.show()

        # get and apply action
        if self.multi_agent_env:
            obs, reward, done, info, state = self.apply_action_multi_agent(obs, env, state)
        else:
            obs, reward, done, info, state = self.apply_action_single_agent(obs, env, state)
        t = info['time']

        # save reward and metrics
        rewards.append(reward)
        scalar_metrics.append(info['scalar_metrics'])
        vector_metrics.append(info['vector_metrics'])

    # create the animation
    if render is not None:
        fig.tight_layout()
        self.save_animation(fig, patches, render)

    # episode time in seconds (to measure simulation efficiency)
    eps_duration = time.time() - eps_start
    self.log.debug('Episode complete', eps_duration=eps_duration, avg_step_reward=np.mean(rewards),
                   scalar_metrics=list(scalar_metrics[0].keys()), vector_metrics=list(vector_metrics[0].keys()))
    return eps_duration, rewards, scalar_metrics, vector_metrics
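For a single rendered episode, a direct call might look like the sketch below. It assumes a loaded agent and ImageMagick for gif output; enabling metric logging on the env config mirrors what run() does before creating the environment.

# Hedged sketch: run and render one episode directly (run() normally handles this setup).
sim.env_config['log_metrics'] = True
env = sim.env_class(sim.env_config)
eps_duration, rewards, scalar_metrics, vector_metrics = sim.run_episode(env, render='gif')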
def save_animation(self, fig, patches, mode)
-
Create and save a matplotlib animation.
:param fig: Matplotlib figure
:param patches: List of patches to draw for each step in the animation
:param mode: How to save the animation. Options: 'html' (=html5 video), 'gif' (requires ImageMagick), or 'both'
Expand source code
def save_animation(self, fig, patches, mode):
    """
    Create and save a matplotlib animation.

    :param fig: Matplotlib figure
    :param patches: List of patches to draw for each step in the animation
    :param mode: How to save the animation. Options: 'html' (=html5 video), 'gif' (requires ImageMagick), or 'both'
    """
    render_modes = SUPPORTED_RENDER - {None}
    assert mode in render_modes, f"Render mode {mode} not in {render_modes}"
    anim = matplotlib.animation.ArtistAnimation(fig, patches, repeat=False)

    # save html5 video
    if mode == 'html' or mode == 'both':
        html = anim.to_html5_video()
        with open(f'{VIDEO_DIR}/{self.result_filename}.html', 'w') as f:
            f.write(html)
        self.log.info('Video saved', path=f'{VIDEO_DIR}/{self.result_filename}.html')

    # save gif; requires the external dependency ImageMagick
    if mode == 'gif' or mode == 'both':
        try:
            anim.save(f'{VIDEO_DIR}/{self.result_filename}.gif', writer='imagemagick')
            self.log.info('Gif saved', path=f'{VIDEO_DIR}/{self.result_filename}.gif')
        except TypeError:
            self.log.error('ImageMagick needs to be installed for saving gifs.')
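The frame-per-step pattern used here (one list of artists per frame) can be illustrated with a small self-contained matplotlib example; it is generic and not taken from the module.

# Generic illustration of the ArtistAnimation pattern:
# each entry in `frames` is the list of artists to draw for one frame.
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation

fig, ax = plt.subplots()
x = np.linspace(0, 2 * np.pi, 100)
frames = []
for shift in np.linspace(0, 2 * np.pi, 30):
    line, = ax.plot(x, np.sin(x + shift), color='tab:blue')
    frames.append([line])

anim = matplotlib.animation.ArtistAnimation(fig, frames, repeat=False)
html5 = anim.to_html5_video()                     # requires ffmpeg
# anim.save('example.gif', writer='imagemagick')  # requires ImageMagick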
def set_result_filename(self)
-
Set a suitable result filename (without file extension) in the format 'agent_env-class_env-size_sharing_num-ues-reward_train-test_seed_timestamp'
Expand source code
def set_result_filename(self):
    """Set a suitable result filename (without file extension) in the format 'agent_env-class_env-size_sharing_num-ues-reward_train-test_seed_timestamp'"""
    assert self.agent is not None, "Set the filename after loading the agent"
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    agent_name = type(self.agent).__name__
    env_size = self.cli_args.env
    num_ues = self.cli_args.static_ues + self.cli_args.slow_ues + self.cli_args.fast_ues
    train = 'rand' if self.cli_args.rand_train else 'fixed'
    test = 'rand' if self.cli_args.rand_test else 'fixed'
    seed = self.cli_args.seed
    self.result_filename = \
        f'{agent_name}_{self.env_name}_{env_size}_{self.cli_args.sharing}_{num_ues}UEs-{self.cli_args.reward}' \
        f'_{train}-{test}_{seed}_{timestamp}'
def train(self, stop_criteria, restore_path=None, scheduler=None)
-
Train an RLlib agent using tune until any of the configured stopping criteria is met.
:param stop_criteria: Dict with stopping criteria. See https://docs.ray.io/en/latest/tune/api_docs/execution.html#tune-run
:param restore_path: Path to a trained agent to continue training (if any).
    The agent's latest checkpoint is loaded automatically.
    The trained agent needs to have the same settings and scenario for continuing training.
    When continuing training, the number of training steps continues too, ie, it is not reset to 0 after restoring.
:param scheduler: Optional ray.tune trial scheduler passed through to tune.run
:return: The path to the saved agent (checkpoint) and tune's ExperimentAnalysis object.
    See https://docs.ray.io/en/latest/tune/api_docs/analysis.html#experimentanalysis-tune-experimentanalysis
Expand source code
def train(self, stop_criteria, restore_path=None, scheduler=None):
    """
    Train an RLlib agent using tune until any of the configured stopping criteria is met.

    :param stop_criteria: Dict with stopping criteria. See https://docs.ray.io/en/latest/tune/api_docs/execution.html#tune-run
    :param restore_path: Path to a trained agent to continue training (if any).
        The agent's latest checkpoint is loaded automatically.
        The trained agent needs to have the same settings and scenario for continuing training.
        When continuing training, the number of training steps continues too, ie, it is not reset to 0 after restoring.
    :param scheduler: Optional ray.tune trial scheduler passed through to tune.run
    :return: The path to the saved agent (checkpoint) and tune's ExperimentAnalysis object.
        See https://docs.ray.io/en/latest/tune/api_docs/analysis.html#experimentanalysis-tune-experimentanalysis
    """
    # load the latest checkpoint within the given agent's directory
    if restore_path is not None:
        restore_path = self.get_last_checkpoint_path(restore_path)

    analysis = ray.tune.run(PPOTrainer, config=self.config, local_dir=RESULT_DIR, stop=stop_criteria,
                            # checkpoint every 10 iterations and at the end; keep the best 10 checkpoints
                            checkpoint_at_end=True, checkpoint_freq=10, keep_checkpoints_num=10,
                            checkpoint_score_attr='episode_reward_mean',
                            restore=restore_path, scheduler=scheduler)
    analysis.default_metric = 'episode_reward_mean'
    analysis.default_mode = 'max'
    # tune returns an ExperimentAnalysis object that can be cast to a Pandas data frame
    # https://docs.ray.io/en/latest/tune/api_docs/analysis.html#experimentanalysis
    df = analysis.dataframe()
    checkpoint_path = analysis.get_best_checkpoint(trial=analysis.get_best_trial())
    self.log.info('Training done', timesteps_total=int(df['timesteps_total']),
                  episodes_total=int(df['episodes_total']),
                  episode_reward_mean=float(df['episode_reward_mean']),
                  num_steps_sampled=int(df['info/num_steps_sampled']),
                  num_steps_trained=int(df['info/num_steps_trained']),
                  log_dir=analysis.get_best_logdir())

    # plotting results from df['hist_stats'] only covers the last 100 episodes --> not useful
    # --> use TensorBoard instead, or read and plot progress.csv
    # eps_results = df['hist_stats']
    # self.plot_learning_curve(eps_results['episode_lengths'], eps_results['episode_reward'])
    return checkpoint_path, analysis
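A hedged sketch of a training call: the stop criteria keys follow ray.tune conventions, and the values and paths below are placeholders, not taken from the module.

# Hedged sketch: train until either criterion is met, then keep the best checkpoint path.
stop = {'timesteps_total': 100000, 'episode_reward_mean': 200}
checkpoint_path, analysis = sim.train(stop_criteria=stop)
# to continue training later, pass the earlier training directory:
# sim.train(stop_criteria=stop, restore_path='<path/to/earlier/PPO/training/dir>')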
def write_scalar_results(self, scalar_results)
-
Write experiment results to CSV file. Include all relevant info.
Expand source code
def write_scalar_results(self, scalar_results):
    """Write experiment results to CSV file. Include all relevant info."""
    result_file = f'{TEST_DIR}/{self.result_filename}.csv'
    self.log.info("Writing scalar results", file=result_file)
    data = self.metadata
    # training data for PPO
    if self.agent_name == 'ppo':
        data.update({
            'train_steps': self.cli_args.train_steps,
            'train-iter': self.cli_args.train_iter,
            'target_reward': self.cli_args.target_reward,
            'target-utility': self.cli_args.target_utility,
        })
    # add actual results and save to file
    data.update(scalar_results)
    df = pd.DataFrame(data=data)
    df.to_csv(result_file)
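Reading the written CSV back for analysis is plain pandas. The path below is a placeholder, and only columns referenced in run() above are assumed to exist.

# Hedged sketch: load the scalar results of one test run and average over episodes.
import pandas as pd
df = pd.read_csv('<TEST_DIR>/<result_filename>.csv', index_col=0)
print(df[['step_reward_mean', 'step_reward_std']].mean())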
def write_vector_results(self, vector_metrics)
-
Write vector metrics into data frames and save them to pickle, incl. metadata/attributes. One data frame and pickle file per metric. Vector metrics contain measurements per UE per time step (per evaluation episode).
:param vector_metrics: List of lists of dicts of dicts: one list per episode with a dict per time step.
    Each dict maps a metric name to another dict, which in turn maps UE ID to the metric value.
:return: List of result data frames (one per metric)
Expand source code
def write_vector_results(self, vector_metrics):
    """
    Write vector metrics into data frames and save them to pickle, incl. metadata/attributes.
    One data frame and pickle file per metric. Vector metrics contain measurements per UE per time step (per evaluation episode).

    :param vector_metrics: List of lists of dicts of dicts: one list per episode with a dict per time step.
        Each dict maps a metric name to another dict, which in turn maps UE ID to the metric value.
    :return: List of result data frames (one per metric)
    """
    # in case there are no vector metrics
    if len(vector_metrics) == 0 or len(vector_metrics[0]) == 0:
        return []

    # construct separate dfs per metric
    dfs = []
    metrics = list(vector_metrics[0][0].keys())
    for metric in metrics:
        # init dict with empty lists
        data = {'episode': [], 'time_step': []}
        ues = list(vector_metrics[-1][-1][metric].keys())
        for ue in ues:
            data[ue] = []

        # fill dict with values from vector_metrics
        for eps, eps_dict in enumerate(vector_metrics):
            for step, step_dict in enumerate(eps_dict):
                data['episode'].append(eps)
                data['time_step'].append(step)
                metric_dict = step_dict[metric]
                for ue in ues:
                    if ue in metric_dict:
                        data[ue].append(metric_dict[ue])
                    else:
                        data[ue].append(None)

        # create and write the data frame
        df = pd.DataFrame(data)
        df.attrs = self.metadata
        df.attrs['metric'] = metric
        df.attrs['num_episodes'] = len(vector_metrics)
        df.attrs['env_config'] = self.env_config
        df.attrs['cli_args'] = vars(self.cli_args)
        dfs.append(df)

        result_file = f'{TEST_DIR}/{self.result_filename}_{metric}.pkl'
        self.log.info('Writing vector results', metric=metric, file=result_file)
        df.to_pickle(result_file)

    return dfs
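The pickled per-metric frames can be reloaded with pandas; the attached attrs survive pickling. The path and metric name below are placeholders.

# Hedged sketch: reload one per-metric result file and inspect its metadata.
import pandas as pd
df = pd.read_pickle('<TEST_DIR>/<result_filename>_<metric>.pkl')
print(df.attrs['metric'], df.attrs['num_episodes'])
first_episode = df[df['episode'] == 0]   # rows of the first evaluation episode (one column per UE)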