# Mqtt 插件 ## 相关链接 参考示例: - {{ '[mqtt_plugin]({}/src/examples/plugins/mqtt_plugin)'.format(code_site_root_path_url) }} ## 插件概述 **mqtt_plugin**是一个基于 mqtt 协议实现的网络传输插件,此插件提供了以下组件: - `mqtt`类型 RPC 后端 - `mqtt`类型 Channel 后端 插件的配置项如下: | 节点 | 类型 | 是否可选 | 默认值 | 作用 | | -------------- | ------ | -------- | ------ | ----------------------- | | broker_addr | string | 必选 | "" | mqtt broker 的地址 | | client_id | string | 必选 | "" | 本节点的 mqtt client id | | max_pkg_size_k | int | 可选 | 1024 | 最大包尺寸,单位:KB | | truststore | string | 可选 | "" | broker的CA证书路径 | 关于**mqtt_plugin**的配置,使用注意点如下: - `broker_addr`表示 mqtt broker 的地址,使用者必须保证有 mqtt 的 broker 运行在该地址,否则启动会失败。 - `client_id`表示本节点连接 mqtt broker 时的 client id。 - `max_pkg_size_k`表示传输数据时的最大包尺寸,默认 1 MB。注意,必须 broker 也要支持该尺寸才行。 - `truststore`表示 broker 的 CA 证书路径,例如`/etc/emqx/certs/cacert.pem` 。当`broker_addr`的协议被配置为`ssl`或者`mqtts`时,该选项生效,用于指定 CA 证书路径,否则自动忽略该选项。 **mqtt_plugin**插件基于[paho.mqtt.c](https://github.com/eclipse/paho.mqtt.c)封装,在使用时,Channel 订阅回调、RPC Server 处理方法、RPC Client 返回时,使用的都是**paho.mqtt.c**提供的线程,当使用者在回调中阻塞了线程时,有可能导致无法继续接收/发送消息。正如 Module 接口文档中所述,一般来说,如果回调中的任务非常轻量,那就可以直接在回调里处理;但如果回调中的任务比较重,那最好调度到其他专门执行任务的执行器里处理。 以下是一个简单的示例: ```yaml aimrt: plugin: plugins: - name: mqtt_plugin path: ./libaimrt_mqtt_plugin.so options: broker_addr: tcp://127.0.0.1:1883 client_id: example_mqtt_client max_pkg_size_k: 1024 ``` ## mqtt 类型 RPC 后端 `mqtt`类型的 RPC 后端是**mqtt_plugin**中提供的一种 RPC 后端,用于通过 mqtt 的方式来调用和处理 AimRT RPC 请求。其所有的配置项如下: | 节点 | 类型 | 是否可选 | 默认值 | 作用 | | --------------------------------- | ------ | -------- | ------ | ---------------------------------------------------------------------------- | | timeout_executor | string | 可选 | "" | Client 端发起 RPC 超时情况下的执行器 | | clients_options | array | 可选 | [] | Client 端发起 RPC 请求时的规则 | | clients_options[i].func_name | string | 必选 | "" | RPC Func 名称,支持正则表达式 | | clients_options[i].server_mqtt_id | string | 可选 | "" | RPC Func 发起调用时请求的 mqtt 服务端 id | | clients_options[i].qos | int | 可选 | 2 | RPC Client 端 mqtt qos,取值范围:0/1/2 | | servers_options | array | 可选 | [] | 服务端处理 RPC 请求时的规则 | | servers_options[i].func_name | string | 必选 | "" | RPC Func 名称,支持正则表达式 | | servers_options[i].allow_share | bool | 可选 | true | 该 RPC 服务是否允许共享订阅,不允许的话该服务只能通过指定 server id 进行调用 | | servers_options[i].qos | int | 可选 | 2 | RPC Server 端 mqtt qos,取值范围:0/1/2 | 以下是一个简单的客户端的示例: ```yaml aimrt: plugin: plugins: - name: mqtt_plugin path: ./libaimrt_mqtt_plugin.so options: broker_addr: tcp://127.0.0.1:1883 client_id: example_client max_pkg_size_k: 1024 executor: executors: - name: timeout_handle type: time_wheel rpc: backends: - type: mqtt options: timeout_executor: timeout_handle clients_options: - func_name: "(.*)" qos: 0 clients_options: - func_name: "(.*)" enable_backends: [mqtt] ``` 以下则是一个简单的服务端的示例: ```yaml aimrt: plugin: plugins: - name: mqtt_plugin path: ./libaimrt_mqtt_plugin.so options: broker_addr: tcp://127.0.0.1:1883 client_id: example_server max_pkg_size_k: 1024 rpc: backends: - type: mqtt options: servers_options: - func_name: "(.*)" allow_share: true qos: 0 servers_options: - func_name: "(.*)" enable_backends: [mqtt] ``` 以上示例中,Client 端和 Server 端都连上了`tcp://127.0.0.1:1883`这个地址的一个 Mqtt broker,Client 端也配置了所有的 RPC 请求都通过 mqtt 后端进行处理,从而完成 RPC 的调用闭环。 如果有多个 server 端同时注册了某个 RPC 服务,那么 client 端会随机的挑选一个 server 端发送请求。如果想指定某个 server 端处理,可以在 client 端的 ctx 中按照如下方法设置 ToAddr: ```cpp auto ctx_ptr = proxy->NewContextSharedPtr(); // mqtt://{{target server mqtt id}} ctx_ptr->SetToAddr("mqtt://target_server_mqtt_id"); auto status = proxy->Foo(ctx_ptr, req, rsp); ``` 在整个 RPC 过程中,底层使用的 Mqtt Topic 名称格式如下: - Server 端 - 订阅 Req 使用的 topic(两个都会订阅): - `$share/aimrt/aimrt_rpc_req/${func_name}` - `aimrt_rpc_req/${server_id}/${func_name}` - 发布 Rsp 使用的 topic:`aimrt_rpc_rsp/${client_id}/${func_name}` - Client 端 - 发布 Req 使用的 topic(二选一): - `aimrt_rpc_req/${func_name}` - `aimrt_rpc_req/${server_id}/${func_name}` - 订阅 Rsp 使用的 topic:`aimrt_rpc_rsp/${client_id}/${func_name}` 其中`${client_id}`、`${server_id}`是 Client 端和 Server 端需要保证在同一个 Mqtt broker 环境下全局唯一的一个值,一般使用在 Mqtt broker 处注册的 id。`${func_name}`是 url 编码后的 AimRT RPC 方法名称。Server 端订阅使用共享订阅,保证只有一个服务端处理请求。此项特性需要支持 Mqtt5.0 协议的 Broker。 例如,client 端向 Mqtt broker 注册的 id 为`example_client`,func 名称为`/aimrt.protocols.example.ExampleService/GetBarData`,则`${client_id}`值为`example_client`,`${func_name}`值为`%2Faimrt.protocols.example.ExampleService%2FGetBarData`。 Client -> Server 的 Mqtt 数据包格式整体分 5 段: - 序列化类型,一般是`pb`或`json` - client 端想要 server 端回复 rsp 的 mqtt topic 名称。client 端自己需要订阅这个 mqtt topic - msg id,4 字节,server 端会原封不动的封装到 rsp 包里,供 client 端定位 rsp 对应哪个 req - context 区 - context 数量,1 字节,最大 255 个 context - context_1 key, 2 字节长度 + 数据区 - context_2 key, 2 字节长度 + 数据区 - ... - msg 数据 ``` | n(0~255) [1 byte] | content type [n byte] | m(0~255) [1 byte] | rsp topic name [m byte] | msg id [4 byte] | context num [1 byte] | context_1 key size [2 byte] | context_1 key data [key_1_size byte] | context_1 val size [2 byte] | context_1 val data [val_1_size byte] | context_2 key size [2 byte] | context_2 key data [key_2_size byte] | context_2 val size [2 byte] | context_2 val data [val_2_size byte] | ... | msg data [remaining byte] ``` Server -> Client 的 Mqtt 数据包格式整体分 4 段: - 序列化类型,一般是`pb`或`json` - msg id,4 字节,req 中的 msg id - status code,4 字节,框架错误码,如果这个部分不为零,则代表服务端发生了错误,数据段将没有内容 - msg 数据 ``` | n(0~255) [1 byte] | content type [n byte] | msg id [4 byte] | status code [4 byte] | msg data [remaining byte] ``` ## mqtt 类型 Channel 后端 `mqtt`类型的 Channel 后端是**mqtt_plugin**中提供的一种 Channel 后端,用于通过 mqtt 的方式来发布和订阅消息。其所有的配置项如下: | 节点 | 类型 | 是否可选 | 默认值 | 作用 | | -------------------------------- | ------ | -------- | ------ | -------------------------------------- | | pub_topics_options | array | 可选 | [] | 发布 Topic 时的规则 | | pub_topics_options[i].topic_name | string | 必选 | "" | Topic 名称,支持正则表达式 | | pub_topics_options[i].qos | int | 必选 | 2 | Publish 端 mqtt qos,取值范围:0/1/2 | | sub_topics_options | array | 可选 | [] | 发布 Topic 时的规则 | | sub_topics_options[i].topic_name | string | 必选 | "" | Topic 名称,支持正则表达式 | | sub_topics_options[i].qos | int | 必选 | 2 | Subscribe 端 mqtt qos,取值范围:0/1/2 | 以下是一个简单的发布端的示例: ```yaml aimrt: plugin: plugins: - name: mqtt_plugin path: ./libaimrt_mqtt_plugin.so options: broker_addr: tcp://127.0.0.1:1883 client_id: example_publisher max_pkg_size_k: 1024 channel: backends: - type: mqtt options: pub_topics_options: - topic_name: "(.*)" qos: 2 pub_topics_options: - topic_name: "(.*)" enable_backends: [mqtt] ``` 以下则是一个简单的订阅端的示例: ```yaml aimrt: plugin: plugins: - name: mqtt_plugin path: ./libaimrt_mqtt_plugin.so options: broker_addr: tcp://127.0.0.1:1883 client_id: example_subscriber max_pkg_size_k: 1024 channel: backends: - type: mqtt sub_topics_options: - topic_name: "(.*)" enable_backends: [mqtt] ``` 以上示例中,发布端和订阅端都连上了`tcp://127.0.0.1:1883`这个地址的一个 Mqtt broker,发布端也配置了所有的消息都通过 mqtt 后端进行处理,订阅端也配置了所有消息都可以从 mqtt 后端触发回调,从而打通消息发布订阅的链路。 在这个过程中,底层使用的 Mqtt Topic 名称格式为:`/channel/${topic_name}/${message_type}`。其中,`${topic_name}`为 AimRT 的 Topic 名称,`${message_type}`为 url 编码后的 AimRT 消息名称。 例如,AimRT Topic 名称为`test_topic`,消息类型为`pb:aimrt.protocols.example.ExampleEventMsg`,则最终 Mqtt 的 topic 名称为:`/channel/test_topic/pb%3Aaimrt.protocols.example.ExampleEventMsg`。 在 AimRT 发布端发布数据到订阅端这个链路上,Mqtt 数据包格式整体分 3 段: - 序列化类型,一般是`pb`或`json` - context 区 - context 数量,1 字节,最大 255 个 context - context_1 key, 2 字节长度 + 数据区 - context_2 key, 2 字节长度 + 数据区 - ... - 数据 ``` | n(0~255) [1 byte] | content type [n byte] | context num [1 byte] | context_1 key size [2 byte] | context_1 key data [key_1_size byte] | context_1 val size [2 byte] | context_1 val data [val_1_size byte] | context_2 key size [2 byte] | context_2 key data [key_2_size byte] | context_2 val size [2 byte] | context_2 val data [val_2_size byte] | ... | msg data [len - 1 - n byte] | ```