void AckGroupingTracker::doImmediateAck()

in lib/AckGroupingTracker.cc [90:138]


void AckGroupingTracker::doImmediateAck(const std::set<MessageId>& msgIds, ResultCallback callback) const {
    const auto cnx = connectionSupplier_();
    if (!cnx) {
        LOG_DEBUG("Connection is not ready, ACK failed for " << msgIds);
        if (callback) {
            callback(ResultAlreadyClosed);
        }
        return;
    }

    std::set<MessageId> ackMsgIds;

    for (const auto& msgId : msgIds) {
        if (auto chunkMessageId =
                std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId))) {
            auto msgIdList = chunkMessageId->getChunkedMessageIds();
            ackMsgIds.insert(msgIdList.begin(), msgIdList.end());
        } else {
            ackMsgIds.insert(msgId);
        }
    }

    if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
        if (waitResponse_) {
            const auto requestId = requestIdSupplier_();
            cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId)
                .addListener([callback](Result result, const ResponseData&) {
                    if (callback) {
                        callback(result);
                    }
                });
        } else {
            cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, ackMsgIds));
            if (callback) {
                callback(ResultOk);
            }
        }
    } else {
        auto count = std::make_shared<std::atomic<size_t>>(ackMsgIds.size());
        auto wrappedCallback = [callback, count](Result result) {
            if (--*count == 0 && callback) {
                callback(result);
            }
        };
        for (auto&& msgId : ackMsgIds) {
            doImmediateAck(msgId, wrappedCallback, CommandAck_AckType_Individual);
        }
    }
}