in lib/ConsumerImpl.cc [1421:1452]
/**
 * Request redelivery of a specific set of unacknowledged messages.
 *
 * Behaviour, as implemented below:
 *  - An empty set is a no-op.
 *  - For consumer types other than ConsumerShared / ConsumerKeyShared, the
 *    request falls back to the no-argument overload and the given IDs are
 *    not used individually.
 *  - Otherwise every ID is first handed to processPossibleToDLQ(). IDs for
 *    which that step reports failure are collected, and once the callback of
 *    the last ID has fired the collected IDs are passed to
 *    redeliverMessages() in one call.
 *  - If there is no live connection a warning is logged and nothing is sent.
 *  - If the server protocol version is below proto::v2 nothing is done.
 *
 * @param messageIds IDs of the messages to redeliver. The set is only read
 *                   during this call; callbacks keep their own copies of the
 *                   IDs, so the caller may destroy it right after returning.
 */
void ConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) {
    if (messageIds.empty()) {
        return;
    }
    if (config_.getConsumerType() != ConsumerShared && config_.getConsumerType() != ConsumerKeyShared) {
        redeliverUnacknowledgedMessages();
        return;
    }
    ClientConnectionPtr cnx = getCnx().lock();
    if (cnx) {
        if (cnx->getServerProtocolVersion() >= proto::v2) {
            // State shared by all per-message callbacks. It is heap-allocated and
            // reference-counted because the callbacks may outlive this stack frame.
            // NOTE(review): insertion into needRedeliverMsgs is not synchronized; if
            // processPossibleToDLQ can invoke callbacks concurrently from different
            // threads this needs a lock - confirm against its implementation.
            auto needRedeliverMsgs = std::make_shared<std::set<MessageId>>();
            // Number of callbacks still outstanding; the one that brings it to zero
            // triggers the batched redelivery.
            auto needCallBack = std::make_shared<std::atomic<int>>(static_cast<int>(messageIds.size()));
            // Keeps the consumer alive until the last callback has run.
            auto self = get_shared_this_ptr();
            // TODO Support MAX_REDELIVER_UNACKNOWLEDGED Avoid redelivering too many messages
            for (const auto& msgId : messageIds) {
                // msgId is captured BY VALUE: it refers to an element of the caller-owned
                // set, which may no longer exist when the callback is eventually invoked.
                processPossibleToDLQ(msgId,
                                     [self, needRedeliverMsgs, msgId, needCallBack](bool processSuccess) {
                                         if (!processSuccess) {
                                             needRedeliverMsgs->emplace(msgId);
                                         }
                                         if (--(*needCallBack) == 0 && !needRedeliverMsgs->empty()) {
                                             self->redeliverMessages(*needRedeliverMsgs);
                                         }
                                     });
            }
        }
    } else {
        LOG_WARN("Connection not ready for Consumer - " << getConsumerId());
    }
}