Files
Alicho/docs/api/communication.md
2025-10-28 10:27:49 +08:00

42 KiB
Raw Permalink Blame History

通信系统API参考

目录

概述

通信系统模块是频后端系统的核心连接组件实现了多进程间高效通信和数据传输。它采用双通道设计将控制命令和音频数据分离处理以达到最佳性能和灵活性。系统可在本地进程间通信和网络远程通信中使用相同的API提供统一的通信层抽象。

核心特性:

  • 双通道通信架构控制通道ZeroMQ和数据通道共享内存
  • 灵活的消息路由基于消息类型和QoS级别的自动路由
  • 多种通信模式支持请求-响应、发布-订阅、推送-拉取等模式
  • 零拷贝数据传输:大型音频缓冲区通过共享内存零拷贝传输
  • 智能传输选择根据消息大小和优先级自动选择最佳传输方式
  • 服务质量保障支持不同级别的QoS策略
  • 线程安全设计所有组件支持多线程并发操作

双通道通信架构

系统采用双通道设计模式,将通信分为两种不同的通道:

  1. 控制通道ZeroMQ

    • 处理小型、高频消息
    • 传输控制命令、状态更新和事件通知
    • 提供可靠的消息传递保证
    • 支持多种通信模式(请求-响应、发布-订阅等)
  2. 数据通道(共享存)

    • 处理大型数据块,如音频缓冲区
    • 实现零拷贝数据传输,最小化性能开销
    • 提供无锁环形缓冲区和三缓冲机制
    • 适用于实时音频数据和大型二进制数据

双通道设计示意图:

┌────────────────────┐                 ┌───────────────────┐
│    进程 A          │                 │    进程 B          │
│                    │                 │                    │
│  ┌──────────────┐  │                 │  ┌──────────────┐  │
│  │  应用逻辑    │  │                 │  │  应用逻辑    │  │
│  └───────┬──────┘  │                 │  └──────┬───────┘  │
│          │         │                 │         │          │
│  ┌───────┴──────┐  │      ZeroMQ     │  ┌──────┴───────┐  │
│  │ 通信管理器   │◄─┼─────────────────┼─►│ 通信管理器   │  │
│  └───────┬──────┘  │   (控制通道)    │  └──────┬───────┘  │
│          │         │                 │         │          │
│  ┌──────┴──────┐  │   共享内存      │  ┌──────┴───────┐  │
│  │ 共享内存管理 │◄─┼─────────────────┼─►│ 共享内存管理 │  │
│  └──────────────┘  │   (数据通道)    │  └──────────────┘  │
└────────────────────┘                 └────────────────────┘

消息系统

消息接口

IMessage定义了所通信消息的基本接口:

class IMessage {
public:
    virtual ~IMessage() = default;
    
    // 消息类型和标识
    virtual std::string message_type() const = 0;
    virtual std::string message_id() const = 0;
    
    // 端点信息
    virtual Endpoint source() const = 0;
    virtual Endpoint destination() const = 0;
    
    // 消息属性
    virtual MessagePriority priority() const = 0;
    virtual std::chrono::system_clock::time_point timestamp() const = 0;
    
    // 数据访问
    virtual const std::vector<uint8_t>& raw_data() const = 0;
    virtual size_t size() const = 0;
    
    // 传输相关
    virtual bool should_use_shared_memory() const;
    virtual TransportChannel preferred_channel() const;
    
    // 工具方法
    virtual std::unique_ptr<IMessage> clone() const = 0;
    virtual std::string to_string() const = 0;
};

基本消息类Message实现了大部分通用功能:

class Message : public IMessage {
public:
    Message();
    Message(const std::string& id, const Endpoint& source, 
            const Endpoint& destination = Endpoint());
    
    // 实现基本接口
    // ...
    
    // 设置方法
    void set_message_id(const std::string& id);
    void set_source(const Endpoint& source);
    void set_destination(const Endpoint& destination);
    void set_priority(MessagePriority priority);
    void set_timestamp(const std::chrono::system_clock::time_point& time);
    void set_raw_data(const std::vector<uint8_t>& data);
    void set_raw_data(std::vector<uint8_t>&& data);
    
protected:
    std::string message_id_;
    Endpoint source_;
    Endpoint destination_;
    MessagePriority priority_;
    std::chrono::system_clock::time_point timestamp_;
    std::vector<uint8_t> raw_data_;
    
    // 消息大小阈值(大于此值使用共享内存)
    static constexpr size_t SHARED_MEMORY_THRESHOLD = 4 * 1024; // 4KB
};

使用MessageFactory建消息:

// 创建消息的工厂方法
template<typename MessageType, typename... Args>
static std::unique_ptr<MessageType> create(Args&&... args) {
    auto message = std::make_unique<MessageType>(std::forward<Args>(args)...);
    if (message->message_id().empty()) {
        message->set_message_id(generate_message_id());
    }
    return message;
}

息优先级

系统定义了五个消息优先级级别:

enum class MessagePriority {
    Low = 0,      // 低优先级(日志、统计等)
    Normal = 1,   // 普通优先级(一般控制消息)
    High = 2,     // 高优先级(重要状态更新)
    Critical = 3, // 关键优先级(错误、紧急停止)
    Realtime = 4  // 实时优先级(音频处理相关)
};

不同优先级消息的处理方式:

  • Low:使用尽力为传输,可能延迟处理
  • Normal:标准传优先级,保证传递但不保证时序
  • High:优先处理通常使用专用线程
  • Critical:最高传递保证,可能中断其他处理
  • Realtime:专用于实时音频数据,使用最低延迟通道

消息路由

消息路由规则定义了不同消息类型的传输策略:

struct MessageRoute {
    std::string message_type;     // 消息类型
    std::string destination;      // 目标地址
    RoutingStrategy strategy;     // 路由策略
    QoSLevel qos;                // QoS级别
    int priority;                // 优先级0-255
};

路由策略选项:

enum class RoutingStrategy {
    Auto,              // 自动选择小消息用ZMQ大数据用共享内存
    ZeroMQOnly,        // 仅使用ZeroMQ
    SharedMemoryOnly,  // 仅使用共享内存
    Hybrid             // 混合模式根据QoS和消息类型
};

服务质量QoS级别

enum class QoSLevel {
    BestEffort,       // 尽力而为(最快,可能丢失)
    Reliable,         // 可靠传输(保证送达)
    RealTime,         // 实时传输(低延迟优先)
    Guaranteed        // 保证传输(可靠+时)
};

ZeroMQ传输

通信模式

系统实现了ZeroMQ的多种通信模式

  1. 请求-响应 (REQ-REP)

    • 同步请求-响应模
    • 适用于需要确认的命令和查询
    • 实现类:ZmqRequestClientZmqReplyServer
  2. 发布-订阅 (PUB-SUB)

    • 单向广播通信
    • 适用于事件通知和状态更新
    • 实现类:ZmqPublisherZmqSubscriber
  3. 推送-拉取 (PUSH-PULL)

    • 单向流水线通信
    • 适用于工作负载分配
    • 实现类:ZmqPusherZmqPuller

ZmqTransportBase

所有ZeroMQ传输实现的基类

class ZmqTransportBase {
public:
    explicit ZmqTransportBase(const ZmqTransportConfig& config);
    virtual ~ZmqTransportBase();
    
    // 基础操作
    virtual ZmqTransportError initialize() = 0;
    virtual ZmqTransportError shutdown();
    virtual ZmqTransportError send_message(std::unique_ptr<IMessage> message) = 0;
    
    // 配置和状态
    bool is_connected() const;
    const ZmqTransportConfig& config() const;
    void set_message_handler(std::shared_ptr<IZmqMessageHandler> handler);
    
    // 统计信息
    struct Statistics {
        std::atomic<uint64_t> messages_sent{0};
        std::atomic<uint64_t> messages_received{0};
        std::atomic<uint64_t> bytes_sent{0};
        std::atomic<uint64_t> bytes_received{0};
        std::atomic<uint64_t> connection_errors{0};
        std::atomic<uint64_t> send_errors{0};
        std::atomic<uint64_t> receive_errors{0};
        std::chrono::steady_clock::time_point last_activity;
    };
    
    const Statistics& get_statistics() const;
    void reset_statistics();
    
protected:
    // 内部实现方法...
};

传输工厂提供了创建不同类型传输对象的方法:

class ZmqTransportFactory {
public:
    static std::unique_ptr<ZmqTransportBase> create_request_client(const ZmqTransportConfig& config);
    static std::unique_ptr<ZmqTransportBase> create_reply_server(const ZmqTransportConfig& config);
    static std::unique_ptr<ZmqTransportBase> create_publisher(const ZmqTransportConfig& config);
    static std::unique_ptr<ZmqTransportBase> create_subscriber(const ZmqTransportConfig& config);
    static std::unique_ptr<ZmqTransportBase> create_pusher(const ZmqTransportConfig& config);
    static std::unique_ptr<ZmqTransportBase> create_puller(const ZmqTransportConfig& config);
    
    // 根据字符串创建传输对象
    static std::unique_ptr<ZmqTransportBase> create_transport(const std::string& type,
                                                           const ZmqTransportConfig& config);
};

传输配置

ZeroMQ传输配置示例

ZmqTransportConfig config;
config.endpoint = "tcp://localhost:5555";
config.io_threads = 2;
config.socket_type = ZMQ_REQ;  // 或 ZMQ_REP, ZMQ_PUB, ZMQ_SUB 等
config.connect_timeout_ms = 5000;
config.send_timeout_ms = 1000;
config.recv_timeout_ms = 1000;
config.high_water_mark = 1000;
config.linger_ms = 1000;
config.enable_reconnect = true;
config.reconnect_interval_ms = 1000;
config.max_reconnect_attempts = 10;
config.enable_heartbeat = true;

共享内存通信

共享内存管理器

SharedMemoryManager提供了共享内存段的创建、管理和分配功能:

class SharedMemoryManager {
public:
    using ManagedSharedMemory = boost::interprocess::managed_shared_memory;
    using VoidAllocator = boost::interprocess::allocator<void, ManagedSharedMemory::segment_manager>;
    
    explicit SharedMemoryManager(const SharedMemoryConfig& config);
    ~SharedMemoryManager();
    
    // 基础操作
    SharedMemoryError initialize();
    SharedMemoryError shutdown();
    bool is_initialized() const;
    
    // 内存分配
    template<typename T>
    T* allocate_object(const std::string& name);
    
    template<typename T>
    T* find_object(const std::string& name);
    
    template<typename T>
    bool deallocate_object(const std::string& name);
    
    void* allocate_raw(size_t size, const std::string& name = "");
    void* find_raw(const std::string& name);
    bool deallocate_raw(const std::string& name);
    
    // 统计信息
    struct Statistics {
        size_t total_size;
        size_t free_size;
        size_t used_size;
        size_t num_allocations;
        size_t largest_free_block;
    };
    
    Statistics get_statistics() const;
    
    // 获取分配器
    VoidAllocator get_allocator() const;
};

共享内存配置示例:

SharedMemoryConfig config;
config.segment_name = "audio_backend_shm";
config.segment_size = 16 * 1024 * 1024;  // 16MB
config.create_if_not_exists = true;
config.remove_on_destroy = true;
config.mutex_name = "audio_backend_mutex";
config.condition_name = "audio_backend_cond";
config.semaphore_name = "audio_backend_sem";
config.use_huge_pages = false;

环形缓冲区

LockFreeRingBuffer是个线程安全、无锁的环形缓冲区实现,特别适合实时音频数据传输:

template<typename T>
class LockFreeRingBuffer {
public:
    LockFreeRingBuffer(SharedMemoryManager& shm_manager, const std::string& name, size_t capacity);
    ~LockFreeRingBuffer();
    
    // 生产者操作
    bool try_push(const T& item);
    bool try_push(T&& item);
    template<typename... Args>
    bool try_emplace(Args&&... args);
    
    // 消费者操作
    bool try_pop(T& item);
    bool try_peek(T& item) const;
    
    // 状态查询
    bool empty() const;
    bool full() const;
    size_t size() const;
    size_t capacity() const;
    size_t available_space() const;
    
    // 批量操作
    size_t try_push_batch(const T* items, size_t count);
    size_t try_pop_batch(T* items, size_t max_count);
};

三缓冲机制

TripleBuffer是一种殊的缓冲区实现,适用于需要无阻塞更新的场景,如实时图形渲染或音频处理:

template<typename T>
class TripleBuffer {
public:
    TripleBuffer(SharedMemoryManager& shm_manager, const std::string& name);
    ~TripleBuffer();
    
    // 生产者操作
    T* get_write_buffer();           // 获取写缓冲区
    void commit_write();             // 提交写操作
    void discard_write();            // 丢弃写操作
    
    // 消费者操作
    const T* get_read_buffer();      // 获取读缓冲区
    void commit_read();              // 提交读操作
    
    // 状态查询
    bool has_new_data() const;       // 是否有新数据
    size_t pending_writes() const;   // 待写入数量
};

三缓冲机制工作原理:

  1. 使用三个缓冲区:读缓冲区、写缓冲区和中间缓冲区
  2. 生产者始终写入写缓冲区,消费者始终从读缓冲区读取
  3. 生产者完成写入后,交换写缓冲区和中间缓冲区
  4. 消费者需要新数据时,交换读缓冲区和中间缓冲区
  5. 两端都无需等待对方,最大限度降低阻塞

通信管理器

CommunicationManager

CommunicationManager系统的核心组件整合了ZeroMQ和共享内存通信提供统一的接口

class CommunicationManager {
public:
    explicit CommunicationManager(const CommunicationManagerConfig& config);
    ~CommunicationManager();
    
    // 初始化和关闭
    CommunicationError initialize();
    CommunicationError shutdown();
    bool is_initialized() const;
    
    // 消息发送
    CommunicationError send_message(std::unique_ptr<IMessage> message, 
                                   const std::string& destination = "");
    
    CommunicationError send_message_with_qos(std::unique_ptr<IMessage> message,
                                            const std::string& destination,
                                            QoSLevel qos);
    
    CommunicationError broadcast_message(std::unique_ptr<IMessage> message);
    
    // 消息接收(设置回调)
    using MessageCallback = std::function<void(std::unique_ptr<IMessage>)>;
    void set_message_callback(const std::string& message_type, MessageCallback callback);
    void remove_message_callback(const std::string& message_type);
    
    // 路由管理
    void add_route(const MessageRoute& route);
    void remove_route(const std::string& message_type);
    void clear_routes();
    std::vector<MessageRoute> get_routes() const;
    
    // 传输管理
    CommunicationError add_zmq_transport(const std::string& name, const ZmqTransportConfig& config);
    CommunicationError remove_zmq_transport(const std::string& name);
    std::vector<std::string> get_active_transports() const;
    
    // 事件监听
    void add_event_listener(std::shared_ptr<ICommunicationEventListener> listener);
    void remove_event_listener(std::shared_ptr<ICommunicationEventListener> listener);
    
    // 统计信息
    const CommunicationStatistics& get_statistics() const;
    void reset_statistics();
    void print_statistics() const;
    
    // 配置管理
    const CommunicationManagerConfig& config() const;
    void update_routing_strategy(RoutingStrategy strategy);
    void update_qos_level(QoSLevel qos);
};

通信管理器配置示例:

CommunicationManagerConfig config;
config.process_name = "audio_engine";
config.routing_strategy = RoutingStrategy::Auto;
config.enable_zmq = true;
config.enable_shm = true;
config.large_message_threshold = 32 * 1024;  // 32KB
config.default_qos = QoSLevel::Reliable;
config.max_pending_messages = 1000;
config.send_timeout = std::chrono::milliseconds(5000);
config.receive_timeout = std::chrono::milliseconds(5000);
config.enable_statistics = true;
config.enable_logging = true;

工厂方法

CommunicationManagerFactory提供了创建各种预配置通信管理器的工厂方法:

class CommunicationManagerFactory {
public:
    // 创建默认配置的通信管理器
    static std::unique_ptr<CommunicationManager> create_default(const std::string& process_name);
    
    // 创建仅使用ZeroMQ的通信管理器
    static std::unique_ptr<CommunicationManager> create_zmq_only(const std::string& process_name,
                                                                const std::vector<ZmqTransportConfig>& configs);
    
    // 创建仅使用共享内存的通信管理器
    static std::unique_ptr<CommunicationManager> create_shm_only(const std::string& process_name,
                                                                const SharedMemoryConfig& config);
    
    // 创建混合模式的通信管理器
    static std::unique_ptr<CommunicationManager> create_hybrid(const std::string& process_name,
                                                              const std::vector<ZmqTransportConfig>& zmq_configs,
                                                              const SharedMemoryConfig& shm_config);
    
    // 从配置文件创建
    static std::unique_ptr<CommunicationManager> create_from_config(const std::string& config_file);
};

实现示例

ZeroMQ通信示例

请求-响应模式

#include "communication.h"
#include <iostream>
#include <thread>
#include <chrono>

using namespace audio_backend::communication;

// 服务器端
void run_server() {
    // 创建服务器配置
    ZmqTransportConfig config;
    config.endpoint = "tcp://*:5555";  // 绑定到所有接口的5555端口
    config.socket_type = ZMQ_REP;      // 回复套接字类型
    config.io_threads = 1;
    
    // 创建回复服务器
    auto server = ZmqTransportFactory::create_reply_server(config);
    
    // 初始化服务器
    auto result = server->initialize();
    if (result != ZmqTransportError::Success) {
        std::cerr << "服务器初始化失" << std::endl;
        return;
    }
    
    // 设置消息处理器
    server->set_message_handler(std::make_shared<ZmqMessageHandler>([](std::unique_ptr<IMessage> message) {
        // 处理接收到的消息
        std::cout << "收到消息: " << message->message_type() << std::endl;
        
        // 创建响应消息
        auto response = MessageFactory::create<Message>("ResponseType");
        response->set_raw_data(std::vector<uint8_t>{1, 2, 3, 4});
        
        // 发送响应
        return response;
    }));
    
    // 启动服务器(在实际应用中会有一个事件循环)
    std::cout << "服务器运行中..." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(60));
    
    // 关闭服务器
    server->shutdown();
}

// 客户端
void run_client() {
    // 创建客户端配置
    ZmqTransportConfig config;
    config.endpoint = "tcp://localhost:5555";  // 连接到本地5555端
    config.socket_type = ZMQ_REQ;             // 请求套接字类型
    config.io_threads = 1;
    
    // 创建请求客户端
    auto client = ZmqTransportFactory::create_request_client(config);
    
    // 初始化客户端
    auto result = client->initialize();
    if (result != ZmqTransportError::Success) {
        std::cerr << "客户端初始化失" << std::endl;
        return;
    }
    
    // 创建请求消息
    auto request = MessageFactory::create<Message>("RequestType");
    request->set_raw_data(std::vector<uint8_t>{5, 6, 7, 8});
    
    // 发送同步请求并等待响应
    std::unique_ptr<IMessage> response;
    result = dynamic_cast<ZmqRequestClient*>(client.get())->send_request(
        std::move(request), 
        response, 
        std::chrono::milliseconds(5000)
    );
    
    if (result == ZmqTransportError::Success && response) {
        std::cout << "收到响应: " << response->message_type() 
                 << ", 大小: " << response->size() << " 字节" << std::endl;
    } else {
        std::cerr << "请求失败" << std::endl;
    }
    
    // 关闭客户端
    client->shutdown();
}

发布-订阅模式

#include "communication.h"
#include <iostream>
#include <thread>
#include <chrono>

using namespace audio_backend::communication;

// 发布者
void run_publisher() {
    // 创建发布者配置
    ZmqTransportConfig config;
    config.endpoint = "tcp://*:5556";  // 绑定到所有接口的5556端口
    config.socket_type = ZMQ_PUB;      // 发布套接字类型
    
    // 创建发布者
    auto publisher = ZmqTransportFactory::create_publisher(config);
    publisher->initialize();
    
    // 定期发布消息
    for (int i = 0; i < 10; ++i) {
        // 创建状态更新消息
        auto status_message = MessageFactory::create<Message>("StatusUpdate");
        std::string status_data = "系统状态: " + std::to_string(i);
        std::vector<uint8_t> raw_data(status_data.begin(), status_data.end());
        status_message->set_raw_data(raw_data);
        
        // 发布消息
        dynamic_cast<ZmqPublisher*>(publisher.get())->publish("status", std::move(status_message));
        
        std::cout << "发布状态更新: " << i << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    
    publisher->shutdown();
}

// 订阅者
void run_subscriber() {
    // 创建订阅者配置
    ZmqTransportConfig config;
    config.endpoint = "tcp://localhost:5556";  // 连接到本地5556端
    config.socket_type = ZMQ_SUB;             // 订阅套接字类型
    
    // 创建订阅者
    auto subscriber = ZmqTransportFactory::create_subscriber(config);
    
    // 设置消息处理器
    subscriber->set_message_handler(std::make_shared<ZmqMessageHandler>([](std::unique_ptr<IMessage> message) {
        // 处理接收到的消息
        std::cout << "收到状态更新: ";
        const auto& data = message->raw_data();
        std::string text(data.begin(), data.end());
        std::cout << text << std::endl;
    }));
    
    // 初始化并订阅主题
    subscriber->initialize();
    dynamic_cast<ZmqSubscriber*>(subscriber.get())->subscribe("status");
    
    // 等待消息
    std::cout << "等待状态更新..." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(15));
    
    subscriber->shutdown();
}

共享内存通信示例

环形缓冲区

#include "communication.h"
#include <iostream>
#include <thread>
#include <chrono>
#include <vector>

using namespace audio_backend::communication;

// 音频样本缓冲区演示
void ring_buffer_demo() {
    // 创建共享内存管理器
    SharedMemoryConfig config;
    config.segment_name = "audio_demo";
    config.segment_size = 1024 * 1024;  // 1MB
    config.create_if_not_exists = true;
    config.remove_on_destroy = true;
    
    SharedMemoryManager shm_manager(config);
    if (shm_manager.initialize() != SharedMemoryError::Success) {
        std::cerr << "共享内存初始化败" << std::endl;
        return;
    }
    
    // 创建环形缓冲区
    const size_t buffer_size = 1024;  // 存储1024个float样本
    LockFreeRingBuffer<float> audio_buffer(shm_manager, "audio_samples", buffer_size);
    
    // 生产者线程
    std::thread producer([&audio_buffer]() {
        // 生成一个440Hz的正波
        const float frequency = 440.0f;  // A4音符
        const float sample_rate = 48000.0f;
        const float two_pi = 6.28318530718f;
        
        for (int i = 0; i < 48000; ++i) {  // 生成1秒音频
            float t = static_cast<float>(i) / sample_rate;
            float sample = std::sin(two_pi * frequency * t);
            
            while (!audio_buffer.try_push(sample)) {
                // 缓冲区已满,等待一下
                std::this_thread::sleep_for(std::chrono::microseconds(10));
            }
            
            // 模拟实时生成每1000个样本报告一次
            if (i % 1000 == 0) {
                std::cout << "已生成 " << i << " 个样本,缓冲区大小: " 
                          << audio_buffer.size() << "/" << audio_buffer.capacity() << std::endl;
            }
        }
        std::cout << "生产者完成" << std::endl;
    });
    
    // 消费者线程
    std::thread consumer([&audio_buffer]() {
        // 给生产者一点时间先生成一些数据
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        
        std::vector<float> output_buffer;  // 模拟输出设备的缓冲区
        const size_t frames_per_buffer = 480;  // 每次处理480帧10ms @ 48kHz
        
        int total_samples = 0;
        while (total_samples < 48000) {  // 处理1秒音频
            // 等待缓冲区中有足够的数据
            while (audio_buffer.size() < frames_per_buffer && total_samples < 48000) {
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
            
            // 模拟音频处理回调
            output_buffer.resize(frames_per_buffer);
            size_t samples_read = audio_buffer.try_pop_batch(output_buffer.data(), frames_per_buffer);
            
            total_samples += samples_read;
            
            // 模拟处理音频(在实际应用中,这里会将数据发送到音频设备)
            float sum = 0.0f;
            for (size_t i = 0; i < samples_read; ++i) {
                sum += std::abs(output_buffer[i]);  // 计算平均振幅
            }
            
            if (samples_read > 0) {
                float avg_amplitude = sum / samples_read;
                std::cout << "已处理 " << total_samples << " 个样本,平均振幅: " 
                          << avg_amplitude << std::endl;
            }
            
            // 模拟固定间隔的回调
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        std::cout << "消费者完成" << std::endl;
    });
    
    // 等待线程完成
    producer.join();
    consumer.join();
    
    // 清理资源
    shm_manager.shutdown();
}

三缓冲机制

#include "communication.h"
#include <iostream>
#include <thread>
#include <chrono>

using namespace audio_backend::communication;

// 音频处理状态结构体
struct AudioProcessingState {
    float volume;           // 音量
    int sample_rate;        // 采样率
    bool mute;              // 静音状态
    float eq_bands[10];     // 均衡器频段
    char preset_name[32];   // 预设名称
};

// 三缓冲机制演示
void triple_buffer_demo() {
    // 创建共享内存管理器
    SharedMemoryConfig config;
    config.segment_name = "audio_state_demo";
    config.segment_size = 64 * 1024;  // 64KB
    config.create_if_not_exists = true;
    config.remove_on_destroy = true;
    
    SharedMemoryManager shm_manager(config);
    if (shm_manager.initialize() != SharedMemoryError::Success) {
        std::cerr << "共享内存初始化败" << std::endl;
        return;
    }
    
    // 创建三缓冲区
    TripleBuffer<AudioProcessingState> state_buffer(shm_manager, "audio_state");
    
    // UI线程生产者
    std::thread ui_thread([&state_buffer]() {
        for (int i = 0; i < 100; ++i) {
            // 获取写缓冲区
            AudioProcessingState* state = state_buffer.get_write_buffer();
            
            // 模拟用户界面更改音频参数
            state->volume = 0.8f + (std::sin(i * 0.1f) * 0.2f);  // 音量在0.6-1.0之间化
            state->mute = (i % 20 > 15);  // 偶尔静音
            state->sample_rate = 48000;
            
            // 模拟均衡器调整
            for (int band = 0; band < 10; ++band) {
                state->eq_bands[band] = std::sin(i * 0.05f + band * 0.5f) * 0.5f;
            }
            
            // 模拟预设选择
            snprintf(state->preset_name, sizeof(state->preset_name), 
                    "Preset %d", (i / 10) % 5);
            
            // 提交更改
            state_buffer.commit_write();
            
            std::cout << "UI线程: 更新状态 #" << i 
                      << ", 音量=" << state->volume 
                      << ", 静音=" << (state->mute ? "是" : "否")
                      << ", 预设=" << state->preset_name << std::endl;
            
            // 模拟UI更新间隔
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
        std::cout << "UI线程完成" << std::endl;
    });
    
    // 音频处理线程(消费者)
    std::thread audio_thread([&state_buffer]() {
        AudioProcessingState last_state = {};
        
        for (int i = 0; i < 200; ++i) {
            // 检查是否有新数据
            if (state_buffer.has_new_data()) {
                // 获取最新状态
                const AudioProcessingState* state = state_buffer.get_read_buffer();
                
                // 检测变化
                bool volume_changed = std::abs(state->volume - last_state.volume) > 0.01f;
                bool mute_changed = state->mute != last_state.mute;
                bool preset_changed = strcmp(state->preset_name, last_state.preset_name) != 0;
                
                if (volume_changed || mute_changed || preset_changed) {
                    std::cout << "音频线程: 检测到变化 #" << i 
                              << ", 音量=" << state->volume 
                              << ", 静音=" << (state->mute ? "是" : "否")
                              << ", 预设=" << state->preset_name << std::endl;
                }
                
                // 保存当前状态
                last_state = *state;
                
                // 标记已读取
                state_buffer.commit_read();
            }
            
            // 模拟音频处理间隔5ms高于UI更新率
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
        }
        std::cout << "音频线程完成" << std::endl;
    });
    
    // 等待线程完成
    ui_thread.join();
    audio_thread.join();
    
    // 清理资源
    shm_manager.shutdown();
}

混合通信示例

#include "communication.h"
#include <iostream>
#include <thread>
#include <chrono>

using namespace audio_backend::communication;

// 自定义音频控制消息
class AudioControlMessage : public Message {
public:
    enum class Command {
        Play,
        Pause,
        Stop,
        SetVolume,
        SetParameter
    };
    
    AudioControlMessage(Command cmd, float value = 0.0f)
        : Message("audio.control"), command_(cmd), value_(value) {
        // 设置为高优先级
        set_priority(MessagePriority::High);
    }
    
    Command command() const { return command_; }
    float value() const { return value_; }
    
    std::string message_type() const override { 
        return "audio.control"; 
    }
    
    // 控制消息使用ZeroMQ传输
    TransportChannel preferred_channel() const override { 
        return TransportChannel::ZmqControl; 
    }
    
    std::string to_string() const override {
        std::string cmd_str;
        switch(command_) {
            case Command::Play: cmd_str = "Play"; break;
            case Command::Pause: cmd_str = "Pause"; break;
            case Command::Stop: cmd_str = "Stop"; break;
            case Command::SetVolume: cmd_str = "SetVolume"; break;
            case Command::SetParameter: cmd_str = "SetParameter"; break;
        }
        return "AudioControlMessage(" + cmd_str + ", " + std::to_string(value_) + ")";
    }
    
    std::unique_ptr<IMessage> clone() const override {
        return std::make_unique<AudioControlMessage>(*this);
    }
    
private:
    Command command_;
    float value_;
};

// 自定义音频数据消息
class AudioDataMessage : public Message {
public:
    explicit AudioDataMessage(const std::vector<float>& samples)
        : Message("audio.data"), samples_(samples) {
        // 设置为实时优先级
        set_priority(MessagePriority::Realtime);
    }
    
    const std::vector<float>& samples() const { return samples_; }
    
    std::string message_type() const override { 
        return "audio.data"; 
    }
    
    // 大音频数据使用共享内存传输
    TransportChannel preferred_channel() const override { 
        return TransportChannel::SharedMemory; 
    }
    
    // 大于32KB的消息应使用共享内存
    bool should_use_shared_memory() const override {
        return samples_.size() * sizeof(float) > 32 * 1024;
    }
    
    std::string to_string() const override {
        return "AudioDataMessage(samples=" + std::to_string(samples_.size()) + ")";
    }
    
    std::unique_ptr<IMessage> clone() const override {
        return std::make_unique<AudioDataMessage>(*this);
    }
    
private:
    std::vector<float> samples_;
};

// 混合通信示例
void hybrid_communication_demo() {
    // 创建音频引擎
    auto engine_comm = CommunicationManagerFactory::create_default("audio_engine");
    
    // 创建UI进程
    auto ui_comm = CommunicationManagerFactory::create_default("ui_process");
    
    // 初始化通信
    engine_comm->initialize();
    ui_comm->initialize();
    
    // 设置音频引擎消息处理回调
    engine_comm->set_message_callback("audio.control", [](std::unique_ptr<IMessage> message) {
        auto control = dynamic_cast<AudioControlMessage*>(message.get());
        if (control) {
            std::cout << "引擎: 收到控制命令: " << control->to_string() << std::endl;
            
            // 模拟处理命令
            switch (control->command()) {
                case AudioControlMessage::Command::Play:
                    std::cout << "引擎: 开始播放" << std::endl;
                    break;
                case AudioControlMessage::Command::Pause:
                    std::cout << "引擎: 暂停播放" << std::endl;
                    break;
                case AudioControlMessage::Command::Stop:
                    std::cout << "引擎: 停止播放" << std::endl;
                    break;
                case AudioControlMessage::Command::SetVolume:
                    std::cout << "引擎: 设置音量: " << control->value() << std::endl;
                    break;
                default:
                    std::cout << "引擎: 未知命令" << std::endl;
                    break;
            }
        }
    });
    
    engine_comm->set_message_callback("audio.data", [](std::unique_ptr<IMessage> message) {
        auto data = dynamic_cast<AudioDataMessage*>(message.get());
        if (data) {
            const auto& samples = data->samples();
            std::cout << "引擎: 收到音频数据: " << samples.size() << " 样本" << std::endl;
            
            // 计算音频统计信息
            float sum = 0.0f, max = 0.0f;
            for (float sample : samples) {
                sum += std::abs(sample);
                max = std::max(max, std::abs(sample));
            }
            
            float avg = sum / samples.size();
            std::cout << "引擎: 音频统计 - 平均: " << avg << ", 最大值: " << max << std::endl;
        }
    });
    
    // 设置UI消息处理回
    ui_comm->set_message_callback("engine.status", [](std::unique_ptr<IMessage> message) {
        std::cout << "UI: 收到状态更新: " << message->to_string() << std::endl;
    });
    
    // 启动引擎处理线程(实际应用中会有单独的线程)
    std::thread engine_thread([&engine_comm]() {
        std::cout << "引擎线程: 等待消息..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(10));
    });
    
    // 模拟UI发送控制命
    std::cout << "UI: 发送Play命令" << std::endl;
    auto play_cmd = MessageFactory::create<AudioControlMessage>(AudioControlMessage::Command::Play);
    ui_comm->send_message(std::move(play_cmd), "audio_engine");
    
    // 稍等片刻
    std::this_thread::sleep_for(std::chrono::seconds(1));
    
    // 发送音量命令
    std::cout << "UI: 发送SetVolume命令" << std::endl;
    auto volume_cmd = MessageFactory::create<AudioControlMessage>(
        AudioControlMessage::Command::SetVolume, 0.8f);
    ui_comm->send_message_with_qos(std::move(volume_cmd), "audio_engine", QoSLevel::Reliable);
    
    // 稍等片刻
    std::this_thread::sleep_for(std::chrono::seconds(1));
    
    // 发送大量音频数据(使用共享内存)
    std::cout << "UI: 发送音频数据" << std::endl;
    std::vector<float> audio_samples(48000 * 2);  // 1秒的48kHz立体声音频
    
    // 生成一个440Hz的正波
    for (size_t i = 0; i < audio_samples.size() / 2; ++i) {
        float sample = std::sin(2.0f * 3.14159f * 440.0f * i / 48000.0f);
        audio_samples[i * 2] = sample;       // 左声道
        audio_samples[i * 2 + 1] = sample;   // 右声道
    }
    
    auto audio_data = MessageFactory::create<AudioDataMessage>(audio_samples);
    ui_comm->send_message(std::move(audio_data), "audio_engine");
    
    // 等待引擎线程结束
    engine_thread.join();
    
    // 关闭通信
    engine_comm->shutdown();
    ui_comm->shutdown();
}

性能优化

传输选择策略

系统在以下情况下自动选择不同的传输方式:

  1. ZeroMQ控制通道适用于:

    • 小型控制消息(<4KB
    • 需要保证送达的消息
    • 事件和状态更新
    • 需要广播的消息
  2. **共享内存(数据道)**适用于:

    • 大型数据传输(>4KB
    • 音频缓冲区传输
    • 高频、低延迟的数据传输
    • 实时处理流

优化策略:

  • 消息大小阈值默认为4KB可根据系统性能特性调整
  • 对于频繁传输的小消息,可以批量合并后使用共享内存
  • 针对不同QoS级别选择不同传输方式

内存管理

针对高性能音频处理的内存优化策略:

  1. 内存对齐

    • 所有共享内存缓冲区按SIMD指令集要求对齐通常为16、32或64字节
    • 使用AlignedBuffer<T, Alignment>模板确保确对齐
  2. 内存复用

    • 使用内存池模式预分配常用大小的缓冲区
    • 环形缓冲区实现零拷贝读写
  3. 锁无关操作

    • LockFreeRingBuffer使原子操作避免锁开销
    • 三缓冲机制完全避免生产者-消费者之间的阻塞
  4. 大页内存

    • 支持启用大页内存huge pages提高性能
    • 适用于大型共享内存段

线程模型

通信系统的多线程设计:

  1. 背景线程

    • 每个ZmqTransportBase例有一个专用工作线程
    • 共享内存管理不需要专用线程
  2. I/O线程

    • ZeroMQ上下文使用可配置数量的I/O线程
    • 推荐每个核心1-2个I/O线程
  3. 线程安全

    • 所有公共接口都是线程安全的
    • 使用细粒度锁减少争用
    • 关键路径使用无锁数据结构

跨平台考虑

Windows特定实现

  • 共享内存使用Windows命名共享内存CreateFileMapping/MapViewOfFile
  • 进程间同步:使用命名互斥量和事件对象
  • 高精度计时器使用QueryPerformanceCounter/QueryPerformanceFrequency
  • 网络通信使用Winsock2 API

Windows平台的注意项

  • 共享内存名称前缀为"Global"以支持跨用户会话通信
  • 权限控制使用SECURITY_ATTRIBUTES和DACL
  • 大页内存需要开启SeLockMemoryPrivilege特权

Linux特定实现

  • 共享内存支持POSIX SHMshm_open/mmapSystem V SHMshmget/shmat
  • 进程间同步使用POSIX命名信号量和共享互斥量
  • 高精度计时器使用clock_gettime(CLOCK_MONOTONIC)
  • 网络通信使用标准BSD套接字API

Linux平台的注意事项

  • 共享内存路径前缀为"/dev/shm/"
  • 权限控制使用标准POSIX权限模型
  • 大页内存需要配置HugeTLB系统参数

macOS特定实现

  • 共享内存使用POSIX SHMmacOS不支持System V SHM
  • 进程间同步使用POSIX命名信号量和共享互斥量
  • 高精度计时器使用mach_absolute_time()
  • 网络通信使用标准BSD套接字API

macOS平台的注意事

  • macOS中POSIX SHM对象路径默认限制为"/tmp/"目录
  • 沙盒应用需要特殊权限访问共享内存
  • Apple Silicon (ARM) 架构需要考虑字节序和SIMD指令差异