public static Message decode()

in client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/MessageDecoderExt.java [31:105]


  public static Message decode(Message.Type type, ByteBuf in, boolean decodeBody) {
    long requestId;
    // cannot use actual class decode method because common module cannot refer flink shaded netty.
    switch (type) {
      case RPC_REQUEST:
        requestId = in.readLong();
        in.readInt();
        if (decodeBody) {
          return new RpcRequest(requestId, new FlinkNettyManagedBuffer(in));
        } else {
          return new RpcRequest(requestId, NettyManagedBuffer.EmptyBuffer);
        }

      case RPC_RESPONSE:
        requestId = in.readLong();
        in.readInt();
        if (decodeBody) {
          return new RpcResponse(requestId, new FlinkNettyManagedBuffer(in));
        } else {
          return new RpcResponse(requestId, NettyManagedBuffer.EmptyBuffer);
        }

      case RPC_FAILURE:
        requestId = in.readLong();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes);
        String errorString = new String(bytes, StandardCharsets.UTF_8);
        return new RpcFailure(requestId, errorString);

      case ONE_WAY_MESSAGE:
        in.readInt();
        if (decodeBody) {
          return new OneWayMessage(new FlinkNettyManagedBuffer(in));
        } else {
          return new OneWayMessage(NettyManagedBuffer.EmptyBuffer);
        }

      case READ_ADD_CREDIT:
        long streamId = in.readLong();
        int credit = in.readInt();
        return new ReadAddCredit(streamId, credit);

      case READ_DATA:
        streamId = in.readLong();
        return new ReadData(streamId);

      case SUBPARTITION_READ_DATA:
        streamId = in.readLong();
        int subPartitionId = in.readInt();
        return new SubPartitionReadData(streamId, subPartitionId);

      case BACKLOG_ANNOUNCEMENT:
        streamId = in.readLong();
        int backlog = in.readInt();
        return new BacklogAnnouncement(streamId, backlog);

      case TRANSPORTABLE_ERROR:
        streamId = in.readLong();
        int len = in.readInt();
        byte[] errorBytes = new byte[len];
        in.readBytes(errorBytes);
        return new TransportableError(streamId, errorBytes);

      case HEARTBEAT:
        return new Heartbeat();

      case BUFFER_STREAM_END:
        streamId = in.readLong();
        return new BufferStreamEnd(streamId);

      default:
        throw new IllegalArgumentException("Unexpected message type: " + type);
    }
  }