def create_async()

in esrally/client/factory.py [0:0]


    def create_async(self, api_key=None, client_id=None):
        """Build an asynchronous Elasticsearch client wired up for benchmarking.

        Installs a lazy JSON serializer (so raw responses can skip parsing),
        registers aiohttp trace callbacks that feed Rally's request timing, and
        propagates connection settings onto every node in the pool.

        :param api_key: Optional API key; when given it replaces any
            basic/http auth already present in the client options.
        :param client_id: Optional identifier attached to each node connection.
        :return: A fully configured ``RallyAsyncElasticsearch`` instance.
        """
        # pylint: disable=import-outside-toplevel
        import io

        import aiohttp
        from elasticsearch.serializer import JSONSerializer

        from esrally.client.asynchronous import (
            RallyAsyncElasticsearch,
            RallyAsyncTransport,
        )

        class LazyJSONSerializer(JSONSerializer):
            # When the current request context asks for the raw response,
            # hand back the undecoded bytes instead of paying for JSON parsing.
            def loads(self, data):
                request_meta = RallyAsyncElasticsearch.request_context.get()
                return io.BytesIO(data) if "raw_response" in request_meta else super().loads(data)

        async def mark_request_start(session, trace_config_ctx, params):
            RallyAsyncElasticsearch.on_request_start()

        async def mark_request_end(session, trace_config_ctx, params):
            RallyAsyncElasticsearch.on_request_end()

        http_trace = aiohttp.TraceConfig()
        http_trace.on_request_start.append(mark_request_start)
        # Why not only `on_request_end`? aiohttp fires that callback as soon as
        # the *first* response chunk arrives, which can badly skew service time
        # for large responses that Elasticsearch streams back in chunks
        # (see ChunkedToXContent in the Elasticsearch code base).
        #
        # Instead we also hook `on_response_chunk_received`, which fires once per
        # chunk. Rally's end-callback handler overwrites the timestamp on every
        # invocation, so the recorded time ends up being that of the *last*
        # chunk — exactly what we want, since client-library code can only act
        # on a response once it has been fully received.
        #
        # We nevertheless keep `on_request_end` registered as well: in certain
        # client-timeout corner cases aiohttp skips the request exception
        # handler but still invokes the request end handler. See
        # https://github.com/elastic/rally/issues/1860 for details.
        http_trace.on_response_chunk_received.append(mark_request_end)
        http_trace.on_request_end.append(mark_request_end)
        # also stop the timer when a request "ends" with an exception (e.g. a timeout)
        http_trace.on_request_exception.append(mark_request_end)

        # swap in our lazy serializer for the builtin JSON one
        self.client_options["serializer"] = LazyJSONSerializer()

        if api_key is not None:
            # API-key auth is mutually exclusive with basic/http auth
            for conflicting_option in ("http_auth", "basic_auth"):
                self.client_options.pop(conflicting_option, None)
            self.client_options["api_key"] = api_key

        async_client = RallyAsyncElasticsearch(
            distribution_version=self.distribution_version,
            distribution_flavor=self.distribution_flavor,
            hosts=self.hosts,
            transport_class=RallyAsyncTransport,
            ssl_context=self.ssl_context,
            maxsize=self.max_connections,
            **self.client_options,
        )

        # The AsyncElasticsearch constructor creates the NodeConfig objects
        # itself, so these per-connection attributes can only be set after the
        # fact, on the already-instantiated pool members.
        for node_connection in async_client.transport.node_pool.all():
            node_connection.trace_configs = [http_trace]
            node_connection.enable_cleanup_closed = self.enable_cleanup_closed
            node_connection.static_responses = self.static_responses
            node_connection.client_id = client_id

        return async_client