SharedBuffer Commands::newSubscribe()

in lib/Commands.cc [329:402]


SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription,
                                    uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
                                    const std::string& consumerName, SubscriptionMode subscriptionMode,
                                    boost::optional<MessageId> startMessageId, bool readCompacted,
                                    const std::map<std::string, std::string>& metadata,
                                    const std::map<std::string, std::string>& subscriptionProperties,
                                    const SchemaInfo& schemaInfo,
                                    CommandSubscribe_InitialPosition subscriptionInitialPosition,
                                    bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy,
                                    int priorityLevel) {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::SUBSCRIBE);
    CommandSubscribe* subscribe = cmd.mutable_subscribe();
    subscribe->set_topic(topic);
    subscribe->set_subscription(subscription);
    subscribe->set_subtype(static_cast<proto::CommandSubscribe_SubType>(subType));
    subscribe->set_consumer_id(consumerId);
    subscribe->set_request_id(requestId);
    subscribe->set_consumer_name(consumerName);
    subscribe->set_durable(subscriptionMode == SubscriptionModeDurable);
    subscribe->set_read_compacted(readCompacted);
    subscribe->set_initialposition(
        static_cast<proto::CommandSubscribe_InitialPosition>(subscriptionInitialPosition));
    subscribe->set_replicate_subscription_state(replicateSubscriptionState);
    subscribe->set_priority_level(priorityLevel);

    if (isBuiltInSchema(schemaInfo.getSchemaType())) {
        subscribe->set_allocated_schema(getSchema(schemaInfo));
    }

    if (startMessageId) {
        MessageIdData& messageIdData = *subscribe->mutable_start_message_id();
        messageIdData.set_ledgerid(startMessageId.value().ledgerId());
        messageIdData.set_entryid(startMessageId.value().entryId());

        if (startMessageId.value().batchIndex() != -1) {
            messageIdData.set_batch_index(startMessageId.value().batchIndex());
        }
    }
    for (std::map<std::string, std::string>::const_iterator it = metadata.begin(); it != metadata.end();
         it++) {
        proto::KeyValue* keyValue = proto::KeyValue().New();
        keyValue->set_key(it->first);
        keyValue->set_value(it->second);
        subscribe->mutable_metadata()->AddAllocated(keyValue);
    }

    for (const auto& subscriptionProperty : subscriptionProperties) {
        proto::KeyValue* keyValue = proto::KeyValue().New();
        keyValue->set_key(subscriptionProperty.first);
        keyValue->set_value(subscriptionProperty.second);
        subscribe->mutable_subscription_properties()->AddAllocated(keyValue);
    }

    if (subType == CommandSubscribe_SubType_Key_Shared) {
        KeySharedMeta& ksm = *subscribe->mutable_keysharedmeta();
        switch (keySharedPolicy.getKeySharedMode()) {
            case pulsar::AUTO_SPLIT:
                ksm.set_keysharedmode(proto::KeySharedMode::AUTO_SPLIT);
                break;
            case pulsar::STICKY:
                ksm.set_keysharedmode(proto::KeySharedMode::STICKY);
                for (StickyRange range : keySharedPolicy.getStickyRanges()) {
                    IntRange* intRange = IntRange().New();
                    intRange->set_start(range.first);
                    intRange->set_end(range.second);
                    ksm.mutable_hashranges()->AddAllocated(intRange);
                }
        }
        ksm.set_allowoutoforderdelivery(keySharedPolicy.isAllowOutOfOrderDelivery());
    }

    return writeMessageWithSize(cmd);
}