Python自动化办公:三合一消息推送实战(钉钉、微信、QQ)
1. 为什么需要三合一消息推送在日常工作中我们经常遇到这样的场景服务器突然宕机需要立即通知运维人员项目进度需要同步给团队成员或者系统监控到异常需要及时告警。这些消息往往需要根据不同的紧急程度和接收对象分发到不同的通讯平台。比如紧急故障可能需要同时推送钉钉、微信和QQ普通通知可能只需要发送到项目微信群个人提醒更适合通过QQ私聊发送如果每次都要手动打开不同平台发送消息不仅效率低下还容易遗漏重要通知。这就是为什么我们需要一个统一的Python脚本能够根据预设规则自动将消息推送到指定平台。我去年负责的一个电商项目就遇到过这样的问题大促期间订单系统偶尔会出现短暂异常需要立即通知技术团队排查。最初我们是用人工在多个群里复制粘贴报警信息经常出现消息发错群或者漏发的情况。后来用Python实现了自动化推送报警响应速度提升了70%以上。2. 钉钉机器人消息推送实战2.1 准备工作首先需要创建一个钉钉群聊这个群就是消息的接收方。我建议专门创建一个系统通知这样的群只用来接收自动化消息。创建好群后点击右上角的群设置选择智能群助手点击添加机器人选择自定义机器人这里有个小技巧建议给机器人起个容易识别的名字比如订单系统监控或者服务器报警这样收到消息时一眼就能知道来源。2.2 安全设置钉钉机器人提供了三种安全设置方式我强烈推荐使用加签方式在安全设置中选择加签系统会生成一个密钥务必妥善保存同时记下提供的Webhook地址这里有个实际项目中遇到的坑加签密钥和Webhook地址一旦离开当前页面就无法再次查看所以一定要第一时间复制保存。我有次不小心关掉了页面不得不删除机器人重新创建。2.3 Python代码实现下面是完整的钉钉消息推送代码我加了详细注释import json import time import requests import hmac import hashlib import base64 import urllib.parse # 消息内容 dingding_text 服务器CPU使用率超过90%请立即处理 # 从机器人设置获取的Webhook地址 webhook https://oapi.dingtalk.com/robot/send?access_token你的token # 从机器人设置获取的加签密钥 secret 你的加签密钥 # 生成时间戳 timestamp str(round(time.time() * 1000)) # 计算签名 secret_enc secret.encode(utf-8) string_to_sign {}\n{}.format(timestamp, secret) string_to_sign_enc string_to_sign.encode(utf-8) hmac_code hmac.new(secret_enc, string_to_sign_enc, digestmodhashlib.sha256).digest() sign urllib.parse.quote_plus(base64.b64encode(hmac_code)) # 构造最终请求URL url %stimestamp%ssign%s % (webhook, timestamp, sign) # 设置请求头 headers {Content-Type: application/json} # 构造消息体 data { msgtype: text, text: { content: f{dingding_text}\n } } # 发送请求 res requests.post(url, datajson.dumps(data), headersheaders) # 检查发送结果 if res.json()[errmsg] ok: print(钉钉消息发送成功) else: print(f发送失败{res.text})实际使用中我建议把这段代码封装成函数这样可以在不同地方调用def send_dingtalk_message(content, webhook, secret): # 上面代码的实现... return res.json()[errmsg] ok3. 微信消息推送方案3.1 Server酱介绍Server酱是目前最常用的微信消息推送服务之一它通过微信公众号来转发消息。相比直接调用微信APIServer酱有几个优势不需要企业微信账号个人微信号就能用设置简单5分钟就能搞定免费版完全够用不过要注意的是Server酱免费版每天最多发送5条消息如果需求量大可以考虑升级付费版。3.2 获取SendKey使用Server酱只需要一个关键参数SendKey。获取步骤访问Server酱官网并登录在KeyAPI页面找到SendKey关注对应的微信公众号用于接收消息这里有个实用技巧可以创建多个SendKey分别对应不同的消息类型。比如一个用于服务器报警一个用于业务通知这样在微信里就能通过发送者区分消息类型。3.3 Python实现代码import json import requests def send_wechat_message(title, content, send_key): 发送微信消息 :param title: 消息标题 :param content: 消息内容 :param send_key: Server酱SendKey :return: 是否发送成功 url fhttps://sctapi.ftqq.com/{send_key}.send headers {Content-Type: application/x-www-form-urlencoded} data { text: title, content: content } try: response requests.post(url, datadata, headersheaders) result json.loads(response.text) return result[data][error] SUCCESS except Exception as e: print(f微信消息发送异常{str(e)}) return False # 使用示例 send_wechat_message( title数据库备份通知, content今日数据库备份已完成大小2.3GB, send_key你的SendKey )在实际项目中我通常会把这个函数和钉钉推送函数放在同一个工具模块中方便统一调用。4. QQ消息推送实现4.1 Qmsg酱服务介绍Qmsg酱是专门为QQ设计的消息推送服务它通过QQ机器人来实现消息转发。相比前两种方案QQ推送更适合个人使用或者小团队内部通知。Qmsg酱的特点是支持私聊和群消息可以指定多个QQ号接收消息实时性强4.2 配置步骤访问Qmsg酱官网并登录添加机器人QQ为好友在控制台获取你的Qmsg Key这里有个注意事项机器人QQ可能会被误认为是骚扰账号而被封建议先用小号测试。4.3 Python代码实现import json import requests def send_qq_message(content, qmsg_key, qq_numbersNone): 发送QQ消息 :param content: 消息内容 :param qmsg_key: Qmsg酱Key :param qq_numbers: 要的QQ号列表可选 :return: 是否发送成功 url fhttps://qmsg.zendee.cn/send/{qmsg_key} headers {Content-Type: application/x-www-form-urlencoded} data {msg: content} if qq_numbers: data[qq] ,.join(qq_numbers) try: response requests.post(url, datadata, headersheaders) result json.loads(response.text) return result[success] except Exception as e: print(fQQ消息发送异常{str(e)}) return False # 使用示例 send_qq_message( content你的代码已通过CI测试可以合并了, qmsg_key你的QmsgKey, qq_numbers[12345678] # 要提醒的QQ号 )5. 三合一消息推送整合5.1 设计统一接口现在我们已经实现了三种消息推送方式接下来需要将它们整合成一个统一的接口。我推荐这样设计class MessageSender: def __init__(self, config): :param config: 包含各平台配置的字典 self.dingtalk_webhook config.get(dingtalk_webhook) self.dingtalk_secret config.get(dingtalk_secret) self.wechat_key config.get(wechat_key) self.qq_key config.get(qq_key) def send(self, content, platforms, titleNone, qq_numbersNone): 统一发送消息 :param content: 消息内容 :param platforms: 要发送的平台列表如 [dingtalk, wechat, qq] :param title: 微信消息标题(可选) :param qq_numbers: QQ号列表(可选) :return: 各平台发送结果字典 results {} if dingtalk in platforms and self.dingtalk_webhook and self.dingtalk_secret: results[dingtalk] send_dingtalk_message( content, self.dingtalk_webhook, self.dingtalk_secret ) if wechat in platforms and self.wechat_key: results[wechat] send_wechat_message( title or content, content, self.wechat_key ) if qq in platforms and self.qq_key: results[qq] send_qq_message( content, self.qq_key, qq_numbers ) return results5.2 配置文件示例建议把各平台的配置信息放在配置文件中# config.yaml dingtalk: webhook: 你的钉钉Webhook secret: 你的钉钉加签密钥 wechat: key: 你的Server酱SendKey qq: key: 你的Qmsg酱Key5.3 完整使用示例import yaml from message_sender import MessageSender # 加载配置 with open(config.yaml) as f: config yaml.safe_load(f) # 创建发送器实例 sender MessageSender(config) # 发送重要告警所有平台 results sender.send( content数据库主库宕机请立即处理, platforms[dingtalk, wechat, qq], title紧急告警, qq_numbers[12345678, 87654321] ) print(发送结果, results)在实际项目中我通常会根据消息级别决定发送到哪些平台。比如紧急告警所有平台短信电话重要通知钉钉微信普通提醒QQ或微信6. 高级功能与优化建议6.1 消息模板功能对于经常发送的类似消息可以设计模板功能class MessageSender: # ... 其他代码 ... def send_template(self, template_name, data, platforms): 发送模板消息 :param template_name: 模板名称 :param data: 模板数据 :param platforms: 发送平台 :return: 发送结果 templates { server_alert: [{level}] {service} 出现异常{error}, task_notify: 任务 {task_name} 已完成耗时{time_cost} } if template_name not in templates: raise ValueError(f未知模板{template_name}) content templates[template_name].format(**data) return self.send(content, platforms)使用示例sender.send_template( template_nameserver_alert, data{ level: CRITICAL, service: 订单服务, error: 响应超时 }, platforms[dingtalk, wechat] )6.2 消息发送限流为了防止消息轰炸建议实现简单的限流功能from collections import defaultdict from time import time class RateLimiter: def __init__(self, max_count, period): :param max_count: 周期内最大发送次数 :param period: 周期长度(秒) self.max_count max_count self.period period self.counts defaultdict(int) self.last_reset defaultdict(float) def check(self, key): now time() if now - self.last_reset[key] self.period: self.counts[key] 0 self.last_reset[key] now if self.counts[key] self.max_count: return False self.counts[key] 1 return True # 在MessageSender中使用 class MessageSender: def __init__(self, config): # ... 其他初始化 ... self.rate_limiter RateLimiter(max_count10, period60) # 每分钟最多10条 def send(self, content, platforms, titleNone, qq_numbersNone): if not self.rate_limiter.check(global): print(发送频率过高请稍后再试) return {} # ... 原有发送逻辑 ...6.3 消息发送日志记录消息发送日志对于后续排查问题很有帮助import logging from datetime import datetime logging.basicConfig(filenamemessage.log, levellogging.INFO) class MessageSender: # ... 其他代码 ... def send(self, content, platforms, titleNone, qq_numbersNone): log_data { timestamp: datetime.now().isoformat(), content: content, platforms: platforms, title: title, qq_numbers: qq_numbers } try: results self._send_impl(content, platforms, title, qq_numbers) log_data.update({ results: results, status: success }) logging.info(log_data) return results except Exception as e: log_data.update({ error: str(e), status: failed }) logging.error(log_data) raise7. 实际项目中的应用案例去年我负责的一个电商项目中我们使用这套消息推送系统实现了以下功能服务器监控报警当CPU、内存或磁盘使用率超过阈值时自动发送报警到运维团队的钉钉群和值班人员的微信。订单异常通知当系统检测到异常订单如短时间内同一用户大量下单时会通知风控团队的QQ群。定时任务报告每天凌晨的数据库备份、数据统计等定时任务完成后会自动发送执行结果到技术负责人的微信。系统维护通知在计划维护前会提前发送通知到所有相关人员的各个平台。实现这些功能后团队的响应速度明显提升重要消息的漏看率降低了90%以上。特别是在大促期间这套系统帮助我们及时发现并处理了多个潜在问题。