paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableSerializer.java [45:67]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        byte[] wrapped;
        int version;
        switch (committable.kind()) {
            case FILE:
                version = commitMessageSerializer.getVersion();
                wrapped =
                        commitMessageSerializer.serialize(
                                (CommitMessage) committable.wrappedCommittable());
                break;
            case LOG_OFFSET:
                version = 1;
                wrapped = ((LogOffsetCommittable) committable.wrappedCommittable()).toBytes();
                break;
            default:
                throw new UnsupportedOperationException("Unsupported kind: " + committable.kind());
        }

        return ByteBuffer.allocate(8 + 1 + wrapped.length + 4)
                .putLong(committable.checkpointId())
                .put(committable.kind().toByteValue())
                .put(wrapped)
                .putInt(version)
                .array();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java [104:126]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        byte[] wrapped;
        int version;
        switch (committable.kind()) {
            case FILE:
                version = commitMessageSerializer.getVersion();
                wrapped =
                        commitMessageSerializer.serialize(
                                (CommitMessage) committable.wrappedCommittable());
                break;
            case LOG_OFFSET:
                version = 1;
                wrapped = ((LogOffsetCommittable) committable.wrappedCommittable()).toBytes();
                break;
            default:
                throw new UnsupportedOperationException("Unsupported kind: " + committable.kind());
        }

        return ByteBuffer.allocate(8 + 1 + wrapped.length + 4)
                .putLong(committable.checkpointId())
                .put(committable.kind().toByteValue())
                .put(wrapped)
                .putInt(version)
                .array();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



