From 062ffad43126dde25d7499b44eac1249f28d5f16 Mon Sep 17 00:00:00 2001 From: han J <89577994+owny990312@users.noreply.github.com> Date: Fri, 15 Nov 2024 18:42:59 +0800 Subject: [PATCH] feat: zenoh plugin with shm (#96) * add zenoh-shm API * use zenoh shm-api for channel and rpc * set z_alloc_result as a local variable. * add zenoh_buffer_array_allocator * avoid copy opreation when pub data with shm * if shm pool size is not enough, use net buffer instead * add z_pub_shm_size_map_ to store topic-loan_size * little fix * remove client send data 's copying * remove server send data 's copy * add doc * change benchamrk item * minor modification --------- Co-authored-by: hanjun --- cmake/GetZenoh.cmake | 6 + document/sphinx-cn/release_notes/v0_9_0.md | 9 +- .../tutorials/plugins/zenoh_plugin.md | 56 ++- src/examples/plugins/zenoh_plugin/README.md | 114 ++++- ...gin_pb_chn_benchmark_pub_with_shm_cfg.yaml | 57 +++ ...gin_pb_chn_benchmark_sub_with_shm_cfg.yaml | 32 ++ ..._zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml | 42 ++ ..._zenoh_plugin_pb_chn_sub_with_shm_cfg.yaml | 30 ++ ..._pb_rpc_benchmark_client_with_shm_cfg.yaml | 65 +++ ...noh_plugin_pb_rpc_client_with_shm_cfg.yaml | 46 ++ ...lugins_zenoh_plugin_pb_rpc_server_cfg.yaml | 2 +- ...noh_plugin_pb_rpc_server_with_shm_cfg.yaml | 33 ++ ...oh_plugin_pb_chn_benchmark_pub_with_shm.sh | 3 + ...oh_plugin_pb_chn_benchmark_sub_with_shm.sh | 3 + ...lugins_zenoh_plugin_pb_chn_pub_with_shm.sh | 3 + ...lugins_zenoh_plugin_pb_chn_sub_with_shm.sh | 3 + ...plugin_pb_rpc_benchmark_client_with_shm.sh | 3 + ...ins_zenoh_plugin_pb_rpc_client_with_shm.sh | 3 + ...ins_zenoh_plugin_pb_rpc_server_with_shm.sh | 3 + src/plugins/zenoh_plugin/util.h | 80 ++++ .../zenoh_buffer_array_allocator.h | 98 ++++ .../zenoh_plugin/zenoh_channel_backend.cc | 228 ++++++++- .../zenoh_plugin/zenoh_channel_backend.h | 16 +- src/plugins/zenoh_plugin/zenoh_manager.cc | 43 +- src/plugins/zenoh_plugin/zenoh_manager.h | 24 +- src/plugins/zenoh_plugin/zenoh_plugin.cc | 12 +- src/plugins/zenoh_plugin/zenoh_plugin.h | 2 + src/plugins/zenoh_plugin/zenoh_rpc_backend.cc | 442 ++++++++++++++++-- src/plugins/zenoh_plugin/zenoh_rpc_backend.h | 17 + 29 files changed, 1378 insertions(+), 97 deletions(-) create mode 100644 src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_benchmark_pub_with_shm_cfg.yaml create mode 100644 src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_benchmark_sub_with_shm_cfg.yaml create mode 100644 src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml create mode 100644 src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_sub_with_shm_cfg.yaml create mode 100644 src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_benchmark_client_with_shm_cfg.yaml create mode 100644 src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_client_with_shm_cfg.yaml create mode 100644 src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_server_with_shm_cfg.yaml create mode 100755 src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_chn_benchmark_pub_with_shm.sh create mode 100755 src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_chn_benchmark_sub_with_shm.sh create mode 100755 src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_chn_pub_with_shm.sh create mode 100755 src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_chn_sub_with_shm.sh create mode 100755 src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_rpc_benchmark_client_with_shm.sh create mode 100755 src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_rpc_client_with_shm.sh create mode 100755 src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_rpc_server_with_shm.sh create mode 100644 src/plugins/zenoh_plugin/util.h create mode 100644 src/plugins/zenoh_plugin/zenoh_buffer_array_allocator.h diff --git a/cmake/GetZenoh.cmake b/cmake/GetZenoh.cmake index d29e73a1f..c803aee57 100644 --- a/cmake/GetZenoh.cmake +++ b/cmake/GetZenoh.cmake @@ -27,6 +27,12 @@ endif() function(get_zenohc) FetchContent_GetProperties(zenohc) if(NOT zenohc_POPULATED) + set(ZENOHC_BUILD_WITH_UNSTABLE_API + TRUE + CACHE BOOL "Enable unstable API") + set(ZENOHC_BUILD_WITH_SHARED_MEMORY + TRUE + CACHE BOOL "Enable shared memory") FetchContent_MakeAvailable(zenohc) endif() endfunction() diff --git a/document/sphinx-cn/release_notes/v0_9_0.md b/document/sphinx-cn/release_notes/v0_9_0.md index e6ae5588a..1423c6a9a 100644 --- a/document/sphinx-cn/release_notes/v0_9_0.md +++ b/document/sphinx-cn/release_notes/v0_9_0.md @@ -3,9 +3,9 @@ **重要修改**: - 优化了 zenoh 插件: - - 更新 zenohc 库至 1.0.0.11 版本; - 添加了 zenoh rpc 后端; - - 现在可以传入 zenoh 原生配置; + - zenoh 插件支持网络通信和共享内存两种通信方式; + - 可以传入 zenoh 原生配置进行更丰富的配置; - 新增了第三方库 asio,runtime::core 不再引用 boost,改为引用独立的 asio 库,以减轻依赖; - 新增 aimrt_cli trans 命令,用于将 使用 aimrt record_playback 插件录制的 bag 文件转换为 ros2 的 bag 文件; - 新增 Echo 插件,用于回显消息; @@ -27,6 +27,7 @@ - 支持日志自定义输出格式; - 支持日志定期主动落盘操作; - grpc 插件支持 ros2 消息以及 json 序列化格式; -- mqtt 新增配置项以支持 ssl/tls 加密传输; -- mqtt 插件在broker未启动时,会自动重试异步连接; +- mqtt 新增配置项以支持 ssl/tls 单向认证/双向认证的加密传输; +- mqtt 插件在broker未启动时,会自动重试异步连接, 并提供重连间隔配置项; +- ros2 插件支持自定义 rpc 服务名称; - asio thread/strand 执行器现在支持是否使用 system clock; diff --git a/document/sphinx-cn/tutorials/plugins/zenoh_plugin.md b/document/sphinx-cn/tutorials/plugins/zenoh_plugin.md index 7d839343d..89bcd77ae 100644 --- a/document/sphinx-cn/tutorials/plugins/zenoh_plugin.md +++ b/document/sphinx-cn/tutorials/plugins/zenoh_plugin.md @@ -13,21 +13,26 @@ - `服务发现`机制的通信系统; - 灵活的网络拓扑结构; - 低延迟、高吞吐量的网络通信和数据传输; - +- SHM 和 非 SHM 两种传输模式; +- 此插件为 AimRT 提供以下组件: - `zenoh` 类型 Rpc 后端 - `zenoh` 类型 Channel 后端 - 插件的配置项如下: -| 节点 | 类型 | 是否可选 | 默认值 | 作用 | -| :-------------: | :----: | :------: | :----: | :-------------------------: | -| native_cfg_path | string | 可选 | "" | 使用zenoh提供的原生配置文件 | -| limit_domain | string | 可选 | "" | 对插件的通信域进行限制 | +| 节点 | 类型 | 是否可选 | 默认值 | 作用 | +| :----------------: | :----: | :------: | :----: | :-----------------------------: | +| shm_pool_size | int | 可选 | 10 MB | 共享内存池的大小 , 单位:B | +| shm_init_loan_size | int | 可选 | 1 KB | 共享内存初始借用大小 , 单位:B | +| native_cfg_path | string | 可选 | "" | 使用zenoh提供的原生配置文件 | +| limit_domain | string | 可选 | "" | 对插件的通信域进行限制 | + 关于**zenoh_plugin**的配置,使用注意点如下: +- `shm_pool_size` 表示共享内存池的大小,单位:B,默认值是 10 MB,可以根据实际情况调整, 如果不使用共享内存,可忽略该配置项。 如果剩余共享内存不足以满足数据传输需求,则会自动切换到非共享内存传输模式 。 +- `shm_init_loan_size` 表示向共享内存池初始借用大小,单位:B,默认值是 1 KB,可以根据实际情况调整, 如果不使用共享内存,可忽略该配置项。 - `native_cfg_path` 表示 zenoh 提供的原生配置文件的路径,可以通过配置该文件来灵活配置 zenoh 的网络结构。如果不填写则默认使用 zenoh 官方提供的默认配置,具体配置内容请参考 zenoh 官方关于[configuration](https://zenoh.io/docs/manual/configuration/)的说明,您也可以直接修改 {{ '[zenoh_native_config.json5]({}/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/zenoh_native_config.json5)'.format(code_site_root_path_url) }}文件来自定义配置,这里列举几个常用的配置项: | 配置项 | 作用 | zenoh_native_config中的配置值 | @@ -67,9 +72,17 @@ aimrt: ## zenoh 类型 Rpc 后端 `zenoh`类型的 Rpc后端是**zenoh_plugin**中提供的一种 Rpc 后端,主要用来构建请求-响应模型。其所有的配置项如下: -| 节点 | 类型 | 是否可选 | 默认值 | 作用 | -| ---------------- | ------ | -------- | ------ | ------------------------------------ | -| timeout_executor | string | 可选 | "" | Client 端发起 RPC 超时情况下的执行器 | +| 节点 | 类型 | 是否可选 | 默认值 | 作用 | +| ------------------------------ | ------ | -------- | ------ | ------------------------------------ | +| timeout_executor | string | 可选 | "" | Client 端发起 RPC 超时情况下的执行器 | +| clients_options | array | 可选 | [] | Client 端发起 RPC 请求时的规则 | +| clients_options[i].func_name | string | 必选 | "" | RPC Func 名称,支持正则表达式 | +| clients_options[i].shm_enabled | bool | 必选 | false | RPC Func 是否使用共享内存通信 | +| servers_options | array | 可选 | [] | 服务端处理 RPC 请求时的规则 | +| servers_options[i].func_name | string | 必选 | "" | RPC Func 名称,支持正则表达式 | +| servers_options[i].shm_enabled | bool | 必选 | false | RPC Func 是否使用共享内存通信 | + +注意: zenoh 支持 SHM 和 非 SHM 的自动转换, 即如果数据离开其所在的SHM 域,则自动切换到非 SHM 通信。 例如,如果节点 A 和 节点 B 都设置的共享内存,但其不再同一机器上,仍可以进行通信,因为数据会自动切换到非共享内存的传输模式。 以下是一个简单的客户端的示例: @@ -81,6 +94,7 @@ aimrt: path: ./libaimrt_zenoh_plugin.so options: native_cfg_path: ./cfg/zenoh_native_config.json5 + shm_pool_size: 10240 executor: executors: - name: timeout_handle @@ -92,6 +106,9 @@ aimrt: - type: zenoh options: timeout_executor: timeout_handle + clients_options: + - func_name: "(.*)" + shm_enabled: false clients_options: - func_name: "(.*)" enable_backends: [zenoh] @@ -112,6 +129,11 @@ aimrt: rpc: backends: - type: zenoh + options: + timeout_executor: timeout_handle + servers_options: + - func_name: "(.*)" + shm_enabled: true servers_options: - func_name: "(.*)" enable_backends: [zenoh] @@ -175,7 +197,15 @@ Server -> Client 的 Zenoh 数据包格式整体分 4 段: ## zenoh 类型 Channel 后端 -`zenoh`类型的 Channel 后端是**zenoh_plugin**中提供的一种Channel后端,主要用来构建发布和订阅模型。 +`zenoh`类型的 Channel 后端是**zenoh_plugin**中提供的一种Channel后端,主要用来构建发布和订阅模型。其所有的配置项如下: + +| 节点 | 类型 | 是否可选 | 默认值 | 作用 | +| --------------------------------- | ------ | -------- | ------ | ------------------------------- | +| pub_topics_options | array | 可选 | [] | 发布 Topic 时的规则 | +| pub_topics_options[i].topic_name | string | 必选 | "" | Topic 名称,支持正则表达式 | +| pub_topics_options[i].shm_enabled | bool | 必选 | false | Publish 端 是否使用共享内存通信 | + +注意: zenoh 支持 SHM 和 非 SHM 的自动转换, 即如果数据离开其所在的SHM 域,则自动切换到非 SHM 通信。 例如,如果节点 A 和 节点 B 都设置的共享内存,但其不再同一机器上,仍可以进行通信,因为数据会自动切换到非共享内存的传输模式。 以下是一个简单的发布端的示例: ```yaml @@ -185,10 +215,14 @@ aimrt: - name: zenoh_plugin path: ./libaimrt_zenoh_plugin.so options: - native_cfg_path: ./cfg/zenoh_native_config.json5 + shm_pool_size: 1024 channel: backends: - type: zenoh + options: + pub_topics_options: + - topic_name: "(.*)" + shm_enabled: false pub_topics_options: - topic_name: "(.*)" enable_backends: [zenoh] diff --git a/src/examples/plugins/zenoh_plugin/README.md b/src/examples/plugins/zenoh_plugin/README.md index 95a8f14b7..8c75baca9 100644 --- a/src/examples/plugins/zenoh_plugin/README.md +++ b/src/examples/plugins/zenoh_plugin/README.md @@ -118,4 +118,116 @@ 说明: -- 本示例与 **protobuf rpc** 示例基本一致,除了业务层使用的是 ros2 msg 形式的协议; \ No newline at end of file +- 本示例与 **protobuf rpc** 示例基本一致,除了业务层使用的是 ros2 msg 形式的协议; + +## protobuf channel with shared-memory + +一个基于 protobuf 协议与 zenoh 后端的 channel 示例,演示内容包括: +- 如何在配置文件中加载**zenoh_plugin**; +- 如何使用 zenoh 类型的 channel 后端; +- 如何使用共享内存进行通信; + + +核心代码: +- [event.proto](../../../protocols/example/event.proto) +- [normal_publisher_module.cc](../../cpp/pb_chn/module/normal_publisher_module/normal_publisher_module.cc) +- [normal_subscriber_module.cc](../../cpp/pb_chn/module/normal_subscriber_module/normal_subscriber_module.cc) + + +配置文件: +- [examples_plugins_zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml](./install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml) +- [examples_plugins_zenoh_plugin_pb_chn_sub_with_shm_cfg.yaml](./install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_sub_with_shm_cfg.yaml) + + +运行方式(linux): +- 开启 `AIMRT_BUILD_EXAMPLES`、`AIMRT_BUILD_ZENOH_PLUGIN` 选项编译 AimRT(编译时需要提前准备好rust编译环境); +- 在终端输入 `ls -l /dev/shm` 值令 观察此时计算机上共享内存的使用情况, 此时zenoh 未启动,共享内存应该为空; +- 编译成功后,先运行 build 目录下`start_examples_plugins_zenoh_plugin_pb_chn_sub_with_shm.sh`脚本启动订阅端(sub 进程); +- 再开启一个新的终端窗口运行`start_examples_plugins_zenoh_plugin_pb_chn_pub_with_shm.sh`脚本启动发布端(pub 进程); +- 程序启动后在终端再次输入 `ls -l /dev/shm` 值令 观察此时计算机上共享内存的使用情况, 此时zenoh 已启动,可观察到 zenoh 占用的共享内存大小信息; +- 分别在两个终端键入`ctrl-c`停止对应进程; + + +说明: +- 此示例创建了以下两个模块: + - `NormalPublisherModule`:会基于 `work_thread_pool` 执行器,以配置的频率、向配置的 topic 中发布 `ExampleEventMsg` 类型的消息; + - `NormalSubscriberModule`:会订阅配置的 topic 下的 `ExampleEventMsg` 类型的消息; +- 此示例将 `NormalPublisherModule` 和 `NormalSubscriberModule` 分别集成到 `pb_chn_pub_pkg` 和 `pb_chn_sub_pkg` 两个 Pkg 中,并在两个配置文件中分别加载对应的 Pkg 到 pub 和 sub 进程中; +- 此示例加载了**zenoh_plugin**,并使用 zenoh 类型的 channel 后端进行通信; + + +## protobuf rpc with shared-memory + +一个基于 protobuf 协议、协程型接口与 zenoh 后端的 rpc 示例,演示内容包括: +- 如何在配置文件中加载**zenoh_plugin**; +- 如何使用 zenoh 类型的 rpc 后端; +- 如何使用共享内存进行通信; + +核心代码: +- [rpc.proto](../../../protocols/example/rpc.proto) +- [normal_rpc_co_client_module.cc](../../cpp/pb_rpc/module/normal_rpc_co_client_module/normal_rpc_co_client_module.cc) +- [normal_rpc_co_server_module.cc](../../cpp/pb_rpc/module/normal_rpc_co_server_module/normal_rpc_co_server_module.cc) +- [service.cc](../../cpp/pb_rpc/module/normal_rpc_co_server_module/service.cc) + + +配置文件: +- [examples_plugins_zenoh_plugin_pb_rpc_client_with_shm_cfg.yaml](./install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_client_with_shm_cfg.yaml) +- [examples_plugins_zenoh_plugin_pb_rpc_server_with_shm_cfg.yaml](./install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_server_with_shm_cfg.yaml) + + +运行方式(linux): +- 开启 `AIMRT_BUILD_EXAMPLES`、`AIMRT_BUILD_ZENOH_PLUGIN` 选项编译 AimRT(编译时需要提前准备好rust编译环境); +- 在终端输入 `ls -l /dev/shm` 值令 观察此时计算机上共享内存的使用情况, 此时zenoh 未启动,共享内存应该为空; +- 编译成功后,先运行 build 目录下`start_examples_plugins_zenoh_plugin_pb_rpc_server_with_shm.sh`脚本启动订阅端(srv 进程); +- 再开启一个新的终端窗口运行`start_examples_plugins_zenoh_plugin_pb_rpc_client_with_shm.sh`脚本启动发布端(cli 进程); +- 程序启动后在终端再次输入 `ls -l /dev/shm` 值令 观察此时计算机上共享内存的使用情况, 此时zenoh 已启动,可观察到 zenoh 占用的共享内存大小信息; +- 分别在两个终端键入`ctrl-c`停止对应进程; + +说明: +- 此示例创建了以下两个模块: + - `NormalRpcCoClientModule`:会基于 `work_thread_pool` 执行器,以配置的频率,通过协程 Client 接口,向 `ExampleService` 发起 RPC 请求; + - `NormalRpcCoServerModule`:会注册 `ExampleService` 服务端,通过协程 Server 接口,提供 echo 功能; +- 此示例在 Rpc Client 端和 Server 端分别注册了两个 Filter 用于打印请求日志和计算耗时; +- 此示例将 `NormalRpcCoClientModule` 和 `NormalRpcCoServerModule` 分别集成到 `pb_rpc_client_pkg` 和 `pb_rpc_server_pkg` 两个 Pkg 中,并在两个配置文件中分别加载对应的 Pkg 到 srv 和 cli 进程中; +- 此示例加载了**zenoh_plugin**,并使用 zenoh 类型的 rpc 后端进行通信,此外还在客户端配置了 `timeout_handle` 执行器作为超时执行器; + + +## protobuf channel with shared-memory and network + +一个基于 protobuf 协议与 zenoh 后端的 channel 示例,演示内容包括: +- 如何在配置文件中加载**zenoh_plugin**; +- 如何使用 zenoh 类型的 channel 后端; +- 如何使用共享内存和网络进行混合通信; + + +核心代码: +- [event.proto](../../../protocols/example/event.proto) +- [normal_publisher_module.cc](../../cpp/pb_chn/module/normal_publisher_module/normal_publisher_module.cc) +- [normal_subscriber_module.cc](../../cpp/pb_chn/module/normal_subscriber_module/normal_subscriber_module.cc) + + +配置文件: +- [examples_plugins_zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml](./install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml) +- [examples_plugins_zenoh_plugin_pb_chn_sub_cfg.yaml](./install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_sub_cfg.yaml) + + +运行方式(linux): +- 开启 `AIMRT_BUILD_EXAMPLES`、`AIMRT_BUILD_ZENOH_PLUGIN` 选项编译 AimRT(编译时需要提前准备好rust编译环境); +- 在终端输入 `ls -l /dev/shm` 值令 观察此时计算机上共享内存的使用情况, 此时zenoh 未启动,共享内存应该为空; +- 编译成功后,先运行 build 目录下`start_examples_plugins_zenoh_plugin_pb_chn_sub.sh`脚本启动订阅端(sub 进程); +- 再开启一个新的终端窗口运行`start_examples_plugins_zenoh_plugin_pb_chn_pub_with_shm.sh`脚本启动发布端(pub 进程); +- 程序启动后在终端再次输入 `ls -l /dev/shm` 值令 观察此时计算机上共享内存的使用情况, 此时zenoh 已启动,可观察到 zenoh 占用的共享内存大小信息; +- 分别在两个终端键入`ctrl-c`停止对应进程; + + +说明: +- 此示例创建了以下两个模块: + - `NormalPublisherModule`:会基于 `work_thread_pool` 执行器,以配置的频率、向配置的 topic 中发布 `ExampleEventMsg` 类型的消息; + - `NormalSubscriberModule`:会订阅配置的 topic 下的 `ExampleEventMsg` 类型的消息; +- 此示例将 `NormalPublisherModule` 和 `NormalSubscriberModule` 分别集成到 `pb_chn_pub_pkg` 和 `pb_chn_sub_pkg` 两个 Pkg 中,并在两个配置文件中分别加载对应的 Pkg 到 pub 和 sub 进程中; +- 此示例加载了**zenoh_plugin**,并使用 zenoh 类型的 channel 后端进行通信; +- zenoh 插件配置的共享内存数据一旦超过其作用范围,就会自动切换到网络传输模式。 用户可尝试分别在一台主机和两台主机分别执行上述步骤,观察到不同主机间的通信效果: + - 在一台主机上运行的发布端和订阅端,可观察到共享内存的通信效果; + - 在两台主机上运行的发布端和订阅端,可观察到网络的通信效果, 尽管发布端使用的共享内存方案; + + \ No newline at end of file diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_benchmark_pub_with_shm_cfg.yaml b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_benchmark_pub_with_shm_cfg.yaml new file mode 100644 index 000000000..fb2bea941 --- /dev/null +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_benchmark_pub_with_shm_cfg.yaml @@ -0,0 +1,57 @@ +# Copyright (c) 2023, AgiBot Inc. +# All rights reserved. + +aimrt: + plugin: + plugins: + - name: zenoh_plugin + path: ./libaimrt_zenoh_plugin.so + options: + native_cfg_path: ./cfg/zenoh_native_config.json5 + shm_pool_size: 10240000 + log: + core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off + backends: + - type: console + executor: + executors: + - name: publish_control_executor + type: simple_thread + - name: publish_executor_0 + type: simple_thread + - name: publish_executor_1 + type: simple_thread + - name: publish_executor_2 + type: simple_thread + - name: publish_executor_3 + type: simple_thread + channel: + backends: + - type: zenoh + options: + pub_topics_options: + - topic_name: "(.*)" + shm_enabled: true + pub_topics_options: + - topic_name: "(.*)" + enable_backends: [zenoh] + module: + pkgs: + - path: ./libpb_chn_pub_pkg.so + enable_modules: [BenchmarkPublisherModule] + modules: + - name: BenchmarkPublisherModule + log_lvl: INFO + +# Module custom configuration +BenchmarkPublisherModule: + max_topic_number: 4 + bench_plans: + - channel_frq: 1000 + msg_size: 512 + topic_number: 4 + msg_count: 5000 + - channel_frq: 1000 + msg_size: 4096 + topic_number: 1 + msg_count: 1000 diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_benchmark_sub_with_shm_cfg.yaml b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_benchmark_sub_with_shm_cfg.yaml new file mode 100644 index 000000000..a3c7abfb5 --- /dev/null +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_benchmark_sub_with_shm_cfg.yaml @@ -0,0 +1,32 @@ +# Copyright (c) 2023, AgiBot Inc. +# All rights reserved. + +aimrt: + plugin: + plugins: + - name: zenoh_plugin + path: ./libaimrt_zenoh_plugin.so + options: + native_cfg_path: ./cfg/zenoh_native_config.json5 + log: + core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off + backends: + - type: console + executor: + channel: + backends: + - type: zenoh + sub_topics_options: + - topic_name: "(.*)" + enable_backends: [zenoh] + module: + pkgs: + - path: ./libpb_chn_sub_pkg.so + enable_modules: [BenchmarkSubscriberModule] + modules: + - name: BenchmarkSubscriberModule + log_lvl: INFO + +# Module custom configuration +BenchmarkSubscriberModule: + max_topic_number: 4 diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml new file mode 100644 index 000000000..d5bf79407 --- /dev/null +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml @@ -0,0 +1,42 @@ +# Copyright (c) 2023, AgiBot Inc. +# All rights reserved. + +aimrt: + plugin: + plugins: + - name: zenoh_plugin + path: ./libaimrt_zenoh_plugin.so + options: + shm_pool_size: 10240 + log: + core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off + backends: + - type: console + executor: + executors: + - name: work_thread_pool + type: asio_thread + options: + thread_num: 2 + channel: + backends: + - type: zenoh + options: + pub_topics_options: + - topic_name: "(.*)" + shm_enabled: true + pub_topics_options: + - topic_name: "(.*)" + enable_backends: [zenoh] + module: + pkgs: + - path: ./libpb_chn_pub_pkg.so + enable_modules: [NormalPublisherModule] + modules: + - name: NormalPublisherModule + log_lvl: INFO + +# Module custom configuration +NormalPublisherModule: + topic_name: test_topic + channel_frq: 0.5 diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_sub_with_shm_cfg.yaml b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_sub_with_shm_cfg.yaml new file mode 100644 index 000000000..9cb5b8458 --- /dev/null +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_sub_with_shm_cfg.yaml @@ -0,0 +1,30 @@ +# Copyright (c) 2023, AgiBot Inc. +# All rights reserved. + +aimrt: + plugin: + plugins: + - name: zenoh_plugin + path: ./libaimrt_zenoh_plugin.so + log: + core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off + backends: + - type: console + executor: + channel: + backends: + - type: zenoh + sub_topics_options: + - topic_name: "(.*)" + enable_backends: [zenoh] + module: + pkgs: + - path: ./libpb_chn_sub_pkg.so + enable_modules: [NormalSubscriberModule] + modules: + - name: NormalSubscriberModule + log_lvl: INFO + +# Module custom configuration +NormalSubscriberModule: + topic_name: test_topic diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_benchmark_client_with_shm_cfg.yaml b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_benchmark_client_with_shm_cfg.yaml new file mode 100644 index 000000000..aa5d860a3 --- /dev/null +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_benchmark_client_with_shm_cfg.yaml @@ -0,0 +1,65 @@ +# Copyright (c) 2023, AgiBot Inc. +# All rights reserved. + +aimrt: + plugin: + plugins: + - name: zenoh_plugin + path: ./libaimrt_zenoh_plugin.so + options: + native_cfg_path: ./cfg/zenoh_native_config.json5 + shm_pool_size: 10240000 + log: + core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off + backends: + - type: console + executor: + executors: + - name: client_statistics_executor + type: asio_thread + options: + thread_num: 4 + - name: timeout_handle + type: time_wheel + options: + bind_executor: client_statistics_executor + - name: client_executor_0 + type: asio_thread + - name: client_executor_1 + type: asio_thread + - name: client_executor_2 + type: asio_thread + - name: client_executor_3 + type: asio_thread + rpc: + backends: + - type: zenoh + options: + timeout_executor: timeout_handle + clients_options: + - func_name: "(.*)" + shm_enabled: true + clients_options: + - func_name: "(.*)" + enable_backends: [zenoh] + module: + pkgs: + - path: ./libpb_rpc_client_pkg.so + enable_modules: [BenchmarkRpcClientModule] + modules: + - name: BenchmarkRpcClientModule + log_lvl: INFO + +# Module custom configuration +BenchmarkRpcClientModule: + max_parallel: 4 + bench_plans: + - perf_mode: bench + msg_size: 256 + parallel: 4 + msg_count: 10000 + - perf_mode: fixed-freq + freq: 1000 + msg_size: 1024 + parallel: 2 + msg_count: 1000 diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_client_with_shm_cfg.yaml b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_client_with_shm_cfg.yaml new file mode 100644 index 000000000..9cd4fb540 --- /dev/null +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_client_with_shm_cfg.yaml @@ -0,0 +1,46 @@ +# Copyright (c) 2023, AgiBot Inc. +# All rights reserved. + +aimrt: + plugin: + plugins: + - name: zenoh_plugin + path: ./libaimrt_zenoh_plugin.so + options: + native_cfg_path: ./cfg/zenoh_native_config.json5 + log: + core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off + backends: + - type: console + executor: + executors: + - name: work_thread_pool + type: asio_thread + options: + thread_num: 4 + - name: timeout_handle + type: time_wheel + options: + bind_executor: work_thread_pool + rpc: + backends: + - type: zenoh + options: + timeout_executor: timeout_handle + clients_options: + - func_name: "(.*)" + shm_enabled: true + clients_options: + - func_name: "(.*)" + enable_backends: [zenoh] + module: + pkgs: + - path: ./libpb_rpc_client_pkg.so + enable_modules: [NormalRpcCoClientModule] + modules: + - name: NormalRpcCoClientModule + log_lvl: INFO + +# Module custom configuration +NormalRpcCoClientModule: + rpc_frq: 0.5 diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_server_cfg.yaml b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_server_cfg.yaml index eb7525648..d55dca1b1 100644 --- a/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_server_cfg.yaml +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_server_cfg.yaml @@ -25,4 +25,4 @@ aimrt: enable_modules: [NormalRpcCoServerModule] modules: - name: NormalRpcCoServerModule - log_lvl: INFO + log_lvl: Info diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_server_with_shm_cfg.yaml b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_server_with_shm_cfg.yaml new file mode 100644 index 000000000..436f9a494 --- /dev/null +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_server_with_shm_cfg.yaml @@ -0,0 +1,33 @@ +# Copyright (c) 2023, AgiBot Inc. +# All rights reserved. + +aimrt: + plugin: + plugins: + - name: zenoh_plugin + path: ./libaimrt_zenoh_plugin.so + options: + native_cfg_path: ./cfg/zenoh_native_config.json5 + shm_pool_size: 10240000 + log: + core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off + backends: + - type: console + executor: + rpc: + backends: + - type: zenoh + options: + servers_options: + - func_name: "(.*)" + shm_enabled: true + servers_options: + - func_name: "(.*)" + enable_backends: [zenoh] + module: + pkgs: + - path: ./libpb_rpc_server_pkg.so + enable_modules: [NormalRpcCoServerModule] + modules: + - name: NormalRpcCoServerModule + log_lvl: Info diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_chn_benchmark_pub_with_shm.sh b/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_chn_benchmark_pub_with_shm.sh new file mode 100755 index 000000000..326218a07 --- /dev/null +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_chn_benchmark_pub_with_shm.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +./aimrt_main --cfg_file_path=./cfg/examples_plugins_zenoh_plugin_pb_chn_benchmark_pub_with_shm_cfg.yaml \ No newline at end of file diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_chn_benchmark_sub_with_shm.sh b/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_chn_benchmark_sub_with_shm.sh new file mode 100755 index 000000000..965981b12 --- /dev/null +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_chn_benchmark_sub_with_shm.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +./aimrt_main --cfg_file_path=./cfg/examples_plugins_zenoh_plugin_pb_chn_benchmark_sub_with_shm_cfg.yaml \ No newline at end of file diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_chn_pub_with_shm.sh b/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_chn_pub_with_shm.sh new file mode 100755 index 000000000..a040c8a75 --- /dev/null +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_chn_pub_with_shm.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +./aimrt_main --cfg_file_path=./cfg/examples_plugins_zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml \ No newline at end of file diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_chn_sub_with_shm.sh b/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_chn_sub_with_shm.sh new file mode 100755 index 000000000..9d757f736 --- /dev/null +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_chn_sub_with_shm.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +./aimrt_main --cfg_file_path=./cfg/examples_plugins_zenoh_plugin_pb_chn_sub_with_shm_cfg.yaml \ No newline at end of file diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_rpc_benchmark_client_with_shm.sh b/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_rpc_benchmark_client_with_shm.sh new file mode 100755 index 000000000..0502d6d4c --- /dev/null +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_rpc_benchmark_client_with_shm.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +./aimrt_main --cfg_file_path=./cfg/examples_plugins_zenoh_plugin_pb_rpc_benchmark_client_with_shm_cfg.yaml \ No newline at end of file diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_rpc_client_with_shm.sh b/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_rpc_client_with_shm.sh new file mode 100755 index 000000000..e0312555e --- /dev/null +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_rpc_client_with_shm.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +./aimrt_main --cfg_file_path=./cfg/examples_plugins_zenoh_plugin_pb_rpc_client_with_shm_cfg.yaml \ No newline at end of file diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_rpc_server_with_shm.sh b/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_rpc_server_with_shm.sh new file mode 100755 index 000000000..d9373a2be --- /dev/null +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_rpc_server_with_shm.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +./aimrt_main --cfg_file_path=./cfg/examples_plugins_zenoh_plugin_pb_rpc_server_with_shm_cfg.yaml \ No newline at end of file diff --git a/src/plugins/zenoh_plugin/util.h b/src/plugins/zenoh_plugin/util.h new file mode 100644 index 000000000..d6ed5e3cb --- /dev/null +++ b/src/plugins/zenoh_plugin/util.h @@ -0,0 +1,80 @@ +// Copyright(c) 2023, AgiBot Inc. +// All rights reserved. + +#pragma once + +#include "core/channel/channel_msg_wrapper.h" +#include "core/rpc/rpc_invoke_wrapper.h" + +namespace aimrt::plugins::zenoh_plugin { +constexpr unsigned int kFixedLen = 20; // FIXED_LEN represents the length of the pkg_size's string, which is enough to the max value of uint64_t + +inline std::pair, size_t> SerializeMsgSupportedZenoh( + runtime::core::channel::MsgWrapper& msg_wrapper, std::string_view serialization_type, aimrt::util::BufferArrayAllocatorRef allocator) { + auto& serialization_cache = msg_wrapper.serialization_cache; + const auto& info = msg_wrapper.info; + + auto finditr = serialization_cache.find(serialization_type); + if (finditr != serialization_cache.end()) { + return {finditr->second, finditr->second->BufferSize()}; + } + auto buffer_array_ptr = std::make_unique(allocator); + bool serialize_ret = info.msg_type_support_ref.Serialize( + serialization_type, + msg_wrapper.msg_ptr, + buffer_array_ptr->AllocatorNativeHandle(), + buffer_array_ptr->BufferArrayNativeHandle()); + AIMRT_ASSERT(serialize_ret, "Serialize failed."); + + return {nullptr, buffer_array_ptr->BufferSize()}; +} + +inline std::pair, size_t> SerializeReqSupportedZenoh( + runtime::core::rpc::InvokeWrapper& invoke_wrapper, std::string_view serialization_type, aimrt::util::BufferArrayAllocatorRef allocator) { + const auto& info = invoke_wrapper.info; + auto& req_serialization_cache = invoke_wrapper.req_serialization_cache; + + auto find_itr = req_serialization_cache.find(serialization_type); + if (find_itr != req_serialization_cache.end()) + return {find_itr->second, find_itr->second->BufferSize()}; + + auto buffer_array_ptr = std::make_unique(allocator); + bool serialize_ret = info.req_type_support_ref.Serialize( + serialization_type, + invoke_wrapper.req_ptr, + buffer_array_ptr->AllocatorNativeHandle(), + buffer_array_ptr->BufferArrayNativeHandle()); + + AIMRT_ASSERT(serialize_ret, "Serialize failed."); + + return {nullptr, buffer_array_ptr->BufferSize()}; +} + +inline std::pair, size_t> SerializeRspSupportedZenoh( + runtime::core::rpc::InvokeWrapper& invoke_wrapper, std::string_view serialization_type, aimrt::util::BufferArrayAllocatorRef allocator) { + const auto& info = invoke_wrapper.info; + auto& rsp_serialization_cache = invoke_wrapper.rsp_serialization_cache; + + auto find_itr = rsp_serialization_cache.find(serialization_type); + if (find_itr != rsp_serialization_cache.end()) + return {find_itr->second, find_itr->second->BufferSize()}; + + auto buffer_array_ptr = std::make_unique(allocator); + bool serialize_ret = info.rsp_type_support_ref.Serialize( + serialization_type, + invoke_wrapper.rsp_ptr, + buffer_array_ptr->AllocatorNativeHandle(), + buffer_array_ptr->BufferArrayNativeHandle()); + + AIMRT_ASSERT(serialize_ret, "Serialize failed."); + + return {nullptr, buffer_array_ptr->BufferSize()}; +} + +inline std::string IntToFixedLengthString(int number, int length) { + std::ostringstream oss; + oss << std::setw(length) << number; + return oss.str(); +} + +} // namespace aimrt::plugins::zenoh_plugin \ No newline at end of file diff --git a/src/plugins/zenoh_plugin/zenoh_buffer_array_allocator.h b/src/plugins/zenoh_plugin/zenoh_buffer_array_allocator.h new file mode 100644 index 000000000..bd6d49eed --- /dev/null +++ b/src/plugins/zenoh_plugin/zenoh_buffer_array_allocator.h @@ -0,0 +1,98 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#pragma once + +#include +#include +#include +#include +#include "aimrt_module_c_interface/util/buffer_base.h" + +namespace aimrt::util { + +class ZenohBufferArrayAllocator { + public: + ZenohBufferArrayAllocator(uint64_t z_shm_capacity, void* z_shm_head_ptr) + : z_shm_capacity_(z_shm_capacity), + z_shm_head_ptr_(z_shm_head_ptr), + z_shm_cur_ptr_(z_shm_head_ptr), + base_(GenBase(this)) {} + + ~ZenohBufferArrayAllocator() = default; + ZenohBufferArrayAllocator(const ZenohBufferArrayAllocator&) = delete; + ZenohBufferArrayAllocator& operator=(const ZenohBufferArrayAllocator&) = delete; + const aimrt_buffer_array_allocator_t* NativeHandle() const { return &base_; } + + void Reserve(aimrt_buffer_array_t* buffer_array, size_t new_cap) { + aimrt_buffer_t* cur_data = buffer_array->data; + + buffer_array->data = new aimrt_buffer_t[new_cap]; + buffer_array->capacity = new_cap; + + if (cur_data) { + memcpy(buffer_array->data, cur_data, buffer_array->len * sizeof(aimrt_buffer_t)); + delete[] cur_data; + } + } + + aimrt_buffer_t Allocate(aimrt_buffer_array_t* buffer_array, size_t size) { + void* data = z_shm_cur_ptr_; + z_shm_cur_ptr_ = static_cast(z_shm_cur_ptr_) + size; + z_shm_used_size_ += size; + + if (z_shm_used_size_ > z_shm_capacity_) [[unlikely]] { + z_is_shm_enough_ = false; + return aimrt_buffer_t{nullptr, 0}; + } + + if (data == nullptr) [[unlikely]] + return aimrt_buffer_t{nullptr, 0}; + + if (buffer_array->capacity > buffer_array->len) { + return (buffer_array->data[buffer_array->len++] = aimrt_buffer_t{data, size}); + } + + static constexpr size_t kInitCapacitySzie = 2; + size_t new_capacity = (buffer_array->capacity < kInitCapacitySzie) + ? kInitCapacitySzie + : (buffer_array->capacity << 1); + + Reserve(buffer_array, new_capacity); + + return (buffer_array->data[buffer_array->len++] = aimrt_buffer_t{data, size}); + } + + void Release(aimrt_buffer_array_t* buffer_array) { + if (buffer_array->data) delete[] buffer_array->data; + } + + bool IsShmEnough() { return z_is_shm_enough_; } + + static const aimrt_buffer_array_allocator_t GenBase(void* impl) { + return aimrt_buffer_array_allocator_t{ + .reserve = [](void* impl, aimrt_buffer_array_t* buffer_array, size_t new_cap) { + static_cast(impl)->Reserve(buffer_array, new_cap); // + }, + .allocate = [](void* impl, aimrt_buffer_array_t* buffer_array, size_t size) -> aimrt_buffer_t { + return static_cast(impl)->Allocate(buffer_array, size); + }, + .release = [](void* impl, aimrt_buffer_array_t* buffer_array) { + static_cast(impl)->Release(buffer_array); // + }, + .impl = impl}; + } + + private: + const aimrt_buffer_array_allocator_t base_; + + void* z_shm_head_ptr_; + void* z_shm_cur_ptr_; + + uint64_t z_shm_capacity_; + uint64_t z_shm_used_size_ = 0; + + bool z_is_shm_enough_ = true; +}; + +} // namespace aimrt::util \ No newline at end of file diff --git a/src/plugins/zenoh_plugin/zenoh_channel_backend.cc b/src/plugins/zenoh_plugin/zenoh_channel_backend.cc index 215d3d873..d5f479d8f 100644 --- a/src/plugins/zenoh_plugin/zenoh_channel_backend.cc +++ b/src/plugins/zenoh_plugin/zenoh_channel_backend.cc @@ -11,10 +11,29 @@ struct convert { static Node encode(const Options& rhs) { Node node; + node["pub_topics_options"] = YAML::Node(); + for (const auto& pub_topic_options : rhs.pub_topics_options) { + Node pub_topic_options_node; + pub_topic_options_node["topic_name"] = pub_topic_options.topic_name; + pub_topic_options_node["shm_enabled"] = pub_topic_options.shm_enabled; + node["pub_topics_options"].push_back(pub_topic_options_node); + } + return node; } static bool decode(const Node& node, Options& rhs) { + if (node["pub_topics_options"] && node["pub_topics_options"].IsSequence()) { + for (const auto& pub_topic_options_node : node["pub_topics_options"]) { + auto pub_topic_options = Options::PubTopicOptions{ + .topic_name = pub_topic_options_node["topic_name"].as(), + .shm_enabled = pub_topic_options_node["shm_enabled"].as(), + }; + + rhs.pub_topics_options.emplace_back(std::move(pub_topic_options)); + } + } + return true; } }; @@ -52,16 +71,34 @@ bool ZenohChannelBackend::RegisterPublishType( const auto& info = publish_type_wrapper.info; - // check path + bool shm_enabled = false; + + auto find_option_itr = std::find_if( + options_.pub_topics_options.begin(), options_.pub_topics_options.end(), + [topic_name = info.topic_name](const Options::PubTopicOptions& pub_option) { + try { + return std::regex_match(topic_name.begin(), topic_name.end(), std::regex(pub_option.topic_name, std::regex::ECMAScript)); + } catch (const std::exception& e) { + AIMRT_WARN("Regex get exception, expr: {}, string: {}, exception info: {}", + pub_option.topic_name, topic_name, e.what()); + return false; + } + }); + + if (find_option_itr != options_.pub_topics_options.end()) { + shm_enabled = find_option_itr->shm_enabled; + } + namespace util = aimrt::common::util; std::string pattern = std::string("channel/") + util::UrlEncode(info.topic_name) + "/" + util::UrlEncode(info.msg_type) + limit_domain_; - zenoh_manager_ptr_->RegisterPublisher(pattern); + zenoh_manager_ptr_->RegisterPublisher(pattern, shm_enabled); + z_pub_shm_size_map_[pattern] = shm_init_loan_size_; - AIMRT_INFO("Register publish type to zenoh channel, url: {}", pattern); + AIMRT_INFO("Register publish type to zenoh channel, url: {}, shm_enabled: {}", pattern, shm_enabled); return true; } catch (const std::exception& e) { @@ -102,25 +139,31 @@ bool ZenohChannelBackend::Subscribe( try { auto ctx_ptr = std::make_shared(aimrt_channel_context_type_t::AIMRT_CHANNEL_SUBSCRIBER_CONTEXT); + // todo(hj): avoid copy when use shm const z_loaned_bytes_t* payload = z_sample_payload(message); size_t serialized_size = z_bytes_len(payload); z_bytes_reader_t reader = z_bytes_get_reader(payload); std::vector serialized_data(serialized_size); - if (z_bytes_reader_read(&reader, reinterpret_cast(serialized_data.data()), serialized_size) >= 0) { - util::ConstBufferOperator buf_oper(serialized_data.data(), serialized_size); + // read data from payload + auto ret = z_bytes_reader_read(&reader, reinterpret_cast(serialized_data.data()), serialized_size); + if (ret >= 0) { + util::ConstBufferOperator buf_oper(serialized_data.data() + kFixedLen, std::stoi(std::string(serialized_data.data(), kFixedLen))); + + // get serialization type std::string serialization_type(buf_oper.GetString(util::BufferLenType::kUInt8)); ctx_ptr->SetSerializationType(serialization_type); + // get context meta size_t ctx_num = buf_oper.GetUint8(); for (size_t ii = 0; ii < ctx_num; ++ii) { auto key = buf_oper.GetString(util::BufferLenType::kUInt16); auto val = buf_oper.GetString(util::BufferLenType::kUInt16); ctx_ptr->SetMetaValue(key, val); } - ctx_ptr->SetMetaValue(AIMRT_CHANNEL_CONTEXT_KEY_BACKEND, Name()); + // get msg auto remaining_buf = buf_oper.GetRemainingBuffer(); sub_tool_ptr->DoSubscribeCallback( @@ -145,12 +188,29 @@ bool ZenohChannelBackend::Subscribe( void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrapper) noexcept { try { + // checkout state AIMRT_CHECK_ERROR_THROW(state_.load() == State::kStart, "Method can only be called when state is 'Start'."); namespace util = aimrt::common::util; const auto& info = msg_wrapper.info; + std::string zenoh_pub_topic = std::string("channel/") + + util::UrlEncode(info.topic_name) + "/" + + util::UrlEncode(info.msg_type) + + limit_domain_; + + // find publisher with zenoh_pub_topic + auto z_pub_registry = zenoh_manager_ptr_->GetPublisherRegisterMap(); + auto z_pub_iter = z_pub_registry->find(zenoh_pub_topic); + if (z_pub_iter == z_pub_registry->end()) { + AIMRT_ERROR("Url: {} is not registered!", zenoh_pub_topic); + return; + } + + auto z_pub = z_pub_iter->second; + + // get serialization type auto publish_type_support_ref = info.msg_type_support_ref; auto serialization_type = msg_wrapper.ctx_ref.GetSerializationType(); @@ -158,16 +218,7 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe serialization_type = publish_type_support_ref.DefaultSerializationType(); } - auto buffer_array_view_ptr = aimrt::runtime::core::channel::SerializeMsgWithCache(msg_wrapper, serialization_type); - AIMRT_CHECK_ERROR_THROW( - buffer_array_view_ptr, - "Msg serialization failed, serialization_type {}, pkg_path: {}, module_name: {}, topic_name: {}, msg_type: {}", - serialization_type, info.pkg_path, info.module_name, info.topic_name, info.msg_type); - - const auto* buffer_array_data = buffer_array_view_ptr->Data(); - const size_t buffer_array_len = buffer_array_view_ptr->Size(); - size_t msg_size = buffer_array_view_ptr->BufferSize(); - + // get meta data const auto& keys = msg_wrapper.ctx_ref.GetMetaKeys(); AIMRT_CHECK_ERROR_THROW(keys.size() <= 255, "Too much context meta, require less than 255, but actually {}.", keys.size()); @@ -183,32 +234,163 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe context_meta_kv.emplace_back(val); } - int32_t pkg_size = 1 + serialization_type.size() + context_meta_kv_size + msg_size; + // shm_enabled + if (z_pub.second) { + unsigned char* z_pub_loaned_shm_ptr = nullptr; + std::shared_ptr buffer_array_cache_ptr = nullptr; + bool is_shm_loan_size_enough = true; + bool is_shm_pool_size_enough = true; + + uint64_t msg_size = 0; + z_buf_layout_alloc_result_t loan_result; + + do { + // release old shm + if (z_pub_loaned_shm_ptr != nullptr) { + z_shm_provider_garbage_collect(z_loan(zenoh_manager_ptr_->shm_provider_)); + z_shm_provider_defragment(z_loan(zenoh_manager_ptr_->shm_provider_)); + } + + // load a new size shm + uint64_t loan_size = z_pub_shm_size_map_[zenoh_pub_topic]; + z_shm_provider_alloc(&loan_result, z_loan(zenoh_manager_ptr_->shm_provider_), loan_size, zenoh_manager_ptr_->alignment_); + + // if shm pool is not enough, use net buffer instead + if (loan_result.status != ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { + is_shm_pool_size_enough = false; + z_shm_provider_garbage_collect(z_loan(zenoh_manager_ptr_->shm_provider_)); + z_shm_provider_defragment(z_loan(zenoh_manager_ptr_->shm_provider_)); + AIMRT_WARN("Zenoh Plugin shm pool is not enough, use net buffer instead."); + break; + } + + z_pub_loaned_shm_ptr = z_shm_mut_data_mut(z_loan_mut(loan_result.buf)); + + // write info pkg on loaned shm : the first FIXED_LEN bytes needs to write the length of pkg + util::BufferOperator buf_oper(reinterpret_cast(z_pub_loaned_shm_ptr) + kFixedLen, loan_size - kFixedLen); + + // write serialization type on loaned shm + buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8); + + // write context meta on loaned shm + buf_oper.SetUint8(static_cast(keys.size())); + for (const auto& s : context_meta_kv) { + buf_oper.SetString(s, util::BufferLenType::kUInt16); + } + auto type_and_ctx_len = 1 + serialization_type.size() + context_meta_kv_size; + + // write msg on loaned shm: should start at the (FIXED_LEN + type_and_ctx_len)-th byte + aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + type_and_ctx_len + kFixedLen); + if (buffer_array_cache_ptr == nullptr) { + try { + auto result = SerializeMsgSupportedZenoh(msg_wrapper, serialization_type, aimrt::util::BufferArrayAllocatorRef(z_allocator.NativeHandle())); + msg_size = result.second; + buffer_array_cache_ptr = result.first; + if (buffer_array_cache_ptr == nullptr) { + // in this case means no cache is set, then do nomal serialization(if size is small will throw exception) + is_shm_loan_size_enough = true; + } else { + if (msg_size > buf_oper.GetRemainingSize()) { + // in this case means the msg has serialization cache but the size is too large, then expand suitable size + is_shm_loan_size_enough = false; + z_pub_shm_size_map_[zenoh_pub_topic] = msg_size + type_and_ctx_len + kFixedLen; + } else { + // in this case means the msg has serialization cache and the size is suitable, then use cachema + is_shm_loan_size_enough = true; + } + } + + } catch (const std::exception& e) { + if (!z_allocator.IsShmEnough()) { + // the shm is not enough, need to expand a double size + z_pub_shm_size_map_[zenoh_pub_topic] = z_pub_shm_size_map_[zenoh_pub_topic] << 1; + is_shm_loan_size_enough = false; + } else { + AIMRT_ERROR( + "Msg serialization failed, serialization_type {}, pkg_path: {}, module_name: {}, topic_name: {}, msg_type: {}, exception: {}", + serialization_type, info.pkg_path, info.module_name, info.topic_name, info.msg_type, e.what()); + return; + } + } + } + + } while (!is_shm_loan_size_enough); + + if (is_shm_pool_size_enough) { + // if has cache, the copy it to shm to replace the serialization + if (buffer_array_cache_ptr != nullptr) { + unsigned char* strat_pos = z_pub_loaned_shm_ptr + kFixedLen + context_meta_kv_size + serialization_type.size() + 1; + for (size_t ii = 0; ii < buffer_array_cache_ptr->Size(); ++ii) { + std::memcpy(strat_pos, buffer_array_cache_ptr.get()[ii].Data()->data, buffer_array_cache_ptr.get()[ii].Data()->len); + strat_pos += buffer_array_cache_ptr.get()[ii].Data()->len; + } + + buffer_array_cache_ptr = nullptr; + } + // write info pkg length on loaned shm + std::memcpy(z_pub_loaned_shm_ptr, IntToFixedLengthString(1 + serialization_type.size() + context_meta_kv_size + msg_size, kFixedLen).c_str(), kFixedLen); + z_owned_bytes_t z_payload; + if (loan_result.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { + z_bytes_from_shm_mut(&z_payload, z_move(loan_result.buf)); + } + z_publisher_put(z_loan(z_pub.first), z_move(z_payload), &zenoh_manager_ptr_->z_pub_options_); + + // collect garbage and defragment shared memory, whose reference counting is zero + z_shm_provider_garbage_collect(z_loan(zenoh_manager_ptr_->shm_provider_)); + z_shm_provider_defragment(z_loan(zenoh_manager_ptr_->shm_provider_)); + + AIMRT_TRACE("Zenoh publish to '{}'", zenoh_pub_topic); + + return; + } + } + + // shm disabled + // serialize msg + auto buffer_array_view_ptr = aimrt::runtime::core::channel::SerializeMsgWithCache(msg_wrapper, serialization_type); + AIMRT_CHECK_ERROR_THROW( + buffer_array_view_ptr, + "Msg serialization failed, serialization_type {}, pkg_path: {}, module_name: {}, topic_name: {}, msg_type: {}", + serialization_type, info.pkg_path, info.module_name, info.topic_name, info.msg_type); + + const auto* buffer_array_data = buffer_array_view_ptr->Data(); + const size_t buffer_array_len = buffer_array_view_ptr->Size(); + size_t msg_size = buffer_array_view_ptr->BufferSize(); + + int32_t data_size = 1 + serialization_type.size() + context_meta_kv_size + msg_size; + int32_t pkg_size = data_size + kFixedLen; + + // create buffer for serialization std::vector serialized_data(pkg_size); util::BufferOperator buf_oper(serialized_data.data(), pkg_size); + + // full data_size + buf_oper.SetBuffer(IntToFixedLengthString(data_size, kFixedLen).c_str(), kFixedLen); + + // full serialization_type buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8); + // full context meta buf_oper.SetUint8(static_cast(keys.size())); for (const auto& s : context_meta_kv) { buf_oper.SetString(s, util::BufferLenType::kUInt16); } + // full msg for (size_t ii = 0; ii < buffer_array_len; ++ii) { buf_oper.SetBuffer( static_cast(buffer_array_data[ii].data), buffer_array_data[ii].len); } - std::string zenoh_pub_topic = std::string("channel/") + - util::UrlEncode(info.topic_name) + "/" + - util::UrlEncode(info.msg_type) + - limit_domain_; + // publish + z_owned_bytes_t z_payload; + z_bytes_from_buf(&z_payload, reinterpret_cast(serialized_data.data()), pkg_size, nullptr, nullptr); + z_publisher_put(z_loan(z_pub.first), z_move(z_payload), &zenoh_manager_ptr_->z_pub_options_); AIMRT_TRACE("Zenoh publish to '{}'", zenoh_pub_topic); - zenoh_manager_ptr_->Publish(zenoh_pub_topic, serialized_data.data(), pkg_size); - return; } catch (const std::exception& e) { AIMRT_ERROR("{}", e.what()); diff --git a/src/plugins/zenoh_plugin/zenoh_channel_backend.h b/src/plugins/zenoh_plugin/zenoh_channel_backend.h index fa5c15ac8..ad4829509 100644 --- a/src/plugins/zenoh_plugin/zenoh_channel_backend.h +++ b/src/plugins/zenoh_plugin/zenoh_channel_backend.h @@ -8,6 +8,8 @@ #include "util/buffer_util.h" #include "util/url_encode.h" #include "zenoh.h" +#include "zenoh_plugin/util.h" +#include "zenoh_plugin/zenoh_buffer_array_allocator.h" #include "zenoh_plugin/zenoh_manager.h" namespace aimrt::plugins::zenoh_plugin { @@ -15,13 +17,19 @@ namespace aimrt::plugins::zenoh_plugin { class ZenohChannelBackend : public runtime::core::channel::ChannelBackendBase { public: struct Options { + struct PubTopicOptions { + std::string topic_name; + bool shm_enabled = false; + }; + std::vector pub_topics_options; }; public: ZenohChannelBackend( - const std::shared_ptr& zenoh_util_ptr, const std::string& limit_domain) + const std::shared_ptr& zenoh_util_ptr, const std::string& limit_domain, size_t shm_init_loan_size) : zenoh_manager_ptr_(zenoh_util_ptr), - limit_domain_(limit_domain) {} + limit_domain_(limit_domain), + shm_init_loan_size_(shm_init_loan_size) {} ~ZenohChannelBackend() override = default; @@ -60,6 +68,10 @@ class ZenohChannelBackend : public runtime::core::channel::ChannelBackendBase { std::string, std::unique_ptr> subscribe_wrapper_map_; + + std::unordered_map z_pub_shm_size_map_; + + uint64_t shm_init_loan_size_; }; } // namespace aimrt::plugins::zenoh_plugin \ No newline at end of file diff --git a/src/plugins/zenoh_plugin/zenoh_manager.cc b/src/plugins/zenoh_plugin/zenoh_manager.cc index 2fad81d3d..96b588768 100644 --- a/src/plugins/zenoh_plugin/zenoh_manager.cc +++ b/src/plugins/zenoh_plugin/zenoh_manager.cc @@ -7,7 +7,7 @@ namespace aimrt::plugins::zenoh_plugin { -void ZenohManager::Initialize(const std::string &native_cfg_path) { +void ZenohManager::Initialize(const std::string &native_cfg_path, size_t shm_pool_size) { if (!native_cfg_path.empty() && native_cfg_path.c_str() != nullptr) { if (zc_config_from_file(&z_config_, native_cfg_path.c_str()) != Z_OK) { AIMRT_ERROR("Unable to load configuration file: {}", native_cfg_path); @@ -22,11 +22,14 @@ void ZenohManager::Initialize(const std::string &native_cfg_path) { AIMRT_ERROR("Unable to open zenoh session!"); return; } + + shm_pool_size_ = shm_pool_size; } void ZenohManager::Shutdown() { - for (auto ptr : z_pub_registry_) { - z_drop(z_move(ptr.second)); + for (const auto &ptr : z_pub_registry_) { + auto z_pub = ptr.second.first; + z_drop(z_move(z_pub)); } for (auto ptr : z_sub_registry_) { @@ -38,25 +41,34 @@ void ZenohManager::Shutdown() { z_sub_registry_.clear(); z_drop(z_move(z_session_)); + z_drop(z_move(shm_provider_)); + z_drop(z_move(shm_layout_)); } -void ZenohManager::RegisterPublisher(const std::string &keyexpr) { +void ZenohManager::RegisterPublisher(const std::string &keyexpr, bool shm_enabled) { z_view_keyexpr_t key; z_view_keyexpr_from_str(&key, keyexpr.c_str()); z_owned_publisher_t z_pub; z_publisher_put_options_default(&z_pub_options_); - if (z_declare_publisher(z_loan(z_session_), &z_pub, z_loan(key), NULL) < 0) { + if (z_declare_publisher(z_loan(z_session_), &z_pub, z_loan(key), nullptr) < 0) { AIMRT_ERROR("Unable to declare Publisher!"); return; } - z_pub_registry_.emplace(keyexpr, z_pub); + z_pub_registry_.emplace(keyexpr, std::make_pair(z_pub, shm_enabled)); AIMRT_TRACE("Publisher with keyexpr: {} registered successfully.", keyexpr.c_str()); + + // Create shared memory provider + std::lock_guard lock(z_mutex_); + if (!shm_initialized_.load() && shm_enabled) { + z_memory_layout_new(&shm_layout_, shm_pool_size_, alignment_); + z_posix_shm_provider_new(&shm_provider_, z_loan(shm_layout_)); + shm_initialized_.store(true); + } } -void data_handler(const z_loaned_sample_t *sample, void *arg) {} void ZenohManager::RegisterSubscriber(const std::string &keyexpr, MsgHandleFunc handle) { z_view_keyexpr_t key; z_view_keyexpr_from_str(&key, keyexpr.c_str()); @@ -74,7 +86,7 @@ void ZenohManager::RegisterSubscriber(const std::string &keyexpr, MsgHandleFunc z_owned_subscriber_t z_sub; - if (z_declare_subscriber(z_loan(z_session_), &z_sub, z_loan(key), z_move(z_callback), NULL) < 0) { + if (z_declare_subscriber(z_loan(z_session_), &z_sub, z_loan(key), z_move(z_callback), nullptr) < 0) { AIMRT_ERROR("Unable to declare Subscriber!"); return; } @@ -83,7 +95,7 @@ void ZenohManager::RegisterSubscriber(const std::string &keyexpr, MsgHandleFunc AIMRT_TRACE("Subscriber with keyexpr: {} registered successfully.", keyexpr.c_str()); } -void ZenohManager::RegisterRpcNode(const std::string &keyexpr, MsgHandleFunc handle, const std::string &role) { +void ZenohManager::RegisterRpcNode(const std::string &keyexpr, MsgHandleFunc handle, const std::string &role, bool shm_enabled) { std::string pub_keyexpr; std::string sub_keyexpr; @@ -98,9 +110,9 @@ void ZenohManager::RegisterRpcNode(const std::string &keyexpr, MsgHandleFunc han return; } - RegisterPublisher(pub_keyexpr); + RegisterPublisher(pub_keyexpr, shm_enabled); RegisterSubscriber(sub_keyexpr, std::move(handle)); - AIMRT_INFO("{} with keyexpr: {} registered successfully.", role, keyexpr.c_str()); + AIMRT_INFO("{} with keyexpr: {} ,role: {} , shm_enabled: {} is registered successfully.", role, keyexpr.c_str(), role, shm_enabled); } void ZenohManager::Publish(const std::string &topic, char *serialized_data_ptr, uint64_t serialized_data_len) { @@ -112,8 +124,13 @@ void ZenohManager::Publish(const std::string &topic, char *serialized_data_ptr, z_owned_bytes_t z_payload; - z_bytes_from_buf(&z_payload, reinterpret_cast(serialized_data_ptr), serialized_data_len, NULL, NULL); - z_publisher_put(z_loan(z_pub_iter->second), z_move(z_payload), &z_pub_options_); + z_bytes_from_buf(&z_payload, reinterpret_cast(serialized_data_ptr), serialized_data_len, nullptr, nullptr); + + z_publisher_put(z_loan(z_pub_iter->second.first), z_move(z_payload), &z_pub_options_); +} + +std::unique_ptr>> ZenohManager::GetPublisherRegisterMap() { + return std::make_unique>>(z_pub_registry_); } } // namespace aimrt::plugins::zenoh_plugin \ No newline at end of file diff --git a/src/plugins/zenoh_plugin/zenoh_manager.h b/src/plugins/zenoh_plugin/zenoh_manager.h index c1f5a3406..878ae4100 100644 --- a/src/plugins/zenoh_plugin/zenoh_manager.h +++ b/src/plugins/zenoh_plugin/zenoh_manager.h @@ -17,16 +17,23 @@ class ZenohManager { ZenohManager(const ZenohManager&) = delete; ZenohManager& operator=(const ZenohManager&) = delete; - void Initialize(const std::string& native_cfg_path); + void Initialize(const std::string& native_cfg_path, size_t shm_pool_size); void Shutdown(); void RegisterSubscriber(const std::string& keyexpr, MsgHandleFunc handle); - void RegisterPublisher(const std::string& keyexpr); + void RegisterPublisher(const std::string& keyexpr, bool shm_enabled); - void RegisterRpcNode(const std::string& keyexpr, MsgHandleFunc handle, const std::string& role); + void RegisterRpcNode(const std::string& keyexpr, MsgHandleFunc handle, const std::string& role, bool shm_enabled); void Publish(const std::string& topic, char* serialized_data_ptr, uint64_t serialized_data_len); + std::unique_ptr>> GetPublisherRegisterMap(); + + public: + z_owned_shm_provider_t shm_provider_; + z_alloc_alignment_t alignment_ = {0}; + z_publisher_put_options_t z_pub_options_; + private: static void PrintZenohCgf(z_owned_config_t z_config) { z_owned_string_t out_config_string; @@ -46,14 +53,21 @@ class ZenohManager { z_drop(z_move(out_config_string)); } - std::unordered_map z_pub_registry_; + std::unordered_map> z_pub_registry_; std::unordered_map z_sub_registry_; std::vector> msg_handle_vec_; - z_publisher_put_options_t z_pub_options_; z_owned_session_t z_session_; z_owned_config_t z_config_; + + // shm related + size_t shm_pool_size_; + z_owned_memory_layout_t shm_layout_; + + std::atomic_bool shm_initialized_ = false; + + std::mutex z_mutex_; }; } // namespace aimrt::plugins::zenoh_plugin \ No newline at end of file diff --git a/src/plugins/zenoh_plugin/zenoh_plugin.cc b/src/plugins/zenoh_plugin/zenoh_plugin.cc index 7adc90a73..c072ec48b 100644 --- a/src/plugins/zenoh_plugin/zenoh_plugin.cc +++ b/src/plugins/zenoh_plugin/zenoh_plugin.cc @@ -13,6 +13,8 @@ struct convert { node["native_cfg_path"] = rhs.native_cfg_path; node["limit_domain"] = rhs.limit_domain; + node["shm_pool_size"] = rhs.shm_pool_size; + node["shm_init_loan_size"] = rhs.shm_init_loan_size; return node; } @@ -26,6 +28,12 @@ struct convert { if (node["limit_domain"]) rhs.limit_domain = '/' + node["limit_domain"].as(); + if (node["shm_pool_size"]) + rhs.shm_pool_size = node["shm_pool_size"].as(); + + if (node["shm_init_loan_size"]) + rhs.shm_init_loan_size = node["shm_init_loan_size"].as(); + return true; } }; @@ -46,7 +54,7 @@ bool ZenohPlugin::Initialize(runtime::core::AimRTCore *core_ptr) noexcept { init_flag_ = true; // todo remove role - zenoh_manager_ptr_->Initialize(options_.native_cfg_path); + zenoh_manager_ptr_->Initialize(options_.native_cfg_path, options_.shm_pool_size); core_ptr_->RegisterHookFunc(runtime::core::AimRTCore::State::kPostInitLog, [this] { SetPluginLogger(); }); @@ -88,7 +96,7 @@ void ZenohPlugin::SetPluginLogger() { void ZenohPlugin::RegisterZenohChannelBackend() { std::unique_ptr zenoh_channel_backend_ptr = - std::make_unique(zenoh_manager_ptr_, options_.limit_domain); + std::make_unique(zenoh_manager_ptr_, options_.limit_domain, options_.shm_init_loan_size); core_ptr_->GetChannelManager().RegisterChannelBackend(std::move(zenoh_channel_backend_ptr)); } diff --git a/src/plugins/zenoh_plugin/zenoh_plugin.h b/src/plugins/zenoh_plugin/zenoh_plugin.h index 0e9d7a87c..55cb04cce 100644 --- a/src/plugins/zenoh_plugin/zenoh_plugin.h +++ b/src/plugins/zenoh_plugin/zenoh_plugin.h @@ -17,6 +17,8 @@ class ZenohPlugin : public AimRTCorePluginBase { struct Options { std::string native_cfg_path; std::string limit_domain; + size_t shm_pool_size = static_cast(1024 * 1024 * 10); // default: 10 MB + size_t shm_init_loan_size = 1024; // default: 1 KB }; public: diff --git a/src/plugins/zenoh_plugin/zenoh_rpc_backend.cc b/src/plugins/zenoh_plugin/zenoh_rpc_backend.cc index bd8df5942..8b50fc298 100644 --- a/src/plugins/zenoh_plugin/zenoh_rpc_backend.cc +++ b/src/plugins/zenoh_plugin/zenoh_rpc_backend.cc @@ -13,6 +13,22 @@ struct convert { node["timeout_executor"] = rhs.timeout_executor; + node["clients_options"] = YAML::Node(); + for (const auto& client_options : rhs.clients_options) { + Node client_options_node; + client_options_node["func_name"] = client_options.func_name; + client_options_node["shm_enabled"] = client_options.shm_enabled; + node["clients_options"].push_back(client_options_node); + } + + node["servers_options"] = YAML::Node(); + for (const auto& server_options : rhs.servers_options) { + Node server_options_node; + server_options_node["func_name"] = server_options.func_name; + server_options_node["shm_enabled"] = server_options.shm_enabled; + node["servers_options"].push_back(server_options_node); + } + return node; } @@ -20,6 +36,28 @@ struct convert { if (node["timeout_executor"]) rhs.timeout_executor = node["timeout_executor"].as(); + if (node["clients_options"] && node["clients_options"].IsSequence()) { + for (const auto& client_options_node : node["clients_options"]) { + auto client_options = Options::ClientOptions{ + .func_name = client_options_node["func_name"].as(), + .shm_enabled = client_options_node["shm_enabled"].as(), + }; + + rhs.clients_options.emplace_back(std::move(client_options)); + } + } + + if (node["servers_options"] && node["servers_options"].IsSequence()) { + for (const auto& server_options_node : node["servers_options"]) { + auto server_options = Options::ServerOptions{ + .func_name = server_options_node["func_name"].as(), + .shm_enabled = server_options_node["shm_enabled"].as(), + }; + + rhs.servers_options.emplace_back(std::move(server_options)); + } + } + return true; } }; @@ -86,6 +124,24 @@ bool ZenohRpcBackend::RegisterServiceFunc( namespace util = aimrt::common::util; const auto& info = service_func_wrapper.info; + bool shm_enabled = false; + + auto find_option_itr = std::find_if( + options_.servers_options.begin(), options_.servers_options.end(), + [func_name = GetRealFuncName(info.func_name)](const Options::ServerOptions& server_option) { + try { + return std::regex_match(func_name.begin(), func_name.end(), std::regex(server_option.func_name, std::regex::ECMAScript)); + } catch (const std::exception& e) { + AIMRT_WARN("Regex get exception, expr: {}, string: {}, exception info: {}", + server_option.func_name, func_name, e.what()); + return false; + } + }); + + if (find_option_itr != options_.servers_options.end()) { + shm_enabled = find_option_itr->shm_enabled; + } + std::string pattern = std::string("aimrt_rpc/") + util::UrlEncode(GetRealFuncName(info.func_name)) + limit_domain_; @@ -101,14 +157,15 @@ bool ZenohRpcBackend::RegisterServiceFunc( auto ctx_ptr = std::make_shared(aimrt_rpc_context_type_t::AIMRT_RPC_SERVER_CONTEXT); service_invoke_wrapper_ptr->ctx_ref = ctx_ptr; - // read data const z_loaned_bytes_t* payload = z_sample_payload(message); size_t serialized_size = z_bytes_len(payload); z_bytes_reader_t reader = z_bytes_get_reader(payload); std::vector serialized_data(serialized_size); - if (z_bytes_reader_read(&reader, reinterpret_cast(serialized_data.data()), serialized_size) >= 0) { - util::ConstBufferOperator buf_oper(serialized_data.data(), serialized_size); + // read data from payload + auto ret = z_bytes_reader_read(&reader, reinterpret_cast(serialized_data.data()), serialized_size); + if (ret >= 0) { + util::ConstBufferOperator buf_oper(serialized_data.data() + kFixedLen, std::stoi(std::string(serialized_data.data(), kFixedLen))); // deserialize type std::string serialization_type(buf_oper.GetString(util::BufferLenType::kUInt8)); @@ -165,6 +222,134 @@ bool ZenohRpcBackend::RegisterServiceFunc( return; } + std::string node_pub_topic = "rsp/" + pattern; + + // find node's publisher with pattern + auto z_node_pub_registry = zenoh_manager_ptr_->GetPublisherRegisterMap(); + auto z_node_pub_iter = z_node_pub_registry->find(node_pub_topic); + if (z_node_pub_iter == z_node_pub_registry->end()) [[unlikely]] { + AIMRT_ERROR("Can not find publisher with pattern: {}", pattern); + return; + } + + auto z_node_pub = z_node_pub_iter->second; + + // shm enabled + if (z_node_pub.second) { + unsigned char* z_pub_loaned_shm_ptr = nullptr; + std::shared_ptr buffer_array_cache_ptr = nullptr; + + bool is_shm_loan_size_enough = true; + bool is_shm_pool_size_enough = true; + + uint64_t msg_size = 0; + size_t header_len = 0; + z_buf_layout_alloc_result_t loan_result; + + do { + // release old shm + if (z_pub_loaned_shm_ptr != nullptr) { + z_shm_provider_garbage_collect(z_loan(zenoh_manager_ptr_->shm_provider_)); + z_shm_provider_defragment(z_loan(zenoh_manager_ptr_->shm_provider_)); + } + + // loan a new size shm + uint64_t loan_size = z_node_shm_size_map_[node_pub_topic]; + z_shm_provider_alloc_gc_defrag(&loan_result, z_loan(zenoh_manager_ptr_->shm_provider_), loan_size, zenoh_manager_ptr_->alignment_); + + // if shm pool is not enough, use net buffer instead + if (loan_result.status != ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { + is_shm_pool_size_enough = false; + z_shm_provider_garbage_collect(z_loan(zenoh_manager_ptr_->shm_provider_)); + z_shm_provider_defragment(z_loan(zenoh_manager_ptr_->shm_provider_)); + AIMRT_WARN("Zenoh Plugin shm pool is not enough, use net buffer instead."); + break; + } + + z_pub_loaned_shm_ptr = z_shm_mut_data_mut(z_loan_mut(loan_result.buf)); + + // write info pkg on loaned shm : the first FIXED_LEN bytes needs to write the length of pkg + util::BufferOperator buf_oper(reinterpret_cast(z_pub_loaned_shm_ptr) + kFixedLen, loan_size - kFixedLen); + + // write serialization type on loaned shm + buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8); + + // write req id on loaned shm + buf_oper.SetBuffer(req_id_buf, sizeof(req_id_buf)); + + // write an zero on loaned shm + buf_oper.SetUint32(0); + + header_len = 1 + serialization_type.size() + 4 + 4; + + // write msg on loaned shm: should start at the (FIXED_LEN + header_len)-th byte + aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + header_len + kFixedLen); + + if (buffer_array_cache_ptr == nullptr) { + try { + auto result = SerializeRspSupportedZenoh(*service_invoke_wrapper_ptr, serialization_type, aimrt::util::BufferArrayAllocatorRef(z_allocator.NativeHandle())); + msg_size = result.second; + buffer_array_cache_ptr = result.first; + if (buffer_array_cache_ptr == nullptr) { + // in this case means no cache is set, then do nomal serialization(if size is small will throw exception) + is_shm_loan_size_enough = true; + } else { + if (msg_size > buf_oper.GetRemainingSize()) { + // in this case means the msg has serialization cache but the size is too large, then expand suitable size + is_shm_loan_size_enough = false; + z_node_shm_size_map_[node_pub_topic] = kFixedLen + header_len + msg_size; + } else { + // in this case means the msg has serialization cache and the size is suitable, then use cachema + is_shm_loan_size_enough = true; + } + } + + } catch (const std::exception& e) { + if (!z_allocator.IsShmEnough()) { + // the shm is not enough, need to expand a double size + z_node_shm_size_map_[node_pub_topic] = z_node_shm_size_map_[node_pub_topic] << 1; + is_shm_loan_size_enough = false; + } else { + AIMRT_ERROR( + "Msg serialization failed, serialization_type {}, pattern: {}, exception: {}", + serialization_type, pattern, e.what()); + return; + } + } + } + + } while (!is_shm_loan_size_enough); + + if (is_shm_pool_size_enough) { + // if has cache, the copy it to shm to replace the serialization + if (buffer_array_cache_ptr != nullptr) { + unsigned char* strat_pos = z_pub_loaned_shm_ptr + kFixedLen + header_len; + for (size_t ii = 0; ii < buffer_array_cache_ptr->Size(); ++ii) { + std::memcpy(strat_pos, buffer_array_cache_ptr.get()[ii].Data()->data, buffer_array_cache_ptr.get()[ii].Data()->len); + strat_pos += buffer_array_cache_ptr.get()[ii].Data()->len; + } + + buffer_array_cache_ptr = nullptr; + } + + // write info pkg length on loaned shm + std::memcpy(z_pub_loaned_shm_ptr, IntToFixedLengthString(header_len, kFixedLen).c_str(), kFixedLen); + z_owned_bytes_t z_payload; + if (loan_result.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { + z_bytes_from_shm_mut(&z_payload, z_move(loan_result.buf)); + } + z_publisher_put(z_loan(z_node_pub.first), z_move(z_payload), &zenoh_manager_ptr_->z_pub_options_); + + // collect garbage and defragment shared memory, whose reference counting is zero + z_shm_provider_garbage_collect(z_loan(zenoh_manager_ptr_->shm_provider_)); + z_shm_provider_defragment(z_loan(zenoh_manager_ptr_->shm_provider_)); + + AIMRT_TRACE("Zenoh Invoke req with '{}'", pattern); + return; + } + } + + // shm disabled // serivice rsp serialize auto buffer_array_view_ptr = aimrt::runtime::core::rpc::TrySerializeRspWithCache( *service_invoke_wrapper_ptr, serialization_type); @@ -178,21 +363,36 @@ bool ZenohRpcBackend::RegisterServiceFunc( const size_t buffer_array_len = buffer_array_view_ptr->Size(); size_t rsp_size = buffer_array_view_ptr->BufferSize(); - size_t pkg_size = 1 + serialization_type.size() + 4 + 4 + rsp_size; + size_t z_data_size = 1 + serialization_type.size() + 4 + 4 + rsp_size; + size_t pkg_size = z_data_size + kFixedLen; + // get buf to store data std::vector msg_buf_vec(pkg_size); - util::BufferOperator buf_oper(msg_buf_vec.data(), msg_buf_vec.size()); + util::BufferOperator buf_oper(msg_buf_vec.data(), pkg_size); + + // full data_size + buf_oper.SetBuffer(IntToFixedLengthString(z_data_size, kFixedLen).c_str(), kFixedLen); + + // full serialize type buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8); + + // full req_id buf_oper.SetBuffer(req_id_buf, sizeof(req_id_buf)); + + // full an 0 buf_oper.SetUint32(0); + // full rsp_size for (size_t ii = 0; ii < buffer_array_len; ++ii) { buf_oper.SetBuffer( static_cast(buffer_array_data[ii].data), buffer_array_data[ii].len); } - zenoh_manager_ptr_->Publish("rsp/" + pattern, msg_buf_vec.data(), pkg_size); + // server send rsp + z_owned_bytes_t z_payload; + z_bytes_from_buf(&z_payload, reinterpret_cast(msg_buf_vec.data()), pkg_size, nullptr, nullptr); + z_publisher_put(z_loan(z_node_pub.first), z_move(z_payload), &zenoh_manager_ptr_->z_pub_options_); }; // call service service_func_wrapper.service_func(service_invoke_wrapper_ptr); @@ -204,7 +404,8 @@ bool ZenohRpcBackend::RegisterServiceFunc( AIMRT_WARN("Handle zenoh rpc msg failed, exception info: {}", e.what()); } }; - zenoh_manager_ptr_->RegisterRpcNode(pattern, std::move(handle), "server"); + zenoh_manager_ptr_->RegisterRpcNode(pattern, std::move(handle), "server", shm_enabled); + z_node_shm_size_map_["rsp/" + pattern] = shm_init_loan_size_; return true; } catch (const std::exception& e) { AIMRT_ERROR("{}", e.what()); @@ -224,6 +425,23 @@ bool ZenohRpcBackend::RegisterClientFunc( const auto& info = client_func_wrapper.info; + bool shm_enabled = false; + auto find_option_itr = std::find_if( + options_.clients_options.begin(), options_.clients_options.end(), + [func_name = GetRealFuncName(info.func_name)](const Options::ClientOptions& client_option) { + try { + return std::regex_match(func_name.begin(), func_name.end(), std::regex(client_option.func_name, std::regex::ECMAScript)); + } catch (const std::exception& e) { + AIMRT_WARN("Regex get exception, expr: {}, string: {}, exception info: {}", + client_option.func_name, func_name, e.what()); + return false; + } + }); + + if (find_option_itr != options_.clients_options.end()) { + shm_enabled = find_option_itr->shm_enabled; + } + std::string pattern = std::string("aimrt_rpc/") + util::UrlEncode(GetRealFuncName(info.func_name)) + limit_domain_; @@ -242,7 +460,7 @@ bool ZenohRpcBackend::RegisterClientFunc( return; } - util::ConstBufferOperator buf_oper(serialized_data.data(), serialized_size); + util::ConstBufferOperator buf_oper(serialized_data.data() + kFixedLen, std::stoi(std::string(serialized_data.data(), kFixedLen))); std::string serialization_type(buf_oper.GetString(util::BufferLenType::kUInt8)); uint32_t req_id = buf_oper.GetUint32(); @@ -294,7 +512,8 @@ bool ZenohRpcBackend::RegisterClientFunc( client_invoke_wrapper_ptr->callback(aimrt::rpc::Status(AIMRT_RPC_STATUS_CLI_BACKEND_INTERNAL_ERROR)); }; - zenoh_manager_ptr_->RegisterRpcNode(pattern, std::move(handle), "client"); + zenoh_manager_ptr_->RegisterRpcNode(pattern, std::move(handle), "client", shm_enabled); + z_node_shm_size_map_["req/" + pattern] = shm_init_loan_size_; } catch (const std::exception& e) { AIMRT_ERROR("{}", e.what()); return false; @@ -312,15 +531,28 @@ void ZenohRpcBackend::Invoke( } namespace util = aimrt::common::util; - const auto& info = client_invoke_wrapper_ptr->info; std::string pattern = std::string("aimrt_rpc/") + util::UrlEncode(GetRealFuncName(info.func_name)) + limit_domain_; + std::string node_pub_topic = "req/" + pattern; + + // find node's publisher with pattern + auto z_node_pub_registry = zenoh_manager_ptr_->GetPublisherRegisterMap(); + auto z_node_pub_iter = z_node_pub_registry->find(node_pub_topic); + if (z_node_pub_iter == z_node_pub_registry->end()) [[unlikely]] { + AIMRT_ERROR("Can not find publisher with pattern: {}", pattern); + return; + } + + auto z_node_pub = z_node_pub_iter->second; + + // get req id uint32_t cur_req_id = req_id_++; + // get serialization type auto serialization_type = client_invoke_wrapper_ptr->ctx_ref.GetMetaValue(AIMRT_RPC_CONTEXT_KEY_SERIALIZATION_TYPE); @@ -329,20 +561,7 @@ void ZenohRpcBackend::Invoke( return; } - // client req serialize - auto buffer_array_view_ptr = aimrt::runtime::core::rpc::TrySerializeReqWithCache( - *client_invoke_wrapper_ptr, serialization_type); - if (!buffer_array_view_ptr) [[unlikely]] { - // serialize failed - client_invoke_wrapper_ptr->callback(aimrt::rpc::Status(AIMRT_RPC_STATUS_CLI_SERIALIZATION_FAILED)); - return; - } - - const auto* buffer_array_data = buffer_array_view_ptr->Data(); - const size_t buffer_array_len = buffer_array_view_ptr->Size(); - size_t req_size = buffer_array_view_ptr->BufferSize(); - - // context + // get meta data const auto& keys = client_invoke_wrapper_ptr->ctx_ref.GetMetaKeys(); if (keys.size() > 255) [[unlikely]] { AIMRT_WARN("Too much context meta, require less than 255, but actually {}.", keys.size()); @@ -360,16 +579,11 @@ void ZenohRpcBackend::Invoke( context_meta_kv_size += (2 + val.size()); context_meta_kv.emplace_back(val); } - // padding zenoh pkg - size_t z_pkg_size = 1 + serialization_type.size() + - 1 + pattern.size() + - 4 + - context_meta_kv_size + - req_size; auto timeout = client_invoke_wrapper_ptr->ctx_ref.Timeout(); auto record_ptr = client_invoke_wrapper_ptr; + // record this req with timeout bool ret = client_tool_ptr_->Record(cur_req_id, timeout, std::move(record_ptr)); if (!ret) [[unlikely]] { @@ -378,27 +592,185 @@ void ZenohRpcBackend::Invoke( return; } - std::vector msg_buf_vec(z_pkg_size); - util::BufferOperator buf_oper(msg_buf_vec.data(), msg_buf_vec.size()); + // shm enabled + if (z_node_pub.second) { + unsigned char* z_pub_loaned_shm_ptr = nullptr; + std::shared_ptr buffer_array_cache_ptr = nullptr; + bool is_shm_loan_size_enough = true; + bool is_shm_pool_size_enough = true; + + uint64_t msg_size = 0; + size_t header_len = 0; + z_buf_layout_alloc_result_t loan_result; + + do { + // release old shm + if (z_pub_loaned_shm_ptr != nullptr) { + z_shm_provider_garbage_collect(z_loan(zenoh_manager_ptr_->shm_provider_)); + z_shm_provider_defragment(z_loan(zenoh_manager_ptr_->shm_provider_)); + } + + // loan a new size shm + uint64_t loan_size = z_node_shm_size_map_[node_pub_topic]; + z_shm_provider_alloc_gc_defrag(&loan_result, z_loan(zenoh_manager_ptr_->shm_provider_), loan_size, zenoh_manager_ptr_->alignment_); + + // if shm pool is not enough, use net buffer instead + if (loan_result.status != ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { + is_shm_pool_size_enough = false; + z_shm_provider_garbage_collect(z_loan(zenoh_manager_ptr_->shm_provider_)); + z_shm_provider_defragment(z_loan(zenoh_manager_ptr_->shm_provider_)); + AIMRT_WARN("Zenoh Plugin shm pool is not enough, use net buffer instead."); + break; + } + + z_pub_loaned_shm_ptr = z_shm_mut_data_mut(z_loan_mut(loan_result.buf)); + + // write info pkg on loaned shm : the first FIXED_LEN bytes needs to write the length of pkg + util::BufferOperator buf_oper(reinterpret_cast(z_pub_loaned_shm_ptr) + kFixedLen, loan_size - kFixedLen); + + // write serialization type on loaned shm + buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8); + + // write pattern on loaned shm + buf_oper.SetString(pattern, util::BufferLenType::kUInt8); + + // write req id on loaned shm + buf_oper.SetUint32(cur_req_id); + + // write context meta on loaned shm + buf_oper.SetUint8(static_cast(keys.size())); + for (const auto& s : context_meta_kv) { + buf_oper.SetString(s, util::BufferLenType::kUInt16); + } + + header_len = 1 + serialization_type.size() + + 1 + pattern.size() + + 4 + + context_meta_kv_size; + + // write msg on loaned shm: should start at the (FIXED_LEN + header_len)-th byte + aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + header_len + kFixedLen); + + if (buffer_array_cache_ptr == nullptr) { + try { + auto result = SerializeReqSupportedZenoh(*client_invoke_wrapper_ptr, serialization_type, aimrt::util::BufferArrayAllocatorRef(z_allocator.NativeHandle())); + msg_size = result.second; + buffer_array_cache_ptr = result.first; + if (buffer_array_cache_ptr == nullptr) { + // in this case means no cache is set, then do nomal serialization(if size is small will throw exception) + is_shm_loan_size_enough = true; + } else { + if (msg_size > buf_oper.GetRemainingSize()) { + // in this case means the msg has serialization cache but the size is too large, then expand suitable size + is_shm_loan_size_enough = false; + z_node_shm_size_map_[node_pub_topic] = kFixedLen + header_len + msg_size; + } else { + // in this case means the msg has serialization cache and the size is suitable, then use cachema + is_shm_loan_size_enough = true; + } + } + + } catch (const std::exception& e) { + if (!z_allocator.IsShmEnough()) { + // the shm is not enough, need to expand a double size + z_node_shm_size_map_[node_pub_topic] = z_node_shm_size_map_[node_pub_topic] << 1; + is_shm_loan_size_enough = false; + } else { + AIMRT_ERROR( + "Msg serialization failed, serialization_type {}, pkg_path: {}, module_name: {}, func_name: {}, exception: {}", + serialization_type, info.pkg_path, info.module_name, info.func_name, e.what()); + return; + } + } + } + + } while (!is_shm_loan_size_enough); + + if (is_shm_pool_size_enough) { + // if has cache, the copy it to shm to replace the serialization + if (buffer_array_cache_ptr != nullptr) { + unsigned char* strat_pos = z_pub_loaned_shm_ptr + kFixedLen + header_len; + for (size_t ii = 0; ii < buffer_array_cache_ptr->Size(); ++ii) { + std::memcpy(strat_pos, buffer_array_cache_ptr.get()[ii].Data()->data, buffer_array_cache_ptr.get()[ii].Data()->len); + strat_pos += buffer_array_cache_ptr.get()[ii].Data()->len; + } + + buffer_array_cache_ptr = nullptr; + } + + // write info pkg length on loaned shm + std::memcpy(z_pub_loaned_shm_ptr, IntToFixedLengthString(header_len, kFixedLen).c_str(), kFixedLen); + z_owned_bytes_t z_payload; + if (loan_result.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { + z_bytes_from_shm_mut(&z_payload, z_move(loan_result.buf)); + } + z_publisher_put(z_loan(z_node_pub.first), z_move(z_payload), &zenoh_manager_ptr_->z_pub_options_); + + // collect garbage and defragment shared memory, whose reference counting is zero + z_shm_provider_garbage_collect(z_loan(zenoh_manager_ptr_->shm_provider_)); + z_shm_provider_defragment(z_loan(zenoh_manager_ptr_->shm_provider_)); + + AIMRT_TRACE("Zenoh Invoke req with '{}'", pattern); + return; + } + } + + // shm disabled + // client req serialize + auto buffer_array_view_ptr = aimrt::runtime::core::rpc::TrySerializeReqWithCache( + *client_invoke_wrapper_ptr, serialization_type); + if (!buffer_array_view_ptr) [[unlikely]] { + // serialize failed + client_invoke_wrapper_ptr->callback(aimrt::rpc::Status(AIMRT_RPC_STATUS_CLI_SERIALIZATION_FAILED)); + return; + } + + const auto* buffer_array_data = buffer_array_view_ptr->Data(); + const size_t buffer_array_len = buffer_array_view_ptr->Size(); + size_t req_size = buffer_array_view_ptr->BufferSize(); + + // padding zenoh pkg (serialization_type + pattern + req_id + context_meta + req) + size_t z_data_size = 1 + serialization_type.size() + + 1 + pattern.size() + + 4 + + context_meta_kv_size + + req_size; + + size_t pkg_size = z_data_size + kFixedLen; + // create buffer for serialization + std::vector msg_buf_vec(pkg_size); + util::BufferOperator buf_oper(msg_buf_vec.data(), pkg_size); + + // full data_size + buf_oper.SetBuffer(IntToFixedLengthString(z_data_size, kFixedLen).c_str(), kFixedLen); + + // full serialization_type buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8); + + // full pattern buf_oper.SetString(pattern, util::BufferLenType::kUInt8); + + // full req id buf_oper.SetUint32(cur_req_id); + // full context meta buf_oper.SetUint8(static_cast(keys.size())); for (const auto& s : context_meta_kv) { buf_oper.SetString(s, util::BufferLenType::kUInt16); } - // data + // full client req for (size_t ii = 0; ii < buffer_array_len; ++ii) { buf_oper.SetBuffer( static_cast(buffer_array_data[ii].data), buffer_array_data[ii].len); } - // send data - zenoh_manager_ptr_->Publish("req/" + pattern, msg_buf_vec.data(), z_pkg_size); + // send req + z_owned_bytes_t z_payload; + z_bytes_from_buf(&z_payload, reinterpret_cast(msg_buf_vec.data()), pkg_size, nullptr, nullptr); + z_publisher_put(z_loan(z_node_pub.first), z_move(z_payload), &zenoh_manager_ptr_->z_pub_options_); } catch (const std::exception& e) { AIMRT_ERROR("{}", e.what()); diff --git a/src/plugins/zenoh_plugin/zenoh_rpc_backend.h b/src/plugins/zenoh_plugin/zenoh_rpc_backend.h index 4f9a6bf87..eecdc8d47 100644 --- a/src/plugins/zenoh_plugin/zenoh_rpc_backend.h +++ b/src/plugins/zenoh_plugin/zenoh_rpc_backend.h @@ -11,6 +11,8 @@ #include "util/buffer_util.h" #include "util/url_encode.h" #include "zenoh.h" +#include "zenoh_plugin/util.h" +#include "zenoh_plugin/zenoh_buffer_array_allocator.h" #include "zenoh_plugin/zenoh_manager.h" namespace aimrt::plugins::zenoh_plugin { @@ -18,6 +20,17 @@ class ZenohRpcBackend : public runtime::core::rpc::RpcBackendBase { public: struct Options { std::string timeout_executor; + struct ClientOptions { + std::string func_name; + bool shm_enabled = false; + }; + std::vector clients_options; + + struct ServerOptions { + std::string func_name; + bool shm_enabled = false; + }; + std::vector servers_options; }; public: @@ -74,6 +87,10 @@ class ZenohRpcBackend : public runtime::core::rpc::RpcBackendBase { std::string limit_domain_; std::unique_ptr>> client_tool_ptr_; + + std::unordered_map z_node_shm_size_map_; + + uint64_t shm_init_loan_size_ = 1024; }; } // namespace aimrt::plugins::zenoh_plugin \ No newline at end of file