static int dissect_pulsar_message()

in wireshark/pulsarDissector.cc [493:1078]


static int dissect_pulsar_message(tvbuff_t* tvb, packet_info* pinfo, proto_tree* tree, void* data _U_) {
    uint32_t offset = FRAME_SIZE_LEN;
    int maxOffset = tvb_captured_length(tvb);
    auto cmdSize = (uint32_t)tvb_get_ntohl(tvb, offset);
    offset += 4;

    if (offset + cmdSize > maxOffset) {
        // Not enough data to dissect
#ifdef DEBUG
        proto_tree_add_debug_text(tree, "[Debug] Not enough data to dissect command");
#endif
        return maxOffset;
    }

    col_set_str(pinfo->cinfo, COL_PROTOCOL, "Pulsar");

    conversation_t* conversation = find_or_create_conversation(pinfo);
    auto state = (ConnectionState*)conversation_get_proto_data(conversation, proto_pulsar);
    if (state == nullptr) {
        state = new ConnectionState();
        conversation_add_proto_data(conversation, proto_pulsar, state);
    }

    auto ptr = (uint8_t*)tvb_get_ptr(tvb, offset, cmdSize);
    if (!command.ParseFromArray(ptr, cmdSize)) {
        proto_tree_add_boolean_format(tree, hf_pulsar_error, tvb, offset, cmdSize, true,
                                      "Error parsing protocol buffer command");
        return maxOffset;
    }

    int cmdOffset = offset;
    offset += cmdSize;

    col_add_str(pinfo->cinfo, COL_INFO, to_str(command.type(), pulsar_cmd_names));

    proto_item* frame_tree = nullptr;
    proto_item* cmd_tree = nullptr;
    if (tree) { /* we are being asked for details */
        proto_item* ti = proto_tree_add_item(tree, proto_pulsar, tvb, 0, -1, ENC_NA);
        frame_tree = proto_item_add_subtree(ti, ett_pulsar);
        proto_tree_add_item(frame_tree, hf_pulsar_frame_size, tvb, 0, 4, ENC_BIG_ENDIAN);
        proto_tree_add_item(frame_tree, hf_pulsar_cmd_size, tvb, 4, 4, ENC_BIG_ENDIAN);
        cmd_tree = proto_tree_add_subtree_format(frame_tree, tvb, 8, cmdSize, ett_pulsar, nullptr,
                                                 "Command %s", to_str(command.type(), pulsar_cmd_names));
        proto_tree_add_string(cmd_tree, hf_pulsar_cmd_type, tvb, 8, cmdSize,
                              to_str(command.type(), pulsar_cmd_names));
    }

    switch (command.type()) {
        case BaseCommand::CONNECT: {
            const CommandConnect& connect = command.connect();
            if (tree) {
                proto_tree_add_string(cmd_tree, hf_pulsar_client_version, tvb, cmdOffset, cmdSize,
                                      connect.client_version().c_str());
                proto_tree_add_string(cmd_tree, hf_pulsar_protocol_version, tvb, cmdOffset, cmdSize,
                                      to_str(connect.protocol_version(), protocol_version_vs));
                proto_tree_add_string(cmd_tree, hf_pulsar_auth_method, tvb, cmdOffset, cmdSize,
                                      to_str(connect.auth_method(), auth_methods_vs));
                if (connect.has_auth_data()) {
                    proto_tree_add_string(cmd_tree, hf_pulsar_auth_data, tvb, cmdOffset, cmdSize,
                                          connect.auth_data().c_str());
                }
            }
            break;
        }
        case BaseCommand::CONNECTED: {
            const CommandConnected& connected = command.connected();
            if (tree) {
                proto_tree_add_string(cmd_tree, hf_pulsar_server_version, tvb, cmdOffset, cmdSize,
                                      connected.server_version().c_str());
                proto_tree_add_string(cmd_tree, hf_pulsar_protocol_version, tvb, cmdOffset, cmdSize,
                                      to_str(connected.protocol_version(), protocol_version_vs));
            }
            break;
        }
        case BaseCommand::SUBSCRIBE: {
            const CommandSubscribe& subscribe = command.subscribe();
            RequestData& reqData = state->requests[subscribe.request_id()];
            reqData.requestFrame = pinfo->fd->num;
            reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
            reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;

            ConsumerData& consumerData = state->consumers[subscribe.consumer_id()];
            consumerData.topic = subscribe.topic();
            consumerData.subscriptionName = subscribe.subscription();
            consumerData.consumerName = subscribe.consumer_name();
            consumerData.subType = subscribe.subtype();

            col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %s / %s / %s",
                            to_str(subscribe.subtype(), sub_type_names_vs), subscribe.topic().c_str(),
                            subscribe.subscription().c_str(), subscribe.consumer_name().c_str());

            if (tree) {
                proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
                                      subscribe.topic().c_str());
                proto_tree_add_string(cmd_tree, hf_pulsar_subscription, tvb, cmdOffset, cmdSize,
                                      subscribe.subscription().c_str());
                proto_tree_add_string(cmd_tree, hf_pulsar_subType, tvb, cmdOffset, cmdSize,
                                      to_str(subscribe.subtype(), sub_type_names_vs));
                proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
                                      subscribe.consumer_id());
                proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
                                      subscribe.request_id());
                proto_tree_add_string(
                    cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
                    subscribe.has_consumer_name() ? subscribe.consumer_name().c_str() : "<none>");
            }
            break;
        }
        case BaseCommand::PRODUCER: {
            const CommandProducer& producer = command.producer();
            RequestResponseData& reqData = state->requests[producer.request_id()];
            reqData.requestFrame = pinfo->fd->num;
            reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
            reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
            reqData.id = producer.producer_id();

            state->producers[producer.producer_id()].topic = producer.topic();

            col_append_fstr(pinfo->cinfo, COL_INFO, " / %s", producer.topic().c_str());

            if (tree) {
                proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
                                      producer.topic().c_str());
                proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
                                      producer.producer_id());
                proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
                                      producer.request_id());
                proto_tree_add_string(
                    cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
                    producer.has_producer_name() ? producer.producer_name().c_str() : "<none>");

                link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
            }
            break;
        }
        case BaseCommand::SEND: {
            const CommandSend& send = command.send();
            RequestData& reqData = state->producers[send.producer_id()].messages[send.sequence_id()];
            reqData.requestFrame = pinfo->fd->num;
            reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
            reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;

            ProducerData& producerData = state->producers[send.producer_id()];

            col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %" G_GINT64_MODIFIER "u",
                            producerData.producerName.c_str(), send.sequence_id());

            if (tree) {
                proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
                                      send.producer_id());
                proto_tree_add_uint64(cmd_tree, hf_pulsar_sequence_id, tvb, cmdOffset, cmdSize,
                                      send.sequence_id());

                // Decode message metadata
                dissect_message_metadata(cmd_tree, tvb, offset, maxOffset);

                auto item = proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
                                                  producerData.producerName.c_str());
                PROTO_ITEM_SET_GENERATED(item);

                item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
                                             producerData.topic.c_str());
                PROTO_ITEM_SET_GENERATED(item);

                // Pair with frame information
                link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
            }
            break;
        }
        case BaseCommand::SEND_RECEIPT: {
            const CommandSendReceipt& send_receipt = command.send_receipt();
            RequestData& reqData =
                state->producers[send_receipt.producer_id()].messages[send_receipt.sequence_id()];
            reqData.ackFrame = pinfo->fd->num;
            reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
            reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;

            ProducerData& producerData = state->producers[send_receipt.producer_id()];
            col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %" G_GINT64_MODIFIER "u",
                            producerData.producerName.c_str(), send_receipt.sequence_id());

            if (tree) {
                proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
                                      send_receipt.producer_id());
                proto_tree_add_uint64(cmd_tree, hf_pulsar_sequence_id, tvb, cmdOffset, cmdSize,
                                      send_receipt.sequence_id());
                if (send_receipt.has_message_id()) {
                    const MessageIdData& messageId = send_receipt.message_id();
                    proto_tree_add_string_format(cmd_tree, hf_pulsar_message_id, tvb, cmdOffset, cmdSize, "",
                                                 "Message Id: %" G_GINT64_MODIFIER "u:%" G_GINT64_MODIFIER
                                                 "u",
                                                 messageId.ledgerid(), messageId.entryid());
                }

                auto item = proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
                                                  producerData.producerName.c_str());
                PROTO_ITEM_SET_GENERATED(item);

                item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
                                             producerData.topic.c_str());
                PROTO_ITEM_SET_GENERATED(item);

                link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
            }
            break;
        }
        case BaseCommand::SEND_ERROR: {
            const CommandSendError& send_error = command.send_error();
            RequestData& reqData =
                state->producers[send_error.producer_id()].messages[send_error.sequence_id()];
            reqData.ackFrame = pinfo->fd->num;
            reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
            reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;

            ProducerData& producerData = state->producers[send_error.producer_id()];
            col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %" G_GINT64_MODIFIER "u",
                            producerData.producerName.c_str(), send_error.sequence_id());

            if (tree) {
                proto_tree_add_boolean_format(frame_tree, hf_pulsar_error, tvb, cmdOffset, cmdSize, true,
                                              "Error in sending operation");
                proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
                                      send_error.producer_id());
                proto_tree_add_uint64(cmd_tree, hf_pulsar_sequence_id, tvb, cmdOffset, cmdSize,
                                      send_error.sequence_id());

                auto item = proto_tree_add_string(cmd_tree, hf_pulsar_server_error, tvb, cmdOffset, cmdSize,
                                                  to_str(send_error.error(), server_errors_vs));

                PROTO_ITEM_SET_GENERATED(item);

                item = proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
                                             producerData.producerName.c_str());
                PROTO_ITEM_SET_GENERATED(item);

                item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
                                             producerData.topic.c_str());
                PROTO_ITEM_SET_GENERATED(item);

                // Pair with frame information
                link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
            }
            break;
        }
        case BaseCommand::MESSAGE: {
            const CommandMessage& message = command.message();
            state->consumers[message.consumer_id()].messages[message.message_id()];
            RequestData& reqData = state->consumers[message.consumer_id()].messages[message.message_id()];
            reqData.requestFrame = pinfo->fd->num;
            reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
            reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;

            const ConsumerData& consumerData = state->consumers[message.consumer_id()];

            col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %" G_GINT64_MODIFIER "u:%" G_GINT64_MODIFIER "u",
                            consumerData.consumerName.c_str(), message.message_id().ledgerid(),
                            message.message_id().entryid());

            if (tree) {
                proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
                                      message.consumer_id());

                dissect_message_metadata(cmd_tree, tvb, offset, maxOffset);

                auto item = proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
                                                  consumerData.consumerName.c_str());
                PROTO_ITEM_SET_GENERATED(item);

                item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
                                             consumerData.topic.c_str());
                PROTO_ITEM_SET_GENERATED(item);

                // Pair with frame information
                link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
            }
            break;
        }
        case BaseCommand::ACK: {
            const CommandAck& ack = command.ack();
            RequestData& reqData = state->consumers[ack.consumer_id()].messages[ack.message_id().Get(0)];
            reqData.ackFrame = pinfo->fd->num;
            reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
            reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;

            const ConsumerData& consumerData = state->consumers[ack.consumer_id()];

            col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %" G_GINT64_MODIFIER "u:%" G_GINT64_MODIFIER "u",
                            consumerData.consumerName.c_str(), ack.message_id().Get(0).ledgerid(),
                            ack.message_id().Get(0).entryid());

            if (tree) {
                proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
                                      ack.consumer_id());
                proto_tree_add_string(cmd_tree, hf_pulsar_ack_type, tvb, cmdOffset, cmdSize,
                                      to_str(ack.ack_type(), ack_type_vs));

                auto item = proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
                                                  consumerData.consumerName.c_str());
                PROTO_ITEM_SET_GENERATED(item);

                item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
                                             consumerData.topic.c_str());
                PROTO_ITEM_SET_GENERATED(item);

                // Pair with frame information
                link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
            }
            break;
        }
        case BaseCommand::FLOW: {
            const CommandFlow& flow = command.flow();
            const ConsumerData& consumerData = state->consumers[flow.consumer_id()];

            col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %d", consumerData.consumerName.c_str(),
                            flow.messagepermits());

            if (tree) {
                proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
                                      flow.consumer_id());
                proto_tree_add_uint(cmd_tree, hf_pulsar_message_permits, tvb, cmdOffset, cmdSize,
                                    flow.messagepermits());

                auto item = proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
                                                  consumerData.consumerName.c_str());
                PROTO_ITEM_SET_GENERATED(item);

                item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
                                             consumerData.topic.c_str());
                PROTO_ITEM_SET_GENERATED(item);
            }
            break;
        }
        case BaseCommand::UNSUBSCRIBE: {
            const CommandUnsubscribe& unsubscribe = command.unsubscribe();
            RequestData& reqData = state->requests[unsubscribe.request_id()];
            reqData.requestFrame = pinfo->fd->num;
            reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
            reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;

            ConsumerData& consumerData = state->consumers[unsubscribe.consumer_id()];

            col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %s / %s / %s",
                            to_str(consumerData.subType, sub_type_names_vs), consumerData.topic.c_str(),
                            consumerData.subscriptionName.c_str(), consumerData.consumerName.c_str());

            if (tree) {
                proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
                                      unsubscribe.consumer_id());
                proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
                                      unsubscribe.request_id());
                auto item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
                                                  consumerData.topic.c_str());
                PROTO_ITEM_IS_GENERATED(item);
                item = proto_tree_add_string(cmd_tree, hf_pulsar_subscription, tvb, cmdOffset, cmdSize,
                                             consumerData.subscriptionName.c_str());
                PROTO_ITEM_IS_GENERATED(item);
                proto_tree_add_string(cmd_tree, hf_pulsar_subType, tvb, cmdOffset, cmdSize,
                                      to_str(consumerData.subType, sub_type_names_vs));
                PROTO_ITEM_IS_GENERATED(item);

                proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
                                      consumerData.consumerName.c_str());
                PROTO_ITEM_IS_GENERATED(item);

                link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
            }
            break;
        }

        case BaseCommand::SUCCESS: {
            const CommandSuccess& success = command.success();
            RequestResponseData& reqData = state->requests[success.request_id()];
            reqData.ackFrame = pinfo->fd->num;
            reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
            reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;

            if (tree) {
                proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
                                      success.request_id());

                link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
            }
            break;
        }
        case BaseCommand::ERROR: {
            const CommandError& error = command.error();
            RequestResponseData& reqData = state->requests[error.request_id()];
            reqData.ackFrame = pinfo->fd->num;
            reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
            reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;

            if (tree) {
                proto_tree_add_boolean_format(frame_tree, hf_pulsar_error, tvb, cmdOffset, cmdSize, true,
                                              "Request failed");
                proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
                                      error.request_id());
                proto_tree_add_string(cmd_tree, hf_pulsar_server_error, tvb, cmdOffset, cmdSize,
                                      to_str(error.error(), server_errors_vs));
                proto_tree_add_string(cmd_tree, hf_pulsar_error_message, tvb, cmdOffset, cmdSize,
                                      error.message().c_str());

                link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
            }
            break;
        }
        case BaseCommand::CLOSE_PRODUCER: {
            const CommandCloseProducer& close_producer = command.close_producer();
            RequestData& reqData = state->requests[close_producer.request_id()];
            reqData.requestFrame = pinfo->fd->num;
            reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
            reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;

            ProducerData& producerData = state->producers[close_producer.producer_id()];

            col_append_fstr(pinfo->cinfo, COL_INFO, " / %s", producerData.topic.c_str());

            if (tree) {
                proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
                                      close_producer.producer_id());
                proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
                                      close_producer.request_id());
                auto item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
                                                  producerData.topic.c_str());
                PROTO_ITEM_IS_GENERATED(item);

                proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
                                      producerData.producerName.c_str());
                PROTO_ITEM_IS_GENERATED(item);

                link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
            }
            break;
        }

        case BaseCommand::CLOSE_CONSUMER: {
            const CommandCloseConsumer& close_consumer = command.close_consumer();
            RequestData& reqData = state->requests[close_consumer.request_id()];
            reqData.requestFrame = pinfo->fd->num;
            reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
            reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;

            ConsumerData& consumerData = state->consumers[close_consumer.consumer_id()];

            col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %s / %s / %s",
                            to_str(consumerData.subType, sub_type_names_vs), consumerData.topic.c_str(),
                            consumerData.subscriptionName.c_str(), consumerData.consumerName.c_str());

            if (tree) {
                proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
                                      close_consumer.consumer_id());
                proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
                                      close_consumer.request_id());
                auto item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
                                                  consumerData.topic.c_str());
                PROTO_ITEM_IS_GENERATED(item);
                item = proto_tree_add_string(cmd_tree, hf_pulsar_subscription, tvb, cmdOffset, cmdSize,
                                             consumerData.subscriptionName.c_str());
                PROTO_ITEM_IS_GENERATED(item);
                proto_tree_add_string(cmd_tree, hf_pulsar_subType, tvb, cmdOffset, cmdSize,
                                      to_str(consumerData.subType, sub_type_names_vs));
                PROTO_ITEM_IS_GENERATED(item);

                proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
                                      consumerData.consumerName.c_str());
                PROTO_ITEM_IS_GENERATED(item);

                link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
            }
            break;
        }

        case BaseCommand::PRODUCER_SUCCESS: {
            const CommandProducerSuccess& success = command.producer_success();
            RequestResponseData& reqData = state->requests[success.request_id()];
            reqData.ackFrame = pinfo->fd->num;
            reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
            reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
            uint64_t producerId = reqData.id;
            ProducerData& producerData = state->producers[producerId];
            producerData.producerName = success.producer_name();

            if (tree) {
                proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
                                      success.request_id());
                proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
                                      success.producer_name().c_str());

                auto item = proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
                                                  producerId);
                PROTO_ITEM_SET_GENERATED(item);

                item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
                                             producerData.topic.c_str());
                PROTO_ITEM_SET_GENERATED(item);

                link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
            }
            break;
        }
        case BaseCommand::PING:
            break;
        case BaseCommand::PONG:
            break;
        case BaseCommand::REDELIVER_UNACKNOWLEDGED_MESSAGES:
            break;
        case BaseCommand::PARTITIONED_METADATA:
            break;
        case BaseCommand::PARTITIONED_METADATA_RESPONSE:
            break;
        case BaseCommand::LOOKUP:
            break;
        case BaseCommand::LOOKUP_RESPONSE:
            break;
        case BaseCommand::CONSUMER_STATS:
            break;
        case BaseCommand::CONSUMER_STATS_RESPONSE:
            break;
        case BaseCommand::REACHED_END_OF_TOPIC:
            break;
        case BaseCommand::SEEK:
            break;
        case BaseCommand::GET_LAST_MESSAGE_ID:
            break;
        case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE:
            break;
        case BaseCommand::ACTIVE_CONSUMER_CHANGE:
            break;
        case BaseCommand::GET_TOPICS_OF_NAMESPACE:
            break;
        case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE:
            break;
        case BaseCommand::GET_SCHEMA:
            break;
        case BaseCommand::GET_SCHEMA_RESPONSE:
            break;
        case BaseCommand::AUTH_CHALLENGE:
            break;
        case BaseCommand::AUTH_RESPONSE:
            break;
        case BaseCommand::ACK_RESPONSE:
            break;
        case BaseCommand::GET_OR_CREATE_SCHEMA:
            break;
        case BaseCommand::GET_OR_CREATE_SCHEMA_RESPONSE:
            break;
        case BaseCommand::NEW_TXN:
            break;
        case BaseCommand::NEW_TXN_RESPONSE:
            break;
        case BaseCommand::ADD_PARTITION_TO_TXN:
            break;
        case BaseCommand::ADD_PARTITION_TO_TXN_RESPONSE:
            break;
        case BaseCommand::ADD_SUBSCRIPTION_TO_TXN:
            break;
        case BaseCommand::ADD_SUBSCRIPTION_TO_TXN_RESPONSE:
            break;
        case BaseCommand::END_TXN:
            break;
        case BaseCommand::END_TXN_RESPONSE:
            break;
        case BaseCommand::END_TXN_ON_PARTITION:
            break;
        case BaseCommand::END_TXN_ON_PARTITION_RESPONSE:
            break;
        case BaseCommand::END_TXN_ON_SUBSCRIPTION:
            break;
        case BaseCommand::END_TXN_ON_SUBSCRIPTION_RESPONSE:
            break;
        case BaseCommand::TC_CLIENT_CONNECT_REQUEST:
            break;
        case BaseCommand::TC_CLIENT_CONNECT_RESPONSE:
            break;
        case BaseCommand::WATCH_TOPIC_LIST:
            break;
        case BaseCommand::WATCH_TOPIC_LIST_SUCCESS:
            break;
        case BaseCommand::WATCH_TOPIC_UPDATE:
            break;
        case BaseCommand::WATCH_TOPIC_LIST_CLOSE:
            break;
    }

    return maxOffset;
}