in lib/ConsumerImpl.cc [1522:1578]
void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, TimeDuration remainTime,
const DeadlineTimerPtr& timer,
BrokerGetLastMessageIdCallback callback) {
ClientConnectionPtr cnx = getCnx().lock();
if (cnx) {
if (cnx->getServerProtocolVersion() >= proto::v12) {
ClientImplPtr client = client_.lock();
uint64_t requestId = client->newRequestId();
LOG_DEBUG(getName() << " Sending getLastMessageId Command for Consumer - " << getConsumerId()
<< ", requestId - " << requestId);
auto self = get_shared_this_ptr();
cnx->newGetLastMessageId(consumerId_, requestId)
.addListener([this, self, callback](Result result, const GetLastMessageIdResponse& response) {
if (result == ResultOk) {
LOG_DEBUG(getName() << "getLastMessageId: " << response);
Lock lock(mutexForMessageId_);
lastMessageIdInBroker_ = response.getLastMessageId();
lock.unlock();
} else {
LOG_ERROR(getName() << "Failed to getLastMessageId: " << result);
}
callback(result, response);
});
} else {
LOG_ERROR(getName() << " Operation not supported since server protobuf version "
<< cnx->getServerProtocolVersion() << " is older than proto::v12");
callback(ResultUnsupportedVersionError, MessageId());
}
} else {
TimeDuration next = std::min(remainTime, backoff->next());
if (next.total_milliseconds() <= 0) {
LOG_ERROR(getName() << " Client Connection not ready for Consumer");
callback(ResultNotConnected, MessageId());
return;
}
remainTime -= next;
timer->expires_from_now(next);
auto self = shared_from_this();
timer->async_wait([this, backoff, remainTime, timer, next, callback,
self](const boost::system::error_code& ec) -> void {
if (ec == boost::asio::error::operation_aborted) {
LOG_DEBUG(getName() << " Get last message id operation was cancelled, code[" << ec << "].");
return;
}
if (ec) {
LOG_ERROR(getName() << " Failed to get last message id, code[" << ec << "].");
return;
}
LOG_WARN(getName() << " Could not get connection while getLastMessageId -- Will try again in "
<< next.total_milliseconds() << " ms")
this->internalGetLastMessageIdAsync(backoff, remainTime, timer, callback);
});
}
}