in rocketmq-client-csharp/Client.cs [138:193]
/// <summary>
/// Re-fetches the route of every topic found in <c>topicsOfInterest_</c> or already
/// present in <c>_topicRouteTable</c>, and stores each returned route that is new or
/// differs from the cached one.
/// </summary>
/// <remarks>
/// All routes are requested concurrently and awaited with <see cref="Task.WhenAll{TResult}(IEnumerable{Task{TResult}})"/>,
/// so a single faulted request makes the whole refresh throw before any entry is updated.
/// A null response or a route without message queues is logged and skipped.
/// </remarks>
/// <returns>A task that completes once the route table has been refreshed.</returns>
private async Task UpdateTopicRoute()
{
    // Union of the topics we were asked to watch and the topics we already cache.
    HashSet<string> topics = new HashSet<string>();
    foreach (var topic in topicsOfInterest_)
    {
        topics.Add(topic);
    }
    foreach (var entry in _topicRouteTable)
    {
        topics.Add(entry.Key);
    }
    Logger.Debug($"Fetch topic route for {topics.Count} topics");

    // Snapshot the set into a list: Task.WhenAll returns results in the order of the
    // supplied tasks, so result[i] is the route requested for topicList[i].
    List<string> topicList = new List<string>(topics);
    List<Task<TopicRouteData>> tasks = new List<Task<TopicRouteData>>(topicList.Count);
    foreach (var topic in topicList)
    {
        tasks.Add(GetRouteFor(topic, true));
    }

    // Update topic route data
    TopicRouteData[] result = await Task.WhenAll(tasks);
    for (var i = 0; i < result.Length; i++)
    {
        var route = result[i];
        if (null == route)
        {
            Logger.Warn($"Failed to fetch route for {topicList[i]}, null response");
            continue;
        }

        if (0 == route.MessageQueues.Count)
        {
            Logger.Warn($"Failed to fetch route for {topicList[i]}, empty message queue");
            continue;
        }

        var topicName = route.MessageQueues[0].Topic.Name;
        // The response is expected to describe the topic that was requested.
        Debug.Assert(topicName.Equals(topicList[i]));

        // A topic that only came from topicsOfInterest_ has no cached entry yet; reading
        // it through the indexer would throw on the missing key, so probe with
        // TryGetValue and insert when absent.
        TopicRouteData existing;
        if (!_topicRouteTable.TryGetValue(topicName, out existing)
            || null == existing
            || !existing.Equals(route))
        {
            _topicRouteTable[topicName] = route;
        }
    }
}