fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java [53:85]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        out.writeInt(state.getAssignedBuckets().size());
        for (TableBucket tableBucket : state.getAssignedBuckets()) {
            out.writeLong(tableBucket.getTableId());

            // write partition
            // if partition is not null
            if (tableBucket.getPartitionId() != null) {
                out.writeBoolean(true);
                out.writeLong(tableBucket.getPartitionId());
            } else {
                out.writeBoolean(false);
            }

            out.writeInt(tableBucket.getBucket());
        }
        // write assigned partitions
        out.writeInt(state.getAssignedPartitions().size());
        for (Map.Entry<Long, String> entry : state.getAssignedPartitions().entrySet()) {
            out.writeLong(entry.getKey());
            out.writeUTF(entry.getValue());
        }

        final byte[] result = out.getCopyOfBuffer();
        out.clear();
        return result;
    }

    @Override
    public SourceEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
        if (version != VERSION_0) {
            throw new IOException("Unknown version or corrupt state: " + version);
        }
        final DataInputDeserializer in = new DataInputDeserializer(serialized);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



fluss-lakehouse/fluss-lakehouse-paimon/src/main/java/com/alibaba/fluss/lakehouse/paimon/source/state/FlussSourceEnumeratorStateSerializer.java [63:95]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        out.writeInt(state.getAssignedBuckets().size());
        for (TableBucket tableBucket : state.getAssignedBuckets()) {
            out.writeLong(tableBucket.getTableId());

            // write partition
            // if partition is not null
            if (tableBucket.getPartitionId() != null) {
                out.writeBoolean(true);
                out.writeLong(tableBucket.getPartitionId());
            } else {
                out.writeBoolean(false);
            }

            out.writeInt(tableBucket.getBucket());
        }
        // write assigned partitions
        out.writeInt(state.getAssignedPartitions().size());
        for (Map.Entry<Long, String> entry : state.getAssignedPartitions().entrySet()) {
            out.writeLong(entry.getKey());
            out.writeUTF(entry.getValue());
        }

        final byte[] result = out.getCopyOfBuffer();
        out.clear();
        return result;
    }

    @Override
    public SourceEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
        if (version != VERSION_0) {
            throw new IOException("Unknown version or corrupt state: " + version);
        }
        final DataInputDeserializer in = new DataInputDeserializer(serialized);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



