void RPIServiceImpl::executeOnMainThread()

in src/RPIServiceImpl.cpp [711:761]


// Submits `f` through eventLoopExecute() and blocks the calling thread until
// the submitted task reports completion or `terminateProceed` becomes true.
//
// Parameters:
//   f         - callable to run; captured by reference by the submitted task.
//   context   - optional request context (may be nullptr). While the task is
//               running, context->IsCancelled() is polled; on cancellation
//               asyncInterrupt() is called at most once per invocation.
//   immediate - forwarded unchanged to eventLoopExecute().
//
// Exceptions thrown by `f`: RUnwindException is rethrown from the submitted
// task; std::exception and unknown exceptions are reported on std::cerr and
// swallowed.
//
// Presumably eventLoopExecute() runs the task on the main (R) thread while
// this function waits on a different thread - TODO confirm against
// eventLoopExecute()'s definition.
//
// NOTE(review): the submitted task captures `state`, `mutex`, `condVar` and `f`
// by reference. If the wait loop exits because of `terminateProceed` before the
// state reaches STATE_DONE, those locals are destroyed while the task may still
// be queued or running - confirm that this path only occurs during shutdown.
void RPIServiceImpl::executeOnMainThread(std::function<void()> const& f, ServerContext* context, bool immediate) {
  // Handshake states shared between the waiting caller and the submitted task:
  //   PENDING      - task submitted, not started yet
  //   RUNNING      - task is executing `f`
  //   INTERRUPTING - caller saw a cancellation and is calling asyncInterrupt()
  //   INTERRUPTED  - asyncInterrupt() has returned
  //   DONE         - task cleanup finished; caller may return
  static const int STATE_PENDING = 0;
  static const int STATE_RUNNING = 1;
  static const int STATE_INTERRUPTING = 2;
  static const int STATE_INTERRUPTED = 3;
  static const int STATE_DONE = 4;
  std::atomic_int state(STATE_PENDING);
  std::mutex mutex;
  // The caller owns the mutex from here on and only releases it while it is
  // blocked inside condVar.wait_for() in the loop below.
  std::unique_lock<std::mutex> lock(mutex);
  std::condition_variable condVar;

  eventLoopExecute([&] {
    // Reset the interrupt flag before starting (presumably discards a stale
    // interrupt request left over from earlier work - confirm).
    R_interrupts_pending = 0;
    int expected = STATE_PENDING;
    // Only start if the task is still pending; otherwise do nothing.
    if (!state.compare_exchange_strong(expected, STATE_RUNNING)) return;
    // Cleanup handler; assumes Finally invokes its callable when `finally`
    // goes out of scope, including during exception unwinding - TODO confirm.
    auto finally = Finally{[&] {
      // Blocks until the caller is inside wait_for(), i.e. not in the middle of
      // the INTERRUPTING -> INTERRUPTED transition.
      std::unique_lock<std::mutex> lock1(mutex);
      int value = STATE_RUNNING;
      // Normal completion: RUNNING -> DONE. If an interrupt is still being
      // delivered, wait until the caller marks it as delivered, so that
      // asyncInterrupt() cannot land after this task has finished.
      if (!state.compare_exchange_strong(value, STATE_DONE) && value == STATE_INTERRUPTING) {
        condVar.wait(lock1, [&] { return state.load() == STATE_INTERRUPTED; });
      }
      // Covers the INTERRUPTED case, where the compare-exchange above failed.
      state.store(STATE_DONE);
      condVar.notify_one();
      // Clear the interrupt flag again in case an interrupt was requested but
      // not consumed by `f`.
      R_interrupts_pending = 0;
    }};
    try {
      f();
    } catch (RUnwindException const&) {
      // Let this exception type propagate to whoever invoked the task.
      throw;
    } catch (std::exception const& e) {
      std::cerr << "Exception: " << e.what() << "\n";
    } catch (...) {
      std::cerr << "Exception: unknown\n";
    }
  }, immediate);
  // Ensures asyncInterrupt() is requested at most once per call.
  bool cancelled = false;
  while (!terminateProceed) {
    int currentState = state.load();
    if (currentState == STATE_DONE) break;
    if (currentState == STATE_RUNNING && !cancelled && context != nullptr && context->IsCancelled()) {
      int expected = STATE_RUNNING;
      // Claim the interrupt only if the task is still running; `lock` is held
      // for the whole INTERRUPTING -> INTERRUPTED transition.
      if (state.compare_exchange_strong(expected, STATE_INTERRUPTING)) {
        cancelled = true;
        asyncInterrupt();
        state.store(STATE_INTERRUPTED);
        condVar.notify_one();
      }
    }
    // Releases `lock` while waiting; wakes on notification or after 25 ms so
    // that cancellation and `terminateProceed` are re-checked periodically.
    condVar.wait_for(lock, std::chrono::milliseconds(25));
  }
}