in lib/AckGroupingTracker.cc [90:138]
void AckGroupingTracker::doImmediateAck(const std::set<MessageId>& msgIds, ResultCallback callback) const {
const auto cnx = connectionSupplier_();
if (!cnx) {
LOG_DEBUG("Connection is not ready, ACK failed for " << msgIds);
if (callback) {
callback(ResultAlreadyClosed);
}
return;
}
std::set<MessageId> ackMsgIds;
for (const auto& msgId : msgIds) {
if (auto chunkMessageId =
std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId))) {
auto msgIdList = chunkMessageId->getChunkedMessageIds();
ackMsgIds.insert(msgIdList.begin(), msgIdList.end());
} else {
ackMsgIds.insert(msgId);
}
}
if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
if (waitResponse_) {
const auto requestId = requestIdSupplier_();
cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId)
.addListener([callback](Result result, const ResponseData&) {
if (callback) {
callback(result);
}
});
} else {
cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, ackMsgIds));
if (callback) {
callback(ResultOk);
}
}
} else {
auto count = std::make_shared<std::atomic<size_t>>(ackMsgIds.size());
auto wrappedCallback = [callback, count](Result result) {
if (--*count == 0 && callback) {
callback(result);
}
};
for (auto&& msgId : ackMsgIds) {
doImmediateAck(msgId, wrappedCallback, CommandAck_AckType_Individual);
}
}
}