def to_csv_file()

in src/sagemaker/feature_store/dataset_builder.py


    def to_csv_file(self) -> Tuple[str, str]:
        """Get query string and result in .csv format file

        Returns:
            The S3 path of the .csv file.
            The query string executed.
        """
        if isinstance(self._base, pd.DataFrame):
            temp_id = utils.unique_name_from_base("dataframe-base")
            local_file_name = f"{temp_id}.csv"
            # Join the S3 URI with "/" explicitly; os.path.join would use "\" on Windows.
            desired_s3_folder = f"{self._output_path}/{temp_id}"
            self._base.to_csv(local_file_name, index=False, header=False)
            s3.S3Uploader.upload(
                local_path=local_file_name,
                desired_s3_uri=desired_s3_folder,
                sagemaker_session=self._sagemaker_session,
                kms_key=self._kms_key_id,
            )
            os.remove(local_file_name)
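            # Register the uploaded CSV as a temporary Athena table so it can be queried with SQL.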
            temp_table_name = f'dataframe_{temp_id.replace("-", "_")}'
            self._create_temp_table(temp_table_name, desired_s3_folder)
            base_features = list(self._base.columns)
            event_time_identifier_feature_dtype = self._base[
                self._event_time_identifier_feature_name
            ].dtypes
            self._event_time_identifier_feature_type = (
                FeatureGroup.DTYPE_TO_FEATURE_DEFINITION_CLS_MAP.get(
                    str(event_time_identifier_feature_dtype), None
                )
            )
            query_string = self._construct_query_string(
                FeatureGroupToBeMerged(
                    base_features,
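                    # included and projected feature names (both fall back to all base features)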
                    self._included_feature_names if self._included_feature_names else base_features,
                    self._included_feature_names if self._included_feature_names else base_features,
                    _DEFAULT_CATALOG,
                    _DEFAULT_DATABASE,
                    temp_table_name,
                    self._record_identifier_feature_name,
                    FeatureDefinition(
                        self._event_time_identifier_feature_name,
                        self._event_time_identifier_feature_type,
                    ),
                    None,
                    TableType.DATA_FRAME,
                )
            )
            query_result = self._run_query(query_string, _DEFAULT_CATALOG, _DEFAULT_DATABASE)
            # TODO: clean up the temporary table; kept for now pending clarification.
            execution = query_result.get("QueryExecution", {})
            return (
                execution.get("ResultConfiguration", {}).get("OutputLocation"),
                execution.get("Query"),
            )
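        # Base is a FeatureGroup: query its offline store directly.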
        if isinstance(self._base, FeatureGroup):
            base_feature_group = construct_feature_group_to_be_merged(
                self._base, self._included_feature_names
            )
            self._record_identifier_feature_name = base_feature_group.record_identifier_feature_name
            self._event_time_identifier_feature_name = (
                base_feature_group.event_time_identifier_feature.feature_name
            )
            self._event_time_identifier_feature_type = (
                base_feature_group.event_time_identifier_feature.feature_type
            )
            query_string = self._construct_query_string(base_feature_group)
            query_result = self._run_query(
                query_string,
                base_feature_group.catalog,
                base_feature_group.database,
            )
            execution = query_result.get("QueryExecution", {})
            return (
                execution.get("ResultConfiguration", {}).get("OutputLocation"),
                execution.get("Query"),
            )
        raise ValueError("Base must be either a FeatureGroup or a DataFrame.")
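
For context, a minimal usage sketch follows. It is not part of this file and makes assumptions: the bucket name, column names, and session setup are placeholders, and it assumes a boto session with SageMaker Feature Store access.

    import pandas as pd

    from sagemaker.session import Session
    from sagemaker.feature_store.feature_store import FeatureStore

    feature_store = FeatureStore(sagemaker_session=Session())

    # Hypothetical events table; "customer_id" is the record identifier and
    # "event_time" is the event-time column.
    events = pd.DataFrame(
        {
            "customer_id": [1, 2],
            "event_time": ["2023-01-01T00:00:00Z", "2023-01-02T00:00:00Z"],
            "amount": [10.0, 20.0],
        }
    )

    builder = feature_store.create_dataset(
        base=events,
        output_path="s3://my-bucket/feature-store-datasets",  # placeholder bucket
        record_identifier_feature_name="customer_id",
        event_time_identifier_feature_name="event_time",
    )
    csv_s3_uri, query_string = builder.to_csv_file()

With a DataFrame base, to_csv_file stages the frame as a temporary Athena table before querying; with a FeatureGroup base, it queries the group's offline store directly.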