From 0c7686b699e929e5fa2d6ba553d7006421580efe Mon Sep 17 00:00:00 2001 From: wtudio <95963900+wtudio@users.noreply.github.com> Date: Sat, 14 Dec 2024 20:58:17 +0800 Subject: [PATCH] fix: asio thread executor (#129) --- document/sphinx-cn/tutorials/cfg/executor.md | 3 -- .../bin/cfg/examples_cpp_executor_cfg.yaml | 4 +- .../core/executor/asio_strand_executor.cc | 18 +++++++- .../core/executor/asio_thread_executor.cc | 42 +------------------ .../core/executor/asio_thread_executor.h | 7 ---- src/runtime/main/CMakeLists.txt | 4 +- 6 files changed, 22 insertions(+), 56 deletions(-) diff --git a/document/sphinx-cn/tutorials/cfg/executor.md b/document/sphinx-cn/tutorials/cfg/executor.md index eb5b5ccb0..f121d1ef9 100644 --- a/document/sphinx-cn/tutorials/cfg/executor.md +++ b/document/sphinx-cn/tutorials/cfg/executor.md @@ -74,7 +74,6 @@ aimrt: | thread_sched_policy | string | 可选 | "" | 线程调度策略 | | thread_bind_cpu | unsigned int array | 可选 | [] | 绑核配置 | | timeout_alarm_threshold_us | unsigned int | 可选 | 1000000 | 调度超时告警阈值,单位:微秒 | -| queue_threshold | unsigned int | 可选 | 10000 | 队列任务上限 | | use_system_clock | bool | 可选 | false | 是否使用 std::system_clock,默认使用 std::steady_clock | @@ -82,7 +81,6 @@ aimrt: - `thread_num`配置了线程数,默认为 1。当线程数配置为 1 时为线程安全执行器,否则是线程不安全的。 - `thread_sched_policy`和`thread_bind_cpu`参考[Common Information](./common.md)中线程绑核配置的说明。 - `timeout_alarm_threshold_us`配置了一个调度超时告警的阈值。当进行定时调度时,如果 CPU 负载太重、或队列中任务太多,导致超过设定的时间才调度到,则会打印一个告警日志。 -- `queue_threshold`配置了队列任务上限,当已经有超过此阈值的任务在队列中时,新任务将投递失败。 - `use_system_clock`配置是否使用 std::system_clock 作为时间系统,默认为 false,使用 std::steady_clock。注意使用 std::system_clock 时,执行器的时间将与系统同步,可能会受到外部调节。 以下是一个简单的示例: @@ -97,7 +95,6 @@ aimrt: thread_sched_policy: SCHED_FIFO:80 thread_bind_cpu: [0, 1] timeout_alarm_threshold_us: 1000 - queue_threshold: 10000 use_system_clock: false ``` diff --git a/src/examples/cpp/executor/install/linux/bin/cfg/examples_cpp_executor_cfg.yaml b/src/examples/cpp/executor/install/linux/bin/cfg/examples_cpp_executor_cfg.yaml index fe686e501..d08e21383 100644 --- a/src/examples/cpp/executor/install/linux/bin/cfg/examples_cpp_executor_cfg.yaml +++ b/src/examples/cpp/executor/install/linux/bin/cfg/examples_cpp_executor_cfg.yaml @@ -13,9 +13,9 @@ aimrt: options: thread_num: 2 - name: thread_safe_executor - type: asio_thread + type: asio_strand options: - thread_num: 1 + bind_asio_thread_executor_name: work_executor - name: time_schedule_executor type: asio_thread options: diff --git a/src/runtime/core/executor/asio_strand_executor.cc b/src/runtime/core/executor/asio_strand_executor.cc index aeb26bd5f..4d9a54c00 100644 --- a/src/runtime/core/executor/asio_strand_executor.cc +++ b/src/runtime/core/executor/asio_strand_executor.cc @@ -84,10 +84,17 @@ void AsioStrandExecutor::Shutdown() { } void AsioStrandExecutor::Execute(aimrt::executor::Task&& task) noexcept { + if (state_.load() != State::kInit && state_.load() != State::kStart) [[unlikely]] { + fprintf(stderr, + "Asio strand executor '%s' can only execute task when state is 'Init' or 'Start'.\n", + name_.c_str()); + return; + } + try { asio::post(*strand_ptr_, std::move(task)); } catch (const std::exception& e) { - AIMRT_ERROR("{}", e.what()); + fprintf(stderr, "Asio strand executor '%s' execute Task get exception: %s\n", name_.c_str(), e.what()); } } @@ -103,6 +110,13 @@ std::chrono::system_clock::time_point AsioStrandExecutor::Now() const noexcept { void AsioStrandExecutor::ExecuteAt( std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept { + if (state_.load() != State::kInit && state_.load() != State::kStart) [[unlikely]] { + fprintf(stderr, + "Asio strand executor '%s' can only execute task when state is 'Init' or 'Start'.\n", + name_.c_str()); + return; + } + try { if (!options_.use_system_clock) { auto timer_ptr = std::make_shared(*strand_ptr_); @@ -150,7 +164,7 @@ void AsioStrandExecutor::ExecuteAt( }); } } catch (const std::exception& e) { - AIMRT_ERROR("{}", e.what()); + fprintf(stderr, "Asio strand executor '%s' execute Task get exception: %s\n", name_.c_str(), e.what()); } } diff --git a/src/runtime/core/executor/asio_thread_executor.cc b/src/runtime/core/executor/asio_thread_executor.cc index 4c9e27b98..c21cb9d17 100644 --- a/src/runtime/core/executor/asio_thread_executor.cc +++ b/src/runtime/core/executor/asio_thread_executor.cc @@ -19,7 +19,6 @@ struct convert { std::chrono::duration_cast( rhs.timeout_alarm_threshold_us) .count()); - node["queue_threshold"] = rhs.queue_threshold; node["use_system_clock"] = rhs.use_system_clock; return node; @@ -36,8 +35,6 @@ struct convert { if (node["timeout_alarm_threshold_us"]) rhs.timeout_alarm_threshold_us = std::chrono::microseconds( node["timeout_alarm_threshold_us"].as()); - if (node["queue_threshold"]) - rhs.queue_threshold = node["queue_threshold"].as(); if (node["use_system_clock"]) rhs.use_system_clock = node["use_system_clock"].as(); @@ -62,9 +59,6 @@ void AsioThreadExecutor::Initialize(std::string_view name, start_sys_tp_ = std::chrono::system_clock::now(); start_std_tp_ = std::chrono::steady_clock::now(); - queue_threshold_ = options_.queue_threshold; - queue_warn_threshold_ = queue_threshold_ * 0.95; - AIMRT_CHECK_ERROR_THROW( options_.thread_num > 0, "Invalide asio thread executor options, thread num is zero."); @@ -94,9 +88,7 @@ void AsioThreadExecutor::Initialize(std::string_view name, } try { - while (io_ptr_->run_one()) { - --queue_task_num_; - } + io_ptr_->run(); } catch (const std::exception& e) { AIMRT_FATAL("Asio thread executor '{}' run loop get exception, {}", Name(), e.what()); @@ -145,22 +137,6 @@ void AsioThreadExecutor::Execute(aimrt::executor::Task&& task) noexcept { return; } - uint32_t cur_queue_task_num = ++queue_task_num_; - - if (cur_queue_task_num > queue_threshold_) [[unlikely]] { - fprintf(stderr, - "The number of tasks in the asio thread executor '%s' has reached the threshold '%u', the task will not be delivered.\n", - name_.c_str(), queue_threshold_); - --queue_task_num_; - return; - } - - if (cur_queue_task_num > queue_warn_threshold_) [[unlikely]] { - fprintf(stderr, - "The number of tasks in the asio thread executor '%s' is about to reach the threshold: '%u / %u'.\n", - name_.c_str(), cur_queue_task_num, queue_threshold_); - } - try { asio::post(*io_ptr_, std::move(task)); } catch (const std::exception& e) { @@ -187,22 +163,6 @@ void AsioThreadExecutor::ExecuteAt( return; } - uint32_t cur_queue_task_num = ++queue_task_num_; - - if (cur_queue_task_num > queue_threshold_) [[unlikely]] { - fprintf(stderr, - "The number of tasks in the asio thread executor '%s' has reached the threshold '%u', the task will not be delivered.\n", - name_.c_str(), queue_threshold_); - --queue_task_num_; - return; - } - - if (cur_queue_task_num > queue_warn_threshold_) [[unlikely]] { - fprintf(stderr, - "The number of tasks in the asio thread executor '%s' is about to reach the threshold: '%u / %u'.\n", - name_.c_str(), cur_queue_task_num, queue_threshold_); - } - try { if (!options_.use_system_clock) { auto timer_ptr = std::make_shared(*io_ptr_); diff --git a/src/runtime/core/executor/asio_thread_executor.h b/src/runtime/core/executor/asio_thread_executor.h index 04d9e27b8..cb32117e4 100644 --- a/src/runtime/core/executor/asio_thread_executor.h +++ b/src/runtime/core/executor/asio_thread_executor.h @@ -25,7 +25,6 @@ class AsioThreadExecutor : public ExecutorBase { std::string thread_sched_policy; std::vector thread_bind_cpu; std::chrono::nanoseconds timeout_alarm_threshold_us = std::chrono::seconds(1); - uint32_t queue_threshold = 10000; bool use_system_clock = false; }; @@ -57,8 +56,6 @@ class AsioThreadExecutor : public ExecutorBase { std::chrono::system_clock::time_point Now() const noexcept override; void ExecuteAt(std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept override; - size_t CurrentTaskNum() noexcept override { return queue_task_num_.load(); } - State GetState() const { return state_.load(); } void SetLogger(const std::shared_ptr& logger_ptr) { logger_ptr_ = logger_ptr; } @@ -75,10 +72,6 @@ class AsioThreadExecutor : public ExecutorBase { std::chrono::system_clock::time_point start_sys_tp_; std::chrono::steady_clock::time_point start_std_tp_; - uint32_t queue_threshold_; - uint32_t queue_warn_threshold_; - std::atomic_uint32_t queue_task_num_ = 0; - std::unique_ptr io_ptr_; std::unique_ptr< asio::executor_work_guard> diff --git a/src/runtime/main/CMakeLists.txt b/src/runtime/main/CMakeLists.txt index c67618756..d5e098306 100644 --- a/src/runtime/main/CMakeLists.txt +++ b/src/runtime/main/CMakeLists.txt @@ -34,7 +34,9 @@ target_link_libraries( aimrt::runtime::core) if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang") - set_target_properties(${CUR_TARGET_NAME} PROPERTIES LINK_FLAGS "-s") + if(CMAKE_BUILD_TYPE STREQUAL "Release") + set_target_properties(${CUR_TARGET_NAME} PROPERTIES LINK_FLAGS "-s") + endif() endif() # Add -Werror option