diff --git a/src/plugins/iceoryx_plugin/iceoryx_channel_backend.cc b/src/plugins/iceoryx_plugin/iceoryx_channel_backend.cc index 4fe698dfc..15625cc6e 100644 --- a/src/plugins/iceoryx_plugin/iceoryx_channel_backend.cc +++ b/src/plugins/iceoryx_plugin/iceoryx_channel_backend.cc @@ -114,7 +114,10 @@ bool IceoryxChannelBackend::Subscribe( msg = static_cast(payload); // fetch a data packet of a specified length - util::ConstBufferOperator buf_oper(msg + kFixedLen, std::stoi(std::string(msg, kFixedLen))); + util::ConstBufferOperator buf_oper_tmp(msg, 4); + uint32_t pkg_size_with_len = buf_oper_tmp.GetUint32(); + + util::ConstBufferOperator buf_oper(msg + 4, pkg_size_with_len); // get serialization type std::string serialization_type(buf_oper.GetString(util::BufferLenType::kUInt8)); @@ -234,7 +237,7 @@ void IceoryxChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrap iox_pub_loaned_shm_ptr = loan_result.value(); // write info pkg on loaned shm : the first FIXED_LEN bytes needs to write the length of pkg - util::BufferOperator buf_oper(reinterpret_cast(iox_pub_loaned_shm_ptr) + kFixedLen, loan_size - kFixedLen); + util::BufferOperator buf_oper(reinterpret_cast(iox_pub_loaned_shm_ptr) + 4, loan_size - 4); // write serialization type on loaned shm buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8); @@ -248,7 +251,7 @@ void IceoryxChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrap auto type_and_ctx_len = 1 + serialization_type.size() + context_meta_kv_size; // write msg on loaned shm: should start at the (FIXED_LEN + type_and_ctx_len)-th byte - aimrt::util::IceoryxBufferArrayAllocator iox_allocator(buf_oper.GetRemainingSize(), static_cast(iox_pub_loaned_shm_ptr) + type_and_ctx_len + kFixedLen); + aimrt::util::IceoryxBufferArrayAllocator iox_allocator(buf_oper.GetRemainingSize(), static_cast(iox_pub_loaned_shm_ptr) + type_and_ctx_len + 4); if (buffer_array_cache_ptr == nullptr) { try { auto result = SerializeMsgSupportedIceoryx(msg_wrapper, serialization_type, aimrt::util::BufferArrayAllocatorRef(iox_allocator.NativeHandle())); @@ -262,7 +265,7 @@ void IceoryxChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrap if (msg_size > buf_oper.GetRemainingSize()) { // in this case means the msg has serialization cache but the size is too large, then expand suitable size is_shm_enough = false; - iox_pub_shm_size_map_[iceoryx_pub_topic] = msg_size + type_and_ctx_len + kFixedLen; + iox_pub_shm_size_map_[iceoryx_pub_topic] = msg_size + type_and_ctx_len + 4; } else { // in this case means the msg has serialization cache and the size is suitable, then use cachema @@ -287,7 +290,7 @@ void IceoryxChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrap // if has cache, the copy it to shm to replace the serialization if (buffer_array_cache_ptr != nullptr) { - char* strat_pos = static_cast(iox_pub_loaned_shm_ptr) + kFixedLen + context_meta_kv_size + serialization_type.size() + 1; + char* strat_pos = static_cast(iox_pub_loaned_shm_ptr) + 4 + context_meta_kv_size + serialization_type.size() + 1; for (size_t ii = 0; ii < buffer_array_cache_ptr->Size(); ++ii) { std::memcpy(strat_pos, buffer_array_cache_ptr.get()[ii].Data()->data, buffer_array_cache_ptr.get()[ii].Data()->len); strat_pos += buffer_array_cache_ptr.get()[ii].Data()->len; @@ -297,7 +300,8 @@ void IceoryxChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrap } // write info pkg length on loaned shm - std::memcpy(static_cast(iox_pub_loaned_shm_ptr), IntToFixedLengthString(1 + serialization_type.size() + context_meta_kv_size + msg_size, kFixedLen).c_str(), kFixedLen); + uint32_t data_size = 1 + serialization_type.size() + context_meta_kv_size + msg_size; + util::SetBufFromUint32(reinterpret_cast(iox_pub_loaned_shm_ptr), data_size); iox_pub->publish(iox_pub_loaned_shm_ptr); } diff --git a/src/plugins/iceoryx_plugin/util.cc b/src/plugins/iceoryx_plugin/util.cc index b52bb8f00..7687dc69d 100644 --- a/src/plugins/iceoryx_plugin/util.cc +++ b/src/plugins/iceoryx_plugin/util.cc @@ -55,12 +55,6 @@ std::string GetPid() { return process_id_str; } -std::string IntToFixedLengthString(int number, int length) { - std::ostringstream oss; - oss << std::setw(length) << number; - return oss.str(); -} - std::pair, size_t> SerializeMsgSupportedIceoryx( MsgWrapper& msg_wrapper, std::string_view serialization_type, aimrt::util::BufferArrayAllocatorRef allocator) { auto& serialization_cache = msg_wrapper.serialization_cache; diff --git a/src/plugins/iceoryx_plugin/util.h b/src/plugins/iceoryx_plugin/util.h index eb3150098..36aead1ac 100644 --- a/src/plugins/iceoryx_plugin/util.h +++ b/src/plugins/iceoryx_plugin/util.h @@ -18,15 +18,12 @@ namespace aimrt::plugins::iceoryx_plugin { using IdString_t = iox::capro::IdString_t; -constexpr unsigned int kFixedLen = 20; // FIXED_LEN represents the length of the pkg_size's string, which is enough to the max value of uint64_t constexpr uint64_t kIoxShmInitSize = 1024; // default vaule of shm_init_size for iceoryx iox::capro::ServiceDescription Url2ServiceDescription(std::string& url); std::string GetPid(); -std::string IntToFixedLengthString(int number, int length); - using namespace aimrt::runtime::core::channel; std::pair, size_t> SerializeMsgSupportedIceoryx( MsgWrapper& msg_wrapper, std::string_view serialization_type, aimrt::util::BufferArrayAllocatorRef allocator); diff --git a/src/plugins/zenoh_plugin/util.h b/src/plugins/zenoh_plugin/util.h index d6ed5e3cb..7724f5351 100644 --- a/src/plugins/zenoh_plugin/util.h +++ b/src/plugins/zenoh_plugin/util.h @@ -7,7 +7,6 @@ #include "core/rpc/rpc_invoke_wrapper.h" namespace aimrt::plugins::zenoh_plugin { -constexpr unsigned int kFixedLen = 20; // FIXED_LEN represents the length of the pkg_size's string, which is enough to the max value of uint64_t inline std::pair, size_t> SerializeMsgSupportedZenoh( runtime::core::channel::MsgWrapper& msg_wrapper, std::string_view serialization_type, aimrt::util::BufferArrayAllocatorRef allocator) { @@ -71,10 +70,4 @@ inline std::pair, size_t> Serializ return {nullptr, buffer_array_ptr->BufferSize()}; } -inline std::string IntToFixedLengthString(int number, int length) { - std::ostringstream oss; - oss << std::setw(length) << number; - return oss.str(); -} - } // namespace aimrt::plugins::zenoh_plugin \ No newline at end of file diff --git a/src/plugins/zenoh_plugin/zenoh_channel_backend.cc b/src/plugins/zenoh_plugin/zenoh_channel_backend.cc index d5f479d8f..0fde5c805 100644 --- a/src/plugins/zenoh_plugin/zenoh_channel_backend.cc +++ b/src/plugins/zenoh_plugin/zenoh_channel_backend.cc @@ -148,7 +148,10 @@ bool ZenohChannelBackend::Subscribe( // read data from payload auto ret = z_bytes_reader_read(&reader, reinterpret_cast(serialized_data.data()), serialized_size); if (ret >= 0) { - util::ConstBufferOperator buf_oper(serialized_data.data() + kFixedLen, std::stoi(std::string(serialized_data.data(), kFixedLen))); + util::ConstBufferOperator buf_oper_tmp(serialized_data.data(), 4); + uint32_t serialized_size_with_len = buf_oper_tmp.GetUint32(); + + util::ConstBufferOperator buf_oper(serialized_data.data() + 4, serialized_size_with_len); // get serialization type std::string serialization_type(buf_oper.GetString(util::BufferLenType::kUInt8)); @@ -268,7 +271,7 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe z_pub_loaned_shm_ptr = z_shm_mut_data_mut(z_loan_mut(loan_result.buf)); // write info pkg on loaned shm : the first FIXED_LEN bytes needs to write the length of pkg - util::BufferOperator buf_oper(reinterpret_cast(z_pub_loaned_shm_ptr) + kFixedLen, loan_size - kFixedLen); + util::BufferOperator buf_oper(reinterpret_cast(z_pub_loaned_shm_ptr) + 4, loan_size - 4); // write serialization type on loaned shm buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8); @@ -281,7 +284,7 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe auto type_and_ctx_len = 1 + serialization_type.size() + context_meta_kv_size; // write msg on loaned shm: should start at the (FIXED_LEN + type_and_ctx_len)-th byte - aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + type_and_ctx_len + kFixedLen); + aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + type_and_ctx_len + 4); if (buffer_array_cache_ptr == nullptr) { try { auto result = SerializeMsgSupportedZenoh(msg_wrapper, serialization_type, aimrt::util::BufferArrayAllocatorRef(z_allocator.NativeHandle())); @@ -294,7 +297,7 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe if (msg_size > buf_oper.GetRemainingSize()) { // in this case means the msg has serialization cache but the size is too large, then expand suitable size is_shm_loan_size_enough = false; - z_pub_shm_size_map_[zenoh_pub_topic] = msg_size + type_and_ctx_len + kFixedLen; + z_pub_shm_size_map_[zenoh_pub_topic] = msg_size + type_and_ctx_len + 4; } else { // in this case means the msg has serialization cache and the size is suitable, then use cachema is_shm_loan_size_enough = true; @@ -320,7 +323,7 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe if (is_shm_pool_size_enough) { // if has cache, the copy it to shm to replace the serialization if (buffer_array_cache_ptr != nullptr) { - unsigned char* strat_pos = z_pub_loaned_shm_ptr + kFixedLen + context_meta_kv_size + serialization_type.size() + 1; + unsigned char* strat_pos = z_pub_loaned_shm_ptr + 4 + context_meta_kv_size + serialization_type.size() + 1; for (size_t ii = 0; ii < buffer_array_cache_ptr->Size(); ++ii) { std::memcpy(strat_pos, buffer_array_cache_ptr.get()[ii].Data()->data, buffer_array_cache_ptr.get()[ii].Data()->len); strat_pos += buffer_array_cache_ptr.get()[ii].Data()->len; @@ -329,7 +332,9 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe buffer_array_cache_ptr = nullptr; } // write info pkg length on loaned shm - std::memcpy(z_pub_loaned_shm_ptr, IntToFixedLengthString(1 + serialization_type.size() + context_meta_kv_size + msg_size, kFixedLen).c_str(), kFixedLen); + uint32_t data_size = 1 + serialization_type.size() + context_meta_kv_size + msg_size; + util::SetBufFromUint32(reinterpret_cast(z_pub_loaned_shm_ptr), data_size); + z_owned_bytes_t z_payload; if (loan_result.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { z_bytes_from_shm_mut(&z_payload, z_move(loan_result.buf)); @@ -359,14 +364,14 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe size_t msg_size = buffer_array_view_ptr->BufferSize(); int32_t data_size = 1 + serialization_type.size() + context_meta_kv_size + msg_size; - int32_t pkg_size = data_size + kFixedLen; + int32_t pkg_size = data_size + 4; // create buffer for serialization std::vector serialized_data(pkg_size); util::BufferOperator buf_oper(serialized_data.data(), pkg_size); // full data_size - buf_oper.SetBuffer(IntToFixedLengthString(data_size, kFixedLen).c_str(), kFixedLen); + buf_oper.SetUint32(data_size); // full serialization_type buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8); diff --git a/src/plugins/zenoh_plugin/zenoh_rpc_backend.cc b/src/plugins/zenoh_plugin/zenoh_rpc_backend.cc index 8b50fc298..663648b3e 100644 --- a/src/plugins/zenoh_plugin/zenoh_rpc_backend.cc +++ b/src/plugins/zenoh_plugin/zenoh_rpc_backend.cc @@ -165,7 +165,10 @@ bool ZenohRpcBackend::RegisterServiceFunc( // read data from payload auto ret = z_bytes_reader_read(&reader, reinterpret_cast(serialized_data.data()), serialized_size); if (ret >= 0) { - util::ConstBufferOperator buf_oper(serialized_data.data() + kFixedLen, std::stoi(std::string(serialized_data.data(), kFixedLen))); + util::ConstBufferOperator buf_oper_tmp(serialized_data.data(), 4); + uint32_t pkg_size_with_len = buf_oper_tmp.GetUint32(); + + util::ConstBufferOperator buf_oper(serialized_data.data() + 4, pkg_size_with_len); // deserialize type std::string serialization_type(buf_oper.GetString(util::BufferLenType::kUInt8)); @@ -242,8 +245,8 @@ bool ZenohRpcBackend::RegisterServiceFunc( bool is_shm_loan_size_enough = true; bool is_shm_pool_size_enough = true; - uint64_t msg_size = 0; - size_t header_len = 0; + uint32_t msg_size = 0; + uint32_t header_len = 0; z_buf_layout_alloc_result_t loan_result; do { @@ -269,7 +272,7 @@ bool ZenohRpcBackend::RegisterServiceFunc( z_pub_loaned_shm_ptr = z_shm_mut_data_mut(z_loan_mut(loan_result.buf)); // write info pkg on loaned shm : the first FIXED_LEN bytes needs to write the length of pkg - util::BufferOperator buf_oper(reinterpret_cast(z_pub_loaned_shm_ptr) + kFixedLen, loan_size - kFixedLen); + util::BufferOperator buf_oper(reinterpret_cast(z_pub_loaned_shm_ptr) + 4, loan_size - 4); // write serialization type on loaned shm buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8); @@ -283,7 +286,7 @@ bool ZenohRpcBackend::RegisterServiceFunc( header_len = 1 + serialization_type.size() + 4 + 4; // write msg on loaned shm: should start at the (FIXED_LEN + header_len)-th byte - aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + header_len + kFixedLen); + aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + header_len + 4); if (buffer_array_cache_ptr == nullptr) { try { @@ -297,7 +300,7 @@ bool ZenohRpcBackend::RegisterServiceFunc( if (msg_size > buf_oper.GetRemainingSize()) { // in this case means the msg has serialization cache but the size is too large, then expand suitable size is_shm_loan_size_enough = false; - z_node_shm_size_map_[node_pub_topic] = kFixedLen + header_len + msg_size; + z_node_shm_size_map_[node_pub_topic] = 4 + header_len + msg_size; } else { // in this case means the msg has serialization cache and the size is suitable, then use cachema is_shm_loan_size_enough = true; @@ -323,7 +326,7 @@ bool ZenohRpcBackend::RegisterServiceFunc( if (is_shm_pool_size_enough) { // if has cache, the copy it to shm to replace the serialization if (buffer_array_cache_ptr != nullptr) { - unsigned char* strat_pos = z_pub_loaned_shm_ptr + kFixedLen + header_len; + unsigned char* strat_pos = z_pub_loaned_shm_ptr + 4 + header_len; for (size_t ii = 0; ii < buffer_array_cache_ptr->Size(); ++ii) { std::memcpy(strat_pos, buffer_array_cache_ptr.get()[ii].Data()->data, buffer_array_cache_ptr.get()[ii].Data()->len); strat_pos += buffer_array_cache_ptr.get()[ii].Data()->len; @@ -333,7 +336,8 @@ bool ZenohRpcBackend::RegisterServiceFunc( } // write info pkg length on loaned shm - std::memcpy(z_pub_loaned_shm_ptr, IntToFixedLengthString(header_len, kFixedLen).c_str(), kFixedLen); + util::SetBufFromUint32(reinterpret_cast(z_pub_loaned_shm_ptr), header_len); + z_owned_bytes_t z_payload; if (loan_result.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { z_bytes_from_shm_mut(&z_payload, z_move(loan_result.buf)); @@ -364,14 +368,14 @@ bool ZenohRpcBackend::RegisterServiceFunc( size_t rsp_size = buffer_array_view_ptr->BufferSize(); size_t z_data_size = 1 + serialization_type.size() + 4 + 4 + rsp_size; - size_t pkg_size = z_data_size + kFixedLen; + size_t pkg_size = z_data_size + 4; // get buf to store data std::vector msg_buf_vec(pkg_size); util::BufferOperator buf_oper(msg_buf_vec.data(), pkg_size); // full data_size - buf_oper.SetBuffer(IntToFixedLengthString(z_data_size, kFixedLen).c_str(), kFixedLen); + buf_oper.SetUint32(z_data_size); // full serialize type buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8); @@ -460,7 +464,10 @@ bool ZenohRpcBackend::RegisterClientFunc( return; } - util::ConstBufferOperator buf_oper(serialized_data.data() + kFixedLen, std::stoi(std::string(serialized_data.data(), kFixedLen))); + util::ConstBufferOperator buf_oper_tmp(serialized_data.data(), 4); + uint32_t data_size_with_len = buf_oper_tmp.GetUint32(); + + util::ConstBufferOperator buf_oper(serialized_data.data() + 4, data_size_with_len); std::string serialization_type(buf_oper.GetString(util::BufferLenType::kUInt8)); uint32_t req_id = buf_oper.GetUint32(); @@ -600,8 +607,8 @@ void ZenohRpcBackend::Invoke( bool is_shm_loan_size_enough = true; bool is_shm_pool_size_enough = true; - uint64_t msg_size = 0; - size_t header_len = 0; + uint32_t msg_size = 0; + uint32_t header_len = 0; z_buf_layout_alloc_result_t loan_result; do { @@ -627,7 +634,7 @@ void ZenohRpcBackend::Invoke( z_pub_loaned_shm_ptr = z_shm_mut_data_mut(z_loan_mut(loan_result.buf)); // write info pkg on loaned shm : the first FIXED_LEN bytes needs to write the length of pkg - util::BufferOperator buf_oper(reinterpret_cast(z_pub_loaned_shm_ptr) + kFixedLen, loan_size - kFixedLen); + util::BufferOperator buf_oper(reinterpret_cast(z_pub_loaned_shm_ptr) + 4, loan_size - 4); // write serialization type on loaned shm buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8); @@ -650,7 +657,7 @@ void ZenohRpcBackend::Invoke( context_meta_kv_size; // write msg on loaned shm: should start at the (FIXED_LEN + header_len)-th byte - aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + header_len + kFixedLen); + aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + header_len + 4); if (buffer_array_cache_ptr == nullptr) { try { @@ -664,7 +671,7 @@ void ZenohRpcBackend::Invoke( if (msg_size > buf_oper.GetRemainingSize()) { // in this case means the msg has serialization cache but the size is too large, then expand suitable size is_shm_loan_size_enough = false; - z_node_shm_size_map_[node_pub_topic] = kFixedLen + header_len + msg_size; + z_node_shm_size_map_[node_pub_topic] = 4 + header_len + msg_size; } else { // in this case means the msg has serialization cache and the size is suitable, then use cachema is_shm_loan_size_enough = true; @@ -690,7 +697,7 @@ void ZenohRpcBackend::Invoke( if (is_shm_pool_size_enough) { // if has cache, the copy it to shm to replace the serialization if (buffer_array_cache_ptr != nullptr) { - unsigned char* strat_pos = z_pub_loaned_shm_ptr + kFixedLen + header_len; + unsigned char* strat_pos = z_pub_loaned_shm_ptr + 4 + header_len; for (size_t ii = 0; ii < buffer_array_cache_ptr->Size(); ++ii) { std::memcpy(strat_pos, buffer_array_cache_ptr.get()[ii].Data()->data, buffer_array_cache_ptr.get()[ii].Data()->len); strat_pos += buffer_array_cache_ptr.get()[ii].Data()->len; @@ -700,7 +707,8 @@ void ZenohRpcBackend::Invoke( } // write info pkg length on loaned shm - std::memcpy(z_pub_loaned_shm_ptr, IntToFixedLengthString(header_len, kFixedLen).c_str(), kFixedLen); + util::SetBufFromUint32(reinterpret_cast(z_pub_loaned_shm_ptr), header_len); + z_owned_bytes_t z_payload; if (loan_result.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { z_bytes_from_shm_mut(&z_payload, z_move(loan_result.buf)); @@ -737,13 +745,13 @@ void ZenohRpcBackend::Invoke( context_meta_kv_size + req_size; - size_t pkg_size = z_data_size + kFixedLen; + size_t pkg_size = z_data_size + 4; // create buffer for serialization std::vector msg_buf_vec(pkg_size); util::BufferOperator buf_oper(msg_buf_vec.data(), pkg_size); // full data_size - buf_oper.SetBuffer(IntToFixedLengthString(z_data_size, kFixedLen).c_str(), kFixedLen); + buf_oper.SetUint32(z_data_size); // full serialization_type buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8);