in common/sagemaker_rl/orchestrator/workflow/manager/experiment_manager.py [0:0]
def initialize_first_model(self, wait=True, input_data_s3_prefix=None):
"""
Initializes the first Model training for an Experiment
Args:
wait (bool): Whether to wait until the training job finishes
input_data_s3_prefix (str): S3 data path containing data
used to train the first model
"""
# Sync experiment state if required
self._sync_experiment_state_with_ddb()
# experiment only allow one training job at a time,
# validate no other training request is in progress
if (
self.experiment_record._training_state is not None
and self.experiment_record._training_state.endswith("ING")
):
logger.error(
f"A training request with model id '{self.experiment_record._next_model_to_train_id}' "
f"was in the state of '{self.experiment_record._training_state}'. "
"Wait until the training job finished or canceled the request."
)
raise InvalidUsageException(
"Please wait for old Training Job to Complete before requesting a new one!"
)
else:
# update next_model_to_train_id and training state
next_model_to_train_id = ModelManager.name_next_model(experiment_id=self.experiment_id)
logger.info(f"Next Model name would be {next_model_to_train_id}")
self.exp_db_client.update_experiment_next_model_to_train_id(
self.experiment_id, next_model_to_train_id
)
self.exp_db_client.update_experiment_training_state(
self.experiment_id, TrainingState.PENDING
)
logger.info(f"Start training job for model '{next_model_to_train_id}''")
# generate manifest file if input is a list
manifest_file_path = None
if isinstance(input_data_s3_prefix, list):
# generate manifest file and upload to s3
manifest_file_path = self._generate_manifest(input_data_s3_prefix)
# init model for training, update model table
try:
self.next_model_to_train = ModelManager(
model_db_client=self.model_db_client,
experiment_id=self.experiment_id,
model_id=next_model_to_train_id,
image=self.image,
role=self.resource_manager.iam_role_arn,
instance_config=self.resource_manager.training_fleet_config,
boto_session=self.boto_session,
algor_config=self.algor_config,
)
self.next_model_to_train.fit(
wait=wait,
input_model_id=None,
input_data_s3_prefix=input_data_s3_prefix,
manifest_file_path=manifest_file_path,
logs=wait,
)
except Exception as e:
logger.error(
f"Failed to start new Model Training job for"
" ModelId {next_model_to_train_id}"
)
logger.error(e)
pass
# wait until ExperimentDb state is updated
if self.local_mode or wait:
trained_state = (
self.experiment_record._training_state == TrainingState.TRAINED
and self.experiment_record._last_trained_model_id == next_model_to_train_id
and self.experiment_record._next_model_to_train_id is None
)
num_retries = 0
while not trained_state:
# Sync experiment state if required
self._sync_experiment_state_with_ddb()
logger.debug("Waiting for experiment table training status to be updated...")
time.sleep(2 * (2 ** num_retries))
trained_state = (
self.experiment_record._training_state == TrainingState.TRAINED
and self.experiment_record._last_trained_model_id == next_model_to_train_id
and self.experiment_record._next_model_to_train_id is None
)
num_retries += 1
if num_retries >= 5:
raise UnhandledWorkflowException(
f"Training job '{self.experiment_record._next_model_to_train_id}' "
f"was in state of '{self.experiment_record._training_state}'. Expected it to be TRAINED."
)
if (
self.experiment_record._training_state == TrainingState.FAILED
or self.experiment_record._training_state == TrainingState.STOPPED
):
raise SageMakerTrainingJobException(
f"Training job '{self.experiment_record._next_model_to_train_id}' "
f"ended in state of '{self.experiment_record._training_state}'. Please check Sagemaker logs for "
"more information."
)