agent/native/libcommon/code/PeriodicTaskExecutor.h (98 lines of code) (raw):

#pragma once #include "ForkableInterface.h" #include <atomic> #include <condition_variable> #include <functional> #include <stop_token> #include <thread> #include <vector> namespace elasticapm::php { class PeriodicTaskExecutor : public ForkableInterface { private: auto getThreadWorkerFunction() { return [this]() { work(); }; } public: using clock_t = std::chrono::steady_clock; using time_point_t = std::chrono::time_point<clock_t, std::chrono::milliseconds>; using task_t = std::function<void(time_point_t)>; using worker_init_t = std::function<void()>; PeriodicTaskExecutor(std::vector<task_t> periodicTasks, worker_init_t workerInit = {}) : periodicTasks_(periodicTasks), workerInit_(std::move(workerInit)), thread_(getThreadWorkerFunction()) { } ~PeriodicTaskExecutor() { shutdown(); if (thread_.joinable()) { thread_.join(); } } void work() { if (workerInit_) { workerInit_(); } std::unique_lock<std::mutex> lock(mutex_); while(working_) { pauseCondition_.wait(lock, [this]() -> bool { return resumed_ || !working_; }); if (!working_) { break; } lock.unlock(); std::this_thread::sleep_for(sleepInterval_); for (auto const &task : periodicTasks_) { task(std::chrono::time_point_cast<std::chrono::milliseconds>(clock_t::now())); } lock.lock(); } } void prefork() final { shutdown(); if (thread_.joinable()) { thread_.join(); } } void postfork([[maybe_unused]] bool child) final { working_ = true; mutex_.lock(); thread_ = std::thread(getThreadWorkerFunction()); mutex_.unlock(); pauseCondition_.notify_all(); } void resumePeriodicTasks() { { std::lock_guard<std::mutex> lock(mutex_); resumed_ = true; } pauseCondition_.notify_all(); } void suspendPeriodicTasks() { { std::lock_guard<std::mutex> lock(mutex_); resumed_ = false; } pauseCondition_.notify_all(); } void setInterval(std::chrono::milliseconds interval) { sleepInterval_ = interval; } private: PeriodicTaskExecutor(const PeriodicTaskExecutor&) = delete; PeriodicTaskExecutor& operator=(const PeriodicTaskExecutor&) = delete; void shutdown() { { std::lock_guard<std::mutex> lock(mutex_); working_ = false; } pauseCondition_.notify_all(); } private: std::chrono::milliseconds sleepInterval_ = std::chrono::milliseconds(20); std::vector<task_t> periodicTasks_; worker_init_t workerInit_; std::mutex mutex_; std::thread thread_; std::condition_variable pauseCondition_; bool working_ = true; bool resumed_ = false; }; }