in src/RPIServiceImpl.cpp [711:761]
void RPIServiceImpl::executeOnMainThread(std::function<void()> const& f, ServerContext* context, bool immediate) {
static const int STATE_PENDING = 0;
static const int STATE_RUNNING = 1;
static const int STATE_INTERRUPTING = 2;
static const int STATE_INTERRUPTED = 3;
static const int STATE_DONE = 4;
std::atomic_int state(STATE_PENDING);
std::mutex mutex;
std::unique_lock<std::mutex> lock(mutex);
std::condition_variable condVar;
eventLoopExecute([&] {
R_interrupts_pending = 0;
int expected = STATE_PENDING;
if (!state.compare_exchange_strong(expected, STATE_RUNNING)) return;
auto finally = Finally{[&] {
std::unique_lock<std::mutex> lock1(mutex);
int value = STATE_RUNNING;
if (!state.compare_exchange_strong(value, STATE_DONE) && value == STATE_INTERRUPTING) {
condVar.wait(lock1, [&] { return state.load() == STATE_INTERRUPTED; });
}
state.store(STATE_DONE);
condVar.notify_one();
R_interrupts_pending = 0;
}};
try {
f();
} catch (RUnwindException const&) {
throw;
} catch (std::exception const& e) {
std::cerr << "Exception: " << e.what() << "\n";
} catch (...) {
std::cerr << "Exception: unknown\n";
}
}, immediate);
bool cancelled = false;
while (!terminateProceed) {
int currentState = state.load();
if (currentState == STATE_DONE) break;
if (currentState == STATE_RUNNING && !cancelled && context != nullptr && context->IsCancelled()) {
int expected = STATE_RUNNING;
if (state.compare_exchange_strong(expected, STATE_INTERRUPTING)) {
cancelled = true;
asyncInterrupt();
state.store(STATE_INTERRUPTED);
condVar.notify_one();
}
}
condVar.wait_for(lock, std::chrono::milliseconds(25));
}
}