in core/src/main/java/org/apache/rocketmq/streams/core/window/WindowState.java [201:258]
public static <K,V> WindowState<K,V> byte2WindowState(byte[] bytes) throws Throwable {
ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
int totalLength = byteBuf.readInt();
if (bytes.length < totalLength) {
//上层已经拆好了包
throw new IllegalArgumentException("byteBuf length less than total");
}
long recordLastTimestamp = byteBuf.readLong();
long recordEarliestTimestamp = byteBuf.readLong();
//key class
int keyClazzLength = byteBuf.readInt();
ByteBuf buf = byteBuf.readBytes(keyClazzLength);
byte[] keyClazzBytes = new byte[keyClazzLength];
buf.readBytes(keyClazzBytes);
//实例化
String keyClassName = new String(keyClazzBytes, StandardCharsets.UTF_8);
Class<?> keyClazz = Class.forName(keyClassName);
//key
int keyLength = byteBuf.readInt();
ByteBuf keyBuf = byteBuf.readBytes(keyLength);
byte[] keyBytes = new byte[keyLength];
keyBuf.readBytes(keyBytes);
//value class
int valueClazzLength = byteBuf.readInt();
ByteBuf valueClazzBuf = byteBuf.readBytes(valueClazzLength);
byte[] valueClazzBytes = new byte[valueClazzLength];
valueClazzBuf.readBytes(valueClazzBytes);
//实例化
String valueClassName = new String(valueClazzBytes, StandardCharsets.UTF_8);
Class<?> valueClazz = Class.forName(valueClassName);
//value
int valueLength = byteBuf.readInt();
ByteBuf valueBuf = byteBuf.readBytes(valueLength);
byte[] valueBytes = new byte[valueLength];
valueBuf.readBytes(valueBytes);
WindowState<K, V> result = new WindowState<>();
result.setRecordLastTimestamp(recordLastTimestamp);
result.setRecordEarliestTimestamp(recordEarliestTimestamp);
result.setKeyBytes(keyBytes);
result.setValueBytes(valueBytes);
result.setKeyClazz(keyClazz);
result.setValueClazz(valueClazz);
result.setKey(Utils.byte2Object(keyBytes, result.getKeyClazz()));
result.setValue(Utils.byte2Object(valueBytes, result.getValueClazz()));
byteBuf.release();
buf.release();
keyBuf.release();
valueBuf.release();
valueClazzBuf.release();
return result;
}