public void channelRead()

in src/main/java/com/uber/rss/handlers/UploadChannelInboundHandler.java [124:224]


    /**
     * Handles one decoded inbound message on an upload connection by dispatching on its runtime type.
     *
     * <ul>
     *   <li>{@code ConnectUploadRequest}: checks the server connection limit and the per-app write limit.
     *       If either check throws, a single status byte is written, the channel is closed once that
     *       write completes, and processing stops. Otherwise the app liveness is updated and a
     *       {@code ConnectUploadResponse} is written with {@code RESPONSE_STATUS_OK}.</li>
     *   <li>{@code StartUploadMessage}: remembers the message and initializes the app task attempt.</li>
     *   <li>{@code FinishUploadMessage}: records the request lag, finishes the upload, and writes a one-byte
     *       {@code RESPONSE_STATUS_OK} unless the ack flag is {@code ACK_FLAG_NO_ACK}.</li>
     *   <li>{@code ShuffleDataWrapper}: forwards the record to the upload server handler.</li>
     *   <li>{@code HeartbeatMessage}: updates the app liveness and closes the channel when keep-live is
     *       false.</li>
     *   <li>{@code GetBusyStatusRequest}: replies with the current concurrent connection count and closes
     *       the channel once the reply is written.</li>
     * </ul>
     *
     * <p>The message is always released via {@code ReferenceCountUtil.release} before this method exits,
     * whether it returns normally or throws.
     *
     * @param ctx the channel handler context used to allocate buffers, write responses and close the channel
     * @param msg the decoded inbound message
     * @throws RssInvalidDataException if {@code msg} is not one of the supported message types
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            // ShuffleDataWrapper messages are excluded from debug logging.
            if (logger.isDebugEnabled() && !(msg instanceof ShuffleDataWrapper)) {
                logger.debug("Got incoming message: {}, {}", msg, connectionInfo);
            }

            if (idleCheck != null) {
                idleCheck.updateLastReadTime();
            }

            // Process other messages. We assume the header messages are already processed, thus some fields of this
            // class are already populated with proper values, e.g. user field.

            if (msg instanceof ConnectUploadRequest) {
                try {
                    uploadServerHandler.checkMaxConnections();
                } catch (RssMaxConnectionsException e) {
                    logger.info(
                        "Cannot handle new connection due to server capacity. Closing current connection: {}. {}",
                        connectionInfo, ExceptionUtils.getSimpleMessage(e));
                    M3Stats.addException(e, M3Stats.TAG_VALUE_SERVER_HANDLER);
                    ByteBuf buf = ctx.alloc().buffer(1);
                    buf.writeByte(MessageConstants.RESPONSE_STATUS_SERVER_BUSY);
                    ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
                    return;
                }

                ConnectUploadRequest connectUploadRequest = (ConnectUploadRequest)msg;
                appId = connectUploadRequest.getAppId();
                appAttempt = connectUploadRequest.getAppAttempt();

                try {
                    uploadServerHandler.checkAppMaxWriteBytes(appId);
                } catch (RssTooMuchDataException e) {
                    // The logger uses {} placeholders; a %s here would shift the remaining arguments.
                    logger.info(
                        "Cannot handle new connection due to writing too much data from app ({}). " +
                                "Closing current connection: {}. {}", appId, connectionInfo,
                                ExceptionUtils.getSimpleMessage(e));
                    M3Stats.addException(e, M3Stats.TAG_VALUE_SERVER_HANDLER);
                    ByteBuf buf = ctx.alloc().buffer(1);
                    buf.writeByte(MessageConstants.RESPONSE_STATUS_APP_TOO_MUCH_DATA);
                    ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
                    // Stop here: the connection was rejected, so it must not also receive the OK response
                    // below or have its liveness refreshed.
                    return;
                }

                uploadServerHandler.updateLiveness(appId);

                ConnectUploadResponse connectUploadResponse = new ConnectUploadResponse(
                                                                    serverId, RssBuildInfo.Version, runningVersion);
                HandlerUtil.writeResponseMsg(ctx, MessageConstants.RESPONSE_STATUS_OK,
                                                connectUploadResponse, true);
            } else if (msg instanceof StartUploadMessage) {
                startUploadMessage = (StartUploadMessage)msg;

                // appId and appAttempt come from the fields set while handling ConnectUploadRequest.
                AppTaskAttemptId appTaskAttemptId = new AppTaskAttemptId(appId,
                    appAttempt,
                    startUploadMessage.getShuffleId(),
                    startUploadMessage.getMapId(),
                    startUploadMessage.getAttemptId());

                ShuffleWriteConfig writeConfig = new ShuffleWriteConfig(startUploadMessage.getNumSplits());
                uploadServerHandler.initializeAppTaskAttempt(appTaskAttemptId, startUploadMessage.getNumPartitions(),
                                                                writeConfig, ctx);
            } else if (msg instanceof FinishUploadMessage) {
                logger.debug("FinishUploadMessage, {}, {}", msg, connectionInfo);
                FinishUploadMessage finishUploadMessage = (FinishUploadMessage)msg;
                // Lag between the timestamp carried by the message and the local clock.
                finishUploadRequestLag.update(System.currentTimeMillis() - finishUploadMessage.getTimestamp());
                byte ackFlag = finishUploadMessage.getAckFlag();
                uploadServerHandler.finishUpload(finishUploadMessage.getTaskAttemptId());
                if (ackFlag != FinishUploadMessage.ACK_FLAG_NO_ACK) {
                    ByteBuf buf = ctx.alloc().buffer(1);
                    buf.writeByte(MessageConstants.RESPONSE_STATUS_OK);
                    ctx.writeAndFlush(buf);
                }
            } else if (msg instanceof ShuffleDataWrapper) {
                ShuffleDataWrapper shuffleDataWrapper = (ShuffleDataWrapper)msg;
                uploadServerHandler.writeRecord(shuffleDataWrapper);
            } else if (msg instanceof HeartbeatMessage) {
                HeartbeatMessage heartbeatMessage = (HeartbeatMessage)msg;
                String heartbeatAppId = heartbeatMessage.getAppId();
                boolean heartbeatKeepLive = heartbeatMessage.isKeepLive();
                uploadServerHandler.updateLiveness(heartbeatAppId);
                if (!heartbeatKeepLive) {
                    ctx.close();
                }
            } else if (msg instanceof GetBusyStatusRequest) {
                // TODO ideally clients should send some information to tell server what status they are interested
                Map<Long, Long> metricsMap = new HashMap<>();
                GetBusyStatusResponse getBusyStatusResponse = new GetBusyStatusResponse(metricsMap, new HashMap<>());
                // Long.valueOf replaces the deprecated Long constructors.
                getBusyStatusResponse.getMetrics().put(Long.valueOf(CONCURRENT_CONNS),
                                                            Long.valueOf(concurrentChannelsAtomicInteger.get()));
                ChannelFuture channelFuture = HandlerUtil.writeResponseMsg(ctx, MessageConstants.RESPONSE_STATUS_OK,
                                                                            getBusyStatusResponse, true);
                // Close the channel once the status reply has been written.
                channelFuture.addListener(ChannelFutureListener.CLOSE);
            } else {
                throw new RssInvalidDataException(String.format("Unsupported message: %s, %s", msg, connectionInfo));
            }
        } finally {
            // Runs on every exit path, including the early returns above and the unsupported-message throw.
            ReferenceCountUtil.release(msg);
        }
    }