in src/main/java/com/uber/rss/handlers/DownloadChannelInboundHandler.java [111:161]
/**
 * Handles one inbound message on a download connection.
 *
 * <p>Two message types are supported:
 * <ul>
 *   <li>{@code ConnectDownloadRequest}: records the requested partition and task attempt ids on this
 *       handler, replies with a "shuffle stage not started" status if the stage has not started,
 *       otherwise initializes the download server handler and sends a {@code ConnectDownloadResponse}
 *       via {@code sendResponseAndFiles}.</li>
 *   <li>{@code GetDataAvailabilityRequest}: re-evaluates data availability for the partition recorded
 *       by the earlier connect request and sends a {@code GetDataAvailabilityResponse} via
 *       {@code sendResponseAndFiles}.</li>
 * </ul>
 *
 * <p>The message is always released through {@code ReferenceCountUtil.release} before this method
 * returns, whether it completes normally or throws.
 *
 * @param ctx channel context used to write responses
 * @param msg decoded inbound message
 * @throws RssInvalidDataException if the message type is not supported, or if a
 *     {@code GetDataAvailabilityRequest} arrives before any {@code ConnectDownloadRequest}
 */
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        logger.debug("Got incoming message: {}, {}", msg, connectionInfo);

        // idleCheck may be absent; only refresh the read timestamp when it is configured.
        if (idleCheck != null) {
            idleCheck.updateLastReadTime();
        }

        // Process other messages. We assume the header messages are already processed, thus some fields of this
        // class are already populated with proper values, e.g. user field.
        if (msg instanceof ConnectDownloadRequest) {
            logger.info("ConnectDownloadRequest: {}, {}", msg, connectionInfo);
            ConnectDownloadRequest connectRequest = (ConnectDownloadRequest) msg;

            // Remember what this connection is downloading; later GetDataAvailabilityRequest messages
            // rely on these fields.
            appShufflePartitionId = new AppShufflePartitionId(
                connectRequest.getAppId(),
                connectRequest.getAppAttempt(),
                connectRequest.getShuffleId(),
                connectRequest.getPartitionId()
            );
            fetchTaskAttemptIds = connectRequest.getTaskAttemptIds();

            ShuffleStageStatus shuffleStageStatus =
                downloadServerHandler.getShuffleStageStatus(appShufflePartitionId.getAppShuffleId());
            if (shuffleStageStatus.getFileStatus() == ShuffleStageStatus.FILE_STATUS_SHUFFLE_STAGE_NOT_STARTED) {
                // Nothing to serve yet: report the status and stop before initializing the download handler.
                logger.warn("Shuffle stage not started for {}, {}",
                    appShufflePartitionId.getAppShuffleId(), connectionInfo);
                HandlerUtil.writeResponseStatus(ctx, MessageConstants.RESPONSE_STATUS_SHUFFLE_STAGE_NOT_STARTED);
                return;
            }

            downloadServerHandler.initialize(connectRequest);

            // A missing commit status is treated as "data not available".
            MapTaskCommitStatus mapTaskCommitStatus = shuffleStageStatus.getMapTaskCommitStatus();
            boolean dataAvailable = mapTaskCommitStatus != null
                && mapTaskCommitStatus.isPartitionDataAvailable(fetchTaskAttemptIds);

            String fileCompressionCodec = ""; // TODO delete this
            ConnectDownloadResponse connectResponse = new ConnectDownloadResponse(
                serverId,
                RssBuildInfo.Version,
                runningVersion,
                fileCompressionCodec,
                mapTaskCommitStatus,
                dataAvailable);
            sendResponseAndFiles(ctx, dataAvailable, shuffleStageStatus, connectResponse, idleCheck);
        } else if (msg instanceof GetDataAvailabilityRequest) {
            // The partition id is only populated by a ConnectDownloadRequest; fail with a descriptive
            // protocol error instead of a NullPointerException when the client skipped that step.
            if (appShufflePartitionId == null) {
                throw new RssInvalidDataException(String.format(
                    "Got GetDataAvailabilityRequest before ConnectDownloadRequest: %s, %s", msg, connectionInfo));
            }

            ShuffleStageStatus shuffleStageStatus =
                downloadServerHandler.getShuffleStageStatus(appShufflePartitionId.getAppShuffleId());

            // A missing commit status is treated as "data not available".
            MapTaskCommitStatus mapTaskCommitStatus = shuffleStageStatus.getMapTaskCommitStatus();
            boolean dataAvailable = mapTaskCommitStatus != null
                && mapTaskCommitStatus.isPartitionDataAvailable(fetchTaskAttemptIds);

            GetDataAvailabilityResponse getDataAvailabilityResponse =
                new GetDataAvailabilityResponse(mapTaskCommitStatus, dataAvailable);
            sendResponseAndFiles(ctx, dataAvailable, shuffleStageStatus, getDataAvailabilityResponse, idleCheck);
        } else {
            throw new RssInvalidDataException(String.format("Unsupported message: %s, %s", msg, connectionInfo));
        }
    } finally {
        // Always release the inbound message, including on the early return and on exceptions.
        ReferenceCountUtil.release(msg);
    }
}