public void channelRead()

in src/main/java/com/uber/rss/handlers/DownloadChannelInboundHandler.java [111:161]


    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            logger.debug("Got incoming message: {}, {}", msg, connectionInfo);

            if (idleCheck != null) {
                idleCheck.updateLastReadTime();
            }

            // Process other messages. We assume the header messages are already processed, thus some fields of this
            // class are already populated with proper values, e.g. user field.

            if (msg instanceof ConnectDownloadRequest) {
                logger.info("ConnectDownloadRequest: {}, {}", msg, connectionInfo);

                ConnectDownloadRequest connectRequest = (ConnectDownloadRequest) msg;
                appShufflePartitionId = new AppShufflePartitionId(
                    connectRequest.getAppId(),
                    connectRequest.getAppAttempt(),
                    connectRequest.getShuffleId(),
                    connectRequest.getPartitionId()
                );
                fetchTaskAttemptIds = connectRequest.getTaskAttemptIds();

                ShuffleStageStatus shuffleStageStatus = downloadServerHandler.getShuffleStageStatus(appShufflePartitionId.getAppShuffleId());
                if (shuffleStageStatus.getFileStatus() == ShuffleStageStatus.FILE_STATUS_SHUFFLE_STAGE_NOT_STARTED) {
                    logger.warn(String.format("Shuffle stage not started for %s, %s", appShufflePartitionId.getAppShuffleId(), connectionInfo));
                    HandlerUtil.writeResponseStatus(ctx, MessageConstants.RESPONSE_STATUS_SHUFFLE_STAGE_NOT_STARTED);
                    return;
                }

                downloadServerHandler.initialize(connectRequest);

                MapTaskCommitStatus mapTaskCommitStatus = shuffleStageStatus.getMapTaskCommitStatus();
                boolean dataAvailable = mapTaskCommitStatus != null && mapTaskCommitStatus.isPartitionDataAvailable(fetchTaskAttemptIds);
                String fileCompressionCodec = ""; // TODO delete this
                ConnectDownloadResponse connectResponse = new ConnectDownloadResponse(serverId, RssBuildInfo.Version, runningVersion, fileCompressionCodec, mapTaskCommitStatus, dataAvailable);
                sendResponseAndFiles(ctx, dataAvailable, shuffleStageStatus, connectResponse, idleCheck);
            } else if (msg instanceof GetDataAvailabilityRequest) {
                ShuffleStageStatus shuffleStageStatus = downloadServerHandler.getShuffleStageStatus(appShufflePartitionId.getAppShuffleId());
                MapTaskCommitStatus mapTaskCommitStatus = shuffleStageStatus.getMapTaskCommitStatus();
                boolean dataAvailable;
                dataAvailable = mapTaskCommitStatus != null && mapTaskCommitStatus.isPartitionDataAvailable(fetchTaskAttemptIds);
                GetDataAvailabilityResponse getDataAvailabilityResponse = new GetDataAvailabilityResponse(mapTaskCommitStatus, dataAvailable);
                sendResponseAndFiles(ctx, dataAvailable, shuffleStageStatus, getDataAvailabilityResponse, idleCheck);
            } else {
                throw new RssInvalidDataException(String.format("Unsupported message: %s, %s", msg, connectionInfo));
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }