in rocketmq-client-csharp/PushConsumer.cs [37:67]
/// <summary>
/// Starts the push consumer: runs the base start-up, resolves the route of every
/// subscribed topic, sends a heartbeat, and then schedules the recurring
/// assignment scan and expired-process-queue scan.
/// </summary>
/// <returns>A task that completes once start-up has finished and both recurring jobs are scheduled.</returns>
/// <exception cref="System.Exception">Thrown when no message listener has been configured.</exception>
public override async Task Start()
{
    if (null == _messageListener)
    {
        throw new System.Exception("Bad configuration: message listener is required");
    }

    await base.Start();

    // Step-1: Resolve topic routes. All queries are issued up front so they run concurrently.
    List<Task<TopicRouteData>> queryRouteTasks = new List<Task<TopicRouteData>>();
    foreach (var item in _topicFilterExpressionMap)
    {
        // NOTE(review): the meaning of the second argument (true) is defined by GetRouteFor,
        // which is not visible here - confirm.
        queryRouteTasks.Add(GetRouteFor(item.Key, true));
    }

    // Await instead of blocking with GetAwaiter().GetResult(): blocking inside an async method
    // ties up the calling thread and can deadlock when a synchronization context is present.
    // Exception behaviour is unchanged; the first faulted query's exception is rethrown.
    await Task.WhenAll(queryRouteTasks);

    // Step-2: Send heartbeats to all involved brokers so that we may get immediate, valid assignments.
    await Heartbeat();

    // Step-3: Scan load assignments that are assigned to the current client.
    // NOTE(review): 10 is presumably the scheduling interval; unit is defined by schedule() - confirm.
    schedule(async () =>
    {
        await scanLoadAssignments();
    }, 10, _scanAssignmentCTS.Token);

    // Step-4: Periodically scan for expired process queues, using the same schedule argument as above.
    schedule(() =>
    {
        ScanExpiredProcessQueue();
    }, 10, _scanExpiredProcessQueueCTS.Token);
}