From 076f76394e422a72a5ccdbaf27d96d1147c681cf Mon Sep 17 00:00:00 2001 From: ATT_POWER <34850640+yglsaltfish@users.noreply.github.com> Date: Mon, 4 Nov 2024 17:18:00 +0800 Subject: [PATCH] refactor(echo_plugin): Remove executor configuration and simplify echo logic (#78) * refactor(echo_plugin): Remove executor configuration and simplify echo logic - Removed executor field from echo_plugin configuration - Simplified echo message handling process by processing directly in main thread - Updated example configurations and descriptions in documentation * refactor(echo_plugin): Refactor EchoPlugin class - Remove get_type_support_func_ member variable, replace with direct GetTypeSupport method call in RegisterEchoChannel - Merge EchoFunc creation and subscription callback logic to reduce intermediate variables * refactor(echo_plugin): Remove GetTypeSupport function, directly access type_support_map where needed * refactor(echo_plugin): Remove unused headers and member functions * refactor(echo_plugin): Remove unnecessary 'this' capture in lambda expression * refactor(echo_plugin): optimize buffer handling logic in EchoPlugin * format * fix(echo_plugin): add release_callback func before return in sub_wrapper.callback --------- Co-authored-by: yuguanlin --- .../tutorials/plugins/echo_plugin.md | 23 +--- src/examples/plugins/echo_plugin/README.md | 34 +----- ...s_plugins_echo_plugin_pb_executor_cfg.yaml | 54 --------- ...xamples_plugins_echo_plugin_pb_executor.sh | 3 - src/plugins/echo_plugin/echo_plugin.cc | 114 ++++-------------- src/plugins/echo_plugin/echo_plugin.h | 18 +-- 6 files changed, 34 insertions(+), 212 deletions(-) delete mode 100644 src/examples/plugins/echo_plugin/install/linux/bin/cfg/examples_plugins_echo_plugin_pb_executor_cfg.yaml delete mode 100755 src/examples/plugins/echo_plugin/install/linux/bin/start_examples_plugins_echo_plugin_pb_executor.sh diff --git a/document/sphinx-cn/tutorials/plugins/echo_plugin.md b/document/sphinx-cn/tutorials/plugins/echo_plugin.md index 91a01ae86..407bd7574 100644 --- a/document/sphinx-cn/tutorials/plugins/echo_plugin.md +++ b/document/sphinx-cn/tutorials/plugins/echo_plugin.md @@ -16,7 +16,6 @@ | ---- | ---- | ---- | ---- | ---- | | type_support_pkgs | array | 必选 | [] | type support 包配置 | | type_support_pkgs[i].path | string | 必选 | "" | type support 包的路径 | -| executor | string | 可选 | "" | 回显使用的执行器,要求必须是线程安全 | | topic_meta_list | array | 必选 | [] | 要回显的 topic 和类型 | | topic_meta_list[j].topic_name | string | 必选 | "" | 要回显的 topic | | topic_meta_list[j].msg_type | string | 必选 | "" | 要回显的消息类型 | @@ -25,11 +24,10 @@ ### 回显消息的简单示例配置 -回显消息的存在两种配置,分别是 是否指定执行器 和 回显消息的格式: -- 是否指定执行器: 插件会使用指定的执行器来处理回显消息,如果未指定执行器,则使用默认的执行器; -- 回显消息的格式: ros2 消息类型 支持 "json", "yaml" , pb只支持 "json" -以下是一个带执行器的回显消息格式为 json 的简单示例配置: +对于回显消息的格式,ros2 消息类型 支持 "json", "yaml" , pb只支持 "json" + +以下是一个 pb 消息类型回显消息格式为 json 的简单示例配置: ```yaml aimrt: plugin: @@ -39,7 +37,6 @@ aimrt: options: type_support_pkgs: - path: ./libexample_event_ts_pkg.so - executor: echo_executor topic_meta_list: - topic_name: test_topic msg_type: pb:aimrt.protocols.example.ExampleEventMsg @@ -48,16 +45,12 @@ aimrt: core_lvl: Info # Trace/Debug/Info backends: - type: console - executor: - executors: - - name: echo_executor - type: simple_thread channel: # ... ``` -以下是一个不带执行器的回显消息格式为 json 的简单示例配置: +以下是一个 ros2 消息类型回显消息格式为 yaml 的简单示例配置: ```yaml aimrt: plugin: @@ -69,16 +62,12 @@ aimrt: - path: ./libexample_event_ts_pkg.so topic_meta_list: - topic_name: test_topic - msg_type: pb:aimrt.protocols.example.ExampleEventMsg - echo_type: json + msg_type: ros2:example_ros2/msg/RosTestMsg + echo_type: yaml log: core_lvl: Info # Trace/Debug/Info backends: - type: console - executor: - executors: - - name: echo_executor - type: simple_thread channel: # ... ``` diff --git a/src/examples/plugins/echo_plugin/README.md b/src/examples/plugins/echo_plugin/README.md index f6dc8ba2e..b1360eeb1 100644 --- a/src/examples/plugins/echo_plugin/README.md +++ b/src/examples/plugins/echo_plugin/README.md @@ -1,41 +1,11 @@ # echo plugin examples -## echo with executor - -一个基于 **echo_plugin** 的带执行器的回显消息示例,演示内容包括: -- 如何在启动时加载 **echo_plugin**; -- 如何回显指定 topic、msg 类型的数据; -- 如何为 **echo_plugin** 配置执行器; - - -核心代码: -- [event.proto](../../../protocols/example/event.proto) -- [normal_publisher_module.cc](../../cpp/pb_chn/module/normal_publisher_module/normal_publisher_module.cc) -- [type_support_pkg_main.cc](./example_event_ts_pkg/type_support_pkg_main.cc) - - -配置文件: -- [examples_plugins_echo_plugin_pb_executor_cfg.yaml](./install/linux/bin/cfg/examples_plugins_echo_plugin_pb_executor_cfg.yaml) - - -运行方式(linux): -- 开启 `AIMRT_BUILD_EXAMPLES`、`AIMRT_BUILD_WITH_PROTOBUF`、`AIMRT_BUILD_WITH_ROS2` 、`AIMRT_BUILD_WITH_ECHO_PLUGIN` 选项编译 AimRT; -- 直接运行 build 目录下`start_examples_plugins_echo_plugin_pb_executor.sh`脚本启动进程; -- 键入`ctrl-c`停止进程; - - -说明: -- 此示例创建了以下模块: - - `NormalPublisherModule`:会基于 `work_thread_pool` 执行器,以配置的频率、向配置的 topic 中发布 `ExampleEventMsg` 类型的消息; -- 此示例加载了 `example_event_ts_pkg`,其中提供了 `ExampleEventMsg` 和 `RosTestMsg` 类型的 type support 工具,作为回显时的序列化工具; -- 请注意,echo 插件的原理是向 AimRT 订阅指定的 Topic,因此需要在 channel 配置中为该 topic 设置合适的后端,以保证插件能接收到数据; - - -## echo without executor +## echo with pb msg 一个基于 **echo_plugin** 的回显消息示例,演示内容包括: - 如何在启动时加载 **echo_plugin**; - 如何回显指定 topic、msg 类型的数据; +- 如何配置回显 pb 消息; 核心代码: diff --git a/src/examples/plugins/echo_plugin/install/linux/bin/cfg/examples_plugins_echo_plugin_pb_executor_cfg.yaml b/src/examples/plugins/echo_plugin/install/linux/bin/cfg/examples_plugins_echo_plugin_pb_executor_cfg.yaml deleted file mode 100644 index 44650c221..000000000 --- a/src/examples/plugins/echo_plugin/install/linux/bin/cfg/examples_plugins_echo_plugin_pb_executor_cfg.yaml +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright (c) 2023, AgiBot Inc. -# All rights reserved. - -aimrt: - plugin: - plugins: - - name: echo_plugin - path: ./libaimrt_echo_plugin.so - options: - type_support_pkgs: - - path: ./libexample_event_ts_pkg.so - executor: echo_executor - topic_meta_list: - - topic_name: test_topic - msg_type: pb:aimrt.protocols.example.ExampleEventMsg - echo_type: json - log: - core_lvl: Info # Trace/Debug/Info - backends: - - type: console - executor: - executors: - - name: echo_executor - type: simple_thread - - name: work_thread_pool - type: asio_thread - options: - thread_num: 4 - channel: - backends: - - type: local - options: - subscriber_use_inline_executor: false - subscriber_executor: work_thread_pool - pub_topics_options: - - topic_name: "(.*)" - enable_backends: [local] - sub_topics_options: - - topic_name: "(.*)" - enable_backends: [local] - module: - pkgs: - - path: ./libpb_chn_pub_pkg.so - enable_modules: [NormalPublisherModule] - modules: - - name: NormalPublisherModule - log_lvl: Info - -# Module custom configuration -NormalPublisherModule: - topic_name: test_topic - channel_frq: 0.5 - - diff --git a/src/examples/plugins/echo_plugin/install/linux/bin/start_examples_plugins_echo_plugin_pb_executor.sh b/src/examples/plugins/echo_plugin/install/linux/bin/start_examples_plugins_echo_plugin_pb_executor.sh deleted file mode 100755 index ddc26f3e4..000000000 --- a/src/examples/plugins/echo_plugin/install/linux/bin/start_examples_plugins_echo_plugin_pb_executor.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -./aimrt_main --cfg_file_path=./cfg/examples_plugins_echo_plugin_pb_executor_cfg.yaml diff --git a/src/plugins/echo_plugin/echo_plugin.cc b/src/plugins/echo_plugin/echo_plugin.cc index cfb20c9dd..c5d4e0738 100644 --- a/src/plugins/echo_plugin/echo_plugin.cc +++ b/src/plugins/echo_plugin/echo_plugin.cc @@ -23,9 +23,8 @@ struct convert { node["type_support_pkgs"].push_back(type_support_pkg_node); } - node["executor"] = rhs.executor; - node["topic_meta_list"] = Node(NodeType::Sequence); + for (const auto& topic_meta : rhs.topic_meta_list) { Node topic_meta_node; topic_meta_node["topic_name"] = topic_meta.topic_name; @@ -47,10 +46,6 @@ struct convert { } } - if (node["executor"] && node["executor"].IsScalar()) { - rhs.executor = node["executor"].as(); - } - if (node["topic_meta_list"] && node["topic_meta_list"].IsSequence()) { for (const auto& topic_meta_node : node["topic_meta_list"]) { Options::TopicMeta topic_meta; @@ -101,26 +96,19 @@ bool EchoPlugin::Initialize(runtime::core::AimRTCore* core_ptr) noexcept { AIMRT_TRACE("Load {} pkg and {} type.", type_support_pkg_loader_vec_.size(), type_support_map_.size()); - RegisterGetTypeSupportFunc( - [this](std::string_view msg_type) -> aimrt::util::TypeSupportRef { - auto finditr = type_support_map_.find(msg_type); - if (finditr != type_support_map_.end()) - return finditr->second.type_support_ref; - return {}; - }); - for (auto& topic_meta : options_.topic_meta_list) { // check msg type - auto type_support_ref = get_type_support_func_(topic_meta.msg_type); - AIMRT_CHECK_ERROR_THROW(type_support_ref, + auto finditr = type_support_map_.find(topic_meta.msg_type); + AIMRT_CHECK_ERROR_THROW(finditr != type_support_map_.end(), "Can not find type '{}' in any type support pkg!", topic_meta.msg_type); + auto& type_support_ref = finditr->second.type_support_ref; // check serialization type if (!topic_meta.serialization_type.empty()) { bool check_ret = type_support_ref.CheckSerializationTypeSupported(topic_meta.serialization_type); AIMRT_CHECK_ERROR_THROW(check_ret, "Msg type '{}' does not support serialization type '{}'.", - topic_meta.msg_type, topic_meta.msg_type); + topic_meta.msg_type, topic_meta.serialization_type); } else { topic_meta.serialization_type = type_support_ref.DefaultSerializationType(); } @@ -145,18 +133,6 @@ bool EchoPlugin::Initialize(runtime::core::AimRTCore* core_ptr) noexcept { topic_meta_map_.emplace(key, topic_meta); } - if (!options_.executor.empty()) { - core_ptr_->RegisterHookFunc( - runtime::core::AimRTCore::State::kPostStart, - [this] { - RegisterGetExecutorFunc( - [this](std::string_view executor_name) -> aimrt::executor::ExecutorRef { - return core_ptr_->GetExecutorManager().GetExecutor(executor_name); - }); - InitExecutor(); - }); - } - core_ptr_->RegisterHookFunc( runtime::core::AimRTCore::State::kPostInitLog, [this] { @@ -222,41 +198,42 @@ void EchoPlugin::RegisterEchoChannel() { for (const auto& topic_meta_itr : topic_meta_list) { const auto& topic_meta = topic_meta_itr.second; - EchoFunc echo_func; - if (options_.executor.empty()) { - echo_func = [this, echo_type{topic_meta.echo_type}]( - MsgWrapper& msg_wrapper) { - Echo(msg_wrapper, echo_type); - }; - } else { - echo_func = [this, echo_type{topic_meta.echo_type}]( - MsgWrapper& msg_wrapper) { - executor_.Execute([this, msg_wrapper{std::move(msg_wrapper)}, echo_type]() mutable { - Echo(msg_wrapper, echo_type); - }); - }; - } auto finditr = type_support_map_.find(topic_meta.msg_type); AIMRT_CHECK_ERROR_THROW(finditr != type_support_map_.end(), "Can not find type '{}' in any type support pkg!", topic_meta.msg_type); - const auto& type_support_ref = finditr->second; + const auto& type_support_wrapper = finditr->second; SubscribeWrapper sub_wrapper; sub_wrapper.info = TopicInfo{ .msg_type = topic_meta.msg_type, .topic_name = topic_meta.topic_name, - .pkg_path = type_support_ref.options.path, + .pkg_path = type_support_wrapper.options.path, .module_name = "core", - .msg_type_support_ref = type_support_ref.type_support_ref}; + .msg_type_support_ref = type_support_wrapper.type_support_ref}; sub_wrapper.require_cache_serialization_types.emplace(topic_meta.serialization_type); - sub_wrapper.callback = [echo_func{std::move(echo_func)}]( + sub_wrapper.callback = [echo_type{topic_meta.echo_type}]( MsgWrapper& msg_wrapper, std::function&& release_callback) { - echo_func(msg_wrapper); + auto buffer_view_ptr = aimrt::runtime::core::channel::TrySerializeMsgWithCache(msg_wrapper, echo_type); + if (!buffer_view_ptr) [[unlikely]] { + AIMRT_ERROR("Can not serialize msg type '{}' with echo type '{}'.", + msg_wrapper.info.msg_type, echo_type); + release_callback(); + return; + } + if (buffer_view_ptr->Size() == 1) { + auto data = buffer_view_ptr->Data()[0]; + AIMRT_INFO("\n{}\n---------------\n", std::string_view(static_cast(data.data), data.len)); + } else if (buffer_view_ptr->Size() > 1) { + AIMRT_INFO("\n{}\n---------------\n", buffer_view_ptr->JoinToString()); + } else { + AIMRT_ERROR("Invalid buffer, topic_name: {}, msg_type: {}", msg_wrapper.info.topic_name, msg_wrapper.info.msg_type); + } release_callback(); }; + bool ret = core_ptr_->GetChannelManager().Subscribe(std::move(sub_wrapper)); AIMRT_CHECK_ERROR_THROW(ret, "Subscribe failed!"); } @@ -270,45 +247,4 @@ void EchoPlugin::Shutdown() noexcept { } } -void EchoPlugin::Echo(runtime::core::channel::MsgWrapper& msg_wrapper, std::string_view echo_type) { - auto buffer_view_ptr = aimrt::runtime::core::channel::TrySerializeMsgWithCache(msg_wrapper, echo_type); - if (!buffer_view_ptr) [[unlikely]] { - AIMRT_ERROR("Can not serialize msg type '{}' with echo type '{}'.", - msg_wrapper.info.msg_type, echo_type); - return; - } - if (buffer_view_ptr->Size() == 1) { - const char* data = static_cast(buffer_view_ptr->Data()[0].data); - AIMRT_INFO("\n{}\n---------------\n", std::string_view(data, buffer_view_ptr->Data()[0].len)); - } else if (buffer_view_ptr->Size() > 1) { - AIMRT_INFO("\n{}\n---------------\n", buffer_view_ptr->JoinToString()); - } else { - AIMRT_ERROR("Invalid buffer, topic_name: {}, msg_type: {}", msg_wrapper.info.topic_name, msg_wrapper.info.msg_type); - } -} - -void EchoPlugin::RegisterGetTypeSupportFunc( - const std::function& get_type_support_func) { - get_type_support_func_ = get_type_support_func; -} - -void EchoPlugin::InitExecutor() { - AIMRT_CHECK_ERROR_THROW( - get_executor_func_, - "Get executor function is not set before initialize."); - - executor_ = get_executor_func_(options_.executor); - - AIMRT_CHECK_ERROR_THROW( - executor_, "Can not get executor {}.", options_.executor); - - AIMRT_CHECK_ERROR_THROW( - executor_.ThreadSafe(), "Echo executor {} is not thread safe!", options_.executor); -} - -void EchoPlugin::RegisterGetExecutorFunc( - const std::function& get_executor_func) { - get_executor_func_ = get_executor_func; -} - } // namespace aimrt::plugins::echo_plugin diff --git a/src/plugins/echo_plugin/echo_plugin.h b/src/plugins/echo_plugin/echo_plugin.h index d3ec30df0..53df31813 100644 --- a/src/plugins/echo_plugin/echo_plugin.h +++ b/src/plugins/echo_plugin/echo_plugin.h @@ -18,8 +18,6 @@ namespace aimrt::plugins::echo_plugin { class EchoPlugin : public AimRTCorePluginBase { public: struct Options { - std::string executor; - struct TopicMeta { std::string topic_name; std::string msg_type; @@ -42,21 +40,12 @@ class EchoPlugin : public AimRTCorePluginBase { bool Initialize(runtime::core::AimRTCore* core_ptr) noexcept override; void Shutdown() noexcept override; - const auto& GetTypeSupportMap() const { return type_support_map_; } - private: void InitTypeSupport(Options::TypeSupportPkg& options); - void InitExecutor(); void RegisterEchoChannel(); - void RegisterGetTypeSupportFunc( - const std::function& get_type_support_func); - - void RegisterGetExecutorFunc( - const std::function& get_executor_func); - - aimrt::executor::ExecutorRef executor_; + private: runtime::core::AimRTCore* core_ptr_ = nullptr; Options options_; @@ -75,11 +64,6 @@ class EchoPlugin : public AimRTCorePluginBase { std::unordered_map type_support_map_; std::unordered_map topic_meta_map_; - - std::function get_type_support_func_; - std::function get_executor_func_; - - void Echo(runtime::core::channel::MsgWrapper& msg_wrapper, std::string_view serialization_type); }; } // namespace aimrt::plugins::echo_plugin