1、非公平锁1、底层结构map2、实现方式是lua脚本map3、实现逻辑map不存在创建map并创建key-value,加锁成功若map存在判断key是否为请当前线程是当前线程则value1value1用于支持可重入,如果key不是当前线程则返回过期时间2、公平锁脚本参数说明KEYS[1]锁的哈希表存储持有锁的线程ID及其重入次数KEYS[2]列表作为 FIFO 等待队列存储等待线程的IDKEYS[3]有序集合存储每个等待线程的超时时间戳score 超时时间点ARGV[1]锁的租约时间毫秒ARGV[2]当前线程的唯一标识IDARGV[3]线程的等待时间毫秒即每个线程在队列中最多等多久ARGV[4]当前时间戳毫秒while true do local firstThreadId2 redis.call(lindex, KEYS[2], 0); if firstThreadId2 false then break; end; local timeout tonumber(redis.call(zscore, KEYS[3], firstThreadId2)); if timeout tonumber(ARGV[4]) then // remove the item from the queue and timeout set // NOTE we do not alter any other timeout redis.call(zrem, KEYS[3], firstThreadId2); redis.call(lpop, KEYS[2]); else break; end; end; // check if the lock can be acquired now if (redis.call(exists, KEYS[1]) 0) and ((redis.call(exists, KEYS[2]) 0) or (redis.call(lindex, KEYS[2], 0) ARGV[2])) then // remove this thread from the queue and timeout set redis.call(lpop, KEYS[2]); redis.call(zrem, KEYS[3], ARGV[2]); // decrease timeouts for all waiting in the queue local keys redis.call(zrange, KEYS[3], 0, -1); for i 1, #keys, 1 do redis.call(zincrby, KEYS[3], -tonumber(ARGV[3]), keys[i]); end; // acquire the lock and set the TTL for the lease redis.call(hset, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; // check if the lock is already held, and this is a re-entry if redis.call(hexists, KEYS[1], ARGV[2]) 1 then redis.call(hincrby, KEYS[1], ARGV[2],1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; // the lock cannot be acquired // check if the thread is already in the queue local timeout redis.call(zscore, KEYS[3], ARGV[2]); if timeout ~ false then // the real timeout is the timeout of the prior thread // in the queue, but this is approximately correct, and // avoids having to traverse the queue return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]); end; // add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of // the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the // threadWaitTime local lastThreadId redis.call(lindex, KEYS[2], -1); local ttl; if lastThreadId ~ false and lastThreadId ~ ARGV[2] then ttl tonumber(redis.call(zscore, KEYS[3], lastThreadId)) - tonumber(ARGV[4]); else ttl redis.call(pttl, KEYS[1]); end; local timeout ttl tonumber(ARGV[3]) tonumber(ARGV[4]); if redis.call(zadd, KEYS[3], timeout, ARGV[2]) 1 then redis.call(rpush, KEYS[2], ARGV[2]); end; return ttl;第1步清理队列中已超时的等待线程while true do local firstThreadId2 redis.call(lindex, KEYS[2], 0); if firstThreadId2 false then break; end; local timeout tonumber(redis.call(zscore, KEYS[3], firstThreadId2)); if timeout tonumber(ARGV[4]) then redis.call(zrem, KEYS[3], firstThreadId2); redis.call(lpop, KEYS[2]); else break; end; end循环检查队列头部的线程是否已超时timeout 当前时间。如果超时则将其从等待队列KEYS[2]和超时集合KEYS[3]中移除。一旦遇到头部线程未超时停止清理。 作用防止已超时的线程长期占据队列位置影响后续线程获取锁。第2步尝试获取锁锁空闲且队列允许if (redis.call(exists, KEYS[1]) 0) and ((redis.call(exists, KEYS[2]) 0) or (redis.call(lindex, KEYS[2], 0) ARGV[2])) then -- 从队列和超时集合中移除当前线程 redis.call(lpop, KEYS[2]); redis.call(zrem, KEYS[3], ARGV[2]); -- 减少队列中剩余所有等待线程的超时时间 local keys redis.call(zrange, KEYS[3], 0, -1); for i 1, #keys, 1 do redis.call(zincrby, KEYS[3], -tonumber(ARGV[3]), keys[i]); end; -- 获取锁并设置租约时间 redis.call(hset, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end条件锁不存在空闲且等待队列为空或队列头正是当前线程。如果当前线程就是队头将其从队列和超时集合中移除表明它即将获得锁。然后遍历超时集合中的所有剩余线程减少它们的超时时间zincrby负值。这是因为队列中前面的线程已获得锁后面的线程应相应减少等待时间公平锁中的时间补偿。最后通过hset和pexpire将锁分配给当前线程返回nil表示获取成功。第3步处理锁重入if redis.call(hexists, KEYS[1], ARGV[2]) 1 then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end如果当前线程已经持有该锁哈希表中存在该线程ID则重入计数加1并刷新锁的过期时间。 支持可重入锁同一线程可多次获取锁而不会死锁。第4步当前线程已在等待队列中local timeout redis.call(zscore, KEYS[3], ARGV[2]); if timeout ~ false then return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]); end如果当前线程已经在等待队列中通过zscore检查则直接返回剩余等待时间。 公式超时时间戳 - 等待时间 - 当前时间。客户端可根据此值决定继续等待还是放弃。第5步将当前线程加入等待队列local lastThreadId redis.call(lindex, KEYS[2], -1); local ttl; if lastThreadId ~ false and lastThreadId ~ ARGV[2] then ttl tonumber(redis.call(zscore, KEYS[3], lastThreadId)) - tonumber(ARGV[4]); else ttl redis.call(pttl, KEYS[1]); end local timeout ttl tonumber(ARGV[3]) tonumber(ARGV[4]); if redis.call(zadd, KEYS[3], timeout, ARGV[2]) 1 then redis.call(rpush, KEYS[2], ARGV[2]); end return ttl;先获取队列最后一个线程ID即队尾。计算当前线程的基准等待时间ttl如果队列非空且最后一个线程不是自己正常情况ttl 最后一个线程的超时时间戳 - 当前时间。否则队列空或最后一个线程是自己ttl 锁当前的剩余时间pttl。然后计算出当前线程的超时时间戳ttl 等待时间 当前时间。将当前线程加入超时集合zadd和队列尾部rpush。最后返回ttl告诉客户端需要等待多久毫秒。总结返回值含义nil成功获取锁首次获取或重入正数毫秒无法获取锁返回需要等待的时间负数或0等待超时或立即失败由调用方解释该脚本实现了公平、可重入、带超时的分布式锁所有操作在 Redis 中原子执行避免了竞态条件。