connectors/cli/index.py (61 lines of code) (raw):

# # Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. # import asyncio from elasticsearch import ApiError from connectors.es.cli_client import CLIClient from connectors.protocol import ( CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX, ConnectorIndex, ) class Index: def __init__(self, config): self.elastic_config = config self.cli_client = CLIClient(self.elastic_config) self.connectors_index = ConnectorIndex(self.elastic_config) def list_indices(self): return asyncio.run(self.__list_indices()) def clean(self, index_name): return asyncio.run(self.__clean_index(index_name)) def delete(self, index_name): return asyncio.run(self.__delete_index(index_name)) def index_or_connector_exists(self, index_name): return asyncio.run(self.__index_or_connector_exists(index_name)) async def __close(self): await self.cli_client.close() await self.connectors_index.close() async def __list_indices(self): try: return await self.cli_client.list_indices() except ApiError as e: # If the API is not available, we are in serverless mode if e.error == "api_not_available_exception": return await self.cli_client.list_indices_serverless() raise e finally: await self.__close() async def __clean_index(self, index_name): try: return await self.cli_client.clean_index(index_name) except ApiError: return False finally: await self.__close() async def __delete_index(self, index_name): try: await self.cli_client.delete_indices([index_name]) return True except ApiError: return False finally: await self.__close() async def __index_or_connector_exists(self, index_name): try: await self.cli_client.ensure_exists( indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX] ) index_exists, connector_doc = await asyncio.gather( self.cli_client.index_exists(index_name), self.connectors_index.get_connector_by_index(index_name), ) connector_exists = False if connector_doc is None else True return index_exists, connector_exists finally: await self.__close()