in lib/ConsumerImpl.cc [1707:1767]
void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const SeekArg& seekArg,
ResultCallback callback) {
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
LOG_ERROR(getName() << " Client Connection not ready for Consumer");
callback(ResultNotConnected);
return;
}
auto expected = SeekStatus::NOT_STARTED;
if (!seekStatus_.compare_exchange_strong(expected, SeekStatus::IN_PROGRESS)) {
LOG_ERROR(getName() << " attempted to seek " << seekArg << " when the status is "
<< static_cast<int>(expected));
callback(ResultNotAllowedError);
return;
}
const auto originalSeekMessageId = seekMessageId_.get();
if (boost::get<uint64_t>(&seekArg)) {
hasSoughtByTimestamp_.store(true, std::memory_order_release);
} else {
seekMessageId_ = *boost::get<MessageId>(&seekArg);
}
seekStatus_ = SeekStatus::IN_PROGRESS;
seekCallback_ = std::move(callback);
LOG_INFO(getName() << " Seeking subscription to " << seekArg);
std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
cnx->sendRequestWithId(seek, requestId)
.addListener([this, weakSelf, callback, originalSeekMessageId](Result result,
const ResponseData& responseData) {
auto self = weakSelf.lock();
if (!self) {
callback(result);
return;
}
if (result == ResultOk) {
LOG_INFO(getName() << "Seek successfully");
ackGroupingTrackerPtr_->flushAndClean();
incomingMessages_.clear();
Lock lock(mutexForMessageId_);
lastDequedMessageId_ = MessageId::earliest();
lock.unlock();
if (getCnx().expired()) {
// It's during reconnection, complete the seek future after connection is established
seekStatus_ = SeekStatus::COMPLETED;
} else {
if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
startMessageId_ = seekMessageId_.get();
}
seekCallback_.release()(result);
}
} else {
LOG_ERROR(getName() << "Failed to seek: " << result);
seekMessageId_ = originalSeekMessageId;
seekStatus_ = SeekStatus::NOT_STARTED;
seekCallback_.release()(result);
}
});
}