void ConsumerImpl::seekAsyncInternal()

in lib/ConsumerImpl.cc [1707:1767]


void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const SeekArg& seekArg,
                                     ResultCallback callback) {
    ClientConnectionPtr cnx = getCnx().lock();
    if (!cnx) {
        LOG_ERROR(getName() << " Client Connection not ready for Consumer");
        callback(ResultNotConnected);
        return;
    }

    auto expected = SeekStatus::NOT_STARTED;
    if (!seekStatus_.compare_exchange_strong(expected, SeekStatus::IN_PROGRESS)) {
        LOG_ERROR(getName() << " attempted to seek " << seekArg << " when the status is "
                            << static_cast<int>(expected));
        callback(ResultNotAllowedError);
        return;
    }

    const auto originalSeekMessageId = seekMessageId_.get();
    if (boost::get<uint64_t>(&seekArg)) {
        hasSoughtByTimestamp_.store(true, std::memory_order_release);
    } else {
        seekMessageId_ = *boost::get<MessageId>(&seekArg);
    }
    seekStatus_ = SeekStatus::IN_PROGRESS;
    seekCallback_ = std::move(callback);
    LOG_INFO(getName() << " Seeking subscription to " << seekArg);

    std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};

    cnx->sendRequestWithId(seek, requestId)
        .addListener([this, weakSelf, callback, originalSeekMessageId](Result result,
                                                                       const ResponseData& responseData) {
            auto self = weakSelf.lock();
            if (!self) {
                callback(result);
                return;
            }
            if (result == ResultOk) {
                LOG_INFO(getName() << "Seek successfully");
                ackGroupingTrackerPtr_->flushAndClean();
                incomingMessages_.clear();
                Lock lock(mutexForMessageId_);
                lastDequedMessageId_ = MessageId::earliest();
                lock.unlock();
                if (getCnx().expired()) {
                    // It's during reconnection, complete the seek future after connection is established
                    seekStatus_ = SeekStatus::COMPLETED;
                } else {
                    if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
                        startMessageId_ = seekMessageId_.get();
                    }
                    seekCallback_.release()(result);
                }
            } else {
                LOG_ERROR(getName() << "Failed to seek: " << result);
                seekMessageId_ = originalSeekMessageId;
                seekStatus_ = SeekStatus::NOT_STARTED;
                seekCallback_.release()(result);
            }
        });
}