From 1f3401341a151efdaf79c4ccaf97d5bb4d77df6d Mon Sep 17 00:00:00 2001
From: ATT_POWER <34850640+yglsaltfish@users.noreply.github.com>
Date: Fri, 15 Nov 2024 18:55:37 +0800
Subject: [PATCH] feat: add rpc and channel metrics in opentelemetry plugin
 (#92)

* feat(opentelemetry_plugin): Add RPC tracing and performance metrics

- Added multiple counters and histograms in OpenTelemetryPlugin for tracking RPC calls
- Updated ChannelTraceFilter and RpcTraceFilter with enhanced context attribute handling
- Added new RpcMetricsFilter for collecting RPC performance metrics
- Adjusted log level configuration from INFO to Warn
- Added link to echo_plugin example in documentation

* fix: format the code

* fix: format

* perf(opentelemetry_plugin): Optimize RPC performance metrics calculation logic

* fix: Simplify opentelemetry plugin code

* refactor(opentelemetry_plugin): remove rpc status

* fix: delete unnecessary label

* format

* feat(opentelemetry_plugin): Add custom histogram boundaries option for RPC metrics

* fix(opentelemetry): Fix histogram boundary value type

* fix: change new to make_unique

* docs: add opentelemetry_plugin doc

* fix: opt the expression of doc

* chore: format the code

* docs: opt opentelemetry doc

* fix: update OpenTelemetry plugin documentation

- Change trace_otlp_http_exporter_url and metrics_otlp_http_exporter_url fields from required to optional
- Optimize capture list in OpenTelemetryPlugin::RpcMetricsFilter function
---
 .../tutorials/plugins/opentelemetry_plugin.md |  53 ++++-
 .../plugins/opentelemetry_plugin/README.md    |  40 ++++
 ...entelemetry_plugin_pb_rpc_metrics_cfg.yaml |   1 +
 .../opentelemetry_plugin.cc                   | 225 +++++++++++++++---
 .../opentelemetry_plugin.h                    |   6 +-
 5 files changed, 286 insertions(+), 39 deletions(-)

diff --git a/document/sphinx-cn/tutorials/plugins/opentelemetry_plugin.md b/document/sphinx-cn/tutorials/plugins/opentelemetry_plugin.md
index a2eeef09c..fcbd40f95 100644
--- a/document/sphinx-cn/tutorials/plugins/opentelemetry_plugin.md
+++ b/document/sphinx-cn/tutorials/plugins/opentelemetry_plugin.md
@@ -13,7 +13,7 @@
 
 **opentelemetry_plugin** is a plugin based on [OpenTelemetry](https://opentelemetry.io/) that provides framework-level observability for AimRT. It works mainly on top of the RPC/Channel Framework Filters in AimRT; for the Filter concept, see the relevant sections of the [Basic Concepts in AimRT](../concepts/concepts.md) document.
 
-In the current version, **opentelemetry_plugin** only supports the trace functionality; metric and other functionalities are planned for later.
+In the current version, **opentelemetry_plugin** supports the trace functionality and the rpc and channel parts of the metrics functionality; the executor and service parts of the metrics functionality are planned for later.
 
 **opentelemetry_plugin** provides the following RPC/Channel Framework Filters:
 
@@ -36,16 +36,21 @@
 | Node | Type | Optional or not | Default | Purpose |
 | ---- | ---- | ---- | ---- | ---- |
 | node_name | string | Required | "" | Node name used for reporting; must not be empty |
-| trace_otlp_http_exporter_url | string | Required | "" | URL for reporting traces via the otlp http exporter |
+| trace_otlp_http_exporter_url | string | Optional | "" | URL for reporting traces via the otlp http exporter; can be left unconfigured if trace reporting is not needed |
+| metrics_otlp_http_exporter_url | string | Optional | "" | URL for reporting metrics via the otlp http exporter; can be left unconfigured if metrics reporting is not needed |
+| rpc_time_cost_histogram_boundaries | array | Optional | [1, 2, 4, ..., 2147483648] | Boundary list of the histogram used for reporting RPC call time, in us (see the sketch below) |
 | force_trace | bool | Optional | false | Whether to force trace reporting |
 | attributes | array | Optional | [] | List of kv attributes attached when this node reports |
 | attributes[i].key | string | Required | "" | Key of the attribute |
 | attributes[i].val | string | Required | "" | Val of the attribute |
 
-After the plugin is configured, you also need to register a filter of the `otp_trace` or `otp_simple_trace` type in the `enable_filters` configuration under the `rpc`/`channel` node to perform trace tracking around rpc/channel calls.
+After the plugin is configured:
+- For the trace functionality, you also need to register a filter of the `otp_trace` or `otp_simple_trace` type in the `enable_filters` configuration under the `rpc`/`channel` node to perform trace tracking around rpc/channel calls
+- For the metrics functionality, you also need to register a filter of the `otp_metrics` type in the `enable_filters` configuration under the `rpc`/`channel` node to perform metrics tracking around rpc/channel calls
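+
+For reference, when `rpc_time_cost_histogram_boundaries` is left unset, the plugin falls back to 32 power-of-two buckets covering 1 us to 2147483648 us. A minimal sketch of the equivalent fallback logic (illustrative helper name, mirroring the plugin's internals; not a public API):
+
+```cpp
+#include <algorithm>
+#include <vector>
+
+// Successive powers of two: 1, 2, 4, ..., 2^31 (unit: us).
+std::vector<double> DefaultRpcTimeCostBoundaries() {
+  std::vector<double> boundaries(32);
+  double val = 1.0;
+  std::generate(boundaries.begin(), boundaries.end(), [&val]() {
+    double current = val;
+    val *= 2.0;
+    return current;
+  });
+  return boundaries;
+}
+```
+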
+### trace example
 The following is a simple example of RPC and Channel communication over the local backend with trace tracking enabled:
 ```yaml
 aimrt:
@@ -139,6 +144,48 @@ There are several ways to enable the trace functionality for RPC/Channel:
     }
 ```
 
+### metrics example
+The following is a simple example of RPC and Channel communication over the local backend with metrics tracking enabled. It sets `rpc_time_cost_histogram_boundaries`, the boundary list of the histogram used for reporting RPC call time, in us:
+```yaml
+aimrt:
+  plugin:
+    plugins:
+      - name: opentelemetry_plugin
+        path: ./libaimrt_opentelemetry_plugin.so
+        options:
+          node_name: example_node
+          metrics_otlp_http_exporter_url: http://localhost:4318/v1/metrics
+          rpc_time_cost_histogram_boundaries: [0, 50.0, 150.0, 350.0, 750.0, 1350.0] # unit: us, optional
+          attributes:
+            - key: sn
+              val: 123456
+  rpc:
+    backends:
+      - type: local
+    clients_options:
+      - func_name: "(.*)"
+        enable_backends: [local]
+        enable_filters: [otp_metrics]
+    servers_options:
+      - func_name: "(.*)"
+        enable_backends: [local]
+        enable_filters: [otp_metrics]
+  channel:
+    backends:
+      - type: local
+        options:
+          subscriber_use_inline_executor: true
+    pub_topics_options:
+      - topic_name: "(.*)"
+        enable_backends: [local]
+        enable_filters: [otp_metrics]
+    sub_topics_options:
+      - topic_name: "(.*)"
+        enable_backends: [local]
+        enable_filters: [otp_metrics]
+  module:
+    # ...
+```
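+
+OpenTelemetry histogram buckets are upper-inclusive: boundaries `[b0, b1, ...]` define the buckets `(-inf, b0]`, `(b0, b1]`, ..., `(b_last, +inf)`. A small standalone sketch of how a measured latency maps onto the boundaries configured above (illustrative only; the SDK does this internally):
+
+```cpp
+#include <algorithm>
+#include <cstdio>
+#include <vector>
+
+int main() {
+  std::vector<double> boundaries{0, 50.0, 150.0, 350.0, 750.0, 1350.0};  // us
+  double time_cost_us = 90.0;  // hypothetical measured RPC latency
+  // Upper-inclusive bucketing: the first boundary >= value gives the bucket index.
+  auto idx = std::lower_bound(boundaries.begin(), boundaries.end(), time_cost_us) -
+             boundaries.begin();
+  std::printf("latency %.1f us -> bucket #%td of %zu\n",
+              time_cost_us, idx, boundaries.size() + 1);
+}
+```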
 
 ## Common practices
 
diff --git a/src/examples/plugins/opentelemetry_plugin/README.md b/src/examples/plugins/opentelemetry_plugin/README.md
index e13525ded..ef26ffd3f 100644
--- a/src/examples/plugins/opentelemetry_plugin/README.md
+++ b/src/examples/plugins/opentelemetry_plugin/README.md
@@ -75,6 +75,26 @@
 An rpc metrics example based on **opentelemetry_plugin**, demonstrating:
 - how to load **opentelemetry_plugin** at startup;
 - how to configure the metrics functionality for rpc;
+- how to set the boundaries of the rpc call-time histogram;
+
+
+Core code:
+- [rpc.proto](../../../protocols/example/rpc.proto)
+- [normal_rpc_co_client_module.cc](../../cpp/pb_rpc/module/normal_rpc_co_client_module/normal_rpc_co_client_module.cc)
+- [normal_rpc_co_server_module.cc](../../cpp/pb_rpc/module/normal_rpc_co_server_module/normal_rpc_co_server_module.cc)
+- [service.cc](../../cpp/pb_rpc/module/normal_rpc_co_server_module/service.cc)
+
+
+How to run (linux):
+- build AimRT with the `AIMRT_BUILD_EXAMPLES` and `AIMRT_BUILD_OPENTELEMETRY_PLUGIN` options enabled;
+- set `metrics_otlp_http_exporter_url` in the startup configuration to the reporting address of a collector or jaeger; see the plugin documentation for details;
+- run the `start_examples_plugins_opentelemetry_plugin_pb_rpc_metrics.sh` script in the build directory to start the process;
+- observe the rpc metrics data on the jaeger platform;
+- press `ctrl-c` to stop the process;
+
+Notes:
+- this example is based on the protobuf rpc local backend example and reports rpc metrics data through the `otp_metrics` filter in **opentelemetry_plugin**;
+
 
 ## channel metrics
 
@@ -83,3 +103,23 @@
 A channel metrics example based on **opentelemetry_plugin**, demonstrating:
 - how to load **opentelemetry_plugin** at startup;
 - how to configure the metrics functionality for channel;
 
+Core code:
+- [event.proto](../../../protocols/example/event.proto)
+- [normal_publisher_module.cc](../../cpp/pb_chn/module/normal_publisher_module/normal_publisher_module.cc)
+- [normal_subscriber_module.cc](../../cpp/pb_chn/module/normal_subscriber_module/normal_subscriber_module.cc)
+
+
+Configuration file:
+- [examples_plugins_opentelemetry_plugin_pb_chn_metrics_cfg.yaml](./install/linux/bin/cfg/examples_plugins_opentelemetry_plugin_pb_chn_metrics_cfg.yaml)
+
+
+How to run (linux):
+- build AimRT with the `AIMRT_BUILD_EXAMPLES` and `AIMRT_BUILD_OPENTELEMETRY_PLUGIN` options enabled;
+- set `metrics_otlp_http_exporter_url` in the startup configuration to the reporting address of a collector or jaeger; see the plugin documentation for details;
+- run the `start_examples_plugins_opentelemetry_plugin_pb_chn_metrics.sh` script in the build directory to start the process;
+- observe the channel metrics data on the jaeger platform;
+- press `ctrl-c` to stop the process;
+
+Notes:
+- this example is based on the protobuf channel local backend example and reports channel metrics data through the `otp_metrics` filter in **opentelemetry_plugin**;
diff --git a/src/examples/plugins/opentelemetry_plugin/install/linux/bin/cfg/examples_plugins_opentelemetry_plugin_pb_rpc_metrics_cfg.yaml b/src/examples/plugins/opentelemetry_plugin/install/linux/bin/cfg/examples_plugins_opentelemetry_plugin_pb_rpc_metrics_cfg.yaml
index bebf25867..32e41afa3 100644
--- a/src/examples/plugins/opentelemetry_plugin/install/linux/bin/cfg/examples_plugins_opentelemetry_plugin_pb_rpc_metrics_cfg.yaml
+++ b/src/examples/plugins/opentelemetry_plugin/install/linux/bin/cfg/examples_plugins_opentelemetry_plugin_pb_rpc_metrics_cfg.yaml
@@ -9,6 +9,7 @@ aimrt:
         options:
           node_name: example_rpc_node
           metrics_otlp_http_exporter_url: http://localhost:4318/v1/metrics
+          rpc_time_cost_histogram_boundaries: [0, 50.0, 150.0, 350.0, 750.0, 1350.0] # unit: us, optional
           attributes:
             - key: sn
               val: 123456
diff --git a/src/plugins/opentelemetry_plugin/opentelemetry_plugin.cc b/src/plugins/opentelemetry_plugin/opentelemetry_plugin.cc
index f0b39aae5..44bc0b964 100644
--- a/src/plugins/opentelemetry_plugin/opentelemetry_plugin.cc
+++ b/src/plugins/opentelemetry_plugin/opentelemetry_plugin.cc
@@ -26,6 +26,7 @@ struct convert<aimrt::plugins::opentelemetry_plugin::OpenTelemetryPlugin::Options>
       rhs.metrics_export_timeout_ms = node["metrics_export_timeout_ms"].as<uint32_t>();
+    if (node["rpc_time_cost_histogram_boundaries"])
+      rhs.rpc_time_cost_histogram_boundaries = node["rpc_time_cost_histogram_boundaries"].as<std::vector<double>>();
+
     for (const auto& attribute_node : node["attributes"]) {
       auto attribute = Options::Attribute{
           .key = attribute_node["key"].as<std::string>(),
@@ -132,10 +136,64 @@ bool OpenTelemetryPlugin::Initialize(runtime::core::AimRTCore* core_ptr) noexcep
       reader_opts.export_interval_millis = std::chrono::milliseconds(options_.metrics_export_interval_ms);
       reader_opts.export_timeout_millis = std::chrono::milliseconds(options_.metrics_export_timeout_ms);
-      auto reader =
-          metric_sdk::PeriodicExportingMetricReaderFactory::Create(std::move(exporter), reader_opts);
+      auto reader = metric_sdk::PeriodicExportingMetricReaderFactory::Create(std::move(exporter), reader_opts);
+
+      std::vector<double> bucket_boundaries;
+      if (!options_.rpc_time_cost_histogram_boundaries.empty()) {
+        bucket_boundaries = options_.rpc_time_cost_histogram_boundaries;
+      } else {
+        // default boundaries: 32 buckets at successive powers of two, 1 us ... 2^31 us
+        bucket_boundaries.resize(32);
+        double val = 1.0;
+        std::generate(bucket_boundaries.begin(), bucket_boundaries.end(),
+                      [&val]() {
+                        double current = val;
+                        val *= 2.0;
+                        return current;
+                      });
+      }
 
       auto views = metric_sdk::ViewRegistryFactory::Create();
+
+      // configure RPC client time cost histogram
+      std::unique_ptr<metric_sdk::InstrumentSelector> client_instrument_selector = std::make_unique<metric_sdk::InstrumentSelector>(
+          metric_sdk::InstrumentType::kHistogram,
+          "rpc.client.time_cost",
+          "us");
+      std::unique_ptr<metric_sdk::MeterSelector> client_meter_selector = std::make_unique<metric_sdk::MeterSelector>(options_.node_name, "", "");
+
+      std::shared_ptr<metric_sdk::HistogramAggregationConfig> client_config = std::make_shared<metric_sdk::HistogramAggregationConfig>();
+      client_config->boundaries_ = bucket_boundaries;
+
+      std::unique_ptr<metric_sdk::View> client_view = std::make_unique<metric_sdk::View>(
+          "rpc_client_time_cost",
+          "RPC client time cost histogram view",
+          "us",
+          metric_sdk::AggregationType::kHistogram,
+          client_config);
+
+      views->AddView(std::move(client_instrument_selector),
+                     std::move(client_meter_selector),
+                     std::move(client_view));
+
+      // configure RPC server time cost histogram
+      std::unique_ptr<metric_sdk::InstrumentSelector> server_instrument_selector = std::make_unique<metric_sdk::InstrumentSelector>(
+          metric_sdk::InstrumentType::kHistogram,
+          "rpc.server.time_cost",
+          "us");
+      std::unique_ptr<metric_sdk::MeterSelector> server_meter_selector = std::make_unique<metric_sdk::MeterSelector>(options_.node_name, "", "");
+
+      std::shared_ptr<metric_sdk::HistogramAggregationConfig> server_config = std::make_shared<metric_sdk::HistogramAggregationConfig>();
+      server_config->boundaries_ = bucket_boundaries;
+
+      std::unique_ptr<metric_sdk::View> server_view = std::make_unique<metric_sdk::View>(
+          "rpc_server_time_cost",
+          "RPC server time cost histogram view",
+          "us",
+          metric_sdk::AggregationType::kHistogram,
+          server_config);
+
+      views->AddView(std::move(server_instrument_selector),
+                     std::move(server_meter_selector),
+                     std::move(server_view));
 
       auto context = metric_sdk::MeterContextFactory::Create(std::move(views), resource);
       context->AddMetricReader(std::move(reader));
@@ -143,12 +201,24 @@ bool OpenTelemetryPlugin::Initialize(runtime::core::AimRTCore* core_ptr) noexcep
       meter_provider_ = metric_sdk::MeterProviderFactory::Create(std::move(context));
       meter_ = meter_provider_->GetMeter(options_.node_name);
-
+      // channel
       chn_pub_msg_num_counter_ = meter_->CreateUInt64Counter("chn.pub.msg_num", "Total num of channel publish msg");
       chn_sub_msg_num_counter_ = meter_->CreateUInt64Counter("chn.sub.msg_num", "Total num of channel subscribe msg");
       chn_pub_msg_size_counter_ = meter_->CreateUInt64Counter("chn.pub.msg_size", "Total size of channel publish msg", "bytes");
       chn_sub_msg_size_counter_ = meter_->CreateUInt64Counter("chn.sub.msg_size", "Total size of channel subscribe msg", "bytes");
+
+      // rpc
+      rpc_client_invoke_num_counter_ = meter_->CreateUInt64Counter("rpc.client.invoke_num", "Total num of rpc client invoke");
+      rpc_server_invoke_num_counter_ = meter_->CreateUInt64Counter("rpc.server.invoke_num", "Total num of rpc server invoke");
rpc server invoke"); + + rpc_client_req_size_counter_ = meter_->CreateUInt64Counter("rpc.client.req_size", "Total size of rpc client request", "bytes"); + rpc_client_rsp_size_counter_ = meter_->CreateUInt64Counter("rpc.client.rsp_size", "Total size of rpc client response", "bytes"); + rpc_server_req_size_counter_ = meter_->CreateUInt64Counter("rpc.server.req_size", "Total size of rpc server request", "bytes"); + rpc_server_rsp_size_counter_ = meter_->CreateUInt64Counter("rpc.server.rsp_size", "Total size of rpc server response", "bytes"); + + rpc_client_time_cost_histogram_ = meter_->CreateDoubleHistogram("rpc.client.time_cost", "Time cost of rpc client", "us"); + rpc_server_time_cost_histogram_ = meter_->CreateDoubleHistogram("rpc.server.time_cost", "Time cost of rpc server", "us"); } // register hook @@ -312,7 +382,8 @@ void OpenTelemetryPlugin::ChannelTraceFilter( auto ctx_ref = msg_wrapper.ctx_ref; const auto& info = msg_wrapper.info; - // 如果设置了全局强制trace,或者context强制设置了start_new_trace,或者上层传递了span,则新启动一个span + // if global force trace is set, or context force sets start_new_trace, or an upper layer passes a span, + // then start a new span bool start_new_trace = options_.force_trace; if (!start_new_trace) { @@ -323,7 +394,7 @@ void OpenTelemetryPlugin::ChannelTraceFilter( ContextCarrier carrier(ctx_ref); - // 解压传进来的context,得到父span + // unpack the incoming context to get the parent span trace_api::StartSpanOptions op; op.kind = (type == ChannelFilterType::kPublisher) ? trace_api::SpanKind::kProducer @@ -339,34 +410,34 @@ void OpenTelemetryPlugin::ChannelTraceFilter( start_new_trace = true; } - // 不需要启动一个新trace + // no need to start a new trace if (!start_new_trace) { h(msg_wrapper); return; } - // 需要启动一个新trace + // need to start a new trace std::string span_name = msg_wrapper.info.topic_name + "/" + msg_wrapper.info.msg_type; auto span = tracer_->StartSpan(span_name, op); - // 先发布数据 + // publish msg first h(msg_wrapper); - // 将当前span的context打包 + // pack current span's context opentelemetry::context::Context output_ot_ctx(trace_api::kSpanKey, span); propagator_->Inject(carrier, output_ot_ctx); - // 添加base信息 + // add base info span->SetAttribute("module_name", info.module_name); - // 添加context中的属性 + // add context attributes auto keys = ctx_ref.GetMetaKeys(); for (auto& item : keys) { span->SetAttribute(item, ctx_ref.GetMetaValue(item)); } if (upload_msg) { - // 序列化包成json + // serialize msg to json auto buf_ptr = aimrt::runtime::core::channel::TrySerializeMsgWithCache(msg_wrapper, "json"); if (buf_ptr) { auto msg_str = buf_ptr->JoinToString(); @@ -384,7 +455,8 @@ void OpenTelemetryPlugin::RpcTraceFilter( aimrt::runtime::core::rpc::FrameworkAsyncRpcHandle&& h) { auto ctx_ref = wrapper_ptr->ctx_ref; - // 如果设置了全局强制trace,或者context强制设置了start_new_trace,或者上层传递了span,则新启动一个span + // if global force trace is set, or context force sets start_new_trace, or an upper layer passes a span, + // then start a new span bool start_new_trace = options_.force_trace; if (!start_new_trace) { @@ -395,7 +467,7 @@ void OpenTelemetryPlugin::RpcTraceFilter( ContextCarrier carrier(ctx_ref); - // 解压传进来的context,得到父span + // unpack the incoming context to get the parent span trace_api::StartSpanOptions op; op.kind = (type == RpcFilterType::kClient) ? 
                ? trace_api::SpanKind::kClient
                : trace_api::SpanKind::kServer;
@@ -411,16 +483,16 @@
     start_new_trace = true;
   }
 
-  // 不需要启动一个新trace
+  // no need to start a new trace
   if (!start_new_trace) {
     h(wrapper_ptr);
     return;
   }
 
-  // 需要启动一个新trace
+  // need to start a new trace
   auto span = tracer_->StartSpan(ctx_ref.GetFunctionName(), op);
 
-  // 将当前span的context打包
+  // pack current span's context
   opentelemetry::context::Context output_ot_ctx(trace_api::kSpanKey, span);
   propagator_->Inject(carrier, output_ot_ctx);
 
@@ -438,17 +510,17 @@
     auto ctx_ref = wrapper_ptr->ctx_ref;
     const auto& info = wrapper_ptr->info;
 
-    // 添加base信息
+    // add base info
     span->SetAttribute("module_name", info.module_name);
 
-    // 添加context中的属性
+    // add context attributes
     auto keys = ctx_ref.GetMetaKeys();
     for (auto& item : keys) {
       span->SetAttribute(item, ctx_ref.GetMetaValue(item));
     }
 
     if (upload_msg) {
-      // 序列化req/rsp为json
+      // serialize req/rsp to json
      auto req_buf_ptr = aimrt::runtime::core::rpc::TrySerializeReqWithCache(*wrapper_ptr, "json");
      if (req_buf_ptr) {
        auto req_json = req_buf_ptr->JoinToString();
@@ -466,7 +538,6 @@
     callback(status);
   };
-
   h(wrapper_ptr);
 }
 
@@ -476,18 +547,13 @@ void OpenTelemetryPlugin::ChannelMetricsFilter(
     aimrt::runtime::core::channel::FrameworkAsyncChannelHandle&& h) {
   // publish msg first
   h(msg_wrapper);
 
-  // get counter
-  metrics_api::Counter<uint64_t>* msg_num_counter_ptr = nullptr;
-  metrics_api::Counter<uint64_t>* msg_size_counter_ptr = nullptr;
-
-  if (type == ChannelFilterType::kPublisher) {
-    msg_num_counter_ptr = chn_pub_msg_num_counter_.get();
-    msg_size_counter_ptr = chn_pub_msg_size_counter_.get();
-  } else {
-    msg_num_counter_ptr = chn_sub_msg_num_counter_.get();
-    msg_size_counter_ptr = chn_sub_msg_size_counter_.get();
-  }
+  auto& msg_num_counter_ptr = (type == ChannelFilterType::kPublisher)
+                                  ? chn_pub_msg_num_counter_
+                                  : chn_sub_msg_num_counter_;
+  auto& msg_size_counter_ptr = (type == ChannelFilterType::kPublisher)
+                                  ? chn_pub_msg_size_counter_
+                                  : chn_sub_msg_size_counter_;
 
   // make labels
   auto ctx_ref = msg_wrapper.ctx_ref;
@@ -533,7 +599,6 @@
     }
   }
-
   labels.emplace("serialization_type", serialization_type);
   msg_size_counter_ptr->Add(msg_size, labels);
 }
@@ -541,7 +606,97 @@ void OpenTelemetryPlugin::RpcMetricsFilter(
     RpcFilterType type,
     const std::shared_ptr<runtime::core::rpc::InvokeWrapper>& wrapper_ptr,
     aimrt::runtime::core::rpc::FrameworkAsyncRpcHandle&& h) {
+  auto begin_time = std::chrono::steady_clock::now();
+  wrapper_ptr->callback = [this, type, wrapper_ptr,
+                           begin_time,
+                           callback{std::move(wrapper_ptr->callback)}](aimrt::rpc::Status status) {
+    auto end_time = std::chrono::steady_clock::now();
+
+    auto time_cost = std::chrono::duration_cast<std::chrono::microseconds>(
+                         end_time - begin_time)
+                         .count();
+
+    const auto& info = wrapper_ptr->info;
+    std::map<std::string, std::string> labels{
+        {"module_name", info.module_name},
+        {"func_name", info.func_name},
+        {"status", std::string(status.GetCodeMsg(status.Code()))},
+    };
+
+    // choose the counters/histogram depending on the filter type
+    auto& rpc_msg_num_counter_ptr = (type == RpcFilterType::kClient)
+                                        ? rpc_client_invoke_num_counter_
+                                        : rpc_server_invoke_num_counter_;
+
+    auto& rpc_msg_req_size_counter_ptr = (type == RpcFilterType::kClient)
+                                             ? rpc_client_req_size_counter_
+                                             : rpc_server_req_size_counter_;
+
+    auto& rpc_msg_rsp_size_counter_ptr = (type == RpcFilterType::kClient)
+                                             ? rpc_client_rsp_size_counter_
+                                             : rpc_server_rsp_size_counter_;
+
+    auto& time_cost_histogram_ptr = (type == RpcFilterType::kClient)
+                                        ? rpc_client_time_cost_histogram_
+                                        : rpc_server_time_cost_histogram_;
+
+    time_cost_histogram_ptr->Record(time_cost, labels, opentelemetry::context::Context());
+
+    // msg num
+    rpc_msg_num_counter_ptr->Add(1, labels);
+
+    size_t req_msg_size = 0, rsp_msg_size = 0;
+
+    // req msg size
+    const auto& req_serialization_cache = wrapper_ptr->req_serialization_cache;
+    const auto& req_serialization_type_span = info.req_type_support_ref.SerializationTypesSupportedListSpan();
+    std::string_view req_serialization_type;
+
+    if (req_serialization_cache.size() == 1) {
+      req_serialization_type = req_serialization_cache.begin()->first;
+      req_msg_size = req_serialization_cache.begin()->second->BufferSize();
+    } else {
+      for (auto item : req_serialization_type_span) {
+        auto cur_serialization_type = aimrt::util::ToStdStringView(item);
+
+        auto finditr = req_serialization_cache.find(cur_serialization_type);
+        if (finditr == req_serialization_cache.end()) [[unlikely]]
+          continue;
+
+        req_serialization_type = cur_serialization_type;
+        req_msg_size = finditr->second->BufferSize();
+        break;
+      }
+    }
+
+    rpc_msg_req_size_counter_ptr->Add(req_msg_size, labels);
+
+    // rsp msg size
+    const auto& rsp_serialization_cache = wrapper_ptr->rsp_serialization_cache;
+    const auto& rsp_serialization_type_span = info.rsp_type_support_ref.SerializationTypesSupportedListSpan();
+    std::string_view rsp_serialization_type;
+
+    if (rsp_serialization_cache.size() == 1) {
+      rsp_serialization_type = rsp_serialization_cache.begin()->first;
+      rsp_msg_size = rsp_serialization_cache.begin()->second->BufferSize();
+    } else {
+      for (auto item : rsp_serialization_type_span) {
+        auto cur_serialization_type = aimrt::util::ToStdStringView(item);
+
+        auto finditr = rsp_serialization_cache.find(cur_serialization_type);
+        if (finditr == rsp_serialization_cache.end()) [[unlikely]]
+          continue;
+
+        rsp_serialization_type = cur_serialization_type;
+        rsp_msg_size = finditr->second->BufferSize();
+        break;
+      }
+    }
+
+    rpc_msg_rsp_size_counter_ptr->Add(rsp_msg_size, labels);
+
+    callback(status);
+  };
+
   h(wrapper_ptr);
 }
-
-}  // namespace aimrt::plugins::opentelemetry_plugin
+}  // namespace aimrt::plugins::opentelemetry_plugin
\ No newline at end of file
diff --git a/src/plugins/opentelemetry_plugin/opentelemetry_plugin.h b/src/plugins/opentelemetry_plugin/opentelemetry_plugin.h
index efec92388..7a07cf055 100644
--- a/src/plugins/opentelemetry_plugin/opentelemetry_plugin.h
+++ b/src/plugins/opentelemetry_plugin/opentelemetry_plugin.h
@@ -62,6 +62,7 @@ class OpenTelemetryPlugin : public AimRTCorePluginBase {
     std::string metrics_otlp_http_exporter_url;
     uint32_t metrics_export_interval_ms = 15000;
     uint32_t metrics_export_timeout_ms = 5000;
+    std::vector<double> rpc_time_cost_histogram_boundaries;
 
     struct Attribute {
       std::string key;
@@ -136,6 +137,7 @@ class OpenTelemetryPlugin : public AimRTCorePluginBase {
   std::shared_ptr<metrics_api::Meter> meter_;
 
   using u64_counter = std::unique_ptr<metrics_api::Counter<uint64_t>>;
+  using db_histogram = std::unique_ptr<metrics_api::Histogram<double>>;
 
   u64_counter chn_pub_msg_num_counter_;
   u64_counter chn_sub_msg_num_counter_;
@@ -148,6 +150,8 @@ class OpenTelemetryPlugin : public AimRTCorePluginBase {
   u64_counter rpc_client_rsp_size_counter_;
   u64_counter rpc_server_req_size_counter_;
   u64_counter rpc_server_rsp_size_counter_;
+  db_histogram rpc_client_time_cost_histogram_;
+  db_histogram rpc_server_time_cost_histogram_;
 };
 
-}  // namespace aimrt::plugins::opentelemetry_plugin
+}  // namespace aimrt::plugins::opentelemetry_plugin
\ No newline at end of file
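
Reviewer note: the hunks above do not show the headers this code relies on. A condensed, self-contained sketch of the view wiring used for the time-cost histograms, mirroring Initialize() above (include paths follow the opentelemetry-cpp SDK layout and may vary across versions):

```cpp
#include <memory>
#include <string>
#include <vector>

#include "opentelemetry/sdk/metrics/aggregation/aggregation_config.h"
#include "opentelemetry/sdk/metrics/instruments.h"
#include "opentelemetry/sdk/metrics/view/instrument_selector.h"
#include "opentelemetry/sdk/metrics/view/meter_selector.h"
#include "opentelemetry/sdk/metrics/view/view.h"
#include "opentelemetry/sdk/metrics/view/view_registry.h"
#include "opentelemetry/sdk/metrics/view/view_registry_factory.h"

namespace metric_sdk = opentelemetry::sdk::metrics;

// Registers a histogram view with custom bucket edges for one instrument,
// the same pattern this patch applies to rpc.client/server.time_cost.
std::unique_ptr<metric_sdk::ViewRegistry> MakeTimeCostViews(
    const std::string& node_name, const std::vector<double>& boundaries) {
  auto views = metric_sdk::ViewRegistryFactory::Create();

  auto instrument_selector = std::make_unique<metric_sdk::InstrumentSelector>(
      metric_sdk::InstrumentType::kHistogram, "rpc.client.time_cost", "us");
  auto meter_selector =
      std::make_unique<metric_sdk::MeterSelector>(node_name, "", "");

  auto config = std::make_shared<metric_sdk::HistogramAggregationConfig>();
  config->boundaries_ = boundaries;  // custom bucket edges, in us

  auto view = std::make_unique<metric_sdk::View>(
      "rpc_client_time_cost", "RPC client time cost histogram view", "us",
      metric_sdk::AggregationType::kHistogram, config);

  views->AddView(std::move(instrument_selector), std::move(meter_selector),
                 std::move(view));
  return views;
}
```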