fix: zenoh and iox on serializatio_length (#116)

* fix zenoh and iox on serializatio_length

* fix

---------

Co-authored-by: hanjun <hanjun@agibot.com>
This commit is contained in:
han J 2024-12-03 21:09:25 +08:00 committed by GitHub
parent 4bfe31f2b6
commit 356ddd7bb6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 51 additions and 50 deletions

View File

@ -114,7 +114,10 @@ bool IceoryxChannelBackend::Subscribe(
msg = static_cast<const char*>(payload);
// fetch a data packet of a specified length
util::ConstBufferOperator buf_oper(msg + kFixedLen, std::stoi(std::string(msg, kFixedLen)));
util::ConstBufferOperator buf_oper_tmp(msg, 4);
uint32_t pkg_size_with_len = buf_oper_tmp.GetUint32();
util::ConstBufferOperator buf_oper(msg + 4, pkg_size_with_len);
// get serialization type
std::string serialization_type(buf_oper.GetString(util::BufferLenType::kUInt8));
@ -234,7 +237,7 @@ void IceoryxChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrap
iox_pub_loaned_shm_ptr = loan_result.value();
// write info pkg on loaned shm : the first FIXED_LEN bytes needs to write the length of pkg
util::BufferOperator buf_oper(reinterpret_cast<char*>(iox_pub_loaned_shm_ptr) + kFixedLen, loan_size - kFixedLen);
util::BufferOperator buf_oper(reinterpret_cast<char*>(iox_pub_loaned_shm_ptr) + 4, loan_size - 4);
// write serialization type on loaned shm
buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8);
@ -248,7 +251,7 @@ void IceoryxChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrap
auto type_and_ctx_len = 1 + serialization_type.size() + context_meta_kv_size;
// write msg on loaned shm should start at the (FIXED_LEN + type_and_ctx_len)-th byte
aimrt::util::IceoryxBufferArrayAllocator iox_allocator(buf_oper.GetRemainingSize(), static_cast<char*>(iox_pub_loaned_shm_ptr) + type_and_ctx_len + kFixedLen);
aimrt::util::IceoryxBufferArrayAllocator iox_allocator(buf_oper.GetRemainingSize(), static_cast<char*>(iox_pub_loaned_shm_ptr) + type_and_ctx_len + 4);
if (buffer_array_cache_ptr == nullptr) {
try {
auto result = SerializeMsgSupportedIceoryx(msg_wrapper, serialization_type, aimrt::util::BufferArrayAllocatorRef(iox_allocator.NativeHandle()));
@ -262,7 +265,7 @@ void IceoryxChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrap
if (msg_size > buf_oper.GetRemainingSize()) {
// in this case means the msg has serialization cache but the size is too large, then expand suitable size
is_shm_enough = false;
iox_pub_shm_size_map_[iceoryx_pub_topic] = msg_size + type_and_ctx_len + kFixedLen;
iox_pub_shm_size_map_[iceoryx_pub_topic] = msg_size + type_and_ctx_len + 4;
} else {
// in this case means the msg has serialization cache and the size is suitable, then use cachema
@ -287,7 +290,7 @@ void IceoryxChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrap
// if has cache, the copy it to shm to replace the serialization
if (buffer_array_cache_ptr != nullptr) {
char* strat_pos = static_cast<char*>(iox_pub_loaned_shm_ptr) + kFixedLen + context_meta_kv_size + serialization_type.size() + 1;
char* strat_pos = static_cast<char*>(iox_pub_loaned_shm_ptr) + 4 + context_meta_kv_size + serialization_type.size() + 1;
for (size_t ii = 0; ii < buffer_array_cache_ptr->Size(); ++ii) {
std::memcpy(strat_pos, buffer_array_cache_ptr.get()[ii].Data()->data, buffer_array_cache_ptr.get()[ii].Data()->len);
strat_pos += buffer_array_cache_ptr.get()[ii].Data()->len;
@ -297,7 +300,8 @@ void IceoryxChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrap
}
// write info pkg length on loaned shm
std::memcpy(static_cast<char*>(iox_pub_loaned_shm_ptr), IntToFixedLengthString(1 + serialization_type.size() + context_meta_kv_size + msg_size, kFixedLen).c_str(), kFixedLen);
uint32_t data_size = 1 + serialization_type.size() + context_meta_kv_size + msg_size;
util::SetBufFromUint32(reinterpret_cast<char*>(iox_pub_loaned_shm_ptr), data_size);
iox_pub->publish(iox_pub_loaned_shm_ptr);
}

View File

@ -55,12 +55,6 @@ std::string GetPid() {
return process_id_str;
}
std::string IntToFixedLengthString(int number, int length) {
std::ostringstream oss;
oss << std::setw(length) << number;
return oss.str();
}
std::pair<std::shared_ptr<aimrt::util::BufferArrayView>, size_t> SerializeMsgSupportedIceoryx(
MsgWrapper& msg_wrapper, std::string_view serialization_type, aimrt::util::BufferArrayAllocatorRef allocator) {
auto& serialization_cache = msg_wrapper.serialization_cache;

View File

@ -18,15 +18,12 @@ namespace aimrt::plugins::iceoryx_plugin {
using IdString_t = iox::capro::IdString_t;
constexpr unsigned int kFixedLen = 20; // FIXED_LEN represents the length of the pkg_size's string which is enough to the max value of uint64_t
constexpr uint64_t kIoxShmInitSize = 1024; // default vaule of shm_init_size for iceoryx
iox::capro::ServiceDescription Url2ServiceDescription(std::string& url);
std::string GetPid();
std::string IntToFixedLengthString(int number, int length);
using namespace aimrt::runtime::core::channel;
std::pair<std::shared_ptr<aimrt::util::BufferArrayView>, size_t> SerializeMsgSupportedIceoryx(
MsgWrapper& msg_wrapper, std::string_view serialization_type, aimrt::util::BufferArrayAllocatorRef allocator);

View File

@ -7,7 +7,6 @@
#include "core/rpc/rpc_invoke_wrapper.h"
namespace aimrt::plugins::zenoh_plugin {
constexpr unsigned int kFixedLen = 20; // FIXED_LEN represents the length of the pkg_size's string which is enough to the max value of uint64_t
inline std::pair<std::shared_ptr<aimrt::util::BufferArrayView>, size_t> SerializeMsgSupportedZenoh(
runtime::core::channel::MsgWrapper& msg_wrapper, std::string_view serialization_type, aimrt::util::BufferArrayAllocatorRef allocator) {
@ -71,10 +70,4 @@ inline std::pair<std::shared_ptr<aimrt::util::BufferArrayView>, size_t> Serializ
return {nullptr, buffer_array_ptr->BufferSize()};
}
inline std::string IntToFixedLengthString(int number, int length) {
std::ostringstream oss;
oss << std::setw(length) << number;
return oss.str();
}
} // namespace aimrt::plugins::zenoh_plugin

View File

@ -148,7 +148,10 @@ bool ZenohChannelBackend::Subscribe(
// read data from payload
auto ret = z_bytes_reader_read(&reader, reinterpret_cast<uint8_t*>(serialized_data.data()), serialized_size);
if (ret >= 0) {
util::ConstBufferOperator buf_oper(serialized_data.data() + kFixedLen, std::stoi(std::string(serialized_data.data(), kFixedLen)));
util::ConstBufferOperator buf_oper_tmp(serialized_data.data(), 4);
uint32_t serialized_size_with_len = buf_oper_tmp.GetUint32();
util::ConstBufferOperator buf_oper(serialized_data.data() + 4, serialized_size_with_len);
// get serialization type
std::string serialization_type(buf_oper.GetString(util::BufferLenType::kUInt8));
@ -268,7 +271,7 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe
z_pub_loaned_shm_ptr = z_shm_mut_data_mut(z_loan_mut(loan_result.buf));
// write info pkg on loaned shm : the first FIXED_LEN bytes needs to write the length of pkg
util::BufferOperator buf_oper(reinterpret_cast<char*>(z_pub_loaned_shm_ptr) + kFixedLen, loan_size - kFixedLen);
util::BufferOperator buf_oper(reinterpret_cast<char*>(z_pub_loaned_shm_ptr) + 4, loan_size - 4);
// write serialization type on loaned shm
buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8);
@ -281,7 +284,7 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe
auto type_and_ctx_len = 1 + serialization_type.size() + context_meta_kv_size;
// write msg on loaned shm should start at the (FIXED_LEN + type_and_ctx_len)-th byte
aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + type_and_ctx_len + kFixedLen);
aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + type_and_ctx_len + 4);
if (buffer_array_cache_ptr == nullptr) {
try {
auto result = SerializeMsgSupportedZenoh(msg_wrapper, serialization_type, aimrt::util::BufferArrayAllocatorRef(z_allocator.NativeHandle()));
@ -294,7 +297,7 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe
if (msg_size > buf_oper.GetRemainingSize()) {
// in this case means the msg has serialization cache but the size is too large, then expand suitable size
is_shm_loan_size_enough = false;
z_pub_shm_size_map_[zenoh_pub_topic] = msg_size + type_and_ctx_len + kFixedLen;
z_pub_shm_size_map_[zenoh_pub_topic] = msg_size + type_and_ctx_len + 4;
} else {
// in this case means the msg has serialization cache and the size is suitable, then use cachema
is_shm_loan_size_enough = true;
@ -320,7 +323,7 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe
if (is_shm_pool_size_enough) {
// if has cache, the copy it to shm to replace the serialization
if (buffer_array_cache_ptr != nullptr) {
unsigned char* strat_pos = z_pub_loaned_shm_ptr + kFixedLen + context_meta_kv_size + serialization_type.size() + 1;
unsigned char* strat_pos = z_pub_loaned_shm_ptr + 4 + context_meta_kv_size + serialization_type.size() + 1;
for (size_t ii = 0; ii < buffer_array_cache_ptr->Size(); ++ii) {
std::memcpy(strat_pos, buffer_array_cache_ptr.get()[ii].Data()->data, buffer_array_cache_ptr.get()[ii].Data()->len);
strat_pos += buffer_array_cache_ptr.get()[ii].Data()->len;
@ -329,7 +332,9 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe
buffer_array_cache_ptr = nullptr;
}
// write info pkg length on loaned shm
std::memcpy(z_pub_loaned_shm_ptr, IntToFixedLengthString(1 + serialization_type.size() + context_meta_kv_size + msg_size, kFixedLen).c_str(), kFixedLen);
uint32_t data_size = 1 + serialization_type.size() + context_meta_kv_size + msg_size;
util::SetBufFromUint32(reinterpret_cast<char*>(z_pub_loaned_shm_ptr), data_size);
z_owned_bytes_t z_payload;
if (loan_result.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
z_bytes_from_shm_mut(&z_payload, z_move(loan_result.buf));
@ -359,14 +364,14 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe
size_t msg_size = buffer_array_view_ptr->BufferSize();
int32_t data_size = 1 + serialization_type.size() + context_meta_kv_size + msg_size;
int32_t pkg_size = data_size + kFixedLen;
int32_t pkg_size = data_size + 4;
// create buffer for serialization
std::vector<char> serialized_data(pkg_size);
util::BufferOperator buf_oper(serialized_data.data(), pkg_size);
// full data_size
buf_oper.SetBuffer(IntToFixedLengthString(data_size, kFixedLen).c_str(), kFixedLen);
buf_oper.SetUint32(data_size);
// full serialization_type
buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8);

View File

@ -165,7 +165,10 @@ bool ZenohRpcBackend::RegisterServiceFunc(
// read data from payload
auto ret = z_bytes_reader_read(&reader, reinterpret_cast<uint8_t*>(serialized_data.data()), serialized_size);
if (ret >= 0) {
util::ConstBufferOperator buf_oper(serialized_data.data() + kFixedLen, std::stoi(std::string(serialized_data.data(), kFixedLen)));
util::ConstBufferOperator buf_oper_tmp(serialized_data.data(), 4);
uint32_t pkg_size_with_len = buf_oper_tmp.GetUint32();
util::ConstBufferOperator buf_oper(serialized_data.data() + 4, pkg_size_with_len);
// deserialize type
std::string serialization_type(buf_oper.GetString(util::BufferLenType::kUInt8));
@ -242,8 +245,8 @@ bool ZenohRpcBackend::RegisterServiceFunc(
bool is_shm_loan_size_enough = true;
bool is_shm_pool_size_enough = true;
uint64_t msg_size = 0;
size_t header_len = 0;
uint32_t msg_size = 0;
uint32_t header_len = 0;
z_buf_layout_alloc_result_t loan_result;
do {
@ -269,7 +272,7 @@ bool ZenohRpcBackend::RegisterServiceFunc(
z_pub_loaned_shm_ptr = z_shm_mut_data_mut(z_loan_mut(loan_result.buf));
// write info pkg on loaned shm : the first FIXED_LEN bytes needs to write the length of pkg
util::BufferOperator buf_oper(reinterpret_cast<char*>(z_pub_loaned_shm_ptr) + kFixedLen, loan_size - kFixedLen);
util::BufferOperator buf_oper(reinterpret_cast<char*>(z_pub_loaned_shm_ptr) + 4, loan_size - 4);
// write serialization type on loaned shm
buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8);
@ -283,7 +286,7 @@ bool ZenohRpcBackend::RegisterServiceFunc(
header_len = 1 + serialization_type.size() + 4 + 4;
// write msg on loaned shm should start at the (FIXED_LEN + header_len)-th byte
aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + header_len + kFixedLen);
aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + header_len + 4);
if (buffer_array_cache_ptr == nullptr) {
try {
@ -297,7 +300,7 @@ bool ZenohRpcBackend::RegisterServiceFunc(
if (msg_size > buf_oper.GetRemainingSize()) {
// in this case means the msg has serialization cache but the size is too large, then expand suitable size
is_shm_loan_size_enough = false;
z_node_shm_size_map_[node_pub_topic] = kFixedLen + header_len + msg_size;
z_node_shm_size_map_[node_pub_topic] = 4 + header_len + msg_size;
} else {
// in this case means the msg has serialization cache and the size is suitable, then use cachema
is_shm_loan_size_enough = true;
@ -323,7 +326,7 @@ bool ZenohRpcBackend::RegisterServiceFunc(
if (is_shm_pool_size_enough) {
// if has cache, the copy it to shm to replace the serialization
if (buffer_array_cache_ptr != nullptr) {
unsigned char* strat_pos = z_pub_loaned_shm_ptr + kFixedLen + header_len;
unsigned char* strat_pos = z_pub_loaned_shm_ptr + 4 + header_len;
for (size_t ii = 0; ii < buffer_array_cache_ptr->Size(); ++ii) {
std::memcpy(strat_pos, buffer_array_cache_ptr.get()[ii].Data()->data, buffer_array_cache_ptr.get()[ii].Data()->len);
strat_pos += buffer_array_cache_ptr.get()[ii].Data()->len;
@ -333,7 +336,8 @@ bool ZenohRpcBackend::RegisterServiceFunc(
}
// write info pkg length on loaned shm
std::memcpy(z_pub_loaned_shm_ptr, IntToFixedLengthString(header_len, kFixedLen).c_str(), kFixedLen);
util::SetBufFromUint32(reinterpret_cast<char*>(z_pub_loaned_shm_ptr), header_len);
z_owned_bytes_t z_payload;
if (loan_result.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
z_bytes_from_shm_mut(&z_payload, z_move(loan_result.buf));
@ -364,14 +368,14 @@ bool ZenohRpcBackend::RegisterServiceFunc(
size_t rsp_size = buffer_array_view_ptr->BufferSize();
size_t z_data_size = 1 + serialization_type.size() + 4 + 4 + rsp_size;
size_t pkg_size = z_data_size + kFixedLen;
size_t pkg_size = z_data_size + 4;
// get buf to store data
std::vector<char> msg_buf_vec(pkg_size);
util::BufferOperator buf_oper(msg_buf_vec.data(), pkg_size);
// full data_size
buf_oper.SetBuffer(IntToFixedLengthString(z_data_size, kFixedLen).c_str(), kFixedLen);
buf_oper.SetUint32(z_data_size);
// full serialize type
buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8);
@ -460,7 +464,10 @@ bool ZenohRpcBackend::RegisterClientFunc(
return;
}
util::ConstBufferOperator buf_oper(serialized_data.data() + kFixedLen, std::stoi(std::string(serialized_data.data(), kFixedLen)));
util::ConstBufferOperator buf_oper_tmp(serialized_data.data(), 4);
uint32_t data_size_with_len = buf_oper_tmp.GetUint32();
util::ConstBufferOperator buf_oper(serialized_data.data() + 4, data_size_with_len);
std::string serialization_type(buf_oper.GetString(util::BufferLenType::kUInt8));
uint32_t req_id = buf_oper.GetUint32();
@ -600,8 +607,8 @@ void ZenohRpcBackend::Invoke(
bool is_shm_loan_size_enough = true;
bool is_shm_pool_size_enough = true;
uint64_t msg_size = 0;
size_t header_len = 0;
uint32_t msg_size = 0;
uint32_t header_len = 0;
z_buf_layout_alloc_result_t loan_result;
do {
@ -627,7 +634,7 @@ void ZenohRpcBackend::Invoke(
z_pub_loaned_shm_ptr = z_shm_mut_data_mut(z_loan_mut(loan_result.buf));
// write info pkg on loaned shm : the first FIXED_LEN bytes needs to write the length of pkg
util::BufferOperator buf_oper(reinterpret_cast<char*>(z_pub_loaned_shm_ptr) + kFixedLen, loan_size - kFixedLen);
util::BufferOperator buf_oper(reinterpret_cast<char*>(z_pub_loaned_shm_ptr) + 4, loan_size - 4);
// write serialization type on loaned shm
buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8);
@ -650,7 +657,7 @@ void ZenohRpcBackend::Invoke(
context_meta_kv_size;
// write msg on loaned shm should start at the (FIXED_LEN + header_len)-th byte
aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + header_len + kFixedLen);
aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + header_len + 4);
if (buffer_array_cache_ptr == nullptr) {
try {
@ -664,7 +671,7 @@ void ZenohRpcBackend::Invoke(
if (msg_size > buf_oper.GetRemainingSize()) {
// in this case means the msg has serialization cache but the size is too large, then expand suitable size
is_shm_loan_size_enough = false;
z_node_shm_size_map_[node_pub_topic] = kFixedLen + header_len + msg_size;
z_node_shm_size_map_[node_pub_topic] = 4 + header_len + msg_size;
} else {
// in this case means the msg has serialization cache and the size is suitable, then use cachema
is_shm_loan_size_enough = true;
@ -690,7 +697,7 @@ void ZenohRpcBackend::Invoke(
if (is_shm_pool_size_enough) {
// if has cache, the copy it to shm to replace the serialization
if (buffer_array_cache_ptr != nullptr) {
unsigned char* strat_pos = z_pub_loaned_shm_ptr + kFixedLen + header_len;
unsigned char* strat_pos = z_pub_loaned_shm_ptr + 4 + header_len;
for (size_t ii = 0; ii < buffer_array_cache_ptr->Size(); ++ii) {
std::memcpy(strat_pos, buffer_array_cache_ptr.get()[ii].Data()->data, buffer_array_cache_ptr.get()[ii].Data()->len);
strat_pos += buffer_array_cache_ptr.get()[ii].Data()->len;
@ -700,7 +707,8 @@ void ZenohRpcBackend::Invoke(
}
// write info pkg length on loaned shm
std::memcpy(z_pub_loaned_shm_ptr, IntToFixedLengthString(header_len, kFixedLen).c_str(), kFixedLen);
util::SetBufFromUint32(reinterpret_cast<char*>(z_pub_loaned_shm_ptr), header_len);
z_owned_bytes_t z_payload;
if (loan_result.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
z_bytes_from_shm_mut(&z_payload, z_move(loan_result.buf));
@ -737,13 +745,13 @@ void ZenohRpcBackend::Invoke(
context_meta_kv_size +
req_size;
size_t pkg_size = z_data_size + kFixedLen;
size_t pkg_size = z_data_size + 4;
// create buffer for serialization
std::vector<char> msg_buf_vec(pkg_size);
util::BufferOperator buf_oper(msg_buf_vec.data(), pkg_size);
// full data_size
buf_oper.SetBuffer(IntToFixedLengthString(z_data_size, kFixedLen).c_str(), kFixedLen);
buf_oper.SetUint32(z_data_size);
// full serialization_type
buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8);