/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License Version 2.0 with LLVM Exceptions * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * https://llvm.org/LICENSE.txt * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace unifex { namespace _stop_immediately { template struct _stream { struct type; }; template using stream = typename _stream, Values...>::type; template struct _stream::type { private: using stream = type; enum class state { not_started, source_next_completed, source_next_active, source_next_active_stream_stopped, source_next_active_cleanup_requested, cleanup_completed }; struct cleanup_operation_base { virtual void start_cleanup() noexcept = 0; }; struct next_receiver_base { virtual void set_value(Values&&... values) && noexcept = 0; virtual void set_done() && noexcept = 0; virtual void set_error(std::exception_ptr ex) && noexcept = 0; }; struct cancel_next_callback { stream& stream_; void operator()() noexcept { auto oldState = stream_.state_.load(std::memory_order_acquire); if (oldState == state::source_next_active) { // We may be racing with the next() operation completing on another // thread so we need to use a compare_exchange here to decide the // race. // Note that the callback destructor is run when we receive // the next() operation completion signal before delivering the signal // to the true receiver. The destructor will will block waiting for // this method to return and so we are guaranteed that there will be // no further call to next() or to cleanup() before we return here. // The only concurrent state transition can be from // 'source_next_active' to 'idle' and there will be no further state // changes until we return. // Thus it should be safe to use 'relaxed' memory access for the // compare-exchange below since we have already synchronised with the // 'acquire' operation above. if (stream_.state_.compare_exchange_strong( oldState, state::source_next_active_stream_stopped, std::memory_order_relaxed)) { // Successfully acquired ownership over the receiver. // Send the 'done' signal immediately to signal the end of the // sequence and also send the stop signal to the still-running // next() operation. stream_.stopSource_.request_stop(); auto receiver = std::exchange(stream_.nextReceiver_, nullptr); UNIFEX_ASSERT(receiver != nullptr); std::move(*receiver).set_done(); } else { UNIFEX_ASSERT(oldState == state::source_next_completed); } } else { UNIFEX_ASSERT(oldState == state::source_next_completed); } } }; struct next_receiver { stream& stream_; inplace_stop_source& get_stop_source() const { return stream_.stopSource_; } // Note, parameters passed by value here just in case we are passed // references to values owned by the operation object that we will be // destroying before passing the values along to the next receiver. void set_value(Values... values) && noexcept { handle_signal([&](next_receiver_base* receiver) noexcept { UNIFEX_TRY { std::move(*receiver).set_value((Values&&)values...); } UNIFEX_CATCH (...) { std::move(*receiver).set_error(std::current_exception()); } }); } void set_done() && noexcept { handle_signal([](next_receiver_base* receiver) noexcept { std::move(*receiver).set_done(); }); } template void set_error(Error&& error) && noexcept { std::move(*this).set_error(make_exception_ptr((Error&&)error)); } void set_error(std::exception_ptr ex) && noexcept { auto& nextError = stream_.nextError_; nextError = std::move(ex); handle_signal([&](next_receiver_base* receiver) noexcept { std::move(*receiver).set_error(std::exchange(nextError, {})); }); } template void handle_signal(Func deliverSignalTo) noexcept { auto& strm = stream_; strm.nextOp_.destruct(); auto oldState = strm.state_.load(std::memory_order_acquire); if (oldState == state::source_next_active) { if (strm.state_.compare_exchange_strong( oldState, state::source_next_completed, std::memory_order_relaxed)) { // We acquired ownership of the receiver before it was cancelled. auto* receiver = std::exchange(strm.nextReceiver_, nullptr); UNIFEX_ASSERT(receiver != nullptr); deliverSignalTo(receiver); return; } } if (oldState == state::source_next_active_stream_stopped) { if (strm.state_.compare_exchange_strong( oldState, state::source_next_completed, std::memory_order_release, std::memory_order_acquire)) { // Successfully signalled that 'next' completed before 'cleanup' // operation started. Discard this signal without forwarding it on. return; } } // Otherwise, cleanup() was requested before this operation completed. // We are responsible for starting cleanup now that next() has finished. UNIFEX_ASSERT(oldState == state::source_next_active_cleanup_requested); UNIFEX_ASSERT(stream_.cleanupOp_ != nullptr); stream_.cleanupOp_->start_cleanup(); } friend inplace_stop_token tag_invoke( tag_t, const next_receiver& r) noexcept { return r.get_stop_source().get_token(); } template friend void tag_invoke( tag_t, const next_receiver& r, Func&& func) { std::invoke(func, r.op_->receiver_); } }; struct next_sender { stream& stream_; template