in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/FrameDecoder.java [125:164]
/**
 * Tries to decode a frame header from {@code in} and prepares the decoder state for reading the frame body.
 *
 * <p>The header is consumed in this order: one flags byte, a 4-byte transfer index and a 4-byte unsigned
 * body length. Flag bits, as interpreted here:
 * <ul>
 *   <li>bit 3: set for a data frame, clear for a control frame</li>
 *   <li>bit 2: clear means {@code INITIATOR_SENDS_DATA}, set means {@code INITIATOR_RECEIVES_DATA}</li>
 *   <li>bit 1: set when the frame begins a new sub-stream</li>
 *   <li>bit 0: set when the frame is the last one</li>
 * </ul>
 * Bits 0-2 are only examined for data frames.
 *
 * @param ctx the channel handler context, used here only to report addresses in the error message
 * @param in  the inbound buffer to read the header from
 * @return {@code false} if fewer than {@code HEADER_LENGTH} bytes are readable (nothing is consumed),
 *         {@code true} once a header has been consumed
 * @throws IllegalStateException if a data frame refers to an input context that the context manager
 *                               does not know
 */
private boolean onFrameStarted(final ChannelHandlerContext ctx, final ByteBuf in) {
  // A new frame may only start once the previous frame has been fully consumed.
  assert (controlBodyBytesToRead == 0);
  assert (dataBodyBytesToRead == 0);
  assert (inputContext == null);

  if (in.readableBytes() < HEADER_LENGTH) {
    // the full frame header has not arrived yet; leave the buffer untouched
    return false;
  }

  final byte flags = in.readByte();
  final int transferIndex = in.readInt();
  // readUnsignedInt() zero-extends 32 bits into a long, so the length can never be negative
  // and needs no sign check.
  final long length = in.readUnsignedInt();

  if ((flags & ((byte) (1 << 3))) == 0) {
    // setup context for reading control frame body
    controlBodyBytesToRead = length;
  } else {
    // setup context for reading data frame body
    dataBodyBytesToRead = length;
    final ByteTransferDataDirection dataDirection = (flags & ((byte) (1 << 2))) == 0
        ? ByteTransferDataDirection.INITIATOR_SENDS_DATA : ByteTransferDataDirection.INITIATOR_RECEIVES_DATA;
    final boolean newSubStreamFlag = (flags & ((byte) (1 << 1))) != 0;
    isLastFrame = (flags & ((byte) (1 << 0))) != 0;
    inputContext = contextManager.getInputContext(dataDirection, transferIndex);
    if (inputContext == null) {
      throw new IllegalStateException(String.format("Transport context for %s:%d was not found between the local "
          + "address %s and the remote address %s", dataDirection, transferIndex,
          ctx.channel().localAddress(), ctx.channel().remoteAddress()));
    }
    if (newSubStreamFlag) {
      inputContext.onNewStream();
    }
    if (dataBodyBytesToRead == 0) {
      // A data frame with an empty body has nothing more to read, so finish it right away.
      onDataFrameEnd();
    }
  }
  return true;
}