example-apps/relevance-workbench/data/index-data.py (55 lines of code) (raw):
from elasticsearch import Elasticsearch, helpers
import argparse, os, json
import gzip
from tqdm import tqdm
# Command-line options. Only --es_password and --cloud_id are mandatory; every
# other option falls back to the default given here. Attribute names on `args`
# are derived by argparse from the long option names.
# NOTE(review): --data_folder is parsed but not read anywhere in the visible
# part of this script — confirm whether --gzip_file should be resolved
# relative to it.
parser = argparse.ArgumentParser()
parser.add_argument("--data_folder", default="./data")
parser.add_argument("--es_user", default="elastic")
parser.add_argument("--es_password", required=True)
parser.add_argument("--cloud_id", required=True)
parser.add_argument("--index_name", default="search-movies")
parser.add_argument("--gzip_file", default="movies-sample.json.gz")
args = parser.parse_args()
def data_generator(file_json, index, pipeline):
    """Yield one bulk action per document in *file_json*.

    Every action is a dict with three keys: ``_index`` (set to *index*),
    ``pipeline`` (set to *pipeline*) and ``_source``. The source is a shallow
    copy of the document with ``_run_ml_inference`` forced to ``True``, so the
    caller's documents are never modified.

    Args:
        file_json: Iterable of document mappings to index.
        index: Value placed under each action's ``_index`` key.
        pipeline: Value placed under each action's ``pipeline`` key.

    Yields:
        dict: The action for the next document, in input order.
    """
    for doc in file_json:
        yield {
            "_index": index,
            "pipeline": pipeline,
            # Copy instead of assigning into `doc` so the input list is not
            # silently altered as the generator is consumed.
            "_source": {**doc, "_run_ml_inference": True},
        }
# Connect to the Elastic Cloud deployment using basic authentication.
# NOTE(review): request_timeout=600 is presumably there to tolerate slow bulk
# requests through the ingest pipeline — confirm before lowering it.
print("Init Elasticsearch client")
es = Elasticsearch(
    cloud_id=args.cloud_id,
    basic_auth=(args.es_user, args.es_password),
    request_timeout=600,
)

print("Indexing movies data, this might take a while...")
# Load the entire gzip-compressed JSON payload into memory. The context manager
# closes the file handle even when decompression or JSON parsing fails.
with gzip.open(args.gzip_file, "rt", encoding="utf-8") as gz_file:
    file_json = json.load(gz_file)
total_documents = len(file_json)

success_count = 0
# The progress bar is used as a context manager so it is closed (and the
# terminal line restored) even if the bulk helper raises part-way through.
with tqdm(total=total_documents, unit="documents") as progress_bar:
    # The index name is passed as the pipeline name as well.
    # raise_on_error=False makes per-document failures come back as
    # (ok=False, info) results instead of aborting the whole run.
    for ok, info in helpers.streaming_bulk(
        client=es,
        actions=data_generator(file_json, args.index_name, args.index_name),
        raise_on_error=False,
    ):
        if ok:
            success_count += 1
        else:
            failure = info["index"]
            print(f"Unable to index {failure['_id']}: {failure['error']}")
        progress_bar.update(1)
        progress_bar.set_postfix(success=success_count)

# Report the success percentage; an empty input file would otherwise trigger a
# division by zero.
if total_documents:
    success_percentage = (success_count / total_documents) * 100
    print(f"Indexing completed! Success percentage: {success_percentage}%")
else:
    print("No documents found in the input file; nothing was indexed")
print("Done indexing movies data")