func()

in pulsar/internal/lookup_service.go [142:205]


func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
	ls.metrics.LookupRequestsCount.Inc()
	id := ls.rpcClient.NewRequestID()

	res, err := ls.rpcClient.RequestToHost(&ls.serviceNameResolver, id, pb.BaseCommand_LOOKUP,
		&pb.CommandLookupTopic{
			RequestId:              &id,
			Topic:                  &topic,
			Authoritative:          proto.Bool(false),
			AdvertisedListenerName: proto.String(ls.listenerName),
			Properties:             ls.lookupProperties,
		})
	if err != nil {
		return nil, err
	}
	ls.log.Debugf("Got topic{%s} lookup response: %+v", topic, res)

	for i := 0; i < lookupResultMaxRedirect; i++ {
		lr := res.Response.LookupTopicResponse
		switch *lr.Response {

		case pb.CommandLookupTopicResponse_Redirect:
			brokerServiceURL := selectServiceURL(ls.tlsEnabled, lr.GetBrokerServiceUrl(), lr.GetBrokerServiceUrlTls())
			lookupResult, err := ls.GetBrokerAddress(brokerServiceURL, lr.GetProxyThroughServiceUrl())
			if err != nil {
				return nil, err
			}

			ls.log.Debugf("Follow topic{%s} redirect to broker. %v / %v - Use proxy: %v",
				topic, lr.BrokerServiceUrl, lr.BrokerServiceUrlTls, lr.ProxyThroughServiceUrl)

			id := ls.rpcClient.NewRequestID()
			res, err = ls.rpcClient.Request(lookupResult.LogicalAddr, lookupResult.PhysicalAddr, id, pb.BaseCommand_LOOKUP,
				&pb.CommandLookupTopic{
					RequestId:              &id,
					Topic:                  &topic,
					Authoritative:          lr.Authoritative,
					AdvertisedListenerName: proto.String(ls.listenerName),
				})
			if err != nil {
				return nil, err
			}

			// Process the response at the top of the loop
			continue

		case pb.CommandLookupTopicResponse_Connect:
			ls.log.Debugf("Successfully looked up topic{%s} on broker. %s / %s - Use proxy: %t",
				topic, lr.GetBrokerServiceUrl(), lr.GetBrokerServiceUrlTls(), lr.GetProxyThroughServiceUrl())

			brokerServiceURL := selectServiceURL(ls.tlsEnabled, lr.GetBrokerServiceUrl(), lr.GetBrokerServiceUrlTls())
			return ls.GetBrokerAddress(brokerServiceURL, lr.GetProxyThroughServiceUrl())
		case pb.CommandLookupTopicResponse_Failed:
			ls.log.WithFields(log.Fields{
				"topic":   topic,
				"error":   lr.GetError(),
				"message": lr.GetMessage(),
			}).Warn("Failed to lookup topic")
			return nil, errors.New(lr.GetError().String())
		}
	}

	return nil, errors.New("exceeded max number of redirection during topic lookup")
}