in src/sagemaker/feature_store/dataset_builder.py [0:0]
def _construct_query_string(self, base: FeatureGroupToBeMerged) -> str:
    """Internal method for constructing SQL query string by parameters.

    Builds one ``WITH`` clause holding ``fg_base`` plus one ``fg_<i>`` entry per
    merged feature group. It then selects the projected columns together with a
    ``row_number()`` window (partitioned by the base record identifier and ordered
    by every event-time column, newest first) aliased as ``row_recent``.
    Optionally it filters on ``row_recent`` and appends a ``LIMIT``.

    Columns from the base keep their own names. Columns from merged group ``i``
    are aliased ``"<feature>.<i + 1>"`` so that identically named features do not
    collide. Every column reference emitted here is double-quoted, so that feature
    names which are SQL reserved words (e.g. ``order``) or contain characters such
    as ``-`` still produce valid SQL.

    Side effect: a merged feature group whose ``target_feature_name_in_base`` is
    falsy has it set, in place, to ``self._record_identifier_feature_name``.

    Args:
        base (FeatureGroupToBeMerged): A FeatureGroupToBeMerged object which has the metadata.
    Returns:
        The query string.
    Raises:
        ValueError: target_feature_name_in_base is an invalid feature name.
    """
    merged_groups = self._feature_groups_to_be_merged

    # Build every table subquery first, before the join loop below mutates
    # target_feature_name_in_base (keeps the original call order).
    base_table_query_string = self._construct_table_query(base, "base")
    cte_parts = [f"WITH fg_base AS ({base_table_query_string})"]
    cte_parts.extend(
        f",\nfg_{i} AS ({self._construct_table_query(feature_group, str(i))})"
        for i, feature_group in enumerate(merged_groups)
    )

    # inner_columns: projection inside the subquery.
    # outer_columns: the names the outer SELECT reads back.
    inner_columns = [f'fg_base."{name}"' for name in base.projected_feature_names]
    outer_columns = [f'"{name}"' for name in base.projected_feature_names]
    for i, feature_group in enumerate(merged_groups):
        for name in feature_group.projected_feature_names:
            alias = f'"{name}.{i + 1}"'
            inner_columns.append(f'fg_{i}."{name}" as {alias}')
            outer_columns.append(alias)

    order_by = [f'fg_base."{base.event_time_identifier_feature.feature_name}" DESC']
    join_subquery_strings = []
    for i, feature_group in enumerate(merged_groups):
        if not feature_group.target_feature_name_in_base:
            # No explicit join key: fall back to the builder's record identifier.
            feature_group.target_feature_name_in_base = self._record_identifier_feature_name
        elif feature_group.target_feature_name_in_base not in base.features:
            raise ValueError(
                f"Feature {feature_group.target_feature_name_in_base} not found in base"
            )
        order_by.append(f'fg_{i}."{feature_group.event_time_identifier_feature.feature_name}" DESC')
        join_subquery_strings.append(self._construct_join_condition(feature_group, str(i)))

    recent_record_where_clause = ""
    if self._number_of_recent_records is not None and self._number_of_recent_records >= 0:
        recent_record_where_clause = f"WHERE row_recent <= {self._number_of_recent_records}"

    query_string = (
        "".join(cte_parts)
        + f"\nSELECT {', '.join(outer_columns)}\n"
        + "FROM (\n"
        + f"SELECT {', '.join(inner_columns)}, row_number() OVER (\n"
        + f'PARTITION BY fg_base."{base.record_identifier_feature_name}"\n'
        + f"ORDER BY {', '.join(order_by)}"
        + "\n) AS row_recent\n"
        + "FROM fg_base"
        + "".join(join_subquery_strings)
        + "\n)\n"
        + recent_record_where_clause
    )
    if self._number_of_records is not None and self._number_of_records >= 0:
        query_string += f"\nLIMIT {self._number_of_records}"
    return query_string