src/sagemaker_rl/orchestrator/workflow/manager/join_manager.py [204:240]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        # The two observation tables share the same columns, SerDe and S3
        # location; the only difference is that the partitioned variant
        # declares a `dt` partition column.  The template reproduces the DDL
        # text exactly, so the partition clause carries its own trailing
        # newline and indentation.
        ddl_template = """
            CREATE EXTERNAL TABLE IF NOT EXISTS {table} (
                    event_id STRING,
                    action INT,
                    observation STRING,
                    model_id STRING,
                    action_prob FLOAT,
                    sample_prob FLOAT
            )
            {partition_clause}ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
            LOCATION '{location}'
        """
        table_variants = (
            (self.obs_table_partitioned, "PARTITIONED BY (dt string)\n            "),
            (self.obs_table_non_partitioned, ""),
        )

        # Both statements pass the same output location to _start_query.
        s3_output_path = (
            f"s3://{self.query_s3_output_bucket}/{self.experiment_id}/joined_data/obs_tables"
        )

        # Partitioned table first, then the non-partitioned one; each query is
        # waited on before the next one is started.
        for table, partition_clause in table_variants:
            query_string = ddl_template.format(
                table=table,
                partition_clause=partition_clause,
                location=input_obs_data_s3_path,
            )
            query_id = self._start_query(query_string, s3_output_path)
            self.wait_query_to_finish(query_id)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/sagemaker_rl/orchestrator/workflow/manager/join_manager.py [248:264]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        # Drop the partitioned table first, then the non-partitioned one.
        # The template reproduces the statement text exactly, including its
        # surrounding whitespace.
        drop_template = """
            DROP TABLE IF EXISTS {table}
        """

        # Both statements pass the same output location to _start_query.
        s3_output_path = (
            f"s3://{self.query_s3_output_bucket}/{self.experiment_id}/joined_data/obs_tables"
        )

        # Each query is waited on before the next one is started.
        for table in (self.obs_table_partitioned, self.obs_table_non_partitioned):
            query_string = drop_template.format(table=table)
            query_id = self._start_query(query_string, s3_output_path)
            self.wait_query_to_finish(query_id)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



