仿 muduo库 从零实现高并发TCP服务框架
目录附: 项目代码链接:1. 我的开发环境2. 设计思想3. 模块设计4. 模块描述4.1 核心模块4.2 功能性模块5. 模块实现5.0 日志模块5.1 Buffer模块5.2 Socket模块5.3 Channel模块有什么干什么5.3 Poller模块有什么干什么附: 项目代码链接:high-concurrency_server: 本项目基于C实现的 轻量级、高并发 TCP通信框架上层搭建HTTP服务 参考了muduo库的设计思想采用主从Reactor架构多线程核心逻辑无锁设计极大提高效率……https://gitee.com/waspqj/high-concurrency_server这篇文章我们一起从零实现一个高并发服务器项目。做出TCP服务框架之后呢在这基础之上我们再搭建一个HTTP的应用层当然搭建其他服务也是一样的全凭兴趣。1. 我的开发环境4核 4G的 Ubuntu 云服务器VSCode。2. 设计思想基于Reactor模型多线程主从Reactor的架构One Thread One Loop 的思想……好的是不是有点懵别担心后面会慢慢解释这些先对这些词混个眼熟吧。3. 模块设计如果说设计思想是指导方针那么模块设计就是计划表说完模块设计就可以按照计划逐步实现了。我愿意将模块分为两大类功能性模块核心模块。怎么区别简单来说核心模块面向需求核心功能性模块为核心模块服务或者说功能性模块就是核心模块的一部分。那么接下来怎么展开说呢先简单说说核心模块如果先说功能模块那么它们究竟有什么用在哪里起作用是搞不清楚的。4. 模块描述4.1 核心模块TcpServer所有模块的功能整合管理所有的Connection和Loop帮助上层快速搭建TCP服务框架。EventLoop整合事件监听和事件处理一个Loop就是一个事件执行循环前面提到的One Thread One Loop简单来说就是在一个线程里执行一个事件循环也就是一个EventLoop。ConnectionTCP的连接管理说起来就这一句话不过这是所有模块里最难最复杂的模块了。4.2 功能性模块Buffer数据管理模块提供丰富的读写接口方便快速解析和读写。SocketTCP socket编程接口封装快速创建并管理监听套接字和普通套接字。Channel事件管理模块管理要监听什么事件以及事件触发的回调。Poller事件监听模块监听事件触发是Reactor模型的核心模块。TimeWheel时间轮模块管理定时任务自动执行定时任务。看到这里有人要说了这人叽里咕噜的说啥呢 非常正常特别是对于Reactor模型不熟悉的朋友那么对Reactor模型有一点了解的朋友就会发现其中很多模块在各种使用Reactor模型的 的代码里都是常见的甚至是通用的。不熟悉没关系熟悉当然更好一会我们写起代码来再慢慢体会。5. 模块实现刚刚我们从上到下简单介绍了各个模块接下来的模块实现我们从下往上写。5.0 日志模块前面漏掉了这个模块对于调试和运维还是非常非常重要的这段代码是我以前造的轮子如果你以前也写过这样的代码也可以拿来用。我采用了策略模式可以选择打印到标准输出也可以选择打印到文件里。我这个其实写的真挺好的可以借鉴就是和主题没多大关系我就不贴出来了如果想看顺着路径去仓库链接找吧 -source/log.hpp)5.1 Buffer模块TCP协议是面向字节流的典型协议因此解析和缓存是我们处理数据最麻烦部分这个模块就是来简化这个步骤的。Buffer最常被用作用户级别的数据缓冲区收到数据就直接写到里面。具体每个口我就不介绍了比较多但是不难。是vector但是具体读写呢都是当成指针来的。写出来其实挺有用的以后拿到别的项目用起来也挺爽的。#define BUFFER_DEFAULT_SIZE 1024 class Buffer { private: std::vectorchar _buffer; uint64_t _write_pos; uint64_t _read_pos; public: Buffer() : _write_pos(0), _read_pos(0), _buffer(BUFFER_DEFAULT_SIZE) {} //buffer的起始位置 char *GetStartPos() {return *_buffer.begin();} //获取读位置 char *GetReadPos() {return GetStartPos() _read_pos;} //获取写位置 char *GetWritePos() {return GetStartPos() _write_pos;} //可读长度 uint64_t ReadableLen() {return _write_pos - _read_pos;} //缓冲区末尾可写长度 uint64_t TailBufferLen() {return _buffer.size() - _write_pos;} //缓冲区开头可写长度 uint64_t HeadBufferLen() {return _read_pos;} //确保缓冲区足够写 void EnsureLen(uint64_t len) { if (len TailBufferLen()) return; else if(len HeadBufferLen() TailBufferLen()) //把数据移动到最前面 { uint64_t read_len ReadableLen(); std::copy(GetReadPos(), GetWritePos(), GetStartPos()); _read_pos 0; _write_pos read_len; } else //扩容 _buffer.resize(_write_pos len); } //移动读位置 void MoveReadPos(uint64_t len) { assert(len ReadableLen()); _read_pos len; } //移动写位置 void MoveWritePos(uint64_t len) { assert(len TailBufferLen()); _write_pos len; } //写入数据void*) void Write(const void *data, uint64_t len) { EnsureLen(len); std::copy((char*)data, (char*)data len, GetWritePos()); } //写入数据string) void WriteString(std::string str) { Write((void*)str.c_str(), str.size()); } //写入数据buffer) void WriteBuffer(Buffer buf) { Write(buf.GetReadPos(), buf.ReadableLen()); } //写入并移动写位置 void WriteAndPush(const void *data, uint64_t len) { if(len 0) return; Write(data, len); MoveWritePos(len); } void WriteAndPush(std::string str) { WriteString(str); MoveWritePos(str.size()); } void WriteAndPush(Buffer buf) { WriteBuffer(buf); MoveWritePos(buf.ReadableLen()); } //读取数据 void Read(void *data, uint64_t len) { assert(len ReadableLen()); std::copy(GetReadPos(), GetReadPos() len, (char*)data); } //读取数据(string) std::string ReadasString(uint64_t len) { assert(len ReadableLen()); std::string str; str.resize(len); Read(str[0], len); return str; } //读取数据(one line) std::string ReadOneLine() { char* pos (char*)memchr(GetReadPos(), \n, ReadableLen()) ; if(pos NULL) return ; uint64_t len (uint64_t)(pos - GetReadPos()) 1; return ReadasString(len); } //读取并移动读位置 void ReadAndPop(void *data, uint64_t len) { Read(data, len); MoveReadPos(len); } std::string ReadAndPop(uint64_t len) { std::string ret ReadasString(len); MoveReadPos(len); return ret; } std::string ReadAndPop() { std::string ret ReadOneLine(); MoveReadPos(ret.size()); return ret; } };为了描述的完整性呢我贴出了所有的内容但是其中一些接口如果暂时感觉用不到那就先不着急写用到再说5.2 Socket模块一般涉及到网络的项目都会有这个模块吧看看我是怎么封装的#define defaultsockfd -1 class Socket { private: int _sockfd; public: Socket() : _sockfd(defaultsockfd) {} Socket(int fd) : _sockfd(fd) {} ~Socket(){LOG(DEBUG) close socket _sockfd; } int _Fd(){return _sockfd;} //创建套接字 bool Create() { //创建tcp套接字 _sockfd socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if(_sockfd 0) { LOG(ERROR) create socket error; return false; } LOG(DEBUG) create socket success; return true; } //绑定ip和端口号 bool Bind(const std::string ip, uint16_t port) { sockaddr_in addr; memset(addr, 0, sizeof addr); addr.sin_family AF_INET; addr.sin_port htons(port); inet_pton(AF_INET, ip.c_str(), addr.sin_addr); int n bind(_sockfd, (sockaddr*)addr, sizeof addr); if(n 0) { LOG(ERROR) bind error; return false; } LOG(DEBUG) bind success; return true; } #define MAXLISTEN 128 //backlog表示了accept队列的最大长度与后续高并发场景有关128 是一个经验值 //开启监听状态 bool Listen(int backlog MAXLISTEN) { int n listen(_sockfd, backlog); if(n 0) { LOG(ERROR) listen error; return false; } LOG(DEBUG) listen success; return true; } //向服务器发起连接 bool Connect(const std::string ip, uint16_t port) { sockaddr_in addr; memset(addr, 0, sizeof addr); addr.sin_family AF_INET; addr.sin_port htons(port); inet_pton(AF_INET, ip.c_str(), addr.sin_addr); int n connect(_sockfd, (sockaddr*)addr, sizeof addr); if(n 0) { LOG(ERROR) connect error; return false; } LOG(DEBUG) connect success; return true; } //接受连接 int Accept() { int fd accept(_sockfd, nullptr, nullptr); if(fd 0) { LOG(ERROR) accept error: fd; return -1; } LOG(DEBUG) accept success; return fd; } //接受数据 ssize_t Recv(void* buff, size_t len, int flags 0) { ssize_t n recv(_sockfd, buff, len, flags); // recv 返回0, 表示对端关闭, 放入判错逻辑, 最终返回0 here n0 to n0 if(n 0) { if(errno EAGAIN || errno EINTR) return 0; // LOG(ERROR) recv error; return -1; } LOG(DEBUG) recv success: n; return n; } //发送数据 ssize_t Send(const void* buff, size_t len, int flags 0) { ssize_t n send(_sockfd, buff, len, flags); if(n 0) { if(errno EAGAIN || errno EINTR) return 0; LOG(ERROR) send error; return -1; } LOG(DEBUG) send success; return n; } //关闭套接字 void Close() { if(_sockfd ! defaultsockfd) close(_sockfd); } //创建一个tcp服务端连接 bool BuildTcpServer(uint16_t port, bool is_block true, const std::string ip 0.0.0.0, int backlog MAXLISTEN) { if(!Create()) return false; if(!is_block) SetNonBlock(); ReUseAddr(); if(!Bind(ip, port)) return false; if(!Listen(backlog)) return false; // ReUseAddr(); 怎么能比bind更晚调用 return true; } //创建一个tcp客户端连接 bool BuildTcpClient(const std::string ip, uint16_t port, bool is_block true) { if(!Create()) return false; if(!is_block) SetNonBlock(); ReUseAddr(); if(!Connect(ip, port)) return false; return true; } void ReUseAddr() { int opt 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, opt, sizeof opt); } void SetNonBlock() { int fl fcntl(_sockfd, F_GETFL); if(fl 0) return; fcntl(_sockfd, F_SETFL, fl | O_NONBLOCK); } };要说接下来这个模块就无法避开Reactor模型了简单介绍一下Reactor 反应堆模型是一种基于事件驱动的高并发设计模式它的核心思想是通过一个 I/O 多路复用器如epoll、select监听多个文件描述符当某个描述符就绪读写事件就绪就由调用对应的回调函数进行处理。经典的单 Reactor 模型包含三个部分事件源文件描述符、监听者epoll、poll、select、事件分发器EventLoop。现在可以对应上我们的模块了Channel对应了对文件描述符的事件和事件对应的回调函数管理Poller对应监听者EventLoop来执行就绪事件的回调函数。5.3 Channel模块这个模块我们来具体拆解一下它具体需要有什么具体要干什么。有什么1. (文件描述符)_fd是谁的事件管理、2. _events需要监听的事件、3. _revents被触发的事件、4. 事件回调各种事件被触发后执行的回调函数、5. 事件执行器真正执行回调的模块class Channel { public: using EventCallback std::functionvoid(); private: int _fd; EventLoop *loop; // 事件执行器 uint32_t _events; //需要监控的事件 uint32_t _revents; //触发的事件 EventCallback _write_callback; //写事件回调 EventCallback _read_callback; //读事件回调 EventCallback _error_callback; //错误事件回调 EventCallback _close_callback; //关闭事件回调 EventCallback _event_callback; //默认事件回调 }干什么1. 对于_event 上层要能简单的设置是否要监听读写事件并且当事件被设置更新了要在Poller中立马更新监听。2. 对于_revent Poller监听到的触发事件要能设置进来。3. 对于事件回调 一共五个上层都能设置具体他们干什么Channel是不知道的只是放在这里等调用。4. 对于事件执行器 它是真正执行回调的地方Channel需要给他提供一个方法把这个文件描述符被触发的事件的回调都执行一遍。对应HandleEventsclass Poller; class EventLoop; class Channel{ public: Channel(EventLoop *eventloop, int fd) : loop(eventloop), _fd(fd), _events(0), _revents(0) {} ~Channel() {} int _Fd() {return _fd;} uint32_t _Events() {return _events;} void SetRevents(uint32_t revents) {_revents revents;} void SetWriteCall(const EventCallback cb){_write_callback cb;} void SetReadCall(const EventCallback cb){_read_callback cb;} void SetErrorCall(const EventCallback cb){_error_callback cb;} void SetCloseCall(const EventCallback cb){_close_callback cb;} void SetEventCall(const EventCallback cb){_event_callback cb;} //是否监控可读 bool IsRead() {return (_events EPOLLIN);} //是否监控可写 bool IsWrite() {return (_events EPOLLOUT);} //设置监控可读 void SetRead() {_events | EPOLLIN; Update();} //设置监控可写 void SetWrite() {_events | EPOLLOUT; Update();} // 取消监控可读 void UnsetRead() {_events ~EPOLLIN; Update();} // 取消监控可写 void UnsetWrite() {_events ~EPOLLOUT; Update();} // 取消所有事件监控 void UnsetAll() {_events 0; Update();} // 更新poller监控的事件 void Update(); // 取消对fd的监控 void Remove(); //处理触发事件 void HandleEvents() { // 处理读事件 //对端主动发FIN断联,读取最后数据 if((_revents EPOLLIN) || (_revents EPOLLRDHUP) || (_revents EPOLLPRI)) { // LOG(DEBUG) _fd : _read_callback; if(_read_callback) _read_callback(); // 处理默认事件 // if(_event_callback) _event_callback(); } //处理写事件 if(_revents EPOLLOUT) { if(_write_callback) _write_callback(); // 处理默认事件 // if(_event_callback) _event_callback(); // here refresh } //处理错误事件 实际上都是关闭连接,一次就好 这里的处理顺序很有问题,待修改 if(_revents EPOLLERR) { if(_error_callback) _error_callback(); } //处理关闭事件 else if(_revents EPOLLHUP) { if(_close_callback) _close_callback(); } // 处理完前面的任务,再刷新时间片,不然执行任务时间过长,导致误以为连接不活跃 if(_event_callback) _event_callback(); // here refresh } };看完代码是不是还有很多疑惑比如事件回调的处理顺序Update Remove这两个函数怎么没实现 没关系再看下去迷雾会慢慢散开……5.4 Poller模块事件监听模块这个模块要用到epoll系列的接口不熟悉的先去看看再看代码 和上个模块一样我们通过解析这个模块的成员和方法来理解这个模块有什么1. (事件监听文件描述符_epfd、2.被触发事件集合_revents、3.管理的Channel_channelsclass Poller { private: int _epfd; struct epoll_event _revents[MAXPOLLEVENTS]; // 保存活跃事件 std::unordered_mapint, Channel* _channels;// Poller监听的Channel }干什么1. 更新监听事件有点眼熟上面Channel的两个接口就是对应要调用这里的接口才能实现所以分开声明和实现。2. 监听事件主要的本职工作嘛调用epoll_wait把触发的事件epoll_event们拿出来通过里面保存的文件描述符epoll_event.data.fd找到对应Channel并更新Channel的_revents别忘了真正执行回调的还不是这里而是更上层EventLoop)所以通过参数把触发事件的Channel都交给上层让上层去执行回调。3. 管理Channel 在_channels 里的添加 和 删除。#define MAXPOLLEVENTS 1024 // 一次最多获取的活跃事件数 class Poller { private: int _epfd; struct epoll_event _revents[MAXPOLLEVENTS]; // 保存活跃事件 std::unordered_mapint, Channel* _channels;// Poller监听的Channel // 对epoll接口的封装 public: Poller() { // 一次监听的最大事件数已经,不受限制了这个参数没啥意义 _epfd epoll_create(MAXPOLLEVENTS); } ~Poller() { close(_epfd); } // 对于epoll监控的文件描述符进行更新 -- void UpdateEpoll(Channel *channel, int op) { auto it _channels.find(channel-_Fd()); assert(it ! _channels.end()); epoll_event event; event.events channel-_Events(); event.data.fd channel-_Fd(); int ret epoll_ctl(_epfd, op, channel-_Fd(), event); if(ret 0) { LOG(ERROR) epoll_ctl error; abort(); } } // 监听就绪事件 void Poll(std::vectorChannel * *active_channels) { // epoll_wait(epfd, events, maxevents, timeout) int nfds epoll_wait(_epfd, _revents, MAXPOLLEVENTS, -1); if(nfds 0) { if(errno EINTR)// 被信号中断 return; LOG(ERROR) epoll_wait error; abort(); } for(int i 0; i nfds; i) { auto it _channels.find(_revents[i].data.fd); assert(it ! _channels.end()); it-second-SetRevents(_revents[i].events); // 设置触发事件到channel中 active_channels-push_back(it-second); } } bool HasChannel(Channel *channel) { auto it _channels.find(channel-_Fd()); if(it _channels.end()) return false; return true; } // 交给下层调用的接口 public: void Update(Channel *channel) { if(!HasChannel(channel)) { _channels.emplace(channel-_Fd(), channel); UpdateEpoll(channel, EPOLL_CTL_ADD); } else UpdateEpoll(channel, EPOLL_CTL_MOD); } void Remove(Channel *channel) { if(HasChannel(channel)) { UpdateEpoll(channel, EPOLL_CTL_DEL); _channels.erase(channel-_Fd()); } } };到这里大部分基础模块已经完成了下一章将会揭晓高并发的核心秘密。未完待续……仿 muduo库 从零实现高并发TCP服务框架 二-CSDN博客https://blog.csdn.net/2502_91433987/article/details/161198186?sharetypeblogdetailsharerId161198186sharereferPCsharesource2502_91433987spm1011.2480.3001.8118