def _construct_table_query()

in src/sagemaker/feature_store/dataset_builder.py [0:0]


    def _construct_table_query(self, feature_group: FeatureGroupToBeMerged, suffix: str) -> str:
        """Assemble the SQL query string for a single table taking part in the merge.

        The shape of the query depends on ``feature_group.table_type`` and on the builder's
        ``_include_duplicated_records`` / ``_include_deleted_records`` flags:

        * both flags set: a plain ``SELECT`` from the source table with a ``NOT is_deleted``
          condition handed to ``_construct_where_query_string``;
        * FeatureGroup table with ``_include_deleted_records`` set: rows are numbered with
          ``row_number()`` per (record identifier, event time), ordered by
          ``api_invocation_time`` then ``write_time`` descending, and only row 1 is kept;
        * any other table that is not a FeatureGroup: a ``SELECT`` from ``table_{suffix}``
          preceded by the ``WITH`` clause built by ``_construct_dedup_query``;
        * FeatureGroup table without ``_include_deleted_records``: rows of the table that have
          no match in ``deleted_{suffix}`` are unioned with rows that are newer than their
          match there (by event time, then ``api_invocation_time``, then ``write_time``).

        Args:
            feature_group (FeatureGroupToBeMerged): A FeatureGroupToBeMerged object which has the
                FeatureGroup metadata.
            suffix (str): A temp identifier of the FeatureGroup; it is appended to the table
                aliases used in the query.
        Returns:
            The query string.
        """
        selected_columns = self._construct_table_included_features(feature_group, suffix)
        is_feature_group = feature_group.table_type is TableType.FEATURE_GROUP

        # The inner SELECT of a FeatureGroup query additionally carries "write_time"; for any
        # other table type it selects exactly the same columns as the outer SELECT.
        inner_columns = selected_columns
        if is_feature_group:
            inner_columns = selected_columns + f', table_{suffix}."write_time"'

        record_id = feature_group.record_identifier_feature_name
        event_time_feature = feature_group.event_time_identifier_feature
        event_time = event_time_feature.feature_name

        # Case 1: keep duplicates and deleted records -> read the source table directly.
        # NOTE(review): the "NOT is_deleted" condition is applied for every table type here;
        # confirm that non-FeatureGroup tables expose an is_deleted column.
        if self._include_duplicated_records and self._include_deleted_records:
            return "".join(
                [
                    f"SELECT {selected_columns}\n",
                    f'FROM "{feature_group.database}"."{feature_group.table_name}" ',
                    f"table_{suffix}\n",
                    self._construct_where_query_string(
                        suffix, event_time_feature, ["NOT is_deleted"]
                    ),
                ]
            )

        # Case 2: FeatureGroup with deleted records kept -> keep one row per
        # (record identifier, event time), preferring the latest API call / write.
        if is_feature_group and self._include_deleted_records:
            return "".join(
                [
                    f"SELECT {selected_columns}\n",
                    "FROM (\n",
                    "SELECT *, row_number() OVER (\n",
                    f'PARTITION BY origin_{suffix}."{record_id}", ',
                    f'origin_{suffix}."{event_time}"\n',
                    f'ORDER BY origin_{suffix}."api_invocation_time" DESC, ',
                    f'origin_{suffix}."write_time" DESC\n',
                    f") AS row_{suffix}\n",
                    f'FROM "{feature_group.database}"."{feature_group.table_name}" ',
                    f"origin_{suffix}\n",
                    "WHERE NOT is_deleted",
                    f") AS table_{suffix}\n",
                    self._construct_where_query_string(
                        suffix, event_time_feature, [f"row_{suffix} = 1"]
                    ),
                ]
            )

        # Case 3: not a FeatureGroup -> select from the table_{suffix} relation, with the
        # WITH clause supplied by the dedup helper.
        if not is_feature_group:
            return "".join(
                [
                    f"WITH {self._construct_dedup_query(feature_group, suffix)}\n",
                    f"SELECT {selected_columns}\n",
                    "FROM (\n",
                    f"SELECT {inner_columns}\n",
                    f"FROM table_{suffix}\n",
                    f") AS table_{suffix}\n",
                    self._construct_where_query_string(suffix, event_time_feature, []),
                ]
            )

        # Case 4: FeatureGroup with deleted records excluded.
        table_alias = f"table_{suffix}"
        deleted_alias = f"deleted_{suffix}"

        # Extra join alternatives for rows sharing the event time of their deleted_{suffix}
        # match: a later api_invocation_time wins, then a later write_time.
        same_event_time = f'{table_alias}."{event_time}" = {deleted_alias}."{event_time}"'
        newer_on_tie = (
            f'OR ({same_event_time} AND {table_alias}."api_invocation_time" > '
            f'{deleted_alias}."api_invocation_time")\n'
            f'OR ({same_event_time} AND {table_alias}."api_invocation_time" = '
            f'{deleted_alias}."api_invocation_time" '
            f'AND {table_alias}."write_time" > {deleted_alias}."write_time")\n'
        )

        if self._include_duplicated_records:
            # Duplicates are kept: join against the physical table itself.
            with_clause = f"WITH {self._construct_deleted_query(feature_group, suffix)}\n"
            source = f'"{feature_group.database}"."{feature_group.table_name}" {table_alias}'
        else:
            # Duplicates are dropped: the dedup helper's WITH entry comes first and the
            # joins read from table_{suffix} instead of the physical table.
            with_clause = (
                f"WITH {self._construct_dedup_query(feature_group, suffix)},\n"
                f"{self._construct_deleted_query(feature_group, suffix)}\n"
            )
            source = table_alias

        join_on_record = f'ON {table_alias}."{record_id}" = {deleted_alias}."{record_id}"\n'
        return "".join(
            [
                with_clause,
                f"SELECT {selected_columns}\n",
                "FROM (\n",
                # Rows whose record identifier has no match in deleted_{suffix}.
                f"SELECT {inner_columns}\n",
                f"FROM {source}\n",
                f"LEFT JOIN {deleted_alias}\n",
                join_on_record,
                f'WHERE {deleted_alias}."{record_id}" IS NULL\n',
                "UNION ALL\n",
                # Rows that are newer than their match in deleted_{suffix}.
                f"SELECT {inner_columns}\n",
                f"FROM {deleted_alias}\n",
                f"JOIN {source}\n",
                join_on_record,
                "AND (\n",
                f'{table_alias}."{event_time}" > {deleted_alias}."{event_time}"\n',
                newer_on_tie,
                ")\n",
                f") AS {table_alias}\n",
                self._construct_where_query_string(suffix, event_time_feature, []),
            ]
        )