in pulsar/internal/lookup_service.go [142:205]
func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
ls.metrics.LookupRequestsCount.Inc()
id := ls.rpcClient.NewRequestID()
res, err := ls.rpcClient.RequestToHost(&ls.serviceNameResolver, id, pb.BaseCommand_LOOKUP,
&pb.CommandLookupTopic{
RequestId: &id,
Topic: &topic,
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(ls.listenerName),
Properties: ls.lookupProperties,
})
if err != nil {
return nil, err
}
ls.log.Debugf("Got topic{%s} lookup response: %+v", topic, res)
for i := 0; i < lookupResultMaxRedirect; i++ {
lr := res.Response.LookupTopicResponse
switch *lr.Response {
case pb.CommandLookupTopicResponse_Redirect:
brokerServiceURL := selectServiceURL(ls.tlsEnabled, lr.GetBrokerServiceUrl(), lr.GetBrokerServiceUrlTls())
lookupResult, err := ls.GetBrokerAddress(brokerServiceURL, lr.GetProxyThroughServiceUrl())
if err != nil {
return nil, err
}
ls.log.Debugf("Follow topic{%s} redirect to broker. %v / %v - Use proxy: %v",
topic, lr.BrokerServiceUrl, lr.BrokerServiceUrlTls, lr.ProxyThroughServiceUrl)
id := ls.rpcClient.NewRequestID()
res, err = ls.rpcClient.Request(lookupResult.LogicalAddr, lookupResult.PhysicalAddr, id, pb.BaseCommand_LOOKUP,
&pb.CommandLookupTopic{
RequestId: &id,
Topic: &topic,
Authoritative: lr.Authoritative,
AdvertisedListenerName: proto.String(ls.listenerName),
})
if err != nil {
return nil, err
}
// Process the response at the top of the loop
continue
case pb.CommandLookupTopicResponse_Connect:
ls.log.Debugf("Successfully looked up topic{%s} on broker. %s / %s - Use proxy: %t",
topic, lr.GetBrokerServiceUrl(), lr.GetBrokerServiceUrlTls(), lr.GetProxyThroughServiceUrl())
brokerServiceURL := selectServiceURL(ls.tlsEnabled, lr.GetBrokerServiceUrl(), lr.GetBrokerServiceUrlTls())
return ls.GetBrokerAddress(brokerServiceURL, lr.GetProxyThroughServiceUrl())
case pb.CommandLookupTopicResponse_Failed:
ls.log.WithFields(log.Fields{
"topic": topic,
"error": lr.GetError(),
"message": lr.GetMessage(),
}).Warn("Failed to lookup topic")
return nil, errors.New(lr.GetError().String())
}
}
return nil, errors.New("exceeded max number of redirection during topic lookup")
}