in lib/AckGroupingTracker.cc [77:113]
void AckGroupingTracker::doImmediateAck(const std::set<MessageId>& msgIds, ResultCallback callback) const {
const auto cnx = connectionSupplier_();
if (!cnx) {
LOG_DEBUG("Connection is not ready, ACK failed for " << msgIds);
if (callback) {
callback(ResultAlreadyClosed);
}
return;
}
if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
if (waitResponse_) {
const auto requestId = requestIdSupplier_();
cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, msgIds, requestId), requestId)
.addListener([callback](Result result, const ResponseData&) {
if (callback) {
callback(result);
}
});
} else {
cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, msgIds));
if (callback) {
callback(ResultOk);
}
}
} else {
auto count = std::make_shared<std::atomic<size_t>>(msgIds.size());
auto wrappedCallback = [callback, count](Result result) {
if (--*count == 0 && callback) {
callback(result);
}
};
for (auto&& msgId : msgIds) {
doImmediateAck(msgId, wrappedCallback, CommandAck_AckType_Individual);
}
}
}