2025-01-12 19:51:34 +08:00

181 lines
4.8 KiB
C++

/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <unifex/manual_event_loop.hpp>
#include <unifex/manual_lifetime.hpp>
#include <unifex/scheduler_concepts.hpp>
#include <unifex/sender_concepts.hpp>
#include <unifex/blocking.hpp>
#include <unifex/with_query_value.hpp>
#include <unifex/bind_back.hpp>
#include <unifex/exception.hpp>
#include <condition_variable>
#include <exception>
#include <mutex>
#include <type_traits>
#include <utility>
#include <optional>
#include <unifex/detail/prologue.hpp>
namespace unifex {
namespace _sync_wait {
template <typename T>
struct promise {
promise() {}
~promise() {
if (state_ == state::value) {
unifex::deactivate_union_member(value_);
} else if (state_ == state::error) {
unifex::deactivate_union_member(exception_);
}
}
union {
manual_lifetime<T> value_;
manual_lifetime<std::exception_ptr> exception_;
};
enum class state { incomplete, done, value, error };
state state_ = state::incomplete;
};
template <typename T>
struct _receiver {
struct type {
promise<T>& promise_;
manual_event_loop& ctx_;
template <typename... Values>
void set_value(Values&&... values) && noexcept {
UNIFEX_TRY {
unifex::activate_union_member(promise_.value_, (Values&&)values...);
promise_.state_ = promise<T>::state::value;
}
UNIFEX_CATCH (...) {
unifex::activate_union_member(promise_.exception_, std::current_exception());
promise_.state_ = promise<T>::state::error;
}
signal_complete();
}
void set_error(std::exception_ptr err) && noexcept {
unifex::activate_union_member(promise_.exception_, std::move(err));
promise_.state_ = promise<T>::state::error;
signal_complete();
}
void set_error(std::error_code ec) && noexcept {
std::move(*this).set_error(make_exception_ptr(std::system_error{ec, "sync_wait"}));
}
template <typename Error>
void set_error(Error&& e) && noexcept {
std::move(*this).set_error(make_exception_ptr((Error&&)e));
}
void set_done() && noexcept {
promise_.state_ = promise<T>::state::done;
signal_complete();
}
friend auto
tag_invoke(tag_t<get_scheduler>, const type& r) noexcept {
return r.ctx_.get_scheduler();
}
private:
void signal_complete() noexcept {
ctx_.stop();
}
};
};
template <typename T>
using receiver_t = typename _receiver<T>::type;
template <typename Result, typename Sender>
UNIFEX_CLANG_DISABLE_OPTIMIZATION
std::optional<Result> _impl(Sender&& sender) {
using promise_t = _sync_wait::promise<Result>;
promise_t promise;
manual_event_loop ctx;
// Store state for the operation on the stack.
auto operation = connect(
(Sender&&)sender,
_sync_wait::receiver_t<Result>{promise, ctx});
start(operation);
ctx.run();
switch (promise.state_) {
case promise_t::state::done:
return std::nullopt;
case promise_t::state::value:
return std::move(promise.value_).get();
case promise_t::state::error:
std::rethrow_exception(promise.exception_.get());
default:
std::terminate();
}
}
} // namespace _sync_wait
namespace _sync_wait_cpo {
struct _fn {
template(typename Sender)
(requires typed_sender<Sender>)
auto operator()(Sender&& sender) const
-> std::optional<sender_single_value_result_t<remove_cvref_t<Sender>>> {
using Result = sender_single_value_result_t<remove_cvref_t<Sender>>;
return _sync_wait::_impl<Result>((Sender&&) sender);
}
constexpr auto operator()() const
noexcept(is_nothrow_callable_v<
tag_t<bind_back>, _fn>)
-> bind_back_result_t<_fn> {
return bind_back(*this);
}
};
} // namespace _sync_wait_cpo
inline constexpr _sync_wait_cpo::_fn sync_wait {};
namespace _sync_wait_r_cpo {
template <typename Result>
struct _fn {
template(typename Sender)
(requires sender<Sender>)
decltype(auto) operator()(Sender&& sender) const {
using Result2 = non_void_t<wrap_reference_t<decay_rvalue_t<Result>>>;
return _sync_wait::_impl<Result2>((Sender&&) sender);
}
};
} // namespace _sync_wait_r_cpo
template <typename Result>
inline constexpr _sync_wait_r_cpo::_fn<Result> sync_wait_r {};
} // namespace unifex
#include <unifex/detail/epilogue.hpp>