AimRT/install_x64/include/unifex/linux/io_epoll_context.hpp
2025-01-12 19:51:34 +08:00

985 lines
31 KiB
C++

/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <unifex/config.hpp>
#if !UNIFEX_NO_EPOLL
#include <unifex/detail/atomic_intrusive_queue.hpp>
#include <unifex/detail/intrusive_heap.hpp>
#include <unifex/detail/intrusive_queue.hpp>
#include <unifex/pipe_concepts.hpp>
#include <unifex/get_stop_token.hpp>
#include <unifex/manual_lifetime.hpp>
#include <unifex/receiver_concepts.hpp>
#include <unifex/span.hpp>
#include <unifex/stop_token_concepts.hpp>
#include <unifex/linux/monotonic_clock.hpp>
#include <unifex/linux/safe_file_descriptor.hpp>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <system_error>
#include <utility>
#include <sys/uio.h>
#include <sys/epoll.h>
#include <unifex/detail/prologue.hpp>
namespace unifex {
namespace linuxos {
class io_epoll_context {
public:
class schedule_sender;
class schedule_at_sender;
template <typename Duration>
class schedule_after_sender;
class scheduler;
class read_sender;
class write_sender;
class async_reader;
class async_writer;
io_epoll_context();
~io_epoll_context();
template <typename StopToken>
void run(StopToken stopToken);
scheduler get_scheduler() noexcept;
private:
struct operation_base {
~operation_base() {
UNIFEX_ASSERT(enqueued_.load() == 0);
}
operation_base() noexcept : enqueued_(0), next_(nullptr), execute_(nullptr) {}
std::atomic<int> enqueued_;
operation_base* next_;
void (*execute_)(operation_base*) noexcept;
};
struct completion_base : operation_base {
};
struct stop_operation : operation_base {
stop_operation() noexcept {
this->execute_ = [](operation_base * op) noexcept {
static_cast<stop_operation*>(op)->shouldStop_ = true;
};
}
bool shouldStop_ = false;
};
using time_point = linuxos::monotonic_clock::time_point;
struct schedule_at_operation : operation_base {
explicit schedule_at_operation(
io_epoll_context& context,
const time_point& dueTime,
bool canBeCancelled) noexcept
: context_(context),
dueTime_(dueTime),
canBeCancelled_(canBeCancelled) {}
schedule_at_operation* timerNext_;
schedule_at_operation* timerPrev_;
io_epoll_context& context_;
time_point dueTime_;
bool canBeCancelled_;
static constexpr std::uint32_t timer_elapsed_flag = 1;
static constexpr std::uint32_t cancel_pending_flag = 2;
std::atomic<std::uint32_t> state_ = 0;
};
using operation_queue =
intrusive_queue<operation_base, &operation_base::next_>;
using timer_heap = intrusive_heap<
schedule_at_operation,
&schedule_at_operation::timerNext_,
&schedule_at_operation::timerPrev_,
time_point,
&schedule_at_operation::dueTime_>;
bool is_running_on_io_thread() const noexcept;
void run_impl(const bool& shouldStop);
void schedule_impl(operation_base* op);
void schedule_local(operation_base* op) noexcept;
void schedule_local(operation_queue ops) noexcept;
void schedule_remote(operation_base* op) noexcept;
// Insert the timer operation into the queue of timers.
// Must be called from the I/O thread.
void schedule_at_impl(schedule_at_operation* op) noexcept;
// Execute all ready-to-run items on the local queue.
// Will not run other items that were enqueued during the execution of the
// items that were already enqueued.
// This bounds the amount of work to a finite amount.
void execute_pending_local() noexcept;
// Check if any completion queue items are available and if so add them
// to the local queue.
void acquire_completion_queue_items();
// collect the contents of the remote queue and pass them to schedule_local
//
// Returns true if successful.
//
// Returns false if some other thread concurrently enqueued work to the remote queue.
bool try_schedule_local_remote_queue_contents() noexcept;
// Signal the remote queue eventfd.
//
// This should only be called after trying to enqueue() work
// to the remoteQueue and being told that the I/O thread is
// inactive.
void signal_remote_queue();
void remove_timer(schedule_at_operation* op) noexcept;
void update_timers() noexcept;
bool try_submit_timer_io(const time_point& dueTime) noexcept;
void* timer_user_data() const {
return const_cast<void*>(static_cast<const void*>(&timers_));
}
////////
// Data that does not change once initialised.
// Resources
safe_file_descriptor epollFd_;
safe_file_descriptor timerFd_;
safe_file_descriptor remoteQueueEventFd_;
///////////////////
// Data that is modified by I/O thread
// Local queue for operations that are ready to execute.
operation_queue localQueue_;
// Set of operations waiting to be executed at a specific time.
timer_heap timers_;
// The time that the current timer operation submitted to the kernel
// is due to elapse.
std::optional<time_point> currentDueTime_;
bool remoteQueueReadSubmitted_ = false;
bool timersAreDirty_ = false;
//////////////////
// Data that is modified by remote threads
// Queue of operations enqueued by remote threads.
atomic_intrusive_queue<operation_base, &operation_base::next_> remoteQueue_;
};
template <typename StopToken>
void io_epoll_context::run(StopToken stopToken) {
stop_operation stopOp;
auto onStopRequested = [&] { this->schedule_impl(&stopOp); };
typename StopToken::template callback_type<decltype(onStopRequested)>
stopCallback{std::move(stopToken), std::move(onStopRequested)};
run_impl(stopOp.shouldStop_);
}
class io_epoll_context::schedule_sender {
template <typename Receiver>
class operation : private operation_base {
public:
void start() noexcept {
UNIFEX_TRY {
context_.schedule_impl(this);
} UNIFEX_CATCH (...) {
unifex::set_error(
static_cast<Receiver&&>(receiver_), std::current_exception());
}
}
private:
friend schedule_sender;
template <typename Receiver2>
explicit operation(io_epoll_context& context, Receiver2&& r)
: context_(context), receiver_((Receiver2 &&) r) {
this->execute_ = &execute_impl;
}
static void execute_impl(operation_base* p) noexcept {
operation& op = *static_cast<operation*>(p);
if constexpr (!is_stop_never_possible_v<stop_token_type_t<Receiver>>) {
if (get_stop_token(op.receiver_).stop_requested()) {
unifex::set_done(static_cast<Receiver&&>(op.receiver_));
return;
}
}
if constexpr (is_nothrow_receiver_of_v<Receiver>) {
unifex::set_value(static_cast<Receiver&&>(op.receiver_));
} else {
UNIFEX_TRY {
unifex::set_value(static_cast<Receiver&&>(op.receiver_));
} UNIFEX_CATCH (...) {
unifex::set_error(
static_cast<Receiver&&>(op.receiver_), std::current_exception());
}
}
}
io_epoll_context& context_;
Receiver receiver_;
};
public:
template <
template <typename...> class Variant,
template <typename...> class Tuple>
using value_types = Variant<Tuple<>>;
template <template <typename...> class Variant>
using error_types = Variant<std::exception_ptr>;
static constexpr bool sends_done = true;
template <typename Receiver>
operation<std::remove_reference_t<Receiver>> connect(Receiver&& r) && {
return operation<std::remove_reference_t<Receiver>>{context_,
(Receiver &&) r};
}
private:
friend io_epoll_context::scheduler;
explicit schedule_sender(io_epoll_context& context) noexcept
: context_(context) {}
io_epoll_context& context_;
};
class io_epoll_context::schedule_at_sender {
template <typename Receiver>
struct operation : schedule_at_operation {
static constexpr bool is_stop_ever_possible =
!is_stop_never_possible_v<stop_token_type_t<Receiver>>;
public:
explicit operation(
io_epoll_context& context,
const time_point& dueTime,
Receiver&& r)
: schedule_at_operation(
context,
dueTime,
get_stop_token(r).stop_possible()),
receiver_((Receiver &&) r) {}
void start() noexcept {
if (this->context_.is_running_on_io_thread()) {
start_local();
} else {
start_remote();
}
}
private:
static void on_schedule_complete(operation_base* op) noexcept {
static_cast<operation*>(op)->start_local();
}
static void complete_with_done(operation_base* op) noexcept {
// Avoid instantiating set_done() if we're not going to call it.
if constexpr (is_stop_ever_possible) {
auto& timerOp = *static_cast<operation*>(op);
unifex::set_done(std::move(timerOp).receiver_);
} else {
// This should never be called if stop is not possible.
UNIFEX_ASSERT(false);
}
}
// Executed when the timer gets to the front of the ready-to-run queue.
static void maybe_complete_with_value(operation_base* op) noexcept {
auto& timerOp = *static_cast<operation*>(op);
if constexpr (is_stop_ever_possible) {
timerOp.stopCallback_.destruct();
if (get_stop_token(timerOp.receiver_).stop_requested()) {
complete_with_done(op);
return;
}
}
if constexpr (is_nothrow_receiver_of_v<Receiver>) {
unifex::set_value(std::move(timerOp).receiver_);
} else {
UNIFEX_TRY {
unifex::set_value(std::move(timerOp).receiver_);
} UNIFEX_CATCH (...) {
unifex::set_error(std::move(timerOp).receiver_, std::current_exception());
}
}
}
static void remove_timer_from_queue_and_complete_with_done(
operation_base* op) noexcept {
// Avoid instantiating set_done() if we're never going to call it.
if constexpr (is_stop_ever_possible) {
auto& timerOp = *static_cast<operation*>(op);
UNIFEX_ASSERT(timerOp.context_.is_running_on_io_thread());
timerOp.stopCallback_.destruct();
auto state = timerOp.state_.load(std::memory_order_relaxed);
if ((state & schedule_at_operation::timer_elapsed_flag) == 0) {
// Timer not yet removed from the timers_ list. Do that now.
timerOp.context_.remove_timer(&timerOp);
}
unifex::set_done(std::move(timerOp).receiver_);
} else {
// Should never be called if stop is not possible.
UNIFEX_ASSERT(false);
}
}
void start_local() noexcept {
if constexpr (is_stop_ever_possible) {
if (get_stop_token(receiver_).stop_requested()) {
// Stop already requested. Don't bother adding the timer.
this->execute_ = &operation::complete_with_done;
this->context_.schedule_local(this);
return;
}
}
this->execute_ = &operation::maybe_complete_with_value;
this->context_.schedule_at_impl(this);
if constexpr (is_stop_ever_possible) {
stopCallback_.construct(
get_stop_token(receiver_), cancel_callback{*this});
}
}
void start_remote() noexcept {
this->execute_ = &operation::on_schedule_complete;
this->context_.schedule_remote(this);
}
void request_stop() noexcept {
if (context_.is_running_on_io_thread()) {
request_stop_local();
} else {
request_stop_remote();
}
}
void request_stop_local() noexcept {
UNIFEX_ASSERT(context_.is_running_on_io_thread());
stopCallback_.destruct();
this->execute_ = &operation::complete_with_done;
auto state = this->state_.load(std::memory_order_relaxed);
if ((state & schedule_at_operation::timer_elapsed_flag) == 0) {
// Timer not yet elapsed.
// Remove timer from list of timers and enqueue cancellation.
context_.remove_timer(this);
context_.schedule_local(this);
} else {
// Timer already elapsed and added to ready-to-run queue.
}
}
void request_stop_remote() noexcept {
auto oldState = this->state_.fetch_add(
schedule_at_operation::cancel_pending_flag,
std::memory_order_acq_rel);
if ((oldState & schedule_at_operation::timer_elapsed_flag) == 0) {
// Timer had not yet elapsed.
// We are responsible for scheduling the completion of this timer
// operation.
this->execute_ =
&operation::remove_timer_from_queue_and_complete_with_done;
this->context_.schedule_remote(this);
}
}
struct cancel_callback {
operation& op_;
void operator()() noexcept {
op_.request_stop();
}
};
Receiver receiver_;
manual_lifetime<typename stop_token_type_t<
Receiver>::template callback_type<cancel_callback>>
stopCallback_;
};
public:
template <
template <typename...> class Variant,
template <typename...> class Tuple>
using value_types = Variant<Tuple<>>;
template <template <typename...> class Variant>
using error_types = Variant<std::exception_ptr>;
static constexpr bool sends_done = true;
explicit schedule_at_sender(
io_epoll_context& context,
const time_point& dueTime) noexcept
: context_(context), dueTime_(dueTime) {}
template <typename Receiver>
operation<remove_cvref_t<Receiver>> connect(Receiver&& r) const & {
return operation<remove_cvref_t<Receiver>>{
context_, dueTime_, (Receiver &&) r};
}
private:
io_epoll_context& context_;
time_point dueTime_;
};
class io_epoll_context::scheduler {
public:
scheduler(const scheduler&) noexcept = default;
scheduler& operator=(const scheduler&) = default;
~scheduler() = default;
schedule_sender schedule() const noexcept {
return schedule_sender{*context_};
}
time_point now() const noexcept {
return monotonic_clock::now();
}
schedule_at_sender schedule_at(const time_point& dueTime) const noexcept {
return schedule_at_sender{*context_, dueTime};
}
private:
friend io_epoll_context;
friend std::pair<async_reader, async_writer> tag_invoke(
tag_t<open_pipe>,
scheduler s);
friend bool operator==(scheduler a, scheduler b) noexcept {
return a.context_ == b.context_;
}
friend bool operator!=(scheduler a, scheduler b) noexcept {
return a.context_ != b.context_;
}
explicit scheduler(io_epoll_context& context) noexcept
: context_(&context) {}
io_epoll_context* context_;
};
inline io_epoll_context::scheduler io_epoll_context::get_scheduler() noexcept {
return scheduler{*this};
}
class io_epoll_context::read_sender {
struct done_op : operation_base {
};
template <typename Receiver>
class operation : private completion_base, private done_op {
friend io_epoll_context;
static constexpr bool is_stop_ever_possible =
!is_stop_never_possible_v<stop_token_type_t<Receiver>>;
public:
template <typename Receiver2>
explicit operation(const read_sender& sender, Receiver2&& r)
: context_(sender.context_),
fd_(sender.fd_),
receiver_((Receiver2 &&) r) {
buffer_[0].iov_base = sender.buffer_.data();
buffer_[0].iov_len = sender.buffer_.size();
}
void start() noexcept {
if (!context_.is_running_on_io_thread()) {
static_cast<completion_base*>(this)->execute_ = &operation::on_schedule_complete;
context_.schedule_remote(static_cast<completion_base*>(this));
} else {
start_io();
}
}
private:
static void on_schedule_complete(operation_base* op) noexcept {
auto& self = *static_cast<operation*>(static_cast<completion_base*>(op));
self.start_io();
}
void start_io() noexcept {
UNIFEX_ASSERT(context_.is_running_on_io_thread());
auto result = readv(fd_, buffer_, 1);
if (result == -EAGAIN || result == -EWOULDBLOCK || result == -EPERM) {
if constexpr (is_stop_ever_possible) {
stopCallback_.construct(
get_stop_token(receiver_), cancel_callback{*this});
}
UNIFEX_ASSERT(static_cast<completion_base*>(this)->enqueued_.load() == 0);
static_cast<completion_base*>(this)->execute_ = &operation::on_read_complete;
epoll_event event;
event.data.ptr = static_cast<completion_base*>(this);
event.events = EPOLLIN | EPOLLRDHUP | EPOLLHUP;
(void)epoll_ctl(context_.epollFd_.get(), EPOLL_CTL_ADD, fd_, &event);
return;
}
auto oldState = state_.fetch_add(
io_epoll_context::read_sender::operation<Receiver>::io_flag,
std::memory_order_acq_rel);
if ((oldState & io_epoll_context::read_sender::operation<Receiver>::cancel_pending_mask) != 0) {
// io has been cancelled by a remote thread.
// The other thread is responsible for enqueueing the operation completion
return;
}
if (result == -ECANCELED) {
unifex::set_done(std::move(receiver_));
} else if (result >= 0) {
if constexpr (is_nothrow_receiver_of_v<Receiver, ssize_t>) {
unifex::set_value(std::move(receiver_), ssize_t(result));
} else {
UNIFEX_TRY {
unifex::set_value(std::move(receiver_), ssize_t(result));
} UNIFEX_CATCH (...) {
unifex::set_error(std::move(receiver_), std::current_exception());
}
}
} else {
unifex::set_error(
std::move(receiver_),
std::error_code{-int(result), std::system_category()});
}
}
static void on_read_complete(operation_base* op) noexcept {
auto& self = *static_cast<operation*>(static_cast<completion_base*>(op));
UNIFEX_ASSERT(static_cast<completion_base&>(self).enqueued_.load() == 0);
self.stopCallback_.destruct();
auto oldState = self.state_.fetch_add(
io_epoll_context::read_sender::operation<Receiver>::io_flag,
std::memory_order_acq_rel);
if ((oldState & io_epoll_context::read_sender::operation<Receiver>::cancel_pending_mask) != 0) {
// io has been cancelled by a remote thread.
// The other thread is responsible for enqueueing the operation completion
return;
}
epoll_event event = {};
(void)epoll_ctl(self.context_.epollFd_.get(), EPOLL_CTL_DEL, self.fd_, &event);
auto result = readv(self.fd_, self.buffer_, 1);
UNIFEX_ASSERT(result != -EAGAIN);
UNIFEX_ASSERT(result != -EWOULDBLOCK);
if (result == -ECANCELED) {
unifex::set_done(std::move(self.receiver_));
} else if (result >= 0) {
if constexpr (is_nothrow_receiver_of_v<Receiver, ssize_t>) {
unifex::set_value(std::move(self).receiver_, ssize_t(result));
} else {
UNIFEX_TRY {
unifex::set_value(std::move(self).receiver_, ssize_t(result));
} UNIFEX_CATCH (...) {
unifex::set_error(std::move(self).receiver_, std::current_exception());
}
}
} else {
unifex::set_error(
std::move(self.receiver_),
std::error_code{-int(result), std::system_category()});
}
}
static void complete_with_done(operation_base* op) noexcept {
auto& self = *static_cast<operation*>(static_cast<done_op*>(op));
UNIFEX_ASSERT(static_cast<done_op&>(self).enqueued_.load() == 0);
if (static_cast<completion_base&>(self).enqueued_.load() == 0) {
// Avoid instantiating set_done() if we're not going to call it.
if constexpr (is_stop_ever_possible) {
unifex::set_done(std::move(self.receiver_));
} else {
// This should never be called if stop is not possible.
UNIFEX_ASSERT(false);
}
} else {
// reschedule after queued io is cleared
static_cast<done_op&>(self).execute_ = &operation::complete_with_done;
self.context_.schedule_local(static_cast<done_op*>(&self));
}
}
void request_stop() noexcept {
auto oldState = this->state_.fetch_add(
io_epoll_context::read_sender::operation<Receiver>::cancel_pending_flag,
std::memory_order_acq_rel);
if ((oldState & io_epoll_context::read_sender::operation<Receiver>::io_mask) == 0) {
// IO not yet completed.
epoll_event event = {};
(void)epoll_ctl(this->context_.epollFd_.get(), EPOLL_CTL_DEL, this->fd_, &event);
// We are responsible for scheduling the completion of this io
// operation.
static_cast<done_op&>(*this).execute_ = &operation::complete_with_done;
this->context_.schedule_remote(static_cast<done_op*>(this));
}
}
struct cancel_callback {
operation& op_;
void operator()() noexcept {
op_.request_stop();
}
};
io_epoll_context& context_;
int fd_;
iovec buffer_[1];
Receiver receiver_;
manual_lifetime<typename stop_token_type_t<
Receiver>::template callback_type<cancel_callback>>
stopCallback_;
static constexpr std::uint32_t io_flag = 0x00010000;
static constexpr std::uint32_t io_mask = 0xFFFF0000;
static constexpr std::uint32_t cancel_pending_flag = 1;
static constexpr std::uint32_t cancel_pending_mask = 0xFFFF;
std::atomic<std::uint32_t> state_ = 0;
};
public:
// Produces number of bytes read.
template <
template <typename...> class Variant,
template <typename...> class Tuple>
using value_types = Variant<Tuple<ssize_t>>;
template <template <typename...> class Variant>
using error_types = Variant<std::error_code, std::exception_ptr>;
static constexpr bool sends_done = true;
explicit read_sender(
io_epoll_context& context,
int fd,
span<std::byte> buffer) noexcept
: context_(context), fd_(fd), buffer_(buffer) {}
template <typename Receiver>
operation<std::decay_t<Receiver>> connect(Receiver&& r) && {
return operation<std::decay_t<Receiver>>{*this, (Receiver &&) r};
}
private:
io_epoll_context& context_;
int fd_;
span<std::byte> buffer_;
};
class io_epoll_context::write_sender {
struct done_op : operation_base {
};
template <typename Receiver>
class operation : private completion_base, private done_op {
friend io_epoll_context;
static constexpr bool is_stop_ever_possible =
!is_stop_never_possible_v<stop_token_type_t<Receiver>>;
public:
template <typename Receiver2>
explicit operation(const write_sender& sender, Receiver2&& r)
: context_(sender.context_),
fd_(sender.fd_),
receiver_((Receiver2 &&) r) {
buffer_[0].iov_base = (void*)sender.buffer_.data();
buffer_[0].iov_len = sender.buffer_.size();
}
void start() noexcept {
if (!context_.is_running_on_io_thread()) {
static_cast<completion_base*>(this)->execute_ = &operation::on_schedule_complete;
context_.schedule_remote(static_cast<completion_base*>(this));
} else {
start_io();
}
}
private:
static void on_schedule_complete(operation_base* op) noexcept {
auto& self = *static_cast<operation*>(static_cast<completion_base*>(op));
self.start_io();
}
void start_io() noexcept {
UNIFEX_ASSERT(context_.is_running_on_io_thread());
auto result = writev(fd_, buffer_, 1);
if (result == -EAGAIN || result == -EWOULDBLOCK || result == -EPERM) {
if constexpr (is_stop_ever_possible) {
stopCallback_.construct(
get_stop_token(receiver_), cancel_callback{*this});
}
UNIFEX_ASSERT(static_cast<completion_base*>(this)->enqueued_.load() == 0);
static_cast<completion_base*>(this)->execute_ = &operation::on_write_complete;
epoll_event event;
event.data.ptr = static_cast<completion_base*>(this);
event.events = EPOLLOUT | EPOLLRDHUP | EPOLLHUP;
(void)epoll_ctl(context_.epollFd_.get(), EPOLL_CTL_ADD, fd_, &event);
return;
}
auto oldState = state_.fetch_add(
io_epoll_context::write_sender::operation<Receiver>::io_flag,
std::memory_order_acq_rel);
if ((oldState & io_epoll_context::write_sender::operation<Receiver>::cancel_pending_mask) != 0) {
// io has been cancelled by a remote thread.
// The other thread is responsible for enqueueing the operation completion
return;
}
if (result == -ECANCELED) {
unifex::set_done(std::move(receiver_));
} else if (result >= 0) {
if constexpr (is_nothrow_receiver_of_v<Receiver, ssize_t>) {
unifex::set_value(std::move(receiver_), ssize_t(result));
} else {
UNIFEX_TRY {
unifex::set_value(std::move(receiver_), ssize_t(result));
} UNIFEX_CATCH (...) {
unifex::set_error(std::move(receiver_), std::current_exception());
}
}
} else {
unifex::set_error(
std::move(receiver_),
std::error_code{-int(result), std::system_category()});
}
}
static void on_write_complete(operation_base* op) noexcept {
auto& self = *static_cast<operation*>(static_cast<completion_base*>(op));
UNIFEX_ASSERT(static_cast<completion_base&>(self).enqueued_.load() == 0);
self.stopCallback_.destruct();
epoll_event event = {};
(void)epoll_ctl(self.context_.epollFd_.get(), EPOLL_CTL_DEL, self.fd_, &event);
auto oldState = self.state_.fetch_add(
io_epoll_context::write_sender::operation<Receiver>::io_flag,
std::memory_order_acq_rel);
if ((oldState & io_epoll_context::write_sender::operation<Receiver>::cancel_pending_mask) != 0) {
// io has been cancelled by a remote thread.
// The other thread is responsible for enqueueing the operation completion
return;
}
auto result = writev(self.fd_, self.buffer_, 1);
UNIFEX_ASSERT(result != -EAGAIN);
UNIFEX_ASSERT(result != -EWOULDBLOCK);
if (result == -ECANCELED) {
unifex::set_done(std::move(self.receiver_));
} else if (result >= 0) {
if constexpr (is_nothrow_receiver_of_v<Receiver, ssize_t>) {
unifex::set_value(std::move(self).receiver_, ssize_t(result));
} else {
UNIFEX_TRY {
unifex::set_value(std::move(self).receiver_, ssize_t(result));
} UNIFEX_CATCH (...) {
unifex::set_error(std::move(self).receiver_, std::current_exception());
}
}
} else {
unifex::set_error(
std::move(self.receiver_),
std::error_code{-int(result), std::system_category()});
}
}
static void complete_with_done(operation_base* op) noexcept {
auto& self = *static_cast<operation*>(static_cast<done_op*>(op));
UNIFEX_ASSERT(static_cast<done_op&>(self).enqueued_.load() == 0);
if (static_cast<completion_base&>(self).enqueued_.load() == 0) {
// Avoid instantiating set_done() if we're not going to call it.
if constexpr (is_stop_ever_possible) {
unifex::set_done(std::move(self.receiver_));
} else {
// This should never be called if stop is not possible.
UNIFEX_ASSERT(false);
}
} else {
// reschedule after queued io is cleared
static_cast<done_op&>(self).execute_ = &operation::complete_with_done;
self.context_.schedule_local(static_cast<done_op*>(&self));
}
}
void request_stop() noexcept {
auto oldState = this->state_.fetch_add(
io_epoll_context::write_sender::operation<Receiver>::cancel_pending_flag,
std::memory_order_acq_rel);
if ((oldState & io_epoll_context::write_sender::operation<Receiver>::io_mask) == 0) {
// IO not yet completed.
epoll_event event = {};
(void)epoll_ctl(this->context_.epollFd_.get(), EPOLL_CTL_DEL, this->fd_, &event);
// We are responsible for scheduling the completion of this io
// operation.
static_cast<done_op&>(*this).execute_ = &operation::complete_with_done;
this->context_.schedule_remote(static_cast<done_op*>(this));
}
}
struct cancel_callback {
operation& op_;
void operator()() noexcept {
op_.request_stop();
}
};
io_epoll_context& context_;
int fd_;
iovec buffer_[1];
Receiver receiver_;
manual_lifetime<typename stop_token_type_t<
Receiver>::template callback_type<cancel_callback>>
stopCallback_;
static constexpr std::uint32_t io_flag = 0x00010000;
static constexpr std::uint32_t io_mask = 0xFFFF0000;
static constexpr std::uint32_t cancel_pending_flag = 1;
static constexpr std::uint32_t cancel_pending_mask = 0xFFFF;
std::atomic<std::uint32_t> state_ = 0;
};
public:
// Produces number of bytes read.
template <
template <typename...> class Variant,
template <typename...> class Tuple>
using value_types = Variant<Tuple<ssize_t>>;
template <template <typename...> class Variant>
using error_types = Variant<std::error_code, std::exception_ptr>;
static constexpr bool sends_done = true;
explicit write_sender(
io_epoll_context& context,
int fd,
span<const std::byte> buffer) noexcept
: context_(context), fd_(fd), buffer_(buffer) {}
template <typename Receiver>
operation<std::decay_t<Receiver>> connect(Receiver&& r) && {
return operation<std::decay_t<Receiver>>{*this, (Receiver &&) r};
}
private:
io_epoll_context& context_;
int fd_;
span<const std::byte> buffer_;
};
class io_epoll_context::async_reader {
public:
explicit async_reader(io_epoll_context& context, int fd) noexcept
: context_(context), fd_(fd) {}
private:
friend scheduler;
friend read_sender tag_invoke(
tag_t<async_read_some>,
async_reader& reader,
span<std::byte> buffer) noexcept {
return read_sender{reader.context_, reader.fd_.get(), buffer};
}
io_epoll_context& context_;
safe_file_descriptor fd_;
};
class io_epoll_context::async_writer {
public:
explicit async_writer(io_epoll_context& context, int fd) noexcept
: context_(context), fd_(fd) {}
private:
friend scheduler;
friend write_sender tag_invoke(
tag_t<async_write_some>,
async_writer& writer,
span<const std::byte> buffer) noexcept {
return write_sender{writer.context_, writer.fd_.get(), buffer};
}
io_epoll_context& context_;
safe_file_descriptor fd_;
};
} // namespace linuxos
} // namespace unifex
#include <unifex/detail/epilogue.hpp>
#endif // !UNIFEX_NO_EPOLL