def search()

in backend/unified-cloud-search/services/unified_cloud_search_service.py [0:0]


    def search(self, query: str, num_neighbors: int) -> List[SearchResult]:
        logger.info(f"index_endpoint.match completed")

        # Retrieve latest access token
        access_token = subprocess.run(
            "gcloud auth print-access-token", shell=True, capture_output=True, text=True
        ).stdout.strip()

        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        json_data = {
            "query": query,
            "page_size": num_neighbors,
            "offset": 0,
        }

        url = (
            f"https://discoveryengine.googleapis.com/v1alpha/projects/{self.project_id}/locations/{self.location}/collections/default_collection/dataStores/{self.datastore_id}/servingConfigs/default_search:search"
            if not self.is_staging
            else f"https://staging-discoveryengine.sandbox.googleapis.com/v1alpha/projects/{self.project_id}/locations/{self.location}/collections/default_collection/dataStores/{self.datastore_id}/servingConfigs/default_config:search"
        )

        response = requests.post(
            url,
            headers=headers,
            json=json_data,
        )

        if response.status_code == 200:
            matches_all = self.convert_to_search_result(
                results=response.json()["results"]
            )

            logger.info(f"matches converted")

            matches_all_nonoptional: List[SearchResult] = [
                match for match in matches_all if match is not None
            ]

            logger.info(f"matches none filtered")

            return matches_all_nonoptional
        else:
            raise RuntimeError("Error retrieving search results")