in src/DotPulsar/Internal/ConnectionPool.cs [70:112]
public async ValueTask<IConnection> FindConnectionForTopic(string topic, CancellationToken cancellationToken)
{
var lookup = new CommandLookupTopic
{
Topic = topic,
Authoritative = false,
AdvertisedListenerName = _listenerName
};
var physicalUrl = _serviceUrl;
while (true)
{
var connection = await GetConnection(physicalUrl, cancellationToken).ConfigureAwait(false);
var response = await connection.Send(lookup, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.LookupResponse);
if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Failed)
response.LookupTopicResponse.Throw();
lookup.Authoritative = response.LookupTopicResponse.Authoritative;
var lookupResponseServiceUrl = new Uri(GetBrokerServiceUrl(response.LookupTopicResponse));
if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Redirect || !response.LookupTopicResponse.Authoritative)
{
physicalUrl = lookupResponseServiceUrl;
continue;
}
if (response.LookupTopicResponse.ProxyThroughServiceUrl)
{
var url = new PulsarUrl(physicalUrl, lookupResponseServiceUrl);
return await GetConnection(url, cancellationToken).ConfigureAwait(false);
}
// LookupType is 'Connect', ServiceUrl is local and response is authoritative. Assume the Pulsar server is a standalone docker.
return lookupResponseServiceUrl.IsLoopback
? connection
: await GetConnection(lookupResponseServiceUrl, cancellationToken).ConfigureAwait(false);
}
}