in client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/MessageDecoderExt.java [31:105]
public static Message decode(Message.Type type, ByteBuf in, boolean decodeBody) {
long requestId;
// cannot use actual class decode method because common module cannot refer flink shaded netty.
switch (type) {
case RPC_REQUEST:
requestId = in.readLong();
in.readInt();
if (decodeBody) {
return new RpcRequest(requestId, new FlinkNettyManagedBuffer(in));
} else {
return new RpcRequest(requestId, NettyManagedBuffer.EmptyBuffer);
}
case RPC_RESPONSE:
requestId = in.readLong();
in.readInt();
if (decodeBody) {
return new RpcResponse(requestId, new FlinkNettyManagedBuffer(in));
} else {
return new RpcResponse(requestId, NettyManagedBuffer.EmptyBuffer);
}
case RPC_FAILURE:
requestId = in.readLong();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes);
String errorString = new String(bytes, StandardCharsets.UTF_8);
return new RpcFailure(requestId, errorString);
case ONE_WAY_MESSAGE:
in.readInt();
if (decodeBody) {
return new OneWayMessage(new FlinkNettyManagedBuffer(in));
} else {
return new OneWayMessage(NettyManagedBuffer.EmptyBuffer);
}
case READ_ADD_CREDIT:
long streamId = in.readLong();
int credit = in.readInt();
return new ReadAddCredit(streamId, credit);
case READ_DATA:
streamId = in.readLong();
return new ReadData(streamId);
case SUBPARTITION_READ_DATA:
streamId = in.readLong();
int subPartitionId = in.readInt();
return new SubPartitionReadData(streamId, subPartitionId);
case BACKLOG_ANNOUNCEMENT:
streamId = in.readLong();
int backlog = in.readInt();
return new BacklogAnnouncement(streamId, backlog);
case TRANSPORTABLE_ERROR:
streamId = in.readLong();
int len = in.readInt();
byte[] errorBytes = new byte[len];
in.readBytes(errorBytes);
return new TransportableError(streamId, errorBytes);
case HEARTBEAT:
return new Heartbeat();
case BUFFER_STREAM_END:
streamId = in.readLong();
return new BufferStreamEnd(streamId);
default:
throw new IllegalArgumentException("Unexpected message type: " + type);
}
}