IT策士 10余年一线大厂经验专注 IT 思维、架构、职场进阶。我会在各个平台持续发布最新文章助你少走弯路。Redis 做缓存性能卓越但一旦缓存出了问题所有请求直接打到数据库上瞬间就能将数据库压垮。缓存层在实践中面临三大经典难题穿透、击穿、雪崩。每个名词听起来都很可怕但理解了它们的成因和应对之道你就能轻松化解。本文不仅讲原理更用 Python 实现完整的防御方案——布隆过滤器、互斥锁、逻辑过期、随机 TTL 和缓存一致性策略。读完就能直接用到你的项目里。1. 缓存穿透1.1 什么是缓存穿透查询一个根本不存在的数据缓存层和数据库层都没有。这类请求每次都穿过缓存直接打到数据库当有大量这种恶意请求时数据库压力骤增甚至宕机。请求 → 缓存(miss)→ 数据库(miss)→ 返回空因为缓存不命中每次请求都击穿缓存直抵数据库。1.2 解决方案一缓存空对象当数据库查询不到数据时依然将一个空值写入缓存并设置较短的过期时间。这样下次请求就直接命中缓存不会访问数据库。importredisimporttimerredis.Redis(hostlocalhost,port6379,decode_responsesTrue)def get_product(product_id): cache_keyfproduct:{product_id}# 1. 查缓存cachedr.get(cache_key)ifcached is not None:ifcachedNULL:print(缓存命中空对象)returnNone print(缓存命中真实数据)returncached# 2. 查数据库模拟productquery_db(product_id)# 可能返回 None# 3. 写入缓存ifproduct is None: r.setex(cache_key,60,NULL)# 空值缓存 60 秒print(写入空对象缓存)else: r.setex(cache_key,300, product)# 真实数据缓存 5 分钟print(写入真实数据缓存)returnproduct def query_db(product_id):模拟数据库查询# 假设只有 id 为 1 的产品存在ifproduct_id1:returniPhone 15returnNone# 测试print(get_product(1))# 缓存未命中查数据库写入真实数据print(get_product(1))# 缓存命中真实数据print(get_product(2))# 缓存未命中查数据库写入空对象print(get_product(2))# 缓存命中空对象优点简单直接。缺点若恶意攻击者不断用不同的不存在 ID 请求缓存中会充满大量NULL键浪费内存。此时需要布隆过滤器。1.3 解决方案二布隆过滤器布隆过滤器Bloom Filter是一个概率型数据结构用来判断“一个元素一定不在集合中”或“可能在集合中”。它由bitmap和多个哈希函数组成。添加元素时通过多个哈希函数计算位置将 bitmap 相应位置 1。查询时同样计算多个位置如果任何一个位置为 0则元素肯定不存在如果全为 1则元素可能存在有一定误判率但不会漏判。布隆过滤器的误判率与 bit 数组长度和哈希函数个数有关可以通过参数控制。布隆过滤器原理示意图添加apple:hash1(apple)2→ setbit2hash2(apple)5→ setbit5hash3(apple)7→ setbit7查询orange:hash1(orange)2→1hash2(orange)5→1hash3(orange)8→0→ 一定不存在Python 实现基于 Redis 位图的布隆过滤器我们使用多个哈希函数通过hashlib模拟在 Redis 的 String 位图上操作。importhashlibimportmathimportredis class BloomFilter:基于 Redis 位图的布隆过滤器 def __init__(self, redis_client, key,expected_items100000,false_positive_rate0.01): expected_items: 预期元素数量 false_positive_rate: 可接受的误判率 self.redisredis_client self.keykey# 根据预期元素和误判率计算位图大小和哈希函数个数self.bit_sizeint(-expected_items * math.log(false_positive_rate)/(math.log(2)**2))self.hash_countint(self.bit_size / expected_items * math.log(2))def _hash(self, item, seed):使用不同 seed 生成多个哈希值 hhashlib.md5((str(seed)str(item)).encode())returnint(h.hexdigest(),16)% self.bit_size def add(self, item):添加元素到布隆过滤器forseedinrange(self.hash_count): offsetself._hash(item, seed)self.redis.setbit(self.key, offset,1)returnTrue def exists(self, item):检查元素是否可能存在forseedinrange(self.hash_count): offsetself._hash(item, seed)ifself.redis.getbit(self.key, offset)0:returnFalsereturnTrue# 使用示例rredis.Redis(hostlocalhost,port6379,decode_responsesTrue)bfBloomFilter(r,bloom:products,expected_items1000000,false_positive_rate0.01)# 预加载所有存在的商品 ID 到布隆过滤器existing_product_ids[1,2,3,4,5]# 模拟数据库中存在的IDforpidinexisting_product_ids: bf.add(pid)# 查询时先经过布隆过滤器def get_product_with_bloom(product_id):ifnot bf.exists(product_id): print(f布隆过滤器判定 {product_id} 不存在直接返回)returnNone# 可能存在于缓存或数据库returnget_product(product_id)# 使用前面定义的函数print(get_product_with_bloom(999))# 直接返回 None不会查库print(get_product_with_bloom(1))# 可能命中输出示例布隆过滤器判定999不存在直接返回 None 缓存未命中查询数据库... 写入真实数据缓存 iPhone15这样绝大多数不存在的 ID 被拦截在布隆过滤器层大大减轻数据库压力。 生产环境可以使用redis-py官方提供的布隆过滤器模块RedisBloom或者使用pybloom库。核心原理完全相同。2. 缓存击穿2.1 什么是缓存击穿一个热点数据 key 在缓存过期的瞬间大量请求同时涌向数据库去重建该缓存。因为重建缓存通常需要一定时间比如复杂的 SQL 查询这会导致数据库瞬间负载飙升甚至崩溃。大量并发请求 → 缓存过期(miss)→ 同时打到数据库与穿透不同击穿的数据是存在的只是因为热点 key 过期。2.2 解决方案一互斥锁让第一个请求去查数据库并重建缓存其他请求等待结果而不是都去查数据库。使用 Redis 的SETNX实现分布式互斥锁。importuuidimporttimeimportthreading def get_hotspot_with_lock(product_id): cache_keyfproduct:{product_id}lock_keyflock:product:{product_id}lock_valuestr(uuid.uuid4())# 1. 查缓存cachedr.get(cache_key)ifcached:returncached# 2. 尝试获取锁ifr.set(lock_key, lock_value,nxTrue,ex5):# 锁过期5秒防死锁try:# 双重检查cachedr.get(cache_key)ifcached:returncached# 3. 查询数据库productquery_db(product_id)# 可能耗时ifproduct: r.setex(cache_key,300, product)returnproduct finally:# 4. 释放锁Lua 保证原子性unlock_scriptifredis.call(get, KEYS[1])ARGV[1]thenreturnredis.call(del, KEYS[1])elsereturn0end r.eval(unlock_script,1, lock_key, lock_value)else:# 获取锁失败等待并重试time.sleep(0.05)returnget_hotspot_with_lock(product_id)# 递归重试# 模拟并发请求def test_concurrent(): def task(): resultget_hotspot_with_lock(1)print(f线程 {threading.current_thread().name}: {result})threads[threading.Thread(targettask)for_inrange(10)]fortinthreads: t.start()fortinthreads: t.join()test_concurrent()优点简单可靠保证同一时刻只有一个线程查询数据库。缺点线程等待可能影响响应时间锁的粒度需要控制。2.3 解决方案二逻辑过期除了物理 TTL再给缓存值加一个逻辑过期时间字段。当物理过期后并不立即删除缓存而是由后台线程异步更新缓存请求在此期间仍然返回旧值。importjsonimportthreading from datetimeimportdatetime, timedelta LOGICAL_TTL300# 逻辑过期时间秒def set_with_logical_expiry(key, value,logical_ttlLOGICAL_TTL):存储带逻辑过期时间的值 data{data:value,expire_at:(datetime.now() timedelta(secondslogical_ttl)).timestamp()}r.set(key, json.dumps(data))def get_with_logical_expiry(key): cachedr.get(key)ifnot cached:returnNone itemjson.loads(cached)dataitem[data]expire_atitem[expire_at]# 如果逻辑时间已过期启动异步刷新ifdatetime.now().timestamp()expire_at:# 使用互斥锁避免大量并发刷新lock_keyflock:logical:{key}ifr.set(lock_key,1,nxTrue,ex5): threading.Thread(targetrefresh_cache,args(key,)).start()# 仍然返回旧值returndatareturndata def refresh_cache(key):异步刷新缓存实际应查数据库 time.sleep(0.2)# 模拟数据库查询new_valuefnew_value_for_{key}# 这里应该是 query_db() 的结果set_with_logical_expiry(key, new_value)print(f异步刷新缓存: {key})优点用户请求不会阻塞始终返回缓存数据可能是旧值。缺点数据可能短暂不一致适合允许最终一致性的场景。3. 缓存雪崩3.1 什么是缓存雪崩大量缓存 key 在同一时间过期或者 Redis 服务宕机导致所有请求瞬间打到数据库就像雪崩一样压垮数据库。常见诱因设置了相同的过期时间大量 key 同时失效。Redis 集群大面积故障。3.2 解决方案一随机 TTL给缓存过期时间加上一个随机偏移量避免集体失效。importrandom def set_with_random_ttl(key, value,base_ttl300,random_range60):基础 TTL ± 随机偏移 ttlbase_ttl random.randint(-random_range, random_range)ttlmax(ttl,60)# 至少 60 秒r.setex(key, ttl, value)print(f{key} 过期时间: {ttl}s)# 设置一批缓存观察过期时间foriinrange(10): set_with_random_ttl(fhot:key:{i}, fvalue_{i})输出示例hot:key:0 过期时间: 332s hot:key:1 过期时间: 268s hot:key:2 过期时间: 319s...3.3 解决方案二多级缓存与降级结合本地缓存如 Python 字典或cachetools当 Redis 不可用时使用本地缓存兜底或者直接降级返回默认值避免请求直达数据库。from cachetoolsimportTTLCache local_cacheTTLCache(maxsize1000,ttl60)def get_with_fallback(key): try: valuer.get(key)ifvalue: local_cache[key]value# 更新本地缓存returnvalue except Exception as e: print(fRedis 异常: {e})# Redis 不可用走本地缓存returnlocal_cache.get(key,默认值)4. 缓存一致性策略使用缓存不可避免地会面临数据一致性问题数据库数据更新了缓存怎么同步4.1 Cache Aside 模式旁路缓存这是最经典的缓存策略读先读缓存缓存没有则读数据库再写入缓存。写先更新数据库再删除缓存。为什么是删除缓存而不是更新缓存因为更新缓存可能涉及复杂计算而且若并发写可能产生脏数据。删除缓存是更轻量和安全的选择。def update_product(product_id, new_data):# 1. 更新数据库save_to_db(product_id, new_data)# 2. 删除缓存r.delete(fproduct:{product_id})print(缓存已删除)4.2 延迟双删为了防止数据库主从延迟导致的不一致可以在写入数据库后延迟一段时间再次删除缓存。def update_product_with_delay(product_id, new_data,delay0.5):# 1. 删除缓存r.delete(fproduct:{product_id})# 2. 更新数据库save_to_db(product_id, new_data)# 3. 延迟后再次删除缓存threading.Timer(delay, lambda: r.delete(fproduct:{product_id})).start()4.3 最终一致性方案对于一致性要求极高的场景可以基于 binlog 异步更新缓存如使用 Canal MQ。这里展示一个简化的消息通知方案# 发布方数据库更新后def notify_cache_evict(channel, key): r.publish(channel, key)# 订阅方def cache_evict_subscriber(): pubsubr.pubsub()pubsub.subscribe(cache:evict)formsginpubsub.listen():ifmsg[type]message:r.delete(msg[data])print(f删除缓存: {msg[data]})5. 动手试试穿透实验停掉布隆过滤器用脚本循环请求 1000 个不存在的商品 ID观察数据库压力然后开启布隆过滤器对比。击穿模拟在 Redis 中设置一个热点键过期时间为 2 秒同时启动 20 个并发线程读取该键统计未命中次数。分别用互斥锁和逻辑过期方案对比。雪崩模拟设置 100 个键过期时间集中在 5 秒内观察过期瞬间的数据库请求量然后改为随机 TTL观察平滑程度。一致性验证模拟并发读写验证 Cache Aside 模式下删除缓存后可能存在的短期不一致观察最终结果。预期效果布隆过滤器拦截绝大部分无效请求互斥锁让数据库只查询一次随机 TTL 使缓存过期平缓分布Cache Aside 在读多写少下表现优异。6. 总结缓存三大难题是面试和生产的常客理解原理后用代码将它们一一化解并不复杂。下一篇我们将深入 Redis 的内存管理与淘汰策略了解当内存满时 Redis 如何优雅地“断舍离”。想了解更多还可以去各个平台搜索「IT策士」一起升级 IT 思维