thread/st.cpp (341 lines of code) (raw):
#include "st.h"
#include <inttypes.h>
#include <unistd.h>
#include <fcntl.h>
#include <poll.h>
#include <sys/resource.h>
#include <photon/photon.h>
#include <photon/thread/thread.h>
#include <photon/thread/thread-key.h>
#include <photon/io/fd-events.h>
#include <photon/net/basic_socket.h>
#include <photon/common/iovector.h>
#include <photon/common/alog.h>
#include <photon/common/timeout.h>
// Event system requested through st_set_eventsys(); st_init() reads it when
// choosing the Photon event engine. Starts out as 0.
static int _eventsys = 0;

// Reports the event system identifier currently stored in _eventsys.
int st_get_eventsys(void) {
    const int selected = _eventsys;
    return selected;
}
// Records the event system to be used by a later st_init().
// Returns 0 on success; fails with -1 / EINVAL for identifiers outside
// [0, ST_EVENTSYS_IOURING].
int st_set_eventsys(int eventsys) {
    // Converting to unsigned maps negative inputs to very large values, so a
    // single upper-bound comparison rejects them as well.
    const auto requested = (uint32_t)eventsys;
    if (requested <= ST_EVENTSYS_IOURING) {
        _eventsys = requested;
        return 0;
    }
    LOG_ERROR_RETURN(EINVAL, -1, "unknown eventsys ", eventsys);
}
// Initializes the Photon runtime on behalf of the ST API and returns the
// result of photon::init().
int st_init(void) {
    auto event_engines = photon::INIT_EVENT_DEFAULT;
#if defined(__linux__)
    // When io_uring was requested, toggle the epoll bit of the default mask
    // (presumably clearing it so that io_uring gets selected -- confirm that
    // INIT_EVENT_DEFAULT contains INIT_EVENT_EPOLL).
    if (_eventsys == ST_EVENTSYS_IOURING)
        event_engines ^= photon::INIT_EVENT_EPOLL;
#endif
    // No extra I/O engines are requested (second argument is 0).
    return photon::init(event_engines, 0, {
        .libaio_queue_depth = 0,
        .use_pooled_stack_allocator = true,
        .bypass_threadpool = true,
    });
}
// Returns the hard limit (rlim_max) on the number of open file descriptors
// for this process, or -1 if getrlimit() fails.
// The limit is clamped to INT_MAX: rlim_t is wider than int and may be
// RLIM_INFINITY, which would otherwise be narrowed to a negative or bogus
// value that callers could mistake for an error.
int st_getfdlimit(void) {
    struct rlimit rlim;
    if (getrlimit(RLIMIT_NOFILE, &rlim) < 0)
        LOG_ERRNO_RETURN(0, -1, "failed to getrlimit()");
    if (rlim.rlim_max == RLIM_INFINITY || rlim.rlim_max > (rlim_t)INT_MAX)
        return INT_MAX;
    return (int)rlim.rlim_max;
}
// Returns a fixed placeholder string; the name does not depend on the
// event system actually selected.
const char *st_get_eventsys_name(void) {
    static const char kEventSysName[] = "event_sys_name";
    return kEventSysName;
}
// st_switch_cb_t st_set_switch_in_cb(st_switch_cb_t cb);
// st_switch_cb_t st_set_switch_out_cb(st_switch_cb_t cb);
// Creates a Photon thread running start(arg).
//   joinable   - non-zero to allow a later st_thread_join() on the thread
//   stack_size - stack size in bytes; 0 selects photon::DEFAULT_STACK_SIZE
// Returns the new thread handle, or whatever photon::thread_create()
// returned on failure (a null handle is passed through untouched).
st_thread_t st_thread_create(void *(*start)(void *arg), void *arg,
                             int joinable, int stack_size) {
    if (stack_size == 0)
        stack_size = photon::DEFAULT_STACK_SIZE;
    auto th = photon::thread_create(start, arg, stack_size);
    // Only enable joining on a thread that was actually created; calling
    // thread_enable_join() on a failed (null) creation would dereference it.
    if (th && joinable) photon::thread_enable_join(th);
    return th;
}
// Terminates the calling thread, handing retval to photon::thread_exit().
void st_thread_exit(void *retval) {
    void *const exit_value = retval;
    photon::thread_exit(exit_value);
}
// Joins the given thread and, when retvalp is non-null, stores the value
// returned by photon::thread_join() there. Always returns 0.
int st_thread_join(st_thread_t thread, void **retvalp) {
    // The ST thread handle doubles as Photon's join handle.
    auto handle = (photon::join_handle*)thread;
    auto exit_value = photon::thread_join(handle);
    if (retvalp != nullptr) {
        *retvalp = exit_value;
    }
    return 0;
}
// Returns the handle of the calling thread (photon::CURRENT).
st_thread_t st_thread_self(void) {
    auto self = photon::CURRENT;
    return self;
}
// Interrupts the given thread through Photon's thread_interrupt(); its
// result is not reported to the caller.
void st_thread_interrupt(st_thread_t th) {
    auto target = (photon::thread*)th;
    thread_interrupt(target);
}
// Suspends the calling thread for `secs` seconds via photon::thread_sleep()
// and returns its result.
int st_sleep(int secs) {
    auto rc = photon::thread_sleep(secs);
    return rc;
}
// Suspends the calling thread for `usecs` microseconds via
// photon::thread_usleep() and returns its result.
int st_usleep(st_utime_t usecs) {
    auto rc = photon::thread_usleep(usecs);
    return rc;
}
// Accepted for API compatibility only: the argument is ignored and the
// function always returns 0.
int st_randomize_stacks(int on) {
    (void)on;
    return 0;
}
// Allocates a thread-specific-data key and stores it in *keyp.
//   destructor - optional callback registered with the key
// Returns 0 on success, -1 on failure (errno set).
int st_key_create(int *keyp, void (*destructor)(void *)) {
    photon::thread_key_t key;
    auto ret = photon::thread_key_create(&key, destructor);
    // Any non-zero result is a failure. NOTE(review): thread_key_create()
    // appears to follow the pthread convention of returning a positive error
    // number rather than -1 -- confirm; checking only `ret < 0` would let such
    // failures through and publish an uninitialized key.
    if (ret != 0) {
        if (ret > 0) errno = ret;
        LOG_ERRNO_RETURN(0, -1, "failed to thread_key_create()");
    }
    // The ST API exposes keys as int, so a key that does not fit is released
    // again and reported as an overflow.
    if (key > INT_MAX) {
        photon::thread_key_delete(key);
        LOG_ERROR_RETURN(EOVERFLOW, -1, "thread key space overflow");
    }
    *keyp = key;
    return 0;
}
// Returns the maximum number of thread-specific keys (photon::THREAD_KEYS_MAX).
int st_key_getlimit(void) {
    const auto limit = photon::THREAD_KEYS_MAX;
    return limit;
}
int st_thread_setspecific(int key, void *value) {
photon::thread_key_t k = key;
return photon::thread_setspecific(k, value);
}
void *st_thread_getspecific(int key) {
photon::thread_key_t k = key;
return photon::thread_getspecific(k);
}
// Synchronization
// Allocates a new photon::condition_variable; release it with
// st_cond_destroy().
st_cond_t st_cond_new(void) {
    auto cv = new photon::condition_variable;
    return cv;
}
// Deletes a condition variable created by st_cond_new(). Always returns 0.
int st_cond_destroy(st_cond_t cvar) {
    auto cv = static_cast<photon::condition_variable*>(cvar);
    delete cv;
    return 0;
}
// Waits on the condition variable with the largest possible timeout
// (-1UL), delegating to st_cond_timedwait().
int st_cond_wait(st_cond_t cvar) {
    const st_utime_t no_timeout = -1UL;
    return st_cond_timedwait(cvar, no_timeout);
}
// Waits on the condition variable for at most `timeout`, using
// condition_variable::wait_no_lock(), and returns its result.
int st_cond_timedwait(st_cond_t cvar, st_utime_t timeout) {
    auto& cond = *static_cast<photon::condition_variable*>(cvar);
    return cond.wait_no_lock(timeout);
}
// Signals the condition variable. The result of signal() is discarded and
// 0 is always returned.
int st_cond_signal(st_cond_t cvar) {
    auto& cond = *static_cast<photon::condition_variable*>(cvar);
    cond.signal();
    return 0;
}
// Broadcasts on the condition variable. The result of broadcast() is
// discarded and 0 is always returned.
int st_cond_broadcast(st_cond_t cvar) {
    auto& cond = *static_cast<photon::condition_variable*>(cvar);
    cond.broadcast();
    return 0;
}
// Allocates a new photon::mutex; release it with st_mutex_destroy().
st_mutex_t st_mutex_new(void) {
    auto mtx = new photon::mutex;
    return mtx;
}
// Deletes a mutex created by st_mutex_new(). Always returns 0.
int st_mutex_destroy(st_mutex_t lock) {
    auto mtx = static_cast<photon::mutex*>(lock);
    delete mtx;
    return 0;
}
// Acquires the mutex and returns the result of photon::mutex::lock().
int st_mutex_lock(st_mutex_t lock) {
    auto& mtx = *static_cast<photon::mutex*>(lock);
    return mtx.lock();
}
// Attempts to acquire the mutex without blocking and returns the result of
// photon::mutex::try_lock().
int st_mutex_trylock(st_mutex_t lock) {
    auto& mtx = *static_cast<photon::mutex*>(lock);
    return mtx.try_lock();
}
// Releases the mutex. Always returns 0.
int st_mutex_unlock(st_mutex_t lock) {
    auto& mtx = *static_cast<photon::mutex*>(lock);
    mtx.unlock();
    return 0;
}
// Returns photon::now divided by 1000 twice (microseconds to seconds,
// given that st_utime() exposes the same counter as st_utime_t).
time_t st_time(void) {
    const auto millis = photon::now / 1000;
    return millis / 1000;
}
// Returns the current value of Photon's cached clock, photon::now.
st_utime_t st_utime(void) {
    const st_utime_t current = photon::now;
    return current;
}
// Accepted for API compatibility only: the supplied clock function is
// ignored and 0 is always returned.
int st_set_utime_function(st_utime_t (*func)(void)) {
    (void)func;
    return 0;
}
// Accepted for API compatibility only: the flag is ignored and 0 is always
// returned.
int st_timecache_set(int on) {
    (void)on;
    return 0;
}
// Object behind st_netfd_t: owns an OS file descriptor plus one optional
// piece of user data with its destructor callback.
struct netfd {
    int fd;                                    // owned descriptor; < 0 means "nothing to close"
    void* specific = nullptr;                  // user data set by st_netfd_setspecific()
    void (*destructor)(void *) = nullptr;      // cleanup for `specific`; was left uninitialized before

    // Wraps fd as-is.
    netfd(int fd) : fd(fd) { }
    // Wraps fd and switches it to non-blocking mode (the bool is only a tag
    // selecting this overload).
    netfd(int fd, bool) : fd(fd) {
        photon::net::set_fd_nonblocking(fd);
    }
    // The object owns its fd, so a copy would close the same descriptor twice.
    netfd(const netfd&) = delete;
    netfd& operator=(const netfd&) = delete;

    // Runs the user destructor on the specific data (if both are set), then
    // closes the descriptor when it is valid.
    ~netfd() {
        if (specific && destructor)
            destructor(specific);
        if (fd >= 0)
            ::close(fd);
    }
};
// Extracts the OS file descriptor stored in the netfd behind `fd`.
inline int getfd(st_netfd_t fd) {
    const netfd* wrapper = static_cast<netfd*>(fd);
    return wrapper->fd;
}
// I/O Functions
// Wraps an existing OS descriptor in a new netfd without changing its flags.
st_netfd_t st_netfd_open(int osfd) {
    auto wrapper = new netfd(osfd);
    return wrapper;
}
// Wraps an existing socket descriptor in a new netfd, using the netfd
// constructor overload that also sets the descriptor non-blocking.
st_netfd_t st_netfd_open_socket(int osfd) {
    constexpr bool make_nonblocking = true;
    auto wrapper = new netfd(osfd, make_nonblocking);
    return wrapper;
}
// Frees the netfd object WITHOUT closing the underlying OS descriptor, as
// the State Threads API specifies for st_netfd_free(). The destructor for
// the specific data (if any) still runs as part of the delete.
void st_netfd_free(st_netfd_t fd) {
    auto wrapper = static_cast<netfd*>(fd);
    if (!wrapper) return;
    // ~netfd() closes any fd >= 0; detach it first so the caller keeps
    // ownership of the OS descriptor.
    wrapper->fd = -1;
    delete wrapper;
}
// Closes the underlying OS descriptor and frees the netfd object (running
// the specific-data destructor, if set), as the State Threads API specifies
// for st_netfd_close(). Returns the result of close(): 0 on success, -1 on
// error. The handle must not be used after this call.
int st_netfd_close(st_netfd_t fd) {
    auto wrapper = static_cast<netfd*>(fd);
    const int ret = ::close(wrapper->fd);
    // Mark the descriptor as gone so ~netfd() does not close it a second
    // time (the number may already have been reused by another open()).
    wrapper->fd = -1;
    delete wrapper;
    return ret;
}
// Returns the OS file descriptor wrapped by `fd`.
int st_netfd_fileno(st_netfd_t fd) {
    const int osfd = getfd(fd);
    return osfd;
}
// Attaches user data and its destructor callback to the netfd, overwriting
// any previous pair (the previous data is not destroyed here).
void st_netfd_setspecific(st_netfd_t fd, void *value, void (*destructor)(void *)) {
    auto& wrapper = *static_cast<netfd*>(fd);
    wrapper.destructor = destructor;
    wrapper.specific = value;
}
// Returns the user data attached via st_netfd_setspecific(), or nullptr if
// none was set.
void *st_netfd_getspecific(st_netfd_t fd) {
    const netfd* wrapper = static_cast<netfd*>(fd);
    return wrapper->specific;
}
// On some platforms (e.g., Solaris 2.5 and possibly other SVR4 implementations)
// accept(3) calls from different processes on the same listening socket (see
// bind(3), listen(3)) must be serialized. In the original API this function
// causes all subsequent accept(3) calls made by st_accept() on the specified
// file descriptor object to be serialized.
// Here it is a no-op that always returns 0.
int st_netfd_serialize_accept(st_netfd_t fd) {
    (void)fd;   // those platforms are not supported
    return 0;
}
// Translates a poll(2) event mask into Photon event flags:
//   POLLIN -> EVENT_READ, POLLOUT -> EVENT_WRITE, POLLPRI -> EVENT_ERROR.
// All other poll bits are ignored.
inline uint32_t to_photon_events(int poll_event) {
    const uint32_t readable = (poll_event & POLLIN)  ? photon::EVENT_READ  : 0;
    const uint32_t writable = (poll_event & POLLOUT) ? photon::EVENT_WRITE : 0;
    const uint32_t urgent   = (poll_event & POLLPRI) ? photon::EVENT_ERROR : 0;
    return readable | writable | urgent;
}
// Waits on the current vCPU's master event engine until the descriptor
// reports one of the events in `how` (poll(2) bits) or `timeout` expires.
// Returns the result of wait_for_fd().
int st_netfd_poll(st_netfd_t fd, int how, st_utime_t timeout) {
    const int osfd = getfd(fd);
    const uint32_t interests = to_photon_events(how);
    auto engine = photon::get_vcpu()->master_event_engine;
    return engine->wait_for_fd(osfd, interests, timeout);
}
// Accepts a connection on the listening descriptor within `timeout`.
// Returns a new netfd for the accepted socket (opened through
// st_netfd_open_socket()), or nullptr on failure.
st_netfd_t st_accept(st_netfd_t fd, struct sockaddr *addr, int *addrlen, st_utime_t timeout) {
    // addrlen is reinterpreted as socklen_t*, so both types must have the
    // same size.
    static_assert(sizeof(socklen_t) == sizeof(int), "...");
    auto peer_len = (socklen_t*)addrlen;
    const int listener = getfd(fd);
    auto accepted = photon::net::accept(listener, addr, peer_len, timeout);
    if (accepted < 0)
        LOG_ERRNO_RETURN(0, nullptr, "failed to accept new connection");
    return st_netfd_open_socket(accepted);
}
// Connects the descriptor to `addr` within `timeout`; returns the result
// of photon::net::connect().
int st_connect(st_netfd_t fd, const struct sockaddr *addr, int addrlen, st_utime_t timeout) {
    const int osfd = getfd(fd);
    return photon::net::connect(osfd, addr, addrlen, timeout);
}
// Reads up to `nbyte` bytes into `buf` within `timeout`; returns the result
// of photon::net::read().
ssize_t st_read(st_netfd_t fd, void *buf, size_t nbyte, st_utime_t timeout) {
    const int osfd = getfd(fd);
    return photon::net::read(osfd, buf, nbyte, timeout);
}
// Reads via photon::net::read_n(), requesting `nbyte` bytes into `buf`
// within `timeout`, and returns its result.
ssize_t st_read_fully(st_netfd_t fd, void *buf, size_t nbyte, st_utime_t timeout) {
    const int osfd = getfd(fd);
    return photon::net::read_n(osfd, buf, nbyte, timeout);
}
// Reads *resid bytes into `buf` via photon::net::read_n() and, when a
// positive count comes back, subtracts it from *resid.
// Returns the value from read_n() converted to int.
// NOTE(review): the State Threads documentation describes this call as
// returning 0 on success; this wrapper returns the byte count -- confirm
// that callers expect that.
int st_read_resid(st_netfd_t fd, void *buf, size_t *resid, st_utime_t timeout) {
    const auto nread = photon::net::read_n(getfd(fd), buf, *resid, timeout);
    if (nread <= 0)
        return nread;
    *resid -= nread;
    return nread;
}
// Scatter-read into `iov_size` buffers within `timeout`; returns the result
// of photon::net::readv().
ssize_t st_readv(st_netfd_t fd, const struct iovec *iov, int iov_size, st_utime_t timeout) {
    const int osfd = getfd(fd);
    return photon::net::readv(osfd, iov, iov_size, timeout);
}
// Repeatedly scatter-reads into the caller's iovec array through DOIO_LOOP,
// advancing a view over the array with BufStepV.
// On return *iov / *iov_size are updated from the view (i.e. they describe
// whatever the view still covers). Fails with -1 / EINVAL when a pointer is
// null or *iov_size is not positive; otherwise returns the DOIO_LOOP result.
int st_readv_resid(st_netfd_t fd, struct iovec **iov, int *iov_size, st_utime_t timeout) {
    const bool args_ok = iov && *iov && iov_size && *iov_size > 0;
    if (unlikely(!args_ok))
        LOG_ERROR_RETURN(EINVAL, -1, "invalid arguments");
    // One Timeout object is shared by every iteration of the loop.
    photon::Timeout tmo(timeout);
    iovector_view remaining(*iov, *iov_size);
    auto result = DOIO_LOOP(
        photon::net::readv(getfd(fd), remaining.iov, remaining.iovcnt, tmo),
        photon::net::BufStepV(remaining));
    *iov_size = remaining.iovcnt;
    *iov = remaining.iov;
    return result;
}
// Writes up to `nbyte` bytes from `buf` within `timeout`; returns the
// result of photon::net::write().
ssize_t st_write(st_netfd_t fd, const void *buf, size_t nbyte, st_utime_t timeout) {
    const int osfd = getfd(fd);
    return photon::net::write(osfd, buf, nbyte, timeout);
}
// Writes *resid bytes from `buf` via photon::net::write_n() and, when a
// positive count comes back, subtracts it from *resid.
// Returns the value from write_n() converted to int.
// NOTE(review): the State Threads documentation describes this call as
// returning 0 on success; this wrapper returns the byte count -- confirm
// that callers expect that.
int st_write_resid(st_netfd_t fd, const void *buf, size_t *resid, st_utime_t timeout) {
    const auto nwritten = photon::net::write_n(getfd(fd), buf, *resid, timeout);
    if (nwritten <= 0)
        return nwritten;
    *resid -= nwritten;
    return nwritten;
}
// Gather-write from `iov_size` buffers within `timeout`; returns the result
// of photon::net::writev().
ssize_t st_writev(st_netfd_t fd, const struct iovec *iov, int iov_size, st_utime_t timeout) {
    const int osfd = getfd(fd);
    return photon::net::writev(osfd, iov, iov_size, timeout);
}
// Repeatedly gather-writes the caller's iovec array through DOIO_LOOP,
// advancing a view over the array with BufStepV.
// On return *iov / *iov_size are updated from the view (i.e. they describe
// whatever the view still covers). Fails with -1 / EINVAL when a pointer is
// null or *iov_size is not positive; otherwise returns the DOIO_LOOP result.
int st_writev_resid(st_netfd_t fd, struct iovec **iov, int *iov_size, st_utime_t timeout) {
    const bool args_ok = iov && *iov && iov_size && *iov_size > 0;
    if (unlikely(!args_ok))
        LOG_ERROR_RETURN(EINVAL, -1, "invalid arguments");
    // One Timeout object is shared by every iteration of the loop.
    photon::Timeout tmo(timeout);
    iovector_view remaining(*iov, *iov_size);
    // TODO: this implementation of DOIO_LOOP incurs an extra wait_for_fd()
    // in every iteration and should be fixed.
    auto result = DOIO_LOOP(
        photon::net::writev(getfd(fd), remaining.iov, remaining.iovcnt, tmo),
        photon::net::BufStepV(remaining));
    *iov_size = remaining.iovcnt;
    *iov = remaining.iov;
    return result;
}
// Bring Photon's doio helpers into file scope under their unqualified names.
// NOTE(review): presumably needed by the DOIO_ONCE / DOIO_LOOP macros used in
// this file -- confirm against their definitions before removing.
using photon::net::doio_once;
using photon::net::doio_loop;
// Receives one datagram into `buf` (at most `len` bytes) by building a
// single-buffer msghdr and delegating to st_recvmsg().
// When `addrlen` is non-null it supplies the capacity of `addr` on entry and
// receives msg_namelen on return. Returns the result of st_recvmsg().
int st_recvfrom(st_netfd_t fd, void *buf, int len, struct sockaddr *addr, int *addrlen, st_utime_t timeout) {
    socklen_t name_capacity = 0;
    if (addrlen)
        name_capacity = (socklen_t)*addrlen;
    iovec buffer{buf, (size_t)len};
    struct msghdr message {
        .msg_name = (void*)addr,
        .msg_namelen = name_capacity,
        .msg_iov = &buffer,
        .msg_iovlen = 1,
        .msg_control = nullptr,
        .msg_controllen = 0,
        .msg_flags = 0,
    };
    const auto received = st_recvmsg(fd, &message, 0, timeout);
    if (addrlen != nullptr)
        *addrlen = message.msg_namelen;
    return received;
}
// Sends `len` bytes from `buf` to `addr` by building a single-buffer msghdr
// and delegating to st_sendmsg(). Returns the result of st_sendmsg().
int st_sendto(st_netfd_t fd, const void *buf, int len, struct sockaddr *addr, int addrlen, st_utime_t timeout) {
    const auto name_length = (socklen_t)addrlen;
    // msghdr wants a mutable iovec base; the buffer is only read from.
    iovec buffer{(void*)buf, (size_t)len};
    struct msghdr message {
        .msg_name = (void*)addr,
        .msg_namelen = name_length,
        .msg_iov = &buffer,
        .msg_iovlen = 1,
        .msg_control = nullptr,
        .msg_controllen = 0,
        .msg_flags = 0,
    };
    const auto sent = st_sendmsg(fd, &message, 0, timeout);
    return sent;
}
// Receives a message with MSG_DONTWAIT added to `flags`, using DOIO_ONCE
// with wait_for_fd_readable() (bounded by `timeout`) as the wait step.
// Returns the DOIO_ONCE result converted to int.
int st_recvmsg(st_netfd_t fd, struct msghdr *msg, int flags, st_utime_t timeout) {
    const int osfd = getfd(fd);
    const int recv_flags = flags | MSG_DONTWAIT;
    return DOIO_ONCE(::recvmsg(osfd, msg, recv_flags),
                     photon::wait_for_fd_readable(osfd, timeout));
}
// Sends a message with MSG_DONTWAIT and MSG_NOSIGNAL added to `flags`,
// using DOIO_ONCE with wait_for_fd_writable() (bounded by `timeout`) as the
// wait step. Returns the DOIO_ONCE result converted to int.
int st_sendmsg(st_netfd_t fd, const struct msghdr *msg, int flags, st_utime_t timeout) {
    const int osfd = getfd(fd);
    const int send_flags = flags | MSG_DONTWAIT | MSG_NOSIGNAL;
    return DOIO_ONCE(::sendmsg(osfd, msg, send_flags),
                     photon::wait_for_fd_writable(osfd, timeout));
}
// Opens `path` with open(2) and wraps the descriptor in a new netfd.
// Returns nullptr (after logging) when open() fails.
st_netfd_t st_open(const char *path, int oflags, mode_t mode) {
    const int osfd = ::open(path, oflags, mode);
    if (osfd < 0)
        LOG_ERRNO_RETURN(0, nullptr, "failed to open(`, `, `) file", path, oflags, mode);
    auto wrapper = new netfd(osfd);
    return wrapper;
}
// poll(2)-style wait over `npds` descriptors using a temporary cascading
// event engine.
//   pds     - array of descriptors; `events` is read, `revents` is written
//   npds    - number of entries in pds
//   timeout - maximum time to wait for the first batch of events
// Returns the number of entries whose revents were set, 0 when pds is null
// or npds is 0, and -1 on error (errno set).
// Limitation: a ready entry gets revents = events, i.e. every requested
// event is reported rather than only those that actually fired.
int st_poll(struct pollfd *pds, int npds, st_utime_t timeout) {
    if (!pds || !npds) return 0;
    auto eng = photon::new_default_cascading_engine();
    if (!eng)
        LOG_ERRNO_RETURN(0, -1, "failed to create default cascading engine");
    DEFER(delete eng);
    for (int i = 0; i < npds; ++i) {
        auto& p = pds[i];
        if (p.fd < 0)
            LOG_ERROR_RETURN(EINVAL, -1, "invalid fd ", p.fd);
        // poll() semantics: entries that are not ready must report 0, so
        // stale values left by the caller are cleared up front.
        p.revents = 0;
        auto events = to_photon_events(p.events);
        // The array index travels as the event's user data so the entry can
        // be located again when the event is reaped.
        eng->add_interest({p.fd, events, (void*)(int64_t)i});
    }
    constexpr int MAX = 32;
    void* data[MAX];
    int n = 0;
    bool first_round = true;
    for (;;) {
        auto ret = eng->wait_for_events(data, MAX, timeout);
        if (ret < 0) {
            // A failure (or timeout) of a follow-up, non-blocking round must
            // not discard the events already collected.
            if (!first_round) break;
            LOG_ERRNO_RETURN(0, -1, "failed to wait_for_events() via default cascading engine");
        }
        for (ssize_t i = 0; i < ret; ++i) {
            auto j = (uint64_t)data[i];
            if (j >= (uint64_t)npds)
                LOG_ERROR_RETURN(EOVERFLOW, -1, "reap event data overflow");
            // Count each entry once, even if the engine reports it again.
            if (pds[j].revents == 0) ++n;
            pds[j].revents = pds[j].events;
        }
        // A partially filled batch means everything pending was reaped.
        if (ret < MAX) break;
        // More events may be pending, but events are already in hand: only
        // drain what is ready instead of blocking for the full timeout again.
        timeout = 0;
        first_round = false;
    }
    return n;
}