paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableSerializer.java [72:94]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        if (committableVersion != getVersion()) {
            throw new RuntimeException("Can not deserialize version: " + committableVersion);
        }

        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        long checkpointId = buffer.getLong();
        Committable.Kind kind = Committable.Kind.fromByteValue(buffer.get());
        byte[] wrapped = new byte[bytes.length - 13];
        buffer.get(wrapped);
        int version = buffer.getInt();

        Object wrappedCommittable;
        switch (kind) {
            case FILE:
                wrappedCommittable = commitMessageSerializer.deserialize(version, wrapped);
                break;
            case LOG_OFFSET:
                wrappedCommittable = LogOffsetCommittable.fromBytes(wrapped);
                break;
            default:
                throw new UnsupportedOperationException("Unsupported kind: " + kind);
        }
        return new Committable(checkpointId, kind, wrappedCommittable);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java [131:153]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        if (committableVersion != getVersion()) {
            throw new RuntimeException("Can not deserialize version: " + committableVersion);
        }

        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        long checkpointId = buffer.getLong();
        Committable.Kind kind = Committable.Kind.fromByteValue(buffer.get());
        byte[] wrapped = new byte[bytes.length - 13];
        buffer.get(wrapped);
        int version = buffer.getInt();

        Object wrappedCommittable;
        switch (kind) {
            case FILE:
                wrappedCommittable = commitMessageSerializer.deserialize(version, wrapped);
                break;
            case LOG_OFFSET:
                wrappedCommittable = LogOffsetCommittable.fromBytes(wrapped);
                break;
            default:
                throw new UnsupportedOperationException("Unsupported kind: " + kind);
        }
        return new Committable(checkpointId, kind, wrappedCommittable);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



