void ReaderImpl::start()

in lib/ReaderImpl.cc [48:117]


void ReaderImpl::start(const MessageId& startMessageId,
                       std::function<void(const ConsumerImplBaseWeakPtr&)> callback) {
    ConsumerConfiguration consumerConf;
    consumerConf.setConsumerType(ConsumerExclusive);
    consumerConf.setReceiverQueueSize(readerConf_.getReceiverQueueSize());
    consumerConf.setReadCompacted(readerConf_.isReadCompacted());
    consumerConf.setSchema(readerConf_.getSchema());
    consumerConf.setUnAckedMessagesTimeoutMs(readerConf_.getUnAckedMessagesTimeoutMs());
    consumerConf.setTickDurationInMs(readerConf_.getTickDurationInMs());
    consumerConf.setAckGroupingTimeMs(readerConf_.getAckGroupingTimeMs());
    consumerConf.setAckGroupingMaxSize(readerConf_.getAckGroupingMaxSize());
    consumerConf.setCryptoKeyReader(readerConf_.getCryptoKeyReader());
    consumerConf.setCryptoFailureAction(readerConf_.getCryptoFailureAction());
    consumerConf.setProperties(readerConf_.getProperties());
    consumerConf.setStartMessageIdInclusive(readerConf_.isStartMessageIdInclusive());

    if (readerConf_.getReaderName().length() > 0) {
        consumerConf.setConsumerName(readerConf_.getReaderName());
    }

    if (readerConf_.hasReaderListener()) {
        // Adapt the message listener to be a reader-listener
        readerListener_ = readerConf_.getReaderListener();
        consumerConf.setMessageListener(std::bind(&ReaderImpl::messageListener, shared_from_this(),
                                                  std::placeholders::_1, std::placeholders::_2));
    }

    std::string subscription;
    if (!readerConf_.getInternalSubscriptionName().empty()) {
        subscription = readerConf_.getInternalSubscriptionName();
    } else {
        subscription = "reader-" + generateRandomName();
        if (!readerConf_.getSubscriptionRolePrefix().empty()) {
            subscription = readerConf_.getSubscriptionRolePrefix() + "-" + subscription;
        }
    }

    // get the consumer's configuration before created
    if (test::readerConfigTestEnabled) {
        test::consumerConfigOfReader = consumerConf.clone();
    }

    if (partitions_ > 0) {
        auto consumerImpl = std::make_shared<MultiTopicsConsumerImpl>(
            client_.lock(), TopicName::get(topic_), partitions_, subscription, consumerConf,
            client_.lock()->getLookup(),
            std::make_shared<ConsumerInterceptors>(std::vector<ConsumerInterceptorPtr>()),
            Commands::SubscriptionModeNonDurable, startMessageId);
        consumer_ = consumerImpl;
    } else {
        auto consumerImpl = std::make_shared<ConsumerImpl>(
            client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(),
            std::make_shared<ConsumerInterceptors>(std::vector<ConsumerInterceptorPtr>()),
            ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable,
            startMessageId);
        consumerImpl->setPartitionIndex(TopicName::getPartitionIndex(topic_));
        consumer_ = consumerImpl;
    }
    auto self = shared_from_this();
    consumer_->getConsumerCreatedFuture().addListener(
        [this, self, callback](Result result, const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
            if (result == ResultOk) {
                callback(weakConsumerPtr);
                readerCreatedCallback_(result, Reader(self));
            } else {
                readerCreatedCallback_(result, {});
            }
        });
    consumer_->start();
}