gloo/transport/uv/libuv.h (527 lines of code) (raw):

/** * Copyright (c) 2019-present, Facebook, Inc. * All rights reserved. * * This source code is licensed under the BSD-style license found in the * LICENSE file in the root directory of this source tree. */ #pragma once // This file contains a libuv wrapper for C++. // // The code in this file is in part included verbatim from and // otherwise inspired by `uvw` (https://github.com/skypjack/uvw). // See the `LICENSE.uvw` file for a copy of the `uvw` license. // // Reasons for not using `uvw` directly: // * Upstream requires C++17. Gloo requires C++11. // * No way to pass externally managed memory to the read functions. // #include <algorithm> #include <cassert> #include <chrono> #include <cstdio> #include <cstring> #include <deque> #include <functional> #include <list> #include <memory> #include <tuple> #include <utility> #include <vector> #include <stdio.h> #include <uv.h> #define UV_ASSERT(rv, prefix) \ { \ if ((rv) != 0) { \ fprintf( \ stderr, \ "[%s:%d] %s: %s\n", \ __FILE__, \ __LINE__, \ prefix, \ uv_strerror(rv)); \ abort(); \ } \ } \ while (0) \ ; namespace gloo { namespace transport { namespace uv { namespace libuv { // Base class for handles. // Used to differentiate between handles and requests. struct BaseHandle {}; // Base class for requests. // Used to differentiate between handles and requests. struct BaseRequest {}; // Event type for errors. struct ErrorEvent { public: explicit ErrorEvent(int error) : error_(error) {} operator bool() const { return error_ != 0; } int code() const { return error_; } const char* what() const { return uv_strerror(error_); } private: const int error_; }; // Event emitter. // // Both handles and requests emit events. // template <typename T> class Emitter { struct BaseHandler { virtual ~BaseHandler() noexcept = default; }; template <typename E> struct Handler final : BaseHandler { using Listener = std::function<void(E&, T&)>; using Element = std::pair<bool, Listener>; using ListenerList = std::list<Element>; using Connection = typename ListenerList::iterator; Connection once(Listener f) { return onceL.emplace(onceL.cend(), false, std::move(f)); } Connection on(Listener f) { return onL.emplace(onL.cend(), false, std::move(f)); } void erase(Connection conn) noexcept { conn->first = true; if (!publishing) { auto pred = [](Element& element) { return element.first; }; onceL.remove_if(pred); onL.remove_if(pred); } } void publish(E event, T& ref) { ListenerList currentL; onceL.swap(currentL); auto func = [&event, &ref](Element& element) { return element.first ? void() : element.second(event, ref); }; publishing = true; std::for_each(onL.rbegin(), onL.rend(), func); std::for_each(currentL.rbegin(), currentL.rend(), func); publishing = false; onL.remove_if([](Element& element) { return element.first; }); } private: bool publishing{false}; ListenerList onceL{}; ListenerList onL{}; }; static std::size_t next_type() noexcept { static std::size_t counter = 0; return counter++; } template <typename> static std::size_t event_type() noexcept { static std::size_t value = next_type(); return value; } template <typename E> Handler<E>& handler() noexcept { std::size_t type = event_type<E>(); if (!(type < handlers_.size())) { handlers_.resize(type + 1); } if (!handlers_[type]) { handlers_[type] = std::unique_ptr<Handler<E>>(new Handler<E>); } return static_cast<Handler<E>&>(*handlers_[type]); } protected: template <typename E> void publish(E event) { handler<E>().publish(std::move(event), *static_cast<T*>(this)); } public: template <typename E> using Listener = typename Handler<E>::Listener; template <typename E> struct Connection : private Handler<E>::Connection { template <typename> friend class Emitter; Connection() = default; Connection(const Connection&) = default; Connection(Connection&&) = default; Connection(typename Handler<E>::Connection conn) : Handler<E>::Connection{std::move(conn)} {} Connection& operator=(const Connection&) = default; Connection& operator=(Connection&&) = default; }; template <typename E> Connection<E> once(Listener<E> f) { return handler<E>().once(std::move(f)); } template <typename E> Connection<E> on(Listener<E> f) { return handler<E>().on(std::move(f)); } template <typename E> void erase(Connection<E> conn) noexcept { handler<E>().erase(std::move(conn)); } private: std::vector<std::unique_ptr<BaseHandler>> handlers_; }; class Loop : public std::enable_shared_from_this<Loop> { public: explicit Loop(std::unique_ptr<uv_loop_t> ptr) : loop_(std::move(ptr)) {} static std::shared_ptr<Loop> create() { auto ptr = std::unique_ptr<uv_loop_t>(new uv_loop_t); auto loop = std::make_shared<Loop>(std::move(ptr)); auto rv = uv_loop_init(loop->loop_.get()); UV_ASSERT(rv, "uv_loop_init"); return loop; } template <typename T, typename... Args> typename std:: enable_if<std::is_base_of<BaseHandle, T>::value, std::shared_ptr<T>>::type resource(Args&&... args) { auto handle = T::create(shared_from_this(), std::forward<Args>(args)...); handle->init(); return handle; } template <typename T, typename... Args> typename std::enable_if< std::is_base_of<BaseRequest, T>::value, std::shared_ptr<T>>::type resource(Args&&... args) { return T::create(shared_from_this(), std::forward<Args>(args)...); } void run() { uv_run(loop_.get(), UV_RUN_DEFAULT); } uv_loop_t* raw() { return loop_.get(); } private: std::unique_ptr<uv_loop_t> loop_; }; // Typename T is the wrapped type name. // Typename U is the underlying libuv type name. // Base class to both handles and requests. // Must only be instantiated from an uv_loop. template <typename T, typename U> class Resource : public Emitter<T>, public std::enable_shared_from_this<T> { protected: void leak() { leak_ = this->shared_from_this(); } void unleak() { leak_.reset(); } const U* get() const noexcept { return &resource_; } U* get() noexcept { return &resource_; } template <typename R> const R* get() const noexcept { static_assert(!std::is_same<R, U>::value, "!"); return reinterpret_cast<const R*>(&resource_); } template <typename R> R* get() noexcept { static_assert(!std::is_same<R, U>::value, "!"); return reinterpret_cast<R*>(&resource_); } template <typename R, typename... P> const R* get(const Resource<P...>& other) const noexcept { return reinterpret_cast<const R*>(&other.resource_); } template <typename R, typename... P> R* get(Resource<P...>& other) noexcept { return reinterpret_cast<R*>(&other.resource_); } Loop& loop() const noexcept { return *loop_; } public: explicit Resource(std::shared_ptr<Loop> loop) : loop_(std::move(loop)) { get()->data = static_cast<T*>(this); } template <typename... Args> static std::shared_ptr<T> create(Args&&... args) { return std::make_shared<T>(std::forward<Args>(args)...); } const U* raw() const noexcept { return &resource_; } protected: U resource_; std::shared_ptr<Loop> loop_; std::shared_ptr<T> leak_; }; struct CloseEvent {}; template <typename T, typename U> class Handle : public Resource<T, U>, public BaseHandle { static void uv__close_cb(uv_handle_t* handle) { T& ref = *static_cast<T*>(handle->data); ref.publish(CloseEvent{}); ref.unleak(); }; protected: template <typename F, typename... Args> void init(F&& f, Args&&... args) { auto rv = std::forward<F>(f)( this->loop_->raw(), this->get(), std::forward<Args>(args)...); UV_ASSERT(rv, typeid(T).name()); this->leak(); } template <typename F, typename... Args> typename std::result_of<F(Args...)>::type invoke(F&& f, Args&&... args) { return std::forward<F>(f)(std::forward<Args>(args)...); } public: using Resource<T, U>::Resource; bool closing() const noexcept { return !(uv_is_closing(this->template get<uv_handle_t>()) == 0); } void close() { if (!closing()) { uv_close(this->template get<uv_handle_t>(), &uv__close_cb); } } }; template <typename T, typename U> class Request : public Resource<T, U>, public BaseRequest { protected: template <typename E> static void defaultCallback(U* req, int status) { auto& ref = *static_cast<T*>(req->data); if (status) { ref.publish(ErrorEvent{status}); } else { ref.publish(E{}); } ref.unleak(); } // Call non-void libuv function for this uv_request_t. // The request is leaked if the call is successful, under the // assumption that it is unleaked when the callback gets called. template <typename F, typename... Args> typename std::enable_if< !std::is_void<typename std::result_of<F(Args...)>::type>::value, typename std::result_of<F(Args...)>::type>::type invoke(F&& f, Args&&... args) { auto err = std::forward<F>(f)(std::forward<Args>(args)...); if (err) { Emitter<T>::publish(ErrorEvent{err}); } else { this->leak(); } return err; } public: using Resource<T, U>::Resource; }; struct AsyncEvent {}; class Async : public Handle<Async, uv_async_t> { static void uv__async_cb(uv_async_t* handle) { Async& async = *static_cast<Async*>(handle->data); async.publish(AsyncEvent{}); }; public: using Handle::Handle; void init() { Handle<Async, uv_async_t>::init(&uv_async_init, &uv__async_cb); } void send() { invoke(&uv_async_send, get()); } }; struct TimerEvent {}; class Timer : public Handle<Timer, uv_timer_t> { static void uv__timer_cb(uv_timer_t* handle) { Timer& timer = *static_cast<Timer*>(handle->data); timer.publish(TimerEvent{}); }; public: using Handle::Handle; void init() { Handle::init(&uv_timer_init); } void start(std::chrono::milliseconds timeout) { auto rv = uv_timer_start(get(), &uv__timer_cb, timeout.count(), 0); UV_ASSERT(rv, "uv_timer_start"); } }; struct EndEvent {}; struct ListenEvent {}; struct ConnectEvent {}; class ReadEvent { public: using Deleter = void (*)(char*); ReadEvent(std::unique_ptr<char[], Deleter> data, size_t length) : data(std::move(data)), length(length) {} std::unique_ptr<char[], Deleter> data; size_t length; template <typename T> typename std::enable_if<std::is_trivially_copyable<T>::value, T>::type as() const { if (length != sizeof(T)) { abort(); } return *reinterpret_cast<T* const>(data.get()); } }; struct WriteEvent {}; namespace detail { class ReadSegment { public: using Deleter = void (*)(char*); ReadSegment(char* ptr, size_t length) : data_(ptr, [](char*) {}), length_(length) {} ReadSegment(std::unique_ptr<char[]> data, size_t length) : data_(data.release(), [](char* ptr) { delete[] ptr; }), length_(length) {} ReadSegment(std::unique_ptr<char[], Deleter> data, size_t length) : data_(std::move(data)), length_(length) {} void alloc(uv_buf_t* buf) { buf->base = data_.get() + offset_; buf->len = length_ - offset_; } void read(ssize_t nread) { assert(nread > 0); assert(nread <= length_ - offset_); offset_ += nread; } bool done() const noexcept { return offset_ == length_; } ReadEvent event() { return ReadEvent(std::move(data_), length_); } private: std::unique_ptr<char[], Deleter> data_; size_t length_; // Offset is updated after reads/writes to reflect how many bytes // of this segment have already been read/written. size_t offset_ = 0; }; class WriteRequest final : public Request<WriteRequest, uv_write_t> { public: using Deleter = void (*)(char*); WriteRequest( std::shared_ptr<Loop> loop, std::unique_ptr<char[], Deleter> data, unsigned int length) : Request<WriteRequest, uv_write_t>(std::move(loop)), data_(std::move(data)), buf_(uv_buf_init(data_.get(), length)) {} void write(uv_stream_t* handle) { invoke(&uv_write, get(), handle, &buf_, 1, &defaultCallback<WriteEvent>); } private: std::unique_ptr<char[], Deleter> data_; uv_buf_t buf_; }; class ConnectRequest final : public Request<ConnectRequest, uv_connect_t> { public: ConnectRequest(std::shared_ptr<Loop> loop, const struct sockaddr* addr) : Request<ConnectRequest, uv_connect_t>(std::move(loop)), addr_{addr} {} void connect(uv_tcp_t* handle) { invoke( &uv_tcp_connect, get(), handle, addr_, &defaultCallback<ConnectEvent>); } private: const struct sockaddr* addr_; }; }; // namespace detail class TCP final : public Handle<TCP, uv_tcp_t> { static constexpr unsigned int kDefaultListenBacklog = 128; static void uv__connection_cb(uv_stream_t* server, int status); static void uv__alloc_cb( uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uv__read_cb( uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf); static void uv__write_cb(uv_write_t* req, int status) {} public: using Handle::Handle; void init() { Handle::init(uv_tcp_init); } bool noDelay(bool value = false) { return (0 == uv_tcp_nodelay(get(), value)); } void bind(const struct sockaddr* addr) { auto rv = uv_tcp_bind(get(), addr, 0); UV_ASSERT(rv, "uv_bind"); } void listen(int backlog = kDefaultListenBacklog) { auto rv = uv_listen( this->template get<uv_stream_t>(), backlog, &uv__connection_cb); UV_ASSERT(rv, "uv_listen"); } template <typename V> void accept(V& stream) { auto rv = uv_accept( this->template get<uv_stream_t>(), this->template get<uv_stream_t>(stream)); UV_ASSERT(rv, "uv_accept"); } void read(char* ptr, size_t length) { reads_.emplace_back(ptr, length); if (reads_.size() == 1) { auto rv = uv_read_start( this->template get<uv_stream_t>(), &uv__alloc_cb, &uv__read_cb); UV_ASSERT(rv, "uv_read_start"); } } void read(std::unique_ptr<char[]> buf, size_t length) { reads_.emplace_back(std::move(buf), length); if (reads_.size() == 1) { auto rv = uv_read_start(this->get<uv_stream_t>(), &uv__alloc_cb, &uv__read_cb); UV_ASSERT(rv, "uv_read_start"); } } void write(char* ptr, size_t length) { write(this->loop().resource<detail::WriteRequest>( std::unique_ptr<char[], detail::WriteRequest::Deleter>( ptr, [](char*) {}), length)); } void write(std::unique_ptr<char[]> data, size_t length) { write(this->loop().resource<detail::WriteRequest>( std::unique_ptr<char[], detail::WriteRequest::Deleter>( data.release(), [](char* ptr) { delete[] ptr; }), length)); } template <typename T> void write(T t) { static_assert( std::is_trivially_copyable<T>::value, "Only trivially copyable types can be written directly."); auto data = std::unique_ptr<char[], detail::WriteRequest::Deleter>( new char[sizeof(T)], [](char* ptr) { delete[] ptr; }); std::memcpy(data.get(), &t, sizeof(T)); write(this->loop().resource<detail::WriteRequest>( std::move(data), sizeof(T))); } void connect(const struct sockaddr& addr) { auto req = this->loop().resource<detail::ConnectRequest>(&addr); auto handle = shared_from_this(); req->once<ErrorEvent>( [handle](const ErrorEvent& event, const detail::ConnectRequest&) { handle->publish(event); }); req->once<ConnectEvent>( [handle](const ConnectEvent& event, const detail::ConnectRequest&) { handle->publish(event); }); req->connect(get()); } struct sockaddr_storage sockname() const { struct sockaddr_storage addr; int len = sizeof(addr); auto rv = uv_tcp_getsockname(get(), (struct sockaddr*)&addr, &len); UV_ASSERT(rv, "uv_tcp_getsockname"); return addr; } struct sockaddr_storage peername() const { struct sockaddr_storage addr; int len = sizeof(addr); auto rv = uv_tcp_getpeername(get(), (struct sockaddr*)&addr, &len); UV_ASSERT(rv, "uv_tcp_getpeername"); return addr; } protected: std::deque<detail::ReadSegment> reads_; protected: void write(std::shared_ptr<detail::WriteRequest> req) { auto handle = shared_from_this(); req->once<ErrorEvent>( [handle](const ErrorEvent& event, const detail::WriteRequest&) { handle->publish(event); }); req->once<WriteEvent>( [handle](const WriteEvent& event, const detail::WriteRequest&) { handle->publish(event); }); req->write(get<uv_stream_t>()); } }; } // namespace libuv } // namespace uv } // namespace transport } // namespace gloo