include/unifex/win32/windows_thread_pool.hpp

/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License Version 2.0 with LLVM Exceptions
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *   https://llvm.org/LICENSE.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once

#include <unifex/receiver_concepts.hpp>
#include <unifex/sender_concepts.hpp>
#include <unifex/scheduler_concepts.hpp>
#include <unifex/get_stop_token.hpp>
#include <unifex/stop_token_concepts.hpp>
#include <unifex/manual_lifetime.hpp>
#include <unifex/exception.hpp>

#include <unifex/win32/filetime_clock.hpp>

#include <windows.h>
#include <threadpoolapiset.h>

#include <utility>
#include <exception>
#include <new>
#include <system_error>
#include <atomic>
#include <cstdio>

#include <unifex/detail/prologue.hpp>

namespace unifex {
namespace win32 {

class windows_thread_pool {
  class scheduler;
  class schedule_sender;
  class schedule_op_base;

  template<typename Receiver>
  struct _schedule_op {
    class type;
  };
  template<typename Receiver>
  using schedule_op = typename _schedule_op<Receiver>::type;

  template<typename StopToken>
  struct _cancellable_schedule_op_base {
    class type;
  };
  template<typename StopToken>
  using cancellable_schedule_op_base =
      typename _cancellable_schedule_op_base<StopToken>::type;

  template<typename Receiver>
  struct _cancellable_schedule_op {
    class type;
  };
  template <typename Receiver>
  using cancellable_schedule_op =
      typename _cancellable_schedule_op<Receiver>::type;

  template<typename StopToken>
  struct _time_schedule_op_base {
    class type;
  };
  template<typename StopToken>
  using time_schedule_op_base =
      typename _time_schedule_op_base<StopToken>::type;

  template<typename Receiver>
  struct _time_schedule_op {
    class type;
  };
  template<typename Receiver>
  using time_schedule_op = typename _time_schedule_op<Receiver>::type;

  template<typename Receiver>
  struct _schedule_at_op {
    class type;
  };
  template<typename Receiver>
  using schedule_at_op = typename _schedule_at_op<Receiver>::type;

  template<typename Duration, typename Receiver>
  struct _schedule_after_op {
    class type;
  };
  template<typename Duration, typename Receiver>
  using schedule_after_op =
      typename _schedule_after_op<Duration, Receiver>::type;

  class schedule_at_sender;

  template<typename Duration>
  struct _schedule_after_sender {
    class type;
  };
  template<typename Duration>
  using schedule_after_sender =
      typename _schedule_after_sender<Duration>::type;

  using clock_type = filetime_clock;

public:
  // Initialise to use the process' default thread-pool.
  windows_thread_pool() noexcept;

  // Construct an independent thread-pool with a dynamic number of
  // threads that varies between a min and a max number of threads.
  explicit windows_thread_pool(std::uint32_t minThreadCount, std::uint32_t maxThreadCount);

  ~windows_thread_pool();

  scheduler get_scheduler() noexcept;

private:
  PTP_POOL threadPool_;
};

/////////////////////////
// Non-cancellable schedule() operation

class windows_thread_pool::schedule_op_base {
public:
  schedule_op_base(schedule_op_base&&) = delete;
  schedule_op_base& operator=(schedule_op_base&&) = delete;

  ~schedule_op_base();

  void start() & noexcept;

protected:
  schedule_op_base(windows_thread_pool& pool, PTP_WORK_CALLBACK workCallback);

private:
  TP_CALLBACK_ENVIRON environ_;
  PTP_WORK work_;
};

template<typename Receiver>
class windows_thread_pool::_schedule_op<Receiver>::type final
    : public windows_thread_pool::schedule_op_base {
public:
  template<typename Receiver2>
  explicit type(windows_thread_pool& pool, Receiver2&& r)
    : schedule_op_base(pool, &work_callback)
    , receiver_((Receiver2&&)r)
  {}

private:
  static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE, void* workContext, PTP_WORK) noexcept {
    auto& op = *static_cast<type*>(workContext);

    if constexpr (is_nothrow_callable_v<decltype(unifex::set_value), Receiver>) {
      unifex::set_value(std::move(op.receiver_));
    } else {
      UNIFEX_TRY {
        unifex::set_value(std::move(op.receiver_));
      } UNIFEX_CATCH (...) {
        unifex::set_error(std::move(op.receiver_), std::current_exception());
      }
    }
  }

  Receiver receiver_;
};

///////////////////////////
// Cancellable schedule() operation

template<typename StopToken>
class windows_thread_pool::_cancellable_schedule_op_base<StopToken>::type {
public:
  type(type&&) = delete;
  type& operator=(type&&) = delete;

  ~type() {
    ::CloseThreadpoolWork(work_);
    ::DestroyThreadpoolEnvironment(&environ_);
    delete state_;
  }

protected:
  explicit type(windows_thread_pool& pool, bool isStopPossible) {
    ::InitializeThreadpoolEnvironment(&environ_);
    ::SetThreadpoolCallbackPool(&environ_, pool.threadPool_);

    work_ = ::CreateThreadpoolWork(
        isStopPossible ? &stoppable_work_callback : &unstoppable_work_callback,
        static_cast<void*>(this),
        &environ_);
    if (work_ == nullptr) {
      DWORD errorCode = ::GetLastError();
      ::DestroyThreadpoolEnvironment(&environ_);
      throw_(
          std::system_error{
              static_cast<int>(errorCode),
              std::system_category(),
              "CreateThreadpoolWork()"});
    }

    if (isStopPossible) {
      state_ = new (std::nothrow) std::atomic<std::uint32_t>(not_started);
      if (state_ == nullptr) {
        ::CloseThreadpoolWork(work_);
        ::DestroyThreadpoolEnvironment(&environ_);
        throw_(std::bad_alloc{});
      }
    } else {
      state_ = nullptr;
    }
  }

  void start_impl(const StopToken& stopToken) & noexcept {
    if (state_ != nullptr) {
      // Short-circuit all of this if stopToken.stop_requested() is
      // already true.
      if (stopToken.stop_requested()) {
        set_done_impl();
        return;
      }

      stopCallback_.construct(stopToken, stop_requested_callback{*this});

      // Take a copy of the 'state' pointer prior to submitting the
      // work as the operation-state may have already been destroyed
      // on another thread by the time SubmitThreadpoolWork() returns.
      auto* state = state_;

      ::SubmitThreadpoolWork(work_);

      // Signal that SubmitThreadpoolWork() has returned and that it is
      // now safe for the stop-request to request cancellation of the
      // work items.
      const auto prevState = state->fetch_add(submit_complete_flag, std::memory_order_acq_rel);
      if ((prevState & stop_requested_flag) != 0) {
        // stop was requested before the call to SubmitThreadpoolWork()
        // returned and before the work started executing.
        // It was not safe for the request_stop() method to cancel the work
        // before it had finished being submitted, so it has delegated
        // responsibility for cancelling the just-submitted work to us,
        // to do once we have finished submitting the work.
        complete_with_done();
      } else if ((prevState & running_flag) != 0) {
        // Otherwise, it's possible that the work item may have started
        // running on another thread already, prior to us returning.
        // If this is the case then, to avoid leaving us with a
        // dangling reference to the 'state' when the operation-state
        // is destroyed, it will detach the 'state' from the operation-state
        // and delegate the delete of the 'state' to us.
        delete state;
      }
    } else {
      // A stop-request is not possible so skip the extra
      // synchronisation needed to support it.
      ::SubmitThreadpoolWork(work_);
    }
  }

private:
  static void CALLBACK unstoppable_work_callback(PTP_CALLBACK_INSTANCE, void* workContext, PTP_WORK) noexcept {
    auto& op = *static_cast<type*>(workContext);
    op.set_value_impl();
  }

  static void CALLBACK stoppable_work_callback(PTP_CALLBACK_INSTANCE, void* workContext, PTP_WORK) noexcept {
    auto& op = *static_cast<type*>(workContext);

    // Signal that the work callback has started executing.
    auto prevState = op.state_->fetch_add(starting_flag, std::memory_order_acq_rel);
    if ((prevState & stop_requested_flag) != 0) {
      // request_stop() is already running and is waiting for this callback
      // to finish executing. So we return immediately here without doing
      // anything further so that we don't introduce a deadlock.
      // In particular, we don't want to try to deregister the stop-callback
      // which will block waiting for the request_stop() method to return.
      return;
    }

    // Note that it's possible that stop might be requested after setting
    // the 'starting' flag but before we deregister the stop callback.
    // We're going to ignore these stop-requests as we already won the race
    // in the fetch_add() above and ignoring them simplifies some of the
    // cancellation logic.
    op.stopCallback_.destruct();

    prevState = op.state_->fetch_add(running_flag, std::memory_order_acq_rel);
    if (prevState == starting_flag) {
      // The start() method has not yet finished submitting the work
      // on another thread and so is still accessing the 'state'.
      // This means we don't want to let the operation-state destructor
      // free the state memory. Instead, we have just delegated
      // responsibility for freeing this memory to the start() method
      // and we clear the state_ member here to prevent the destructor
      // from freeing it.
      op.state_ = nullptr;
    }

    op.set_value_impl();
  }

  void request_stop() noexcept {
    auto prevState = state_->load(std::memory_order_relaxed);
    do {
      UNIFEX_ASSERT((prevState & running_flag) == 0);
      if ((prevState & starting_flag) != 0) {
        // Work callback won the race and will be waiting for
        // us to return so it can deregister the stop-callback.
        // Return immediately so we don't deadlock.
        return;
      }
    } while (!state_->compare_exchange_weak(
        prevState,
        prevState | stop_requested_flag,
        std::memory_order_acq_rel,
        std::memory_order_relaxed));

    UNIFEX_ASSERT((prevState & starting_flag) == 0);

    if ((prevState & submit_complete_flag) != 0) {
      // start() has finished calling SubmitThreadpoolWork() and the work
      // has not yet started executing, so it's safe for this method to now
      // try and cancel the work. While it's possible that the work callback
      // will start executing concurrently on a thread-pool thread, we are
      // guaranteed that it will see our write of the stop_requested_flag
      // and will promptly return without blocking.
      complete_with_done();
    } else {
      // Otherwise, as the start() method has not yet finished calling
      // SubmitThreadpoolWork(), we can't safely call
      // WaitForThreadpoolWorkCallbacks(). In this case we are delegating
      // responsibility for calling complete_with_done() to the start()
      // method when it eventually returns from SubmitThreadpoolWork().
    }
  }

  void complete_with_done() noexcept {
    const BOOL cancelPending = TRUE;
    ::WaitForThreadpoolWorkCallbacks(work_, cancelPending);

    // Destruct the stop-callback before calling set_done() as the call
    // to set_done() will invalidate the stop-token and we need to
    // make sure that the stop-callback is deregistered before that happens.
    stopCallback_.destruct();

    // Now that the work has been successfully cancelled we can
    // call the receiver's set_done().
    set_done_impl();
  }

  virtual void set_done_impl() noexcept = 0;
  virtual void set_value_impl() noexcept = 0;

  struct stop_requested_callback {
    type& op_;

    void operator()() noexcept {
      op_.request_stop();
    }
  };

  /////////////////
  // Flags to use for state_ member

  // Initial state. start() not yet called.
  static constexpr std::uint32_t not_started = 0;

  // Flag set once start() has finished calling SubmitThreadpoolWork().
  static constexpr std::uint32_t submit_complete_flag = 1;

  // Flag set by request_stop().
  static constexpr std::uint32_t stop_requested_flag = 2;

  // Flag set by stoppable_work_callback() when it starts executing.
  // This is before deregistering the stop-callback.
  static constexpr std::uint32_t starting_flag = 4;

  // Flag set by stoppable_work_callback() after having deregistered
  // the stop-callback, just before it calls the receiver.
  static constexpr std::uint32_t running_flag = 8;

  PTP_WORK work_;
  TP_CALLBACK_ENVIRON environ_;
  std::atomic<std::uint32_t>* state_;
  manual_lifetime<typename StopToken::template callback_type<stop_requested_callback>>
      stopCallback_;
};

template<typename Receiver>
class windows_thread_pool::_cancellable_schedule_op<Receiver>::type final
    : public windows_thread_pool::cancellable_schedule_op_base<stop_token_type_t<Receiver>> {
  using base =
      windows_thread_pool::cancellable_schedule_op_base<stop_token_type_t<Receiver>>;

public:
  template<typename Receiver2>
  explicit type(windows_thread_pool& pool, Receiver2&& r)
    : base(pool, unifex::get_stop_token(r).stop_possible())
    , receiver_((Receiver2&&)r)
  {}

  void start() & noexcept {
    this->start_impl(get_stop_token(receiver_));
  }

private:
  void set_value_impl() noexcept override {
    if constexpr (is_nothrow_callable_v<decltype(unifex::set_value), Receiver>) {
      unifex::set_value(std::move(receiver_));
    } else {
      UNIFEX_TRY {
        unifex::set_value(std::move(receiver_));
      } UNIFEX_CATCH (...)
      {
        unifex::set_error(std::move(receiver_), std::current_exception());
      }
    }
  }

  void set_done_impl() noexcept override {
    if constexpr (!is_stop_never_possible_v<stop_token_type_t<Receiver>>) {
      unifex::set_done(std::move(receiver_));
    } else {
      UNIFEX_ASSERT(false);
    }
  }

  Receiver receiver_;
};

////////////////////////////////////////////////////
// schedule() sender

class windows_thread_pool::schedule_sender {
public:
  template<template<typename...> class Variant, template<typename...> class Tuple>
  using value_types = Variant<Tuple<>>;

  template<template<typename...> class Variant>
  using error_types = Variant<std::exception_ptr>;

  static constexpr bool sends_done = true;

  template(typename Receiver)
    (requires receiver_of<Receiver> AND
        is_stop_never_possible_v<stop_token_type_t<Receiver>>)
  schedule_op<unifex::remove_cvref_t<Receiver>> connect(Receiver&& r) const {
    return schedule_op<unifex::remove_cvref_t<Receiver>>{
        *pool_, (Receiver&&)r};
  }

  template(typename Receiver)
    (requires receiver_of<Receiver> AND
        (!is_stop_never_possible_v<stop_token_type_t<Receiver>>))
  cancellable_schedule_op<unifex::remove_cvref_t<Receiver>> connect(Receiver&& r) const {
    return cancellable_schedule_op<unifex::remove_cvref_t<Receiver>>{
        *pool_, (Receiver&&)r};
  }

private:
  friend scheduler;

  explicit schedule_sender(windows_thread_pool& pool) noexcept
  : pool_(&pool)
  {}

  windows_thread_pool* pool_;
};

/////////////////////////////////
// time_schedule_op

template<typename StopToken>
class windows_thread_pool::_time_schedule_op_base<StopToken>::type {
protected:
  explicit type(windows_thread_pool& pool, bool isStopPossible) {
    ::InitializeThreadpoolEnvironment(&environ_);
    ::SetThreadpoolCallbackPool(&environ_, pool.threadPool_);

    // Give the optimiser a hand for cases where the parameter
    // can never be true.
    if constexpr (is_stop_never_possible_v<StopToken>) {
      isStopPossible = false;
    }

    timer_ = ::CreateThreadpoolTimer(
        isStopPossible ? &stoppable_timer_callback : &timer_callback,
        static_cast<void*>(this),
        &environ_);
    if (timer_ == nullptr) {
      DWORD errorCode = ::GetLastError();
      ::DestroyThreadpoolEnvironment(&environ_);
      throw_(
          std::system_error{
              static_cast<int>(errorCode),
              std::system_category(),
              "CreateThreadpoolTimer()"});
    }

    if (isStopPossible) {
      state_ = new (std::nothrow) std::atomic<std::uint32_t>{not_started};
      if (state_ == nullptr) {
        ::CloseThreadpoolTimer(timer_);
        ::DestroyThreadpoolEnvironment(&environ_);
        throw_(std::bad_alloc{});
      }
    }
  }

public:
  ~type() {
    ::CloseThreadpoolTimer(timer_);
    ::DestroyThreadpoolEnvironment(&environ_);
    delete state_;
  }

protected:
  void start_impl(const StopToken& stopToken, FILETIME dueTime) noexcept {
    auto startTimer = [&]() noexcept {
      const DWORD periodInMs = 0;   // Single-shot
      const DWORD maxDelayInMs = 0; // Max delay to allow timer coalescing
      ::SetThreadpoolTimer(timer_, &dueTime, periodInMs, maxDelayInMs);
    };

    if constexpr (!is_stop_never_possible_v<StopToken>) {
      auto* const state = state_;
      if (state != nullptr) {
        // Short-circuit extra work submitting the
        // timer if stop has already been requested.
        if (stopToken.stop_requested()) {
          set_done_impl();
          return;
        }

        stopCallback_.construct(stopToken, stop_requested_callback{*this});

        startTimer();

        const auto prevState = state->fetch_add(submit_complete_flag, std::memory_order_acq_rel);
        if ((prevState & stop_requested_flag) != 0) {
          complete_with_done();
        } else if ((prevState & running_flag) != 0) {
          delete state;
        }
        return;
      }
    }

    startTimer();
  }

private:
  virtual void set_value_impl() noexcept = 0;
  virtual void set_done_impl() noexcept = 0;

  static void CALLBACK timer_callback(
      [[maybe_unused]] PTP_CALLBACK_INSTANCE instance,
      void* timerContext,
      [[maybe_unused]] PTP_TIMER timer) noexcept {
    type& op = *static_cast<type*>(timerContext);
    op.set_value_impl();
  }

  static void CALLBACK stoppable_timer_callback(
      [[maybe_unused]] PTP_CALLBACK_INSTANCE instance,
      void* timerContext,
      [[maybe_unused]] PTP_TIMER timer) noexcept {
    type& op = *static_cast<type*>(timerContext);

    auto prevState = op.state_->fetch_add(starting_flag, std::memory_order_acq_rel);
    if ((prevState & stop_requested_flag) != 0) {
      return;
    }

    op.stopCallback_.destruct();

    prevState = op.state_->fetch_add(running_flag, std::memory_order_acq_rel);
    if (prevState == starting_flag) {
      op.state_ = nullptr;
    }

    op.set_value_impl();
  }

  void request_stop() noexcept {
    auto prevState = state_->load(std::memory_order_relaxed);
    do {
      UNIFEX_ASSERT((prevState & running_flag) == 0);
      if ((prevState & starting_flag) != 0) {
        return;
      }
    } while (!state_->compare_exchange_weak(
        prevState,
        prevState | stop_requested_flag,
        std::memory_order_acq_rel,
        std::memory_order_relaxed));

    UNIFEX_ASSERT((prevState & starting_flag) == 0);

    if ((prevState & submit_complete_flag) != 0) {
      complete_with_done();
    }
  }

  void complete_with_done() noexcept {
    const BOOL cancelPending = TRUE;
    ::WaitForThreadpoolTimerCallbacks(timer_, cancelPending);
    stopCallback_.destruct();
    set_done_impl();
  }

  struct stop_requested_callback {
    type& op_;

    void operator()() noexcept {
      op_.request_stop();
    }
  };

  /////////////////
  // Flags to use for state_ member

  // Initial state. start() not yet called.
  static constexpr std::uint32_t not_started = 0;

  // Flag set once start() has finished calling SetThreadpoolTimer().
  static constexpr std::uint32_t submit_complete_flag = 1;

  // Flag set by request_stop().
  static constexpr std::uint32_t stop_requested_flag = 2;

  // Flag set by stoppable_timer_callback() when it starts executing.
  // This is before deregistering the stop-callback.
  static constexpr std::uint32_t starting_flag = 4;

  // Flag set by stoppable_timer_callback() after having deregistered
  // the stop-callback, just before it calls the receiver.
  static constexpr std::uint32_t running_flag = 8;

  PTP_TIMER timer_;
  TP_CALLBACK_ENVIRON environ_;
  std::atomic<std::uint32_t>* state_{nullptr};
  manual_lifetime<typename StopToken::template callback_type<stop_requested_callback>>
      stopCallback_;
};

/////////////////////////////////
// schedule_at() operation

template<typename Receiver>
class windows_thread_pool::_schedule_at_op<Receiver>::type final
    : public windows_thread_pool::time_schedule_op_base<stop_token_type_t<Receiver>> {
  using base =
      windows_thread_pool::time_schedule_op_base<stop_token_type_t<Receiver>>;

public:
  template<typename Receiver2>
  explicit type(
      windows_thread_pool& pool,
      windows_thread_pool::clock_type::time_point dueTime,
      Receiver2&& r)
    : base(pool, get_stop_token(r).stop_possible())
    , dueTime_(dueTime)
    , receiver_((Receiver2&&)r)
  {}

  void start() & noexcept {
    ULARGE_INTEGER ticks;
    ticks.QuadPart = dueTime_.get_ticks();

    FILETIME ft;
    ft.dwLowDateTime = ticks.LowPart;
    ft.dwHighDateTime = ticks.HighPart;

    this->start_impl(get_stop_token(receiver_), ft);
  }

private:
  void set_value_impl() noexcept override {
    if constexpr (unifex::is_nothrow_callable_v<decltype(unifex::set_value), Receiver>) {
      unifex::set_value(std::move(receiver_));
    } else {
      UNIFEX_TRY {
        unifex::set_value(std::move(receiver_));
      } UNIFEX_CATCH (...) {
        unifex::set_error(std::move(receiver_), std::current_exception());
      }
    }
  }

  void set_done_impl() noexcept override {
    unifex::set_done(std::move(receiver_));
  }

  windows_thread_pool::clock_type::time_point dueTime_;
  Receiver receiver_;
};

class windows_thread_pool::schedule_at_sender {
public:
  template<template<typename...> class Variant, template<typename...> class Tuple>
  using value_types = Variant<Tuple<>>;

  template<template<typename...> class Variant>
  using error_types = Variant<std::exception_ptr>;

  static constexpr bool sends_done = true;

  explicit schedule_at_sender(windows_thread_pool& pool, filetime_clock::time_point dueTime)
  : pool_(&pool)
  , dueTime_(dueTime)
  {}

  template(typename Receiver)
    (requires unifex::receiver_of<Receiver>)
  schedule_at_op<unifex::remove_cvref_t<Receiver>> connect(Receiver&& r) const {
    return schedule_at_op<unifex::remove_cvref_t<Receiver>>{
        *pool_, dueTime_, (Receiver&&)r};
  }

private:
  windows_thread_pool* pool_;
  filetime_clock::time_point dueTime_;
};

//////////////////////////////////
// schedule_after()

template<typename Duration, typename Receiver>
class windows_thread_pool::_schedule_after_op<Duration, Receiver>::type final
    : public windows_thread_pool::time_schedule_op_base<stop_token_type_t<Receiver>> {
  using base =
      windows_thread_pool::time_schedule_op_base<stop_token_type_t<Receiver>>;

public:
  template<typename Receiver2>
  explicit type(windows_thread_pool& pool, Duration duration, Receiver2&& r)
    : base(pool, get_stop_token(r).stop_possible())
    , duration_(duration)
    , receiver_((Receiver2&&)r)
  {}

  void start() & noexcept {
    auto dueTime = filetime_clock::now() + duration_;

    ULARGE_INTEGER ticks;
    ticks.QuadPart = dueTime.get_ticks();

    FILETIME ft;
    ft.dwLowDateTime = ticks.LowPart;
    ft.dwHighDateTime = ticks.HighPart;

    this->start_impl(get_stop_token(receiver_), ft);
  }

private:
  void set_value_impl() noexcept override {
    if constexpr (unifex::is_nothrow_callable_v<decltype(unifex::set_value), Receiver>) {
      unifex::set_value(std::move(receiver_));
    } else {
      UNIFEX_TRY {
        unifex::set_value(std::move(receiver_));
      } UNIFEX_CATCH (...)
      {
        unifex::set_error(std::move(receiver_), std::current_exception());
      }
    }
  }

  void set_done_impl() noexcept override {
    unifex::set_done(std::move(receiver_));
  }

  Duration duration_;
  Receiver receiver_;
};

template<typename Duration>
class windows_thread_pool::_schedule_after_sender<Duration>::type {
public:
  template<template<typename...> class Variant, template<typename...> class Tuple>
  using value_types = Variant<Tuple<>>;

  template<template<typename...> class Variant>
  using error_types = Variant<std::exception_ptr>;

  static constexpr bool sends_done = true;

  explicit type(windows_thread_pool& pool, Duration duration)
  : pool_(&pool)
  , duration_(duration)
  {}

  template(typename Receiver)
    (requires unifex::receiver_of<Receiver>)
  schedule_after_op<Duration, unifex::remove_cvref_t<Receiver>> connect(Receiver&& r) const {
    return schedule_after_op<Duration, unifex::remove_cvref_t<Receiver>>{
        *pool_, duration_, (Receiver&&)r};
  }

private:
  windows_thread_pool* pool_;
  Duration duration_;
};

/////////////////////////////////
// scheduler

class windows_thread_pool::scheduler {
public:
  using time_point = filetime_clock::time_point;

  schedule_sender schedule() const noexcept {
    return schedule_sender{*pool_};
  }

  time_point now() const noexcept {
    return filetime_clock::now();
  }

  schedule_at_sender schedule_at(time_point tp) const noexcept {
    return schedule_at_sender{*pool_, tp};
  }

  template<typename Duration>
  schedule_after_sender<Duration> schedule_after(Duration d) const
      noexcept(std::is_nothrow_move_constructible_v<Duration>) {
    return schedule_after_sender<Duration>{*pool_, std::move(d)};
  }

  friend bool operator==(scheduler a, scheduler b) noexcept {
    return a.pool_ == b.pool_;
  }

  friend bool operator!=(scheduler a, scheduler b) noexcept {
    return a.pool_ != b.pool_;
  }

private:
  friend windows_thread_pool;

  explicit scheduler(windows_thread_pool& pool) noexcept
  : pool_(&pool)
  {}

  windows_thread_pool* pool_;
};

/////////////////////////
// scheduler methods

inline windows_thread_pool::scheduler windows_thread_pool::get_scheduler() noexcept {
  return scheduler{*this};
}

} // namespace win32
} // namespace unifex

#include <unifex/detail/epilogue.hpp>
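
// ---------------------------------------------------------------------------
// Illustrative usage sketch. The scheduler returned by get_scheduler() is a
// regular unifex time-scheduler, so it composes with the library's generic
// algorithms; the sketch below assumes <unifex/sync_wait.hpp>,
// <unifex/then.hpp> and the unifex::schedule/schedule_after CPOs from
// <unifex/scheduler_concepts.hpp> are available, and is kept in a comment so
// that it does not affect consumers of this header.
//
//   #include <unifex/win32/windows_thread_pool.hpp>
//   #include <unifex/scheduler_concepts.hpp>
//   #include <unifex/sync_wait.hpp>
//   #include <unifex/then.hpp>
//
//   #include <chrono>
//   #include <cstdio>
//
//   int main() {
//     // Independent pool that scales between 1 and 4 threads.
//     unifex::win32::windows_thread_pool tp{1, 4};
//     auto sched = tp.get_scheduler();
//
//     // Run a callable on a thread-pool thread and block until it completes.
//     unifex::sync_wait(unifex::then(
//         unifex::schedule(sched),
//         [] { std::puts("hello from the pool"); }));
//
//     // Run a callable roughly 10ms from now via the timer-based senders.
//     unifex::sync_wait(unifex::then(
//         unifex::schedule_after(sched, std::chrono::milliseconds(10)),
//         [] { std::puts("hello after a delay"); }));
//   }
// ---------------------------------------------------------------------------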