void ConsumerImpl::hasMessageAvailableAsync()

in lib/ConsumerImpl.cc [1563:1614]


// Determines whether this consumer has a message available and reports the answer through
// `callback` as a (Result, bool) pair. The callback may be invoked synchronously (when
// hasMoreMessages() is already true) or later from the completion handlers of
// getLastMessageIdAsync() / seekAsync().
//
// Two strategies are used:
//  1. Mark-delete comparison: chosen when lastDequedMessageId_ is still MessageId::earliest() and
//     the start message id is MessageId::latest(), or when hasSoughtByTimestamp_ is set. The last
//     message id is fetched and the answer is derived from comparing the mark-delete position
//     with it.
//  2. hasMoreMessages(): used otherwise, re-evaluated once after getLastMessageIdAsync() completes
//     if the first evaluation was false.
//
// Any non-ResultOk result from getLastMessageIdAsync() or seekAsync() is forwarded to `callback`
// together with `false`.
void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
    // Both message ids are read while holding mutexForMessageId_. The start message id is only
    // inspected when lastDequedMessageId_ is still the "earliest" sentinel (presumably meaning
    // nothing has been dequeued yet -- confirm against where lastDequedMessageId_ is updated).
    bool useMarkDeletePosition = false;
    {
        std::lock_guard<std::mutex> guard{mutexForMessageId_};
        if (lastDequedMessageId_ == MessageId::earliest()) {
            useMarkDeletePosition =
                (startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest());
        }
    }

    // Strategy 2: neither condition for the mark-delete comparison holds.
    if (!useMarkDeletePosition && !hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
        if (hasMoreMessages()) {
            callback(ResultOk, true);
            return;
        }
        // Ask again once the last message id request has completed.
        // NOTE(review): presumably getLastMessageIdAsync() refreshes the state that
        // hasMoreMessages() reads -- confirm.
        auto self = get_shared_this_ptr();
        getLastMessageIdAsync([self, callback](Result lastIdResult, const GetLastMessageIdResponse&) {
            if (lastIdResult != ResultOk) {
                callback(lastIdResult, false);
                return;
            }
            callback(lastIdResult, self->hasMoreMessages());
        });
        return;
    }

    // Strategy 1: derive the answer from the mark-delete position and the last message id.
    auto self = get_shared_this_ptr();
    getLastMessageIdAsync([self, callback](Result lastIdResult,
                                           const GetLastMessageIdResponse& lastIdResponse) {
        if (lastIdResult != ResultOk) {
            callback(lastIdResult, {});
            return;
        }

        // Holds its own copy of the response so it can also run later, from the seek completion.
        auto reportFromMarkDelete = [self, lastIdResponse, callback] {
            bool available = false;
            if (lastIdResponse.hasMarkDeletePosition() &&
                lastIdResponse.getLastMessageId().entryId() >= 0) {
                // The mark-delete position carries no finer-grained ids (e.g. batch index), so
                // only the ledger id and the entry id take part in the comparison.
                auto order = compareLedgerAndEntryId(lastIdResponse.getMarkDeletePosition(),
                                                     lastIdResponse.getLastMessageId());
                available = self->config_.isStartMessageIdInclusive() ? (order <= 0) : (order < 0);
            }
            callback(ResultOk, available);
        };

        // With an inclusive start message id, and no seek-by-timestamp recorded, seek to the last
        // message id before answering.
        const bool seekToLastFirst = self->config_.isStartMessageIdInclusive() &&
                                     !self->hasSoughtByTimestamp_.load(std::memory_order_acquire);
        if (!seekToLastFirst) {
            reportFromMarkDelete();
            return;
        }
        self->seekAsync(lastIdResponse.getLastMessageId(),
                        [callback, reportFromMarkDelete](Result seekResult) {
                            if (seekResult == ResultOk) {
                                reportFromMarkDelete();
                            } else {
                                callback(seekResult, {});
                            }
                        });
    });
}