in esrally/client/factory.py [0:0]
def wait_for_rest_layer(es, max_attempts=40):
"""
Waits for ``max_attempts`` until Elasticsearch's REST API is available.
:param es: Elasticsearch client to use for connecting.
:param max_attempts: The maximum number of attempts to check whether the REST API is available.
:return: True iff Elasticsearch's REST API is available.
"""
# assume that at least the hosts that we expect to contact should be available. Note that this is not 100%
# bullet-proof as a cluster could have e.g. dedicated masters which are not contained in our list of target hosts
# but this is still better than just checking for any random node's REST API being reachable.
expected_node_count = len(es.transport.node_pool)
logger = logging.getLogger(__name__)
attempt = 0
while attempt <= max_attempts:
attempt += 1
# pylint: disable=import-outside-toplevel
from elastic_transport import (
ApiError,
ConnectionError,
SerializationError,
TlsError,
TransportError,
)
try:
# see also WaitForHttpResource in Elasticsearch tests. Contrary to the ES tests we consider the API also
# available when the cluster status is RED (as long as all required nodes are present)
es.cluster.health(wait_for_nodes=f">={expected_node_count}")
logger.debug("REST API is available for >= [%s] nodes after [%s] attempts.", expected_node_count, attempt)
return True
except SerializationError as e:
if "Client sent an HTTP request to an HTTPS server" in str(e):
raise exceptions.SystemSetupError(
"Rally sent an HTTP request to an HTTPS server. Are you sure this is an HTTP endpoint?", e
)
if attempt <= max_attempts:
logger.debug("Got serialization error [%s] on attempt [%s]. Sleeping...", e, attempt)
time.sleep(3)
else:
raise
except TlsError as e:
raise exceptions.SystemSetupError("Could not connect to cluster via HTTPS. Are you sure this is an HTTPS endpoint?", e)
except ConnectionError as e:
if "ProtocolError" in str(e):
raise exceptions.SystemSetupError(
"Received a protocol error. Are you sure you're using the correct scheme (HTTP or HTTPS)?", e
)
if attempt <= max_attempts:
logger.debug("Got connection error on attempt [%s]. Sleeping...", attempt)
time.sleep(3)
else:
raise
except TransportError as e:
if attempt <= max_attempts:
logger.debug("Got transport error on attempt [%s]. Sleeping...", attempt)
time.sleep(3)
else:
raise
except ApiError as e:
# cluster block, x-pack not initialized yet, our wait condition is not reached
if e.status_code in (503, 401, 408) and attempt <= max_attempts:
logger.debug("Got status code [%s] on attempt [%s]. Sleeping...", e.message, attempt)
time.sleep(3)
else:
logger.warning("Got unexpected status code [%s] on attempt [%s].", e.message, attempt)
raise
return False