in rocketmq-client-csharp/PushConsumer.cs [161:211]
private async Task ExecutePop0(rmq::Assignment assignment)
{
Logger.Info($"Start to pop {assignment.MessageQueue.ToString()}");
while (true)
{
try
{
ProcessQueue processQueue;
if (!_processQueueMap.TryGetValue(assignment, out processQueue))
{
break;
}
if (processQueue.Dropped)
{
break;
}
List<Message> messages = await base.ReceiveMessage(assignment, _group);
processQueue.LastReceiveTime = System.DateTime.UtcNow;
// TODO: cache message and dispatch them
List<Message> failed = new List<Message>();
await _messageListener.Consume(messages, failed);
foreach (var message in failed)
{
await base.ChangeInvisibleDuration(message._sourceHost, _group, message.Topic, message._receiptHandle, message.MessageId);
}
foreach (var message in messages)
{
if (!failed.Contains(message))
{
bool success = await base.Ack(message._sourceHost, _group, message.Topic, message._receiptHandle, message.MessageId);
if (!success)
{
//TODO: log error.
}
}
}
}
catch (System.Exception)
{
// TODO: log exception raised.
}
}
}