feat: enhance record_playback_plugin with storage policy configuration (#142)

* feat: enhance record_playback_plugin with storage policy configuration

* feat: add executor configuration for record_playback_plugin in YAML files

* fix: standardize formatting in YAML configuration for record_playback_plugin

* docs: improve clarity in record_playback_plugin documentation

* refactor: rename executor to timer_executor in record_playback_plugin

* refactor: rename timer_executor to executor in record_playback_plugin and update documentation for storage policy options

* format

* docs: clarify timer_executor requirement in record_playback_plugin documentation

* docs: update message count period in YAML and markdown files for record_playback_plugin

* feat: improve validation of storage policy options in record_playback_plugin

* feat: update RecordAction to use storage_executor_ref_ in InitExecutor and remove CommitRecord method

* feat: update RecordAction to use timer_executor in InitExecutor and clean up includes in record_playback_plugin
This commit is contained in:
ATT_POWER 2025-01-07 14:37:16 +08:00 committed by GitHub
parent f2a42a7a95
commit 487dec80dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 183 additions and 31 deletions

View File

@ -26,15 +26,21 @@
| service_name | string | 可选 | "" | RPC Service Name不填则使用根据协议生成的默认值 | | service_name | string | 可选 | "" | RPC Service Name不填则使用根据协议生成的默认值 |
| type_support_pkgs | array | 可选 | [] | type support 包配置 | | type_support_pkgs | array | 可选 | [] | type support 包配置 |
| type_support_pkgs[i].path | string | 必选 | "" | type support 包的路径 | | type_support_pkgs[i].path | string | 必选 | "" | type support 包的路径 |
| timer_executor | string | 录制模式下必选 | [] | 录制使用的执行器,要求必须支持 time schedule |
| record_actions | array | 可选 | [] | 录制动作配置 | | record_actions | array | 可选 | [] | 录制动作配置 |
| record_actions[i].name | string | 必选 | "" | 动作名称 | | record_actions[i].name | string | 必选 | "" | 动作名称 |
| record_actions[i].options | map | 必选 | - | 动作选项 | | record_actions[i].options | map | 必选 | - | 动作选项 |
| record_actions[i].options.bag_path | string | 必选 | "" | 录制包存放的路径 | | record_actions[i].options.bag_path | string | 必选 | "" | 录制包存放的路径 |
| record_actions[i].options.max_bag_size_m | unsigned int | 可选 | 2048 | 录制包 db 最大尺寸,单位 MB |
| record_actions[i].options.max_bag_num | unsigned int | 可选 | 0 | 录制包的最大个数超出后将删除最早的包。0 表示无限大 |
| record_actions[i].options.mode | string | 必选 | "" | 录制模式不区分大小写立即模式imd信号触发模式signal | | record_actions[i].options.mode | string | 必选 | "" | 录制模式不区分大小写立即模式imd信号触发模式signal |
| record_actions[i].options.max_preparation_duration_s | unsigned int | 可选 | 0 | 最大提前数据预备时间,仅 signal 模式下生效 | | record_actions[i].options.max_preparation_duration_s | unsigned int | 可选 | 0 | 最大提前数据预备时间,仅 signal 模式下生效 |
| record_actions[i].options.executor | string | 必选 | "" | 录制使用的执行器,要求必须是线程安全的 | | record_actions[i].options.executor | string | 必选 | "" | 录制使用的执行器,要求必须是线程安全的 |
| record_actions[i].options.storage_policy | map | 可选 | - | 录制包的存储策略 |
| record_actions[i].options.storage_policy.max_bag_size_m | unsigned int | 可选 | 2048 | 录制包 db 最大尺寸,单位 MB |
| record_actions[i].options.storage_policy.max_bag_num | unsigned int | 可选 | 0 | 录制包的最大个数超出后将删除最早的包。0 表示无限大 |
| record_actions[i].options.storage_policy.msg_write_interval | unsigned int | 可选 | 1000 | 每收到多少消息提交一次事务 |
| record_actions[i].options.storage_policy.msg_write_interval_time | unsigned int | 可选 | 1000 | 每过多少时间提交一次事务,默认单位 ms|
| record_actions[i].options.storage_policy.journal_mode | string | 可选 | memory | sqlite3 日志模式,不区分大小写,现存在 [delete、truncate、persist、memory、wal、off](https://www.sqlite.org/pragma.html#pragma_journal_mode) 六种模式|
| record_actions[i].options.storage_policy.synchronous_mode | string | 可选 | full | sqlite3 同步模式,不区分大小写,现存在 [extra、full、normal、off](https://www.sqlite.org/pragma.html#pragma_synchronous) 四种模式 |
| record_actions[i].options.topic_meta_list | array | 可选 | [] | 要录制的 topic 和类型 | | record_actions[i].options.topic_meta_list | array | 可选 | [] | 要录制的 topic 和类型 |
| record_actions[i].options.topic_meta_list[j].topic_name | string | 必选 | "" | 要录制的 topic | | record_actions[i].options.topic_meta_list[j].topic_name | string | 必选 | "" | 要录制的 topic |
| record_actions[i].options.topic_meta_list[j].msg_type | string | 必选 | "" | 要录制的消息类型 | | record_actions[i].options.topic_meta_list[j].msg_type | string | 必选 | "" | 要录制的消息类型 |
@ -55,6 +61,8 @@
请注意,**record_playback_plugin**中是以`action`为单元管理录制、播放动作的,每个录制/播放`action`可以有自己的模式、线程、包路径等参数,也可以独立触发。使用时可以根据数据实际大小和频率,为每个 action 分配合理的资源。 请注意,**record_playback_plugin**中是以`action`为单元管理录制、播放动作的,每个录制/播放`action`可以有自己的模式、线程、包路径等参数,也可以独立触发。使用时可以根据数据实际大小和频率,为每个 action 分配合理的资源。
`record_playback_plugin` 的录制模式支持配置每个事务之间间隔的条数和时间,并且支持配置 `sqlite3` 的日志模式配置和同步模式配置,关于 `sqlite3` 具体模式的不同可以参考 [sqlite3 journal mode](https://www.sqlite.org/pragma.html#pragma_journal_mode)和 [sqlite3 synchronous](https://www.sqlite.org/pragma.html#pragma_synchronous)。
以下是一个信号触发录制功能的简单示例配置: 以下是一个信号触发录制功能的简单示例配置:
```yaml ```yaml
@ -71,16 +79,22 @@ aimrt:
- name: record_playback_plugin - name: record_playback_plugin
path: ./libaimrt_record_playback_plugin.so path: ./libaimrt_record_playback_plugin.so
options: options:
timer_executor: storage_executor # require time schedule!
type_support_pkgs: type_support_pkgs:
- path: ./libexample_event_ts_pkg.so - path: ./libexample_event_ts_pkg.so
record_actions: record_actions:
- name: my_signal_record - name: my_signal_record
options: options:
bag_path: ./bag bag_path: ./bag
max_bag_size_m: 2048
mode: signal # imd/signal mode: signal # imd/signal
max_preparation_duration_s: 10 # Effective only in signal mode max_preparation_duration_s: 10 # Effective only in signal mode
executor: record_thread # require thread safe! executor: record_thread # require thread safe!
storage_policy:
max_bag_size_m: 2048
msg_write_interval: 1000 # message count period
msg_write_interval_time: 1000 # ms
journal_mode: WAL
synchronous_mode: full
topic_meta_list: topic_meta_list:
- topic_name: test_topic - topic_name: test_topic
msg_type: pb:aimrt.protocols.example.ExampleEventMsg msg_type: pb:aimrt.protocols.example.ExampleEventMsg
@ -89,6 +103,10 @@ aimrt:
executors: executors:
- name: record_thread - name: record_thread
type: simple_thread type: simple_thread
- name: storage_executor
type: asio_thread
options:
thread_num: 2
channel: channel:
# ... # ...
rpc: rpc:

View File

@ -9,14 +9,20 @@ aimrt:
options: options:
type_support_pkgs: type_support_pkgs:
- path: ./libexample_event_ts_pkg.so - path: ./libexample_event_ts_pkg.so
timer_executor: storage_executor # require time schedule!
record_actions: record_actions:
- name: my_imd_record - name: my_imd_record
options: options:
bag_path: ./bag bag_path: ./bag
max_bag_size_m: 2048
max_bag_num: 10
mode: imd # imd/signal mode: imd # imd/signal
executor: record_thread # require thread safe! executor: record_thread # require thread safe!
storage_policy:
max_bag_size_m: 2048
max_bag_num: 10
msg_write_interval: 1000 # message count period
msg_write_interval_time: 1000 # ms
journal_mode: WAL
synchronous_mode: full
topic_meta_list: topic_meta_list:
- topic_name: test_topic - topic_name: test_topic
msg_type: pb:aimrt.protocols.example.ExampleEventMsg msg_type: pb:aimrt.protocols.example.ExampleEventMsg
@ -26,13 +32,18 @@ aimrt:
backends: backends:
- type: console - type: console
executor: executor:
executors: executors:
- name: storage_executor
type: asio_thread
options:
thread_num: 2
- name: work_thread_pool - name: work_thread_pool
type: asio_thread type: asio_thread
options: options:
thread_num: 4 thread_num: 4
- name: record_thread - name: record_thread
type: simple_thread type: simple_thread
channel: channel:
backends: backends:
- type: local - type: local

View File

@ -15,14 +15,20 @@ aimrt:
options: options:
type_support_pkgs: type_support_pkgs:
- path: ./libexample_event_ts_pkg.so - path: ./libexample_event_ts_pkg.so
timer_executor: storage_executor # require time schedule!
record_actions: record_actions:
- name: my_imd_record - name: my_imd_record
options: options:
bag_path: ./bag bag_path: ./bag
max_bag_size_m: 2048
max_bag_num: 10
mode: imd # imd/signal mode: imd # imd/signal
executor: record_thread # require thread safe! executor: record_thread # require thread safe!
storage_policy:
max_bag_size_m: 2048
max_bag_num: 10
msg_write_interval: 1000 # message count period
msg_write_interval_time: 1000 # ms
journal_mode: WAL
synchronous_mode: full
topic_meta_list: topic_meta_list:
- topic_name: test_topic - topic_name: test_topic
msg_type: ros2:example_ros2/msg/RosTestMsg msg_type: ros2:example_ros2/msg/RosTestMsg
@ -33,6 +39,10 @@ aimrt:
- type: console - type: console
executor: executor:
executors: executors:
- name: storage_executor
type: asio_thread
options:
thread_num: 2
- name: work_thread_pool - name: work_thread_pool
type: asio_thread type: asio_thread
options: options:

View File

@ -16,15 +16,21 @@ aimrt:
options: options:
type_support_pkgs: type_support_pkgs:
- path: ./libexample_event_ts_pkg.so - path: ./libexample_event_ts_pkg.so
timer_executor: storage_executor # require time schedule!
record_actions: record_actions:
- name: my_signal_record - name: my_signal_record
options: options:
bag_path: ./bag bag_path: ./bag
max_bag_size_m: 2048
max_bag_num: 10
mode: signal # imd/signal mode: signal # imd/signal
max_preparation_duration_s: 10 # Effective only in signal mode max_preparation_duration_s: 10 # Effective only in signal mode
executor: record_thread # require thread safe! executor: record_thread # require thread safe!
storage_policy:
max_bag_size_m: 2048
max_bag_num: 10
msg_write_interval: 1000 # message count period
msg_write_interval_time: 1000 # ms
journal_mode: WAL
synchronous_mode: full
topic_meta_list: topic_meta_list:
- topic_name: test_topic - topic_name: test_topic
msg_type: pb:aimrt.protocols.example.ExampleEventMsg msg_type: pb:aimrt.protocols.example.ExampleEventMsg
@ -35,6 +41,10 @@ aimrt:
- type: console - type: console
executor: executor:
executors: executors:
- name: storage_executor
type: asio_thread
options:
thread_num: 2
- name: work_thread_pool - name: work_thread_pool
type: asio_thread type: asio_thread
options: options:

View File

@ -5,6 +5,8 @@
#include <fstream> #include <fstream>
#include <future> #include <future>
#include <string>
#include <unordered_set>
#include "record_playback_plugin/global.h" #include "record_playback_plugin/global.h"
#include "util/string_util.h" #include "util/string_util.h"
@ -19,8 +21,15 @@ struct convert<aimrt::plugins::record_playback_plugin::RecordAction::Options> {
Node node; Node node;
node["bag_path"] = rhs.bag_path; node["bag_path"] = rhs.bag_path;
node["max_bag_size_m"] = rhs.max_bag_size_m;
node["max_bag_num"] = rhs.max_bag_num; Node storage_policy;
storage_policy["max_bag_size_m"] = rhs.storage_policy.max_bag_size_m;
storage_policy["max_bag_num"] = rhs.storage_policy.max_bag_num;
storage_policy["msg_write_interval"] = rhs.storage_policy.msg_write_interval;
storage_policy["msg_write_interval_time"] = rhs.storage_policy.msg_write_interval_time;
storage_policy["synchronous_mode"] = rhs.storage_policy.synchronous_mode;
storage_policy["journal_mode"] = rhs.storage_policy.journal_mode;
node["storage_policy"] = storage_policy;
if (rhs.mode == Options::Mode::kImd) { if (rhs.mode == Options::Mode::kImd) {
node["mode"] = "imd"; node["mode"] = "imd";
@ -48,12 +57,6 @@ struct convert<aimrt::plugins::record_playback_plugin::RecordAction::Options> {
rhs.bag_path = node["bag_path"].as<std::string>(); rhs.bag_path = node["bag_path"].as<std::string>();
if (node["max_bag_size_m"])
rhs.max_bag_size_m = node["max_bag_size_m"].as<uint32_t>();
if (node["max_bag_num"])
rhs.max_bag_num = node["max_bag_num"].as<uint32_t>();
auto mode = aimrt::common::util::StrToLower(node["mode"].as<std::string>()); auto mode = aimrt::common::util::StrToLower(node["mode"].as<std::string>());
if (mode == "imd") { if (mode == "imd") {
rhs.mode = Options::Mode::kImd; rhs.mode = Options::Mode::kImd;
@ -63,6 +66,42 @@ struct convert<aimrt::plugins::record_playback_plugin::RecordAction::Options> {
throw aimrt::common::util::AimRTException("Invalid record mode: " + mode); throw aimrt::common::util::AimRTException("Invalid record mode: " + mode);
} }
if (node["storage_policy"]) {
Node storage_policy = node["storage_policy"];
if (storage_policy["max_bag_size_m"])
rhs.storage_policy.max_bag_size_m = storage_policy["max_bag_size_m"].as<uint32_t>();
if (storage_policy["max_bag_num"])
rhs.storage_policy.max_bag_num = storage_policy["max_bag_num"].as<uint32_t>();
if (storage_policy["msg_write_interval"])
rhs.storage_policy.msg_write_interval = storage_policy["msg_write_interval"].as<uint32_t>();
if (storage_policy["msg_write_interval_time"])
rhs.storage_policy.msg_write_interval_time = storage_policy["msg_write_interval_time"].as<uint32_t>();
if (storage_policy["journal_mode"]) {
auto journal_mode = aimrt::common::util::StrToLower(storage_policy["journal_mode"].as<std::string>());
static const std::unordered_set<std::string> valid_journal_mode_set = {"delete", "truncate", "persist", "memory", "wal", "off"};
if (!valid_journal_mode_set.contains(journal_mode)) {
throw aimrt::common::util::AimRTException("Invalid journal mode: " + journal_mode);
}
rhs.storage_policy.journal_mode = journal_mode;
}
if (storage_policy["synchronous_mode"]) {
auto synchronous_mode = aimrt::common::util::StrToLower(storage_policy["synchronous_mode"].as<std::string>());
static const std::unordered_set<std::string> valid_synchronous_mode_set = {"off", "normal", "full", "extra"};
if (!valid_synchronous_mode_set.contains(synchronous_mode)) {
throw aimrt::common::util::AimRTException("Invalid synchronous mode: " + synchronous_mode);
}
rhs.storage_policy.synchronous_mode = synchronous_mode;
}
}
if (node["max_preparation_duration_s"]) if (node["max_preparation_duration_s"])
rhs.max_preparation_duration_s = node["max_preparation_duration_s"].as<uint64_t>(); rhs.max_preparation_duration_s = node["max_preparation_duration_s"].as<uint64_t>();
@ -165,7 +204,7 @@ void RecordAction::Initialize(YAML::Node options) {
} }
// misc // misc
max_bag_size_ = options_.max_bag_size_m * 1024 * 1024; max_bag_size_ = options_.storage_policy.max_bag_size_m * 1024 * 1024;
max_preparation_duration_ns_ = options_.max_preparation_duration_s * 1000000000; max_preparation_duration_ns_ = options_.max_preparation_duration_s * 1000000000;
sqlite3_config(SQLITE_CONFIG_SINGLETHREAD); sqlite3_config(SQLITE_CONFIG_SINGLETHREAD);
@ -177,6 +216,7 @@ void RecordAction::Start() {
AIMRT_CHECK_ERROR_THROW( AIMRT_CHECK_ERROR_THROW(
std::atomic_exchange(&state_, State::kStart) == State::kInit, std::atomic_exchange(&state_, State::kStart) == State::kInit,
"Method can only be called when state is 'Init'."); "Method can only be called when state is 'Init'.");
sync_timer_->Reset();
} }
void RecordAction::Shutdown() { void RecordAction::Shutdown() {
@ -188,10 +228,14 @@ void RecordAction::Shutdown() {
CloseDb(); CloseDb();
stop_promise.set_value(); stop_promise.set_value();
}); });
sync_timer_->Cancel();
sync_timer_->SyncWait();
stop_promise.get_future().wait(); stop_promise.get_future().wait();
} }
void RecordAction::InitExecutor() { void RecordAction::InitExecutor(aimrt::executor::ExecutorRef timer_executor) {
AIMRT_CHECK_ERROR_THROW( AIMRT_CHECK_ERROR_THROW(
state_.load() == State::kInit, state_.load() == State::kInit,
"Method can only be called when state is 'Init'."); "Method can only be called when state is 'Init'.");
@ -205,6 +249,20 @@ void RecordAction::InitExecutor() {
executor_, "Can not get executor {}.", options_.executor); executor_, "Can not get executor {}.", options_.executor);
AIMRT_CHECK_ERROR_THROW( AIMRT_CHECK_ERROR_THROW(
executor_.ThreadSafe(), "Record executor {} is not thread safe!", options_.executor); executor_.ThreadSafe(), "Record executor {} is not thread safe!", options_.executor);
auto timer_task = [this]() {
executor_.Execute([this]() {
if (db_ == nullptr)
return;
cur_exec_count_ = 1; // avoid BEGIN again
sqlite3_exec(db_, "COMMIT", 0, 0, 0);
buf_array_view_cache_.clear();
buf_cache_.clear();
sqlite3_exec(db_, "BEGIN", 0, 0, 0);
});
};
sync_timer_ = executor::CreateTimer(timer_executor, std::chrono::milliseconds(options_.storage_policy.msg_write_interval_time), std::move(timer_task), false);
} }
void RecordAction::RegisterGetExecutorFunc( void RecordAction::RegisterGetExecutorFunc(
@ -411,7 +469,7 @@ void RecordAction::AddRecordImpl(OneRecord&& record) {
cur_data_size_ += 24; // id + topic_id + timestamp cur_data_size_ += 24; // id + topic_id + timestamp
++cur_exec_count_; ++cur_exec_count_;
if (cur_exec_count_ >= 1000) [[unlikely]] { if (cur_exec_count_ >= options_.storage_policy.msg_write_interval) [[unlikely]] {
cur_exec_count_ = 0; cur_exec_count_ = 0;
sqlite3_exec(db_, "COMMIT", 0, 0, 0); sqlite3_exec(db_, "COMMIT", 0, 0, 0);
buf_array_view_cache_.clear(); buf_array_view_cache_.clear();
@ -436,7 +494,11 @@ void RecordAction::OpenNewDb(uint64_t start_timestamp) {
AIMRT_TRACE("Open new db, path: {}", cur_db_file_path_); AIMRT_TRACE("Open new db, path: {}", cur_db_file_path_);
// sqlite3_exec(db_, "PRAGMA synchronous = OFF; ", 0, 0, 0); std::string journal_mode_sql = "PRAGMA journal_mode = " + options_.storage_policy.journal_mode + ";";
sqlite3_exec(db_, journal_mode_sql.c_str(), 0, 0, 0);
std::string synchronous_mode_sql = "PRAGMA synchronous = " + options_.storage_policy.synchronous_mode + ";";
sqlite3_exec(db_, synchronous_mode_sql.c_str(), 0, 0, 0);
// create table // create table
std::string sql = R"str( std::string sql = R"str(
@ -467,7 +529,7 @@ data BLOB NOT NULL);
.start_timestamp = start_timestamp}); .start_timestamp = start_timestamp});
// check and del db file // check and del db file
if (options_.max_bag_num > 0 && metadata_.files.size() > options_.max_bag_num) { if (options_.storage_policy.max_bag_num > 0 && metadata_.files.size() > options_.storage_policy.max_bag_num) {
auto itr = metadata_.files.begin(); auto itr = metadata_.files.begin();
std::filesystem::remove(real_bag_path_ / itr->path); std::filesystem::remove(real_bag_path_ / itr->path);
metadata_.files.erase(itr); metadata_.files.erase(itr);

View File

@ -3,12 +3,15 @@
#pragma once #pragma once
#include <cstdint>
#include <deque> #include <deque>
#include <filesystem> #include <filesystem>
#include <memory> #include <memory>
#include <string>
#include <vector> #include <vector>
#include "aimrt_module_cpp_interface/executor/executor.h" #include "aimrt_module_cpp_interface/executor/executor.h"
#include "aimrt_module_cpp_interface/executor/timer.h"
#include "aimrt_module_cpp_interface/util/buffer.h" #include "aimrt_module_cpp_interface/util/buffer.h"
#include "aimrt_module_cpp_interface/util/type_support.h" #include "aimrt_module_cpp_interface/util/type_support.h"
#include "core/util/topic_meta_key.h" #include "core/util/topic_meta_key.h"
@ -24,15 +27,23 @@ class RecordAction {
public: public:
struct Options { struct Options {
std::string bag_path; std::string bag_path;
uint32_t max_bag_size_m = 2048;
uint32_t max_bag_num = 0;
enum class Mode { enum class Mode {
kImd, kImd,
kSignal, kSignal,
}; };
Mode mode = Mode::kImd; Mode mode = Mode::kImd;
struct StoragePolicy {
uint32_t max_bag_size_m = 2048;
uint32_t max_bag_num = 0;
uint32_t msg_write_interval = 1000;
uint32_t msg_write_interval_time = 1000;
std::string synchronous_mode = "full";
std::string journal_mode = "memory";
};
StoragePolicy storage_policy;
uint64_t max_preparation_duration_s = 0; uint64_t max_preparation_duration_s = 0;
std::string executor; std::string executor;
@ -61,7 +72,7 @@ class RecordAction {
void Start(); void Start();
void Shutdown(); void Shutdown();
void InitExecutor(); void InitExecutor(aimrt::executor::ExecutorRef);
const Options& GetOptions() const { return options_; } const Options& GetOptions() const { return options_; }
@ -103,6 +114,8 @@ class RecordAction {
aimrt::runtime::core::util::TopicMetaKey::Hash> aimrt::runtime::core::util::TopicMetaKey::Hash>
topic_meta_map_; topic_meta_map_;
std::shared_ptr<aimrt::executor::TimerBase> sync_timer_;
size_t max_bag_size_ = 0; size_t max_bag_size_ = 0;
size_t cur_data_size_ = 0; size_t cur_data_size_ = 0;
double estimated_overhead_ = 1.5; double estimated_overhead_ = 1.5;

View File

@ -2,6 +2,9 @@
// All rights reserved. // All rights reserved.
#include "record_playback_plugin/record_playback_plugin.h" #include "record_playback_plugin/record_playback_plugin.h"
#include <chrono>
#include <cstdio>
#include <utility>
#include "aimrt_module_cpp_interface/rpc/rpc_handle.h" #include "aimrt_module_cpp_interface/rpc/rpc_handle.h"
#include "core/aimrt_core.h" #include "core/aimrt_core.h"
@ -20,6 +23,7 @@ struct convert<aimrt::plugins::record_playback_plugin::RecordPlaybackPlugin::Opt
Node node; Node node;
node["service_name"] = rhs.service_name; node["service_name"] = rhs.service_name;
node["timer_executor"] = rhs.timer_executor;
node["type_support_pkgs"] = YAML::Node(); node["type_support_pkgs"] = YAML::Node();
for (const auto& type_support_pkg : rhs.type_support_pkgs) { for (const auto& type_support_pkg : rhs.type_support_pkgs) {
@ -62,6 +66,9 @@ struct convert<aimrt::plugins::record_playback_plugin::RecordPlaybackPlugin::Opt
} }
} }
if (node["timer_executor"])
rhs.timer_executor = node["timer_executor"].as<std::string>();
if (node["record_actions"] && node["record_actions"].IsSequence()) { if (node["record_actions"] && node["record_actions"].IsSequence()) {
for (const auto& record_action_node : node["record_actions"]) { for (const auto& record_action_node : node["record_actions"]) {
auto record_action = Options::RecordActionOptions{ auto record_action = Options::RecordActionOptions{
@ -194,16 +201,31 @@ bool RecordPlaybackPlugin::Initialize(runtime::core::AimRTCore* core_ptr) noexce
RegisterPlaybackChannel(); RegisterPlaybackChannel();
}); });
core_ptr_->RegisterHookFunc(
runtime::core::AimRTCore::State::kPostInitExecutor,
[this] {
if (record_action_map_.size() != 0) {
timer_executor_ref_ = core_ptr_->GetExecutorManager().GetExecutor(options_.timer_executor);
AIMRT_CHECK_ERROR_THROW(timer_executor_ref_,
"Can not get executor {}!", options_.timer_executor);
AIMRT_CHECK_ERROR_THROW(timer_executor_ref_.SupportTimerSchedule(),
"Executor {} didn't support TimerSchedule!", options_.timer_executor);
}
for (auto& itr : record_action_map_) {
itr.second->InitExecutor(timer_executor_ref_);
}
for (auto& itr : playback_action_map_) {
itr.second->InitExecutor();
}
});
core_ptr_->RegisterHookFunc( core_ptr_->RegisterHookFunc(
runtime::core::AimRTCore::State::kPostStart, runtime::core::AimRTCore::State::kPostStart,
[this] { [this] {
for (auto& itr : record_action_map_) { for (auto& itr : record_action_map_) {
itr.second->InitExecutor();
itr.second->Start(); itr.second->Start();
} }
for (auto& itr : playback_action_map_) { for (auto& itr : playback_action_map_) {
itr.second->InitExecutor();
itr.second->Start(); itr.second->Start();
} }
}); });
@ -324,7 +346,6 @@ void RecordPlaybackPlugin::RegisterRecordChannel() {
msg_wrapper.info.msg_type, serialization_type); msg_wrapper.info.msg_type, serialization_type);
return; return;
} }
record_action.AddRecord( record_action.AddRecord(
RecordAction::OneRecord{ RecordAction::OneRecord{
.timestamp = cur_timestamp, .timestamp = cur_timestamp,

View File

@ -3,7 +3,10 @@
#pragma once #pragma once
#include <memory>
#include "aimrt_core_plugin_interface/aimrt_core_plugin_base.h" #include "aimrt_core_plugin_interface/aimrt_core_plugin_base.h"
#include "aimrt_module_cpp_interface/executor/executor.h"
#include "aimrt_module_cpp_interface/executor/timer.h"
#include "aimrt_module_cpp_interface/util/type_support.h" #include "aimrt_module_cpp_interface/util/type_support.h"
#include "core/util/type_support_pkg_loader.h" #include "core/util/type_support_pkg_loader.h"
#include "record_playback_plugin/playback_action.h" #include "record_playback_plugin/playback_action.h"
@ -19,7 +22,9 @@ class RecordPlaybackPlugin : public AimRTCorePluginBase {
struct TypeSupportPkg { struct TypeSupportPkg {
std::string path; std::string path;
}; };
std::vector<TypeSupportPkg> type_support_pkgs; std::vector<TypeSupportPkg> type_support_pkgs;
std::string timer_executor;
struct RecordActionOptions { struct RecordActionOptions {
std::string name; std::string name;
@ -62,6 +67,8 @@ class RecordPlaybackPlugin : public AimRTCorePluginBase {
std::vector<std::unique_ptr<runtime::core::util::TypeSupportPkgLoader>> type_support_pkg_loader_vec_; std::vector<std::unique_ptr<runtime::core::util::TypeSupportPkgLoader>> type_support_pkg_loader_vec_;
aimrt::executor::ExecutorRef timer_executor_ref_;
struct TypeSupportWrapper { struct TypeSupportWrapper {
const Options::TypeSupportPkg& options; const Options::TypeSupportPkg& options;
aimrt::util::TypeSupportRef type_support_ref; aimrt::util::TypeSupportRef type_support_ref;