in lib/ReaderImpl.cc [48:117]
void ReaderImpl::start(const MessageId& startMessageId,
std::function<void(const ConsumerImplBaseWeakPtr&)> callback) {
ConsumerConfiguration consumerConf;
consumerConf.setConsumerType(ConsumerExclusive);
consumerConf.setReceiverQueueSize(readerConf_.getReceiverQueueSize());
consumerConf.setReadCompacted(readerConf_.isReadCompacted());
consumerConf.setSchema(readerConf_.getSchema());
consumerConf.setUnAckedMessagesTimeoutMs(readerConf_.getUnAckedMessagesTimeoutMs());
consumerConf.setTickDurationInMs(readerConf_.getTickDurationInMs());
consumerConf.setAckGroupingTimeMs(readerConf_.getAckGroupingTimeMs());
consumerConf.setAckGroupingMaxSize(readerConf_.getAckGroupingMaxSize());
consumerConf.setCryptoKeyReader(readerConf_.getCryptoKeyReader());
consumerConf.setCryptoFailureAction(readerConf_.getCryptoFailureAction());
consumerConf.setProperties(readerConf_.getProperties());
consumerConf.setStartMessageIdInclusive(readerConf_.isStartMessageIdInclusive());
if (readerConf_.getReaderName().length() > 0) {
consumerConf.setConsumerName(readerConf_.getReaderName());
}
if (readerConf_.hasReaderListener()) {
// Adapt the message listener to be a reader-listener
readerListener_ = readerConf_.getReaderListener();
consumerConf.setMessageListener(std::bind(&ReaderImpl::messageListener, shared_from_this(),
std::placeholders::_1, std::placeholders::_2));
}
std::string subscription;
if (!readerConf_.getInternalSubscriptionName().empty()) {
subscription = readerConf_.getInternalSubscriptionName();
} else {
subscription = "reader-" + generateRandomName();
if (!readerConf_.getSubscriptionRolePrefix().empty()) {
subscription = readerConf_.getSubscriptionRolePrefix() + "-" + subscription;
}
}
// get the consumer's configuration before created
if (test::readerConfigTestEnabled) {
test::consumerConfigOfReader = consumerConf.clone();
}
if (partitions_ > 0) {
auto consumerImpl = std::make_shared<MultiTopicsConsumerImpl>(
client_.lock(), TopicName::get(topic_), partitions_, subscription, consumerConf,
client_.lock()->getLookup(),
std::make_shared<ConsumerInterceptors>(std::vector<ConsumerInterceptorPtr>()),
Commands::SubscriptionModeNonDurable, startMessageId);
consumer_ = consumerImpl;
} else {
auto consumerImpl = std::make_shared<ConsumerImpl>(
client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(),
std::make_shared<ConsumerInterceptors>(std::vector<ConsumerInterceptorPtr>()),
ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable,
startMessageId);
consumerImpl->setPartitionIndex(TopicName::getPartitionIndex(topic_));
consumer_ = consumerImpl;
}
auto self = shared_from_this();
consumer_->getConsumerCreatedFuture().addListener(
[this, self, callback](Result result, const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
if (result == ResultOk) {
callback(weakConsumerPtr);
readerCreatedCallback_(result, Reader(self));
} else {
readerCreatedCallback_(result, {});
}
});
consumer_->start();
}