in connectors/es/client.py [0:0]
def __init__(self, config):
    """Build the async Elasticsearch client from a connection config mapping.

    Args:
        config: Mapping of connection settings; it is also stored as
            ``self.config``. Keys read here: ``serverless``, ``host``,
            ``max_retries``, ``retry_interval``, ``request_timeout``,
            ``retry_on_timeout``, ``api_key``, ``username``, ``password``,
            ``ssl``, ``verify_certs``, ``ca_certs``, ``log_level``,
            ``max_wait_duration``, ``initial_backoff_duration``,
            ``backoff_multiplier`` and ``headers``.

    Raises:
        KeyError: When ``username`` is configured without ``password`` and
            no ``api_key`` is present (``config["password"]`` is indexed
            directly; assumes ``config`` behaves like a plain dict).
    """
    self.serverless = config.get("serverless", False)
    self.config = config
    self.configured_host = config.get("host", "http://localhost:9200")
    self.host = url_to_node_config(
        self.configured_host,
        use_default_ports_for_scheme=True,
    )
    self._sleeps = CancellableSleeps()
    self._retrier = TransientElasticsearchRetrier(
        logger,
        config.get("max_retries", DEFAULT_ELASTICSEARCH_MAX_RETRIES),
        config.get("retry_interval", DEFAULT_ELASTICSEARCH_RETRY_INTERVAL),
    )

    # Keyword arguments accumulated for AsyncElasticsearch(**options).
    options = {
        "hosts": [self.host],
        "request_timeout": config.get("request_timeout", 120),
        "retry_on_timeout": config.get("retry_on_timeout", True),
    }
    logger.debug(f"Initial Elasticsearch node configuration is {self.host}")

    # Authentication: an API key wins over basic auth when both are present.
    if "api_key" in config:
        logger.debug(f"Connecting with an API Key ({config['api_key'][:5]}...)")
        options["api_key"] = config["api_key"]
        if "username" in config or "password" in config:
            msg = "configured API key will be used over configured basic auth"
            if (
                config.get("username") == "elastic"
                and config.get("password") == "changeme"
            ):
                logger.debug(
                    msg
                )  # don't cause a panic if it's just the default creds
            else:
                logger.warning(msg)
    elif "username" in config:
        auth = config["username"], config["password"]
        options["basic_auth"] = auth
        logger.debug(f"Connecting using Basic Auth (user: {config['username']})")

    # TLS options are only forwarded when "ssl" is enabled in the config.
    if config.get("ssl", False):
        options["verify_certs"] = config.get("verify_certs", True)
        if options["verify_certs"]:
            if "ca_certs" in config:
                ca_certs = config["ca_certs"]
                logger.debug(f"Verifying cert with {ca_certs}")
                options["ca_certs"] = ca_certs
            else:
                logger.debug("Verifying cert with system certificates")

    # Configure the "elastic_transport.node" logger with the configured level.
    level = config.get("log_level", "INFO").upper()
    es_logger = logging.getLogger("elastic_transport.node")
    set_extra_logger(
        es_logger,
        log_level=logging.getLevelName(level),
        filebeat=logger.filebeat,  # pyright: ignore
    )

    self.max_wait_duration = config.get("max_wait_duration", 60)
    self.initial_backoff_duration = config.get("initial_backoff_duration", 5)
    self.backoff_multiplier = config.get("backoff_multiplier", 2)

    # Copy the configured headers before adding ours: writing into the dict
    # returned by config.get() would silently modify the caller's config
    # (which is also kept as self.config). A missing or None value is treated
    # as "no extra headers".
    headers = dict(config.get("headers") or {})
    headers["user-agent"] = self.__class__.user_agent
    headers["X-elastic-product-origin"] = "connectors"
    options["headers"] = headers

    self.client = AsyncElasticsearch(**options)
    self._keep_waiting = True