添加共享内存管理和音频缓冲区功能,更新 CMake 配置以支持 Boost 组件

This commit is contained in:
daiqingshuang
2025-08-26 17:19:56 +08:00
parent b96404e317
commit 5266d6429d
24 changed files with 568 additions and 189 deletions

View File

@@ -35,6 +35,8 @@ setup_project_options(
INTERFACE_TARGET config_target
)
add_definitions(-D_WIN32_WINNT=0x0A00 -DWIN32_LEAN_AND_MEAN -DNOMINMAX)
# NOTE: 硬编码构建目录可能会降低灵活性。
# 通常,更推荐的做法是让用户在调用 CMake 时通过 -B <build_dir> 参数来指定构建目录,
# 以支持灵活的 out-of-source builds。
@@ -50,7 +52,7 @@ find_package(gRPC CONFIG REQUIRED)
find_package(Protobuf CONFIG REQUIRED)
find_package(ZeroMQ CONFIG REQUIRED)
find_package(spdlog CONFIG REQUIRED)
find_package(Boost CONFIG REQUIRED COMPONENTS system interprocess lockfree asio process)
find_package(Boost CONFIG REQUIRED COMPONENTS system interprocess lockfree asio process uuid circular_buffer thread)
# --- Protocol Buffers 编译 (Protocol Buffers Compilation) ---
# 使用自定义函数编译 .proto 文件,自动生成 C++ 源代码和 gRPC 服务代码。

View File

@@ -30,14 +30,9 @@ target_link_libraries(${PROJECT_NAME} PRIVATE
gRPC::grpc++
protobuf::libprotobuf
libzmq
Boost::process
Boost::interprocess
Boost::system
Boost::lockfree
Boost::asio
AlichoProto
AlichoMisc
ws2_32
Boost::process
)
target_compile_definitions(${PROJECT_NAME} PRIVATE _WIN32_WINNT=0x0A00 WIN32_LEAN_AND_MEAN)
target_include_directories(${PROJECT_NAME} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src)

View File

@@ -1,21 +1,29 @@
#include "ipc/shm_manager.h"
#include "plugin_manage/plugin_host_manager.h"
#include "rpc/rpc_session.h"
#include <filesystem>
#include "ipc/ipc_node.h"
#include "rpc/engine_rpc.h"
int main(int argc, char *argv[])
{
std::filesystem::path path(R"(D:\Projects\Alicho\src\backend\src\vst2_host\test\4Front Piano x64.dll)");
plugin_host_manager::get_instance().load_plugin(path);
shm_manager::get_instance().init(true);
engine_ipc_node node;
// std::filesystem::path path(R"(D:\Projects\Alicho\src\backend\src\vst2_host\test\4Front Piano x64.dll)");
// plugin_host_manager::get_instance().load_plugin(path);
uint32_t times = 0;
while (true) {
// 模拟主循环
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (++times > 10) {
break; // 运行10次后退出
}
node.process_rpc();
// if (++times > 10) {
// break; // 运行10次后退出
// }
}
return 0;

View File

@@ -1,5 +1,8 @@
#include "plugin_host_manager.h"
#include <filesystem>
#include <spdlog/spdlog.h>
#include "plugin_instance.h"
plugin_host_manager::~plugin_host_manager() {

View File

@@ -4,7 +4,6 @@
#include "lazy_singleton.h"
#include "plugin_instance.h"
#include "rpc/rpc_session.h"
class plugin_host_manager : public lazy_singleton<plugin_host_manager> {
public:
@@ -33,5 +32,4 @@ private:
std::unordered_map<uint32_t, std::unique_ptr<plugin_instance>> plugin_instances_;
std::atomic_uint32_t plugin_id_ = 1;
boost::asio::io_context io_context_;
rpc_server rpc_server_{ io_context_ };
};

View File

@@ -1,10 +1,5 @@
#include "engine_rpc.h"
void __rpc_handler_func_PROCESS_DONE(uint64_t seq_id, const rpc::process_done& body) {
namespace engine_rpc {
RPC_REG(LOG, log_impl)
}
void __rpc_handler_func_REGISTER_HOST(uint64_t seq_id, const rpc::register_host& body) {
}

View File

@@ -1,8 +1,20 @@
#pragma once
#include "rpc/common.h"
#include "rpc/rpc_type.h"
#include "rpc/rpc_manager.h"
REGISTER_RPC_HANDLER(LOG, rpc::log)
REGISTER_RPC_HANDLER(REGISTER_HOST, rpc::register_host)
REGISTER_RPC_HANDLER(PROCESS_DONE, rpc::process_done)
namespace engine_rpc {
struct log_impl : log {
void process() {
const auto str = get_str();
spdlog::log(level, "[Engine] {}", str->c_str());
destory_str();
}
};
struct test {
uint32_t id;
void process() {
spdlog::info("收到测试消息ID: {}", id);
}
};
}

View File

@@ -33,3 +33,10 @@ target_link_libraries(${PROJECT_NAME} PRIVATE config_target)
target_link_libraries(${PROJECT_NAME} PUBLIC spdlog::spdlog)
target_include_directories(${PROJECT_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/src)
add_os_definitions(${PROJECT_NAME})
target_link_libraries(${PROJECT_NAME} PUBLIC
Boost::circular_buffer
Boost::uuid
Boost::thread
Boost::interprocess
Boost::lockfree
)

View File

@@ -0,0 +1 @@
#include "ipc_node.h"

View File

@@ -0,0 +1,108 @@
#pragma once
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/container/vector.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include "shm_manager.h"
#include "rpc/rpc_manager.h"
namespace bi = boost::interprocess;
namespace bc = boost::container;
class ipc_node {
public:
friend class engine_ipc_node;
friend class plugin_ipc_node;
using queue_t = boost::lockfree::spsc_queue<rpc_message_t, boost::lockfree::capacity<1024>>;
~ipc_node() {
close_queue();
}
void create_queue(const std::string& in_name) {
rpc_queue = shm_manager::msg_construct<queue_t>(in_name.c_str());
if (!rpc_queue) {
throw std::runtime_error("无法创建IPC队列");
}
is_queue_owner = true;
}
void open_queue(const std::string& in_name) {
auto [queue_ptr, size] = shm_manager::msg_find<queue_t>(in_name.c_str());
if (!queue_ptr) {
throw std::runtime_error("无法打开IPC队列");
}
rpc_queue = queue_ptr;
is_queue_owner = false;
}
void close_queue() {
if (!rpc_queue)
return;
if (is_queue_owner) {
shm_manager::msg_destroy_ptr(rpc_queue);
}
rpc_queue = nullptr;
is_queue_owner = false;
}
template<typename Msg>
void call_rpc(const Msg& in_msg) {
auto message_id = Msg::rpc_id;
auto data_ptr = reinterpret_cast<const std::byte*>(std::addressof(in_msg));
auto data_size = sizeof(Msg);
rpc_message_t msg;
msg.message_id = message_id;
msg.data.insert(msg.data.end(), data_ptr, data_ptr + data_size);
rpc_queue->push(std::move(msg));
}
void process_rpc() {
rpc_message_t msg;
while (rpc_queue->pop(msg)) {
// 处理消息
rpc_message_handler::get_instance().handle_message(msg);
}
}
protected:
queue_t* rpc_queue;
bool is_queue_owner = false;
private:
ipc_node(): rpc_queue(nullptr) {
}
};
class engine_ipc_node {
public:
engine_ipc_node() {
engine_queue.create_queue("engine_queue");
}
void process_rpc() {
engine_queue.process_rpc();
}
private:
ipc_node engine_queue;
};
class plugin_ipc_node {
public:
plugin_ipc_node(const std::string& in_queue_name) {
engine_queue.open_queue("engine_queue");
plugin_queue.create_queue(in_queue_name);
}
template<typename Msg>
void call_rpc(const Msg& in_msg) {
engine_queue.call_rpc(in_msg);
}
void process_rpc() {
plugin_queue.process_rpc();
}
private:
ipc_node engine_queue;
ipc_node plugin_queue;
};

View File

@@ -0,0 +1 @@
#include "plugin_sandbox_data.h"

View File

@@ -0,0 +1,9 @@
#pragma once
#include <boost/atomic.hpp>
class plugin_sandbox_data {
public:
boost::atomic<bool> ready{ false };
boost::atomic<uint64_t> frame_counter{0};
char name[256];
};

View File

@@ -0,0 +1 @@
#include "shm_audio_buffer.h"

View File

@@ -0,0 +1,158 @@
// shm_audio_buffer.h
#pragma once
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/container/vector.hpp>
#include <memory>
#include <stdexcept>
#include "shm_manager.h"
namespace bi = boost::interprocess;
namespace bc = boost::container;
class shm_audio_buffer {
public:
using segment_manager = bi::managed_shared_memory::segment_manager;
using mutex_type = bi::interprocess_mutex;
using scoped_lock = bi::scoped_lock<mutex_type>;
using buffer_allocator_t = bi::allocator<float, segment_manager>;
using channel_vec_t = bc::vector<float, buffer_allocator_t>;
using channel_allocator_t = bi::allocator<channel_vec_t, segment_manager>;
using channel_container_t = bc::vector<channel_vec_t, channel_allocator_t>;
// 构造函数显式接收分配器
explicit shm_audio_buffer()
: buffers{shm_manager::ab_mgr<channel_allocator_t>()}
{
auto* shm = shm_manager::get_instance().get_audio_buffer_shm();
if (!shm) {
throw std::runtime_error("Shared memory segment not initialized");
}
}
// 设置通道数
void channels(size_t in_ch) {
if (in_ch == 0) {
throw std::invalid_argument("Channel count must be greater than 0");
}
auto alloc = shm_manager::ab_mgr<buffer_allocator_t>();
buffers.clear();
buffers.reserve(in_ch);
for (size_t i = 0; i < in_ch; ++i) {
buffers.emplace_back(alloc);
}
}
// 获取通道数
[[nodiscard]] size_t channels() const {
return buffers.size();
}
// 设置块大小
void block_size(size_t in_size) {
if (buffers.empty()) {
throw std::logic_error("Channels must be set before block size");
}
for (auto& buf : buffers) {
buf.resize(in_size, 0.0f); // 初始化为0
}
}
// 获取块大小
[[nodiscard]] size_t block_size() const {
if (buffers.empty()) return 0;
return buffers[0].size();
}
// 获取指定通道的数据指针
[[nodiscard]] float* data(size_t ch) {
if (ch >= buffers.size()) {
throw std::out_of_range("Channel index out of range");
}
return buffers[ch].data();
}
[[nodiscard]] const float* data(size_t ch) const {
if (ch >= buffers.size()) {
throw std::out_of_range("Channel index out of range");
}
return buffers[ch].data();
}
// 获取所有通道的指针数组
[[nodiscard]] bc::vector<float*> headers() {
bc::vector<float*> ptrs;
ptrs.reserve(buffers.size());
for (auto& buf : buffers) {
ptrs.push_back(buf.data());
}
return ptrs;
}
[[nodiscard]] bc::vector<const float*> headers() const {
bc::vector<const float*> ptrs;
ptrs.reserve(buffers.size());
for (const auto& buf : buffers) {
ptrs.push_back(buf.data());
}
return ptrs;
}
// 获取指定通道的向量引用
[[nodiscard]] channel_vec_t& block(size_t ch) {
if (ch >= buffers.size()) {
throw std::out_of_range("Channel index out of range");
}
return buffers[ch];
}
[[nodiscard]] const channel_vec_t& block(size_t ch) const {
if (ch >= buffers.size()) {
throw std::out_of_range("Channel index out of range");
}
return buffers[ch];
}
// 清空所有缓冲区数据
void clear() {
for (auto& buf : buffers) {
std::ranges::fill(buf, 0.0f);
}
}
// 复制数据到缓冲区
void copy_from(size_t ch, const float* src, size_t samples) {
if (ch >= buffers.size()) {
throw std::out_of_range("Channel index out of range");
}
if (samples > buffers[ch].size()) {
throw std::out_of_range("Sample count exceeds buffer size");
}
std::copy_n(src, samples, buffers[ch].begin());
}
// 复制数据从缓冲区
void copy_to(size_t ch, float* dst, size_t samples) const {
if (ch >= buffers.size()) {
throw std::out_of_range("Channel index out of range");
}
if (samples > buffers[ch].size()) {
throw std::out_of_range("Sample count exceeds buffer size");
}
std::copy_n(buffers[ch].begin(), samples, dst);
}
private:
channel_container_t buffers;
};

View File

@@ -0,0 +1,15 @@
#include "shm_manager.h"
#include "shm_audio_buffer.h"
bool shm_manager::allocate_shm() {
try {
message_shm_block = create_shm_block(message_shm_name, message_shm_size);
audio_buffer_shm_block = create_shm_block(audio_buffer_shm_name, audio_buffer_shm_size);
str_shm_block = create_shm_block(string_shm_name, str_shm_size);
return true;
} catch (const bi::interprocess_exception& e) {
spdlog::error("无法创建共享内存:{}", e.what());
return false;
}
}

View File

@@ -0,0 +1,127 @@
#pragma once
#include <spdlog/spdlog.h>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
#include "lazy_singleton.h"
#include "rpc/common.h"
class shm_audio_buffer;
/**
* 共享内存管理类,管理共享内存的创建和销毁
* 会在内存已满时自动创建新的共享内存段
* 同时管理Plugin Sandbox的插槽和AudioBuffer
*/
class shm_manager : public lazy_singleton<shm_manager> {
public:
using shm_ptr = std::shared_ptr<bi::managed_shared_memory>;
const char* message_shm_name = "alicho_message_ipc_memory";
const char* audio_buffer_shm_name = "alicho_audio_buffer_ipc_memory";
const char* string_shm_name = "alicho_string_memory";
const size_t message_shm_size = 1024 * 1024 * 256; // 256MB
const size_t audio_buffer_shm_size = 1024 * 1024 * 512; // 512MB
const size_t str_shm_size = 1024 * 1024 * 256; // 256MB
void init(bool in_is_engine = false) {
is_engine = in_is_engine;
allocate_shm();
}
~shm_manager() {
if (is_engine) {
bi::shared_memory_object::remove(message_shm_name);
bi::shared_memory_object::remove(audio_buffer_shm_name);
bi::shared_memory_object::remove(string_shm_name);
}
}
template<typename T>
static auto msg_mgr() {
return T{ get_instance().message_shm_block->get_segment_manager() };
}
template<typename T>
static auto ab_mgr() {
return T{ get_instance().audio_buffer_shm_block->get_segment_manager() };
}
template<typename T>
static auto msg_construct(const char* name) {
return get_instance().message_shm_block->construct<T>(name)();
}
template<typename T>
static auto ab_construct(const char* name) {
return get_instance().audio_buffer_shm_block->construct<T>(name)();
}
template<typename T>
static auto msg_find(const char* name) {
return get_instance().message_shm_block->find<T>(name);
}
template<typename T>
static auto ab_find(const char* name) {
return get_instance().audio_buffer_shm_block->find<T>(name);
}
template<typename T>
static auto msg_find_or_construct(const char* name) {
return get_instance().message_shm_block->find_or_construct<T>(name)();
}
template<typename T>
static auto ab_find_or_construct(const char* name) {
return get_instance().audio_buffer_shm_block->find_or_construct<T>(name)();
}
template<typename T>
static void msg_destroy(const char* name) {
get_instance().message_shm_block->destroy<T>(name);
}
template<typename T>
static void ab_destroy(const char* name) {
get_instance().audio_buffer_shm_block->destroy<T>(name);
}
template<typename T>
static void msg_destroy_ptr(const T* ptr) {
get_instance().message_shm_block->destroy_ptr(ptr);
}
template<typename T>
static void ab_destroy_ptr(const T* ptr) {
get_instance().audio_buffer_shm_block->destroy_ptr(ptr);
}
shm_string* create_str(const std::string& id, const std::string& str) {
const auto ptr = str_shm_block->find_or_construct<shm_string>(id.c_str())(str.c_str(),
char_allocator{ str_shm_block->get_segment_manager() });
return ptr;
}
shm_string* create_str(const std::string& id) {
return create_str(id, "");
}
shm_string* find_str(const std::string& id) {
auto [ptr, size] = str_shm_block->find<shm_string>(id.c_str());
return ptr;
}
void destroy_str(const std::string& id) {
str_shm_block->destroy<shm_string>(id.c_str());
}
void destroy_str(shm_string* ptr) {
str_shm_block->destroy_ptr(ptr);
}
auto* get_message_shm() { return message_shm_block.get(); }
auto* get_audio_buffer_shm() { return audio_buffer_shm_block.get(); }
private:
// 分配新的共享内存段
bool allocate_shm();
auto create_shm_block(const std::string& in_name, size_t in_size) {
if (is_engine) {
bi::shared_memory_object::remove(in_name.c_str());
return std::make_shared<bi::managed_shared_memory>(bi::open_or_create, in_name.c_str(), in_size);
}
return std::make_shared<bi::managed_shared_memory>(bi::open_only, in_name.c_str());
}
bool is_engine = false;
shm_ptr message_shm_block;
shm_ptr audio_buffer_shm_block;
shm_ptr str_shm_block;
};

View File

@@ -1,7 +1,12 @@
#pragma once
#include <string>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/container/string.hpp>
namespace bi = boost::interprocess;
namespace bc = boost::container;
constexpr size_t AUDIO_BLOCK_SIZE = 4096; // 最大音频块大小
constexpr size_t QUEUE_CAPACITY = 4; // 音频队列容量
@@ -16,6 +21,15 @@ struct audio_block {
// 单生产者单消费者无锁队列
using lock_free_queue = boost::lockfree::spsc_queue<audio_block, boost::lockfree::capacity<QUEUE_CAPACITY>>;
template<typename T>
using allocator_t = bi::allocator<T, bi::managed_shared_memory::segment_manager>;
using char_allocator = allocator_t<char>;
using shm_string = bc::basic_string<char, std::char_traits<char>, char_allocator>;
template<typename Msg>
struct msg_id_t{};
// 根据 plugin_id 生成 IPC 资源名称
inline std::string get_uds_path() {
#if ALICHO_PLATFORM_WINDOWS

View File

@@ -1,11 +1,26 @@
#pragma once
#include <spdlog/spdlog.h>
#include <boost/container/vector.hpp>
#include "lazy_singleton.h"
#include "rpc_type.h"
#include "common.h"
#include "ipc/shm_manager.h"
using rpc_callback = std::function<void(uint64_t, const std::vector<std::byte>&)>;
using rpc_callback = std::function<void(const void*)>;
namespace bc = boost::container;
struct rpc_message_t {
using alloca_t = allocator_t<std::byte>;
rpc_message_t() : message_id(rpc::message_type::WTF),
data(shm_manager::msg_mgr<alloca_t>()) {
}
rpc::message_type message_id;
bc::vector<std::byte, alloca_t> data;
};
class rpc_message_handler : public lazy_singleton<rpc_message_handler> {
public:
@@ -15,33 +30,29 @@ public:
handlers_[type] = std::move(handler);
}
void handle_message(const rpc::rpc_header& in_header, const std::vector<std::byte>& in_payload) {
const auto& it = handlers_.find(in_header.type);
void handle_message(const rpc_message_t& in_msg) {
const auto& it = handlers_.find(in_msg.message_id);
if (it != handlers_.end()) {
it->second(in_header.seq_id, in_payload);
spdlog::info("处理RPC消息ID: {}", static_cast<uint32_t>(in_msg.message_id));
it->second(in_msg.data.data());
} else {
spdlog::warn("未处理的RPC消息类型: {}", static_cast<uint32_t>(in_header.type));
spdlog::warn("未处理的RPC消息类型: {}", static_cast<uint32_t>(in_msg.message_id));
}
}
private:
std::unordered_map<rpc::message_type, rpc_callback> handlers_;
};
template<typename T>
template<typename Msg>
struct rpc_handler_register {
explicit rpc_handler_register(rpc::message_type in_type, const std::function<void(uint64_t, const T&)>& in_func) {
auto cast_func = [in_func](uint64_t in_seq_id, const std::vector<std::byte>& msg) {
#if ALICHO_DEBUG
assert(msg.size() == sizeof(T));
#endif
auto payload = reinterpret_cast<const T*>(msg.data());
in_func(in_seq_id, *payload);
explicit rpc_handler_register(rpc::message_type in_type) {
auto real_func = [] (const void* payload){
((Msg*)payload)->process();
};
rpc_message_handler::get_instance().register_rpc_handler(in_type, cast_func);
rpc_message_handler::get_instance().register_rpc_handler(in_type, real_func);
}
};
#define REGISTER_RPC_HANDLER(type, body_type) \
void __rpc_handler_func_##type(uint64_t seq_id, const body_type& body); \
static rpc_handler_register<body_type> __rpc_handler_register_##type(rpc::message_type::type, __rpc_handler_func_##type);
#define RPC_REG(type, t) \
inline static auto t##_register = rpc_handler_register<t>(rpc::message_type::##type); \

View File

@@ -1,8 +1,16 @@
// RPC调用消息体结构, 由AudioEngine和PluginHost之间传输
#pragma once
#include <cstdint>
#include <boost/container/vector.hpp>
#include <boost/uuid.hpp>
#include <spdlog/spdlog.h>
#include "ipc/shm_manager.h"
#include "common.h"
namespace bc = boost::container;
#define DEFINE_ID(type) static constexpr auto rpc_id = rpc::message_type::##type;
namespace rpc {
enum class message_type : uint32_t {
WTF = 0, // 未知消息类型, 用于调试
@@ -18,36 +26,35 @@ namespace rpc {
SET_PARAMETER, // 设置参数
SHUTDOWN // 关闭插件
};
struct rpc_header {
message_type type; // 消息类型
uint64_t seq_id; // 消息序列号
uint32_t payload_size; // 有效载荷大小
};
}
namespace rpc {
namespace engine_rpc {
struct log {
uint32_t id; // 插件ID
spdlog::level::level_enum level;
std::array<char, MAX_PAYLOAD_SIZE> str;
};
DEFINE_ID(LOG)
struct register_host {
uint32_t id;
};
log() : level(spdlog::level::info) {
id = boost::uuids::random_generator()();
}
struct process_done {
uint32_t id; // 插件ID
};
struct parameter_value_changed {
uint32_t id; // 插件ID
uint32_t parameter_id; // 参数ID
float value; // 新值
void set_str(const std::string& in_str) {
auto str = shm_manager::get_instance().create_str(get_id(), in_str);
*str = in_str.c_str();
}
void set_level(spdlog::level::level_enum in_level) {
level = in_level;
}
protected:
void destory_str() {
shm_manager::get_instance().destroy_str(get_id());
}
auto get_id() const {
return boost::uuids::to_string(id) + "-log_shm_string";
}
auto get_str() const {
return shm_manager::get_instance().find_str(get_id());
}
protected:
boost::uuids::uuid id;
spdlog::level::level_enum level;
};
}
namespace rpc {
}

View File

@@ -1,41 +0,0 @@
#include "session.h"
#include "rpc_manager.h"
void rpc::session::start() {
do_read_header();
}
void rpc::session::stop() {
}
void rpc::session::do_read_header() {
auto self = shared_from_this();
// 读取消息到 read_queue_ 中
auto func = [self](boost::system::error_code ec, std::size_t in_size) {
if (ec)
return;
self->do_read_body();
};
boost::asio::async_read(socket_, boost::asio::buffer{read_header_, 4}, func);
}
void rpc::session::do_read_body() {
auto self = shared_from_this();
auto func = [self](boost::system::error_code ec, std::size_t in_size) {
if (ec)
return;
// 处理读取到的消息
rpc_message_handler::get_instance().handle_message(self->read_header_, self->read_body_);
};
read_body_.resize(read_header_.payload_size);
boost::asio::async_read(socket_, boost::asio::buffer(read_body_, read_header_.payload_size), func);
}
void rpc::session::do_write() {
}
void rpc::server::do_accept() {
}

View File

@@ -1,72 +0,0 @@
#pragma once
#include <deque>
#include <memory>
#include <boost/asio.hpp>
#include "common.h"
#include "rpc_type.h"
namespace rpc {
class session : public std::enable_shared_from_this<session> {
public:
explicit session(boost::asio::local::stream_protocol::socket&& in_socket)
: socket_(std::move(in_socket)) {
}
void start();
void stop();
template<typename Msg>
auto async_call(Msg&& msg) {
// 这里可以实现异步调用逻辑
// 比如将消息放入写队列,触发写操作等
return boost::asio::post(socket_.get_executor(), [this, msg = std::forward<Msg>(msg)]() mutable {
write_queue_.emplace_back(std::move(msg));
if (write_queue_.size() == 1) {
do_write();
}
});
}
private:
void do_read_header();
void do_read_body();
void do_write();
boost::asio::local::stream_protocol::socket socket_;
rpc_header read_header_;
std::vector<std::byte> read_body_;
std::deque<std::vector<std::byte>> write_queue_; // 写队列, 用于发送消息
std::atomic_uint32_t next_id_{ 1 };
};
class server : public std::enable_shared_from_this<server> {
public:
explicit server(boost::asio::io_context& io_context) : acceptor_(io_context,
boost::asio::local::stream_protocol::endpoint(get_uds_path()), false) {
}
private:
void do_accept();
boost::asio::local::stream_protocol::acceptor acceptor_;;
};
class client {
public:
explicit client(boost::asio::io_context& io_context) {
boost::asio::local::stream_protocol::socket socket_(io_context);
socket_.connect(get_uds_path());
session_ = std::make_shared<session>(std::move(socket_));
session_->start();
}
template<typename Msg>
auto async_call(Msg&& msg) {
}
private:
std::shared_ptr<session> session_;
};
}

View File

@@ -17,10 +17,6 @@ target_link_libraries(${PROJECT_NAME} PRIVATE
config_target
glfw
AlichoMisc
Boost::interprocess
Boost::lockfree
Boost::asio
Boost::system
)
target_include_directories(${PROJECT_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/src)
register_plugin_host(${PROJECT_NAME})

View File

@@ -7,13 +7,24 @@
#include "spdlog/spdlog.h"
#include <boost/interprocess/managed_shared_memory.hpp>
#include "ipc/ipc_node.h"
namespace bi = boost::interprocess;
int main(int argc, char *argv[])
{
if (argc != 3)
return 1;
// if (argc != 3)
// return 1;
shm_manager::get_instance().init();
engine_rpc::log t{};
t.set_str("Hello from Plugin Sandbox!");
t.set_level(spdlog::level::info);
plugin_ipc_node node("plugin1");
node.call_rpc(t);
#if 0
std::string shm_input_path = argv[1];
std::string shm_output_path = argv[2];
@@ -65,5 +76,6 @@ int main(int argc, char *argv[])
bi::shared_memory_object::remove(shm_input_path.c_str());
bi::shared_memory_object::remove(shm_output_path.c_str());
#endif
return 0;
}

View File

@@ -14,5 +14,17 @@
}, {
"name" : "boost-lockfree",
"version>=" : "1.88.0"
}, {
"name" : "tbb",
"version>=" : "2022.1.0"
}, {
"name" : "boost-circular-buffer",
"version>=" : "1.88.0"
}, {
"name" : "boost-uuid",
"version>=" : "1.88.0"
}, {
"name" : "boost-thread",
"version>=" : "1.88.0"
} ]
}