in src/java/org/apache/cassandra/transport/SimpleClient.java [436:562]
/**
 * Swaps the envelope-based handlers out of the channel pipeline and installs the
 * frame-based ("modern") handlers in their place, then processes the response that
 * triggered the switch.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>remove the envelope decoder, message decoder, message encoder and response handler;</li>
 *   <li>set {@link GlobalBufferPoolAllocator#instance} as the channel's allocator;</li>
 *   <li>add a frame decoder, a frame encoder, a {@link CQLMessageHandler} that forwards
 *       decoded responses to {@code responseHandler}, and an outbound handler that
 *       encodes {@code List<Message>} writes through a {@code SimpleFlusher};</li>
 *   <li>remove {@code this} from the pipeline;</li>
 *   <li>decode {@code response} and dispatch it to the response consumer directly.</li>
 * </ol>
 *
 * @param ctx                   context of the handler performing the reconfiguration; its
 *                              channel and pipeline are the ones modified
 * @param response              envelope that is decoded and dispatched once the new
 *                              pipeline is in place
 * @param largeMessageThreshold passed unchanged to each {@code SimpleFlusher} created for
 *                              outbound writes
 */
private void configureModernPipeline(ChannelHandlerContext ctx, Envelope response, int largeMessageThreshold)
{
    logger.info("Configuring modern pipeline");
    ChannelPipeline pipeline = ctx.pipeline();
    pipeline.remove(HandlerNames.ENVELOPE_DECODER);
    pipeline.remove(HandlerNames.MESSAGE_DECODER);
    pipeline.remove(HandlerNames.MESSAGE_ENCODER);
    pipeline.remove(HandlerNames.RESPONSE_HANDLER);

    BufferPoolAllocator allocator = GlobalBufferPoolAllocator.instance;
    Channel channel = ctx.channel();
    channel.config().setOption(ChannelOption.ALLOCATOR, allocator);

    int queueCapacity = 1 << 20; // 1MiB
    Envelope.Decoder envelopeDecoder = new Envelope.Decoder();
    Message.Decoder<Message.Response> messageDecoder = Message.responseDecoder();
    FrameDecoder frameDecoder = frameDecoder(ctx, allocator);
    FrameEncoder frameEncoder = frameEncoder(ctx);
    FrameEncoder.PayloadAllocator payloadAllocator = frameEncoder.allocator();

    // Hands every decoded response straight to the response handler; the flush-item
    // converter and backpressure arguments are not used on this path.
    CQLMessageHandler.MessageConsumer<Message.Response> responseConsumer = new CQLMessageHandler.MessageConsumer<Message.Response>()
    {
        public void dispatch(Channel channel, Message.Response message, Dispatcher.FlushItemConverter toFlushItem, Overload backpressure)
        {
            responseHandler.handleResponse(channel, message);
        }

        public boolean hasQueueCapacity()
        {
            return true;
        }
    };

    // Any error reported by the message handler is rethrown as an unchecked exception.
    CQLMessageHandler.ErrorHandler errorHandler = (error) -> {
        throw new RuntimeException("Unexpected error", error);
    };

    // Resource limits private to this pipeline: separate endpoint and global limits of
    // 64 * 1024 * 1024 each (presumably bytes), each with its own wait queue.
    ClientResourceLimits.ResourceProvider resources = new ClientResourceLimits.ResourceProvider()
    {
        final ResourceLimits.Limit endpointReserve = new ResourceLimits.Basic(1024 * 1024 * 64);
        final AbstractMessageHandler.WaitQueue endpointQueue = AbstractMessageHandler.WaitQueue.endpoint(endpointReserve);

        final ResourceLimits.Limit globalReserve = new ResourceLimits.Basic(1024 * 1024 * 64);
        // The global wait queue is built on the global limit so that it agrees with
        // what globalLimit() returns (it was previously built on endpointReserve).
        final AbstractMessageHandler.WaitQueue globalQueue = AbstractMessageHandler.WaitQueue.global(globalReserve);

        public ResourceLimits.Limit globalLimit()
        {
            return globalReserve;
        }

        public AbstractMessageHandler.WaitQueue globalWaitQueue()
        {
            return globalQueue;
        }

        public ResourceLimits.Limit endpointLimit()
        {
            return endpointReserve;
        }

        public AbstractMessageHandler.WaitQueue endpointWaitQueue()
        {
            return endpointQueue;
        }

        @Override
        public NonBlockingRateLimiter requestRateLimiter()
        {
            return NO_OP_LIMITER;
        }

        public void release()
        {
        }
    };

    CQLMessageHandler<Message.Response> processor =
        new CQLMessageHandler<Message.Response>(channel,
                                                null,
                                                version,
                                                frameDecoder,
                                                envelopeDecoder,
                                                messageDecoder,
                                                responseConsumer,
                                                payloadAllocator,
                                                queueCapacity,
                                                QueueBackpressure.NO_OP,
                                                resources,
                                                handler -> {},
                                                errorHandler,
                                                channel.attr(Connection.attributeKey).get().isThrowOnOverload())
        {
            protected boolean processRequest(Envelope request, Overload overload)
            {
                boolean continueProcessing = super.processRequest(request, overload);
                // Release the capacity for this envelope's body as soon as the superclass
                // has processed it. NOTE(review): presumably because nothing downstream
                // on the client releases it later - confirm against CQLMessageHandler.
                releaseCapacity(Ints.checkedCast(request.header.bodySizeInBytes));
                return continueProcessing;
            }
        };

    pipeline.addLast(HandlerNames.FRAME_DECODER, frameDecoder);
    pipeline.addLast(HandlerNames.FRAME_ENCODER, frameEncoder);
    pipeline.addLast(HandlerNames.PROCESSOR, processor);
    pipeline.addLast(HandlerNames.MESSAGE_ENCODER, new ChannelOutboundHandlerAdapter() {
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
        {
            // Only lists of messages are encoded here; anything else is passed along untouched.
            if (!(msg instanceof List))
            {
                ctx.write(msg, promise);
                return;
            }
            Connection connection = ctx.channel().attr(Connection.attributeKey).get();
            // The only case the connection can be null is when we send the initial STARTUP message (client side thus)
            ProtocolVersion version = connection == null ? ProtocolVersion.CURRENT : connection.getVersion();
            // A fresh flusher per write: enqueue every encoded message, then write them out.
            SimpleFlusher flusher = new SimpleFlusher(frameEncoder, largeMessageThreshold);
            for (Message message : (List<Message>) msg)
                flusher.enqueue(message.encode(version));
            flusher.maybeWrite(ctx, promise);
        }
    });

    // This handler has done its job once the new pipeline is installed.
    pipeline.remove(this);

    // The envelope passed in is decoded and dispatched here directly, bypassing the
    // newly installed handlers; the flush-item converter is a stub returning null.
    Message.Response message = messageDecoder.decode(channel, response);
    responseConsumer.dispatch(channel, message, (ch, req, resp) -> null, Overload.NONE);
}