scripts/verify.py (64 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
# ruff: noqa: T201
import asyncio
import os
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from elasticsearch import AsyncElasticsearch
from connectors.config import load_config
DEFAULT_CONFIG = os.path.join(os.path.dirname(__file__), "..", "config.yml")
async def verify(service_type, index_name, size, config):
config = config["elasticsearch"]
host = config["host"]
auth = config["username"], config["password"]
client = AsyncElasticsearch(hosts=[host], basic_auth=auth, request_timeout=120)
client_info = await client.info()
client_version = client_info.get("version", {}) or {}
if not client_version.get("build_flavor") == "serverless":
await client.indices.refresh(index=index_name)
try:
print(f"Verifying {index_name}...")
resp = await client.count(index=index_name)
count = resp["count"]
print(f"Found {count} documents")
if count < size:
raise Exception(f"We want {size} docs")
# checking one doc
res = await client.search(index=index_name, query={"match_all": {}})
first_doc = res["hits"]["hits"][0]["_source"]
print("First doc")
print(first_doc)
if len(first_doc.keys()) < 3:
raise Exception("The doc does not look right")
if "_extract_binary_content" in first_doc:
raise Exception("The pipeline did not run")
if "_attachment" in first_doc:
raise Exception("Content extraction was not successful")
print("🤗")
finally:
await client.close()
def _parser():
parser = ArgumentParser(
prog="verify", formatter_class=ArgumentDefaultsHelpFormatter
)
parser.add_argument(
"--config-file", type=str, help="Configuration file", default=DEFAULT_CONFIG
)
parser.add_argument(
"--service-type", type=str, help="Service type", default="mongodb"
)
parser.add_argument(
"--index-name", type=str, help="Elasticsearch index", default="search-mongo"
)
parser.add_argument("--size", type=int, help="How many docs", default=10001)
return parser
def main(args=None):
parser = _parser()
args = parser.parse_args(args=args)
config_file = args.config_file
if not os.path.exists(config_file):
raise IOError(f"{config_file} does not exist")
config = load_config(config_file)
try:
asyncio.run(verify(args.service_type, args.index_name, args.size, config))
print("Bye")
except (asyncio.CancelledError, KeyboardInterrupt):
print("Bye")
if __name__ == "__main__":
main()