in experiments/google/cloud/ml/applied/embeddings/search.py [0:0]
def insert_dp(dp_id: str, emb: list[float], cat=None) -> None:
    """Upsert a single data point into the vector search index.

    Builds an ``aiplatform_v1.IndexDatapoint`` from the arguments and sends
    it to the index identified by the module-level ``search_index_id``
    through the module-level ``index_client``.

    Args:
        dp_id: Identifier of the data point to insert.
        emb: Feature vector (embedding) stored for the data point.
        cat: Optional allow-list values. When non-empty, the data point is
            created with a single restrict on the ``"L0"`` namespace whose
            ``allow_list`` is ``cat``. ``None`` or an empty value means no
            restricts are attached.

    Returns:
        None. This is a best-effort operation: any exception raised while
        building or sending the request is caught and reported on stdout
        rather than propagated to the caller.
    """
    # An f-string formats non-str ids as well; the previous "+" concatenation
    # raised TypeError for them before the try block was even entered.
    print(
        f"Inserting data point id {dp_id} into vector search index {search_index_id}"
    )
    try:
        datapoint_kwargs = {"datapoint_id": dp_id, "feature_vector": emb}
        if cat:
            # Only attach restricts when there is something to allow-list.
            datapoint_kwargs["restricts"] = [
                {"namespace": "L0", "allow_list": cat}
            ]
        insert_datapoints_payload = aiplatform_v1.IndexDatapoint(**datapoint_kwargs)
        upsert_request = aiplatform_v1.UpsertDatapointsRequest(
            index=search_index_id, datapoints=[insert_datapoints_payload]
        )
        res = index_client.upsert_datapoints(request=upsert_request)
        # If successful, the response body is empty
        # [https://cloud.google.com/vertex-ai/docs/reference/rest/v1/projects.locations.indexes/upsertDatapoints].
        print(res)
    except Exception as e:
        # Deliberately broad: a failed insert is reported but must not crash
        # the caller.
        print("An error occurred:", e)
        print("Unable to insert into vector search index")