/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License Version 2.0 with LLVM Exceptions * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * https://llvm.org/LICENSE.txt * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include #include #include #include #include #include #include #include #include namespace unifex { namespace _repeat_effect_until { template struct _op { class type; }; template using operation_type = typename _op::type; template struct _rcvr { class type; }; template using receiver_t = typename _rcvr::type; template struct _sndr { class type; }; template class _rcvr::type { using operation = operation_type; public: explicit type(operation* op) noexcept : op_(op) {} type(type&& other) noexcept : op_(std::exchange(other.op_, {})) {} void set_value() noexcept { UNIFEX_ASSERT(op_ != nullptr); // This signals to repeat_effect_until the operation. auto* op = op_; UNIFEX_ASSERT(op->isSourceOpConstructed_); op->isSourceOpConstructed_ = false; op->sourceOp_.destruct(); if constexpr (std::is_nothrow_invocable_v && is_nothrow_connectable_v && is_nothrow_tag_invocable_v, Receiver>) { // call predicate and complete with void if it returns true if(op->predicate_()) { unifex::set_value(std::move(op->receiver_)); return; } auto& sourceOp = op->sourceOp_.construct_with([&]() noexcept { return unifex::connect(op->source_, type{op}); }); op->isSourceOpConstructed_ = true; unifex::start(sourceOp); } else { UNIFEX_TRY { // call predicate and complete with void if it returns true if(op->predicate_()) { unifex::set_value(std::move(op->receiver_)); return; } auto& sourceOp = op->sourceOp_.construct_with([&] { return unifex::connect(op->source_, type{op}); }); op->isSourceOpConstructed_ = true; unifex::start(sourceOp); } UNIFEX_CATCH (...) { unifex::set_error(std::move(op->receiver_), std::current_exception()); } } } void set_done() noexcept { UNIFEX_ASSERT(op_ != nullptr); unifex::set_done(std::move(op_->receiver_)); } template(typename Error) (requires receiver) void set_error(Error&& error) noexcept { UNIFEX_ASSERT(op_ != nullptr); unifex::set_error(std::move(op_->receiver_), (Error&&)error); } private: template(typename CPO) (requires is_receiver_query_cpo_v AND is_callable_v) friend auto tag_invoke(CPO cpo, const type& r) noexcept(std::is_nothrow_invocable_v) -> callable_result_t { return std::move(cpo)(r.get_rcvr()); } template friend void tag_invoke( tag_t, const type& r, VisitFunc&& func) noexcept(std::is_nothrow_invocable_v< VisitFunc&, const Receiver&>) { std::invoke(func, r.get_rcvr()); } const Receiver& get_rcvr() const noexcept { UNIFEX_ASSERT(op_ != nullptr); return op_->receiver_; } operation* op_; }; template class _op::type { using _receiver_t = receiver_t; public: template explicit type(Source2&& source, Predicate2&& predicate, Receiver2&& dest) noexcept(std::is_nothrow_constructible_v && std::is_nothrow_constructible_v && std::is_nothrow_constructible_v && is_nothrow_connectable_v) : source_((Source2&&)source) , predicate_((Predicate2&&)predicate) , receiver_((Receiver2&&)dest) { sourceOp_.construct_with([&] { return unifex::connect(source_, _receiver_t{this}); }); } ~type() { if (isSourceOpConstructed_) { sourceOp_.destruct(); isSourceOpConstructed_ = false; } } void start() & noexcept { unifex::start(sourceOp_.get()); } private: friend _receiver_t; using source_op_t = connect_result_t; UNIFEX_NO_UNIQUE_ADDRESS Source source_; UNIFEX_NO_UNIQUE_ADDRESS Predicate predicate_; UNIFEX_NO_UNIQUE_ADDRESS Receiver receiver_; bool isSourceOpConstructed_ = true; manual_lifetime sourceOp_; }; template class _sndr::type { public: template