elser-ingest-speedtest/track.py (152 lines of code) (raw):

import asyncio from elasticsearch import BadRequestError, NotFoundError class ParamSource: def __init__(self, track, params): # choose a suitable index: if there is only one defined for this track # choose that one, but let the user always override index if len(track.indices) == 1: default_index = track.indices[0].name else: default_index = "_all" self._index_name = params.get("index", default_index) self._cache = params.get("cache", False) self._size = params.get("size", 10) self._field = params.get("field", "ml.tokens") self._num_terms = params.get("num-terms", 10) self._track_total_hits = params.get("track_total_hits", False) self._params = params async def get_xpack_capabilities(es): print() print("=" * 50) print(await es.perform_request(method="GET", path="/_xpack")) print("=" * 50) elser_v1_model_id = ".elser_model_1" elser_v2_model_id = ".elser_model_2" elser_v2_platform_specific_model_id = ".elser_model_2_linux-x86_64" # TODO enable this function once rally upgrades the elasticsearch python client to >=8.9.0 # async def put_elser(es, params): # try: # await es.ml.put_trained_model(model_id=elser_v1_model_id, input={"field_names": "text_field"}) # return True # except BadRequestError as bre: # if ( # bre.body["error"]["root_cause"][0]["reason"] # == f"Cannot create model [{model_id}] the id is the same as an current model deployment" # or bre.body["error"]["root_cause"][0]["reason"] # == f"Trained machine learning model [{model_id}] already exists" # ): # return True # else: # print(bre) # return False # except Exception as e: # print(e) # return False async def put_elser(es, params): model_id = params["model_id"] try: await es.perform_request(method="PUT", path=f"/_ml/trained_models/{model_id}", body={"input": {"field_names": ["text_field"]}}) return True except BadRequestError as bre: try: if model_already_downloaded(bre, model_id): return True else: print(bre) return False except Exception as e: print(e) return False except Exception as e: print(e) return False async def delete_elser(es, params): model_id = params["model_id"] try: await es.perform_request(method="DELETE", path=f"/_ml/trained_models/{model_id}", params={"force": "true"}) return True except BadRequestError as bre: try: if model_already_downloaded(bre, model_id): return True else: print(bre) return False except Exception as e: print(e) return False except Exception as e: print(e) return False async def poll_for_elser_completion(es, params): model_id = params["model_id"] try_count = 0 max_wait_time_seconds = 120 wait_time_per_cycle_seconds = 5 while wait_time_per_cycle_seconds * try_count < max_wait_time_seconds: try: response = await es.ml.get_trained_models(model_id=model_id, include="definition_status") if is_model_fully_defined(response): return True except NotFoundError: print("\nwaiting... try count:", try_count, end="") await asyncio.sleep(wait_time_per_cycle_seconds) try_count += 1 print() return False def is_model_fully_defined(response): return response["trained_model_configs"][0]["fully_defined"] async def stop_trained_model_deployment(es, params): model_id = params["model_id"] try: await es.ml.stop_trained_model_deployment(model_id=model_id, force=True) return True except BadRequestError as bre: try: if model_deployment_already_exists(bre, model_id): return True else: print(bre) return False except Exception as e: print(e) return False async def start_trained_model_deployment(es, params): number_of_allocations = params["number_of_allocations"] threads_per_allocation = params["threads_per_allocation"] queue_capacity = params["queue_capacity"] model_id = params["model_id"] try: await es.ml.start_trained_model_deployment( model_id=model_id, wait_for="fully_allocated", number_of_allocations=number_of_allocations, threads_per_allocation=threads_per_allocation, queue_capacity=queue_capacity, cache_size="0b", ) return True except BadRequestError as bre: try: if model_deployment_already_exists(bre, model_id): return True else: print(bre) return False except Exception as e: print(e) return False except Exception as e: print("Exception", e) return False def model_deployment_already_exists(bad_request_error, model_id): try: return ( bad_request_error.body["error"]["root_cause"][0]["reason"] == f"Could not start model deployment because an existing deployment with the same id [{model_id}] exist" ) except Exception as e: print(e) return False def model_already_downloaded(bre, model_id): try: return ( bre.body["error"]["root_cause"][0]["reason"] == f"Cannot create model [{model_id}] the id is the same as an current model deployment" or bre.body["error"]["root_cause"][0]["reason"] == f"Trained machine learning model [{model_id}] already exists" ) except Exception as e: print(e) return False async def create_elser_model(es, params): await get_xpack_capabilities(es) if await put_elser(es, params) == False: return False if await poll_for_elser_completion(es, params) == False: return False return True def register(registry): registry.register_param_source("param-source", ParamSource) registry.register_runner("put-elser", create_elser_model, async_runner=True) registry.register_runner("delete-elser", delete_elser, async_runner=True) registry.register_runner("stop-trained-model-deployment", stop_trained_model_deployment, async_runner=True) registry.register_runner("start-trained-model-deployment", start_trained_model_deployment, async_runner=True)