static void dissect_message_metadata()

in wireshark/pulsarDissector.cc [281:460]


static void dissect_message_metadata(proto_tree* frame_tree, tvbuff_t* tvb, int offset, int maxOffset) {
    if (tvb_get_ntohs(tvb, offset) == MAGIC_BROKER_ENTRY_METADATA) {
        offset += 2;
        auto brokerEntryMetadataSize = (int)tvb_get_ntohl(tvb, offset);
        offset += brokerEntryMetadataSize + 2;
#ifdef DEBUG
        proto_tree_add_debug_text(frame_tree, "[DEBUG] MAGIC_BROKER_ENTRY_METADATA %d",
                                  brokerEntryMetadataSize);
#endif
    }
    if (tvb_get_ntohs(tvb, offset) == MAGIC_CRC32C) {
#ifdef DEBUG
        auto checksum = tvb_get_ntohl(tvb, offset);
        proto_tree_add_debug_text(frame_tree, "[DEBUG] CRC32C %d", checksum);
#endif
        // Skip CRC32C Magic (2) and CRC32C checksum (4)
        offset += 2;
        offset += 4;
    }
    // Decode message metadata
    auto metadataSize = tvb_get_ntohl(tvb, offset);
    offset += 4;
#ifdef DEBUG
    proto_tree_add_debug_text(frame_tree, "[DEBUG] MetadataSize %d, maxOffset %d", metadataSize, maxOffset);
#endif

    if (offset + metadataSize > maxOffset) {
        // Not enough data to dissect metadata
#ifdef DEBUG
        proto_tree_add_debug_text(frame_tree, "[DEBUG] Not enough data to dissect message metadata");
#endif
        return;
    }

    static MessageMetadata msgMetadata;
    auto ptr = tvb_get_ptr(tvb, offset, metadataSize);

    if (!msgMetadata.ParseFromArray(ptr, metadataSize)) {
        proto_tree_add_boolean_format(frame_tree, hf_pulsar_error, tvb, offset, metadataSize, true,
                                      "Error parsing protocol buffer message metadata");
        return;
    }

#ifdef DEBUG
    proto_tree_add_debug_text(frame_tree, "[DEBUG] MessageMetadata Utf8DebugString : %s",
                              msgMetadata.Utf8DebugString().c_str());
    proto_tree_add_debug_text(frame_tree, "[DEBUG] MessageMetadata SerializeAsString : %s",
                              msgMetadata.SerializeAsString().c_str());
#endif

    proto_item* md_tree =
        proto_tree_add_subtree_format(frame_tree, tvb, offset, metadataSize, ett_pulsar, nullptr,
                                      "MessageMetadata / %s / %" G_GINT64_MODIFIER "u",
                                      msgMetadata.producer_name().c_str(), msgMetadata.sequence_id());
    proto_tree_add_string(md_tree, hf_pulsar_producer_name, tvb, offset, metadataSize,
                          msgMetadata.producer_name().c_str());

    // IDs
    proto_tree_add_uint64(md_tree, hf_pulsar_sequence_id, tvb, offset, metadataSize,
                          msgMetadata.sequence_id());
    if (msgMetadata.has_highest_sequence_id()) {
        proto_tree_add_uint64(md_tree, hf_pulsar_highest_sequence_id, tvb, offset, metadataSize,
                              msgMetadata.highest_sequence_id());
    }

    if (msgMetadata.has_chunk_id()) {
        proto_tree_add_uint(md_tree, hf_pulsar_chunk_id, tvb, offset, metadataSize, msgMetadata.chunk_id());
        if (msgMetadata.has_num_chunks_from_msg()) {
            proto_tree_add_uint(md_tree, hf_pulsar_num_chunks_from_msg, tvb, offset, metadataSize,
                                msgMetadata.num_chunks_from_msg());
        }
    }

    if (msgMetadata.has_uuid()) {
        proto_tree_add_string(md_tree, hf_pulsar_uuid, tvb, offset, metadataSize, msgMetadata.uuid().c_str());
    }

    // Times
    proto_tree_add_uint64(md_tree, hf_pulsar_publish_time, tvb, offset, metadataSize,
                          msgMetadata.publish_time());

    if (msgMetadata.has_deliver_at_time()) {
        proto_tree_add_uint64(md_tree, hf_pulsar_deliver_at_time, tvb, offset, metadataSize,
                              msgMetadata.deliver_at_time());
        proto_tree_add_uint64(md_tree, hf_pulsar_deliver_after_time, tvb, offset, metadataSize,
                              msgMetadata.deliver_at_time() - msgMetadata.publish_time());
    }

    if (msgMetadata.has_event_time()) {
        proto_tree_add_uint64(md_tree, hf_pulsar_event_time, tvb, offset, metadataSize,
                              msgMetadata.event_time());
    }

    // Compression
    if (msgMetadata.has_compression()) {
        proto_tree_add_string(md_tree, hf_pulsar_compression_type, tvb, offset, metadataSize,
                              to_str(msgMetadata.compression(), compression_type_name_vs));
    }

    if (msgMetadata.has_uncompressed_size()) {
        proto_tree_add_uint(md_tree, hf_pulsar_uncompressed_size, tvb, offset, metadataSize,
                            msgMetadata.uncompressed_size());
    }

    // Keys
    if (msgMetadata.has_partition_key()) {
        proto_tree_add_string(md_tree, hf_pulsar_partition_key, tvb, offset, metadataSize,
                              msgMetadata.partition_key().c_str());
    }

    if (msgMetadata.has_ordering_key()) {
        proto_tree_add_string(md_tree, hf_pulsar_ordering_key, tvb, offset, metadataSize,
                              msgMetadata.ordering_key().c_str());
    }

    // Encryption
    if (msgMetadata.has_encryption_algo()) {
        proto_tree_add_string(md_tree, hf_pulsar_encryption_algo, tvb, offset, metadataSize,
                              msgMetadata.encryption_algo().c_str());
    }
    if (msgMetadata.has_encryption_param()) {
        proto_tree_add_string(md_tree, hf_pulsar_encryption_param, tvb, offset, metadataSize,
                              msgMetadata.encryption_param().c_str());
    }
    if (msgMetadata.encryption_keys_size() > 0) {
        proto_item* encryption_keys_tree = proto_tree_add_subtree_format(
            md_tree, tvb, offset, msgMetadata.encryption_param().size(), ett_pulsar, nullptr,
            "EncryptionParam / %s", msgMetadata.encryption_algo().c_str());
        for (int i = 0; i < msgMetadata.encryption_keys().size(); i++) {
            const auto& encryption_key = msgMetadata.encryption_keys(i);
            proto_tree_add_string_format(
                encryption_keys_tree, hf_pulsar_encryption_keys, tvb, offset, metadataSize, "", "%s : %s",
                encryption_key.has_key() ? encryption_key.key().c_str() : "<none>",
                encryption_key.has_value() ? encryption_key.value().c_str() : "<none>");
        }
    }

    // Properties
    if (msgMetadata.properties_size() > 0) {
        proto_item* properties_tree = proto_tree_add_subtree_format(md_tree, tvb, offset, metadataSize,
                                                                    ett_pulsar, nullptr, "Properties");
        for (int i = 0; i < msgMetadata.properties_size(); i++) {
            const KeyValue& kv = msgMetadata.properties(i);
            proto_tree_add_string_format(properties_tree, hf_pulsar_property, tvb, offset, metadataSize, "",
                                         "%s : %s", kv.key().c_str(), kv.value().c_str());
        }
    }

    // Replication
    if (msgMetadata.has_replicated_from()) {
        proto_tree_add_string(md_tree, hf_pulsar_replicated_from, tvb, offset, metadataSize,
                              msgMetadata.replicated_from().c_str());
    }

    if (msgMetadata.replicate_to_size() > 0) {
        proto_item* replicate_tree = proto_tree_add_subtree_format(md_tree, tvb, offset, metadataSize,
                                                                   ett_pulsar, nullptr, "Replicate to");
        for (int i = 0; i < msgMetadata.replicate_to_size(); i++) {
            proto_tree_add_string_format(replicate_tree, hf_pulsar_replicated_from, tvb, offset, metadataSize,
                                         "", "%s", msgMetadata.replicate_to(i).c_str());
        }
    }

    // Transaction
    if (msgMetadata.has_txnid_least_bits()) {
        proto_tree_add_uint64(md_tree, hf_pulsar_txnid_least_bits, tvb, offset, metadataSize,
                              msgMetadata.txnid_least_bits());
    }

    if (msgMetadata.has_txnid_most_bits()) {
        proto_tree_add_uint64(md_tree, hf_pulsar_txnid_most_bits, tvb, offset, metadataSize,
                              msgMetadata.txnid_most_bits());
    }

    // Payloads
    offset += metadataSize;
    uint32_t payloadSize = maxOffset - offset;
    proto_tree_add_subtree_format(md_tree, tvb, offset, payloadSize, ett_pulsar, nullptr, "Payload / size=%u",
                                  payloadSize);
}