feat: add rpc and channel metrics in opentelemetry plugin (#92)
* feat(opentelemetry_plugin): Add RPC tracing and performance metrics
  - Added multiple counters and histograms in OpenTelemetryPlugin for tracking RPC calls
  - Updated ChannelTraceFilter and RpcTraceFilter with enhanced context attribute handling
  - Added new RpcMetricsFilter for collecting RPC performance metrics
  - Adjusted log level configuration from INFO to Warn
  - Added link to echo_plugin example in documentation
* fix: format the code
* fix: format
* perf(opentelemetry_plugin): Optimize RPC performance metrics calculation logic
* fix: Simplify opentelemetry plugin code
* refactor(opentelemetry_plugin): remove rpc status
* fix: delete unnecessary label
* format
* feat(opentelemetry_plugin): Add custom histogram boundaries option for RPC metrics
* fix(opentelemetry): Fix histogram boundary value type
* fix: change new to make_unique
* docs: add opentelemetry_plugin doc
* fix: improve wording in the doc
* chore: format the code
* docs: improve opentelemetry doc
* fix: update OpenTelemetry plugin documentation
  - Change trace_otlp_http_exporter_url and metrics_otlp_http_exporter_url fields from required to optional
  - Optimize capture list in OpenTelemetryPlugin::RpcMetricsFilter function
parent 062ffad431
commit 1f3401341a
@@ -13,7 +13,7 @@

**opentelemetry_plugin** is a plugin based on [OpenTelemetry](https://opentelemetry.io/) that provides framework-level observability for AimRT. It works mainly on top of the RPC/Channel Framework Filters in AimRT; for the concept of Filters, see the relevant sections of the [Basic Concepts in AimRT](../concepts/concepts.md) document.

In the current version, **opentelemetry_plugin** supports only the trace feature; support for metrics and other features is planned.

In the current version, **opentelemetry_plugin** supports the trace feature as well as the rpc and channel parts of the metrics feature; support for the executor and service parts of metrics is planned.

**opentelemetry_plugin** provides the following RPC/Channel Framework Filters:
@@ -36,16 +36,21 @@

| Node | Type | Optional | Default | Purpose |
| ---- | ---- | ---- | ---- | ---- |
| node_name | string | Required | "" | Node name used when reporting; must not be empty |
| trace_otlp_http_exporter_url | string | Required | "" | URL used when reporting traces via the otlp http exporter |
| trace_otlp_http_exporter_url | string | Optional | "" | URL used when reporting traces via the otlp http exporter; may be omitted if trace reporting is not needed |
| metrics_otlp_http_exporter_url | string | Optional | "" | URL used when reporting metrics via the otlp http exporter; may be omitted if metrics reporting is not needed |
| rpc_time_cost_histogram_boundaries | array | Optional | [1, 2, 4, ... , 2147483648] | Bucket boundaries of the histogram used when reporting RPC call time, in us |
| force_trace | bool | Optional | false | Whether to force trace reporting |
| attributes | array | Optional | [] | List of kv attributes attached when this node reports |
| attributes[i].key | string | Required | "" | Key of the attribute |
| attributes[i].val | string | Required | "" | Value of the attribute |
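For reference, a minimal sketch of the plugin `options` block corresponding to the table above; the URLs and values here are illustrative placeholders, not part of the shipped examples:

```yaml
# Minimal sketch of opentelemetry_plugin options (placeholder values)
options:
  node_name: my_node                                                 # required, must not be empty
  trace_otlp_http_exporter_url: http://localhost:4318/v1/traces      # optional; omit if traces are not reported
  metrics_otlp_http_exporter_url: http://localhost:4318/v1/metrics   # optional; omit if metrics are not reported
  rpc_time_cost_histogram_boundaries: [1, 10, 100, 1000, 10000]      # optional; unit: us
  force_trace: false                                                 # optional
  attributes:                                                        # optional kv attributes
    - key: sn
      val: "123456"
```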
After the plugin is configured, you also need to register a filter of type `otp_trace` or `otp_simple_trace` in the `enable_filters` configuration under the `rpc`/`channel` node, so that traces can be recorded before and after rpc/channel calls.

After the plugin is configured:

- For the trace feature, you also need to register a filter of type `otp_trace` or `otp_simple_trace` in the `enable_filters` configuration under the `rpc`/`channel` node, so that traces can be recorded before and after rpc/channel calls
- For the metrics feature, you also need to register a filter of type `otp_metrics` in the `enable_filters` configuration under the `rpc`/`channel` node, so that metrics can be collected before and after rpc/channel calls (see the fragment below)
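As shown below, the registration is a single entry in the corresponding `enable_filters` list; this fragment is a hypothetical sketch assuming the `local` backend used in the examples:

```yaml
# Hypothetical fragment: registering trace and metrics filters for rpc clients
rpc:
  clients_options:
    - func_name: "(.*)"
      enable_backends: [local]
      enable_filters: [otp_trace, otp_metrics]
```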
### trace example

The following is a simple example of RPC and Channel communication over the local backend with trace recording enabled:

```yaml
aimrt:
@@ -139,6 +144,48 @@ The RPC/Channel trace feature can be enabled in the following ways:
  }
```
### metrics example

The following is a simple example of RPC and Channel communication over the local backend with metrics collection enabled. It sets `rpc_time_cost_histogram_boundaries`, the bucket boundaries of the histogram used when reporting RPC call time, in us:

```yaml
aimrt:
  plugin:
    plugins:
      - name: opentelemetry_plugin
        path: ./libaimrt_opentelemetry_plugin.so
        options:
          node_name: example_node
          metrics_otlp_http_exporter_url: http://localhost:4318/v1/metrics
          rpc_time_cost_histogram_boundaries: [0, 50.0, 150.0, 350.0, 750.0, 1350.0] # unit: us, optional
          attributes:
            - key: sn
              val: 123456
  rpc:
    backends:
      - type: local
    clients_options:
      - func_name: "(.*)"
        enable_backends: [local]
        enable_filters: [otp_metrics]
    servers_options:
      - func_name: "(.*)"
        enable_backends: [local]
        enable_filters: [otp_metrics]
  channel:
    backends:
      - type: local
        options:
          subscriber_use_inline_executor: true
    pub_topics_options:
      - topic_name: "(.*)"
        enable_backends: [local]
        enable_filters: [otp_metrics]
    sub_topics_options:
      - topic_name: "(.*)"
        enable_backends: [local]
        enable_filters: [otp_metrics]
  module:
    # ...
```
## Common practices
@@ -75,6 +75,26 @@

An rpc metrics example based on **opentelemetry_plugin**, demonstrating:

- how to load **opentelemetry_plugin** at startup;
- how to configure the metrics feature for rpc;
- how to set the bucket boundaries of the rpc call-time histogram (see the fragment after this list);
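For orientation, a fragment of this example's plugin options, matching the configuration shown in the plugin documentation (the boundary values are the ones this example uses):

```yaml
# Fragment of the example plugin options (boundaries in us)
options:
  node_name: example_rpc_node
  metrics_otlp_http_exporter_url: http://localhost:4318/v1/metrics
  rpc_time_cost_histogram_boundaries: [0, 50.0, 150.0, 350.0, 750.0, 1350.0] # unit: us, optional
```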
Core code:

- [rpc.proto](../../../protocols/example/rpc.proto)
- [normal_rpc_co_client_module.cc](../../cpp/pb_rpc/module/normal_rpc_co_client_module/normal_rpc_co_client_module.cc)
- [normal_rpc_co_server_module.cc](../../cpp/pb_rpc/module/normal_rpc_co_server_module/normal_rpc_co_server_module.cc)
- [service.cc](../../cpp/pb_rpc/module/normal_rpc_co_server_module/service.cc)
How to run (linux):

- Build AimRT with the `AIMRT_BUILD_EXAMPLES` and `AIMRT_BUILD_OPENTELEMETRY_PLUGIN` options enabled;
- Set `metrics_otlp_http_exporter_url` in the startup configuration to the reporting address of a collector or jaeger; see the plugin documentation for details;
- Run the `start_examples_plugins_opentelemetry_plugin_pb_rpc_metrics.sh` script in the build directory to start the process;
- Observe the rpc metrics data on the jaeger platform;
- Press `ctrl-c` to stop the process;

Notes:

- This example is based on the protobuf rpc local backend example and reports rpc metrics data through the `otp_metrics` filter of **opentelemetry_plugin**;
## channel metrics

@@ -83,3 +103,23 @@

- how to load **opentelemetry_plugin** at startup;
- how to configure the metrics feature for channel (see the sketch after this list);
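For illustration, a minimal sketch of enabling the metrics filter on channel topics, assuming the `local` backend from the examples:

```yaml
# Sketch: enable otp_metrics on all published topics of the local backend
channel:
  pub_topics_options:
    - topic_name: "(.*)"
      enable_backends: [local]
      enable_filters: [otp_metrics]
```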
Core code:

- [event.proto](../../../protocols/example/event.proto)
- [normal_publisher_module.cc](../../cpp/pb_chn/module/normal_publisher_module/normal_publisher_module.cc)
- [normal_subscriber_module.cc](../../cpp/pb_chn/module/normal_subscriber_module/normal_subscriber_module.cc)
Configuration file:

- [examples_plugins_opentelemetry_plugin_pb_chn_metrics_cfg.yaml](./install/linux/bin/cfg/examples_plugins_opentelemetry_plugin_pb_chn_metrics_cfg.yaml)
How to run (linux):

- Build AimRT with the `AIMRT_BUILD_EXAMPLES` and `AIMRT_BUILD_OPENTELEMETRY_PLUGIN` options enabled;
- Set `metrics_otlp_http_exporter_url` in the startup configuration to the reporting address of a collector or jaeger; see the plugin documentation for details;
- Run the `start_examples_plugins_opentelemetry_plugin_pb_chn_metrics.sh` script in the build directory to start the process;
- Observe the channel metrics data on the jaeger platform;
- Press `ctrl-c` to stop the process;

Notes:

- This example is based on the protobuf channel local backend example and reports channel metrics data through the `otp_metrics` filter of **opentelemetry_plugin**;
@@ -9,6 +9,7 @@ aimrt:
        options:
          node_name: example_rpc_node
          metrics_otlp_http_exporter_url: http://localhost:4318/v1/metrics
          rpc_time_cost_histogram_boundaries: [0, 50.0, 150.0, 350.0, 750.0, 1350.0] # unit: us, optional
          attributes:
            - key: sn
              val: 123456
@@ -26,6 +26,7 @@ struct convert<aimrt::plugins::opentelemetry_plugin::OpenTelemetryPlugin::Option
    node["metrics_otlp_http_exporter_url"] = rhs.metrics_otlp_http_exporter_url;
    node["metrics_export_interval_ms"] = rhs.metrics_export_interval_ms;
    node["metrics_export_timeout_ms"] = rhs.metrics_export_timeout_ms;
    node["rpc_time_cost_histogram_boundaries"] = rhs.rpc_time_cost_histogram_boundaries;

    node["attributes"] = YAML::Node();
    for (const auto& attribute : rhs.attributes) {
@@ -58,6 +59,9 @@ struct convert<aimrt::plugins::opentelemetry_plugin::OpenTelemetryPlugin::Option
    if (node["metrics_export_timeout_ms"])
      rhs.metrics_export_timeout_ms = node["metrics_export_timeout_ms"].as<uint32_t>();

    if (node["rpc_time_cost_histogram_boundaries"])
      rhs.rpc_time_cost_histogram_boundaries = node["rpc_time_cost_histogram_boundaries"].as<std::vector<double>>();

    for (const auto& attribute_node : node["attributes"]) {
      auto attribute = Options::Attribute{
          .key = attribute_node["key"].as<std::string>(),
@@ -132,10 +136,64 @@ bool OpenTelemetryPlugin::Initialize(runtime::core::AimRTCore* core_ptr) noexcep
    reader_opts.export_interval_millis = std::chrono::milliseconds(options_.metrics_export_interval_ms);
    reader_opts.export_timeout_millis = std::chrono::milliseconds(options_.metrics_export_timeout_ms);

    auto reader =
        metric_sdk::PeriodicExportingMetricReaderFactory::Create(std::move(exporter), reader_opts);
    auto reader = metric_sdk::PeriodicExportingMetricReaderFactory::Create(std::move(exporter), reader_opts);

    std::vector<double> bucket_boundaries;
    if (!options_.rpc_time_cost_histogram_boundaries.empty()) {
      bucket_boundaries = options_.rpc_time_cost_histogram_boundaries;
    } else {
      // default boundaries: 32 powers of two, 1, 2, 4, ..., 2147483648 (us)
      bucket_boundaries.resize(32);
      double val = 1.0;
      std::generate(bucket_boundaries.begin(), bucket_boundaries.end(),
                    [&val]() {
                      double current = val;
                      val *= 2.0;
                      return current;
                    });
    }

    auto views = metric_sdk::ViewRegistryFactory::Create();
    // configure RPC client time cost histogram
    std::unique_ptr<metric_sdk::InstrumentSelector> client_instrument_selector = std::make_unique<metric_sdk::InstrumentSelector>(
        metric_sdk::InstrumentType::kHistogram,
        "rpc.client.time_cost",
        "us");
    std::unique_ptr<metric_sdk::MeterSelector> client_meter_selector = std::make_unique<metric_sdk::MeterSelector>(options_.node_name, "", "");

    std::shared_ptr<metric_sdk::HistogramAggregationConfig> client_config = std::make_shared<metric_sdk::HistogramAggregationConfig>();
    client_config->boundaries_ = bucket_boundaries;

    std::unique_ptr<metric_sdk::View> client_view = std::make_unique<metric_sdk::View>(
        "rpc_client_time_cost",
        "RPC client time cost histogram view",
        "us",
        metric_sdk::AggregationType::kHistogram,
        client_config);

    views->AddView(std::move(client_instrument_selector),
                   std::move(client_meter_selector),
                   std::move(client_view));

    // configure RPC server time cost histogram
    std::unique_ptr<metric_sdk::InstrumentSelector> server_instrument_selector = std::make_unique<metric_sdk::InstrumentSelector>(
        metric_sdk::InstrumentType::kHistogram,
        "rpc.server.time_cost",
        "us");
    std::unique_ptr<metric_sdk::MeterSelector> server_meter_selector = std::make_unique<metric_sdk::MeterSelector>(options_.node_name, "", "");

    std::shared_ptr<metric_sdk::HistogramAggregationConfig> server_config = std::make_shared<metric_sdk::HistogramAggregationConfig>();
    server_config->boundaries_ = bucket_boundaries;

    std::unique_ptr<metric_sdk::View> server_view = std::make_unique<metric_sdk::View>(
        "rpc_server_time_cost",
        "RPC server time cost histogram view",
        "us",
        metric_sdk::AggregationType::kHistogram,
        server_config);

    views->AddView(std::move(server_instrument_selector),
                   std::move(server_meter_selector),
                   std::move(server_view));

    auto context = metric_sdk::MeterContextFactory::Create(std::move(views), resource);
    context->AddMetricReader(std::move(reader));
@@ -143,12 +201,24 @@ bool OpenTelemetryPlugin::Initialize(runtime::core::AimRTCore* core_ptr) noexcep
    meter_provider_ = metric_sdk::MeterProviderFactory::Create(std::move(context));

    meter_ = meter_provider_->GetMeter(options_.node_name);

    // channel
    chn_pub_msg_num_counter_ = meter_->CreateUInt64Counter("chn.pub.msg_num", "Total num of channel publish msg");
    chn_sub_msg_num_counter_ = meter_->CreateUInt64Counter("chn.sub.msg_num", "Total num of channel subscribe msg");

    chn_pub_msg_size_counter_ = meter_->CreateUInt64Counter("chn.pub.msg_size", "Total size of channel publish msg", "bytes");
    chn_sub_msg_size_counter_ = meter_->CreateUInt64Counter("chn.sub.msg_size", "Total size of channel subscribe msg", "bytes");

    // rpc
    rpc_client_invoke_num_counter_ = meter_->CreateUInt64Counter("rpc.client.invoke_num", "Total num of rpc client invoke");
    rpc_server_invoke_num_counter_ = meter_->CreateUInt64Counter("rpc.server.invoke_num", "Total num of rpc server invoke");

    rpc_client_req_size_counter_ = meter_->CreateUInt64Counter("rpc.client.req_size", "Total size of rpc client request", "bytes");
    rpc_client_rsp_size_counter_ = meter_->CreateUInt64Counter("rpc.client.rsp_size", "Total size of rpc client response", "bytes");
    rpc_server_req_size_counter_ = meter_->CreateUInt64Counter("rpc.server.req_size", "Total size of rpc server request", "bytes");
    rpc_server_rsp_size_counter_ = meter_->CreateUInt64Counter("rpc.server.rsp_size", "Total size of rpc server response", "bytes");

    rpc_client_time_cost_histogram_ = meter_->CreateDoubleHistogram("rpc.client.time_cost", "Time cost of rpc client", "us");
    rpc_server_time_cost_histogram_ = meter_->CreateDoubleHistogram("rpc.server.time_cost", "Time cost of rpc server", "us");
  }

  // register hook
@@ -312,7 +382,8 @@ void OpenTelemetryPlugin::ChannelTraceFilter(
  auto ctx_ref = msg_wrapper.ctx_ref;
  const auto& info = msg_wrapper.info;

  // if global force trace is set, or context force sets start_new_trace, or an upper layer passes a span,
  // then start a new span
  bool start_new_trace = options_.force_trace;

  if (!start_new_trace) {
@@ -323,7 +394,7 @@ void OpenTelemetryPlugin::ChannelTraceFilter(

  ContextCarrier carrier(ctx_ref);

  // unpack the incoming context to get the parent span
  trace_api::StartSpanOptions op;
  op.kind = (type == ChannelFilterType::kPublisher)
                ? trace_api::SpanKind::kProducer
@@ -339,34 +410,34 @@ void OpenTelemetryPlugin::ChannelTraceFilter(
    start_new_trace = true;
  }

  // no need to start a new trace
  if (!start_new_trace) {
    h(msg_wrapper);
    return;
  }

  // need to start a new trace
  std::string span_name = msg_wrapper.info.topic_name + "/" + msg_wrapper.info.msg_type;
  auto span = tracer_->StartSpan(span_name, op);

  // publish msg first
  h(msg_wrapper);

  // pack current span's context
  opentelemetry::context::Context output_ot_ctx(trace_api::kSpanKey, span);
  propagator_->Inject(carrier, output_ot_ctx);

  // add base info
  span->SetAttribute("module_name", info.module_name);

  // add context attributes
  auto keys = ctx_ref.GetMetaKeys();
  for (auto& item : keys) {
    span->SetAttribute(item, ctx_ref.GetMetaValue(item));
  }

  if (upload_msg) {
    // serialize msg to json
    auto buf_ptr = aimrt::runtime::core::channel::TrySerializeMsgWithCache(msg_wrapper, "json");
    if (buf_ptr) {
      auto msg_str = buf_ptr->JoinToString();
@@ -384,7 +455,8 @@ void OpenTelemetryPlugin::RpcTraceFilter(
    aimrt::runtime::core::rpc::FrameworkAsyncRpcHandle&& h) {
  auto ctx_ref = wrapper_ptr->ctx_ref;

  // if global force trace is set, or context force sets start_new_trace, or an upper layer passes a span,
  // then start a new span
  bool start_new_trace = options_.force_trace;

  if (!start_new_trace) {
@@ -395,7 +467,7 @@ void OpenTelemetryPlugin::RpcTraceFilter(

  ContextCarrier carrier(ctx_ref);

  // unpack the incoming context to get the parent span
  trace_api::StartSpanOptions op;
  op.kind = (type == RpcFilterType::kClient)
                ? trace_api::SpanKind::kClient
@@ -411,16 +483,16 @@ void OpenTelemetryPlugin::RpcTraceFilter(
    start_new_trace = true;
  }

  // no need to start a new trace
  if (!start_new_trace) {
    h(wrapper_ptr);
    return;
  }

  // need to start a new trace
  auto span = tracer_->StartSpan(ctx_ref.GetFunctionName(), op);

  // pack current span's context
  opentelemetry::context::Context output_ot_ctx(trace_api::kSpanKey, span);
  propagator_->Inject(carrier, output_ot_ctx);

@@ -438,17 +510,17 @@ void OpenTelemetryPlugin::RpcTraceFilter(
    auto ctx_ref = wrapper_ptr->ctx_ref;
    const auto& info = wrapper_ptr->info;

    // add base info
    span->SetAttribute("module_name", info.module_name);

    // add context attributes
    auto keys = ctx_ref.GetMetaKeys();
    for (auto& item : keys) {
      span->SetAttribute(item, ctx_ref.GetMetaValue(item));
    }

    if (upload_msg) {
      // serialize req/rsp to json
      auto req_buf_ptr = aimrt::runtime::core::rpc::TrySerializeReqWithCache(*wrapper_ptr, "json");
      if (req_buf_ptr) {
        auto req_json = req_buf_ptr->JoinToString();
@@ -466,7 +538,6 @@ void OpenTelemetryPlugin::RpcTraceFilter(

    callback(status);
  };

  h(wrapper_ptr);
}
@@ -476,18 +547,13 @@ void OpenTelemetryPlugin::ChannelMetricsFilter(
    aimrt::runtime::core::channel::FrameworkAsyncChannelHandle&& h) {
  // publish msg first
  h(msg_wrapper);

  // get counter
  metrics_api::Counter<uint64_t>* msg_num_counter_ptr = nullptr;
  metrics_api::Counter<uint64_t>* msg_size_counter_ptr = nullptr;

  if (type == ChannelFilterType::kPublisher) {
    msg_num_counter_ptr = chn_pub_msg_num_counter_.get();
    msg_size_counter_ptr = chn_pub_msg_size_counter_.get();
  } else {
    msg_num_counter_ptr = chn_sub_msg_num_counter_.get();
    msg_size_counter_ptr = chn_sub_msg_size_counter_.get();
  }
  auto& msg_num_counter_ptr = (type == ChannelFilterType::kPublisher)
                                  ? chn_pub_msg_num_counter_
                                  : chn_sub_msg_num_counter_;
  auto& msg_size_counter_ptr = (type == ChannelFilterType::kPublisher)
                                  ? chn_pub_msg_size_counter_
                                  : chn_sub_msg_size_counter_;

  // make labels
  auto ctx_ref = msg_wrapper.ctx_ref;
@@ -533,7 +599,6 @@ void OpenTelemetryPlugin::ChannelMetricsFilter(
    }
  }

  labels.emplace("serialization_type", serialization_type);
  msg_size_counter_ptr->Add(msg_size, labels);
}
@@ -541,7 +606,97 @@ void OpenTelemetryPlugin::RpcMetricsFilter(
    RpcFilterType type,
    const std::shared_ptr<aimrt::runtime::core::rpc::InvokeWrapper>& wrapper_ptr,
    aimrt::runtime::core::rpc::FrameworkAsyncRpcHandle&& h) {
  auto begin_time = std::chrono::steady_clock::now();
  wrapper_ptr->callback = [this, type, wrapper_ptr,
                           begin_time,
                           callback{std::move(wrapper_ptr->callback)}](aimrt::rpc::Status status) {
    auto end_time = std::chrono::steady_clock::now();

    auto time_cost = std::chrono::duration_cast<std::chrono::microseconds>(
                         end_time - begin_time)
                         .count();

    const auto& info = wrapper_ptr->info;
    std::map<std::string, std::string> labels{
        {"module_name", info.module_name},
        {"func_name", info.func_name},
        {"status", std::string(status.GetCodeMsg(status.Code()))},
    };

    // choose ptr depends on type
    auto& rpc_msg_num_counter_ptr = (type == RpcFilterType::kClient)
                                        ? rpc_client_invoke_num_counter_
                                        : rpc_server_invoke_num_counter_;

    auto& rpc_msg_req_size_counter_ptr = (type == RpcFilterType::kClient)
                                             ? rpc_client_req_size_counter_
                                             : rpc_server_req_size_counter_;

    auto& rpc_msg_rsp_size_counter_ptr = (type == RpcFilterType::kClient)
                                             ? rpc_client_rsp_size_counter_
                                             : rpc_server_rsp_size_counter_;

    auto& time_cost_histogram_ptr = (type == RpcFilterType::kClient)
                                        ? rpc_client_time_cost_histogram_
                                        : rpc_server_time_cost_histogram_;

    time_cost_histogram_ptr->Record(time_cost, labels, opentelemetry::context::Context());

    // msg num
    rpc_msg_num_counter_ptr->Add(1, labels);

    size_t req_msg_size = 0, rsp_msg_size = 0;

    // req msg size
    const auto& req_serialization_cache = wrapper_ptr->req_serialization_cache;
    const auto& req_serialization_type_span = info.req_type_support_ref.SerializationTypesSupportedListSpan();
    std::string_view req_serialization_type;

    if (req_serialization_cache.size() == 1) {
      req_serialization_type = req_serialization_cache.begin()->first;
      req_msg_size = req_serialization_cache.begin()->second->BufferSize();
    } else {
      for (auto item : req_serialization_type_span) {
        auto cur_serialization_type = aimrt::util::ToStdStringView(item);

        auto finditr = req_serialization_cache.find(cur_serialization_type);
        if (finditr == req_serialization_cache.end()) [[unlikely]]
          continue;

        req_serialization_type = cur_serialization_type;
        req_msg_size = finditr->second->BufferSize();
        break;
      }
    }

    rpc_msg_req_size_counter_ptr->Add(req_msg_size, labels);

    // rsp msg size
    const auto& rsp_serialization_cache = wrapper_ptr->rsp_serialization_cache;
    const auto& rsp_serialization_type_span = info.rsp_type_support_ref.SerializationTypesSupportedListSpan();
    std::string_view rsp_serialization_type;

    if (rsp_serialization_cache.size() == 1) {
      rsp_serialization_type = rsp_serialization_cache.begin()->first;
      rsp_msg_size = rsp_serialization_cache.begin()->second->BufferSize();
    } else {
      for (auto item : rsp_serialization_type_span) {
        auto cur_serialization_type = aimrt::util::ToStdStringView(item);

        auto finditr = rsp_serialization_cache.find(cur_serialization_type);
        if (finditr == rsp_serialization_cache.end()) [[unlikely]]
          continue;

        rsp_serialization_type = cur_serialization_type;
        rsp_msg_size = finditr->second->BufferSize();
        break;
      }
    }
    rpc_msg_rsp_size_counter_ptr->Add(rsp_msg_size, labels);

    callback(status);
  };

  h(wrapper_ptr);
}

}  // namespace aimrt::plugins::opentelemetry_plugin
@@ -62,6 +62,7 @@ class OpenTelemetryPlugin : public AimRTCorePluginBase {
    std::string metrics_otlp_http_exporter_url;
    uint32_t metrics_export_interval_ms = 15000;
    uint32_t metrics_export_timeout_ms = 5000;
    std::vector<double> rpc_time_cost_histogram_boundaries;

    struct Attribute {
      std::string key;
@@ -136,6 +137,7 @@ class OpenTelemetryPlugin : public AimRTCorePluginBase {
  std::shared_ptr<opentelemetry::metrics::Meter> meter_;

  using u64_counter = std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>;
  using db_histogram = std::unique_ptr<opentelemetry::metrics::Histogram<double>>;

  u64_counter chn_pub_msg_num_counter_;
  u64_counter chn_sub_msg_num_counter_;
@@ -148,6 +150,8 @@ class OpenTelemetryPlugin : public AimRTCorePluginBase {
  u64_counter rpc_client_rsp_size_counter_;
  u64_counter rpc_server_req_size_counter_;
  u64_counter rpc_server_rsp_size_counter_;
  db_histogram rpc_client_time_cost_histogram_;
  db_histogram rpc_server_time_cost_histogram_;
};

}  // namespace aimrt::plugins::opentelemetry_plugin