in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/FrameDecoder.java [174:223]
private boolean onControlBodyAdded(final ByteBuf in, final List out)
throws InvalidProtocolBufferException {
ContextManager.getEncoderDecoderLock().lock();
in.markReaderIndex();
try {
assert (controlBodyBytesToRead > 0);
assert (dataBodyBytesToRead == 0);
assert (inputContext == null);
assert (controlBodyBytesToRead <= Integer.MAX_VALUE);
int i = 0;
while (in.readableBytes() < controlBodyBytesToRead) {
// cannot read body now
LOG.warn("ControlMessage cannot be read ({})", i);
ContextManager.getEncoderDecoderLock().unlock();
Thread.sleep(200);
ContextManager.getEncoderDecoderLock().lock();
i++;
if (i > 10) {
in.resetReaderIndex();
return false;
}
}
final byte[] bytes;
final int offset;
if (in.hasArray()) {
bytes = in.array();
offset = in.arrayOffset() + in.readerIndex();
} else {
bytes = new byte[(int) controlBodyBytesToRead];
in.getBytes(in.readerIndex(), bytes, 0, (int) controlBodyBytesToRead);
offset = 0;
}
final ByteTransferContextSetupMessage controlMessage
= ByteTransferContextSetupMessage.PARSER.parseFrom(bytes, offset, (int) controlBodyBytesToRead);
LOG.debug("ControlMessage decoded: {}", controlMessage);
out.add(controlMessage);
in.skipBytes((int) controlBodyBytesToRead);
controlBodyBytesToRead = 0;
return true;
} catch (InterruptedException e) {
e.printStackTrace();
return false;
} finally {
ContextManager.getEncoderDecoderLock().unlock();
}
}