private void addVersionDecoder()

in src/main/java/com/uber/rss/decoders/StreamServerVersionDecoder.java [64:110]


    private void addVersionDecoder(ChannelHandlerContext ctx, byte type, byte version) {
        ByteToMessageDecoder newDecoder;
        String decoderName = "decoder";
        ChannelInboundHandlerAdapter newHandler;
        String handlerName = "handler";

        if (type == MessageConstants.UPLOAD_UPLINK_MAGIC_BYTE && version == MessageConstants.UPLOAD_UPLINK_VERSION_3) {
            ByteBuf shuffleDataBuffer = ctx.alloc().buffer(MessageConstants.DEFAULT_SHUFFLE_DATA_MESSAGE_SIZE);
            newDecoder = new StreamServerMessageDecoder(shuffleDataBuffer);
            UploadChannelInboundHandler channelInboundHandler = new UploadChannelInboundHandler(
                    serverId, runningVersion, idleTimeoutMillis, executor, channelManager);
            channelInboundHandler.processChannelActive(ctx);
            newHandler = channelInboundHandler;
        } else if (type == MessageConstants.DOWNLOAD_UPLINK_MAGIC_BYTE &&
                version == MessageConstants.DOWNLOAD_UPLINK_VERSION_3) {
            newDecoder = new StreamServerMessageDecoder(null);
            DownloadChannelInboundHandler channelInboundHandler = new DownloadChannelInboundHandler(
                serverId, runningVersion, idleTimeoutMillis, executor);
            channelInboundHandler.processChannelActive(ctx);
            newHandler = channelInboundHandler;
        } else if (type == MessageConstants.NOTIFY_UPLINK_MAGIC_BYTE &&
                version == MessageConstants.NOTIFY_UPLINK_VERSION_3) {
            newDecoder = new StreamServerMessageDecoder(null);
            NotifyChannelInboundHandler channelInboundHandler = new NotifyChannelInboundHandler(serverId);
            channelInboundHandler.processChannelActive(ctx);
            newHandler = channelInboundHandler;
        } else if (type == MessageConstants.REGISTRY_UPLINK_MAGIC_BYTE &&
                version == MessageConstants.REGISTRY_UPLINK_VERSION_3) {
            newDecoder = new StreamServerMessageDecoder(null);
            RegistryChannelInboundHandler channelInboundHandler = new RegistryChannelInboundHandler(
                                                                        serverDetailCollection, serverId);
            channelInboundHandler.processChannelActive(ctx);
            newHandler = channelInboundHandler;
        } else {
            String clientInfo = NettyUtils.getServerConnectionInfo(ctx);
            logger.error(String.format(
                    "Invalid upload version %d for link type %s from client %s",
                    version, type, clientInfo));
            ctx.close();
            logger.info(String.format("Closed connection to client %s", clientInfo));
            return;
        }
        logger.debug(String.format("Using version %d protocol for client %s",
                version, NettyUtils.getServerConnectionInfo(ctx)));
        ctx.pipeline().replace(this, decoderName, newDecoder);
        ctx.pipeline().addAfter(decoderName, handlerName, newHandler);
    }