fix: asio thread executor (#129)
This commit is contained in:
parent
16c922fbde
commit
0c7686b699
@ -74,7 +74,6 @@ aimrt:
|
|||||||
| thread_sched_policy | string | 可选 | "" | 线程调度策略 |
|
| thread_sched_policy | string | 可选 | "" | 线程调度策略 |
|
||||||
| thread_bind_cpu | unsigned int array | 可选 | [] | 绑核配置 |
|
| thread_bind_cpu | unsigned int array | 可选 | [] | 绑核配置 |
|
||||||
| timeout_alarm_threshold_us | unsigned int | 可选 | 1000000 | 调度超时告警阈值,单位:微秒 |
|
| timeout_alarm_threshold_us | unsigned int | 可选 | 1000000 | 调度超时告警阈值,单位:微秒 |
|
||||||
| queue_threshold | unsigned int | 可选 | 10000 | 队列任务上限 |
|
|
||||||
| use_system_clock | bool | 可选 | false | 是否使用 std::system_clock,默认使用 std::steady_clock |
|
| use_system_clock | bool | 可选 | false | 是否使用 std::system_clock,默认使用 std::steady_clock |
|
||||||
|
|
||||||
|
|
||||||
@ -82,7 +81,6 @@ aimrt:
|
|||||||
- `thread_num`配置了线程数,默认为 1。当线程数配置为 1 时为线程安全执行器,否则是线程不安全的。
|
- `thread_num`配置了线程数,默认为 1。当线程数配置为 1 时为线程安全执行器,否则是线程不安全的。
|
||||||
- `thread_sched_policy`和`thread_bind_cpu`参考[Common Information](./common.md)中线程绑核配置的说明。
|
- `thread_sched_policy`和`thread_bind_cpu`参考[Common Information](./common.md)中线程绑核配置的说明。
|
||||||
- `timeout_alarm_threshold_us`配置了一个调度超时告警的阈值。当进行定时调度时,如果 CPU 负载太重、或队列中任务太多,导致超过设定的时间才调度到,则会打印一个告警日志。
|
- `timeout_alarm_threshold_us`配置了一个调度超时告警的阈值。当进行定时调度时,如果 CPU 负载太重、或队列中任务太多,导致超过设定的时间才调度到,则会打印一个告警日志。
|
||||||
- `queue_threshold`配置了队列任务上限,当已经有超过此阈值的任务在队列中时,新任务将投递失败。
|
|
||||||
- `use_system_clock`配置是否使用 std::system_clock 作为时间系统,默认为 false,使用 std::steady_clock。注意使用 std::system_clock 时,执行器的时间将与系统同步,可能会受到外部调节。
|
- `use_system_clock`配置是否使用 std::system_clock 作为时间系统,默认为 false,使用 std::steady_clock。注意使用 std::system_clock 时,执行器的时间将与系统同步,可能会受到外部调节。
|
||||||
|
|
||||||
以下是一个简单的示例:
|
以下是一个简单的示例:
|
||||||
@ -97,7 +95,6 @@ aimrt:
|
|||||||
thread_sched_policy: SCHED_FIFO:80
|
thread_sched_policy: SCHED_FIFO:80
|
||||||
thread_bind_cpu: [0, 1]
|
thread_bind_cpu: [0, 1]
|
||||||
timeout_alarm_threshold_us: 1000
|
timeout_alarm_threshold_us: 1000
|
||||||
queue_threshold: 10000
|
|
||||||
use_system_clock: false
|
use_system_clock: false
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -13,9 +13,9 @@ aimrt:
|
|||||||
options:
|
options:
|
||||||
thread_num: 2
|
thread_num: 2
|
||||||
- name: thread_safe_executor
|
- name: thread_safe_executor
|
||||||
type: asio_thread
|
type: asio_strand
|
||||||
options:
|
options:
|
||||||
thread_num: 1
|
bind_asio_thread_executor_name: work_executor
|
||||||
- name: time_schedule_executor
|
- name: time_schedule_executor
|
||||||
type: asio_thread
|
type: asio_thread
|
||||||
options:
|
options:
|
||||||
|
@ -84,10 +84,17 @@ void AsioStrandExecutor::Shutdown() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void AsioStrandExecutor::Execute(aimrt::executor::Task&& task) noexcept {
|
void AsioStrandExecutor::Execute(aimrt::executor::Task&& task) noexcept {
|
||||||
|
if (state_.load() != State::kInit && state_.load() != State::kStart) [[unlikely]] {
|
||||||
|
fprintf(stderr,
|
||||||
|
"Asio strand executor '%s' can only execute task when state is 'Init' or 'Start'.\n",
|
||||||
|
name_.c_str());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
asio::post(*strand_ptr_, std::move(task));
|
asio::post(*strand_ptr_, std::move(task));
|
||||||
} catch (const std::exception& e) {
|
} catch (const std::exception& e) {
|
||||||
AIMRT_ERROR("{}", e.what());
|
fprintf(stderr, "Asio strand executor '%s' execute Task get exception: %s\n", name_.c_str(), e.what());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -103,6 +110,13 @@ std::chrono::system_clock::time_point AsioStrandExecutor::Now() const noexcept {
|
|||||||
|
|
||||||
void AsioStrandExecutor::ExecuteAt(
|
void AsioStrandExecutor::ExecuteAt(
|
||||||
std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept {
|
std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept {
|
||||||
|
if (state_.load() != State::kInit && state_.load() != State::kStart) [[unlikely]] {
|
||||||
|
fprintf(stderr,
|
||||||
|
"Asio strand executor '%s' can only execute task when state is 'Init' or 'Start'.\n",
|
||||||
|
name_.c_str());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (!options_.use_system_clock) {
|
if (!options_.use_system_clock) {
|
||||||
auto timer_ptr = std::make_shared<asio::steady_timer>(*strand_ptr_);
|
auto timer_ptr = std::make_shared<asio::steady_timer>(*strand_ptr_);
|
||||||
@ -150,7 +164,7 @@ void AsioStrandExecutor::ExecuteAt(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
} catch (const std::exception& e) {
|
} catch (const std::exception& e) {
|
||||||
AIMRT_ERROR("{}", e.what());
|
fprintf(stderr, "Asio strand executor '%s' execute Task get exception: %s\n", name_.c_str(), e.what());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,7 +19,6 @@ struct convert<aimrt::runtime::core::executor::AsioThreadExecutor::Options> {
|
|||||||
std::chrono::duration_cast<std::chrono::microseconds>(
|
std::chrono::duration_cast<std::chrono::microseconds>(
|
||||||
rhs.timeout_alarm_threshold_us)
|
rhs.timeout_alarm_threshold_us)
|
||||||
.count());
|
.count());
|
||||||
node["queue_threshold"] = rhs.queue_threshold;
|
|
||||||
node["use_system_clock"] = rhs.use_system_clock;
|
node["use_system_clock"] = rhs.use_system_clock;
|
||||||
|
|
||||||
return node;
|
return node;
|
||||||
@ -36,8 +35,6 @@ struct convert<aimrt::runtime::core::executor::AsioThreadExecutor::Options> {
|
|||||||
if (node["timeout_alarm_threshold_us"])
|
if (node["timeout_alarm_threshold_us"])
|
||||||
rhs.timeout_alarm_threshold_us = std::chrono::microseconds(
|
rhs.timeout_alarm_threshold_us = std::chrono::microseconds(
|
||||||
node["timeout_alarm_threshold_us"].as<uint64_t>());
|
node["timeout_alarm_threshold_us"].as<uint64_t>());
|
||||||
if (node["queue_threshold"])
|
|
||||||
rhs.queue_threshold = node["queue_threshold"].as<uint32_t>();
|
|
||||||
if (node["use_system_clock"])
|
if (node["use_system_clock"])
|
||||||
rhs.use_system_clock = node["use_system_clock"].as<bool>();
|
rhs.use_system_clock = node["use_system_clock"].as<bool>();
|
||||||
|
|
||||||
@ -62,9 +59,6 @@ void AsioThreadExecutor::Initialize(std::string_view name,
|
|||||||
start_sys_tp_ = std::chrono::system_clock::now();
|
start_sys_tp_ = std::chrono::system_clock::now();
|
||||||
start_std_tp_ = std::chrono::steady_clock::now();
|
start_std_tp_ = std::chrono::steady_clock::now();
|
||||||
|
|
||||||
queue_threshold_ = options_.queue_threshold;
|
|
||||||
queue_warn_threshold_ = queue_threshold_ * 0.95;
|
|
||||||
|
|
||||||
AIMRT_CHECK_ERROR_THROW(
|
AIMRT_CHECK_ERROR_THROW(
|
||||||
options_.thread_num > 0,
|
options_.thread_num > 0,
|
||||||
"Invalide asio thread executor options, thread num is zero.");
|
"Invalide asio thread executor options, thread num is zero.");
|
||||||
@ -94,9 +88,7 @@ void AsioThreadExecutor::Initialize(std::string_view name,
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
while (io_ptr_->run_one()) {
|
io_ptr_->run();
|
||||||
--queue_task_num_;
|
|
||||||
}
|
|
||||||
} catch (const std::exception& e) {
|
} catch (const std::exception& e) {
|
||||||
AIMRT_FATAL("Asio thread executor '{}' run loop get exception, {}",
|
AIMRT_FATAL("Asio thread executor '{}' run loop get exception, {}",
|
||||||
Name(), e.what());
|
Name(), e.what());
|
||||||
@ -145,22 +137,6 @@ void AsioThreadExecutor::Execute(aimrt::executor::Task&& task) noexcept {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t cur_queue_task_num = ++queue_task_num_;
|
|
||||||
|
|
||||||
if (cur_queue_task_num > queue_threshold_) [[unlikely]] {
|
|
||||||
fprintf(stderr,
|
|
||||||
"The number of tasks in the asio thread executor '%s' has reached the threshold '%u', the task will not be delivered.\n",
|
|
||||||
name_.c_str(), queue_threshold_);
|
|
||||||
--queue_task_num_;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (cur_queue_task_num > queue_warn_threshold_) [[unlikely]] {
|
|
||||||
fprintf(stderr,
|
|
||||||
"The number of tasks in the asio thread executor '%s' is about to reach the threshold: '%u / %u'.\n",
|
|
||||||
name_.c_str(), cur_queue_task_num, queue_threshold_);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
asio::post(*io_ptr_, std::move(task));
|
asio::post(*io_ptr_, std::move(task));
|
||||||
} catch (const std::exception& e) {
|
} catch (const std::exception& e) {
|
||||||
@ -187,22 +163,6 @@ void AsioThreadExecutor::ExecuteAt(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t cur_queue_task_num = ++queue_task_num_;
|
|
||||||
|
|
||||||
if (cur_queue_task_num > queue_threshold_) [[unlikely]] {
|
|
||||||
fprintf(stderr,
|
|
||||||
"The number of tasks in the asio thread executor '%s' has reached the threshold '%u', the task will not be delivered.\n",
|
|
||||||
name_.c_str(), queue_threshold_);
|
|
||||||
--queue_task_num_;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (cur_queue_task_num > queue_warn_threshold_) [[unlikely]] {
|
|
||||||
fprintf(stderr,
|
|
||||||
"The number of tasks in the asio thread executor '%s' is about to reach the threshold: '%u / %u'.\n",
|
|
||||||
name_.c_str(), cur_queue_task_num, queue_threshold_);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (!options_.use_system_clock) {
|
if (!options_.use_system_clock) {
|
||||||
auto timer_ptr = std::make_shared<asio::steady_timer>(*io_ptr_);
|
auto timer_ptr = std::make_shared<asio::steady_timer>(*io_ptr_);
|
||||||
|
@ -25,7 +25,6 @@ class AsioThreadExecutor : public ExecutorBase {
|
|||||||
std::string thread_sched_policy;
|
std::string thread_sched_policy;
|
||||||
std::vector<uint32_t> thread_bind_cpu;
|
std::vector<uint32_t> thread_bind_cpu;
|
||||||
std::chrono::nanoseconds timeout_alarm_threshold_us = std::chrono::seconds(1);
|
std::chrono::nanoseconds timeout_alarm_threshold_us = std::chrono::seconds(1);
|
||||||
uint32_t queue_threshold = 10000;
|
|
||||||
bool use_system_clock = false;
|
bool use_system_clock = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -57,8 +56,6 @@ class AsioThreadExecutor : public ExecutorBase {
|
|||||||
std::chrono::system_clock::time_point Now() const noexcept override;
|
std::chrono::system_clock::time_point Now() const noexcept override;
|
||||||
void ExecuteAt(std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept override;
|
void ExecuteAt(std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept override;
|
||||||
|
|
||||||
size_t CurrentTaskNum() noexcept override { return queue_task_num_.load(); }
|
|
||||||
|
|
||||||
State GetState() const { return state_.load(); }
|
State GetState() const { return state_.load(); }
|
||||||
|
|
||||||
void SetLogger(const std::shared_ptr<aimrt::common::util::LoggerWrapper>& logger_ptr) { logger_ptr_ = logger_ptr; }
|
void SetLogger(const std::shared_ptr<aimrt::common::util::LoggerWrapper>& logger_ptr) { logger_ptr_ = logger_ptr; }
|
||||||
@ -75,10 +72,6 @@ class AsioThreadExecutor : public ExecutorBase {
|
|||||||
std::chrono::system_clock::time_point start_sys_tp_;
|
std::chrono::system_clock::time_point start_sys_tp_;
|
||||||
std::chrono::steady_clock::time_point start_std_tp_;
|
std::chrono::steady_clock::time_point start_std_tp_;
|
||||||
|
|
||||||
uint32_t queue_threshold_;
|
|
||||||
uint32_t queue_warn_threshold_;
|
|
||||||
std::atomic_uint32_t queue_task_num_ = 0;
|
|
||||||
|
|
||||||
std::unique_ptr<asio::io_context> io_ptr_;
|
std::unique_ptr<asio::io_context> io_ptr_;
|
||||||
std::unique_ptr<
|
std::unique_ptr<
|
||||||
asio::executor_work_guard<asio::io_context::executor_type>>
|
asio::executor_work_guard<asio::io_context::executor_type>>
|
||||||
|
@ -34,7 +34,9 @@ target_link_libraries(
|
|||||||
aimrt::runtime::core)
|
aimrt::runtime::core)
|
||||||
|
|
||||||
if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang")
|
if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang")
|
||||||
set_target_properties(${CUR_TARGET_NAME} PROPERTIES LINK_FLAGS "-s")
|
if(CMAKE_BUILD_TYPE STREQUAL "Release")
|
||||||
|
set_target_properties(${CUR_TARGET_NAME} PROPERTIES LINK_FLAGS "-s")
|
||||||
|
endif()
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
# Add -Werror option
|
# Add -Werror option
|
||||||
|
Loading…
x
Reference in New Issue
Block a user