in src/main/java/com/uber/rss/decoders/StreamServerVersionDecoder.java [64:110]
/**
 * Picks the message decoder and inbound handler that match the given link type and
 * protocol version, then installs them in the pipeline in place of this handler.
 *
 * <p>For every recognized (type, version) pair a {@code StreamServerMessageDecoder} and the
 * corresponding channel inbound handler are created, the handler's
 * {@code processChannelActive(ctx)} is invoked, this handler is replaced by the decoder
 * (named "decoder") and the new handler is added right after it (named "handler").
 * For any other pair an error is logged, the connection is closed and the pipeline is
 * left unchanged.
 *
 * @param ctx     context of this handler; used to allocate the shuffle data buffer, to
 *                modify the pipeline and to close the connection on an invalid request
 * @param type    link type byte, compared against the {@code MessageConstants.*_UPLINK_MAGIC_BYTE} values
 * @param version protocol version byte, compared against the {@code MessageConstants.*_UPLINK_VERSION_3} values
 */
private void addVersionDecoder(ChannelHandlerContext ctx, byte type, byte version) {
    final String decoderName = "decoder";
    final String handlerName = "handler";
    ByteToMessageDecoder newDecoder;
    ChannelInboundHandlerAdapter newHandler;

    if (type == MessageConstants.UPLOAD_UPLINK_MAGIC_BYTE && version == MessageConstants.UPLOAD_UPLINK_VERSION_3) {
        // Only the upload link hands a pre-allocated buffer to its decoder.
        ByteBuf shuffleDataBuffer = ctx.alloc().buffer(MessageConstants.DEFAULT_SHUFFLE_DATA_MESSAGE_SIZE);
        try {
            newDecoder = new StreamServerMessageDecoder(shuffleDataBuffer);
            UploadChannelInboundHandler channelInboundHandler = new UploadChannelInboundHandler(
                serverId, runningVersion, idleTimeoutMillis, executor, channelManager);
            // NOTE(review): invoked manually, presumably because the handler joins the pipeline
            // after the channel-active event has already fired - confirm.
            channelInboundHandler.processChannelActive(ctx);
            newHandler = channelInboundHandler;
        } catch (RuntimeException | Error e) {
            // The decoder never reached the pipeline, so nothing else will release the
            // reference-counted buffer; release it here to avoid leaking it.
            // NOTE(review): assumes StreamServerMessageDecoder's constructor does not release
            // the buffer itself on failure - confirm.
            shuffleDataBuffer.release();
            throw e;
        }
    } else if (type == MessageConstants.DOWNLOAD_UPLINK_MAGIC_BYTE
            && version == MessageConstants.DOWNLOAD_UPLINK_VERSION_3) {
        newDecoder = new StreamServerMessageDecoder(null);
        DownloadChannelInboundHandler channelInboundHandler = new DownloadChannelInboundHandler(
            serverId, runningVersion, idleTimeoutMillis, executor);
        channelInboundHandler.processChannelActive(ctx);
        newHandler = channelInboundHandler;
    } else if (type == MessageConstants.NOTIFY_UPLINK_MAGIC_BYTE
            && version == MessageConstants.NOTIFY_UPLINK_VERSION_3) {
        newDecoder = new StreamServerMessageDecoder(null);
        NotifyChannelInboundHandler channelInboundHandler = new NotifyChannelInboundHandler(serverId);
        channelInboundHandler.processChannelActive(ctx);
        newHandler = channelInboundHandler;
    } else if (type == MessageConstants.REGISTRY_UPLINK_MAGIC_BYTE
            && version == MessageConstants.REGISTRY_UPLINK_VERSION_3) {
        newDecoder = new StreamServerMessageDecoder(null);
        RegistryChannelInboundHandler channelInboundHandler = new RegistryChannelInboundHandler(
            serverDetailCollection, serverId);
        channelInboundHandler.processChannelActive(ctx);
        newHandler = channelInboundHandler;
    } else {
        // Unknown link type / version combination: reject the client.
        String clientInfo = NettyUtils.getServerConnectionInfo(ctx);
        logger.error(String.format(
            "Invalid upload version %d for link type %s from client %s",
            version, type, clientInfo));
        ctx.close();
        logger.info(String.format("Closed connection to client %s", clientInfo));
        return;
    }

    // Skip the formatting and the connection-info lookup when debug logging is off.
    if (logger.isDebugEnabled()) {
        logger.debug(String.format("Using version %d protocol for client %s",
            version, NettyUtils.getServerConnectionInfo(ctx)));
    }

    // Swap this handler for the real decoder, then attach the handler right behind it.
    ctx.pipeline().replace(this, decoderName, newDecoder);
    ctx.pipeline().addAfter(decoderName, handlerName, newHandler);
}