fix: optimize reconnection steps. (#93)

* Optimize reconnection steps.

* Using condition variables to establish a connection with the broker.

* use aimrt's light_signal

* Add condition variable for channel and rpc

* Change the notify timing to the start phase

* Remove redundancy

* Modify the timing of the sleep call

---------

Co-authored-by: hanjun <hanjun@agibot.com>
This commit is contained in:
han J 2024-11-13 19:33:12 +08:00 committed by GitHub
parent 7534ac360a
commit b9d861c327
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 39 additions and 23 deletions

View File

@ -17,20 +17,23 @@
插件的配置项如下:
| 节点 | 类型 | 是否可选 | 默认值 | 作用 |
| ------------------- | ------ | -------- | ------ | ----------------------- |
| broker_addr | string | 必选 | "" | mqtt broker 的地址 |
| client_id | string | 必选 | "" | 本节点的 mqtt client id |
| max_pkg_size_k | int | 可选 | 1024 | 最大包尺寸,单位 KB |
| truststore | string | 可选 | "" | CA证书路径 |
| client_cert | string | 可选 | "" | 客户端证书路径 |
| client_key | string | 可选 | "" | 客户端私钥路径 |
| client_key_password | string | 可选 | "" | 客户端私钥设置的密码 |
| 节点 | 类型 | 是否可选 | 默认值 | 作用 |
| --------------------- | ------ | -------- | ------ | --------------------------------- |
| broker_addr | string | 必选 | "" | mqtt broker 的地址 |
| client_id | string | 必选 | "" | 本节点的 mqtt client id |
| max_pkg_size_k | int | 可选 | 1024 | 最大包尺寸,单位 KB |
| reconnect_interval_ms | int | 可选 | 1000 | 重连 broker 的时间间隔,单位 ms |
| truststore | string | 可选 | "" | CA证书路径 |
| client_cert | string | 可选 | "" | 客户端证书路径 |
| client_key | string | 可选 | "" | 客户端私钥路径 |
| client_key_password | string | 可选 | "" | 客户端私钥设置的密码 |
关于**mqtt_plugin**的配置,使用注意点如下:
- `broker_addr`表示 mqtt broker 的地址,使用者必须保证有 mqtt 的 broker 运行在该地址,否则启动会失败。
- `client_id`表示本节点连接 mqtt broker 时的 client id。
- `max_pkg_size_k`表示传输数据时的最大包尺寸,默认 1 MB。注意必须 broker 也要支持该尺寸才行。
- `reconnect_interval_ms`表示重连 broker 的时间间隔,单位为毫秒,默认 1000(即 1 秒)。
- `truststore`表示 broker 的 CA 证书路径,例如`/etc/emqx/certs/cacert.pem`。当`broker_addr`的协议被配置为`ssl`或者`mqtts`时,该选项生效,用于指定 CA 证书路径,否则自动忽略该选项。请注意,若只配置该选项,则视为单向认证。
- `client_cert`表示客户端证书路径,例如`/etc/emqx/certs/client-cert.pem`。当需要双向认证时使用,与`client_key`配合使用。如果 broker_addr 使用非加密协议,该选项将被忽略。
- `client_key`表示客户端私钥路径,例如`/etc/emqx/certs/client-key.pem`。当需要双向认证时使用,与`client_cert`配合使用。如果 broker_addr 使用非加密协议,该选项将被忽略。

View File

@ -93,6 +93,9 @@ void MqttChannelBackend::Start() {
AIMRT_CHECK_ERROR_THROW(
std::atomic_exchange(&state_, State::kStart) == State::kInit,
"Method can only be called when state is 'Init'.");
// Wait a moment for the connection to be established
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
void MqttChannelBackend::Shutdown() {

View File

@ -20,6 +20,7 @@ struct convert<aimrt::plugins::mqtt_plugin::MqttPlugin::Options> {
node["broker_addr"] = rhs.broker_addr;
node["client_id"] = rhs.client_id;
node["max_pkg_size_k"] = rhs.max_pkg_size_k;
node["reconnect_interval_ms"] = rhs.reconnect_interval_ms;
node["truststore"] = rhs.truststore;
node["client_cert"] = rhs.client_cert;
node["client_key"] = rhs.client_key;
@ -49,6 +50,9 @@ struct convert<aimrt::plugins::mqtt_plugin::MqttPlugin::Options> {
if (node["client_key_password"])
rhs.client_key_password = node["client_key_password"].as<std::string>();
if (node["reconnect_interval_ms"])
rhs.reconnect_interval_ms = node["reconnect_interval_ms"].as<uint32_t>();
return true;
}
};
@ -97,12 +101,12 @@ bool MqttPlugin::Initialize(runtime::core::AimRTCore *core_ptr) noexcept {
core_ptr_->RegisterHookFunc(runtime::core::AimRTCore::State::kPreInitChannel,
[this] { RegisterMqttChannelBackend(); });
core_ptr_->RegisterHookFunc(runtime::core::AimRTCore::State::kPreStart,
[this] { signal_.Notify(); });
plugin_options_node = options_;
core_ptr_->GetPluginManager().UpdatePluginOptionsNode(Name(), plugin_options_node);
// Wait a moment for the connection to be established
std::this_thread::sleep_for(std::chrono::milliseconds(500));
return true;
} catch (const std::exception &e) {
AIMRT_ERROR("Initialize failed, {}", e.what());
@ -144,7 +148,6 @@ void MqttPlugin::RegisterMqttChannelBackend() {
[ptr = static_cast<MqttChannelBackend *>(mqtt_channel_backend_ptr.get())]() {
ptr->SubscribeMqttTopic();
});
core_ptr_->GetChannelManager().RegisterChannelBackend(std::move(mqtt_channel_backend_ptr));
}
@ -165,7 +168,6 @@ void MqttPlugin::RegisterMqttRpcBackend() {
[ptr = static_cast<MqttRpcBackend *>(mqtt_rpc_backend_ptr.get())]() {
ptr->SubscribeMqttTopic();
});
core_ptr_->GetRpcManager().RegisterRpcBackend(std::move(mqtt_rpc_backend_ptr));
}
@ -198,10 +200,8 @@ void MqttPlugin::AsyncConnect() {
AIMRT_INFO("Connect to mqtt broker success.");
auto *mqtt_plugin_ptr = static_cast<MqttPlugin *>(context);
// Synchronize reconnect_hook_
while (mqtt_plugin_ptr->reconnect_hook_.size() < 2) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
mqtt_plugin_ptr->signal_.Wait();
mqtt_plugin_ptr->signal_.Reset();
for (const auto &f : mqtt_plugin_ptr->reconnect_hook_)
f();
@ -211,14 +211,16 @@ void MqttPlugin::AsyncConnect() {
conn_opts.onFailure = [](void *context, MQTTAsync_failureData *response) {
AIMRT_WARN("Failed to connect mqtt broker, code: {}, msg: {}",
response->code, response->message);
static_cast<MqttPlugin *>(context)->AsyncConnect();
auto *mqtt_plugin_ptr = static_cast<MqttPlugin *>(context);
std::this_thread::sleep_for(std::chrono::milliseconds(mqtt_plugin_ptr->options_.reconnect_interval_ms));
mqtt_plugin_ptr->AsyncConnect();
};
conn_opts.context = this;
int rc = MQTTAsync_connect(client_, &conn_opts);
if (rc != MQTTASYNC_SUCCESS) {
AIMRT_ERROR("Failed to start connection, rc: {}", rc);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::sleep_for(std::chrono::milliseconds(options_.reconnect_interval_ms));
AsyncConnect(); // todo: avoid stack overflow
}
}
@ -240,14 +242,16 @@ void MqttPlugin::OnConnectLost(const char *cause) {
f();
};
conn_opts.onFailure = [](void *context, MQTTAsync_failureData *response) {
static_cast<MqttPlugin *>(context)->OnConnectLost("Reconnect failed");
auto *mqtt_plugin_ptr = static_cast<MqttPlugin *>(context);
std::this_thread::sleep_for(std::chrono::milliseconds(mqtt_plugin_ptr->options_.reconnect_interval_ms));
mqtt_plugin_ptr->OnConnectLost("Reconnect failed");
};
conn_opts.context = this;
int rc = MQTTAsync_connect(client_, &conn_opts);
if (rc != MQTTASYNC_SUCCESS) {
AIMRT_ERROR("Failed to reconnect mqtt broker, return code: {}", rc);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::sleep_for(std::chrono::milliseconds(options_.reconnect_interval_ms));
OnConnectLost("Reconnect failed"); // TODO: 得防止爆栈
}
}

View File

@ -12,6 +12,7 @@
#include "mqtt_plugin/mqtt_channel_backend.h"
#include "mqtt_plugin/mqtt_rpc_backend.h"
#include "mqtt_plugin/msg_handle_registry.h"
#include "util/light_signal.h"
namespace aimrt::plugins::mqtt_plugin {
@ -21,6 +22,7 @@ class MqttPlugin : public AimRTCorePluginBase {
std::string broker_addr;
std::string client_id;
uint32_t max_pkg_size_k = 1024;
uint32_t reconnect_interval_ms = 1000;
std::string truststore;
std::string client_cert;
std::string client_key;
@ -59,6 +61,8 @@ class MqttPlugin : public AimRTCorePluginBase {
std::shared_ptr<MsgHandleRegistry> msg_handle_registry_ptr_;
std::vector<std::function<void()>> reconnect_hook_;
common::util::LightSignal signal_;
};
} // namespace aimrt::plugins::mqtt_plugin

View File

@ -132,8 +132,10 @@ void MqttRpcBackend::Start() {
AIMRT_CHECK_ERROR_THROW(
std::atomic_exchange(&state_, State::kStart) == State::kInit,
"Method can only be called when state is 'Init'.");
}
// Wait a moment for the connection to be established
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
void MqttRpcBackend::Shutdown() {
if (std::atomic_exchange(&state_, State::kShutdown) == State::kShutdown)
return;