in rpc/rpc.cpp [328:381]
virtual int serve(IStream* stream) override
{
if (unlikely(!m_running))
return -1;
#pragma GCC diagnostic push
#if defined(__clang__)
#pragma GCC diagnostic ignored "-Wunknown-warning-option"
#endif
#if __GNUC__ >= 12
#pragma GCC diagnostic ignored "-Wdangling-pointer"
#endif
ThreadLink node;
m_list.push_back(&node);
#pragma GCC diagnostic pop
DEFER(m_list.erase(&node));
// stream serve refcount
int stream_serv_count = 0;
photon::mutex w_lock;
photon::condition_variable stream_cv;
// once serve exit, stream will destruct
// make sure all requests relies on this stream are finished
DEFER({
while (stream_serv_count > 0) stream_cv.wait_no_lock();
});
if (stream_accept_notify) stream_accept_notify(stream);
DEFER(if (stream_close_notify) stream_close_notify(stream));
while(likely(m_running)) {
Context context(this, stream);
context.stream_serv_count = &stream_serv_count;
context.stream_cv = &stream_cv;
context.w_lock = &w_lock;
int ret = context.read_request();
if (ret < 0) {
// should only shutdown read, for other threads
// might still writing
ERRNO e;
stream->shutdown(ShutdownHow::ReadWrite);
if (e.no == ECANCELED || e.no == EAGAIN || e.no == EINTR || e.no == ENXIO) {
return -1;
} else {
LOG_ERROR_RETURN(0, -1, "Read request failed `, `", VALUE(ret), e);
}
}
context.got_it = false;
m_thread_pool->thread_create(&async_serve, &context);
stream_serv_count ++;
while(!context.got_it)
thread_yield();
}
return 0;
}