C++ 服务端进阶(六)—— 工程化落地:协议、缓冲区与超时机制(最终闭环篇)
一、这一篇到底在做什么到第五篇为止你已经完成Connection结构多 Reactor并发协程执行Connection 协程模型 你已经可以写一个结构正确 架构正确 执行优雅 的服务端但还差最后一步❗工程问题现实中的问题你的当前代码其实还存在这些问题1️⃣ 半包 / 粘包问题read 一次 ≠ 一条完整消息2️⃣ 写缓冲问题write 不一定一次写完3️⃣ 连接无超时客户端不发数据 → 永远占着连接4️⃣ 生命周期不安全close / remove 时机容易出错 所以这一篇的目标是让这套模型具备“工程可用性”二、本篇目标在现有架构基础上补齐Connection ↓ 协议解析半包/粘包 ↓ Buffer 管理 ↓ 超时机制 ↓ 安全关闭三、最终结构第六篇. ├── Task.h ├── Reactor.h / Reactor.cpp ├── ReactorThread.h / ReactorThread.cpp ├── ReactorThreadPool.h / ReactorThreadPool.cpp ├── SocketUtil.h / SocketUtil.cpp ├── Connection.h / Connection.cpp ├── Timer.h / Timer.cpp └── main.cpp四、本篇新增核心能力1️⃣ Buffer 管理关键❗问题之前read(fd, buffer) 没有考虑半包多条消息残留数据✅ 方案引入 readBuffer_std::string readBuffer_; 数据流变成socket → readBuffer → 解析 → writeBuffer2️⃣ 简单协议设计避免半包我们用最简单的协议[消息长度][消息内容]举例5hello 表示长度5内容hello3️⃣ 超时机制Timer❗问题客户端连上不发数据连接永远不释放 ❌✅ 方案记录时间lastActiveTime_定期检查超过 N 秒 → 关闭4️⃣ 生命周期控制重点必须保证removeFd → close → 标记无效顺序正确。五、核心代码实现1️⃣ Timer.h#ifndef TIMER_H #define TIMER_H #include chrono class Timer { public: static int64_t now() { return std::chrono::duration_caststd::chrono::seconds( std::chrono::steady_clock::now().time_since_epoch()).count(); } }; #endif2️⃣ Connection.h升级#ifndef CONNECTION_H #define CONNECTION_H #include Task.h #include string class Reactor; class Connection { public: Connection(int fd, Reactor reactor); ~Connection(); void start(); bool isTimeout(int64_t now); private: DetachedTask run(); void closeSelf(); void processBuffer(); private: int fd_; Reactor reactor_; std::string readBuffer_; std::string writeBuffer_; int64_t lastActiveTime_; }; #endif3️⃣ Connection.cpp核心升级#include Connection.h #include Reactor.h #include Timer.h #include unistd.h #include errno.h #include iostream Connection::Connection(int fd, Reactor reactor) : fd_(fd), reactor_(reactor) { lastActiveTime_ Timer::now(); } Connection::~Connection() { if (fd_ 0) { ::close(fd_); } } void Connection::start() { run(); } bool Connection::isTimeout(int64_t now) { return (now - lastActiveTime_) 60; // 60秒超时 } DetachedTask Connection::run() { char buffer[1024]; while (true) { co_await reactor_.readable(fd_); while (true) { ssize_t n ::read(fd_, buffer, sizeof(buffer)); if (n 0) { lastActiveTime_ Timer::now(); readBuffer_.append(buffer, n); processBuffer(); } else if (n 0) { closeSelf(); co_return; } else { if (errno EAGAIN) break; closeSelf(); co_return; } } while (!writeBuffer_.empty()) { co_await reactor_.writable(fd_); ssize_t wn ::write(fd_, writeBuffer_.data(), writeBuffer_.size()); if (wn 0) { writeBuffer_.erase(0, wn); } else if (errno ! EAGAIN) { closeSelf(); co_return; } } } } void Connection::processBuffer() { while (true) { if (readBuffer_.size() 1) return; int len readBuffer_[0] - 0; if (readBuffer_.size() (size_t)(1 len)) return; std::string msg readBuffer_.substr(1, len); readBuffer_.erase(0, 1 len); std::cout [msg] msg std::endl; // echo writeBuffer_.append(std::to_string(len)); writeBuffer_.append(msg); } } void Connection::closeSelf() { if (fd_ 0) { reactor_.removeFd(fd_); ::close(fd_); fd_ -1; } }4️⃣ main.cpp增加超时清理#include ReactorThreadPool.h #include SocketUtil.h #include Connection.h #include Timer.h #include vector #include iostream int main() { int listenFd createListenFd(8080); ReactorThreadPool pool(4); pool.start(); std::vectorConnection* conns; while (true) { int clientFd accept4(listenFd, nullptr, nullptr, SOCK_NONBLOCK); if (clientFd 0) { Reactor* sub pool.getNextReactor(); Connection* conn new Connection(clientFd, *sub); conn-start(); conns.push_back(conn); } // 简单超时扫描 int64_t now Timer::now(); for (auto it conns.begin(); it ! conns.end();) { if ((*it)-isTimeout(now)) { delete *it; it conns.erase(it); } else { it; } } } }六、本篇核心提升总结前五篇解决的是模型正确 ✔架构正确 ✔执行正确 ✔第六篇解决的是协议正确 ✔数据完整 ✔连接可控 ✔资源安全 ✔七、这一篇你最该记住的 5 句话read 一次 ≠ 一条消息Buffer 是服务端的核心协议决定服务端复杂度所有连接都必须可回收工程问题决定系统能不能上线八、整个 6 篇最终总结① Connection → 结构 ② 多 Reactor → 并发 ③ 协程 → 执行 ④ 多 Reactor 协程 → 架构融合 ⑤ Connection 协程 → 模型优化 ⑥ 工程化 → 实战落地九、终极一句话你已经从“会写 socket” → “会设计高并发服务端系统”总结epoll 是基础Reactor 是调度Connection 是骨架多 Reactor 是架构协程是执行模型工程化决定系统是否能上线