in lib/RetryableOperation.h [82:129]
/**
 * Performs one attempt of the operation and, if that attempt reports ResultRetryable while
 * there is still time left, arms timer_ to try again after a backoff delay.
 *
 * The attempt is started by calling func_() and attaching a listener to what it returns.
 * The listener completes promise_ as follows:
 *   - ResultOk            -> promise_.setValue(value)
 *   - any other result
 *     except Retryable    -> promise_.setFailed(result)
 *   - ResultRetryable with
 *     remainingTime <= 0  -> promise_.setFailed(ResultTimeout)
 * Otherwise the next attempt is scheduled with the remaining time reduced by the delay.
 *
 * Both callbacks capture only a weak reference to this object and do nothing if it can no
 * longer be locked.
 *
 * @param remainingTime time budget still available for this and subsequent attempts
 * @return the future obtained from promise_
 */
Future<Result, T> runImpl(TimeDuration remainingTime) {
    std::weak_ptr<RetryableOperation<T>> weakThis{this->shared_from_this()};

    func_().addListener([this, weakThis, remainingTime](Result result, const T& value) {
        // Holding the locked pointer keeps this object alive for the rest of the callback.
        auto keepAlive = weakThis.lock();
        if (!keepAlive) {
            return;
        }

        if (result == ResultOk) {
            promise_.setValue(value);
            return;
        }

        // Two ways to give up: the error is not retryable, or it is but the budget is spent.
        const bool retryable = (result == ResultRetryable);
        if (!retryable || remainingTime.total_milliseconds() <= 0) {
            promise_.setFailed(retryable ? ResultTimeout : result);
            return;
        }

        // Never wait longer than the time that is left.
        auto waitTime = std::min(backoff_.next(), remainingTime);
        timer_->expires_from_now(waitTime);

        auto timeLeft = remainingTime - waitTime;
        LOG_INFO("Reschedule " << name_ << " for " << waitTime.total_milliseconds()
                               << " ms, remaining time: " << timeLeft.total_milliseconds() << " ms");

        timer_->async_wait([this, weakThis, timeLeft](const boost::system::error_code& ec) {
            auto guard = weakThis.lock();
            if (!guard) {
                return;
            }

            if (!ec) {
                // Timer fired normally: start the next attempt with the reduced budget.
                LOG_DEBUG("Run operation " << name_
                                           << ", remaining time: " << timeLeft.total_milliseconds() << " ms");
                runImpl(timeLeft);
                return;
            }

            if (ec == boost::asio::error::operation_aborted) {
                LOG_DEBUG("Timer for " << name_ << " is cancelled");
                promise_.setFailed(ResultTimeout);
                return;
            }

            // NOTE(review): promise_ is not completed on this path within this function —
            // confirm that is intended or handled elsewhere.
            LOG_WARN("Timer for " << name_ << " failed: " << ec.message());
        });
    });

    return promise_.getFuture();
}