diff --git a/document/sphinx-cn/tutorials/plugins/mqtt_plugin.md b/document/sphinx-cn/tutorials/plugins/mqtt_plugin.md index 5e86b4426..e9432db97 100644 --- a/document/sphinx-cn/tutorials/plugins/mqtt_plugin.md +++ b/document/sphinx-cn/tutorials/plugins/mqtt_plugin.md @@ -17,20 +17,23 @@ 插件的配置项如下: -| 节点 | 类型 | 是否可选 | 默认值 | 作用 | -| ------------------- | ------ | -------- | ------ | ----------------------- | -| broker_addr | string | 必选 | "" | mqtt broker 的地址 | -| client_id | string | 必选 | "" | 本节点的 mqtt client id | -| max_pkg_size_k | int | 可选 | 1024 | 最大包尺寸,单位:KB | -| truststore | string | 可选 | "" | CA证书路径 | -| client_cert | string | 可选 | "" | 客户端证书路径 | -| client_key | string | 可选 | "" | 客户端私钥路径 | -| client_key_password | string | 可选 | "" | 客户端私钥设置的密码 | +| 节点 | 类型 | 是否可选 | 默认值 | 作用 | +| --------------------- | ------ | -------- | ------ | --------------------------------- | +| broker_addr | string | 必选 | "" | mqtt broker 的地址 | +| client_id | string | 必选 | "" | 本节点的 mqtt client id | +| max_pkg_size_k | int | 可选 | 1024 | 最大包尺寸,单位:KB | +| reconnect_interval_ms | int | 可选 | 1000 | 重连 broker 的时间间隔, 单位:ms | +| truststore | string | 可选 | "" | CA证书路径 | +| client_cert | string | 可选 | "" | 客户端证书路径 | +| client_key | string | 可选 | "" | 客户端私钥路径 | +| client_key_password | string | 可选 | "" | 客户端私钥设置的密码 | + 关于**mqtt_plugin**的配置,使用注意点如下: - `broker_addr`表示 mqtt broker 的地址,使用者必须保证有 mqtt 的 broker 运行在该地址,否则启动会失败。 - `client_id`表示本节点连接 mqtt broker 时的 client id。 - `max_pkg_size_k`表示传输数据时的最大包尺寸,默认 1 MB。注意,必须 broker 也要支持该尺寸才行。 +- `reconnect_interval_ms`表示重连 broker 的时间间隔,默认 1 秒。 - `truststore`表示 broker 的 CA 证书路径,例如`/etc/emqx/certs/cacert.pem` 。当`broker_addr`的协议被配置为`ssl`或者`mqtts`时,该选项生效,用于指定 CA 证书路径,否则自动忽略该选项, 请注意若只配置该选项则视为单向认证。 - `client_cert`表示客户端证书路径,例如`/etc/emqx/certs/client-cert.pem`。当需要双向认证时使用,与`client_key`配合使用。如果 broker_addr 使用非加密协议,该选项将被忽略。 - `client_key`表示客户端私钥路径,例如`/etc/emqx/certs/client-key.pem`。当需要双向认证时使用,与`client_cert`配合使用。如果 broker_addr 使用非加密协议,该选项将被忽略。 diff --git a/src/plugins/mqtt_plugin/mqtt_channel_backend.cc b/src/plugins/mqtt_plugin/mqtt_channel_backend.cc index 507ce5fb0..a23dd7773 100644 --- a/src/plugins/mqtt_plugin/mqtt_channel_backend.cc +++ b/src/plugins/mqtt_plugin/mqtt_channel_backend.cc @@ -93,6 +93,9 @@ void MqttChannelBackend::Start() { AIMRT_CHECK_ERROR_THROW( std::atomic_exchange(&state_, State::kStart) == State::kInit, "Method can only be called when state is 'Init'."); + + // Wait a moment for the connection to be established + std::this_thread::sleep_for(std::chrono::milliseconds(200)); } void MqttChannelBackend::Shutdown() { diff --git a/src/plugins/mqtt_plugin/mqtt_plugin.cc b/src/plugins/mqtt_plugin/mqtt_plugin.cc index 0689ccb4e..df63b4b0d 100644 --- a/src/plugins/mqtt_plugin/mqtt_plugin.cc +++ b/src/plugins/mqtt_plugin/mqtt_plugin.cc @@ -20,6 +20,7 @@ struct convert { node["broker_addr"] = rhs.broker_addr; node["client_id"] = rhs.client_id; node["max_pkg_size_k"] = rhs.max_pkg_size_k; + node["reconnect_interval_ms"] = rhs.reconnect_interval_ms; node["truststore"] = rhs.truststore; node["client_cert"] = rhs.client_cert; node["client_key"] = rhs.client_key; @@ -49,6 +50,9 @@ struct convert { if (node["client_key_password"]) rhs.client_key_password = node["client_key_password"].as(); + if (node["reconnect_interval_ms"]) + rhs.reconnect_interval_ms = node["reconnect_interval_ms"].as(); + return true; } }; @@ -97,12 +101,12 @@ bool MqttPlugin::Initialize(runtime::core::AimRTCore *core_ptr) noexcept { core_ptr_->RegisterHookFunc(runtime::core::AimRTCore::State::kPreInitChannel, [this] { RegisterMqttChannelBackend(); }); + core_ptr_->RegisterHookFunc(runtime::core::AimRTCore::State::kPreStart, + [this] { signal_.Notify(); }); + plugin_options_node = options_; core_ptr_->GetPluginManager().UpdatePluginOptionsNode(Name(), plugin_options_node); - // Wait a moment for the connection to be established - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - return true; } catch (const std::exception &e) { AIMRT_ERROR("Initialize failed, {}", e.what()); @@ -144,7 +148,6 @@ void MqttPlugin::RegisterMqttChannelBackend() { [ptr = static_cast(mqtt_channel_backend_ptr.get())]() { ptr->SubscribeMqttTopic(); }); - core_ptr_->GetChannelManager().RegisterChannelBackend(std::move(mqtt_channel_backend_ptr)); } @@ -165,7 +168,6 @@ void MqttPlugin::RegisterMqttRpcBackend() { [ptr = static_cast(mqtt_rpc_backend_ptr.get())]() { ptr->SubscribeMqttTopic(); }); - core_ptr_->GetRpcManager().RegisterRpcBackend(std::move(mqtt_rpc_backend_ptr)); } @@ -198,10 +200,8 @@ void MqttPlugin::AsyncConnect() { AIMRT_INFO("Connect to mqtt broker success."); auto *mqtt_plugin_ptr = static_cast(context); - // Synchronize reconnect_hook_ - while (mqtt_plugin_ptr->reconnect_hook_.size() < 2) { - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - } + mqtt_plugin_ptr->signal_.Wait(); + mqtt_plugin_ptr->signal_.Reset(); for (const auto &f : mqtt_plugin_ptr->reconnect_hook_) f(); @@ -211,14 +211,16 @@ void MqttPlugin::AsyncConnect() { conn_opts.onFailure = [](void *context, MQTTAsync_failureData *response) { AIMRT_WARN("Failed to connect mqtt broker, code: {}, msg: {}", response->code, response->message); - static_cast(context)->AsyncConnect(); + auto *mqtt_plugin_ptr = static_cast(context); + std::this_thread::sleep_for(std::chrono::milliseconds(mqtt_plugin_ptr->options_.reconnect_interval_ms)); + mqtt_plugin_ptr->AsyncConnect(); }; conn_opts.context = this; int rc = MQTTAsync_connect(client_, &conn_opts); if (rc != MQTTASYNC_SUCCESS) { AIMRT_ERROR("Failed to start connection, rc: {}", rc); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(options_.reconnect_interval_ms)); AsyncConnect(); // todo: avoid stack overflow } } @@ -240,14 +242,16 @@ void MqttPlugin::OnConnectLost(const char *cause) { f(); }; conn_opts.onFailure = [](void *context, MQTTAsync_failureData *response) { - static_cast(context)->OnConnectLost("Reconnect failed"); + auto *mqtt_plugin_ptr = static_cast(context); + std::this_thread::sleep_for(std::chrono::milliseconds(mqtt_plugin_ptr->options_.reconnect_interval_ms)); + mqtt_plugin_ptr->OnConnectLost("Reconnect failed"); }; conn_opts.context = this; int rc = MQTTAsync_connect(client_, &conn_opts); if (rc != MQTTASYNC_SUCCESS) { AIMRT_ERROR("Failed to reconnect mqtt broker, return code: {}", rc); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(options_.reconnect_interval_ms)); OnConnectLost("Reconnect failed"); // TODO: 得防止爆栈 } } diff --git a/src/plugins/mqtt_plugin/mqtt_plugin.h b/src/plugins/mqtt_plugin/mqtt_plugin.h index c51ecb206..90c771385 100644 --- a/src/plugins/mqtt_plugin/mqtt_plugin.h +++ b/src/plugins/mqtt_plugin/mqtt_plugin.h @@ -12,6 +12,7 @@ #include "mqtt_plugin/mqtt_channel_backend.h" #include "mqtt_plugin/mqtt_rpc_backend.h" #include "mqtt_plugin/msg_handle_registry.h" +#include "util/light_signal.h" namespace aimrt::plugins::mqtt_plugin { @@ -21,6 +22,7 @@ class MqttPlugin : public AimRTCorePluginBase { std::string broker_addr; std::string client_id; uint32_t max_pkg_size_k = 1024; + uint32_t reconnect_interval_ms = 1000; std::string truststore; std::string client_cert; std::string client_key; @@ -59,6 +61,8 @@ class MqttPlugin : public AimRTCorePluginBase { std::shared_ptr msg_handle_registry_ptr_; std::vector> reconnect_hook_; + + common::util::LightSignal signal_; }; } // namespace aimrt::plugins::mqtt_plugin diff --git a/src/plugins/mqtt_plugin/mqtt_rpc_backend.cc b/src/plugins/mqtt_plugin/mqtt_rpc_backend.cc index 9755ed79a..0530e47dc 100644 --- a/src/plugins/mqtt_plugin/mqtt_rpc_backend.cc +++ b/src/plugins/mqtt_plugin/mqtt_rpc_backend.cc @@ -132,8 +132,10 @@ void MqttRpcBackend::Start() { AIMRT_CHECK_ERROR_THROW( std::atomic_exchange(&state_, State::kStart) == State::kInit, "Method can only be called when state is 'Init'."); -} + // Wait a moment for the connection to be established + std::this_thread::sleep_for(std::chrono::milliseconds(200)); +} void MqttRpcBackend::Shutdown() { if (std::atomic_exchange(&state_, State::kShutdown) == State::kShutdown) return;