in src/sagemaker/feature_store/dataset_builder.py [0:0]
def to_csv_file(self) -> Tuple[str, str]:
    """Run the dataset query and return the result's .csv location and the query text.

    When the base is a pandas DataFrame, it is first written to a headerless CSV,
    uploaded to ``<output_path>/<unique id>/`` in S3, and a temporary table is
    created over that folder (via ``_create_temp_table``) before the query runs
    against the default catalog/database. When the base is a FeatureGroup, the
    query runs against that feature group's own catalog and database.

    Side effects: sets ``_event_time_identifier_feature_type`` on this builder;
    for a FeatureGroup base it also sets ``_record_identifier_feature_name`` and
    ``_event_time_identifier_feature_name`` from the feature group.

    Returns:
        A tuple ``(csv_s3_path, query_string)`` read from the query execution
        response: the S3 path of the .csv result file and the query string that
        was executed. Either element is None if absent from the response.

    Raises:
        ValueError: If the base is neither a FeatureGroup nor a DataFrame.
    """
    if isinstance(self._base, pd.DataFrame):
        import tempfile  # only needed to stage the DataFrame upload locally

        temp_id = utils.unique_name_from_base("dataframe-base")
        # S3 URIs always use "/" as separator; os.path.join would insert "\\"
        # on Windows and produce an invalid URI.
        desired_s3_folder = f"{self._output_path.rstrip('/')}/{temp_id}"

        # Stage the CSV in a private temporary directory rather than the CWD, so
        # it is always removed (even if writing or uploading raises). The base
        # file name stays "{temp_id}.csv" as before.
        with tempfile.TemporaryDirectory() as staging_dir:
            local_file_name = os.path.join(staging_dir, f"{temp_id}.csv")
            self._base.to_csv(local_file_name, index=False, header=False)
            s3.S3Uploader.upload(
                local_path=local_file_name,
                desired_s3_uri=desired_s3_folder,
                sagemaker_session=self._sagemaker_session,
                kms_key=self._kms_key_id,
            )

        temp_table_name = f'dataframe_{temp_id.replace("-", "_")}'
        self._create_temp_table(temp_table_name, desired_s3_folder)

        base_features = list(self._base.columns)
        # Map the event-time column's pandas dtype to a feature type; None if
        # the dtype is not in the map.
        event_time_dtype = self._base[self._event_time_identifier_feature_name].dtypes
        self._event_time_identifier_feature_type = (
            FeatureGroup.DTYPE_TO_FEATURE_DEFINITION_CLS_MAP.get(str(event_time_dtype), None)
        )
        # Fall back to every DataFrame column when no explicit selection was given.
        included_features = self._included_feature_names or base_features

        query_string = self._construct_query_string(
            FeatureGroupToBeMerged(
                base_features,
                included_features,
                included_features,
                _DEFAULT_CATALOG,
                _DEFAULT_DATABASE,
                temp_table_name,
                self._record_identifier_feature_name,
                FeatureDefinition(
                    self._event_time_identifier_feature_name,
                    self._event_time_identifier_feature_type,
                ),
                None,
                TableType.DATA_FRAME,
            )
        )
        query_result = self._run_query(query_string, _DEFAULT_CATALOG, _DEFAULT_DATABASE)
        # TODO: cleanup temp table, need more clarification, keep it for now
    elif isinstance(self._base, FeatureGroup):
        base_feature_group = construct_feature_group_to_be_merged(
            self._base, self._included_feature_names
        )
        # Adopt the identifier/event-time definitions of the base feature group.
        self._record_identifier_feature_name = base_feature_group.record_identifier_feature_name
        event_time_feature = base_feature_group.event_time_identifier_feature
        self._event_time_identifier_feature_name = event_time_feature.feature_name
        self._event_time_identifier_feature_type = event_time_feature.feature_type

        query_string = self._construct_query_string(base_feature_group)
        query_result = self._run_query(
            query_string,
            base_feature_group.catalog,
            base_feature_group.database,
        )
    else:
        raise ValueError("Base must be either a FeatureGroup or a DataFrame.")

    # Both branches share the same response unpacking.
    query_execution = query_result.get("QueryExecution", {})
    return (
        query_execution.get("ResultConfiguration", {}).get("OutputLocation", None),
        query_execution.get("Query", None),
    )