in src/markov/evaluation_worker.py [0:0]
def evaluation_worker(graph_manager, number_of_trials, task_parameters, simtrace_video_s3_writers, is_continuous,
                      park_positions, race_type, pause_physics, unpause_physics):
    """ Evaluation worker function

    Waits for the checkpoints and the required ROS services, creates the graph,
    runs the evaluation episode(s), persists every simtrace/mp4 writer, pauses
    the physics and finally cancels the simulation job (skipped for F1 races
    and sage-only runs).

    Arguments:
        graph_manager(MultiAgentGraphManager): Graph manager of multiagent graph manager
        number_of_trials(int): Number of trials you want to run the evaluation
                               (only used when is_continuous is false)
        task_parameters(TaskParameters): Information of the checkpoint, gpu/cpu,
                                         framework etc of rlcoach
        simtrace_video_s3_writers(list): Information to upload to the S3 bucket all the simtrace and mp4
        is_continuous(bool): The termination condition for the car
        park_positions(list of tuple): list of (x, y) for cars to park at
        race_type (str): race type
        pause_physics: Not used as passed in; the name is rebound below to a new
                       ServiceProxyWrapper for '/gazebo/pause_physics_dr'
        unpause_physics: Not used as passed in; the name is rebound below to a new
                         ServiceProxyWrapper for '/gazebo/unpause_physics_dr'
    """
    import copy

    # The cancel-job service is only waited for (and later called) outside of
    # F1 races and sage-only runs; evaluate the condition once and reuse it.
    should_cancel_job = race_type != RaceType.F1.value and not utils.check_is_sageonly()

    # Collect profiler information only if IS_PROFILER_ON is true
    with utils.Profiler(s3_bucket=PROFILER_S3_BUCKET, s3_prefix=PROFILER_S3_PREFIX,
                        output_local_path=ROLLOUT_WORKER_PROFILER_PATH, enable_profiling=IS_PROFILER_ON):
        subscribe_to_save_mp4_topic, unsubscribe_from_save_mp4_topic = [], []
        subscribe_to_save_mp4, unsubscribe_from_save_mp4 = [], []
        for agent_param in graph_manager.agents_params:
            # An agent name without "_" maps to 'racecar'; otherwise the second
            # "_"-separated token of the name is used as the racecar suffix.
            name_parts = agent_param.name.split("_")
            racecar_name = 'racecar' if len(name_parts) == 1 \
                else "racecar_{}".format(name_parts[1])
            subscribe_to_save_mp4_topic.append("/{}/save_mp4/subscribe_to_save_mp4".format(racecar_name))
            unsubscribe_from_save_mp4_topic.append("/{}/save_mp4/unsubscribe_from_save_mp4".format(racecar_name))
        graph_manager.data_store.wait_for_checkpoints()
        graph_manager.data_store.modify_checkpoint_variables()

        # wait for the required cancel services to become available
        if should_cancel_job:
            # TODO: Since we are not running Grand Prix in RoboMaker,
            # we are opting out from waiting for RoboMaker's cancel job service
            # in case of Grand Prix execution.
            # Otherwise, SimApp will hang as service will never come alive.
            #
            # If we don't depend on RoboMaker anymore in the future,
            # we need to remove below line, or do a better job to figure out
            # whether we are running on RoboMaker or not to decide whether
            # we should wait for below service or not.
            rospy.wait_for_service('/robomaker/job/cancel')

        # Make the clients that will allow us to pause and unpause the physics
        # NOTE(review): this rebinds the pause_physics/unpause_physics parameters,
        # so whatever the caller passed in is ignored - confirm this is intended.
        rospy.wait_for_service('/gazebo/pause_physics_dr')
        rospy.wait_for_service('/gazebo/unpause_physics_dr')
        pause_physics = ServiceProxyWrapper('/gazebo/pause_physics_dr', Empty)
        unpause_physics = ServiceProxyWrapper('/gazebo/unpause_physics_dr', Empty)

        # Wait for every save_mp4 service first, then build the clients.
        for mp4_sub, mp4_unsub in zip(subscribe_to_save_mp4_topic, unsubscribe_from_save_mp4_topic):
            rospy.wait_for_service(mp4_sub)
            rospy.wait_for_service(mp4_unsub)
        for mp4_sub, mp4_unsub in zip(subscribe_to_save_mp4_topic, unsubscribe_from_save_mp4_topic):
            subscribe_to_save_mp4.append(ServiceProxyWrapper(mp4_sub, Empty))
            # Unsubscribe calls are wrapped in threads so they can be issued
            # for all racecars at once after the evaluation.
            unsubscribe_from_save_mp4.append(Thread(target=ServiceProxyWrapper(mp4_unsub, Empty),
                                                    args=(EmptyRequest(), )))

        graph_manager.create_graph(task_parameters=task_parameters, stop_physics=pause_physics,
                                   start_physics=unpause_physics, empty_service_call=EmptyRequest)
        logger.info("Graph manager successfully created the graph: Unpausing physics")
        unpause_physics(EmptyRequest())

        # mp4 saving is treated as enabled when the 'MP4_S3_BUCKET' param is set to a truthy value
        is_save_mp4_enabled = rospy.get_param('MP4_S3_BUCKET', None)
        if is_save_mp4_enabled:
            for subscribe_mp4 in subscribe_to_save_mp4:
                subscribe_mp4(EmptyRequest())

        configure_environment_randomizer()
        track_data = TrackData.get_instance()

        # Before each evaluation episode (single lap for non-continuous race and complete race for
        # continuous race), a new copy of park_positions needs to be loaded into track_data because
        # a park position will be popped from park_positions when a race car needs to be parked.
        # A shallow copy is handed over so the caller's park_positions is never depleted,
        # regardless of whether track_data keeps the reference or copies it again.
        number_of_episodes = 1 if is_continuous else number_of_trials
        for _ in range(number_of_episodes):
            track_data.park_positions = copy.copy(park_positions)
            graph_manager.evaluate(EnvironmentSteps(1))

        if is_save_mp4_enabled:
            # Start every unsubscribe call before joining any of them.
            for unsubscribe_mp4 in unsubscribe_from_save_mp4:
                unsubscribe_mp4.start()
            for unsubscribe_mp4 in unsubscribe_from_save_mp4:
                unsubscribe_mp4.join()

        # upload simtrace and mp4 into s3 bucket
        for s3_writer in simtrace_video_s3_writers:
            s3_writer.persist(utils.get_s3_kms_extra_args())
        # NOTE(review): short delay before pausing physics; purpose is not visible here - confirm.
        time.sleep(1)
        pause_physics(EmptyRequest())

    # The job is cancelled only after the profiler context has been exited.
    if should_cancel_job:
        # Close down the job
        utils.cancel_simulation_job()