feat: add proxy plugin to transfer messages from one backend to multiple backends (#108)

* feat(plugins): add proxy plugin

* feat: add proxy action

* feat: del the state in proxy action

* feat: format

* fix : add check not to pub same topic and msg_type

* fix: remove TimerSchedule  executor check

* docs: add docs

* doc: change docs

* fix: remove some unnessary code

* refactor(proxy_plugin): migrate TopicMetaKey to core/util directory

* fix: simplify the code

* format

* fix: struct bind

* perf(proxy): use action_raw_ptr capture rather than reference capture local variable

* docs: update proxy_plugin documentation and add example configuration

* fix: remove necessary check in echo plugin

* doc: add proxy plugin example docs

* fix: migrate recordplayback plugin and echo plugin's topic_meta_key into util

* fix: format

* fix: remove necessary code

* perf: remove unnecessary code
This commit is contained in:
ATT_POWER 2024-11-23 10:25:48 +08:00 committed by GitHub
parent df5d21e49c
commit 18d45dbb0f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 1132 additions and 71 deletions

View File

@ -44,6 +44,7 @@ cmake_dependent_option(AIMRT_BUILD_TIME_MANIPULATOR_PLUGIN "AimRT build time man
cmake_dependent_option(AIMRT_BUILD_PARAMETER_PLUGIN "AimRT build parameter plugin." OFF "AIMRT_BUILD_RUNTIME;AIMRT_BUILD_WITH_PROTOBUF" OFF) cmake_dependent_option(AIMRT_BUILD_PARAMETER_PLUGIN "AimRT build parameter plugin." OFF "AIMRT_BUILD_RUNTIME;AIMRT_BUILD_WITH_PROTOBUF" OFF)
cmake_dependent_option(AIMRT_BUILD_LOG_CONTROL_PLUGIN "AimRT build log control plugin." OFF "AIMRT_BUILD_RUNTIME;AIMRT_BUILD_WITH_PROTOBUF" OFF) cmake_dependent_option(AIMRT_BUILD_LOG_CONTROL_PLUGIN "AimRT build log control plugin." OFF "AIMRT_BUILD_RUNTIME;AIMRT_BUILD_WITH_PROTOBUF" OFF)
cmake_dependent_option(AIMRT_BUILD_GRPC_PLUGIN "AimRT build grpc plugin." OFF "AIMRT_BUILD_RUNTIME" OFF) cmake_dependent_option(AIMRT_BUILD_GRPC_PLUGIN "AimRT build grpc plugin." OFF "AIMRT_BUILD_RUNTIME" OFF)
cmake_dependent_option(AIMRT_BUILD_PROXY_PLUGIN "AimRT build proxy plugin." OFF "AIMRT_BUILD_RUNTIME" OFF)
option(AIMRT_INSTALL "Enable installation of AimRT." ON) option(AIMRT_INSTALL "Enable installation of AimRT." ON)

View File

@ -32,6 +32,7 @@ cmake -B build ^
-DAIMRT_BUILD_OPENTELEMETRY_PLUGIN=OFF ^ -DAIMRT_BUILD_OPENTELEMETRY_PLUGIN=OFF ^
-DAIMRT_BUILD_GRPC_PLUGIN=OFF ^ -DAIMRT_BUILD_GRPC_PLUGIN=OFF ^
-DAIMRT_BUILD_ECHO_PLUGIN=OFF ^ -DAIMRT_BUILD_ECHO_PLUGIN=OFF ^
-DAIMRT_BUILD_PROXY_PLUGIN=ON ^
-DAIMRT_BUILD_PYTHON_PACKAGE=ON ^ -DAIMRT_BUILD_PYTHON_PACKAGE=ON ^
%* %*

View File

@ -34,6 +34,7 @@ cmake -B build \
-DAIMRT_BUILD_OPENTELEMETRY_PLUGIN=ON \ -DAIMRT_BUILD_OPENTELEMETRY_PLUGIN=ON \
-DAIMRT_BUILD_GRPC_PLUGIN=ON \ -DAIMRT_BUILD_GRPC_PLUGIN=ON \
-DAIMRT_BUILD_ECHO_PLUGIN=ON \ -DAIMRT_BUILD_ECHO_PLUGIN=ON \
-DAIMRT_BUILD_PROXY_PLUGIN=ON \
-DAIMRT_BUILD_PYTHON_PACKAGE=ON \ -DAIMRT_BUILD_PYTHON_PACKAGE=ON \
$@ $@

View File

@ -11,6 +11,7 @@
- 新增 Echo 插件,用于回显消息; - 新增 Echo 插件,用于回显消息;
- 新增了基于执行器的定时器,方便执行定时任务; - 新增了基于执行器的定时器,方便执行定时任务;
- aimrt_py channel 和 rpc 支持 ros2 消息类型; - aimrt_py channel 和 rpc 支持 ros2 消息类型;
- 新增了 Proxy 插件,用于转发消息;
**次要修改** **次要修改**
- 缩短了一些 examples 的文件路径长度; - 缩短了一些 examples 的文件路径长度;

View File

@ -15,6 +15,7 @@ AimRT 提供了以下插件使用示例:
- {{ '[time_manipulator_plugin]({}/src/examples/plugins/time_manipulator_plugin)'.format(code_site_root_path_url) }} - {{ '[time_manipulator_plugin]({}/src/examples/plugins/time_manipulator_plugin)'.format(code_site_root_path_url) }}
- {{ '[zenoh_plugin]({}/src/examples/plugins/zenoh_plugin)'.format(code_site_root_path_url) }} - {{ '[zenoh_plugin]({}/src/examples/plugins/zenoh_plugin)'.format(code_site_root_path_url) }}
- {{ '[echo_plugin]({}/src/examples/plugins/echo_plugin)'.format(code_site_root_path_url) }} - {{ '[echo_plugin]({}/src/examples/plugins/echo_plugin)'.format(code_site_root_path_url) }}
- {{ '[proxy_plugin]({}/src/examples/plugins/proxy_plugin)'.format(code_site_root_path_url) }}
关于这些示例的说明: 关于这些示例的说明:
- 每个示例都有自己独立的 readme 文档,详情请点击示例链接进入后查看; - 每个示例都有自己独立的 readme 文档,详情请点击示例链接进入后查看;

View File

@ -103,6 +103,7 @@ plugins/zenoh_plugin.md
plugins/iceoryx_plugin.md plugins/iceoryx_plugin.md
plugins/grpc_plugin.md plugins/grpc_plugin.md
plugins/echo_plugin.md plugins/echo_plugin.md
plugins/proxy_plugin.md
``` ```
如果开发者想定制开发自己的插件,可以参考以下文档。 如果开发者想定制开发自己的插件,可以参考以下文档。

View File

@ -0,0 +1,85 @@
# echo插件
## 相关链接
参考示例:
- {{ '[proxy_plugin]({}/src/examples/plugins/proxy_plugin)'.format(code_site_root_path_url) }}
## 插件概述
**proxy_plugin**用于对 Channel 中的消息进行代理转发,插件支持独立的 type_support_pkg并支持指定执行器, 其中执行器需要线程安全,在使用时,插件会根据配置注册一个或多个 Channel Subscriber 或 Publisher。
插件的配置项如下:
| 节点 | 类型 | 是否可选| 默认值 | 作用 |
| ---- | ---- | ---- | ---- | ---- |
| type_support_pkgs | array | 必选 | [] | type support 包配置 |
| type_support_pkgs[i].path | string | 必选 | "" | type support 包的路径 |
| proxy_actions | array | 必选 | [] | 代理转发配置 |
| proxy_actions[i].name | string | 必选 | "" | 代理转发名称 |
| proxy_actions[i].options | object | 必选 | {} | 代理转发配置 |
| proxy_actions[i].options.executor| string | 必选 | "" | 代理转发执行器 |
| proxy_actions[i].options.topic_meta_list | array | 必选 | [] | 要代理转发的 topic 和类型 |
| proxy_actions[i].options.topic_meta_list[j].topic_name | string | 必选 | "" | 要代理转发的 topic |
| proxy_actions[i].options.topic_meta_list[j].msg_type | string | 必选 | "" | 要代理转发的消息类型 |
| proxy_actions[i].options.topic_meta_list[j].pub_topic_name | array | 必选 | [] | 代理转发后的 topic |
请注意,**proxy_plugin**中是以`action`为单元管理代理转发动作的,每个代理转发`action`可以有自己的执行器、topic 等参数,使用时可以根据数据实际大小和频率,为每个 action 分配合理的资源。
### 代理转发的简单示例配置
以下是将一个以 http 为后端的 topic 消息代理转发到两个以 zenoh 和 ros2 为后端的 topic 的简单示例配置,对于 proxy_plugin 需要为每个 action 指定执行器,并且在 channel 处需要为每个订阅的 topic 和转发的 topic 指定后端,其他相关插件的配置请参考[net_plugin](./net_plugin.md), [zenoh_plugin](./zenoh_plugin.md) 和 [ros2_plugin](./ros2_plugin.md);
```yaml
aimrt:
plugin:
plugins:
- name: proxy_plugin
path: ./libaimrt_proxy_plugin.so
options:
type_support_pkgs:
- path: ./libexample_event_ts_pkg.so
proxy_actions:
- name: my_proxy
options:
executor: proxy_plugin_executor
topic_meta_list:
- sub_topic_name: test_topic_http
pub_topic_name: [test_topic_zenoh, test_topic_ros2]
msg_type: pb:aimrt.protocols.example.ExampleEventMsg
- name: zenoh_plugin
path: ./libaimrt_zenoh_plugin.so
- name: ros2_plugin
path: ./libaimrt_ros2_plugin.so
options:
node_name: example_ros2_pb_chn_publisher_node
executor_type: MultiThreaded # SingleThreaded/StaticSingleThreaded/MultiThreaded
executor_thread_num: 2
- name: net_plugin
path: ./libaimrt_net_plugin.so
options:
thread_num: 4
http_options:
listen_ip: 127.0.0.1
listen_port: 50081
channel:
backends:
- type: http
- type: zenoh
- type: ros2
sub_topics_options:
- topic_name: test_topic_http
enable_backends: [http]
pub_topics_options:
- topic_name: test_topic_zenoh
enable_backends: [zenoh]
- topic_name: test_topic_ros2
enable_backends: [ros2]
# ...
```

View File

@ -61,3 +61,10 @@ endif()
if(AIMRT_BUILD_WITH_PROTOBUF AND AIMRT_BUILD_GRPC_PLUGIN) if(AIMRT_BUILD_WITH_PROTOBUF AND AIMRT_BUILD_GRPC_PLUGIN)
add_subdirectory(grpc_plugin) add_subdirectory(grpc_plugin)
endif() endif()
if(AIMRT_BUILD_PROXY_PLUGIN
AND AIMRT_BUILD_WITH_PROTOBUF
AND AIMRT_BUILD_NET_PLUGIN
AND AIMRT_BUILD_ZENOH_PLUGIN)
add_subdirectory(proxy_plugin)
endif()

View File

@ -1,3 +1,5 @@
#!/bin/bash #!/bin/bash
source install/share/example_ros2/local_setup.bash
./aimrt_main --cfg_file_path=./cfg/examples_plugins_echo_plugin_ros2_cfg.yaml ./aimrt_main --cfg_file_path=./cfg/examples_plugins_echo_plugin_ros2_cfg.yaml

View File

@ -0,0 +1,29 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
# Get the current folder name
string(REGEX REPLACE ".*/\(.*\)" "\\1" CUR_DIR ${CMAKE_CURRENT_SOURCE_DIR})
# Set namespace
set_namespace()
# type_support_pkg
add_subdirectory(example_event_ts_pkg)
# install
if(CMAKE_SYSTEM_NAME MATCHES "Linux")
set(CUR_INSTALL_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/install/linux)
elseif(CMAKE_SYSTEM_NAME MATCHES "Windows")
set(CUR_INSTALL_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/install/win)
else()
message(FATAL_ERROR "Unsupport os")
endif()
# build all
get_namespace(CUR_SUPERIOR_NAMESPACE)
string(REPLACE "::" "_" CUR_SUPERIOR_NAMESPACE_UNDERLINE ${CUR_SUPERIOR_NAMESPACE})
add_custom_target(
${CUR_SUPERIOR_NAMESPACE_UNDERLINE}_${CUR_DIR}_build_all ALL
COMMAND ${CMAKE_COMMAND} -E copy_directory ${CUR_INSTALL_SOURCE_DIR}/bin ${CMAKE_BINARY_DIR}
DEPENDS aimrt::runtime::main #
aimrt::examples::cpp::pb_chn::pb_chn_pub_pkg)

View File

@ -0,0 +1,30 @@
# proxy plugin examples
一个基于 **proxy_plugin** 的代理转发示例,演示内容包括:
- 如何在启动时加载 **proxy_plugin**
- 如何配置消息转发的 topic
核心代码:
- [event.proto](../../../protocols/example/event.proto)
- [normal_publisher_module.cc](../../cpp/pb_chn/module/normal_publisher_module/normal_publisher_module.cc)
- [type_support_pkg_main.cc](./example_event_ts_pkg/type_support_pkg_main.cc)
配置文件:
- [examples_plugins_proxy_plugin_http_pub_cfg.yaml](./install/linux/bin/cfg/examples_plugins_proxy_plugin_http_pub_cfg.yaml)
- [examples_plugins_proxy_plugin_zenoh_sub_cfg.yaml](./install/linux/bin/cfg/examples_plugins_proxy_plugin_zenoh_sub_cfg.yaml)
- [examples_plugins_proxy_plugin_cfg.yaml](./install/linux/bin/cfg/examples_plugins_proxy_plugin_cfg.yaml)
运行方式linux
- 开启 `AIMRT_BUILD_EXAMPLES``AIMRT_BUILD_WITH_PROTOBUF``AIMRT_BUILD_WITH_PROXY_PLUGIN``AIMRT_BUILD_WITH_ZENOH_PLUGIN``AIMRT_BUILD_WITH_NET_PLUGIN` 选项编译 AimRT
- 分别运行 build 目录下 `start_examples_plugins_proxy_plugin_zenoh_sub.sh` `start_examples_plugins_proxy_plugin.sh``start_examples_plugins_proxy_plugin_http_pub.sh` 脚本启动进程;
- 键入`ctrl-c`停止进程;
说明:
- 此示例创建了三个进程,分别为 http 发布消息进程、zenoh 订阅消息进程和 proxy 转发进程:
- `http_pub`,会以配置的频率,发布 `ExampleEventMsg` 消息到 `test_topic_http`
- `proxy`,会使用 `proxy_plugin_executor` 执行器,执行代理消息转发操作;
- `zenoh_sub`,会订阅 `test_topic_zenoh`
- 请注意proxy 插件的原理是向 AimRT 订阅指定的 Topic因此需要在 channel 配置中为该 topic 设置合适的后端,以保证插件能接收到数据,在此示例中,`test_topic_http` 配置了 `http` 后端,`test_topic_zenoh` 配置了 `zenoh` 后端;

View File

@ -0,0 +1,35 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
# Get the current folder name
string(REGEX REPLACE ".*/\(.*\)" "\\1" CUR_DIR ${CMAKE_CURRENT_SOURCE_DIR})
# Get namespace
get_namespace(CUR_SUPERIOR_NAMESPACE)
string(REPLACE "::" "_" CUR_SUPERIOR_NAMESPACE_UNDERLINE ${CUR_SUPERIOR_NAMESPACE})
# Set target name
set(CUR_TARGET_NAME ${CUR_SUPERIOR_NAMESPACE_UNDERLINE}_${CUR_DIR})
set(CUR_TARGET_ALIAS_NAME ${CUR_SUPERIOR_NAMESPACE}::${CUR_DIR})
# Set file collection
file(GLOB_RECURSE src ${CMAKE_CURRENT_SOURCE_DIR}/*.cc)
# Add target
add_library(${CUR_TARGET_NAME} SHARED)
add_library(${CUR_TARGET_ALIAS_NAME} ALIAS ${CUR_TARGET_NAME})
# Set source file of target
target_sources(${CUR_TARGET_NAME} PRIVATE ${src})
# Set link libraries of target
target_link_libraries(
${CUR_TARGET_NAME}
PRIVATE aimrt::interface::aimrt_type_support_pkg_c_interface
aimrt::interface::aimrt_module_protobuf_interface
aimrt::protocols::example_pb_gencode
aimrt::interface::aimrt_module_ros2_interface
example_ros2::example_ros2__rosidl_generator_cpp
example_ros2::example_ros2__rosidl_typesupport_cpp)
# Set misc of target
set_target_properties(${CUR_TARGET_NAME} PROPERTIES OUTPUT_NAME ${CUR_DIR})

View File

@ -0,0 +1,27 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#include "aimrt_type_support_pkg_c_interface/type_support_pkg_main.h"
#include "aimrt_module_protobuf_interface/util/protobuf_type_support.h"
#include "aimrt_module_ros2_interface/util/ros2_type_support.h"
#include "example_ros2/msg/ros_test_msg.hpp"
#include "event.pb.h"
static const aimrt_type_support_base_t* type_support_array[]{
aimrt::GetProtobufMessageTypeSupport<aimrt::protocols::example::ExampleEventMsg>(),
aimrt::GetRos2MessageTypeSupport<example_ros2::msg::RosTestMsg>()};
extern "C" {
size_t AimRTDynlibGetTypeSupportArrayLength() {
return sizeof(type_support_array) / sizeof(type_support_array[0]);
}
const aimrt_type_support_base_t** AimRTDynlibGetTypeSupportArray() {
return type_support_array;
}
}

View File

@ -0,0 +1,52 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
aimrt:
plugin:
plugins:
- name: proxy_plugin
path: ./libaimrt_proxy_plugin.so
options:
type_support_pkgs:
- path: ./libexample_event_ts_pkg.so
proxy_actions:
- name: my_proxy
options:
executor: proxy_plugin_executor
topic_meta_list:
- sub_topic_name: test_topic_http
pub_topic_name: [test_topic_zenoh]
msg_type: pb:aimrt.protocols.example.ExampleEventMsg
- name: zenoh_plugin
path: ./libaimrt_zenoh_plugin.so
- name: net_plugin
path: ./libaimrt_net_plugin.so
options:
thread_num: 4
http_options:
listen_ip: 127.0.0.1
listen_port: 50080
log:
core_lvl: Info # Trace/Debug/Info/Warn/Error/Fatal/Off
backends:
- type: console
executor:
executors:
- name: work_thread_pool
type: asio_thread
options:
thread_num: 4
- name: proxy_plugin_executor
type: simple_thread
channel:
backends:
- type: zenoh
- type: http
sub_topics_options:
- topic_name: test_topic_http
enable_backends: [http]
pub_topics_options:
- topic_name: test_topic_zenoh
enable_backends: [zenoh]

View File

@ -0,0 +1,45 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
aimrt:
plugin:
plugins:
- name: net_plugin
path: ./libaimrt_net_plugin.so
options:
thread_num: 4
http_options:
listen_ip: 127.0.0.1
listen_port: 50081
log:
core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
backends:
- type: console
executor:
executors:
- name: work_thread_pool
type: asio_thread
options:
thread_num: 2
channel:
backends:
- type: http
options:
pub_topics_options:
- topic_name: "(.*)"
server_url_list: ["127.0.0.1:50080"]
pub_topics_options:
- topic_name: "(.*)"
enable_backends: [http]
module:
pkgs:
- path: ./libpb_chn_pub_pkg.so
enable_modules: [NormalPublisherModule]
modules:
- name: NormalPublisherModule
log_lvl: INFO
# Module custom configuration
NormalPublisherModule:
topic_name: test_topic_http
channel_frq: 0.5

View File

@ -0,0 +1,32 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
aimrt:
plugin:
plugins:
- name: zenoh_plugin
path: ./libaimrt_zenoh_plugin.so
options:
native_cfg_path: ./cfg/zenoh_native_config.json5
log:
core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
backends:
- type: console
executor:
channel:
backends:
- type: zenoh
sub_topics_options:
- topic_name: "(.*)"
enable_backends: [zenoh]
module:
pkgs:
- path: ./libpb_chn_sub_pkg.so
enable_modules: [NormalSubscriberModule]
modules:
- name: NormalSubscriberModule
log_lvl: INFO
# Module custom configuration
NormalSubscriberModule:
topic_name: test_topic_zenoh

View File

@ -0,0 +1,3 @@
#!/bin/bash
./aimrt_main --cfg_file_path=./cfg/examples_plugins_proxy_plugin_cfg.yaml

View File

@ -0,0 +1,3 @@
#!/bin/bash
./aimrt_main --cfg_file_path=./cfg/examples_plugins_proxy_plugin_http_pub_cfg.yaml

View File

@ -0,0 +1,3 @@
#!/bin/bash
./aimrt_main --cfg_file_path=./cfg/examples_plugins_proxy_plugin_zenoh_sub_cfg.yaml

View File

@ -50,3 +50,7 @@ endif()
if(AIMRT_BUILD_ECHO_PLUGIN) if(AIMRT_BUILD_ECHO_PLUGIN)
add_subdirectory(echo_plugin) add_subdirectory(echo_plugin)
endif() endif()
if(AIMRT_BUILD_PROXY_PLUGIN)
add_subdirectory(proxy_plugin)
endif()

View File

@ -101,22 +101,11 @@ bool EchoPlugin::Initialize(runtime::core::AimRTCore* core_ptr) noexcept {
auto finditr = type_support_map_.find(topic_meta.msg_type); auto finditr = type_support_map_.find(topic_meta.msg_type);
AIMRT_CHECK_ERROR_THROW(finditr != type_support_map_.end(), AIMRT_CHECK_ERROR_THROW(finditr != type_support_map_.end(),
"Can not find type '{}' in any type support pkg!", topic_meta.msg_type); "Can not find type '{}' in any type support pkg!", topic_meta.msg_type);
auto& type_support_ref = finditr->second.type_support_ref;
// check serialization type
if (!topic_meta.serialization_type.empty()) {
bool check_ret = type_support_ref.CheckSerializationTypeSupported(topic_meta.serialization_type);
AIMRT_CHECK_ERROR_THROW(check_ret,
"Msg type '{}' does not support serialization type '{}'.",
topic_meta.msg_type, topic_meta.serialization_type);
} else {
topic_meta.serialization_type = type_support_ref.DefaultSerializationType();
}
} }
// check duplicate topic // check duplicate topic
for (auto& topic_meta_option : options_.topic_meta_list) { for (auto& topic_meta_option : options_.topic_meta_list) {
TopicMetaKey key{ runtime::core::util::TopicMetaKey key{
.topic_name = topic_meta_option.topic_name, .topic_name = topic_meta_option.topic_name,
.msg_type = topic_meta_option.msg_type}; .msg_type = topic_meta_option.msg_type};
@ -129,7 +118,7 @@ bool EchoPlugin::Initialize(runtime::core::AimRTCore* core_ptr) noexcept {
.topic_name = topic_meta_option.topic_name, .topic_name = topic_meta_option.topic_name,
.msg_type = topic_meta_option.msg_type, .msg_type = topic_meta_option.msg_type,
.echo_type = topic_meta_option.echo_type, .echo_type = topic_meta_option.echo_type,
.serialization_type = topic_meta_option.serialization_type}; };
topic_meta_map_.emplace(key, topic_meta); topic_meta_map_.emplace(key, topic_meta);
} }
@ -193,7 +182,6 @@ void EchoPlugin::InitTypeSupport(Options::TypeSupportPkg& options) {
void EchoPlugin::RegisterEchoChannel() { void EchoPlugin::RegisterEchoChannel() {
using namespace aimrt::runtime::core::channel; using namespace aimrt::runtime::core::channel;
using EchoFunc = std::function<void(MsgWrapper&)>;
const auto& topic_meta_list = topic_meta_map_; const auto& topic_meta_list = topic_meta_map_;
AIMRT_TRACE("Echo plugin has {} topics.", topic_meta_list.size()); AIMRT_TRACE("Echo plugin has {} topics.", topic_meta_list.size());
@ -215,7 +203,7 @@ void EchoPlugin::RegisterEchoChannel() {
.module_name = "core", .module_name = "core",
.msg_type_support_ref = type_support_wrapper.type_support_ref}; .msg_type_support_ref = type_support_wrapper.type_support_ref};
sub_wrapper.require_cache_serialization_types.emplace(topic_meta.serialization_type); sub_wrapper.require_cache_serialization_types.emplace(topic_meta.echo_type);
sub_wrapper.callback = [echo_type{topic_meta.echo_type}]( sub_wrapper.callback = [echo_type{topic_meta.echo_type}](
MsgWrapper& msg_wrapper, std::function<void()>&& release_callback) { MsgWrapper& msg_wrapper, std::function<void()>&& release_callback) {
auto buffer_view_ptr = aimrt::runtime::core::channel::TrySerializeMsgWithCache(msg_wrapper, echo_type); auto buffer_view_ptr = aimrt::runtime::core::channel::TrySerializeMsgWithCache(msg_wrapper, echo_type);

View File

@ -9,8 +9,9 @@
#include "core/channel/channel_backend_tools.h" #include "core/channel/channel_backend_tools.h"
#include "core/util/type_support_pkg_loader.h" #include "core/util/type_support_pkg_loader.h"
#include "core/util/topic_meta_key.h"
#include "echo_plugin/global.h" #include "echo_plugin/global.h"
#include "echo_plugin/topic_meta_key.h" #include "topic_meta.h"
#include "yaml-cpp/yaml.h" #include "yaml-cpp/yaml.h"
namespace aimrt::plugins::echo_plugin { namespace aimrt::plugins::echo_plugin {
@ -21,7 +22,6 @@ class EchoPlugin : public AimRTCorePluginBase {
struct TopicMeta { struct TopicMeta {
std::string topic_name; std::string topic_name;
std::string msg_type; std::string msg_type;
std::string serialization_type;
std::string echo_type; std::string echo_type;
}; };
std::vector<TopicMeta> topic_meta_list; std::vector<TopicMeta> topic_meta_list;
@ -63,7 +63,9 @@ class EchoPlugin : public AimRTCorePluginBase {
std::unordered_map<std::string_view, TypeSupportWrapper> type_support_map_; std::unordered_map<std::string_view, TypeSupportWrapper> type_support_map_;
std::unordered_map<TopicMetaKey, TopicMeta, TopicMetaKey::Hash> topic_meta_map_; std::unordered_map<runtime::core::util::TopicMetaKey, TopicMeta,
runtime::core::util::TopicMetaKey::Hash>
topic_meta_map_;
}; };
} // namespace aimrt::plugins::echo_plugin } // namespace aimrt::plugins::echo_plugin

View File

@ -0,0 +1,16 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#pragma once
#include <string>
namespace aimrt::plugins::echo_plugin {
struct TopicMeta {
std::string topic_name;
std::string msg_type;
std::string echo_type;
};
} // namespace aimrt::plugins::echo_plugin

View File

@ -1,34 +0,0 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#pragma once
#include <cstdint>
#include <string>
namespace aimrt::plugins::echo_plugin {
struct TopicMetaKey {
std::string topic_name;
std::string msg_type;
bool operator==(const TopicMetaKey& rhs) const {
return topic_name == rhs.topic_name && msg_type == rhs.msg_type;
}
struct Hash {
std::size_t operator()(const TopicMetaKey& k) const {
return (std::hash<std::string>()(k.topic_name)) ^
(std::hash<std::string>()(k.msg_type));
}
};
};
struct TopicMeta {
std::string topic_name;
std::string msg_type;
std::string echo_type;
std::string serialization_type;
};
} // namespace aimrt::plugins::echo_plugin

View File

@ -0,0 +1,53 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
# Get the current folder name
string(REGEX REPLACE ".*/\(.*\)" "\\1" CUR_DIR ${CMAKE_CURRENT_SOURCE_DIR})
# Get namespace
get_namespace(CUR_SUPERIOR_NAMESPACE)
string(REPLACE "::" "_" CUR_SUPERIOR_NAMESPACE_UNDERLINE ${CUR_SUPERIOR_NAMESPACE})
# Set target name
set(CUR_TARGET_NAME ${CUR_SUPERIOR_NAMESPACE_UNDERLINE}_${CUR_DIR})
set(CUR_TARGET_ALIAS_NAME ${CUR_SUPERIOR_NAMESPACE}::${CUR_DIR})
# Set file collection
file(GLOB_RECURSE src ${CMAKE_CURRENT_SOURCE_DIR}/*.cc)
file(GLOB_RECURSE test_files ${CMAKE_CURRENT_SOURCE_DIR}/*_test.cc)
list(REMOVE_ITEM src ${test_files})
# Add target
add_library(${CUR_TARGET_NAME} SHARED)
add_library(${CUR_TARGET_ALIAS_NAME} ALIAS ${CUR_TARGET_NAME})
# Set source file of target
target_sources(${CUR_TARGET_NAME} PRIVATE ${src})
# Set include path of target
target_include_directories(
${CUR_TARGET_NAME}
PRIVATE $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/..>)
# Set link libraries of target
target_link_libraries(
${CUR_TARGET_NAME}
PRIVATE aimrt::interface::aimrt_core_plugin_interface
aimrt::runtime::core)
# Add -Werror option
include(AddWerror)
add_werror(${CUR_TARGET_NAME})
# Set installation of target
if(AIMRT_INSTALL)
install(TARGETS ${CUR_TARGET_NAME} LIBRARY DESTINATION bin)
endif()
# Set test of target
if(AIMRT_BUILD_TESTS AND test_files)
add_gtest_target(TEST_TARGET ${CUR_TARGET_NAME} TEST_SRC ${test_files})
endif()
# Set misc of target
set_target_properties(${CUR_TARGET_NAME} PROPERTIES OUTPUT_NAME "aimrt_${CUR_DIR}")

View File

@ -0,0 +1,15 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#include "proxy_plugin/global.h"
namespace aimrt::plugins::proxy_plugin {
aimrt::logger::LoggerRef global_logger;
void SetLogger(aimrt::logger::LoggerRef logger) { global_logger = logger; }
aimrt::logger::LoggerRef GetLogger() {
return global_logger ? global_logger : aimrt::logger::GetSimpleLoggerRef();
}
} // namespace aimrt::plugins::proxy_plugin

View File

@ -0,0 +1,13 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#pragma once
#include "aimrt_module_cpp_interface/logger/logger.h"
namespace aimrt::plugins::proxy_plugin {
void SetLogger(aimrt::logger::LoggerRef);
aimrt::logger::LoggerRef GetLogger();
} // namespace aimrt::plugins::proxy_plugin

View File

@ -0,0 +1,116 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#include "proxy_action.h"
#include "log_util.h"
#include "proxy_plugin/global.h"
namespace YAML {
template <>
struct convert<aimrt::plugins::proxy_plugin::ProxyAction::Options> {
using Options = aimrt::plugins::proxy_plugin::ProxyAction::Options;
static Node encode(const Options& rhs) {
Node node;
node["executor"] = rhs.executor;
node["topic_meta_list"] = YAML::Node();
for (const auto& topic_meta : rhs.topic_meta_list) {
Node topic_meta_node;
topic_meta_node["sub_topic_name"] = topic_meta.sub_topic_name;
topic_meta_node["pub_topic_name"] = topic_meta.pub_topic_name;
topic_meta_node["msg_type"] = topic_meta.msg_type;
node["topic_meta_list"].push_back(topic_meta_node);
}
return node;
}
static bool decode(const Node& node, Options& rhs) {
if (!node.IsMap()) return false;
rhs.executor = node["executor"].as<std::string>();
if (node["topic_meta_list"]) {
const auto& topic_meta_list = node["topic_meta_list"];
if (topic_meta_list.IsSequence()) {
for (const auto& topic_meta_node : topic_meta_list) {
Options::TopicMeta topic_meta;
topic_meta.sub_topic_name = topic_meta_node["sub_topic_name"].as<std::string>();
topic_meta.pub_topic_name = topic_meta_node["pub_topic_name"].as<std::vector<std::string>>();
topic_meta.msg_type = topic_meta_node["msg_type"].as<std::string>();
rhs.topic_meta_list.push_back(topic_meta);
}
}
}
return true;
}
};
} // namespace YAML
namespace aimrt::plugins::proxy_plugin {
void ProxyAction::Initialize(YAML::Node options) {
if (options && !options.IsNull())
options_ = options.as<Options>();
for (auto& topic_meta : options_.topic_meta_list) {
// check msg type
auto type_support_ref = get_type_support_func_(topic_meta.msg_type);
AIMRT_CHECK_ERROR_THROW(type_support_ref, "Can not get type support for msg type '{}'.", topic_meta.msg_type);
// check duplicate topic meta
runtime::core::util::TopicMetaKey key{
.topic_name = topic_meta.sub_topic_name,
.msg_type = topic_meta.msg_type};
AIMRT_CHECK_ERROR_THROW(
topic_meta_map_.find(key) == topic_meta_map_.end(),
"Duplicate topic meta, topic name: {}, msg type: {}.",
topic_meta.sub_topic_name, topic_meta.msg_type);
TopicMeta topic_meta_info{
.topic_name = topic_meta.sub_topic_name,
.msg_type = topic_meta.msg_type,
.pub_topic_name = topic_meta.pub_topic_name};
topic_meta_map_.emplace(key, topic_meta_info);
// check duplicate pub topic name
for (const auto& pub_topic_name : topic_meta.pub_topic_name) {
runtime::core::util::TopicMetaKey pub_key{
.topic_name = pub_topic_name,
.msg_type = topic_meta.msg_type};
AIMRT_CHECK_ERROR_THROW(
pub_topic_name_set_.find(pub_key) == pub_topic_name_set_.end(),
"Duplicate pub topic name: {}, msg type: {}.",
pub_topic_name, topic_meta.msg_type);
pub_topic_name_set_.insert(pub_key);
}
}
}
void ProxyAction::InitExecutor() {
executor_ = get_executor_func_(options_.executor);
AIMRT_CHECK_ERROR_THROW(executor_, "Can not get executor {}.", options_.executor);
AIMRT_CHECK_ERROR_THROW(
executor_.ThreadSafe(),
"Proxy executor {} is not thread safe!", options_.executor);
}
void ProxyAction::RegisterGetExecutorFunc(
const std::function<executor::ExecutorRef(std::string_view)>& get_executor_func) {
get_executor_func_ = get_executor_func;
}
void ProxyAction::RegisterGetTypeSupportFunc(
const std::function<aimrt::util::TypeSupportRef(std::string_view)>& get_type_support_func) {
get_type_support_func_ = get_type_support_func;
}
void ProxyAction::Shutdown() {
AIMRT_INFO("Proxy action shutdown.");
}
} // namespace aimrt::plugins::proxy_plugin

View File

@ -0,0 +1,73 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#pragma once
#include <string>
#include <string_view>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "aimrt_module_cpp_interface/executor/executor.h"
#include "aimrt_module_cpp_interface/util/type_support.h"
#include "core/util/topic_meta_key.h"
#include "topic_meta.h"
#include "yaml-cpp/yaml.h"
namespace aimrt::plugins::proxy_plugin {
class ProxyAction {
public:
struct Options {
struct TopicMeta {
std::string sub_topic_name;
std::vector<std::string> pub_topic_name;
std::string msg_type;
};
std::vector<TopicMeta> topic_meta_list;
struct TypeSupportPkg {
std::string path;
};
std::vector<TypeSupportPkg> type_support_pkgs;
std::string executor;
};
public:
ProxyAction() = default;
~ProxyAction() = default;
void Initialize(YAML::Node options);
void InitExecutor();
void Shutdown();
const auto& GetTopicMetaMap() const {
return topic_meta_map_;
}
auto& GetExecutor() { return executor_; }
void RegisterGetExecutorFunc(
const std::function<executor::ExecutorRef(std::string_view)>& get_executor_func);
void RegisterGetTypeSupportFunc(
const std::function<aimrt::util::TypeSupportRef(std::string_view)>& get_type_support_func);
private:
Options options_;
aimrt::executor::ExecutorRef executor_;
std::function<executor::ExecutorRef(std::string_view)> get_executor_func_;
std::function<aimrt::util::TypeSupportRef(std::string_view)> get_type_support_func_;
std::unordered_map<runtime::core::util::TopicMetaKey, TopicMeta,
runtime::core::util::TopicMetaKey::Hash>
topic_meta_map_;
std::unordered_set<runtime::core::util::TopicMetaKey,
runtime::core::util::TopicMetaKey::Hash>
pub_topic_name_set_;
};
} // namespace aimrt::plugins::proxy_plugin

View File

@ -0,0 +1,322 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#include "proxy_plugin.h"
#include <string>
#include "aimrt_core.h"
#include "aimrt_core_plugin_base.h"
#include "channel/channel_backend_tools.h"
#include "channel/channel_msg_wrapper.h"
#include "channel/channel_registry.h"
#include "global.h"
#include "log_util.h"
#include "proxy_action.h"
#include "proxy_plugin.h"
#include "proxy_plugin/topic_meta.h"
namespace YAML {
template <>
struct convert<aimrt::plugins::proxy_plugin::ProxyPlugin::Options> {
using Options = aimrt::plugins::proxy_plugin::ProxyPlugin::Options;
static Node encode(const Options& rhs) {
Node node;
node["type_support_pkgs"] = Node(NodeType::Sequence);
for (const auto& type_support_pkg : rhs.type_support_pkgs) {
Node type_support_pkg_node;
type_support_pkg_node["path"] = type_support_pkg.path;
node["type_support_pkgs"].push_back(type_support_pkg_node);
}
node["executor"] = rhs.executor;
node["proxy_actions"] = Node(NodeType::Sequence);
for (const auto& proxy_action : rhs.proxy_actions) {
Node proxy_action_node;
proxy_action_node["name"] = proxy_action.name;
proxy_action_node["options"] = proxy_action.options;
node["proxy_actions"].push_back(proxy_action_node);
}
return node;
}
static bool decode(const Node& node, Options& rhs) {
if (!node.IsMap()) return false;
if (node["type_support_pkgs"] && node["type_support_pkgs"].IsSequence()) {
for (const auto& type_support_pkg_node : node["type_support_pkgs"]) {
Options::TypeSupportPkg type_support_pkg;
type_support_pkg.path = type_support_pkg_node["path"].as<std::string>();
rhs.type_support_pkgs.push_back(std::move(type_support_pkg));
}
}
if (node["executor"] && node["executor"].IsScalar()) {
rhs.executor = node["executor"].as<std::string>();
}
if (node["proxy_actions"] && node["proxy_actions"].IsSequence()) {
for (const auto& proxy_action_node : node["proxy_actions"]) {
Options::ProxyAction proxy_action;
proxy_action.name = proxy_action_node["name"].as<std::string>();
proxy_action.options = proxy_action_node["options"];
rhs.proxy_actions.push_back(std::move(proxy_action));
}
}
return true;
}
};
} // namespace YAML
namespace aimrt::plugins::proxy_plugin {
bool ProxyPlugin::Initialize(runtime::core::AimRTCore* core_ptr) noexcept {
try {
core_ptr_ = core_ptr;
YAML::Node plugin_options_node = core_ptr_->GetPluginManager().GetPluginOptionsNode(Name());
if (plugin_options_node && !plugin_options_node.IsNull()) {
options_ = plugin_options_node.as<Options>();
AIMRT_TRACE("Load plugin options.");
}
init_flag_ = true;
// type support
for (auto& type_support_pkg : options_.type_support_pkgs) {
// check duplicate pkg
auto finditr = std::find_if(
options_.type_support_pkgs.begin(), options_.type_support_pkgs.end(),
[&type_support_pkg](const auto& op) {
if (&type_support_pkg == &op) return false;
return op.path == type_support_pkg.path;
});
AIMRT_CHECK_ERROR_THROW(finditr == options_.type_support_pkgs.end(),
"Duplicate pkg path {}", type_support_pkg.path);
InitTypeSupport(type_support_pkg);
}
AIMRT_TRACE("Load {} pkg and {} type.",
type_support_pkg_loader_vec_.size(), type_support_map_.size());
// proxy action
for (auto& proxy_action : options_.proxy_actions) {
// check duplicate proxy action name
auto finditr = std::find_if(
options_.proxy_actions.begin(), options_.proxy_actions.end(),
[&proxy_action](const auto& op) {
if (&proxy_action == &op) return false;
return op.name == proxy_action.name;
});
AIMRT_CHECK_ERROR_THROW(finditr == options_.proxy_actions.end(),
"Duplicate proxy action name {}", proxy_action.name);
auto action_ptr = std::make_unique<ProxyAction>();
action_ptr->RegisterGetExecutorFunc([this](std::string_view name) -> aimrt::executor::ExecutorRef {
return core_ptr_->GetExecutorManager().GetExecutor(name);
});
action_ptr->RegisterGetTypeSupportFunc([this](std::string_view name) -> aimrt::util::TypeSupportRef {
auto finditr = type_support_map_.find(name);
AIMRT_CHECK_ERROR_THROW(finditr != type_support_map_.end(),
"Can not find type support for msg type '{}'.", name);
return finditr->second.type_support_ref;
});
action_ptr->Initialize(proxy_action.options);
proxy_action_map_.emplace(proxy_action.name, std::move(action_ptr));
}
core_ptr_->RegisterHookFunc(
runtime::core::AimRTCore::State::kPostInitLog,
[this] {
SetLogger(aimrt::logger::LoggerRef(
core_ptr_->GetLoggerManager().GetLoggerProxy().NativeHandle()));
});
core_ptr_->RegisterHookFunc(
runtime::core::AimRTCore::State::kPreInitModules,
[this] {
for (auto& proxy_action_itr : proxy_action_map_) {
proxy_action_itr.second->InitExecutor();
}
RegisterSubChannel();
RegisterPubChannel();
});
core_ptr_->RegisterHookFunc(
runtime::core::AimRTCore::State::kPreShutdown,
[this] {
SetLogger(aimrt::logger::GetSimpleLoggerRef());
});
plugin_options_node = options_;
core_ptr_->GetPluginManager().UpdatePluginOptionsNode(Name(), plugin_options_node);
return true;
} catch (const std::exception& e) {
AIMRT_ERROR("Initialize failed, {}", e.what());
}
return false;
}
void ProxyPlugin::InitTypeSupport(Options::TypeSupportPkg& options) {
auto loader_ptr = std::make_unique<aimrt::runtime::core::util::TypeSupportPkgLoader>();
loader_ptr->LoadTypeSupportPkg(options.path);
options.path = loader_ptr->GetDynamicLib().GetLibFullPath();
auto type_support_array = loader_ptr->GetTypeSupportArray();
for (const auto* item : type_support_array) {
aimrt::util::TypeSupportRef type_support_ref(item);
auto type_name = type_support_ref.TypeName();
// check duplicate type
auto finditr = type_support_map_.find(type_name);
if (finditr != type_support_map_.end()) {
AIMRT_WARN("Duplicate msg type '{}' in {} and {}.",
type_name, options.path, finditr->second.options.path);
continue;
}
type_support_map_.emplace(
type_name,
TypeSupportWrapper{
.options = options,
.type_support_ref = type_support_ref,
.loader_ptr = loader_ptr.get()});
}
type_support_pkg_loader_vec_.emplace_back(std::move(loader_ptr));
AIMRT_TRACE("Load {} type support pkgs.", type_support_pkg_loader_vec_.size());
}
void ProxyPlugin::RegisterSubChannel() {
using namespace aimrt::runtime::core::channel;
for (const auto& [_, proxy_action] : proxy_action_map_) {
const auto& action_topic_meta_map = proxy_action->GetTopicMetaMap();
for (const auto& [_, topic_meta] : action_topic_meta_map) {
auto finditr = type_support_map_.find(topic_meta.msg_type);
AIMRT_CHECK_ERROR_THROW(finditr != type_support_map_.end(),
"Can not find type '{}' in any type support pkg!", topic_meta.msg_type);
const auto& type_support_wrapper = finditr->second;
SubscribeWrapper subscribe_wrapper{
.info = {
.msg_type = topic_meta.msg_type,
.topic_name = topic_meta.topic_name,
.pkg_path = type_support_wrapper.options.path,
.module_name = "core",
.msg_type_support_ref = type_support_wrapper.type_support_ref}};
subscribe_wrapper.callback = [this, action_raw_ptr = proxy_action.get()](
MsgWrapper& msg_wrapper, std::function<void()>&& release_callback) {
if (msg_wrapper.msg_ptr == nullptr && msg_wrapper.serialization_cache.size() == 0) [[unlikely]] {
AIMRT_WARN("Receive empty msg, ignore it.");
release_callback();
return;
}
action_raw_ptr->GetExecutor().Execute([this, msg_wrapper, topic_meta_map = action_raw_ptr->GetTopicMetaMap()]() {
runtime::core::util::TopicMetaKey key{
.topic_name = msg_wrapper.info.topic_name,
.msg_type = msg_wrapper.info.msg_type,
};
auto finditr = topic_meta_map.find(key);
AIMRT_CHECK_ERROR_THROW(finditr != topic_meta_map.end(),
"Can not find topic meta, topic name: {}, msg type: {}.",
key.topic_name, key.msg_type);
for (auto &pub_topic_name : finditr->second.pub_topic_name) {
runtime::core::util::TopicMetaKey pub_key{
.topic_name = pub_topic_name,
.msg_type = key.msg_type,
};
const auto& topic_pub_wrapper = topic_pub_wrapper_map_.find(pub_key)->second;
AIMRT_CHECK_ERROR_THROW(topic_pub_wrapper.pub_type_wrapper_ptr, "Get publish type wrapper failed!");
aimrt::channel::Context ctx;
MsgWrapper pub_msg_wrapper{
.info = topic_pub_wrapper.pub_type_wrapper_ptr->info,
.msg_ptr = msg_wrapper.msg_ptr,
.ctx_ref = ctx,
};
pub_msg_wrapper.serialization_cache = msg_wrapper.serialization_cache;
core_ptr_->GetChannelManager().Publish(std::move(pub_msg_wrapper));
} });
release_callback();
};
bool ret = core_ptr_->GetChannelManager().Subscribe(std::move(subscribe_wrapper));
AIMRT_CHECK_ERROR_THROW(ret, "Register subscribe channel failed!");
}
}
}
void ProxyPlugin::RegisterPubChannel() {
using namespace aimrt::runtime::core::channel;
for (auto& proxy_action_itr : proxy_action_map_) {
// register publish type
auto& proxy_action = *(proxy_action_itr.second);
const auto& topic_meta_map = proxy_action.GetTopicMetaMap();
for (const auto& [_, topic_meta] : topic_meta_map) {
auto finditr = type_support_map_.find(topic_meta.msg_type);
AIMRT_CHECK_ERROR_THROW(finditr != type_support_map_.end(),
"Can not find type '{}' in any type support pkg!", topic_meta.msg_type);
const auto& type_support_wrapper = finditr->second;
// register publish type
for (auto& pub_topic_name : topic_meta.pub_topic_name) {
PublishTypeWrapper pub_type_wrapper;
pub_type_wrapper.info = TopicInfo{
.msg_type = topic_meta.msg_type,
.topic_name = pub_topic_name,
.pkg_path = type_support_wrapper.options.path,
.module_name = "core",
.msg_type_support_ref = type_support_wrapper.type_support_ref};
bool ret = core_ptr_->GetChannelManager().RegisterPublishType(std::move(pub_type_wrapper));
AIMRT_CHECK_ERROR_THROW(ret, "Register publish type failed!");
}
}
// map pub_type_wrapper_ptr
for (const auto& [_, topic_meta] : topic_meta_map) {
for (auto& pub_topic_name : topic_meta.pub_topic_name) {
runtime::core::util::TopicMetaKey key{
.topic_name = pub_topic_name,
.msg_type = topic_meta.msg_type};
auto finditr = type_support_map_.find(topic_meta.msg_type);
AIMRT_CHECK_ERROR_THROW(finditr != type_support_map_.end(),
"Can not find type '{}' in any type support pkg!", topic_meta.msg_type);
const auto& type_support_wrapper = finditr->second;
const auto* pub_type_wrapper_ptr = core_ptr_->GetChannelManager().GetChannelRegistry()->GetPublishTypeWrapperPtr(
topic_meta.msg_type, pub_topic_name, type_support_wrapper.options.path, "core");
AIMRT_CHECK_ERROR_THROW(pub_type_wrapper_ptr, "Get publish type wrapper failed!");
topic_pub_wrapper_map_.emplace(key, TopicPubWrapper{.pub_type_wrapper_ptr = pub_type_wrapper_ptr});
}
}
}
}
void ProxyPlugin::Shutdown() noexcept {
try {
if (!init_flag_) return;
for (auto& proxy_action_itr : proxy_action_map_) {
proxy_action_itr.second->Shutdown();
}
} catch (const std::exception& e) {
AIMRT_ERROR("Shutdown failed, {}", e.what());
}
}
} // namespace aimrt::plugins::proxy_plugin

View File

@ -0,0 +1,82 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#pragma once
#include <string>
#include <unordered_map>
#include <vector>
#include "aimrt_core_plugin_interface/aimrt_core_plugin_base.h"
#include "aimrt_module_cpp_interface/executor/executor.h"
#include "aimrt_module_cpp_interface/util/type_support.h"
#include "core/aimrt_core.h"
#include "core/util/topic_meta_key.h"
#include "core/util/type_support_pkg_loader.h"
#include "proxy_action.h"
#include "proxy_plugin/topic_meta.h"
namespace aimrt::plugins::proxy_plugin {
class ProxyPlugin : public AimRTCorePluginBase {
public:
struct Options {
struct ProxyAction {
std::string name;
YAML::Node options;
};
std::vector<ProxyAction> proxy_actions;
struct TypeSupportPkg {
std::string path;
};
std::vector<TypeSupportPkg> type_support_pkgs;
std::string executor;
};
public:
ProxyPlugin() = default;
~ProxyPlugin() override = default;
std::string_view Name() const noexcept override { return "proxy_plugin"; }
bool Initialize(runtime::core::AimRTCore* core_ptr) noexcept override;
void Shutdown() noexcept override;
private:
void InitTypeSupport(Options::TypeSupportPkg& options);
void RegisterSubChannel();
void RegisterPubChannel();
private:
runtime::core::AimRTCore* core_ptr_ = nullptr;
aimrt::executor::ExecutorRef executor_;
Options options_;
bool init_flag_ = false;
struct TypeSupportWrapper {
const Options::TypeSupportPkg& options;
aimrt::util::TypeSupportRef type_support_ref;
runtime::core::util::TypeSupportPkgLoader* loader_ptr;
};
struct TopicPubWrapper {
const aimrt::runtime::core::channel::PublishTypeWrapper* pub_type_wrapper_ptr;
};
std::unordered_map<std::string_view, TypeSupportWrapper> type_support_map_;
std::unordered_map<runtime::core::util::TopicMetaKey, TopicPubWrapper,
runtime::core::util::TopicMetaKey::Hash>
topic_pub_wrapper_map_;
std::vector<std::unique_ptr<runtime::core::util::TypeSupportPkgLoader>>
type_support_pkg_loader_vec_;
std::unordered_map<std::string_view, std::unique_ptr<ProxyAction>> proxy_action_map_;
};
} // namespace aimrt::plugins::proxy_plugin

View File

@ -0,0 +1,16 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#include "aimrt_core_plugin_interface/aimrt_core_plugin_main.h"
#include "proxy_plugin/proxy_plugin.h"
extern "C" {
aimrt::AimRTCorePluginBase* AimRTDynlibCreateCorePluginHandle() {
return new aimrt::plugins::proxy_plugin::ProxyPlugin();
}
void AimRTDynlibDestroyCorePluginHandle(const aimrt::AimRTCorePluginBase* plugin) {
delete plugin;
}
}

View File

@ -0,0 +1,17 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#pragma once
#include <string>
#include <vector>
namespace aimrt::plugins::proxy_plugin {
struct TopicMeta {
std::string topic_name;
std::string msg_type;
std::vector<std::string> pub_topic_name;
};
} // namespace aimrt::plugins::proxy_plugin

View File

@ -5,7 +5,7 @@
#include <deque> #include <deque>
#include "record_playback_plugin/topic_meta_key.h" #include "record_playback_plugin/topic_meta.h"
#include "yaml-cpp/yaml.h" #include "yaml-cpp/yaml.h"

View File

@ -10,8 +10,9 @@
#include "aimrt_module_cpp_interface/executor/executor.h" #include "aimrt_module_cpp_interface/executor/executor.h"
#include "aimrt_module_cpp_interface/util/buffer.h" #include "aimrt_module_cpp_interface/util/buffer.h"
#include "aimrt_module_cpp_interface/util/type_support.h" #include "aimrt_module_cpp_interface/util/type_support.h"
#include "core/util/topic_meta_key.h"
#include "record_playback_plugin/metadata_yaml.h" #include "record_playback_plugin/metadata_yaml.h"
#include "record_playback_plugin/topic_meta_key.h" #include "record_playback_plugin/topic_meta.h"
#include "sqlite3.h" #include "sqlite3.h"
#include "yaml-cpp/yaml.h" #include "yaml-cpp/yaml.h"

View File

@ -144,7 +144,7 @@ void RecordAction::Initialize(YAML::Node options) {
metadata_.version = kVersion; metadata_.version = kVersion;
for (auto& topic_meta_option : options_.topic_meta_list) { for (auto& topic_meta_option : options_.topic_meta_list) {
TopicMetaKey key{ runtime::core::util::TopicMetaKey key{
.topic_name = topic_meta_option.topic_name, .topic_name = topic_meta_option.topic_name,
.msg_type = topic_meta_option.msg_type}; .msg_type = topic_meta_option.msg_type};

View File

@ -11,8 +11,9 @@
#include "aimrt_module_cpp_interface/executor/executor.h" #include "aimrt_module_cpp_interface/executor/executor.h"
#include "aimrt_module_cpp_interface/util/buffer.h" #include "aimrt_module_cpp_interface/util/buffer.h"
#include "aimrt_module_cpp_interface/util/type_support.h" #include "aimrt_module_cpp_interface/util/type_support.h"
#include "core/util/topic_meta_key.h"
#include "record_playback_plugin/metadata_yaml.h" #include "record_playback_plugin/metadata_yaml.h"
#include "record_playback_plugin/topic_meta_key.h" #include "record_playback_plugin/topic_meta.h"
#include "sqlite3.h" #include "sqlite3.h"
#include "yaml-cpp/yaml.h" #include "yaml-cpp/yaml.h"
@ -98,7 +99,9 @@ class RecordAction {
aimrt::executor::ExecutorRef executor_; aimrt::executor::ExecutorRef executor_;
std::function<aimrt::util::TypeSupportRef(std::string_view)> get_type_support_func_; std::function<aimrt::util::TypeSupportRef(std::string_view)> get_type_support_func_;
std::unordered_map<TopicMetaKey, TopicMeta, TopicMetaKey::Hash> topic_meta_map_; std::unordered_map<aimrt::runtime::core::util::TopicMetaKey, TopicMeta,
aimrt::runtime::core::util::TopicMetaKey::Hash>
topic_meta_map_;
size_t max_bag_size_ = 0; size_t max_bag_size_ = 0;
size_t cur_data_size_ = 0; size_t cur_data_size_ = 0;

View File

@ -8,6 +8,7 @@
#include "core/channel/channel_backend_tools.h" #include "core/channel/channel_backend_tools.h"
#include "record_playback_plugin/global.h" #include "record_playback_plugin/global.h"
#include "util/time_util.h" #include "util/time_util.h"
#include "util/topic_meta_key.h"
namespace YAML { namespace YAML {
@ -294,7 +295,6 @@ void RecordPlaybackPlugin::RegisterRpcService() {
void RecordPlaybackPlugin::RegisterRecordChannel() { void RecordPlaybackPlugin::RegisterRecordChannel() {
using namespace aimrt::runtime::core::channel; using namespace aimrt::runtime::core::channel;
using RecordFunc = std::function<void(uint64_t, MsgWrapper&)>; using RecordFunc = std::function<void(uint64_t, MsgWrapper&)>;
struct Wrapper { struct Wrapper {
@ -302,7 +302,9 @@ void RecordPlaybackPlugin::RegisterRecordChannel() {
std::vector<RecordFunc> record_func_vec; std::vector<RecordFunc> record_func_vec;
}; };
std::unordered_map<TopicMetaKey, Wrapper, TopicMetaKey::Hash> recore_func_map; std::unordered_map<aimrt::runtime::core::util::TopicMetaKey, Wrapper,
aimrt::runtime::core::util::TopicMetaKey::Hash>
recore_func_map;
for (auto& record_action_itr : record_action_map_) { for (auto& record_action_itr : record_action_map_) {
auto& record_action = *(record_action_itr.second); auto& record_action = *(record_action_itr.second);
@ -389,7 +391,9 @@ void RecordPlaybackPlugin::RegisterRecordChannel() {
void RecordPlaybackPlugin::RegisterPlaybackChannel() { void RecordPlaybackPlugin::RegisterPlaybackChannel() {
using namespace aimrt::runtime::core::channel; using namespace aimrt::runtime::core::channel;
std::unordered_set<TopicMetaKey, TopicMetaKey::Hash> playback_topic_meta_set; std::unordered_set<aimrt::runtime::core::util::TopicMetaKey,
aimrt::runtime::core::util::TopicMetaKey::Hash>
playback_topic_meta_set;
// 处理 playback action // 处理 playback action
for (auto& playback_action_itr : playback_action_map_) { for (auto& playback_action_itr : playback_action_map_) {
@ -398,7 +402,7 @@ void RecordPlaybackPlugin::RegisterPlaybackChannel() {
const auto& topic_meta_map = playback_action.GetTopicMetaMap(); const auto& topic_meta_map = playback_action.GetTopicMetaMap();
for (const auto& topic_meta_itr : topic_meta_map) { for (const auto& topic_meta_itr : topic_meta_map) {
playback_topic_meta_set.emplace(TopicMetaKey{ playback_topic_meta_set.emplace(aimrt::runtime::core::util::TopicMetaKey{
.topic_name = topic_meta_itr.second.topic_name, .topic_name = topic_meta_itr.second.topic_name,
.msg_type = topic_meta_itr.second.msg_type}); .msg_type = topic_meta_itr.second.msg_type});
} }

View File

@ -0,0 +1,17 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#pragma once
#include <string>
namespace aimrt::plugins::record_playback_plugin {
struct TopicMeta {
uint64_t id;
std::string topic_name;
std::string msg_type;
std::string serialization_type;
};
} // namespace aimrt::plugins::record_playback_plugin

View File

@ -2,10 +2,9 @@
// All rights reserved. // All rights reserved.
#pragma once #pragma once
#include <string> #include <string>
namespace aimrt::plugins::record_playback_plugin { namespace aimrt::runtime::core::util {
struct TopicMetaKey { struct TopicMetaKey {
std::string topic_name; std::string topic_name;
@ -23,11 +22,4 @@ struct TopicMetaKey {
}; };
}; };
struct TopicMeta { } // namespace aimrt::runtime::core::util
uint64_t id;
std::string topic_name;
std::string msg_type;
std::string serialization_type;
};
} // namespace aimrt::plugins::record_playback_plugin

View File

@ -28,6 +28,7 @@ cmake -B build ^
-DAIMRT_BUILD_OPENTELEMETRY_PLUGIN=OFF ^ -DAIMRT_BUILD_OPENTELEMETRY_PLUGIN=OFF ^
-DAIMRT_BUILD_GRPC_PLUGIN=OFF ^ -DAIMRT_BUILD_GRPC_PLUGIN=OFF ^
-DAIMRT_BUILD_ECHO_PLUGIN=OFF ^ -DAIMRT_BUILD_ECHO_PLUGIN=OFF ^
-DAIMRT_BUILD_PROXY_PLUGIN=ON ^
-DAIMRT_BUILD_PYTHON_PACKAGE=ON ^ -DAIMRT_BUILD_PYTHON_PACKAGE=ON ^
%* %*

View File

@ -30,6 +30,7 @@ cmake -B build \
-DAIMRT_BUILD_OPENTELEMETRY_PLUGIN=ON \ -DAIMRT_BUILD_OPENTELEMETRY_PLUGIN=ON \
-DAIMRT_BUILD_GRPC_PLUGIN=ON \ -DAIMRT_BUILD_GRPC_PLUGIN=ON \
-DAIMRT_BUILD_ECHO_PLUGIN=ON \ -DAIMRT_BUILD_ECHO_PLUGIN=ON \
-DAIMRT_BUILD_PROXY_PLUGIN=ON \
-DAIMRT_BUILD_PYTHON_PACKAGE=ON \ -DAIMRT_BUILD_PYTHON_PACKAGE=ON \
$@ $@