in common/sagemaker_rl/orchestrator/workflow/manager/join_manager.py [0:0]
def start_dummy_join(self, joined_data_buffer, ratio=0.8):
    """Run a dummy join over an in-memory buffer of already-joined records.

    Splits the buffer into train/eval partitions by each record's
    pre-assigned ``sample_prob``, records the output S3 locations and job
    state in the join table, and uploads both partitions.

    Args:
        joined_data_buffer (list): A list of json blobs containing joined data points
        ratio (float): Split ratio for training and evaluation data set
    """
    logger.info(f"Splitting data into train/evaluation set with ratio of {ratio}")

    train_buffer, eval_buffer = [], []
    for record in joined_data_buffer:
        # records with sample_prob <= ratio land in the training partition
        target = train_buffer if record["sample_prob"] <= ratio else eval_buffer
        target.append(record)

    # single key prefix shared by the S3 URI and both upload destinations
    key_prefix = f"{self.experiment_id}/joined_data/{self.join_job_id}"
    s3_output_path = f"s3://{self.query_s3_output_bucket}/{key_prefix}"
    logger.info(f"Joined data will be stored under {s3_output_path}")

    # record PENDING state and the output locations via the ddb client
    self.join_db_client.update_join_job_current_state(
        self.experiment_id, self.join_job_id, "PENDING"
    )
    self.join_db_client.update_join_job_output_joined_train_data_s3_path(
        self.experiment_id, self.join_job_id, f"{s3_output_path}/train"
    )
    self.join_db_client.update_join_job_output_joined_eval_data_s3_path(
        self.experiment_id, self.join_job_id, f"{s3_output_path}/eval"
    )

    # upload both partitions; each call returns a path on success
    train_path = self._upload_data_buffer_as_joined_data_format(
        train_buffer,
        self.query_s3_output_bucket,
        f"{key_prefix}/train",
    )
    eval_path = self._upload_data_buffer_as_joined_data_format(
        eval_buffer,
        self.query_s3_output_bucket,
        f"{key_prefix}/eval",
    )

    # dummy join finished; persist the terminal state
    current_state = "SUCCEEDED" if train_path and eval_path else "FAILED"
    self.join_db_client.update_join_job_current_state(
        self.experiment_id, self.join_job_id, current_state
    )