in lib/Commands.cc [329:402]
/**
 * Build a SUBSCRIBE command from the given consumer settings and return the
 * result of writeMessageWithSize() for it.
 *
 * @param topic                       value stored in the command's topic field
 * @param subscription                value stored in the command's subscription field
 * @param consumerId                  value stored in the command's consumer_id field
 * @param requestId                   value stored in the command's request_id field
 * @param subType                     subscription type; cast to proto::CommandSubscribe_SubType.
 *                                    Key-shared metadata is attached only for
 *                                    CommandSubscribe_SubType_Key_Shared.
 * @param consumerName                value stored in the command's consumer_name field
 * @param subscriptionMode            the durable flag is set to
 *                                    (subscriptionMode == SubscriptionModeDurable)
 * @param startMessageId              when present, its ledger id and entry id are written to
 *                                    start_message_id; the batch index is written only when it
 *                                    is not -1
 * @param readCompacted               value stored in the command's read_compacted field
 * @param metadata                    each entry is appended to the command's metadata list
 * @param subscriptionProperties      each entry is appended to the command's
 *                                    subscription_properties list
 * @param schemaInfo                  a schema is attached only when
 *                                    isBuiltInSchema(schemaInfo.getSchemaType()) is true
 * @param subscriptionInitialPosition cast to proto::CommandSubscribe_InitialPosition
 * @param replicateSubscriptionState  value stored in replicate_subscription_state
 * @param keySharedPolicy             consulted only for key-shared subscriptions: supplies the
 *                                    mode, the sticky hash ranges (STICKY mode only) and the
 *                                    allow-out-of-order-delivery flag
 * @param priorityLevel               value stored in the command's priority_level field
 * @return whatever writeMessageWithSize(cmd) returns for the populated command
 */
SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription,
                                    uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
                                    const std::string& consumerName, SubscriptionMode subscriptionMode,
                                    boost::optional<MessageId> startMessageId, bool readCompacted,
                                    const std::map<std::string, std::string>& metadata,
                                    const std::map<std::string, std::string>& subscriptionProperties,
                                    const SchemaInfo& schemaInfo,
                                    CommandSubscribe_InitialPosition subscriptionInitialPosition,
                                    bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy,
                                    int priorityLevel) {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::SUBSCRIBE);

    CommandSubscribe* subscribe = cmd.mutable_subscribe();
    subscribe->set_topic(topic);
    subscribe->set_subscription(subscription);
    subscribe->set_subtype(static_cast<proto::CommandSubscribe_SubType>(subType));
    subscribe->set_consumer_id(consumerId);
    subscribe->set_request_id(requestId);
    subscribe->set_consumer_name(consumerName);
    subscribe->set_durable(subscriptionMode == SubscriptionModeDurable);
    subscribe->set_read_compacted(readCompacted);
    subscribe->set_initialposition(
        static_cast<proto::CommandSubscribe_InitialPosition>(subscriptionInitialPosition));
    subscribe->set_replicate_subscription_state(replicateSubscriptionState);
    subscribe->set_priority_level(priorityLevel);

    if (isBuiltInSchema(schemaInfo.getSchemaType())) {
        // set_allocated_* hands the pointer over to the command.
        // NOTE(review): presumably getSchema() returns a freshly heap-allocated
        // message for exactly this purpose - confirm against its definition.
        subscribe->set_allocated_schema(getSchema(schemaInfo));
    }

    if (startMessageId) {
        // The optional was just checked, so dereference once instead of going
        // through the checked value() accessor for every field.
        const MessageId& startId = *startMessageId;
        MessageIdData& messageIdData = *subscribe->mutable_start_message_id();
        messageIdData.set_ledgerid(startId.ledgerId());
        messageIdData.set_entryid(startId.entryId());
        // A batch index of -1 is treated as "not set" and leaves the field absent.
        if (startId.batchIndex() != -1) {
            messageIdData.set_batch_index(startId.batchIndex());
        }
    }

    // Add() creates each element directly inside the repeated field, so no
    // temporary prototype message or raw owning pointer is needed.
    for (const auto& entry : metadata) {
        proto::KeyValue* keyValue = subscribe->mutable_metadata()->Add();
        keyValue->set_key(entry.first);
        keyValue->set_value(entry.second);
    }

    for (const auto& subscriptionProperty : subscriptionProperties) {
        proto::KeyValue* keyValue = subscribe->mutable_subscription_properties()->Add();
        keyValue->set_key(subscriptionProperty.first);
        keyValue->set_value(subscriptionProperty.second);
    }

    if (subType == CommandSubscribe_SubType_Key_Shared) {
        KeySharedMeta& ksm = *subscribe->mutable_keysharedmeta();
        switch (keySharedPolicy.getKeySharedMode()) {
            case pulsar::AUTO_SPLIT:
                ksm.set_keysharedmode(proto::KeySharedMode::AUTO_SPLIT);
                break;
            case pulsar::STICKY:
                ksm.set_keysharedmode(proto::KeySharedMode::STICKY);
                // Only STICKY mode forwards the configured hash ranges.
                for (const auto& range : keySharedPolicy.getStickyRanges()) {
                    IntRange* intRange = ksm.mutable_hashranges()->Add();
                    intRange->set_start(range.first);
                    intRange->set_end(range.second);
                }
                // Explicit break so a case appended later cannot fall through.
                break;
        }
        ksm.set_allowoutoforderdelivery(keySharedPolicy.isAllowOutOfOrderDelivery());
    }

    return writeMessageWithSize(cmd);
}