重构音频环形缓冲区,简化构造函数,支持动态扩容和共享内存管理

This commit is contained in:
daiqingshuang
2025-09-03 18:30:59 +08:00
parent c3f1664156
commit 24675ba237
4 changed files with 275 additions and 123 deletions

View File

@@ -24,6 +24,8 @@ int main(int argc, char *argv[])
engine_ipc_node node;
auto& host_manager = plugin_host_manager::get_instance();
auto host_ins = host_manager.load_plugin("");
host_ins->plugin_node_.get_audio_rb().acquire_pending_block();
while (true) {
node.process_rpc();

View File

@@ -1,16 +1,12 @@
#pragma once
#include <atomic>
#include <cstddef>
#include <cstring>
#include <memory>
#include <type_traits>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/creation_tags.hpp>
#include "ipc/shm_mgr.h"
namespace bip = boost::interprocess;
@@ -30,8 +26,8 @@ public:
}
audio_block(T* ptr, const size_t sz, const uint64_t ts = 0) : data(ptr),
size(sz),
timestamp(ts) {
size(sz),
timestamp(ts) {
}
};
@@ -52,9 +48,9 @@ private:
shared_control_block() = default;
shared_control_block(size_t qcap, size_t abuf_size, size_t bsize) : queue_capacity(qcap),
audio_buffer_size(abuf_size),
block_size(bsize),
initialized(1) {
audio_buffer_size(abuf_size),
block_size(bsize),
initialized(1) {
}
};
@@ -66,86 +62,51 @@ private:
queue_entry() = default;
queue_entry(size_t offset, size_t sz, uint64_t ts) : audio_offset(offset),
size(sz),
timestamp(ts) {
size(sz),
timestamp(ts) {
}
};
// Boost共享内存管理器
bip::managed_shared_memory* segment_;
bool owns_segment_;
// 使用 shm_mgr 的音频缓冲区段
shm_block audio_shm_;
std::string buffer_name_; // 缓冲区唯一标识
// 共享内存中的数据结构
shared_control_block* control_;
queue_entry* pending_queue_;
queue_entry* processed_queue_;
T* audio_buffer_;
public:
// 构造函数1创建新的共享内存段
audio_ring_buffer(const char* segment_name,
size_t segment_size,
size_t queue_capacity,
size_t audio_buffer_capacity) : segment_(nullptr),
owns_segment_(true) {
// 尝试创建新的共享内存段
try { segment_ = new bip::managed_shared_memory(bip::create_only, segment_name, segment_size); }
catch (const bip::interprocess_exception&) {
// 如果已存在,则删除后重新创建
bip::shared_memory_object::remove(segment_name);
segment_ = new bip::managed_shared_memory(bip::create_only, segment_name, segment_size);
}
initialize_shared_structures(queue_capacity, audio_buffer_capacity);
}
// 构造函数2连接到现有共享内存段
explicit audio_ring_buffer(const char* segment_name) : segment_(nullptr),
owns_segment_(true) {
segment_ = new bip::managed_shared_memory(bip::open_only, segment_name);
connect_to_existing_structures();
}
// 构造函数3使用外部提供的managed_shared_memory
audio_ring_buffer(bip::managed_shared_memory& external_segment,
size_t queue_capacity,
size_t audio_buffer_capacity) : segment_(&external_segment),
owns_segment_(false) {
initialize_shared_structures(queue_capacity, audio_buffer_capacity);
}
// 构造函数4连接到外部managed_shared_memory中的现有结构
explicit audio_ring_buffer(bip::managed_shared_memory& external_segment) : segment_(&external_segment),
owns_segment_(false) {
connect_to_existing_structures();
}
~audio_ring_buffer() { if (owns_segment_ && segment_) { delete segment_; } }
audio_ring_buffer() = default;
~audio_ring_buffer() = default;
// 移动构造函数和赋值操作符
audio_ring_buffer(audio_ring_buffer&& other) noexcept : segment_(other.segment_),
owns_segment_(other.owns_segment_),
audio_ring_buffer(audio_ring_buffer&& other) noexcept : audio_shm_(std::move(other.audio_shm_)),
buffer_name_(std::move(other.buffer_name_)),
control_(other.control_),
pending_queue_(other.pending_queue_),
processed_queue_(other.processed_queue_),
audio_buffer_(other.audio_buffer_) {
other.segment_ = nullptr;
other.owns_segment_ = false;
other.control_ = nullptr;
other.pending_queue_ = nullptr;
other.processed_queue_ = nullptr;
other.audio_buffer_ = nullptr;
}
audio_ring_buffer& operator=(audio_ring_buffer&& other) noexcept {
if (this != &other) {
if (owns_segment_ && segment_) { delete segment_; }
segment_ = other.segment_;
owns_segment_ = other.owns_segment_;
audio_shm_ = std::move(other.audio_shm_);
buffer_name_ = std::move(other.buffer_name_);
control_ = other.control_;
pending_queue_ = other.pending_queue_;
processed_queue_ = other.processed_queue_;
audio_buffer_ = other.audio_buffer_;
other.segment_ = nullptr;
other.owns_segment_ = false;
other.control_ = nullptr;
other.pending_queue_ = nullptr;
other.processed_queue_ = nullptr;
other.audio_buffer_ = nullptr;
}
return *this;
}
@@ -155,13 +116,47 @@ public:
audio_ring_buffer& operator=(const audio_ring_buffer&) = delete;
// 获取可用的音频块用于写入
auto create_buffer(const std::string& in_name,
size_t block_count, // 块数量
size_t block_size) { // 单块大小(以 T 元素计)
/* 将直观的块逻辑映射到内部实现:
queue_capacity = block_count
audio_buffer_capacity = block_count * block_size
*/
const size_t queue_capacity = block_count;
const size_t audio_buffer_capacity = block_count * block_size;
audio_shm_ = shm_mgr::ab_shm();
buffer_name_ = in_name;
initialize_shared_structures(queue_capacity, // 队列容量
audio_buffer_capacity, // 整体缓冲元素数
block_size); // 记录单块大小
return control_ != nullptr;
}
auto open_buffer(const std::string& in_name) {
audio_shm_ = shm_mgr::ab_shm();
buffer_name_ = in_name;
connect_to_existing_structures();
return control_ != nullptr;
}
// 获取可用的音频块用于写入 - 支持自动扩容
auto acquire_pending_block(size_t required_size) {
auto write_pos = control_->audio_write_pos.load(std::memory_order_acquire);
auto read_pos = control_->audio_read_pos.load(std::memory_order_acquire);
auto available = calculate_available_space(write_pos, read_pos);
if (available < required_size) { return audio_block(); }
// 如果当前缓冲区空间不足,尝试扩展缓冲区
if (available < required_size) {
if (!try_expand_audio_buffer(required_size)) {
return audio_block(); // 扩展失败
}
// 重新获取位置,因为缓冲区可能已扩展
write_pos = control_->audio_write_pos.load(std::memory_order_acquire);
available = calculate_available_space(write_pos, read_pos);
}
// 处理环绕
if (write_pos + required_size > control_->audio_buffer_size) {
@@ -179,9 +174,17 @@ public:
auto queue_write = control_->pending_write_pos.load(std::memory_order_acquire);
auto queue_read = control_->pending_read_pos.load(std::memory_order_acquire);
if (is_queue_full(queue_write, queue_read)) { return false; }
// 如果队列满了,尝试扩展队列
if (is_queue_full(queue_write, queue_read)) {
if (!try_expand_queues()) {
return false; // 扩展失败
}
// 重新获取位置
queue_write = control_->pending_write_pos.load(std::memory_order_acquire);
queue_read = control_->pending_read_pos.load(std::memory_order_acquire);
}
auto audio_offset = block.data - audio_buffer_;
auto audio_offset = block.data - audio_buffer_;
pending_queue_[queue_write] = queue_entry(audio_offset, block.size, timestamp);
control_->audio_write_pos.store(audio_offset + block.size, std::memory_order_release);
@@ -211,7 +214,12 @@ public:
auto processed_write = control_->processed_write_pos.load(std::memory_order_acquire);
auto processed_read = control_->processed_read_pos.load(std::memory_order_acquire);
if (is_queue_full(processed_write, processed_read)) { return false; }
// 如果处理队列满了,尝试扩展
if (is_queue_full(processed_write, processed_read)) {
if (!try_expand_queues()) { return false; }
processed_write = control_->processed_write_pos.load(std::memory_order_acquire);
processed_read = control_->processed_read_pos.load(std::memory_order_acquire);
}
processed_queue_[processed_write] = pending_queue_[pending_read];
@@ -255,13 +263,13 @@ public:
[[nodiscard]] auto pending_count() const {
auto write = control_->pending_write_pos.load(std::memory_order_acquire);
auto read = control_->pending_read_pos.load(std::memory_order_acquire);
return (write >= read) ? (write - read) : (control_->queue_capacity - read + write);
return write >= read ? write - read : control_->queue_capacity - read + write;
}
[[nodiscard]] auto processed_count() const {
auto write = control_->processed_write_pos.load(std::memory_order_acquire);
auto read = control_->processed_read_pos.load(std::memory_order_acquire);
return (write >= read) ? (write - read) : (control_->queue_capacity - read + write);
return write >= read ? write - read : control_->queue_capacity - read + write;
}
[[nodiscard]] auto available_audio_space() const {
@@ -279,55 +287,199 @@ public:
control_->audio_read_pos.store(0, std::memory_order_release);
}
// Boost特有的功能
[[nodiscard]] auto& get_segment() { return *segment_; }
[[nodiscard]] const auto& get_segment() const { return *segment_; }
// 新增:获取缓冲区统计信息
struct buffer_stats {
size_t queue_capacity;
size_t audio_buffer_size;
size_t pending_count;
size_t processed_count;
size_t available_space;
};
[[nodiscard]] buffer_stats get_stats() const {
return {
control_->queue_capacity,
control_->audio_buffer_size,
pending_count(),
processed_count(),
available_audio_space()
};
}
// 兼容性接口 - 返回segment_manager
[[nodiscard]] auto get_segment_manager() const { return audio_shm_.get_segment_manager(); }
// 获取共享内存使用统计
[[nodiscard]] auto get_free_memory() const { return segment_->get_free_memory(); }
[[nodiscard]] auto get_size() const { return segment_->get_size(); }
private:
void initialize_shared_structures(size_t queue_capacity, size_t audio_buffer_capacity) {
// 在共享内存中构造控制块
control_ = segment_->construct<shared_control_block>("AudioRingBuffer_Control")(queue_capacity,
audio_buffer_capacity,
sizeof(T));
void initialize_shared_structures(size_t queue_capacity,
size_t audio_buffer_capacity,
size_t block_size) {
/* 控制块构造改为保存 block_size */
const std::string control_name = buffer_name_ + "_Control";
auto ctrl_ret = audio_shm_.find_or_construct<shared_control_block>(control_name);
if (ctrl_ret.second > 0) { // 首次创建
*ctrl_ret.first = shared_control_block(queue_capacity, audio_buffer_capacity, block_size);
}
control_ = ctrl_ret.first;
// 构造队列
pending_queue_ = segment_->construct<queue_entry>("AudioRingBuffer_PendingQueue")[queue_capacity]();
processed_queue_ = segment_->construct<queue_entry>("AudioRingBuffer_ProcessedQueue")[queue_capacity]();
const std::string pending_name = buffer_name_ + "_PendingQueue";
const std::string processed_name = buffer_name_ + "_ProcessedQueue";
auto pending_result = audio_shm_.find_or_construct<queue_entry>(pending_name);
auto processed_result = audio_shm_.find_or_construct<queue_entry>(processed_name);
// 如果内存不足,这里会自动扩容
if (!pending_result.first) {
// 重试一次,因为可能刚刚扩容了
pending_result = audio_shm_.find_or_construct<queue_entry>(pending_name);
}
if (!processed_result.first) { processed_result = audio_shm_.find_or_construct<queue_entry>(processed_name); }
pending_queue_ = pending_result.first;
processed_queue_ = processed_result.first;
// 构造音频缓冲区
audio_buffer_ = segment_->construct<T>("AudioRingBuffer_AudioBuffer")[audio_buffer_capacity]();
const std::string audio_name = buffer_name_ + "_AudioBuffer";
auto audio_result = audio_shm_.find_or_construct<T>(audio_name);
if (!audio_result.first) {
// 重试一次
audio_result = audio_shm_.find_or_construct<T>(audio_name);
}
audio_buffer_ = audio_result.first;
if (!control_ || !pending_queue_ || !processed_queue_ || !audio_buffer_) {
throw std::runtime_error("Failed to initialize audio ring buffer structures");
}
}
void connect_to_existing_structures() {
// 连接到现有的共享内存结构
auto control_result = segment_->find<shared_control_block>("AudioRingBuffer_Control");
const std::string control_name = buffer_name_ + "_Control";
auto control_result = audio_shm_.find<shared_control_block>(control_name);
if (!control_result.first) {
throw std::runtime_error("AudioRingBuffer control block not found in shared memory");
throw std::runtime_error("AudioRingBuffer control block not found: " + control_name);
}
control_ = control_result.first;
auto pending_result = segment_->find<queue_entry>("AudioRingBuffer_PendingQueue");
const std::string pending_name = buffer_name_ + "_PendingQueue";
auto pending_result = audio_shm_.find<queue_entry>(pending_name);
if (!pending_result.first) {
throw std::runtime_error("AudioRingBuffer pending queue not found in shared memory");
throw std::runtime_error("AudioRingBuffer pending queue not found: " + pending_name);
}
pending_queue_ = pending_result.first;
auto processed_result = segment_->find<queue_entry>("AudioRingBuffer_ProcessedQueue");
const std::string processed_name = buffer_name_ + "_ProcessedQueue";
auto processed_result = audio_shm_.find<queue_entry>(processed_name);
if (!processed_result.first) {
throw std::runtime_error("AudioRingBuffer processed queue not found in shared memory");
throw std::runtime_error("AudioRingBuffer processed queue not found: " + processed_name);
}
processed_queue_ = processed_result.first;
auto audio_result = segment_->find<T>("AudioRingBuffer_AudioBuffer");
if (!audio_result.first) {
throw std::runtime_error("AudioRingBuffer audio buffer not found in shared memory");
}
const std::string audio_name = buffer_name_ + "_AudioBuffer";
auto audio_result = audio_shm_.find<T>(audio_name);
if (!audio_result.first) { throw std::runtime_error("AudioRingBuffer audio buffer not found: " + audio_name); }
audio_buffer_ = audio_result.first;
}
/**
* @brief 当可写空间不足时扩展音频缓冲区
* @param required_size 本次写入真正需要的字节(元素)数
* @return true 扩容并就绪false 内存不足或构造失败
*
* 扩容策略:
* 1. 目标大小 = max(当前大小 * 2, 当前写指针 + required_size + 1)
* 保证一次扩容即可容纳此次写入,且至少是当前容量的 2 倍,
* 避免频繁扩容。
* 2. 在新缓冲区首地址重新线性布局已有数据,
* 并修正 audio_read_pos/audio_write_pos。
* 3. 更新 control_->audio_buffer_size 以及 audio_buffer_ 指针。
*/
bool try_expand_audio_buffer(size_t required_size) {
const size_t cur_size = control_->audio_buffer_size;
const size_t write_pos = control_->audio_write_pos.load(std::memory_order_acquire);
const size_t read_pos = control_->audio_read_pos.load(std::memory_order_acquire);
/* ------------------ 1. 计算新容量 ------------------ */
size_t new_size = cur_size;
const size_t used = write_pos >= read_pos ? write_pos - read_pos : cur_size - read_pos + write_pos;
// 一次扩容即可写入 required_size
while (new_size - used - 1 < required_size) { new_size <<= 1; }
/* ------------------ 2. 创建新缓冲区 ------------------ */
const std::string new_name = buffer_name_ + "_AudioBuffer_" + std::to_string(new_size);
auto raw_seg = audio_shm_.get_segment_manager();
if (!raw_seg)
return false;
// Boost 构造数组: construct<T>(name)[count]()
T* new_buf{};
try { new_buf = raw_seg->construct<T>(new_name.c_str())[new_size](); }
catch (const bi::bad_alloc&) {
spdlog::error("Audio buffer expand failed: not enough shared memory");
return false;
}
/* ------------------ 3. 拷贝旧数据 ------------------ */
if (used) {
if (write_pos >= read_pos) { // 无环绕
std::memcpy(new_buf, audio_buffer_ + read_pos, used * sizeof(T));
}
else { // 有环绕
const size_t tail = cur_size - read_pos;
std::memcpy(new_buf, audio_buffer_ + read_pos, tail * sizeof(T));
std::memcpy(new_buf + tail, audio_buffer_, write_pos * sizeof(T));
}
}
/* ------------------ 4. 更新索引 ------------------ */
audio_buffer_ = new_buf;
control_->audio_buffer_size = new_size;
control_->audio_read_pos.store(0, std::memory_order_release);
control_->audio_write_pos.store(used, std::memory_order_release);
spdlog::info("Audio buffer '{}' expanded: {} -> {} elements", buffer_name_, cur_size, new_size);
return true;
}
// 尝试扩展队列
bool try_expand_queues() {
const size_t new_capacity = control_->queue_capacity * 2;
// 扩展 pending 队列
const std::string new_pending_name = buffer_name_ + "_PendingQueue_Expanded_" +
std::to_string(std::chrono::steady_clock::now().time_since_epoch().
count());
auto new_pending_result = audio_shm_.find_or_construct<queue_entry>(new_pending_name);
if (!new_pending_result.first) { return false; }
// 扩展 processed 队列
const std::string new_processed_name = buffer_name_ + "_ProcessedQueue_Expanded_" +
std::to_string(std::chrono::steady_clock::now().time_since_epoch().
count());
auto new_processed_result = audio_shm_.find_or_construct<queue_entry>(new_processed_name);
if (!new_processed_result.first) { return false; }
// 复制现有队列数据
if (pending_queue_) {
std::memcpy(new_pending_result.first, pending_queue_, control_->queue_capacity * sizeof(queue_entry));
}
if (processed_queue_) {
std::memcpy(new_processed_result.first, processed_queue_, control_->queue_capacity * sizeof(queue_entry));
}
// 更新指针和容量
pending_queue_ = new_pending_result.first;
processed_queue_ = new_processed_result.first;
control_->queue_capacity = new_capacity;
spdlog::info("Queue capacity expanded to {} for buffer: {}", new_capacity, buffer_name_);
return true;
}
[[nodiscard]] auto calculate_available_space(size_t write_pos, size_t read_pos) const {
if (write_pos >= read_pos) { return control_->audio_buffer_size - write_pos + read_pos - 1; }
return read_pos - write_pos - 1;
@@ -338,28 +490,18 @@ private:
}
};
// 工厂函数,用于简化创建过程
// 更新的工厂函数
template<typename T>
class AudioRingBufferFactory {
class audio_ring_buffer_factory {
public:
static audio_ring_buffer<T> create_new(const char* segment_name,
size_t segment_size,
size_t queue_capacity,
size_t audio_buffer_capacity) {
return audio_ring_buffer<T>(segment_name, segment_size, queue_capacity, audio_buffer_capacity);
// 推荐的新接口 - 使用 shm_mgr
static audio_ring_buffer<T> create_new(const std::string& buffer_name,
size_t queue_capacity,
size_t audio_buffer_capacity) {
return audio_ring_buffer<T>(buffer_name, queue_capacity, audio_buffer_capacity);
}
static audio_ring_buffer<T> connect_existing(const char* segment_name) {
return audio_ring_buffer<T>(segment_name);
}
static audio_ring_buffer<T> create_in_segment(bip::managed_shared_memory& segment,
size_t queue_capacity,
size_t audio_buffer_capacity) {
return audio_ring_buffer<T>(segment, queue_capacity, audio_buffer_capacity);
}
static audio_ring_buffer<T> connect_in_segment(bip::managed_shared_memory& segment) {
return audio_ring_buffer<T>(segment);
static audio_ring_buffer<T> connect_existing(const std::string& buffer_name) {
return audio_ring_buffer<T>(buffer_name);
}
};

View File

@@ -25,6 +25,7 @@
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/lockfree/queue.hpp>
#include "audio_ring_buffer.h"
#include "ipc_queue.h"
#include "misc_type.h"
#include "shm_mgr.h"
@@ -165,13 +166,12 @@ private:
*/
class plugin_ipc_node {
public:
explicit plugin_ipc_node(const std::string& in_queue_name) {
explicit plugin_ipc_node(const std::string& in_queue_name): audio_rb() {
engine_queue.open_queue("engine_queue");
plugin_queue.create_queue(in_queue_name);
audio_rb.open_buffer(in_queue_name);
engine_heartbeat_ = shm_mgr::msg_shm().find<boost::atomic<heartbeat_t>>("engine_heartbeat");
if (!engine_heartbeat_) {
throw std::runtime_error("无法找到 engine_running 标志");
}
if (!engine_heartbeat_) { throw std::runtime_error("无法找到 engine_running 标志"); }
plugin_heartbeat_ = shm_mgr::msg_shm().construct_with_name<boost::atomic<heartbeat_t>>(in_queue_name + "_heartbeat");
*plugin_heartbeat_ = std::chrono::high_resolution_clock::now().time_since_epoch().count();
}
@@ -196,10 +196,11 @@ public:
return duration < threshold;
}
private:
ipc_node engine_queue; // 指向引擎主队列(只写)
ipc_node plugin_queue; // 插件自队列(只读)
ipc_node engine_queue; // 指向引擎主队列(只写)
ipc_node plugin_queue; // 插件自队列(只读)
boost::atomic<heartbeat_t>* engine_heartbeat_;
boost::atomic<heartbeat_t>* plugin_heartbeat_;
audio_ring_buffer<float> audio_rb;
};
/**
@@ -208,12 +209,15 @@ private:
*/
class plugin_ipc_remote_node {
public:
plugin_ipc_remote_node() : audio_rb() {}
template<typename Msg>
auto call_rpc() { return plugin_queue.call_rpc<Msg>(); }
void open_queue(const std::string& in_queue_name) {
plugin_queue.open_queue(in_queue_name);
plugin_heartbeat_ = shm_mgr::msg_shm().find<boost::atomic<heartbeat_t>>(in_queue_name + "_heartbeat");
audio_rb.create_buffer(in_queue_name, 10, 512); // 10块512采样点的缓冲区
}
[[nodiscard]] auto is_timeout(int32_t in_seconds) const {
@@ -230,7 +234,10 @@ public:
const auto duration = std::chrono::high_resolution_clock::now().time_since_epoch() - heartbeat_duration_t(val);
return duration >= threshold;
}
[[nodiscard]] auto& get_audio_rb() { return audio_rb; }
private:
ipc_node plugin_queue{};
ipc_node plugin_queue{};
boost::atomic<heartbeat_t>* plugin_heartbeat_ = nullptr;
audio_ring_buffer<float> audio_rb;
};

View File

@@ -5,6 +5,7 @@
#include "library_handle/library_handle.h"
#include <GLFW/glfw3.h>
#include "audio_ring_buffer.h"
#include "vec.h"