/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License Version 2.0 with LLVM Exceptions * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * https://llvm.org/LICENSE.txt * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /*
 * NOTE(review): as it stands here, this header has no line breaks inside its
 * code, its `#include` directives carry no header names, and its `template`
 * keywords and casts carry no angle-bracket lists (e.g. `static_cast(args)`).
 * Presumably these were lost before the file reached this form -- TODO
 * confirm against the upstream header before building. The comments added
 * below describe only what the remaining tokens show.
 */ #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include /*
 * Namespace holding the stop_when operation state, the two receivers it
 * connects to its child senders, and the stop_when sender.
 */ namespace unifex { namespace _stop_when { /*
 * Wrapper struct whose nested `type` is the stop_when operation state
 * (defined further down); `stop_when_operation` is the alias for it.
 */ template struct _op { class type; }; template using stop_when_operation = typename _op::type; /*
 * Wrapper struct whose nested `type` is the receiver connected to the source
 * sender; `stop_when_source_receiver` is the alias for it.
 */ template struct _srcvr { class type; }; template using stop_when_source_receiver = typename _srcvr::type; /*
 * Source receiver. Every completion signal records its tag and arguments in
 * `op_->result_` and then calls `op_->notify_source_complete()`; nothing is
 * forwarded to the downstream receiver from here.
 */ template class _srcvr::type { using operation_state = stop_when_operation; public: /*
   * Constructors: store the (non-owning) pointer to the operation state; the
   * move constructor nulls the moved-from pointer via std::exchange.
   */ explicit type(operation_state* op) noexcept : op_(op) {} type(type&& other) noexcept : op_(std::exchange(other.op_, nullptr)) {} /*
   * Value completion: emplaces a tuple built from `unifex::set_value` and the
   * forwarded values into `op_->result_`, then notifies the operation.
   * Not declared noexcept, unlike set_error/set_done below.
   */ template void set_value(Values&&... 
values) && { op_->result_.template emplace< std::tuple, std::decay_t...>>( unifex::set_value, (Values &&) values...); op_->notify_source_complete(); } /*
   * Error completion: emplaces a tuple built from `unifex::set_error` and the
   * forwarded error into `op_->result_`, then notifies the operation.
   * NOTE(review): declared noexcept although it constructs a stored copy of
   * the error -- confirm that construction cannot throw for supported errors.
   */ template void set_error(Error&& error) && noexcept { op_->result_.template emplace< std::tuple, std::decay_t>>( unifex::set_error, (Error &&) error); op_->notify_source_complete(); } /*
   * Done completion: emplaces a tuple built from `unifex::set_done` into
   * `op_->result_`, then notifies the operation.
   */ void set_done() && noexcept { op_->result_.template emplace>>( unifex::set_done); op_->notify_source_complete(); } private: /*
   * tag_invoke overload returning an inplace_stop_token taken from the
   * operation's own `stopSource_` (see get_stop_token() below).
   * NOTE(review): the argument of `tag_t` is not visible here; presumably the
   * get_stop_token CPO -- confirm.
   */ friend inplace_stop_token tag_invoke( tag_t, const type& r) noexcept { return r.get_stop_token(); } /*
   * tag_invoke overload constrained on `is_receiver_query_cpo_v`: invokes the
   * CPO on the downstream receiver (`op_->receiver_`) and returns its result.
   */ template(typename CPO) (requires is_receiver_query_cpo_v) friend auto tag_invoke(CPO cpo, const type& r) noexcept(is_nothrow_callable_v) -> callable_result_t { return std::move(cpo)(r.get_receiver()); } /*
   * Helpers used by the two tag_invoke overloads above, followed by the
   * pointer to the owning operation state.
   */ inplace_stop_token get_stop_token() const noexcept { return op_->stopSource_.get_token(); } const Receiver& get_receiver() const noexcept { return op_->receiver_; } operation_state* op_; }; /*
 * Wrapper struct whose nested `type` is the receiver connected to the trigger
 * sender; `stop_when_trigger_receiver` is the alias for it.
 */ template struct _trcvr { class type; }; template using stop_when_trigger_receiver = typename _trcvr::type; /*
 * Trigger receiver. Same construction and query plumbing as the source
 * receiver, but it stores nothing in `result_`.
 */ template class _trcvr::type { using operation_state = stop_when_operation; public: explicit type(operation_state* op) noexcept : op_(op) {} type(type&& other) noexcept : op_(std::exchange(other.op_, nullptr)) {} /*
   * All three completion signals do the same thing: call
   * `op_->notify_trigger_complete()`. The error argument of set_error is
   * unnamed and unused.
   */ void set_value() && noexcept { op_->notify_trigger_complete(); } template void set_error(Error&&) && noexcept { op_->notify_trigger_complete(); } void set_done() && noexcept { op_->notify_trigger_complete(); } private: /*
   * Stop-token and receiver-query customisations; token-for-token the same as
   * the ones on the source receiver: the stop token comes from
   * `op_->stopSource_`, other receiver queries go to `op_->receiver_`.
   */ friend inplace_stop_token tag_invoke( tag_t, const type& r) noexcept { return r.get_stop_token(); } template(typename CPO) (requires is_receiver_query_cpo_v) friend auto tag_invoke(CPO cpo, const type& r) noexcept(is_nothrow_callable_v) -> callable_result_t { return std::move(cpo)(r.get_receiver()); } inplace_stop_token get_stop_token() const noexcept { return op_->stopSource_.get_token(); } const Receiver& get_receiver() const noexcept { return op_->receiver_; } operation_state* op_; }; /*
 * Operation state. Owns the downstream receiver, the two child operation
 * states (`sourceOp_`, `triggerOp_`), a stop source whose token both child
 * receivers expose, and the stored result of the source.
 */ template class 
_op::type { using source_receiver = stop_when_source_receiver; using trigger_receiver = stop_when_trigger_receiver; public: /*
   * Constructor: initialises `receiver_` from `receiver`, then connects
   * `source` to `source_receiver{this}` and `trigger` to
   * `trigger_receiver{this}`. Both child receivers keep `this`.
   */ template explicit type( Source&& source, Trigger&& trigger, Receiver2&& receiver) noexcept(is_nothrow_connectable_v && is_nothrow_connectable_v && std::is_nothrow_constructible_v) : receiver_((Receiver2 &&) receiver) , sourceOp_(unifex::connect((Source &&) source, source_receiver{this})) , triggerOp_( unifex::connect((Trigger &&) trigger, trigger_receiver{this})) {} /*
   * start(): emplaces `stopCallback_` with the downstream receiver's stop
   * token and a `cancel_callback{this}`, then starts `sourceOp_` followed by
   * `triggerOp_`.
   */ void start() & noexcept { stopCallback_.emplace(get_stop_token(receiver_), cancel_callback{this}); unifex::start(sourceOp_); unifex::start(triggerOp_); } private: /*
   * The two receiver types reach into result_, receiver_, stopSource_ and the
   * notify_* members, hence the friend declarations.
   */ friend class _srcvr::type; friend class _trcvr::type; /*
   * Callable stored in `stopCallback_`; when invoked it calls
   * `request_stop()` on the operation's `stopSource_`.
   */ class cancel_callback { public: explicit cancel_callback(type* op) noexcept : op_(op) {} void operator()() noexcept { op_->stopSource_.request_stop(); } private: type* op_; }; /*
   * Called by the source receiver after it has stored a result; simply
   * delegates to notify_trigger_complete().
   */ void notify_source_complete() noexcept { this->notify_trigger_complete(); } /*
   * Called once per child completion. First requests stop on `stopSource_`,
   * then decrements `activeOpCount_` (initialised to 2 below). The caller
   * that observes the previous value 1 resets `stopCallback_` and calls
   * deliver_result().
   */ void notify_trigger_complete() noexcept { stopSource_.request_stop(); if (activeOpCount_.fetch_sub(1, std::memory_order_acq_rel) == 1) { stopCallback_.reset(); deliver_result(); } } /*
   * deliver_result(): visits `result_` (moved) and, for a stored tuple whose
   * size is non-zero, uses std::apply to call the stored first element
   * (`set_xxx`) with `std::move(receiver_)` followed by the remaining
   * elements.
   * NOTE(review): in this layout the `//` comment in the else-branch runs to
   * the end of the physical line, so the text after "Should be unreachable"
   * on that line (a std::terminate() call, closing braces, the
   * `std::move(result_)` argument and `UNIFEX_CATCH (...)`) is commented out.
   * A line break right after the comment text is presumably intended --
   * confirm against upstream.
   */ void deliver_result() noexcept { UNIFEX_TRY { std::visit( [this](auto&& tuple) { if constexpr ( std::tuple_size_v< std::remove_reference_t> != 0) { std::apply( [&](auto set_xxx, auto&&... args) { set_xxx( std::move(receiver_), static_cast(args)...); }, static_cast(tuple)); } else { // Should be unreachable std::terminate(); } }, std::move(result_)); } UNIFEX_CATCH (...) 
/*
   * Block following the `UNIFEX_CATCH (...)` text at the end of the previous
   * physical line: passes std::current_exception() to the downstream receiver
   * through unifex::set_error.
   */ { unifex::set_error(std::move(receiver_), std::current_exception()); } } /*
   * Aliases for the tuple types stored in `result_`: a std::tuple for a value
   * completion and a type_list of std::tuple for error completions (their
   * template arguments are not visible here).
   */ template using value_decayed_tuple = std::tuple, std::decay_t...>; template using error_tuples = type_list< std::tuple, std::decay_t>...>; /*
   * Type of `result_`: built by concatenating three type lists -- a fixed
   * type_list, one derived through sender_value_types_t with
   * value_decayed_tuple, and one derived through sender_error_types_t with
   * error_tuples -- and applying the result to a template.
   */ using result_variant = typename concat_type_lists_t< type_list, std::tuple>>, sender_value_types_t, type_list, value_decayed_tuple>, sender_error_types_t, error_tuples>>::template apply; /*
   * Data members. `activeOpCount_` starts at 2 and is decremented in
   * notify_trigger_complete(); `stopCallback_` is empty until start().
   */ UNIFEX_NO_UNIQUE_ADDRESS Receiver receiver_; std::atomic activeOpCount_ = 2; inplace_stop_source stopSource_; std::optional::template callback_type> stopCallback_; UNIFEX_NO_UNIQUE_ADDRESS result_variant result_; UNIFEX_NO_UNIQUE_ADDRESS connect_result_t sourceOp_; UNIFEX_NO_UNIQUE_ADDRESS connect_result_t triggerOp_; }; /*
 * Wrapper struct whose nested `type` is the stop_when sender;
 * `stop_when_sender` is the alias for it.
 */ template struct _sndr { class type; }; template using stop_when_sender = typename _sndr::type; template class _sndr::type { using stop_when_sender = type; template using decayed_type_list = type_list...>>; template < template class Outer, template class Inner> struct compose_nested { template using apply = Outer...>; }; public: template < template class Variant, template class Tuple> using value_types = typename sender_traits:: template value_types:: template apply::template apply>; template