feat: enhance aimrt_py channel with ctx functionality (#44)

* chore: reorganize imports for consistency

Refactor the import statements across multiple modules to improve readability and maintain consistency. Ensure all relevant libraries are imported in a coherent manner.

* feat: add GetTopic method to PublisherRef and SubscriberRef

Expose the GetTopic method for both PublisherRef and SubscriberRef to improve topic management and enhance usability within the Python interface.

* feat: add context handling in publishing

Introduce new context management methods for publishing messages with context support. Enhance the Python bindings to allow for better integration with context references, improving the flexibility and usability of the publisher functionality.

* feat: enhance subscription with context support

Add a new subscription method that includes context handling for better message processing. This improves the flexibility of message serialization formats and error handling during callback execution.

* refactor: clean up publish and subscribe methods

Improve code readability by removing unnecessary blank lines and ensuring consistent formatting in the publish and subscribe functions.

* style: update ASCII art formatting in example runner

Improve visual consistency of the ASCII banner in the output, enhancing readability.

* chore: improve type annotations for publisher and subscriber functions

Enhance the type hints in the RegisterPublishType, Publish, and Subscribe functions to improve code clarity and maintainability. This facilitates better integration with type-checking tools and helps prevent potential runtime errors.

* feat: support multiple serialization types in publisher

Enhance the publishing functionality to handle both protobuf and JSON serialization types. This improves flexibility and allows for broader compatibility with different data formats. Add error handling for invalid serialization types to ensure robustness.

* fix: correct serialization type prefix

Update the serialization type from "pb" to "json" when publishing JSON messages, ensuring accurate format differentiation for better clarity and preventing potential processing errors.

* refactor: streamline serialization handling in PublishWithCtx

Improve message serialization by consolidating logic and enhancing readability. Ensure error handling for unsupported types remains clear.

* feat: add Context support for aimrt_py channel functionality

Enhance channel functionality in aimrt_py by providing Context support, improving usability and enabling better resource management during operations.

* refactor: streamline serialization type handling

Improve clarity by formatting the serialization type in the publish method. This enhances readability and ensures consistent output in message formatting.

* refactor: rename publish function and improve argument handling

Enhance the Publish function to accept different argument configurations and simplify the serialization process, ensuring clearer logic and improved usability.

* feat: enhance publishing with customizable serialization

Allow publishers to specify serialization type (protobuf or JSON) when publishing messages. Update method signatures to improve clarity and error handling for serialization type validation.

* feat: enhance Publish function documentation

Clarify usage and expectations for the Publish function, including argument types and potential exceptions. This improves maintainability and usability for future developers.

* refactor: simplify protobuf serialization handling

Streamline the protobuf message serialization and deserialization processes by separating concerns into dedicated functions. Improve callback handling for subscriber methods to enhance code readability and maintainability.

* refactor: streamline serialization and deserialization methods

Simplify the process of message serialization and deserialization by consolidating functions and improving code clarity. This enhances maintainability and reduces redundancy in handling protobuf messages.

* refactor: remove unused subscription method

Eliminate the `PySubscribeWithSerializationType` function to streamline the code and improve maintainability, as it is not being used in the current implementation.

* refactor: streamline callback parameter handling

Simplify the validation and handling of callback parameters in the subscription process. Ensure the callback adheres to a clear signature expectation, enhancing code maintainability and reducing potential errors.

* chore: streamline imports for clarity

Optimize import statements by specifying only required components, improving readability and maintainability.

* feat: enhance protobuf registration and subscription documentation

Improve docstrings for registering and subscribing to protobuf message types, clarifying parameter details and callback expectations to aid developer understanding and usage.
This commit is contained in:
zhangyi1357 2024-10-22 20:09:15 +08:00 committed by GitHub
parent 7811ecc7cc
commit e2e77060af
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 195 additions and 48 deletions

View File

@ -9,6 +9,7 @@
- mqtt 新增配置项以支持加密传输;
- 新增了第三方库 asioruntime::core 不再引用 boost改为引用独立的 asio 库,以减轻依赖;
- 修复 aimrt_py 多线程 rpc 调用 server 端概率性崩溃的问题;
- 为 aimrt_py 的 channel 功能提供了 Context 支持;
**次要修改**
- 缩短了一些 examples 的文件路径长度;
@ -20,4 +21,3 @@
- iceoryx 插件在编译前先检查是否存在libacl不存在则不进行编译
- 提供 RPC 服务的插件现在支持指定 service name
- 提供一键运行example的脚本并生成测试报告

View File

@ -2,9 +2,10 @@
# All rights reserved.
import argparse
import threading
import signal
import sys
import threading
import aimrt_py
import helloworld_module

View File

@ -3,12 +3,12 @@
import argparse
import threading
import aimrt_py
import yaml
import time
from google.protobuf.json_format import MessageToJson
import aimrt_py
import event_pb2
import yaml
from google.protobuf.json_format import MessageToJson
def main():

View File

@ -2,14 +2,14 @@
# All rights reserved.
import argparse
import threading
import signal
import sys
import aimrt_py
import yaml
import threading
from google.protobuf.json_format import MessageToJson
import aimrt_py
import event_pb2
import yaml
from google.protobuf.json_format import MessageToJson
global_aimrt_core = None

View File

@ -2,14 +2,14 @@
# All rights reserved.
import argparse
import threading
import aimrt_py
import time
import datetime
import threading
import time
from google.protobuf.json_format import MessageToJson
import rpc_pb2
import aimrt_py
import rpc_aimrt_rpc_pb2
import rpc_pb2
from google.protobuf.json_format import MessageToJson
def main():

View File

@ -83,13 +83,13 @@ class ExampleRunner:
width = 65
report = f"""
{CYAN}{BOLD}
_____ _ ____ _
|_ _|__ ___| |_ | _ \ ___ _ __ ___ _ __| |_ _
| |/ _ \/ __| __| | |_) / _ \ '_ \ / _ \| '__| __(_)
| | __/\__ \ |_ | _ < __/ |_) | (_) | | | |_ _
|_|\___||___/\__| |_| \_\___| .__/ \___/|_| \__(_)
|_|
{CYAN}{BOLD}
_____ _ ____ _
|_ _|__ ___| |_ | _ \\ ___ _ __ ___ _ __| |_ _
| |/ _ \\/ __| __| | |_) / _ \\ '_ \\ / _ \\| '__| __(_)
| | __/\\__ \\ |_ | _ < __/ |_) | (_) | | | |_ _
|_|\\___||___/\\__| |_| \\_\\___| .__/ \\___/|_| \\__(_)
|_|
{RESET}
{YELLOW}{BOLD} Overall Result:{RESET}
{WHITE}{'Total tests:':<{width}}{CYAN}{total_tests}

View File

@ -2,5 +2,5 @@
# All rights reserved.
from .aimrt_py_log import *
from .aimrt_py_pb_chn import *
from .aimrt_py_pb_chn import Publish, RegisterPublishType, Subscribe
from .aimrt_python_runtime import *

View File

@ -1,12 +1,47 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
import inspect
from typing import Callable
import google.protobuf
import google.protobuf.message
from . import aimrt_python_runtime
def RegisterPublishType(publisher, protobuf_type):
def _SerializeProtobufMessage(pb_msg: google.protobuf.message.Message, serialization_type: str) -> bytes:
if serialization_type == "pb":
return pb_msg.SerializeToString()
elif serialization_type == "json":
return google.protobuf.json_format.MessageToJson(pb_msg)
else:
raise ValueError(f"Invalid serialization type: {serialization_type}")
def _DeserializeProtobufMessage(msg_buf: bytes,
serialization_type: str,
protobuf_type: google.protobuf.message.Message) -> google.protobuf.message.Message:
msg = protobuf_type()
if serialization_type == "pb":
msg.ParseFromString(msg_buf)
elif serialization_type == "json":
google.protobuf.json_format.Parse(msg_buf, msg)
else:
raise ValueError(f"Invalid serialization type: {serialization_type}")
return msg
def RegisterPublishType(publisher: aimrt_python_runtime.PublisherRef, protobuf_type: google.protobuf.message.Message):
"""Register a protobuf message type to a publisher.
Args:
publisher (aimrt_python_runtime.PublisherRef): channel publisher
protobuf_type (google.protobuf.message.Message): protobuf message type
Returns:
bool: True if success, False otherwise
"""
aimrt_ts = aimrt_python_runtime.TypeSupport()
aimrt_ts.SetTypeName("pb:" + protobuf_type.DESCRIPTOR.full_name)
aimrt_ts.SetSerializationTypesSupportedList(["pb", "json"])
@ -14,30 +49,92 @@ def RegisterPublishType(publisher, protobuf_type):
return publisher.RegisterPublishType(aimrt_ts)
def Publish(publisher, pb_msg):
publisher.Publish("pb:" + pb_msg.DESCRIPTOR.full_name, "pb", pb_msg.SerializeToString())
def Publish(publisher: aimrt_python_runtime.PublisherRef, second, third=None):
"""Publish a message to a channel.
This function can be called in following ways:
- Publish(publisher, pb_msg)
- Publish(publisher, pb_msg, ctx)
- Publish(publisher, pb_msg, serialization_type)
- Publish(publisher, ctx, pb_msg)
- Publish(publisher, serialization_type, pb_msg)
pb_msg: google.protobuf.message.Message
ctx: aimrt_python_runtime.Context or aimrt_python_runtime.ContextRef
serialization_type: str
Args:
publisher (aimrt_python_runtime.PublisherRef): channel publisher
second: pb_msg or ctx or serialization_type
third: pb_msg or ctx or serialization_type or None
Raises:
TypeError: if the arguments are invalid
"""
if isinstance(second, google.protobuf.message.Message):
pb_msg = second
ctx = third
elif isinstance(third, google.protobuf.message.Message):
pb_msg = third
ctx = second
else:
raise TypeError("Invalid arguments, no protobuf message found")
if isinstance(ctx, (aimrt_python_runtime.Context, aimrt_python_runtime.ContextRef)):
if isinstance(ctx, aimrt_python_runtime.Context):
ctx_ref = aimrt_python_runtime.ContextRef(ctx)
else:
ctx_ref = ctx
serialized_msg = _SerializeProtobufMessage(pb_msg, ctx_ref.GetSerializationType())
publisher.PublishWithCtx(f"pb:{pb_msg.DESCRIPTOR.full_name}", ctx_ref, serialized_msg)
elif isinstance(ctx, str):
serialization_type = ctx
serialized_msg = _SerializeProtobufMessage(pb_msg, serialization_type)
publisher.PublishWithSerializationType(f"pb:{pb_msg.DESCRIPTOR.full_name}", serialization_type, serialized_msg)
elif ctx is None:
# default use pb serialization
serialized_msg = _SerializeProtobufMessage(pb_msg, "pb")
publisher.PublishWithSerializationType(f"pb:{pb_msg.DESCRIPTOR.full_name}", "pb", serialized_msg)
else:
raise TypeError(
f"Invalid context type: {type(ctx)}, "
f"only 'aimrt_python_runtime.Context' or 'aimrt_python_runtime.ContextRef' or 'str' is supported")
def Subscribe(subscriber, protobuf_type, callback):
def Subscribe(subscriber: aimrt_python_runtime.SubscriberRef,
protobuf_type: google.protobuf.message.Message,
callback: Callable):
"""Subscribe a message from a channel.
Args:
subscriber (aimrt_python_runtime.SubscriberRef): channel subscriber
protobuf_type (google.protobuf.message.Message): protobuf message type
callback (Callable): callback function
Raises:
ValueError: if the callback is invalid
Callback function signature:
- callback(msg)
- callback(ctx, msg)
"""
aimrt_ts = aimrt_python_runtime.TypeSupport()
aimrt_ts.SetTypeName("pb:" + protobuf_type.DESCRIPTOR.full_name)
aimrt_ts.SetSerializationTypesSupportedList(["pb", "json"])
def handle_callback(serialization_type, msg_buf):
sig = inspect.signature(callback)
required_param_count = sum(1 for param in sig.parameters.values() if param.default == param.empty)
if not (1 <= required_param_count <= 2):
raise ValueError("Invalid callback: expected 1 or 2 parameters, with at most one optional parameter")
def handle_callback(ctx_ref: aimrt_python_runtime.ContextRef, msg_buf: bytes):
try:
if serialization_type == "pb":
msg = protobuf_type()
msg.ParseFromString(msg_buf)
msg = _DeserializeProtobufMessage(msg_buf, ctx_ref.GetSerializationType(), protobuf_type)
if required_param_count == 1:
callback(msg)
return
if serialization_type == "json":
msg = protobuf_type()
google.protobuf.json_format.Parse(msg_buf, msg)
callback(msg)
return
else:
callback(ctx_ref, msg)
except Exception as e:
print("AimRT channel handle get exception, {}".format(e))
return
print(f"AimRT channel handle get exception, {e}")
subscriber.Subscribe(aimrt_ts, handle_callback)
subscriber.SubscribeWithCtx(aimrt_ts, handle_callback)

View File

@ -5,6 +5,7 @@
#include <utility>
#include "aimrt_module_cpp_interface/channel/channel_context.h"
#include "aimrt_module_cpp_interface/channel/channel_handle.h"
#include "python_runtime/export_type_support.h"
@ -12,6 +13,40 @@
namespace aimrt::runtime::python_runtime {
inline void ExportContext(const pybind11::object& m) {
using aimrt::channel::Context;
using aimrt::channel::ContextRef;
pybind11::class_<Context>(m, "Context")
.def(pybind11::init<>())
.def("CheckUsed", &Context::CheckUsed)
.def("SetUsed", &Context::SetUsed)
.def("Reset", &Context::Reset)
.def("GetType", &Context::GetType)
.def("GetMetaValue", &Context::GetMetaValue)
.def("SetMetaValue", &Context::SetMetaValue)
.def("GetMetaKeys", &Context::GetMetaKeys)
.def("GetSerializationType", &Context::GetSerializationType)
.def("SetSerializationType", &Context::SetSerializationType)
.def("ToString", &Context::ToString);
pybind11::class_<ContextRef>(m, "ContextRef")
.def(pybind11::init<>())
.def(pybind11::init<const Context&>())
.def(pybind11::init<const Context*>())
.def(pybind11::init<const std::shared_ptr<Context>&>())
.def("__bool__", &ContextRef::operator bool)
.def("CheckUsed", &ContextRef::CheckUsed)
.def("SetUsed", &ContextRef::SetUsed)
.def("GetType", &ContextRef::GetType)
.def("GetMetaValue", &ContextRef::GetMetaValue)
.def("SetMetaValue", &ContextRef::SetMetaValue)
.def("GetMetaKeys", &ContextRef::GetMetaKeys)
.def("GetSerializationType", &ContextRef::GetSerializationType)
.def("SetSerializationType", &ContextRef::SetSerializationType)
.def("ToString", &ContextRef::ToString);
}
inline bool PyRegisterPublishType(
aimrt::channel::PublisherRef& publisher_ref,
const std::shared_ptr<const PyTypeSupport>& msg_type_support) {
@ -21,7 +56,7 @@ inline bool PyRegisterPublishType(
return publisher_ref.RegisterPublishType(msg_type_support->NativeHandle());
}
inline void PyPublish(
inline void PyPublishWithSerializationType(
aimrt::channel::PublisherRef& publisher_ref,
std::string_view msg_type,
std::string_view serialization_type,
@ -31,6 +66,14 @@ inline void PyPublish(
publisher_ref.Publish(msg_type, ctx, static_cast<const void*>(&msg_buf));
}
inline void PyPublishWithCtx(
aimrt::channel::PublisherRef& publisher_ref,
std::string_view msg_type,
const aimrt::channel::ContextRef& ctx_ref,
const std::string& msg_buf) {
publisher_ref.Publish(msg_type, ctx_ref, static_cast<const void*>(&msg_buf));
}
inline void ExportPublisherRef(pybind11::object m) {
using aimrt::channel::PublisherRef;
@ -38,13 +81,16 @@ inline void ExportPublisherRef(pybind11::object m) {
.def(pybind11::init<>())
.def("__bool__", &PublisherRef::operator bool)
.def("RegisterPublishType", &PyRegisterPublishType)
.def("Publish", &PyPublish);
.def("PublishWithSerializationType", &PyPublishWithSerializationType)
.def("PublishWithCtx", &PyPublishWithCtx)
.def("GetTopic", &PublisherRef::GetTopic)
.def("MergeSubscribeContextToPublishContext", &PublisherRef::MergeSubscribeContextToPublishContext);
}
inline bool PySubscribe(
inline bool PySubscribeWithCtx(
aimrt::channel::SubscriberRef& subscriber_ref,
const std::shared_ptr<const PyTypeSupport>& msg_type_support,
std::function<void(std::string_view, const pybind11::bytes&)>&& callback) {
std::function<void(aimrt::channel::ContextRef, const pybind11::bytes&)>&& callback) {
static std::vector<std::shared_ptr<const PyTypeSupport>> py_ts_vec;
py_ts_vec.emplace_back(msg_type_support);
@ -62,7 +108,7 @@ inline bool PySubscribe(
pybind11::gil_scoped_acquire acquire;
auto msg_buf_bytes = pybind11::bytes(msg_buf);
callback(ctx_ref.GetSerializationType(), msg_buf_bytes);
callback(ctx_ref, msg_buf_bytes);
msg_buf_bytes.release();
pybind11::gil_scoped_release release;
@ -77,7 +123,8 @@ inline void ExportSubscriberRef(pybind11::object m) {
pybind11::class_<SubscriberRef>(std::move(m), "SubscriberRef")
.def(pybind11::init<>())
.def("__bool__", &SubscriberRef::operator bool)
.def("Subscribe", &PySubscribe);
.def("SubscribeWithCtx", &PySubscribeWithCtx)
.def("GetTopic", &SubscriberRef::GetTopic);
}
inline void ExportChannelHandleRef(pybind11::object m) {
@ -87,6 +134,7 @@ inline void ExportChannelHandleRef(pybind11::object m) {
.def(pybind11::init<>())
.def("__bool__", &ChannelHandleRef::operator bool)
.def("GetPublisher", &ChannelHandleRef::GetPublisher)
.def("GetSubscriber", &ChannelHandleRef::GetSubscriber);
.def("GetSubscriber", &ChannelHandleRef::GetSubscriber)
.def("MergeSubscribeContextToPublishContext", &ChannelHandleRef::MergeSubscribeContextToPublishContext);
}
} // namespace aimrt::runtime::python_runtime
} // namespace aimrt::runtime::python_runtime

View File

@ -44,6 +44,7 @@ PYBIND11_MODULE(aimrt_python_runtime, m) {
ExportExecutorRef(m);
// channel
ExportContext(m);
ExportPublisherRef(m);
ExportSubscriberRef(m);
ExportChannelHandleRef(m);