void ConsumerImpl::redeliverUnacknowledgedMessages()

in lib/ConsumerImpl.cc [1421:1452]


void ConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) {
    if (messageIds.empty()) {
        return;
    }
    if (config_.getConsumerType() != ConsumerShared && config_.getConsumerType() != ConsumerKeyShared) {
        redeliverUnacknowledgedMessages();
        return;
    }

    ClientConnectionPtr cnx = getCnx().lock();
    if (cnx) {
        if (cnx->getServerProtocolVersion() >= proto::v2) {
            auto needRedeliverMsgs = std::make_shared<std::set<MessageId>>();
            auto needCallBack = std::make_shared<std::atomic<int>>(messageIds.size());
            auto self = get_shared_this_ptr();
            // TODO Support MAX_REDELIVER_UNACKNOWLEDGED Avoid redelivering too many messages
            for (const auto& msgId : messageIds) {
                processPossibleToDLQ(msgId,
                                     [self, needRedeliverMsgs, &msgId, needCallBack](bool processSuccess) {
                                         if (!processSuccess) {
                                             needRedeliverMsgs->emplace(msgId);
                                         }
                                         if (--(*needCallBack) == 0 && !needRedeliverMsgs->empty()) {
                                             self->redeliverMessages(*needRedeliverMsgs);
                                         }
                                     });
            }
        }
    } else {
        LOG_WARN("Connection not ready for Consumer - " << getConsumerId());
    }
}