agents/agentic_rag/app/retrievers.py (84 lines of code) (raw):
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ruff: noqa
# mypy: disable-error-code="no-untyped-def"
import os
from unittest.mock import MagicMock
from langchain_google_community.vertex_rank import VertexAIRank
from langchain_google_vertexai import VertexAIEmbeddings
{% if cookiecutter.datastore_type == "vertex_ai_search" -%}
from langchain_google_community import VertexAISearchRetriever
def get_retriever(
project_id: str,
data_store_id: str,
data_store_region: str,
embedding: VertexAIEmbeddings,
embedding_column: str = "embedding",
max_documents: int = 10,
custom_embedding_ratio: float = 0.5,
) -> VertexAISearchRetriever:
"""
Creates and returns an instance of the retriever service.
Uses mock service if the INTEGRATION_TEST environment variable is set to "TRUE",
otherwise initializes real Vertex AI retriever.
"""
try:
return VertexAISearchRetriever(
project_id=project_id,
data_store_id=data_store_id,
location_id=data_store_region,
engine_data_type=1,
# The following parameters are used when you want to search
# using custom embeddings in Agent Builder.
# The ratio is set to 0.5 by default to use a mix of custom
# embeddings but you can adapt the ratio as you need.
custom_embedding_ratio=custom_embedding_ratio,
custom_embedding=embedding,
custom_embedding_field_path=embedding_column,
# Extracting 20 documents before re-rank.
max_documents=max_documents,
beta=True,
)
except Exception:
retriever = MagicMock()
def raise_exception(*args, **kwargs) -> None:
"""Function that raises an exception when the retriever is not available."""
raise Exception("Retriever not available")
retriever.invoke = raise_exception
return retriever
{% elif cookiecutter.datastore_type == "vertex_ai_vector_search" -%}
from google.cloud import aiplatform
from langchain_google_vertexai import VectorSearchVectorStore
from langchain_core.vectorstores import VectorStoreRetriever
def get_retriever(
project_id: str,
region: str,
vector_search_bucket: str,
vector_search_index: str,
vector_search_index_endpoint: str,
embedding: VertexAIEmbeddings,
) -> VectorStoreRetriever:
"""
Creates and returns an instance of the retriever service.
"""
try:
aiplatform.init(
project=project_id,
location=region,
staging_bucket=vector_search_bucket,
)
my_index = aiplatform.MatchingEngineIndex(vector_search_index)
my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint(
vector_search_index_endpoint
)
return VectorSearchVectorStore.from_components(
project_id=project_id,
region=region,
gcs_bucket_name=vector_search_bucket.replace("gs://", ""),
index_id=my_index.name,
endpoint_id=my_index_endpoint.name,
embedding=embedding,
stream_update=True,
).as_retriever()
except Exception:
retriever = MagicMock()
def raise_exception(*args, **kwargs) -> None:
"""Function that raises an exception when the retriever is not available."""
raise Exception("Retriever not available")
retriever.invoke = raise_exception
return retriever
{% endif %}
def get_compressor(project_id: str, top_n: int = 5) -> VertexAIRank:
"""
Creates and returns an instance of the compressor service.
"""
try:
return VertexAIRank(
project_id=project_id,
location_id="global",
ranking_config="default_ranking_config",
title_field="id",
top_n=top_n,
)
except Exception:
compressor = MagicMock()
compressor.compress_documents = lambda x: []
return compressor