func()

in pulsar/internal/lookup_service.go [138:204]


// Lookup resolves the broker that should serve topic.
//
// It sends an initial lookup command to any broker and then processes up to
// lookupResultMaxRedirect responses. A Redirect response causes the lookup to
// be re-issued against the broker address that getBrokerAddress derives from
// the response; a Connect response ends the lookup and yields the logical and
// physical addresses derived the same way.
//
// An error is returned when an RPC fails, when getBrokerAddress fails, when
// the broker answers with Failed, when a response carries no lookup payload
// or an unrecognised response type, or when the redirect limit is exhausted.
func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
	ls.metrics.LookupRequestsCount.Inc()
	id := ls.rpcClient.NewRequestID()
	res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_LOOKUP, &pb.CommandLookupTopic{
		RequestId:              &id,
		Topic:                  &topic,
		Authoritative:          proto.Bool(false),
		AdvertisedListenerName: proto.String(ls.listenerName),
	})
	if err != nil {
		return nil, err
	}
	ls.log.Debugf("Got topic{%s} lookup response: %+v", topic, res)

	for i := 0; i < lookupResultMaxRedirect; i++ {
		// Read the payload through the nil-safe getter and reject responses
		// that lack it, instead of panicking on a nil dereference below.
		lr := res.Response.GetLookupTopicResponse()
		if lr == nil || lr.Response == nil {
			return nil, errors.New("malformed lookup response for topic " + topic)
		}

		switch lr.GetResponse() {
		case pb.CommandLookupTopicResponse_Redirect:
			logicalAddr, physicalAddr, err := ls.getBrokerAddress(lr)
			if err != nil {
				return nil, err
			}

			// Log through the getters: the optional fields are pointers, so
			// formatting them directly would print addresses, not values.
			ls.log.Debugf("Follow topic{%s} redirect to broker. %s / %s - Use proxy: %t",
				topic, lr.GetBrokerServiceUrl(), lr.GetBrokerServiceUrlTls(), lr.GetProxyThroughServiceUrl())

			redirectID := ls.rpcClient.NewRequestID()
			res, err = ls.rpcClient.Request(logicalAddr, physicalAddr, redirectID, pb.BaseCommand_LOOKUP,
				&pb.CommandLookupTopic{
					RequestId: &redirectID,
					Topic:     &topic,
					// Forward the authoritative flag exactly as the redirecting broker sent it.
					Authoritative:          lr.Authoritative,
					AdvertisedListenerName: proto.String(ls.listenerName),
				})
			if err != nil {
				return nil, err
			}

			// The new response is processed at the top of the loop.
			continue

		case pb.CommandLookupTopicResponse_Connect:
			ls.log.Debugf("Successfully looked up topic{%s} on broker. %s / %s - Use proxy: %t",
				topic, lr.GetBrokerServiceUrl(), lr.GetBrokerServiceUrlTls(), lr.GetProxyThroughServiceUrl())

			logicalAddr, physicalAddr, err := ls.getBrokerAddress(lr)
			if err != nil {
				return nil, err
			}

			return &LookupResult{
				LogicalAddr:  logicalAddr,
				PhysicalAddr: physicalAddr,
			}, nil

		case pb.CommandLookupTopicResponse_Failed:
			ls.log.WithFields(log.Fields{
				"topic":   topic,
				"error":   lr.GetError(),
				"message": lr.GetMessage(),
			}).Warn("Failed to lookup topic")
			return nil, errors.New(lr.GetError().String())

		default:
			// Without this branch an unknown type would spin through the
			// remaining iterations and surface as a bogus redirect-limit error.
			return nil, errors.New("unexpected lookup response type " + lr.GetResponse().String())
		}
	}

	return nil, errors.New("exceeded max number of redirection during topic lookup")
}