movie_search_metadata/demo_app/_vais_setup.py (108 lines of code) (raw):
import os
import time
from google.api_core.client_options import ClientOptions
from google.cloud import discoveryengine
PROJECT_ID = os.getenv('PROJECT_ID')
BUCKET = os.getenv('BUCKET')
LOCATION = 'global'
DATASTORE_ID = 'movie-search-datastore'
ENGINE_ID = 'movie-search-engine'
def create_datastore(project_id, location, datastore_id):
client_options = (
ClientOptions(api_endpoint=f'{location}-discoveryengine.googleapis.com')
if location != 'global'
else None
)
client = discoveryengine.DataStoreServiceClient(client_options=client_options)
parent = client.collection_path(
project=project_id,
location=location,
collection='default_collection',
)
data_store = discoveryengine.DataStore(
display_name='Movie search datastore',
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
solution_types=[discoveryengine.SolutionType.SOLUTION_TYPE_SEARCH],
content_config=discoveryengine.DataStore.ContentConfig.CONTENT_REQUIRED,
)
request = discoveryengine.CreateDataStoreRequest(
parent=parent,
data_store_id=datastore_id,
data_store=data_store,
)
operation = client.create_data_store(request=request)
print(f'Waiting for operation to complete: {operation.operation.name}')
response = operation.result()
return response
def import_documents(project_id, location, datastore_id, bucket):
client_options = (
ClientOptions(api_endpoint=f'{location}-discoveryengine.googleapis.com')
if location != 'global'
else None
)
client = discoveryengine.DocumentServiceClient(client_options=client_options)
parent = client.branch_path(
project=project_id,
location=location,
data_store=datastore_id,
branch='default_branch'
)
request = discoveryengine.ImportDocumentsRequest(
parent=parent,
gcs_source=discoveryengine.GcsSource(
input_uris=[f'{bucket}/metadata/*.txt'],
data_schema='content',
),
reconciliation_mode=discoveryengine.ImportDocumentsRequest.ReconciliationMode.FULL
)
operation = client.import_documents(request=request)
print(f'Waiting for operation to complete: {operation.operation.name}')
print('This may take around 30 mins...')
response = operation.result(timeout=3600)
return response
def create_engine(project_id, location, datastore_id, engine_id):
client_options = (
ClientOptions(api_endpoint=f'{location}-discoveryengine.googleapis.com')
if location != 'global'
else None
)
client = discoveryengine.EngineServiceClient(client_options=client_options)
parent = client.collection_path(
project=project_id,
location=location,
collection='default_collection'
)
engine = discoveryengine.Engine(
display_name='Movie Search Engine',
industry_vertical=discoveryengine.IndustryVertical.GENERIC,
solution_type=discoveryengine.SolutionType.SOLUTION_TYPE_SEARCH,
search_engine_config=discoveryengine.Engine.SearchEngineConfig(
search_tier=discoveryengine.SearchTier.SEARCH_TIER_ENTERPRISE,
search_add_ons=[discoveryengine.SearchAddOn.SEARCH_ADD_ON_LLM],
),
data_store_ids=[datastore_id],
)
request = discoveryengine.CreateEngineRequest(
parent=parent,
engine=engine,
engine_id=engine_id,
)
operation = client.create_engine(request=request)
print(f'Waiting for operation to complete: {operation.operation.name}')
response = operation.result()
return response
if __name__ == '__main__':
print('\n## Creating datastore...')
try:
create_datastore(PROJECT_ID, LOCATION, DATASTORE_ID)
except Exception as e:
print(e)
time.sleep(10)
print('\n## Importing documents...')
import_documents(PROJECT_ID, LOCATION, DATASTORE_ID, BUCKET)
print('\n## Creating search engine...')
try:
create_engine(PROJECT_ID, LOCATION, DATASTORE_ID, ENGINE_ID)
except Exception as e:
print(e)
print('\nDone.')