重构插件心跳管理,添加插件超时检测功能,优化 IPC 逻辑

This commit is contained in:
daiqingshuang
2025-09-02 17:29:58 +08:00
parent 3b236f7a39
commit 38095e67de
7 changed files with 37 additions and 51 deletions

View File

@@ -2,7 +2,6 @@
#include "ipc/shm_mgr.h"
#include "plugin_manage/plugin_host_manager.h"
#include <filesystem>
#include "misc_type.h"
#include "ipc/ipc_node.h"
@@ -10,21 +9,10 @@
boost::atomic<heartbeat_t>* engine_heartbeat = nullptr;
void on_engine_exit() {
if (engine_heartbeat) {
*engine_heartbeat = 0;
}
}
int main(int argc, char *argv[])
{
std::atexit(on_engine_exit);
std::at_quick_exit(on_engine_exit);
shm_mgr::get_instance().init(true);
engine_heartbeat = shm_mgr::msg_shm().construct_with_name<boost::atomic<heartbeat_t>>("engine_heartbeat", true);
// 初始化心跳
*engine_heartbeat = std::chrono::high_resolution_clock::now().time_since_epoch().count();
engine_heartbeat = shm_mgr::msg_shm().construct_with_name<boost::atomic<heartbeat_t>>("engine_heartbeat", std::chrono::high_resolution_clock::now().time_since_epoch().count());
// 创建一个心跳线程
std::thread([]() {
while (engine_heartbeat && *engine_heartbeat != 0) {
@@ -34,20 +22,12 @@ int main(int argc, char *argv[])
}).detach();
engine_ipc_node node;
auto& host_manager = plugin_host_manager::get_instance();
for (int i = 0; i < 20; ++i) {
std::filesystem::path path(R"(D:\Projects\Alicho\src\backend\src\vst2_host\test\4Front Piano x64.dll)");
plugin_host_manager::get_instance().load_plugin(path);
}
uint32_t times = 0;
while (true) {
// 模拟主循环
std::this_thread::sleep_for(std::chrono::milliseconds(100));
node.process_rpc();
if (++times > 10) {
break; // 运行10次后退出
}
host_manager.tick();
std::this_thread::sleep_for(std::chrono::milliseconds::zero());
}
*engine_heartbeat = 0;

View File

@@ -45,6 +45,10 @@ void plugin_host_manager::unload_plugin(uint32_t plugin_id) {
}
}
void plugin_host_manager::tick() {
}
plugin_instance* plugin_host_manager::get_plugin_instance(uint32_t plugin_id) {
std::lock_guard lock(mutex_);
const auto& it = plugin_instances_.find(plugin_id);

View File

@@ -11,6 +11,7 @@ public:
plugin_instance* load_plugin(const std::filesystem::path& plugin_binary_path);
void unload_plugin(uint32_t plugin_id);
void tick();
plugin_instance* get_plugin_instance(uint32_t plugin_id);

View File

@@ -61,12 +61,4 @@ plugin_instance::plugin_instance(boost::asio::io_context& ctx,
plugin_node_.open_queue(get_plugin_id_name(id));
}
plugin_instance::~plugin_instance() {
// output_queue->push({});
// if (input_segment) {
// bi::shared_memory_object::remove(get_shm_in_name(id).c_str());
// }
// if (output_segment) {
// bi::shared_memory_object::remove(get_shm_out_name(id).c_str());
// }
}
plugin_instance::~plugin_instance() {}

View File

@@ -11,15 +11,11 @@ struct plugin_instance {
~plugin_instance();
uint32_t id = 0;
// 共享内存资源
// std::unique_ptr<boost::interprocess::managed_shared_memory> input_segment;
// std::unique_ptr<boost::interprocess::managed_shared_memory> output_segment;
// lock_free_queue* input_queue = nullptr; // AudioEngine -> Host
// lock_free_queue* output_queue = nullptr; // Host -> AudioEngine
// 控制连接
// std::shared_ptr<rpc_session> session;
// 状态标识
std::atomic_bool is_processing_done{ true }; // 插件是否处理完毕
std::atomic_bool is_registered{ false }; // RPC会话是否注册
@@ -35,7 +31,8 @@ struct plugin_instance {
}
process = nullptr;
}
private:
plugin_ipc_remote_node plugin_node_;
private:
std::shared_ptr<boost::process::process> process; // 沙箱进程句柄
};

View File

@@ -168,10 +168,12 @@ public:
explicit plugin_ipc_node(const std::string& in_queue_name) {
engine_queue.open_queue("engine_queue");
plugin_queue.create_queue(in_queue_name);
engine_running_ = shm_mgr::msg_shm().find<boost::atomic<heartbeat_t>>("engine_heartbeat").first;
if (!engine_running_) {
engine_heartbeat_ = shm_mgr::msg_shm().find<boost::atomic<heartbeat_t>>("engine_heartbeat").first;
if (!engine_heartbeat_) {
throw std::runtime_error("无法找到 engine_running 标志");
}
plugin_heartbeat_ = shm_mgr::msg_shm().construct_with_name<boost::atomic<heartbeat_t>>(in_queue_name + "_heartbeat");
*plugin_heartbeat_ = std::chrono::high_resolution_clock::now().time_since_epoch().count();
}
// ------------------- RPC 调用 -------------------
@@ -183,11 +185,11 @@ public:
[[nodiscard]] auto is_engine_running() const {
// 读取引擎心跳
const auto& val = engine_running_->load();
const auto& val = engine_heartbeat_->load();
if (val == 0) {
return false; // 引擎已退出
}
// 心跳阈值 (5 秒)
// 心跳阈值 (2 秒)
static constexpr auto threshold = std::chrono::seconds(2);
// 心跳间隔超过阈值则视为引擎已崩溃
const auto duration = std::chrono::high_resolution_clock::now().time_since_epoch() - heartbeat_duration_t(val);
@@ -196,7 +198,8 @@ public:
private:
ipc_node engine_queue; // 指向引擎主队列(只写)
ipc_node plugin_queue; // 插件自队列(只读)
boost::atomic<heartbeat_t>* engine_running_;
boost::atomic<heartbeat_t>* engine_heartbeat_;
boost::atomic<heartbeat_t>* plugin_heartbeat_;
};
/**
@@ -210,7 +213,24 @@ public:
void open_queue(const std::string& in_queue_name) {
plugin_queue.open_queue(in_queue_name);
plugin_heartbeat_ = shm_mgr::msg_shm().find<boost::atomic<heartbeat_t>>(in_queue_name + "_heartbeat").first;
}
[[nodiscard]] auto is_timeout(int32_t in_seconds) const {
if (!plugin_heartbeat_) {
return true;
}
const auto& val = plugin_heartbeat_->load();
if (val == 0) {
return true; // 插件已退出
}
// 心跳阈值 (5 秒)
static constexpr auto threshold = std::chrono::seconds(in_seconds);
// 心跳间隔超过阈值则视为插件已超时
const auto duration = std::chrono::high_resolution_clock::now().time_since_epoch() - heartbeat_duration_t(val);
return duration >= threshold;
}
private:
ipc_node plugin_queue{};
boost::atomic<heartbeat_t>* plugin_heartbeat_ = nullptr;
};

View File

@@ -19,14 +19,6 @@ int main(int argc, char *argv[])
shm_mgr::get_instance().init();
plugin_ipc_node node(argv[1]);
{
for (int i = 0; i < argc; ++i) {
const auto& proxy = node.call_rpc<engine_rpc::log>();
proxy->set_str(argv[i]);
proxy->set_level(spdlog::level::info);
proxy.submit();
}
}
while (true) {
if (!node.is_engine_running())