in bistro/utils/PeriodicPoller.h [80:226]
std::shared_ptr<const ProcessedData> processRawData(
// nullptr if the previous refresh had an exception, or in the 1st refresh
std::shared_ptr<const ProcessedData> previous,
const RawData&
)
>
class PeriodicPoller final : boost::noncopyable {
/**
* Fetch the data being polled. If it has changed, assign it to
* out_raw_data, and return true. Otherwise, return false, and out_raw_data
* will not be used.
*
* Use the state to track if the raw data had changed -- it might store a
* 'last modified time' or a version. If the previous refresh succeeded,
* you will get the state from the previous run. Otherwise, you'll get a
* default-initialized PollerState, which should force you to treat any
* data you fetched as new (any other behavior is most likely an error).
*
* Not a template argument to allow the use of bound methods & lambdas.
*
* DANGER: the calling class should not bind 'this' to any function it
* might want to pass in here, since the containing instance is only
* partially initialized. If you must, capture specific member
* variables that had already been properly initialized.
*/
typedef std::function<bool (
RawData* out_raw_data,
// default-initialized unless the previous refresh succeeded
PollerState* state,
// nullptr if the previous refresh had an exception, or in the 1st refresh
std::shared_ptr<const ProcessedData> previous
)> FetchRawDataFn;
public:
PeriodicPoller(
std::string thread_name, // up to 15 chars
FetchRawDataFn fetch_raw_data, // see the typedef
std::chrono::milliseconds period,
std::chrono::milliseconds retry_period
) : fetchRawData_(std::move(fetch_raw_data)),
period_(period),
retryPeriod_(retry_period),
// Call the default constructor explicitly, so that we value initialize
// any basic types (like int). Otherwise they will have some garbage
// values.
loaderState_() {
// Make the first call to getDataOrThrow return real data.
auto initial_period = refresh();
// Though the ThreadedRepeatingFunctionRunner docs urge
// double-initialization, it is safe to make this thread in the
// constructor, since:
// - this is the last action in the constructor
// - this class is guaranteed not to have derived classes
// - the threads_ object is declared last, and hence is destroyed first
threads_.add(
std::move(thread_name),
std::bind(&PeriodicPoller::refresh, this),
initial_period
);
}
~PeriodicPoller() {
threads_.stop();
}
/**
* Returns the latest Data, or throws if there was an error fetching it.
* Only copies the shared_ptr, since the Data object is immutable.
* Thread-safe.
*
* The initial state of the internal ProcessedData pointer is nullptr.
* However, you will never get nullptr from getDataOrThrow, unless your
* code does one of these:
* - Your processRawData() returns nullptr.
* - Your fetchRawData() returns false on the initial refresh(), when the
* PollerState was default-initialized. That's not the right behavior
* for most uses.
*/
std::shared_ptr<const ProcessedData> getDataOrThrow() const {
auto r = result_.copy();
if (r.ex_.hasValue()) {
throw std::runtime_error(r.ex_.value());
}
return r.data_;
}
private:
// Similarly to std::future, holds either a data pointer, or an exception.
struct Result {
// This is null initially, and after exceptions, until the first
// successful refresh()
std::shared_ptr<const ProcessedData> data_;
// The callers of getDataOrThrow don't care to distinguish exception
// types, but if it did, folly::exception_wrapper would help.
folly::Optional<std::string> ex_;
void setData(std::shared_ptr<const ProcessedData>&& data) noexcept {
data_ = std::move(data);
ex_ = folly::none;
}
void setException(const std::exception& e) noexcept {
data_.reset();
ex_ = e.what();
}
};
// Invokes fetchRawData, processRawData and stores the result. Returns the
// appropriate delay depending on whether an exception was thrown.
std::chrono::milliseconds refresh() noexcept {
auto previous = result_->data_;
RawData raw_data;
try {
if (!fetchRawData_(&raw_data, &loaderState_, previous)) {
return result_->ex_ ? retryPeriod_ : period_;
}
} catch (const std::exception& e) {
// On exception, reset the fetcher state, see docblock for the reasons.
loaderState_ = PollerState();
result_->setException(e);
return retryPeriod_;
}
try {
// Don't hold the result_ lock while processing
auto new_data = processRawData(previous, raw_data);
result_->setData(std::move(new_data));
return period_;
} catch (const std::exception& e) {
result_->setException(e);
return retryPeriod_;
}
}
const FetchRawDataFn fetchRawData_;
const std::chrono::milliseconds period_;
const std::chrono::milliseconds retryPeriod_;
// Only accessed from refresh(), so no synchronization
PollerState loaderState_;
// Uninitialized until the first call to getDataOrThrow
folly::Synchronized<Result> result_;
// Declared last since the thread's callback may access other members.
folly::ThreadedRepeatingFunctionRunner threads_;
};