C++ 服务端进阶(一)—— 从 Reactor 到 Connection:服务骨架设计(工程版)
一、为什么要从 handler lambda 升级到 Connection 对象在最开始的 mini epoll Reactor demo 里我们常常这样写reactor.add(client_fd, EPOLLIN, [](int cfd) { char buf[1024]; int n read(cfd, buf, sizeof(buf)); if (n 0) { write(cfd, buf, n); } else { reactor.remove(cfd); } });这种写法能跑但只能算函数式 demo一旦你要加连接状态读缓冲区 / 写缓冲区请求解析日志超时关闭策略代码就会迅速失控。所以服务端真正应该做的第一步是建立这个抽象fd ↓ Connection ↓ onRead() onWrite() onClose()也就是连接不是一个数字而是一个对象。二、这一篇的核心目标这一篇只做一件很关键的事把 Reactor 的处理单元从“fd → lambda”升级成“fd → Connection 对象”同时把结构彻底理顺正确结构应该是main ↓ 创建 Reactor ↓ 把监听 fd 注册给 Reactor ↓ reactor.loop() 统一事件循环 ↓ server_fd 事件 → Acceptor 处理 accept client_fd 事件 → Connection 处理 onRead / onWrite / onClose也就是说main()不再自己写第二套 epoll 循环Reactor成为唯一事件循环中心Acceptor负责接连接Connection负责处理连接这一版控制流就是干净的。三、最终工程结构推荐目录. ├── EventHandler.h ├── Reactor.h ├── Reactor.cpp ├── Connection.h ├── Connection.cpp ├── Acceptor.h ├── Acceptor.cpp ├── main.cpp职责划分EventHandler.h统一抽象事件处理接口。Reactor.h / Reactor.cpp负责epoll 创建注册 / 修改 / 删除 fdfd → EventHandler 映射统一事件循环Connection.h / Connection.cpp负责持有连接 fd读写缓冲区onRead / onWrite / onCloseAcceptor.h / Acceptor.cpp负责监听 socket 的 accept创建新 Connection注册给 Reactormain.cpp只负责初始化创建监听 socket创建 Reactor创建 Acceptor启动 loop这就是一个标准的最小服务骨架。四、先定义统一事件接口EventHandler.h#ifndef EVENT_HANDLER_H #define EVENT_HANDLER_H class EventHandler { public: virtual ~EventHandler() default; virtual int fd() const 0; virtual void onRead() 0; virtual void onWrite() 0; virtual void onClose() 0; }; #endif这一层是干什么的Reactor 不应该知道你是监听 socket还是普通连接还是以后别的事件源它只需要知道谁能处理事件所以抽象成统一接口fd()onRead()onWrite()onClose()这样Acceptor可以实现它Connection也可以实现它这就是面向对象后的 Reactor。五、定义 ReactorReactor.h#ifndef REACTOR_H #define REACTOR_H #include unordered_map #include cstdint class EventHandler; class Reactor { public: Reactor(); ~Reactor(); bool addHandler(EventHandler* handler, uint32_t events); bool modifyHandler(int fd, uint32_t events); void removeHandler(int fd); void loop(); private: int epollFd_; std::unordered_mapint, EventHandler* handlers_; }; #endifReactor.cpp#include Reactor.h #include EventHandler.h #include sys/epoll.h #include unistd.h #include iostream #include vector #include cstdlib Reactor::Reactor() { epollFd_ epoll_create1(0); if (epollFd_ -1) { std::cerr epoll_create1 failed std::endl; std::exit(1); } } Reactor::~Reactor() { for (auto pair : handlers_) { delete pair.second; } handlers_.clear(); if (epollFd_ 0) { close(epollFd_); epollFd_ -1; } } bool Reactor::addHandler(EventHandler* handler, uint32_t events) { int fd handler-fd(); epoll_event ev{}; ev.events events; ev.data.fd fd; if (epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, ev) -1) { std::cerr epoll_ctl add failed, fd fd std::endl; return false; } handlers_[fd] handler; return true; } bool Reactor::modifyHandler(int fd, uint32_t events) { epoll_event ev{}; ev.events events; ev.data.fd fd; if (epoll_ctl(epollFd_, EPOLL_CTL_MOD, fd, ev) -1) { std::cerr epoll_ctl mod failed, fd fd std::endl; return false; } return true; } void Reactor::removeHandler(int fd) { epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, nullptr); auto it handlers_.find(fd); if (it ! handlers_.end()) { delete it-second; handlers_.erase(it); } } void Reactor::loop() { constexpr int MAX_EVENTS 64; std::vectorepoll_event events(MAX_EVENTS); while (true) { int n epoll_wait(epollFd_, events.data(), MAX_EVENTS, -1); if (n -1) { continue; } for (int i 0; i n; i) { int fd events[i].data.fd; uint32_t ev events[i].events; auto it handlers_.find(fd); if (it handlers_.end()) { continue; } EventHandler* handler it-second; if (ev (EPOLLERR | EPOLLHUP)) { handler-onClose(); continue; } if (ev EPOLLIN) { handler-onRead(); } if (ev EPOLLOUT) { handler-onWrite(); } } } }Reactor 这一版最关键的变化从以前的fd → lambda升级到fd → EventHandler*这意味着 Reactor 只负责等待事件查找 handler分发回调它已经完全具备“调度器”的味道了。六、定义 ConnectionConnection.h#ifndef CONNECTION_H #define CONNECTION_H #include EventHandler.h #include string class Reactor; class Connection : public EventHandler { public: Connection(int fd, Reactor* reactor); ~Connection() override; int fd() const override; void onRead() override; void onWrite() override; void onClose() override; private: int fd_; Reactor* reactor_; std::string readBuffer_; std::string writeBuffer_; }; #endifConnection.cpp#include Connection.h #include Reactor.h #include unistd.h #include errno.h #include iostream #include sys/epoll.h Connection::Connection(int fd, Reactor* reactor) : fd_(fd), reactor_(reactor) { } Connection::~Connection() { if (fd_ 0) { close(fd_); fd_ -1; } } int Connection::fd() const { return fd_; } void Connection::onRead() { char buffer[1024]; while (true) { ssize_t n read(fd_, buffer, sizeof(buffer)); if (n 0) { readBuffer_.append(buffer, n); std::cout [Connection::onRead] fd fd_ recv: std::string(buffer, n) std::endl; // demo: echo 回写 writeBuffer_.append(buffer, n); } else if (n 0) { std::cout [Connection::onRead] client closed fd fd_ std::endl; onClose(); return; } else { if (errno EAGAIN || errno EWOULDBLOCK) { break; } std::cerr [Connection::onRead] read error fd fd_ std::endl; onClose(); return; } } if (!writeBuffer_.empty()) { reactor_-modifyHandler(fd_, EPOLLIN | EPOLLOUT); } } void Connection::onWrite() { while (!writeBuffer_.empty()) { ssize_t n write(fd_, writeBuffer_.data(), writeBuffer_.size()); if (n 0) { writeBuffer_.erase(0, n); } else { if (errno EAGAIN || errno EWOULDBLOCK) { break; } std::cerr [Connection::onWrite] write error fd fd_ std::endl; onClose(); return; } } if (writeBuffer_.empty()) { reactor_-modifyHandler(fd_, EPOLLIN); } } void Connection::onClose() { if (fd_ 0) { int oldFd fd_; fd_ -1; reactor_-removeHandler(oldFd); } }Connection 这一层最该记住什么1Connection 持有连接自己的状态比如fd_readBuffer_writeBuffer_2Connection 自己处理自己的生命周期谁负责连接谁负责关闭。3Connection 只关心“这个连接”不关心别的 fd也不关心 Reactor 怎么 wait。所以Reactor 是调度层Connection 是连接层七、定义 AcceptorAcceptor.h#ifndef ACCEPTOR_H #define ACCEPTOR_H #include EventHandler.h class Reactor; class Acceptor : public EventHandler { public: Acceptor(int listenFd, Reactor* reactor); ~Acceptor() override; int fd() const override; void onRead() override; void onWrite() override; void onClose() override; private: int listenFd_; Reactor* reactor_; }; #endifAcceptor.cpp#include Acceptor.h #include Reactor.h #include Connection.h #include unistd.h #include iostream #include fcntl.h #include errno.h namespace { int setNonBlocking(int fd) { int flags fcntl(fd, F_GETFL, 0); if (flags -1) return -1; return fcntl(fd, F_SETFL, flags | O_NONBLOCK); } } Acceptor::Acceptor(int listenFd, Reactor* reactor) : listenFd_(listenFd), reactor_(reactor) { } Acceptor::~Acceptor() { if (listenFd_ 0) { close(listenFd_); listenFd_ -1; } } int Acceptor::fd() const { return listenFd_; } void Acceptor::onRead() { while (true) { int clientFd accept(listenFd_, nullptr, nullptr); if (clientFd -1) { if (errno EAGAIN || errno EWOULDBLOCK) { break; } std::cerr [Acceptor::onRead] accept error std::endl; break; } if (setNonBlocking(clientFd) -1) { close(clientFd); continue; } std::cout [Acceptor] new connection fd clientFd std::endl; Connection* conn new Connection(clientFd, reactor_); if (!reactor_-addHandler(conn, EPOLLIN)) { delete conn; } } } void Acceptor::onWrite() { // 监听 socket 一般不关心写事件 } void Acceptor::onClose() { // 监听 fd 一般不会像普通连接那样关闭这里先留空 }为什么还要单独有个 Acceptor因为监听 socket 和普通连接本质不是一回事监听 fdserver_fd负责accept客户端 fdclient_fd负责read/write/close所以用两个对象分开是很自然的AcceptorConnection这也让后面扩展主从 Reactor 很容易。八、main.cpp最终合理版main.cpp#include Reactor.h #include Acceptor.h #include arpa/inet.h #include fcntl.h #include sys/socket.h #include unistd.h #include iostream namespace { int setNonBlocking(int fd) { int flags fcntl(fd, F_GETFL, 0); if (flags -1) return -1; return fcntl(fd, F_SETFL, flags | O_NONBLOCK); } } int main() { int serverFd socket(AF_INET, SOCK_STREAM, 0); if (serverFd -1) { std::cerr socket failed std::endl; return 1; } int opt 1; setsockopt(serverFd, SOL_SOCKET, SO_REUSEADDR, opt, sizeof(opt)); if (setNonBlocking(serverFd) -1) { std::cerr setNonBlocking failed std::endl; close(serverFd); return 1; } sockaddr_in addr{}; addr.sin_family AF_INET; addr.sin_port htons(8080); addr.sin_addr.s_addr INADDR_ANY; if (bind(serverFd, (sockaddr*)addr, sizeof(addr)) -1) { std::cerr bind failed std::endl; close(serverFd); return 1; } if (listen(serverFd, 128) -1) { std::cerr listen failed std::endl; close(serverFd); return 1; } Reactor reactor; Acceptor* acceptor new Acceptor(serverFd, reactor); if (!reactor.addHandler(acceptor, EPOLLIN)) { delete acceptor; close(serverFd); return 1; } std::cout server running on :8080 std::endl; reactor.loop(); return 0; }九、这一版的控制流终于完全合理了你现在可以把完整控制流理解成main ↓ 创建 Reactor ↓ 创建 Acceptor监听 fd ↓ Acceptor 注册到 Reactor ↓ reactor.loop() ↓ epoll_wait() ↓ fd ready ↓ 找到对应 handler ↓ 如果是 server_fd → Acceptor::onRead() 如果是 client_fd → Connection::onRead()/onWrite()/onClose()这就是一套真正合理的最小 Reactor 骨架。十、和之前版本相比升级在哪之前的问题是1main 自己也在循环2Reactor 也在循环3监听 fd 没纳入统一分发体系所以结构是别扭的。现在这一版1只有一个事件循环reactor.loop();2所有 fd 都归 Reactor 管包括serverFdclientFd3Acceptor 和 Connection 分层清晰这是最关键的工程升级。十一、这一篇真正建立了什么能力这一篇最大的价值不是“代码更多了”而是建立了三层结构1Reactor调度层负责事件循环和分发。2Acceptor接入层负责接连接。3Connection连接层负责一个连接的读写和关闭。你从这里开始就不再是在写 socket 示例而是在写服务端骨架十二、这一版还有哪些不足这是最小合理版但还远不是工业级。还缺智能指针管理对象生命周期Connection 上下文对象写事件更精细控制半包/粘包处理定时器 / 超时多 Reactor / 多线程协程接入但没关系这些正是后面的升级路线。十三、本篇一句话总结mini epoll demo 解决的是“事件怎么来”而这一篇解决的是“事件交给谁以及连接如何成为对象”。也就是从fd → lambda升级到fd → Acceptor / Connection这一步就是从 demo 走向工程的关键分水岭。十四、下一篇这一篇做完最顺的下一篇就是《C 服务端进阶二—— 多 Reactor 线程模型高并发架构》因为你现在已经有了非阻塞 fdepollReactorAcceptorConnection下一步自然就是从单 Reactor 升级到多 Reactor / 主从 Reactor