in src/sagemaker/feature_store/dataset_builder.py [0:0]
def _construct_table_query(self, feature_group: FeatureGroupToBeMerged, suffix: str) -> str:
    """Internal method for constructing the SQL query string for one table.

    The query shape depends on the table type and on the builder's
    ``_include_duplicated_records`` / ``_include_deleted_records`` flags:

    * both flags set: select rows straight from the table. For a FeatureGroup
      table, rows with ``is_deleted`` set are still filtered out.
    * FeatureGroup table, deleted records included, duplicates excluded: per
      (record identifier, event time), keep the row with the latest
      ``api_invocation_time``, ties broken by the latest ``write_time``.
    * FeatureGroup table, deleted records excluded: records that appear in the
      ``deleted_{suffix}`` CTE (from ``_construct_deleted_query``) keep only
      rows written after that deleted row. The source is either the raw table
      (duplicates included) or the ``table_{suffix}`` CTE from
      ``_construct_dedup_query`` (duplicates excluded).
    * any other table type: select from the ``table_{suffix}`` CTE built by
      ``_construct_dedup_query``.

    Args:
        feature_group (FeatureGroupToBeMerged): A FeatureGroupToBeMerged object which has the
            FeatureGroup metadata.
        suffix (str): A temp identifier of the FeatureGroup, appended to the table/CTE
            aliases (``table_``, ``origin_``, ``row_``, ``deleted_``) used in the query.
    Returns:
        The query string.
    """
    included_features = self._construct_table_included_features(feature_group, suffix)
    is_feature_group = feature_group.table_type is TableType.FEATURE_GROUP

    # For FeatureGroup tables the inner selects of the deleted-records union also carry
    # write_time. For any other table type this is the same column list as included_features.
    included_features_write_time = included_features
    if is_feature_group:
        included_features_write_time += f', table_{suffix}."write_time"'
    record_feature_name = feature_group.record_identifier_feature_name
    event_time_feature_name = feature_group.event_time_identifier_feature.feature_name

    if self._include_duplicated_records and self._include_deleted_records:
        # The rest of this method only references offline-store columns for FeatureGroup
        # tables. Apply the is_deleted filter the same way, since other table types
        # (e.g. DataFrame-backed temp tables) presumably have no such column.
        extra_conditions = ["NOT is_deleted"] if is_feature_group else []
        return (
            f"SELECT {included_features}\n"
            + f'FROM "{feature_group.database}"."{feature_group.table_name}" table_{suffix}\n'
            + self._construct_where_query_string(
                suffix, feature_group.event_time_identifier_feature, extra_conditions
            )
        )

    if is_feature_group and self._include_deleted_records:
        # Rank rows per (record identifier, event time) by latest api_invocation_time,
        # then latest write_time, and keep only the first-ranked row.
        return (
            f"SELECT {included_features}\n"
            + "FROM (\n"
            + "SELECT *, row_number() OVER (\n"
            + f'PARTITION BY origin_{suffix}."{record_feature_name}", '
            + f'origin_{suffix}."{event_time_feature_name}"\n'
            + f'ORDER BY origin_{suffix}."api_invocation_time" DESC, '
            + f'origin_{suffix}."write_time" DESC\n'
            + f") AS row_{suffix}\n"
            + f'FROM "{feature_group.database}"."{feature_group.table_name}" origin_{suffix}\n'
            + "WHERE NOT is_deleted"
            + f") AS table_{suffix}\n"
            + self._construct_where_query_string(
                suffix,
                feature_group.event_time_identifier_feature,
                [f"row_{suffix} = 1"],
            )
        )

    if not is_feature_group:
        # Non-FeatureGroup tables have no deletion metadata: read the dedup CTE directly.
        return (
            f"WITH {self._construct_dedup_query(feature_group, suffix)}\n"
            + f"SELECT {included_features}\n"
            + "FROM (\n"
            + f"SELECT {included_features_write_time}\n"
            + f"FROM table_{suffix}\n"
            + f") AS table_{suffix}\n"
            + self._construct_where_query_string(
                suffix, feature_group.event_time_identifier_feature, []
            )
        )

    # FeatureGroup table with deleted records excluded. The two variants differ only in
    # where rows are read from: the raw offline-store table (duplicates included) or the
    # dedup CTE named table_{suffix} (duplicates excluded).
    if self._include_duplicated_records:
        with_clause = f"WITH {self._construct_deleted_query(feature_group, suffix)}\n"
        source_table = f'"{feature_group.database}"."{feature_group.table_name}" table_{suffix}'
    else:
        with_clause = (
            f"WITH {self._construct_dedup_query(feature_group, suffix)},\n"
            + f"{self._construct_deleted_query(feature_group, suffix)}\n"
        )
        source_table = f"table_{suffix}"

    # First half of the union: rows whose record identifier has no match in deleted_{suffix}.
    # Second half: rows of matched records that are newer than the matching deleted row.
    # "Newer" means a later event time; or the same event time with a later
    # api_invocation_time; or the same event time and api_invocation_time with a later
    # write_time.
    return (
        with_clause
        + f"SELECT {included_features}\n"
        + "FROM (\n"
        + f"SELECT {included_features_write_time}\n"
        + f"FROM {source_table}\n"
        + f"LEFT JOIN deleted_{suffix}\n"
        + f'ON table_{suffix}."{record_feature_name}" = '
        + f'deleted_{suffix}."{record_feature_name}"\n'
        + f'WHERE deleted_{suffix}."{record_feature_name}" IS NULL\n'
        + "UNION ALL\n"
        + f"SELECT {included_features_write_time}\n"
        + f"FROM deleted_{suffix}\n"
        + f"JOIN {source_table}\n"
        + f'ON table_{suffix}."{record_feature_name}" = '
        + f'deleted_{suffix}."{record_feature_name}"\n'
        + "AND (\n"
        + f'table_{suffix}."{event_time_feature_name}" > '
        + f'deleted_{suffix}."{event_time_feature_name}"\n'
        + f'OR (table_{suffix}."{event_time_feature_name}" = '
        + f'deleted_{suffix}."{event_time_feature_name}" '
        + f'AND table_{suffix}."api_invocation_time" > '
        + f'deleted_{suffix}."api_invocation_time")\n'
        + f'OR (table_{suffix}."{event_time_feature_name}" = '
        + f'deleted_{suffix}."{event_time_feature_name}" '
        + f'AND table_{suffix}."api_invocation_time" = '
        + f'deleted_{suffix}."api_invocation_time" '
        + f'AND table_{suffix}."write_time" > deleted_{suffix}."write_time")\n'
        + ")\n"
        + f") AS table_{suffix}\n"
        + self._construct_where_query_string(
            suffix, feature_group.event_time_identifier_feature, []
        )
    )