in lib/ClientConnection.cc [1653:1708]
/**
 * Handle a CommandLookupTopicResponse received on this connection.
 *
 * The response is matched against pendingLookupRequests_ by its request id.
 * When a matching entry exists, its timer is cancelled, the entry is removed
 * (and numOfPendingLookupRequest_ decremented), and the entry's promise is
 * completed:
 *   - failed with getResult(error, message) when the response is missing or
 *     marked Failed and carries an error,
 *   - failed with ResultConnectError when it is missing/Failed without an error,
 *   - otherwise fulfilled with a LookupDataResult populated from the response.
 * When no entry matches, a warning is logged and nothing else happens.
 *
 * @param lookupTopicResponse the lookup response command to process
 */
void ClientConnection::handleLookupTopicRespose(
    const proto::CommandLookupTopicResponse& lookupTopicResponse) {
    const auto requestId = lookupTopicResponse.request_id();
    LOG_DEBUG(cnxString_ << "Received lookup response from server. req_id: " << requestId);

    Lock lock(mutex_);
    auto pendingIt = pendingLookupRequests_.find(requestId);
    if (pendingIt == pendingLookupRequests_.end()) {
        // No such request is tracked; `lock` is released by its destructor on return.
        LOG_WARN("Received unknown request id from server: " << requestId);
        return;
    }

    // Take what is needed from the pending entry and drop it while still holding mutex_.
    auto& pendingRequest = pendingIt->second;
    pendingRequest.timer->cancel();
    LookupDataResultPromisePtr promise = pendingRequest.promise;
    pendingLookupRequests_.erase(pendingIt);
    --numOfPendingLookupRequest_;
    // The promise is completed only after mutex_ has been released
    // (presumably so promise listeners never run under the lock).
    lock.unlock();

    const bool lookupSucceeded =
        lookupTopicResponse.has_response() &&
        lookupTopicResponse.response() != proto::CommandLookupTopicResponse::Failed;

    if (!lookupSucceeded) {
        if (!lookupTopicResponse.has_error()) {
            LOG_ERROR(cnxString_ << "Failed lookup req_id: " << requestId << " with empty response: ");
            promise->setFailed(ResultConnectError);
            return;
        }

        const auto serverError = lookupTopicResponse.error();
        const auto& serverMessage = lookupTopicResponse.message();
        LOG_ERROR(cnxString_ << "Failed lookup req_id: " << requestId << " error: " << serverError
                             << " msg: " << serverMessage);
        checkServerError(serverError, serverMessage);
        promise->setFailed(getResult(serverError, serverMessage));
        return;
    }

    const auto& plainUrl = lookupTopicResponse.brokerserviceurl();
    const auto& tlsUrl = lookupTopicResponse.brokerserviceurltls();
    LOG_DEBUG(cnxString_ << "Received lookup response from server. req_id: " << requestId
                         << " -- broker-url: " << plainUrl << " -- broker-tls-url: " << tlsUrl
                         << " authoritative: " << lookupTopicResponse.authoritative()
                         << " redirect: " << lookupTopicResponse.response());

    LookupDataResultPtr lookupData = std::make_shared<LookupDataResult>();
    // The broker URL is the TLS one when tlsSocket_ is set, the plain one otherwise.
    lookupData->setBrokerUrl(tlsSocket_ ? tlsUrl : plainUrl);
    lookupData->setBrokerUrlTls(tlsUrl);
    lookupData->setAuthoritative(lookupTopicResponse.authoritative());
    lookupData->setRedirect(lookupTopicResponse.response() ==
                            proto::CommandLookupTopicResponse::Redirect);
    lookupData->setShouldProxyThroughServiceUrl(lookupTopicResponse.proxy_through_service_url());
    promise->setValue(lookupData);
}