private async Task ScanLoadAssignments()

in rocketmq-client-csharp/SimpleConsumer.cs [84:128]


        private async Task ScanLoadAssignments()
        {

            List<Task<List<rmq::Assignment>>> tasks = new List<Task<List<rmq.Assignment>>>();
            List<string> topics = new List<string>();
            foreach (var sub in _subscriptions)
            {
                var request = new rmq::QueryAssignmentRequest();
                request.Topic = new rmq::Resource();
                request.Topic.ResourceNamespace = ResourceNamespace;
                request.Topic.Name = sub.Key;
                topics.Add(sub.Key);
                request.Group = new rmq::Resource();
                request.Group.Name = _group;
                request.Group.ResourceNamespace = ResourceNamespace;

                request.Endpoints = new rmq::Endpoints();
                request.Endpoints.Scheme = rmq.AddressScheme.Ipv4;
                var address = new rmq::Address();
                address.Host = _accessPoint.Host;
                address.Port = _accessPoint.Port;
                request.Endpoints.Addresses.Add(address);

                var metadata = new Metadata();
                Signature.sign(this, metadata);
                tasks.Add(Manager.QueryLoadAssignment(_accessPoint.TargetUrl(), metadata, request, TimeSpan.FromSeconds(3)));
            }

            List<rmq.Assignment>[] list = await Task.WhenAll(tasks);

            var i = 0;
            foreach (var assignments in list)
            {
                string topic = topics[i];
                if (null == assignments || 0 == assignments.Count)
                {
                    Logger.Warn($"Faild to acquire assignments. Topic={topic}, Group={_group}");
                    ++i;
                    continue;
                }
                Logger.Debug($"Assignments received. Topic={topic}, Group={_group}");
                _topicAssignments.AddOrUpdate(topic, assignments, (t, prev) => assignments);
                ++i;
            }
        }