in src/main/java/com/uber/rss/handlers/UploadChannelInboundHandler.java [124:224]
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // Dispatches a single decoded inbound message according to its concrete type:
    //   ConnectUploadRequest  - checks server connection capacity and the app's write quota, then
    //                           replies with a ConnectUploadResponse; on either rejection a one-byte
    //                           status is written and the channel is closed.
    //   StartUploadMessage    - registers the task attempt with uploadServerHandler.
    //   FinishUploadMessage   - finishes the upload and optionally acknowledges with a status byte.
    //   ShuffleDataWrapper    - forwards the record data to uploadServerHandler.
    //   HeartbeatMessage      - refreshes app liveness; closes the channel when keep-live is false.
    //   GetBusyStatusRequest  - replies with the concurrent connection count, then closes.
    // Any other message type raises RssInvalidDataException.
    // The message is always released in the finally block, including on early return or exception.
    try {
        // ShuffleDataWrapper messages are excluded from debug logging.
        if (logger.isDebugEnabled() && !(msg instanceof ShuffleDataWrapper)) {
            logger.debug("Got incoming message: {}, {}", msg, connectionInfo);
        }

        if (idleCheck != null) {
            idleCheck.updateLastReadTime();
        }

        // Process other messages. We assume the header messages are already processed, thus some fields of this
        // class are already populated with proper values, e.g. user field.
        if (msg instanceof ConnectUploadRequest) {
            try {
                uploadServerHandler.checkMaxConnections();
            } catch (RssMaxConnectionsException e) {
                logger.info(
                    "Cannot handle new connection due to server capacity. Closing current connection: {}. {}",
                    connectionInfo, ExceptionUtils.getSimpleMessage(e));
                M3Stats.addException(e, M3Stats.TAG_VALUE_SERVER_HANDLER);
                ByteBuf buf = ctx.alloc().buffer(1);
                buf.writeByte(MessageConstants.RESPONSE_STATUS_SERVER_BUSY);
                ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
                return;
            }

            ConnectUploadRequest connectUploadRequest = (ConnectUploadRequest) msg;
            appId = connectUploadRequest.getAppId();
            appAttempt = connectUploadRequest.getAppAttempt();

            try {
                uploadServerHandler.checkAppMaxWriteBytes(appId);
            } catch (RssTooMuchDataException e) {
                // All placeholders use the logger's {} syntax so that each of the three arguments
                // lands in its intended position.
                logger.info(
                    "Cannot handle new connection due to writing too much data from app ({}). "
                        + "Closing current connection: {}. {}", appId, connectionInfo,
                    ExceptionUtils.getSimpleMessage(e));
                M3Stats.addException(e, M3Stats.TAG_VALUE_SERVER_HANDLER);
                ByteBuf buf = ctx.alloc().buffer(1);
                buf.writeByte(MessageConstants.RESPONSE_STATUS_APP_TOO_MUCH_DATA);
                ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
                // Stop here: the connection has been rejected, so no liveness update and no OK
                // response may follow the error status on this channel.
                return;
            }

            uploadServerHandler.updateLiveness(appId);
            ConnectUploadResponse connectUploadResponse = new ConnectUploadResponse(
                serverId, RssBuildInfo.Version, runningVersion);
            HandlerUtil.writeResponseMsg(ctx, MessageConstants.RESPONSE_STATUS_OK,
                connectUploadResponse, true);
        } else if (msg instanceof StartUploadMessage) {
            startUploadMessage = (StartUploadMessage) msg;
            // appId and appAttempt are the values stored by the ConnectUploadRequest branch.
            AppTaskAttemptId appTaskAttemptId = new AppTaskAttemptId(appId,
                appAttempt,
                startUploadMessage.getShuffleId(),
                startUploadMessage.getMapId(),
                startUploadMessage.getAttemptId());
            ShuffleWriteConfig writeConfig = new ShuffleWriteConfig(startUploadMessage.getNumSplits());
            uploadServerHandler.initializeAppTaskAttempt(appTaskAttemptId, startUploadMessage.getNumPartitions(),
                writeConfig, ctx);
        } else if (msg instanceof FinishUploadMessage) {
            logger.debug("FinishUploadMessage, {}, {}", msg, connectionInfo);
            FinishUploadMessage finishUploadMessage = (FinishUploadMessage) msg;
            // Record the difference between the current time and the timestamp carried by the message.
            finishUploadRequestLag.update(System.currentTimeMillis() - finishUploadMessage.getTimestamp());
            byte ackFlag = finishUploadMessage.getAckFlag();
            uploadServerHandler.finishUpload(finishUploadMessage.getTaskAttemptId());
            // Acknowledge only when the sender did not opt out of the ack.
            if (ackFlag != FinishUploadMessage.ACK_FLAG_NO_ACK) {
                ByteBuf buf = ctx.alloc().buffer(1);
                buf.writeByte(MessageConstants.RESPONSE_STATUS_OK);
                ctx.writeAndFlush(buf);
            }
        } else if (msg instanceof ShuffleDataWrapper) {
            ShuffleDataWrapper shuffleDataWrapper = (ShuffleDataWrapper) msg;
            uploadServerHandler.writeRecord(shuffleDataWrapper);
        } else if (msg instanceof HeartbeatMessage) {
            HeartbeatMessage heartbeatMessage = (HeartbeatMessage) msg;
            String heartbeatAppId = heartbeatMessage.getAppId();
            boolean heartbeatKeepLive = heartbeatMessage.isKeepLive();
            uploadServerHandler.updateLiveness(heartbeatAppId);
            if (!heartbeatKeepLive) {
                ctx.close();
            }
        } else if (msg instanceof GetBusyStatusRequest) {
            // TODO ideally clients should send some information to tell server what status they are interested
            Map<Long, Long> metricsMap = new HashMap<>();
            GetBusyStatusResponse getBusyStatusResponse = new GetBusyStatusResponse(metricsMap, new HashMap<>());
            // Long.valueOf replaces the deprecated Long constructors.
            getBusyStatusResponse.getMetrics().put(Long.valueOf(CONCURRENT_CONNS),
                Long.valueOf(concurrentChannelsAtomicInteger.get()));
            ChannelFuture channelFuture = HandlerUtil.writeResponseMsg(ctx, MessageConstants.RESPONSE_STATUS_OK,
                getBusyStatusResponse, true);
            // The connection is closed once the response write completes.
            channelFuture.addListener(ChannelFutureListener.CLOSE);
        } else {
            throw new RssInvalidDataException(String.format("Unsupported message: %s, %s", msg, connectionInfo));
        }
    } finally {
        ReferenceCountUtil.release(msg);
    }
}