python之redis 发布订阅、集群连接、Redis 集群Docker版常用命令、集群的创建、删除、重启、常用运维命令等等
一、redis 发布订阅1.1 redis 发布订阅redis 发布订阅消息发布后如果订阅者没有启动错过即错过。不会再数据库中持久化在 Redis 的发布-订阅系统中消息类型有两种主要形式message和pmessage。这两种类型的消息分别对应不同的订阅方式message这是标准的消息类型出现这种类型的消息是因为订阅者使用了标准的subscribe方法订阅了具体的频道。{type:message,pattern:None,channel:b111,data:b{content: 1}}pmessage这种消息类型出现是因为订阅者使用了psubscribe方法订阅了频道模式pattern。模式订阅允许使用通配符订阅多个频道比如psubscribe(news.*)会订阅所有以news.开头的频道。如果你收到的是pmessage类型的消息说明你是在使用模式订阅psubscribe而不是直接订阅具体的频道subscribe。{type:pmessage,pattern:b111,channel:b111,data:b{content: 1}}importtimefrom redisimportStrictRedis,ConnectionPool from typingimportOptionalclassRedisManager:管理 Redis 发布/订阅操作的类def__init__(self,host:str127.0.0.1,port:int6379,db:int0,password:str123456789):初始化 Redis 连接池和客户端poolConnectionPool(hosthost,portport,dbdb,passwordpassword,decode_responsesTrue # 自动将响应解码为字符串)self.redisStrictRedis(connection_poolpool)defpublish(self,channel:str,message:str)-bool:向指定频道发布消息try:self.redis.publish(channel,message)returnTrue except Exception as e:# 发布消息失败returnFalse defsubscribe(self,channel:str,callback:Optional[callable]None)-None:标准模式-订阅频道并处理消息pubsubself.redis.pubsub()pubsub.subscribe(channel)# 已订阅频道formessage in pubsub.listen():ifmessage[type]message:datamessage[data]self.logger.info(f在 {channel} 接收到消息: {data})ifcallback:callback(data)else:print(f消息: {data})defpsubscribe(self,channel:str,callback:Optional[callable]None)-None:频道模式-订阅频道并处理消息pubsubself.redis.pubsub()pubsub.psubscribe(channel)# 已订阅频道formessage in pubsub.listen():print(message)defunsubscribe(self,channel:str)-None:取消订阅频道pubsubself.redis.pubsub()pubsub.unsubscribe(channel)# 已取消订阅频道 defmain():# 初始化 Redis 管理器 redis_managerRedisManager()channeler_test# 示例发布消息fori inrange(5):redis_manager.publish(channel,str(i))time.sleep(1)# 示例订阅消息 redis_manager.subscribe(channel)if__name____main__:main()二、redis_集群链接卸载冲突的 redis-py-clusterpip install redis-py-cluster确保 redis 是最新版7.1.0 已内置集群和异步功能pip install --upgrade redisimporttracebackfromredis.clusterimportRedisCluster,ClusterNode# 注意这里fromredis.exceptionsimportRedisErrordefadd_devices_to_online_set(device_ids,set_keyonline_devices): 连接到 Redis 集群并使用 pipeline 批量将设备ID添加到 Set 中。 :param device_ids: List[str] 设备ID列表 :param set_key: str Set的Key名称 :return: dict 添加结果统计成功新增、已存在 result_summary{added:0,existed:0,errors:0,details:[]}try:startup_nodes2[ClusterNode(11.22.33.44,6381),ClusterNode(11.22.33.44,6382),ClusterNode(11.22.33.44,6383),]redis_dbRedisCluster(startup_nodesstartup_nodes,decode_responsesTrue,skip_full_coverage_checkTrue,passwordxxxxxxxxxx)redis_db.ping()print(成功连接到 Redis 集群)withredis_db.pipeline(transactionFalse)aspipe:fordevice_idindevice_ids:print(f准备向 Set {set_key} 添加设备 {device_id}...)pipe.sadd(set_key,device_id)pipe_resultpipe.execute()# pipe_result是设备列表对应的结果每个是0或1fordevice_id,sadd_resinzip(device_ids,pipe_result):ifsadd_res1:print(f成功添加新设备 {device_id} 到 {set_key}。)result_summary[added]1result_summary[details].append({device_id:device_id,status:added})elifsadd_res0:print(f设备 {device_id} 已存在于 {set_key} 中。)result_summary[existed]1result_summary[details].append({device_id:device_id,status:existed})else:print(f设备 {device_id} 添加出现未知结果:{sadd_res})fromredisclusterimportRedisClusterdefprint_hi():# 构建所有的节点startup_nodes[{host:127.0.0.1,port:6370},{host:127.0.0.2,port:6371},{host:127.0.0.3,port:6373}]# 构建StrictRedisCluster对象redis_dbRedisCluster(startup_nodesstartup_nodes,decode_responsesTrue,password123456789)withredis_db.pipeline(transactionFalse)aspipe:withopen(./Port.csv,r)asf:pipe.sadd(DN2010161015893863)resultpipe.execute()if__name____main__:print_hi()三、全新一键部署正常集群解决所有连接拒绝问题0. 编写一键部署脚本redis-set-password.shvimredis-set-password.shchmodx redis-set-password.sh ./redis-set-password.sh# 1. 清理之前的错误容器dockerrm-f$(dockerps-aq-fnameredis-node)2/dev/null# 2. 启动 6 个容器使用宿主机网络--network host 代替桥接不再使用 -p 映射echo使用宿主机网络启动 6 个共享 127.0.0.1 的 Redis 节点...forportin{7001..7006};dodockerrun-d--nameredis-node-${port}\--networkhost\redis:7.2.5\redis-server--port${port}\--cluster-enabledyes\--cluster-config-file nodes-${port}.conf\--cluster-node-timeout5000\--appendonlyyes\--requirepassmockmock\--masterauthmockmock\--protected-mode nodoneecho等待容器启动完成...1. 组建集群在宿主机终端直接执行以下命令将这 6 个独立节点合并为集群3 主 3 从redis-cli-amockmock--clustercreate\127.0.0.1:7001127.0.0.1:7002127.0.0.1:7003\127.0.0.1:7004127.0.0.1:7005127.0.0.1:7006\--cluster-replicas1--cluster-yes执行成功后控制台会输出[OK] All 16384 slots covered.2. 验证集群是否创建成功组建完成后使用以下命令进行验证查看集群状态首选验证redis-cli-p7001-amockmockcluster info如果输出中的第一行是cluster_state:ok说明集群已完全正常。查看节点主从分配信息redis-cli-p7001-amockmockcluster nodes此命令会列出所有 6 个节点并明确标识哪些是master哪些是slave。写入/读取数据测试redis-cli-c-p7001-amockmocksettest_key123redis-cli-c-p7001-amockmockget test_key如果集群或者容器挂掉处于停止状态你不需要重新组建集群。因为节点配置文件nodes-*.conf和持久化数据仍然保留在对应的容器内部只需重新启动这些容器集群就会自动恢复dockerstart redis-node-7001 redis-node-7002 redis-node-7003 redis-node-7004 redis-node-7005 redis-node-7006启动完成后等待几秒钟让节点间完成握手即可再次验证集群状态redis-cli-p7001-amockmockcluster info