diff --git a/document/sphinx-cn/release_notes/v0_9_0.md b/document/sphinx-cn/release_notes/v0_9_0.md index 4a51d6fc9..4c5cb0579 100644 --- a/document/sphinx-cn/release_notes/v0_9_0.md +++ b/document/sphinx-cn/release_notes/v0_9_0.md @@ -8,9 +8,9 @@ - mqtt 新增配置项以支持加密传输; - 新增了第三方库 asio,runtime::core 不再引用 boost,改为引用独立的 asio 库,以减轻依赖; - **次要修改**: - 缩短了一些 examples 的文件路径长度; - 修复了一些轻微问题; - 优化代码结构,移动代码 src/runtime/common/net 至新位置 src/common/net; - 升级 jsoncpp 至 1.9.6 版本以优化一些 cmake 问题; +- 新增了 aimrt_py channel benchmark 示例; diff --git a/document/sphinx-cn/tutorials/examples/examples_py.md b/document/sphinx-cn/tutorials/examples/examples_py.md index e9e397d93..30f1c2617 100644 --- a/document/sphinx-cn/tutorials/examples/examples_py.md +++ b/document/sphinx-cn/tutorials/examples/examples_py.md @@ -7,6 +7,7 @@ AimRT 提供了以下 Python 接口使用示例: - {{ '[helloworld]({}/src/examples/py/helloworld)'.format(code_site_root_path_url) }} - {{ '[pb_chn]({}/src/examples/py/pb_chn)'.format(code_site_root_path_url) }} - {{ '[pb_rpc]({}/src/examples/py/pb_rpc)'.format(code_site_root_path_url) }} + - {{ '[pb_chn_bench]({}/src/examples/py/pb_chn_bench)'.format(code_site_root_path_url) }} 关于这些示例的说明: - 每个示例都有自己独立的 readme 文档,详情请点击示例链接进入后查看; diff --git a/src/examples/cpp/pb_chn/module/benchmark_subscriber_module/benchmark_subscriber_module.cc b/src/examples/cpp/pb_chn/module/benchmark_subscriber_module/benchmark_subscriber_module.cc index 657f92b5e..0efef23bb 100644 --- a/src/examples/cpp/pb_chn/module/benchmark_subscriber_module/benchmark_subscriber_module.cc +++ b/src/examples/cpp/pb_chn/module/benchmark_subscriber_module/benchmark_subscriber_module.cc @@ -191,7 +191,7 @@ void BenchmarkSubscriberModule::Evaluate() const { AIMRT_INFO(R"str(Benchmark plan {} completed, report: frequency: {} hz topic number: {} -msg size: {} byte +msg size: {} bytes msg count per topic: {} send count : {} recv count: {} diff --git a/src/examples/py/helloworld/README.md b/src/examples/py/helloworld/README.md index 259321cd8..27c43c418 100644 --- a/src/examples/py/helloworld/README.md +++ b/src/examples/py/helloworld/README.md @@ -16,7 +16,7 @@ 运行方式(linux): -- 安装 `aimrt_py` 包; +- [安装 `aimrt_py` 包](../../../../document/sphinx-cn/tutorials/quick_start/installation_py.md); - 直接运行本目录下[start_examples_py_helloworld_app_mode.sh](./start_examples_py_helloworld_app_mode.sh)脚本启动进程; - 键入`ctrl-c`停止进程; @@ -45,7 +45,7 @@ 运行方式(linux): -- 安装 `aimrt_py` 包; +- [安装 `aimrt_py` 包](../../../../document/sphinx-cn/tutorials/quick_start/installation_py.md); - 直接运行本目录下[start_examples_py_helloworld_registration_mode.sh](./start_examples_py_helloworld_registration_mode.sh)脚本启动进程; - 键入`ctrl-c`停止进程; diff --git a/src/examples/py/parameter/README.md b/src/examples/py/parameter/README.md index c91e716d2..7c9edb215 100644 --- a/src/examples/py/parameter/README.md +++ b/src/examples/py/parameter/README.md @@ -14,7 +14,7 @@ 运行方式(linux): -- 安装 `aimrt_py` 包; +- [安装 `aimrt_py` 包](../../../../document/sphinx-cn/tutorials/quick_start/installation_py.md); - 直接运行本目录下[start_examples_py_parameter_app.sh](./start_examples_py_parameter_app.sh)脚本启动进程; - 键入`ctrl-c`停止进程; diff --git a/src/examples/py/pb_chn/README.md b/src/examples/py/pb_chn/README.md index 995599276..dc9bb903e 100644 --- a/src/examples/py/pb_chn/README.md +++ b/src/examples/py/pb_chn/README.md @@ -23,7 +23,7 @@ 运行方式(linux): -- 安装 `aimrt_py` 包; +- [安装 `aimrt_py` 包](../../../../document/sphinx-cn/tutorials/quick_start/installation_py.md); - 运行本目录下的[build_examples_py_pb_chn.sh](./build_examples_py_pb_chn.sh)脚本,生成协议桩代码文件; - 如果本地没有 protoc 或者 protoc 版本小于 3.20,请安装或升级 protoc,或直接修改脚本中的 `protoc_cmd` 变量指向合适的路径; - 运行本目录下的[start_examples_py_pb_chn_http_sub.sh](./start_examples_py_pb_chn_http_sub.sh)脚本,启动 subscriber; @@ -60,7 +60,7 @@ 运行方式(linux): -- 安装 `aimrt_py` 包; +- [安装 `aimrt_py` 包](../../../../document/sphinx-cn/tutorials/quick_start/installation_py.md); - 运行本目录下的[build_examples_py_pb_chn.sh](./build_examples_py_pb_chn.sh)脚本,生成协议桩代码文件; - 如果本地没有 protoc 或者 protoc 版本小于 3.20,请安装或升级 protoc,或直接修改脚本中的 `protoc_cmd` 变量指向合适的路径; - 运行本目录下的[start_examples_py_pb_chn_ros2_sub.sh](./start_examples_py_pb_chn_ros2_sub.sh)脚本,启动 subscriber; diff --git a/src/examples/py/pb_chn_bench/README.md b/src/examples/py/pb_chn_bench/README.md new file mode 100644 index 000000000..814234e73 --- /dev/null +++ b/src/examples/py/pb_chn_bench/README.md @@ -0,0 +1,34 @@ +# protobuf channel benchmark + + +一个基于 protobuf 协议与 http 后端的 python channel benchmark 示例,演示内容包括: +- 如何在 python 中使用 protobuf 协议作为 channel 传输协议; +- 如何基于 aimrt_py 注册模块的方式使用 Channel publish 和 subscribe 功能; +- 如何使用 http 类型的 channel 后端; + +核心代码: +- [benchmark.proto](../../../protocols/example/benchmark.proto) +- [benchmark_publisher_module.py](./benchmark_publisher_module.py) +- [benchmark_publisher_app.py](./benchmark_publisher_app.py) +- [benchmark_subscriber_module.py](./benchmark_subscriber_module.py) +- [benchmark_subscriber_app.py](./benchmark_subscriber_app.py) + +配置文件: +- [benchmark_publisher_cfg.yaml](./cfg/benchmark_publisher_cfg.yaml) +- [benchmark_subscriber_cfg.yaml](./cfg/benchmark_subscriber_cfg.yaml) + + +运行方式(linux): +- [安装 `aimrt_py` 包](../../../../document/sphinx-cn/tutorials/quick_start/installation_py.md); +- 运行本目录下的[build_examples_py_pb_chn_bench.sh](./build_examples_py_pb_chn_bench.sh)脚本,生成协议桩代码文件; + - 如果本地没有 protoc 或者 protoc 版本小于 3.20,请安装或升级 protoc,或直接修改脚本中的 `protoc_cmd` 变量指向合适的路径; +- 运行本目录下的[start_benchmark_subscriber.sh](./start_benchmark_subscriber.sh)脚本,启动 subscriber; +- 在新终端里运行本目录下的[start_benchmark_publisher.sh](./start_benchmark_publisher.sh)脚本,启动 publisher; +- Benchmark 运行结束后会输出 benchmark 结果并自动结束进程; + + +说明: +- 此示例创建了以下两个模块: + - `BenchmarkPublisherModule`:会在启动后根据配置好的 bench plan 向指定的 test_topic_xx 中发布类型为 `BenchmarkMsg` 的消息,每个 bench plan 前后会发送一个 `BenchmarkSignal` 类型的消息通知 subscriber 当前的 benchmark 状态,所有 bench plan 结束后会发送一个 `BenchmarkSignal` 类型的消息通知 subscriber 当前的 benchmark 结束; + - `BenchmarkSubscriberModule`:会订阅 channel 中的 test_topic_xx 消息,并统计接收到的消息的延迟分布; +- 此示例使用 http 类型的 channel 后端进行通信,请确保相关端口未被占用; diff --git a/src/examples/py/pb_chn_bench/benchmark_publisher_app.py b/src/examples/py/pb_chn_bench/benchmark_publisher_app.py new file mode 100644 index 000000000..9c94d8854 --- /dev/null +++ b/src/examples/py/pb_chn_bench/benchmark_publisher_app.py @@ -0,0 +1,61 @@ +# Copyright (c) 2024 The AimRT Authors. +# AimRT is licensed under Mulan PSL v2. + +import argparse +import signal +import sys +import threading + +import aimrt_py +import benchmark_publisher_module + +global_aimrt_core = None + + +def signal_handler(sig, _): + global global_aimrt_core + + if (global_aimrt_core and (sig == signal.SIGINT or sig == signal.SIGTERM)): + global_aimrt_core.Shutdown() + return + + sys.exit(0) + + +def main(): + parser = argparse.ArgumentParser(description='Example helloworld registration mode.') + parser.add_argument('--cfg_file_path', type=str, default="", help='config file path') + args = parser.parse_args() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + print("AimRT start.") + + aimrt_core = aimrt_py.Core() + + global global_aimrt_core + global_aimrt_core = aimrt_core + + module = benchmark_publisher_module.BenchmarkPublisher() + aimrt_core.RegisterModule(module) + + core_options = aimrt_py.CoreOptions() + core_options.cfg_file_path = args.cfg_file_path + aimrt_core.Initialize(core_options) + + thread = threading.Thread(target=aimrt_core.Start) + thread.start() + + while thread.is_alive(): + thread.join(1.0) + + aimrt_core.Shutdown() + + global_aimrt_core = None + + print("AimRT exit.") + + +if __name__ == '__main__': + main() diff --git a/src/examples/py/pb_chn_bench/benchmark_publisher_module.py b/src/examples/py/pb_chn_bench/benchmark_publisher_module.py new file mode 100644 index 000000000..abaa4dab9 --- /dev/null +++ b/src/examples/py/pb_chn_bench/benchmark_publisher_module.py @@ -0,0 +1,234 @@ +# Copyright (c) 2024 The AimRT Authors. +# AimRT is licensed under Mulan PSL v2. + +import os +import random +import signal +import string +import threading +import time + +import aimrt_py +import benchmark_pb2 +import yaml + + +class BenchmarkPublisher(aimrt_py.ModuleBase): + def __init__(self): + super().__init__() + self.core = aimrt_py.CoreRef() + self.logger = aimrt_py.LoggerRef() + self.max_topic_number = 0 + self.bench_plans = [] + self.run_flag = True + self.stop_sig = threading.Event() + self.shutdown_delay = 1 # seconds + self.bench_interval = 1 # seconds + self.publish_complete_event = threading.Event() + + def Info(self) -> aimrt_py.ModuleInfo: + info = aimrt_py.ModuleInfo() + info.name = "BenchmarkPublisherModule" + return info + + def Initialize(self, core: aimrt_py.CoreRef) -> bool: + assert isinstance(core, aimrt_py.CoreRef) + + self.core = core + self.logger = self.core.GetLogger() + + aimrt_py.info(self.logger, "Module initializing ...") + + try: + # read config + file_path = self.core.GetConfigurator().GetConfigFilePath() + with open(file_path, "r") as f: + cfg_node = yaml.safe_load(f) + + self.max_topic_number = cfg_node.get("max_topic_number", 0) + + if "bench_plans" in cfg_node and isinstance(cfg_node["bench_plans"], list): + for bench_plan_node in cfg_node["bench_plans"]: + bench_plan = { + "channel_frq": bench_plan_node["channel_frq"], + "msg_size": bench_plan_node["msg_size"], + "topic_number": bench_plan_node["topic_number"], + "msg_count": bench_plan_node["msg_count"] + } + + if bench_plan["topic_number"] > self.max_topic_number: + raise ValueError(f"Bench plan topic number ({bench_plan['topic_number']}) " + f"is greater than max topic number ({self.max_topic_number})") + + self.bench_plans.append(bench_plan) + + aimrt_py.info(self.logger, f"Module config: max_topic_number={self.max_topic_number}, " + f"bench_plans={self.bench_plans}") + + # controller + self.publish_control_executor = self.core.GetExecutorManager().GetExecutor("publish_control_executor") + if not self.publish_control_executor: + raise RuntimeError("Get executor 'publish_control_executor' failed.") + + # signal publisher + signal_topic = "benchmark_signal" + self.signal_publisher = self.core.GetChannelHandle().GetPublisher(signal_topic) + if not self.signal_publisher: + raise RuntimeError(f"Get publisher for topic '{signal_topic}' failed.") + + ret = aimrt_py.RegisterPublishType(self.signal_publisher, benchmark_pb2.BenchmarkSignal) + if not ret: + raise RuntimeError(f"Register publish type for topic '{signal_topic}' failed.") + + # message publishers + self.publisher_wrapper_vec = [] + for ii in range(self.max_topic_number): + executor_name = f"publish_executor_{ii}" + executor = self.core.GetExecutorManager().GetExecutor(executor_name) + if not executor: + raise RuntimeError(f"Get executor '{executor_name}' failed.") + + topic_name = f"test_topic_{ii}" + publisher = self.core.GetChannelHandle().GetPublisher(topic_name) + if not publisher: + raise RuntimeError(f"Get publisher for topic '{topic_name}' failed.") + + ret = aimrt_py.RegisterPublishType(publisher, benchmark_pb2.BenchmarkMessage) + if not ret: + raise RuntimeError(f"Register publish type for topic '{topic_name}' failed.") + + self.publisher_wrapper_vec.append({ + "publish_executor": executor, + "publisher": publisher + }) + + except Exception as e: + aimrt_py.error(self.logger, f"Initialize failed: {e}") + return False + + aimrt_py.info(self.logger, "Init succeeded") + return True + + def Start(self) -> bool: + try: + aimrt_py.info(self.logger, "Module starting ...") + self.publish_control_executor.Execute(self.MainLoop) + + except Exception as e: + aimrt_py.error(self.logger, f"Start failed: {e}") + return False + + aimrt_py.info(self.logger, "Start succeeded") + return True + + def Shutdown(self) -> bool: + try: + aimrt_py.info(self.logger, "Module shutting down ...") + + self.run_flag = False + self.publish_complete_event.set() + self.stop_sig.wait() + except Exception as e: + aimrt_py.error(self.logger, f"Shutdown failed: {e}") + return False + + aimrt_py.info(self.logger, "Shutdown succeeded") + return True + + def MainLoop(self) -> None: + try: + aimrt_py.info(self.logger, "Start Bench.") + + # warm up + for ii in range(10): + begin_signal = benchmark_pb2.BenchmarkSignal() + begin_signal.status = benchmark_pb2.BenchmarkStatus.WarmUp + aimrt_py.Publish(self.signal_publisher, begin_signal) + time.sleep(0.1) + aimrt_py.info(self.logger, f"Warm up {ii}") + + # benchmark + for ii, bench_plan in enumerate(self.bench_plans): + if not self.run_flag: + break + + print(f"Start bench plan {ii}") + self.StartSinglePlan(ii, bench_plan) + print(f"End bench plan {ii}") + + time.sleep(self.bench_interval) + + self.ShutdownPeer() + + except Exception as e: + aimrt_py.error(self.logger, f"Exit MainLoop with exception: {e}") + + self.stop_sig.set() + + def StartSinglePlan(self, plan_id: int, plan: dict) -> None: + # publish start signal + begin_signal = benchmark_pb2.BenchmarkSignal() + begin_signal.status = benchmark_pb2.BenchmarkStatus.Begin + begin_signal.bench_plan_id = plan_id + begin_signal.topic_number = plan['topic_number'] + begin_signal.send_num = plan['msg_count'] + begin_signal.message_size = plan['msg_size'] + begin_signal.send_frequency = plan['channel_frq'] + + aimrt_py.info(self.logger, f"Publish benchmark start signal, data: \n{begin_signal}") + aimrt_py.Publish(self.signal_publisher, begin_signal) + + self.publish_complete_event.clear() + self.completed_tasks = 0 + self.total_tasks = plan['topic_number'] + + time.sleep(1) + + # publish topic + for ii in range(plan['topic_number']): + publish_executor = self.publisher_wrapper_vec[ii]["publish_executor"] + publish_executor.Execute(lambda index=ii: self.PublishTask(self.publisher_wrapper_vec[index], plan)) + + self.publish_complete_event.wait() + + # publish end signal + end_signal = benchmark_pb2.BenchmarkSignal() + end_signal.status = benchmark_pb2.BenchmarkStatus.End + end_signal.bench_plan_id = plan_id + aimrt_py.info(self.logger, f"Publish benchmark end signal, data: {end_signal}") + aimrt_py.Publish(self.signal_publisher, end_signal) + + def PublishTask(self, publisher_wrapper: dict, plan: dict) -> None: + publisher = publisher_wrapper['publisher'] + msg = benchmark_pb2.BenchmarkMessage() + msg.data = self.GenerateRandomString(plan['msg_size']) + + send_count = 0 + sleep_time = 1 / plan['channel_frq'] + + while send_count < plan['msg_count'] and self.run_flag: + msg.seq = send_count + msg.timestamp = int(time.time() * 1e9) # ns + + aimrt_py.Publish(publisher, msg) + + send_count += 1 + time.sleep(sleep_time) + + with threading.Lock(): + self.completed_tasks += 1 + if self.completed_tasks == self.total_tasks: + self.publish_complete_event.set() + + def ShutdownPeer(self) -> None: + shutdown_peer_signal = benchmark_pb2.BenchmarkSignal() + shutdown_peer_signal.status = benchmark_pb2.BenchmarkStatus.ShutdownPeer + aimrt_py.info(self.logger, f"Publish benchmark shutdown peer signal, data: {shutdown_peer_signal}") + aimrt_py.Publish(self.signal_publisher, shutdown_peer_signal) + + time.sleep(self.shutdown_delay) + os.kill(os.getpid(), signal.SIGINT) + + @staticmethod + def GenerateRandomString(length: int) -> bytes: + return ''.join(random.choices(string.ascii_letters + string.digits, k=length)).encode() diff --git a/src/examples/py/pb_chn_bench/benchmark_subscriber_app.py b/src/examples/py/pb_chn_bench/benchmark_subscriber_app.py new file mode 100644 index 000000000..dcdc0b590 --- /dev/null +++ b/src/examples/py/pb_chn_bench/benchmark_subscriber_app.py @@ -0,0 +1,61 @@ +# Copyright (c) 2024 The AimRT Authors. +# AimRT is licensed under Mulan PSL v2. + +import argparse +import signal +import sys +import threading + +import aimrt_py +import benchmark_subscriber_module + +global_aimrt_core = None + + +def signal_handler(sig, _): + global global_aimrt_core + + if (global_aimrt_core and (sig == signal.SIGINT or sig == signal.SIGTERM)): + global_aimrt_core.Shutdown() + return + + sys.exit(0) + + +def main(): + parser = argparse.ArgumentParser(description='Example benchmark subscriber application.') + parser.add_argument('--cfg_file_path', type=str, default="", help='config file path') + args = parser.parse_args() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + print("AimRT start.") + + aimrt_core = aimrt_py.Core() + + global global_aimrt_core + global_aimrt_core = aimrt_core + + module = benchmark_subscriber_module.BenchmarkSubscriber() + aimrt_core.RegisterModule(module) + + core_options = aimrt_py.CoreOptions() + core_options.cfg_file_path = args.cfg_file_path + aimrt_core.Initialize(core_options) + + thread = threading.Thread(target=aimrt_core.Start) + thread.start() + + while thread.is_alive(): + thread.join(1.0) + + aimrt_core.Shutdown() + + global_aimrt_core = None + + print("AimRT exit.") + + +if __name__ == '__main__': + main() diff --git a/src/examples/py/pb_chn_bench/benchmark_subscriber_module.py b/src/examples/py/pb_chn_bench/benchmark_subscriber_module.py new file mode 100644 index 000000000..a4e934894 --- /dev/null +++ b/src/examples/py/pb_chn_bench/benchmark_subscriber_module.py @@ -0,0 +1,185 @@ +# Copyright (c) 2024 The AimRT Authors. +# AimRT is licensed under Mulan PSL v2. + +import os +import signal +import time +from dataclasses import dataclass, field +from typing import Dict, List + +import aimrt_py +import benchmark_pb2 +import yaml +from google.protobuf.json_format import MessageToJson + + +@dataclass +class MsgRecord: + recv: bool = False + send_timestamp: float = 0 + recv_timestamp: float = 0 + + +@dataclass +class TopicRecord: + topic_name: str + msg_record_vec: List[MsgRecord] = field(default_factory=list) + + +class BenchmarkSubscriber(aimrt_py.ModuleBase): + def __init__(self): + super().__init__() + self.core = aimrt_py.CoreRef() + self.logger = aimrt_py.LoggerRef() + self.max_topic_number = 0 + + self.cur_bench_plan = benchmark_pb2.BenchmarkSignal + self.topic_record_map: Dict[str, TopicRecord] = {} + + def Info(self) -> aimrt_py.ModuleInfo: + info = aimrt_py.ModuleInfo() + info.name = "BenchmarkSubscriberModule" + return info + + def Initialize(self, core: aimrt_py.CoreRef) -> bool: + assert isinstance(core, aimrt_py.CoreRef) + + self.core = core + self.logger = self.core.GetLogger() + + aimrt_py.info(self.logger, "Module initializing ...") + + try: + # read config + file_path = self.core.GetConfigurator().GetConfigFilePath() + with open(file_path, "r") as f: + cfg_node = yaml.safe_load(f) + + self.max_topic_number = cfg_node.get("max_topic_number", 0) + + aimrt_py.info(self.logger, f"Module config: max_topic_number={self.max_topic_number}") + + # signal subscriber + signal_topic = "benchmark_signal" + self.signal_subscriber = self.core.GetChannelHandle().GetSubscriber(signal_topic) + if not self.signal_subscriber: + raise RuntimeError(f"Get subscriber for topic '{signal_topic}' failed.") + + # set signal and message callbacks + aimrt_py.Subscribe(self.signal_subscriber, benchmark_pb2.BenchmarkSignal, self.SignalCallback) + + # message subscribers + for ii in range(self.max_topic_number): + topic_name = f"test_topic_{ii}" + subscriber = self.core.GetChannelHandle().GetSubscriber(topic_name) + if not subscriber: + raise RuntimeError(f"Get subscriber for topic '{topic_name}' failed.") + aimrt_py.Subscribe(subscriber, + benchmark_pb2.BenchmarkMessage, + lambda msg, topic_index=ii: self.MessageCallback(topic_index, msg)) + + except Exception as e: + aimrt_py.error(self.logger, f"Initialize failed: {e}") + return False + + aimrt_py.info(self.logger, "Init succeeded") + return True + + def Start(self) -> bool: + return True + + def Shutdown(self) -> bool: + try: + aimrt_py.info(self.logger, "Module is shutting down...") + except Exception as e: + aimrt_py.error(self.logger, f"Shutdown failed: {e}") + return False + + aimrt_py.info(self.logger, "Shutdown succeeded") + return True + + def SignalCallback(self, signal_msg: benchmark_pb2.BenchmarkSignal) -> None: + aimrt_py.info(self.logger, f"Received signal: {MessageToJson(signal_msg)}") + + match signal_msg.status: + case benchmark_pb2.BenchmarkStatus.WarmUp: + pass + + case benchmark_pb2.BenchmarkStatus.Begin: + self.cur_bench_plan = signal_msg + if self.cur_bench_plan.topic_number > self.max_topic_number: + raise RuntimeError(f"Topic number in bench plan is larger than max topic number: " + f"{self.cur_bench_plan.topic_number} > {self.max_topic_number}") + + for ii in range(self.cur_bench_plan.topic_number): + topic_name = f"test_topic_{ii}" + self.topic_record_map[topic_name] = TopicRecord(topic_name=topic_name) + self.topic_record_map[topic_name].msg_record_vec = [MsgRecord() + for _ in range(self.cur_bench_plan.send_num)] + + case benchmark_pb2.BenchmarkStatus.End: + self.Evaluate() + + case benchmark_pb2.BenchmarkStatus.ShutdownPeer: + aimrt_py.info(self.logger, f"Received shutdown peer signal, shutdown aimrt core now.") + os.kill(os.getpid(), signal.SIGINT) + + case _: + aimrt_py.error(self.logger, f"Unknown signal status: {signal_msg.status}") + + def MessageCallback(self, topic_index: int, benchmark_msg: benchmark_pb2.BenchmarkMessage) -> None: + recv_timestamp = time.time() * 1e9 # ns + + topic_name = f"test_topic_{topic_index}" + self.topic_record_map[topic_name].msg_record_vec[benchmark_msg.seq].recv = True + self.topic_record_map[topic_name].msg_record_vec[benchmark_msg.seq].recv_timestamp = recv_timestamp + self.topic_record_map[topic_name].msg_record_vec[benchmark_msg.seq].send_timestamp = benchmark_msg.timestamp + + def Evaluate(self) -> None: + # calculate metrics + latency_vec = [] + for ii in range(self.cur_bench_plan.topic_number): + topic_name = f"test_topic_{ii}" + topic_record = self.topic_record_map[topic_name] + for jj in range(self.cur_bench_plan.send_num): + msg_record = topic_record.msg_record_vec[jj] + if not msg_record.recv: + aimrt_py.error(self.logger, f"topic '{topic_name}' message seq '{jj}' not received") + continue + + if msg_record.recv_timestamp < msg_record.send_timestamp: + aimrt_py.error(self.logger, f"topic '{topic_name}' message seq '{jj}' " + "recv timestamp is smaller than send timestamp") + continue + + latency_vec.append((msg_record.recv_timestamp - msg_record.send_timestamp) / 1e3) # us + + latency_vec.sort() + recv_count = len(latency_vec) + + min_latency = latency_vec[0] + max_latency = latency_vec[-1] + p90_latency = latency_vec[int(recv_count * 0.9)] + p99_latency = latency_vec[int(recv_count * 0.99)] + p999_latency = latency_vec[int(recv_count * 0.999)] + + send_count = self.cur_bench_plan.send_num * self.cur_bench_plan.topic_number + loss_rate = (send_count - recv_count) / send_count * 100 + avg_latency = sum(latency_vec) / recv_count + + result_str = f"Benchmark plan {self.cur_bench_plan.bench_plan_id} completed, report:" + result_str += f"\nfrequency: {self.cur_bench_plan.send_frequency} hz" + result_str += f"\ntopic number: {self.cur_bench_plan.topic_number}" + result_str += f"\nmsg size: {self.cur_bench_plan.message_size} bytes" + result_str += f"\nmsg count per topic: {self.cur_bench_plan.send_num}" + result_str += f"\nsend count: {send_count}" + result_str += f"\nrecv count: {recv_count}" + result_str += f"\nloss rate: {loss_rate:.2f}" + result_str += f"\nmin latency: {min_latency:.3f} us" + result_str += f"\nmax latency: {max_latency:.3f} us" + result_str += f"\navg latency: {avg_latency:.3f} us" + result_str += f"\np90 latency: {p90_latency:.3f} us" + result_str += f"\np99 latency: {p99_latency:.3f} us" + result_str += f"\np999 latency: {p999_latency:.3f} us\n" + + aimrt_py.info(self.logger, result_str) diff --git a/src/examples/py/pb_chn_bench/build_py_pb_chn.sh b/src/examples/py/pb_chn_bench/build_py_pb_chn.sh new file mode 100755 index 000000000..c97cc8ca5 --- /dev/null +++ b/src/examples/py/pb_chn_bench/build_py_pb_chn.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +protoc_cmd=protoc +protocols_dir=../../../protocols/example/ + +if ! command -v ${protoc_cmd} &> /dev/null; then + echo "Can not find protoc!" + exit 1 +fi + +PROTOC_VERSION=$(${protoc_cmd} --version | awk '{print $2}') + +version_greater_equal() { + local version1=$1 + local version2=$2 + + [ "$(printf '%s\n' "$version1" "$version2" | sort -V | head -n1)" != "$version1" ] +} + +if version_greater_equal "$PROTOC_VERSION" "3.20"; then + echo "protoc version $PROTOC_VERSION." +else + echo "protoc version $PROTOC_VERSION, need 3.20 at least." + exit 1 +fi + +${protoc_cmd} -I${protocols_dir} --python_out=./ ${protocols_dir}/benchmark.proto diff --git a/src/examples/py/pb_chn_bench/cfg/benchmark_publisher_cfg.yaml b/src/examples/py/pb_chn_bench/cfg/benchmark_publisher_cfg.yaml new file mode 100644 index 000000000..9ee558616 --- /dev/null +++ b/src/examples/py/pb_chn_bench/cfg/benchmark_publisher_cfg.yaml @@ -0,0 +1,56 @@ +# Copyright (c) 2024 The AimRT Authors. +# AimRT is licensed under Mulan PSL v2. + +aimrt: + plugin: + plugins: + - name: net_plugin + path: ${AIMRT_PLUGIN_DIR}/libaimrt_net_plugin.so + options: + thread_num: 4 + http_options: + listen_ip: 127.0.0.1 + listen_port: 50081 + log: + core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off + backends: + - type: console + executor: + executors: + - name: publish_control_executor + type: simple_thread + - name: publish_executor_0 + type: simple_thread + - name: publish_executor_1 + type: simple_thread + - name: publish_executor_2 + type: simple_thread + - name: publish_executor_3 + type: simple_thread + channel: + backends: + - type: http + options: + pub_topics_options: + - topic_name: "(.*)" + server_url_list: ["http://127.0.0.1:50080"] + pub_topics_options: + - topic_name: "(.*)" + enable_backends: [http] + module: + modules: + - name: BenchmarkPublisherModule + log_lvl: INFO + +# Module custom configuration +BenchmarkPublisherModule: + max_topic_number: 4 + bench_plans: + - channel_frq: 1000 + msg_size: 512 + topic_number: 4 + msg_count: 5000 + - channel_frq: 1000 + msg_size: 4096 + topic_number: 1 + msg_count: 1000 diff --git a/src/examples/py/pb_chn_bench/cfg/benchmark_subscriber_cfg.yaml b/src/examples/py/pb_chn_bench/cfg/benchmark_subscriber_cfg.yaml new file mode 100644 index 000000000..6245d7b16 --- /dev/null +++ b/src/examples/py/pb_chn_bench/cfg/benchmark_subscriber_cfg.yaml @@ -0,0 +1,31 @@ +# Copyright (c) 2024 The AimRT Authors. +# AimRT is licensed under Mulan PSL v2. + +aimrt: + plugin: + plugins: + - name: net_plugin + path: ${AIMRT_PLUGIN_DIR}/libaimrt_net_plugin.so + options: + thread_num: 4 + http_options: + listen_ip: 127.0.0.1 + listen_port: 50080 + log: + core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off + backends: + - type: console + channel: + backends: + - type: http + sub_topics_options: + - topic_name: "(.*)" + enable_backends: [http] + module: + modules: + - name: BenchmarkSubscriberModule + log_lvl: INFO + + +BenchmarkSubscriberModule: + max_topic_number: 4 \ No newline at end of file diff --git a/src/examples/py/pb_chn_bench/start_benchmark_publisher.sh b/src/examples/py/pb_chn_bench/start_benchmark_publisher.sh new file mode 100755 index 000000000..12747ed18 --- /dev/null +++ b/src/examples/py/pb_chn_bench/start_benchmark_publisher.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +export AIMRT_PLUGIN_DIR=$(pip show aimrt_py | grep Location | awk '{print $2}')/aimrt_py + +python3 benchmark_publisher_app.py --cfg_file_path ./cfg/benchmark_publisher_cfg.yaml diff --git a/src/examples/py/pb_chn_bench/start_benchmark_subscriber.sh b/src/examples/py/pb_chn_bench/start_benchmark_subscriber.sh new file mode 100755 index 000000000..030b046ca --- /dev/null +++ b/src/examples/py/pb_chn_bench/start_benchmark_subscriber.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +export AIMRT_PLUGIN_DIR=$(pip show aimrt_py | grep Location | awk '{print $2}')/aimrt_py + +python3 benchmark_subscriber_app.py --cfg_file_path ./cfg/benchmark_subscriber_cfg.yaml diff --git a/src/examples/py/pb_rpc/README.md b/src/examples/py/pb_rpc/README.md index 3e0a19d0b..b723cd66f 100644 --- a/src/examples/py/pb_rpc/README.md +++ b/src/examples/py/pb_rpc/README.md @@ -23,7 +23,7 @@ 运行方式(linux): -- 安装 `aimrt_py` 包; +- [安装 `aimrt_py` 包](../../../../document/sphinx-cn/tutorials/quick_start/installation_py.md); - 运行本目录下的[build_examples_py_pb_rpc.sh](./build_examples_py_pb_rpc.sh)脚本,生成协议桩代码文件; - 如果本地没有 protoc 或者 protoc 版本小于 3.20,请安装或升级 protoc,或直接修改脚本中的 `protoc_cmd` 变量指向合适的路径; - 运行本目录下的[start_examples_py_pb_rpc_http_server.sh](./start_examples_py_pb_rpc_http_server.sh)脚本,启动 RPC Server; @@ -39,7 +39,7 @@ -## protobuf rpc grpcc +## protobuf rpc grpc 一个基于 protobuf 协议与 grpc 后端的 python rpc 示例,演示内容包括: @@ -61,7 +61,7 @@ 运行方式(linux): -- 安装 `aimrt_py` 包; +- [安装 `aimrt_py` 包](../../../../document/sphinx-cn/tutorials/quick_start/installation_py.md); - 运行本目录下的[build_examples_py_pb_rpc.sh](./build_examples_py_pb_rpc.sh)脚本,生成协议桩代码文件; - 如果本地没有 protoc 或者 protoc 版本小于 3.20,请安装或升级 protoc,或直接修改脚本中的 `protoc_cmd` 变量指向合适的路径; - 运行本目录下的[start_examples_py_pb_rpc_grpc_server.sh](./start_examples_py_pb_rpc_grpc_server.sh)脚本,启动 RPC Server; @@ -100,7 +100,7 @@ 运行方式(linux): -- 安装 `aimrt_py` 包; +- [安装 `aimrt_py` 包](../../../../document/sphinx-cn/tutorials/quick_start/installation_py.md); - 运行本目录下的[build_examples_py_pb_rpc.sh](./build_examples_py_pb_rpc.sh)脚本,生成协议桩代码文件; - 如果本地没有 protoc 或者 protoc 版本小于 3.20,请安装或升级 protoc,或直接修改脚本中的 `protoc_cmd` 变量指向合适的路径; - 运行本目录下的[start_examples_py_pb_rpc_ros2_server.sh](./start_examples_py_pb_rpc_ros2_server.sh)脚本,启动 RPC Server; diff --git a/src/protocols/example/benchmark.proto b/src/protocols/example/benchmark.proto index 52df1ea7a..aa31e0027 100644 --- a/src/protocols/example/benchmark.proto +++ b/src/protocols/example/benchmark.proto @@ -9,6 +9,7 @@ enum BenchmarkStatus { Begin = 0; End = 1; WarmUp = 2; + ShutdownPeer = 3; } message BenchmarkSignal {