From eaa7c97557a4233f2c131304c707750d25d25267 Mon Sep 17 00:00:00 2001 From: wtudio <95963900+wtudio@users.noreply.github.com> Date: Thu, 14 Nov 2024 15:32:50 +0800 Subject: [PATCH] feat: executor sys clock (#101) * feat: executor sys clock --- document/sphinx-cn/release_notes/v0_9_0.md | 1 + document/sphinx-cn/tutorials/cfg/executor.md | 9 ++- .../benchmark_publisher_module.cc | 4 +- .../core/executor/asio_strand_executor.cc | 76 +++++++++++++----- .../core/executor/asio_strand_executor.h | 8 +- .../core/executor/asio_thread_executor.cc | 77 ++++++++++++++----- .../core/executor/asio_thread_executor.h | 9 ++- .../executor/asio_thread_executor_test.cc | 2 +- .../core/executor/simple_thread_executor.cc | 5 ++ .../core/executor/simple_thread_executor.h | 4 +- .../executor/simple_thread_executor_test.cc | 1 - .../core/executor/tbb_thread_executor.cc | 5 ++ .../core/executor/tbb_thread_executor.h | 4 +- 13 files changed, 149 insertions(+), 56 deletions(-) diff --git a/document/sphinx-cn/release_notes/v0_9_0.md b/document/sphinx-cn/release_notes/v0_9_0.md index 5a347b2dd..d27e062e1 100644 --- a/document/sphinx-cn/release_notes/v0_9_0.md +++ b/document/sphinx-cn/release_notes/v0_9_0.md @@ -28,3 +28,4 @@ - grpc 插件支持 ros2 消息以及 json 序列化格式; - mqtt 新增配置项以支持 ssl/tls 加密传输; - mqtt 插件在broker未启动时,会自动重试异步连接; +- asio thread/strand 执行器现在支持是否使用 system clock; diff --git a/document/sphinx-cn/tutorials/cfg/executor.md b/document/sphinx-cn/tutorials/cfg/executor.md index ea16428d5..eb5b5ccb0 100644 --- a/document/sphinx-cn/tutorials/cfg/executor.md +++ b/document/sphinx-cn/tutorials/cfg/executor.md @@ -75,6 +75,7 @@ aimrt: | thread_bind_cpu | unsigned int array | 可选 | [] | 绑核配置 | | timeout_alarm_threshold_us | unsigned int | 可选 | 1000000 | 调度超时告警阈值,单位:微秒 | | queue_threshold | unsigned int | 可选 | 10000 | 队列任务上限 | +| use_system_clock | bool | 可选 | false | 是否使用 std::system_clock,默认使用 std::steady_clock | 使用注意点如下: @@ -82,7 +83,7 @@ aimrt: - `thread_sched_policy`和`thread_bind_cpu`参考[Common Information](./common.md)中线程绑核配置的说明。 - `timeout_alarm_threshold_us`配置了一个调度超时告警的阈值。当进行定时调度时,如果 CPU 负载太重、或队列中任务太多,导致超过设定的时间才调度到,则会打印一个告警日志。 - `queue_threshold`配置了队列任务上限,当已经有超过此阈值的任务在队列中时,新任务将投递失败。 - +- `use_system_clock`配置是否使用 std::system_clock 作为时间系统,默认为 false,使用 std::steady_clock。注意使用 std::system_clock 时,执行器的时间将与系统同步,可能会受到外部调节。 以下是一个简单的示例: ```yaml @@ -97,6 +98,7 @@ aimrt: thread_bind_cpu: [0, 1] timeout_alarm_threshold_us: 1000 queue_threshold: 10000 + use_system_clock: false ``` ## asio_strand 执行器 @@ -107,12 +109,12 @@ aimrt: | ---- | ---- | ---- | ---- | ---- | | bind_asio_thread_executor_name | string | 必选 | "" | 绑定的asio_thread执行器名称 | | timeout_alarm_threshold_us | unsigned int | 可选 | 1000000 | 调度超时告警阈值,单位:微秒 | - +| use_system_clock | bool | 可选 | false | 是否使用 std::system_clock,默认使用 std::steady_clock | 使用注意点如下: - 通过`bind_asio_thread_executor_name`配置项来绑定`asio_thread`类型的执行器。如果指定名称的执行器不存在、或不是`asio_thread`类型,则会在初始化时抛出异常。 - `timeout_alarm_threshold_us`配置了一个调度超时告警的阈值。当进行定时调度时,如果 CPU 负载太重、或队列中任务太多,导致超过设定的时间才调度到,则会打印一个告警日志。 - +- `use_system_clock`配置是否使用 std::system_clock 作为时间系统,默认为 false,使用 std::steady_clock。注意使用 std::system_clock 时,执行器的时间将与系统同步,可能会受到外部调节。 以下是一个简单的示例: @@ -129,6 +131,7 @@ aimrt: options: bind_asio_thread_executor_name: test_asio_thread_executor timeout_alarm_threshold_us: 1000 + use_system_clock: false ``` diff --git a/src/examples/cpp/pb_chn/module/benchmark_publisher_module/benchmark_publisher_module.cc b/src/examples/cpp/pb_chn/module/benchmark_publisher_module/benchmark_publisher_module.cc index b055a0811..ccdab850a 100644 --- a/src/examples/cpp/pb_chn/module/benchmark_publisher_module/benchmark_publisher_module.cc +++ b/src/examples/cpp/pb_chn/module/benchmark_publisher_module/benchmark_publisher_module.cc @@ -185,7 +185,7 @@ void BenchmarkPublisherModule::StartSinglePlan(uint32_t plan_id, BenchPlan plan) uint32_t send_count = 0; uint32_t sleep_ns = static_cast(1000000000 / plan.channel_frq); - auto cur_tp = std::chrono::system_clock::now(); + auto cur_tp = std::chrono::steady_clock::now(); for (; send_count < plan.msg_count; ++send_count) { if (!run_flag_.load()) [[unlikely]] @@ -196,7 +196,7 @@ void BenchmarkPublisherModule::StartSinglePlan(uint32_t plan_id, BenchPlan plan) aimrt::channel::Publish(publisher, msg); - cur_tp += std::chrono::duration_cast(std::chrono::nanoseconds(sleep_ns)); + cur_tp += std::chrono::nanoseconds(sleep_ns); std::this_thread::sleep_until(cur_tp); } diff --git a/src/runtime/core/executor/asio_strand_executor.cc b/src/runtime/core/executor/asio_strand_executor.cc index 7b5fff1ec..aeb26bd5f 100644 --- a/src/runtime/core/executor/asio_strand_executor.cc +++ b/src/runtime/core/executor/asio_strand_executor.cc @@ -16,6 +16,7 @@ struct convert { std::chrono::duration_cast( rhs.timeout_alarm_threshold_us) .count()); + node["use_system_clock"] = rhs.use_system_clock; return node; } @@ -29,6 +30,8 @@ struct convert { if (node["timeout_alarm_threshold_us"]) rhs.timeout_alarm_threshold_us = std::chrono::microseconds( node["timeout_alarm_threshold_us"].as()); + if (node["use_system_clock"]) + rhs.use_system_clock = node["use_system_clock"].as(); return true; } @@ -51,6 +54,9 @@ void AsioStrandExecutor::Initialize(std::string_view name, if (options_node && !options_node.IsNull()) options_ = options_node.as(); + start_sys_tp_ = std::chrono::system_clock::now(); + start_std_tp_ = std::chrono::steady_clock::now(); + AIMRT_CHECK_ERROR_THROW( !options_.bind_asio_thread_executor_name.empty(), "Invalide bind asio thread executor name, name is empty."); @@ -85,30 +91,64 @@ void AsioStrandExecutor::Execute(aimrt::executor::Task&& task) noexcept { } } +std::chrono::system_clock::time_point AsioStrandExecutor::Now() const noexcept { + if (!options_.use_system_clock) { + return start_sys_tp_ + + std::chrono::duration_cast( + std::chrono::steady_clock::now() - start_std_tp_); + } + + return std::chrono::system_clock::now(); +} + void AsioStrandExecutor::ExecuteAt( std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept { try { - auto timer_ptr = std::make_shared(*strand_ptr_); - timer_ptr->expires_at(tp); - timer_ptr->async_wait([this, timer_ptr, - task{std::move(task)}](asio::error_code ec) { - if (ec) [[unlikely]] { - AIMRT_ERROR("Asio strand executor '{}' timer get err, code '{}', msg: {}", - Name(), ec.value(), ec.message()); - return; - } + if (!options_.use_system_clock) { + auto timer_ptr = std::make_shared(*strand_ptr_); + timer_ptr->expires_after(tp - Now()); + timer_ptr->async_wait([this, timer_ptr, + task{std::move(task)}](asio::error_code ec) { + if (ec) [[unlikely]] { + AIMRT_ERROR("Asio strand executor '{}' timer get err, code '{}', msg: {}", + Name(), ec.value(), ec.message()); + return; + } - auto dif_time = std::chrono::system_clock::now() - timer_ptr->expiry(); + auto diff_time = std::chrono::steady_clock::now() - timer_ptr->expiry(); - task(); + task(); - AIMRT_CHECK_WARN( - dif_time <= options_.timeout_alarm_threshold_us, - "Asio strand executor '{}' timer delay too much, error time value '{}', require '{}'. " - "Perhaps the CPU load is too high", - Name(), std::chrono::duration_cast(dif_time), - options_.timeout_alarm_threshold_us); - }); + AIMRT_CHECK_WARN( + diff_time <= options_.timeout_alarm_threshold_us, + "Asio strand executor '{}' timer delay too much, error time value '{}', require '{}'. " + "Perhaps the CPU load is too high", + Name(), std::chrono::duration_cast(diff_time), + options_.timeout_alarm_threshold_us); + }); + } else { + auto timer_ptr = std::make_shared(*strand_ptr_); + timer_ptr->expires_at(tp); + timer_ptr->async_wait([this, timer_ptr, + task{std::move(task)}](asio::error_code ec) { + if (ec) [[unlikely]] { + AIMRT_ERROR("Asio strand executor '{}' timer get err, code '{}', msg: {}", + Name(), ec.value(), ec.message()); + return; + } + + auto diff_time = std::chrono::system_clock::now() - timer_ptr->expiry(); + + task(); + + AIMRT_CHECK_WARN( + diff_time <= options_.timeout_alarm_threshold_us, + "Asio strand executor '{}' timer delay too much, error time value '{}', require '{}'. " + "Perhaps the CPU load is too high", + Name(), std::chrono::duration_cast(diff_time), + options_.timeout_alarm_threshold_us); + }); + } } catch (const std::exception& e) { AIMRT_ERROR("{}", e.what()); } diff --git a/src/runtime/core/executor/asio_strand_executor.h b/src/runtime/core/executor/asio_strand_executor.h index 20937307a..185ee32ad 100644 --- a/src/runtime/core/executor/asio_strand_executor.h +++ b/src/runtime/core/executor/asio_strand_executor.h @@ -17,6 +17,7 @@ class AsioStrandExecutor : public ExecutorBase { struct Options { std::string bind_asio_thread_executor_name; std::chrono::nanoseconds timeout_alarm_threshold_us = std::chrono::seconds(1); + bool use_system_clock = false; }; enum class State : uint32_t { @@ -46,9 +47,7 @@ class AsioStrandExecutor : public ExecutorBase { void Execute(aimrt::executor::Task&& task) noexcept override; - std::chrono::system_clock::time_point Now() const noexcept override { - return std::chrono::system_clock::now(); - } + std::chrono::system_clock::time_point Now() const noexcept override; void ExecuteAt(std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept override; void RegisterGetAsioHandle(GetAsioHandle&& handle); @@ -64,6 +63,9 @@ class AsioStrandExecutor : public ExecutorBase { std::atomic state_ = State::kPreInit; std::shared_ptr logger_ptr_; + std::chrono::system_clock::time_point start_sys_tp_; + std::chrono::steady_clock::time_point start_std_tp_; + GetAsioHandle get_asio_handle_; using Strand = asio::strand; diff --git a/src/runtime/core/executor/asio_thread_executor.cc b/src/runtime/core/executor/asio_thread_executor.cc index 6ffb34177..4c9e27b98 100644 --- a/src/runtime/core/executor/asio_thread_executor.cc +++ b/src/runtime/core/executor/asio_thread_executor.cc @@ -20,6 +20,7 @@ struct convert { rhs.timeout_alarm_threshold_us) .count()); node["queue_threshold"] = rhs.queue_threshold; + node["use_system_clock"] = rhs.use_system_clock; return node; } @@ -37,6 +38,8 @@ struct convert { node["timeout_alarm_threshold_us"].as()); if (node["queue_threshold"]) rhs.queue_threshold = node["queue_threshold"].as(); + if (node["use_system_clock"]) + rhs.use_system_clock = node["use_system_clock"].as(); return true; } @@ -52,9 +55,13 @@ void AsioThreadExecutor::Initialize(std::string_view name, "AsioThreadExecutor can only be initialized once."); name_ = std::string(name); + if (options_node && !options_node.IsNull()) options_ = options_node.as(); + start_sys_tp_ = std::chrono::system_clock::now(); + start_std_tp_ = std::chrono::steady_clock::now(); + queue_threshold_ = options_.queue_threshold; queue_warn_threshold_ = queue_threshold_ * 0.95; @@ -161,6 +168,16 @@ void AsioThreadExecutor::Execute(aimrt::executor::Task&& task) noexcept { } } +std::chrono::system_clock::time_point AsioThreadExecutor::Now() const noexcept { + if (!options_.use_system_clock) { + return start_sys_tp_ + + std::chrono::duration_cast( + std::chrono::steady_clock::now() - start_std_tp_); + } + + return std::chrono::system_clock::now(); +} + void AsioThreadExecutor::ExecuteAt( std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept { if (state_.load() != State::kInit && state_.load() != State::kStart) [[unlikely]] { @@ -187,27 +204,51 @@ void AsioThreadExecutor::ExecuteAt( } try { - auto timer_ptr = std::make_shared(*io_ptr_); - timer_ptr->expires_at(tp); - timer_ptr->async_wait([this, timer_ptr, - task{std::move(task)}](asio::error_code ec) { - if (ec) [[unlikely]] { - AIMRT_ERROR("Asio thread executor '{}' timer get err, code '{}', msg: {}", - Name(), ec.value(), ec.message()); - return; - } + if (!options_.use_system_clock) { + auto timer_ptr = std::make_shared(*io_ptr_); + timer_ptr->expires_after(tp - Now()); + timer_ptr->async_wait([this, timer_ptr, + task{std::move(task)}](asio::error_code ec) { + if (ec) [[unlikely]] { + AIMRT_ERROR("Asio thread executor '{}' timer get err, code '{}', msg: {}", + Name(), ec.value(), ec.message()); + return; + } - auto dif_time = std::chrono::system_clock::now() - timer_ptr->expiry(); + auto diff_time = std::chrono::steady_clock::now() - timer_ptr->expiry(); - task(); + task(); - AIMRT_CHECK_WARN( - dif_time <= options_.timeout_alarm_threshold_us, - "Asio thread executor '{}' timer delay too much, error time value '{}', require '{}'. " - "Perhaps the CPU load is too high", - Name(), std::chrono::duration_cast(dif_time), - options_.timeout_alarm_threshold_us); - }); + AIMRT_CHECK_WARN( + diff_time <= options_.timeout_alarm_threshold_us, + "Asio thread executor '{}' timer delay too much, error time value '{}', require '{}'. " + "Perhaps the CPU load is too high", + Name(), std::chrono::duration_cast(diff_time), + options_.timeout_alarm_threshold_us); + }); + } else { + auto timer_ptr = std::make_shared(*io_ptr_); + timer_ptr->expires_at(tp); + timer_ptr->async_wait([this, timer_ptr, + task{std::move(task)}](asio::error_code ec) { + if (ec) [[unlikely]] { + AIMRT_ERROR("Asio thread executor '{}' timer get err, code '{}', msg: {}", + Name(), ec.value(), ec.message()); + return; + } + + auto diff_time = std::chrono::system_clock::now() - timer_ptr->expiry(); + + task(); + + AIMRT_CHECK_WARN( + diff_time <= options_.timeout_alarm_threshold_us, + "Asio thread executor '{}' timer delay too much, error time value '{}', require '{}'. " + "Perhaps the CPU load is too high", + Name(), std::chrono::duration_cast(diff_time), + options_.timeout_alarm_threshold_us); + }); + } } catch (const std::exception& e) { fprintf(stderr, "Asio thread executor '%s' execute Task get exception: %s\n", name_.c_str(), e.what()); } diff --git a/src/runtime/core/executor/asio_thread_executor.h b/src/runtime/core/executor/asio_thread_executor.h index a3466311b..04d9e27b8 100644 --- a/src/runtime/core/executor/asio_thread_executor.h +++ b/src/runtime/core/executor/asio_thread_executor.h @@ -6,7 +6,6 @@ #include #include #include -#include #include #include @@ -27,6 +26,7 @@ class AsioThreadExecutor : public ExecutorBase { std::vector thread_bind_cpu; std::chrono::nanoseconds timeout_alarm_threshold_us = std::chrono::seconds(1); uint32_t queue_threshold = 10000; + bool use_system_clock = false; }; enum class State : uint32_t { @@ -54,9 +54,7 @@ class AsioThreadExecutor : public ExecutorBase { void Execute(aimrt::executor::Task&& task) noexcept override; - std::chrono::system_clock::time_point Now() const noexcept override { - return std::chrono::system_clock::now(); - } + std::chrono::system_clock::time_point Now() const noexcept override; void ExecuteAt(std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept override; size_t CurrentTaskNum() noexcept override { return queue_task_num_.load(); } @@ -74,6 +72,9 @@ class AsioThreadExecutor : public ExecutorBase { std::atomic state_ = State::kPreInit; std::shared_ptr logger_ptr_; + std::chrono::system_clock::time_point start_sys_tp_; + std::chrono::steady_clock::time_point start_std_tp_; + uint32_t queue_threshold_; uint32_t queue_warn_threshold_; std::atomic_uint32_t queue_task_num_ = 0; diff --git a/src/runtime/core/executor/asio_thread_executor_test.cc b/src/runtime/core/executor/asio_thread_executor_test.cc index 3e02359dd..8b7b1bc06 100644 --- a/src/runtime/core/executor/asio_thread_executor_test.cc +++ b/src/runtime/core/executor/asio_thread_executor_test.cc @@ -68,7 +68,7 @@ TEST(ASIO_THREAD_EXECUTOR_TEST, execute) { // ExecuteAt ret = false; executor.ExecuteAt( - std::chrono::system_clock::now() + std::chrono::milliseconds(5), + executor.Now() + std::chrono::milliseconds(5), [&]() { ret = true; }); EXPECT_FALSE(ret); diff --git a/src/runtime/core/executor/simple_thread_executor.cc b/src/runtime/core/executor/simple_thread_executor.cc index 4bf919362..51f3fb47f 100644 --- a/src/runtime/core/executor/simple_thread_executor.cc +++ b/src/runtime/core/executor/simple_thread_executor.cc @@ -156,6 +156,11 @@ void SimpleThreadExecutor::Execute(aimrt::executor::Task&& task) noexcept { cond_.notify_one(); } +std::chrono::system_clock::time_point SimpleThreadExecutor::Now() const noexcept { + AIMRT_ERROR("Simple thread executor '{}' does not support timer schedule.", Name()); + return std::chrono::system_clock::time_point(); +} + void SimpleThreadExecutor::ExecuteAt( std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept { AIMRT_ERROR("Simple thread executor '{}' does not support timer schedule.", Name()); diff --git a/src/runtime/core/executor/simple_thread_executor.h b/src/runtime/core/executor/simple_thread_executor.h index e8f77b31c..01b8a7dad 100644 --- a/src/runtime/core/executor/simple_thread_executor.h +++ b/src/runtime/core/executor/simple_thread_executor.h @@ -47,9 +47,7 @@ class SimpleThreadExecutor : public ExecutorBase { void Execute(aimrt::executor::Task&& task) noexcept override; - std::chrono::system_clock::time_point Now() const noexcept override { - return std::chrono::system_clock::now(); - } + std::chrono::system_clock::time_point Now() const noexcept override; void ExecuteAt(std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept override; size_t CurrentTaskNum() noexcept override { return queue_task_num_.load(); } diff --git a/src/runtime/core/executor/simple_thread_executor_test.cc b/src/runtime/core/executor/simple_thread_executor_test.cc index f8bdad015..efcecbbac 100644 --- a/src/runtime/core/executor/simple_thread_executor_test.cc +++ b/src/runtime/core/executor/simple_thread_executor_test.cc @@ -18,7 +18,6 @@ thread_bind_cpu: [0] EXPECT_EQ(simple_thread_executor.Type(), "simple_thread"); simple_thread_executor.Start(); - std::chrono::system_clock::time_point tp = std::chrono::system_clock::now(); bool is_executed = false; aimrt::executor::Task &&task = aimrt::executor::Task( [&is_executed]() mutable { is_executed = true; }); diff --git a/src/runtime/core/executor/tbb_thread_executor.cc b/src/runtime/core/executor/tbb_thread_executor.cc index 552202e23..665d80238 100644 --- a/src/runtime/core/executor/tbb_thread_executor.cc +++ b/src/runtime/core/executor/tbb_thread_executor.cc @@ -172,6 +172,11 @@ void TBBThreadExecutor::Execute(aimrt::executor::Task&& task) noexcept { } } +std::chrono::system_clock::time_point TBBThreadExecutor::Now() const noexcept { + AIMRT_ERROR("Tbb thread executor '{}' does not support timer schedule.", Name()); + return std::chrono::system_clock::time_point(); +} + void TBBThreadExecutor::ExecuteAt( std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept { AIMRT_ERROR("Tbb thread executor '{}' does not support timer schedule.", Name()); diff --git a/src/runtime/core/executor/tbb_thread_executor.h b/src/runtime/core/executor/tbb_thread_executor.h index 541ec1b78..40569dc3b 100644 --- a/src/runtime/core/executor/tbb_thread_executor.h +++ b/src/runtime/core/executor/tbb_thread_executor.h @@ -45,9 +45,7 @@ class TBBThreadExecutor : public ExecutorBase { void Execute(aimrt::executor::Task&& task) noexcept override; - std::chrono::system_clock::time_point Now() const noexcept override { - return std::chrono::system_clock::now(); - } + std::chrono::system_clock::time_point Now() const noexcept override; void ExecuteAt(std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept override; size_t CurrentTaskNum() noexcept override { return queue_task_num_.load(); }