def ingest_joined_data()

in common/sagemaker_rl/orchestrator/workflow/manager/experiment_manager.py [0:0]


    def ingest_joined_data(self, joined_data_buffer, ratio=0.8):
        """Upload joined data in joined data buffer to S3 bucket

        Registers a new join job id on the experiment record, starts a dummy
        local join over ``joined_data_buffer``, then polls the experiment
        record until it reports that this join job has succeeded.

        Args:
            joined_data_buffer (list): A list of json blobs containing
                joined data
            ratio (float): Split ratio to split data into
                training data and evaluation data

        Raises:
            WorkflowJoiningJobException: If the joining state is observed as
                FAILED or CANCELLED while waiting.
            UnhandledWorkflowException: If the experiment record still does
                not show the join as succeeded after the last retry.
        """
        # local join to simulate a joining workflow

        # update next_join_job_id and joining state
        next_join_job_id = JoinManager.name_next_join_job(experiment_id=self.experiment_id)
        self.exp_db_client.update_experiment_next_join_job_id(self.experiment_id, next_join_job_id)
        self.exp_db_client.update_experiment_joining_state(self.experiment_id, JoiningState.PENDING)

        self.next_join_job = JoinManager(
            join_db_client=self.join_db_client,
            experiment_id=self.experiment_id,
            join_job_id=next_join_job_id,
            input_obs_data_s3_path="local-join-does-not-apply",
            input_reward_data_s3_path="local-join-does-not-apply",
            boto_session=self.boto_session,
        )

        logger.info("Started dummy local joining job...")
        self.next_join_job.start_dummy_join(joined_data_buffer=joined_data_buffer, ratio=ratio)

        def _join_succeeded():
            # Re-read self.experiment_record on every call so that a record
            # refreshed (or replaced) by a sync is always the one inspected.
            record = self.experiment_record
            return (
                record._joining_state == JoiningState.SUCCEEDED
                and record._last_joined_job_id == next_join_job_id
                and record._next_join_job_id is None
            )

        # this method can be invoked either in local/SM mode
        max_retries = 5
        num_retries = 0

        # Success is evaluated first (loop condition), so a join that is seen
        # as succeeded on the final poll exits normally instead of timing out.
        while not _join_succeeded():
            if num_retries >= max_retries:
                raise UnhandledWorkflowException(
                    f"Joining job '{self.experiment_record._next_join_job_id}' "
                    f"was in state of '{self.experiment_record._joining_state}'. Failed to sync table states."
                )

            # Sync experiment state if required
            self._sync_experiment_state_with_ddb()
            logger.debug("Waiting for experiment table joining status to be updated...")
            # Exponential backoff: 2, 4, 8, 16, 32 seconds.
            time.sleep(2 * (2 ** num_retries))

            # A terminal failure is reported as such on every poll, including
            # the last one, rather than being masked by the retry-limit error.
            if self.experiment_record._joining_state in (
                JoiningState.FAILED,
                JoiningState.CANCELLED,
            ):
                raise WorkflowJoiningJobException(
                    f"Joining job '{self.experiment_record._next_join_job_id}' "
                    f"ended with state '{self.experiment_record._joining_state}'. Please check if provided "
                    "joined_data_buffer was in correct data format."
                )

            num_retries += 1