feat: zenoh plugin with shm (#96)

* add zenoh-shm API

* use zenoh shm-api for channel and rpc

* set z_alloc_result as a local variable.

* add zenoh_buffer_array_allocator

* avoid copy operation when publishing data with shm

* if shm pool size is not enough, use net buffer instead

* add z_pub_shm_size_map_ to store topic-loan_size

* little fix

* remove copying of client send data

* remove copying of server send data

* add doc

* change benchmark items

* minor modification

---------

Co-authored-by: hanjun <hanjun@agibot.com>
han J committed 2024-11-15 18:42:59 +08:00 (via GitHub)
parent 5c0e63c4ca
commit 062ffad431
29 changed files with 1378 additions and 97 deletions


@ -27,6 +27,12 @@ endif()
function(get_zenohc)
  FetchContent_GetProperties(zenohc)
  if(NOT zenohc_POPULATED)
    set(ZENOHC_BUILD_WITH_UNSTABLE_API
        TRUE
        CACHE BOOL "Enable unstable API")
    set(ZENOHC_BUILD_WITH_SHARED_MEMORY
        TRUE
        CACHE BOOL "Enable shared memory")
    FetchContent_MakeAvailable(zenohc)
  endif()
endfunction()


@ -3,9 +3,9 @@
**Important changes**
- Optimized the zenoh plugin:
  - Updated the zenohc library to version 1.0.0.11;
  - Added a zenoh rpc backend;
  - The zenoh plugin now supports both network and shared-memory communication;
  - Native zenoh configuration can now be passed in for richer configuration;
- Added the third-party library asio; runtime::core no longer references boost and uses the standalone asio library instead, to reduce dependencies;
- Added the aimrt_cli trans command, which converts bag files recorded with the aimrt record_playback plugin into ros2 bag files;
- Added the Echo plugin for echoing messages;
@ -27,6 +27,7 @@
- Support custom log output formats;
- Support periodically flushing logs to disk;
- The grpc plugin supports ros2 messages and the json serialization format;
- mqtt adds configuration options to support ssl/tls encrypted transport with one-way/two-way authentication;
- The mqtt plugin automatically retries asynchronous connection when the broker is not up, and provides a reconnect-interval configuration option;
- The ros2 plugin supports custom rpc service names;
- The asio thread/strand executors now support choosing whether to use the system clock;


@ -13,21 +13,26 @@
- a communication system with a `service discovery` mechanism;
- flexible network topologies;
- low-latency, high-throughput network communication and data transfer;
- both SHM and non-SHM transport modes;

This plugin provides the following components for AimRT:
- `zenoh`-type Rpc backend
- `zenoh`-type Channel backend

The plugin's configuration options are as follows:

|        Node        |  Type  | Optional | Default | Purpose |
| :----------------: | :----: | :------: | :-----: | :-----------------------------: |
| shm_pool_size      |  int   | optional |  10 MB  | size of the shared-memory pool, in bytes |
| shm_init_loan_size |  int   | optional |  1 KB   | initial loan size from the shared-memory pool, in bytes |
| native_cfg_path    | string | optional |   ""    | native configuration file provided by zenoh |
| limit_domain       | string | optional |   ""    | restricts the plugin's communication domain |

Notes on configuring **zenoh_plugin**:

- `shm_pool_size` is the size of the shared-memory pool in bytes; the default is 10 MB and it can be adjusted as needed. If shared memory is not used, this option can be ignored. If the remaining shared memory is not enough for a data transfer, the plugin automatically switches to the non-shared-memory transport mode.
- `shm_init_loan_size` is the initial loan size from the shared-memory pool in bytes; the default is 1 KB and it can be adjusted as needed. If shared memory is not used, this option can be ignored.
- `native_cfg_path` is the path to the native configuration file provided by zenoh, which allows flexible configuration of zenoh's network structure. If it is left empty, the official zenoh default configuration is used; see the zenoh documentation on [configuration](https://zenoh.io/docs/manual/configuration/) for details. You can also modify the {{ '[zenoh_native_config.json5]({}/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/zenoh_native_config.json5)'.format(code_site_root_path_url) }} file directly to customize the configuration. A few common options are listed here:

| Config item | Purpose | Value in zenoh_native_config |
@ -67,9 +72,17 @@ aimrt:
## zenoh-type Rpc backend

The `zenoh`-type Rpc backend is an Rpc backend provided by **zenoh_plugin**, mainly used to build request-response models. All of its configuration options are as follows:

| Node                           | Type   | Optional | Default | Purpose |
| ------------------------------ | ------ | -------- | ------- | ------------------------------------ |
| timeout_executor               | string | optional | ""      | executor used when a client-side RPC request times out |
| clients_options                | array  | optional | []      | rules applied when the client initiates RPC requests |
| clients_options[i].func_name   | string | required | ""      | RPC func name, supports regular expressions |
| clients_options[i].shm_enabled | bool   | required | false   | whether the RPC func uses shared-memory communication |
| servers_options                | array  | optional | []      | rules applied when the server handles RPC requests |
| servers_options[i].func_name   | string | required | ""      | RPC func name, supports regular expressions |
| servers_options[i].shm_enabled | bool   | required | false   | whether the RPC func uses shared-memory communication |

Note: zenoh supports automatic switching between SHM and non-SHM: if data leaves its SHM domain, communication automatically falls back to non-SHM. For example, if node A and node B both enable shared memory but are not on the same machine, they can still communicate, because the data automatically switches to the non-shared-memory transport mode.

Here is a simple client example:
@ -81,6 +94,7 @@ aimrt:
        path: ./libaimrt_zenoh_plugin.so
        options:
          native_cfg_path: ./cfg/zenoh_native_config.json5
          shm_pool_size: 10240
  executor:
    executors:
      - name: timeout_handle
@ -92,6 +106,9 @@ aimrt:
      - type: zenoh
        options:
          timeout_executor: timeout_handle
          clients_options:
            - func_name: "(.*)"
              shm_enabled: false
    clients_options:
      - func_name: "(.*)"
        enable_backends: [zenoh]
@ -112,6 +129,11 @@ aimrt:
  rpc:
    backends:
      - type: zenoh
        options:
          timeout_executor: timeout_handle
          servers_options:
            - func_name: "(.*)"
              shm_enabled: true
    servers_options:
      - func_name: "(.*)"
        enable_backends: [zenoh]
@ -175,7 +197,15 @@ The Server -> Client Zenoh packet format consists of 4 segments overall:
## zenoh-type Channel backend

The `zenoh`-type Channel backend is a Channel backend provided by **zenoh_plugin**, mainly used to build publish-subscribe models. All of its configuration options are as follows:

| Node                              | Type   | Optional | Default | Purpose |
| --------------------------------- | ------ | -------- | ------- | ------------------------------- |
| pub_topics_options                | array  | optional | []      | rules applied when publishing topics |
| pub_topics_options[i].topic_name  | string | required | ""      | topic name, supports regular expressions |
| pub_topics_options[i].shm_enabled | bool   | required | false   | whether the publish side uses shared-memory communication |

Note: zenoh supports automatic switching between SHM and non-SHM: if data leaves its SHM domain, communication automatically falls back to non-SHM. For example, if node A and node B both enable shared memory but are not on the same machine, they can still communicate, because the data automatically switches to the non-shared-memory transport mode.

Here is a simple publisher example:

```yaml
@ -185,10 +215,14 @@ aimrt:
      - name: zenoh_plugin
        path: ./libaimrt_zenoh_plugin.so
        options:
          shm_pool_size: 1024
  channel:
    backends:
      - type: zenoh
        options:
          pub_topics_options:
            - topic_name: "(.*)"
              shm_enabled: false
    pub_topics_options:
      - topic_name: "(.*)"
        enable_backends: [zenoh]


@ -119,3 +119,115 @@
Notes:
- This example is basically the same as the **protobuf rpc** example, except that the business layer uses the ros2 msg form of the protocol;
## protobuf channel with shared-memory

A channel example based on the protobuf protocol and the zenoh backend, demonstrating:
- how to load **zenoh_plugin** in a configuration file;
- how to use the zenoh-type channel backend;
- how to communicate via shared memory;

Core code:
- [event.proto](../../../protocols/example/event.proto)
- [normal_publisher_module.cc](../../cpp/pb_chn/module/normal_publisher_module/normal_publisher_module.cc)
- [normal_subscriber_module.cc](../../cpp/pb_chn/module/normal_subscriber_module/normal_subscriber_module.cc)

Configuration files:
- [examples_plugins_zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml](./install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml)
- [examples_plugins_zenoh_plugin_pb_chn_sub_with_shm_cfg.yaml](./install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_sub_with_shm_cfg.yaml)

How to run (linux):
- Build AimRT with the `AIMRT_BUILD_EXAMPLES` and `AIMRT_BUILD_ZENOH_PLUGIN` options enabled (a rust build environment must be prepared beforehand);
- Run the `ls -l /dev/shm` command in a terminal to check the machine's shared-memory usage; zenoh has not started yet, so the shared memory should be empty;
- After the build succeeds, first run the `start_examples_plugins_zenoh_plugin_pb_chn_sub_with_shm.sh` script in the build directory to start the subscriber (the sub process);
- Open a new terminal window and run the `start_examples_plugins_zenoh_plugin_pb_chn_pub_with_shm.sh` script to start the publisher (the pub process);
- After the programs start, run `ls -l /dev/shm` again; zenoh is now running and you can observe the size of the shared memory it occupies;
- Press `ctrl-c` in each terminal to stop the corresponding process;

Notes:
- This example creates the following two modules:
  - `NormalPublisherModule`: publishes `ExampleEventMsg`-type messages to the configured topic at the configured frequency, based on the `work_thread_pool` executor;
  - `NormalSubscriberModule`: subscribes to `ExampleEventMsg`-type messages on the configured topic;
- This example integrates `NormalPublisherModule` and `NormalSubscriberModule` into the `pb_chn_pub_pkg` and `pb_chn_sub_pkg` Pkgs respectively, and the two configuration files load the corresponding Pkg into the pub and sub processes;
- This example loads **zenoh_plugin** and communicates through the zenoh-type channel backend;
## protobuf rpc with shared-memory

An rpc example based on the protobuf protocol, the coroutine-style interface, and the zenoh backend, demonstrating:
- how to load **zenoh_plugin** in a configuration file;
- how to use the zenoh-type rpc backend;
- how to communicate via shared memory;

Core code:
- [rpc.proto](../../../protocols/example/rpc.proto)
- [normal_rpc_co_client_module.cc](../../cpp/pb_rpc/module/normal_rpc_co_client_module/normal_rpc_co_client_module.cc)
- [normal_rpc_co_server_module.cc](../../cpp/pb_rpc/module/normal_rpc_co_server_module/normal_rpc_co_server_module.cc)
- [service.cc](../../cpp/pb_rpc/module/normal_rpc_co_server_module/service.cc)

Configuration files:
- [examples_plugins_zenoh_plugin_pb_rpc_client_with_shm_cfg.yaml](./install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_client_with_shm_cfg.yaml)
- [examples_plugins_zenoh_plugin_pb_rpc_server_with_shm_cfg.yaml](./install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_server_with_shm_cfg.yaml)

How to run (linux):
- Build AimRT with the `AIMRT_BUILD_EXAMPLES` and `AIMRT_BUILD_ZENOH_PLUGIN` options enabled (a rust build environment must be prepared beforehand);
- Run the `ls -l /dev/shm` command in a terminal to check the machine's shared-memory usage; zenoh has not started yet, so the shared memory should be empty;
- After the build succeeds, first run the `start_examples_plugins_zenoh_plugin_pb_rpc_server_with_shm.sh` script in the build directory to start the server (the srv process);
- Open a new terminal window and run the `start_examples_plugins_zenoh_plugin_pb_rpc_client_with_shm.sh` script to start the client (the cli process);
- After the programs start, run `ls -l /dev/shm` again; zenoh is now running and you can observe the size of the shared memory it occupies;
- Press `ctrl-c` in each terminal to stop the corresponding process;

Notes:
- This example creates the following two modules:
  - `NormalRpcCoClientModule`: initiates RPC requests to `ExampleService` through the coroutine Client interface at the configured frequency, based on the `work_thread_pool` executor;
  - `NormalRpcCoServerModule`: registers the `ExampleService` server and provides an echo feature through the coroutine Server interface;
- This example registers two Filters on the Rpc Client side and Server side respectively, used to print request logs and measure elapsed time;
- This example integrates `NormalRpcCoClientModule` and `NormalRpcCoServerModule` into the `pb_rpc_client_pkg` and `pb_rpc_server_pkg` Pkgs respectively, and the two configuration files load the corresponding Pkg into the srv and cli processes;
- This example loads **zenoh_plugin** and communicates through the zenoh-type rpc backend; in addition, the client configures the `timeout_handle` executor as the timeout executor;
## protobuf channel with shared-memory and network

A channel example based on the protobuf protocol and the zenoh backend, demonstrating:
- how to load **zenoh_plugin** in a configuration file;
- how to use the zenoh-type channel backend;
- how to mix shared-memory and network communication;

Core code:
- [event.proto](../../../protocols/example/event.proto)
- [normal_publisher_module.cc](../../cpp/pb_chn/module/normal_publisher_module/normal_publisher_module.cc)
- [normal_subscriber_module.cc](../../cpp/pb_chn/module/normal_subscriber_module/normal_subscriber_module.cc)

Configuration files:
- [examples_plugins_zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml](./install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml)
- [examples_plugins_zenoh_plugin_pb_chn_sub_cfg.yaml](./install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_sub_cfg.yaml)

How to run (linux):
- Build AimRT with the `AIMRT_BUILD_EXAMPLES` and `AIMRT_BUILD_ZENOH_PLUGIN` options enabled (a rust build environment must be prepared beforehand);
- Run the `ls -l /dev/shm` command in a terminal to check the machine's shared-memory usage; zenoh has not started yet, so the shared memory should be empty;
- After the build succeeds, first run the `start_examples_plugins_zenoh_plugin_pb_chn_sub.sh` script in the build directory to start the subscriber (the sub process);
- Open a new terminal window and run the `start_examples_plugins_zenoh_plugin_pb_chn_pub_with_shm.sh` script to start the publisher (the pub process);
- After the programs start, run `ls -l /dev/shm` again; zenoh is now running and you can observe the size of the shared memory it occupies;
- Press `ctrl-c` in each terminal to stop the corresponding process;

Notes:
- This example creates the following two modules:
  - `NormalPublisherModule`: publishes `ExampleEventMsg`-type messages to the configured topic at the configured frequency, based on the `work_thread_pool` executor;
  - `NormalSubscriberModule`: subscribes to `ExampleEventMsg`-type messages on the configured topic;
- This example integrates `NormalPublisherModule` and `NormalSubscriberModule` into the `pb_chn_pub_pkg` and `pb_chn_sub_pkg` Pkgs respectively, and the two configuration files load the corresponding Pkg into the pub and sub processes;
- This example loads **zenoh_plugin** and communicates through the zenoh-type channel backend;
- Once shared-memory data configured in the zenoh plugin leaves its effective scope, transmission automatically switches to network mode. You can try running the steps above on one host and then across two hosts to observe the difference:
  - with the publisher and subscriber on one host, you can observe shared-memory communication;
  - with the publisher and subscriber on two hosts, you can observe network communication, even though the publisher is configured for shared memory;


@ -0,0 +1,57 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
aimrt:
  plugin:
    plugins:
      - name: zenoh_plugin
        path: ./libaimrt_zenoh_plugin.so
        options:
          native_cfg_path: ./cfg/zenoh_native_config.json5
          shm_pool_size: 10240000
  log:
    core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
    backends:
      - type: console
  executor:
    executors:
      - name: publish_control_executor
        type: simple_thread
      - name: publish_executor_0
        type: simple_thread
      - name: publish_executor_1
        type: simple_thread
      - name: publish_executor_2
        type: simple_thread
      - name: publish_executor_3
        type: simple_thread
  channel:
    backends:
      - type: zenoh
        options:
          pub_topics_options:
            - topic_name: "(.*)"
              shm_enabled: true
    pub_topics_options:
      - topic_name: "(.*)"
        enable_backends: [zenoh]
  module:
    pkgs:
      - path: ./libpb_chn_pub_pkg.so
        enable_modules: [BenchmarkPublisherModule]
    modules:
      - name: BenchmarkPublisherModule
        log_lvl: INFO
  # Module custom configuration
  BenchmarkPublisherModule:
    max_topic_number: 4
    bench_plans:
      - channel_frq: 1000
        msg_size: 512
        topic_number: 4
        msg_count: 5000
      - channel_frq: 1000
        msg_size: 4096
        topic_number: 1
        msg_count: 1000


@ -0,0 +1,32 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
aimrt:
  plugin:
    plugins:
      - name: zenoh_plugin
        path: ./libaimrt_zenoh_plugin.so
        options:
          native_cfg_path: ./cfg/zenoh_native_config.json5
  log:
    core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
    backends:
      - type: console
  executor:
  channel:
    backends:
      - type: zenoh
    sub_topics_options:
      - topic_name: "(.*)"
        enable_backends: [zenoh]
  module:
    pkgs:
      - path: ./libpb_chn_sub_pkg.so
        enable_modules: [BenchmarkSubscriberModule]
    modules:
      - name: BenchmarkSubscriberModule
        log_lvl: INFO
  # Module custom configuration
  BenchmarkSubscriberModule:
    max_topic_number: 4


@ -0,0 +1,42 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
aimrt:
  plugin:
    plugins:
      - name: zenoh_plugin
        path: ./libaimrt_zenoh_plugin.so
        options:
          shm_pool_size: 10240
  log:
    core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
    backends:
      - type: console
  executor:
    executors:
      - name: work_thread_pool
        type: asio_thread
        options:
          thread_num: 2
  channel:
    backends:
      - type: zenoh
        options:
          pub_topics_options:
            - topic_name: "(.*)"
              shm_enabled: true
    pub_topics_options:
      - topic_name: "(.*)"
        enable_backends: [zenoh]
  module:
    pkgs:
      - path: ./libpb_chn_pub_pkg.so
        enable_modules: [NormalPublisherModule]
    modules:
      - name: NormalPublisherModule
        log_lvl: INFO
  # Module custom configuration
  NormalPublisherModule:
    topic_name: test_topic
    channel_frq: 0.5


@ -0,0 +1,30 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
aimrt:
  plugin:
    plugins:
      - name: zenoh_plugin
        path: ./libaimrt_zenoh_plugin.so
  log:
    core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
    backends:
      - type: console
  executor:
  channel:
    backends:
      - type: zenoh
    sub_topics_options:
      - topic_name: "(.*)"
        enable_backends: [zenoh]
  module:
    pkgs:
      - path: ./libpb_chn_sub_pkg.so
        enable_modules: [NormalSubscriberModule]
    modules:
      - name: NormalSubscriberModule
        log_lvl: INFO
  # Module custom configuration
  NormalSubscriberModule:
    topic_name: test_topic


@ -0,0 +1,65 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
aimrt:
  plugin:
    plugins:
      - name: zenoh_plugin
        path: ./libaimrt_zenoh_plugin.so
        options:
          native_cfg_path: ./cfg/zenoh_native_config.json5
          shm_pool_size: 10240000
  log:
    core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
    backends:
      - type: console
  executor:
    executors:
      - name: client_statistics_executor
        type: asio_thread
        options:
          thread_num: 4
      - name: timeout_handle
        type: time_wheel
        options:
          bind_executor: client_statistics_executor
      - name: client_executor_0
        type: asio_thread
      - name: client_executor_1
        type: asio_thread
      - name: client_executor_2
        type: asio_thread
      - name: client_executor_3
        type: asio_thread
  rpc:
    backends:
      - type: zenoh
        options:
          timeout_executor: timeout_handle
          clients_options:
            - func_name: "(.*)"
              shm_enabled: true
    clients_options:
      - func_name: "(.*)"
        enable_backends: [zenoh]
  module:
    pkgs:
      - path: ./libpb_rpc_client_pkg.so
        enable_modules: [BenchmarkRpcClientModule]
    modules:
      - name: BenchmarkRpcClientModule
        log_lvl: INFO
  # Module custom configuration
  BenchmarkRpcClientModule:
    max_parallel: 4
    bench_plans:
      - perf_mode: bench
        msg_size: 256
        parallel: 4
        msg_count: 10000
      - perf_mode: fixed-freq
        freq: 1000
        msg_size: 1024
        parallel: 2
        msg_count: 1000


@ -0,0 +1,46 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
aimrt:
  plugin:
    plugins:
      - name: zenoh_plugin
        path: ./libaimrt_zenoh_plugin.so
        options:
          native_cfg_path: ./cfg/zenoh_native_config.json5
  log:
    core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
    backends:
      - type: console
  executor:
    executors:
      - name: work_thread_pool
        type: asio_thread
        options:
          thread_num: 4
      - name: timeout_handle
        type: time_wheel
        options:
          bind_executor: work_thread_pool
  rpc:
    backends:
      - type: zenoh
        options:
          timeout_executor: timeout_handle
          clients_options:
            - func_name: "(.*)"
              shm_enabled: true
    clients_options:
      - func_name: "(.*)"
        enable_backends: [zenoh]
  module:
    pkgs:
      - path: ./libpb_rpc_client_pkg.so
        enable_modules: [NormalRpcCoClientModule]
    modules:
      - name: NormalRpcCoClientModule
        log_lvl: INFO
  # Module custom configuration
  NormalRpcCoClientModule:
    rpc_frq: 0.5


@ -25,4 +25,4 @@ aimrt:
        enable_modules: [NormalRpcCoServerModule]
    modules:
      - name: NormalRpcCoServerModule
        log_lvl: Info


@ -0,0 +1,33 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
aimrt:
  plugin:
    plugins:
      - name: zenoh_plugin
        path: ./libaimrt_zenoh_plugin.so
        options:
          native_cfg_path: ./cfg/zenoh_native_config.json5
          shm_pool_size: 10240000
  log:
    core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
    backends:
      - type: console
  executor:
  rpc:
    backends:
      - type: zenoh
        options:
          servers_options:
            - func_name: "(.*)"
              shm_enabled: true
    servers_options:
      - func_name: "(.*)"
        enable_backends: [zenoh]
  module:
    pkgs:
      - path: ./libpb_rpc_server_pkg.so
        enable_modules: [NormalRpcCoServerModule]
    modules:
      - name: NormalRpcCoServerModule
        log_lvl: Info


@ -0,0 +1,3 @@
#!/bin/bash
./aimrt_main --cfg_file_path=./cfg/examples_plugins_zenoh_plugin_pb_chn_benchmark_pub_with_shm_cfg.yaml


@ -0,0 +1,3 @@
#!/bin/bash
./aimrt_main --cfg_file_path=./cfg/examples_plugins_zenoh_plugin_pb_chn_benchmark_sub_with_shm_cfg.yaml


@ -0,0 +1,3 @@
#!/bin/bash
./aimrt_main --cfg_file_path=./cfg/examples_plugins_zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml


@ -0,0 +1,3 @@
#!/bin/bash
./aimrt_main --cfg_file_path=./cfg/examples_plugins_zenoh_plugin_pb_chn_sub_with_shm_cfg.yaml


@ -0,0 +1,3 @@
#!/bin/bash
./aimrt_main --cfg_file_path=./cfg/examples_plugins_zenoh_plugin_pb_rpc_benchmark_client_with_shm_cfg.yaml


@ -0,0 +1,3 @@
#!/bin/bash
./aimrt_main --cfg_file_path=./cfg/examples_plugins_zenoh_plugin_pb_rpc_client_with_shm_cfg.yaml


@ -0,0 +1,3 @@
#!/bin/bash
./aimrt_main --cfg_file_path=./cfg/examples_plugins_zenoh_plugin_pb_rpc_server_with_shm_cfg.yaml


@ -0,0 +1,80 @@
// Copyright(c) 2023, AgiBot Inc.
// All rights reserved.

#pragma once

#include <iomanip>
#include <sstream>

#include "core/channel/channel_msg_wrapper.h"
#include "core/rpc/rpc_invoke_wrapper.h"

namespace aimrt::plugins::zenoh_plugin {

// kFixedLen is the length of the pkg_size string, which is enough to hold the max value of uint64_t
constexpr unsigned int kFixedLen = 20;

inline std::pair<std::shared_ptr<aimrt::util::BufferArrayView>, size_t> SerializeMsgSupportedZenoh(
    runtime::core::channel::MsgWrapper& msg_wrapper, std::string_view serialization_type,
    aimrt::util::BufferArrayAllocatorRef allocator) {
  auto& serialization_cache = msg_wrapper.serialization_cache;
  const auto& info = msg_wrapper.info;

  auto finditr = serialization_cache.find(serialization_type);
  if (finditr != serialization_cache.end()) {
    return {finditr->second, finditr->second->BufferSize()};
  }

  auto buffer_array_ptr = std::make_unique<aimrt::util::BufferArray>(allocator);
  bool serialize_ret = info.msg_type_support_ref.Serialize(
      serialization_type,
      msg_wrapper.msg_ptr,
      buffer_array_ptr->AllocatorNativeHandle(),
      buffer_array_ptr->BufferArrayNativeHandle());
  AIMRT_ASSERT(serialize_ret, "Serialize failed.");

  return {nullptr, buffer_array_ptr->BufferSize()};
}

inline std::pair<std::shared_ptr<aimrt::util::BufferArrayView>, size_t> SerializeReqSupportedZenoh(
    runtime::core::rpc::InvokeWrapper& invoke_wrapper, std::string_view serialization_type,
    aimrt::util::BufferArrayAllocatorRef allocator) {
  const auto& info = invoke_wrapper.info;
  auto& req_serialization_cache = invoke_wrapper.req_serialization_cache;

  auto find_itr = req_serialization_cache.find(serialization_type);
  if (find_itr != req_serialization_cache.end())
    return {find_itr->second, find_itr->second->BufferSize()};

  auto buffer_array_ptr = std::make_unique<aimrt::util::BufferArray>(allocator);
  bool serialize_ret = info.req_type_support_ref.Serialize(
      serialization_type,
      invoke_wrapper.req_ptr,
      buffer_array_ptr->AllocatorNativeHandle(),
      buffer_array_ptr->BufferArrayNativeHandle());
  AIMRT_ASSERT(serialize_ret, "Serialize failed.");

  return {nullptr, buffer_array_ptr->BufferSize()};
}

inline std::pair<std::shared_ptr<aimrt::util::BufferArrayView>, size_t> SerializeRspSupportedZenoh(
    runtime::core::rpc::InvokeWrapper& invoke_wrapper, std::string_view serialization_type,
    aimrt::util::BufferArrayAllocatorRef allocator) {
  const auto& info = invoke_wrapper.info;
  auto& rsp_serialization_cache = invoke_wrapper.rsp_serialization_cache;

  auto find_itr = rsp_serialization_cache.find(serialization_type);
  if (find_itr != rsp_serialization_cache.end())
    return {find_itr->second, find_itr->second->BufferSize()};

  auto buffer_array_ptr = std::make_unique<aimrt::util::BufferArray>(allocator);
  bool serialize_ret = info.rsp_type_support_ref.Serialize(
      serialization_type,
      invoke_wrapper.rsp_ptr,
      buffer_array_ptr->AllocatorNativeHandle(),
      buffer_array_ptr->BufferArrayNativeHandle());
  AIMRT_ASSERT(serialize_ret, "Serialize failed.");

  return {nullptr, buffer_array_ptr->BufferSize()};
}

inline std::string IntToFixedLengthString(int number, int length) {
  std::ostringstream oss;
  oss << std::setw(length) << number;
  return oss.str();
}

}  // namespace aimrt::plugins::zenoh_plugin


@ -0,0 +1,98 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.

#pragma once

#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>

#include "aimrt_module_c_interface/util/buffer_base.h"

namespace aimrt::util {

class ZenohBufferArrayAllocator {
 public:
  ZenohBufferArrayAllocator(uint64_t z_shm_capacity, void* z_shm_head_ptr)
      : base_(GenBase(this)),
        z_shm_head_ptr_(z_shm_head_ptr),
        z_shm_cur_ptr_(z_shm_head_ptr),
        z_shm_capacity_(z_shm_capacity) {}
  ~ZenohBufferArrayAllocator() = default;

  ZenohBufferArrayAllocator(const ZenohBufferArrayAllocator&) = delete;
  ZenohBufferArrayAllocator& operator=(const ZenohBufferArrayAllocator&) = delete;

  const aimrt_buffer_array_allocator_t* NativeHandle() const { return &base_; }

  void Reserve(aimrt_buffer_array_t* buffer_array, size_t new_cap) {
    aimrt_buffer_t* cur_data = buffer_array->data;

    buffer_array->data = new aimrt_buffer_t[new_cap];
    buffer_array->capacity = new_cap;

    if (cur_data) {
      memcpy(buffer_array->data, cur_data, buffer_array->len * sizeof(aimrt_buffer_t));
      delete[] cur_data;
    }
  }

  aimrt_buffer_t Allocate(aimrt_buffer_array_t* buffer_array, size_t size) {
    void* data = z_shm_cur_ptr_;
    z_shm_cur_ptr_ = static_cast<char*>(z_shm_cur_ptr_) + size;
    z_shm_used_size_ += size;
    if (z_shm_used_size_ > z_shm_capacity_) [[unlikely]] {
      z_is_shm_enough_ = false;
      return aimrt_buffer_t{nullptr, 0};
    }
    if (data == nullptr) [[unlikely]]
      return aimrt_buffer_t{nullptr, 0};

    if (buffer_array->capacity > buffer_array->len) {
      return (buffer_array->data[buffer_array->len++] = aimrt_buffer_t{data, size});
    }

    static constexpr size_t kInitCapacitySize = 2;
    size_t new_capacity = (buffer_array->capacity < kInitCapacitySize)
                              ? kInitCapacitySize
                              : (buffer_array->capacity << 1);
    Reserve(buffer_array, new_capacity);

    return (buffer_array->data[buffer_array->len++] = aimrt_buffer_t{data, size});
  }

  void Release(aimrt_buffer_array_t* buffer_array) {
    if (buffer_array->data) delete[] buffer_array->data;
  }

  bool IsShmEnough() { return z_is_shm_enough_; }

  static const aimrt_buffer_array_allocator_t GenBase(void* impl) {
    return aimrt_buffer_array_allocator_t{
        .reserve = [](void* impl, aimrt_buffer_array_t* buffer_array, size_t new_cap) {
          static_cast<ZenohBufferArrayAllocator*>(impl)->Reserve(buffer_array, new_cap);
        },
        .allocate = [](void* impl, aimrt_buffer_array_t* buffer_array, size_t size) -> aimrt_buffer_t {
          return static_cast<ZenohBufferArrayAllocator*>(impl)->Allocate(buffer_array, size);
        },
        .release = [](void* impl, aimrt_buffer_array_t* buffer_array) {
          static_cast<ZenohBufferArrayAllocator*>(impl)->Release(buffer_array);
        },
        .impl = impl};
  }

 private:
  const aimrt_buffer_array_allocator_t base_;
  void* z_shm_head_ptr_;
  void* z_shm_cur_ptr_;
  uint64_t z_shm_capacity_;
  uint64_t z_shm_used_size_ = 0;
  bool z_is_shm_enough_ = true;
};

}  // namespace aimrt::util


@ -11,10 +11,29 @@ struct convert<aimrt::plugins::zenoh_plugin::ZenohChannelBackend::Options> {
  static Node encode(const Options& rhs) {
    Node node;

    node["pub_topics_options"] = YAML::Node();
    for (const auto& pub_topic_options : rhs.pub_topics_options) {
      Node pub_topic_options_node;
      pub_topic_options_node["topic_name"] = pub_topic_options.topic_name;
      pub_topic_options_node["shm_enabled"] = pub_topic_options.shm_enabled;
      node["pub_topics_options"].push_back(pub_topic_options_node);
    }

    return node;
  }

  static bool decode(const Node& node, Options& rhs) {
    if (node["pub_topics_options"] && node["pub_topics_options"].IsSequence()) {
      for (const auto& pub_topic_options_node : node["pub_topics_options"]) {
        auto pub_topic_options = Options::PubTopicOptions{
            .topic_name = pub_topic_options_node["topic_name"].as<std::string>(),
            .shm_enabled = pub_topic_options_node["shm_enabled"].as<bool>(),
        };
        rhs.pub_topics_options.emplace_back(std::move(pub_topic_options));
      }
    }
    return true;
  }
};
@ -52,16 +71,34 @@ bool ZenohChannelBackend::RegisterPublishType(
    const auto& info = publish_type_wrapper.info;

    bool shm_enabled = false;
    auto find_option_itr = std::find_if(
        options_.pub_topics_options.begin(), options_.pub_topics_options.end(),
        [topic_name = info.topic_name](const Options::PubTopicOptions& pub_option) {
          try {
            return std::regex_match(topic_name.begin(), topic_name.end(),
                                    std::regex(pub_option.topic_name, std::regex::ECMAScript));
          } catch (const std::exception& e) {
            AIMRT_WARN("Regex get exception, expr: {}, string: {}, exception info: {}",
                       pub_option.topic_name, topic_name, e.what());
            return false;
          }
        });
    if (find_option_itr != options_.pub_topics_options.end()) {
      shm_enabled = find_option_itr->shm_enabled;
    }

    namespace util = aimrt::common::util;

    std::string pattern = std::string("channel/") +
                          util::UrlEncode(info.topic_name) + "/" +
                          util::UrlEncode(info.msg_type) +
                          limit_domain_;

    zenoh_manager_ptr_->RegisterPublisher(pattern, shm_enabled);
    z_pub_shm_size_map_[pattern] = shm_init_loan_size_;

    AIMRT_INFO("Register publish type to zenoh channel, url: {}, shm_enabled: {}", pattern, shm_enabled);

    return true;
  } catch (const std::exception& e) {
@ -102,25 +139,31 @@ bool ZenohChannelBackend::Subscribe(
  try {
    auto ctx_ptr = std::make_shared<aimrt::channel::Context>(aimrt_channel_context_type_t::AIMRT_CHANNEL_SUBSCRIBER_CONTEXT);

    // todo(hj): avoid copy when use shm
    const z_loaned_bytes_t* payload = z_sample_payload(message);
    size_t serialized_size = z_bytes_len(payload);
    z_bytes_reader_t reader = z_bytes_get_reader(payload);
    std::vector<char> serialized_data(serialized_size);

    // read data from payload
    auto ret = z_bytes_reader_read(&reader, reinterpret_cast<uint8_t*>(serialized_data.data()), serialized_size);
    if (ret >= 0) {
      util::ConstBufferOperator buf_oper(serialized_data.data() + kFixedLen,
                                         std::stoi(std::string(serialized_data.data(), kFixedLen)));

      // get serialization type
      std::string serialization_type(buf_oper.GetString(util::BufferLenType::kUInt8));
      ctx_ptr->SetSerializationType(serialization_type);

      // get context meta
      size_t ctx_num = buf_oper.GetUint8();
      for (size_t ii = 0; ii < ctx_num; ++ii) {
        auto key = buf_oper.GetString(util::BufferLenType::kUInt16);
        auto val = buf_oper.GetString(util::BufferLenType::kUInt16);
        ctx_ptr->SetMetaValue(key, val);
      }

      ctx_ptr->SetMetaValue(AIMRT_CHANNEL_CONTEXT_KEY_BACKEND, Name());

      // get msg
      auto remaining_buf = buf_oper.GetRemainingBuffer();

      sub_tool_ptr->DoSubscribeCallback(
@ -145,12 +188,29 @@ bool ZenohChannelBackend::Subscribe(
void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrapper) noexcept { void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrapper) noexcept {
try { try {
// checkout state
AIMRT_CHECK_ERROR_THROW(state_.load() == State::kStart, AIMRT_CHECK_ERROR_THROW(state_.load() == State::kStart,
"Method can only be called when state is 'Start'."); "Method can only be called when state is 'Start'.");
namespace util = aimrt::common::util; namespace util = aimrt::common::util;
const auto& info = msg_wrapper.info; const auto& info = msg_wrapper.info;
std::string zenoh_pub_topic = std::string("channel/") +
util::UrlEncode(info.topic_name) + "/" +
util::UrlEncode(info.msg_type) +
limit_domain_;
// find publisher with zenoh_pub_topic
auto z_pub_registry = zenoh_manager_ptr_->GetPublisherRegisterMap();
auto z_pub_iter = z_pub_registry->find(zenoh_pub_topic);
if (z_pub_iter == z_pub_registry->end()) {
AIMRT_ERROR("Url: {} is not registered!", zenoh_pub_topic);
return;
}
auto z_pub = z_pub_iter->second;
// get serialization type
    auto publish_type_support_ref = info.msg_type_support_ref;

    auto serialization_type = msg_wrapper.ctx_ref.GetSerializationType();
@ -158,16 +218,7 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe
      serialization_type = publish_type_support_ref.DefaultSerializationType();
    }
    // get meta data
    const auto& keys = msg_wrapper.ctx_ref.GetMetaKeys();
    AIMRT_CHECK_ERROR_THROW(keys.size() <= 255,
                            "Too many context meta entries, require less than 255, but actually {}.", keys.size());
@ -183,32 +234,163 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe
      context_meta_kv.emplace_back(val);
    }
    // shm_enabled
if (z_pub.second) {
unsigned char* z_pub_loaned_shm_ptr = nullptr;
std::shared_ptr<aimrt::util::BufferArrayView> buffer_array_cache_ptr = nullptr;
bool is_shm_loan_size_enough = true;
bool is_shm_pool_size_enough = true;
uint64_t msg_size = 0;
z_buf_layout_alloc_result_t loan_result;
do {
// release old shm
if (z_pub_loaned_shm_ptr != nullptr) {
z_shm_provider_garbage_collect(z_loan(zenoh_manager_ptr_->shm_provider_));
z_shm_provider_defragment(z_loan(zenoh_manager_ptr_->shm_provider_));
}
        // loan a new-size shm
uint64_t loan_size = z_pub_shm_size_map_[zenoh_pub_topic];
z_shm_provider_alloc(&loan_result, z_loan(zenoh_manager_ptr_->shm_provider_), loan_size, zenoh_manager_ptr_->alignment_);
// if shm pool is not enough, use net buffer instead
if (loan_result.status != ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
is_shm_pool_size_enough = false;
z_shm_provider_garbage_collect(z_loan(zenoh_manager_ptr_->shm_provider_));
z_shm_provider_defragment(z_loan(zenoh_manager_ptr_->shm_provider_));
AIMRT_WARN("Zenoh Plugin shm pool is not enough, use net buffer instead.");
break;
}
z_pub_loaned_shm_ptr = z_shm_mut_data_mut(z_loan_mut(loan_result.buf));
        // write info pkg on loaned shm: the first FIXED_LEN bytes hold the length of the pkg
util::BufferOperator buf_oper(reinterpret_cast<char*>(z_pub_loaned_shm_ptr) + kFixedLen, loan_size - kFixedLen);
// write serialization type on loaned shm
buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8);
// write context meta on loaned shm
buf_oper.SetUint8(static_cast<uint8_t>(keys.size()));
for (const auto& s : context_meta_kv) {
buf_oper.SetString(s, util::BufferLenType::kUInt16);
}
auto type_and_ctx_len = 1 + serialization_type.size() + context_meta_kv_size;
// write msg on loaned shm should start at the (FIXED_LEN + type_and_ctx_len)-th byte
aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + type_and_ctx_len + kFixedLen);
if (buffer_array_cache_ptr == nullptr) {
try {
auto result = SerializeMsgSupportedZenoh(msg_wrapper, serialization_type, aimrt::util::BufferArrayAllocatorRef(z_allocator.NativeHandle()));
msg_size = result.second;
buffer_array_cache_ptr = result.first;
            if (buffer_array_cache_ptr == nullptr) {
              // no cache is set, so normal serialization was done (it throws if the shm size is too small)
              is_shm_loan_size_enough = true;
            } else {
              if (msg_size > buf_oper.GetRemainingSize()) {
                // the msg has a serialization cache but it is too large, so expand to a suitable size
                is_shm_loan_size_enough = false;
                z_pub_shm_size_map_[zenoh_pub_topic] = msg_size + type_and_ctx_len + kFixedLen;
              } else {
                // the msg has a serialization cache and the size fits, so use the cache
                is_shm_loan_size_enough = true;
              }
            }
          } catch (const std::exception& e) {
            if (!z_allocator.IsShmEnough()) {
              // the shm is not enough, so expand to double the size
              z_pub_shm_size_map_[zenoh_pub_topic] = z_pub_shm_size_map_[zenoh_pub_topic] << 1;
              is_shm_loan_size_enough = false;
} else {
AIMRT_ERROR(
"Msg serialization failed, serialization_type {}, pkg_path: {}, module_name: {}, topic_name: {}, msg_type: {}, exception: {}",
serialization_type, info.pkg_path, info.module_name, info.topic_name, info.msg_type, e.what());
return;
}
}
}
} while (!is_shm_loan_size_enough);
      if (is_shm_pool_size_enough) {
        // if there is a serialization cache, copy it into shm instead of serializing again
        if (buffer_array_cache_ptr != nullptr) {
          unsigned char* start_pos = z_pub_loaned_shm_ptr + kFixedLen + context_meta_kv_size + serialization_type.size() + 1;
          for (size_t ii = 0; ii < buffer_array_cache_ptr->Size(); ++ii) {
            std::memcpy(start_pos, buffer_array_cache_ptr.get()[ii].Data()->data, buffer_array_cache_ptr.get()[ii].Data()->len);
            start_pos += buffer_array_cache_ptr.get()[ii].Data()->len;
          }
          buffer_array_cache_ptr = nullptr;
        }

        // write info pkg length on loaned shm
        std::memcpy(z_pub_loaned_shm_ptr, IntToFixedLengthString(1 + serialization_type.size() + context_meta_kv_size + msg_size, kFixedLen).c_str(), kFixedLen);
z_owned_bytes_t z_payload;
if (loan_result.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
z_bytes_from_shm_mut(&z_payload, z_move(loan_result.buf));
}
z_publisher_put(z_loan(z_pub.first), z_move(z_payload), &zenoh_manager_ptr_->z_pub_options_);
      // collect garbage and defragment shared memory whose reference count is zero
z_shm_provider_garbage_collect(z_loan(zenoh_manager_ptr_->shm_provider_));
z_shm_provider_defragment(z_loan(zenoh_manager_ptr_->shm_provider_));
AIMRT_TRACE("Zenoh publish to '{}'", zenoh_pub_topic);
return;
}
}
// shm disabled
// serialize msg
auto buffer_array_view_ptr = aimrt::runtime::core::channel::SerializeMsgWithCache(msg_wrapper, serialization_type);
AIMRT_CHECK_ERROR_THROW(
buffer_array_view_ptr,
"Msg serialization failed, serialization_type {}, pkg_path: {}, module_name: {}, topic_name: {}, msg_type: {}",
serialization_type, info.pkg_path, info.module_name, info.topic_name, info.msg_type);
const auto* buffer_array_data = buffer_array_view_ptr->Data();
const size_t buffer_array_len = buffer_array_view_ptr->Size();
size_t msg_size = buffer_array_view_ptr->BufferSize();
int32_t data_size = 1 + serialization_type.size() + context_meta_kv_size + msg_size;
int32_t pkg_size = data_size + kFixedLen;
    // create buffer for serialization
    std::vector<char> serialized_data(pkg_size);
    util::BufferOperator buf_oper(serialized_data.data(), pkg_size);

    // fill data_size
    buf_oper.SetBuffer(IntToFixedLengthString(data_size, kFixedLen).c_str(), kFixedLen);

    // fill serialization_type
    buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8);

    // fill context meta
    buf_oper.SetUint8(static_cast<uint8_t>(keys.size()));
    for (const auto& s : context_meta_kv) {
      buf_oper.SetString(s, util::BufferLenType::kUInt16);
    }

    // fill msg
    for (size_t ii = 0; ii < buffer_array_len; ++ii) {
      buf_oper.SetBuffer(
          static_cast<const char*>(buffer_array_data[ii].data),
          buffer_array_data[ii].len);
    }

    // publish
    z_owned_bytes_t z_payload;
    z_bytes_from_buf(&z_payload, reinterpret_cast<uint8_t*>(serialized_data.data()), pkg_size, nullptr, nullptr);
    z_publisher_put(z_loan(z_pub.first), z_move(z_payload), &zenoh_manager_ptr_->z_pub_options_);

    AIMRT_TRACE("Zenoh publish to '{}'", zenoh_pub_topic);

    return;
  } catch (const std::exception& e) {
    AIMRT_ERROR("{}", e.what());
@ -8,6 +8,8 @@
#include "util/buffer_util.h"
#include "util/url_encode.h"
#include "zenoh.h"
#include "zenoh_plugin/util.h"
#include "zenoh_plugin/zenoh_buffer_array_allocator.h"
#include "zenoh_plugin/zenoh_manager.h"

namespace aimrt::plugins::zenoh_plugin {
@ -15,13 +17,19 @@ namespace aimrt::plugins::zenoh_plugin {
class ZenohChannelBackend : public runtime::core::channel::ChannelBackendBase {
 public:
  struct Options {
    struct PubTopicOptions {
      std::string topic_name;
      bool shm_enabled = false;
    };

    std::vector<PubTopicOptions> pub_topics_options;
  };

 public:
  ZenohChannelBackend(
      const std::shared_ptr<ZenohManager>& zenoh_util_ptr, const std::string& limit_domain, size_t shm_init_loan_size)
      : zenoh_manager_ptr_(zenoh_util_ptr),
        limit_domain_(limit_domain),
        shm_init_loan_size_(shm_init_loan_size) {}

  ~ZenohChannelBackend() override = default;

@ -60,6 +68,10 @@ class ZenohChannelBackend : public runtime::core::channel::ChannelBackendBase {
      std::string,
      std::unique_ptr<aimrt::runtime::core::channel::SubscribeTool>>
      subscribe_wrapper_map_;

  std::unordered_map<std::string, uint64_t> z_pub_shm_size_map_;
  uint64_t shm_init_loan_size_;
};

}  // namespace aimrt::plugins::zenoh_plugin
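The channel backend above frames every message as a fixed-length decimal size prefix followed by the serialization type, the context meta key/value pairs, and the payload. The following standalone sketch models that layout; `kFixedLen` (assumed 4 bytes), `IntToFixedLengthString`, and the big-endian u16 length prefix are assumptions modeled on the code in this diff, not the plugin's actual helpers:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical stand-ins for the plugin's kFixedLen / IntToFixedLengthString.
constexpr std::size_t kFixedLen = 4;

std::string IntToFixedLengthString(int value, std::size_t width) {
  std::ostringstream oss;
  oss.width(static_cast<std::streamsize>(width));
  oss.fill('0');
  oss << value;
  return oss.str();
}

// Build one pkg: [kFixedLen-byte decimal data_size]
//                [u8 type_len][type][u8 meta_count]([u16 len][str])*[payload]
std::vector<char> BuildPkg(const std::string& type,
                           const std::vector<std::string>& meta_kv,  // key,val,key,val...
                           const std::string& payload) {
  std::string data;
  data.push_back(static_cast<char>(type.size()));
  data += type;
  data.push_back(static_cast<char>(meta_kv.size() / 2));
  for (const auto& s : meta_kv) {
    auto len = static_cast<std::uint16_t>(s.size());
    data.push_back(static_cast<char>(len >> 8));  // endianness here is an assumption
    data.push_back(static_cast<char>(len & 0xFF));
    data += s;
  }
  data += payload;
  std::string pkg = IntToFixedLengthString(static_cast<int>(data.size()), kFixedLen) + data;
  return {pkg.begin(), pkg.end()};
}
```

A subscriber can then read the first `kFixedLen` bytes to learn how much of the buffer is meaningful, which is what the `std::stoi(std::string(serialized_data.data(), kFixedLen))` call in the RPC backend below relies on.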
@ -7,7 +7,7 @@
namespace aimrt::plugins::zenoh_plugin {

void ZenohManager::Initialize(const std::string &native_cfg_path, size_t shm_pool_size) {
  if (!native_cfg_path.empty() && native_cfg_path.c_str() != nullptr) {
    if (zc_config_from_file(&z_config_, native_cfg_path.c_str()) != Z_OK) {
      AIMRT_ERROR("Unable to load configuration file: {}", native_cfg_path);
@ -22,11 +22,14 @@ void ZenohManager::Initialize(const std::string &native_cfg_path) {
    AIMRT_ERROR("Unable to open zenoh session!");
    return;
  }
  shm_pool_size_ = shm_pool_size;
}

void ZenohManager::Shutdown() {
  for (const auto &ptr : z_pub_registry_) {
    auto z_pub = ptr.second.first;
    z_drop(z_move(z_pub));
  }

  for (auto ptr : z_sub_registry_) {
@ -38,25 +41,34 @@ void ZenohManager::Shutdown() {
  z_sub_registry_.clear();

  z_drop(z_move(z_session_));
  z_drop(z_move(shm_provider_));
  z_drop(z_move(shm_layout_));
}
void ZenohManager::RegisterPublisher(const std::string &keyexpr, bool shm_enabled) {
  z_view_keyexpr_t key;
  z_view_keyexpr_from_str(&key, keyexpr.c_str());
  z_owned_publisher_t z_pub;
  z_publisher_put_options_default(&z_pub_options_);

  if (z_declare_publisher(z_loan(z_session_), &z_pub, z_loan(key), nullptr) < 0) {
    AIMRT_ERROR("Unable to declare Publisher!");
    return;
  }
  z_pub_registry_.emplace(keyexpr, std::make_pair(z_pub, shm_enabled));
  AIMRT_TRACE("Publisher with keyexpr: {} registered successfully.", keyexpr.c_str());

  // Create shared memory provider
  std::lock_guard<std::mutex> lock(z_mutex_);
  if (!shm_initialized_.load() && shm_enabled) {
    z_memory_layout_new(&shm_layout_, shm_pool_size_, alignment_);
    z_posix_shm_provider_new(&shm_provider_, z_loan(shm_layout_));
    shm_initialized_.store(true);
  }
}
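`RegisterPublisher` creates the POSIX shm provider lazily: the mutex plus atomic flag ensure that, however many publishers register concurrently, the provider is built at most once, and only if at least one of them enables shm. A minimal model of that guard (`ShmGuard` and the counter are illustrative stand-ins, not plugin APIs):

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Illustrative model of the provider-creation guard in RegisterPublisher.
class ShmGuard {
 public:
  void RegisterPublisher(bool shm_enabled) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!initialized_.load() && shm_enabled) {
      // stands in for z_memory_layout_new + z_posix_shm_provider_new
      ++provider_created_count_;
      initialized_.store(true);
    }
  }
  int ProviderCreatedCount() const { return provider_created_count_; }

 private:
  std::mutex mutex_;
  std::atomic_bool initialized_{false};
  int provider_created_count_ = 0;
};
```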
void data_handler(const z_loaned_sample_t *sample, void *arg) {}
void ZenohManager::RegisterSubscriber(const std::string &keyexpr, MsgHandleFunc handle) {
  z_view_keyexpr_t key;
  z_view_keyexpr_from_str(&key, keyexpr.c_str());
@ -74,7 +86,7 @@ void ZenohManager::RegisterSubscriber(const std::string &keyexpr, MsgHandleFunc
  z_owned_subscriber_t z_sub;
  if (z_declare_subscriber(z_loan(z_session_), &z_sub, z_loan(key), z_move(z_callback), nullptr) < 0) {
    AIMRT_ERROR("Unable to declare Subscriber!");
    return;
  }
@ -83,7 +95,7 @@ void ZenohManager::RegisterSubscriber(const std::string &keyexpr, MsgHandleFunc
  AIMRT_TRACE("Subscriber with keyexpr: {} registered successfully.", keyexpr.c_str());
}
void ZenohManager::RegisterRpcNode(const std::string &keyexpr, MsgHandleFunc handle, const std::string &role, bool shm_enabled) {
  std::string pub_keyexpr;
  std::string sub_keyexpr;
@ -98,9 +110,9 @@ void ZenohManager::RegisterRpcNode(const std::string &keyexpr, MsgHandleFunc han
    return;
  }

  RegisterPublisher(pub_keyexpr, shm_enabled);
  RegisterSubscriber(sub_keyexpr, std::move(handle));
  AIMRT_INFO("{} with keyexpr: {}, shm_enabled: {} registered successfully.", role, keyexpr.c_str(), shm_enabled);
}
void ZenohManager::Publish(const std::string &topic, char *serialized_data_ptr, uint64_t serialized_data_len) {
@ -112,8 +124,13 @@ void ZenohManager::Publish(const std::string &topic, char *serialized_data_ptr,
  z_owned_bytes_t z_payload;
  z_bytes_from_buf(&z_payload, reinterpret_cast<uint8_t *>(serialized_data_ptr), serialized_data_len, nullptr, nullptr);
  z_publisher_put(z_loan(z_pub_iter->second.first), z_move(z_payload), &z_pub_options_);
}

std::unique_ptr<std::unordered_map<std::string, std::pair<z_owned_publisher_t, bool>>> ZenohManager::GetPublisherRegisterMap() {
  return std::make_unique<std::unordered_map<std::string, std::pair<z_owned_publisher_t, bool>>>(z_pub_registry_);
}

}  // namespace aimrt::plugins::zenoh_plugin
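The publish paths above grow each topic's entry in `z_pub_shm_size_map_` geometrically: when an allocation turns out too small, they either jump straight to the exact size reported by the serialization cache (`msg_size` plus header) or double the current loan size (`size << 1`) and retry. A small sketch of that policy (function names here are illustrative, not plugin APIs):

```cpp
#include <cassert>
#include <cstdint>

// When the loaned shm buffer is too small, either jump to the exact size
// reported by the serialization cache, or double the current loan size.
std::uint64_t NextLoanSize(std::uint64_t current, std::uint64_t required_or_zero) {
  if (required_or_zero != 0) return required_or_zero;  // cache knows the exact size
  return current << 1;                                 // otherwise double, as in `size << 1`
}

// Repeatedly grow from the initial loan size until msg_size fits.
std::uint64_t SettleLoanSize(std::uint64_t init, std::uint64_t msg_size) {
  std::uint64_t size = init;
  while (size < msg_size) size = NextLoanSize(size, 0);
  return size;
}
```

Doubling keeps the number of re-loans logarithmic in the message size, so a topic that starts at the 1 KB default `shm_init_loan_size` converges after a few publishes.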
@ -17,16 +17,23 @@ class ZenohManager {
  ZenohManager(const ZenohManager&) = delete;
  ZenohManager& operator=(const ZenohManager&) = delete;

  void Initialize(const std::string& native_cfg_path, size_t shm_pool_size);
  void Shutdown();

  void RegisterSubscriber(const std::string& keyexpr, MsgHandleFunc handle);
  void RegisterPublisher(const std::string& keyexpr, bool shm_enabled);
  void RegisterRpcNode(const std::string& keyexpr, MsgHandleFunc handle, const std::string& role, bool shm_enabled);
  void Publish(const std::string& topic, char* serialized_data_ptr, uint64_t serialized_data_len);

  std::unique_ptr<std::unordered_map<std::string, std::pair<z_owned_publisher_t, bool>>> GetPublisherRegisterMap();

 public:
  z_owned_shm_provider_t shm_provider_;
  z_alloc_alignment_t alignment_ = {0};
  z_publisher_put_options_t z_pub_options_;

 private:
  static void PrintZenohCgf(z_owned_config_t z_config) {
    z_owned_string_t out_config_string;
@ -46,14 +53,21 @@ class ZenohManager {
    z_drop(z_move(out_config_string));
  }

  std::unordered_map<std::string, std::pair<z_owned_publisher_t, bool>> z_pub_registry_;
  std::unordered_map<std::string, z_owned_subscriber_t> z_sub_registry_;
  std::vector<std::shared_ptr<MsgHandleFunc>> msg_handle_vec_;

  z_owned_session_t z_session_;
  z_owned_config_t z_config_;

  // shm related
  size_t shm_pool_size_;
  z_owned_memory_layout_t shm_layout_;
  std::atomic_bool shm_initialized_ = false;
  std::mutex z_mutex_;
};

}  // namespace aimrt::plugins::zenoh_plugin
@ -13,6 +13,8 @@ struct convert<aimrt::plugins::zenoh_plugin::ZenohPlugin::Options> {
    node["native_cfg_path"] = rhs.native_cfg_path;
    node["limit_domain"] = rhs.limit_domain;
    node["shm_pool_size"] = rhs.shm_pool_size;
    node["shm_init_loan_size"] = rhs.shm_init_loan_size;

    return node;
  }
@ -26,6 +28,12 @@ struct convert<aimrt::plugins::zenoh_plugin::ZenohPlugin::Options> {
    if (node["limit_domain"])
      rhs.limit_domain = '/' + node["limit_domain"].as<std::string>();

    if (node["shm_pool_size"])
      rhs.shm_pool_size = node["shm_pool_size"].as<size_t>();

    if (node["shm_init_loan_size"])
      rhs.shm_init_loan_size = node["shm_init_loan_size"].as<size_t>();

    return true;
  }
};
@ -46,7 +54,7 @@ bool ZenohPlugin::Initialize(runtime::core::AimRTCore *core_ptr) noexcept {
  init_flag_ = true;

  // todo remove role
  zenoh_manager_ptr_->Initialize(options_.native_cfg_path, options_.shm_pool_size);

  core_ptr_->RegisterHookFunc(runtime::core::AimRTCore::State::kPostInitLog,
                              [this] { SetPluginLogger(); });
@ -88,7 +96,7 @@ void ZenohPlugin::SetPluginLogger() {
void ZenohPlugin::RegisterZenohChannelBackend() {
  std::unique_ptr<runtime::core::channel::ChannelBackendBase> zenoh_channel_backend_ptr =
      std::make_unique<ZenohChannelBackend>(zenoh_manager_ptr_, options_.limit_domain, options_.shm_init_loan_size);

  core_ptr_->GetChannelManager().RegisterChannelBackend(std::move(zenoh_channel_backend_ptr));
}
@ -17,6 +17,8 @@ class ZenohPlugin : public AimRTCorePluginBase {
  struct Options {
    std::string native_cfg_path;
    std::string limit_domain;
    size_t shm_pool_size = static_cast<size_t>(1024 * 1024 * 10);  // default: 10 MB
    size_t shm_init_loan_size = 1024;                              // default: 1 KB
  };

 public:
@ -13,6 +13,22 @@ struct convert<aimrt::plugins::zenoh_plugin::ZenohRpcBackend::Options> {
    node["timeout_executor"] = rhs.timeout_executor;
node["clients_options"] = YAML::Node();
for (const auto& client_options : rhs.clients_options) {
Node client_options_node;
client_options_node["func_name"] = client_options.func_name;
client_options_node["shm_enabled"] = client_options.shm_enabled;
node["clients_options"].push_back(client_options_node);
}
node["servers_options"] = YAML::Node();
for (const auto& server_options : rhs.servers_options) {
Node server_options_node;
server_options_node["func_name"] = server_options.func_name;
server_options_node["shm_enabled"] = server_options.shm_enabled;
node["servers_options"].push_back(server_options_node);
}
    return node;
  }
@ -20,6 +36,28 @@ struct convert<aimrt::plugins::zenoh_plugin::ZenohRpcBackend::Options> {
    if (node["timeout_executor"])
      rhs.timeout_executor = node["timeout_executor"].as<std::string>();
if (node["clients_options"] && node["clients_options"].IsSequence()) {
for (const auto& client_options_node : node["clients_options"]) {
auto client_options = Options::ClientOptions{
.func_name = client_options_node["func_name"].as<std::string>(),
.shm_enabled = client_options_node["shm_enabled"].as<bool>(),
};
rhs.clients_options.emplace_back(std::move(client_options));
}
}
if (node["servers_options"] && node["servers_options"].IsSequence()) {
for (const auto& server_options_node : node["servers_options"]) {
auto server_options = Options::ServerOptions{
.func_name = server_options_node["func_name"].as<std::string>(),
.shm_enabled = server_options_node["shm_enabled"].as<bool>(),
};
rhs.servers_options.emplace_back(std::move(server_options));
}
}
    return true;
  }
};
@ -86,6 +124,24 @@ bool ZenohRpcBackend::RegisterServiceFunc(
    namespace util = aimrt::common::util;

    const auto& info = service_func_wrapper.info;
bool shm_enabled = false;
auto find_option_itr = std::find_if(
options_.servers_options.begin(), options_.servers_options.end(),
[func_name = GetRealFuncName(info.func_name)](const Options::ServerOptions& server_option) {
try {
return std::regex_match(func_name.begin(), func_name.end(), std::regex(server_option.func_name, std::regex::ECMAScript));
} catch (const std::exception& e) {
AIMRT_WARN("Regex get exception, expr: {}, string: {}, exception info: {}",
server_option.func_name, func_name, e.what());
return false;
}
});
if (find_option_itr != options_.servers_options.end()) {
shm_enabled = find_option_itr->shm_enabled;
}
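The lookup above picks `shm_enabled` from the first `servers_options` entry whose `func_name` ECMAScript regex matches the real function name; a malformed pattern is logged and counts as no match. A condensed sketch of that selection (`ShmEnabledFor` and `ServerOptions` here are illustrative stand-ins for the backend's option types):

```cpp
#include <cassert>
#include <regex>
#include <string>
#include <vector>

struct ServerOptions {
  std::string func_name;  // ECMAScript regex, as in the backend options
  bool shm_enabled = false;
};

// The first options entry whose func_name regex matches the (real)
// function name decides whether shm is enabled; default is false.
bool ShmEnabledFor(const std::vector<ServerOptions>& options, const std::string& func_name) {
  for (const auto& opt : options) {
    try {
      if (std::regex_match(func_name, std::regex(opt.func_name, std::regex::ECMAScript)))
        return opt.shm_enabled;
    } catch (const std::exception&) {
      // a malformed pattern is treated as "no match", as the backend warns and continues
    }
  }
  return false;
}
```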
    std::string pattern = std::string("aimrt_rpc/") +
                          util::UrlEncode(GetRealFuncName(info.func_name)) +
                          limit_domain_;
@ -101,14 +157,15 @@ bool ZenohRpcBackend::RegisterServiceFunc(
          auto ctx_ptr = std::make_shared<aimrt::rpc::Context>(aimrt_rpc_context_type_t::AIMRT_RPC_SERVER_CONTEXT);
          service_invoke_wrapper_ptr->ctx_ref = ctx_ptr;
          const z_loaned_bytes_t* payload = z_sample_payload(message);
          size_t serialized_size = z_bytes_len(payload);
          z_bytes_reader_t reader = z_bytes_get_reader(payload);
          std::vector<char> serialized_data(serialized_size);

          // read data from payload
          auto ret = z_bytes_reader_read(&reader, reinterpret_cast<uint8_t*>(serialized_data.data()), serialized_size);
          if (ret >= 0) {
            util::ConstBufferOperator buf_oper(serialized_data.data() + kFixedLen, std::stoi(std::string(serialized_data.data(), kFixedLen)));

            // deserialize type
            std::string serialization_type(buf_oper.GetString(util::BufferLenType::kUInt8));
@ -165,6 +222,134 @@ bool ZenohRpcBackend::RegisterServiceFunc(
              return;
            }
std::string node_pub_topic = "rsp/" + pattern;
// find node's publisher with pattern
auto z_node_pub_registry = zenoh_manager_ptr_->GetPublisherRegisterMap();
auto z_node_pub_iter = z_node_pub_registry->find(node_pub_topic);
if (z_node_pub_iter == z_node_pub_registry->end()) [[unlikely]] {
AIMRT_ERROR("Can not find publisher with pattern: {}", pattern);
return;
}
auto z_node_pub = z_node_pub_iter->second;
// shm enabled
if (z_node_pub.second) {
unsigned char* z_pub_loaned_shm_ptr = nullptr;
std::shared_ptr<aimrt::util::BufferArrayView> buffer_array_cache_ptr = nullptr;
bool is_shm_loan_size_enough = true;
bool is_shm_pool_size_enough = true;
uint64_t msg_size = 0;
size_t header_len = 0;
z_buf_layout_alloc_result_t loan_result;
do {
// release old shm
if (z_pub_loaned_shm_ptr != nullptr) {
z_shm_provider_garbage_collect(z_loan(zenoh_manager_ptr_->shm_provider_));
z_shm_provider_defragment(z_loan(zenoh_manager_ptr_->shm_provider_));
}
// loan a new size shm
uint64_t loan_size = z_node_shm_size_map_[node_pub_topic];
z_shm_provider_alloc_gc_defrag(&loan_result, z_loan(zenoh_manager_ptr_->shm_provider_), loan_size, zenoh_manager_ptr_->alignment_);
// if shm pool is not enough, use net buffer instead
if (loan_result.status != ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
is_shm_pool_size_enough = false;
z_shm_provider_garbage_collect(z_loan(zenoh_manager_ptr_->shm_provider_));
z_shm_provider_defragment(z_loan(zenoh_manager_ptr_->shm_provider_));
AIMRT_WARN("Zenoh Plugin shm pool is not enough, use net buffer instead.");
break;
}
z_pub_loaned_shm_ptr = z_shm_mut_data_mut(z_loan_mut(loan_result.buf));
                // write info pkg on loaned shm: the first FIXED_LEN bytes hold the length of the pkg
util::BufferOperator buf_oper(reinterpret_cast<char*>(z_pub_loaned_shm_ptr) + kFixedLen, loan_size - kFixedLen);
// write serialization type on loaned shm
buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8);
// write req id on loaned shm
buf_oper.SetBuffer(req_id_buf, sizeof(req_id_buf));
                // write a zero on loaned shm
buf_oper.SetUint32(0);
header_len = 1 + serialization_type.size() + 4 + 4;
// write msg on loaned shm should start at the (FIXED_LEN + header_len)-th byte
aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + header_len + kFixedLen);
if (buffer_array_cache_ptr == nullptr) {
try {
auto result = SerializeRspSupportedZenoh(*service_invoke_wrapper_ptr, serialization_type, aimrt::util::BufferArrayAllocatorRef(z_allocator.NativeHandle()));
msg_size = result.second;
buffer_array_cache_ptr = result.first;
                if (buffer_array_cache_ptr == nullptr) {
                  // no cache is set, so normal serialization was done (it throws if the shm size is too small)
                  is_shm_loan_size_enough = true;
                } else {
                  if (msg_size > buf_oper.GetRemainingSize()) {
                    // the msg has a serialization cache but it is too large, so expand to a suitable size
                    is_shm_loan_size_enough = false;
                    z_node_shm_size_map_[node_pub_topic] = kFixedLen + header_len + msg_size;
                  } else {
                    // the msg has a serialization cache and the size fits, so use the cache
                    is_shm_loan_size_enough = true;
                  }
                }
              } catch (const std::exception& e) {
                if (!z_allocator.IsShmEnough()) {
                  // the shm is not enough, so expand to double the size
                  z_node_shm_size_map_[node_pub_topic] = z_node_shm_size_map_[node_pub_topic] << 1;
                  is_shm_loan_size_enough = false;
} else {
AIMRT_ERROR(
"Msg serialization failed, serialization_type {}, pattern: {}, exception: {}",
serialization_type, pattern, e.what());
return;
}
}
}
} while (!is_shm_loan_size_enough);
            if (is_shm_pool_size_enough) {
              // if there is a serialization cache, copy it into shm instead of serializing again
              if (buffer_array_cache_ptr != nullptr) {
                unsigned char* start_pos = z_pub_loaned_shm_ptr + kFixedLen + header_len;
                for (size_t ii = 0; ii < buffer_array_cache_ptr->Size(); ++ii) {
                  std::memcpy(start_pos, buffer_array_cache_ptr.get()[ii].Data()->data, buffer_array_cache_ptr.get()[ii].Data()->len);
                  start_pos += buffer_array_cache_ptr.get()[ii].Data()->len;
                }
                buffer_array_cache_ptr = nullptr;
              }

              // write info pkg length on loaned shm
              std::memcpy(z_pub_loaned_shm_ptr, IntToFixedLengthString(header_len, kFixedLen).c_str(), kFixedLen);
z_owned_bytes_t z_payload;
if (loan_result.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
z_bytes_from_shm_mut(&z_payload, z_move(loan_result.buf));
}
z_publisher_put(z_loan(z_node_pub.first), z_move(z_payload), &zenoh_manager_ptr_->z_pub_options_);
              // collect garbage and defragment shared memory whose reference count is zero
z_shm_provider_garbage_collect(z_loan(zenoh_manager_ptr_->shm_provider_));
z_shm_provider_defragment(z_loan(zenoh_manager_ptr_->shm_provider_));
AIMRT_TRACE("Zenoh Invoke req with '{}'", pattern);
return;
}
}
// shm disabled
// serivice rsp serialize // serivice rsp serialize
auto buffer_array_view_ptr = aimrt::runtime::core::rpc::TrySerializeRspWithCache( auto buffer_array_view_ptr = aimrt::runtime::core::rpc::TrySerializeRspWithCache(
*service_invoke_wrapper_ptr, serialization_type); *service_invoke_wrapper_ptr, serialization_type);
@ -178,21 +363,36 @@ bool ZenohRpcBackend::RegisterServiceFunc(
const size_t buffer_array_len = buffer_array_view_ptr->Size();
size_t rsp_size = buffer_array_view_ptr->BufferSize();
size_t z_data_size = 1 + serialization_type.size() + 4 + 4 + rsp_size;
size_t pkg_size = z_data_size + kFixedLen;
// get buf to store data
std::vector<char> msg_buf_vec(pkg_size);
util::BufferOperator buf_oper(msg_buf_vec.data(), pkg_size);
// fill data_size
buf_oper.SetBuffer(IntToFixedLengthString(z_data_size, kFixedLen).c_str(), kFixedLen);
// fill serialization type
buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8);
// fill req_id
buf_oper.SetBuffer(req_id_buf, sizeof(req_id_buf));
// fill a zero placeholder
buf_oper.SetUint32(0);
// fill rsp data
for (size_t ii = 0; ii < buffer_array_len; ++ii) {
buf_oper.SetBuffer(
static_cast<const char*>(buffer_array_data[ii].data),
buffer_array_data[ii].len);
}
// server send rsp
z_owned_bytes_t z_payload;
z_bytes_from_buf(&z_payload, reinterpret_cast<uint8_t*>(msg_buf_vec.data()), pkg_size, nullptr, nullptr);
z_publisher_put(z_loan(z_node_pub.first), z_move(z_payload), &zenoh_manager_ptr_->z_pub_options_);
};
// call service
service_func_wrapper.service_func(service_invoke_wrapper_ptr);
@@ -204,7 +404,8 @@ bool ZenohRpcBackend::RegisterServiceFunc(
AIMRT_WARN("Handle zenoh rpc msg failed, exception info: {}", e.what());
}
};
zenoh_manager_ptr_->RegisterRpcNode(pattern, std::move(handle), "server", shm_enabled);
z_node_shm_size_map_["rsp/" + pattern] = shm_init_loan_size_;
return true;
} catch (const std::exception& e) {
AIMRT_ERROR("{}", e.what());
@@ -224,6 +425,23 @@ bool ZenohRpcBackend::RegisterClientFunc(
const auto& info = client_func_wrapper.info;
bool shm_enabled = false;
auto find_option_itr = std::find_if(
options_.clients_options.begin(), options_.clients_options.end(),
[func_name = GetRealFuncName(info.func_name)](const Options::ClientOptions& client_option) {
try {
return std::regex_match(func_name.begin(), func_name.end(), std::regex(client_option.func_name, std::regex::ECMAScript));
} catch (const std::exception& e) {
AIMRT_WARN("Regex get exception, expr: {}, string: {}, exception info: {}",
client_option.func_name, func_name, e.what());
return false;
}
});
if (find_option_itr != options_.clients_options.end()) {
shm_enabled = find_option_itr->shm_enabled;
}
std::string pattern = std::string("aimrt_rpc/") +
util::UrlEncode(GetRealFuncName(info.func_name)) +
limit_domain_;
@@ -242,7 +460,7 @@ bool ZenohRpcBackend::RegisterClientFunc(
return;
}
util::ConstBufferOperator buf_oper(serialized_data.data() + kFixedLen, std::stoi(std::string(serialized_data.data(), kFixedLen)));
std::string serialization_type(buf_oper.GetString(util::BufferLenType::kUInt8));
uint32_t req_id = buf_oper.GetUint32();
@@ -294,7 +512,8 @@ bool ZenohRpcBackend::RegisterClientFunc(
client_invoke_wrapper_ptr->callback(aimrt::rpc::Status(AIMRT_RPC_STATUS_CLI_BACKEND_INTERNAL_ERROR));
};
zenoh_manager_ptr_->RegisterRpcNode(pattern, std::move(handle), "client", shm_enabled);
z_node_shm_size_map_["req/" + pattern] = shm_init_loan_size_;
} catch (const std::exception& e) {
AIMRT_ERROR("{}", e.what());
return false;
@@ -312,15 +531,28 @@ void ZenohRpcBackend::Invoke(
}
namespace util = aimrt::common::util;
const auto& info = client_invoke_wrapper_ptr->info;
std::string pattern = std::string("aimrt_rpc/") +
util::UrlEncode(GetRealFuncName(info.func_name)) +
limit_domain_;
std::string node_pub_topic = "req/" + pattern;
// find node's publisher with pattern
auto z_node_pub_registry = zenoh_manager_ptr_->GetPublisherRegisterMap();
auto z_node_pub_iter = z_node_pub_registry->find(node_pub_topic);
if (z_node_pub_iter == z_node_pub_registry->end()) [[unlikely]] {
AIMRT_ERROR("Can not find publisher with pattern: {}", pattern);
return;
}
auto z_node_pub = z_node_pub_iter->second;
// get req id
uint32_t cur_req_id = req_id_++;
// get serialization type
auto serialization_type =
client_invoke_wrapper_ptr->ctx_ref.GetMetaValue(AIMRT_RPC_CONTEXT_KEY_SERIALIZATION_TYPE);
@@ -329,20 +561,7 @@ void ZenohRpcBackend::Invoke(
return;
}
// get meta data
auto buffer_array_view_ptr = aimrt::runtime::core::rpc::TrySerializeReqWithCache(
*client_invoke_wrapper_ptr, serialization_type);
if (!buffer_array_view_ptr) [[unlikely]] {
// serialize failed
client_invoke_wrapper_ptr->callback(aimrt::rpc::Status(AIMRT_RPC_STATUS_CLI_SERIALIZATION_FAILED));
return;
}
const auto* buffer_array_data = buffer_array_view_ptr->Data();
const size_t buffer_array_len = buffer_array_view_ptr->Size();
size_t req_size = buffer_array_view_ptr->BufferSize();
// context
const auto& keys = client_invoke_wrapper_ptr->ctx_ref.GetMetaKeys();
if (keys.size() > 255) [[unlikely]] {
AIMRT_WARN("Too much context meta, require less than 255, but actually {}.", keys.size());
@@ -360,16 +579,11 @@ void ZenohRpcBackend::Invoke(
context_meta_kv_size += (2 + val.size());
context_meta_kv.emplace_back(val);
}
// padding zenoh pkg
size_t z_pkg_size = 1 + serialization_type.size() +
1 + pattern.size() +
4 +
context_meta_kv_size +
req_size;
auto timeout = client_invoke_wrapper_ptr->ctx_ref.Timeout();
auto record_ptr = client_invoke_wrapper_ptr;
// record this req with timeout
bool ret = client_tool_ptr_->Record(cur_req_id, timeout, std::move(record_ptr));
if (!ret) [[unlikely]] {
@@ -378,27 +592,185 @@ void ZenohRpcBackend::Invoke(
return;
}
// shm enabled
if (z_node_pub.second) {
unsigned char* z_pub_loaned_shm_ptr = nullptr;
std::shared_ptr<aimrt::util::BufferArrayView> buffer_array_cache_ptr = nullptr;
bool is_shm_loan_size_enough = true;
bool is_shm_pool_size_enough = true;
uint64_t msg_size = 0;
size_t header_len = 0;
z_buf_layout_alloc_result_t loan_result;
do {
// release old shm
if (z_pub_loaned_shm_ptr != nullptr) {
z_shm_provider_garbage_collect(z_loan(zenoh_manager_ptr_->shm_provider_));
z_shm_provider_defragment(z_loan(zenoh_manager_ptr_->shm_provider_));
}
// loan a new size shm
uint64_t loan_size = z_node_shm_size_map_[node_pub_topic];
z_shm_provider_alloc_gc_defrag(&loan_result, z_loan(zenoh_manager_ptr_->shm_provider_), loan_size, zenoh_manager_ptr_->alignment_);
// if shm pool is not enough, use net buffer instead
if (loan_result.status != ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
is_shm_pool_size_enough = false;
z_shm_provider_garbage_collect(z_loan(zenoh_manager_ptr_->shm_provider_));
z_shm_provider_defragment(z_loan(zenoh_manager_ptr_->shm_provider_));
AIMRT_WARN("Zenoh Plugin shm pool is not enough, use net buffer instead.");
break;
}
z_pub_loaned_shm_ptr = z_shm_mut_data_mut(z_loan_mut(loan_result.buf));
// write info pkg on loaned shm: the first kFixedLen bytes hold the length of the pkg
util::BufferOperator buf_oper(reinterpret_cast<char*>(z_pub_loaned_shm_ptr) + kFixedLen, loan_size - kFixedLen);
// write serialization type on loaned shm
buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8);
// write pattern on loaned shm
buf_oper.SetString(pattern, util::BufferLenType::kUInt8);
// write req id on loaned shm
buf_oper.SetUint32(cur_req_id);
// write context meta on loaned shm
buf_oper.SetUint8(static_cast<uint8_t>(keys.size()));
for (const auto& s : context_meta_kv) {
buf_oper.SetString(s, util::BufferLenType::kUInt16);
}
header_len = 1 + serialization_type.size() +
1 + pattern.size() +
4 +
context_meta_kv_size;
// msg written to loaned shm starts at byte (kFixedLen + header_len)
aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + header_len + kFixedLen);
if (buffer_array_cache_ptr == nullptr) {
try {
auto result = SerializeReqSupportedZenoh(*client_invoke_wrapper_ptr, serialization_type, aimrt::util::BufferArrayAllocatorRef(z_allocator.NativeHandle()));
msg_size = result.second;
buffer_array_cache_ptr = result.first;
if (buffer_array_cache_ptr == nullptr) {
// no cache is set, so do normal serialization (throws if the shm is too small)
is_shm_loan_size_enough = true;
} else {
if (msg_size > buf_oper.GetRemainingSize()) {
// the msg has a serialization cache but it is too large, so expand to a suitable size
is_shm_loan_size_enough = false;
z_node_shm_size_map_[node_pub_topic] = kFixedLen + header_len + msg_size;
} else {
// the msg has a serialization cache and the size fits, so use the cache
is_shm_loan_size_enough = true;
}
}
} catch (const std::exception& e) {
if (!z_allocator.IsShmEnough()) {
// the shm is not enough, double the loan size
z_node_shm_size_map_[node_pub_topic] = z_node_shm_size_map_[node_pub_topic] << 1;
is_shm_loan_size_enough = false;
} else {
AIMRT_ERROR(
"Msg serialization failed, serialization_type {}, pkg_path: {}, module_name: {}, func_name: {}, exception: {}",
serialization_type, info.pkg_path, info.module_name, info.func_name, e.what());
return;
}
}
}
} while (!is_shm_loan_size_enough);
if (is_shm_pool_size_enough) {
// if a cache exists, copy it to shm instead of serializing again
if (buffer_array_cache_ptr != nullptr) {
unsigned char* start_pos = z_pub_loaned_shm_ptr + kFixedLen + header_len;
for (size_t ii = 0; ii < buffer_array_cache_ptr->Size(); ++ii) {
std::memcpy(start_pos, buffer_array_cache_ptr.get()[ii].Data()->data, buffer_array_cache_ptr.get()[ii].Data()->len);
start_pos += buffer_array_cache_ptr.get()[ii].Data()->len;
}
}
buffer_array_cache_ptr = nullptr;
}
// write info pkg length on loaned shm
std::memcpy(z_pub_loaned_shm_ptr, IntToFixedLengthString(header_len, kFixedLen).c_str(), kFixedLen);
z_owned_bytes_t z_payload;
if (loan_result.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
z_bytes_from_shm_mut(&z_payload, z_move(loan_result.buf));
}
z_publisher_put(z_loan(z_node_pub.first), z_move(z_payload), &zenoh_manager_ptr_->z_pub_options_);
// collect garbage and defragment shared-memory blocks whose reference count is zero
z_shm_provider_garbage_collect(z_loan(zenoh_manager_ptr_->shm_provider_));
z_shm_provider_defragment(z_loan(zenoh_manager_ptr_->shm_provider_));
AIMRT_TRACE("Zenoh Invoke req with '{}'", pattern);
return;
}
}
// shm disabled
// client req serialize
auto buffer_array_view_ptr = aimrt::runtime::core::rpc::TrySerializeReqWithCache(
*client_invoke_wrapper_ptr, serialization_type);
if (!buffer_array_view_ptr) [[unlikely]] {
// serialize failed
client_invoke_wrapper_ptr->callback(aimrt::rpc::Status(AIMRT_RPC_STATUS_CLI_SERIALIZATION_FAILED));
return;
}
const auto* buffer_array_data = buffer_array_view_ptr->Data();
const size_t buffer_array_len = buffer_array_view_ptr->Size();
size_t req_size = buffer_array_view_ptr->BufferSize();
// padding zenoh pkg (serialization_type + pattern + req_id + context_meta + req)
size_t z_data_size = 1 + serialization_type.size() +
1 + pattern.size() +
4 +
context_meta_kv_size +
req_size;
size_t pkg_size = z_data_size + kFixedLen;
// create buffer for serialization
std::vector<char> msg_buf_vec(pkg_size);
util::BufferOperator buf_oper(msg_buf_vec.data(), pkg_size);
// fill data_size
buf_oper.SetBuffer(IntToFixedLengthString(z_data_size, kFixedLen).c_str(), kFixedLen);
// fill serialization_type
buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8);
// fill pattern
buf_oper.SetString(pattern, util::BufferLenType::kUInt8);
// fill req id
buf_oper.SetUint32(cur_req_id);
// fill context meta
buf_oper.SetUint8(static_cast<uint8_t>(keys.size()));
for (const auto& s : context_meta_kv) {
buf_oper.SetString(s, util::BufferLenType::kUInt16);
}
// fill client req
for (size_t ii = 0; ii < buffer_array_len; ++ii) {
buf_oper.SetBuffer(
static_cast<const char*>(buffer_array_data[ii].data),
buffer_array_data[ii].len);
}
// send req
z_owned_bytes_t z_payload;
z_bytes_from_buf(&z_payload, reinterpret_cast<uint8_t*>(msg_buf_vec.data()), pkg_size, nullptr, nullptr);
z_publisher_put(z_loan(z_node_pub.first), z_move(z_payload), &zenoh_manager_ptr_->z_pub_options_);
} catch (const std::exception& e) {
AIMRT_ERROR("{}", e.what());
View File
@@ -11,6 +11,8 @@
#include "util/buffer_util.h"
#include "util/url_encode.h"
#include "zenoh.h"
#include "zenoh_plugin/util.h"
#include "zenoh_plugin/zenoh_buffer_array_allocator.h"
#include "zenoh_plugin/zenoh_manager.h"
namespace aimrt::plugins::zenoh_plugin {
@@ -18,6 +20,17 @@ class ZenohRpcBackend : public runtime::core::rpc::RpcBackendBase {
public:
struct Options {
std::string timeout_executor;
struct ClientOptions {
std::string func_name;
bool shm_enabled = false;
};
std::vector<ClientOptions> clients_options;
struct ServerOptions {
std::string func_name;
bool shm_enabled = false;
};
std::vector<ServerOptions> servers_options;
};
public:
@@ -74,6 +87,10 @@ class ZenohRpcBackend : public runtime::core::rpc::RpcBackendBase {
std::string limit_domain_;
std::unique_ptr<runtime::core::util::RpcClientTool<std::shared_ptr<runtime::core::rpc::InvokeWrapper>>> client_tool_ptr_;
std::unordered_map<std::string, uint64_t> z_node_shm_size_map_;
uint64_t shm_init_loan_size_ = 1024;
};
} // namespace aimrt::plugins::zenoh_plugin
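The `ClientOptions`/`ServerOptions` structs above are populated from the backend's YAML options. A hedged sketch of what enabling shm per function might look like; only `clients_options`, `servers_options`, `func_name`, `shm_enabled`, and `timeout_executor` are taken from the struct, the surrounding key nesting is assumed from typical AimRT backend configs:

```yaml
rpc:
  backends:
    - type: zenoh
      options:
        timeout_executor: timeout_handle
        clients_options:
          - func_name: "(.*)"     # ECMAScript regex against the real func name
            shm_enabled: true
        servers_options:
          - func_name: "(.*)"
            shm_enabled: true
```

Functions that match no entry fall back to `shm_enabled = false`, i.e. the net-buffer path.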