refactor(echo_plugin): Remove executor configuration and simplify echo logic (#78)

* refactor(echo_plugin): Remove executor configuration and simplify echo logic

- Removed executor field from echo_plugin configuration
- Simplified echo message handling process by processing directly in main thread
- Updated example configurations and descriptions in documentation

* refactor(echo_plugin): Refactor EchoPlugin class

- Remove get_type_support_func_ member variable, replace with direct GetTypeSupport method call in RegisterEchoChannel
- Merge EchoFunc creation and subscription callback logic to reduce intermediate variables

* refactor(echo_plugin): Remove GetTypeSupport function, directly access type_support_map where needed

* refactor(echo_plugin): Remove unused headers and member functions

* refactor(echo_plugin): Remove unnecessary 'this' capture in lambda expression

* refactor(echo_plugin): optimize buffer handling logic in EchoPlugin

* format

* fix(echo_plugin):  add release_callback func before return in sub_wrapper.callback

---------

Co-authored-by: yuguanlin <yuguanlin@agibot.com>
This commit is contained in:
ATT_POWER 2024-11-04 17:18:00 +08:00 committed by GitHub
parent fe409b822b
commit 076f76394e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 34 additions and 212 deletions

View File

@ -16,7 +16,6 @@
| ---- | ---- | ---- | ---- | ---- |
| type_support_pkgs | array | 必选 | [] | type support 包配置 |
| type_support_pkgs[i].path | string | 必选 | "" | type support 包的路径 |
| executor | string | 可选 | "" | 回显使用的执行器,要求必须是线程安全 |
| topic_meta_list | array | 必选 | [] | 要回显的 topic 和类型 |
| topic_meta_list[j].topic_name | string | 必选 | "" | 要回显的 topic |
| topic_meta_list[j].msg_type | string | 必选 | "" | 要回显的消息类型 |
@ -25,11 +24,10 @@
### 回显消息的简单示例配置
回显消息的存在两种配置,分别是 是否指定执行器 和 回显消息的格式:
- 是否指定执行器: 插件会使用指定的执行器来处理回显消息,如果未指定执行器,则使用默认的执行器;
- 回显消息的格式: ros2 消息类型 支持 "json", "yaml" pb只支持 "json"
以下是一个带执行器的回显消息格式为 json 的简单示例配置:
对于回显消息的格式ros2 消息类型 支持 "json", "yaml" pb只支持 "json"
以下是一个 pb 消息类型回显消息格式为 json 的简单示例配置:
```yaml
aimrt:
plugin:
@ -39,7 +37,6 @@ aimrt:
options:
type_support_pkgs:
- path: ./libexample_event_ts_pkg.so
executor: echo_executor
topic_meta_list:
- topic_name: test_topic
msg_type: pb:aimrt.protocols.example.ExampleEventMsg
@ -48,16 +45,12 @@ aimrt:
core_lvl: Info # Trace/Debug/Info
backends:
- type: console
executor:
executors:
- name: echo_executor
type: simple_thread
channel:
# ...
```
以下是一个不带执行器的回显消息格式为 json 的简单示例配置:
以下是一个 ros2 消息类型回显消息格式为 yaml 的简单示例配置:
```yaml
aimrt:
plugin:
@ -69,16 +62,12 @@ aimrt:
- path: ./libexample_event_ts_pkg.so
topic_meta_list:
- topic_name: test_topic
msg_type: pb:aimrt.protocols.example.ExampleEventMsg
echo_type: json
msg_type: ros2:example_ros2/msg/RosTestMsg
echo_type: yaml
log:
core_lvl: Info # Trace/Debug/Info
backends:
- type: console
executor:
executors:
- name: echo_executor
type: simple_thread
channel:
# ...
```

View File

@ -1,41 +1,11 @@
# echo plugin examples
## echo with executor
一个基于 **echo_plugin** 的带执行器的回显消息示例,演示内容包括:
- 如何在启动时加载 **echo_plugin**
- 如何回显指定 topic、msg 类型的数据;
- 如何为 **echo_plugin** 配置执行器;
核心代码:
- [event.proto](../../../protocols/example/event.proto)
- [normal_publisher_module.cc](../../cpp/pb_chn/module/normal_publisher_module/normal_publisher_module.cc)
- [type_support_pkg_main.cc](./example_event_ts_pkg/type_support_pkg_main.cc)
配置文件:
- [examples_plugins_echo_plugin_pb_executor_cfg.yaml](./install/linux/bin/cfg/examples_plugins_echo_plugin_pb_executor_cfg.yaml)
运行方式linux
- 开启 `AIMRT_BUILD_EXAMPLES``AIMRT_BUILD_WITH_PROTOBUF``AIMRT_BUILD_WITH_ROS2``AIMRT_BUILD_WITH_ECHO_PLUGIN` 选项编译 AimRT
- 直接运行 build 目录下`start_examples_plugins_echo_plugin_pb_executor.sh`脚本启动进程;
- 键入`ctrl-c`停止进程;
说明:
- 此示例创建了以下模块:
- `NormalPublisherModule`:会基于 `work_thread_pool` 执行器,以配置的频率、向配置的 topic 中发布 `ExampleEventMsg` 类型的消息;
- 此示例加载了 `example_event_ts_pkg`,其中提供了 `ExampleEventMsg``RosTestMsg` 类型的 type support 工具,作为回显时的序列化工具;
- 请注意echo 插件的原理是向 AimRT 订阅指定的 Topic因此需要在 channel 配置中为该 topic 设置合适的后端,以保证插件能接收到数据;
## echo without executor
## echo with pb msg
一个基于 **echo_plugin** 的回显消息示例,演示内容包括:
- 如何在启动时加载 **echo_plugin**
- 如何回显指定 topic、msg 类型的数据;
- 如何配置回显 pb 消息;
核心代码:

View File

@ -1,54 +0,0 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
aimrt:
plugin:
plugins:
- name: echo_plugin
path: ./libaimrt_echo_plugin.so
options:
type_support_pkgs:
- path: ./libexample_event_ts_pkg.so
executor: echo_executor
topic_meta_list:
- topic_name: test_topic
msg_type: pb:aimrt.protocols.example.ExampleEventMsg
echo_type: json
log:
core_lvl: Info # Trace/Debug/Info
backends:
- type: console
executor:
executors:
- name: echo_executor
type: simple_thread
- name: work_thread_pool
type: asio_thread
options:
thread_num: 4
channel:
backends:
- type: local
options:
subscriber_use_inline_executor: false
subscriber_executor: work_thread_pool
pub_topics_options:
- topic_name: "(.*)"
enable_backends: [local]
sub_topics_options:
- topic_name: "(.*)"
enable_backends: [local]
module:
pkgs:
- path: ./libpb_chn_pub_pkg.so
enable_modules: [NormalPublisherModule]
modules:
- name: NormalPublisherModule
log_lvl: Info
# Module custom configuration
NormalPublisherModule:
topic_name: test_topic
channel_frq: 0.5

View File

@ -1,3 +0,0 @@
#!/bin/bash
./aimrt_main --cfg_file_path=./cfg/examples_plugins_echo_plugin_pb_executor_cfg.yaml

View File

@ -23,9 +23,8 @@ struct convert<aimrt::plugins::echo_plugin::EchoPlugin::Options> {
node["type_support_pkgs"].push_back(type_support_pkg_node);
}
node["executor"] = rhs.executor;
node["topic_meta_list"] = Node(NodeType::Sequence);
for (const auto& topic_meta : rhs.topic_meta_list) {
Node topic_meta_node;
topic_meta_node["topic_name"] = topic_meta.topic_name;
@ -47,10 +46,6 @@ struct convert<aimrt::plugins::echo_plugin::EchoPlugin::Options> {
}
}
if (node["executor"] && node["executor"].IsScalar()) {
rhs.executor = node["executor"].as<std::string>();
}
if (node["topic_meta_list"] && node["topic_meta_list"].IsSequence()) {
for (const auto& topic_meta_node : node["topic_meta_list"]) {
Options::TopicMeta topic_meta;
@ -101,26 +96,19 @@ bool EchoPlugin::Initialize(runtime::core::AimRTCore* core_ptr) noexcept {
AIMRT_TRACE("Load {} pkg and {} type.",
type_support_pkg_loader_vec_.size(), type_support_map_.size());
RegisterGetTypeSupportFunc(
[this](std::string_view msg_type) -> aimrt::util::TypeSupportRef {
auto finditr = type_support_map_.find(msg_type);
if (finditr != type_support_map_.end())
return finditr->second.type_support_ref;
return {};
});
for (auto& topic_meta : options_.topic_meta_list) {
// check msg type
auto type_support_ref = get_type_support_func_(topic_meta.msg_type);
AIMRT_CHECK_ERROR_THROW(type_support_ref,
auto finditr = type_support_map_.find(topic_meta.msg_type);
AIMRT_CHECK_ERROR_THROW(finditr != type_support_map_.end(),
"Can not find type '{}' in any type support pkg!", topic_meta.msg_type);
auto& type_support_ref = finditr->second.type_support_ref;
// check serialization type
if (!topic_meta.serialization_type.empty()) {
bool check_ret = type_support_ref.CheckSerializationTypeSupported(topic_meta.serialization_type);
AIMRT_CHECK_ERROR_THROW(check_ret,
"Msg type '{}' does not support serialization type '{}'.",
topic_meta.msg_type, topic_meta.msg_type);
topic_meta.msg_type, topic_meta.serialization_type);
} else {
topic_meta.serialization_type = type_support_ref.DefaultSerializationType();
}
@ -145,18 +133,6 @@ bool EchoPlugin::Initialize(runtime::core::AimRTCore* core_ptr) noexcept {
topic_meta_map_.emplace(key, topic_meta);
}
if (!options_.executor.empty()) {
core_ptr_->RegisterHookFunc(
runtime::core::AimRTCore::State::kPostStart,
[this] {
RegisterGetExecutorFunc(
[this](std::string_view executor_name) -> aimrt::executor::ExecutorRef {
return core_ptr_->GetExecutorManager().GetExecutor(executor_name);
});
InitExecutor();
});
}
core_ptr_->RegisterHookFunc(
runtime::core::AimRTCore::State::kPostInitLog,
[this] {
@ -222,41 +198,42 @@ void EchoPlugin::RegisterEchoChannel() {
for (const auto& topic_meta_itr : topic_meta_list) {
const auto& topic_meta = topic_meta_itr.second;
EchoFunc echo_func;
if (options_.executor.empty()) {
echo_func = [this, echo_type{topic_meta.echo_type}](
MsgWrapper& msg_wrapper) {
Echo(msg_wrapper, echo_type);
};
} else {
echo_func = [this, echo_type{topic_meta.echo_type}](
MsgWrapper& msg_wrapper) {
executor_.Execute([this, msg_wrapper{std::move(msg_wrapper)}, echo_type]() mutable {
Echo(msg_wrapper, echo_type);
});
};
}
auto finditr = type_support_map_.find(topic_meta.msg_type);
AIMRT_CHECK_ERROR_THROW(finditr != type_support_map_.end(),
"Can not find type '{}' in any type support pkg!", topic_meta.msg_type);
const auto& type_support_ref = finditr->second;
const auto& type_support_wrapper = finditr->second;
SubscribeWrapper sub_wrapper;
sub_wrapper.info = TopicInfo{
.msg_type = topic_meta.msg_type,
.topic_name = topic_meta.topic_name,
.pkg_path = type_support_ref.options.path,
.pkg_path = type_support_wrapper.options.path,
.module_name = "core",
.msg_type_support_ref = type_support_ref.type_support_ref};
.msg_type_support_ref = type_support_wrapper.type_support_ref};
sub_wrapper.require_cache_serialization_types.emplace(topic_meta.serialization_type);
sub_wrapper.callback = [echo_func{std::move(echo_func)}](
sub_wrapper.callback = [echo_type{topic_meta.echo_type}](
MsgWrapper& msg_wrapper, std::function<void()>&& release_callback) {
echo_func(msg_wrapper);
auto buffer_view_ptr = aimrt::runtime::core::channel::TrySerializeMsgWithCache(msg_wrapper, echo_type);
if (!buffer_view_ptr) [[unlikely]] {
AIMRT_ERROR("Can not serialize msg type '{}' with echo type '{}'.",
msg_wrapper.info.msg_type, echo_type);
release_callback();
return;
}
if (buffer_view_ptr->Size() == 1) {
auto data = buffer_view_ptr->Data()[0];
AIMRT_INFO("\n{}\n---------------\n", std::string_view(static_cast<const char*>(data.data), data.len));
} else if (buffer_view_ptr->Size() > 1) {
AIMRT_INFO("\n{}\n---------------\n", buffer_view_ptr->JoinToString());
} else {
AIMRT_ERROR("Invalid buffer, topic_name: {}, msg_type: {}", msg_wrapper.info.topic_name, msg_wrapper.info.msg_type);
}
release_callback();
};
bool ret = core_ptr_->GetChannelManager().Subscribe(std::move(sub_wrapper));
AIMRT_CHECK_ERROR_THROW(ret, "Subscribe failed!");
}
@ -270,45 +247,4 @@ void EchoPlugin::Shutdown() noexcept {
}
}
void EchoPlugin::Echo(runtime::core::channel::MsgWrapper& msg_wrapper, std::string_view echo_type) {
auto buffer_view_ptr = aimrt::runtime::core::channel::TrySerializeMsgWithCache(msg_wrapper, echo_type);
if (!buffer_view_ptr) [[unlikely]] {
AIMRT_ERROR("Can not serialize msg type '{}' with echo type '{}'.",
msg_wrapper.info.msg_type, echo_type);
return;
}
if (buffer_view_ptr->Size() == 1) {
const char* data = static_cast<const char*>(buffer_view_ptr->Data()[0].data);
AIMRT_INFO("\n{}\n---------------\n", std::string_view(data, buffer_view_ptr->Data()[0].len));
} else if (buffer_view_ptr->Size() > 1) {
AIMRT_INFO("\n{}\n---------------\n", buffer_view_ptr->JoinToString());
} else {
AIMRT_ERROR("Invalid buffer, topic_name: {}, msg_type: {}", msg_wrapper.info.topic_name, msg_wrapper.info.msg_type);
}
}
void EchoPlugin::RegisterGetTypeSupportFunc(
const std::function<aimrt::util::TypeSupportRef(std::string_view)>& get_type_support_func) {
get_type_support_func_ = get_type_support_func;
}
void EchoPlugin::InitExecutor() {
AIMRT_CHECK_ERROR_THROW(
get_executor_func_,
"Get executor function is not set before initialize.");
executor_ = get_executor_func_(options_.executor);
AIMRT_CHECK_ERROR_THROW(
executor_, "Can not get executor {}.", options_.executor);
AIMRT_CHECK_ERROR_THROW(
executor_.ThreadSafe(), "Echo executor {} is not thread safe!", options_.executor);
}
void EchoPlugin::RegisterGetExecutorFunc(
const std::function<executor::ExecutorRef(std::string_view)>& get_executor_func) {
get_executor_func_ = get_executor_func;
}
} // namespace aimrt::plugins::echo_plugin

View File

@ -18,8 +18,6 @@ namespace aimrt::plugins::echo_plugin {
class EchoPlugin : public AimRTCorePluginBase {
public:
struct Options {
std::string executor;
struct TopicMeta {
std::string topic_name;
std::string msg_type;
@ -42,21 +40,12 @@ class EchoPlugin : public AimRTCorePluginBase {
bool Initialize(runtime::core::AimRTCore* core_ptr) noexcept override;
void Shutdown() noexcept override;
const auto& GetTypeSupportMap() const { return type_support_map_; }
private:
void InitTypeSupport(Options::TypeSupportPkg& options);
void InitExecutor();
void RegisterEchoChannel();
void RegisterGetTypeSupportFunc(
const std::function<aimrt::util::TypeSupportRef(std::string_view)>& get_type_support_func);
void RegisterGetExecutorFunc(
const std::function<executor::ExecutorRef(std::string_view)>& get_executor_func);
aimrt::executor::ExecutorRef executor_;
private:
runtime::core::AimRTCore* core_ptr_ = nullptr;
Options options_;
@ -75,11 +64,6 @@ class EchoPlugin : public AimRTCorePluginBase {
std::unordered_map<std::string_view, TypeSupportWrapper> type_support_map_;
std::unordered_map<TopicMetaKey, TopicMeta, TopicMetaKey::Hash> topic_meta_map_;
std::function<aimrt::util::TypeSupportRef(std::string_view)> get_type_support_func_;
std::function<executor::ExecutorRef(std::string_view)> get_executor_func_;
void Echo(runtime::core::channel::MsgWrapper& msg_wrapper, std::string_view serialization_type);
};
} // namespace aimrt::plugins::echo_plugin