private void configureModernPipeline()

in src/java/org/apache/cassandra/transport/SimpleClient.java [436:562]


        private void configureModernPipeline(ChannelHandlerContext ctx, Envelope response, int largeMessageThreshold)
        {
            logger.info("Configuring modern pipeline");
            ChannelPipeline pipeline = ctx.pipeline();
            pipeline.remove(HandlerNames.ENVELOPE_DECODER);
            pipeline.remove(HandlerNames.MESSAGE_DECODER);
            pipeline.remove(HandlerNames.MESSAGE_ENCODER);
            pipeline.remove(HandlerNames.RESPONSE_HANDLER);

            BufferPoolAllocator allocator = GlobalBufferPoolAllocator.instance;
            Channel channel = ctx.channel();
            channel.config().setOption(ChannelOption.ALLOCATOR, allocator);
            int queueCapacity = 1 << 20;  // 1MiB

            Envelope.Decoder envelopeDecoder = new Envelope.Decoder();
            Message.Decoder<Message.Response> messageDecoder = Message.responseDecoder();
            FrameDecoder frameDecoder = frameDecoder(ctx, allocator);
            FrameEncoder frameEncoder = frameEncoder(ctx);
            FrameEncoder.PayloadAllocator payloadAllocator = frameEncoder.allocator();

            CQLMessageHandler.MessageConsumer<Message.Response> responseConsumer = new CQLMessageHandler.MessageConsumer<Message.Response>()
            {
                public void dispatch(Channel channel, Message.Response message, Dispatcher.FlushItemConverter toFlushItem, Overload backpressure)
                {
                    responseHandler.handleResponse(channel, message);
                }

                public boolean hasQueueCapacity()
                {
                    return true;
                }
            };

            CQLMessageHandler.ErrorHandler errorHandler = (error) -> {
                throw new RuntimeException("Unexpected error", error);
            };

            ClientResourceLimits.ResourceProvider resources = new ClientResourceLimits.ResourceProvider()
            {
                final ResourceLimits.Limit endpointReserve = new ResourceLimits.Basic(1024 * 1024 * 64);
                final AbstractMessageHandler.WaitQueue endpointQueue = AbstractMessageHandler.WaitQueue.endpoint(endpointReserve);

                final ResourceLimits.Limit globalReserve = new ResourceLimits.Basic(1024 * 1024 * 64);
                final AbstractMessageHandler.WaitQueue globalQueue = AbstractMessageHandler.WaitQueue.global(endpointReserve);

                public ResourceLimits.Limit globalLimit()
                {
                    return globalReserve;
                }

                public AbstractMessageHandler.WaitQueue globalWaitQueue()
                {
                    return globalQueue;
                }

                public ResourceLimits.Limit endpointLimit()
                {
                    return endpointReserve;
                }

                public AbstractMessageHandler.WaitQueue endpointWaitQueue()
                {
                    return endpointQueue;
                }

                @Override
                public NonBlockingRateLimiter requestRateLimiter()
                {
                    return NO_OP_LIMITER;
                }

                public void release()
                {
                }
            };

            CQLMessageHandler<Message.Response> processor =
                new CQLMessageHandler<Message.Response>(ctx.channel(),
                                        null,
                                        version,
                                        frameDecoder,
                                        envelopeDecoder,
                                        messageDecoder,
                                        responseConsumer,
                                        payloadAllocator,
                                        queueCapacity,
                                        QueueBackpressure.NO_OP,
                                        resources,
                                        handler -> {},
                                        errorHandler,
                                        ctx.channel().attr(Connection.attributeKey).get().isThrowOnOverload())
                {
                    protected boolean processRequest(Envelope request, Overload overload)
                    {
                        boolean continueProcessing = super.processRequest(request, overload);
                        releaseCapacity(Ints.checkedCast(request.header.bodySizeInBytes));
                        return continueProcessing;
                    }
                };

            pipeline.addLast(HandlerNames.FRAME_DECODER, frameDecoder);
            pipeline.addLast(HandlerNames.FRAME_ENCODER, frameEncoder);
            pipeline.addLast(HandlerNames.PROCESSOR, processor);
            pipeline.addLast(HandlerNames.MESSAGE_ENCODER, new ChannelOutboundHandlerAdapter() {

                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
                {
                    if (!(msg instanceof List))
                    {
                        ctx.write(msg, promise);
                        return;
                    }
                    Connection connection = ctx.channel().attr(Connection.attributeKey).get();
                    // The only case the connection can be null is when we send the initial STARTUP message (client side thus)
                    ProtocolVersion version = connection == null ? ProtocolVersion.CURRENT : connection.getVersion();
                    SimpleFlusher flusher = new SimpleFlusher(frameEncoder, largeMessageThreshold);
                    for (Message message : (List<Message>) msg)
                        flusher.enqueue(message.encode(version));

                    flusher.maybeWrite(ctx, promise);
                }
            });
            pipeline.remove(this);

            Message.Response message = messageDecoder.decode(ctx.channel(), response);
            responseConsumer.dispatch(channel, message, (ch, req, resp) -> null, Overload.NONE);
        }