in archived/rl_gamerserver_ray/common/sagemaker_rl/orchestrator/workflow/manager/experiment_manager.py [0:0]
def deploy_model(self, model_id, wait=True, **kwargs):
    """Deploy a model by creating a new hosting endpoint, or update the
    model hosted by an existing endpoint.

    The call returns without deploying when ``model_id`` is already the
    hosted model in state ``HostingState.DEPLOYED``, when
    ``_check_if_model_ready`` reports the model is not ready, or when an
    endpoint exists and its hosting state ends with ``"ING"`` (another
    deployment request is in progress).

    Args:
        model_id (str): A unique string representing which model
            to deploy/update
        wait (bool): Whether to wait until the deployment finishes. In
            local mode the method always waits, regardless of this flag.
        **kwargs: Forwarded unchanged to ``_setup_hosting_endpoint`` when
            a new endpoint has to be created.

    Raises:
        UnhandledWorkflowException: If the experiment table does not
            report the model as deployed within the retry budget
            (5 polls for soft deployment in local mode, 30 polls of
            30 seconds each for blue-green deployment).
        SageMakerHostingException: If the hosting state becomes
            ``HostingState.FAILED`` while waiting.
    """
    # TODO: add validation/instructions if multiple deployment
    # requests happened in the same experiment

    def _is_deployed():
        # The table reports success only when all three fields agree.
        record = self.experiment_record
        return (
            record._hosting_state == HostingState.DEPLOYED
            and record._last_hosted_model_id == model_id
            and record._next_model_to_host_id is None
        )

    def _sync_timeout_error():
        # Built lazily so the message reflects the record at raise time.
        return UnhandledWorkflowException(
            f"Deployment with model "
            f"'{self.experiment_record._next_model_to_host_id}' was in "
            f"state of '{self.experiment_record._hosting_state}'. Failed "
            "to sync table status."
        )

    # Sync experiment state if required
    self._sync_experiment_state_with_ddb()

    # check if 'model_id' is already hosted
    if (
        self.experiment_record._last_hosted_model_id == model_id
        and self.experiment_record._hosting_state == HostingState.DEPLOYED
    ):
        logger.info(f"Model {model_id} is already being hosted. No deployment needed.")
        return

    # No deployment if the given model is not ready
    if not self._check_if_model_ready(model_id):
        return

    # given model is in state of 'Completed', ready to deploy
    logger.info(f"Model '{model_id}' is ready to deploy.")

    # checking hosting workflow state
    if self.experiment_record._hosting_endpoint is None:
        if self.local_mode:
            present, closed = self._close_existing_containers()
            if present:
                if closed:
                    logger.info(
                        "Closed docker container[s] that was already running (maybe from previous job)."
                    )
                else:
                    # No exception is active here, so log at error level
                    # instead of logger.exception (which would attach a
                    # meaningless 'NoneType: None' traceback).
                    logger.error(
                        "Failed to close a docker container that was already running (maybe from "
                        "previous job). Please close it manually and retry."
                    )
        else:
            logger.info("No hosting endpoint found, creating a new hosting endpoint.")

        # update 'next_model_to_host_id' and 'hosting_state'
        self.exp_db_client.update_experiment_next_model_to_host_id(self.experiment_id, model_id)
        self.exp_db_client.update_experiment_hosting_state(
            self.experiment_id, HostingState.PENDING
        )

        # starting hosting endpoint
        try:
            self._setup_hosting_endpoint(model_id, wait=wait, **kwargs)
        except Exception as e:
            # Deliberately not re-raised: the polling loop below surfaces
            # a failed deployment. Keep the traceback for diagnosis.
            logger.exception(e)
    else:
        hosting_state = self.experiment_record._hosting_state
        if hosting_state.endswith("ING"):
            logger.warning("Some deployment request is in progress, canceled this one")
            return
        elif hosting_state.endswith("ED"):
            self._update_model_in_endpoint(self.soft_deployment, model_id, wait=wait)

    # wait until exp ddb table updated
    if not (self.local_mode or wait):
        return

    # Upper bound for a single backoff sleep. Local mode gives up after
    # 5 polls (longest sleep 32s), so this only affects non-local soft
    # deployments, whose sleep would otherwise grow without limit.
    max_backoff_seconds = 60
    soft_polls = 0
    blue_green_polls = 0

    while not _is_deployed():
        # Sync experiment state if required
        # NOTE(review): the state is synced before sleeping, so each check
        # below sees data that is one sleep interval old — confirm whether
        # syncing after the sleep would be safe.
        self._sync_experiment_state_with_ddb()
        logger.debug("Waiting for experiment table hosting status to be updated...")

        if self.soft_deployment:
            # local mode is fast; the sleep interval doubles on every poll
            time.sleep(min(2 * (2 ** soft_polls), max_backoff_seconds))
            soft_polls += 1
            timed_out = self.local_mode and soft_polls >= 5
        else:
            # blue-green deployment takes ~8 min, retry every 30 seconds
            time.sleep(30)
            blue_green_polls += 1
            if blue_green_polls % 2 == 0:
                logger.debug(
                    f"Waited {blue_green_polls // 2} "
                    f"minutes for blue-green deployment..."
                )
            # restrict maximum wait time to 15min
            timed_out = blue_green_polls >= 30

        # A deployment that completed on the final poll is a success, not
        # a timeout.
        if _is_deployed():
            break

        if timed_out:
            raise _sync_timeout_error()

        if self.experiment_record._hosting_state == HostingState.FAILED:
            raise SageMakerHostingException(
                "Deployment with model "
                f"'{self.experiment_record._next_model_to_host_id}' ended "
                f"with state '{self.experiment_record._hosting_state}'. "
                "Please check Sagemaker log for more information."
            )