in esrally/metrics.py [0:0]
def guarded(self, target, *args, **kwargs):
    """
    Runs ``target(*args, **kwargs)`` and returns its result, retrying failures that may be transient.

    After a retryable failure the call is repeated following a pause of ``2**(attempt - 1)`` seconds plus a random
    fraction of a second. Up to ``max_execution_count`` (10) retries follow the initial attempt, i.e. ``target`` is
    called at most 11 times. If the final call fails as well, the failure is reported as a ``RallyError``.

    Retried failures:

    * connection timeouts and connection errors
    * ``ApiError`` with a status code contained in ``self.retryable_status_codes``
    * ``BulkIndexError`` if the ``index`` status of every reported item is contained in ``self.retryable_status_codes``

    Authentication and authorization failures, any other ``ApiError`` and any other ``TransportError`` are never
    retried. All translated errors are chained to the original client exception.

    :param target: The callable to invoke. Its ``__name__`` is only read when an error message is built.
    :param args: Positional arguments passed on to ``target``.
    :param kwargs: Keyword arguments passed on to ``target``.
    :return: Whatever ``target`` returns.
    :raises exceptions.RallyError: On an unretryable error or when all retries are exhausted.
    :raises exceptions.SystemSetupError: If the configured user cannot authenticate or lacks privileges.
    """
    # pylint: disable=import-outside-toplevel
    import elasticsearch
    import elasticsearch.helpers
    from elastic_transport import ApiError, TransportError

    max_execution_count = 10

    def store_address():
        # Only resolved on error paths so the happy path never touches the node pool.
        # we know that it is just one host (see EsClientFactory)
        node = self._client.transport.node_pool.get()
        return node.host, node.port

    # One initial attempt plus ``max_execution_count`` retries. The loop is only ever left via ``return`` or
    # ``raise`` because every handler raises once no retry is left.
    for execution_count in range(1, max_execution_count + 2):
        can_retry = execution_count <= max_execution_count
        # exponential backoff with jitter: 1s, 2s, 4s, ... (plus up to one second)
        time_to_sleep = 2 ** (execution_count - 1) + random.random()

        try:
            return target(*args, **kwargs)
        except elasticsearch.exceptions.ConnectionTimeout as e:
            if can_retry:
                self.logger.debug(
                    "Connection timeout [%s] in attempt [%d/%d]. Sleeping for [%f] seconds.",
                    e.message,
                    execution_count,
                    max_execution_count,
                    time_to_sleep,
                )
                time.sleep(time_to_sleep)
            else:
                operation = target.__name__
                self.logger.exception("Connection timeout while running [%s] (retried %d times).", operation, max_execution_count)
                host, port = store_address()
                msg = (
                    f"A connection timeout occurred while running the operation [{operation}] against your Elasticsearch "
                    f"metrics store on host [{host}] at port [{port}]."
                )
                raise exceptions.RallyError(msg) from e
        except elasticsearch.exceptions.ConnectionError as e:
            if can_retry:
                self.logger.debug(
                    "Connection error [%s] in attempt [%d/%d]. Sleeping for [%f] seconds.",
                    e.message,
                    execution_count,
                    max_execution_count,
                    time_to_sleep,
                )
                time.sleep(time_to_sleep)
            else:
                host, port = store_address()
                config_location = config.ConfigFile().location
                msg = (
                    f"Could not connect to your Elasticsearch metrics store. Please check that it is running on host [{host}] "
                    f"at port [{port}] or fix the configuration in [{config_location}]."
                )
                self.logger.exception(msg)
                # connection errors don't necessarily mean it's during setup
                raise exceptions.RallyError(msg) from e
        except elasticsearch.exceptions.AuthenticationException as e:
            host, port = store_address()
            config_location = config.ConfigFile().location
            msg = (
                f"The configured user could not authenticate against your Elasticsearch metrics store running on host [{host}] "
                f"at port [{port}] (wrong password?). Please fix the configuration in [{config_location}]."
            )
            self.logger.exception(msg)
            raise exceptions.SystemSetupError(msg) from e
        except elasticsearch.exceptions.AuthorizationException as e:
            host, port = store_address()
            operation = target.__name__
            config_location = config.ConfigFile().location
            msg = (
                f"The configured user does not have enough privileges to run the operation [{operation}] against your "
                f"Elasticsearch metrics store running on host [{host}] at port [{port}]. Please adjust your x-pack "
                f"configuration or specify a user with enough privileges in the configuration in [{config_location}]."
            )
            self.logger.exception(msg)
            raise exceptions.SystemSetupError(msg) from e
        except elasticsearch.helpers.BulkIndexError as e:
            # A single item with a non-retryable status aborts immediately; retrying could not fix it.
            for err in e.errors:
                err_type = err.get("index", {}).get("error", {}).get("type", None)
                if err.get("index", {}).get("status", None) not in self.retryable_status_codes:
                    msg = f"Unretryable error encountered when sending metrics to remote metrics store: [{err_type}]"
                    self.logger.exception("%s - Full error(s) [%s]", msg, str(e.errors))
                    raise exceptions.RallyError(msg) from e

            if can_retry:
                self.logger.debug(
                    "Error in sending metrics to remote metrics store [%s] in attempt [%d/%d]. Sleeping for [%f] seconds.",
                    e,
                    execution_count,
                    max_execution_count,
                    time_to_sleep,
                )
                time.sleep(time_to_sleep)
            else:
                msg = f"Failed to send metrics to remote metrics store: [{e.errors}]"
                self.logger.exception("%s - Full error(s) [%s]", msg, str(e.errors))
                raise exceptions.RallyError(msg) from e
        except ApiError as e:
            if e.status_code in self.retryable_status_codes and can_retry:
                self.logger.debug(
                    "%s (code: %d) in attempt [%d/%d]. Sleeping for [%f] seconds.",
                    e.error,
                    e.status_code,
                    execution_count,
                    max_execution_count,
                    time_to_sleep,
                )
                time.sleep(time_to_sleep)
            else:
                host, port = store_address()
                msg = (
                    f"An error [{e.error}] occurred while running the operation [{target.__name__}] against your "
                    f"Elasticsearch metrics store on host [{host}] at port [{port}]."
                )
                self.logger.exception(msg)
                # this does not necessarily mean it's a system setup problem...
                raise exceptions.RallyError(msg) from e
        except TransportError as e:
            host, port = store_address()
            # prefer the collected underlying errors, fall back to the exception itself
            err = e.errors or e
            msg = (
                f"Transport error(s) [{err}] occurred while running the operation [{target.__name__}] against your "
                f"Elasticsearch metrics store on host [{host}] at port [{port}]."
            )
            self.logger.exception(msg)
            # this does not necessarily mean it's a system setup problem...
            raise exceptions.RallyError(msg) from e