Test: Add a test case for chained RPC calls (#111)

* add new option to rename service_name

* Modify the logic

* add zenoh proxy server

* opt code

---------

Co-authored-by: hanjun <hanjun@agibot.com>
This commit is contained in:
han J 2024-12-03 21:09:06 +08:00 committed by GitHub
parent dca739930e
commit 4bfe31f2b6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
36 changed files with 755 additions and 24 deletions

View File

@ -9,6 +9,7 @@ set_namespace()
# module
add_subdirectory(module/benchmark_rpc_client_module)
add_subdirectory(module/proxy_rpc_co_module)
add_subdirectory(module/normal_rpc_co_client_module)
add_subdirectory(module/normal_rpc_sync_client_module)
add_subdirectory(module/normal_rpc_async_client_module)

View File

@ -196,3 +196,44 @@
说明:
- 此示例与 **protobuf rpc co** 示例基本一致,唯一的区别是将 `NormalRpcCoClientModule``NormalRpcCoServerModule` 集成到 `pb_rpc_pkg` 一个 Pkg 中;
## protobuf proxy rpc co
一个基于 protobuf 协议、协程型接口与 local 后端的 rpc 示例,演示内容包括:
- 如何使用 protobuf 协议作为 rpc 服务协议;
- 如何基于 Module 方式使用 Executor、协程型 Rpc client 和 server 接口;
- 如何使用协程形式的 Rpc filter 功能;
- 如何使用 local 类型的 rpc 后端;
- 如何以 Pkg 模式集成 Module 并启动;
- 如何构建链式 rpc 调用;
核心代码:
- [rpc.proto](../../../protocols/example/rpc.proto)
- [proxy_rpc_co_module.cc](./module/proxy_rpc_co_module/proxy_rpc_co_module.cc)
- [service.cc](./module/proxy_rpc_co_module/service.cc)
- [pb_rpc_client_pkg/pkg_main.cc](./pkg/pb_rpc_client_pkg/pkg_main.cc)
- [pb_rpc_server_pkg/pkg_main.cc](./pkg/pb_rpc_server_pkg/pkg_main.cc)
配置文件:
- [examples_cpp_pb_proxy_rpc_co_cfg.yaml](./install/linux/bin/cfg/examples_cpp_pb_proxy_rpc_co_cfg.yaml)
运行方式linux
- 开启 `AIMRT_BUILD_EXAMPLES``AIMRT_BUILD_WITH_PROTOBUF` 选项编译 AimRT
- 直接运行 build 目录下`start_examples_cpp_pb_rpc_co.sh`脚本启动进程;
- 键入`ctrl-c`停止进程;
说明:
- 此示例创建了以下三个模块, 以构成 RPC 的链式调用:
- `NormalRpcCoClientModule`:会基于 `work_thread_pool` 执行器,以配置的频率,通过协程 Client 接口,向 `ExampleService_1` 发起 RPC 请求;
- `ProxyRpcCoModule`:会注册 `ExampleService_1` 服务端,通过协程 Server 接口,提供 echo 功能, 并向 `ExampleService_2` 发起 RPC 请求;
- `NormalRpcCoServerModule`:会注册 `ExampleService_2` 服务端,通过协程 Server 接口,提供 echo 功能;
- 此示例将 `NormalRpcCoClientModule` 集成到 `pb_rpc_client_pkg`, 将 `ProxyRpcCoModule``NormalRpcCoServerModule` 集成到 `pb_rpc_server_pkg` Pkg 中,并在配置文件中加载这两个 Pkg 到一个 AimRT 进程中;
- 此示例在 Rpc Client 端和 Server 端分别注册了两个 Filter 用于打印请求日志和计算耗时;
- 此示例使用 local 类型的 rpc 后端进行通信,并配置了 `timeout_handle` 执行器作为超时执行器;

View File

@ -0,0 +1,54 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
aimrt:
log:
core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
backends:
- type: console
executor:
executors:
- name: work_thread_pool
type: asio_thread
options:
thread_num: 4
- name: timeout_handle
type: time_wheel
options:
bind_executor: work_thread_pool
rpc:
backends:
- type: local
options:
timeout_executor: timeout_handle
clients_options:
- func_name: "(.*)"
enable_backends: [local]
servers_options:
- func_name: "(.*)"
enable_backends: [local]
module:
pkgs:
- path: ./libpb_rpc_client_pkg.so
enable_modules: [NormalRpcCoClientModule]
- path: ./libpb_rpc_server_pkg.so
enable_modules: [ProxyRpcCoModule,NormalRpcCoServerModule]
modules:
- name: NormalRpcCoClientModule
log_lvl: INFO
- name: ProxyRpcCoModule
log_lvl: INFO
- name: NormalRpcCoServerModule
log_lvl: INFO
# Module custom configuration
NormalRpcCoClientModule:
rpc_frq: 0.5
service_name: "ExampleService_1"
ProxyRpcCoModule:
service_name_for_client: "ExampleService_2"
service_name_for_server: "ExampleService_1"
NormalRpcCoServerModule:
service_name: "ExampleService_2"

View File

@ -0,0 +1,3 @@
#!/bin/bash
./aimrt_main --cfg_file_path=./cfg/examples_cpp_pb_proxy_rpc_co_cfg.yaml

View File

@ -47,6 +47,10 @@ bool BenchmarkRpcClientModule::Initialize(aimrt::CoreRef core) {
YAML::Node cfg_node = YAML::LoadFile(std::string(file_path));
max_parallel_ = cfg_node["max_parallel"].as<uint32_t>();
if (cfg_node["service_name"]) {
service_name_ = cfg_node["service_name"].as<std::string>();
}
if (cfg_node["bench_plans"] && cfg_node["bench_plans"].IsSequence()) {
for (const auto& bench_plan_node : cfg_node["bench_plans"]) {
BenchPlan bench_plan;
@ -83,12 +87,22 @@ bool BenchmarkRpcClientModule::Initialize(aimrt::CoreRef core) {
AIMRT_CHECK_ERROR_THROW(rpc_handle, "Get rpc handle failed.");
// Register rpc client
bool ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);
bool ret = false;
if (service_name_.empty()) {
ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);
} else {
ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle, service_name_);
}
AIMRT_CHECK_ERROR_THROW(ret, "Register client failed.");
// Create rpc proxy
proxy_ = std::make_shared<aimrt::protocols::example::ExampleServiceCoProxy>(rpc_handle);
if (!service_name_.empty()) {
proxy_->SetServiceName(service_name_);
}
// Check executor
client_statistics_executor_ = core_.GetExecutorManager().GetExecutor("client_statistics_executor");
AIMRT_CHECK_ERROR_THROW(

View File

@ -68,6 +68,7 @@ class BenchmarkRpcClientModule : public aimrt::ModuleBase {
// cfg
uint32_t max_parallel_;
std::vector<BenchPlan> bench_plans_;
std::string service_name_;
};
} // namespace aimrt::examples::cpp::pb_rpc::benchmark_rpc_client_module

View File

@ -17,6 +17,10 @@ bool NormalRpcAsyncClientModule::Initialize(aimrt::CoreRef core) {
if (!file_path.empty()) {
YAML::Node cfg_node = YAML::LoadFile(file_path);
rpc_frq_ = cfg_node["rpc_frq"].as<double>();
if (cfg_node["service_name"]) {
service_name_ = cfg_node["service_name"].as<std::string>();
}
}
// Get executor handle
@ -29,9 +33,22 @@ bool NormalRpcAsyncClientModule::Initialize(aimrt::CoreRef core) {
AIMRT_CHECK_ERROR_THROW(rpc_handle, "Get rpc handle failed.");
// Register rpc client
bool ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);
bool ret = false;
if (service_name_.empty()) {
ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);
} else {
ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle, service_name_);
}
AIMRT_CHECK_ERROR_THROW(ret, "Register client failed.");
// Create rpc proxy
proxy_ = std::make_shared<aimrt::protocols::example::ExampleServiceAsyncProxy>(rpc_handle);
if (!service_name_.empty()) {
proxy_->SetServiceName(service_name_);
}
} catch (const std::exception& e) {
AIMRT_ERROR("Init failed, {}", e.what());
return false;
@ -73,22 +90,19 @@ void NormalRpcAsyncClientModule::MainLoopFunc() {
count_++;
AIMRT_INFO("Loop count : {} -------------------------", count_);
// Create proxy
aimrt::protocols::example::ExampleServiceAsyncProxy proxy(core_.GetRpcHandle());
// Create req and rsp
auto req_ptr = std::make_shared<aimrt::protocols::example::GetFooDataReq>();
auto rsp_ptr = std::make_shared<aimrt::protocols::example::GetFooDataRsp>();
req_ptr->set_msg("hello world foo, count " + std::to_string(count_));
// Create ctx
auto ctx_ptr = proxy.NewContextSharedPtr();
auto ctx_ptr = proxy_->NewContextSharedPtr();
ctx_ptr->SetTimeout(std::chrono::seconds(3));
AIMRT_INFO("Client start new rpc call. req: {}", aimrt::Pb2CompactJson(*req_ptr));
// Call rpc
proxy.GetFooData(
proxy_->GetFooData(
ctx_ptr, *req_ptr, *rsp_ptr,
[this, ctx_ptr, req_ptr, rsp_ptr](aimrt::rpc::Status status) {
// Check result

View File

@ -42,6 +42,9 @@ class NormalRpcAsyncClientModule : public aimrt::ModuleBase {
std::promise<void> stop_sig_;
double rpc_frq_ = 1.0;
std::string service_name_;
std::shared_ptr<aimrt::protocols::example::ExampleServiceAsyncProxy> proxy_;
};
} // namespace aimrt::examples::cpp::pb_rpc::normal_rpc_async_client_module

View File

@ -4,6 +4,8 @@
#include "normal_rpc_async_server_module/normal_rpc_async_server_module.h"
#include "normal_rpc_async_server_module/global.h"
#include "yaml-cpp/yaml.h"
namespace aimrt::examples::cpp::pb_rpc::normal_rpc_async_server_module {
bool NormalRpcAsyncServerModule::Initialize(aimrt::CoreRef core) {
@ -12,11 +14,26 @@ bool NormalRpcAsyncServerModule::Initialize(aimrt::CoreRef core) {
SetLogger(core_.GetLogger());
try {
// Read cfg
std::string file_path = std::string(core_.GetConfigurator().GetConfigFilePath());
if (!file_path.empty()) {
YAML::Node cfg_node = YAML::LoadFile(file_path);
if (cfg_node["service_name"]) {
service_name_ = cfg_node["service_name"].as<std::string>();
}
}
// Create service
service_ptr_ = std::make_shared<ExampleServiceAsyncServiceImpl>();
// Register service
bool ret = core_.GetRpcHandle().RegisterService(service_ptr_.get());
bool ret = false;
if (service_name_.empty()) {
ret = core_.GetRpcHandle().RegisterService(service_ptr_.get());
} else {
ret = core_.GetRpcHandle().RegisterService(service_name_, service_ptr_.get());
}
AIMRT_CHECK_ERROR_THROW(ret, "Register service failed.");
AIMRT_INFO("Register service succeeded.");

View File

@ -28,6 +28,8 @@ class NormalRpcAsyncServerModule : public aimrt::ModuleBase {
private:
aimrt::CoreRef core_;
std::shared_ptr<ExampleServiceAsyncServiceImpl> service_ptr_;
std::string service_name_;
};
} // namespace aimrt::examples::cpp::pb_rpc::normal_rpc_async_server_module

View File

@ -22,6 +22,10 @@ bool NormalRpcCoClientModule::Initialize(aimrt::CoreRef core) {
if (!file_path.empty()) {
YAML::Node cfg_node = YAML::LoadFile(file_path);
rpc_frq_ = cfg_node["rpc_frq"].as<double>();
if (cfg_node["service_name"]) {
service_name_ = cfg_node["service_name"].as<std::string>();
}
}
// Get executor handle
@ -34,12 +38,22 @@ bool NormalRpcCoClientModule::Initialize(aimrt::CoreRef core) {
AIMRT_CHECK_ERROR_THROW(rpc_handle, "Get rpc handle failed.");
// Register rpc client
bool ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);
bool ret = false;
if (service_name_.empty()) {
ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);
} else {
ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle, service_name_);
}
AIMRT_CHECK_ERROR_THROW(ret, "Register client failed.");
// Create rpc proxy
proxy_ = std::make_shared<aimrt::protocols::example::ExampleServiceCoProxy>(rpc_handle);
if (!service_name_.empty()) {
proxy_->SetServiceName(service_name_);
}
// Register filter
proxy_->RegisterFilter([this](aimrt::rpc::ContextRef ctx,
const void* req_ptr, void* rsp_ptr,

View File

@ -43,6 +43,8 @@ class NormalRpcCoClientModule : public aimrt::ModuleBase {
std::atomic_bool run_flag_ = true;
double rpc_frq_ = 1.0;
std::string service_name_;
std::shared_ptr<aimrt::protocols::example::ExampleServiceCoProxy> proxy_;
};

View File

@ -5,6 +5,8 @@
#include "normal_rpc_co_server_module/filter.h"
#include "normal_rpc_co_server_module/global.h"
#include "yaml-cpp/yaml.h"
namespace aimrt::examples::cpp::pb_rpc::normal_rpc_co_server_module {
bool NormalRpcCoServerModule::Initialize(aimrt::CoreRef core) {
@ -13,6 +15,15 @@ bool NormalRpcCoServerModule::Initialize(aimrt::CoreRef core) {
SetLogger(core_.GetLogger());
try {
// Read cfg
std::string file_path = std::string(core_.GetConfigurator().GetConfigFilePath());
if (!file_path.empty()) {
YAML::Node cfg_node = YAML::LoadFile(file_path);
if (cfg_node["service_name"]) {
service_name_ = cfg_node["service_name"].as<std::string>();
}
}
// Create service
service_ptr_ = std::make_shared<ExampleServiceImpl>();
@ -21,7 +32,13 @@ bool NormalRpcCoServerModule::Initialize(aimrt::CoreRef core) {
service_ptr_->RegisterFilter(TimeCostLogServerFilter);
// Register service
bool ret = core_.GetRpcHandle().RegisterService(service_ptr_.get());
bool ret = false;
if (service_name_.empty()) {
ret = core_.GetRpcHandle().RegisterService(service_ptr_.get());
} else {
ret = core_.GetRpcHandle().RegisterService(service_name_, service_ptr_.get());
}
AIMRT_CHECK_ERROR_THROW(ret, "Register service failed.");
AIMRT_INFO("Register service succeeded.");

View File

@ -28,6 +28,8 @@ class NormalRpcCoServerModule : public aimrt::ModuleBase {
private:
aimrt::CoreRef core_;
std::shared_ptr<ExampleServiceImpl> service_ptr_;
std::string service_name_;
};
} // namespace aimrt::examples::cpp::pb_rpc::normal_rpc_co_server_module

View File

@ -17,6 +17,10 @@ bool NormalRpcFutureClientModule::Initialize(aimrt::CoreRef core) {
if (!file_path.empty()) {
YAML::Node cfg_node = YAML::LoadFile(file_path);
rpc_frq_ = cfg_node["rpc_frq"].as<double>();
if (cfg_node["service_name"]) {
service_name_ = cfg_node["service_name"].as<std::string>();
}
}
// Get executor handle
@ -28,9 +32,22 @@ bool NormalRpcFutureClientModule::Initialize(aimrt::CoreRef core) {
AIMRT_CHECK_ERROR_THROW(rpc_handle, "Get rpc handle failed.");
// Register rpc client
bool ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);
bool ret = false;
if (service_name_.empty()) {
ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);
} else {
ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle, service_name_);
}
AIMRT_CHECK_ERROR_THROW(ret, "Register client failed.");
// Create rpc proxy
proxy_ = std::make_shared<aimrt::protocols::example::ExampleServiceFutureProxy>(rpc_handle);
if (!service_name_.empty()) {
proxy_->SetServiceName(service_name_);
}
} catch (const std::exception& e) {
AIMRT_ERROR("Init failed, {}", e.what());
return false;
@ -70,9 +87,6 @@ void NormalRpcFutureClientModule::MainLoop() {
try {
AIMRT_INFO("Start MainLoop.");
// Create proxy
aimrt::protocols::example::ExampleServiceFutureProxy proxy(core_.GetRpcHandle());
uint32_t count = 0;
while (run_flag_) {
// Sleep
@ -87,13 +101,13 @@ void NormalRpcFutureClientModule::MainLoop() {
req.set_msg("hello world foo, count " + std::to_string(count));
// Create ctx
auto ctx_ptr = proxy.NewContextSharedPtr();
auto ctx_ptr = proxy_->NewContextSharedPtr();
ctx_ptr->SetTimeout(std::chrono::seconds(3));
AIMRT_INFO("Client start new rpc call. req: {}", aimrt::Pb2CompactJson(req));
// Call rpc
auto status_future = proxy.GetFooData(ctx_ptr, req, rsp);
auto status_future = proxy_->GetFooData(ctx_ptr, req, rsp);
auto status = status_future.get();
// Check result

View File

@ -41,6 +41,9 @@ class NormalRpcFutureClientModule : public aimrt::ModuleBase {
std::promise<void> stop_sig_;
double rpc_frq_ = 1.0;
std::string service_name_;
std::shared_ptr<aimrt::protocols::example::ExampleServiceFutureProxy> proxy_;
};
} // namespace aimrt::examples::cpp::pb_rpc::normal_rpc_future_client_module

View File

@ -17,6 +17,10 @@ bool NormalRpcSyncClientModule::Initialize(aimrt::CoreRef core) {
if (!file_path.empty()) {
YAML::Node cfg_node = YAML::LoadFile(file_path);
rpc_frq_ = cfg_node["rpc_frq"].as<double>();
if (cfg_node["service_name"]) {
service_name_ = cfg_node["service_name"].as<std::string>();
}
}
// Get executor handle
@ -28,9 +32,21 @@ bool NormalRpcSyncClientModule::Initialize(aimrt::CoreRef core) {
AIMRT_CHECK_ERROR_THROW(rpc_handle, "Get rpc handle failed.");
// Register rpc client
bool ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);
bool ret = false;
if (service_name_.empty()) {
ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);
} else {
ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle, service_name_);
}
AIMRT_CHECK_ERROR_THROW(ret, "Register client failed.");
// Create rpc proxy
proxy_ = std::make_shared<aimrt::protocols::example::ExampleServiceSyncProxy>(rpc_handle);
if (!service_name_.empty()) {
proxy_->SetServiceName(service_name_);
}
} catch (const std::exception& e) {
AIMRT_ERROR("Init failed, {}", e.what());
return false;
@ -70,9 +86,6 @@ void NormalRpcSyncClientModule::MainLoop() {
try {
AIMRT_INFO("Start MainLoop.");
// Create proxy
aimrt::protocols::example::ExampleServiceSyncProxy proxy(core_.GetRpcHandle());
uint32_t count = 0;
while (run_flag_) {
// Sleep
@ -87,13 +100,13 @@ void NormalRpcSyncClientModule::MainLoop() {
req.set_msg("hello world foo, count " + std::to_string(count));
// Create ctx
auto ctx_ptr = proxy.NewContextSharedPtr();
auto ctx_ptr = proxy_->NewContextSharedPtr();
ctx_ptr->SetTimeout(std::chrono::seconds(3));
AIMRT_INFO("Client start new rpc call. req: {}", aimrt::Pb2CompactJson(req));
// Call rpc
auto status = proxy.GetFooData(ctx_ptr, req, rsp);
auto status = proxy_->GetFooData(ctx_ptr, req, rsp);
// Check result
if (status.OK()) {

View File

@ -41,6 +41,9 @@ class NormalRpcSyncClientModule : public aimrt::ModuleBase {
std::promise<void> stop_sig_;
double rpc_frq_ = 1.0;
std::string service_name_;
std::shared_ptr<aimrt::protocols::example::ExampleServiceSyncProxy> proxy_;
};
} // namespace aimrt::examples::cpp::pb_rpc::normal_rpc_sync_client_module

View File

@ -4,6 +4,8 @@
#include "normal_rpc_sync_server_module/normal_rpc_sync_server_module.h"
#include "normal_rpc_sync_server_module/global.h"
#include "yaml-cpp/yaml.h"
namespace aimrt::examples::cpp::pb_rpc::normal_rpc_sync_server_module {
bool NormalRpcSyncServerModule::Initialize(aimrt::CoreRef core) {
@ -12,11 +14,26 @@ bool NormalRpcSyncServerModule::Initialize(aimrt::CoreRef core) {
SetLogger(core_.GetLogger());
try {
// Read cfg
std::string file_path = std::string(core_.GetConfigurator().GetConfigFilePath());
if (!file_path.empty()) {
YAML::Node cfg_node = YAML::LoadFile(file_path);
if (cfg_node["service_name"]) {
service_name_ = cfg_node["service_name"].as<std::string>();
}
}
// Create service
service_ptr_ = std::make_shared<ExampleServiceSyncServiceImpl>();
// Register service
bool ret = core_.GetRpcHandle().RegisterService(service_ptr_.get());
bool ret = false;
if (service_name_.empty()) {
ret = core_.GetRpcHandle().RegisterService(service_ptr_.get());
} else {
ret = core_.GetRpcHandle().RegisterService(service_name_, service_ptr_.get());
}
AIMRT_CHECK_ERROR_THROW(ret, "Register service failed.");
AIMRT_INFO("Register service succeeded.");

View File

@ -28,6 +28,8 @@ class NormalRpcSyncServerModule : public aimrt::ModuleBase {
private:
aimrt::CoreRef core_;
std::shared_ptr<ExampleServiceSyncServiceImpl> service_ptr_;
std::string service_name_;
};
} // namespace aimrt::examples::cpp::pb_rpc::normal_rpc_sync_server_module

View File

@ -0,0 +1,44 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
# Get the current folder name
string(REGEX REPLACE ".*/\(.*\)" "\\1" CUR_DIR ${CMAKE_CURRENT_SOURCE_DIR})
# Get namespace
get_namespace(CUR_SUPERIOR_NAMESPACE)
string(REPLACE "::" "_" CUR_SUPERIOR_NAMESPACE_UNDERLINE ${CUR_SUPERIOR_NAMESPACE})
# Set target name
set(CUR_TARGET_NAME ${CUR_SUPERIOR_NAMESPACE_UNDERLINE}_${CUR_DIR})
set(CUR_TARGET_ALIAS_NAME ${CUR_SUPERIOR_NAMESPACE}::${CUR_DIR})
# Set file collection
file(GLOB_RECURSE head_files ${CMAKE_CURRENT_SOURCE_DIR}/*.h)
file(GLOB_RECURSE src ${CMAKE_CURRENT_SOURCE_DIR}/*.cc)
file(GLOB_RECURSE test_files ${CMAKE_CURRENT_SOURCE_DIR}/*_test.cc)
list(REMOVE_ITEM src ${test_files})
# Add target
add_library(${CUR_TARGET_NAME} STATIC)
add_library(${CUR_TARGET_ALIAS_NAME} ALIAS ${CUR_TARGET_NAME})
# Set source file of target
target_sources(${CUR_TARGET_NAME} PRIVATE ${src})
# Set include path of target
target_include_directories(
${CUR_TARGET_NAME}
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/..>)
# Set link libraries of target
target_link_libraries(
${CUR_TARGET_NAME}
PRIVATE yaml-cpp::yaml-cpp
PUBLIC aimrt::interface::aimrt_module_cpp_interface
aimrt::interface::aimrt_module_protobuf_interface
aimrt::protocols::example_aimrt_rpc_gencode)
# Set test of target
if(AIMRT_BUILD_TESTS AND test_files)
add_gtest_target(TEST_TARGET ${CUR_TARGET_NAME} TEST_SRC ${test_files})
endif()

View File

@ -0,0 +1,39 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#pragma once
#include "aimrt_module_cpp_interface/rpc/rpc_co_filter.h"
#include "aimrt_module_protobuf_interface/util/protobuf_tools.h"
#include "proxy_rpc_co_module/global.h"
namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module {
inline co::Task<aimrt::rpc::Status> DebugLogServerFilter(
aimrt::rpc::ContextRef ctx, const void* req_ptr, void* rsp_ptr,
const aimrt::rpc::CoRpcHandle& next) {
AIMRT_INFO("Svr get new rpc call. context: {}, req: {}",
ctx.ToString(), aimrt::Pb2CompactJson(*static_cast<const google::protobuf::Message*>(req_ptr)));
const auto& status = co_await next(ctx, req_ptr, rsp_ptr);
AIMRT_INFO("Svr handle rpc completed, status: {}, rsp: {}",
status.ToString(),
aimrt::Pb2CompactJson(*static_cast<const google::protobuf::Message*>(rsp_ptr)));
co_return status;
}
inline co::Task<aimrt::rpc::Status> TimeCostLogServerFilter(
aimrt::rpc::ContextRef ctx, const void* req_ptr, void* rsp_ptr,
const aimrt::rpc::CoRpcHandle& next) {
auto begin_time = std::chrono::steady_clock::now();
const auto& status = co_await next(ctx, req_ptr, rsp_ptr);
auto end_time = std::chrono::steady_clock::now();
AIMRT_INFO("Svr rpc time cost {} us",
std::chrono::duration_cast<std::chrono::microseconds>(end_time - begin_time).count());
co_return status;
}
} // namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module

View File

@ -0,0 +1,12 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#include "proxy_rpc_co_module/global.h"
namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module {
aimrt::logger::LoggerRef global_logger;
void SetLogger(aimrt::logger::LoggerRef logger) { global_logger = logger; }
aimrt::logger::LoggerRef GetLogger() { return global_logger; }
} // namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module

View File

@ -0,0 +1,13 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#pragma once
#include "aimrt_module_cpp_interface/logger/logger.h"
namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module {
void SetLogger(aimrt::logger::LoggerRef);
aimrt::logger::LoggerRef GetLogger();
} // namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module

View File

@ -0,0 +1,128 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#include "proxy_rpc_co_module/proxy_rpc_co_module.h"
#include "aimrt_module_cpp_interface/co/sync_wait.h"
#include "aimrt_module_protobuf_interface/util/protobuf_tools.h"
#include "proxy_rpc_co_module/filter.h"
#include "yaml-cpp/yaml.h"
namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module {
bool ProxyRpcCoModule::Initialize(aimrt::CoreRef core) {
core_ = core;
try {
// Read cfg
std::string file_path = std::string(core_.GetConfigurator().GetConfigFilePath());
if (!file_path.empty()) {
YAML::Node cfg_node = YAML::LoadFile(file_path);
if (cfg_node["service_name_for_client"]) {
service_name_for_client_ = cfg_node["service_name_for_client"].as<std::string>();
}
if (cfg_node["service_name_for_server"]) {
service_name_for_server_ = cfg_node["service_name_for_server"].as<std::string>();
}
}
// Get executor handle
executor_ = core_.GetExecutorManager().GetExecutor("work_thread_pool");
AIMRT_CHECK_ERROR_THROW(executor_ && executor_.SupportTimerSchedule(),
"Get executor 'work_thread_pool' failed.");
// Get rpc handle
auto rpc_handle = core_.GetRpcHandle();
AIMRT_CHECK_ERROR_THROW(rpc_handle, "Get rpc handle failed.");
// Register rpc client
bool ret = false;
if (service_name_for_client_.empty()) {
ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);
} else {
ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle, service_name_for_client_);
}
AIMRT_CHECK_ERROR_THROW(ret, "Register client failed.");
// Create rpc proxy
proxy_ = std::make_shared<aimrt::protocols::example::ExampleServiceCoProxy>(rpc_handle);
if (!service_name_for_client_.empty()) {
proxy_->SetServiceName(service_name_for_client_);
}
// Register filter
proxy_->RegisterFilter([this](aimrt::rpc::ContextRef ctx,
const void* req_ptr, void* rsp_ptr,
const aimrt::rpc::CoRpcHandle& next)
-> co::Task<aimrt::rpc::Status> {
// debuglog
AIMRT_INFO("Client start new rpc call. context: {}, req: {}",
ctx.ToString(), aimrt::Pb2CompactJson(*static_cast<const google::protobuf::Message*>(req_ptr)));
const auto& status = co_await next(ctx, req_ptr, rsp_ptr);
if (status.OK()) {
AIMRT_INFO("Client get rpc ret, status: {}, rsp: {}", status.ToString(),
aimrt::Pb2CompactJson(*static_cast<const google::protobuf::Message*>(rsp_ptr)));
} else {
AIMRT_WARN("Client get rpc error ret, status: {}", status.ToString());
}
co_return status;
});
proxy_->RegisterFilter([this](aimrt::rpc::ContextRef ctx,
const void* req_ptr, void* rsp_ptr,
const aimrt::rpc::CoRpcHandle& next)
-> co::Task<aimrt::rpc::Status> {
// timecost count
auto begin_time = std::chrono::steady_clock::now();
const auto& status = co_await next(ctx, req_ptr, rsp_ptr);
auto end_time = std::chrono::steady_clock::now();
AIMRT_INFO("Client rpc time cost {} us",
std::chrono::duration_cast<std::chrono::microseconds>(end_time - begin_time).count());
co_return status;
});
SetLogger(core_.GetLogger());
// Create service
service_ptr_ = std::make_shared<ExampleCoComplexCoServiceImpl>(proxy_, executor_);
// Register filter
service_ptr_->RegisterFilter(DebugLogServerFilter);
service_ptr_->RegisterFilter(TimeCostLogServerFilter);
// Register service
bool ret_srv;
if (service_name_for_server_.empty()) {
ret_srv = core_.GetRpcHandle().RegisterService(service_ptr_.get());
} else {
ret_srv = core_.GetRpcHandle().RegisterService(service_name_for_server_, service_ptr_.get());
}
AIMRT_CHECK_ERROR_THROW(ret_srv, "Register service failed.");
AIMRT_INFO("Init succeeded.");
} catch (const std::exception& e) {
AIMRT_ERROR("Init failed, {}", e.what());
return false;
}
return true;
}
bool ProxyRpcCoModule::Start() {
return true;
}
void ProxyRpcCoModule::Shutdown() {
AIMRT_INFO("Shutdown succeeded.");
}
} // namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module

View File

@ -0,0 +1,44 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#pragma once
#include "aimrt_module_cpp_interface/co/async_scope.h"
#include "aimrt_module_cpp_interface/co/task.h"
#include "aimrt_module_cpp_interface/module_base.h"
#include "aimrt_module_cpp_interface/rpc/rpc_co_filter.h"
#include "proxy_rpc_co_module/service.h"
#include "rpc.aimrt_rpc.pb.h"
namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module {
class ProxyRpcCoModule : public aimrt::ModuleBase {
public:
ProxyRpcCoModule() = default;
~ProxyRpcCoModule() override = default;
ModuleInfo Info() const override {
return ModuleInfo{.name = "ProxyRpcCoModule"};
}
bool Initialize(aimrt::CoreRef core) override;
bool Start() override;
void Shutdown() override;
private:
auto GetLogger() { return core_.GetLogger(); }
private:
aimrt::CoreRef core_;
aimrt::executor::ExecutorRef executor_;
std::string service_name_for_client_;
std::string service_name_for_server_;
std::shared_ptr<aimrt::protocols::example::ExampleServiceCoProxy> proxy_;
std::shared_ptr<ExampleCoComplexCoServiceImpl> service_ptr_;
};
} // namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module

View File

@ -0,0 +1,54 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#include "proxy_rpc_co_module/service.h"
#include "aimrt_module_cpp_interface/co/aimrt_context.h"
#include "aimrt_module_cpp_interface/co/inline_scheduler.h"
#include "aimrt_module_cpp_interface/co/on.h"
#include "aimrt_module_cpp_interface/co/schedule.h"
#include "aimrt_module_protobuf_interface/util/protobuf_tools.h"
#include "proxy_rpc_co_module/global.h"
namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module {
co::Task<aimrt::rpc::Status> ExampleCoComplexCoServiceImpl::GetFooData(
aimrt::rpc::ContextRef ctx,
const ::aimrt::protocols::example::GetFooDataReq& req,
::aimrt::protocols::example::GetFooDataRsp& rsp) {
try {
// Create proxy req and rsp
aimrt::protocols::example::GetBarDataReq proxy_req;
aimrt::protocols::example::GetBarDataRsp proxy_rsp;
proxy_req.set_msg(req.msg());
// Create ctx
auto ctx_ptr = proxy_->NewContextSharedPtr();
ctx_ptr->SetTimeout(std::chrono::seconds(3));
// Call rpc
auto status = co_await proxy_->GetBarData(ctx_ptr, proxy_req, proxy_rsp);
rsp.set_code(proxy_rsp.code());
rsp.set_msg(proxy_rsp.msg());
AIMRT_INFO(" Server handle new rpc call. context: {}, req: {}, return rsp: {}",
ctx.ToString(), aimrt::Pb2CompactJson(req), aimrt::Pb2CompactJson(rsp));
} catch (const std::exception& e) {
AIMRT_ERROR("Exit with exception, {}", e.what());
}
co_return aimrt::rpc::Status();
}
co::Task<aimrt::rpc::Status> ExampleCoComplexCoServiceImpl::GetBarData(
aimrt::rpc::ContextRef ctx,
const ::aimrt::protocols::example::GetBarDataReq& req,
::aimrt::protocols::example::GetBarDataRsp& rsp) {
rsp.set_msg("echo " + req.msg());
AIMRT_INFO("Server handle new rpc call. context: {}, req: {}, return rsp: {}",
ctx.ToString(), aimrt::Pb2CompactJson(req), aimrt::Pb2CompactJson(rsp));
co_return aimrt::rpc::Status();
}
} // namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module

View File

@ -0,0 +1,36 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.
#pragma once
#include "aimrt_module_cpp_interface/executor/executor.h"
#include "rpc.aimrt_rpc.pb.h"
namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module {
using MsgHandleFunc = std::function<co::Task<void>()>;
class ExampleCoComplexCoServiceImpl : public aimrt::protocols::example::ExampleServiceCoService {
public:
ExampleCoComplexCoServiceImpl(const std::shared_ptr<aimrt::protocols::example::ExampleServiceCoProxy>& proxy,
aimrt::executor::ExecutorRef executor) : proxy_(proxy),
executor_(executor) {}
~ExampleCoComplexCoServiceImpl() override = default;
co::Task<aimrt::rpc::Status> GetFooData(
aimrt::rpc::ContextRef ctx,
const ::aimrt::protocols::example::GetFooDataReq& req,
::aimrt::protocols::example::GetFooDataRsp& rsp) override;
co::Task<aimrt::rpc::Status> GetBarData(
aimrt::rpc::ContextRef ctx,
const ::aimrt::protocols::example::GetBarDataReq& req,
::aimrt::protocols::example::GetBarDataRsp& rsp) override;
private:
MsgHandleFunc handle_;
std::shared_ptr<aimrt::protocols::example::ExampleServiceCoProxy> proxy_;
aimrt::executor::ExecutorRef executor_;
};
} // namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module

View File

@ -27,6 +27,7 @@ target_link_libraries(
${CUR_TARGET_NAME}
PRIVATE aimrt::interface::aimrt_pkg_c_interface
${CUR_SUPERIOR_NAMESPACE}::benchmark_rpc_client_module
${CUR_SUPERIOR_NAMESPACE}::proxy_rpc_co_module
${CUR_SUPERIOR_NAMESPACE}::normal_rpc_co_client_module
${CUR_SUPERIOR_NAMESPACE}::normal_rpc_co_server_module
${CUR_SUPERIOR_NAMESPACE}::normal_rpc_sync_client_module

View File

@ -12,6 +12,7 @@
#include "normal_rpc_future_client_module/normal_rpc_future_client_module.h"
#include "normal_rpc_sync_client_module/normal_rpc_sync_client_module.h"
#include "normal_rpc_sync_server_module/normal_rpc_sync_server_module.h"
#include "proxy_rpc_co_module/proxy_rpc_co_module.h"
using namespace aimrt::examples::cpp::pb_rpc;
@ -19,6 +20,9 @@ static std::tuple<std::string_view, std::function<aimrt::ModuleBase*()>> aimrt_m
{"BenchmarkRpcClientModule", []() -> aimrt::ModuleBase* {
return new benchmark_rpc_client_module::BenchmarkRpcClientModule();
}},
{"ProxyRpcCoModule", []() -> aimrt::ModuleBase* {
return new proxy_rpc_co_module ::ProxyRpcCoModule();
}},
{"NormalRpcCoClientModule", []() -> aimrt::ModuleBase* {
return new normal_rpc_co_client_module::NormalRpcCoClientModule();
}},

View File

@ -27,6 +27,7 @@ target_link_libraries(
${CUR_TARGET_NAME}
PRIVATE aimrt::interface::aimrt_pkg_c_interface
${CUR_SUPERIOR_NAMESPACE}::normal_rpc_co_server_module
${CUR_SUPERIOR_NAMESPACE}::proxy_rpc_co_module
${CUR_SUPERIOR_NAMESPACE}::normal_rpc_sync_server_module
${CUR_SUPERIOR_NAMESPACE}::normal_rpc_async_server_module)

View File

@ -7,13 +7,16 @@
#include "normal_rpc_async_server_module/normal_rpc_async_server_module.h"
#include "normal_rpc_co_server_module/normal_rpc_co_server_module.h"
#include "normal_rpc_sync_server_module/normal_rpc_sync_server_module.h"
#include "proxy_rpc_co_module/proxy_rpc_co_module.h"
using namespace aimrt::examples::cpp::pb_rpc;
static std::tuple<std::string_view, std::function<aimrt::ModuleBase*()>> aimrt_module_register_array[]{
{"NormalRpcCoServerModule", []() -> aimrt::ModuleBase* {
return new normal_rpc_co_server_module::NormalRpcCoServerModule();
}},
{"ProxyRpcCoModule", []() -> aimrt::ModuleBase* {
return new proxy_rpc_co_module::ProxyRpcCoModule();
}},
{"NormalRpcAsyncServerModule", []() -> aimrt::ModuleBase* {
return new normal_rpc_async_server_module::NormalRpcAsyncServerModule();
}},

View File

@ -0,0 +1,62 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
aimrt:
plugin:
plugins:
- name: zenoh_plugin
path: ./libaimrt_zenoh_plugin.so
options:
native_cfg_path: ./cfg/zenoh_native_config.json5
log:
core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
backends:
- type: console
executor:
executors:
- name: client_statistics_executor
type: asio_thread
options:
thread_num: 4
- name: timeout_handle
type: time_wheel
options:
bind_executor: client_statistics_executor
- name: client_executor_0
type: asio_thread
- name: client_executor_1
type: asio_thread
- name: client_executor_2
type: asio_thread
- name: client_executor_3
type: asio_thread
rpc:
backends:
- type: zenoh
options:
timeout_executor: timeout_handle
clients_options:
- func_name: "(.*)"
enable_backends: [zenoh]
module:
pkgs:
- path: ./libpb_rpc_client_pkg.so
enable_modules: [BenchmarkRpcClientModule]
modules:
- name: BenchmarkRpcClientModule
log_lvl: INFO
# Module custom configuration
BenchmarkRpcClientModule:
service_name: "ExampleProxyService"
max_parallel: 4
bench_plans:
- perf_mode: bench
msg_size: 256
parallel: 4
msg_count: 10000
- perf_mode: fixed-freq
freq: 1000
msg_size: 1024
parallel: 2
msg_count: 1000

View File

@ -0,0 +1,45 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
aimrt:
plugin:
plugins:
- name: zenoh_plugin
path: ./libaimrt_zenoh_plugin.so
options:
native_cfg_path: ./cfg/zenoh_native_config.json5
log:
core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
backends:
- type: console
executor:
executors:
- name: work_thread_pool
type: asio_thread
options:
thread_num: 4
- name: timeout_handle
type: time_wheel
options:
bind_executor: work_thread_pool
rpc:
backends:
- type: zenoh
options:
timeout_executor: timeout_handle
clients_options:
- func_name: "(.*)"
enable_backends: [zenoh]
servers_options:
- func_name: "(.*)"
enable_backends: [zenoh]
module:
pkgs:
- path: ./libpb_rpc_server_pkg.so
enable_modules: [ProxyRpcCoModule]
modules:
- name: ProxyRpcCoModule
log_lvl: Warn
ProxyRpcCoModule:
service_name_for_server: "ExampleProxyService"

View File

@ -0,0 +1,3 @@
#!/bin/bash
./aimrt_main --cfg_file_path=./cfg/examples_plugins_zenoh_plugin_pb_rpc_benchmark_client_to_proxy_cfg.yaml

View File

@ -0,0 +1,5 @@
#!/bin/bash
source install/share/example_ros2/local_setup.bash
./aimrt_main --cfg_file_path=./cfg/examples_plugins_zenoh_plugin_pb_rpc_proxy_server_cfg.yaml