feat: add json and ros2 serialization support for grpc plugin (#81)

* feat: enhance gRPC content-type handling and serialization support

Improve content-type validation in gRPC request headers to support additional types, allowing for JSON serialization. Update the serialization logic to dynamically set the type based on incoming headers, ensuring better compatibility and error handling.

* feat: support ros2 service function names

Enhance service function registration to accept names starting with 'ros2:' in addition to 'pb:'. Update URL path handling and content type mappings accordingly to ensure compatibility with ROS 2 services, improving integration capabilities.

* feat: add gRPC plugin configuration and startup scripts

Introduce configuration files for the gRPC client and server plugins, allowing for easier setup and management of RPC communication. Add shell scripts to launch the client and server with the specified configurations, streamlining the development process.

* refactor: improve service function name validation and error handling

Enhance the clarity of service function name registration by separating the function name variable for improved readability. Update error messages for better guidance on valid prefixes and standardize maps for content types and serialization types. This increases maintainability and reduces the risk of errors during registration.

* refactor: streamline client option lookup and improve code clarity

Simplify the client option lookup by replacing `std::find_if` with `std::ranges::find_if`. Also, enhance readability by cleaning up comment formatting and unnecessary whitespace.

* chore: update copyright information in configuration files

Revise copyright year and licensing details to reflect the current ownership and license for the AimRT project.

* refactor: improve validation logic in options

Enhance option verification by using standard algorithms for consistency and readability. This ensures minimum thresholds for connection numbers and timer durations are respected, improving robustness in client and server configurations.

* docs: update release notes and plugin documentation

Add support for grpc plugin serialization with ros2 messages and json format, enhancing flexibility in data handling. Remove outdated protobuf-only restriction to improve compatibility and user experience.

* docs: add ros2 rpc example details and usage instructions

Expand the README to include a new section on using the grpc plugin with a ros2-based RPC interface. Provide explicit instructions on configuration, core code references, and running the server and client. Enhance the documentation to clarify the differences from the protobuf RPC example.

---------

Co-authored-by: zhangyi <zhangyi@agibot.com>
This commit is contained in:
zhangyi1357 2024-11-05 11:44:25 +08:00 committed by GitHub
parent 7ba8f93fdf
commit ac7715e424
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 171 additions and 42 deletions

View File

@ -25,4 +25,4 @@
- 现在支持 install aimrt::runtime::core
- 删除一些未使用的协议;
- 支持日志自定义输出格式;
- grpc 插件支持 ros2 消息以及 json 序列化格式;

View File

@ -46,7 +46,6 @@ aimrt:
注意事项:
* 当前 gRPC 插件仅支持 HTTP2 明文协议,不支持 TLS 加密协议。
* 当前 gRPC 插件仅支持 protobuf 序列化协议,不支持其他序列化协议。
* 当前 gRPC 插件仅支持一元 RPC 调用,不支持流式 RPC 调用。
* 当前 gRPC 插件不支持 HTTP2 消息压缩。
@ -71,7 +70,7 @@ aimrt:
listen_ip: 127.0.0.1
listen_port: 50081
rpc:
backends:
backends:
- type: grpc
options:
clients_options:
@ -94,7 +93,7 @@ aimrt:
listen_ip: 127.0.0.1
listen_port: 50080
rpc:
backends:
backends:
- type: grpc
servers_options:
- func_name: "(.*)"

View File

@ -50,3 +50,31 @@
- 此示例还创建了原生 grpc 客户端和服务端,用于展示如何使用原生 grpc 程序与 aimrt 程序通过 grpc 后端通信;
- 此示例加载了**grpc_plugin**,并使用 grpc 类型的 rpc 后端进行通信;
## ros2 rpc
一个基于 ros2 协议、协程型接口与 grpc 后端的 rpc 示例,演示内容包括:
- 如何在配置文件中加载**grpc_plugin**
- 如何使用 grpc 类型的 rpc 后端;
- 如何与原生 grpc 进行 ros2 协议的 rpc 相互调用;
核心代码:
- [example_ros2/srv/RosTestRpc.srv](../../../protocols/example_ros2/srv/RosTestRpc.srv)
- aimrt code
- [normal_rpc_co_client_module.cc](../../cpp/ros2_rpc/module/normal_rpc_co_client_module/normal_rpc_co_client_module.cc)
- [normal_rpc_co_server_module.cc](../../cpp/ros2_rpc/module/normal_rpc_co_server_module/normal_rpc_co_server_module.cc)
- [service.cc](../../cpp/ros2_rpc/module/normal_rpc_co_server_module/service.cc)
配置文件:
- [examples_plugins_grpc_plugin_ros2_rpc_client_cfg.yaml](./install/linux/bin/cfg/examples_plugins_grpc_plugin_ros2_rpc_client_cfg.yaml)
- [examples_plugins_grpc_plugin_ros2_rpc_server_cfg.yaml](./install/linux/bin/cfg/examples_plugins_grpc_plugin_ros2_rpc_server_cfg.yaml)
运行方式linux
- 开启 `AIMRT_BUILD_EXAMPLES``AIMRT_BUILD_WITH_ROS2``AIMRT_BUILD_GRPC_PLUGIN` 选项编译 AimRT
- 编译成功后在终端运行 build 目录下`start_examples_plugins_grpc_plugin_ros2_rpc_server.sh`脚本启动服务端srv 进程);
- 开启新的终端运行 build 目录下`start_examples_plugins_grpc_plugin_ros2_rpc_client.sh`脚本启动客户端cli 进程);
- 分别在两个终端键入`ctrl-c`停止对应进程;
说明:
- 此示例与 **protobuf rpc** 示例基本一致,除了业务层使用的是 ros2 srv 形式的协议;

View File

@ -0,0 +1,43 @@
# Copyright (c) 2024 The AimRT Authors.
# AimRT is licensed under Mulan PSL v2.
aimrt:
plugin:
plugins:
- name: grpc_plugin
path: ./libaimrt_grpc_plugin.so
options:
thread_num: 4
listen_ip: 127.0.0.1
listen_port: 50051
log:
core_lvl: Info # Trace/Debug/Info/Warn/Error/Fatal/Off
backends:
- type: console
executor:
executors:
- name: work_thread_pool
type: asio_thread
options:
thread_num: 4
rpc:
backends:
- type: grpc
options:
clients_options:
- func_name: "(.*)"
server_url: http://127.0.0.1:50050
clients_options:
- func_name: "(.*)"
enable_backends: [grpc]
module:
pkgs:
- path: ./libros2_rpc_client_pkg.so
enable_modules: [NormalRpcCoClientModule]
modules:
- name: NormalRpcCoClientModule
log_lvl: INFO
# Module custom configuration
NormalRpcCoClientModule:
rpc_frq: 0.5

View File

@ -0,0 +1,30 @@
# Copyright (c) 2024 The AimRT Authors.
# AimRT is licensed under Mulan PSL v2.
aimrt:
plugin:
plugins:
- name: grpc_plugin
path: ./libaimrt_grpc_plugin.so
options:
thread_num: 4
listen_ip: 127.0.0.1
listen_port: 50050
log:
core_lvl: Info # Trace/Debug/Info/Warn/Error/Fatal/Off
backends:
- type: console
executor:
rpc:
backends:
- type: grpc
servers_options:
- func_name: "(.*)"
enable_backends: [grpc]
module:
pkgs:
- path: ./libros2_rpc_server_pkg.so
enable_modules: [NormalRpcCoServerModule]
modules:
- name: NormalRpcCoServerModule
log_lvl: INFO

View File

@ -0,0 +1,5 @@
#!/bin/bash
source install/share/example_ros2/local_setup.bash
./aimrt_main --cfg_file_path=./cfg/examples_plugins_grpc_plugin_ros2_rpc_client_cfg.yaml

View File

@ -0,0 +1,5 @@
#!/bin/bash
source install/share/example_ros2/local_setup.bash
./aimrt_main --cfg_file_path=./cfg/examples_plugins_grpc_plugin_ros2_rpc_server_cfg.yaml

View File

@ -3,6 +3,7 @@
#pragma once
#include <algorithm>
#include <chrono>
#include <string>
@ -20,7 +21,7 @@ struct ClientOptions {
static ClientOptions Verify(const ClientOptions& verify_options) {
ClientOptions options(verify_options);
if (options.max_connection_num < 1) options.max_connection_num = 1;
options.max_connection_num = std::max<size_t>(options.max_connection_num, 1);
return options;
}
@ -45,7 +46,7 @@ struct ClientPoolOptions {
static ClientPoolOptions Verify(const ClientPoolOptions& verify_options) {
ClientPoolOptions options(verify_options);
if (options.max_client_num < 10) options.max_client_num = 10;
options.max_client_num = std::max<size_t>(options.max_client_num, 10);
return options;
}

View File

@ -10,6 +10,7 @@
#include <exception>
#include <memory>
#include <regex>
#include <unordered_map>
#include <vector>
#include <boost/asio.hpp>
@ -104,15 +105,6 @@ void CheckGrpcMessageBody(const http2::SimpleBuffer& buffer) {
}
void CheckGrpcReqHeaders(const http2::RequestPtr& req) {
// Check the content-type, only support application/grpc and application/grpc+proto
if (auto content_type_itr = req->GetHeaders().find("content-type"); content_type_itr == req->GetHeaders().end()) {
AIMRT_ERROR_THROW("content-type is not set for grpc");
} else if (content_type_itr->second != "application/grpc" &&
content_type_itr->second != "application/grpc+proto") {
AIMRT_ERROR_THROW("content-type is {}, which is not supported",
content_type_itr->second);
}
// Check the te, must be trailers
if (auto te_itr = req->GetHeaders().find("te"); te_itr == req->GetHeaders().end()) {
AIMRT_ERROR_THROW("te is not set for grpc");
@ -128,6 +120,7 @@ void CheckGrpcReqHeaders(const http2::RequestPtr& req) {
AIMRT_CHECK_ERROR_THROW(req->GetUrl().protocol == "http",
"Scheme is {}, which is not supported", req->GetUrl().protocol);
}
} // namespace
void GrpcRpcBackend::Initialize(YAML::Node options_node) {
@ -162,15 +155,17 @@ bool GrpcRpcBackend::RegisterServiceFunc(
return false;
}
AIMRT_DEBUG("Register service func: {}", service_func_wrapper.info.func_name);
const auto& func_name = service_func_wrapper.info.func_name;
if (!service_func_wrapper.info.func_name.starts_with("pb:")) {
AIMRT_WARN("Service func name should start with 'pb:'.");
AIMRT_DEBUG("Register service func: {}", func_name);
if (!func_name.starts_with("pb:") && !func_name.starts_with("ros2:")) {
AIMRT_ERROR("Service func name should start with 'pb:' or 'ros2:'.");
return false;
}
// pb:/aimrt.protocols.example.ExampleService/GetBarData -> /aimrt.protocols.example.ExampleService/GetBarData
auto pattern = std::string(GetRealFuncName(service_func_wrapper.info.func_name));
// pb:/aimrt.protocols.example.ExampleService/GetBarData -> /rpc/aimrt.protocols.example.ExampleService/GetBarData
// ros2:/example_ros2/srv/RosTestRpc -> /rpc/example_ros2/srv/RosTestRpc
auto pattern = "/rpc" + std::string(GetRealFuncName(func_name));
plugins::grpc_plugin::server::HttpHandle http_handle =
[this, &service_func_wrapper](
@ -189,11 +184,24 @@ bool GrpcRpcBackend::RegisterServiceFunc(
service_invoke_wrapper_ptr->ctx_ref = ctx_ptr;
// Set the serialization type
std::string serialization_type = "pb";
ctx_ptr->SetSerializationType(serialization_type);
auto content_type_itr = req->GetHeaders().find("content-type");
AIMRT_CHECK_ERROR_THROW(content_type_itr != req->GetHeaders().end(), "content-type is not set for grpc");
static const std::unordered_map<std::string_view, std::string_view> kContentTypeToSerializationTypeMap = {
{"application/grpc+json", "json"},
{"application/grpc+json charset=utf-8", "json"},
{"application/grpc", "pb"},
{"application/grpc+proto", "pb"},
{"application/grpc+ros2", "ros2"}};
auto find_itr = kContentTypeToSerializationTypeMap.find(content_type_itr->second);
AIMRT_CHECK_ERROR_THROW(find_itr != kContentTypeToSerializationTypeMap.end(),
"Unsupported content-type: {}", content_type_itr->second);
ctx_ptr->SetSerializationType(find_itr->second);
// Set the metadata
for (const auto& [key, value] : req->GetHeaders()) {
AIMRT_DEBUG("Http2 handle for rpc, key: {}, value: {}", key, value);
ctx_ptr->SetMetaValue(key, value);
}
ctx_ptr->SetFunctionName(service_func_wrapper.info.func_name);
@ -228,7 +236,7 @@ bool GrpcRpcBackend::RegisterServiceFunc(
service_invoke_wrapper_ptr->req_ptr = service_req_ptr.get();
bool deserialize_ret = service_func_wrapper.info.req_type_support_ref.Deserialize(
serialization_type, buffer_array_view, service_req_ptr.get());
ctx_ptr->GetSerializationType(), buffer_array_view, service_req_ptr.get());
AIMRT_CHECK_ERROR_THROW(deserialize_ret, "Http2 request deserialize failed.");
auto service_rsp_ptr = service_func_wrapper.info.rsp_type_support_ref.CreateSharedPtr();
service_invoke_wrapper_ptr->rsp_ptr = service_rsp_ptr.get();
@ -238,7 +246,7 @@ bool GrpcRpcBackend::RegisterServiceFunc(
auto sig_timer_ptr = std::make_shared<boost::asio::steady_timer>(*io_ptr_, chrono::nanoseconds::max());
service_invoke_wrapper_ptr->callback =
[service_invoke_wrapper_ptr,
serialization_type,
serialization_type = ctx_ptr->GetSerializationType(),
&rsp,
&ret_code,
&sig_timer_ptr](aimrt::rpc::Status status) {
@ -300,8 +308,8 @@ bool GrpcRpcBackend::RegisterClientFunc(const runtime::core::rpc::ClientFuncWrap
// Basically, we need to find the server url for each client function.
const auto& info = client_func_wrapper.info;
auto find_client_option = std::find_if(
options_.clients_options.begin(), options_.clients_options.end(),
auto find_client_option = std::ranges::find_if(
options_.clients_options,
[func_name = GetRealFuncName(info.func_name)](const Options::ClientOptions& client_option) {
try {
return std::regex_match(func_name.begin(), func_name.end(), std::regex(client_option.func_name, std::regex::ECMAScript));
@ -317,7 +325,7 @@ bool GrpcRpcBackend::RegisterClientFunc(const runtime::core::rpc::ClientFuncWrap
return false;
}
// pb:/aimrt.protocols.example.ExampleService/GetBarData -> 127.0.0.1:8080
// /aimrt.protocols.example.ExampleService/GetBarData -> 127.0.0.1:8080
client_server_url_map_.emplace(GetRealFuncName(info.func_name), find_client_option->server_url);
return true;
@ -333,7 +341,6 @@ void GrpcRpcBackend::Invoke(
}
const auto& info = client_invoke_wrapper_ptr->info;
auto real_func_name = GetRealFuncName(info.func_name);
// check ctx, to_addr priority: ctx > server_url
@ -354,11 +361,9 @@ void GrpcRpcBackend::Invoke(
client_invoke_wrapper_ptr->callback(rpc::Status(AIMRT_RPC_STATUS_CLI_INVALID_ADDR));
return;
}
if (url->path.empty()) {
url->path = std::string(GetRealFuncName(info.func_name));
url->path = "/rpc" + std::string(GetRealFuncName(info.func_name));
}
AIMRT_TRACE("Http2 cli session send request, remote addr {}, path: {}",
url->host, url->path);
@ -388,7 +393,6 @@ void GrpcRpcBackend::Invoke(
auto req_ptr = std::make_shared<http2::Request>();
req_ptr->SetUrl(*url);
req_ptr->SetMethod("POST");
req_ptr->AddHeader("content-type", "application/grpc");
req_ptr->AddHeader("te", "trailers");
req_ptr->AddHeader("user-agent", std::string("grpc-c++-aimrt/") + GetAimRTVersion());
@ -404,7 +408,20 @@ void GrpcRpcBackend::Invoke(
}
std::string serialization_type(client_invoke_wrapper_ptr->ctx_ref.GetSerializationType());
AIMRT_CHECK_ERROR_THROW(serialization_type == "pb", "Only support pb serialization now");
static const std::unordered_map<std::string_view, std::string_view> kSerializationTypeToContentTypeMap = {
{"json", "application/grpc+json"},
{"pb", "application/grpc+proto"},
{"ros2", "application/grpc+ros2"}};
auto find_itr = kSerializationTypeToContentTypeMap.find(serialization_type);
if (find_itr == kSerializationTypeToContentTypeMap.end()) {
AIMRT_ERROR("Unsupported serialization type: {}", serialization_type);
client_invoke_wrapper_ptr->callback(rpc::Status(AIMRT_RPC_STATUS_CLI_INVALID_SERIALIZATION_TYPE));
co_return;
}
req_ptr->AddHeader("content-type", find_itr->second);
auto buffer_array_view_ptr =
aimrt::runtime::core::rpc::TrySerializeReqWithCache(*client_invoke_wrapper_ptr, serialization_type);
if (!buffer_array_view_ptr) [[unlikely]] {
@ -462,7 +479,7 @@ void GrpcRpcBackend::Invoke(
// Skip the grpc compression flag and length prefix
body_str_view.remove_prefix(5);
// deserialize the response
// Deserialize the response
std::vector<aimrt_buffer_view_t> buffer_view_vec;
buffer_view_vec.push_back({.data = body_str_view.data(),
.len = body_str_view.size()});

View File

@ -60,6 +60,7 @@ class GrpcRpcBackend : public runtime::core::rpc::RpcBackendBase {
private:
static std::string_view GetRealFuncName(std::string_view func_name) {
if (func_name.substr(0, 3) == "pb:") return func_name.substr(3);
if (func_name.substr(0, 5) == "ros2:") return func_name.substr(5);
return func_name;
}

View File

@ -3,6 +3,7 @@
#pragma once
#include <algorithm>
#include <chrono>
#include <boost/asio.hpp>
@ -25,16 +26,15 @@ struct ServerOptions {
static ServerOptions Verify(const ServerOptions& verify_options) {
ServerOptions options(verify_options);
if (options.max_connection_num < 1) options.max_connection_num = 1;
options.max_connection_num = std::clamp<size_t>(options.max_connection_num,
1,
Tcp::acceptor::max_listen_connections);
if (options.max_connection_num > Tcp::acceptor::max_listen_connections)
options.max_connection_num = Tcp::acceptor::max_listen_connections;
options.mgr_timer_dt = std::max<std::chrono::nanoseconds>(options.mgr_timer_dt,
std::chrono::milliseconds(100));
if (options.mgr_timer_dt < std::chrono::milliseconds(100))
options.mgr_timer_dt = std::chrono::milliseconds(100);
if (options.max_no_data_duration < std::chrono::seconds(10))
options.max_no_data_duration = std::chrono::seconds(10);
options.max_no_data_duration = std::max<std::chrono::nanoseconds>(options.max_no_data_duration,
std::chrono::seconds(10));
return options;
}