in wireshark/pulsarDissector.cc [493:1078]
static int dissect_pulsar_message(tvbuff_t* tvb, packet_info* pinfo, proto_tree* tree, void* data _U_) {
uint32_t offset = FRAME_SIZE_LEN;
int maxOffset = tvb_captured_length(tvb);
auto cmdSize = (uint32_t)tvb_get_ntohl(tvb, offset);
offset += 4;
if (offset + cmdSize > maxOffset) {
// Not enough data to dissect
#ifdef DEBUG
proto_tree_add_debug_text(tree, "[Debug] Not enough data to dissect command");
#endif
return maxOffset;
}
col_set_str(pinfo->cinfo, COL_PROTOCOL, "Pulsar");
conversation_t* conversation = find_or_create_conversation(pinfo);
auto state = (ConnectionState*)conversation_get_proto_data(conversation, proto_pulsar);
if (state == nullptr) {
state = new ConnectionState();
conversation_add_proto_data(conversation, proto_pulsar, state);
}
auto ptr = (uint8_t*)tvb_get_ptr(tvb, offset, cmdSize);
if (!command.ParseFromArray(ptr, cmdSize)) {
proto_tree_add_boolean_format(tree, hf_pulsar_error, tvb, offset, cmdSize, true,
"Error parsing protocol buffer command");
return maxOffset;
}
int cmdOffset = offset;
offset += cmdSize;
col_add_str(pinfo->cinfo, COL_INFO, to_str(command.type(), pulsar_cmd_names));
proto_item* frame_tree = nullptr;
proto_item* cmd_tree = nullptr;
if (tree) { /* we are being asked for details */
proto_item* ti = proto_tree_add_item(tree, proto_pulsar, tvb, 0, -1, ENC_NA);
frame_tree = proto_item_add_subtree(ti, ett_pulsar);
proto_tree_add_item(frame_tree, hf_pulsar_frame_size, tvb, 0, 4, ENC_BIG_ENDIAN);
proto_tree_add_item(frame_tree, hf_pulsar_cmd_size, tvb, 4, 4, ENC_BIG_ENDIAN);
cmd_tree = proto_tree_add_subtree_format(frame_tree, tvb, 8, cmdSize, ett_pulsar, nullptr,
"Command %s", to_str(command.type(), pulsar_cmd_names));
proto_tree_add_string(cmd_tree, hf_pulsar_cmd_type, tvb, 8, cmdSize,
to_str(command.type(), pulsar_cmd_names));
}
switch (command.type()) {
case BaseCommand::CONNECT: {
const CommandConnect& connect = command.connect();
if (tree) {
proto_tree_add_string(cmd_tree, hf_pulsar_client_version, tvb, cmdOffset, cmdSize,
connect.client_version().c_str());
proto_tree_add_string(cmd_tree, hf_pulsar_protocol_version, tvb, cmdOffset, cmdSize,
to_str(connect.protocol_version(), protocol_version_vs));
proto_tree_add_string(cmd_tree, hf_pulsar_auth_method, tvb, cmdOffset, cmdSize,
to_str(connect.auth_method(), auth_methods_vs));
if (connect.has_auth_data()) {
proto_tree_add_string(cmd_tree, hf_pulsar_auth_data, tvb, cmdOffset, cmdSize,
connect.auth_data().c_str());
}
}
break;
}
case BaseCommand::CONNECTED: {
const CommandConnected& connected = command.connected();
if (tree) {
proto_tree_add_string(cmd_tree, hf_pulsar_server_version, tvb, cmdOffset, cmdSize,
connected.server_version().c_str());
proto_tree_add_string(cmd_tree, hf_pulsar_protocol_version, tvb, cmdOffset, cmdSize,
to_str(connected.protocol_version(), protocol_version_vs));
}
break;
}
case BaseCommand::SUBSCRIBE: {
const CommandSubscribe& subscribe = command.subscribe();
RequestData& reqData = state->requests[subscribe.request_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ConsumerData& consumerData = state->consumers[subscribe.consumer_id()];
consumerData.topic = subscribe.topic();
consumerData.subscriptionName = subscribe.subscription();
consumerData.consumerName = subscribe.consumer_name();
consumerData.subType = subscribe.subtype();
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %s / %s / %s",
to_str(subscribe.subtype(), sub_type_names_vs), subscribe.topic().c_str(),
subscribe.subscription().c_str(), subscribe.consumer_name().c_str());
if (tree) {
proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
subscribe.topic().c_str());
proto_tree_add_string(cmd_tree, hf_pulsar_subscription, tvb, cmdOffset, cmdSize,
subscribe.subscription().c_str());
proto_tree_add_string(cmd_tree, hf_pulsar_subType, tvb, cmdOffset, cmdSize,
to_str(subscribe.subtype(), sub_type_names_vs));
proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
subscribe.consumer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
subscribe.request_id());
proto_tree_add_string(
cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
subscribe.has_consumer_name() ? subscribe.consumer_name().c_str() : "<none>");
}
break;
}
case BaseCommand::PRODUCER: {
const CommandProducer& producer = command.producer();
RequestResponseData& reqData = state->requests[producer.request_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
reqData.id = producer.producer_id();
state->producers[producer.producer_id()].topic = producer.topic();
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s", producer.topic().c_str());
if (tree) {
proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
producer.topic().c_str());
proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
producer.producer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
producer.request_id());
proto_tree_add_string(
cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
producer.has_producer_name() ? producer.producer_name().c_str() : "<none>");
link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::SEND: {
const CommandSend& send = command.send();
RequestData& reqData = state->producers[send.producer_id()].messages[send.sequence_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ProducerData& producerData = state->producers[send.producer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %" G_GINT64_MODIFIER "u",
producerData.producerName.c_str(), send.sequence_id());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
send.producer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_sequence_id, tvb, cmdOffset, cmdSize,
send.sequence_id());
// Decode message metadata
dissect_message_metadata(cmd_tree, tvb, offset, maxOffset);
auto item = proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
producerData.producerName.c_str());
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
producerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
// Pair with frame information
link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::SEND_RECEIPT: {
const CommandSendReceipt& send_receipt = command.send_receipt();
RequestData& reqData =
state->producers[send_receipt.producer_id()].messages[send_receipt.sequence_id()];
reqData.ackFrame = pinfo->fd->num;
reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ProducerData& producerData = state->producers[send_receipt.producer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %" G_GINT64_MODIFIER "u",
producerData.producerName.c_str(), send_receipt.sequence_id());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
send_receipt.producer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_sequence_id, tvb, cmdOffset, cmdSize,
send_receipt.sequence_id());
if (send_receipt.has_message_id()) {
const MessageIdData& messageId = send_receipt.message_id();
proto_tree_add_string_format(cmd_tree, hf_pulsar_message_id, tvb, cmdOffset, cmdSize, "",
"Message Id: %" G_GINT64_MODIFIER "u:%" G_GINT64_MODIFIER
"u",
messageId.ledgerid(), messageId.entryid());
}
auto item = proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
producerData.producerName.c_str());
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
producerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::SEND_ERROR: {
const CommandSendError& send_error = command.send_error();
RequestData& reqData =
state->producers[send_error.producer_id()].messages[send_error.sequence_id()];
reqData.ackFrame = pinfo->fd->num;
reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ProducerData& producerData = state->producers[send_error.producer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %" G_GINT64_MODIFIER "u",
producerData.producerName.c_str(), send_error.sequence_id());
if (tree) {
proto_tree_add_boolean_format(frame_tree, hf_pulsar_error, tvb, cmdOffset, cmdSize, true,
"Error in sending operation");
proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
send_error.producer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_sequence_id, tvb, cmdOffset, cmdSize,
send_error.sequence_id());
auto item = proto_tree_add_string(cmd_tree, hf_pulsar_server_error, tvb, cmdOffset, cmdSize,
to_str(send_error.error(), server_errors_vs));
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
producerData.producerName.c_str());
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
producerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
// Pair with frame information
link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::MESSAGE: {
const CommandMessage& message = command.message();
state->consumers[message.consumer_id()].messages[message.message_id()];
RequestData& reqData = state->consumers[message.consumer_id()].messages[message.message_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
const ConsumerData& consumerData = state->consumers[message.consumer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %" G_GINT64_MODIFIER "u:%" G_GINT64_MODIFIER "u",
consumerData.consumerName.c_str(), message.message_id().ledgerid(),
message.message_id().entryid());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
message.consumer_id());
dissect_message_metadata(cmd_tree, tvb, offset, maxOffset);
auto item = proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
consumerData.consumerName.c_str());
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
consumerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
// Pair with frame information
link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::ACK: {
const CommandAck& ack = command.ack();
RequestData& reqData = state->consumers[ack.consumer_id()].messages[ack.message_id().Get(0)];
reqData.ackFrame = pinfo->fd->num;
reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
const ConsumerData& consumerData = state->consumers[ack.consumer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %" G_GINT64_MODIFIER "u:%" G_GINT64_MODIFIER "u",
consumerData.consumerName.c_str(), ack.message_id().Get(0).ledgerid(),
ack.message_id().Get(0).entryid());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
ack.consumer_id());
proto_tree_add_string(cmd_tree, hf_pulsar_ack_type, tvb, cmdOffset, cmdSize,
to_str(ack.ack_type(), ack_type_vs));
auto item = proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
consumerData.consumerName.c_str());
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
consumerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
// Pair with frame information
link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::FLOW: {
const CommandFlow& flow = command.flow();
const ConsumerData& consumerData = state->consumers[flow.consumer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %d", consumerData.consumerName.c_str(),
flow.messagepermits());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
flow.consumer_id());
proto_tree_add_uint(cmd_tree, hf_pulsar_message_permits, tvb, cmdOffset, cmdSize,
flow.messagepermits());
auto item = proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
consumerData.consumerName.c_str());
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
consumerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
}
break;
}
case BaseCommand::UNSUBSCRIBE: {
const CommandUnsubscribe& unsubscribe = command.unsubscribe();
RequestData& reqData = state->requests[unsubscribe.request_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ConsumerData& consumerData = state->consumers[unsubscribe.consumer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %s / %s / %s",
to_str(consumerData.subType, sub_type_names_vs), consumerData.topic.c_str(),
consumerData.subscriptionName.c_str(), consumerData.consumerName.c_str());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
unsubscribe.consumer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
unsubscribe.request_id());
auto item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
consumerData.topic.c_str());
PROTO_ITEM_IS_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_subscription, tvb, cmdOffset, cmdSize,
consumerData.subscriptionName.c_str());
PROTO_ITEM_IS_GENERATED(item);
proto_tree_add_string(cmd_tree, hf_pulsar_subType, tvb, cmdOffset, cmdSize,
to_str(consumerData.subType, sub_type_names_vs));
PROTO_ITEM_IS_GENERATED(item);
proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
consumerData.consumerName.c_str());
PROTO_ITEM_IS_GENERATED(item);
link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::SUCCESS: {
const CommandSuccess& success = command.success();
RequestResponseData& reqData = state->requests[success.request_id()];
reqData.ackFrame = pinfo->fd->num;
reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
success.request_id());
link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::ERROR: {
const CommandError& error = command.error();
RequestResponseData& reqData = state->requests[error.request_id()];
reqData.ackFrame = pinfo->fd->num;
reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
if (tree) {
proto_tree_add_boolean_format(frame_tree, hf_pulsar_error, tvb, cmdOffset, cmdSize, true,
"Request failed");
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
error.request_id());
proto_tree_add_string(cmd_tree, hf_pulsar_server_error, tvb, cmdOffset, cmdSize,
to_str(error.error(), server_errors_vs));
proto_tree_add_string(cmd_tree, hf_pulsar_error_message, tvb, cmdOffset, cmdSize,
error.message().c_str());
link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::CLOSE_PRODUCER: {
const CommandCloseProducer& close_producer = command.close_producer();
RequestData& reqData = state->requests[close_producer.request_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ProducerData& producerData = state->producers[close_producer.producer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s", producerData.topic.c_str());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
close_producer.producer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
close_producer.request_id());
auto item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
producerData.topic.c_str());
PROTO_ITEM_IS_GENERATED(item);
proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
producerData.producerName.c_str());
PROTO_ITEM_IS_GENERATED(item);
link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::CLOSE_CONSUMER: {
const CommandCloseConsumer& close_consumer = command.close_consumer();
RequestData& reqData = state->requests[close_consumer.request_id()];
reqData.requestFrame = pinfo->fd->num;
reqData.requestTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.requestTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
ConsumerData& consumerData = state->consumers[close_consumer.consumer_id()];
col_append_fstr(pinfo->cinfo, COL_INFO, " / %s / %s / %s / %s",
to_str(consumerData.subType, sub_type_names_vs), consumerData.topic.c_str(),
consumerData.subscriptionName.c_str(), consumerData.consumerName.c_str());
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_consumer_id, tvb, cmdOffset, cmdSize,
close_consumer.consumer_id());
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
close_consumer.request_id());
auto item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
consumerData.topic.c_str());
PROTO_ITEM_IS_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_subscription, tvb, cmdOffset, cmdSize,
consumerData.subscriptionName.c_str());
PROTO_ITEM_IS_GENERATED(item);
proto_tree_add_string(cmd_tree, hf_pulsar_subType, tvb, cmdOffset, cmdSize,
to_str(consumerData.subType, sub_type_names_vs));
PROTO_ITEM_IS_GENERATED(item);
proto_tree_add_string(cmd_tree, hf_pulsar_consumer_name, tvb, cmdOffset, cmdSize,
consumerData.consumerName.c_str());
PROTO_ITEM_IS_GENERATED(item);
link_to_response_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::PRODUCER_SUCCESS: {
const CommandProducerSuccess& success = command.producer_success();
RequestResponseData& reqData = state->requests[success.request_id()];
reqData.ackFrame = pinfo->fd->num;
reqData.ackTimestamp.secs = pinfo->fd->abs_ts.secs;
reqData.ackTimestamp.nsecs = pinfo->fd->abs_ts.nsecs;
uint64_t producerId = reqData.id;
ProducerData& producerData = state->producers[producerId];
producerData.producerName = success.producer_name();
if (tree) {
proto_tree_add_uint64(cmd_tree, hf_pulsar_request_id, tvb, cmdOffset, cmdSize,
success.request_id());
proto_tree_add_string(cmd_tree, hf_pulsar_producer_name, tvb, cmdOffset, cmdSize,
success.producer_name().c_str());
auto item = proto_tree_add_uint64(cmd_tree, hf_pulsar_producer_id, tvb, cmdOffset, cmdSize,
producerId);
PROTO_ITEM_SET_GENERATED(item);
item = proto_tree_add_string(cmd_tree, hf_pulsar_topic, tvb, cmdOffset, cmdSize,
producerData.topic.c_str());
PROTO_ITEM_SET_GENERATED(item);
link_to_request_frame(cmd_tree, tvb, cmdOffset, cmdSize, reqData);
}
break;
}
case BaseCommand::PING:
break;
case BaseCommand::PONG:
break;
case BaseCommand::REDELIVER_UNACKNOWLEDGED_MESSAGES:
break;
case BaseCommand::PARTITIONED_METADATA:
break;
case BaseCommand::PARTITIONED_METADATA_RESPONSE:
break;
case BaseCommand::LOOKUP:
break;
case BaseCommand::LOOKUP_RESPONSE:
break;
case BaseCommand::CONSUMER_STATS:
break;
case BaseCommand::CONSUMER_STATS_RESPONSE:
break;
case BaseCommand::REACHED_END_OF_TOPIC:
break;
case BaseCommand::SEEK:
break;
case BaseCommand::GET_LAST_MESSAGE_ID:
break;
case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE:
break;
case BaseCommand::ACTIVE_CONSUMER_CHANGE:
break;
case BaseCommand::GET_TOPICS_OF_NAMESPACE:
break;
case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE:
break;
case BaseCommand::GET_SCHEMA:
break;
case BaseCommand::GET_SCHEMA_RESPONSE:
break;
case BaseCommand::AUTH_CHALLENGE:
break;
case BaseCommand::AUTH_RESPONSE:
break;
case BaseCommand::ACK_RESPONSE:
break;
case BaseCommand::GET_OR_CREATE_SCHEMA:
break;
case BaseCommand::GET_OR_CREATE_SCHEMA_RESPONSE:
break;
case BaseCommand::NEW_TXN:
break;
case BaseCommand::NEW_TXN_RESPONSE:
break;
case BaseCommand::ADD_PARTITION_TO_TXN:
break;
case BaseCommand::ADD_PARTITION_TO_TXN_RESPONSE:
break;
case BaseCommand::ADD_SUBSCRIPTION_TO_TXN:
break;
case BaseCommand::ADD_SUBSCRIPTION_TO_TXN_RESPONSE:
break;
case BaseCommand::END_TXN:
break;
case BaseCommand::END_TXN_RESPONSE:
break;
case BaseCommand::END_TXN_ON_PARTITION:
break;
case BaseCommand::END_TXN_ON_PARTITION_RESPONSE:
break;
case BaseCommand::END_TXN_ON_SUBSCRIPTION:
break;
case BaseCommand::END_TXN_ON_SUBSCRIPTION_RESPONSE:
break;
case BaseCommand::TC_CLIENT_CONNECT_REQUEST:
break;
case BaseCommand::TC_CLIENT_CONNECT_RESPONSE:
break;
case BaseCommand::WATCH_TOPIC_LIST:
break;
case BaseCommand::WATCH_TOPIC_LIST_SUCCESS:
break;
case BaseCommand::WATCH_TOPIC_UPDATE:
break;
case BaseCommand::WATCH_TOPIC_LIST_CLOSE:
break;
}
return maxOffset;
}