def _construct_query_string()

in src/sagemaker/feature_store/dataset_builder.py [0:0]


    def _construct_query_string(self, base: FeatureGroupToBeMerged) -> str:
        """Build the SQL statement that merges the base with the other feature groups.

        The statement consists of a ``WITH`` section holding one subquery per table
        (``fg_base`` followed by ``fg_0``, ``fg_1``, ...), an inner ``SELECT`` that
        joins those subqueries and numbers the rows with ``row_number()``, and an
        outer ``SELECT`` over the aliased columns which is optionally filtered on
        ``row_recent`` and capped with ``LIMIT``.

        Side effect: a merged feature group whose ``target_feature_name_in_base`` is
        empty gets it set to this builder's ``_record_identifier_feature_name``.

        Args:
            base (FeatureGroupToBeMerged): A FeatureGroupToBeMerged object holding the
                metadata of the base table.

        Returns:
            The query string.

        Raises:
            ValueError: target_feature_name_in_base is an invalid feature name.
        """
        merged_groups = self._feature_groups_to_be_merged

        # WITH section: the base subquery first, then one subquery per merged group.
        base_subquery = self._construct_table_query(base, "base")
        pieces = [f"WITH fg_base AS ({base_subquery})"]
        pieces.extend(
            f",\nfg_{idx} AS ({self._construct_table_query(group, str(idx))})"
            for idx, group in enumerate(merged_groups)
        )

        # Inner column list. Every table contributes one comma-separated segment;
        # columns of merged group number idx are aliased "<name>.<idx + 1>".
        inner_segments = [", ".join(f"fg_base.{name}" for name in base.projected_feature_names)]
        for idx, group in enumerate(merged_groups):
            inner_segments.append(
                ", ".join(
                    f'fg_{idx}."{name}" as "{name}.{idx + 1}"'
                    for name in group.projected_feature_names
                )
            )
        inner_columns = ", ".join(inner_segments)

        # Outer column list: refers to the aliases introduced by the inner SELECT.
        outer_segments = [", ".join(base.projected_feature_names)]
        for idx, group in enumerate(merged_groups):
            outer_segments.append(
                ", ".join(f'"{name}.{idx + 1}"' for name in group.projected_feature_names)
            )
        outer_columns = ", ".join(outer_segments)

        # The window's ORDER BY is left open here; the loop further down appends one
        # extra ordering term per merged group before the window is closed.
        pieces.append(
            f"\nSELECT {outer_columns}\n"
            "FROM (\n"
            f"SELECT {inner_columns}, row_number() OVER (\n"
            f'PARTITION BY fg_base."{base.record_identifier_feature_name}"\n'
            f'ORDER BY fg_base."{base.event_time_identifier_feature.feature_name}" DESC'
        )

        recent_limit = self._number_of_recent_records
        if recent_limit is not None and recent_limit >= 0:
            recent_filter = f"WHERE row_recent <= {recent_limit}"
        else:
            recent_filter = ""

        join_clauses = []
        for idx, group in enumerate(merged_groups):
            join_key = group.target_feature_name_in_base
            if not join_key:
                # No explicit join column given: fall back to the record identifier.
                group.target_feature_name_in_base = self._record_identifier_feature_name
            elif join_key not in base.features:
                raise ValueError(f"Feature {join_key} not found in base")
            event_time_name = group.event_time_identifier_feature.feature_name
            pieces.append(f', fg_{idx}."{event_time_name}" DESC')
            join_clauses.append(self._construct_join_condition(group, str(idx)))

        # Close the window, attach the joins, close the inner SELECT and filter.
        pieces.append("\n) AS row_recent\n" "FROM fg_base")
        pieces.extend(join_clauses)
        pieces.append("\n)\n")
        pieces.append(recent_filter)

        row_cap = self._number_of_records
        if row_cap is not None and row_cap >= 0:
            pieces.append(f"\nLIMIT {row_cap}")
        return "".join(pieces)