基于SSE实现站内消息实时推送
目录背景说明websocket和sse的区别前端如何使用sse后端如何使用sse发送消息给sse服务后端的sse服务main.goservice/sse.goapi/index.goCI流程SSE服务的Dockerfilejenkins ci任务脚本使用SSE镜像小结踩坑说明1.SSE前端不直连的坑2.关于SSE重连机制的坑3.前端不关闭SSE连接的坑背景说明之前项目实现了web站内消息即某些功能模块的某些动作会触发站内消息用户在自己的消息中心里面可以实时收到消息通知。例如任务模块等。站内消息的目的就是为了让用户及时获取通知及时响应通知。以前做的站内消息针对web端获取消息是通过js轮询后端接口获取最新的消息。但是轮询不能保证消息的及时性还会增加服务器端的压力。轮询本质上就是循环拉取消息但是这次我们想要实时性的消息只能从推入手。记住一句话涉及消息类的需求一定会在“推拉”上进行选择。所以我们调研了一下市面上关于站内消息的实现手段大概如下7种该图引用自程序员小富本次实现站内消息要求实时性好一点所以我们着重考虑调研SSE。全称是Server-sent events。严格地说HTTP 协议无法做到服务器主动推送信息。但是有一种变通方法就是服务器向客户端声明接下来要发送的是流信息streaming。也就是说发送的不是一次性的数据包而是一个数据流会连续不断地发送过来。这时客户端不会关闭连接会一直等着服务器发过来的新的数据流视频播放就是这样的例子。本质上这种通信就是以流信息的方式完成一次用时很长的下载。SSE 就是利用这种机制使用流信息向浏览器推送信息。它基于 HTTP 协议目前除了 IE/Edge其他浏览器都支持。SSE在服务器和客户端之间打开一个单向通道服务端响应的不再是一次性的数据包而是text/event-stream类型的数据流信息在有数据变更时从服务器流式传输到客户端。整体的实现思路有点类似于在线视频播放视频流会连续不断的推送到浏览器你也可以理解成客户端在完成一次用时很长网络不畅的下载。sse通信原理websocket和sse的区别SSE 是基于HTTP协议的它们不需要特殊的协议或服务器实现即可工作WebSocket需单独服务器来处理协议。SSE 单向通信只能由服务端向客户端单向通信webSocket全双工通信即通信的双方可以同时发送和接受信息。SSE 实现简单开发成本低无需引入其他组件WebSocket传输数据需做二次解析开发门槛高一些。SSE 默认支持断线重连WebSocket则需要自己实现。SSE 只能传送文本消息二进制数据需要经过编码后传送WebSocket默认支持传送二进制数据。SSE好像一直不被大家所熟知一部分原因是出现了WebSockets这个提供了更丰富的协议来执行双向、全双工通信。对于游戏、即时通信以及需要双向近乎实时更新的场景拥有双向通道更具吸引力。但是在某些情况下不需要从客户端发送数据。而你只需要一些服务器操作的更新。比如站内信、未读消息数、状态更新、股票行情、监控数量等场景SEE不管是从实现的难易和成本上都更加有优势。此外SSE 具有WebSockets在设计上缺乏的多种功能例如自动重新连接、事件ID和发送任意事件的能力。前端只需进行一次HTTP请求带上唯一ID打开事件流监听服务端推送的事件就可以了。前端如何使用sse!-- * Description: * Date: 2023-05-23 16:41:23 * LastEditTime: 2023-05-23 16:44:04 -- !DOCTYPE html html langen head meta charsetUTF-8 meta http-equivX-UA-Compatible contentIEedge meta nameviewport contentwidthdevice-width, initial-scale1.0 titleDocument/title /head script let source null; //这个值设置的10表示当前用户的主键id前面的sse是固定的 let user sse_10; if (window.EventSource) { // 建立连接 source new EventSource(http://wxqb.avlyun.org:8066/msg_event?stream user); setMessageInnerHTML(准备与用户 user 建立连接); /** * 连接一旦建立就会触发open事件 * 另一种写法source.onopen function (event) {} */ source.addEventListener(open, function (e) { setMessageInnerHTML(已连接); }, false); /** * 客户端收到服务器发来的数据 * 另一种写法source.onmessage function (event) {} */ source.addEventListener(message, function (e) { setMessageInnerHTML(e.data); }); } else { setMessageInnerHTML(你的浏览器不支持SSE); } function setMessageInnerHTML(msg) { console.log(msg); } /script body /body /html在进行sse的调试工作时直接使用上面这个html右键通过浏览器打开这个html页面即可。效果如下后端如何使用sse发送消息给sse服务/** * 调用我们一个go语言启动一个web服务可以接收我们推送的消息再实时传递给已连接的客户端 */ public function callSSEPushMsg(array $msg,int $userId){ try{ $client new GuzzleClient(); $uriconfig(sse_push_msg_uri); $postJsonArr[ streamsse_.$userId, msg$msg //是个数组 ]; //记录调用第三方api的参数信息写入日志 $logInfo[ uri$uri, json_param$postJsonArr ]; $logInfoStrjson_encode($logInfo); //调用wx api $res $client-request(POST, $uri, [ // debug true, //调试阶段开启 headers [ Content-Type application/json ], json $postJsonArr, timeout 60 ]); //验证调用是否成功,成功返回{errcode:0,errmsg:ok} $responseCode$res-getStatusCode(); if($responseCode!200){ Log::write(MsgCenter-callSSEPushMsg fail:response code is not 200;restapi info:{$logInfoStr},error); return false; } return true; }catch (RequestException $e) { Log::write(MsgCenter-callSSEPushMsg 异常; restapi info:{$logInfoStr},error); $requestExInfoPsr7\str($e-getRequest()); Log::write(MsgCenter-callSSEPushMsg 异常; request:{$requestExInfo},error); if ($e-hasResponse()) { $reponseExInfoPsr7\str($e-getResponse()); Log::write(MsgCenter-callSSEPushMsg 异常; response:{$reponseExInfo},error); } return false; } } /** * 用于项目任务分配给特定参与人时记录消息到db然后调用golang的sse服务推送消息给前端 */ public function sendMsgForAssignTask(int $projectId,int $taskId,array $toUserIds){ //先查询消息里面依赖的项目信息 $proInfoDb::table(hl_project_manage) -field(num,name) -where(id,eq,$projectId) -find(); if(empty($proInfo)){ Log::error(sendMsgForAssignTask 异常当前项目id为{$projectId}不存在于hl_project_manage中无法发送消息); return false; } //查询消息里面依赖的任务信息 $taskInfoDb::table(hl_project_task_manage) -field(name,priority) -where(id,eq,$taskId) -find(); if(empty($taskInfo)){ Log::error(sendMsgForAssignTask 异常当前任务id为{$taskId} 不存在于hl_project_task_manage中无法发送消息); return false; } //主要消息 $priMsg您有新的项目任务待处理; //构建消息附加信息 $subMsg[ project_or_customer_number$proInfo[num], project_name$proInfo[name], project_id$projectId, task_name$taskInfo[name], task_priority$taskInfo[priority], task_id$taskId ]; //批量插入消息 $typeId$this-getMethodMapTypeIdConfig()[__FUNCTION__]; $baseRow[ type_id$typeId, pri_msg$priMsg, sub_msgjson_encode($subMsg,JSON_UNESCAPED_UNICODE), typeself::MSG_TYPE_PROJECT ]; //一个任务可以分配给多个参与者 $toUserIdsarray_values(array_unique($toUserIds)); foreach($toUserIds as $userId){ $tmpRow[to_user_id$userId]; $tmpRowarray_merge($baseRow,$tmpRow); $tmpIdDb::table(hl_msg)-insertGetId($tmpRow); //循环调用golang的sse的一个接收消息的接口 $sseMsg$this-buildSSEMsg($tmpId,self::MSG_TYPE_PROJECT,$typeId,$priMsg,$subMsg); self::callSSEPushMsg($sseMsg,$userId); } return true; }后端的sse服务main.go主程序起了一个sse的服务以及一个http的api。package main import ( code.avlyun.org/wzp/wxqb-sse/api code.avlyun.org/wzp/wxqb-sse/service net/http ) func main() { sseServer : service.GetSse() // Create a new Mux and set the handler mux : http.NewServeMux() mux.HandleFunc(/msg_event, func(w http.ResponseWriter, r *http.Request) { params : r.URL.Query() streamName : params.Get(stream) //根据前端传递的stream参数来建立一个stream只有先建立stream才能往stream里面读写消息 sseServer.CreateStream(streamName) //不添加跨域允许的话会导致js无法与sse建立连接 // 设置允许跨域请求的头部 w.Header().Set(Access-Control-Allow-Origin, *) w.Header().Set(Access-Control-Allow-Methods, GET, POST, PUT, DELETE, OPTIONS) w.Header().Set(Access-Control-Allow-Headers, Content-Type) sseServer.ServeHTTP(w, r) }) mux.HandleFunc(/push_msg, api.PushMsg) http.ListenAndServe(:8066, mux) }service/sse.gopackage service import ( sse github.com/r3labs/sse/v2 sync ) var sseServer sse.Server var onceSse sync.Once func GetSse() sse.Server { onceSse.Do(func() { sseServer *sse.New() }) return sseServer }api/index.go用来给消息埋点的位置用来调用这个api推送消息给sse的特定连接。package api import ( code.avlyun.org/wzp/wxqb-sse/service encoding/json fmt sse github.com/r3labs/sse/v2 io/ioutil net/http ) //post请求 func PushMsg(w http.ResponseWriter, r *http.Request) { if r.Method POST { // 读取请求体数据 bodyBytes, err : ioutil.ReadAll(r.Body) if err ! nil { http.Error(w, body不能为空需要json参数, http.StatusBadRequest) return } defer r.Body.Close() // 业务逻辑处理将post的json参数的stream和msg参数取出来存到变量中 // 解析请求中的 JSON 数据 // 解析JSON为map类型 var data map[string]interface{} err json.Unmarshal(bodyBytes, data) if err ! nil { http.Error(w, err.Error(), http.StatusBadRequest) return } // 获取json的stream的value streamName, ok : data[stream].(string) if !ok { http.Error(w, stream只能是字符串, http.StatusBadRequest) return } // 获取json的msg的value msg, ok : data[msg].(interface{}) if !ok { http.Error(w, msg只能是interface类型, http.StatusBadRequest) return } //把msg转换为json字符串 jsonData, err : json.Marshal(msg) fmt.Print(string(jsonData)) if err ! nil { http.Error(w, msg不是json格式, http.StatusBadRequest) return } sseServer : service.GetSse() // Publish a payload to the stream sseServer.Publish(streamName, sse.Event{ Data: jsonData, }) // 返回响应结果 w.WriteHeader(http.StatusOK) resp : map[string]interface{}{ code: 1, msg: , data: struct { }{}, } jsonResp, err : json.Marshal(resp) if err ! nil { // 处理错误 http.Error(w, responce json encode err, http.StatusBadRequest) } w.Header().Set(Content-Type, application/json; charsetutf-8) w.WriteHeader(http.StatusOK) _, err w.Write(jsonResp) if err ! nil { // 处理错误 } } else { http.Error(w, Invalid HTTP method, http.StatusMethodNotAllowed) } }关于SSE第三方包的源码说明CI流程SSE服务的Dockerfile多阶段构件一个SSE后端服务用于支撑sse消息的实时推送。FROM golang:1.18.10 as goBuilder RUN go env -w GOPROXYhttps://goproxy.cn,direct COPY ./backend-code/ /my-code/ WORKDIR /my-code/ #因为我们改了sse源码包的内容所以代码仓库里面包含了vendor我们基于vendor编译 RUN go build -modvendor -v -o /go/bin/sse /my-code/main.go FROM centos:centos7 COPY --fromgoBuilder /go/bin/sse /go/bin/sse #RUN timedatectl set-timezone Asia/Shanghai EXPOSE 8066 CMD /go/bin/ssejenkins ci任务脚本对应jenkins自由任务里的shell脚本用于触发构件动作后生成sse镜像并推送到内网harbor仓库。# 私有镜像仓库地址 docker_registry10.251.9.191:5000 wuhan_barborharbor.avlyun.org # jenkins工作目录 workspace/var/lib/jenkins/workspace/ # 来到jenkins工作目录 cd ${workspace} # 创建临时目录 tempdirdate %s%N mkdir ${tempdir} ## ---------------- 全局 config 相关 ---------------- # 配置中心目录名称本质上也是配置中心从gitlab拉取配置信息后的暂存目录 configdirqa-wxqb-config #---------------- 后端构建(php-fpm镜像) ---------------- #代码目录名 backenddirqa-wxqb-sse # 配置文件准备 cp -R ${configdir}/* ${tempdir}/ # 拷贝配置中心的配置文件到临时目录 #源代码准备 mkdir ${tempdir}/backend-code cp -R ${backenddir}/* ${tempdir}/backend-code/ # 拷贝后端代码到临时目录 # docker 构建 cd ${tempdir} #开始构建后端 docker build -f ./sse-dockerfile -t${docker_registry}/wzp/qa-wxqb-sse:${version} . # 清除所有临时文件 cd ${workspace} rm -rf ${tempdir} #上传镜像, QA环境只上传 latest 镜像 #docker image push ${docker_registry}/wzp/qa-wxqb-sse:latest #上传武汉私有化镜像仓库harbor的g-project docker tag ${docker_registry}/wzp/qa-wxqb-sse:${version} ${wuhan_barbor}/g-project/prod-wxqb-sse:${version} #登陆武汉harbor docker login ${wuhan_barbor} -u wangzhiping -p xxxxxx #推送后端镜像 docker push ${wuhan_barbor}/g-project/prod-wxqb-sse:${version}使用SSE镜像#sse服务 docker run --nameqa-wxqb-sse \ -p 8066:8066 \ --network qa-wxqb-bridge-network \ -e TZAsia/Chongqing \ -d harbor.avlyun.org/g-project/prod-wxqb-sse:1.0.0小结golang的sse服务是基于github.com/r3labs/sse/v2实现的该依赖包的核心逻辑是是基于一个叫stream的map来实现的每个连接建立时需要传递stream参数告知服务端的sse你是谁后续推送的消息都是往这个sream里面发布然后客户端的h5的EventSource对象就能收到最新的消息。当然还需要前端还要写一下js来接收json消息并渲染为特定的样式呈现给用户。效果图踩坑说明1.SSE前端不直连的坑之前前端的代码没有针对不同的开发环境诸如测试、生产抽离源码里面存在变动的配置项导致前端代码里面关于js连接sse的地址写死了为了适配前端后端只能尝试通过nginx代理转发sse的连接即前端先连接nginx然后nginx代理转发到go的sse服务。location /msg_event { access_log /var/log/nginx/proxy_sse.log; error_log /var/log/nginx/proxy_sse_error.log; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection Upgrade; proxy_pass http://go-sse:8066; }虽然可以用但是发现上线后当用户在同一个浏览器开了多个tab选项卡时导致电脑会变得很卡浏览器发送的相关请求都会很卡。初步怀疑是nginx是短连接对sse的支持有点不太友好比起js直连go的sse服务存在性能的损耗。下面这段是一个sse服务的的nginx代理转发配置在线上服务上工作正常感觉sse是可以走代理的只是超时时间需要设置得长一点location /web2/ { # 10.251.8.64:8066 proxy_pass http://127.0.0.1:20013/; chunked_transfer_encoding off; proxy_read_timeout 86400s; proxy_connect_timeout 1h; proxy_cache off; proxy_buffering off; proxy_redirect off; proxy_http_version 1.1; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; proxy_set_header Connection keep-alive; proxy_set_header Upgrade $http_upgrade; proxy_headers_hash_max_size 51200; proxy_headers_hash_bucket_size 6400; proxy_set_header Cache-Control no-cache; add_header Access-Control-Allow-Origin * always; add_header Access-Control-Allow-Credentials true always; add_header Access-Control-Allow-Methods GET, OPTIONS always; add_header Access-Control-Allow-Headers Origin,Authorization,Accept,X-Requested-With always; if ($request_method OPTIONS) { # 如果请求方法为 OPTIONS则返回 204 (无内容) add_header Access-Control-Allow-Origin *; add_header Access-Control-Allow-Methods GET, OPTIONS; add_header Access-Control-Allow-Headers Origin,Authorization,Accept,X-Requested-With; add_header Access-Control-Max-Age 1728000; add_header Content-Type text/plain charsetUTF-8; add_header Content-Length 0; return 204; } }2.关于SSE重连机制的坑前端之前没有针对sse建立连接失败后对重连次数做限制导致只要第一次连不上sse浏览器就会每隔两秒发送一次重新尝试连接的请求当你有多个tab选项卡时会并发发送类似dos攻击了会导致nginx卡顿最后改成SSE改为直连前端优化连接失败的重试次数限制后发现系统不卡顿了3.前端不关闭SSE连接的坑针对sse的tcp连接如果在前端建立过多使用完毕不断开连接的话会导致一个浏览器占用tcp的连接数超过浏览器的上限就会影响其他http接口的请求导致其他接口持续pending参考文档https://juejin.cn/post/7122014462181113887