in archived/rl_gamerserver_ray/common/sagemaker_rl/orchestrator/workflow/manager/experiment_manager.py
def evaluate_model(self, input_data_s3_prefix=None, evaluate_model_id=None, wait=True):
"""
Start an evaluation job to evaluate a model.
Args:
input_data_s3_prefix (str or list): S3 path(s) to the data used
for evaluation
evaluate_model_id (str): Id of the model to evaluate; defaults to
the experiment's last trained model
wait (bool): Whether to wait until the evaluation job finishes
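
Example (illustrative; ``manager``, the S3 path and the model id below
are placeholders, not values from this project):

    manager.evaluate_model(
        input_data_s3_prefix="s3://my-bucket/eval-data/",
        evaluate_model_id="my-model-1234567890",
        wait=True,
    )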
"""
# Sync experiment state if required
self._sync_experiment_state_with_ddb()
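# Resolve which model to evaluate: default to the experiment's last trained model when no id is given.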
if evaluate_model_id is None:
if self.experiment_record._last_trained_model_id:
# use 'last_trained_model_id' by default as input model for evaluation
logger.info(
f"Using last trained model {self.experiment_record._last_trained_model_id}"
"for evaluation"
)
evaluate_model_id = self.experiment_record._last_trained_model_id
else:
logger.error("Evaluation ModelId in None!")
pass
elif evaluate_model_id != self.experiment_record._last_trained_model_id:
# evaluate_model_id is not None and also not last_trained_model_id
if not self._check_if_model_ready(evaluate_model_id):
logger.error(f"ModelId {evaluate_model_id} is not ready for evaluation.")
evaluate_model_id = None
else:
# evaluate_model_id is not None and evaluate_model_id == _last_trained_model_id
pass
if not evaluate_model_id:
# evaluate_model_id is still None. Raise an exception...
raise InvalidUsageException("Please provide a valid ModelId to be evaluated")
if (
self.experiment_record._evaluation_state is not None
and self.experiment_record._evaluation_state.endswith("ING")
):
logger.warning(
f"A evaluation request with job id '{self.experiment_record._next_evaluation_job_id}' "
f"was in the state of '{self.experiment_record._evaluation_state}'. "
"Wait until the evaluation job finished or canceled the request."
)
raise InvalidUsageException(
"Please wait for old Evaluation Job to Complete before requesting a new one!"
)
else:
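# No evaluation is in flight; register a new evaluation job id and mark it PENDING in the experiment table.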
next_evaluation_job_id = f"{evaluate_model_id}-eval-{int(time.time())}"
logger.info(
f"Evaluating model '{evaluate_model_id}' with evaluation job id '{next_evaluation_job_id}'"
)
self.exp_db_client.update_experiment_next_evaluation_job_id(
self.experiment_id, next_evaluation_job_id
)
self.exp_db_client.update_experiment_evaluation_state(
self.experiment_id, EvaluationState.PENDING
)
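# Multiple input S3 prefixes are collapsed into a single manifest file for the evaluation job.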
manifest_file_path = None
if isinstance(input_data_s3_prefix, list):
# generate manifest file and upload to s3
manifest_file_path = self._generate_manifest(input_data_s3_prefix)
else:
# input_data_s3_prefix is a single S3 prefix string; pass it through to the evaluation job as-is
pass
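# Launch the evaluation through a ModelManager configured with the experiment's image, role and evaluation fleet.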
try:
self.next_model_to_evaluate = ModelManager(
model_db_client=self.model_db_client,
experiment_id=self.experiment_id,
model_id=evaluate_model_id,
image=self.image,
role=self.resource_manager.iam_role_arn,
instance_config=self.resource_manager.evaluation_fleet_config,
boto_session=self.boto_session,
algor_config=self.algor_config,
)
self.next_model_to_evaluate.evaluate(
input_data_s3_prefix=input_data_s3_prefix,
manifest_file_path=manifest_file_path,
evaluation_job_name=next_evaluation_job_id,
local_mode=self.local_mode,
wait=wait,
logs=True,
)
except Exception as e:
logger.error(f"Failed to start evaluation job: {e}")
# wait until exp ddb table updated
if self.local_mode or wait:
evaluated_state = (
self.experiment_record._evaluation_state == EvaluationState.EVALUATED
and self.experiment_record._last_evaluation_job_id == next_evaluation_job_id
and self.experiment_record._next_evaluation_job_id is None
)
num_retries = 0
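# Poll the experiment table with exponential backoff until the evaluation is recorded as EVALUATED; give up after five retries.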
while not evaluated_state:
# Sync experiment state if required
self._sync_experiment_state_with_ddb()
logger.debug("Waiting for experiment table evaluation status to be updated...")
time.sleep(2 * (2 ** num_retries))
evaluated_state = (
self.experiment_record._evaluation_state == EvaluationState.EVALUATED
and self.experiment_record._last_evaluation_job_id == next_evaluation_job_id
and self.experiment_record._next_evaluation_job_id is None
)
num_retries += 1
if num_retries >= 5:
raise UnhandledWorkflowException(
f"Evaluation job '{self.experiment_record._next_evaluation_job_id}' "
f"was in state of '{self.experiment_record._evaluation_state}'. Failed to sync table states."
)
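# Surface failed or stopped evaluation jobs to the caller as an exception.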
if (
self.experiment_record._evaluation_state == EvaluationState.FAILED
or self.experiment_record._evaluation_state == EvaluationState.STOPPED
):
raise SageMakerTrainingJobException(
f"Evaluation job '{self.experiment_record._next_evaluation_job_id}' "
f"ended in state of '{self.experiment_record._evaluation_state}'. Please check Sagemaker logs for "
"more information."
)