2025-01-12 19:51:34 +08:00

269 lines
7.5 KiB
C++

/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <unifex/async_trace.hpp>
#include <unifex/await_transform.hpp>
#include <unifex/connect_awaitable.hpp>
#include <unifex/inplace_stop_token.hpp>
#include <unifex/manual_lifetime.hpp>
#include <unifex/coroutine.hpp>
#include <unifex/coroutine_concepts.hpp>
#include <unifex/sender_concepts.hpp>
#include <unifex/std_concepts.hpp>
#include <unifex/scope_guard.hpp>
#include <unifex/type_list.hpp>
#include <unifex/invoke.hpp>
#include <unifex/at_coroutine_exit.hpp>
#include <unifex/continuations.hpp>
#if UNIFEX_NO_COROUTINES
# error "Coroutine support is required to use this header"
#endif
#include <exception>
#include <unifex/detail/prologue.hpp>
namespace unifex {
namespace _task {
using namespace _util;
// Holder for the task type. The coroutine return type is the nested
// `_task<T>::type`; it is only declared here and is defined further down in
// this file, after the promise and awaiter types it refers to.
template <class T>
struct _task {
  // [[nodiscard]] asks the compiler to diagnose a task value that the caller
  // discards.
  struct [[nodiscard]] type;
};
// Part of a task's promise that does not depend on the result type. It stores
// the continuation used when the task finishes and the stop token that
// get_stop_token returns for this promise.
struct _promise_base {
  // Shared pieces of the final-suspend awaiter: it is never ready, so the
  // derived awaiter's await_suspend always runs, and resuming it produces
  // nothing.
  struct _final_suspend_awaiter_base {
    static bool await_ready() noexcept { return false; }

    static void await_resume() noexcept {}
  };

  // The coroutine always suspends at its initial suspend point, so the body
  // runs only once the handle is resumed.
  coro::suspend_always initial_suspend() noexcept {
    return coro::suspend_always{};
  }

  // Forwards "done" handling to the stored continuation and returns the
  // coroutine handle it produces.
  coro::coroutine_handle<> unhandled_done() noexcept {
    return continuation_.done();
  }

  // visit_continuations customisation: passes the visitor on to the stored
  // continuation.
  template <typename Func>
  friend void tag_invoke(
      tag_t<visit_continuations>, const _promise_base& p, Func&& func) {
    visit_continuations(p.continuation_, static_cast<Func&&>(func));
  }

  // get_stop_token customisation: returns a copy of the stored stop token.
  friend inplace_stop_token
  tag_invoke(tag_t<get_stop_token>, const _promise_base& p) noexcept {
    return p.stoken_;
  }

  // exchange_continuation customisation: installs `action` as the new
  // continuation and returns the one that was stored before.
  friend continuation_handle<> tag_invoke(
      tag_t<exchange_continuation>,
      _promise_base& p,
      continuation_handle<> action) noexcept {
    return std::exchange(
        p.continuation_, static_cast<continuation_handle<>&&>(action));
  }

  // Continuation used by unhandled_done() and by the derived promise's
  // final-suspend awaiter.
  continuation_handle<> continuation_;
  // Stop token reported through get_stop_token.
  inplace_stop_token stoken_;
};
// Mixin for the promise of a value-returning task: provides `return_value`
// together with the `expected_` member the result is written into.
template <class T>
struct _return_value_or_void {
  struct type {
    // Handles `co_return expr;`. Only participates when Value is convertible
    // to T and T is constructible from Value. The steps are order-sensitive:
    //   1. reset_value() on the storage (presumably discards any previously
    //      stored outcome - its definition is not in this file);
    //   2. construct the `value_` union member from the forwarded argument;
    //   3. only then record that a value is present.
    // noexcept exactly when constructing T from Value cannot throw.
    template(typename Value = T)
      (requires convertible_to<Value, T> AND constructible_from<T, Value>)
    void return_value(Value&& value)
        noexcept(std::is_nothrow_constructible_v<T, Value>) {
      expected_.reset_value();
      unifex::activate_union_member(
          expected_.value_, static_cast<Value&&>(value));
      expected_.state_ = _state::value;
    }

    // Outcome storage; `_expected` and `_state` are declared outside this
    // file.
    _expected<T> expected_;
  };
};
// Specialisation of the return mixin for tasks that produce no value: the
// promise gets `return_void` instead of `return_value`.
template <>
struct _return_value_or_void<void> {
  struct type {
    // Handles `co_return;` (or falling off the end of the body). Mirrors the
    // value case: reset_value() on the storage first (presumably discarding
    // any earlier outcome - its definition is not in this file), activate the
    // `value_` union member with no arguments, then record that the task
    // completed with a value.
    void return_void() noexcept {
      expected_.reset_value();
      unifex::activate_union_member(expected_.value_);
      expected_.state_ = _state::value;
    }

    // Outcome storage; `_expected` and `_state` are declared outside this
    // file.
    _expected<void> expected_;
  };
};
// Promise type used by task<T>. It combines the result-independent state in
// _promise_base with the return_value/return_void mixin selected for T.
template <class T>
struct _promise {
  struct type : _promise_base, _return_value_or_void<T>::type {
    using result_type = T;

    // Creates the task object handed back to the coroutine's caller; it wraps
    // the handle of the coroutine that owns this promise.
    typename _task<T>::type get_return_object() noexcept {
      auto self = coro::coroutine_handle<type>::from_promise(*this);
      return typename _task<T>::type{self};
    }

    // At the end of the coroutine, control is passed to the stored
    // continuation.
    auto final_suspend() noexcept {
      struct final_awaiter : _final_suspend_awaiter_base {
#if defined(_MSC_VER) && !defined(__clang__)
        // MSVC doesn't seem to like symmetric transfer in this final awaiter,
        // so the continuation is resumed directly instead of being returned.
        void await_suspend(coro::coroutine_handle<type> h) noexcept {
          h.promise().continuation_.handle().resume();
        }
#else
        // Returning the continuation's handle lets the compiler resume it via
        // symmetric transfer.
        auto await_suspend(coro::coroutine_handle<type> h) noexcept {
          return h.promise().continuation_.handle();
        }
#endif
      };
      return final_awaiter{};
    }

    // Captures the in-flight exception into the outcome storage. Order
    // matters: reset the storage, activate the `exception_` member, and only
    // then mark the state as holding an exception.
    void unhandled_exception() noexcept {
      this->expected_.reset_value();
      unifex::activate_union_member(
          this->expected_.exception_, std::current_exception());
      this->expected_.state_ = _state::exception;
    }

    // Routes every `co_await expr` in the task body through the
    // unifex::await_transform customisation point, passing this promise along.
    // Only participates when that call is well-formed; the noexcept-ness and
    // return type mirror those of the call.
    template(typename Value)
      (requires callable<decltype(unifex::await_transform), type&, Value>)
    auto await_transform(Value&& value)
        noexcept(is_nothrow_callable_v<
                 decltype(unifex::await_transform), type&, Value>)
        -> callable_result_t<decltype(unifex::await_transform), type&, Value> {
      return unifex::await_transform(*this, static_cast<Value&&>(value));
    }

    // Yields the task's outcome: rethrows the stored exception if the state
    // says one was captured, otherwise returns whatever `get()` on the moved
    // value storage returns. The return expression is deliberately left
    // unparenthesised so decltype(auto) deduces get()'s own return type.
    decltype(auto) result() {
      auto& outcome = this->expected_;
      if (outcome.state_ == _state::exception) {
        std::rethrow_exception(std::move(outcome.exception_).get());
      }
      return std::move(outcome.value_).get();
    }
  };
};
// Awaiter for a coroutine with promise type ThisPromise (the awaited, "child"
// coroutine) being awaited from a coroutine with promise type OtherPromise
// (the awaiting, "parent" coroutine). The awaiter owns the child's coroutine
// frame while it holds a non-null handle.
template <class ThisPromise, class OtherPromise>
struct _awaiter {
  struct type {
    using result_type = typename ThisPromise::result_type;

    // Takes ownership of the child coroutine's handle.
    explicit type(coro::coroutine_handle<ThisPromise> child) noexcept
      : coro_(child) {}

    // Moving transfers ownership of the handle and leaves the source null.
    // The stop-token adapter is not moved; it is default-initialised here.
    type(type&& other) noexcept
      : coro_(std::exchange(other.coro_, {})) {}

    // Destroys the child frame if it is still owned.
    ~type() {
      if (coro_) {
        coro_.destroy();
      }
    }

    // Never ready: await_suspend always runs.
    bool await_ready() noexcept { return false; }

    // Wires the child up to its parent and starts it:
    //   - the parent handle becomes the child's continuation;
    //   - the adapter is subscribed to the parent's stop token and the token
    //     it returns becomes the child's stop token;
    //   - returning the child's handle transfers control to it.
    coro::coroutine_handle<ThisPromise>
    await_suspend(coro::coroutine_handle<OtherPromise> h) noexcept {
      UNIFEX_ASSERT(coro_);
      auto& childPromise = coro_.promise();
      childPromise.continuation_ = h;
      childPromise.stoken_ =
          stopTokenAdapter_.subscribe(get_stop_token(h.promise()));
      return coro_;
    }

    // Runs when the parent is resumed. Unsubscribes the stop-token adapter
    // first, then sets up a guard whose callback nulls the handle and destroys
    // the child frame (intended to fire on leaving this function, including
    // when result() throws), and finally fetches the child's outcome.
    result_type await_resume() {
      stopTokenAdapter_.unsubscribe();
      scope_guard destroyFrame{[this]() noexcept {
        std::exchange(coro_, {}).destroy();
      }};
      return coro_.promise().result();
    }

  private:
    // Owned handle of the child coroutine; null once ownership has been given
    // up.
    coro::coroutine_handle<ThisPromise> coro_;
    // Adapts the parent's stop token type to the token stored in the child's
    // promise.
    UNIFEX_NO_UNIQUE_ADDRESS
    detail::inplace_stop_token_adapter_subscription<
        stop_token_type_t<OtherPromise>>
        stopTokenAdapter_;
  };
};
// The task type itself: a move-only owner of a coroutine handle whose promise
// is `_promise<T>::type`. It can be awaited from another coroutine (through
// the await_transform customisation) or connected to a receiver.
template <class T>
struct _task<T>::type {
  using promise_type = typename _promise<T>::type;
  // The promise constructs the task through the private handle constructor.
  friend promise_type;

  // Sender value types: Variant of a single Tuple, built from type_list<> when
  // T is void and from type_list<T> otherwise (assuming type_list's `apply`
  // instantiates Tuple with the list's elements).
  template <
      template <typename...> class Variant,
      template <typename...> class Tuple>
  using value_types = Variant<
      typename std::conditional_t<
          std::is_void_v<T>, type_list<>, type_list<T>>
          ::template apply<Tuple>>;

  // Errors are always reported as std::exception_ptr.
  template <template <typename...> class Variant>
  using error_types = Variant<std::exception_ptr>;

  // A task may complete with "done".
  static constexpr bool sends_done = true;

  // Destroys the coroutine frame if this task still owns one.
  ~type() {
    if (coro_) {
      coro_.destroy();
    }
  }

  // Move construction steals the handle and leaves the source empty.
  type(type&& other) noexcept
    : coro_(std::exchange(other.coro_, {})) {}

  // Assignment by value + swap: the previously held handle ends up in the
  // by-value parameter and is destroyed with it.
  type& operator=(type other) noexcept {
    std::swap(coro_, other.coro_);
    return *this;
  }

  // co_invoke customisation: a coroutine that invokes `fn` with `args` and
  // awaits the result. `fn` and `args` are taken by value so the coroutine
  // owns its own copies of them.
  template <typename Fn, typename... Args>
  friend type tag_invoke(
      tag_t<co_invoke>, type_identity<type>, Fn fn, Args... args) {
    co_return co_await std::invoke(
        static_cast<Fn&&>(fn), static_cast<Args&&>(args)...);
  }

private:
  // Awaiter used when this task is awaited from a coroutine whose promise type
  // is OtherPromise.
  template <typename OtherPromise>
  using awaiter = typename _awaiter<promise_type, OtherPromise>::type;

  // Wraps an existing coroutine handle; used by the promise's
  // get_return_object().
  explicit type(coro::coroutine_handle<promise_type> handle) noexcept
    : coro_(handle) {}

  // await_transform customisation: hands the coroutine handle over to an
  // awaiter and leaves the task empty.
  template <typename Promise>
  friend awaiter<Promise>
  tag_invoke(tag_t<unifex::await_transform>, Promise&, type&& self) noexcept {
    return awaiter<Promise>{std::exchange(self.coro_, {})};
  }

  // connect customisation: forwards the task and receiver to
  // connect_awaitable.
  template <typename Receiver>
  friend auto tag_invoke(tag_t<unifex::connect>, type&& self, Receiver&& r) {
    return unifex::connect_awaitable(
        static_cast<type&&>(self), static_cast<Receiver&&>(r));
  }

  // Owned coroutine handle; null after the task has been moved from or
  // awaited.
  coro::coroutine_handle<promise_type> coro_;
};
} // namespace _task
// Public name for the task type: `unifex::task<T>` is the nested `type` of
// the `_task::_task<T>` holder defined in the `_task` namespace.
template <class T>
using task = typename _task::_task<T>::type;
} // namespace unifex
#include <unifex/detail/epilogue.hpp>