lib/RetryableOperation.h (100 lines of code) (raw):
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#pragma once
#include <pulsar/Result.h>
#include <algorithm>
#include <atomic>
#include <functional>
#include <memory>
#include "Backoff.h"
#include "ExecutorService.h"
#include "Future.h"
#include "LogUtils.h"
namespace pulsar {
/**
 * Runs an asynchronous operation and retries it for as long as it reports `ResultRetryable`.
 *
 * Every attempt calls the user-supplied function and listens on the future it returns:
 *  - `ResultOk` completes this operation's promise with the produced value;
 *  - any result other than `ResultRetryable` fails the promise with that result;
 *  - `ResultRetryable` schedules another attempt on the timer after a backoff delay. The delays are
 *    deducted from the time budget given at construction; once the budget is exhausted the promise
 *    fails with `ResultTimeout`.
 *
 * Instances have to be created through `create()` because `runImpl()` relies on
 * `shared_from_this()`. The asynchronous callbacks only hold a weak reference, so they do nothing
 * when the instance has already been destroyed.
 */
template <typename T>
class RetryableOperation : public std::enable_shared_from_this<RetryableOperation<T>> {
    // Tag type that is private to this class. The public forwarding constructor requires it, so
    // only code inside the class (i.e. `create()`) is able to construct an instance.
    struct PassKey {
        explicit PassKey() {}
    };

    /**
     * @param name the operation's name, only used in log messages
     * @param func produces the future of a single attempt; invoked once per attempt
     * @param timeoutSeconds the total time budget for retry delays, in seconds
     * @param timer the timer used to wait between two attempts
     */
    RetryableOperation(const std::string& name, std::function<Future<Result, T>()>&& func, int timeoutSeconds,
                       DeadlineTimerPtr timer)
        : name_(name),
          func_(std::move(func)),
          timeout_(boost::posix_time::seconds(timeoutSeconds)),
          // The meaning of the three arguments is defined by Backoff.h (presumably initial delay,
          // maximum delay and mandatory stop -- confirm there).
          backoff_(boost::posix_time::milliseconds(100), timeout_ + timeout_,
                   boost::posix_time::milliseconds(0)),
          timer_(std::move(timer)) {}

   public:
    // Public only so that `std::make_shared` can reach it; it is not callable without a `PassKey`.
    template <typename... Args>
    explicit RetryableOperation(PassKey, Args&&... args) : RetryableOperation(std::forward<Args>(args)...) {}

    /**
     * Create a shared instance. The arguments are forwarded to the private constructor:
     * `(name, func, timeoutSeconds, timer)`.
     */
    template <typename... Args>
    static std::shared_ptr<RetryableOperation<T>> create(Args&&... args) {
        return std::make_shared<RetryableOperation<T>>(PassKey{}, std::forward<Args>(args)...);
    }

    /**
     * Start the operation on the first call. Later calls do not start it again, they only return
     * the future of the same promise.
     *
     * @return the future that is completed when the operation succeeds, fails or times out
     */
    Future<Result, T> run() {
        bool expected = false;
        if (!started_.compare_exchange_strong(expected, true)) {
            return promise_.getFuture();
        }
        return runImpl(timeout_);
    }

    /**
     * Fail the promise with `ResultDisconnected`, cancel the pending timer wait and stop any
     * further retry from being scheduled.
     */
    void cancel() {
        // Set the flag first so that a callback running concurrently does not re-arm the timer
        // right after it has been cancelled below.
        cancelled_ = true;
        promise_.setFailed(ResultDisconnected);
        boost::system::error_code ec;
        timer_->cancel(ec);
    }

   private:
    const std::string name_;                   // only used in log messages
    std::function<Future<Result, T>()> func_;  // performs one attempt
    const TimeDuration timeout_;               // initial time budget passed to the first attempt
    Backoff backoff_;                          // yields the delay before the next attempt
    Promise<Result, T> promise_;               // completed with the final outcome
    std::atomic_bool started_{false};          // makes run() start the operation only once
    std::atomic_bool cancelled_{false};        // set by cancel(), stops the retry loop
    DeadlineTimerPtr timer_;                   // waits between two attempts

    /**
     * Perform one attempt and, if it has to be retried, schedule the next one.
     *
     * @param remainingTime the time budget that is left for retry delays
     * @return the future of the operation's promise
     */
    Future<Result, T> runImpl(TimeDuration remainingTime) {
        std::weak_ptr<RetryableOperation<T>> weakSelf{this->shared_from_this()};
        func_().addListener([this, weakSelf, remainingTime](Result result, const T& value) {
            // `this` is only dereferenced while `self` keeps the instance alive
            auto self = weakSelf.lock();
            if (!self) {
                return;
            }
            if (result == ResultOk) {
                promise_.setValue(value);
                return;
            }
            if (result != ResultRetryable) {
                promise_.setFailed(result);
                return;
            }
            if (cancelled_) {
                // cancel() has already failed the promise. Re-arming the timer here would keep
                // invoking func_ until the time budget is used up.
                return;
            }
            if (remainingTime.total_milliseconds() <= 0) {
                promise_.setFailed(ResultTimeout);
                return;
            }
            // Never wait longer than the budget that is left
            auto delay = std::min(backoff_.next(), remainingTime);
            timer_->expires_from_now(delay);
            auto nextRemainingTime = remainingTime - delay;
            LOG_INFO("Reschedule " << name_ << " for " << delay.total_milliseconds()
                                   << " ms, remaining time: " << nextRemainingTime.total_milliseconds()
                                   << " ms");
            timer_->async_wait([this, weakSelf, nextRemainingTime](const boost::system::error_code& ec) {
                auto self = weakSelf.lock();
                if (!self) {
                    return;
                }
                if (ec) {
                    if (ec == boost::asio::error::operation_aborted) {
                        LOG_DEBUG("Timer for " << name_ << " is cancelled");
                        promise_.setFailed(ResultTimeout);
                    } else {
                        LOG_WARN("Timer for " << name_ << " failed: " << ec.message());
                        // No further attempt will be scheduled, so complete the promise instead of
                        // leaving the waiters pending forever. The result is the same one that is
                        // used when the wait is aborted.
                        promise_.setFailed(ResultTimeout);
                    }
                } else if (cancelled_) {
                    // The wait completed before cancel() could abort it; do not start a new attempt.
                    return;
                } else {
                    LOG_DEBUG("Run operation " << name_ << ", remaining time: "
                                               << nextRemainingTime.total_milliseconds() << " ms");
                    runImpl(nextRemainingTime);
                }
            });
        });
        return promise_.getFuture();
    }
    DECLARE_LOG_OBJECT()
};
}  // namespace pulsar