2025-01-12 19:51:34 +08:00

215 lines
7.3 KiB
C++

/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <unifex/blocking.hpp>
#include <unifex/receiver_concepts.hpp>
#include <unifex/sender_concepts.hpp>
#include <unifex/get_stop_token.hpp>
#include <unifex/async_trace.hpp>
#include <unifex/tag_invoke.hpp>
#include <unifex/config.hpp>
#include <unifex/get_allocator.hpp>
#include <unifex/scope_guard.hpp>
#include <unifex/std_concepts.hpp>
#include <unifex/bind_back.hpp>
#include <memory>
#include <utility>
#include <unifex/detail/prologue.hpp>
namespace unifex {
namespace _submit {
template <typename Allocator, typename T>
using rebind_alloc_t =
typename std::allocator_traits<Allocator>::template rebind_alloc<T>;
template <typename Allocator, typename T>
using rebind_traits_t =
std::allocator_traits<rebind_alloc_t<Allocator, T>>;
template <typename Sender, typename Receiver>
struct _op {
class type;
};
template <typename Sender, typename Receiver>
using operation = typename _op<Sender, remove_cvref_t<Receiver>>::type;
template <typename Sender, typename Receiver>
class _op<Sender, Receiver>::type {
struct wrapped_receiver {
type* op_;
template(typename... Values)
(requires receiver_of<Receiver, Values...>)
void set_value(Values&&... values) && noexcept {
auto allocator = get_allocator(get_receiver());
unifex::set_value(std::move(get_receiver()), (Values &&) values...);
destroy(std::move(allocator));
}
template(typename Error)
(requires receiver<Receiver, Error>)
void set_error(Error&& error) && noexcept {
auto allocator = get_allocator(get_receiver());
unifex::set_error(std::move(get_receiver()), (Error &&) error);
destroy(std::move(allocator));
}
void set_done() && noexcept {
auto allocator = get_allocator(get_receiver());
unifex::set_done(std::move(get_receiver()));
destroy(std::move(allocator));
}
template <typename Allocator>
void destroy(Allocator allocator) noexcept {
type* op = op_;
rebind_alloc_t<Allocator, type> typedAllocator{std::move(allocator)};
rebind_traits_t<Allocator, type>::destroy(typedAllocator, op);
rebind_traits_t<Allocator, type>::deallocate(typedAllocator, op, 1);
}
Receiver& get_receiver() const noexcept {
return op_->receiver_;
}
template(typename CPO)
(requires is_receiver_query_cpo_v<CPO>)
friend auto tag_invoke(CPO cpo, const wrapped_receiver& r) noexcept(
is_nothrow_callable_v<CPO, const Receiver&>)
-> callable_result_t<CPO, const Receiver&> {
return std::move(cpo)(std::as_const(r.get_receiver()));
}
template <typename Func>
friend void tag_invoke(
tag_t<visit_continuations>,
const wrapped_receiver& r,
Func&& func) {
std::invoke(func, std::as_const(r.get_receiver()));
}
};
public:
template <typename Receiver2>
explicit type(Sender&& sender, Receiver2&& receiver)
: receiver_((Receiver2 &&) receiver)
, inner_(unifex::connect((Sender &&) sender, wrapped_receiver{this}))
{}
void start() & noexcept {
unifex::start(inner_);
}
private:
UNIFEX_NO_UNIQUE_ADDRESS Receiver receiver_;
/*UNIFEX_NO_UNIQUE_ADDRESS*/ connect_result_t<Sender, wrapped_receiver> inner_;
};
} // namespace _submit
namespace _submit_cpo {
using _submit::rebind_alloc_t;
using _submit::rebind_traits_t;
template <typename Sender, typename Receiver>
using _member_submit_result_t =
decltype((UNIFEX_DECLVAL(Sender&&)).submit(UNIFEX_DECLVAL(Receiver&&)));
template <typename Sender, typename Receiver>
UNIFEX_CONCEPT_FRAGMENT( //
_has_member_submit_, //
requires() ( //
typename(_member_submit_result_t<Sender, Receiver>)
));
template <typename Sender, typename Receiver>
UNIFEX_CONCEPT //
_has_member_submit = //
UNIFEX_FRAGMENT(_submit_cpo::_has_member_submit_, Sender, Receiver);
inline const struct _fn {
template(typename Sender, typename Receiver)
(requires sender<Sender> AND receiver<Receiver> AND
tag_invocable<_fn, Sender, Receiver>)
void operator()(Sender&& sender, Receiver&& receiver) const {
static_assert(
std::is_void_v<tag_invoke_result_t<_fn, Sender, Receiver>>,
"Customisations of submit() must have a void return value");
unifex::tag_invoke(*this, (Sender&&) sender, (Receiver&&) receiver);
}
template(typename Sender, typename Receiver)
(requires sender<Sender> AND receiver<Receiver> AND
(!tag_invocable<_fn, Sender, Receiver>) AND
_has_member_submit<Sender, Receiver>)
void operator()(Sender&& sender, Receiver&& receiver) const {
((Sender&&) sender).submit((Receiver&&) receiver);
}
template(typename Sender, typename Receiver)
(requires sender<Sender> AND receiver<Receiver> AND
(!tag_invocable<_fn, Sender, Receiver>) AND
(!_has_member_submit<Sender, Receiver>) AND
sender_to<Sender, Receiver>)
void operator()(Sender&& sender, Receiver&& receiver) const {
// Default implementation in terms of connect/start
switch (blocking(sender)) {
case blocking_kind::always:
case blocking_kind::always_inline:
{
// The sender will complete synchronously so we can avoid allocating the
// state on the heap.
auto op = unifex::connect((Sender &&) sender, (Receiver &&) receiver);
unifex::start(op);
break;
}
default:
{
// Otherwise need to heap-allocate the operation-state
using op_t = _submit::operation<Sender, Receiver>;
op_t* op = nullptr;
{
// Use the receiver's associated allocator to allocate this state.
auto allocator = get_allocator(receiver);
using allocator_t = decltype(allocator);
rebind_alloc_t<allocator_t, op_t> typedAllocator{allocator};
op = rebind_traits_t<allocator_t, op_t>::allocate(typedAllocator, 1);
scope_guard freeOnError = [&]() noexcept {
rebind_traits_t<allocator_t, op_t>::deallocate(typedAllocator, op, 1);
};
rebind_traits_t<allocator_t, op_t>::construct(
typedAllocator, op, (Sender&&)sender, (Receiver&&)receiver);
freeOnError.release();
}
op->start();
}
}
}
template(typename Receiver)
(requires receiver<Receiver>)
constexpr auto operator()(Receiver&& receiver) const
noexcept(is_nothrow_callable_v<
tag_t<bind_back>, _fn, Receiver>)
-> bind_back_result_t<_fn, Receiver> {
return bind_back(*this, (Receiver&&)receiver);
}
} submit{};
} // namespace _submit_cpo
using _submit_cpo::submit;
} // namespace unifex
#include <unifex/detail/epilogue.hpp>