【缓存技术】Redis实战:从缓存策略到分布式锁
【缓存技术】Redis实战从缓存策略到分布式锁引言缓存是提升系统性能的关键技术Redis作为高性能的键值存储系统被广泛应用于缓存、会话管理、消息队列等场景。本文将详细介绍Redis的核心特性、缓存策略和分布式锁实现。一、Redis基础1.1 数据结构数据类型说明应用场景String字符串缓存、计数器Hash哈希表对象存储List列表消息队列Set无序集合去重、交集Sorted Set有序集合排行榜1.2 基本操作import redis # 连接Redis r redis.Redis(hostlocalhost, port6379, db0) # String操作 r.set(name, Alice) print(r.get(name)) # Hash操作 r.hset(user:1, mapping{ name: Alice, age: 30, email: aliceexample.com }) print(r.hgetall(user:1)) # List操作 r.lpush(messages, Hello) r.lpush(messages, World) print(r.lrange(messages, 0, -1)) # Set操作 r.sadd(tags, python, redis, web) print(r.smembers(tags)) # Sorted Set操作 r.zadd(leaderboard, {Alice: 100, Bob: 90}) print(r.zrange(leaderboard, 0, -1, withscoresTrue))二、缓存策略2.1 缓存模式# Cache-Aside模式 def get_user(user_id): # 先从缓存获取 user r.get(fuser:{user_id}) if user: return json.loads(user) # 缓存未命中从数据库获取 user db.query(fSELECT * FROM users WHERE id {user_id}) # 更新缓存 r.set(fuser:{user_id}, json.dumps(user), ex3600) return user2.2 缓存淘汰策略# 配置Redis缓存策略 # maxmemory-policy allkeys-lru # LRU最近最少使用 # LFU最不经常使用 # FIFO先进先出 # 设置缓存过期时间 r.set(temp_data, value, ex60) # 60秒过期 r.set(session:123, data, px3600000) # 毫秒过期2.3 缓存一致性# 写操作时更新缓存 def update_user(user_id, data): # 更新数据库 db.execute(fUPDATE users SET ... WHERE id {user_id}) # 删除缓存让下次读取时重新加载 r.delete(fuser:{user_id}) # 或者更新缓存 # r.set(fuser:{user_id}, json.dumps(data))三、分布式锁3.1 基本实现def acquire_lock(lock_name, acquire_timeout10): 获取分布式锁 identifier str(uuid.uuid4()) end time.time() acquire_timeout while time.time() end: # 使用SET NX不存在时设置 if r.set(flock:{lock_name}, identifier, nxTrue, ex10): return identifier time.sleep(0.01) return None def release_lock(lock_name, identifier): 释放分布式锁 # 使用Lua脚本保证原子性 script if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) else return 0 end r.eval(script, 1, flock:{lock_name}, identifier)3.2 Redlock算法from redis import Redis from time import time class Redlock: def __init__(self, servers): self.servers [Redis(hosthost, portport) for host, port in servers] self.quorum (len(servers) // 2) 1 def acquire(self, lock_name, ttl10000): 获取Redlock分布式锁 identifier str(uuid.uuid4()) acquired 0 start_time time() * 1000 for server in self.servers: try: if server.set(flock:{lock_name}, identifier, nxTrue, pxttl): acquired 1 except Exception: pass elapsed (time() * 1000) - start_time if acquired self.quorum and elapsed ttl: return { valid: True, identifier: identifier, ttl: ttl - elapsed } # 获取失败释放已获取的锁 self.release(lock_name, identifier) return {valid: False} def release(self, lock_name, identifier): 释放Redlock分布式锁 for server in self.servers: try: script if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) end server.eval(script, 1, flock:{lock_name}, identifier) except Exception: pass四、高级特性4.1 管道操作# 使用管道提升性能 pipe r.pipeline() # 添加多个操作 pipe.set(key1, value1) pipe.set(key2, value2) pipe.get(key1) pipe.get(key2) # 一次性执行 results pipe.execute() print(results)4.2 事务# 使用事务 with r.pipeline() as pipe: while True: try: pipe.watch(balance) current int(pipe.get(balance)) if current 100: pipe.multi() pipe.decrby(balance, 100) pipe.incr(withdrawn) pipe.execute() break else: break except redis.WatchError: continue4.3 发布/订阅# 发布者 pubsub r.pubsub() r.publish(news, Hello World) # 订阅者 pubsub.subscribe(news) for message in pubsub.listen(): if message[type] message: print(fReceived: {message[data]})五、性能优化5.1 连接池from redis import ConnectionPool # 创建连接池 pool ConnectionPool( hostlocalhost, port6379, db0, max_connections100 ) # 使用连接池 r redis.Redis(connection_poolpool)5.2 数据分片# 简单的分片策略 def get_shard(key): 根据key计算分片 shards [0, 1, 2] return shards[hash(key) % len(shards)] def get_from_shard(key): shard get_shard(key) r redis.Redis(dbshard) return r.get(key)5.3 批量操作# 批量获取 keys [user:1, user:2, user:3] values r.mget(keys) # 批量设置 mapping { user:1: data1, user:2: data2 } r.mset(mapping)六、监控与运维6.1 监控指标# 获取Redis信息 info r.info() print(f内存使用: {info[used_memory_human]}) print(f连接数: {info[connected_clients]}) print(f命中率: {info[keyspace_hits] / (info[keyspace_hits] info[keyspace_misses]):.2%})6.2 持久化配置# RDB持久化 # save 900 1 # 900秒内至少1个key变化 # save 300 10 # 300秒内至少10个key变化 # AOF持久化 # appendonly yes # appendfsync everysec6.3 集群配置# Redis Cluster配置 from redis.cluster import RedisCluster rc RedisCluster( hostlocalhost, port7000, decode_responsesTrue ) rc.set(foo, bar) print(rc.get(foo))七、实战案例缓存系统7.1 实现缓存装饰器def cache(key_prefix, ttl3600): def decorator(func): def wrapper(*args, **kwargs): # 生成缓存key key f{key_prefix}:{args[0]} # 尝试从缓存获取 cached r.get(key) if cached: return json.loads(cached) # 执行函数 result func(*args, **kwargs) # 缓存结果 r.set(key, json.dumps(result), exttl) return result return wrapper return decorator cache(user) def get_user(user_id): return db.query(fSELECT * FROM users WHERE id {user_id})7.2 限流实现def rate_limit(user_id, limit100, window3600): 限流实现 key frate_limit:{user_id} # 增加计数 count r.incr(key) if count 1: # 设置过期时间 r.expire(key, window) return count limit八、常见问题与解决方案8.1 缓存穿透# 使用布隆过滤器解决缓存穿透 from bloom_filter import BloomFilter bloom BloomFilter(max_elements1000000, error_rate0.01) def get_user(user_id): # 先检查布隆过滤器 if user_id not in bloom: return None # 从缓存获取 user r.get(fuser:{user_id}) if user: return json.loads(user) # 从数据库获取 user db.query(fSELECT * FROM users WHERE id {user_id}) if user: bloom.add(user_id) r.set(fuser:{user_id}, json.dumps(user)) return user8.2 缓存击穿# 使用互斥锁解决缓存击穿 def get_hot_data(key): # 先尝试获取缓存 data r.get(key) if data: return json.loads(data) # 获取分布式锁 lock acquire_lock(flock:{key}) if not lock: # 获取锁失败等待重试 time.sleep(0.1) return get_hot_data(key) try: # 再次检查缓存 data r.get(key) if data: return json.loads(data) # 从数据库加载 data db.query(fSELECT * FROM hot_data WHERE key {key}) # 更新缓存 r.set(key, json.dumps(data), ex3600) return data finally: release_lock(flock:{key}, lock)8.3 缓存雪崩# 设置随机过期时间避免缓存雪崩 import random def set_cache_with_random_ttl(key, value, base_ttl3600): # 添加随机偏移 ttl base_ttl random.randint(0, 300) r.set(key, value, exttl)九、结语Redis是一个功能强大的缓存和数据存储系统掌握其核心特性和最佳实践对于构建高性能系统至关重要。本文介绍了Redis的基础操作、缓存策略、分布式锁和性能优化等内容希望能帮助你更好地使用Redis。#Redis #缓存 #分布式锁 #性能优化