feat: add aimrt_py rpc benchmark example (#37)

* feat: add python benchmark example and update dependencies

Introduce an aimrt_py benchmark example for Python testing. Replace the boost dependency with the standalone asio library to reduce overall dependencies, and enhance the configuration options for zenoh and mqtt.

* feat: add RPC benchmark client and configuration

Introduce a new benchmark client module and accompanying scripts to facilitate testing and performance measurement of RPC services. Include configuration for parallel execution and adjustable benchmarking parameters.
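
For reference, the client module added in this change represents each benchmark plan as a plain dict parsed from its YAML configuration. A minimal sketch of that shape, with illustrative values taken from the configuration file further down in this diff:

```python
# Illustrative sketch only: mirrors the plan dicts that BenchmarkRpcClientModule
# builds from benchmark_rpc_client_cfg.yaml (shown later in this diff).
max_parallel = 4
bench_plan = {
    "perf_mode": "bench",  # or "fixed-freq", which additionally carries a "freq" field
    "msg_size": 256,       # request payload size in bytes
    "parallel": 4,         # number of client executors used concurrently
    "msg_count": 10000,    # requests issued per executor
}
assert bench_plan["parallel"] <= max_parallel, "a plan's parallel must not exceed max_parallel"
```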

* docs: add benchmark process for protobuf RPC

Include detailed instructions for reproducing the crash issue and running benchmarks effectively to streamline troubleshooting and performance evaluation.

* chore: include string header

Include the <string> header so the RPC handling code has std::string available for upcoming string handling, rather than relying on transitive includes.

* feat: enhance benchmark reporting and logging

Improve performance benchmarking by logging detailed results, including latency metrics and error rates. Measure task durations in microseconds for more precise results, and replace print statements with structured logging.
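
A minimal sketch of the measurement approach, assuming the `aimrt_py.info` structured-logging helper that the benchmark module below already uses (the real implementation lives in benchmark_rpc_client_module.py):

```python
import time

import aimrt_py


def timed_call(logger, fn) -> float:
    """Run fn once, log its duration in microseconds, and return it."""
    start = time.time()
    fn()
    duration_us = (time.time() - start) * 1e6
    aimrt_py.info(logger, f"task duration: {duration_us:.2f} us")
    return duration_us
```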

* chore: update benchmark configurations for clarity and accuracy

Refine the benchmark plans to provide clearer settings. Adjust parameters for the fixed-frequency mode to ensure more relevant testing scenarios.
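
For the fixed-frequency mode, a simplified sketch of how the client paces requests, mirroring the `fixed-freq` handling in benchmark_rpc_client_module.py (the concrete plan values live in benchmark_rpc_client_cfg.yaml below):

```python
import time


def run_fixed_freq(send_request, freq_hz: int, msg_count: int) -> None:
    # Issue msg_count requests, sleeping 1/freq_hz seconds after each one.
    for _ in range(msg_count):
        send_request()
        time.sleep(1 / freq_hz)
```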

* chore: add script and config files for HTTP server examples

Include new symbolic links for server configuration and application scripts to streamline example setup and ensure consistency across projects.

* docs: update README for better clarity on python RPC benchmark setup

Enhance the README with a clearer guide to using the protobuf RPC benchmark example, including setup instructions, core files, and configuration details, so developers can get the example running with less effort.

* style: correct casing in module name

Standardize the module name casing for improved consistency and readability in the RPC server application.

* docs: add pb_rpc_bench example link

Include the link to the pb_rpc_bench example for better accessibility and completeness in the tutorial.

* style: clean up code formatting

Remove unnecessary blank lines to improve readability in the benchmark RPC client module.

* docs: modify aimrt_py rpc release note
zhangyi1357 2024-10-18 16:33:58 +08:00 committed by GitHub
parent 5ca412c339
commit 1186d81a07
13 changed files with 424 additions and 3 deletions

View File

@ -16,6 +16,6 @@
- Refactored the code structure by moving src/runtime/common/net to its new location src/common/net;
- Upgraded jsoncpp to version 1.9.6 to work around some cmake issues;
- Added an aimrt_py channel benchmark example;
- Added an aimrt_py rpc benchmark example;
- The iceoryx plugin now checks for libacl before compiling and skips the build if it is not present;
- Plugins that provide RPC services now support specifying a service name;

View File

@ -8,6 +8,7 @@ AimRT provides the following Python interface usage examples:
- {{ '[pb_chn]({}/src/examples/py/pb_chn)'.format(code_site_root_path_url) }}
- {{ '[pb_rpc]({}/src/examples/py/pb_rpc)'.format(code_site_root_path_url) }}
- {{ '[pb_chn_bench]({}/src/examples/py/pb_chn_bench)'.format(code_site_root_path_url) }}
- {{ '[pb_rpc_bench]({}/src/examples/py/pb_rpc_bench)'.format(code_site_root_path_url) }}
Notes about these examples:
- Each example has its own readme document; follow the example link for details;

View File

@ -71,7 +71,7 @@ def main():
aimrt_core.Initialize(core_options)
# Create Module
module_handle = aimrt_core.CreateModule("NormalRpcServerPymodule")
module_handle = aimrt_core.CreateModule("NormalRpcServerPyModule")
# Register rpc service
service = ExampleServiceImpl(module_handle.GetLogger())

View File

@ -0,0 +1,35 @@
# protobuf rpc benchmark

A Python RPC benchmark example based on the protobuf protocol and the http backend, demonstrating:
- how to use the protobuf protocol as the RPC transport protocol in Python;
- how to use the RPC client and server features through aimrt_py module registration;
- how to use the http RPC backend;
- how to use the executor feature;
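
For orientation, the client-side call pattern used in this example looks roughly like the sketch below (simplified; see [benchmark_rpc_client_module.py](./benchmark_rpc_client_module.py) for the full module):

```python
import aimrt_py
import rpc_aimrt_rpc_pb2
import rpc_pb2


def call_example_service_once(rpc_handle):
    # rpc_handle is obtained from core.GetRpcHandle() inside a registered module.
    rpc_aimrt_rpc_pb2.ExampleServiceProxy.RegisterClientFunc(rpc_handle)
    proxy = rpc_aimrt_rpc_pb2.ExampleServiceProxy(rpc_handle)

    req = rpc_pb2.GetFooDataReq()
    req.msg = "hello"
    status, rsp = proxy.GetFooData(aimrt_py.RpcContext(), req)
    assert status.Code() == aimrt_py.RpcStatusRetCode.OK
    return rsp
```
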
Core code:
- [rpc.proto](../../../protocols/example/rpc.proto)
- [benchmark_rpc_client_module.py](./benchmark_rpc_client_module.py)
- [benchmark_rpc_client_app.py](./benchmark_rpc_client_app.py)
- [examples_py_pb_rpc_http_server_app.py](./examples_py_pb_rpc_http_server_app.py)

Configuration files:
- [benchmark_rpc_client_cfg.yaml](./cfg/benchmark_rpc_client_cfg.yaml)
- [examples_py_pb_rpc_http_server_cfg.yaml](./cfg/examples_py_pb_rpc_http_server_cfg.yaml)

How to run (linux):
- [Install the `aimrt_py` package](../../../../document/sphinx-cn/tutorials/quick_start/installation_py.md);
- run the [build_examples_py_pb_rpc_bench.sh](./build_examples_py_pb_rpc_bench.sh) script in this directory to generate the protocol stub code files;
  - if protoc is not installed locally, or its version is lower than 3.20, install or upgrade protoc, or point the `protoc_cmd` variable in the script to a suitable binary;
- run the [start_examples_py_pb_rpc_http_server.sh](./start_examples_py_pb_rpc_http_server.sh) script in this directory to start the RPC Server;
- in another terminal, run the [start_benchmark_rpc_client.sh](./start_benchmark_rpc_client.sh) script in this directory to start the RPC Client;
- press `ctrl-c` in the terminals of the RPC Client and RPC Server processes to stop them;

Notes:
- this example creates the following two modules:
  - `BenchmarkRpcClientModule`: after startup, sends benchmark requests to `ExampleService` and prints the test results;
  - `NormalRpcServerPyModule`: registers the `ExampleService` server side and provides an echo function through the Server interface;
- this example communicates over the http RPC backend; make sure the relevant ports are not in use;

View File

@ -0,0 +1,61 @@
# Copyright (c) 2024 The AimRT Authors.
# AimRT is licensed under Mulan PSL v2.

import argparse
import signal
import sys
import threading

import aimrt_py
import benchmark_rpc_client_module

global_aimrt_core = None


def signal_handler(sig, _):
    global global_aimrt_core

    if (global_aimrt_core and (sig == signal.SIGINT or sig == signal.SIGTERM)):
        # Ask the running core to shut down gracefully; main() will join its thread.
        global_aimrt_core.Shutdown()
        return

    sys.exit(0)


def main():
    parser = argparse.ArgumentParser(description='Example helloworld registration mode.')
    parser.add_argument('--cfg_file_path', type=str, default="", help='config file path')
    args = parser.parse_args()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    print("AimRT start.")

    aimrt_core = aimrt_py.Core()

    global global_aimrt_core
    global_aimrt_core = aimrt_core

    # Register the benchmark client module before initializing the core.
    module = benchmark_rpc_client_module.BenchmarkRpcClientModule()
    aimrt_core.RegisterModule(module)

    core_options = aimrt_py.CoreOptions()
    core_options.cfg_file_path = args.cfg_file_path
    aimrt_core.Initialize(core_options)

    # Run the core in a background thread so the main thread can handle signals.
    thread = threading.Thread(target=aimrt_core.Start)
    thread.start()

    while thread.is_alive():
        thread.join(1.0)

    aimrt_core.Shutdown()
    global_aimrt_core = None

    print("AimRT exit.")


if __name__ == '__main__':
    main()

View File

@ -0,0 +1,224 @@
# Copyright (c) 2024 The AimRT Authors.
# AimRT is licensed under Mulan PSL v2.

import random
import string
import threading
import time

import aimrt_py
import rpc_aimrt_rpc_pb2
import rpc_pb2
import yaml


class BenchmarkRpcClientModule(aimrt_py.ModuleBase):
    def __init__(self):
        super().__init__()
        self.core: aimrt_py.CoreRef = None
        self.bench_plans: list = []
        self.proxy: rpc_aimrt_rpc_pb2.ExampleServiceProxy = None
        self.client_statistics_executor: aimrt_py.ExecutorRef = None
        self.executor_vec: list[aimrt_py.ExecutorRef] = []
        self.bench_interval: int = 1  # seconds
        self.shutdown_delay: int = 2  # seconds
        self.run_flag: bool = True
        self.stop_sig: threading.Event = threading.Event()

    def Info(self) -> aimrt_py.ModuleInfo:
        info = aimrt_py.ModuleInfo()
        info.name = "BenchmarkRpcClientModule"
        return info

    def Initialize(self, core: aimrt_py.CoreRef) -> bool:
        assert isinstance(core, aimrt_py.CoreRef)

        self.core = core
        self.logger = self.core.GetLogger()

        aimrt_py.info(self.logger, "Module initializing ...")

        try:
            # read config
            file_path = self.core.GetConfigurator().GetConfigFilePath()
            with open(file_path, "r") as f:
                cfg_node = yaml.safe_load(f)

            self.max_parallel = cfg_node.get("max_parallel", 0)
            if "bench_plans" in cfg_node and isinstance(cfg_node["bench_plans"], list):
                for bench_plan_node in cfg_node["bench_plans"]:
                    bench_plan = {
                        "perf_mode": bench_plan_node["perf_mode"],
                        "msg_size": bench_plan_node["msg_size"],
                        "parallel": bench_plan_node["parallel"],
                        "msg_count": bench_plan_node["msg_count"]
                    }
                    if bench_plan["perf_mode"] == "fixed-freq":
                        bench_plan["freq"] = bench_plan_node["freq"]

                    if bench_plan["parallel"] > self.max_parallel:
                        raise ValueError(f"Bench plan parallel ({bench_plan['parallel']}) "
                                         f"is greater than max parallel ({self.max_parallel})")

                    self.bench_plans.append(bench_plan)

            # get rpc handle
            rpc_handle = self.core.GetRpcHandle()
            assert rpc_handle, "Get rpc handle failed."

            # register rpc client
            ret = rpc_aimrt_rpc_pb2.ExampleServiceProxy.RegisterClientFunc(rpc_handle)
            assert ret, "Register client failed."

            # create rpc proxy
            self.proxy = rpc_aimrt_rpc_pb2.ExampleServiceProxy(rpc_handle)

            # check executor
            self.client_statistics_executor = self.core.GetExecutorManager().GetExecutor("client_statistics_executor")
            assert self.client_statistics_executor and self.client_statistics_executor.SupportTimerSchedule(), \
                "Get executor 'client_statistics_executor' failed."

            for ii in range(self.max_parallel):
                executor_name = f"client_executor_{ii}"
                executor = self.core.GetExecutorManager().GetExecutor(executor_name)
                assert executor and executor.SupportTimerSchedule(), f"Get executor '{executor_name}' failed."
                self.executor_vec.append(executor)

            aimrt_py.info(self.logger, f"Module config: max_parallel={self.max_parallel}, "
                                       f"bench_plans={self.bench_plans}")
        except Exception as e:
            aimrt_py.error(self.logger, f"Initialize failed: {e}")
            return False

        aimrt_py.info(self.logger, "Init succeeded")
        return True

    def Start(self) -> bool:
        try:
            aimrt_py.info(self.logger, "Module starting ...")
            # Run the benchmark loop on the statistics executor so Start() returns immediately.
            self.client_statistics_executor.Execute(self.MainLoop)
        except Exception as e:
            aimrt_py.error(self.logger, f"Start failed: {e}")
            return False

        aimrt_py.info(self.logger, "Start succeeded")
        return True

    def Shutdown(self) -> bool:
        try:
            aimrt_py.info(self.logger, "Module shutting down ...")
            self.run_flag = False
            self.request_complete_event.set()
            self.stop_sig.wait()
        except Exception as e:
            aimrt_py.error(self.logger, f"Shutdown failed: {e}")
            return False

        aimrt_py.info(self.logger, "Shutdown succeeded")
        return True

    def MainLoop(self) -> None:
        try:
            aimrt_py.info(self.logger, "Start Bench.")

            # benchmark
            for ii, bench_plan in enumerate(self.bench_plans):
                if not self.run_flag:
                    break

                aimrt_py.info(self.logger, f"Start bench plan {ii}")
                self.StartSinglePlan(ii, bench_plan)
                aimrt_py.info(self.logger, f"End bench plan {ii}")

                time.sleep(self.bench_interval)

            aimrt_py.info(self.logger, "Bench completed.")
        except Exception as e:
            aimrt_py.error(self.logger, f"Exit MainLoop with exception: {e}")

        self.stop_sig.set()

    def StartSinglePlan(self, plan_id: int, plan: dict) -> None:
        self.request_complete_event = threading.Event()
        self.completed_tasks_lock = threading.Lock()  # protects completed_tasks across client executors
        self.completed_tasks = 0
        self.total_tasks = plan['parallel']

        start_time = time.time()

        # start rpc tasks
        self.perf_data = []
        for ii in range(plan['parallel']):
            publish_executor = self.executor_vec[ii]
            publish_executor.Execute(lambda: self.StartBenchPlan(plan))

        # wait for all tasks to complete
        self.request_complete_event.wait()

        end_time = time.time()
        total_time_ms = (end_time - start_time) * 1e3

        self.perf_data.sort()
        correct_count = len(self.perf_data)
        total_count = plan['parallel'] * plan['msg_count']
        error_rate = (total_count - correct_count) / total_count * 100  # percent
        qps = (total_count * 1000.0) / total_time_ms
        min_latency = self.perf_data[0]
        max_latency = self.perf_data[-1]
        avg_latency = sum(self.perf_data) / correct_count
        p90_latency = self.perf_data[int(correct_count * 0.9)]
        p99_latency = self.perf_data[int(correct_count * 0.99)]
        p999_latency = self.perf_data[int(correct_count * 0.999)]

        result_str = f"Benchmark plan {plan_id} completed, report:"
        result_str += f"\nmode: {plan['perf_mode']}"
        result_str += f"\nmsg size: {plan['msg_size']}"
        result_str += f"\nparallel: {plan['parallel']}"
        result_str += f"\nmsg count per co: {plan['msg_count']}"
        result_str += f"\ntotal count: {total_count}"
        result_str += f"\ntotal time: {total_time_ms:.2f} ms"
        result_str += f"\ncorrect count: {correct_count}"
        result_str += f"\nerror rate: {error_rate:.2f} %"

        if plan['perf_mode'] == 'bench':
            result_str += f"\nqps: {qps:.2f}"

        result_str += f"\nmin latency: {min_latency:.2f} us"
        result_str += f"\nmax latency: {max_latency:.2f} us"
        result_str += f"\navg latency: {avg_latency:.2f} us"
        result_str += f"\np90 latency: {p90_latency:.2f} us"
        result_str += f"\np99 latency: {p99_latency:.2f} us"
        result_str += f"\np999 latency: {p999_latency:.2f} us"

        aimrt_py.info(self.logger, result_str)

    def StartBenchPlan(self, plan: dict) -> None:
        req = rpc_pb2.GetFooDataReq()
        req.msg = self.GenerateRandomString(plan['msg_size'])

        for _ in range(plan['msg_count']):
            ctx = aimrt_py.RpcContext()
            task_start_time = time.time()
            status, _ = self.proxy.GetFooData(ctx, req)
            task_end_time = time.time()

            assert status.Code() == aimrt_py.RpcStatusRetCode.OK, f"GetFooData failed: {status}"
            assert task_end_time > task_start_time, "Task end time is less than start time"
            self.perf_data.append((task_end_time - task_start_time) * 1e6)  # us

            if plan['perf_mode'] == 'fixed-freq':
                time.sleep(1 / plan['freq'])

        # Use the shared lock so the completion counter is updated atomically.
        with self.completed_tasks_lock:
            self.completed_tasks += 1
            if self.completed_tasks == self.total_tasks:
                self.request_complete_event.set()

    @staticmethod
    def GenerateRandomString(length: int) -> str:
        return ''.join(random.choices(string.ascii_letters + string.digits, k=length))

View File

@ -0,0 +1,28 @@
#!/bin/bash

protoc_cmd=protoc
protocols_dir=../../../protocols/example/

if ! command -v ${protoc_cmd} &> /dev/null; then
    echo "Can not find protoc!"
    exit 1
fi

PROTOC_VERSION=$(${protoc_cmd} --version | awk '{print $2}')

version_greater_equal() {
    # Succeeds if $1 >= $2 under version sort.
    local version1=$1
    local version2=$2
    [ "$(printf '%s\n' "$version1" "$version2" | sort -V | head -n1)" = "$version2" ]
}

if version_greater_equal "$PROTOC_VERSION" "3.20"; then
    echo "protoc version $PROTOC_VERSION."
else
    echo "protoc version $PROTOC_VERSION, need 3.20 at least."
    exit 1
fi

${protoc_cmd} -I${protocols_dir} --python_out=./ ${protocols_dir}/common.proto ${protocols_dir}/rpc.proto
${protoc_cmd} -I${protocols_dir} --aimrt_rpc_out=./ --plugin=protoc-gen-aimrt_rpc=$(which protoc_plugin_py_gen_aimrt_py_rpc) ${protocols_dir}/rpc.proto

View File

@ -0,0 +1,63 @@
# Copyright (c) 2024 The AimRT Authors.
# AimRT is licensed under Mulan PSL v2.

aimrt:
  plugin:
    plugins:
      - name: net_plugin
        path: ${AIMRT_PLUGIN_DIR}/libaimrt_net_plugin.so
        options:
          thread_num: 4
          http_options:
            listen_ip: 127.0.0.1
            listen_port: 50050
  log:
    core_lvl: INFO  # Trace/Debug/Info/Warn/Error/Fatal/Off
    backends:
      - type: console
  executor:
    executors:
      - name: client_statistics_executor
        type: asio_thread
        options:
          thread_num: 4
      - name: timeout_handle
        type: time_wheel
        options:
          bind_executor: client_statistics_executor
      - name: client_executor_0
        type: asio_thread
      - name: client_executor_1
        type: asio_thread
      - name: client_executor_2
        type: asio_thread
      - name: client_executor_3
        type: asio_thread
  rpc:
    backends:
      - type: http
        options:
          clients_options:
            - func_name: "(.*)"
              server_url: http://127.0.0.1:50080
    clients_options:
      - func_name: "(.*)"
        enable_backends: [http]
  module:
    modules:
      - name: BenchmarkRpcClientModule
        log_lvl: INFO

# Module custom configuration
BenchmarkRpcClientModule:
  max_parallel: 4
  bench_plans:
    - perf_mode: bench
      msg_size: 256
      parallel: 4
      msg_count: 10000
    - perf_mode: fixed-freq
      freq: 1000
      msg_size: 1024
      parallel: 2
      msg_count: 1000

View File

@ -0,0 +1 @@
../../pb_rpc/cfg/examples_py_pb_rpc_http_server_cfg.yaml

View File

@ -0,0 +1 @@
../pb_rpc/examples_py_pb_rpc_server_app.py

View File

@ -0,0 +1,5 @@
#!/bin/bash
export AIMRT_PLUGIN_DIR=$(pip show aimrt_py | grep Location | awk '{print $2}')/aimrt_py
python3 benchmark_rpc_client_app.py --cfg_file_path ./cfg/benchmark_rpc_client_cfg.yaml

View File

@ -0,0 +1 @@
../pb_rpc/start_examples_py_pb_rpc_http_server.sh

View File

@ -4,6 +4,7 @@
#pragma once
#include <future>
#include <string>
#include <utility>
#include "aimrt_module_cpp_interface/rpc/rpc_handle.h"
@ -203,4 +204,4 @@ inline void ExportRpcHandleRef(pybind11::object m) {
.def("RegisterClientFunc", &PyRpcHandleRefRegisterClientFunc)
.def("Invoke", &PyRpcHandleRefInvoke);
}
} // namespace aimrt::runtime::python_runtime
} // namespace aimrt::runtime::python_runtime