in lib/BinaryProtoLookupService.cc [36:97]
/**
 * Asynchronously looks up the broker serving `topic`, sending the lookup request to `address`.
 *
 * A connection to `address` is requested from the connection pool, a topic lookup request is sent over
 * it, and the returned future is completed from the response. When the response is a redirect, the
 * lookup is repeated against the broker address from the response with `redirectCount + 1`, and the
 * outcome of that nested lookup is forwarded to the returned future.
 *
 * @param address the broker address the lookup request is sent to
 * @param authoritative passed through to the lookup request
 * @param topic the topic to look up
 * @param redirectCount the number of redirects followed so far for this lookup
 * @return a future that completes with the LookupResult, or fails with:
 *         - ResultTooManyLookupRequestException when `maxLookupRedirects_` is positive and
 *           `redirectCount` exceeds it
 *         - the connection result when getting the connection fails
 *         - ResultNotConnected when the connection has already expired
 *         - the lookup result when the lookup request fails
 *         - ResultLookupError when the lookup reports ResultOk but carries no data
 */
auto BinaryProtoLookupService::findBroker(const std::string& address, bool authoritative,
                                          const std::string& topic, size_t redirectCount)
    -> LookupResultFuture {
    LOG_DEBUG("find broker from " << address << ", authoritative: " << authoritative << ", topic: " << topic
                                  << ", redirect count: " << redirectCount);
    auto promise = std::make_shared<Promise<Result, LookupResult>>();
    // A non-positive maxLookupRedirects_ disables the redirect limit.
    if (maxLookupRedirects_ > 0 && redirectCount > maxLookupRedirects_) {
        LOG_ERROR("Too many lookup request redirects on topic " << topic << ", configured limit is "
                                                                << maxLookupRedirects_);
        promise->setFailed(ResultTooManyLookupRequestException);
        return promise->getFuture();
    }
    // NOTE: we can use move capture for topic since C++14
    cnxPool_.getConnectionAsync(address).addListener([this, promise, topic, address, authoritative,
                                                      redirectCount](Result result,
                                                                     const ClientConnectionWeakPtr& weakCnx) {
        if (result != ResultOk) {
            promise->setFailed(result);
            return;
        }
        auto cnx = weakCnx.lock();
        if (!cnx) {
            LOG_ERROR("Connection to " << address << " is expired before lookup");
            promise->setFailed(ResultNotConnected);
            return;
        }
        auto lookupPromise = std::make_shared<LookupDataResultPromise>();
        cnx->newTopicLookup(topic, authoritative, listenerName_, newRequestId(), lookupPromise);
        // cnx is captured by value, so the connection stays referenced for as long as this listener exists.
        lookupPromise->getFuture().addListener([this, cnx, promise, topic, address, redirectCount](
                                                   Result result, const LookupDataResultPtr& data) {
            if (result != ResultOk || !data) {
                // Never fail the promise with ResultOk: consumers of the future (e.g. the redirect
                // listener below) interpret ResultOk as success, so a missing payload is mapped to a
                // real error code instead.
                const Result failure = (result != ResultOk) ? result : ResultLookupError;
                LOG_ERROR("Lookup failed for " << topic << ", result " << failure);
                promise->setFailed(failure);
                return;
            }
            const auto responseBrokerAddress =
                (serviceNameResolver_.useTls() ? data->getBrokerUrlTls() : data->getBrokerUrl());
            if (data->isRedirect()) {
                LOG_DEBUG("Lookup request is for " << topic << " redirected to " << responseBrokerAddress);
                // Follow the redirect and forward the nested lookup's outcome to our own promise.
                findBroker(responseBrokerAddress, data->isAuthoritative(), topic, redirectCount + 1)
                    .addListener([promise](Result result, const LookupResult& value) {
                        if (result == ResultOk) {
                            promise->setValue(value);
                        } else {
                            promise->setFailed(result);
                        }
                    });
            } else {
                LOG_DEBUG("Lookup response for " << topic << ", lookup-broker-url " << data->getBrokerUrl());
                if (data->shouldProxyThroughServiceUrl()) {
                    // logicalAddress is the proxy's address, we should still connect through proxy
                    promise->setValue({responseBrokerAddress, address});
                } else {
                    promise->setValue({responseBrokerAddress, responseBrokerAddress});
                }
            }
        });
    });
    return promise->getFuture();
}