common/executor/executor.cpp (88 lines of code) (raw):

#include "executor.h"

#include <photon/common/alog.h>
#include <photon/common/event-loop.h>
#include <photon/common/executor/executor.h>
#include <photon/common/lockfree_queue.h>
#include <photon/common/utility.h>
#include <photon/io/fd-events.h>
#include <photon/thread/thread-pool.h>
#include <photon/thread/thread11.h>

#include <atomic>
#include <thread>

namespace photon {

// Private implementation of Executor.
//
// Tasks are pushed into `queue` and consumed by `main_loop()`, which runs each
// task in a photon thread taken from `pool`. The loop runs either inside a
// dedicated std::thread (`th`, created by the four-argument constructor) or in
// a photon thread started by the caller on `do_loop()` (default constructor).
class Executor::ExecutorImpl {
public:
    using CBList =
        common::RingChannel<LockfreeMPMCRingQueue<Delegate<void>, 32UL * 1024>>;

    // Dedicated OS thread running `launch()`; stays null for an impl built
    // with the default constructor.
    std::unique_ptr<std::thread> th;
    // Photon thread that is executing `do_loop()`, recorded when it starts.
    photon::thread *pth = nullptr;
    // Channel carrying pending tasks; an empty delegate is the stop signal.
    CBList queue;
    // Thread pool used to run tasks. Non-null only while `do_loop()` is
    // active. It must start as nullptr: the destructor polls this pointer
    // when there is no std::thread, and reading it uninitialized would be
    // undefined behavior (and could spin forever) if `do_loop()` never ran.
    photon::ThreadPoolBase *pool = nullptr;

    // Starts a dedicated std::thread that initializes photon with the given
    // event/io engines and options and then serves the queue.
    // `queue` is fully constructed by the initializer list (and `pool` by its
    // default member initializer) before the thread is spawned in the body.
    ExecutorImpl(int init_ev, int init_io, const PhotonOptions& options,
                 const ExecutorQueueOption& queue_options)
        : queue(queue_options.max_yield_turn, queue_options.max_yield_usec) {
        th.reset(new std::thread(&ExecutorImpl::launch, this, init_ev, init_io,
                                 options));
    }

    // Creates an impl without a dedicated thread; somebody else is expected
    // to run `do_loop()` on it.
    ExecutorImpl() = default;

    // Sends the stop signal and waits for the loop to finish.
    ~ExecutorImpl() {
        // An empty delegate makes main_loop() return.
        queue.send({});
        if (th) {
            th->join();
        } else {
            // No OS thread to join: do_loop() resets `pool` to nullptr as its
            // last step, so yield until that happens.
            while (pool) photon::thread_yield();
        }
    }

    // Argument block handed from main_loop() to do_event().
    struct CallArg {
        Delegate<void> task;     // task to execute
        photon::thread *backth;  // thread to yield back to (the main loop)
    };

    // Entry point of a pooled photon thread: runs a single task.
    // `arg` points to the CallArg owned by main_loop().
    static void *do_event(void *arg) {
        auto call = static_cast<CallArg *>(arg);
        // Copy the task first: main_loop() reuses the same CallArg and
        // overwrites `task` on its next iteration.
        auto task = call->task;
        // Hand control back to the main loop before running the task.
        photon::thread_yield_to(call->backth);
        task();
        return nullptr;
    }

    // Receives tasks from `queue` until an empty delegate arrives, launching
    // each one in a thread from `pool`.
    void main_loop() {
        CallArg arg;
        arg.backth = photon::CURRENT;
        for (;;) {
            arg.task = queue.recv();
            if (!arg.task) {
                // Empty delegate: stop signal sent by the destructor.
                return;
            }
            auto worker = pool->thread_create(&ExecutorImpl::do_event,
                                              static_cast<void *>(&arg));
            // Switch to the worker so it copies `arg.task` before the next
            // recv() overwrites it; do_event() yields straight back.
            photon::thread_yield_to(worker);
        }
    }

    // Sets up the thread pool, serves the queue, then tears the pool down.
    // Resetting `pool` to nullptr at the end is what releases a destructor
    // waiting in the no-std::thread case.
    void do_loop() {
        pth = photon::CURRENT;
        pool = photon::new_thread_pool(32);
        LOG_INFO("worker start");
        main_loop();
        LOG_INFO("worker finished");
        photon::delete_thread_pool(pool);
        pool = nullptr;
    }

    // Body of the dedicated std::thread: brings up photon, runs the loop and
    // finalizes photon on exit.
    // NOTE(review): the result of photon::init() is not checked here - confirm
    // whether an init failure should abort the loop.
    void launch(int init_ev, int init_io, const PhotonOptions& options) {
        photon::init(init_ev, init_io, options);
        DEFER(photon::fini());
        do_loop();
    }
};

// Creates an executor backed by its own OS thread (see ExecutorImpl).
Executor::Executor(int init_ev, int init_io, const PhotonOptions& options,
                   const ExecutorQueueOption& queue_options)
    : e(new Executor::ExecutorImpl(init_ev, init_io, options, queue_options)) {}
// Builds an executor whose implementation owns no dedicated OS thread; the
// serving loop has to be started separately (see export_as_executor()).
Executor::Executor(create_on_current_vcpu)
    : e(new Executor::ExecutorImpl()) {}

// Destroys the implementation object owned through `e`.
Executor::~Executor() {
    delete e;
}

// Pushes `act` into the task queue of the given implementation.
void Executor::_issue(ExecutorImpl *e, Delegate<void> act) {
    e->queue.send<ThreadPause>(act);
}

// Creates a new heap-allocated executor and starts its serving loop in a
// freshly created photon thread. The returned pointer comes from `new`.
Executor *Executor::export_as_executor() {
    auto exported = new Executor(create_on_current_vcpu());
    auto loop_thread = photon::thread_create11(
        &Executor::ExecutorImpl::do_loop, exported->e);
    // Switch to the new thread right away so do_loop() begins running before
    // the executor is handed back to the caller.
    photon::thread_yield_to(loop_thread);
    return exported;
}

}  // namespace photon