添加插件远程调用节点和超时支持,增强 IPC 队列管理功能

This commit is contained in:
daiqingshuang
2025-08-29 17:18:26 +08:00
parent a03ba378bc
commit ec29f5aaf6
9 changed files with 155 additions and 55 deletions

View File

@@ -7,23 +7,35 @@
#include "ipc/ipc_node.h"
#include "rpc/engine_rpc.h"
boost::atomic_bool* engine_running = nullptr;
void on_engine_exit() {
if (engine_running) {
*engine_running = false;
}
}
int main(int argc, char *argv[])
{
std::atexit(on_engine_exit);
std::at_quick_exit(on_engine_exit);
shm_mgr::get_instance().init(true);
engine_running = shm_mgr::msg_shm().construct_with_name<boost::atomic_bool>("engine_running", true);
engine_ipc_node node;
// std::filesystem::path path(R"(D:\Projects\Alicho\src\backend\src\vst2_host\test\4Front Piano x64.dll)");
// plugin_host_manager::get_instance().load_plugin(path);
std::filesystem::path path(R"(D:\Projects\Alicho\src\backend\src\vst2_host\test\4Front Piano x64.dll)");
plugin_host_manager::get_instance().load_plugin(path);
// uint32_t times = 0;
uint32_t times = 0;
while (true) {
// 模拟主循环
std::this_thread::sleep_for(std::chrono::milliseconds(100));
node.process_rpc();
// if (++times > 10) {
// break; // 运行10次后退出
// }
if (++times > 10) {
break; // 运行10次后退出
}
}
return 0;

View File

@@ -31,18 +31,18 @@ plugin_instance* plugin_host_manager::load_plugin(const std::filesystem::path& p
}
catch (const std::exception& e) {
// TODO: 如果创建失败,清理已分配的资源
spdlog::error("Failed to load plugin {}: {}", plugin_binary_path.string(), e.what());
spdlog::error("无法加载插件 {}{}", plugin_binary_path.string(), e.what());
}
return ptr;
}
void plugin_host_manager::unload_plugin(uint32_t plugin_id) {
}
void plugin_host_manager::register_session(uint32_t plugin_id, std::shared_ptr<rpc_session> session) {
if (auto instance = get_plugin_instance(plugin_id)) {
instance->kill_process();
std::lock_guard lock(mutex_);
plugin_instances_.erase(plugin_id);
}
}
plugin_instance* plugin_host_manager::get_plugin_instance(uint32_t plugin_id) {

View File

@@ -12,8 +12,6 @@ public:
plugin_instance* load_plugin(const std::filesystem::path& plugin_binary_path);
void unload_plugin(uint32_t plugin_id);
void register_session(uint32_t plugin_id, std::shared_ptr<rpc_session> session);
plugin_instance* get_plugin_instance(uint32_t plugin_id);
template<typename Func>

View File

@@ -5,6 +5,10 @@
namespace bi = boost::interprocess;
namespace bp = boost::process;
[[nodiscard]] static auto get_plugin_id_name(uint32_t in_id) {
return "plugin_" + std::to_string(in_id);
}
auto execute_plugin_process(uint32_t id, boost::asio::io_context& ctx, const std::filesystem::path& plugin_path) {
if (!std::filesystem::exists(plugin_path)) {
throw std::runtime_error("Plugin path does not exist: " + plugin_path.string());
@@ -31,7 +35,7 @@ auto execute_plugin_process(uint32_t id, boost::asio::io_context& ctx, const std
throw std::runtime_error("Host executable not found: " + host_path.string());
}
return new bp::process(ctx, host_path.string(), { get_shm_in_name(id), get_shm_out_name(id) });
return new bp::process(ctx, host_path.string(), { get_plugin_id_name(id) });
}
plugin_instance::plugin_instance(boost::asio::io_context& ctx,
@@ -41,24 +45,28 @@ plugin_instance::plugin_instance(boost::asio::io_context& ctx,
id = in_id;
// 创建共享内存段
input_segment = std::make_unique<bi::managed_shared_memory>(bi::open_or_create, get_shm_in_name(id).c_str(), SHM_SIZE);
output_segment = std::make_unique<bi::managed_shared_memory>(bi::open_or_create, get_shm_out_name(id).c_str(), SHM_SIZE);
// input_segment = std::make_unique<bi::managed_shared_memory>(bi::open_or_create, get_shm_in_name(id).c_str(), SHM_SIZE);
// output_segment = std::make_unique<bi::managed_shared_memory>(bi::open_or_create, get_shm_out_name(id).c_str(), SHM_SIZE);
// 获取锁无阻塞队列
input_queue = input_segment->find_or_construct<lock_free_queue>("input_queue")();
output_queue = output_segment->find_or_construct<lock_free_queue>("output_queue")();
// input_queue = input_segment->find_or_construct<lock_free_queue>("input_queue")();
// output_queue = output_segment->find_or_construct<lock_free_queue>("output_queue")();
process = std::shared_ptr<boost::process::process>(execute_plugin_process(id, ctx, in_plugin_path), [](boost::process::process* p) {
delete p;
});
if (!is_process_running()) {
throw std::runtime_error("插件进程启动失败, 退出码: " + std::to_string(process->exit_code()));
}
plugin_node_.open_queue(get_plugin_id_name(id));
}
plugin_instance::~plugin_instance() {
output_queue->push({});
if (input_segment) {
bi::shared_memory_object::remove(get_shm_in_name(id).c_str());
}
if (output_segment) {
bi::shared_memory_object::remove(get_shm_out_name(id).c_str());
}
// output_queue->push({});
// if (input_segment) {
// bi::shared_memory_object::remove(get_shm_in_name(id).c_str());
// }
// if (output_segment) {
// bi::shared_memory_object::remove(get_shm_out_name(id).c_str());
// }
}

View File

@@ -3,21 +3,19 @@
#include <boost/process.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include "ipc/ipc_node.h"
#include "rpc/common.h"
class rpc_session;
struct plugin_instance {
plugin_instance(boost::asio::io_context& ctx, uint32_t in_id, const std::filesystem::path& in_plugin_path);
~plugin_instance();
uint32_t id = 0;
std::shared_ptr<boost::process::process> process; // 沙箱进程句柄
// 共享内存资源
std::unique_ptr<boost::interprocess::managed_shared_memory> input_segment;
std::unique_ptr<boost::interprocess::managed_shared_memory> output_segment;
lock_free_queue* input_queue = nullptr; // AudioEngine -> Host
lock_free_queue* output_queue = nullptr; // Host -> AudioEngine
// std::unique_ptr<boost::interprocess::managed_shared_memory> input_segment;
// std::unique_ptr<boost::interprocess::managed_shared_memory> output_segment;
// lock_free_queue* input_queue = nullptr; // AudioEngine -> Host
// lock_free_queue* output_queue = nullptr; // Host -> AudioEngine
// 控制连接
// std::shared_ptr<rpc_session> session;
@@ -29,4 +27,14 @@ struct plugin_instance {
auto is_process_running() const {
return process && process->running();
}
void kill_process() {
if (is_process_running()) {
process->terminate();
process->wait();
}
process = nullptr;
}
private:
plugin_ipc_remote_node plugin_node_;
std::shared_ptr<boost::process::process> process; // 沙箱进程句柄
};

View File

@@ -45,6 +45,7 @@ class ipc_node {
public:
friend class engine_ipc_node; // 引擎端友元,可直接访问 rpc_queue
friend class plugin_ipc_node; // 插件端友元
friend class plugin_ipc_remote_node; // 插件端远程调用节点友元
/**
* @brief 在共享内存中创建一条新的 SPSC 队列
@@ -61,7 +62,7 @@ public:
* @throws std::runtime_error 打开失败时抛出
*/
void open_queue(const std::string& in_name) {
rpc_queue.open_queue(in_name);
rpc_queue.open_queue(in_name, std::chrono::seconds(10));
}
template<typename T>
@@ -166,6 +167,10 @@ public:
explicit plugin_ipc_node(const std::string& in_queue_name) {
engine_queue.open_queue("engine_queue");
plugin_queue.create_queue(in_queue_name);
engine_running_ = shm_mgr::msg_shm().find<boost::atomic_bool>("engine_running").first;
if (!engine_running_) {
throw std::runtime_error("无法找到 engine_running 标志");
}
}
// ------------------- RPC 调用 -------------------
@@ -175,7 +180,27 @@ public:
// ------------------- RPC 处理 -------------------
void process_rpc() { plugin_queue.process_rpc(); }
auto is_engine_running() const {
return engine_running_ && engine_running_->load();
}
private:
ipc_node engine_queue; // 指向引擎主队列(只写)
ipc_node plugin_queue; // 插件自队列(只读)
ipc_node engine_queue; // 指向引擎主队列(只写)
ipc_node plugin_queue; // 插件自队列(只读)
boost::atomic_bool* engine_running_;
};
/**
* @class plugin_ipc_remote_node
* @brief 在Audio Engine中的代理节点, 用于发送远程RPC调用到插件进程
*/
class plugin_ipc_remote_node {
public:
template<typename Msg>
auto call_rpc() { return plugin_queue.call_rpc<Msg>(); }
void open_queue(const std::string& in_queue_name) {
plugin_queue.open_queue(in_queue_name);
}
private:
ipc_node plugin_queue{};
};

View File

@@ -48,19 +48,57 @@ public:
}
is_owner_ = true;
}
void open_queue(const std::string& in_name) {
void open_queue(const std::string& in_name, const std::optional<std::chrono::milliseconds> timeout = std::nullopt) {
name_ = in_name;
const auto& block = shm_mgr::msg_shm();
const auto& queue_name = get_queue_name();
const auto& mutex_name = get_mutex_name();
auto [deque_ptr, size] = block.find<ipc_msg_deque_t>(queue_name.c_str());
const auto& start_time = std::chrono::steady_clock::now();
// 查找队列,支持超时重试
ipc_msg_deque_t* deque_ptr = nullptr;
do {
auto [found_deque_ptr, size] = block.find<ipc_msg_deque_t>(queue_name.c_str());
deque_ptr = found_deque_ptr;
if (deque_ptr) break;
if (timeout.has_value()) {
const auto& elapsed = std::chrono::steady_clock::now() - start_time;
if (elapsed >= timeout.value()) {
throw std::runtime_error("打开IPC队列超时");
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} else {
break;
}
} while (timeout.has_value());
if (!deque_ptr) {
throw std::runtime_error("无法打开IPC队列");
}
deque_ = deque_ptr;
auto [mutex_ptr, msize] = block.find<boost::interprocess::interprocess_mutex>(mutex_name.c_str());
// 查找互斥锁,支持超时重试
boost::interprocess::interprocess_mutex* mutex_ptr = nullptr;
do {
auto [found_mutex_ptr, msize] = block.find<boost::interprocess::interprocess_mutex>(mutex_name.c_str());
mutex_ptr = found_mutex_ptr;
if (mutex_ptr) break;
if (timeout.has_value()) {
auto elapsed = std::chrono::steady_clock::now() - start_time;
if (elapsed >= timeout.value()) {
throw std::runtime_error("打开IPC队列锁超时");
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} else {
break;
}
} while (timeout.has_value());
if (!mutex_ptr) {
throw std::runtime_error("无法打开IPC队列锁");
}
@@ -73,7 +111,7 @@ public:
}
bool push(const T& msg) {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*mutex_);
boost::interprocess::scoped_lock lock(*mutex_);
try {
deque_->push_back(msg);
return true;
@@ -83,7 +121,7 @@ public:
}
bool pop(T& msg) {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*mutex_);
boost::interprocess::scoped_lock lock(*mutex_);
if (deque_->empty()) {
return false;
}
@@ -92,20 +130,20 @@ public:
return true;
}
bool empty() const {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*mutex_);
[[nodiscard]] auto empty() const {
boost::interprocess::scoped_lock lock(*mutex_);
return deque_->empty();
}
size_t size() const {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*mutex_);
[[nodiscard]] auto size() const {
boost::interprocess::scoped_lock lock(*mutex_);
return deque_->size();
}
auto get_queue_name() const {
[[nodiscard]] auto get_queue_name() const {
return name_ + "_queue";
}
auto get_mutex_name() const {
[[nodiscard]] auto get_mutex_name() const {
return name_ + "_mutex";
}
};

View File

@@ -164,7 +164,7 @@ private:
[[nodiscard]] auto create_shm_block(const std::string& in_name, std::size_t in_size) const {
if (is_engine) {
bi::shared_memory_object::remove(in_name.c_str()); // 先清理旧段
return std::make_shared<bi::managed_shared_memory>(bi::open_or_create, in_name.c_str(), in_size);
return std::make_shared<bi::managed_shared_memory>(bi::create_only, in_name.c_str(), in_size);
}
return std::make_shared<bi::managed_shared_memory>(bi::open_only, in_name.c_str());
}

View File

@@ -13,16 +13,27 @@ namespace bi = boost::interprocess;
int main(int argc, char *argv[])
{
// if (argc != 3)
// return 1;
if (argc != 2)
return 1;
shm_mgr::get_instance().init();
plugin_ipc_node node("plugin1");
plugin_ipc_node node(argv[1]);
{
const auto& proxy = node.call_rpc<engine_rpc::log>();
proxy->set_str("Hello from Plugin Sandbox!");
proxy->set_level(spdlog::level::info);
proxy.submit();
for (int i = 0; i < argc; ++i) {
const auto& proxy = node.call_rpc<engine_rpc::log>();
proxy->set_str(argv[i]);
proxy->set_level(spdlog::level::info);
proxy.submit();
}
}
while (true) {
if (!node.is_engine_running())
break;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
node.process_rpc();
}
#if 0