diff --git a/CMakeLists.txt b/CMakeLists.txt index b76046376..3ce2692dd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -44,6 +44,7 @@ cmake_dependent_option(AIMRT_BUILD_TIME_MANIPULATOR_PLUGIN "AimRT build time man cmake_dependent_option(AIMRT_BUILD_PARAMETER_PLUGIN "AimRT build parameter plugin." OFF "AIMRT_BUILD_RUNTIME;AIMRT_BUILD_WITH_PROTOBUF" OFF) cmake_dependent_option(AIMRT_BUILD_LOG_CONTROL_PLUGIN "AimRT build log control plugin." OFF "AIMRT_BUILD_RUNTIME;AIMRT_BUILD_WITH_PROTOBUF" OFF) cmake_dependent_option(AIMRT_BUILD_GRPC_PLUGIN "AimRT build grpc plugin." OFF "AIMRT_BUILD_RUNTIME" OFF) +cmake_dependent_option(AIMRT_BUILD_PROXY_PLUGIN "AimRT build proxy plugin." OFF "AIMRT_BUILD_RUNTIME" OFF) option(AIMRT_INSTALL "Enable installation of AimRT." ON) diff --git a/build.bat b/build.bat index 498a9b301..11a552c6f 100644 --- a/build.bat +++ b/build.bat @@ -32,6 +32,7 @@ cmake -B build ^ -DAIMRT_BUILD_OPENTELEMETRY_PLUGIN=OFF ^ -DAIMRT_BUILD_GRPC_PLUGIN=OFF ^ -DAIMRT_BUILD_ECHO_PLUGIN=OFF ^ + -DAIMRT_BUILD_PROXY_PLUGIN=ON ^ -DAIMRT_BUILD_PYTHON_PACKAGE=ON ^ %* diff --git a/build.sh b/build.sh index 024d97047..aabe4d99f 100755 --- a/build.sh +++ b/build.sh @@ -34,6 +34,7 @@ cmake -B build \ -DAIMRT_BUILD_OPENTELEMETRY_PLUGIN=ON \ -DAIMRT_BUILD_GRPC_PLUGIN=ON \ -DAIMRT_BUILD_ECHO_PLUGIN=ON \ + -DAIMRT_BUILD_PROXY_PLUGIN=ON \ -DAIMRT_BUILD_PYTHON_PACKAGE=ON \ $@ diff --git a/document/sphinx-cn/release_notes/v0_9_0.md b/document/sphinx-cn/release_notes/v0_9_0.md index 936c34518..c874ea616 100644 --- a/document/sphinx-cn/release_notes/v0_9_0.md +++ b/document/sphinx-cn/release_notes/v0_9_0.md @@ -11,6 +11,7 @@ - 新增 Echo 插件,用于回显消息; - 新增了基于执行器的定时器,方便执行定时任务; - aimrt_py channel 和 rpc 支持 ros2 消息类型; +- 新增了 Proxy 插件,用于转发消息; **次要修改**: - 缩短了一些 examples 的文件路径长度; diff --git a/document/sphinx-cn/tutorials/examples/examples_plugins.md b/document/sphinx-cn/tutorials/examples/examples_plugins.md index 1dcbf3512..007bf192c 100644 --- a/document/sphinx-cn/tutorials/examples/examples_plugins.md +++ b/document/sphinx-cn/tutorials/examples/examples_plugins.md @@ -15,6 +15,7 @@ AimRT 提供了以下插件使用示例: - {{ '[time_manipulator_plugin]({}/src/examples/plugins/time_manipulator_plugin)'.format(code_site_root_path_url) }} - {{ '[zenoh_plugin]({}/src/examples/plugins/zenoh_plugin)'.format(code_site_root_path_url) }} - {{ '[echo_plugin]({}/src/examples/plugins/echo_plugin)'.format(code_site_root_path_url) }} + - {{ '[proxy_plugin]({}/src/examples/plugins/proxy_plugin)'.format(code_site_root_path_url) }} 关于这些示例的说明: - 每个示例都有自己独立的 readme 文档,详情请点击示例链接进入后查看; diff --git a/document/sphinx-cn/tutorials/index.md b/document/sphinx-cn/tutorials/index.md index 723cfbd58..a8dddb275 100644 --- a/document/sphinx-cn/tutorials/index.md +++ b/document/sphinx-cn/tutorials/index.md @@ -103,6 +103,7 @@ plugins/zenoh_plugin.md plugins/iceoryx_plugin.md plugins/grpc_plugin.md plugins/echo_plugin.md +plugins/proxy_plugin.md ``` 如果开发者想定制开发自己的插件,可以参考以下文档。 diff --git a/document/sphinx-cn/tutorials/plugins/proxy_plugin.md b/document/sphinx-cn/tutorials/plugins/proxy_plugin.md new file mode 100644 index 000000000..2f4718f7d --- /dev/null +++ b/document/sphinx-cn/tutorials/plugins/proxy_plugin.md @@ -0,0 +1,85 @@ + +# echo插件 + +## 相关链接 + +参考示例: +- {{ '[proxy_plugin]({}/src/examples/plugins/proxy_plugin)'.format(code_site_root_path_url) }} + +## 插件概述 + +**proxy_plugin**用于对 Channel 中的消息进行代理转发,插件支持独立的 type_support_pkg,并支持指定执行器, 其中执行器需要线程安全,在使用时,插件会根据配置注册一个或多个 Channel Subscriber 或 Publisher。 + +插件的配置项如下: + +| 节点 | 类型 | 是否可选| 默认值 | 作用 | +| ---- | ---- | ---- | ---- | ---- | +| type_support_pkgs | array | 必选 | [] | type support 包配置 | +| type_support_pkgs[i].path | string | 必选 | "" | type support 包的路径 | +| proxy_actions | array | 必选 | [] | 代理转发配置 | +| proxy_actions[i].name | string | 必选 | "" | 代理转发名称 | +| proxy_actions[i].options | object | 必选 | {} | 代理转发配置 | +| proxy_actions[i].options.executor| string | 必选 | "" | 代理转发执行器 | +| proxy_actions[i].options.topic_meta_list | array | 必选 | [] | 要代理转发的 topic 和类型 | +| proxy_actions[i].options.topic_meta_list[j].topic_name | string | 必选 | "" | 要代理转发的 topic | +| proxy_actions[i].options.topic_meta_list[j].msg_type | string | 必选 | "" | 要代理转发的消息类型 | +| proxy_actions[i].options.topic_meta_list[j].pub_topic_name | array | 必选 | [] | 代理转发后的 topic | + + +请注意,**proxy_plugin**中是以`action`为单元管理代理转发动作的,每个代理转发`action`可以有自己的执行器、topic 等参数,使用时可以根据数据实际大小和频率,为每个 action 分配合理的资源。 + + +### 代理转发的简单示例配置 + +以下是将一个以 http 为后端的 topic 消息代理转发到两个以 zenoh 和 ros2 为后端的 topic 的简单示例配置,对于 proxy_plugin 需要为每个 action 指定执行器,并且在 channel 处需要为每个订阅的 topic 和转发的 topic 指定后端,其他相关插件的配置请参考[net_plugin](./net_plugin.md), [zenoh_plugin](./zenoh_plugin.md) 和 [ros2_plugin](./ros2_plugin.md); + +```yaml +aimrt: + plugin: + plugins: + - name: proxy_plugin + path: ./libaimrt_proxy_plugin.so + options: + type_support_pkgs: + - path: ./libexample_event_ts_pkg.so + proxy_actions: + - name: my_proxy + options: + executor: proxy_plugin_executor + topic_meta_list: + - sub_topic_name: test_topic_http + pub_topic_name: [test_topic_zenoh, test_topic_ros2] + msg_type: pb:aimrt.protocols.example.ExampleEventMsg + - name: zenoh_plugin + path: ./libaimrt_zenoh_plugin.so + - name: ros2_plugin + path: ./libaimrt_ros2_plugin.so + options: + node_name: example_ros2_pb_chn_publisher_node + executor_type: MultiThreaded # SingleThreaded/StaticSingleThreaded/MultiThreaded + executor_thread_num: 2 + - name: net_plugin + path: ./libaimrt_net_plugin.so + options: + thread_num: 4 + http_options: + listen_ip: 127.0.0.1 + listen_port: 50081 + channel: + backends: + - type: http + - type: zenoh + - type: ros2 + sub_topics_options: + - topic_name: test_topic_http + enable_backends: [http] + pub_topics_options: + - topic_name: test_topic_zenoh + enable_backends: [zenoh] + - topic_name: test_topic_ros2 + enable_backends: [ros2] + # ... +``` + + + diff --git a/src/examples/plugins/CMakeLists.txt b/src/examples/plugins/CMakeLists.txt index 50280bf7d..ab5c053c2 100644 --- a/src/examples/plugins/CMakeLists.txt +++ b/src/examples/plugins/CMakeLists.txt @@ -61,3 +61,10 @@ endif() if(AIMRT_BUILD_WITH_PROTOBUF AND AIMRT_BUILD_GRPC_PLUGIN) add_subdirectory(grpc_plugin) endif() + +if(AIMRT_BUILD_PROXY_PLUGIN + AND AIMRT_BUILD_WITH_PROTOBUF + AND AIMRT_BUILD_NET_PLUGIN + AND AIMRT_BUILD_ZENOH_PLUGIN) + add_subdirectory(proxy_plugin) +endif() diff --git a/src/examples/plugins/echo_plugin/install/linux/bin/start_examples_plugins_echo_plugin_ros2.sh b/src/examples/plugins/echo_plugin/install/linux/bin/start_examples_plugins_echo_plugin_ros2.sh index f88a5d7e7..8533e563d 100755 --- a/src/examples/plugins/echo_plugin/install/linux/bin/start_examples_plugins_echo_plugin_ros2.sh +++ b/src/examples/plugins/echo_plugin/install/linux/bin/start_examples_plugins_echo_plugin_ros2.sh @@ -1,3 +1,5 @@ #!/bin/bash +source install/share/example_ros2/local_setup.bash + ./aimrt_main --cfg_file_path=./cfg/examples_plugins_echo_plugin_ros2_cfg.yaml diff --git a/src/examples/plugins/proxy_plugin/CMakeLists.txt b/src/examples/plugins/proxy_plugin/CMakeLists.txt new file mode 100644 index 000000000..1b89df0f6 --- /dev/null +++ b/src/examples/plugins/proxy_plugin/CMakeLists.txt @@ -0,0 +1,29 @@ +# Copyright (c) 2023, AgiBot Inc. +# All rights reserved. + +# Get the current folder name +string(REGEX REPLACE ".*/\(.*\)" "\\1" CUR_DIR ${CMAKE_CURRENT_SOURCE_DIR}) + +# Set namespace +set_namespace() + +# type_support_pkg +add_subdirectory(example_event_ts_pkg) + +# install +if(CMAKE_SYSTEM_NAME MATCHES "Linux") + set(CUR_INSTALL_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/install/linux) +elseif(CMAKE_SYSTEM_NAME MATCHES "Windows") + set(CUR_INSTALL_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/install/win) +else() + message(FATAL_ERROR "Unsupport os") +endif() + +# build all +get_namespace(CUR_SUPERIOR_NAMESPACE) +string(REPLACE "::" "_" CUR_SUPERIOR_NAMESPACE_UNDERLINE ${CUR_SUPERIOR_NAMESPACE}) +add_custom_target( + ${CUR_SUPERIOR_NAMESPACE_UNDERLINE}_${CUR_DIR}_build_all ALL + COMMAND ${CMAKE_COMMAND} -E copy_directory ${CUR_INSTALL_SOURCE_DIR}/bin ${CMAKE_BINARY_DIR} + DEPENDS aimrt::runtime::main # + aimrt::examples::cpp::pb_chn::pb_chn_pub_pkg) diff --git a/src/examples/plugins/proxy_plugin/README.md b/src/examples/plugins/proxy_plugin/README.md new file mode 100644 index 000000000..848eee417 --- /dev/null +++ b/src/examples/plugins/proxy_plugin/README.md @@ -0,0 +1,30 @@ +# proxy plugin examples + +一个基于 **proxy_plugin** 的代理转发示例,演示内容包括: +- 如何在启动时加载 **proxy_plugin**; +- 如何配置消息转发的 topic; + +核心代码: +- [event.proto](../../../protocols/example/event.proto) +- [normal_publisher_module.cc](../../cpp/pb_chn/module/normal_publisher_module/normal_publisher_module.cc) +- [type_support_pkg_main.cc](./example_event_ts_pkg/type_support_pkg_main.cc) + + +配置文件: +- [examples_plugins_proxy_plugin_http_pub_cfg.yaml](./install/linux/bin/cfg/examples_plugins_proxy_plugin_http_pub_cfg.yaml) +- [examples_plugins_proxy_plugin_zenoh_sub_cfg.yaml](./install/linux/bin/cfg/examples_plugins_proxy_plugin_zenoh_sub_cfg.yaml) +- [examples_plugins_proxy_plugin_cfg.yaml](./install/linux/bin/cfg/examples_plugins_proxy_plugin_cfg.yaml) + + +运行方式(linux): +- 开启 `AIMRT_BUILD_EXAMPLES`、`AIMRT_BUILD_WITH_PROTOBUF`、`AIMRT_BUILD_WITH_PROXY_PLUGIN`、`AIMRT_BUILD_WITH_ZENOH_PLUGIN`、`AIMRT_BUILD_WITH_NET_PLUGIN` 选项编译 AimRT; +- 分别运行 build 目录下 `start_examples_plugins_proxy_plugin_zenoh_sub.sh` , `start_examples_plugins_proxy_plugin.sh` 和 `start_examples_plugins_proxy_plugin_http_pub.sh` 脚本启动进程; +- 键入`ctrl-c`停止进程; + + +说明: +- 此示例创建了三个进程,分别为 http 发布消息进程、zenoh 订阅消息进程和 proxy 转发进程: + - `http_pub`,会以配置的频率,发布 `ExampleEventMsg` 消息到 `test_topic_http`; + - `proxy`,会使用 `proxy_plugin_executor` 执行器,执行代理消息转发操作; + - `zenoh_sub`,会订阅 `test_topic_zenoh`; +- 请注意,proxy 插件的原理是向 AimRT 订阅指定的 Topic,因此需要在 channel 配置中为该 topic 设置合适的后端,以保证插件能接收到数据,在此示例中,`test_topic_http` 配置了 `http` 后端,`test_topic_zenoh` 配置了 `zenoh` 后端; \ No newline at end of file diff --git a/src/examples/plugins/proxy_plugin/example_event_ts_pkg/CMakeLists.txt b/src/examples/plugins/proxy_plugin/example_event_ts_pkg/CMakeLists.txt new file mode 100644 index 000000000..928bb3094 --- /dev/null +++ b/src/examples/plugins/proxy_plugin/example_event_ts_pkg/CMakeLists.txt @@ -0,0 +1,35 @@ +# Copyright (c) 2023, AgiBot Inc. +# All rights reserved. + +# Get the current folder name +string(REGEX REPLACE ".*/\(.*\)" "\\1" CUR_DIR ${CMAKE_CURRENT_SOURCE_DIR}) + +# Get namespace +get_namespace(CUR_SUPERIOR_NAMESPACE) +string(REPLACE "::" "_" CUR_SUPERIOR_NAMESPACE_UNDERLINE ${CUR_SUPERIOR_NAMESPACE}) + +# Set target name +set(CUR_TARGET_NAME ${CUR_SUPERIOR_NAMESPACE_UNDERLINE}_${CUR_DIR}) +set(CUR_TARGET_ALIAS_NAME ${CUR_SUPERIOR_NAMESPACE}::${CUR_DIR}) + +# Set file collection +file(GLOB_RECURSE src ${CMAKE_CURRENT_SOURCE_DIR}/*.cc) + +# Add target +add_library(${CUR_TARGET_NAME} SHARED) +add_library(${CUR_TARGET_ALIAS_NAME} ALIAS ${CUR_TARGET_NAME}) + +# Set source file of target +target_sources(${CUR_TARGET_NAME} PRIVATE ${src}) + +# Set link libraries of target +target_link_libraries( + ${CUR_TARGET_NAME} + PRIVATE aimrt::interface::aimrt_type_support_pkg_c_interface + aimrt::interface::aimrt_module_protobuf_interface + aimrt::protocols::example_pb_gencode + aimrt::interface::aimrt_module_ros2_interface + example_ros2::example_ros2__rosidl_generator_cpp + example_ros2::example_ros2__rosidl_typesupport_cpp) +# Set misc of target +set_target_properties(${CUR_TARGET_NAME} PROPERTIES OUTPUT_NAME ${CUR_DIR}) diff --git a/src/examples/plugins/proxy_plugin/example_event_ts_pkg/type_support_pkg_main.cc b/src/examples/plugins/proxy_plugin/example_event_ts_pkg/type_support_pkg_main.cc new file mode 100644 index 000000000..1eca903ec --- /dev/null +++ b/src/examples/plugins/proxy_plugin/example_event_ts_pkg/type_support_pkg_main.cc @@ -0,0 +1,27 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#include "aimrt_type_support_pkg_c_interface/type_support_pkg_main.h" + +#include "aimrt_module_protobuf_interface/util/protobuf_type_support.h" + +#include "aimrt_module_ros2_interface/util/ros2_type_support.h" + +#include "example_ros2/msg/ros_test_msg.hpp" + +#include "event.pb.h" + +static const aimrt_type_support_base_t* type_support_array[]{ + aimrt::GetProtobufMessageTypeSupport(), + aimrt::GetRos2MessageTypeSupport()}; + +extern "C" { + +size_t AimRTDynlibGetTypeSupportArrayLength() { + return sizeof(type_support_array) / sizeof(type_support_array[0]); +} + +const aimrt_type_support_base_t** AimRTDynlibGetTypeSupportArray() { + return type_support_array; +} +} \ No newline at end of file diff --git a/src/examples/plugins/proxy_plugin/install/linux/bin/cfg/examples_plugins_proxy_plugin_cfg.yaml b/src/examples/plugins/proxy_plugin/install/linux/bin/cfg/examples_plugins_proxy_plugin_cfg.yaml new file mode 100644 index 000000000..d427d78b1 --- /dev/null +++ b/src/examples/plugins/proxy_plugin/install/linux/bin/cfg/examples_plugins_proxy_plugin_cfg.yaml @@ -0,0 +1,52 @@ +# Copyright (c) 2023, AgiBot Inc. +# All rights reserved. + +aimrt: + plugin: + plugins: + - name: proxy_plugin + path: ./libaimrt_proxy_plugin.so + options: + type_support_pkgs: + - path: ./libexample_event_ts_pkg.so + proxy_actions: + - name: my_proxy + options: + executor: proxy_plugin_executor + topic_meta_list: + - sub_topic_name: test_topic_http + pub_topic_name: [test_topic_zenoh] + msg_type: pb:aimrt.protocols.example.ExampleEventMsg + - name: zenoh_plugin + path: ./libaimrt_zenoh_plugin.so + - name: net_plugin + path: ./libaimrt_net_plugin.so + options: + thread_num: 4 + http_options: + listen_ip: 127.0.0.1 + listen_port: 50080 + log: + core_lvl: Info # Trace/Debug/Info/Warn/Error/Fatal/Off + backends: + - type: console + + executor: + executors: + - name: work_thread_pool + type: asio_thread + options: + thread_num: 4 + - name: proxy_plugin_executor + type: simple_thread + + channel: + backends: + - type: zenoh + - type: http + sub_topics_options: + - topic_name: test_topic_http + enable_backends: [http] + pub_topics_options: + - topic_name: test_topic_zenoh + enable_backends: [zenoh] diff --git a/src/examples/plugins/proxy_plugin/install/linux/bin/cfg/examples_plugins_proxy_plugin_http_pub_cfg.yaml b/src/examples/plugins/proxy_plugin/install/linux/bin/cfg/examples_plugins_proxy_plugin_http_pub_cfg.yaml new file mode 100644 index 000000000..f1335db71 --- /dev/null +++ b/src/examples/plugins/proxy_plugin/install/linux/bin/cfg/examples_plugins_proxy_plugin_http_pub_cfg.yaml @@ -0,0 +1,45 @@ +# Copyright (c) 2023, AgiBot Inc. +# All rights reserved. + +aimrt: + plugin: + plugins: + - name: net_plugin + path: ./libaimrt_net_plugin.so + options: + thread_num: 4 + http_options: + listen_ip: 127.0.0.1 + listen_port: 50081 + log: + core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off + backends: + - type: console + executor: + executors: + - name: work_thread_pool + type: asio_thread + options: + thread_num: 2 + channel: + backends: + - type: http + options: + pub_topics_options: + - topic_name: "(.*)" + server_url_list: ["127.0.0.1:50080"] + pub_topics_options: + - topic_name: "(.*)" + enable_backends: [http] + module: + pkgs: + - path: ./libpb_chn_pub_pkg.so + enable_modules: [NormalPublisherModule] + modules: + - name: NormalPublisherModule + log_lvl: INFO + +# Module custom configuration +NormalPublisherModule: + topic_name: test_topic_http + channel_frq: 0.5 diff --git a/src/examples/plugins/proxy_plugin/install/linux/bin/cfg/examples_plugins_proxy_plugin_zenoh_sub_cfg.yaml b/src/examples/plugins/proxy_plugin/install/linux/bin/cfg/examples_plugins_proxy_plugin_zenoh_sub_cfg.yaml new file mode 100644 index 000000000..ff70dc476 --- /dev/null +++ b/src/examples/plugins/proxy_plugin/install/linux/bin/cfg/examples_plugins_proxy_plugin_zenoh_sub_cfg.yaml @@ -0,0 +1,32 @@ +# Copyright (c) 2023, AgiBot Inc. +# All rights reserved. + +aimrt: + plugin: + plugins: + - name: zenoh_plugin + path: ./libaimrt_zenoh_plugin.so + options: + native_cfg_path: ./cfg/zenoh_native_config.json5 + log: + core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off + backends: + - type: console + executor: + channel: + backends: + - type: zenoh + sub_topics_options: + - topic_name: "(.*)" + enable_backends: [zenoh] + module: + pkgs: + - path: ./libpb_chn_sub_pkg.so + enable_modules: [NormalSubscriberModule] + modules: + - name: NormalSubscriberModule + log_lvl: INFO + +# Module custom configuration +NormalSubscriberModule: + topic_name: test_topic_zenoh diff --git a/src/examples/plugins/proxy_plugin/install/linux/bin/start_examples_plugins_proxy_plugin.sh b/src/examples/plugins/proxy_plugin/install/linux/bin/start_examples_plugins_proxy_plugin.sh new file mode 100755 index 000000000..15d5455e1 --- /dev/null +++ b/src/examples/plugins/proxy_plugin/install/linux/bin/start_examples_plugins_proxy_plugin.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +./aimrt_main --cfg_file_path=./cfg/examples_plugins_proxy_plugin_cfg.yaml diff --git a/src/examples/plugins/proxy_plugin/install/linux/bin/start_examples_plugins_proxy_plugin_http_pub.sh b/src/examples/plugins/proxy_plugin/install/linux/bin/start_examples_plugins_proxy_plugin_http_pub.sh new file mode 100755 index 000000000..6e5d89e19 --- /dev/null +++ b/src/examples/plugins/proxy_plugin/install/linux/bin/start_examples_plugins_proxy_plugin_http_pub.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +./aimrt_main --cfg_file_path=./cfg/examples_plugins_proxy_plugin_http_pub_cfg.yaml diff --git a/src/examples/plugins/proxy_plugin/install/linux/bin/start_examples_plugins_proxy_plugin_zenoh_sub.sh b/src/examples/plugins/proxy_plugin/install/linux/bin/start_examples_plugins_proxy_plugin_zenoh_sub.sh new file mode 100755 index 000000000..42000719f --- /dev/null +++ b/src/examples/plugins/proxy_plugin/install/linux/bin/start_examples_plugins_proxy_plugin_zenoh_sub.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +./aimrt_main --cfg_file_path=./cfg/examples_plugins_proxy_plugin_zenoh_sub_cfg.yaml diff --git a/src/plugins/CMakeLists.txt b/src/plugins/CMakeLists.txt index 11503f829..0a4f45eb6 100644 --- a/src/plugins/CMakeLists.txt +++ b/src/plugins/CMakeLists.txt @@ -50,3 +50,7 @@ endif() if(AIMRT_BUILD_ECHO_PLUGIN) add_subdirectory(echo_plugin) endif() + +if(AIMRT_BUILD_PROXY_PLUGIN) + add_subdirectory(proxy_plugin) +endif() diff --git a/src/plugins/echo_plugin/echo_plugin.cc b/src/plugins/echo_plugin/echo_plugin.cc index 2d8357035..017f0982c 100644 --- a/src/plugins/echo_plugin/echo_plugin.cc +++ b/src/plugins/echo_plugin/echo_plugin.cc @@ -101,22 +101,11 @@ bool EchoPlugin::Initialize(runtime::core::AimRTCore* core_ptr) noexcept { auto finditr = type_support_map_.find(topic_meta.msg_type); AIMRT_CHECK_ERROR_THROW(finditr != type_support_map_.end(), "Can not find type '{}' in any type support pkg!", topic_meta.msg_type); - auto& type_support_ref = finditr->second.type_support_ref; - - // check serialization type - if (!topic_meta.serialization_type.empty()) { - bool check_ret = type_support_ref.CheckSerializationTypeSupported(topic_meta.serialization_type); - AIMRT_CHECK_ERROR_THROW(check_ret, - "Msg type '{}' does not support serialization type '{}'.", - topic_meta.msg_type, topic_meta.serialization_type); - } else { - topic_meta.serialization_type = type_support_ref.DefaultSerializationType(); - } } // check duplicate topic for (auto& topic_meta_option : options_.topic_meta_list) { - TopicMetaKey key{ + runtime::core::util::TopicMetaKey key{ .topic_name = topic_meta_option.topic_name, .msg_type = topic_meta_option.msg_type}; @@ -129,7 +118,7 @@ bool EchoPlugin::Initialize(runtime::core::AimRTCore* core_ptr) noexcept { .topic_name = topic_meta_option.topic_name, .msg_type = topic_meta_option.msg_type, .echo_type = topic_meta_option.echo_type, - .serialization_type = topic_meta_option.serialization_type}; + }; topic_meta_map_.emplace(key, topic_meta); } @@ -193,7 +182,6 @@ void EchoPlugin::InitTypeSupport(Options::TypeSupportPkg& options) { void EchoPlugin::RegisterEchoChannel() { using namespace aimrt::runtime::core::channel; - using EchoFunc = std::function; const auto& topic_meta_list = topic_meta_map_; AIMRT_TRACE("Echo plugin has {} topics.", topic_meta_list.size()); @@ -215,7 +203,7 @@ void EchoPlugin::RegisterEchoChannel() { .module_name = "core", .msg_type_support_ref = type_support_wrapper.type_support_ref}; - sub_wrapper.require_cache_serialization_types.emplace(topic_meta.serialization_type); + sub_wrapper.require_cache_serialization_types.emplace(topic_meta.echo_type); sub_wrapper.callback = [echo_type{topic_meta.echo_type}]( MsgWrapper& msg_wrapper, std::function&& release_callback) { auto buffer_view_ptr = aimrt::runtime::core::channel::TrySerializeMsgWithCache(msg_wrapper, echo_type); diff --git a/src/plugins/echo_plugin/echo_plugin.h b/src/plugins/echo_plugin/echo_plugin.h index 53df31813..ce06a4020 100644 --- a/src/plugins/echo_plugin/echo_plugin.h +++ b/src/plugins/echo_plugin/echo_plugin.h @@ -9,8 +9,9 @@ #include "core/channel/channel_backend_tools.h" #include "core/util/type_support_pkg_loader.h" +#include "core/util/topic_meta_key.h" #include "echo_plugin/global.h" -#include "echo_plugin/topic_meta_key.h" +#include "topic_meta.h" #include "yaml-cpp/yaml.h" namespace aimrt::plugins::echo_plugin { @@ -21,7 +22,6 @@ class EchoPlugin : public AimRTCorePluginBase { struct TopicMeta { std::string topic_name; std::string msg_type; - std::string serialization_type; std::string echo_type; }; std::vector topic_meta_list; @@ -63,7 +63,9 @@ class EchoPlugin : public AimRTCorePluginBase { std::unordered_map type_support_map_; - std::unordered_map topic_meta_map_; + std::unordered_map + topic_meta_map_; }; } // namespace aimrt::plugins::echo_plugin diff --git a/src/plugins/echo_plugin/topic_meta.h b/src/plugins/echo_plugin/topic_meta.h new file mode 100644 index 000000000..7282e20b2 --- /dev/null +++ b/src/plugins/echo_plugin/topic_meta.h @@ -0,0 +1,16 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#pragma once + +#include + +namespace aimrt::plugins::echo_plugin { + +struct TopicMeta { + std::string topic_name; + std::string msg_type; + std::string echo_type; +}; + +} // namespace aimrt::plugins::echo_plugin diff --git a/src/plugins/echo_plugin/topic_meta_key.h b/src/plugins/echo_plugin/topic_meta_key.h deleted file mode 100644 index 0c748b4e6..000000000 --- a/src/plugins/echo_plugin/topic_meta_key.h +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright (c) 2023, AgiBot Inc. -// All rights reserved. - -#pragma once - -#include -#include - -namespace aimrt::plugins::echo_plugin { - -struct TopicMetaKey { - std::string topic_name; - std::string msg_type; - - bool operator==(const TopicMetaKey& rhs) const { - return topic_name == rhs.topic_name && msg_type == rhs.msg_type; - } - - struct Hash { - std::size_t operator()(const TopicMetaKey& k) const { - return (std::hash()(k.topic_name)) ^ - (std::hash()(k.msg_type)); - } - }; -}; - -struct TopicMeta { - std::string topic_name; - std::string msg_type; - std::string echo_type; - std::string serialization_type; -}; - -} // namespace aimrt::plugins::echo_plugin diff --git a/src/plugins/proxy_plugin/CMakeLists.txt b/src/plugins/proxy_plugin/CMakeLists.txt new file mode 100644 index 000000000..0fa5a932c --- /dev/null +++ b/src/plugins/proxy_plugin/CMakeLists.txt @@ -0,0 +1,53 @@ +# Copyright (c) 2023, AgiBot Inc. +# All rights reserved. + +# Get the current folder name +string(REGEX REPLACE ".*/\(.*\)" "\\1" CUR_DIR ${CMAKE_CURRENT_SOURCE_DIR}) + +# Get namespace +get_namespace(CUR_SUPERIOR_NAMESPACE) +string(REPLACE "::" "_" CUR_SUPERIOR_NAMESPACE_UNDERLINE ${CUR_SUPERIOR_NAMESPACE}) + +# Set target name +set(CUR_TARGET_NAME ${CUR_SUPERIOR_NAMESPACE_UNDERLINE}_${CUR_DIR}) +set(CUR_TARGET_ALIAS_NAME ${CUR_SUPERIOR_NAMESPACE}::${CUR_DIR}) + +# Set file collection +file(GLOB_RECURSE src ${CMAKE_CURRENT_SOURCE_DIR}/*.cc) +file(GLOB_RECURSE test_files ${CMAKE_CURRENT_SOURCE_DIR}/*_test.cc) +list(REMOVE_ITEM src ${test_files}) + +# Add target +add_library(${CUR_TARGET_NAME} SHARED) +add_library(${CUR_TARGET_ALIAS_NAME} ALIAS ${CUR_TARGET_NAME}) + +# Set source file of target +target_sources(${CUR_TARGET_NAME} PRIVATE ${src}) + +# Set include path of target +target_include_directories( + ${CUR_TARGET_NAME} + PRIVATE $) + +# Set link libraries of target +target_link_libraries( + ${CUR_TARGET_NAME} + PRIVATE aimrt::interface::aimrt_core_plugin_interface + aimrt::runtime::core) + +# Add -Werror option +include(AddWerror) +add_werror(${CUR_TARGET_NAME}) + +# Set installation of target +if(AIMRT_INSTALL) + install(TARGETS ${CUR_TARGET_NAME} LIBRARY DESTINATION bin) +endif() + +# Set test of target +if(AIMRT_BUILD_TESTS AND test_files) + add_gtest_target(TEST_TARGET ${CUR_TARGET_NAME} TEST_SRC ${test_files}) +endif() + +# Set misc of target +set_target_properties(${CUR_TARGET_NAME} PROPERTIES OUTPUT_NAME "aimrt_${CUR_DIR}") diff --git a/src/plugins/proxy_plugin/global.cc b/src/plugins/proxy_plugin/global.cc new file mode 100644 index 000000000..a14b9716a --- /dev/null +++ b/src/plugins/proxy_plugin/global.cc @@ -0,0 +1,15 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#include "proxy_plugin/global.h" + +namespace aimrt::plugins::proxy_plugin { + +aimrt::logger::LoggerRef global_logger; + +void SetLogger(aimrt::logger::LoggerRef logger) { global_logger = logger; } +aimrt::logger::LoggerRef GetLogger() { + return global_logger ? global_logger : aimrt::logger::GetSimpleLoggerRef(); +} + +} // namespace aimrt::plugins::proxy_plugin diff --git a/src/plugins/proxy_plugin/global.h b/src/plugins/proxy_plugin/global.h new file mode 100644 index 000000000..ed7cc9a5c --- /dev/null +++ b/src/plugins/proxy_plugin/global.h @@ -0,0 +1,13 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#pragma once + +#include "aimrt_module_cpp_interface/logger/logger.h" + +namespace aimrt::plugins::proxy_plugin { + +void SetLogger(aimrt::logger::LoggerRef); +aimrt::logger::LoggerRef GetLogger(); + +} // namespace aimrt::plugins::proxy_plugin diff --git a/src/plugins/proxy_plugin/proxy_action.cc b/src/plugins/proxy_plugin/proxy_action.cc new file mode 100644 index 000000000..ea723b4b1 --- /dev/null +++ b/src/plugins/proxy_plugin/proxy_action.cc @@ -0,0 +1,116 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#include "proxy_action.h" +#include "log_util.h" +#include "proxy_plugin/global.h" + +namespace YAML { +template <> +struct convert { + using Options = aimrt::plugins::proxy_plugin::ProxyAction::Options; + + static Node encode(const Options& rhs) { + Node node; + + node["executor"] = rhs.executor; + + node["topic_meta_list"] = YAML::Node(); + for (const auto& topic_meta : rhs.topic_meta_list) { + Node topic_meta_node; + topic_meta_node["sub_topic_name"] = topic_meta.sub_topic_name; + topic_meta_node["pub_topic_name"] = topic_meta.pub_topic_name; + topic_meta_node["msg_type"] = topic_meta.msg_type; + node["topic_meta_list"].push_back(topic_meta_node); + } + + return node; + } + static bool decode(const Node& node, Options& rhs) { + if (!node.IsMap()) return false; + + rhs.executor = node["executor"].as(); + + if (node["topic_meta_list"]) { + const auto& topic_meta_list = node["topic_meta_list"]; + if (topic_meta_list.IsSequence()) { + for (const auto& topic_meta_node : topic_meta_list) { + Options::TopicMeta topic_meta; + topic_meta.sub_topic_name = topic_meta_node["sub_topic_name"].as(); + topic_meta.pub_topic_name = topic_meta_node["pub_topic_name"].as>(); + topic_meta.msg_type = topic_meta_node["msg_type"].as(); + rhs.topic_meta_list.push_back(topic_meta); + } + } + } + return true; + } +}; + +} // namespace YAML + +namespace aimrt::plugins::proxy_plugin { +void ProxyAction::Initialize(YAML::Node options) { + if (options && !options.IsNull()) + options_ = options.as(); + + for (auto& topic_meta : options_.topic_meta_list) { + // check msg type + auto type_support_ref = get_type_support_func_(topic_meta.msg_type); + AIMRT_CHECK_ERROR_THROW(type_support_ref, "Can not get type support for msg type '{}'.", topic_meta.msg_type); + + // check duplicate topic meta + runtime::core::util::TopicMetaKey key{ + .topic_name = topic_meta.sub_topic_name, + .msg_type = topic_meta.msg_type}; + + AIMRT_CHECK_ERROR_THROW( + topic_meta_map_.find(key) == topic_meta_map_.end(), + "Duplicate topic meta, topic name: {}, msg type: {}.", + topic_meta.sub_topic_name, topic_meta.msg_type); + + TopicMeta topic_meta_info{ + .topic_name = topic_meta.sub_topic_name, + .msg_type = topic_meta.msg_type, + .pub_topic_name = topic_meta.pub_topic_name}; + topic_meta_map_.emplace(key, topic_meta_info); + + // check duplicate pub topic name + for (const auto& pub_topic_name : topic_meta.pub_topic_name) { + runtime::core::util::TopicMetaKey pub_key{ + .topic_name = pub_topic_name, + .msg_type = topic_meta.msg_type}; + AIMRT_CHECK_ERROR_THROW( + pub_topic_name_set_.find(pub_key) == pub_topic_name_set_.end(), + "Duplicate pub topic name: {}, msg type: {}.", + pub_topic_name, topic_meta.msg_type); + pub_topic_name_set_.insert(pub_key); + } + } +} + +void ProxyAction::InitExecutor() { + executor_ = get_executor_func_(options_.executor); + + AIMRT_CHECK_ERROR_THROW(executor_, "Can not get executor {}.", options_.executor); + + AIMRT_CHECK_ERROR_THROW( + executor_.ThreadSafe(), + "Proxy executor {} is not thread safe!", options_.executor); +} + +void ProxyAction::RegisterGetExecutorFunc( + const std::function& get_executor_func) { + get_executor_func_ = get_executor_func; +} + +void ProxyAction::RegisterGetTypeSupportFunc( + const std::function& get_type_support_func) { + get_type_support_func_ = get_type_support_func; +} + +void ProxyAction::Shutdown() { + AIMRT_INFO("Proxy action shutdown."); +} + +} // namespace aimrt::plugins::proxy_plugin \ No newline at end of file diff --git a/src/plugins/proxy_plugin/proxy_action.h b/src/plugins/proxy_plugin/proxy_action.h new file mode 100644 index 000000000..d421d1f85 --- /dev/null +++ b/src/plugins/proxy_plugin/proxy_action.h @@ -0,0 +1,73 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#pragma once + +#include +#include +#include +#include +#include + +#include "aimrt_module_cpp_interface/executor/executor.h" +#include "aimrt_module_cpp_interface/util/type_support.h" +#include "core/util/topic_meta_key.h" + +#include "topic_meta.h" +#include "yaml-cpp/yaml.h" + +namespace aimrt::plugins::proxy_plugin { + +class ProxyAction { + public: + struct Options { + struct TopicMeta { + std::string sub_topic_name; + std::vector pub_topic_name; + std::string msg_type; + }; + std::vector topic_meta_list; + struct TypeSupportPkg { + std::string path; + }; + std::vector type_support_pkgs; + std::string executor; + }; + + public: + ProxyAction() = default; + ~ProxyAction() = default; + + void Initialize(YAML::Node options); + void InitExecutor(); + void Shutdown(); + + const auto& GetTopicMetaMap() const { + return topic_meta_map_; + } + + auto& GetExecutor() { return executor_; } + + void RegisterGetExecutorFunc( + const std::function& get_executor_func); + + void RegisterGetTypeSupportFunc( + const std::function& get_type_support_func); + + private: + Options options_; + + aimrt::executor::ExecutorRef executor_; + + std::function get_executor_func_; + std::function get_type_support_func_; + + std::unordered_map + topic_meta_map_; + std::unordered_set + pub_topic_name_set_; +}; + +} // namespace aimrt::plugins::proxy_plugin diff --git a/src/plugins/proxy_plugin/proxy_plugin.cc b/src/plugins/proxy_plugin/proxy_plugin.cc new file mode 100644 index 000000000..132ac51cb --- /dev/null +++ b/src/plugins/proxy_plugin/proxy_plugin.cc @@ -0,0 +1,322 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#include "proxy_plugin.h" +#include +#include "aimrt_core.h" +#include "aimrt_core_plugin_base.h" +#include "channel/channel_backend_tools.h" +#include "channel/channel_msg_wrapper.h" +#include "channel/channel_registry.h" +#include "global.h" +#include "log_util.h" +#include "proxy_action.h" +#include "proxy_plugin.h" +#include "proxy_plugin/topic_meta.h" + +namespace YAML { + +template <> +struct convert { + using Options = aimrt::plugins::proxy_plugin::ProxyPlugin::Options; + + static Node encode(const Options& rhs) { + Node node; + + node["type_support_pkgs"] = Node(NodeType::Sequence); + for (const auto& type_support_pkg : rhs.type_support_pkgs) { + Node type_support_pkg_node; + type_support_pkg_node["path"] = type_support_pkg.path; + node["type_support_pkgs"].push_back(type_support_pkg_node); + } + + node["executor"] = rhs.executor; + node["proxy_actions"] = Node(NodeType::Sequence); + + for (const auto& proxy_action : rhs.proxy_actions) { + Node proxy_action_node; + proxy_action_node["name"] = proxy_action.name; + proxy_action_node["options"] = proxy_action.options; + node["proxy_actions"].push_back(proxy_action_node); + } + return node; + } + + static bool decode(const Node& node, Options& rhs) { + if (!node.IsMap()) return false; + + if (node["type_support_pkgs"] && node["type_support_pkgs"].IsSequence()) { + for (const auto& type_support_pkg_node : node["type_support_pkgs"]) { + Options::TypeSupportPkg type_support_pkg; + type_support_pkg.path = type_support_pkg_node["path"].as(); + rhs.type_support_pkgs.push_back(std::move(type_support_pkg)); + } + } + + if (node["executor"] && node["executor"].IsScalar()) { + rhs.executor = node["executor"].as(); + } + + if (node["proxy_actions"] && node["proxy_actions"].IsSequence()) { + for (const auto& proxy_action_node : node["proxy_actions"]) { + Options::ProxyAction proxy_action; + proxy_action.name = proxy_action_node["name"].as(); + proxy_action.options = proxy_action_node["options"]; + rhs.proxy_actions.push_back(std::move(proxy_action)); + } + } + return true; + } +}; + +} // namespace YAML + +namespace aimrt::plugins::proxy_plugin { + +bool ProxyPlugin::Initialize(runtime::core::AimRTCore* core_ptr) noexcept { + try { + core_ptr_ = core_ptr; + + YAML::Node plugin_options_node = core_ptr_->GetPluginManager().GetPluginOptionsNode(Name()); + + if (plugin_options_node && !plugin_options_node.IsNull()) { + options_ = plugin_options_node.as(); + AIMRT_TRACE("Load plugin options."); + } + + init_flag_ = true; + // type support + for (auto& type_support_pkg : options_.type_support_pkgs) { + // check duplicate pkg + auto finditr = std::find_if( + options_.type_support_pkgs.begin(), options_.type_support_pkgs.end(), + [&type_support_pkg](const auto& op) { + if (&type_support_pkg == &op) return false; + return op.path == type_support_pkg.path; + }); + AIMRT_CHECK_ERROR_THROW(finditr == options_.type_support_pkgs.end(), + "Duplicate pkg path {}", type_support_pkg.path); + InitTypeSupport(type_support_pkg); + } + AIMRT_TRACE("Load {} pkg and {} type.", + type_support_pkg_loader_vec_.size(), type_support_map_.size()); + + // proxy action + for (auto& proxy_action : options_.proxy_actions) { + // check duplicate proxy action name + auto finditr = std::find_if( + options_.proxy_actions.begin(), options_.proxy_actions.end(), + [&proxy_action](const auto& op) { + if (&proxy_action == &op) return false; + return op.name == proxy_action.name; + }); + AIMRT_CHECK_ERROR_THROW(finditr == options_.proxy_actions.end(), + "Duplicate proxy action name {}", proxy_action.name); + + auto action_ptr = std::make_unique(); + + action_ptr->RegisterGetExecutorFunc([this](std::string_view name) -> aimrt::executor::ExecutorRef { + return core_ptr_->GetExecutorManager().GetExecutor(name); + }); + action_ptr->RegisterGetTypeSupportFunc([this](std::string_view name) -> aimrt::util::TypeSupportRef { + auto finditr = type_support_map_.find(name); + AIMRT_CHECK_ERROR_THROW(finditr != type_support_map_.end(), + "Can not find type support for msg type '{}'.", name); + return finditr->second.type_support_ref; + }); + action_ptr->Initialize(proxy_action.options); + + proxy_action_map_.emplace(proxy_action.name, std::move(action_ptr)); + } + + core_ptr_->RegisterHookFunc( + runtime::core::AimRTCore::State::kPostInitLog, + [this] { + SetLogger(aimrt::logger::LoggerRef( + core_ptr_->GetLoggerManager().GetLoggerProxy().NativeHandle())); + }); + core_ptr_->RegisterHookFunc( + runtime::core::AimRTCore::State::kPreInitModules, + [this] { + for (auto& proxy_action_itr : proxy_action_map_) { + proxy_action_itr.second->InitExecutor(); + } + RegisterSubChannel(); + RegisterPubChannel(); + }); + core_ptr_->RegisterHookFunc( + runtime::core::AimRTCore::State::kPreShutdown, + [this] { + SetLogger(aimrt::logger::GetSimpleLoggerRef()); + }); + + plugin_options_node = options_; + core_ptr_->GetPluginManager().UpdatePluginOptionsNode(Name(), plugin_options_node); + return true; + } catch (const std::exception& e) { + AIMRT_ERROR("Initialize failed, {}", e.what()); + } + + return false; +} + +void ProxyPlugin::InitTypeSupport(Options::TypeSupportPkg& options) { + auto loader_ptr = std::make_unique(); + loader_ptr->LoadTypeSupportPkg(options.path); + + options.path = loader_ptr->GetDynamicLib().GetLibFullPath(); + + auto type_support_array = loader_ptr->GetTypeSupportArray(); + for (const auto* item : type_support_array) { + aimrt::util::TypeSupportRef type_support_ref(item); + auto type_name = type_support_ref.TypeName(); + + // check duplicate type + auto finditr = type_support_map_.find(type_name); + if (finditr != type_support_map_.end()) { + AIMRT_WARN("Duplicate msg type '{}' in {} and {}.", + type_name, options.path, finditr->second.options.path); + continue; + } + + type_support_map_.emplace( + type_name, + TypeSupportWrapper{ + .options = options, + .type_support_ref = type_support_ref, + .loader_ptr = loader_ptr.get()}); + } + type_support_pkg_loader_vec_.emplace_back(std::move(loader_ptr)); + AIMRT_TRACE("Load {} type support pkgs.", type_support_pkg_loader_vec_.size()); +} + +void ProxyPlugin::RegisterSubChannel() { + using namespace aimrt::runtime::core::channel; + + for (const auto& [_, proxy_action] : proxy_action_map_) { + const auto& action_topic_meta_map = proxy_action->GetTopicMetaMap(); + for (const auto& [_, topic_meta] : action_topic_meta_map) { + auto finditr = type_support_map_.find(topic_meta.msg_type); + AIMRT_CHECK_ERROR_THROW(finditr != type_support_map_.end(), + "Can not find type '{}' in any type support pkg!", topic_meta.msg_type); + + const auto& type_support_wrapper = finditr->second; + + SubscribeWrapper subscribe_wrapper{ + .info = { + .msg_type = topic_meta.msg_type, + .topic_name = topic_meta.topic_name, + .pkg_path = type_support_wrapper.options.path, + .module_name = "core", + .msg_type_support_ref = type_support_wrapper.type_support_ref}}; + subscribe_wrapper.callback = [this, action_raw_ptr = proxy_action.get()]( + MsgWrapper& msg_wrapper, std::function&& release_callback) { + if (msg_wrapper.msg_ptr == nullptr && msg_wrapper.serialization_cache.size() == 0) [[unlikely]] { + AIMRT_WARN("Receive empty msg, ignore it."); + release_callback(); + return; + } + action_raw_ptr->GetExecutor().Execute([this, msg_wrapper, topic_meta_map = action_raw_ptr->GetTopicMetaMap()]() { + + runtime::core::util::TopicMetaKey key{ + .topic_name = msg_wrapper.info.topic_name, + .msg_type = msg_wrapper.info.msg_type, + }; + + auto finditr = topic_meta_map.find(key); + AIMRT_CHECK_ERROR_THROW(finditr != topic_meta_map.end(), + "Can not find topic meta, topic name: {}, msg type: {}.", + key.topic_name, key.msg_type); + + for (auto &pub_topic_name : finditr->second.pub_topic_name) { + runtime::core::util::TopicMetaKey pub_key{ + .topic_name = pub_topic_name, + .msg_type = key.msg_type, + }; + const auto& topic_pub_wrapper = topic_pub_wrapper_map_.find(pub_key)->second; + AIMRT_CHECK_ERROR_THROW(topic_pub_wrapper.pub_type_wrapper_ptr, "Get publish type wrapper failed!"); + + aimrt::channel::Context ctx; + MsgWrapper pub_msg_wrapper{ + .info = topic_pub_wrapper.pub_type_wrapper_ptr->info, + .msg_ptr = msg_wrapper.msg_ptr, + .ctx_ref = ctx, + }; + pub_msg_wrapper.serialization_cache = msg_wrapper.serialization_cache; + + core_ptr_->GetChannelManager().Publish(std::move(pub_msg_wrapper)); + + } }); + release_callback(); + }; + bool ret = core_ptr_->GetChannelManager().Subscribe(std::move(subscribe_wrapper)); + AIMRT_CHECK_ERROR_THROW(ret, "Register subscribe channel failed!"); + } + } +} + +void ProxyPlugin::RegisterPubChannel() { + using namespace aimrt::runtime::core::channel; + + for (auto& proxy_action_itr : proxy_action_map_) { + // register publish type + auto& proxy_action = *(proxy_action_itr.second); + + const auto& topic_meta_map = proxy_action.GetTopicMetaMap(); + + for (const auto& [_, topic_meta] : topic_meta_map) { + auto finditr = type_support_map_.find(topic_meta.msg_type); + AIMRT_CHECK_ERROR_THROW(finditr != type_support_map_.end(), + "Can not find type '{}' in any type support pkg!", topic_meta.msg_type); + + const auto& type_support_wrapper = finditr->second; + + // register publish type + for (auto& pub_topic_name : topic_meta.pub_topic_name) { + PublishTypeWrapper pub_type_wrapper; + pub_type_wrapper.info = TopicInfo{ + .msg_type = topic_meta.msg_type, + .topic_name = pub_topic_name, + .pkg_path = type_support_wrapper.options.path, + .module_name = "core", + .msg_type_support_ref = type_support_wrapper.type_support_ref}; + + bool ret = core_ptr_->GetChannelManager().RegisterPublishType(std::move(pub_type_wrapper)); + AIMRT_CHECK_ERROR_THROW(ret, "Register publish type failed!"); + } + } + // map pub_type_wrapper_ptr + for (const auto& [_, topic_meta] : topic_meta_map) { + for (auto& pub_topic_name : topic_meta.pub_topic_name) { + runtime::core::util::TopicMetaKey key{ + .topic_name = pub_topic_name, + .msg_type = topic_meta.msg_type}; + auto finditr = type_support_map_.find(topic_meta.msg_type); + AIMRT_CHECK_ERROR_THROW(finditr != type_support_map_.end(), + "Can not find type '{}' in any type support pkg!", topic_meta.msg_type); + + const auto& type_support_wrapper = finditr->second; + + const auto* pub_type_wrapper_ptr = core_ptr_->GetChannelManager().GetChannelRegistry()->GetPublishTypeWrapperPtr( + topic_meta.msg_type, pub_topic_name, type_support_wrapper.options.path, "core"); + + AIMRT_CHECK_ERROR_THROW(pub_type_wrapper_ptr, "Get publish type wrapper failed!"); + + topic_pub_wrapper_map_.emplace(key, TopicPubWrapper{.pub_type_wrapper_ptr = pub_type_wrapper_ptr}); + } + } + } +} + +void ProxyPlugin::Shutdown() noexcept { + try { + if (!init_flag_) return; + for (auto& proxy_action_itr : proxy_action_map_) { + proxy_action_itr.second->Shutdown(); + } + } catch (const std::exception& e) { + AIMRT_ERROR("Shutdown failed, {}", e.what()); + } +} + +} // namespace aimrt::plugins::proxy_plugin \ No newline at end of file diff --git a/src/plugins/proxy_plugin/proxy_plugin.h b/src/plugins/proxy_plugin/proxy_plugin.h new file mode 100644 index 000000000..dedeffa99 --- /dev/null +++ b/src/plugins/proxy_plugin/proxy_plugin.h @@ -0,0 +1,82 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#pragma once + +#include +#include +#include +#include "aimrt_core_plugin_interface/aimrt_core_plugin_base.h" +#include "aimrt_module_cpp_interface/executor/executor.h" +#include "aimrt_module_cpp_interface/util/type_support.h" +#include "core/aimrt_core.h" +#include "core/util/topic_meta_key.h" +#include "core/util/type_support_pkg_loader.h" + +#include "proxy_action.h" +#include "proxy_plugin/topic_meta.h" + +namespace aimrt::plugins::proxy_plugin { + +class ProxyPlugin : public AimRTCorePluginBase { + public: + struct Options { + struct ProxyAction { + std::string name; + YAML::Node options; + }; + std::vector proxy_actions; + struct TypeSupportPkg { + std::string path; + }; + std::vector type_support_pkgs; + std::string executor; + }; + + public: + ProxyPlugin() = default; + ~ProxyPlugin() override = default; + + std::string_view Name() const noexcept override { return "proxy_plugin"; } + + bool Initialize(runtime::core::AimRTCore* core_ptr) noexcept override; + void Shutdown() noexcept override; + + private: + void InitTypeSupport(Options::TypeSupportPkg& options); + + void RegisterSubChannel(); + void RegisterPubChannel(); + + private: + runtime::core::AimRTCore* core_ptr_ = nullptr; + + aimrt::executor::ExecutorRef executor_; + + Options options_; + + bool init_flag_ = false; + + struct TypeSupportWrapper { + const Options::TypeSupportPkg& options; + aimrt::util::TypeSupportRef type_support_ref; + runtime::core::util::TypeSupportPkgLoader* loader_ptr; + }; + + struct TopicPubWrapper { + const aimrt::runtime::core::channel::PublishTypeWrapper* pub_type_wrapper_ptr; + }; + + std::unordered_map type_support_map_; + + std::unordered_map + topic_pub_wrapper_map_; + + std::vector> + type_support_pkg_loader_vec_; + + std::unordered_map> proxy_action_map_; +}; + +} // namespace aimrt::plugins::proxy_plugin diff --git a/src/plugins/proxy_plugin/proxy_plugin_main.cc b/src/plugins/proxy_plugin/proxy_plugin_main.cc new file mode 100644 index 000000000..020fde0f0 --- /dev/null +++ b/src/plugins/proxy_plugin/proxy_plugin_main.cc @@ -0,0 +1,16 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#include "aimrt_core_plugin_interface/aimrt_core_plugin_main.h" +#include "proxy_plugin/proxy_plugin.h" + +extern "C" { + +aimrt::AimRTCorePluginBase* AimRTDynlibCreateCorePluginHandle() { + return new aimrt::plugins::proxy_plugin::ProxyPlugin(); +} + +void AimRTDynlibDestroyCorePluginHandle(const aimrt::AimRTCorePluginBase* plugin) { + delete plugin; +} +} diff --git a/src/plugins/proxy_plugin/topic_meta.h b/src/plugins/proxy_plugin/topic_meta.h new file mode 100644 index 000000000..a3d075a90 --- /dev/null +++ b/src/plugins/proxy_plugin/topic_meta.h @@ -0,0 +1,17 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#pragma once + +#include +#include + +namespace aimrt::plugins::proxy_plugin { + +struct TopicMeta { + std::string topic_name; + std::string msg_type; + std::vector pub_topic_name; +}; + +} // namespace aimrt::plugins::proxy_plugin diff --git a/src/plugins/record_playback_plugin/metadata_yaml.h b/src/plugins/record_playback_plugin/metadata_yaml.h index fa3e5ef28..cdf01e3da 100644 --- a/src/plugins/record_playback_plugin/metadata_yaml.h +++ b/src/plugins/record_playback_plugin/metadata_yaml.h @@ -5,7 +5,7 @@ #include -#include "record_playback_plugin/topic_meta_key.h" +#include "record_playback_plugin/topic_meta.h" #include "yaml-cpp/yaml.h" diff --git a/src/plugins/record_playback_plugin/playback_action.h b/src/plugins/record_playback_plugin/playback_action.h index 39fad5881..a544feb03 100644 --- a/src/plugins/record_playback_plugin/playback_action.h +++ b/src/plugins/record_playback_plugin/playback_action.h @@ -10,8 +10,9 @@ #include "aimrt_module_cpp_interface/executor/executor.h" #include "aimrt_module_cpp_interface/util/buffer.h" #include "aimrt_module_cpp_interface/util/type_support.h" +#include "core/util/topic_meta_key.h" #include "record_playback_plugin/metadata_yaml.h" -#include "record_playback_plugin/topic_meta_key.h" +#include "record_playback_plugin/topic_meta.h" #include "sqlite3.h" #include "yaml-cpp/yaml.h" diff --git a/src/plugins/record_playback_plugin/record_action.cc b/src/plugins/record_playback_plugin/record_action.cc index 19508345e..34eb53918 100644 --- a/src/plugins/record_playback_plugin/record_action.cc +++ b/src/plugins/record_playback_plugin/record_action.cc @@ -144,7 +144,7 @@ void RecordAction::Initialize(YAML::Node options) { metadata_.version = kVersion; for (auto& topic_meta_option : options_.topic_meta_list) { - TopicMetaKey key{ + runtime::core::util::TopicMetaKey key{ .topic_name = topic_meta_option.topic_name, .msg_type = topic_meta_option.msg_type}; diff --git a/src/plugins/record_playback_plugin/record_action.h b/src/plugins/record_playback_plugin/record_action.h index 2e8811253..16eb18527 100644 --- a/src/plugins/record_playback_plugin/record_action.h +++ b/src/plugins/record_playback_plugin/record_action.h @@ -11,8 +11,9 @@ #include "aimrt_module_cpp_interface/executor/executor.h" #include "aimrt_module_cpp_interface/util/buffer.h" #include "aimrt_module_cpp_interface/util/type_support.h" +#include "core/util/topic_meta_key.h" #include "record_playback_plugin/metadata_yaml.h" -#include "record_playback_plugin/topic_meta_key.h" +#include "record_playback_plugin/topic_meta.h" #include "sqlite3.h" #include "yaml-cpp/yaml.h" @@ -98,7 +99,9 @@ class RecordAction { aimrt::executor::ExecutorRef executor_; std::function get_type_support_func_; - std::unordered_map topic_meta_map_; + std::unordered_map + topic_meta_map_; size_t max_bag_size_ = 0; size_t cur_data_size_ = 0; diff --git a/src/plugins/record_playback_plugin/record_playback_plugin.cc b/src/plugins/record_playback_plugin/record_playback_plugin.cc index 8f2e76aaf..09c8d9c1e 100644 --- a/src/plugins/record_playback_plugin/record_playback_plugin.cc +++ b/src/plugins/record_playback_plugin/record_playback_plugin.cc @@ -8,6 +8,7 @@ #include "core/channel/channel_backend_tools.h" #include "record_playback_plugin/global.h" #include "util/time_util.h" +#include "util/topic_meta_key.h" namespace YAML { @@ -294,7 +295,6 @@ void RecordPlaybackPlugin::RegisterRpcService() { void RecordPlaybackPlugin::RegisterRecordChannel() { using namespace aimrt::runtime::core::channel; - using RecordFunc = std::function; struct Wrapper { @@ -302,7 +302,9 @@ void RecordPlaybackPlugin::RegisterRecordChannel() { std::vector record_func_vec; }; - std::unordered_map recore_func_map; + std::unordered_map + recore_func_map; for (auto& record_action_itr : record_action_map_) { auto& record_action = *(record_action_itr.second); @@ -389,7 +391,9 @@ void RecordPlaybackPlugin::RegisterRecordChannel() { void RecordPlaybackPlugin::RegisterPlaybackChannel() { using namespace aimrt::runtime::core::channel; - std::unordered_set playback_topic_meta_set; + std::unordered_set + playback_topic_meta_set; // 处理 playback action for (auto& playback_action_itr : playback_action_map_) { @@ -398,7 +402,7 @@ void RecordPlaybackPlugin::RegisterPlaybackChannel() { const auto& topic_meta_map = playback_action.GetTopicMetaMap(); for (const auto& topic_meta_itr : topic_meta_map) { - playback_topic_meta_set.emplace(TopicMetaKey{ + playback_topic_meta_set.emplace(aimrt::runtime::core::util::TopicMetaKey{ .topic_name = topic_meta_itr.second.topic_name, .msg_type = topic_meta_itr.second.msg_type}); } diff --git a/src/plugins/record_playback_plugin/topic_meta.h b/src/plugins/record_playback_plugin/topic_meta.h new file mode 100644 index 000000000..9ddb53b6e --- /dev/null +++ b/src/plugins/record_playback_plugin/topic_meta.h @@ -0,0 +1,17 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#pragma once + +#include + +namespace aimrt::plugins::record_playback_plugin { + +struct TopicMeta { + uint64_t id; + std::string topic_name; + std::string msg_type; + std::string serialization_type; +}; + +} // namespace aimrt::plugins::record_playback_plugin \ No newline at end of file diff --git a/src/plugins/record_playback_plugin/topic_meta_key.h b/src/runtime/core/util/topic_meta_key.h similarity index 68% rename from src/plugins/record_playback_plugin/topic_meta_key.h rename to src/runtime/core/util/topic_meta_key.h index b240bab18..9c85bfe78 100644 --- a/src/plugins/record_playback_plugin/topic_meta_key.h +++ b/src/runtime/core/util/topic_meta_key.h @@ -2,10 +2,9 @@ // All rights reserved. #pragma once - #include -namespace aimrt::plugins::record_playback_plugin { +namespace aimrt::runtime::core::util { struct TopicMetaKey { std::string topic_name; @@ -23,11 +22,4 @@ struct TopicMetaKey { }; }; -struct TopicMeta { - uint64_t id; - std::string topic_name; - std::string msg_type; - std::string serialization_type; -}; - -} // namespace aimrt::plugins::record_playback_plugin \ No newline at end of file +} // namespace aimrt::runtime::core::util diff --git a/test.bat b/test.bat index 8beb25198..f2c353331 100644 --- a/test.bat +++ b/test.bat @@ -28,6 +28,7 @@ cmake -B build ^ -DAIMRT_BUILD_OPENTELEMETRY_PLUGIN=OFF ^ -DAIMRT_BUILD_GRPC_PLUGIN=OFF ^ -DAIMRT_BUILD_ECHO_PLUGIN=OFF ^ + -DAIMRT_BUILD_PROXY_PLUGIN=ON ^ -DAIMRT_BUILD_PYTHON_PACKAGE=ON ^ %* diff --git a/test.sh b/test.sh index 3cbe550a2..c330eadd9 100755 --- a/test.sh +++ b/test.sh @@ -30,6 +30,7 @@ cmake -B build \ -DAIMRT_BUILD_OPENTELEMETRY_PLUGIN=ON \ -DAIMRT_BUILD_GRPC_PLUGIN=ON \ -DAIMRT_BUILD_ECHO_PLUGIN=ON \ + -DAIMRT_BUILD_PROXY_PLUGIN=ON \ -DAIMRT_BUILD_PYTHON_PACKAGE=ON \ $@