in src/main/java/com/uber/rss/decoders/StreamServerMessageDecoder.java [131:304]
/**
 * Runs one step of the decoding state machine against the bytes currently readable in
 * {@code in}.
 *
 * <p>Only the branch for the current {@code state} is executed per invocation. When a branch
 * needs a fixed number of bytes that are not yet available it returns without consuming
 * anything; the one exception is {@code READ_DATA_MESSAGE_BYTES}, which drains whatever part of
 * the payload is available into {@code shuffleDataBuffer}.
 *
 * <p>After the handshake, every message starts with an int: a negative value is treated as a
 * control message type, a non-negative value as a partition id that starts a data message.
 *
 * @param ctx channel context, used to close the connection on an invalid handshake and to
 *     describe the client in log and exception messages
 * @param in inbound buffer to consume from
 * @param out receives every fully decoded message
 * @throws RssInvalidDataException if a control message length, a task attempt id, or a data
 *     length read from the stream is negative
 * @throws RssException if {@code state} is not one of the states handled here
 */
private void decodeImpl(ChannelHandlerContext ctx,
                        ByteBuf in,
                        List<Object> out) {
  // Counted on every invocation, including ones that return early for lack of data.
  metrics.getNumIncomingRequests().inc(1);

  if (in.readableBytes() == 0) {
    return;
  }

  switch (state) {
    case READ_MAGIC_BYTE_AND_VERSION:
      // Wait for both handshake bytes so the magic byte and version are consumed together.
      if (in.readableBytes() < 2 * Byte.BYTES) {
        return;
      }
      byte magicByte = in.readByte();
      // Every branch of this inner switch returns, so control never falls through to
      // READ_MESSAGE_TYPE below.
      switch (magicByte) {
        case MessageConstants.UPLOAD_UPLINK_MAGIC_BYTE:
          acceptVersionOrClose(
              ctx, in.readByte(), MessageConstants.UPLOAD_UPLINK_VERSION_3, "upload");
          return;
        case MessageConstants.DOWNLOAD_UPLINK_MAGIC_BYTE:
          acceptVersionOrClose(
              ctx, in.readByte(), MessageConstants.DOWNLOAD_UPLINK_VERSION_3, "download");
          return;
        case MessageConstants.NOTIFY_UPLINK_MAGIC_BYTE:
          acceptVersionOrClose(
              ctx, in.readByte(), MessageConstants.NOTIFY_UPLINK_VERSION_3, "notify");
          return;
        case MessageConstants.REGISTRY_UPLINK_MAGIC_BYTE:
          acceptVersionOrClose(
              ctx, in.readByte(), MessageConstants.REGISTRY_UPLINK_VERSION_3, "registry");
          return;
        default:
          // Unknown magic byte: the version byte is left unread and the connection is closed.
          String clientInfo = NettyUtils.getServerConnectionInfo(ctx);
          logger.warn(
              "Invalid magic byte {} from client {}",
              magicByte, clientInfo);
          ctx.close();
          logger.debug("Closed connection to client {}", clientInfo);
          return;
      }
    case READ_MESSAGE_TYPE:
      if (in.readableBytes() < Integer.BYTES) {
        return;
      }
      int messageType = in.readInt();
      if (messageType < 0) {
        // Negative value: control message type.
        controlMessageType = messageType;
        state = State.READ_CONTROL_MESSAGE_LEN;
      } else {
        // Non-negative value: partition id of a data message.
        partitionId = messageType;
        state = State.READ_TASK_ATTEMPT_ID;
      }
      return;
    case READ_CONTROL_MESSAGE_LEN:
      if (in.readableBytes() < Integer.BYTES) {
        return;
      }
      requiredBytes = in.readInt();
      if (requiredBytes < 0) {
        throw new RssInvalidDataException(String.format(
            "Invalid control message length: %s, %s",
            requiredBytes, NettyUtils.getServerConnectionInfo(ctx)));
      }
      if (requiredBytes == 0) {
        // Zero-length control message: nothing more to wait for, emit it right away.
        Object controlMessage = getControlMessage(ctx, controlMessageType, in);
        out.add(controlMessage);
        resetData();
        state = State.READ_MESSAGE_TYPE;
      } else {
        state = State.READ_CONTROL_MESSAGE_BYTES;
      }
      return;
    case READ_CONTROL_MESSAGE_BYTES:
      // Control messages are only decoded once their whole body is buffered.
      if (in.readableBytes() < requiredBytes) {
        return;
      }
      Object controlMessage = getControlMessage(ctx, controlMessageType, in);
      out.add(controlMessage);
      resetData();
      state = State.READ_MESSAGE_TYPE;
      return;
    case READ_TASK_ATTEMPT_ID:
      if (in.readableBytes() < Long.BYTES) {
        return;
      }
      // NOTE(review): readBytes fills the whole array, so this assumes taskAttemptIdBytes is
      // Long.BYTES long - confirm against the field declaration.
      in.readBytes(taskAttemptIdBytes);
      taskAttemptId = ByteBufUtils.readLong(taskAttemptIdBytes, 0);
      if (taskAttemptId < 0) {
        throw new RssInvalidDataException(String.format(
            "Invalid task attempt id: %s, %s",
            taskAttemptId, NettyUtils.getServerConnectionInfo(ctx)));
      }
      state = State.READ_DATA_MESSAGE_LEN;
      return;
    case READ_DATA_MESSAGE_LEN:
      if (in.readableBytes() < Integer.BYTES) {
        return;
      }
      int dataLen = in.readInt();
      if (dataLen < 0) {
        throw new RssInvalidDataException(String.format(
            "Invalid data length: %s, %s",
            dataLen, NettyUtils.getServerConnectionInfo(ctx)));
      }
      if (dataLen == 0) {
        // Empty payload: emit the wrapper immediately with a size of zero.
        out.add(createShuffleDataWrapper(in, 0));
        resetData();
        requiredBytes = 0;
        state = State.READ_MESSAGE_TYPE;
      } else {
        requiredBytes = dataLen;
        state = State.READ_DATA_MESSAGE_BYTES;
        // Start accumulating this payload from an empty buffer.
        shuffleDataBuffer.clear();
      }
      return;
    case READ_DATA_MESSAGE_BYTES:
      if (in.readableBytes() < requiredBytes) {
        // Partial payload: move what is available and keep waiting for the remainder.
        int count = in.readableBytes();
        shuffleDataBuffer.ensureWritable(count);
        in.readBytes(shuffleDataBuffer, count);
        requiredBytes -= count;
      } else {
        // Rest of the payload is available: complete the message and emit it.
        shuffleDataBuffer.ensureWritable(requiredBytes);
        in.readBytes(shuffleDataBuffer, requiredBytes);
        out.add(createShuffleDataWrapper(shuffleDataBuffer, shuffleDataBuffer.readableBytes()));
        requiredBytes = 0;
        resetData();
        state = State.READ_MESSAGE_TYPE;
      }
      return;
    default:
      throw new RssException(String.format(
          "Should not get incoming data in state %s, client %s",
          state, NettyUtils.getServerConnectionInfo(ctx)));
  }
}

/**
 * Completes the handshake for one uplink type: moves to {@code READ_MESSAGE_TYPE} when the
 * version matches, otherwise logs a warning and closes the connection, leaving {@code state}
 * unchanged.
 *
 * @param ctx channel context of the client connection
 * @param version version byte read from the stream
 * @param expectedVersion the only version accepted for this uplink type; declared as
 *     {@code int} so the comparison promotes the same way as a direct comparison against the
 *     constant
 * @param linkName uplink name used in the warning message (for example {@code "upload"})
 */
private void acceptVersionOrClose(ChannelHandlerContext ctx,
                                  byte version,
                                  int expectedVersion,
                                  String linkName) {
  if (version != expectedVersion) {
    String clientInfo = NettyUtils.getServerConnectionInfo(ctx);
    logger.warn(
        "Invalid {} version {} from client {}",
        linkName, version, clientInfo);
    ctx.close();
    logger.debug("Closed connection to client {}", clientInfo);
    return;
  }
  state = State.READ_MESSAGE_TYPE;
}