in lib/ClientImpl.cc [310:356]
void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeCallback callback) {
TopicNamePtr topicNamePtr = TopicName::get(regexPattern);
Lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, Consumer());
return;
} else {
lock.unlock();
if (!topicNamePtr) {
LOG_ERROR("Topic pattern not valid: " << regexPattern);
callback(ResultInvalidTopicName, Consumer());
return;
}
}
if (TopicName::containsDomain(regexPattern)) {
LOG_WARN("Ignore invalid domain: "
<< topicNamePtr->getDomain()
<< ", use the RegexSubscriptionMode parameter to set the topic type");
}
CommandGetTopicsOfNamespace_Mode mode;
auto regexSubscriptionMode = conf.getRegexSubscriptionMode();
switch (regexSubscriptionMode) {
case PersistentOnly:
mode = CommandGetTopicsOfNamespace_Mode_PERSISTENT;
break;
case NonPersistentOnly:
mode = CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT;
break;
case AllTopics:
mode = CommandGetTopicsOfNamespace_Mode_ALL;
break;
default:
LOG_ERROR("RegexSubscriptionMode not valid: " << regexSubscriptionMode);
callback(ResultInvalidConfiguration, Consumer());
return;
}
lookupServicePtr_->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode)
.addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(),
std::placeholders::_1, std::placeholders::_2, regexPattern, mode,
subscriptionName, conf, callback));
}