include/unifex/take_until.hpp (374 lines of code) (raw):
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <unifex/config.hpp>
#include <unifex/sender_concepts.hpp>
#include <unifex/receiver_concepts.hpp>
#include <unifex/stream_concepts.hpp>
#include <unifex/manual_lifetime.hpp>
#include <unifex/unstoppable_token.hpp>
#include <unifex/inplace_stop_token.hpp>
#include <unifex/get_stop_token.hpp>
#include <unifex/bind_back.hpp>
#include <unifex/exception.hpp>
#include <exception>
#include <atomic>
#include <type_traits>
#include <utility>
#include <unifex/detail/prologue.hpp>
namespace unifex {
namespace _take_until {
template <typename SourceStream, typename TriggerStream>
struct _stream {
struct type;
};
template <typename SourceStream, typename TriggerStream>
using stream =
typename _stream<
remove_cvref_t<SourceStream>,
remove_cvref_t<TriggerStream>>::type;
template <typename SourceStream, typename TriggerStream>
struct _stream<SourceStream, TriggerStream>::type {
private:
using take_until_stream = type;
struct trigger_next_receiver {
take_until_stream& stream_;
template <typename... Values>
void set_value(Values&&...) && noexcept {
std::move(*this).set_done();
}
template <typename Error>
void set_error(Error&&) && noexcept {
std::move(*this).set_done();
}
void set_done() && noexcept {
auto& stream = stream_;
stream.triggerNextOp_.destruct();
stream.trigger_next_done();
}
inplace_stop_source& get_stop_source() const {
return stream_.stopSource_;
}
friend inplace_stop_token tag_invoke(
tag_t<get_stop_token>, const trigger_next_receiver& r) noexcept {
return r.get_stop_source().get_token();
}
};
struct cleanup_operation_base {
virtual void start_trigger_cleanup() noexcept = 0;
};
struct cancel_callback {
inplace_stop_source& stopSource_;
void operator()() noexcept {
stopSource_.request_stop();
}
};
struct next_sender {
take_until_stream& stream_;
template <template <typename...> class Variant,
template <typename...> class Tuple>
using value_types =
typename sender_traits<next_sender_t<SourceStream>>::
template value_types<Variant, Tuple>;
template <template <typename...> class Variant>
using error_types =
sender_error_types_t<next_sender_t<SourceStream>, Variant>;
static constexpr bool sends_done = sender_traits<next_sender_t<SourceStream>>::sends_done;
template <typename Receiver>
struct _op {
struct type {
struct receiver_wrapper {
type& op_;
template <typename... Values>
void set_value(Values&&... values) && noexcept {
op_.stopCallback_.destruct();
unifex::set_value(std::move(op_.receiver_), (Values&&)values...);
}
void set_done() && noexcept {
op_.stopCallback_.destruct();
op_.stream_.stopSource_.request_stop();
unifex::set_done(std::move(op_.receiver_));
}
template <typename Error>
void set_error(Error&& error) && noexcept {
op_.stopCallback_.destruct();
op_.stream_.stopSource_.request_stop();
unifex::set_error(std::move(op_.receiver_), (Error&&)error);
}
inplace_stop_source& get_stop_source() const {
return op_.stream_.stopSource_;
}
friend inplace_stop_token tag_invoke(
tag_t<get_stop_token>, const receiver_wrapper& r) noexcept {
return r.get_stop_source().get_token();
}
template <typename Func>
friend void tag_invoke(
tag_t<visit_continuations>,
const receiver_wrapper& r,
Func&& func) {
std::invoke(func, r.op_.receiver_);
}
};
take_until_stream& stream_;
Receiver receiver_;
manual_lifetime<typename stop_token_type_t<Receiver&>::
template callback_type<cancel_callback>>
stopCallback_;
next_operation_t<SourceStream, receiver_wrapper> innerOp_;
template <typename Receiver2>
explicit type(take_until_stream& stream, Receiver2&& receiver)
: stream_(stream)
, receiver_((Receiver2&&)receiver)
, innerOp_(unifex::connect(
next(stream.source_),
receiver_wrapper{*this}))
{}
void start() noexcept {
if (!stream_.triggerNextStarted_) {
stream_.triggerNextStarted_ = true;
UNIFEX_TRY {
stream_.triggerNextOp_.construct_with([&] {
return unifex::connect(
next(stream_.trigger_),
trigger_next_receiver{stream_});
});
unifex::start(stream_.triggerNextOp_.get());
} UNIFEX_CATCH (...) {
stream_.trigger_next_done();
}
}
stopCallback_.construct(
get_stop_token(receiver_),
cancel_callback{stream_.stopSource_});
unifex::start(innerOp_);
}
};
};
template <typename Receiver>
using operation = typename _op<remove_cvref_t<Receiver>>::type;
template <typename Receiver>
operation<Receiver> connect(Receiver&& receiver) && {
return operation<Receiver>{
stream_,
(Receiver&&)receiver};
}
};
struct cleanup_sender {
take_until_stream& stream_;
template <template <typename...> class Variant,
template <typename...> class Tuple>
using value_types =
typename sender_traits<cleanup_sender_t<SourceStream>>::
template value_types<Variant, Tuple>;
template <template <typename...> class Variant>
using error_types =
sender_error_types_t<cleanup_sender_t<SourceStream>, Variant>;
static constexpr bool sends_done = true;
template <typename Receiver>
struct _op {
struct type final : cleanup_operation_base {
struct source_receiver {
type& op_;
void set_done() && noexcept {
auto& op = op_;
op.sourceOp_.destruct();
op.source_cleanup_done();
}
template <typename Error>
void set_error(Error&& error) && noexcept {
std::move(*this).set_error(make_exception_ptr((Error&&)error));
}
void set_error(std::exception_ptr error) && noexcept {
auto& op = op_;
op.sourceOp_.destruct();
op.source_cleanup_error(std::move(error));
}
template <typename Func>
friend void tag_invoke(
tag_t<visit_continuations>,
const source_receiver& r,
Func&& func) {
std::invoke(func, r.op_.receiver_);
}
};
struct trigger_receiver {
type& op_;
void set_done() && noexcept {
auto& op = op_;
op.sourceOp_.destruct();
op.trigger_cleanup_done();
}
template <typename Error>
void set_error(Error&& error) && noexcept {
std::move(*this).set_error(make_exception_ptr((Error&&)error));
}
void set_error(std::exception_ptr error) && noexcept {
auto& op = op_;
op.triggerOp_.destruct();
op.trigger_cleanup_error(std::move(error));
}
template <typename Func>
friend void tag_invoke(
tag_t<visit_continuations>,
const trigger_receiver& r,
Func&& func) {
std::invoke(func, r.op_.receiver_);
}
};
take_until_stream& stream_;
std::atomic<bool> cleanupCompleted_ = false;
std::exception_ptr sourceError_;
std::exception_ptr triggerError_;
UNIFEX_NO_UNIQUE_ADDRESS Receiver receiver_;
manual_lifetime<cleanup_operation_t<SourceStream, source_receiver>>
sourceOp_;
manual_lifetime<cleanup_operation_t<TriggerStream, trigger_receiver>>
triggerOp_;
template <typename Receiver2>
explicit type(take_until_stream& stream, Receiver2&& receiver)
: stream_(stream)
, receiver_((Receiver2&&)receiver)
{}
void start() noexcept {
UNIFEX_TRY {
sourceOp_.construct_with([&] {
return unifex::connect(
cleanup(stream_.source_),
source_receiver{*this});
});
unifex::start(sourceOp_.get());
} UNIFEX_CATCH (...) {
source_cleanup_error(std::current_exception());
}
if (!stream_.cleanupReady_.load(std::memory_order_acquire)) {
stream_.cleanupOperation_ = this;
stream_.stopSource_.request_stop();
if (!stream_.cleanupReady_.exchange(true, std::memory_order_acq_rel)) {
// The trigger cleanup is not yet ready to run.
// The trigger_next_receiver will start this when it completes.
return;
}
}
// Otherwise, the trigger cleanup is ready to start.
start_trigger_cleanup();
}
void start_trigger_cleanup() noexcept final {
UNIFEX_TRY {
triggerOp_.construct_with([&] {
return unifex::connect(
cleanup(stream_.trigger_),
trigger_receiver{*this});
});
unifex::start(triggerOp_.get());
} UNIFEX_CATCH (...) {
trigger_cleanup_error(std::current_exception());
return;
}
}
void source_cleanup_done() noexcept {
if (!cleanupCompleted_.load(std::memory_order_acquire)) {
if (!cleanupCompleted_.exchange(true, std::memory_order_acq_rel)) {
// We were first to register completion of the cleanup op.
// Let the other operation call the final receiver.
return;
}
}
// The other operation finished first.
if (triggerError_) {
unifex::set_error(std::move(receiver_), std::move(triggerError_));
} else {
unifex::set_done(std::move(receiver_));
}
}
void source_cleanup_error(std::exception_ptr ex) noexcept {
sourceError_ = std::move(ex);
if (!cleanupCompleted_.load(std::memory_order_acquire)) {
if (!cleanupCompleted_.exchange(true, std::memory_order_acq_rel)) {
// trigger cleanup not yet finished.
// let the trigger_receiver call the final receiver.
return;
}
}
// Trigger cleanup finished first
// Prefer to propagate the source cleanup error over the trigger
// cleanup error if there was one.
unifex::set_error(std::move(receiver_), std::move(sourceError_));
}
void trigger_cleanup_done() noexcept {
if (!cleanupCompleted_.load(std::memory_order_acquire)) {
if (!cleanupCompleted_.exchange(true, std::memory_order_acq_rel)) {
// We were first to register completion of the cleanup op.
// Let the other operation call the final receiver.
return;
}
}
// The other operation finished first.
if (sourceError_) {
unifex::set_error(std::move(receiver_), std::move(sourceError_));
} else {
unifex::set_done(std::move(receiver_));
}
}
void trigger_cleanup_error(std::exception_ptr ex) noexcept {
triggerError_ = std::move(ex);
if (!cleanupCompleted_.load(std::memory_order_acquire)) {
if (!cleanupCompleted_.exchange(true, std::memory_order_acq_rel)) {
// source cleanup not yet finished.
// let the source_receiver call the final receiver.
return;
}
}
// Source cleanup finished first
// Prefer to propagate the source cleanup error over the trigger
// cleanup error if there was one.
if (sourceError_) {
unifex::set_error(std::move(receiver_), std::move(sourceError_));
} else {
unifex::set_error(std::move(receiver_), std::move(triggerError_));
}
}
};
};
template <typename Receiver>
using operation = typename _op<remove_cvref_t<Receiver>>::type;
template <typename Receiver>
operation<Receiver> connect(Receiver&& receiver) {
return operation<Receiver>{stream_, (Receiver &&) receiver};
}
};
UNIFEX_NO_UNIQUE_ADDRESS SourceStream source_;
UNIFEX_NO_UNIQUE_ADDRESS TriggerStream trigger_;
inplace_stop_source stopSource_;
cleanup_operation_base* cleanupOperation_ = nullptr;
std::atomic<bool> cleanupReady_ = false;
bool triggerNextStarted_ = false;
manual_lifetime<next_operation_t<TriggerStream, trigger_next_receiver>>
triggerNextOp_;
void trigger_next_done() noexcept {
if (!cleanupReady_.load(std::memory_order_acquire)) {
stopSource_.request_stop();
if (!cleanupReady_.exchange(true, std::memory_order_acq_rel)) {
// Successfully registered completion of next(trigger)
// before someone called cleanup(stream). We have passed
// responsibility for calling cleanup(trigger_) to the
// call to start() on the cleanup(stream) sender.
return;
}
}
// Otherwise, the cleanup(stream) operation has already been started
// before the next(trigger) operation finished.
// We have the responsibility for launching cleanup(trigger).
UNIFEX_ASSERT(cleanupOperation_ != nullptr);
cleanupOperation_->start_trigger_cleanup();
}
public:
template <typename SourceStream2, typename TriggerStream2>
explicit type(SourceStream2&& source, TriggerStream2&& trigger)
: source_((SourceStream2&&)source)
, trigger_((TriggerStream2&&)trigger)
{}
type(type&& other)
: source_(std::move(other.source_))
, trigger_(std::move(other.trigger_))
{}
friend next_sender tag_invoke(tag_t<next>, take_until_stream& s) {
return {s};
}
friend cleanup_sender tag_invoke(tag_t<cleanup>, take_until_stream& s) {
return {s};
}
};
} // namespace _take_until
namespace _take_until_cpo {
inline const struct _fn {
template <typename SourceStream, typename TriggerStream>
auto operator()(SourceStream&& source, TriggerStream&& trigger) const {
return _take_until::stream<SourceStream, TriggerStream>{
(SourceStream&&)source,
(TriggerStream&&)trigger};
}
template <typename TriggerStream>
constexpr auto operator()(TriggerStream&& trigger) const
noexcept(is_nothrow_callable_v<
tag_t<bind_back>, _fn, TriggerStream>)
-> bind_back_result_t<_fn, TriggerStream> {
return bind_back(*this, (TriggerStream&&)trigger);
}
} take_until {};
} // namespace _take_until_cpo
using _take_until_cpo::take_until;
} // namespace unifex
#include <unifex/detail/epilogue.hpp>