def sync_experiment_state_with_ddb()

in archived/rl_gamerserver_ray/common/sagemaker_rl/orchestrator/workflow/manager/experiment_manager.py [0:0]


    def sync_experiment_state_with_ddb(self):
        """
        Refresh the local experiment state from the experiment DB record.

        Steps, in order:

        1. Fetch the experiment record for ``self.experiment_id`` and load it
           into ``self.experiment_manager.experiment_record``.
        2. For the training, evaluation and joining workflows: when the local
           record names a pending job/model and the workflow state string
           ends with ``"ING"``, refresh that job's state. The manager object
           already held by the experiment manager is used when present;
           otherwise a temporary manager is created, but only if the
           corresponding DB record exists. The temporary manager is not
           stored on the experiment manager.
        3. After each of those three workflows, sleep one second and pass the
           workflow metadata taken from the fetched record to the matching
           ``_update_experiment_db_*_workflow_metadata`` helper.
        4. The hosting workflow only has its metadata forwarded to its helper
           (no per-job refresh, no sleep).
        5. Finally, emit CloudWatch metrics for training and hosting.
        """
        ddb_record = self.exp_db_client.get_experiment_record(self.experiment_id)

        # Mirror the fetched record into the local experiment state.
        self.experiment_manager.experiment_record = ExperimentRecord.load_from_ddb_record(
            ddb_record
        )
        manager = self.experiment_manager

        # ---- training workflow ----
        training_metadata = ddb_record.get("training_workflow_metadata", None)
        pending_model_id = manager.experiment_record._next_model_to_train_id
        current_training_state = manager.experiment_record._training_state
        if pending_model_id is not None and current_training_state.endswith("ING"):
            trainer = manager.next_model_to_train
            if trainer is None:
                # No live manager: build a temporary one, but only when the
                # training job record is already present in the model DB.
                model_record = self.model_db_client.get_model_record(
                    self.experiment_id, pending_model_id
                )
                if model_record is not None:
                    trainer = ModelManager(
                        model_db_client=self.model_db_client,
                        experiment_id=self.experiment_id,
                        model_id=pending_model_id,
                    )
            if trainer is not None:
                trainer.update_model_training_state()
        # NOTE(review): the reason for the 1s pause is not visible here; kept as-is.
        time.sleep(1)
        self._update_experiment_db_training_workflow_metadata(training_metadata)

        # ---- evaluation workflow ----
        evaluation_metadata = ddb_record.get("evaluation_workflow_metadata", None)
        pending_eval_job_id = manager.experiment_record._next_evaluation_job_id
        current_evaluation_state = manager.experiment_record._evaluation_state
        if pending_eval_job_id is not None and current_evaluation_state.endswith("ING"):
            evaluator = manager.next_model_to_evaluate
            if evaluator is None:
                # The model id is the part of the evaluation job id that
                # precedes the "-eval-" marker.
                evaluated_model_id = pending_eval_job_id.split("-eval-")[0]
                model_record = self.model_db_client.get_model_record(
                    self.experiment_id, evaluated_model_id
                )
                if model_record is not None:
                    evaluator = ModelManager(
                        model_db_client=self.model_db_client,
                        experiment_id=self.experiment_id,
                        model_id=evaluated_model_id,
                    )
            if evaluator is not None:
                evaluator.update_model_evaluation_state()
        time.sleep(1)
        self._update_experiment_db_evaluation_workflow_metadata(evaluation_metadata)

        # ---- hosting workflow (metadata only) ----
        hosting_metadata = ddb_record.get("hosting_workflow_metadata", None)
        self._update_experiment_db_hosting_workflow_metadata(hosting_metadata)

        # ---- joining workflow ----
        joining_metadata = ddb_record.get("joining_workflow_metadata", None)
        pending_join_job_id = manager.experiment_record._next_join_job_id
        current_joining_state = manager.experiment_record._joining_state
        if pending_join_job_id is not None and current_joining_state.endswith("ING"):
            joiner = manager.next_join_job
            if joiner is None:
                # Same rule as above: only build a temporary JoinManager when
                # the join job record already exists in the join DB.
                join_record = self.join_db_client.get_join_job_record(
                    self.experiment_id, pending_join_job_id
                )
                if join_record is not None:
                    joiner = JoinManager(
                        join_db_client=self.join_db_client,
                        experiment_id=self.experiment_id,
                        join_job_id=pending_join_job_id,
                    )
            if joiner is not None:
                joiner.update_join_job_state()
        time.sleep(1)
        self._update_experiment_db_joining_workflow_metadata(joining_metadata)

        self.emit_cloudwatch_metrics_for_training_and_hosting()