// NOTE(review): the text below appears to have lost its line breaks and every
// `<...>` argument list (e.g. `template struct _op`, `static_cast(op.receiver_)`,
// `#include` with no header name). As laid out, `#pragma once` and each `//`
// remark also swallow the code that follows them on the same physical line.
// Restore the original layout before attempting to compile; the code text
// itself is left untouched by this note.
//
// Overview of what is visible in this part of the file
// (namespace unifex::_reduce):
//
//  * _op::type -- the operation state. It stores `stream_`, `state_`,
//    `reducer_` and `receiver_`, plus an anonymous union of three
//    manual_lifetime slots: `next_`, `errorCleanup_` and `doneCleanup_`.
//    start() activates `next_` with
//    connect(next(stream_), next_receiver_t{*this}) and starts it inside
//    UNIFEX_TRY; the UNIFEX_CATCH (...) branch passes
//    std::current_exception() to `receiver_` through unifex::set_error.
//    The empty destructor must stay because of the union member (see the
//    in-code remark next to it).
//
//  * _next_receiver::type -- receiver connected to next(stream_).
//      - set_value: deactivates `next_`, assigns the result of
//        std::invoke(reducer_, state_, values...) back to `state_`, then
//        re-activates `next_` with a fresh connect(next(stream_), ...) and
//        starts it. If anything in that sequence throws, it activates
//        `errorCleanup_` with connect(cleanup(stream_), ...) carrying
//        std::current_exception(), and starts that instead.
//      - set_done: deactivates `next_`, then connects and starts
//        cleanup(stream_) with a done_cleanup_receiver_t in `doneCleanup_`.
//      - set_error(std::exception_ptr): deactivates `next_`, then connects
//        and starts cleanup(stream_) with an error_cleanup_receiver_t that
//        holds the exception, in `errorCleanup_`.
//      - set_error(Error&&): wraps the error with make_exception_ptr and
//        forwards to the overload above.
//
//  * _error_cleanup_receiver::type -- receiver for the cleanup operation on
//    the error path; keeps the pending exception in `ex_`.
//      - set_error: deactivates `errorCleanup_` and passes the error it was
//        given to `receiver_` via unifex::set_error.
//      - set_done: moves `ex_` out, deactivates `errorCleanup_`, and passes
//        the saved exception to `receiver_` via unifex::set_error.
//
//  * _done_cleanup_receiver::type -- receiver for the cleanup operation on
//    the done path.
//      - set_error: deactivates `doneCleanup_` and passes the error to
//        `receiver_` via unifex::set_error.
//      - set_done: deactivates `doneCleanup_` and delivers `state_` to
//        `receiver_` via unifex::set_value.
//
//    In the cleanup receivers `op_` (and `ex_`) are copied into locals before
//    deactivate_union_member is called -- presumably because deactivating the
//    union member ends the lifetime of the receiver itself; TODO confirm.
//
//  * All three receivers forward receiver-query CPOs to `op_.receiver_`, and
//    each has a tag_invoke overload that calls the supplied `func` on
//    `op_.receiver_`. The two cleanup receivers additionally have a
//    tag_invoke overload returning a default-constructed unstoppable_token
//    (the CPO tags of these overloads are not legible in this text).
//
//  * _sender::type -- begins here (members `stream_`, `initialState_`,
//    `reducer_`, and a `value_types` alias) and continues beyond this excerpt.
/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License Version 2.0 with LLVM Exceptions * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * https://llvm.org/LICENSE.txt * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace unifex { namespace _reduce { template struct _op { struct type; }; template using operation = typename _op< remove_cvref_t, remove_cvref_t, remove_cvref_t, remove_cvref_t>::type; template struct _error_cleanup_receiver { struct type; }; template using error_cleanup_receiver = typename _error_cleanup_receiver< remove_cvref_t, remove_cvref_t, remove_cvref_t, remove_cvref_t>::type; template struct _error_cleanup_receiver::type { operation& op_; std::exception_ptr ex_; // No value() in cleanup receiver template void set_error(Error error) noexcept { auto& op = op_; unifex::deactivate_union_member(op.errorCleanup_); unifex::set_error(static_cast(op.receiver_), (Error &&) error); } void set_done() noexcept { auto& op = op_; auto ex = std::move(ex_); unifex::deactivate_union_member(op.errorCleanup_); unifex::set_error(static_cast(op.receiver_), std::move(ex)); } template(typename CPO) (requires is_receiver_query_cpo_v) friend auto tag_invoke(CPO cpo, const type& r) noexcept( is_nothrow_callable_v) -> callable_result_t { return std::move(cpo)(std::as_const(r.op_.receiver_)); } friend unstoppable_token tag_invoke(tag_t, const type&) noexcept { return {}; } template friend void tag_invoke(tag_t, 
const type& r, Func&& func) { std::invoke(func, r.op_.receiver_); } }; template struct _done_cleanup_receiver { struct type; }; template using done_cleanup_receiver = typename _done_cleanup_receiver< remove_cvref_t, remove_cvref_t, remove_cvref_t, remove_cvref_t>::type; template struct _done_cleanup_receiver::type { operation& op_; template void set_error(Error error) && noexcept { auto& op = op_; unifex::deactivate_union_member(op.doneCleanup_); unifex::set_error(static_cast(op.receiver_), (Error &&) error); } void set_done() && noexcept { auto& op = op_; unifex::deactivate_union_member(op.doneCleanup_); unifex::set_value( static_cast(op.receiver_), std::forward(op.state_)); } template(typename CPO) (requires is_receiver_query_cpo_v) friend auto tag_invoke(CPO cpo, const type& r) noexcept(is_nothrow_callable_v) -> callable_result_t { return std::move(cpo)(std::as_const(r.op_.receiver_)); } friend unstoppable_token tag_invoke(tag_t, const type&) noexcept { return {}; } template friend void tag_invoke(tag_t, const type& r, Func&& func) { std::invoke(func, r.op_.receiver_); } }; template struct _next_receiver { struct type; }; template using next_receiver = typename _next_receiver< remove_cvref_t, remove_cvref_t, remove_cvref_t, remove_cvref_t>::type; template struct _next_receiver::type { using error_cleanup_receiver_t = error_cleanup_receiver; using done_cleanup_receiver_t = done_cleanup_receiver; using next_receiver_t = next_receiver; operation& op_; template(typename CPO) (requires is_receiver_query_cpo_v) friend auto tag_invoke(CPO cpo, const type& r) noexcept(is_nothrow_callable_v) -> callable_result_t { return std::move(cpo)(std::as_const(r.op_.receiver_)); } template friend void tag_invoke( tag_t, const type& r, Func&& func) { std::invoke(func, r.op_.receiver_); } template void set_value(Values... 
values) && noexcept { auto& op = op_; unifex::deactivate_union_member(op.next_); UNIFEX_TRY { op.state_ = std::invoke( op.reducer_, std::forward(op.state_), (Values &&) values...); unifex::activate_union_member_with(op.next_, [&] { return unifex::connect(next(op.stream_), next_receiver_t{op}); }); unifex::start(op.next_.get()); } UNIFEX_CATCH (...) { unifex::activate_union_member_with(op.errorCleanup_, [&] { return unifex::connect( cleanup(op.stream_), error_cleanup_receiver_t{op, std::current_exception()}); }); unifex::start(op.errorCleanup_.get()); } } void set_done() && noexcept { auto& op = op_; unifex::deactivate_union_member(op.next_); unifex::activate_union_member_with(op.doneCleanup_, [&] { return unifex::connect( cleanup(op.stream_), done_cleanup_receiver_t{op}); }); unifex::start(op.doneCleanup_.get()); } void set_error(std::exception_ptr ex) && noexcept { auto& op = op_; unifex::deactivate_union_member(op.next_); unifex::activate_union_member_with(op.errorCleanup_, [&] { return unifex::connect( cleanup(op.stream_), error_cleanup_receiver_t{op, std::move(ex)}); }); unifex::start(op.errorCleanup_.get()); } template void set_error(Error&& e) && noexcept { std::move(*this).set_error(make_exception_ptr((Error &&) e)); } }; template struct _op::type { UNIFEX_NO_UNIQUE_ADDRESS StreamSender stream_; UNIFEX_NO_UNIQUE_ADDRESS State state_; UNIFEX_NO_UNIQUE_ADDRESS ReducerFunc reducer_; UNIFEX_NO_UNIQUE_ADDRESS Receiver receiver_; using next_receiver_t = next_receiver; using error_cleanup_receiver_t = error_cleanup_receiver; using done_cleanup_receiver_t = done_cleanup_receiver; using next_op = manual_lifetime>; using error_op = manual_lifetime>; using done_op = manual_lifetime>; union { next_op next_; error_op errorCleanup_; done_op doneCleanup_; }; template explicit type( StreamSender2&& stream, State2&& state, ReducerFunc2&& reducer, Receiver2&& receiver) : stream_(std::forward(stream)), state_(std::forward(state)), reducer_(std::forward(reducer)), 
receiver_(std::forward(receiver)) {} ~type() {} // Due to the union member, this is load-bearing. DO NOT DELETE. void start() noexcept { UNIFEX_TRY { unifex::activate_union_member_with(next_, [&] { return unifex::connect(next(stream_), next_receiver_t{*this}); }); unifex::start(next_.get()); } UNIFEX_CATCH (...) { unifex::set_error( static_cast(receiver_), std::current_exception()); } } }; template struct _sender { struct type; }; template using sender = typename _sender< remove_cvref_t, remove_cvref_t, remove_cvref_t>::type; template struct _sender::type { using sender = type; StreamSender stream_; State initialState_; ReducerFunc reducer_; template < template class Variant, template class Tuple> using value_types = Variant>; template