diff --git a/src/backend/src/engine/src/main.cpp b/src/backend/src/engine/src/main.cpp index 58688ad..86213b1 100644 --- a/src/backend/src/engine/src/main.cpp +++ b/src/backend/src/engine/src/main.cpp @@ -24,6 +24,8 @@ int main(int argc, char *argv[]) engine_ipc_node node; auto& host_manager = plugin_host_manager::get_instance(); + auto host_ins = host_manager.load_plugin(""); + host_ins->plugin_node_.get_audio_rb().acquire_pending_block(); while (true) { node.process_rpc(); diff --git a/src/backend/src/misc/src/audio_ring_buffer.h b/src/backend/src/misc/src/audio_ring_buffer.h index 86cbe4a..5234f1e 100644 --- a/src/backend/src/misc/src/audio_ring_buffer.h +++ b/src/backend/src/misc/src/audio_ring_buffer.h @@ -1,16 +1,12 @@ #pragma once #include -#include -#include #include #include #include #include -#include -#include -#include -#include + +#include "ipc/shm_mgr.h" namespace bip = boost::interprocess; @@ -30,8 +26,8 @@ public: } audio_block(T* ptr, const size_t sz, const uint64_t ts = 0) : data(ptr), - size(sz), - timestamp(ts) { + size(sz), + timestamp(ts) { } }; @@ -52,9 +48,9 @@ private: shared_control_block() = default; shared_control_block(size_t qcap, size_t abuf_size, size_t bsize) : queue_capacity(qcap), - audio_buffer_size(abuf_size), - block_size(bsize), - initialized(1) { + audio_buffer_size(abuf_size), + block_size(bsize), + initialized(1) { } }; @@ -66,86 +62,51 @@ private: queue_entry() = default; queue_entry(size_t offset, size_t sz, uint64_t ts) : audio_offset(offset), - size(sz), - timestamp(ts) { + size(sz), + timestamp(ts) { } }; - // Boost共享内存管理器 - bip::managed_shared_memory* segment_; - bool owns_segment_; + // 使用 shm_mgr 的音频缓冲区段 + shm_block audio_shm_; + std::string buffer_name_; // 缓冲区唯一标识 // 共享内存中的数据结构 shared_control_block* control_; queue_entry* pending_queue_; queue_entry* processed_queue_; T* audio_buffer_; + public: - // 构造函数1:创建新的共享内存段 - audio_ring_buffer(const char* segment_name, - size_t segment_size, - size_t queue_capacity, - size_t audio_buffer_capacity) : segment_(nullptr), - owns_segment_(true) { - // 尝试创建新的共享内存段 - try { segment_ = new bip::managed_shared_memory(bip::create_only, segment_name, segment_size); } - catch (const bip::interprocess_exception&) { - // 如果已存在,则删除后重新创建 - bip::shared_memory_object::remove(segment_name); - segment_ = new bip::managed_shared_memory(bip::create_only, segment_name, segment_size); - } - - initialize_shared_structures(queue_capacity, audio_buffer_capacity); - } - - // 构造函数2:连接到现有共享内存段 - explicit audio_ring_buffer(const char* segment_name) : segment_(nullptr), - owns_segment_(true) { - segment_ = new bip::managed_shared_memory(bip::open_only, segment_name); - - connect_to_existing_structures(); - } - - // 构造函数3:使用外部提供的managed_shared_memory - audio_ring_buffer(bip::managed_shared_memory& external_segment, - size_t queue_capacity, - size_t audio_buffer_capacity) : segment_(&external_segment), - owns_segment_(false) { - initialize_shared_structures(queue_capacity, audio_buffer_capacity); - } - - // 构造函数4:连接到外部managed_shared_memory中的现有结构 - explicit audio_ring_buffer(bip::managed_shared_memory& external_segment) : segment_(&external_segment), - owns_segment_(false) { - connect_to_existing_structures(); - } - - ~audio_ring_buffer() { if (owns_segment_ && segment_) { delete segment_; } } + audio_ring_buffer() = default; + ~audio_ring_buffer() = default; // 移动构造函数和赋值操作符 - audio_ring_buffer(audio_ring_buffer&& other) noexcept : segment_(other.segment_), - owns_segment_(other.owns_segment_), + audio_ring_buffer(audio_ring_buffer&& other) noexcept : audio_shm_(std::move(other.audio_shm_)), + buffer_name_(std::move(other.buffer_name_)), control_(other.control_), pending_queue_(other.pending_queue_), processed_queue_(other.processed_queue_), audio_buffer_(other.audio_buffer_) { - other.segment_ = nullptr; - other.owns_segment_ = false; + other.control_ = nullptr; + other.pending_queue_ = nullptr; + other.processed_queue_ = nullptr; + other.audio_buffer_ = nullptr; } audio_ring_buffer& operator=(audio_ring_buffer&& other) noexcept { if (this != &other) { - if (owns_segment_ && segment_) { delete segment_; } - - segment_ = other.segment_; - owns_segment_ = other.owns_segment_; + audio_shm_ = std::move(other.audio_shm_); + buffer_name_ = std::move(other.buffer_name_); control_ = other.control_; pending_queue_ = other.pending_queue_; processed_queue_ = other.processed_queue_; audio_buffer_ = other.audio_buffer_; - other.segment_ = nullptr; - other.owns_segment_ = false; + other.control_ = nullptr; + other.pending_queue_ = nullptr; + other.processed_queue_ = nullptr; + other.audio_buffer_ = nullptr; } return *this; } @@ -155,13 +116,47 @@ public: audio_ring_buffer& operator=(const audio_ring_buffer&) = delete; - // 获取可用的音频块用于写入 + auto create_buffer(const std::string& in_name, + size_t block_count, // 块数量 + size_t block_size) { // 单块大小(以 T 元素计) + /* 将直观的块逻辑映射到内部实现: + queue_capacity = block_count + audio_buffer_capacity = block_count * block_size + */ + const size_t queue_capacity = block_count; + const size_t audio_buffer_capacity = block_count * block_size; + + audio_shm_ = shm_mgr::ab_shm(); + buffer_name_ = in_name; + initialize_shared_structures(queue_capacity, // 队列容量 + audio_buffer_capacity, // 整体缓冲元素数 + block_size); // 记录单块大小 + return control_ != nullptr; + } + + auto open_buffer(const std::string& in_name) { + audio_shm_ = shm_mgr::ab_shm(); + buffer_name_ = in_name; + connect_to_existing_structures(); + return control_ != nullptr; + } + + // 获取可用的音频块用于写入 - 支持自动扩容 auto acquire_pending_block(size_t required_size) { auto write_pos = control_->audio_write_pos.load(std::memory_order_acquire); auto read_pos = control_->audio_read_pos.load(std::memory_order_acquire); auto available = calculate_available_space(write_pos, read_pos); - if (available < required_size) { return audio_block(); } + + // 如果当前缓冲区空间不足,尝试扩展缓冲区 + if (available < required_size) { + if (!try_expand_audio_buffer(required_size)) { + return audio_block(); // 扩展失败 + } + // 重新获取位置,因为缓冲区可能已扩展 + write_pos = control_->audio_write_pos.load(std::memory_order_acquire); + available = calculate_available_space(write_pos, read_pos); + } // 处理环绕 if (write_pos + required_size > control_->audio_buffer_size) { @@ -179,9 +174,17 @@ public: auto queue_write = control_->pending_write_pos.load(std::memory_order_acquire); auto queue_read = control_->pending_read_pos.load(std::memory_order_acquire); - if (is_queue_full(queue_write, queue_read)) { return false; } + // 如果队列满了,尝试扩展队列 + if (is_queue_full(queue_write, queue_read)) { + if (!try_expand_queues()) { + return false; // 扩展失败 + } + // 重新获取位置 + queue_write = control_->pending_write_pos.load(std::memory_order_acquire); + queue_read = control_->pending_read_pos.load(std::memory_order_acquire); + } - auto audio_offset = block.data - audio_buffer_; + auto audio_offset = block.data - audio_buffer_; pending_queue_[queue_write] = queue_entry(audio_offset, block.size, timestamp); control_->audio_write_pos.store(audio_offset + block.size, std::memory_order_release); @@ -211,7 +214,12 @@ public: auto processed_write = control_->processed_write_pos.load(std::memory_order_acquire); auto processed_read = control_->processed_read_pos.load(std::memory_order_acquire); - if (is_queue_full(processed_write, processed_read)) { return false; } + // 如果处理队列满了,尝试扩展 + if (is_queue_full(processed_write, processed_read)) { + if (!try_expand_queues()) { return false; } + processed_write = control_->processed_write_pos.load(std::memory_order_acquire); + processed_read = control_->processed_read_pos.load(std::memory_order_acquire); + } processed_queue_[processed_write] = pending_queue_[pending_read]; @@ -255,13 +263,13 @@ public: [[nodiscard]] auto pending_count() const { auto write = control_->pending_write_pos.load(std::memory_order_acquire); auto read = control_->pending_read_pos.load(std::memory_order_acquire); - return (write >= read) ? (write - read) : (control_->queue_capacity - read + write); + return write >= read ? write - read : control_->queue_capacity - read + write; } [[nodiscard]] auto processed_count() const { auto write = control_->processed_write_pos.load(std::memory_order_acquire); auto read = control_->processed_read_pos.load(std::memory_order_acquire); - return (write >= read) ? (write - read) : (control_->queue_capacity - read + write); + return write >= read ? write - read : control_->queue_capacity - read + write; } [[nodiscard]] auto available_audio_space() const { @@ -279,55 +287,199 @@ public: control_->audio_read_pos.store(0, std::memory_order_release); } - // Boost特有的功能 - [[nodiscard]] auto& get_segment() { return *segment_; } - [[nodiscard]] const auto& get_segment() const { return *segment_; } + // 新增:获取缓冲区统计信息 + struct buffer_stats { + size_t queue_capacity; + size_t audio_buffer_size; + size_t pending_count; + size_t processed_count; + size_t available_space; + }; + + [[nodiscard]] buffer_stats get_stats() const { + return { + control_->queue_capacity, + control_->audio_buffer_size, + pending_count(), + processed_count(), + available_audio_space() + }; + } + + // 兼容性接口 - 返回segment_manager + [[nodiscard]] auto get_segment_manager() const { return audio_shm_.get_segment_manager(); } - // 获取共享内存使用统计 - [[nodiscard]] auto get_free_memory() const { return segment_->get_free_memory(); } - [[nodiscard]] auto get_size() const { return segment_->get_size(); } private: - void initialize_shared_structures(size_t queue_capacity, size_t audio_buffer_capacity) { - // 在共享内存中构造控制块 - control_ = segment_->construct("AudioRingBuffer_Control")(queue_capacity, - audio_buffer_capacity, - sizeof(T)); + void initialize_shared_structures(size_t queue_capacity, + size_t audio_buffer_capacity, + size_t block_size) { + /* 控制块构造改为保存 block_size */ + const std::string control_name = buffer_name_ + "_Control"; + auto ctrl_ret = audio_shm_.find_or_construct(control_name); + if (ctrl_ret.second > 0) { // 首次创建 + *ctrl_ret.first = shared_control_block(queue_capacity, audio_buffer_capacity, block_size); + } + control_ = ctrl_ret.first; // 构造队列 - pending_queue_ = segment_->construct("AudioRingBuffer_PendingQueue")[queue_capacity](); - processed_queue_ = segment_->construct("AudioRingBuffer_ProcessedQueue")[queue_capacity](); + const std::string pending_name = buffer_name_ + "_PendingQueue"; + const std::string processed_name = buffer_name_ + "_ProcessedQueue"; + + auto pending_result = audio_shm_.find_or_construct(pending_name); + auto processed_result = audio_shm_.find_or_construct(processed_name); + + // 如果内存不足,这里会自动扩容 + if (!pending_result.first) { + // 重试一次,因为可能刚刚扩容了 + pending_result = audio_shm_.find_or_construct(pending_name); + } + if (!processed_result.first) { processed_result = audio_shm_.find_or_construct(processed_name); } + + pending_queue_ = pending_result.first; + processed_queue_ = processed_result.first; // 构造音频缓冲区 - audio_buffer_ = segment_->construct("AudioRingBuffer_AudioBuffer")[audio_buffer_capacity](); + const std::string audio_name = buffer_name_ + "_AudioBuffer"; + auto audio_result = audio_shm_.find_or_construct(audio_name); + + if (!audio_result.first) { + // 重试一次 + audio_result = audio_shm_.find_or_construct(audio_name); + } + + audio_buffer_ = audio_result.first; + + if (!control_ || !pending_queue_ || !processed_queue_ || !audio_buffer_) { + throw std::runtime_error("Failed to initialize audio ring buffer structures"); + } } void connect_to_existing_structures() { // 连接到现有的共享内存结构 - auto control_result = segment_->find("AudioRingBuffer_Control"); + const std::string control_name = buffer_name_ + "_Control"; + auto control_result = audio_shm_.find(control_name); if (!control_result.first) { - throw std::runtime_error("AudioRingBuffer control block not found in shared memory"); + throw std::runtime_error("AudioRingBuffer control block not found: " + control_name); } control_ = control_result.first; - auto pending_result = segment_->find("AudioRingBuffer_PendingQueue"); + const std::string pending_name = buffer_name_ + "_PendingQueue"; + auto pending_result = audio_shm_.find(pending_name); if (!pending_result.first) { - throw std::runtime_error("AudioRingBuffer pending queue not found in shared memory"); + throw std::runtime_error("AudioRingBuffer pending queue not found: " + pending_name); } pending_queue_ = pending_result.first; - auto processed_result = segment_->find("AudioRingBuffer_ProcessedQueue"); + const std::string processed_name = buffer_name_ + "_ProcessedQueue"; + auto processed_result = audio_shm_.find(processed_name); if (!processed_result.first) { - throw std::runtime_error("AudioRingBuffer processed queue not found in shared memory"); + throw std::runtime_error("AudioRingBuffer processed queue not found: " + processed_name); } processed_queue_ = processed_result.first; - auto audio_result = segment_->find("AudioRingBuffer_AudioBuffer"); - if (!audio_result.first) { - throw std::runtime_error("AudioRingBuffer audio buffer not found in shared memory"); - } + const std::string audio_name = buffer_name_ + "_AudioBuffer"; + auto audio_result = audio_shm_.find(audio_name); + if (!audio_result.first) { throw std::runtime_error("AudioRingBuffer audio buffer not found: " + audio_name); } audio_buffer_ = audio_result.first; } + /** + * @brief 当可写空间不足时扩展音频缓冲区 + * @param required_size 本次写入真正需要的字节(元素)数 + * @return true 扩容并就绪;false 内存不足或构造失败 + * + * 扩容策略: + * 1. 目标大小 = max(当前大小 * 2, 当前写指针 + required_size + 1) + * 保证一次扩容即可容纳此次写入,且至少是当前容量的 2 倍, + * 避免频繁扩容。 + * 2. 在新缓冲区首地址重新线性布局已有数据, + * 并修正 audio_read_pos/audio_write_pos。 + * 3. 更新 control_->audio_buffer_size 以及 audio_buffer_ 指针。 + */ + bool try_expand_audio_buffer(size_t required_size) { + const size_t cur_size = control_->audio_buffer_size; + const size_t write_pos = control_->audio_write_pos.load(std::memory_order_acquire); + const size_t read_pos = control_->audio_read_pos.load(std::memory_order_acquire); + + /* ------------------ 1. 计算新容量 ------------------ */ + size_t new_size = cur_size; + const size_t used = write_pos >= read_pos ? write_pos - read_pos : cur_size - read_pos + write_pos; + + // 一次扩容即可写入 required_size + while (new_size - used - 1 < required_size) { new_size <<= 1; } + + /* ------------------ 2. 创建新缓冲区 ------------------ */ + const std::string new_name = buffer_name_ + "_AudioBuffer_" + std::to_string(new_size); + + auto raw_seg = audio_shm_.get_segment_manager(); + if (!raw_seg) + return false; + + // Boost 构造数组: construct(name)[count]() + T* new_buf{}; + try { new_buf = raw_seg->construct(new_name.c_str())[new_size](); } + catch (const bi::bad_alloc&) { + spdlog::error("Audio buffer expand failed: not enough shared memory"); + return false; + } + + /* ------------------ 3. 拷贝旧数据 ------------------ */ + if (used) { + if (write_pos >= read_pos) { // 无环绕 + std::memcpy(new_buf, audio_buffer_ + read_pos, used * sizeof(T)); + } + else { // 有环绕 + const size_t tail = cur_size - read_pos; + std::memcpy(new_buf, audio_buffer_ + read_pos, tail * sizeof(T)); + std::memcpy(new_buf + tail, audio_buffer_, write_pos * sizeof(T)); + } + } + + /* ------------------ 4. 更新索引 ------------------ */ + audio_buffer_ = new_buf; + control_->audio_buffer_size = new_size; + control_->audio_read_pos.store(0, std::memory_order_release); + control_->audio_write_pos.store(used, std::memory_order_release); + + spdlog::info("Audio buffer '{}' expanded: {} -> {} elements", buffer_name_, cur_size, new_size); + return true; + } + + // 尝试扩展队列 + bool try_expand_queues() { + const size_t new_capacity = control_->queue_capacity * 2; + + // 扩展 pending 队列 + const std::string new_pending_name = buffer_name_ + "_PendingQueue_Expanded_" + + std::to_string(std::chrono::steady_clock::now().time_since_epoch(). + count()); + auto new_pending_result = audio_shm_.find_or_construct(new_pending_name); + if (!new_pending_result.first) { return false; } + + // 扩展 processed 队列 + const std::string new_processed_name = buffer_name_ + "_ProcessedQueue_Expanded_" + + std::to_string(std::chrono::steady_clock::now().time_since_epoch(). + count()); + auto new_processed_result = audio_shm_.find_or_construct(new_processed_name); + if (!new_processed_result.first) { return false; } + + // 复制现有队列数据 + if (pending_queue_) { + std::memcpy(new_pending_result.first, pending_queue_, control_->queue_capacity * sizeof(queue_entry)); + } + if (processed_queue_) { + std::memcpy(new_processed_result.first, processed_queue_, control_->queue_capacity * sizeof(queue_entry)); + } + + // 更新指针和容量 + pending_queue_ = new_pending_result.first; + processed_queue_ = new_processed_result.first; + control_->queue_capacity = new_capacity; + + spdlog::info("Queue capacity expanded to {} for buffer: {}", new_capacity, buffer_name_); + return true; + } + [[nodiscard]] auto calculate_available_space(size_t write_pos, size_t read_pos) const { if (write_pos >= read_pos) { return control_->audio_buffer_size - write_pos + read_pos - 1; } return read_pos - write_pos - 1; @@ -338,28 +490,18 @@ private: } }; -// 工厂函数,用于简化创建过程 +// 更新的工厂函数 template -class AudioRingBufferFactory { +class audio_ring_buffer_factory { public: - static audio_ring_buffer create_new(const char* segment_name, - size_t segment_size, - size_t queue_capacity, - size_t audio_buffer_capacity) { - return audio_ring_buffer(segment_name, segment_size, queue_capacity, audio_buffer_capacity); + // 推荐的新接口 - 使用 shm_mgr + static audio_ring_buffer create_new(const std::string& buffer_name, + size_t queue_capacity, + size_t audio_buffer_capacity) { + return audio_ring_buffer(buffer_name, queue_capacity, audio_buffer_capacity); } - static audio_ring_buffer connect_existing(const char* segment_name) { - return audio_ring_buffer(segment_name); - } - - static audio_ring_buffer create_in_segment(bip::managed_shared_memory& segment, - size_t queue_capacity, - size_t audio_buffer_capacity) { - return audio_ring_buffer(segment, queue_capacity, audio_buffer_capacity); - } - - static audio_ring_buffer connect_in_segment(bip::managed_shared_memory& segment) { - return audio_ring_buffer(segment); + static audio_ring_buffer connect_existing(const std::string& buffer_name) { + return audio_ring_buffer(buffer_name); } }; diff --git a/src/backend/src/misc/src/ipc/ipc_node.h b/src/backend/src/misc/src/ipc/ipc_node.h index 0ff8f07..a0c175a 100644 --- a/src/backend/src/misc/src/ipc/ipc_node.h +++ b/src/backend/src/misc/src/ipc/ipc_node.h @@ -25,6 +25,7 @@ #include #include +#include "audio_ring_buffer.h" #include "ipc_queue.h" #include "misc_type.h" #include "shm_mgr.h" @@ -165,13 +166,12 @@ private: */ class plugin_ipc_node { public: - explicit plugin_ipc_node(const std::string& in_queue_name) { + explicit plugin_ipc_node(const std::string& in_queue_name): audio_rb() { engine_queue.open_queue("engine_queue"); plugin_queue.create_queue(in_queue_name); + audio_rb.open_buffer(in_queue_name); engine_heartbeat_ = shm_mgr::msg_shm().find>("engine_heartbeat"); - if (!engine_heartbeat_) { - throw std::runtime_error("无法找到 engine_running 标志"); - } + if (!engine_heartbeat_) { throw std::runtime_error("无法找到 engine_running 标志"); } plugin_heartbeat_ = shm_mgr::msg_shm().construct_with_name>(in_queue_name + "_heartbeat"); *plugin_heartbeat_ = std::chrono::high_resolution_clock::now().time_since_epoch().count(); } @@ -196,10 +196,11 @@ public: return duration < threshold; } private: - ipc_node engine_queue; // 指向引擎主队列(只写) - ipc_node plugin_queue; // 插件自队列(只读) + ipc_node engine_queue; // 指向引擎主队列(只写) + ipc_node plugin_queue; // 插件自队列(只读) boost::atomic* engine_heartbeat_; boost::atomic* plugin_heartbeat_; + audio_ring_buffer audio_rb; }; /** @@ -208,12 +209,15 @@ private: */ class plugin_ipc_remote_node { public: + plugin_ipc_remote_node() : audio_rb() {} + template auto call_rpc() { return plugin_queue.call_rpc(); } void open_queue(const std::string& in_queue_name) { plugin_queue.open_queue(in_queue_name); plugin_heartbeat_ = shm_mgr::msg_shm().find>(in_queue_name + "_heartbeat"); + audio_rb.create_buffer(in_queue_name, 10, 512); // 10块512采样点的缓冲区 } [[nodiscard]] auto is_timeout(int32_t in_seconds) const { @@ -230,7 +234,10 @@ public: const auto duration = std::chrono::high_resolution_clock::now().time_since_epoch() - heartbeat_duration_t(val); return duration >= threshold; } + + [[nodiscard]] auto& get_audio_rb() { return audio_rb; } private: - ipc_node plugin_queue{}; + ipc_node plugin_queue{}; boost::atomic* plugin_heartbeat_ = nullptr; + audio_ring_buffer audio_rb; }; diff --git a/src/backend/src/vst2_host/src/vst2host.h b/src/backend/src/vst2_host/src/vst2host.h index 1d6bb29..8ce11ae 100644 --- a/src/backend/src/vst2_host/src/vst2host.h +++ b/src/backend/src/vst2_host/src/vst2host.h @@ -5,6 +5,7 @@ #include "library_handle/library_handle.h" #include +#include "audio_ring_buffer.h" #include "vec.h"