# components/llm_service/src/services/query/vertex_search.py
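
# Standard-library imports used by build_vertex_search; the model classes
# (QueryEngine, QueryDocument), Logger, PROJECT_ID, and the datastore
# helper functions are assumed to be imported elsewhere in this module.
from typing import List, Tuple
import traceback
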
def build_vertex_search(
    q_engine: QueryEngine) -> Tuple[List[QueryDocument], List[str]]:
"""
Build a Vertex Search-based Query Engine
q_engine.doc_url must specify either a gcs uri, an http:// https://
URL, or bq://<bigquery dataset and table separated by colons>
ie.
q_engine.doc_url = "gs://bucket/optional_subfolder"
or
q_engine.doc_url = "bq://BIGQUERY_DATASET:BIGQUERY_TABLE"
or
q_engine.doc_url = "https://example.com/news"
Args:
q_engine: QueryEngine to build
Returns:
Tuple of list of QueryDocument objects of docs processed,
list of uris of docs not processed
"""
  # initialize build parameters
data_url = q_engine.doc_url
project_id = PROJECT_ID
location = "global"
Logger.info(f"Building vertex search engine [{q_engine.name}] [{data_url}]")
# initialize doc tracking lists
docs_to_be_processed = []
docs_processed = []
docs_not_processed = []
doc_models_processed = []
  # initialize datastore id (set to None so the error handler below can
  # reference it even if data store creation never happened)
  data_store_id = None
# validate data_url
  if not data_url.startswith(("bq://", "gs://", "http://", "https://")):
    raise RuntimeError(f"Invalid data url: {data_url}")
try:
# inventory the documents to be ingested
if data_url.startswith("http://") or data_url.startswith("https://"):
# download web docs and store in a GCS bucket
gcs_url, docs_to_be_processed = download_web_docs(q_engine, data_url)
data_url = gcs_url
elif data_url.startswith("bq://"):
docs_to_be_processed = [DataSourceFile(src_url=data_url)]
elif data_url.startswith("gs://"):
docs_to_be_processed = inventory_gcs_files(data_url)
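    # data store and engine creation are long-running API operations;
    # wait_for_operation is assumed to block until each one completes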
# create data store
data_store_id = datastore_id_from_engine(q_engine)
operation = create_data_store(q_engine, project_id, data_store_id)
wait_for_operation(operation)
# perform import
    docs_processed, docs_not_processed = import_documents_to_datastore(
        data_url,
        docs_to_be_processed,
        project_id,
        location,
        data_store_id)
# create search engine
operation = create_search_engine(q_engine, project_id, data_store_id)
wait_for_operation(operation)
Logger.info(f"Created vertex search engine for {q_engine.name}")
    # save the data store id in the query engine metadata
    q_engine.index_id = data_store_id
    q_engine.update()
# create QueryDocument models for processed documents
for doc in docs_processed:
query_document = QueryDocument(
query_engine_id=q_engine.id,
query_engine=q_engine.name,
doc_url=doc.src_url,
index_file=doc.gcs_path
)
query_document.save()
doc_models_processed.append(query_document)
  except Exception as e:
    Logger.error(f"Error building vertex search query engine [{str(e)}]")
    Logger.error(traceback.format_exc())
    # on build error, delete any vertex search assets that were created
    delete_vertex_search(q_engine, data_store_id)
    raise
return doc_models_processed, docs_not_processed
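
# Example usage (a minimal sketch, not part of this module): assumes a
# persisted QueryEngine model with name/doc_url fields and valid GCP
# credentials for PROJECT_ID.
#
#   q_engine = QueryEngine(name="news-engine",
#                          doc_url="https://example.com/news")
#   q_engine.save()
#   docs_processed, uris_not_processed = build_vertex_search(q_engine)
#   Logger.info(f"processed {len(docs_processed)} docs, "
#               f"{len(uris_not_processed)} not processed")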