auto BinaryProtoLookupService::findBroker()

in lib/BinaryProtoLookupService.cc [35:97]


auto BinaryProtoLookupService::findBroker(const std::string& address, bool authoritative,
                                          const std::string& topic, size_t redirectCount)
    -> LookupResultFuture {
    LOG_DEBUG("find broker from " << address << ", authoritative: " << authoritative << ", topic: " << topic
                                  << ", redirect count: " << redirectCount);
    auto promise = std::make_shared<Promise<Result, LookupResult>>();
    if (maxLookupRedirects_ > 0 && redirectCount > maxLookupRedirects_) {
        LOG_ERROR("Too many lookup request redirects on topic " << topic << ", configured limit is "
                                                                << maxLookupRedirects_);
        promise->setFailed(ResultTooManyLookupRequestException);
        return promise->getFuture();
    }

    // NOTE: we can use move capture for topic since C++14
    cnxPool_.getConnectionAsync(address).addListener([this, promise, topic, address, authoritative,
                                                      redirectCount](Result result,
                                                                     const ClientConnectionWeakPtr& weakCnx) {
        if (result != ResultOk) {
            promise->setFailed(result);
            return;
        }
        auto cnx = weakCnx.lock();
        if (!cnx) {
            LOG_ERROR("Connection to " << address << " is expired before lookup");
            promise->setFailed(ResultNotConnected);
            return;
        }
        auto lookupPromise = std::make_shared<LookupDataResultPromise>();
        cnx->newTopicLookup(topic, authoritative, listenerName_, newRequestId(), lookupPromise);
        lookupPromise->getFuture().addListener([this, cnx, promise, topic, address, redirectCount](
                                                   Result result, const LookupDataResultPtr& data) {
            if (result != ResultOk || !data) {
                LOG_ERROR("Lookup failed for " << topic << ", result " << result);
                promise->setFailed(result);
                return;
            }

            const auto responseBrokerAddress =
                (serviceNameResolver_.useTls() ? data->getBrokerUrlTls() : data->getBrokerUrl());
            if (data->isRedirect()) {
                LOG_DEBUG("Lookup request is for " << topic << " redirected to " << responseBrokerAddress);
                findBroker(responseBrokerAddress, data->isAuthoritative(), topic, redirectCount + 1)
                    .addListener([promise](Result result, const LookupResult& value) {
                        if (result == ResultOk) {
                            promise->setValue(value);
                        } else {
                            promise->setFailed(result);
                        }
                    });
            } else {
                LOG_INFO("Lookup response for " << topic << ", lookup-broker-url " << data->getBrokerUrl()
                                                << ", from " << cnx->cnxString());
                if (data->shouldProxyThroughServiceUrl()) {
                    // logicalAddress is the proxy's address, we should still connect through proxy
                    promise->setValue({responseBrokerAddress, address, true});
                } else {
                    promise->setValue({responseBrokerAddress, responseBrokerAddress, false});
                }
            }
        });
    });
    return promise->getFuture();
}