diff --git a/src/backend/src/engine/src/main.cpp b/src/backend/src/engine/src/main.cpp index cb2dda2..a25175c 100644 --- a/src/backend/src/engine/src/main.cpp +++ b/src/backend/src/engine/src/main.cpp @@ -7,23 +7,35 @@ #include "ipc/ipc_node.h" #include "rpc/engine_rpc.h" +boost::atomic_bool* engine_running = nullptr; + +void on_engine_exit() { + if (engine_running) { + *engine_running = false; + } +} int main(int argc, char *argv[]) { + std::atexit(on_engine_exit); + std::at_quick_exit(on_engine_exit); + shm_mgr::get_instance().init(true); + engine_running = shm_mgr::msg_shm().construct_with_name("engine_running", true); + engine_ipc_node node; - // std::filesystem::path path(R"(D:\Projects\Alicho\src\backend\src\vst2_host\test\4Front Piano x64.dll)"); - // plugin_host_manager::get_instance().load_plugin(path); + std::filesystem::path path(R"(D:\Projects\Alicho\src\backend\src\vst2_host\test\4Front Piano x64.dll)"); + plugin_host_manager::get_instance().load_plugin(path); - // uint32_t times = 0; + uint32_t times = 0; while (true) { // 模拟主循环 std::this_thread::sleep_for(std::chrono::milliseconds(100)); node.process_rpc(); - // if (++times > 10) { - // break; // 运行10次后退出 - // } + if (++times > 10) { + break; // 运行10次后退出 + } } return 0; diff --git a/src/backend/src/engine/src/plugin_manage/plugin_host_manager.cpp b/src/backend/src/engine/src/plugin_manage/plugin_host_manager.cpp index 5338faf..6f3f13d 100644 --- a/src/backend/src/engine/src/plugin_manage/plugin_host_manager.cpp +++ b/src/backend/src/engine/src/plugin_manage/plugin_host_manager.cpp @@ -31,18 +31,18 @@ plugin_instance* plugin_host_manager::load_plugin(const std::filesystem::path& p } catch (const std::exception& e) { // TODO: 如果创建失败,清理已分配的资源 - spdlog::error("Failed to load plugin {}: {}", plugin_binary_path.string(), e.what()); + spdlog::error("无法加载插件 {}:{}", plugin_binary_path.string(), e.what()); } return ptr; } void plugin_host_manager::unload_plugin(uint32_t plugin_id) { - -} - -void plugin_host_manager::register_session(uint32_t plugin_id, std::shared_ptr session) { - + if (auto instance = get_plugin_instance(plugin_id)) { + instance->kill_process(); + std::lock_guard lock(mutex_); + plugin_instances_.erase(plugin_id); + } } plugin_instance* plugin_host_manager::get_plugin_instance(uint32_t plugin_id) { diff --git a/src/backend/src/engine/src/plugin_manage/plugin_host_manager.h b/src/backend/src/engine/src/plugin_manage/plugin_host_manager.h index c02e5a5..36cb9d8 100644 --- a/src/backend/src/engine/src/plugin_manage/plugin_host_manager.h +++ b/src/backend/src/engine/src/plugin_manage/plugin_host_manager.h @@ -12,8 +12,6 @@ public: plugin_instance* load_plugin(const std::filesystem::path& plugin_binary_path); void unload_plugin(uint32_t plugin_id); - void register_session(uint32_t plugin_id, std::shared_ptr session); - plugin_instance* get_plugin_instance(uint32_t plugin_id); template diff --git a/src/backend/src/engine/src/plugin_manage/plugin_instance.cpp b/src/backend/src/engine/src/plugin_manage/plugin_instance.cpp index 8df97b4..b33945d 100644 --- a/src/backend/src/engine/src/plugin_manage/plugin_instance.cpp +++ b/src/backend/src/engine/src/plugin_manage/plugin_instance.cpp @@ -5,6 +5,10 @@ namespace bi = boost::interprocess; namespace bp = boost::process; +[[nodiscard]] static auto get_plugin_id_name(uint32_t in_id) { + return "plugin_" + std::to_string(in_id); +} + auto execute_plugin_process(uint32_t id, boost::asio::io_context& ctx, const std::filesystem::path& plugin_path) { if (!std::filesystem::exists(plugin_path)) { throw std::runtime_error("Plugin path does not exist: " + plugin_path.string()); @@ -31,7 +35,7 @@ auto execute_plugin_process(uint32_t id, boost::asio::io_context& ctx, const std throw std::runtime_error("Host executable not found: " + host_path.string()); } - return new bp::process(ctx, host_path.string(), { get_shm_in_name(id), get_shm_out_name(id) }); + return new bp::process(ctx, host_path.string(), { get_plugin_id_name(id) }); } plugin_instance::plugin_instance(boost::asio::io_context& ctx, @@ -41,24 +45,28 @@ plugin_instance::plugin_instance(boost::asio::io_context& ctx, id = in_id; // 创建共享内存段 - input_segment = std::make_unique(bi::open_or_create, get_shm_in_name(id).c_str(), SHM_SIZE); - output_segment = std::make_unique(bi::open_or_create, get_shm_out_name(id).c_str(), SHM_SIZE); + // input_segment = std::make_unique(bi::open_or_create, get_shm_in_name(id).c_str(), SHM_SIZE); + // output_segment = std::make_unique(bi::open_or_create, get_shm_out_name(id).c_str(), SHM_SIZE); // 获取锁无阻塞队列 - input_queue = input_segment->find_or_construct("input_queue")(); - output_queue = output_segment->find_or_construct("output_queue")(); + // input_queue = input_segment->find_or_construct("input_queue")(); + // output_queue = output_segment->find_or_construct("output_queue")(); process = std::shared_ptr(execute_plugin_process(id, ctx, in_plugin_path), [](boost::process::process* p) { delete p; }); + if (!is_process_running()) { + throw std::runtime_error("插件进程启动失败, 退出码: " + std::to_string(process->exit_code())); + } + plugin_node_.open_queue(get_plugin_id_name(id)); } plugin_instance::~plugin_instance() { - output_queue->push({}); - if (input_segment) { - bi::shared_memory_object::remove(get_shm_in_name(id).c_str()); - } - if (output_segment) { - bi::shared_memory_object::remove(get_shm_out_name(id).c_str()); - } + // output_queue->push({}); + // if (input_segment) { + // bi::shared_memory_object::remove(get_shm_in_name(id).c_str()); + // } + // if (output_segment) { + // bi::shared_memory_object::remove(get_shm_out_name(id).c_str()); + // } } diff --git a/src/backend/src/engine/src/plugin_manage/plugin_instance.h b/src/backend/src/engine/src/plugin_manage/plugin_instance.h index 858fdba..c88265d 100644 --- a/src/backend/src/engine/src/plugin_manage/plugin_instance.h +++ b/src/backend/src/engine/src/plugin_manage/plugin_instance.h @@ -3,21 +3,19 @@ #include #include +#include "ipc/ipc_node.h" #include "rpc/common.h" -class rpc_session; - struct plugin_instance { plugin_instance(boost::asio::io_context& ctx, uint32_t in_id, const std::filesystem::path& in_plugin_path); ~plugin_instance(); uint32_t id = 0; - std::shared_ptr process; // 沙箱进程句柄 // 共享内存资源 - std::unique_ptr input_segment; - std::unique_ptr output_segment; - lock_free_queue* input_queue = nullptr; // AudioEngine -> Host - lock_free_queue* output_queue = nullptr; // Host -> AudioEngine + // std::unique_ptr input_segment; + // std::unique_ptr output_segment; + // lock_free_queue* input_queue = nullptr; // AudioEngine -> Host + // lock_free_queue* output_queue = nullptr; // Host -> AudioEngine // 控制连接 // std::shared_ptr session; @@ -29,4 +27,14 @@ struct plugin_instance { auto is_process_running() const { return process && process->running(); } + void kill_process() { + if (is_process_running()) { + process->terminate(); + process->wait(); + } + process = nullptr; + } +private: + plugin_ipc_remote_node plugin_node_; + std::shared_ptr process; // 沙箱进程句柄 }; diff --git a/src/backend/src/misc/src/ipc/ipc_node.h b/src/backend/src/misc/src/ipc/ipc_node.h index 7befa6f..b2f797f 100644 --- a/src/backend/src/misc/src/ipc/ipc_node.h +++ b/src/backend/src/misc/src/ipc/ipc_node.h @@ -45,6 +45,7 @@ class ipc_node { public: friend class engine_ipc_node; // 引擎端友元,可直接访问 rpc_queue friend class plugin_ipc_node; // 插件端友元 + friend class plugin_ipc_remote_node; // 插件端远程调用节点友元 /** * @brief 在共享内存中创建一条新的 SPSC 队列 @@ -61,7 +62,7 @@ public: * @throws std::runtime_error 打开失败时抛出 */ void open_queue(const std::string& in_name) { - rpc_queue.open_queue(in_name); + rpc_queue.open_queue(in_name, std::chrono::seconds(10)); } template @@ -166,6 +167,10 @@ public: explicit plugin_ipc_node(const std::string& in_queue_name) { engine_queue.open_queue("engine_queue"); plugin_queue.create_queue(in_queue_name); + engine_running_ = shm_mgr::msg_shm().find("engine_running").first; + if (!engine_running_) { + throw std::runtime_error("无法找到 engine_running 标志"); + } } // ------------------- RPC 调用 ------------------- @@ -175,7 +180,27 @@ public: // ------------------- RPC 处理 ------------------- void process_rpc() { plugin_queue.process_rpc(); } + auto is_engine_running() const { + return engine_running_ && engine_running_->load(); + } private: - ipc_node engine_queue; // 指向引擎主队列(只写) - ipc_node plugin_queue; // 插件自队列(只读) + ipc_node engine_queue; // 指向引擎主队列(只写) + ipc_node plugin_queue; // 插件自队列(只读) + boost::atomic_bool* engine_running_; +}; + +/** + * @class plugin_ipc_remote_node + * @brief 在Audio Engine中的代理节点, 用于发送远程RPC调用到插件进程 + */ +class plugin_ipc_remote_node { +public: + template + auto call_rpc() { return plugin_queue.call_rpc(); } + + void open_queue(const std::string& in_queue_name) { + plugin_queue.open_queue(in_queue_name); + } +private: + ipc_node plugin_queue{}; }; diff --git a/src/backend/src/misc/src/ipc/ipc_queue.h b/src/backend/src/misc/src/ipc/ipc_queue.h index 6a3a5b2..74868a1 100644 --- a/src/backend/src/misc/src/ipc/ipc_queue.h +++ b/src/backend/src/misc/src/ipc/ipc_queue.h @@ -48,19 +48,57 @@ public: } is_owner_ = true; } - void open_queue(const std::string& in_name) { + void open_queue(const std::string& in_name, const std::optional timeout = std::nullopt) { name_ = in_name; const auto& block = shm_mgr::msg_shm(); const auto& queue_name = get_queue_name(); const auto& mutex_name = get_mutex_name(); - auto [deque_ptr, size] = block.find(queue_name.c_str()); + const auto& start_time = std::chrono::steady_clock::now(); + + // 查找队列,支持超时重试 + ipc_msg_deque_t* deque_ptr = nullptr; + do { + auto [found_deque_ptr, size] = block.find(queue_name.c_str()); + deque_ptr = found_deque_ptr; + + if (deque_ptr) break; + + if (timeout.has_value()) { + const auto& elapsed = std::chrono::steady_clock::now() - start_time; + if (elapsed >= timeout.value()) { + throw std::runtime_error("打开IPC队列超时"); + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } else { + break; + } + } while (timeout.has_value()); + if (!deque_ptr) { throw std::runtime_error("无法打开IPC队列"); } deque_ = deque_ptr; - auto [mutex_ptr, msize] = block.find(mutex_name.c_str()); + // 查找互斥锁,支持超时重试 + boost::interprocess::interprocess_mutex* mutex_ptr = nullptr; + do { + auto [found_mutex_ptr, msize] = block.find(mutex_name.c_str()); + mutex_ptr = found_mutex_ptr; + + if (mutex_ptr) break; + + if (timeout.has_value()) { + auto elapsed = std::chrono::steady_clock::now() - start_time; + if (elapsed >= timeout.value()) { + throw std::runtime_error("打开IPC队列锁超时"); + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } else { + break; + } + } while (timeout.has_value()); + if (!mutex_ptr) { throw std::runtime_error("无法打开IPC队列锁"); } @@ -73,7 +111,7 @@ public: } bool push(const T& msg) { - boost::interprocess::scoped_lock lock(*mutex_); + boost::interprocess::scoped_lock lock(*mutex_); try { deque_->push_back(msg); return true; @@ -83,7 +121,7 @@ public: } bool pop(T& msg) { - boost::interprocess::scoped_lock lock(*mutex_); + boost::interprocess::scoped_lock lock(*mutex_); if (deque_->empty()) { return false; } @@ -92,20 +130,20 @@ public: return true; } - bool empty() const { - boost::interprocess::scoped_lock lock(*mutex_); + [[nodiscard]] auto empty() const { + boost::interprocess::scoped_lock lock(*mutex_); return deque_->empty(); } - size_t size() const { - boost::interprocess::scoped_lock lock(*mutex_); + [[nodiscard]] auto size() const { + boost::interprocess::scoped_lock lock(*mutex_); return deque_->size(); } - auto get_queue_name() const { + [[nodiscard]] auto get_queue_name() const { return name_ + "_queue"; } - auto get_mutex_name() const { + [[nodiscard]] auto get_mutex_name() const { return name_ + "_mutex"; } }; diff --git a/src/backend/src/misc/src/ipc/shm_mgr.h b/src/backend/src/misc/src/ipc/shm_mgr.h index add3d15..1dbec48 100644 --- a/src/backend/src/misc/src/ipc/shm_mgr.h +++ b/src/backend/src/misc/src/ipc/shm_mgr.h @@ -164,7 +164,7 @@ private: [[nodiscard]] auto create_shm_block(const std::string& in_name, std::size_t in_size) const { if (is_engine) { bi::shared_memory_object::remove(in_name.c_str()); // 先清理旧段 - return std::make_shared(bi::open_or_create, in_name.c_str(), in_size); + return std::make_shared(bi::create_only, in_name.c_str(), in_size); } return std::make_shared(bi::open_only, in_name.c_str()); } diff --git a/src/backend/src/vst2_host/src/main.cpp b/src/backend/src/vst2_host/src/main.cpp index b7841f3..864f0e3 100644 --- a/src/backend/src/vst2_host/src/main.cpp +++ b/src/backend/src/vst2_host/src/main.cpp @@ -13,16 +13,27 @@ namespace bi = boost::interprocess; int main(int argc, char *argv[]) { - // if (argc != 3) - // return 1; + if (argc != 2) + return 1; + shm_mgr::get_instance().init(); - plugin_ipc_node node("plugin1"); + plugin_ipc_node node(argv[1]); { - const auto& proxy = node.call_rpc(); - proxy->set_str("Hello from Plugin Sandbox!"); - proxy->set_level(spdlog::level::info); - proxy.submit(); + for (int i = 0; i < argc; ++i) { + const auto& proxy = node.call_rpc(); + proxy->set_str(argv[i]); + proxy->set_level(spdlog::level::info); + proxy.submit(); + } + } + + while (true) { + if (!node.is_engine_running()) + break; + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + node.process_rpc(); } #if 0