in lib/ConsumerImpl.cc [1563:1614]
/**
 * Asynchronously reports, through `callback`, whether this consumer has a message available.
 *
 * Two strategies are used:
 *  - If `lastDequedMessageId_` equals `MessageId::earliest()` and the start message id is
 *    `MessageId::latest()`, or if `hasSoughtByTimestamp_` is set, the answer is derived from the
 *    response of `getLastMessageIdAsync()` by comparing its mark-delete position with its last
 *    message id.
 *  - Otherwise the answer comes from `hasMoreMessages()`, which is evaluated again after a
 *    `getLastMessageIdAsync()` round trip when the first evaluation is negative.
 *
 * @param callback invoked with `(result, hasMessage)`; when an asynchronous step fails, it receives
 *                 the failing result together with a value-initialized second argument.
 */
void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
    // Both ids are read while holding mutexForMessageId_.
    bool mustCompareMarkDelete = false;
    {
        std::lock_guard<std::mutex> guard{mutexForMessageId_};
        if (lastDequedMessageId_ == MessageId::earliest()) {
            mustCompareMarkDelete =
                (startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest());
        }
    }

    if (!mustCompareMarkDelete && !hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
        // Answer from local state when it already says "yes".
        if (hasMoreMessages()) {
            callback(ResultOk, true);
            return;
        }
        // Otherwise request the last message id and evaluate hasMoreMessages() again on completion
        // (presumably the request refreshes the state that check consults -- TODO confirm).
        auto self = get_shared_this_ptr();
        getLastMessageIdAsync([self, callback](Result result, const GetLastMessageIdResponse&) {
            const bool hasMessage = (result == ResultOk) && self->hasMoreMessages();
            callback(result, hasMessage);
        });
        return;
    }

    auto self = get_shared_this_ptr();
    getLastMessageIdAsync(
        [self, callback](Result lastIdResult, const GetLastMessageIdResponse& lastIdResponse) {
            if (lastIdResult != ResultOk) {
                callback(lastIdResult, {});
                return;
            }

            // The response is captured by value because this closure may run later, from the seek
            // completion handler below.
            auto reportAvailability = [self, lastIdResponse, callback] {
                if (!lastIdResponse.hasMarkDeletePosition() ||
                    lastIdResponse.getLastMessageId().entryId() < 0) {
                    callback(ResultOk, false);
                    return;
                }
                // Only ledger ids and entry ids are compared: the mark delete position carries no
                // other ids such as a batch index.
                auto order = compareLedgerAndEntryId(lastIdResponse.getMarkDeletePosition(),
                                                     lastIdResponse.getLastMessageId());
                // An inclusive start also counts a mark-delete position equal to the last message id.
                const bool hasMessage =
                    self->config_.isStartMessageIdInclusive() ? (order <= 0) : (order < 0);
                callback(ResultOk, hasMessage);
            };

            const bool seekFirst = self->config_.isStartMessageIdInclusive() &&
                                   !self->hasSoughtByTimestamp_.load(std::memory_order_acquire);
            if (!seekFirst) {
                reportAvailability();
                return;
            }

            // Inclusive start without a prior timestamp seek: seek to the last message id first and
            // report only once that seek has succeeded.
            self->seekAsync(lastIdResponse.getLastMessageId(),
                            [callback, reportAvailability](Result seekResult) {
                                if (seekResult == ResultOk) {
                                    reportAvailability();
                                } else {
                                    callback(seekResult, {});
                                }
                            });
        });
}