void AckGroupingTracker::doImmediateAck()

in lib/AckGroupingTracker.cc [77:113]


void AckGroupingTracker::doImmediateAck(const std::set<MessageId>& msgIds, ResultCallback callback) const {
    const auto cnx = connectionSupplier_();
    if (!cnx) {
        LOG_DEBUG("Connection is not ready, ACK failed for " << msgIds);
        if (callback) {
            callback(ResultAlreadyClosed);
        }
        return;
    }

    if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
        if (waitResponse_) {
            const auto requestId = requestIdSupplier_();
            cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, msgIds, requestId), requestId)
                .addListener([callback](Result result, const ResponseData&) {
                    if (callback) {
                        callback(result);
                    }
                });
        } else {
            cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, msgIds));
            if (callback) {
                callback(ResultOk);
            }
        }
    } else {
        auto count = std::make_shared<std::atomic<size_t>>(msgIds.size());
        auto wrappedCallback = [callback, count](Result result) {
            if (--*count == 0 && callback) {
                callback(result);
            }
        };
        for (auto&& msgId : msgIds) {
            doImmediateAck(msgId, wrappedCallback, CommandAck_AckType_Individual);
        }
    }
}