def build_vertex_search()

in components/llm_service/src/services/query/vertex_search.py


def build_vertex_search(q_engine: QueryEngine) -> \
    Tuple[List[QueryDocument], List[str]]:
  """
  Build a Vertex Search-based Query Engine

  q_engine.doc_url must specify either a GCS URI, an http:// or https://
     URL, or bq://<BigQuery dataset and table separated by a colon>
  e.g.
  q_engine.doc_url = "gs://bucket/optional_subfolder"
    or
  q_engine.doc_url = "bq://BIGQUERY_DATASET:BIGQUERY_TABLE"
    or
  q_engine.doc_url = "https://example.com/news"

  Args:
    q_engine: QueryEngine to build

  Returns:
    Tuple of (list of QueryDocument models for docs that were processed,
      list of URIs of docs that were not processed)
  """
  # initialize some variables
  data_url = q_engine.doc_url
  project_id = PROJECT_ID
  location = "global"

  Logger.info(f"Building vertex search engine [{q_engine.name}] [{data_url}]")

  # initialize doc tracking lists
  docs_to_be_processed = []
  docs_processed = []
  docs_not_processed = []
  doc_models_processed = []

  # initialize datastore id
  data_store_id = None

  # validate data_url
  if not (data_url.startswith("bq://")
       or data_url.startswith("gs://")
       or data_url.startswith("http://") or data_url.startswith("https://")):
    raise RuntimeError(f"Invalid data url: {data_url}")

  try:
    # inventory the documents to be ingested
    if data_url.startswith("http://") or data_url.startswith("https://"):
      # download web docs and store in a GCS bucket
      gcs_url, docs_to_be_processed = download_web_docs(q_engine, data_url)
      data_url = gcs_url
    elif data_url.startswith("bq://"):
      docs_to_be_processed = [DataSourceFile(src_url=data_url)]
    elif data_url.startswith("gs://"):
      docs_to_be_processed = inventory_gcs_files(data_url)

    # create data store
    data_store_id = datastore_id_from_engine(q_engine)
    operation = create_data_store(q_engine, project_id, data_store_id)
    wait_for_operation(operation)

    # perform import
    docs_processed, docs_not_processed = \
        import_documents_to_datastore(data_url,
                                      docs_to_be_processed,
                                      project_id,
                                      location,
                                      data_store_id)

    # create search engine
    operation = create_search_engine(q_engine, project_id, data_store_id)
    wait_for_operation(operation)
    Logger.info(f"Created vertex search engine for {q_engine.name}")

    # save metadata for datastore in query engine
    q_engine.index_id = data_store_id
    q_engine.update()

    # create QueryDocument models for processed documents
    for doc in docs_processed:
      query_document = QueryDocument(
        query_engine_id=q_engine.id,
        query_engine=q_engine.name,
        doc_url=doc.src_url,
        index_file=doc.gcs_path
      )
      query_document.save()
      doc_models_processed.append(query_document)

  except Exception as e:
    Logger.error(f"Error building vertex search query engine [{str(e)}]")
    Logger.error(traceback.format_exc())

    # on build error, delete any vertex search assets that were created
    delete_vertex_search(q_engine, data_store_id)
    raise

  return doc_models_processed, docs_not_processed
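
A minimal usage sketch is shown below. It assumes QueryEngine accepts name and
doc_url as constructor keywords and exposes a save() method (this function only
reads q_engine.name, q_engine.doc_url, q_engine.id and writes q_engine.index_id);
those constructor details are assumptions for illustration, not the exact API of
this repository.

  # hypothetical caller sketch: construct a QueryEngine pointing at a GCS folder
  # (a bq:// table or an https:// URL would work the same way), then build the
  # Vertex Search engine for it
  q_engine = QueryEngine(
      name="news-search",                      # assumed constructor field
      doc_url="gs://my-bucket/news-articles"   # assumed constructor field
  )
  q_engine.save()                              # assumed persistence method

  try:
    docs_processed, uris_not_processed = build_vertex_search(q_engine)
    for doc in docs_processed:
      print(f"indexed: {doc.doc_url}")
    for uri in uris_not_processed:
      print(f"skipped: {uri}")
  except Exception as e:
    # build errors are re-raised after the partially created
    # Vertex Search assets have been cleaned up
    print(f"build failed: {e}")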