zmq源码分析之pipe attach时机
文章目录**1. 调用层次结构****2. 完整调用链****场景 1zmq_connect()****场景 2zmq_bind()****3. attach_pipe 的实现****4. 不同 Socket 类型的 xattach_pipe****4.1 DEALER****4.2 PUSH****4.3 PULL****4.4 REQ****4.5 ROUTER****4.6 PUB****4.7 SUB****5. attach 的完整流程图****6. attach 的时机****7. 实际示例****示例 1DEALER 连接多个服务端****示例 2PUSH-PULL 模式****8. attach 时发生了什么****9. 总结**ZeroMQ 创建两个 pipe 来实现双向通信new_pipes[0] 本地端 pipe附加到当前 socketnew_pipes[1] 远程端 pipe附加到 session 对象这种设计确保了消息可以在两个方向上独立流动提高了通信效率和可靠性。1. 调用层次结构应用层 ↓ zmq_connect() / zmq_bind() ↓ socket_base_t::connect() / bind() ↓ 创建 Pipe (pipepair) ↓ socket_base_t::attach_pipe() ← 第 1 层 ↓ xattach_pipe() ← 第 2 层虚函数子类实现 ↓ dealer_t::xattach_pipe() ← 第 3 层 ↓ _lb.attach() / _fq.attach() ← 第 4 层2. 完整调用链场景 1zmq_connect()// socket_base.cpp - connect 流程intzmq::socket_base_t::connect(constchar*endpoint_uri_){// 1. 创建 Sessionsession_base_t*sessionsession_base_t::create(...);// 2. 创建双向 Pipepipe_t*new_pipes[2]{NULL,NULL};rcpipepair(parents,new_pipes,hwms,conflates);// 3. ★★★ 调用 attach_pipe ★★★attach_pipe(new_pipes[0],subscribe_to_all,true);// new_pipes[0] 连接到 Socket// new_pipes[1] 连接到 Session// 4. Session 也附加 Pipesession-attach_pipe(new_pipes[1]);// 5. 保存端点add_endpoint(...);return0;}场景 2zmq_bind()// socket_base.cpp - bind 流程以 inproc 为例intzmq::socket_base_t::bind(constchar*endpoint_uri_){if(protocolprotocol_name::inproc){// 1. 创建 Pipepipe_t*new_pipes[2]{NULL,NULL};rcpipepair(parents,new_pipes,hwms,conflates);// 2. ★★★ 调用 attach_pipe ★★★attach_pipe(new_pipes[0],true,true);// 3. 保存端点add_endpoint(...);return0;}// TCP 等其他协议创建监听器等待连接// 当有连接时也会调用 attach_pipe}3. attach_pipe 的实现// socket_base.cpp 第 408 行voidzmq::socket_base_t::attach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){// 1. 设置事件接收者pipe_-set_event_sink(this);// 2. 添加到 Socket 的 Pipe 列表_pipes.push_back(pipe_);// 3. ★★★ 调用子类的 xattach_pipe ★★★xattach_pipe(pipe_,subscribe_to_all_,locally_initiated_);// 4. 如果 Socket 正在终止立即终止 Pipeif(is_terminating()){register_term_acks(1);pipe_-terminate(false);}}关键点✅pipe_-set_event_sink(this)让 Pipe 知道事件发给谁✅_pipes.push_back()Socket 管理所有 Pipe✅xattach_pipe()虚函数不同 Socket 类型有不同行为4. 不同 Socket 类型的 xattach_pipe4.1 DEALER// dealer.cpp 第 48 行voidzmq::dealer_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){zmq_assert(pipe_);// 1. 如果启用了探测路由发送探测消息if(_probe_router){msg_t probe_msg;probe_msg.init();pipe_-write(probe_msg);pipe_-flush();probe_msg.close();}// 2. ★★★ 附加到入站队列和出站队列 ★★★_fq.attach(pipe_);// Fair Queuing接收_lb.attach(pipe_);// Load Balancing发送}用途DEALER 需要双向负载均衡4.2 PUSH// push.cpp 第 47 行voidzmq::push_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){LIBZMQ_UNUSED(subscribe_to_all_);LIBZMQ_UNUSED(locally_initiated_);// 1. 不延迟 Pipe 终止没有接收者pipe_-set_nodelay();zmq_assert(pipe_);// 2. ★★★ 只附加到出站队列 ★★★_lb.attach(pipe_);}用途PUSH 只发送不需要入站队列4.3 PULL// pull.cppvoidzmq::pull_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){LIBZMQ_UNUSED(subscribe_to_all_);LIBZMQ_UNUSED(locally_initiated_);zmq_assert(pipe_);// ★★★ 只附加到入站队列 ★★★_fq.attach(pipe_);}用途PULL 只接收不需要出站队列4.4 REQ// req.cpp - REQ 继承自 DEALER// 没有重写 xattach_pipe使用 DEALER 的实现voidzmq::req_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){// 调用父类 DEALER 的实现dealer_t::xattach_pipe(pipe_,subscribe_to_all_,locally_initiated_);}用途REQ 和 DEALER 一样但有自己的状态机4.5 ROUTER// router.cpp 第 72 行voidzmq::router_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){zmq_assert(pipe_);// 1. 如果启用了探测发送空消息if(_probe_router){msg_t probe_msg;probe_msg.init();pipe_-write(probe_msg);pipe_-flush();probe_msg.close();}// 2. ★★★ 附加到入站队列和路由表 ★★★_fq.attach(pipe_);// 3. 为 Pipe 分配路由 IDif(!locally_initiated_options.router_mandatory){// 为远程对端生成路由 IDblob_t routing_idgenerate_routing_id();pipe_-set_router_socket_routing_id(routing_id);}}用途ROUTER 需要维护路由表知道哪个 ID 对应哪个 Pipe4.6 PUB// pub.cppvoidzmq::pub_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){zmq_assert(pipe_);// ★★★ 附加到分发器 ★★★_dist.attach(pipe_);// 如果订阅了所有消息立即发送订阅if(subscribe_to_all_){msg_t subscribe_msg;subscribe_msg.init();subscribe_msg.set_flags(msg_t::subscribe);pipe_-write(subscribe_msg);pipe_-flush();subscribe_msg.close();}}用途PUB 使用dist_t分发器而不是lb_t4.7 SUB// sub.cppvoidzmq::sub_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){zmq_assert(pipe_);// ★★★ 附加到入站队列 ★★★_fq.attach(pipe_);// 发送订阅消息if(!subscribe_to_all_){// 发送用户设置的订阅for(autosubscription:_subscriptions){msg_t sub_msg;sub_msg.init_size(subscription.size());memcpy(sub_msg.data(),subscription.data(),subscription.size());sub_msg.set_flags(msg_t::subscribe);pipe_-write(sub_msg);pipe_-flush();sub_msg.close();}}}用途SUB 在连接时自动发送订阅消息5. attach 的完整流程图┌─────────────────────────────────────────────────────────┐ │ 应用层zmq_connect(socket, tcp://localhost:5555) │ └────────────────────┬────────────────────────────────────┘ │ ┌────────────────────▼────────────────────────────────────┐ │ socket_base_t::connect() │ │ - 创建 Session │ │ - 创建 Pipe (pipepair) │ └────────────────────┬────────────────────────────────────┘ │ ┌────────────────────▼────────────────────────────────────┐ │ socket_base_t::attach_pipe(pipe_) │ │ - pipe_-set_event_sink(this) │ │ - _pipes.push_back(pipe_) │ │ - xattach_pipe(pipe_) ← 虚函数 │ └────────────────────┬────────────────────────────────────┘ │ ┌────────────┴────────────┐ │ │ ┌───────▼───────┐ ┌──────▼──────┐ │ DEALER │ │ PUSH │ │ xattach_pipe │ │ xattach_pipe│ │ _fq.attach() │ │ _lb.attach()│ │ _lb.attach() │ │ │ └───────────────┘ └─────────────┘6. attach 的时机时机触发操作调用位置connect()主动连接[socket_base.cpp:628](file:///home/victory/test/zmq-build/libzmq/src/socket_base.cpp#L628)bind()(inproc)绑定 inproc[socket_base.cpp:1108](file:///home/victory/test/zmq-build/libzmq/src/socket_base.cpp#L1108)accept()接受连接监听器触发pipe 配对inproc 配对[session_base.cpp:375](file:///home/victory/test/zmq-build/libzmq/src/session_base.cpp#L375)7. 实际示例示例 1DEALER 连接多个服务端// 应用代码void*dealerzmq_socket(ctx,ZMQ_DEALER);zmq_connect(dealer,tcp://server1:5555);zmq_connect(dealer,tcp://server2:5555);zmq_connect(dealer,tcp://server3:5555);// 内部流程// 第 1 次 connect:// pipe1 → attach_pipe() → dealer_t::xattach_pipe()// _fq.attach(pipe1)// _lb.attach(pipe1)// _lb._pipes: [pipe1]// 第 2 次 connect:// pipe2 → attach_pipe() → dealer_t::xattach_pipe()// _fq.attach(pipe2)// _lb.attach(pipe2)// _lb._pipes: [pipe1, pipe2]// 第 3 次 connect:// pipe3 → attach_pipe() → dealer_t::xattach_pipe()// _fq.attach(pipe3)// _lb.attach(pipe3)// _lb._pipes: [pipe1, pipe2, pipe3]// 发送时 Round-Robin:// 消息 1 → pipe1// 消息 2 → pipe2// 消息 3 → pipe3// 消息 4 → pipe1 (循环)示例 2PUSH-PULL 模式// PUSH 端void*pushzmq_socket(ctx,ZMQ_PUSH);zmq_bind(push,tcp://*:5555);// 第 1 个 PULL 连接// pipe1 → attach_pipe() → push_t::xattach_pipe()// _lb.attach(pipe1)// _lb._pipes: [pipe1]// 第 2 个 PULL 连接// pipe2 → attach_pipe() → push_t::xattach_pipe()// _lb.attach(pipe2)// _lb._pipes: [pipe1, pipe2]// PULL 端void*pullzmq_socket(ctx,ZMQ_PULL);zmq_connect(pull,tcp://push:5555);// pipe → attach_pipe() → pull_t::xattach_pipe()// _fq.attach(pipe)8. attach 时发生了什么voidzmq::lb_t::attach(pipe_t*pipe_){// 1. 添加到列表_pipes.push_back(pipe_);// 2. 立即激活移到活跃区activated(pipe_);}voidzmq::lb_t::activated(pipe_t*pipe_){// 将 Pipe 移到活跃区前 _active 个位置_pipes.swap(_pipes.index(pipe_),_active);_active;}效果attach 前 _pipes: [] _active0 attach pipe1: _pipes: [pipe1] _active1 attach pipe2: _pipes: [pipe1, pipe2] _active2 attach pipe3: _pipes: [pipe1, pipe2, pipe3] _active39. 总结attach的调用场景✅zmq_connect()每次连接时✅zmq_bind()inproc 绑定时✅接受连接TCP 服务端接受客户端✅inproc 配对inproc bind/connect 配对时调用链zmq_connect/bind ↓ socket_base_t::attach_pipe() ↓ xattach_pipe() (虚函数) ↓ _lb.attach() / _fq.attach() / _dist.attach()不同 Socket 的行为DEALER/REQ/CLIENT_fq _lb双向PUSH_lb只发送PULL_fq只接收ROUTER_fq 路由表PUB_dist广播SUB_fq 自动订阅一句话attach在每个连接建立时调用让 Socket 知道有哪些 Pipe 可用