in rocketmq-client-csharp/SimpleConsumer.cs [84:128]
/// <summary>
/// Queries the load assignments of every subscribed topic and caches the non-empty results.
/// </summary>
/// <remarks>
/// One query is issued per entry in <c>_subscriptions</c>, each with a 3-second timeout, and all
/// of them are awaited together. Because the results are awaited through <c>Task.WhenAll</c>, a
/// fault in any single query surfaces as an exception from this method before anything is cached.
/// </remarks>
/// <returns>A task that completes once every received assignment list has been processed.</returns>
private async Task ScanLoadAssignments()
{
    // queriedTopics[k] is the topic whose assignments pendingQueries[k] will produce.
    var queriedTopics = new List<string>();
    var pendingQueries = new List<Task<List<rmq::Assignment>>>();

    foreach (var subscription in _subscriptions)
    {
        string topicName = subscription.Key;

        var topicResource = new rmq::Resource
        {
            ResourceNamespace = ResourceNamespace,
            Name = topicName,
        };
        queriedTopics.Add(topicName);

        var groupResource = new rmq::Resource
        {
            Name = _group,
            ResourceNamespace = ResourceNamespace,
        };

        var endpoints = new rmq::Endpoints { Scheme = rmq::AddressScheme.Ipv4 };
        endpoints.Addresses.Add(new rmq::Address
        {
            Host = _accessPoint.Host,
            Port = _accessPoint.Port,
        });

        var query = new rmq::QueryAssignmentRequest
        {
            Topic = topicResource,
            Group = groupResource,
            Endpoints = endpoints,
        };

        // Each request carries its own freshly signed metadata.
        var headers = new Metadata();
        Signature.sign(this, headers);

        var timeout = TimeSpan.FromSeconds(3);
        pendingQueries.Add(Manager.QueryLoadAssignment(_accessPoint.TargetUrl(), headers, query, timeout));
    }

    List<rmq::Assignment>[] results = await Task.WhenAll(pendingQueries);

    // Task.WhenAll returns results in the order the tasks were supplied, so index k lines up
    // with queriedTopics[k].
    for (var k = 0; k < results.Length; k++)
    {
        string topic = queriedTopics[k];
        List<rmq::Assignment> received = results[k];

        if (received != null && received.Count != 0)
        {
            Logger.Debug($"Assignments received. Topic={topic}, Group={_group}");
            _topicAssignments.AddOrUpdate(topic, received, (key, existing) => received);
        }
        else
        {
            // A null or empty answer leaves any previously cached assignments untouched.
            Logger.Warn($"Faild to acquire assignments. Topic={topic}, Group={_group}");
        }
    }
}