in rocketmq-client-csharp/PushConsumer.cs [114:147]
/// <summary>
/// Compares a freshly received assignment list with the list stored in
/// <c>_topicAssignmentsMap</c> for the same topic. <c>ExecutePop</c> is called
/// for every incoming assignment that is not in the stored list, and
/// <c>CancelPop</c> is called for every stored assignment that is missing
/// from the incoming list.
/// </summary>
/// <param name="assignments">
/// Incoming assignments. The topic is read from the first entry only.
/// NOTE(review): presumably all entries belong to the same topic - confirm
/// against the caller. An empty list is ignored, because no topic can be
/// derived from it.
/// </param>
/// <remarks>
/// This method does not write to <c>_topicAssignmentsMap</c> directly.
/// </remarks>
private void checkAndUpdateAssignments(List<rmq::Assignment> assignments)
{
    if (0 == assignments.Count)
    {
        // Without at least one entry there is no topic to look up.
        return;
    }

    string topicName = assignments[0].MessageQueue.Topic.Name;

    // Look up what is currently recorded for this topic; stays null when
    // the topic has no entry in the map.
    List<rmq::Assignment> current;
    _topicAssignmentsMap.TryGetValue(topicName, out current);
    bool hasCurrent = current != null;

    // Start pop-cycles for assignments that are not recorded yet.
    foreach (var incoming in assignments)
    {
        if (hasCurrent && current.Contains(incoming))
        {
            continue;
        }

        ExecutePop(incoming);
    }

    if (!hasCurrent)
    {
        // Nothing was recorded before, so there is nothing to cancel.
        return;
    }

    // Cancel pop-cycles for recorded assignments that are no longer present.
    foreach (var stale in current)
    {
        if (assignments.Contains(stale))
        {
            continue;
        }

        Logger.Info($"Stop receiving messages from {stale.MessageQueue.ToString()}");
        CancelPop(stale);
    }
}