void ConsumerImpl::internalGetLastMessageIdAsync()

in lib/ConsumerImpl.cc [1522:1578]


/**
 * Ask the broker for the last message id of this consumer, retrying with backoff while no
 * connection is available.
 *
 * Behaviour:
 *  - Connection available and server protocol >= proto::v12: sends a getLastMessageId request.
 *    On ResultOk the returned id is stored in lastMessageIdInBroker_ (guarded by
 *    mutexForMessageId_). The callback is invoked with the request's result and response.
 *  - Connection available but server protocol older than proto::v12: the callback is invoked with
 *    ResultUnsupportedVersionError.
 *  - No connection: waits min(remainTime, backoff->next()) on the timer and then retries. When
 *    that delay is not positive, the callback is invoked with ResultNotConnected.
 *
 * @param backoff    supplies the delay before the next retry
 * @param remainTime remaining time budget for retries; reduced by each scheduled delay
 * @param timer      timer used to schedule the retry
 * @param callback   invoked with the outcome (see the review note below for the timer paths)
 */
void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, TimeDuration remainTime,
                                                 const DeadlineTimerPtr& timer,
                                                 BrokerGetLastMessageIdCallback callback) {
    ClientConnectionPtr cnx = getCnx().lock();
    if (cnx) {
        if (cnx->getServerProtocolVersion() >= proto::v12) {
            ClientImplPtr client = client_.lock();
            if (!client) {
                // The owning client is gone, so lock() returned an empty pointer. Dereferencing it
                // to allocate a request id would crash; report the failure to the caller instead.
                LOG_ERROR(getName() << " Client is no longer available, cannot send getLastMessageId");
                callback(ResultNotConnected, MessageId());
                return;
            }
            uint64_t requestId = client->newRequestId();
            LOG_DEBUG(getName() << " Sending getLastMessageId Command for Consumer - " << getConsumerId()
                                << ", requestId - " << requestId);

            // `self` is captured so this consumer stays alive until the listener has run.
            auto self = get_shared_this_ptr();
            cnx->newGetLastMessageId(consumerId_, requestId)
                .addListener([this, self, callback](Result result, const GetLastMessageIdResponse& response) {
                    if (result == ResultOk) {
                        LOG_DEBUG(getName() << "getLastMessageId: " << response);
                        Lock lock(mutexForMessageId_);
                        lastMessageIdInBroker_ = response.getLastMessageId();
                        // Release the lock before running the user-supplied callback.
                        lock.unlock();
                    } else {
                        LOG_ERROR(getName() << "Failed to getLastMessageId: " << result);
                    }
                    callback(result, response);
                });
        } else {
            LOG_ERROR(getName() << " Operation not supported since server protobuf version "
                                << cnx->getServerProtocolVersion() << " is older than proto::v12");
            callback(ResultUnsupportedVersionError, MessageId());
        }
    } else {
        // Never wait longer than the remaining time budget.
        TimeDuration next = std::min(remainTime, backoff->next());
        if (next.total_milliseconds() <= 0) {
            LOG_ERROR(getName() << " Client Connection not ready for Consumer");
            callback(ResultNotConnected, MessageId());
            return;
        }
        remainTime -= next;

        timer->expires_from_now(next);

        // `self` and `timer` are captured so both outlive the pending wait.
        auto self = shared_from_this();
        timer->async_wait([this, backoff, remainTime, timer, next, callback,
                           self](const boost::system::error_code& ec) -> void {
            // NOTE(review): on both paths below the callback is never invoked, so a caller waiting
            // on it will not be notified. Presumably intentional for cancellation during shutdown;
            // confirm whether the generic error path should report a failure to the callback.
            if (ec == boost::asio::error::operation_aborted) {
                LOG_DEBUG(getName() << " Get last message id operation was cancelled, code[" << ec << "].");
                return;
            }
            if (ec) {
                LOG_ERROR(getName() << " Failed to get last message id, code[" << ec << "].");
                return;
            }
            LOG_WARN(getName() << " Could not get connection while getLastMessageId -- Will try again in "
                               << next.total_milliseconds() << " ms");
            this->internalGetLastMessageIdAsync(backoff, remainTime, timer, callback);
        });
    }
}