in include/unifex/stop_immediately.hpp [151:186]
void handle_signal(Func deliverSignalTo) noexcept {
auto& strm = stream_;
strm.nextOp_.destruct();
auto oldState = strm.state_.load(std::memory_order_acquire);
if (oldState == state::source_next_active) {
if (strm.state_.compare_exchange_strong(
oldState, state::source_next_completed,
std::memory_order_relaxed)) {
// We acquired ownership of the receiver before it was cancelled.
auto* receiver = std::exchange(strm.nextReceiver_, nullptr);
UNIFEX_ASSERT(receiver != nullptr);
deliverSignalTo(receiver);
return;
}
}
if (oldState == state::source_next_active_stream_stopped) {
if (strm.state_.compare_exchange_strong(
oldState, state::source_next_completed,
std::memory_order_release,
std::memory_order_acquire)) {
// Successfully signalled that 'next' completed before 'cleanup'
// operation started. Discard this signal without forwarding it on.
return;
}
}
// Otherwise, cleanup() was requested before this operation completed.
// We are responsible for starting cleanup now that next() has finished.
UNIFEX_ASSERT(oldState == state::source_next_active_cleanup_requested);
UNIFEX_ASSERT(stream_.cleanupOp_ != nullptr);
stream_.cleanupOp_->start_cleanup();
}