public async ValueTask FindConnectionForTopic()

in src/DotPulsar/Internal/ConnectionPool.cs [70:112]


    /// <summary>
    /// Repeatedly sends a topic lookup, starting at the configured service URL, until a response arrives that is
    /// authoritative and not a redirect, and then returns the connection to use for the topic.
    /// </summary>
    /// <param name="topic">The topic to look up.</param>
    /// <param name="cancellationToken">Token handed to every connection request and lookup command.</param>
    /// <returns>
    /// A connection through the service URL when the response asks for proxying; otherwise the connection used for
    /// the last lookup when the returned URL is a loopback address, or a connection to the returned URL.
    /// </returns>
    public async ValueTask<IConnection> FindConnectionForTopic(string topic, CancellationToken cancellationToken)
    {
        var request = new CommandLookupTopic
        {
            Topic = topic,
            Authoritative = false,
            AdvertisedListenerName = _listenerName
        };

        var currentUrl = _serviceUrl;

        for (;;)
        {
            var connection = await GetConnection(currentUrl, cancellationToken).ConfigureAwait(false);
            var response = await connection.Send(request, cancellationToken).ConfigureAwait(false);

            response.Expect(BaseCommand.Type.LookupResponse);

            var lookupResult = response.LookupTopicResponse;
            var lookupType = lookupResult.Response;

            if (lookupType == CommandLookupTopicResponse.LookupType.Failed)
            {
                lookupResult.Throw();
            }

            // Carry the authoritative flag of this response into any follow-up lookup.
            var isAuthoritative = lookupResult.Authoritative;
            request.Authoritative = isAuthoritative;

            var brokerUrl = new Uri(GetBrokerServiceUrl(lookupResult));

            // Only an authoritative, non-redirect response ends the search; anything else is retried against the returned URL.
            var isFinalAnswer = lookupType != CommandLookupTopicResponse.LookupType.Redirect && isAuthoritative;
            if (!isFinalAnswer)
            {
                currentUrl = brokerUrl;
                continue;
            }

            if (lookupResult.ProxyThroughServiceUrl)
            {
                var proxiedUrl = new PulsarUrl(currentUrl, brokerUrl);
                return await GetConnection(proxiedUrl, cancellationToken).ConfigureAwait(false);
            }

            // A loopback address in an authoritative response is taken to mean a standalone (e.g. Docker) server,
            // so the connection that served this lookup is reused.
            if (brokerUrl.IsLoopback)
            {
                return connection;
            }

            return await GetConnection(brokerUrl, cancellationToken).ConfigureAwait(false);
        }
    }