diff --git a/src/examples/cpp/pb_rpc/CMakeLists.txt b/src/examples/cpp/pb_rpc/CMakeLists.txt index b8f8d7be0..eb9ad3673 100644 --- a/src/examples/cpp/pb_rpc/CMakeLists.txt +++ b/src/examples/cpp/pb_rpc/CMakeLists.txt @@ -9,6 +9,7 @@ set_namespace() # module add_subdirectory(module/benchmark_rpc_client_module) +add_subdirectory(module/proxy_rpc_co_module) add_subdirectory(module/normal_rpc_co_client_module) add_subdirectory(module/normal_rpc_sync_client_module) add_subdirectory(module/normal_rpc_async_client_module) diff --git a/src/examples/cpp/pb_rpc/README.md b/src/examples/cpp/pb_rpc/README.md index 3b5b9eef3..203aac196 100644 --- a/src/examples/cpp/pb_rpc/README.md +++ b/src/examples/cpp/pb_rpc/README.md @@ -196,3 +196,44 @@ 说明: - 此示例与 **protobuf rpc co** 示例基本一致,唯一的区别是将 `NormalRpcCoClientModule` 和 `NormalRpcCoServerModule` 集成到 `pb_rpc_pkg` 一个 Pkg 中; + +## protobuf proxy rpc co + + +一个基于 protobuf 协议、协程型接口与 local 后端的 rpc 示例,演示内容包括: +- 如何使用 protobuf 协议作为 rpc 服务协议; +- 如何基于 Module 方式使用 Executor、协程型 Rpc client 和 server 接口; +- 如何使用协程形式的 Rpc filter 功能; +- 如何使用 local 类型的 rpc 后端; +- 如何以 Pkg 模式集成 Module 并启动; +- 如何构建链式 rpc 调用; + + + +核心代码: +- [rpc.proto](../../../protocols/example/rpc.proto) +- [proxy_rpc_co_module.cc](./module/proxy_rpc_co_module/proxy_rpc_co_module.cc) +- [service.cc](./module/proxy_rpc_co_module/service.cc) +- [pb_rpc_client_pkg/pkg_main.cc](./pkg/pb_rpc_client_pkg/pkg_main.cc) +- [pb_rpc_server_pkg/pkg_main.cc](./pkg/pb_rpc_server_pkg/pkg_main.cc) + + +配置文件: +- [examples_cpp_pb_proxy_rpc_co_cfg.yaml](./install/linux/bin/cfg/examples_cpp_pb_proxy_rpc_co_cfg.yaml) + + + +运行方式(linux): +- 开启 `AIMRT_BUILD_EXAMPLES`、`AIMRT_BUILD_WITH_PROTOBUF` 选项编译 AimRT; +- 直接运行 build 目录下`start_examples_cpp_pb_rpc_co.sh`脚本启动进程; +- 键入`ctrl-c`停止进程; + + +说明: +- 此示例创建了以下三个模块, 以构成 RPC 的链式调用: + - `NormalRpcCoClientModule`:会基于 `work_thread_pool` 执行器,以配置的频率,通过协程 Client 接口,向 `ExampleService_1` 发起 RPC 请求; + - `ProxyRpcCoModule`:会注册 `ExampleService_1` 服务端,通过协程 Server 接口,提供 echo 功能, 并向 `ExampleService_2` 发起 RPC 请求; + - `NormalRpcCoServerModule`:会注册 `ExampleService_2` 服务端,通过协程 Server 接口,提供 echo 功能; +- 此示例将 `NormalRpcCoClientModule` 集成到 `pb_rpc_client_pkg`, 将 `ProxyRpcCoModule` 和 `NormalRpcCoServerModule` 集成到 `pb_rpc_server_pkg` Pkg 中,并在配置文件中加载这两个 Pkg 到一个 AimRT 进程中; +- 此示例在 Rpc Client 端和 Server 端分别注册了两个 Filter 用于打印请求日志和计算耗时; +- 此示例使用 local 类型的 rpc 后端进行通信,并配置了 `timeout_handle` 执行器作为超时执行器; diff --git a/src/examples/cpp/pb_rpc/install/linux/bin/cfg/examples_cpp_pb_proxy_rpc_co_cfg.yaml b/src/examples/cpp/pb_rpc/install/linux/bin/cfg/examples_cpp_pb_proxy_rpc_co_cfg.yaml new file mode 100644 index 000000000..5ec1b2815 --- /dev/null +++ b/src/examples/cpp/pb_rpc/install/linux/bin/cfg/examples_cpp_pb_proxy_rpc_co_cfg.yaml @@ -0,0 +1,54 @@ +# Copyright (c) 2023, AgiBot Inc. +# All rights reserved. + +aimrt: + log: + core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off + backends: + - type: console + executor: + executors: + - name: work_thread_pool + type: asio_thread + options: + thread_num: 4 + - name: timeout_handle + type: time_wheel + options: + bind_executor: work_thread_pool + rpc: + backends: + - type: local + options: + timeout_executor: timeout_handle + clients_options: + - func_name: "(.*)" + enable_backends: [local] + servers_options: + - func_name: "(.*)" + enable_backends: [local] + module: + pkgs: + - path: ./libpb_rpc_client_pkg.so + enable_modules: [NormalRpcCoClientModule] + - path: ./libpb_rpc_server_pkg.so + enable_modules: [ProxyRpcCoModule,NormalRpcCoServerModule] + modules: + - name: NormalRpcCoClientModule + log_lvl: INFO + - name: ProxyRpcCoModule + log_lvl: INFO + - name: NormalRpcCoServerModule + log_lvl: INFO + +# Module custom configuration +NormalRpcCoClientModule: + rpc_frq: 0.5 + service_name: "ExampleService_1" + +ProxyRpcCoModule: + service_name_for_client: "ExampleService_2" + service_name_for_server: "ExampleService_1" + +NormalRpcCoServerModule: + service_name: "ExampleService_2" \ No newline at end of file diff --git a/src/examples/cpp/pb_rpc/install/linux/bin/start_examples_cpp_pb_proxy_rpc_co.sh b/src/examples/cpp/pb_rpc/install/linux/bin/start_examples_cpp_pb_proxy_rpc_co.sh new file mode 100755 index 000000000..603cd2563 --- /dev/null +++ b/src/examples/cpp/pb_rpc/install/linux/bin/start_examples_cpp_pb_proxy_rpc_co.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +./aimrt_main --cfg_file_path=./cfg/examples_cpp_pb_proxy_rpc_co_cfg.yaml \ No newline at end of file diff --git a/src/examples/cpp/pb_rpc/module/benchmark_rpc_client_module/benchmark_rpc_client_module.cc b/src/examples/cpp/pb_rpc/module/benchmark_rpc_client_module/benchmark_rpc_client_module.cc index c312888c1..b34d63f5e 100644 --- a/src/examples/cpp/pb_rpc/module/benchmark_rpc_client_module/benchmark_rpc_client_module.cc +++ b/src/examples/cpp/pb_rpc/module/benchmark_rpc_client_module/benchmark_rpc_client_module.cc @@ -47,6 +47,10 @@ bool BenchmarkRpcClientModule::Initialize(aimrt::CoreRef core) { YAML::Node cfg_node = YAML::LoadFile(std::string(file_path)); max_parallel_ = cfg_node["max_parallel"].as(); + if (cfg_node["service_name"]) { + service_name_ = cfg_node["service_name"].as(); + } + if (cfg_node["bench_plans"] && cfg_node["bench_plans"].IsSequence()) { for (const auto& bench_plan_node : cfg_node["bench_plans"]) { BenchPlan bench_plan; @@ -83,12 +87,22 @@ bool BenchmarkRpcClientModule::Initialize(aimrt::CoreRef core) { AIMRT_CHECK_ERROR_THROW(rpc_handle, "Get rpc handle failed."); // Register rpc client - bool ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle); + bool ret = false; + if (service_name_.empty()) { + ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle); + } else { + ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle, service_name_); + } + AIMRT_CHECK_ERROR_THROW(ret, "Register client failed."); // Create rpc proxy proxy_ = std::make_shared(rpc_handle); + if (!service_name_.empty()) { + proxy_->SetServiceName(service_name_); + } + // Check executor client_statistics_executor_ = core_.GetExecutorManager().GetExecutor("client_statistics_executor"); AIMRT_CHECK_ERROR_THROW( diff --git a/src/examples/cpp/pb_rpc/module/benchmark_rpc_client_module/benchmark_rpc_client_module.h b/src/examples/cpp/pb_rpc/module/benchmark_rpc_client_module/benchmark_rpc_client_module.h index 85dacb394..13427916c 100644 --- a/src/examples/cpp/pb_rpc/module/benchmark_rpc_client_module/benchmark_rpc_client_module.h +++ b/src/examples/cpp/pb_rpc/module/benchmark_rpc_client_module/benchmark_rpc_client_module.h @@ -68,6 +68,7 @@ class BenchmarkRpcClientModule : public aimrt::ModuleBase { // cfg uint32_t max_parallel_; std::vector bench_plans_; + std::string service_name_; }; } // namespace aimrt::examples::cpp::pb_rpc::benchmark_rpc_client_module diff --git a/src/examples/cpp/pb_rpc/module/normal_rpc_async_client_module/normal_rpc_async_client_module.cc b/src/examples/cpp/pb_rpc/module/normal_rpc_async_client_module/normal_rpc_async_client_module.cc index 654b58bd3..fbabacb13 100644 --- a/src/examples/cpp/pb_rpc/module/normal_rpc_async_client_module/normal_rpc_async_client_module.cc +++ b/src/examples/cpp/pb_rpc/module/normal_rpc_async_client_module/normal_rpc_async_client_module.cc @@ -17,6 +17,10 @@ bool NormalRpcAsyncClientModule::Initialize(aimrt::CoreRef core) { if (!file_path.empty()) { YAML::Node cfg_node = YAML::LoadFile(file_path); rpc_frq_ = cfg_node["rpc_frq"].as(); + + if (cfg_node["service_name"]) { + service_name_ = cfg_node["service_name"].as(); + } } // Get executor handle @@ -29,9 +33,22 @@ bool NormalRpcAsyncClientModule::Initialize(aimrt::CoreRef core) { AIMRT_CHECK_ERROR_THROW(rpc_handle, "Get rpc handle failed."); // Register rpc client - bool ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle); + bool ret = false; + if (service_name_.empty()) { + ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle); + } else { + ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle, service_name_); + } + AIMRT_CHECK_ERROR_THROW(ret, "Register client failed."); + // Create rpc proxy + proxy_ = std::make_shared(rpc_handle); + + if (!service_name_.empty()) { + proxy_->SetServiceName(service_name_); + } + } catch (const std::exception& e) { AIMRT_ERROR("Init failed, {}", e.what()); return false; @@ -73,22 +90,19 @@ void NormalRpcAsyncClientModule::MainLoopFunc() { count_++; AIMRT_INFO("Loop count : {} -------------------------", count_); - // Create proxy - aimrt::protocols::example::ExampleServiceAsyncProxy proxy(core_.GetRpcHandle()); - // Create req and rsp auto req_ptr = std::make_shared(); auto rsp_ptr = std::make_shared(); req_ptr->set_msg("hello world foo, count " + std::to_string(count_)); // Create ctx - auto ctx_ptr = proxy.NewContextSharedPtr(); + auto ctx_ptr = proxy_->NewContextSharedPtr(); ctx_ptr->SetTimeout(std::chrono::seconds(3)); AIMRT_INFO("Client start new rpc call. req: {}", aimrt::Pb2CompactJson(*req_ptr)); // Call rpc - proxy.GetFooData( + proxy_->GetFooData( ctx_ptr, *req_ptr, *rsp_ptr, [this, ctx_ptr, req_ptr, rsp_ptr](aimrt::rpc::Status status) { // Check result diff --git a/src/examples/cpp/pb_rpc/module/normal_rpc_async_client_module/normal_rpc_async_client_module.h b/src/examples/cpp/pb_rpc/module/normal_rpc_async_client_module/normal_rpc_async_client_module.h index af7b92f47..2c0cea186 100644 --- a/src/examples/cpp/pb_rpc/module/normal_rpc_async_client_module/normal_rpc_async_client_module.h +++ b/src/examples/cpp/pb_rpc/module/normal_rpc_async_client_module/normal_rpc_async_client_module.h @@ -42,6 +42,9 @@ class NormalRpcAsyncClientModule : public aimrt::ModuleBase { std::promise stop_sig_; double rpc_frq_ = 1.0; + std::string service_name_; + + std::shared_ptr proxy_; }; } // namespace aimrt::examples::cpp::pb_rpc::normal_rpc_async_client_module diff --git a/src/examples/cpp/pb_rpc/module/normal_rpc_async_server_module/normal_rpc_async_server_module.cc b/src/examples/cpp/pb_rpc/module/normal_rpc_async_server_module/normal_rpc_async_server_module.cc index 6c211849a..909c5be4d 100644 --- a/src/examples/cpp/pb_rpc/module/normal_rpc_async_server_module/normal_rpc_async_server_module.cc +++ b/src/examples/cpp/pb_rpc/module/normal_rpc_async_server_module/normal_rpc_async_server_module.cc @@ -4,6 +4,8 @@ #include "normal_rpc_async_server_module/normal_rpc_async_server_module.h" #include "normal_rpc_async_server_module/global.h" +#include "yaml-cpp/yaml.h" + namespace aimrt::examples::cpp::pb_rpc::normal_rpc_async_server_module { bool NormalRpcAsyncServerModule::Initialize(aimrt::CoreRef core) { @@ -12,11 +14,26 @@ bool NormalRpcAsyncServerModule::Initialize(aimrt::CoreRef core) { SetLogger(core_.GetLogger()); try { + // Read cfg + std::string file_path = std::string(core_.GetConfigurator().GetConfigFilePath()); + if (!file_path.empty()) { + YAML::Node cfg_node = YAML::LoadFile(file_path); + if (cfg_node["service_name"]) { + service_name_ = cfg_node["service_name"].as(); + } + } + // Create service service_ptr_ = std::make_shared(); // Register service - bool ret = core_.GetRpcHandle().RegisterService(service_ptr_.get()); + bool ret = false; + if (service_name_.empty()) { + ret = core_.GetRpcHandle().RegisterService(service_ptr_.get()); + } else { + ret = core_.GetRpcHandle().RegisterService(service_name_, service_ptr_.get()); + } + AIMRT_CHECK_ERROR_THROW(ret, "Register service failed."); AIMRT_INFO("Register service succeeded."); diff --git a/src/examples/cpp/pb_rpc/module/normal_rpc_async_server_module/normal_rpc_async_server_module.h b/src/examples/cpp/pb_rpc/module/normal_rpc_async_server_module/normal_rpc_async_server_module.h index 48b0af2cb..62dacf36f 100644 --- a/src/examples/cpp/pb_rpc/module/normal_rpc_async_server_module/normal_rpc_async_server_module.h +++ b/src/examples/cpp/pb_rpc/module/normal_rpc_async_server_module/normal_rpc_async_server_module.h @@ -28,6 +28,8 @@ class NormalRpcAsyncServerModule : public aimrt::ModuleBase { private: aimrt::CoreRef core_; std::shared_ptr service_ptr_; + + std::string service_name_; }; } // namespace aimrt::examples::cpp::pb_rpc::normal_rpc_async_server_module diff --git a/src/examples/cpp/pb_rpc/module/normal_rpc_co_client_module/normal_rpc_co_client_module.cc b/src/examples/cpp/pb_rpc/module/normal_rpc_co_client_module/normal_rpc_co_client_module.cc index eafcc2652..680c7f80c 100644 --- a/src/examples/cpp/pb_rpc/module/normal_rpc_co_client_module/normal_rpc_co_client_module.cc +++ b/src/examples/cpp/pb_rpc/module/normal_rpc_co_client_module/normal_rpc_co_client_module.cc @@ -22,6 +22,10 @@ bool NormalRpcCoClientModule::Initialize(aimrt::CoreRef core) { if (!file_path.empty()) { YAML::Node cfg_node = YAML::LoadFile(file_path); rpc_frq_ = cfg_node["rpc_frq"].as(); + + if (cfg_node["service_name"]) { + service_name_ = cfg_node["service_name"].as(); + } } // Get executor handle @@ -34,12 +38,22 @@ bool NormalRpcCoClientModule::Initialize(aimrt::CoreRef core) { AIMRT_CHECK_ERROR_THROW(rpc_handle, "Get rpc handle failed."); // Register rpc client - bool ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle); + bool ret = false; + if (service_name_.empty()) { + ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle); + } else { + ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle, service_name_); + } + AIMRT_CHECK_ERROR_THROW(ret, "Register client failed."); // Create rpc proxy proxy_ = std::make_shared(rpc_handle); + if (!service_name_.empty()) { + proxy_->SetServiceName(service_name_); + } + // Register filter proxy_->RegisterFilter([this](aimrt::rpc::ContextRef ctx, const void* req_ptr, void* rsp_ptr, diff --git a/src/examples/cpp/pb_rpc/module/normal_rpc_co_client_module/normal_rpc_co_client_module.h b/src/examples/cpp/pb_rpc/module/normal_rpc_co_client_module/normal_rpc_co_client_module.h index 3de88cac2..f4b48ea44 100644 --- a/src/examples/cpp/pb_rpc/module/normal_rpc_co_client_module/normal_rpc_co_client_module.h +++ b/src/examples/cpp/pb_rpc/module/normal_rpc_co_client_module/normal_rpc_co_client_module.h @@ -43,6 +43,8 @@ class NormalRpcCoClientModule : public aimrt::ModuleBase { std::atomic_bool run_flag_ = true; double rpc_frq_ = 1.0; + std::string service_name_; + std::shared_ptr proxy_; }; diff --git a/src/examples/cpp/pb_rpc/module/normal_rpc_co_server_module/normal_rpc_co_server_module.cc b/src/examples/cpp/pb_rpc/module/normal_rpc_co_server_module/normal_rpc_co_server_module.cc index 1385785bf..7d1869fe8 100644 --- a/src/examples/cpp/pb_rpc/module/normal_rpc_co_server_module/normal_rpc_co_server_module.cc +++ b/src/examples/cpp/pb_rpc/module/normal_rpc_co_server_module/normal_rpc_co_server_module.cc @@ -5,6 +5,8 @@ #include "normal_rpc_co_server_module/filter.h" #include "normal_rpc_co_server_module/global.h" +#include "yaml-cpp/yaml.h" + namespace aimrt::examples::cpp::pb_rpc::normal_rpc_co_server_module { bool NormalRpcCoServerModule::Initialize(aimrt::CoreRef core) { @@ -13,6 +15,15 @@ bool NormalRpcCoServerModule::Initialize(aimrt::CoreRef core) { SetLogger(core_.GetLogger()); try { + // Read cfg + std::string file_path = std::string(core_.GetConfigurator().GetConfigFilePath()); + if (!file_path.empty()) { + YAML::Node cfg_node = YAML::LoadFile(file_path); + if (cfg_node["service_name"]) { + service_name_ = cfg_node["service_name"].as(); + } + } + // Create service service_ptr_ = std::make_shared(); @@ -21,7 +32,13 @@ bool NormalRpcCoServerModule::Initialize(aimrt::CoreRef core) { service_ptr_->RegisterFilter(TimeCostLogServerFilter); // Register service - bool ret = core_.GetRpcHandle().RegisterService(service_ptr_.get()); + bool ret = false; + if (service_name_.empty()) { + ret = core_.GetRpcHandle().RegisterService(service_ptr_.get()); + } else { + ret = core_.GetRpcHandle().RegisterService(service_name_, service_ptr_.get()); + } + AIMRT_CHECK_ERROR_THROW(ret, "Register service failed."); AIMRT_INFO("Register service succeeded."); diff --git a/src/examples/cpp/pb_rpc/module/normal_rpc_co_server_module/normal_rpc_co_server_module.h b/src/examples/cpp/pb_rpc/module/normal_rpc_co_server_module/normal_rpc_co_server_module.h index 648b1e0a3..6d6bfad36 100644 --- a/src/examples/cpp/pb_rpc/module/normal_rpc_co_server_module/normal_rpc_co_server_module.h +++ b/src/examples/cpp/pb_rpc/module/normal_rpc_co_server_module/normal_rpc_co_server_module.h @@ -28,6 +28,8 @@ class NormalRpcCoServerModule : public aimrt::ModuleBase { private: aimrt::CoreRef core_; std::shared_ptr service_ptr_; + + std::string service_name_; }; } // namespace aimrt::examples::cpp::pb_rpc::normal_rpc_co_server_module diff --git a/src/examples/cpp/pb_rpc/module/normal_rpc_future_client_module/normal_rpc_future_client_module.cc b/src/examples/cpp/pb_rpc/module/normal_rpc_future_client_module/normal_rpc_future_client_module.cc index 7ba02b116..9349df40d 100644 --- a/src/examples/cpp/pb_rpc/module/normal_rpc_future_client_module/normal_rpc_future_client_module.cc +++ b/src/examples/cpp/pb_rpc/module/normal_rpc_future_client_module/normal_rpc_future_client_module.cc @@ -17,6 +17,10 @@ bool NormalRpcFutureClientModule::Initialize(aimrt::CoreRef core) { if (!file_path.empty()) { YAML::Node cfg_node = YAML::LoadFile(file_path); rpc_frq_ = cfg_node["rpc_frq"].as(); + + if (cfg_node["service_name"]) { + service_name_ = cfg_node["service_name"].as(); + } } // Get executor handle @@ -28,9 +32,22 @@ bool NormalRpcFutureClientModule::Initialize(aimrt::CoreRef core) { AIMRT_CHECK_ERROR_THROW(rpc_handle, "Get rpc handle failed."); // Register rpc client - bool ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle); + bool ret = false; + if (service_name_.empty()) { + ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle); + } else { + ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle, service_name_); + } + AIMRT_CHECK_ERROR_THROW(ret, "Register client failed."); + // Create rpc proxy + proxy_ = std::make_shared(rpc_handle); + + if (!service_name_.empty()) { + proxy_->SetServiceName(service_name_); + } + } catch (const std::exception& e) { AIMRT_ERROR("Init failed, {}", e.what()); return false; @@ -70,9 +87,6 @@ void NormalRpcFutureClientModule::MainLoop() { try { AIMRT_INFO("Start MainLoop."); - // Create proxy - aimrt::protocols::example::ExampleServiceFutureProxy proxy(core_.GetRpcHandle()); - uint32_t count = 0; while (run_flag_) { // Sleep @@ -87,13 +101,13 @@ void NormalRpcFutureClientModule::MainLoop() { req.set_msg("hello world foo, count " + std::to_string(count)); // Create ctx - auto ctx_ptr = proxy.NewContextSharedPtr(); + auto ctx_ptr = proxy_->NewContextSharedPtr(); ctx_ptr->SetTimeout(std::chrono::seconds(3)); AIMRT_INFO("Client start new rpc call. req: {}", aimrt::Pb2CompactJson(req)); // Call rpc - auto status_future = proxy.GetFooData(ctx_ptr, req, rsp); + auto status_future = proxy_->GetFooData(ctx_ptr, req, rsp); auto status = status_future.get(); // Check result diff --git a/src/examples/cpp/pb_rpc/module/normal_rpc_future_client_module/normal_rpc_future_client_module.h b/src/examples/cpp/pb_rpc/module/normal_rpc_future_client_module/normal_rpc_future_client_module.h index 0db37400b..b95e807c8 100644 --- a/src/examples/cpp/pb_rpc/module/normal_rpc_future_client_module/normal_rpc_future_client_module.h +++ b/src/examples/cpp/pb_rpc/module/normal_rpc_future_client_module/normal_rpc_future_client_module.h @@ -41,6 +41,9 @@ class NormalRpcFutureClientModule : public aimrt::ModuleBase { std::promise stop_sig_; double rpc_frq_ = 1.0; + std::string service_name_; + + std::shared_ptr proxy_; }; } // namespace aimrt::examples::cpp::pb_rpc::normal_rpc_future_client_module diff --git a/src/examples/cpp/pb_rpc/module/normal_rpc_sync_client_module/normal_rpc_sync_client_module.cc b/src/examples/cpp/pb_rpc/module/normal_rpc_sync_client_module/normal_rpc_sync_client_module.cc index bdcfa6b1d..7e63b363d 100644 --- a/src/examples/cpp/pb_rpc/module/normal_rpc_sync_client_module/normal_rpc_sync_client_module.cc +++ b/src/examples/cpp/pb_rpc/module/normal_rpc_sync_client_module/normal_rpc_sync_client_module.cc @@ -17,6 +17,10 @@ bool NormalRpcSyncClientModule::Initialize(aimrt::CoreRef core) { if (!file_path.empty()) { YAML::Node cfg_node = YAML::LoadFile(file_path); rpc_frq_ = cfg_node["rpc_frq"].as(); + + if (cfg_node["service_name"]) { + service_name_ = cfg_node["service_name"].as(); + } } // Get executor handle @@ -28,9 +32,21 @@ bool NormalRpcSyncClientModule::Initialize(aimrt::CoreRef core) { AIMRT_CHECK_ERROR_THROW(rpc_handle, "Get rpc handle failed."); // Register rpc client - bool ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle); + bool ret = false; + if (service_name_.empty()) { + ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle); + } else { + ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle, service_name_); + } AIMRT_CHECK_ERROR_THROW(ret, "Register client failed."); + // Create rpc proxy + proxy_ = std::make_shared(rpc_handle); + + if (!service_name_.empty()) { + proxy_->SetServiceName(service_name_); + } + } catch (const std::exception& e) { AIMRT_ERROR("Init failed, {}", e.what()); return false; @@ -70,9 +86,6 @@ void NormalRpcSyncClientModule::MainLoop() { try { AIMRT_INFO("Start MainLoop."); - // Create proxy - aimrt::protocols::example::ExampleServiceSyncProxy proxy(core_.GetRpcHandle()); - uint32_t count = 0; while (run_flag_) { // Sleep @@ -87,13 +100,13 @@ void NormalRpcSyncClientModule::MainLoop() { req.set_msg("hello world foo, count " + std::to_string(count)); // Create ctx - auto ctx_ptr = proxy.NewContextSharedPtr(); + auto ctx_ptr = proxy_->NewContextSharedPtr(); ctx_ptr->SetTimeout(std::chrono::seconds(3)); AIMRT_INFO("Client start new rpc call. req: {}", aimrt::Pb2CompactJson(req)); // Call rpc - auto status = proxy.GetFooData(ctx_ptr, req, rsp); + auto status = proxy_->GetFooData(ctx_ptr, req, rsp); // Check result if (status.OK()) { diff --git a/src/examples/cpp/pb_rpc/module/normal_rpc_sync_client_module/normal_rpc_sync_client_module.h b/src/examples/cpp/pb_rpc/module/normal_rpc_sync_client_module/normal_rpc_sync_client_module.h index 3e0e17e7c..03196c7f1 100644 --- a/src/examples/cpp/pb_rpc/module/normal_rpc_sync_client_module/normal_rpc_sync_client_module.h +++ b/src/examples/cpp/pb_rpc/module/normal_rpc_sync_client_module/normal_rpc_sync_client_module.h @@ -41,6 +41,9 @@ class NormalRpcSyncClientModule : public aimrt::ModuleBase { std::promise stop_sig_; double rpc_frq_ = 1.0; + std::string service_name_; + + std::shared_ptr proxy_; }; } // namespace aimrt::examples::cpp::pb_rpc::normal_rpc_sync_client_module diff --git a/src/examples/cpp/pb_rpc/module/normal_rpc_sync_server_module/normal_rpc_sync_server_module.cc b/src/examples/cpp/pb_rpc/module/normal_rpc_sync_server_module/normal_rpc_sync_server_module.cc index 4baa1b352..142fb11b3 100644 --- a/src/examples/cpp/pb_rpc/module/normal_rpc_sync_server_module/normal_rpc_sync_server_module.cc +++ b/src/examples/cpp/pb_rpc/module/normal_rpc_sync_server_module/normal_rpc_sync_server_module.cc @@ -4,6 +4,8 @@ #include "normal_rpc_sync_server_module/normal_rpc_sync_server_module.h" #include "normal_rpc_sync_server_module/global.h" +#include "yaml-cpp/yaml.h" + namespace aimrt::examples::cpp::pb_rpc::normal_rpc_sync_server_module { bool NormalRpcSyncServerModule::Initialize(aimrt::CoreRef core) { @@ -12,11 +14,26 @@ bool NormalRpcSyncServerModule::Initialize(aimrt::CoreRef core) { SetLogger(core_.GetLogger()); try { + // Read cfg + std::string file_path = std::string(core_.GetConfigurator().GetConfigFilePath()); + if (!file_path.empty()) { + YAML::Node cfg_node = YAML::LoadFile(file_path); + if (cfg_node["service_name"]) { + service_name_ = cfg_node["service_name"].as(); + } + } + // Create service service_ptr_ = std::make_shared(); // Register service - bool ret = core_.GetRpcHandle().RegisterService(service_ptr_.get()); + bool ret = false; + if (service_name_.empty()) { + ret = core_.GetRpcHandle().RegisterService(service_ptr_.get()); + } else { + ret = core_.GetRpcHandle().RegisterService(service_name_, service_ptr_.get()); + } + AIMRT_CHECK_ERROR_THROW(ret, "Register service failed."); AIMRT_INFO("Register service succeeded."); diff --git a/src/examples/cpp/pb_rpc/module/normal_rpc_sync_server_module/normal_rpc_sync_server_module.h b/src/examples/cpp/pb_rpc/module/normal_rpc_sync_server_module/normal_rpc_sync_server_module.h index 2ec6c82d6..d34107940 100644 --- a/src/examples/cpp/pb_rpc/module/normal_rpc_sync_server_module/normal_rpc_sync_server_module.h +++ b/src/examples/cpp/pb_rpc/module/normal_rpc_sync_server_module/normal_rpc_sync_server_module.h @@ -28,6 +28,8 @@ class NormalRpcSyncServerModule : public aimrt::ModuleBase { private: aimrt::CoreRef core_; std::shared_ptr service_ptr_; + + std::string service_name_; }; } // namespace aimrt::examples::cpp::pb_rpc::normal_rpc_sync_server_module diff --git a/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/CMakeLists.txt b/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/CMakeLists.txt new file mode 100644 index 000000000..ef293ffb5 --- /dev/null +++ b/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/CMakeLists.txt @@ -0,0 +1,44 @@ +# Copyright (c) 2023, AgiBot Inc. +# All rights reserved. + +# Get the current folder name +string(REGEX REPLACE ".*/\(.*\)" "\\1" CUR_DIR ${CMAKE_CURRENT_SOURCE_DIR}) + +# Get namespace +get_namespace(CUR_SUPERIOR_NAMESPACE) +string(REPLACE "::" "_" CUR_SUPERIOR_NAMESPACE_UNDERLINE ${CUR_SUPERIOR_NAMESPACE}) + +# Set target name +set(CUR_TARGET_NAME ${CUR_SUPERIOR_NAMESPACE_UNDERLINE}_${CUR_DIR}) +set(CUR_TARGET_ALIAS_NAME ${CUR_SUPERIOR_NAMESPACE}::${CUR_DIR}) + +# Set file collection +file(GLOB_RECURSE head_files ${CMAKE_CURRENT_SOURCE_DIR}/*.h) +file(GLOB_RECURSE src ${CMAKE_CURRENT_SOURCE_DIR}/*.cc) +file(GLOB_RECURSE test_files ${CMAKE_CURRENT_SOURCE_DIR}/*_test.cc) +list(REMOVE_ITEM src ${test_files}) + +# Add target +add_library(${CUR_TARGET_NAME} STATIC) +add_library(${CUR_TARGET_ALIAS_NAME} ALIAS ${CUR_TARGET_NAME}) + +# Set source file of target +target_sources(${CUR_TARGET_NAME} PRIVATE ${src}) + +# Set include path of target +target_include_directories( + ${CUR_TARGET_NAME} + PUBLIC $) + +# Set link libraries of target +target_link_libraries( + ${CUR_TARGET_NAME} + PRIVATE yaml-cpp::yaml-cpp + PUBLIC aimrt::interface::aimrt_module_cpp_interface + aimrt::interface::aimrt_module_protobuf_interface + aimrt::protocols::example_aimrt_rpc_gencode) + +# Set test of target +if(AIMRT_BUILD_TESTS AND test_files) + add_gtest_target(TEST_TARGET ${CUR_TARGET_NAME} TEST_SRC ${test_files}) +endif() diff --git a/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/filter.h b/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/filter.h new file mode 100644 index 000000000..b2c5ffa8c --- /dev/null +++ b/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/filter.h @@ -0,0 +1,39 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#pragma once + +#include "aimrt_module_cpp_interface/rpc/rpc_co_filter.h" +#include "aimrt_module_protobuf_interface/util/protobuf_tools.h" +#include "proxy_rpc_co_module/global.h" + +namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module { + +inline co::Task DebugLogServerFilter( + aimrt::rpc::ContextRef ctx, const void* req_ptr, void* rsp_ptr, + const aimrt::rpc::CoRpcHandle& next) { + AIMRT_INFO("Svr get new rpc call. context: {}, req: {}", + ctx.ToString(), aimrt::Pb2CompactJson(*static_cast(req_ptr))); + + const auto& status = co_await next(ctx, req_ptr, rsp_ptr); + + AIMRT_INFO("Svr handle rpc completed, status: {}, rsp: {}", + status.ToString(), + aimrt::Pb2CompactJson(*static_cast(rsp_ptr))); + co_return status; +} + +inline co::Task TimeCostLogServerFilter( + aimrt::rpc::ContextRef ctx, const void* req_ptr, void* rsp_ptr, + const aimrt::rpc::CoRpcHandle& next) { + auto begin_time = std::chrono::steady_clock::now(); + const auto& status = co_await next(ctx, req_ptr, rsp_ptr); + auto end_time = std::chrono::steady_clock::now(); + + AIMRT_INFO("Svr rpc time cost {} us", + std::chrono::duration_cast(end_time - begin_time).count()); + + co_return status; +} + +} // namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module diff --git a/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/global.cc b/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/global.cc new file mode 100644 index 000000000..965fa42f9 --- /dev/null +++ b/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/global.cc @@ -0,0 +1,12 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#include "proxy_rpc_co_module/global.h" + +namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module { + +aimrt::logger::LoggerRef global_logger; +void SetLogger(aimrt::logger::LoggerRef logger) { global_logger = logger; } +aimrt::logger::LoggerRef GetLogger() { return global_logger; } + +} // namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module diff --git a/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/global.h b/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/global.h new file mode 100644 index 000000000..840d29f67 --- /dev/null +++ b/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/global.h @@ -0,0 +1,13 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#pragma once + +#include "aimrt_module_cpp_interface/logger/logger.h" + +namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module { + +void SetLogger(aimrt::logger::LoggerRef); +aimrt::logger::LoggerRef GetLogger(); + +} // namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module diff --git a/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/proxy_rpc_co_module.cc b/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/proxy_rpc_co_module.cc new file mode 100644 index 000000000..060d87ca7 --- /dev/null +++ b/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/proxy_rpc_co_module.cc @@ -0,0 +1,128 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#include "proxy_rpc_co_module/proxy_rpc_co_module.h" +#include "aimrt_module_cpp_interface/co/sync_wait.h" +#include "aimrt_module_protobuf_interface/util/protobuf_tools.h" +#include "proxy_rpc_co_module/filter.h" +#include "yaml-cpp/yaml.h" + +namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module { + +bool ProxyRpcCoModule::Initialize(aimrt::CoreRef core) { + core_ = core; + + try { + // Read cfg + std::string file_path = std::string(core_.GetConfigurator().GetConfigFilePath()); + if (!file_path.empty()) { + YAML::Node cfg_node = YAML::LoadFile(file_path); + + if (cfg_node["service_name_for_client"]) { + service_name_for_client_ = cfg_node["service_name_for_client"].as(); + } + if (cfg_node["service_name_for_server"]) { + service_name_for_server_ = cfg_node["service_name_for_server"].as(); + } + } + + // Get executor handle + executor_ = core_.GetExecutorManager().GetExecutor("work_thread_pool"); + AIMRT_CHECK_ERROR_THROW(executor_ && executor_.SupportTimerSchedule(), + "Get executor 'work_thread_pool' failed."); + + // Get rpc handle + auto rpc_handle = core_.GetRpcHandle(); + AIMRT_CHECK_ERROR_THROW(rpc_handle, "Get rpc handle failed."); + + // Register rpc client + bool ret = false; + if (service_name_for_client_.empty()) { + ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle); + } else { + ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle, service_name_for_client_); + } + + AIMRT_CHECK_ERROR_THROW(ret, "Register client failed."); + + // Create rpc proxy + proxy_ = std::make_shared(rpc_handle); + + if (!service_name_for_client_.empty()) { + proxy_->SetServiceName(service_name_for_client_); + } + + // Register filter + proxy_->RegisterFilter([this](aimrt::rpc::ContextRef ctx, + const void* req_ptr, void* rsp_ptr, + const aimrt::rpc::CoRpcHandle& next) + -> co::Task { + // debuglog + AIMRT_INFO("Client start new rpc call. context: {}, req: {}", + ctx.ToString(), aimrt::Pb2CompactJson(*static_cast(req_ptr))); + const auto& status = co_await next(ctx, req_ptr, rsp_ptr); + if (status.OK()) { + AIMRT_INFO("Client get rpc ret, status: {}, rsp: {}", status.ToString(), + aimrt::Pb2CompactJson(*static_cast(rsp_ptr))); + } else { + AIMRT_WARN("Client get rpc error ret, status: {}", status.ToString()); + } + + co_return status; + }); + + proxy_->RegisterFilter([this](aimrt::rpc::ContextRef ctx, + const void* req_ptr, void* rsp_ptr, + const aimrt::rpc::CoRpcHandle& next) + -> co::Task { + // timecost count + auto begin_time = std::chrono::steady_clock::now(); + const auto& status = co_await next(ctx, req_ptr, rsp_ptr); + auto end_time = std::chrono::steady_clock::now(); + + AIMRT_INFO("Client rpc time cost {} us", + std::chrono::duration_cast(end_time - begin_time).count()); + + co_return status; + }); + + SetLogger(core_.GetLogger()); + + // Create service + service_ptr_ = std::make_shared(proxy_, executor_); + + // Register filter + service_ptr_->RegisterFilter(DebugLogServerFilter); + service_ptr_->RegisterFilter(TimeCostLogServerFilter); + + // Register service + + bool ret_srv; + + if (service_name_for_server_.empty()) { + ret_srv = core_.GetRpcHandle().RegisterService(service_ptr_.get()); + } else { + ret_srv = core_.GetRpcHandle().RegisterService(service_name_for_server_, service_ptr_.get()); + } + + AIMRT_CHECK_ERROR_THROW(ret_srv, "Register service failed."); + + AIMRT_INFO("Init succeeded."); + + } catch (const std::exception& e) { + AIMRT_ERROR("Init failed, {}", e.what()); + return false; + } + + return true; +} + +bool ProxyRpcCoModule::Start() { + return true; +} + +void ProxyRpcCoModule::Shutdown() { + AIMRT_INFO("Shutdown succeeded."); +} + +} // namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module \ No newline at end of file diff --git a/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/proxy_rpc_co_module.h b/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/proxy_rpc_co_module.h new file mode 100644 index 000000000..851abd538 --- /dev/null +++ b/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/proxy_rpc_co_module.h @@ -0,0 +1,44 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#pragma once + +#include "aimrt_module_cpp_interface/co/async_scope.h" +#include "aimrt_module_cpp_interface/co/task.h" +#include "aimrt_module_cpp_interface/module_base.h" +#include "aimrt_module_cpp_interface/rpc/rpc_co_filter.h" +#include "proxy_rpc_co_module/service.h" + +#include "rpc.aimrt_rpc.pb.h" + +namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module { +class ProxyRpcCoModule : public aimrt::ModuleBase { + public: + ProxyRpcCoModule() = default; + ~ProxyRpcCoModule() override = default; + + ModuleInfo Info() const override { + return ModuleInfo{.name = "ProxyRpcCoModule"}; + } + + bool Initialize(aimrt::CoreRef core) override; + + bool Start() override; + + void Shutdown() override; + + private: + auto GetLogger() { return core_.GetLogger(); } + + private: + aimrt::CoreRef core_; + aimrt::executor::ExecutorRef executor_; + + std::string service_name_for_client_; + std::string service_name_for_server_; + + std::shared_ptr proxy_; + std::shared_ptr service_ptr_; +}; + +} // namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module \ No newline at end of file diff --git a/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/service.cc b/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/service.cc new file mode 100644 index 000000000..a6a63b6ff --- /dev/null +++ b/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/service.cc @@ -0,0 +1,54 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#include "proxy_rpc_co_module/service.h" +#include "aimrt_module_cpp_interface/co/aimrt_context.h" +#include "aimrt_module_cpp_interface/co/inline_scheduler.h" +#include "aimrt_module_cpp_interface/co/on.h" +#include "aimrt_module_cpp_interface/co/schedule.h" +#include "aimrt_module_protobuf_interface/util/protobuf_tools.h" +#include "proxy_rpc_co_module/global.h" + +namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module { + +co::Task ExampleCoComplexCoServiceImpl::GetFooData( + aimrt::rpc::ContextRef ctx, + const ::aimrt::protocols::example::GetFooDataReq& req, + ::aimrt::protocols::example::GetFooDataRsp& rsp) { + try { + // Create proxy req and rsp + aimrt::protocols::example::GetBarDataReq proxy_req; + aimrt::protocols::example::GetBarDataRsp proxy_rsp; + proxy_req.set_msg(req.msg()); + + // Create ctx + auto ctx_ptr = proxy_->NewContextSharedPtr(); + ctx_ptr->SetTimeout(std::chrono::seconds(3)); + + // Call rpc + auto status = co_await proxy_->GetBarData(ctx_ptr, proxy_req, proxy_rsp); + + rsp.set_code(proxy_rsp.code()); + rsp.set_msg(proxy_rsp.msg()); + + AIMRT_INFO(" Server handle new rpc call. context: {}, req: {}, return rsp: {}", + ctx.ToString(), aimrt::Pb2CompactJson(req), aimrt::Pb2CompactJson(rsp)); + + } catch (const std::exception& e) { + AIMRT_ERROR("Exit with exception, {}", e.what()); + } + co_return aimrt::rpc::Status(); +} + +co::Task ExampleCoComplexCoServiceImpl::GetBarData( + aimrt::rpc::ContextRef ctx, + const ::aimrt::protocols::example::GetBarDataReq& req, + ::aimrt::protocols::example::GetBarDataRsp& rsp) { + rsp.set_msg("echo " + req.msg()); + + AIMRT_INFO("Server handle new rpc call. context: {}, req: {}, return rsp: {}", + ctx.ToString(), aimrt::Pb2CompactJson(req), aimrt::Pb2CompactJson(rsp)); + co_return aimrt::rpc::Status(); +} + +} // namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module \ No newline at end of file diff --git a/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/service.h b/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/service.h new file mode 100644 index 000000000..7e3bdaf7d --- /dev/null +++ b/src/examples/cpp/pb_rpc/module/proxy_rpc_co_module/service.h @@ -0,0 +1,36 @@ +// Copyright (c) 2023, AgiBot Inc. +// All rights reserved. + +#pragma once + +#include "aimrt_module_cpp_interface/executor/executor.h" +#include "rpc.aimrt_rpc.pb.h" + +namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module { + +using MsgHandleFunc = std::function()>; + +class ExampleCoComplexCoServiceImpl : public aimrt::protocols::example::ExampleServiceCoService { + public: + ExampleCoComplexCoServiceImpl(const std::shared_ptr& proxy, + aimrt::executor::ExecutorRef executor) : proxy_(proxy), + executor_(executor) {} + ~ExampleCoComplexCoServiceImpl() override = default; + + co::Task GetFooData( + aimrt::rpc::ContextRef ctx, + const ::aimrt::protocols::example::GetFooDataReq& req, + ::aimrt::protocols::example::GetFooDataRsp& rsp) override; + + co::Task GetBarData( + aimrt::rpc::ContextRef ctx, + const ::aimrt::protocols::example::GetBarDataReq& req, + ::aimrt::protocols::example::GetBarDataRsp& rsp) override; + + private: + MsgHandleFunc handle_; + std::shared_ptr proxy_; + aimrt::executor::ExecutorRef executor_; +}; + +} // namespace aimrt::examples::cpp::pb_rpc::proxy_rpc_co_module \ No newline at end of file diff --git a/src/examples/cpp/pb_rpc/pkg/pb_rpc_pkg/CMakeLists.txt b/src/examples/cpp/pb_rpc/pkg/pb_rpc_pkg/CMakeLists.txt index 2ba781585..674c779a8 100644 --- a/src/examples/cpp/pb_rpc/pkg/pb_rpc_pkg/CMakeLists.txt +++ b/src/examples/cpp/pb_rpc/pkg/pb_rpc_pkg/CMakeLists.txt @@ -27,6 +27,7 @@ target_link_libraries( ${CUR_TARGET_NAME} PRIVATE aimrt::interface::aimrt_pkg_c_interface ${CUR_SUPERIOR_NAMESPACE}::benchmark_rpc_client_module + ${CUR_SUPERIOR_NAMESPACE}::proxy_rpc_co_module ${CUR_SUPERIOR_NAMESPACE}::normal_rpc_co_client_module ${CUR_SUPERIOR_NAMESPACE}::normal_rpc_co_server_module ${CUR_SUPERIOR_NAMESPACE}::normal_rpc_sync_client_module diff --git a/src/examples/cpp/pb_rpc/pkg/pb_rpc_pkg/pkg_main.cc b/src/examples/cpp/pb_rpc/pkg/pb_rpc_pkg/pkg_main.cc index 43747090a..fc9c1f47d 100644 --- a/src/examples/cpp/pb_rpc/pkg/pb_rpc_pkg/pkg_main.cc +++ b/src/examples/cpp/pb_rpc/pkg/pb_rpc_pkg/pkg_main.cc @@ -12,6 +12,7 @@ #include "normal_rpc_future_client_module/normal_rpc_future_client_module.h" #include "normal_rpc_sync_client_module/normal_rpc_sync_client_module.h" #include "normal_rpc_sync_server_module/normal_rpc_sync_server_module.h" +#include "proxy_rpc_co_module/proxy_rpc_co_module.h" using namespace aimrt::examples::cpp::pb_rpc; @@ -19,6 +20,9 @@ static std::tuple> aimrt_m {"BenchmarkRpcClientModule", []() -> aimrt::ModuleBase* { return new benchmark_rpc_client_module::BenchmarkRpcClientModule(); }}, + {"ProxyRpcCoModule", []() -> aimrt::ModuleBase* { + return new proxy_rpc_co_module ::ProxyRpcCoModule(); + }}, {"NormalRpcCoClientModule", []() -> aimrt::ModuleBase* { return new normal_rpc_co_client_module::NormalRpcCoClientModule(); }}, diff --git a/src/examples/cpp/pb_rpc/pkg/pb_rpc_server_pkg/CMakeLists.txt b/src/examples/cpp/pb_rpc/pkg/pb_rpc_server_pkg/CMakeLists.txt index d41894118..9d429b60b 100644 --- a/src/examples/cpp/pb_rpc/pkg/pb_rpc_server_pkg/CMakeLists.txt +++ b/src/examples/cpp/pb_rpc/pkg/pb_rpc_server_pkg/CMakeLists.txt @@ -27,6 +27,7 @@ target_link_libraries( ${CUR_TARGET_NAME} PRIVATE aimrt::interface::aimrt_pkg_c_interface ${CUR_SUPERIOR_NAMESPACE}::normal_rpc_co_server_module + ${CUR_SUPERIOR_NAMESPACE}::proxy_rpc_co_module ${CUR_SUPERIOR_NAMESPACE}::normal_rpc_sync_server_module ${CUR_SUPERIOR_NAMESPACE}::normal_rpc_async_server_module) diff --git a/src/examples/cpp/pb_rpc/pkg/pb_rpc_server_pkg/pkg_main.cc b/src/examples/cpp/pb_rpc/pkg/pb_rpc_server_pkg/pkg_main.cc index c2fd5e923..8ee366f82 100644 --- a/src/examples/cpp/pb_rpc/pkg/pb_rpc_server_pkg/pkg_main.cc +++ b/src/examples/cpp/pb_rpc/pkg/pb_rpc_server_pkg/pkg_main.cc @@ -7,13 +7,16 @@ #include "normal_rpc_async_server_module/normal_rpc_async_server_module.h" #include "normal_rpc_co_server_module/normal_rpc_co_server_module.h" #include "normal_rpc_sync_server_module/normal_rpc_sync_server_module.h" - +#include "proxy_rpc_co_module/proxy_rpc_co_module.h" using namespace aimrt::examples::cpp::pb_rpc; static std::tuple> aimrt_module_register_array[]{ {"NormalRpcCoServerModule", []() -> aimrt::ModuleBase* { return new normal_rpc_co_server_module::NormalRpcCoServerModule(); }}, + {"ProxyRpcCoModule", []() -> aimrt::ModuleBase* { + return new proxy_rpc_co_module::ProxyRpcCoModule(); + }}, {"NormalRpcAsyncServerModule", []() -> aimrt::ModuleBase* { return new normal_rpc_async_server_module::NormalRpcAsyncServerModule(); }}, diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_benchmark_client_to_proxy_cfg.yaml b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_benchmark_client_to_proxy_cfg.yaml new file mode 100644 index 000000000..a844ad18f --- /dev/null +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_benchmark_client_to_proxy_cfg.yaml @@ -0,0 +1,62 @@ +# Copyright (c) 2023, AgiBot Inc. +# All rights reserved. + +aimrt: + plugin: + plugins: + - name: zenoh_plugin + path: ./libaimrt_zenoh_plugin.so + options: + native_cfg_path: ./cfg/zenoh_native_config.json5 + log: + core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off + backends: + - type: console + executor: + executors: + - name: client_statistics_executor + type: asio_thread + options: + thread_num: 4 + - name: timeout_handle + type: time_wheel + options: + bind_executor: client_statistics_executor + - name: client_executor_0 + type: asio_thread + - name: client_executor_1 + type: asio_thread + - name: client_executor_2 + type: asio_thread + - name: client_executor_3 + type: asio_thread + rpc: + backends: + - type: zenoh + options: + timeout_executor: timeout_handle + clients_options: + - func_name: "(.*)" + enable_backends: [zenoh] + module: + pkgs: + - path: ./libpb_rpc_client_pkg.so + enable_modules: [BenchmarkRpcClientModule] + modules: + - name: BenchmarkRpcClientModule + log_lvl: INFO + +# Module custom configuration +BenchmarkRpcClientModule: + service_name: "ExampleProxyService" + max_parallel: 4 + bench_plans: + - perf_mode: bench + msg_size: 256 + parallel: 4 + msg_count: 10000 + - perf_mode: fixed-freq + freq: 1000 + msg_size: 1024 + parallel: 2 + msg_count: 1000 diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_proxy_server_cfg.yaml b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_proxy_server_cfg.yaml new file mode 100644 index 000000000..62da25dfa --- /dev/null +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_proxy_server_cfg.yaml @@ -0,0 +1,45 @@ +# Copyright (c) 2023, AgiBot Inc. +# All rights reserved. + +aimrt: + plugin: + plugins: + - name: zenoh_plugin + path: ./libaimrt_zenoh_plugin.so + options: + native_cfg_path: ./cfg/zenoh_native_config.json5 + log: + core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off + backends: + - type: console + executor: + executors: + - name: work_thread_pool + type: asio_thread + options: + thread_num: 4 + - name: timeout_handle + type: time_wheel + options: + bind_executor: work_thread_pool + rpc: + backends: + - type: zenoh + options: + timeout_executor: timeout_handle + clients_options: + - func_name: "(.*)" + enable_backends: [zenoh] + servers_options: + - func_name: "(.*)" + enable_backends: [zenoh] + module: + pkgs: + - path: ./libpb_rpc_server_pkg.so + enable_modules: [ProxyRpcCoModule] + modules: + - name: ProxyRpcCoModule + log_lvl: Warn + +ProxyRpcCoModule: + service_name_for_server: "ExampleProxyService" \ No newline at end of file diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_rpc_benchmark_client_to_proxy.sh b/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_rpc_benchmark_client_to_proxy.sh new file mode 100755 index 000000000..2343b7152 --- /dev/null +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_rpc_benchmark_client_to_proxy.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +./aimrt_main --cfg_file_path=./cfg/examples_plugins_zenoh_plugin_pb_rpc_benchmark_client_to_proxy_cfg.yaml \ No newline at end of file diff --git a/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_rpc_proxy_server.sh b/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_rpc_proxy_server.sh new file mode 100755 index 000000000..4d31f2988 --- /dev/null +++ b/src/examples/plugins/zenoh_plugin/install/linux/bin/start_examples_plugins_zenoh_plugin_pb_rpc_proxy_server.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +source install/share/example_ros2/local_setup.bash + +./aimrt_main --cfg_file_path=./cfg/examples_plugins_zenoh_plugin_pb_rpc_proxy_server_cfg.yaml \ No newline at end of file