feat: executor sys clock (#101)

* feat: executor sys clock
This commit is contained in:
wtudio 2024-11-14 15:32:50 +08:00 committed by GitHub
parent d4eefad587
commit eaa7c97557
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 149 additions and 56 deletions

View File

@ -28,3 +28,4 @@
- grpc 插件支持 ros2 消息以及 json 序列化格式;
- mqtt 新增配置项以支持 ssl/tls 加密传输;
- mqtt 插件在broker未启动时会自动重试异步连接
- asio thread/strand 执行器现在支持是否使用 system clock

View File

@ -75,6 +75,7 @@ aimrt:
| thread_bind_cpu | unsigned int array | 可选 | [] | 绑核配置 |
| timeout_alarm_threshold_us | unsigned int | 可选 | 1000000 | 调度超时告警阈值,单位:微秒 |
| queue_threshold | unsigned int | 可选 | 10000 | 队列任务上限 |
| use_system_clock | bool | 可选 | false | 是否使用 std::system_clock默认使用 std::steady_clock |
使用注意点如下:
@ -82,7 +83,7 @@ aimrt:
- `thread_sched_policy``thread_bind_cpu`参考[Common Information](./common.md)中线程绑核配置的说明。
- `timeout_alarm_threshold_us`配置了一个调度超时告警的阈值。当进行定时调度时,如果 CPU 负载太重、或队列中任务太多,导致超过设定的时间才调度到,则会打印一个告警日志。
- `queue_threshold`配置了队列任务上限,当已经有超过此阈值的任务在队列中时,新任务将投递失败。
- `use_system_clock`配置是否使用 std::system_clock 作为时间系统,默认为 false使用 std::steady_clock。注意使用 std::system_clock 时,执行器的时间将与系统同步,可能会受到外部调节。
以下是一个简单的示例:
```yaml
@ -97,6 +98,7 @@ aimrt:
thread_bind_cpu: [0, 1]
timeout_alarm_threshold_us: 1000
queue_threshold: 10000
use_system_clock: false
```
## asio_strand 执行器
@ -107,12 +109,12 @@ aimrt:
| ---- | ---- | ---- | ---- | ---- |
| bind_asio_thread_executor_name | string | 必选 | "" | 绑定的asio_thread执行器名称 |
| timeout_alarm_threshold_us | unsigned int | 可选 | 1000000 | 调度超时告警阈值,单位:微秒 |
| use_system_clock | bool | 可选 | false | 是否使用 std::system_clock默认使用 std::steady_clock |
使用注意点如下:
- 通过`bind_asio_thread_executor_name`配置项来绑定`asio_thread`类型的执行器。如果指定名称的执行器不存在、或不是`asio_thread`类型,则会在初始化时抛出异常。
- `timeout_alarm_threshold_us`配置了一个调度超时告警的阈值。当进行定时调度时,如果 CPU 负载太重、或队列中任务太多,导致超过设定的时间才调度到,则会打印一个告警日志。
- `use_system_clock`配置是否使用 std::system_clock 作为时间系统,默认为 false使用 std::steady_clock。注意使用 std::system_clock 时,执行器的时间将与系统同步,可能会受到外部调节。
以下是一个简单的示例:
@ -129,6 +131,7 @@ aimrt:
options:
bind_asio_thread_executor_name: test_asio_thread_executor
timeout_alarm_threshold_us: 1000
use_system_clock: false
```

View File

@ -185,7 +185,7 @@ void BenchmarkPublisherModule::StartSinglePlan(uint32_t plan_id, BenchPlan plan)
uint32_t send_count = 0;
uint32_t sleep_ns = static_cast<uint32_t>(1000000000 / plan.channel_frq);
auto cur_tp = std::chrono::system_clock::now();
auto cur_tp = std::chrono::steady_clock::now();
for (; send_count < plan.msg_count; ++send_count) {
if (!run_flag_.load()) [[unlikely]]
@ -196,7 +196,7 @@ void BenchmarkPublisherModule::StartSinglePlan(uint32_t plan_id, BenchPlan plan)
aimrt::channel::Publish(publisher, msg);
cur_tp += std::chrono::duration_cast<std::chrono::system_clock::duration>(std::chrono::nanoseconds(sleep_ns));
cur_tp += std::chrono::nanoseconds(sleep_ns);
std::this_thread::sleep_until(cur_tp);
}

View File

@ -16,6 +16,7 @@ struct convert<aimrt::runtime::core::executor::AsioStrandExecutor::Options> {
std::chrono::duration_cast<std::chrono::microseconds>(
rhs.timeout_alarm_threshold_us)
.count());
node["use_system_clock"] = rhs.use_system_clock;
return node;
}
@ -29,6 +30,8 @@ struct convert<aimrt::runtime::core::executor::AsioStrandExecutor::Options> {
if (node["timeout_alarm_threshold_us"])
rhs.timeout_alarm_threshold_us = std::chrono::microseconds(
node["timeout_alarm_threshold_us"].as<uint64_t>());
if (node["use_system_clock"])
rhs.use_system_clock = node["use_system_clock"].as<bool>();
return true;
}
@ -51,6 +54,9 @@ void AsioStrandExecutor::Initialize(std::string_view name,
if (options_node && !options_node.IsNull())
options_ = options_node.as<Options>();
start_sys_tp_ = std::chrono::system_clock::now();
start_std_tp_ = std::chrono::steady_clock::now();
AIMRT_CHECK_ERROR_THROW(
!options_.bind_asio_thread_executor_name.empty(),
"Invalide bind asio thread executor name, name is empty.");
@ -85,30 +91,64 @@ void AsioStrandExecutor::Execute(aimrt::executor::Task&& task) noexcept {
}
}
std::chrono::system_clock::time_point AsioStrandExecutor::Now() const noexcept {
if (!options_.use_system_clock) {
return start_sys_tp_ +
std::chrono::duration_cast<std::chrono::system_clock::time_point::duration>(
std::chrono::steady_clock::now() - start_std_tp_);
}
return std::chrono::system_clock::now();
}
void AsioStrandExecutor::ExecuteAt(
std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept {
try {
auto timer_ptr = std::make_shared<asio::system_timer>(*strand_ptr_);
timer_ptr->expires_at(tp);
timer_ptr->async_wait([this, timer_ptr,
task{std::move(task)}](asio::error_code ec) {
if (ec) [[unlikely]] {
AIMRT_ERROR("Asio strand executor '{}' timer get err, code '{}', msg: {}",
Name(), ec.value(), ec.message());
return;
}
if (!options_.use_system_clock) {
auto timer_ptr = std::make_shared<asio::steady_timer>(*strand_ptr_);
timer_ptr->expires_after(tp - Now());
timer_ptr->async_wait([this, timer_ptr,
task{std::move(task)}](asio::error_code ec) {
if (ec) [[unlikely]] {
AIMRT_ERROR("Asio strand executor '{}' timer get err, code '{}', msg: {}",
Name(), ec.value(), ec.message());
return;
}
auto dif_time = std::chrono::system_clock::now() - timer_ptr->expiry();
auto diff_time = std::chrono::steady_clock::now() - timer_ptr->expiry();
task();
task();
AIMRT_CHECK_WARN(
dif_time <= options_.timeout_alarm_threshold_us,
"Asio strand executor '{}' timer delay too much, error time value '{}', require '{}'. "
"Perhaps the CPU load is too high",
Name(), std::chrono::duration_cast<std::chrono::microseconds>(dif_time),
options_.timeout_alarm_threshold_us);
});
AIMRT_CHECK_WARN(
diff_time <= options_.timeout_alarm_threshold_us,
"Asio strand executor '{}' timer delay too much, error time value '{}', require '{}'. "
"Perhaps the CPU load is too high",
Name(), std::chrono::duration_cast<std::chrono::microseconds>(diff_time),
options_.timeout_alarm_threshold_us);
});
} else {
auto timer_ptr = std::make_shared<asio::system_timer>(*strand_ptr_);
timer_ptr->expires_at(tp);
timer_ptr->async_wait([this, timer_ptr,
task{std::move(task)}](asio::error_code ec) {
if (ec) [[unlikely]] {
AIMRT_ERROR("Asio strand executor '{}' timer get err, code '{}', msg: {}",
Name(), ec.value(), ec.message());
return;
}
auto diff_time = std::chrono::system_clock::now() - timer_ptr->expiry();
task();
AIMRT_CHECK_WARN(
diff_time <= options_.timeout_alarm_threshold_us,
"Asio strand executor '{}' timer delay too much, error time value '{}', require '{}'. "
"Perhaps the CPU load is too high",
Name(), std::chrono::duration_cast<std::chrono::microseconds>(diff_time),
options_.timeout_alarm_threshold_us);
});
}
} catch (const std::exception& e) {
AIMRT_ERROR("{}", e.what());
}

View File

@ -17,6 +17,7 @@ class AsioStrandExecutor : public ExecutorBase {
struct Options {
std::string bind_asio_thread_executor_name;
std::chrono::nanoseconds timeout_alarm_threshold_us = std::chrono::seconds(1);
bool use_system_clock = false;
};
enum class State : uint32_t {
@ -46,9 +47,7 @@ class AsioStrandExecutor : public ExecutorBase {
void Execute(aimrt::executor::Task&& task) noexcept override;
std::chrono::system_clock::time_point Now() const noexcept override {
return std::chrono::system_clock::now();
}
std::chrono::system_clock::time_point Now() const noexcept override;
void ExecuteAt(std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept override;
void RegisterGetAsioHandle(GetAsioHandle&& handle);
@ -64,6 +63,9 @@ class AsioStrandExecutor : public ExecutorBase {
std::atomic<State> state_ = State::kPreInit;
std::shared_ptr<aimrt::common::util::LoggerWrapper> logger_ptr_;
std::chrono::system_clock::time_point start_sys_tp_;
std::chrono::steady_clock::time_point start_std_tp_;
GetAsioHandle get_asio_handle_;
using Strand = asio::strand<asio::io_context::executor_type>;

View File

@ -20,6 +20,7 @@ struct convert<aimrt::runtime::core::executor::AsioThreadExecutor::Options> {
rhs.timeout_alarm_threshold_us)
.count());
node["queue_threshold"] = rhs.queue_threshold;
node["use_system_clock"] = rhs.use_system_clock;
return node;
}
@ -37,6 +38,8 @@ struct convert<aimrt::runtime::core::executor::AsioThreadExecutor::Options> {
node["timeout_alarm_threshold_us"].as<uint64_t>());
if (node["queue_threshold"])
rhs.queue_threshold = node["queue_threshold"].as<uint32_t>();
if (node["use_system_clock"])
rhs.use_system_clock = node["use_system_clock"].as<bool>();
return true;
}
@ -52,9 +55,13 @@ void AsioThreadExecutor::Initialize(std::string_view name,
"AsioThreadExecutor can only be initialized once.");
name_ = std::string(name);
if (options_node && !options_node.IsNull())
options_ = options_node.as<Options>();
start_sys_tp_ = std::chrono::system_clock::now();
start_std_tp_ = std::chrono::steady_clock::now();
queue_threshold_ = options_.queue_threshold;
queue_warn_threshold_ = queue_threshold_ * 0.95;
@ -161,6 +168,16 @@ void AsioThreadExecutor::Execute(aimrt::executor::Task&& task) noexcept {
}
}
std::chrono::system_clock::time_point AsioThreadExecutor::Now() const noexcept {
if (!options_.use_system_clock) {
return start_sys_tp_ +
std::chrono::duration_cast<std::chrono::system_clock::time_point::duration>(
std::chrono::steady_clock::now() - start_std_tp_);
}
return std::chrono::system_clock::now();
}
void AsioThreadExecutor::ExecuteAt(
std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept {
if (state_.load() != State::kInit && state_.load() != State::kStart) [[unlikely]] {
@ -187,27 +204,51 @@ void AsioThreadExecutor::ExecuteAt(
}
try {
auto timer_ptr = std::make_shared<asio::system_timer>(*io_ptr_);
timer_ptr->expires_at(tp);
timer_ptr->async_wait([this, timer_ptr,
task{std::move(task)}](asio::error_code ec) {
if (ec) [[unlikely]] {
AIMRT_ERROR("Asio thread executor '{}' timer get err, code '{}', msg: {}",
Name(), ec.value(), ec.message());
return;
}
if (!options_.use_system_clock) {
auto timer_ptr = std::make_shared<asio::steady_timer>(*io_ptr_);
timer_ptr->expires_after(tp - Now());
timer_ptr->async_wait([this, timer_ptr,
task{std::move(task)}](asio::error_code ec) {
if (ec) [[unlikely]] {
AIMRT_ERROR("Asio thread executor '{}' timer get err, code '{}', msg: {}",
Name(), ec.value(), ec.message());
return;
}
auto dif_time = std::chrono::system_clock::now() - timer_ptr->expiry();
auto diff_time = std::chrono::steady_clock::now() - timer_ptr->expiry();
task();
task();
AIMRT_CHECK_WARN(
dif_time <= options_.timeout_alarm_threshold_us,
"Asio thread executor '{}' timer delay too much, error time value '{}', require '{}'. "
"Perhaps the CPU load is too high",
Name(), std::chrono::duration_cast<std::chrono::microseconds>(dif_time),
options_.timeout_alarm_threshold_us);
});
AIMRT_CHECK_WARN(
diff_time <= options_.timeout_alarm_threshold_us,
"Asio thread executor '{}' timer delay too much, error time value '{}', require '{}'. "
"Perhaps the CPU load is too high",
Name(), std::chrono::duration_cast<std::chrono::microseconds>(diff_time),
options_.timeout_alarm_threshold_us);
});
} else {
auto timer_ptr = std::make_shared<asio::system_timer>(*io_ptr_);
timer_ptr->expires_at(tp);
timer_ptr->async_wait([this, timer_ptr,
task{std::move(task)}](asio::error_code ec) {
if (ec) [[unlikely]] {
AIMRT_ERROR("Asio thread executor '{}' timer get err, code '{}', msg: {}",
Name(), ec.value(), ec.message());
return;
}
auto diff_time = std::chrono::system_clock::now() - timer_ptr->expiry();
task();
AIMRT_CHECK_WARN(
diff_time <= options_.timeout_alarm_threshold_us,
"Asio thread executor '{}' timer delay too much, error time value '{}', require '{}'. "
"Perhaps the CPU load is too high",
Name(), std::chrono::duration_cast<std::chrono::microseconds>(diff_time),
options_.timeout_alarm_threshold_us);
});
}
} catch (const std::exception& e) {
fprintf(stderr, "Asio thread executor '%s' execute Task get exception: %s\n", name_.c_str(), e.what());
}

View File

@ -6,7 +6,6 @@
#include <atomic>
#include <list>
#include <memory>
#include <set>
#include <string>
#include <thread>
@ -27,6 +26,7 @@ class AsioThreadExecutor : public ExecutorBase {
std::vector<uint32_t> thread_bind_cpu;
std::chrono::nanoseconds timeout_alarm_threshold_us = std::chrono::seconds(1);
uint32_t queue_threshold = 10000;
bool use_system_clock = false;
};
enum class State : uint32_t {
@ -54,9 +54,7 @@ class AsioThreadExecutor : public ExecutorBase {
void Execute(aimrt::executor::Task&& task) noexcept override;
std::chrono::system_clock::time_point Now() const noexcept override {
return std::chrono::system_clock::now();
}
std::chrono::system_clock::time_point Now() const noexcept override;
void ExecuteAt(std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept override;
size_t CurrentTaskNum() noexcept override { return queue_task_num_.load(); }
@ -74,6 +72,9 @@ class AsioThreadExecutor : public ExecutorBase {
std::atomic<State> state_ = State::kPreInit;
std::shared_ptr<aimrt::common::util::LoggerWrapper> logger_ptr_;
std::chrono::system_clock::time_point start_sys_tp_;
std::chrono::steady_clock::time_point start_std_tp_;
uint32_t queue_threshold_;
uint32_t queue_warn_threshold_;
std::atomic_uint32_t queue_task_num_ = 0;

View File

@ -68,7 +68,7 @@ TEST(ASIO_THREAD_EXECUTOR_TEST, execute) {
// ExecuteAt
ret = false;
executor.ExecuteAt(
std::chrono::system_clock::now() + std::chrono::milliseconds(5),
executor.Now() + std::chrono::milliseconds(5),
[&]() { ret = true; });
EXPECT_FALSE(ret);

View File

@ -156,6 +156,11 @@ void SimpleThreadExecutor::Execute(aimrt::executor::Task&& task) noexcept {
cond_.notify_one();
}
std::chrono::system_clock::time_point SimpleThreadExecutor::Now() const noexcept {
AIMRT_ERROR("Simple thread executor '{}' does not support timer schedule.", Name());
return std::chrono::system_clock::time_point();
}
void SimpleThreadExecutor::ExecuteAt(
std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept {
AIMRT_ERROR("Simple thread executor '{}' does not support timer schedule.", Name());

View File

@ -47,9 +47,7 @@ class SimpleThreadExecutor : public ExecutorBase {
void Execute(aimrt::executor::Task&& task) noexcept override;
std::chrono::system_clock::time_point Now() const noexcept override {
return std::chrono::system_clock::now();
}
std::chrono::system_clock::time_point Now() const noexcept override;
void ExecuteAt(std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept override;
size_t CurrentTaskNum() noexcept override { return queue_task_num_.load(); }

View File

@ -18,7 +18,6 @@ thread_bind_cpu: [0]
EXPECT_EQ(simple_thread_executor.Type(), "simple_thread");
simple_thread_executor.Start();
std::chrono::system_clock::time_point tp = std::chrono::system_clock::now();
bool is_executed = false;
aimrt::executor::Task &&task = aimrt::executor::Task(
[&is_executed]() mutable { is_executed = true; });

View File

@ -172,6 +172,11 @@ void TBBThreadExecutor::Execute(aimrt::executor::Task&& task) noexcept {
}
}
std::chrono::system_clock::time_point TBBThreadExecutor::Now() const noexcept {
AIMRT_ERROR("Tbb thread executor '{}' does not support timer schedule.", Name());
return std::chrono::system_clock::time_point();
}
void TBBThreadExecutor::ExecuteAt(
std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept {
AIMRT_ERROR("Tbb thread executor '{}' does not support timer schedule.", Name());

View File

@ -45,9 +45,7 @@ class TBBThreadExecutor : public ExecutorBase {
void Execute(aimrt::executor::Task&& task) noexcept override;
std::chrono::system_clock::time_point Now() const noexcept override {
return std::chrono::system_clock::now();
}
std::chrono::system_clock::time_point Now() const noexcept override;
void ExecuteAt(std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept override;
size_t CurrentTaskNum() noexcept override { return queue_task_num_.load(); }