AimRT/install_x64/include/unifex/thread_unsafe_event_loop.hpp
2025-01-12 19:51:34 +08:00

410 lines
11 KiB
C++

/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <unifex/config.hpp>
#include <unifex/get_stop_token.hpp>
#include <unifex/manual_lifetime.hpp>
#include <unifex/receiver_concepts.hpp>
#include <unifex/sender_concepts.hpp>
#include <unifex/stop_token_concepts.hpp>
#include <chrono>
#include <exception>
#include <optional>
#include <type_traits>
#include <utility>
#include <unifex/detail/prologue.hpp>
namespace unifex {
class thread_unsafe_event_loop;
namespace _thread_unsafe_event_loop {
using clock_t = std::chrono::steady_clock;
using time_point_t = clock_t::time_point;
class cancel_callback;
class operation_base {
friend cancel_callback;
protected:
using execute_fn = void(operation_base*) noexcept;
operation_base(thread_unsafe_event_loop& loop, execute_fn* execute) noexcept
: loop_(loop), execute_(execute) {}
operation_base(const operation_base&) = delete;
operation_base(operation_base&&) = delete;
public:
void start() noexcept;
private:
friend thread_unsafe_event_loop;
void execute() noexcept {
this->execute_(this);
}
thread_unsafe_event_loop& loop_;
operation_base* next_;
operation_base** prevPtr_;
execute_fn* execute_;
protected:
time_point_t dueTime_;
};
class cancel_callback {
public:
explicit cancel_callback(operation_base& op) noexcept
: op_(&op) {}
void operator()() noexcept;
private:
operation_base* const op_;
};
class scheduler;
template <typename Duration>
struct _schedule_after_sender {
class type;
};
template <typename Duration>
using schedule_after_sender = typename _schedule_after_sender<Duration>::type;
template <typename Duration, typename Receiver>
struct _after_op {
class type;
};
template <typename Duration, typename Receiver>
using after_operation = typename _after_op<Duration, remove_cvref_t<Receiver>>::type;
template <typename Duration, typename Receiver>
class _after_op<Duration, Receiver>::type final : public operation_base {
friend schedule_after_sender<Duration>;
public:
void start() noexcept {
this->dueTime_ = clock_t::now() + duration_;
callback_.construct(
get_stop_token(receiver_), cancel_callback{*this});
operation_base::start();
}
private:
template <typename Receiver2>
explicit type(
Receiver2&& r,
Duration d,
thread_unsafe_event_loop& loop)
: operation_base(loop, &type::execute_impl)
, receiver_((Receiver2 &&) r)
, duration_(d) {}
static void execute_impl(operation_base* p) noexcept {
auto& self = *static_cast<type*>(p);
self.callback_.destruct();
if constexpr (is_stop_never_possible_v<
stop_token_type_t<Receiver&>>) {
unifex::set_value(std::move(self.receiver_));
} else {
if (get_stop_token(self.receiver_).stop_requested()) {
unifex::set_done(std::move(self.receiver_));
} else {
unifex::set_value(std::move(self.receiver_));
}
}
}
UNIFEX_NO_UNIQUE_ADDRESS Receiver receiver_;
UNIFEX_NO_UNIQUE_ADDRESS Duration duration_;
UNIFEX_NO_UNIQUE_ADDRESS manual_lifetime<typename stop_token_type_t<
Receiver&>::template callback_type<cancel_callback>>
callback_;
};
template <typename Duration>
class _schedule_after_sender<Duration>::type {
using schedule_after_sender = type;
public:
template <
template <typename...> class Variant,
template <typename...> class Tuple>
using value_types = Variant<Tuple<>>;
template <template <typename...> class Variant>
using error_types = Variant<>;
static constexpr bool sends_done = true;
template <typename Receiver>
after_operation<Duration, remove_cvref_t<Receiver>> connect(Receiver&& r) const& {
return after_operation<Duration, remove_cvref_t<Receiver>>{
(Receiver &&) r, duration_, loop_};
}
private:
friend scheduler;
explicit type(
thread_unsafe_event_loop& loop,
Duration duration) noexcept
: loop_(loop), duration_(duration) {}
thread_unsafe_event_loop& loop_;
Duration duration_;
};
struct schedule_at_sender;
template <typename Receiver>
struct _at_op {
class type;
};
template <typename Receiver>
using at_operation = typename _at_op<remove_cvref_t<Receiver>>::type;
template <typename Receiver>
class _at_op<Receiver>::type final : public operation_base {
public:
void start() noexcept {
callback_.construct(
get_stop_token(receiver_), cancel_callback{*this});
operation_base::start();
}
private:
friend schedule_at_sender;
template <typename Receiver2>
explicit type(
Receiver2&& r,
time_point_t tp,
thread_unsafe_event_loop& loop)
: operation_base(loop, &type::execute_impl), receiver_((Receiver2 &&) r) {
this->dueTime_ = tp;
}
static void execute_impl(operation_base* p) noexcept {
auto& self = *static_cast<type*>(p);
self.callback_.destruct();
if constexpr (is_stop_never_possible_v<
stop_token_type_t<Receiver&>>) {
unifex::set_value(std::move(self.receiver_));
} else {
if (get_stop_token(self.receiver_).stop_requested()) {
unifex::set_done(std::move(self.receiver_));
} else {
unifex::set_value(std::move(self.receiver_));
}
}
}
UNIFEX_NO_UNIQUE_ADDRESS Receiver receiver_;
UNIFEX_NO_UNIQUE_ADDRESS manual_lifetime<typename stop_token_type_t<
Receiver&>::template callback_type<cancel_callback>>
callback_;
};
struct schedule_at_sender {
template <
template <typename...> class Variant,
template <typename...> class Tuple>
using value_types = Variant<Tuple<>>;
template <template <typename...> class Variant>
using error_types = Variant<>;
static constexpr bool sends_done = true;
template <typename Receiver>
at_operation<remove_cvref_t<Receiver>> connect(Receiver&& r) const& {
return at_operation<remove_cvref_t<Receiver>>{
(Receiver &&) r, dueTime_, loop_};
}
private:
friend scheduler;
explicit schedule_at_sender(
thread_unsafe_event_loop& loop,
time_point_t dueTime)
: loop_(loop), dueTime_(dueTime) {}
thread_unsafe_event_loop& loop_;
time_point_t dueTime_;
};
class scheduler {
public:
auto schedule_at(time_point_t dueTime) const noexcept {
return schedule_at_sender{*loop_, dueTime};
}
template <typename Rep, typename Ratio>
auto schedule_after(std::chrono::duration<Rep, Ratio> d) const noexcept {
return schedule_after_sender<std::chrono::duration<Rep, Ratio>>{*loop_, d};
}
auto schedule() const noexcept {
return schedule_after(std::chrono::milliseconds(0));
}
friend bool operator==(scheduler a, scheduler b) noexcept {
return a.loop_ == b.loop_;
}
friend bool operator!=(scheduler a, scheduler b) noexcept {
return a.loop_ != b.loop_;
}
private:
friend thread_unsafe_event_loop;
explicit scheduler(thread_unsafe_event_loop& loop) noexcept
: loop_(&loop) {}
thread_unsafe_event_loop* loop_;
};
template <typename T>
struct _sync_wait_promise {
class type;
};
template <typename T>
using sync_wait_promise = typename _sync_wait_promise<T>::type;
template <typename T>
class _sync_wait_promise<T>::type {
using sync_wait_promise = type;
enum class state { incomplete, done, value, error };
class receiver {
public:
template <typename... Values>
void set_value(Values&&... values) && noexcept {
UNIFEX_TRY {
unifex::activate_union_member(promise_.value_, (Values &&) values...);
promise_.state_ = state::value;
} UNIFEX_CATCH (...) {
unifex::activate_union_member(promise_.exception_, std::current_exception());
promise_.state_ = state::error;
}
}
void set_error(std::exception_ptr ex) && noexcept {
unifex::activate_union_member(promise_.exception_, std::move(ex));
promise_.state_ = state::error;
}
void set_done() && noexcept {
promise_.state_ = state::done;
}
private:
friend sync_wait_promise;
explicit receiver(sync_wait_promise& promise) noexcept
: promise_(promise) {}
sync_wait_promise& promise_;
};
public:
type() noexcept {}
~type() {
if (state_ == state::value) {
unifex::deactivate_union_member(value_);
} else if (state_ == state::error) {
unifex::deactivate_union_member(exception_);
}
}
receiver get_receiver() noexcept {
return receiver{*this};
}
std::optional<T> get() && {
switch (state_) {
case state::done:
return std::nullopt;
case state::value:
return std::move(value_).get();
case state::error:
std::rethrow_exception(exception_.get());
default:
UNIFEX_ASSERT(false);
std::terminate();
}
}
private:
union {
manual_lifetime<T> value_;
manual_lifetime<std::exception_ptr> exception_;
};
state state_ = state::incomplete;
};
} // namespace _thread_unsafe_event_loop
class thread_unsafe_event_loop {
using operation_base = _thread_unsafe_event_loop::operation_base;
using scheduler = _thread_unsafe_event_loop::scheduler;
using cancel_callback = _thread_unsafe_event_loop::cancel_callback;
friend operation_base;
friend cancel_callback;
void enqueue(operation_base* op) noexcept;
void run_until_empty() noexcept;
operation_base* head_ = nullptr;
public:
using clock_t = _thread_unsafe_event_loop::clock_t;
using time_point_t = _thread_unsafe_event_loop::time_point_t;
scheduler get_scheduler() noexcept {
return scheduler{*this};
}
template <
typename Sender,
typename Result = sender_single_value_result_t<remove_cvref_t<Sender>>>
std::optional<Result> sync_wait(Sender&& sender) {
using promise_t = _thread_unsafe_event_loop::sync_wait_promise<Result>;
promise_t promise;
auto op = connect((Sender &&) sender, promise.get_receiver());
start(op);
run_until_empty();
return std::move(promise).get();
}
};
namespace _thread_unsafe_event_loop {
inline void operation_base::start() noexcept {
loop_.enqueue(this);
}
}
} // namespace unifex
#include <unifex/detail/epilogue.hpp>