in wireshark/pulsarDissector.cc [281:460]
/**
 * Dissect the message-metadata section that follows a command and add its
 * fields to the protocol tree.
 *
 * Starting at `offset`, the function:
 *   1. skips an optional broker entry metadata section (2-byte magic, 4-byte size, body),
 *   2. skips an optional CRC32C section (2-byte magic, 4-byte checksum),
 *   3. reads the 4-byte metadata size and parses that many bytes as a MessageMetadata,
 *   4. adds one tree item per populated metadata field,
 *   5. adds a "Payload" item covering the bytes between the metadata and `maxOffset`.
 *
 * The function returns without adding the metadata subtree when a declared
 * size does not fit before `maxOffset`, or when the protobuf parse fails (in
 * which case an hf_pulsar_error item is added).
 *
 * @param frame_tree tree that receives the generated items
 * @param tvb        buffer being dissected
 * @param offset     position in `tvb` where the metadata section starts
 * @param maxOffset  upper bound (exclusive) used for all size checks and for the payload length
 */
static void dissect_message_metadata(proto_tree* frame_tree, tvbuff_t* tvb, int offset, int maxOffset) {
    // Optional broker entry metadata: magic (2) + size (4) + body (size bytes).
    if (tvb_get_ntohs(tvb, offset) == MAGIC_BROKER_ENTRY_METADATA) {
        offset += 2;
        auto brokerEntryMetadataSize = (int)tvb_get_ntohl(tvb, offset);
        // The size field that was just read is 4 bytes wide, so step over all of it.
        offset += 4;
#ifdef DEBUG
        proto_tree_add_debug_text(frame_tree, "[DEBUG] MAGIC_BROKER_ENTRY_METADATA %d",
                                  brokerEntryMetadataSize);
#endif
        // A negative or oversized value would move offset backwards or past the bound.
        if (brokerEntryMetadataSize < 0 || brokerEntryMetadataSize > maxOffset - offset) {
#ifdef DEBUG
            proto_tree_add_debug_text(frame_tree, "[DEBUG] Invalid broker entry metadata size");
#endif
            return;
        }
        offset += brokerEntryMetadataSize;
    }

    // Optional checksum: magic (2) + CRC32C (4).
    if (tvb_get_ntohs(tvb, offset) == MAGIC_CRC32C) {
#ifdef DEBUG
        // The checksum value sits right after the 2-byte magic.
        auto checksum = tvb_get_ntohl(tvb, offset + 2);
        proto_tree_add_debug_text(frame_tree, "[DEBUG] CRC32C %u", checksum);
#endif
        // Skip CRC32C Magic (2) and CRC32C checksum (4)
        offset += 2;
        offset += 4;
    }

    // Decode message metadata
    auto metadataSize = tvb_get_ntohl(tvb, offset);
    offset += 4;
#ifdef DEBUG
    proto_tree_add_debug_text(frame_tree, "[DEBUG] MetadataSize %u, maxOffset %d", metadataSize, maxOffset);
#endif
    // Compare against the remaining byte count rather than computing
    // offset + metadataSize, which wraps around for very large sizes.
    if (offset > maxOffset || metadataSize > static_cast<uint32_t>(maxOffset - offset)) {
        // Not enough data to dissect metadata
#ifdef DEBUG
        proto_tree_add_debug_text(frame_tree, "[DEBUG] Not enough data to dissect message metadata");
#endif
        return;
    }

    // Reused across calls; ParseFromArray repopulates it on every invocation.
    static MessageMetadata msgMetadata;
    auto ptr = tvb_get_ptr(tvb, offset, metadataSize);
    if (!msgMetadata.ParseFromArray(ptr, metadataSize)) {
        proto_tree_add_boolean_format(frame_tree, hf_pulsar_error, tvb, offset, metadataSize, true,
                                      "Error parsing protocol buffer message metadata");
        return;
    }
#ifdef DEBUG
    proto_tree_add_debug_text(frame_tree, "[DEBUG] MessageMetadata Utf8DebugString : %s",
                              msgMetadata.Utf8DebugString().c_str());
    proto_tree_add_debug_text(frame_tree, "[DEBUG] MessageMetadata SerializeAsString : %s",
                              msgMetadata.SerializeAsString().c_str());
#endif

    // Every item below is attached to the whole metadata byte range, because
    // the individual protobuf field positions are not tracked here.
    proto_tree* md_tree =
        proto_tree_add_subtree_format(frame_tree, tvb, offset, metadataSize, ett_pulsar, nullptr,
                                      "MessageMetadata / %s / %" G_GINT64_MODIFIER "u",
                                      msgMetadata.producer_name().c_str(), msgMetadata.sequence_id());
    proto_tree_add_string(md_tree, hf_pulsar_producer_name, tvb, offset, metadataSize,
                          msgMetadata.producer_name().c_str());

    // IDs
    proto_tree_add_uint64(md_tree, hf_pulsar_sequence_id, tvb, offset, metadataSize,
                          msgMetadata.sequence_id());
    if (msgMetadata.has_highest_sequence_id()) {
        proto_tree_add_uint64(md_tree, hf_pulsar_highest_sequence_id, tvb, offset, metadataSize,
                              msgMetadata.highest_sequence_id());
    }
    if (msgMetadata.has_chunk_id()) {
        proto_tree_add_uint(md_tree, hf_pulsar_chunk_id, tvb, offset, metadataSize, msgMetadata.chunk_id());
        // The chunk count is only shown together with a chunk id.
        if (msgMetadata.has_num_chunks_from_msg()) {
            proto_tree_add_uint(md_tree, hf_pulsar_num_chunks_from_msg, tvb, offset, metadataSize,
                                msgMetadata.num_chunks_from_msg());
        }
    }
    if (msgMetadata.has_uuid()) {
        proto_tree_add_string(md_tree, hf_pulsar_uuid, tvb, offset, metadataSize, msgMetadata.uuid().c_str());
    }

    // Times
    proto_tree_add_uint64(md_tree, hf_pulsar_publish_time, tvb, offset, metadataSize,
                          msgMetadata.publish_time());
    if (msgMetadata.has_deliver_at_time()) {
        proto_tree_add_uint64(md_tree, hf_pulsar_deliver_at_time, tvb, offset, metadataSize,
                              msgMetadata.deliver_at_time());
        // Derived value: delay between publish time and the requested delivery time.
        proto_tree_add_uint64(md_tree, hf_pulsar_deliver_after_time, tvb, offset, metadataSize,
                              msgMetadata.deliver_at_time() - msgMetadata.publish_time());
    }
    if (msgMetadata.has_event_time()) {
        proto_tree_add_uint64(md_tree, hf_pulsar_event_time, tvb, offset, metadataSize,
                              msgMetadata.event_time());
    }

    // Compression
    if (msgMetadata.has_compression()) {
        proto_tree_add_string(md_tree, hf_pulsar_compression_type, tvb, offset, metadataSize,
                              to_str(msgMetadata.compression(), compression_type_name_vs));
    }
    if (msgMetadata.has_uncompressed_size()) {
        proto_tree_add_uint(md_tree, hf_pulsar_uncompressed_size, tvb, offset, metadataSize,
                            msgMetadata.uncompressed_size());
    }

    // Keys
    if (msgMetadata.has_partition_key()) {
        proto_tree_add_string(md_tree, hf_pulsar_partition_key, tvb, offset, metadataSize,
                              msgMetadata.partition_key().c_str());
    }
    if (msgMetadata.has_ordering_key()) {
        proto_tree_add_string(md_tree, hf_pulsar_ordering_key, tvb, offset, metadataSize,
                              msgMetadata.ordering_key().c_str());
    }

    // Encryption
    if (msgMetadata.has_encryption_algo()) {
        proto_tree_add_string(md_tree, hf_pulsar_encryption_algo, tvb, offset, metadataSize,
                              msgMetadata.encryption_algo().c_str());
    }
    if (msgMetadata.has_encryption_param()) {
        proto_tree_add_string(md_tree, hf_pulsar_encryption_param, tvb, offset, metadataSize,
                              msgMetadata.encryption_param().c_str());
    }
    if (msgMetadata.encryption_keys_size() > 0) {
        // Use the metadata range like every other item; the length of the
        // encryption_param field is unrelated to where the keys are located.
        proto_tree* encryption_keys_tree = proto_tree_add_subtree_format(
            md_tree, tvb, offset, metadataSize, ett_pulsar, nullptr,
            "EncryptionParam / %s", msgMetadata.encryption_algo().c_str());
        for (int i = 0; i < msgMetadata.encryption_keys_size(); i++) {
            const auto& encryption_key = msgMetadata.encryption_keys(i);
            proto_tree_add_string_format(
                encryption_keys_tree, hf_pulsar_encryption_keys, tvb, offset, metadataSize, "", "%s : %s",
                encryption_key.has_key() ? encryption_key.key().c_str() : "<none>",
                encryption_key.has_value() ? encryption_key.value().c_str() : "<none>");
        }
    }

    // Properties
    if (msgMetadata.properties_size() > 0) {
        proto_tree* properties_tree = proto_tree_add_subtree_format(md_tree, tvb, offset, metadataSize,
                                                                    ett_pulsar, nullptr, "Properties");
        for (int i = 0; i < msgMetadata.properties_size(); i++) {
            const KeyValue& kv = msgMetadata.properties(i);
            proto_tree_add_string_format(properties_tree, hf_pulsar_property, tvb, offset, metadataSize, "",
                                         "%s : %s", kv.key().c_str(), kv.value().c_str());
        }
    }

    // Replication
    if (msgMetadata.has_replicated_from()) {
        proto_tree_add_string(md_tree, hf_pulsar_replicated_from, tvb, offset, metadataSize,
                              msgMetadata.replicated_from().c_str());
    }
    if (msgMetadata.replicate_to_size() > 0) {
        proto_tree* replicate_tree = proto_tree_add_subtree_format(md_tree, tvb, offset, metadataSize,
                                                                   ett_pulsar, nullptr, "Replicate to");
        for (int i = 0; i < msgMetadata.replicate_to_size(); i++) {
            // NOTE(review): replicate_to entries are registered under
            // hf_pulsar_replicated_from; confirm whether a dedicated field was intended.
            proto_tree_add_string_format(replicate_tree, hf_pulsar_replicated_from, tvb, offset, metadataSize,
                                         "", "%s", msgMetadata.replicate_to(i).c_str());
        }
    }

    // Transaction
    if (msgMetadata.has_txnid_least_bits()) {
        proto_tree_add_uint64(md_tree, hf_pulsar_txnid_least_bits, tvb, offset, metadataSize,
                              msgMetadata.txnid_least_bits());
    }
    if (msgMetadata.has_txnid_most_bits()) {
        proto_tree_add_uint64(md_tree, hf_pulsar_txnid_most_bits, tvb, offset, metadataSize,
                              msgMetadata.txnid_most_bits());
    }

    // Payloads: everything between the end of the metadata and maxOffset.
    // The bounds check above guarantees this difference is non-negative.
    offset += metadataSize;
    uint32_t payloadSize = maxOffset - offset;
    proto_tree_add_subtree_format(md_tree, tvb, offset, payloadSize, ett_pulsar, nullptr, "Payload / size=%u",
                                  payloadSize);
}