zhangyi1357 7d5fe68319 update
2024-09-30 17:22:32 +08:00

275 lines
12 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Mqtt 插件
## 相关链接
参考示例:
- {{ '[mqtt_plugin]({}/src/examples/plugins/mqtt_plugin)'.format(code_site_root_path_url) }}
## 插件概述
**mqtt_plugin**是一个基于 mqtt 协议实现的网络传输插件,此插件提供了以下组件:
- `mqtt`类型 RPC 后端
- `mqtt`类型 Channel 后端
插件的配置项如下:
| 节点 | 类型 | 是否可选 | 默认值 | 作用 |
| -------------- | ------ | -------- | ------ | ----------------------- |
| broker_addr | string | 必选 | "" | mqtt broker 的地址 |
| client_id | string | 必选 | "" | 本节点的 mqtt client id |
| max_pkg_size_k | int | 可选 | 1024 | 最大包尺寸单位KB |
| truststore | string | 可选 | "" | broker的CA证书路径 |
关于**mqtt_plugin**的配置,使用注意点如下:
- `broker_addr`表示 mqtt broker 的地址,使用者必须保证有 mqtt 的 broker 运行在该地址,否则启动会失败。
- `client_id`表示本节点连接 mqtt broker 时的 client id。
- `max_pkg_size_k`表示传输数据时的最大包尺寸,默认 1 MB。注意必须 broker 也要支持该尺寸才行。
- `truststore`表示 broker 的 CA 证书路径,例如`/etc/emqx/certs/cacert.pem` 。当`broker_addr`的协议被配置为`ssl`或者`mqtts`时,该选项生效,用于指定 CA 证书路径,否则自动忽略该选项。
**mqtt_plugin**插件基于[paho.mqtt.c](https://github.com/eclipse/paho.mqtt.c)封装在使用时Channel 订阅回调、RPC Server 处理方法、RPC Client 返回时,使用的都是**paho.mqtt.c**提供的线程,当使用者在回调中阻塞了线程时,有可能导致无法继续接收/发送消息。正如 Module 接口文档中所述,一般来说,如果回调中的任务非常轻量,那就可以直接在回调里处理;但如果回调中的任务比较重,那最好调度到其他专门执行任务的执行器里处理。
以下是一个简单的示例:
```yaml
aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: tcp://127.0.0.1:1883
client_id: example_mqtt_client
max_pkg_size_k: 1024
```
## mqtt 类型 RPC 后端
`mqtt`类型的 RPC 后端是**mqtt_plugin**中提供的一种 RPC 后端,用于通过 mqtt 的方式来调用和处理 AimRT RPC 请求。其所有的配置项如下:
| 节点 | 类型 | 是否可选 | 默认值 | 作用 |
| --------------------------------- | ------ | -------- | ------ | ---------------------------------------------------------------------------- |
| timeout_executor | string | 可选 | "" | Client 端发起 RPC 超时情况下的执行器 |
| clients_options | array | 可选 | [] | Client 端发起 RPC 请求时的规则 |
| clients_options[i].func_name | string | 必选 | "" | RPC Func 名称,支持正则表达式 |
| clients_options[i].server_mqtt_id | string | 可选 | "" | RPC Func 发起调用时请求的 mqtt 服务端 id |
| clients_options[i].qos | int | 可选 | 2 | RPC Client 端 mqtt qos取值范围0/1/2 |
| servers_options | array | 可选 | [] | 服务端处理 RPC 请求时的规则 |
| servers_options[i].func_name | string | 必选 | "" | RPC Func 名称,支持正则表达式 |
| servers_options[i].allow_share | bool | 可选 | true | 该 RPC 服务是否允许共享订阅,不允许的话该服务只能通过指定 server id 进行调用 |
| servers_options[i].qos | int | 可选 | 2 | RPC Server 端 mqtt qos取值范围0/1/2 |
以下是一个简单的客户端的示例:
```yaml
aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: tcp://127.0.0.1:1883
client_id: example_client
max_pkg_size_k: 1024
executor:
executors:
- name: timeout_handle
type: time_wheel
rpc:
backends:
- type: mqtt
options:
timeout_executor: timeout_handle
clients_options:
- func_name: "(.*)"
qos: 0
clients_options:
- func_name: "(.*)"
enable_backends: [mqtt]
```
以下则是一个简单的服务端的示例:
```yaml
aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: tcp://127.0.0.1:1883
client_id: example_server
max_pkg_size_k: 1024
rpc:
backends:
- type: mqtt
options:
servers_options:
- func_name: "(.*)"
allow_share: true
qos: 0
servers_options:
- func_name: "(.*)"
enable_backends: [mqtt]
```
以上示例中Client 端和 Server 端都连上了`tcp://127.0.0.1:1883`这个地址的一个 Mqtt brokerClient 端也配置了所有的 RPC 请求都通过 mqtt 后端进行处理,从而完成 RPC 的调用闭环。
如果有多个 server 端同时注册了某个 RPC 服务,那么 client 端会随机的挑选一个 server 端发送请求。如果想指定某个 server 端处理,可以在 client 端的 ctx 中按照如下方法设置 ToAddr
```cpp
auto ctx_ptr = proxy->NewContextSharedPtr();
// mqtt://{{target server mqtt id}}
ctx_ptr->SetToAddr("mqtt://target_server_mqtt_id");
auto status = proxy->Foo(ctx_ptr, req, rsp);
```
在整个 RPC 过程中,底层使用的 Mqtt Topic 名称格式如下:
- Server 端
- 订阅 Req 使用的 topic两个都会订阅
- `$share/aimrt/aimrt_rpc_req/${func_name}`
- `aimrt_rpc_req/${server_id}/${func_name}`
- 发布 Rsp 使用的 topic`aimrt_rpc_rsp/${client_id}/${func_name}`
- Client 端
- 发布 Req 使用的 topic二选一
- `aimrt_rpc_req/${func_name}`
- `aimrt_rpc_req/${server_id}/${func_name}`
- 订阅 Rsp 使用的 topic`aimrt_rpc_rsp/${client_id}/${func_name}`
其中`${client_id}``${server_id}`是 Client 端和 Server 端需要保证在同一个 Mqtt broker 环境下全局唯一的一个值,一般使用在 Mqtt broker 处注册的 id。`${func_name}`是 url 编码后的 AimRT RPC 方法名称。Server 端订阅使用共享订阅,保证只有一个服务端处理请求。此项特性需要支持 Mqtt5.0 协议的 Broker。
例如client 端向 Mqtt broker 注册的 id 为`example_client`func 名称为`/aimrt.protocols.example.ExampleService/GetBarData`,则`${client_id}`值为`example_client``${func_name}`值为`%2Faimrt.protocols.example.ExampleService%2FGetBarData`
Client -> Server 的 Mqtt 数据包格式整体分 5 段:
- 序列化类型,一般是`pb``json`
- client 端想要 server 端回复 rsp 的 mqtt topic 名称。client 端自己需要订阅这个 mqtt topic
- msg id4 字节server 端会原封不动的封装到 rsp 包里,供 client 端定位 rsp 对应哪个 req
- context 区
- context 数量1 字节,最大 255 个 context
- context_1 key, 2 字节长度 + 数据区
- context_2 key, 2 字节长度 + 数据区
- ...
- msg 数据
```
| n(0~255) [1 byte] | content type [n byte]
| m(0~255) [1 byte] | rsp topic name [m byte]
| msg id [4 byte]
| context num [1 byte]
| context_1 key size [2 byte] | context_1 key data [key_1_size byte]
| context_1 val size [2 byte] | context_1 val data [val_1_size byte]
| context_2 key size [2 byte] | context_2 key data [key_2_size byte]
| context_2 val size [2 byte] | context_2 val data [val_2_size byte]
| ...
| msg data [remaining byte]
```
Server -> Client 的 Mqtt 数据包格式整体分 4 段:
- 序列化类型,一般是`pb``json`
- msg id4 字节req 中的 msg id
- status code4 字节,框架错误码,如果这个部分不为零,则代表服务端发生了错误,数据段将没有内容
- msg 数据
```
| n(0~255) [1 byte] | content type [n byte]
| msg id [4 byte]
| status code [4 byte]
| msg data [remaining byte]
```
## mqtt 类型 Channel 后端
`mqtt`类型的 Channel 后端是**mqtt_plugin**中提供的一种 Channel 后端,用于通过 mqtt 的方式来发布和订阅消息。其所有的配置项如下:
| 节点 | 类型 | 是否可选 | 默认值 | 作用 |
| -------------------------------- | ------ | -------- | ------ | -------------------------------------- |
| pub_topics_options | array | 可选 | [] | 发布 Topic 时的规则 |
| pub_topics_options[i].topic_name | string | 必选 | "" | Topic 名称,支持正则表达式 |
| pub_topics_options[i].qos | int | 必选 | 2 | Publish 端 mqtt qos取值范围0/1/2 |
| sub_topics_options | array | 可选 | [] | 发布 Topic 时的规则 |
| sub_topics_options[i].topic_name | string | 必选 | "" | Topic 名称,支持正则表达式 |
| sub_topics_options[i].qos | int | 必选 | 2 | Subscribe 端 mqtt qos取值范围0/1/2 |
以下是一个简单的发布端的示例:
```yaml
aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: tcp://127.0.0.1:1883
client_id: example_publisher
max_pkg_size_k: 1024
channel:
backends:
- type: mqtt
options:
pub_topics_options:
- topic_name: "(.*)"
qos: 2
pub_topics_options:
- topic_name: "(.*)"
enable_backends: [mqtt]
```
以下则是一个简单的订阅端的示例:
```yaml
aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: tcp://127.0.0.1:1883
client_id: example_subscriber
max_pkg_size_k: 1024
channel:
backends:
- type: mqtt
sub_topics_options:
- topic_name: "(.*)"
enable_backends: [mqtt]
```
以上示例中,发布端和订阅端都连上了`tcp://127.0.0.1:1883`这个地址的一个 Mqtt broker发布端也配置了所有的消息都通过 mqtt 后端进行处理,订阅端也配置了所有消息都可以从 mqtt 后端触发回调,从而打通消息发布订阅的链路。
在这个过程中,底层使用的 Mqtt Topic 名称格式为:`/channel/${topic_name}/${message_type}`。其中,`${topic_name}`为 AimRT 的 Topic 名称,`${message_type}`为 url 编码后的 AimRT 消息名称。
例如AimRT Topic 名称为`test_topic`,消息类型为`pb:aimrt.protocols.example.ExampleEventMsg`,则最终 Mqtt 的 topic 名称为:`/channel/test_topic/pb%3Aaimrt.protocols.example.ExampleEventMsg`
在 AimRT 发布端发布数据到订阅端这个链路上Mqtt 数据包格式整体分 3 段:
- 序列化类型,一般是`pb``json`
- context 区
- context 数量1 字节,最大 255 个 context
- context_1 key, 2 字节长度 + 数据区
- context_2 key, 2 字节长度 + 数据区
- ...
- 数据
```
| n(0~255) [1 byte] | content type [n byte]
| context num [1 byte]
| context_1 key size [2 byte] | context_1 key data [key_1_size byte]
| context_1 val size [2 byte] | context_1 val data [val_1_size byte]
| context_2 key size [2 byte] | context_2 key data [key_2_size byte]
| context_2 val size [2 byte] | context_2 val data [val_2_size byte]
| ...
| msg data [len - 1 - n byte] |
```