in archived/rl_gamerserver_ray/common/sagemaker_rl/orchestrator/workflow/manager/experiment_manager.py [0:0]
def sync_experiment_state_with_ddb(self):
    """
    Synchronize ExperimentDb states to local and update states of the
    Training, Evaluation, Hosting and Joining workflows.

    Steps, in order:

    1. Fetch the experiment record for ``self.experiment_id`` and rebuild
       ``self.experiment_manager.experiment_record`` from it.
    2. For the training, evaluation and joining workflows: if the record
       names a next job and the workflow state ends with ``"ING"``, refresh
       that job's state, then pass the workflow metadata read from the
       record to the matching ``_update_experiment_db_*_workflow_metadata``
       method.
    3. Pass the hosting workflow metadata to its update method (no job
       refresh is performed for hosting).
    4. Emit CloudWatch metrics for training and hosting.

    A workflow whose state is ``None`` is treated as not in progress, so
    its job refresh is skipped instead of raising ``AttributeError``.
    """
    record = self.exp_db_client.get_experiment_record(self.experiment_id)
    # sync records to experiment states
    self.experiment_manager.experiment_record = ExperimentRecord.load_from_ddb_record(record)

    # ---- training workflow -------------------------------------------
    training_workflow_metadata = record.get("training_workflow_metadata", None)
    # first update any in-progress next_model_to_train
    experiment_record = self.experiment_manager.experiment_record
    next_model_to_train_id = experiment_record._next_model_to_train_id
    training_state = experiment_record._training_state
    if (
        next_model_to_train_id is not None
        and training_state is not None
        and training_state.endswith("ING")
    ):
        if self.experiment_manager.next_model_to_train is not None:
            self.experiment_manager.next_model_to_train.update_model_training_state()
        elif (
            self.model_db_client.get_model_record(self.experiment_id, next_model_to_train_id)
            is not None
        ):
            # only init the ModelManager() if the training job record already
            # exists; the manager is local to this call and is not stored on
            # self.experiment_manager
            next_model_to_train = ModelManager(
                model_db_client=self.model_db_client,
                experiment_id=self.experiment_id,
                model_id=next_model_to_train_id,
            )
            next_model_to_train.update_model_training_state()
    # NOTE(review): the pause is taken to be unconditional (the original
    # indentation was not available) and presumably spaces out successive
    # DB calls -- TODO confirm both.
    time.sleep(1)
    self._update_experiment_db_training_workflow_metadata(training_workflow_metadata)

    # ---- evaluation workflow -----------------------------------------
    evaluation_workflow_metadata = record.get("evaluation_workflow_metadata", None)
    # first update any in-progress next_evaluation_job
    experiment_record = self.experiment_manager.experiment_record
    next_evaluation_job_id = experiment_record._next_evaluation_job_id
    evaluation_state = experiment_record._evaluation_state
    if (
        next_evaluation_job_id is not None
        and evaluation_state is not None
        and evaluation_state.endswith("ING")
    ):
        if self.experiment_manager.next_model_to_evaluate is not None:
            self.experiment_manager.next_model_to_evaluate.update_model_evaluation_state()
        else:
            # the model id is the part of the evaluation job id that
            # precedes the first "-eval-" marker
            evaluated_model_id = next_evaluation_job_id.split("-eval-")[0]
            # only init the ModelManager() if the evaluation job record already exists
            if (
                self.model_db_client.get_model_record(self.experiment_id, evaluated_model_id)
                is not None
            ):
                next_model_to_evaluate = ModelManager(
                    model_db_client=self.model_db_client,
                    experiment_id=self.experiment_id,
                    model_id=evaluated_model_id,
                )
                next_model_to_evaluate.update_model_evaluation_state()
    time.sleep(1)
    self._update_experiment_db_evaluation_workflow_metadata(evaluation_workflow_metadata)

    # ---- hosting workflow --------------------------------------------
    hosting_workflow_metadata = record.get("hosting_workflow_metadata", None)
    self._update_experiment_db_hosting_workflow_metadata(hosting_workflow_metadata)

    # ---- joining workflow --------------------------------------------
    joining_workflow_metadata = record.get("joining_workflow_metadata", None)
    # first update any in-progress next_join_job
    experiment_record = self.experiment_manager.experiment_record
    next_join_job_id = experiment_record._next_join_job_id
    joining_state = experiment_record._joining_state
    if (
        next_join_job_id is not None
        and joining_state is not None
        and joining_state.endswith("ING")
    ):
        if self.experiment_manager.next_join_job is not None:
            self.experiment_manager.next_join_job.update_join_job_state()
        elif (
            self.join_db_client.get_join_job_record(self.experiment_id, next_join_job_id)
            is not None
        ):
            # only init the JoinManager() if the join job record already exists
            next_join_job = JoinManager(
                join_db_client=self.join_db_client,
                experiment_id=self.experiment_id,
                join_job_id=next_join_job_id,
            )
            next_join_job.update_join_job_state()
    time.sleep(1)
    self._update_experiment_db_joining_workflow_metadata(joining_workflow_metadata)

    self.emit_cloudwatch_metrics_for_training_and_hosting()