in esrally/client/factory.py [0:0]
def create_async(self, api_key=None, client_id=None):
# pylint: disable=import-outside-toplevel
import io
import aiohttp
from elasticsearch.serializer import JSONSerializer
from esrally.client.asynchronous import (
RallyAsyncElasticsearch,
RallyAsyncTransport,
)
class LazyJSONSerializer(JSONSerializer):
def loads(self, data):
meta = RallyAsyncElasticsearch.request_context.get()
if "raw_response" in meta:
return io.BytesIO(data)
else:
return super().loads(data)
async def on_request_start(session, trace_config_ctx, params):
RallyAsyncElasticsearch.on_request_start()
async def on_request_end(session, trace_config_ctx, params):
RallyAsyncElasticsearch.on_request_end()
trace_config = aiohttp.TraceConfig()
trace_config.on_request_start.append(on_request_start)
# It is tempting to register this callback on `TraceConfig.on_request_end()`. However, aiohttp will call
# `TraceConfig.on_request_end()` when the *first* chunk of the response has been received. However, this can
# skew service time significantly if the response is large *and* it is streamed by Elasticsearch
# (see ChunkedToXContent in the Elasticsearch code base).
#
# Therefore, we register for `TraceConfig.on_response_chunk_received()` which is called multiple times. As
# Rally's implementation of the `on_request_end` callback handler updates the timestamp on every call, Rally
# will ultimately record the time when it received the *last* chunk. This is what we want because any code
# that is using the Elasticsearch client library can only act on the response once it is fully received.
#
# We also keep registering for `TraceConfig.on_request_end()` instead of relying on
# `TraceConfig.on_response_chunk_received()` only to handle corner cases during client timeout when aiohttp does
# not call request exception handler, but does call request end handler. See
# https://github.com/elastic/rally/issues/1860 for details.
trace_config.on_response_chunk_received.append(on_request_end)
trace_config.on_request_end.append(on_request_end)
# ensure that we also stop the timer when a request "ends" with an exception (e.g. a timeout)
trace_config.on_request_exception.append(on_request_end)
# override the builtin JSON serializer
self.client_options["serializer"] = LazyJSONSerializer()
if api_key is not None:
self.client_options.pop("http_auth", None)
self.client_options.pop("basic_auth", None)
self.client_options["api_key"] = api_key
async_client = RallyAsyncElasticsearch(
distribution_version=self.distribution_version,
distribution_flavor=self.distribution_flavor,
hosts=self.hosts,
transport_class=RallyAsyncTransport,
ssl_context=self.ssl_context,
maxsize=self.max_connections,
**self.client_options,
)
# the AsyncElasticsearch constructor automatically creates the corresponding NodeConfig objects, so we set
# their instance attributes after they've been instantiated
for node_connection in async_client.transport.node_pool.all():
node_connection.trace_configs = [trace_config]
node_connection.enable_cleanup_closed = self.enable_cleanup_closed
node_connection.static_responses = self.static_responses
node_connection.client_id = client_id
return async_client