std::shared_ptr processRawData()

in bistro/utils/PeriodicPoller.h [80:226]


  std::shared_ptr<const ProcessedData> processRawData(
    // nullptr if the previous refresh had an exception, or in the 1st refresh
    std::shared_ptr<const ProcessedData> previous,
    const RawData&
  )
>
class PeriodicPoller final : boost::noncopyable {

/**
 * Signature of the user-supplied callback that fetches the polled data.
 *
 * Contract:
 *  - If the data has changed, store it in *out_raw_data and return true.
 *  - If it has not changed, return false; out_raw_data is then ignored.
 *
 * The `state` argument is how the callback detects changes -- e.g. it can
 * keep a 'last modified time' or a version number there.  After a
 * successful refresh, the callback receives the state left by that run.
 * After a failed refresh (and on the very first one) it receives a
 * default-initialized PollerState; in that case whatever was fetched
 * should be treated as new (anything else is most likely an error).
 *
 * This is a std::function rather than a template argument, so that bound
 * methods and lambdas can be passed.
 *
 * DANGER: a class that owns a PeriodicPoller should not bind its own
 * 'this' into the callback, because the owning instance is still only
 * partially initialized at that point.  If unavoidable, capture just the
 * specific member variables that have already been properly initialized.
 */
using FetchRawDataFn = std::function<bool (
  // Output: receives the freshly fetched data when the callback returns true
  RawData* out_raw_data,
  // In/out: default-initialized unless the previous refresh succeeded
  PollerState* state,
  // nullptr on the 1st refresh, or if the previous refresh had an exception
  std::shared_ptr<const ProcessedData> previous
)>;

public:
  PeriodicPoller(
    std::string thread_name,  // up to 15 chars
    FetchRawDataFn fetch_raw_data,  // see the typedef
    std::chrono::milliseconds period,
    std::chrono::milliseconds retry_period
  ) : fetchRawData_(std::move(fetch_raw_data)),
      period_(period),
      retryPeriod_(retry_period),
      // Call the default constructor explicitly, so that we value initialize
      // any basic types (like int). Otherwise they will have some garbage
      // values.
      loaderState_() {

    // Make the first call to getDataOrThrow return real data.
    auto initial_period = refresh();
    // Though the ThreadedRepeatingFunctionRunner docs urge
    // double-initialization, it is safe to make this thread in the
    // constructor, since:
    //  - this is the last action in the constructor
    //  - this class is guaranteed not to have derived classes
    //  - the threads_ object is declared last, and hence is destroyed first
    threads_.add(
      std::move(thread_name),
      std::bind(&PeriodicPoller::refresh, this),
      initial_period
    );
  }

  // Explicitly stops the runner's threads in the destructor body, i.e.
  // before any data member starts being destroyed.
  // NOTE(review): presumably stop() blocks until the polling thread has
  // exited -- confirm against the ThreadedRepeatingFunctionRunner docs.
  ~PeriodicPoller() { threads_.stop(); }

  /**
   * Thread-safe accessor for the most recent refresh outcome.
   *
   * Returns the latest Data.  Only the shared_ptr is copied, since the Data
   * object itself is immutable.
   *
   * Throws std::runtime_error, carrying the stored error message, if the
   * latest refresh recorded an exception.
   *
   * Internally the ProcessedData pointer starts out as nullptr.  Even so,
   * this function never hands out nullptr, unless your code does one of
   * these:
   *  - Your processRawData() returns nullptr.
   *  - Your fetchRawData() returns false on the initial refresh(), when the
   *    PollerState was default-initialized.  That's not the right behavior
   *    for most uses.
   */
  std::shared_ptr<const ProcessedData> getDataOrThrow() const {
    // Work on a private snapshot taken via Synchronized::copy().
    const auto snapshot = result_.copy();
    if (!snapshot.ex_.hasValue()) {
      return snapshot.data_;
    }
    throw std::runtime_error(snapshot.ex_.value());
  }

private:
  // Similarly to std::future, holds either a data pointer, or an exception.
  struct Result {
    // This is null initially, and after exceptions, until the first
    // successful refresh()
    std::shared_ptr<const ProcessedData> data_;
    // The callers of getDataOrThrow don't care to distinguish exception
    // types, but if it did, folly::exception_wrapper would help.
    folly::Optional<std::string> ex_;

    void setData(std::shared_ptr<const ProcessedData>&& data) noexcept {
      data_ = std::move(data);
      ex_ = folly::none;
    }

    void setException(const std::exception& e) noexcept {
      data_.reset();
      ex_ = e.what();
    }
  };

  // Invokes fetchRawData, processRawData and stores the result.
  //
  // Returns the delay to wait before the next refresh:
  //  - period_ after a successful fetch + process,
  //  - retryPeriod_ after an exception in either step,
  //  - if the fetcher reports "no change": retryPeriod_ when the stored
  //    result is still an exception, period_ otherwise.
  //
  // A std::exception thrown by either step is stored in result_, to be
  // rethrown by getDataOrThrow.  Any failure -- in fetching OR in
  // processing -- also resets the fetcher state, so that the next fetch
  // starts from a default-initialized PollerState, as the FetchRawDataFn
  // docblock promises.  Without that reset, a fetcher whose state already
  // covers the data that failed to process could keep reporting "no
  // change", leaving the poller in the error state until the source data
  // happens to change again.
  std::chrono::milliseconds refresh() noexcept {
    // Copy the pointer out now, so processing below does not need the
    // result_ lock.
    auto previous = result_->data_;
    RawData raw_data;
    try {
      if (!fetchRawData_(&raw_data, &loaderState_, previous)) {
        // Nothing new: keep the stored result, and keep retrying at the
        // faster rate if that result is an error.
        return result_->ex_ ? retryPeriod_ : period_;
      }
      // Don't hold the result_ lock while processing
      auto new_data = processRawData(previous, raw_data);
      result_->setData(std::move(new_data));
      return period_;
    } catch (const std::exception& e) {
      // On exception, reset the fetcher state, see docblock for the reasons.
      loaderState_ = PollerState();
      result_->setException(e);
      return retryPeriod_;
    }
  }

  // User-supplied fetch callback, invoked by refresh().
  const FetchRawDataFn fetchRawData_;
  // Delay that refresh() returns when no exception is recorded.
  const std::chrono::milliseconds period_;
  // Delay that refresh() returns when an exception is recorded.
  const std::chrono::milliseconds retryPeriod_;

  // Only accessed from refresh(), so no synchronization
  PollerState loaderState_;
  // Latest outcome, shared between refresh() and getDataOrThrow().  Starts
  // out with a null data pointer and no exception; first populated by the
  // refresh() call made in the constructor.
  folly::Synchronized<Result> result_;

  // Declared last since the thread's callback may access other members.
  // Being last, it is also the first member to be destroyed.
  folly::ThreadedRepeatingFunctionRunner threads_;
};