in archived/rl_gamerserver_ray/common/sagemaker_rl/orchestrator/workflow/manager/experiment_manager.py [0:0]
def _update_experiment_db_hosting_workflow_metadata(self, hosting_workflow_metadata):
    """Update the hosting workflow metadata in the experiment table.

    The last known hosting fields are read from ``hosting_workflow_metadata``
    and reconciled with the endpoint; corrections are written to both the
    local experiment record and the experiment table:

    * A ``DEPLOYED`` state with a recorded ``last_hosted_model_id`` is
      verified by asking the predictor which model it hosts. If the ids
      differ, or the query raises, the hosting state and hosting endpoint
      are reset to ``None``.
    * A state ending in ``"ING"`` (a deployment request in progress) is
      resolved either through ``describe_endpoint`` (no endpoint recorded
      yet, or soft deployment is disabled) or by querying the hosted model
      id on the existing endpoint (soft deployment). Once the state
      resolves to ``DEPLOYED`` the next model is promoted to last hosted
      model and the hosting metrics are refreshed.

    Args:
        hosting_workflow_metadata (dict): A dictionary containing
            hosting workflow related metadata. ``None`` is a no-op.
    """
    if hosting_workflow_metadata is None:
        return

    hosting_state = hosting_workflow_metadata.get("hosting_state")
    hosting_endpoint = hosting_workflow_metadata.get("hosting_endpoint")
    next_model_to_host_id = hosting_workflow_metadata.get("next_model_to_host_id")
    last_hosted_model_id = hosting_workflow_metadata.get("last_hosted_model_id")

    # Confirm that the deploy status is correct by sending a request.
    if hosting_state == HostingState.DEPLOYED and last_hosted_model_id:
        try:
            hosted_model_id = self.experiment_manager.predictor.get_hosted_model_id()
            # Explicit comparison instead of `assert`: asserts are removed
            # under `python -O`, which would silently disable this check.
            deployment_confirmed = hosted_model_id == last_hosted_model_id
        except Exception:
            # Best-effort probe: an endpoint that cannot be queried is
            # handled the same way as one hosting an unexpected model.
            deployment_confirmed = False

        if not deployment_confirmed:
            self.exp_db_client.update_experiment_hosting_state(self.experiment_id, None)
            self.exp_db_client.update_experiment_hosting_endpoint(self.experiment_id, None)
            stale_record = self.experiment_manager.experiment_record
            stale_record._hosting_state = None
            stale_record._hosting_endpoint = None

    # Everything below only applies while a deployment request is in progress.
    if hosting_state is None or not hosting_state.endswith("ING"):
        return

    # Stays None on the soft-deployment path; set when the endpoint was
    # described, in which case its ARN is recorded on successful deployment.
    sm_endpoint_info = None

    if (hosting_endpoint is None) or (not self.experiment_manager.soft_deployment):
        # Deployment happens with a new endpoint initiation or blue green
        # deployment: describe the endpoint to get the deployment state.
        try:
            sm_endpoint_info = self.sagemaker_client.describe_endpoint(
                EndpointName=self.experiment_id
            )
        except Exception:
            # Do not raise exception
            return
        hosting_state = HOSTING_ENDPOINT_STATUS_MAP[sm_endpoint_info.get("EndpointStatus")]
    else:
        # Deployment happened on the existing endpoint. This branch is only
        # reached when an endpoint is recorded and soft deployment is on, so
        # query the endpoint for the model id it currently hosts.
        model_id = ""
        num_retries = 0
        while model_id != next_model_to_host_id:
            model_id = self.experiment_manager.predictor.get_hosted_model_id()
            num_retries += 1
            # Outside local mode the endpoint is queried once per call; in
            # local mode it is polled up to 5 times, one second apart.
            if (not self.experiment_manager.local_mode) or (num_retries >= 5):
                break
            time.sleep(1)

        # DEPLOYED once the hosted model id got updated, otherwise still DEPLOYING.
        if model_id == next_model_to_host_id:
            hosting_state = HostingState.DEPLOYED
        else:
            hosting_state = HostingState.DEPLOYING

    experiment_record = self.experiment_manager.experiment_record
    experiment_record._hosting_state = hosting_state
    # update hosting_state in the experiment table via the ddb client
    self.exp_db_client.update_experiment_hosting_state(self.experiment_id, hosting_state)

    if hosting_state == HostingState.DEPLOYED:
        endpoint_arn = None

        # update local record
        if sm_endpoint_info is not None:
            endpoint_arn = sm_endpoint_info.get("EndpointArn")
            experiment_record._hosting_endpoint = endpoint_arn
        experiment_record._last_hosted_model_id = next_model_to_host_id
        experiment_record._next_model_to_host_id = None

        # update DynamoDB record
        if sm_endpoint_info is not None:
            self.exp_db_client.update_experiment_hosting_endpoint(
                self.experiment_id, endpoint_arn
            )
        self.exp_db_client.update_experiment_last_hosted_model_id(
            self.experiment_id, next_model_to_host_id
        )
        self.exp_db_client.update_experiment_next_model_to_host_id(self.experiment_id, None)

        self._update_metrics_from_latest_hosting_update(next_model_to_host_id)