274 lines
11 KiB
Markdown
Raw Normal View History

2024-09-23 16:01:31 +08:00
# Mqtt 插件
## 相关链接
参考示例:
- {{ '[mqtt_plugin]({}/src/examples/plugins/mqtt_plugin)'.format(code_site_root_path_url) }}
## 插件概述
**mqtt_plugin**是一个基于 mqtt 协议实现的网络传输插件,此插件提供了以下组件:
- `mqtt`类型 RPC 后端
- `mqtt`类型 Channel 后端
插件的配置项如下:
| 节点 | 类型 | 是否可选| 默认值 | 作用 |
| ---- | ---- | ---- | ---- | ---- |
| broker_addr | string | 必选 | "" | mqtt broker 的地址 |
| client_id | string | 必选 | "" | 本节点的 mqtt client id |
| max_pkg_size_k | int | 可选 | 1024 | 最大包尺寸单位KB |
关于**mqtt_plugin**的配置,使用注意点如下:
- `broker_addr`表示 mqtt broker 的地址,使用者必须保证有 mqtt 的 broker 运行在该地址,否则启动会失败。
- `client_id`表示本节点连接 mqtt broker 时的 client id。
- `max_pkg_size_k`表示传输数据时的最大包尺寸,默认 1 MB。注意必须 broker 也要支持该尺寸才行。
**mqtt_plugin**插件基于[paho.mqtt.c](https://github.com/eclipse/paho.mqtt.c)封装在使用时Channel 订阅回调、RPC Server 处理方法、RPC Client 返回时,使用的都是**paho.mqtt.c**提供的线程,当使用者在回调中阻塞了线程时,有可能导致无法继续接收/发送消息。正如 Module 接口文档中所述,一般来说,如果回调中的任务非常轻量,那就可以直接在回调里处理;但如果回调中的任务比较重,那最好调度到其他专门执行任务的执行器里处理。
以下是一个简单的示例:
```yaml
aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: tcp://127.0.0.1:1883
client_id: example_mqtt_client
max_pkg_size_k: 1024
```
## mqtt 类型 RPC 后端
`mqtt`类型的 RPC 后端是**mqtt_plugin**中提供的一种 RPC 后端,用于通过 mqtt 的方式来调用和处理 AimRT RPC 请求。其所有的配置项如下:
| 节点 | 类型 | 是否可选 | 默认值 | 作用 |
|------------------------------------|---------|-------|------|----------------------------------------------|
| timeout_executor | string | 可选 | "" | Client 端发起 RPC 超时情况下的执行器 |
| clients_options | array | 可选 | [] | Client 端发起 RPC 请求时的规则 |
| clients_options[i].func_name | string | 必选 | "" | RPC Func 名称,支持正则表达式 |
| clients_options[i].server_mqtt_id | string | 可选 | "" | RPC Func 发起调用时请求的 mqtt 服务端 id |
| clients_options[i].qos | int | 可选 | 2 | RPC Client 端 mqtt qos取值范围0/1/2 |
| servers_options | array | 可选 | [] | 服务端处理 RPC 请求时的规则 |
| servers_options[i].func_name | string | 必选 | "" | RPC Func 名称,支持正则表达式 |
| servers_options[i].allow_share | bool | 可选 | true | 该 RPC 服务是否允许共享订阅,不允许的话该服务只能通过指定 server id 进行调用 |
| servers_options[i].qos | int | 可选 | 2 | RPC Server 端 mqtt qos取值范围0/1/2 |
以下是一个简单的客户端的示例:
```yaml
aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: tcp://127.0.0.1:1883
client_id: example_client
max_pkg_size_k: 1024
executor:
executors:
- name: timeout_handle
type: time_wheel
rpc:
backends:
- type: mqtt
options:
timeout_executor: timeout_handle
clients_options:
- func_name: "(.*)"
qos: 0
clients_options:
- func_name: "(.*)"
enable_backends: [mqtt]
```
以下则是一个简单的服务端的示例:
```yaml
aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: tcp://127.0.0.1:1883
client_id: example_server
max_pkg_size_k: 1024
rpc:
backends:
- type: mqtt
options:
servers_options:
- func_name: "(.*)"
allow_share: true
qos: 0
servers_options:
- func_name: "(.*)"
enable_backends: [mqtt]
```
以上示例中Client 端和 Server 端都连上了`tcp://127.0.0.1:1883`这个地址的一个 Mqtt brokerClient 端也配置了所有的 RPC 请求都通过 mqtt 后端进行处理,从而完成 RPC 的调用闭环。
如果有多个 server 端同时注册了某个 RPC 服务,那么 client 端会随机的挑选一个 server 端发送请求。如果想指定某个 server 端处理,可以在 client 端的 ctx 中按照如下方法设置 ToAddr
```cpp
auto ctx_ptr = proxy->NewContextSharedPtr();
// mqtt://{{target server mqtt id}}
ctx_ptr->SetToAddr("mqtt://target_server_mqtt_id");
auto status = proxy->Foo(ctx_ptr, req, rsp);
```
在整个 RPC 过程中,底层使用的 Mqtt Topic 名称格式如下:
- Server 端
- 订阅 Req 使用的 topic两个都会订阅
- `$share/aimrt/aimrt_rpc_req/${func_name}`
- `aimrt_rpc_req/${server_id}/${func_name}`
- 发布 Rsp 使用的 topic`aimrt_rpc_rsp/${client_id}/${func_name}`
- Client 端
- 发布 Req 使用的 topic二选一
- `aimrt_rpc_req/${func_name}`
- `aimrt_rpc_req/${server_id}/${func_name}`
- 订阅 Rsp 使用的 topic`aimrt_rpc_rsp/${client_id}/${func_name}`
其中`${client_id}``${server_id}`是 Client 端和 Server 端需要保证在同一个 Mqtt broker 环境下全局唯一的一个值,一般使用在 Mqtt broker 处注册的 id。`${func_name}`是 url 编码后的 AimRT RPC 方法名称。Server 端订阅使用共享订阅,保证只有一个服务端处理请求。此项特性需要支持 Mqtt5.0 协议的 Broker。
例如client 端向 Mqtt broker 注册的 id 为`example_client`func 名称为`/aimrt.protocols.example.ExampleService/GetBarData`,则`${client_id}`值为`example_client``${func_name}`值为`%2Faimrt.protocols.example.ExampleService%2FGetBarData`
Client -> Server 的 Mqtt 数据包格式整体分 5 段:
- 序列化类型,一般是`pb``json`
- client 端想要 server 端回复 rsp 的 mqtt topic 名称。client 端自己需要订阅这个 mqtt topic
- msg id4 字节server 端会原封不动的封装到 rsp 包里,供 client 端定位 rsp 对应哪个 req
- context 区
- context 数量1 字节,最大 255 个 context
- context_1 key, 2 字节长度 + 数据区
- context_2 key, 2 字节长度 + 数据区
- ...
- msg 数据
```
| n(0~255) [1 byte] | content type [n byte]
| m(0~255) [1 byte] | rsp topic name [m byte]
| msg id [4 byte]
| context num [1 byte]
| context_1 key size [2 byte] | context_1 key data [key_1_size byte]
| context_1 val size [2 byte] | context_1 val data [val_1_size byte]
| context_2 key size [2 byte] | context_2 key data [key_2_size byte]
| context_2 val size [2 byte] | context_2 val data [val_2_size byte]
| ...
| msg data [remaining byte]
```
Server -> Client 的 Mqtt 数据包格式整体分 4 段:
- 序列化类型,一般是`pb``json`
- msg id4 字节req 中的 msg id
- status code4 字节,框架错误码,如果这个部分不为零,则代表服务端发生了错误,数据段将没有内容
- msg 数据
```
| n(0~255) [1 byte] | content type [n byte]
| msg id [4 byte]
| status code [4 byte]
| msg data [remaining byte]
```
## mqtt 类型 Channel 后端
`mqtt`类型的 Channel 后端是**mqtt_plugin**中提供的一种 Channel 后端,用于通过 mqtt 的方式来发布和订阅消息。其所有的配置项如下:
| 节点 | 类型 | 是否可选| 默认值 | 作用 |
| ---- | ---- | ---- | ---- | ---- |
| pub_topics_options | array | 可选 | [] | 发布 Topic 时的规则 |
| pub_topics_options[i].topic_name | string | 必选 | "" | Topic 名称,支持正则表达式 |
| pub_topics_options[i].qos | int | 必选 | 2 | Publish 端 mqtt qos取值范围0/1/2 |
| sub_topics_options | array | 可选 | [] | 发布 Topic 时的规则 |
| sub_topics_options[i].topic_name | string | 必选 | "" | Topic 名称,支持正则表达式 |
| sub_topics_options[i].qos | int | 必选 | 2 | Subscribe 端 mqtt qos取值范围0/1/2 |
以下是一个简单的发布端的示例:
```yaml
aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: tcp://127.0.0.1:1883
client_id: example_publisher
max_pkg_size_k: 1024
channel:
backends:
- type: mqtt
options:
pub_topics_options:
- topic_name: "(.*)"
qos: 2
pub_topics_options:
- topic_name: "(.*)"
enable_backends: [mqtt]
```
以下则是一个简单的订阅端的示例:
```yaml
aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: tcp://127.0.0.1:1883
client_id: example_subscriber
max_pkg_size_k: 1024
channel:
backends:
- type: mqtt
sub_topics_options:
- topic_name: "(.*)"
enable_backends: [mqtt]
```
以上示例中,发布端和订阅端都连上了`tcp://127.0.0.1:1883`这个地址的一个 Mqtt broker发布端也配置了所有的消息都通过 mqtt 后端进行处理,订阅端也配置了所有消息都可以从 mqtt 后端触发回调,从而打通消息发布订阅的链路。
在这个过程中,底层使用的 Mqtt Topic 名称格式为:`/channel/${topic_name}/${message_type}`。其中,`${topic_name}`为 AimRT 的 Topic 名称,`${message_type}`为 url 编码后的 AimRT 消息名称。
例如AimRT Topic 名称为`test_topic`,消息类型为`pb:aimrt.protocols.example.ExampleEventMsg`,则最终 Mqtt 的 topic 名称为:`/channel/test_topic/pb%3Aaimrt.protocols.example.ExampleEventMsg`
在 AimRT 发布端发布数据到订阅端这个链路上Mqtt 数据包格式整体分 3 段:
- 序列化类型,一般是`pb``json`
- context 区
- context 数量1 字节,最大 255 个 context
- context_1 key, 2 字节长度 + 数据区
- context_2 key, 2 字节长度 + 数据区
- ...
- 数据
```
| n(0~255) [1 byte] | content type [n byte]
| context num [1 byte]
| context_1 key size [2 byte] | context_1 key data [key_1_size byte]
| context_1 val size [2 byte] | context_1 val data [val_1_size byte]
| context_2 key size [2 byte] | context_2 key data [key_2_size byte]
| context_2 val size [2 byte] | context_2 val data [val_2_size byte]
| ...
| msg data [len - 1 - n byte] |
```