feat: Asynchronous connection to mqtt broker (#85)

* feat: Asynchronous connection to mqtt  broker

* add new option: client_key_password

* add example for mqtt plugin with ssl/tls

* add new info at release notes

* Make minor formatting adjustments.

* Improve  mqtt_plugin's README documentation

---------

Co-authored-by: hanjun <hanjun@agibot.com>
This commit is contained in:
han J 2024-11-08 16:13:10 +08:00 committed by GitHub
parent e01333313f
commit 5b01f4e9e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 281 additions and 50 deletions

View File

@ -6,7 +6,6 @@
- 更新 zenohc 库至 1.0.0.11 版本;
- 添加了 zenoh rpc 后端;
- 现在可以传入 zenoh 原生配置;
- mqtt 新增配置项以支持加密传输;
- 新增了第三方库 asioruntime::core 不再引用 boost改为引用独立的 asio 库,以减轻依赖;
- 新增 aimrt_cli trans 命令,用于将 使用 aimrt record_playback 插件录制的 bag 文件转换为 ros2 的 bag 文件;
- 新增 Echo 插件,用于回显消息;
@ -27,3 +26,5 @@
- 删除一些未使用的协议;
- 支持日志自定义输出格式;
- grpc 插件支持 ros2 消息以及 json 序列化格式;
- mqtt 新增配置项以支持 ssl/tls 加密传输;
- mqtt 插件在broker未启动时会自动重试异步连接

View File

@ -17,19 +17,24 @@
插件的配置项如下:
| 节点 | 类型 | 是否可选 | 默认值 | 作用 |
| -------------- | ------ | -------- | ------ | ----------------------- |
| broker_addr | string | 必选 | "" | mqtt broker 的地址 |
| client_id | string | 必选 | "" | 本节点的 mqtt client id |
| max_pkg_size_k | int | 可选 | 1024 | 最大包尺寸单位KB |
| truststore | string | 可选 | "" | broker的CA证书路径 |
| 节点 | 类型 | 是否可选 | 默认值 | 作用 |
| ------------------- | ------ | -------- | ------ | ----------------------- |
| broker_addr | string | 必选 | "" | mqtt broker 的地址 |
| client_id | string | 必选 | "" | 本节点的 mqtt client id |
| max_pkg_size_k | int | 可选 | 1024 | 最大包尺寸单位KB |
| truststore | string | 可选 | "" | CA证书路径 |
| client_cert | string | 可选 | "" | 客户端证书路径 |
| client_key | string | 可选 | "" | 客户端私钥路径 |
| client_key_password | string | 可选 | "" | 客户端私钥设置的密码 |
关于**mqtt_plugin**的配置,使用注意点如下:
- `broker_addr`表示 mqtt broker 的地址,使用者必须保证有 mqtt 的 broker 运行在该地址,否则启动会失败。
- `client_id`表示本节点连接 mqtt broker 时的 client id。
- `max_pkg_size_k`表示传输数据时的最大包尺寸,默认 1 MB。注意必须 broker 也要支持该尺寸才行。
- `truststore`表示 broker 的 CA 证书路径,例如`/etc/emqx/certs/cacert.pem` 。当`broker_addr`的协议被配置为`ssl`或者`mqtts`时,该选项生效,用于指定 CA 证书路径,否则自动忽略该选项。
- `truststore`表示 broker 的 CA 证书路径,例如`/etc/emqx/certs/cacert.pem` 。当`broker_addr`的协议被配置为`ssl`或者`mqtts`时,该选项生效,用于指定 CA 证书路径,否则自动忽略该选项, 请注意若只配置该选项则视为单向认证。
- `client_cert`表示客户端证书路径,例如`/etc/emqx/certs/client-cert.pem`。当需要双向认证时使用,与`client_key`配合使用。如果 broker_addr 使用非加密协议,该选项将被忽略。
- `client_key`表示客户端私钥路径,例如`/etc/emqx/certs/client-key.pem`。当需要双向认证时使用,与`client_cert`配合使用。如果 broker_addr 使用非加密协议,该选项将被忽略。
- `client_key_password`表示客户端私钥设置的密码,如果私钥设置了密码,则需要设置该选项。如果 broker_addr 使用非加密协议,该选项将被忽略。
**mqtt_plugin**插件基于[paho.mqtt.c](https://github.com/eclipse/paho.mqtt.c)封装在使用时Channel 订阅回调、RPC Server 处理方法、RPC Client 返回时,使用的都是**paho.mqtt.c**提供的线程,当使用者在回调中阻塞了线程时,有可能导致无法继续接收/发送消息。正如 Module 接口文档中所述,一般来说,如果回调中的任务非常轻量,那就可以直接在回调里处理;但如果回调中的任务比较重,那最好调度到其他专门执行任务的执行器里处理。

View File

@ -133,3 +133,80 @@
说明:
- 本示例与 **protobuf rpc** 示例基本一致,除了业务层使用的是 ros2 msg 形式的协议;
## protobuf channel with SSL/TLS
一个基于 protobuf 协议与 mqtt 后端的 channel 示例,演示内容包括:
- 如何在配置文件中加载**mqtt_plugin**
- 如何使用 mqtt 类型的 channel 后端;
- 如何配置 SSL/TLS 加密;
核心代码:
- [event.proto](../../../protocols/example/event.proto)
- [normal_publisher_module.cc](../../cpp/pb_chn/module/normal_publisher_module/normal_publisher_module.cc)
- [normal_subscriber_module.cc](../../cpp/pb_chn/module/normal_subscriber_module/normal_subscriber_module.cc)
配置文件:
- [examples_plugins_mqtt_plugin_pb_chn_pub_with_ssl_cfg.yaml](./install/linux/bin/cfg/examples_plugins_mqtt_plugin_pb_chn_pub_with_ssl_cfg.yaml)
- [examples_plugins_mqtt_plugin_pb_chn_sub_with_ssl_cfg.yaml](./install/linux/bin/cfg/examples_plugins_mqtt_plugin_pb_chn_sub_with_ssl_cfg.yaml)
运行方式linux
- 开启 `AIMRT_BUILD_EXAMPLES``AIMRT_BUILD_MQTT_PLUGIN` 选项编译 AimRT
- 生成加密通信所需的证书和密钥文件主要需要的是ca_crt.pem、server_crt.pem、server_key.pem、client_crt.pem、client_key.pem 这五个文件),以下是一个简单的生成示例:
```shell
# 生成 CA 私钥
openssl genpkey -algorithm RSA -out ca_key.pem
# 生成 CA 自签名证书
openssl req -x509 -new -key ca_key.pem -sha256 -days 3650 -out ca_crt.pem
```
```shell
# 生成服务器私钥
openssl genpkey -algorithm RSA -out server_key.pem
# 生成服务器证书签名请求 (CSR)
openssl req -new -key server_key.pem -out server_csr.pem
# 使用 CA 签署服务器证书
openssl x509 -req -in server_csr.pem -CA ca_crt.pem -CAkey ca_key.pem -CAcreateserial -out server_crt.pem -days 365 -sha256
```
```shell
# 生成客户端私钥
openssl genpkey -algorithm RSA -out client_key.pem
# 生成客户端证书签名请求 (CSR)
openssl req -new -key client_key.pem -out client_csr.pem
# 使用 CA 签署客户端证书
openssl x509 -req -in client_csr.pem -CA ca_crt.pem -CAkey ca_key.pem -CAcreateserial -out client_crt.pem -days 365 -sha256
```
```shell
# [可选] 使用 OpenSSL 的 openssl pkcs8 命令将 client 的私钥加密
openssl pkcs8 -topk8 -inform PEM -outform PEM -in client_key.pem -out client_key_encrypted.pem -v2 aes-256-cbc
```
- 将生成的 `ca_crt.pem``server_crt.pem``server_key.pem` 文件的路径复制到 broker 配置文件中对应的位置, 并配置地址(默认 0.0.0.0:8883以及是否需要双向认证默认不开启
- 将生成的 `ca_crt.pem``client_crt.pem``client_key.pem` 文件的路径依次复制到客户端配置文件中对应的 `truststore``client_cert``client_key` 如果设置客户端私钥文件被加密,则将设置的密码配置在`client_key_password`中;
- 在本地启动一个 mqtt broker也可以使用其他 IP 地址的 mqtt broker但需要修改示例配置中的 `broker_addr`
- 在终端运行 build 目录下 `start_examples_plugins_mqtt_plugin_pb_chn_sub_with_ssl.sh` 脚本启动订阅端( sub 进程);
- 再开启一个新的终端窗口运行 `start_examples_plugins_mqtt_plugin_pb_chn_pub_with_ssl.sh` 脚本启动发布端( pub 进程);
- 分别在两个终端键入 `ctrl-c`停止对应进程;
说明:
- 此示例创建了以下两个模块:
- `NormalPublisherModule`:会基于 `work_thread_pool` 执行器,以配置的频率、向配置的 topic 中发布 `ExampleEventMsg` 类型的消息;
- `NormalSubscriberModule`:会订阅配置的 topic 下的 `ExampleEventMsg` 类型的消息;
- 此示例将 `NormalPublisherModule``NormalSubscriberModule` 分别集成到 `pb_chn_pub_pkg``pb_chn_sub_pkg` 两个 Pkg 中,并在两个配置文件中分别加载对应的 Pkg 到 pub 和 sub 进程中;
- 此示例加载了**mqtt_plugin**,并使用 mqtt 类型的 channel 后端进行通信,配置 `ssl://127.0.0.1:8883` 作为 broker 地址, 默认该端口用于 SSL/TLS 加密通信;
- `直接运行给定的示例不能正常工作,需要修改配置文件中证书和密钥的路径为实际生成位置`

View File

@ -0,0 +1,43 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: ssl://127.0.0.1:8883
client_id: example_pb_chn_publisher
max_pkg_size_k: 1024
truststore: /XX/YY/ZZ/cacert.pem # replace with your own truststore path
client_cert: /XX/YY/ZZ/client-cert.pem # replace with your own client certificate path
client_key: /XX/YY/ZZ/client-key.pem # replace with your own client key path
log:
core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
backends:
- type: console
executor:
executors:
- name: work_thread_pool
type: asio_thread
options:
thread_num: 2
channel:
backends:
- type: mqtt
pub_topics_options:
- topic_name: "(.*)"
enable_backends: [mqtt]
module:
pkgs:
- path: ./libpb_chn_pub_pkg.so
enable_modules: [NormalPublisherModule]
modules:
- name: NormalPublisherModule
log_lvl: INFO
# Module custom configuration
NormalPublisherModule:
topic_name: test_topic
channel_frq: 0.5

View File

@ -0,0 +1,37 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.
aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: ssl://127.0.0.1:8883
client_id: example_pb_chn_subscriber
max_pkg_size_k: 1024
truststore: /XX/YY/ZZ/cacert.pem # replace with your own truststore path
client_cert: /XX/YY/ZZ/client-cert.pem # replace with your own client certificate path
client_key: /XX/YY/ZZ/client-key.pem # replace with your own client key path
log:
core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
backends:
- type: console
executor:
channel:
backends:
- type: mqtt
sub_topics_options:
- topic_name: "(.*)"
enable_backends: [mqtt]
module:
pkgs:
- path: ./libpb_chn_sub_pkg.so
enable_modules: [NormalSubscriberModule]
modules:
- name: NormalSubscriberModule
log_lvl: INFO
# Module custom configuration
NormalSubscriberModule:
topic_name: test_topic

View File

@ -0,0 +1,3 @@
#!/bin/bash
./aimrt_main --cfg_file_path=./cfg/examples_plugins_mqtt_plugin_pb_chn_pub_with_ssl_cfg.yaml

View File

@ -0,0 +1,3 @@
#!/bin/bash
./aimrt_main --cfg_file_path=./cfg/examples_plugins_mqtt_plugin_pb_chn_sub_with_ssl_cfg.yaml

View File

@ -93,8 +93,6 @@ void MqttChannelBackend::Start() {
AIMRT_CHECK_ERROR_THROW(
std::atomic_exchange(&state_, State::kStart) == State::kInit,
"Method can only be called when state is 'Init'.");
SubscribeMqttTopic();
}
void MqttChannelBackend::Shutdown() {

View File

@ -21,6 +21,9 @@ struct convert<aimrt::plugins::mqtt_plugin::MqttPlugin::Options> {
node["client_id"] = rhs.client_id;
node["max_pkg_size_k"] = rhs.max_pkg_size_k;
node["truststore"] = rhs.truststore;
node["client_cert"] = rhs.client_cert;
node["client_key"] = rhs.client_key;
node["client_key_password"] = rhs.client_key_password;
return node;
}
@ -37,6 +40,15 @@ struct convert<aimrt::plugins::mqtt_plugin::MqttPlugin::Options> {
if (node["truststore"])
rhs.truststore = node["truststore"].as<std::string>();
if (node["client_cert"])
rhs.client_cert = node["client_cert"].as<std::string>();
if (node["client_key"])
rhs.client_key = node["client_key"].as<std::string>();
if (node["client_key_password"])
rhs.client_key_password = node["client_key_password"].as<std::string>();
return true;
}
};
@ -71,42 +83,8 @@ bool MqttPlugin::Initialize(runtime::core::AimRTCore *core_ptr) noexcept {
},
NULL);
// connect to broker
std::promise<bool> connect_ret_promise;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
// check broker_add protocol is ssl/mqtts or not
auto ret = common::util::ParseUrl(options_.broker_addr);
AIMRT_CHECK_ERROR_THROW(ret != std::nullopt, "Parse broker_addr failed");
if (ret->protocol == "ssl" || ret->protocol == "mqtts") {
AIMRT_CHECK_ERROR_THROW(!options_.truststore.empty(), "Use ssl/mqtts must set truststore");
ssl_opts.trustStore = options_.truststore.c_str();
conn_opts.ssl = &ssl_opts;
} else {
AIMRT_CHECK_WARN(options_.truststore.empty(), "Broker protocol is not ssl/mqtts, the truststore you set will be ignored.");
}
conn_opts.onSuccess = [](void *context, MQTTAsync_successData *response) {
static_cast<std::promise<bool> *>(context)->set_value(true);
};
conn_opts.onFailure = [](void *context, MQTTAsync_failureData *response) {
AIMRT_ERROR("Failed to connect mqtt broker, code: {}, msg: {}",
response->code, response->message);
static_cast<std::promise<bool> *>(context)->set_value(false);
};
conn_opts.context = &connect_ret_promise;
int rc = MQTTAsync_connect(client_, &conn_opts);
AIMRT_CHECK_ERROR_THROW(rc == MQTTASYNC_SUCCESS, "Failed to connect mqtt broker, return code: {}", rc);
bool connect_ret = connect_ret_promise.get_future().get();
AIMRT_CHECK_ERROR_THROW(connect_ret, "Failed to connect mqtt broker");
// connect to broker, which is an async operation
AsyncConnect();
msg_handle_registry_ptr_ = std::make_shared<MsgHandleRegistry>();
@ -122,6 +100,9 @@ bool MqttPlugin::Initialize(runtime::core::AimRTCore *core_ptr) noexcept {
plugin_options_node = options_;
core_ptr_->GetPluginManager().UpdatePluginOptionsNode(Name(), plugin_options_node);
// Wait a moment for the connection to be established
std::this_thread::sleep_for(std::chrono::milliseconds(500));
return true;
} catch (const std::exception &e) {
AIMRT_ERROR("Initialize failed, {}", e.what());
@ -188,6 +169,60 @@ void MqttPlugin::RegisterMqttRpcBackend() {
core_ptr_->GetRpcManager().RegisterRpcBackend(std::move(mqtt_rpc_backend_ptr));
}
void MqttPlugin::AsyncConnect() {
if (stop_flag_.load()) return;
// connect to broker
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
// check broker_add protocol is ssl/mqtts or not
auto ret = common::util::ParseUrl(options_.broker_addr);
AIMRT_CHECK_ERROR_THROW(ret != std::nullopt, "Parse broker_addr failed");
// check if need to set ssl options
if (ret->protocol == "ssl" || ret->protocol == "mqtts") {
SetSSL(conn_opts, ssl_opts);
} else {
AIMRT_CHECK_WARN(options_.truststore.empty(), "Broker protocol is not ssl/mqtts, the truststore you set will be ignored.");
AIMRT_CHECK_WARN(options_.client_cert.empty(), "Broker protocol is not ssl/mqtts, the client_cert you set will be ignored.");
AIMRT_CHECK_WARN(options_.client_key.empty(), "Broker protocol is not ssl/mqtts, the client_key you set will be ignored.");
AIMRT_CHECK_WARN(options_.client_key_password.empty(), "Broker protocol is not ssl/mqtts, the client_key_password you set will be ignored.");
}
// if connect success, call all registered hook functions to subscribe mqtt topic
conn_opts.onSuccess = [](void *context, MQTTAsync_successData *response) {
AIMRT_INFO("Connect to mqtt broker success.");
auto *mqtt_plugin_ptr = static_cast<MqttPlugin *>(context);
// Synchronize reconnect_hook_
while (mqtt_plugin_ptr->reconnect_hook_.size() < 2) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
for (const auto &f : mqtt_plugin_ptr->reconnect_hook_)
f();
};
// if connect failed, call connect again
conn_opts.onFailure = [](void *context, MQTTAsync_failureData *response) {
AIMRT_WARN("Failed to connect mqtt broker, code: {}, msg: {}",
response->code, response->message);
static_cast<MqttPlugin *>(context)->AsyncConnect();
};
conn_opts.context = this;
int rc = MQTTAsync_connect(client_, &conn_opts);
if (rc != MQTTASYNC_SUCCESS) {
AIMRT_ERROR("Failed to start connection, rc: {}", rc);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
AsyncConnect(); // todo: avoid stack overflow
}
}
void MqttPlugin::OnConnectLost(const char *cause) {
AIMRT_WARN("Lost connect to mqtt broker, cause {}", (cause == nullptr) ? "nil" : cause);
@ -211,7 +246,8 @@ void MqttPlugin::OnConnectLost(const char *cause) {
int rc = MQTTAsync_connect(client_, &conn_opts);
if (rc != MQTTASYNC_SUCCESS) {
AIMRT_ERROR("Failed to connect mqtt broker, return code: {}", rc);
AIMRT_ERROR("Failed to reconnect mqtt broker, return code: {}", rc);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
OnConnectLost("Reconnect failed"); // TODO: 得防止爆栈
}
}
@ -224,4 +260,29 @@ int MqttPlugin::OnMsgRecv(char *topic, int topic_len, MQTTAsync_message *message
return 1;
}
void MqttPlugin::SetSSL(MQTTAsync_connectOptions &conn_opts, MQTTAsync_SSLOptions &ssl_opts) const {
// check if set ca file
AIMRT_CHECK_ERROR_THROW(!options_.truststore.empty(), "Use ssl/mqtts must set truststore");
ssl_opts.trustStore = options_.truststore.c_str();
bool has_cert = !options_.client_cert.empty();
bool has_key = !options_.client_key.empty();
// client_cert and client_key must be set together ,which means use double authentication
if (has_cert || has_key) {
AIMRT_CHECK_ERROR_THROW(has_cert && has_key,
"When using client certificate authentication, both cert_path and key_path must be set");
// set client certificate and key
ssl_opts.keyStore = options_.client_cert.c_str();
ssl_opts.privateKey = options_.client_key.c_str();
// set client key password
if (!options_.client_key_password.empty()) {
ssl_opts.privateKeyPassword = options_.client_key_password.c_str();
}
}
conn_opts.ssl = &ssl_opts;
}
} // namespace aimrt::plugins::mqtt_plugin

View File

@ -22,6 +22,9 @@ class MqttPlugin : public AimRTCorePluginBase {
std::string client_id;
uint32_t max_pkg_size_k = 1024;
std::string truststore;
std::string client_cert;
std::string client_key;
std::string client_key_password;
};
public:
@ -35,9 +38,11 @@ class MqttPlugin : public AimRTCorePluginBase {
private:
void SetPluginLogger();
void SetSSL(MQTTAsync_connectOptions &conn_opts, MQTTAsync_SSLOptions &ssl_opts) const;
void RegisterMqttRpcBackend();
void RegisterMqttChannelBackend();
void AsyncConnect();
void OnConnectLost(const char *cause);
int OnMsgRecv(char *topic, int topic_len, MQTTAsync_message *message);

View File

@ -132,8 +132,6 @@ void MqttRpcBackend::Start() {
AIMRT_CHECK_ERROR_THROW(
std::atomic_exchange(&state_, State::kStart) == State::kInit,
"Method can only be called when state is 'Init'.");
SubscribeMqttTopic();
}
void MqttRpcBackend::Shutdown() {