in ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java [320:359]
private ChannelInboundHandler getClientHandler(){
return new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (!(msg instanceof DataStreamReply)) {
LOG.error("{}: unexpected message {}", name, msg.getClass());
return;
}
final DataStreamReply reply = (DataStreamReply) msg;
LOG.debug("{}: read {}", name, reply);
final ClientInvocationId clientInvocationId = ClientInvocationId.valueOf(
reply.getClientId(), reply.getStreamId());
final NettyClientReplies.ReplyMap replyMap = replies.getReplyMap(clientInvocationId);
if (replyMap == null) {
LOG.error("{}: {} replyMap not found for reply: {}", name, clientInvocationId, reply);
return;
}
try {
replyMap.receiveReply(reply);
} catch (Throwable cause) {
LOG.warn(name + ": channelRead error:", cause);
replyMap.completeExceptionally(cause);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOG.warn(name + ": exceptionCaught", cause);
ctx.close();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
connection.scheduleReconnect("channel is inactive", null);
}
};
}