您的位置:首页 > 技术中心 > 数据库 >

详细聊聊redis中的分布式锁

时间:2023-04-07 16:32

我们都知道分布式环境下要使用分布式锁才行。那么分布式锁都需要有哪些特点呢?单机redis怎么加锁?redis集群加锁有哪些坑呢?别急,下面我们一步步解开Redis分布式锁的面纱。

分布式锁的特点

  • 1.独占性

不论在任何情况下都只能有一个线程持有锁。

  • 2.高可用

redis集群环境不能因为某一个节点宕机而出现获取锁或释放锁失败。【相关推荐:Redis视频教程】

  • 3.防死锁

必须有超时控制机制或者撤销操作。

  • 4.不乱抢

自己加锁,自己释放。不能释放别人加的锁。

  • 5.重入性

同一线程可以多次加锁。

redis单机怎么实现

一般情况下都是使用setnx+lua脚本实现。

直接贴代码

package com.fandf.test.redis;import cn.hutool.core.util.IdUtil;import cn.hutool.core.util.RandomUtil;import lombok.extern.slf4j.Slf4j;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.core.script.DefaultRedisScript;import org.springframework.stereotype.Service;import javax.annotation.Resource;import java.util.Collections;import java.util.concurrent.TimeUnit;/** * redis 单机锁 * * @author fandongfeng * @date 2023/3/29 06:52 */@Slf4j@Servicepublic class RedisLock {    @Resource    RedisTemplate<String, Object> redisTemplate;    private static final String SELL_LOCK = "kill:";    /**     * 模拟秒杀     *     * @return 是否成功     */    public String kill() {        String productId = "123";        String key = SELL_LOCK + productId;        //锁value,解锁时 用来判断当前锁是否是自己加的        String value = IdUtil.fastSimpleUUID();        //加锁 十秒钟过期 防死锁        Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, value, 10, TimeUnit.SECONDS);        if (!flag) {            return "加锁失败";        }        try {            String productKey = "good123";            //获取商品库存            Integer stock = (Integer) redisTemplate.opsForValue().get(productKey);            if (stock == null) {                //模拟录入数据, 实际应该加载时从数据库读取                redisTemplate.opsForValue().set(productKey, 100);                stock = 100;            }            if (stock <= 0) {                return "卖完了,下次早点来吧";            }            //扣减库存, 模拟随机卖出数量            int randomInt = RandomUtil.randomInt(1, 10);            redisTemplate.opsForValue().decrement(productKey, randomInt);            // 修改db,可以丢到队列里慢慢处理            return "成功卖出" + randomInt + "个,库存剩余" + redisTemplate.opsForValue().get(productKey) + "个";        } finally {//            //这种方法会存在删除别人加的锁的可能//            redisTemplate.delete(key);//            if(value.equals(redisTemplate.opsForValue().get(key))){//                //因为if条件的判断和 delete不是原子性的,//                //if条件判断成功后,恰好锁到期自己解锁//                //此时别的线程如果持有锁了,就会把别人的锁删除掉//                redisTemplate.delete(key);//            }            //使用lua脚本保证判断和删除的原子性            String luaScript =                    "if (redis.call('get',KEYS[1]) == ARGV[1]) then " +                            "return redis.call('del',KEYS[1]) " +                            "else " +                            "return 0 " +                            "end";            redisTemplate.execute(new DefaultRedisScript<>(luaScript, Boolean.class), Collections.singletonList(key), value);        }    }}

进行单元测试,模拟一百个线程同时进行秒杀

package com.fandf.test.redis;import org.junit.jupiter.api.DisplayName;import org.junit.jupiter.api.RepeatedTest;import org.junit.jupiter.api.Test;import org.junit.jupiter.api.parallel.Execution;import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;/** * @Description: * @author: fandongfeng * @date: 2023-3-24 16:45 */@SpringBootTestclass SignServiceTest {      @Resource    RedisLock redisLock;    @RepeatedTest(100)    @Execution(CONCURRENT)    public void redisLock() {        String result = redisLock.kill();        if("加锁失败".equals(result)) {        }else {            System.out.println(result);        }    }}

只有三个线程抢到了锁

成功卖出5个,库存剩余95个成功卖出8个,库存剩余87个成功卖出7个,库存剩余80个

redis锁有什么问题?

总的来说有两个:

  • 1.无法重入。
  • 2.我们为了防止死锁,加锁时都会加上过期时间,这个时间大部分情况下都是根据经验对现有业务评估得出来的,但是万一程序阻塞或者异常,导致执行了很长时间,锁过期就会自动释放了。此时如果别的线程拿到锁,执行逻辑,就有可能出现问题。

那么这两个问题有没有办法解决呢?有,接下来我们就来讲讲Redisson

Redisson实现分布式锁

Redisson是什么?

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

springboot集成Redisson

集成很简单,只需两步

  1. pom引入依赖
<dependency>    <groupId>org.redisson</groupId>    <artifactId>redisson-spring-boot-starter</artifactId></dependency>
  1. application.yml增加redis配置
spring:  application:    name: test  redis:    host: 127.0.0.1    port: 6379

使用也很简单,只需要注入RedissonClient即可

package com.fandf.test.redis;import lombok.extern.slf4j.Slf4j;import org.redisson.api.RLock;import org.redisson.api.RedissonClient;import org.springframework.stereotype.Component;import javax.annotation.Resource;/** * @author fandongfeng */@Component@Slf4jpublic class RedissonTest {    @Resource    RedissonClient redissonClient;    public void test() {        RLock rLock = redissonClient.getLock("anyKey");        //rLock.lock(10, TimeUnit.SECONDS);        rLock.lock();        try {            // do something        } catch (Exception e) {            log.error("业务异常", e);        } finally {            rLock.unlock();        }    }    }

可能不了解redisson的小伙伴会不禁发出疑问。
what?加锁时不需要加过期时间吗?这样会不会导致死锁啊。解锁不需要判断是不是自己持有吗?
哈哈,别着急,我们接下来一步步揭开redisson的面纱。

Redisson lock()源码跟踪

我们来一步步跟着lock()方法看下源码(本地redisson版本为3.20.0)

//RedissonLock.class@Overridepublic void lock() {    try {        lock(-1, null, false);    } catch (InterruptedException e) {        throw new IllegalStateException();    }}

查看lock(-1, null, false);方法

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {        //获取当前线程id        long threadId = Thread.currentThread().getId();        //加锁代码块, 返回锁的失效时间        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);        // lock acquired        if (ttl == null) {            return;        }        CompletableFuture<RedissonLockEntry> future = subscribe(threadId);        pubSub.timeout(future);        RedissonLockEntry entry;        if (interruptibly) {            entry = commandExecutor.getInterrupted(future);        } else {            entry = commandExecutor.get(future);        }        try {            while (true) {                ttl = tryAcquire(-1, leaseTime, unit, threadId);                // lock acquired                if (ttl == null) {                    break;                }                // waiting for message                if (ttl >= 0) {                    try {                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);                    } catch (InterruptedException e) {                        if (interruptibly) {                            throw e;                        }                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);                    }                } else {                    if (interruptibly) {                        entry.getLatch().acquire();                    } else {                        entry.getLatch().acquireUninterruptibly();                    }                }            }        } finally {            unsubscribe(entry, threadId);        }//        get(lockAsync(leaseTime, unit));    }

我们看下它是怎么上锁的,也就是tryAcquire方法

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {    //真假加锁方法 tryAcquireAsync    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));}
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {    super(commandExecutor, name);    this.commandExecutor = commandExecutor;    this.internalLockLeaseTime = commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout();    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();}private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {    RFuture<Long> ttlRemainingFuture;    if (leaseTime > 0) {        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);    } else {        //waitTime和leaseTime都是-1,所以走这里           //过期时间internalLockLeaseTime初始化的时候赋值commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout();        //跟进去源码发现默认值是30秒, private long lockWatchdogTimeout = 30 * 1000;        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);    }    CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);    ttlRemainingFuture = new CompletableFutureWrapper<>(s);    //加锁成功,开启子线程进行续约    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {        // lock acquired        if (ttlRemaining == null) {            if (leaseTime > 0) {                //如果指定了过期时间,则不续约                internalLockLeaseTime = unit.toMillis(leaseTime);            } else {                //没指定过期时间,或者小于0,在这里实现锁自动续约                scheduleExpirationRenewal(threadId);            }        }        return ttlRemaining;    });    return new CompletableFutureWrapper<>(f);}

上面代码里面包含加锁和锁续约的逻辑,我们先来看看加锁的代码

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,            "if ((redis.call('exists', KEYS[1]) == 0) " +                        "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +                    "return nil; " +                "end; " +                "return redis.call('pttl', KEYS[1]);",            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}

这里就看的很明白了吧,redisson使用了lua脚本来保证了命令的原子性。
redis.call('hexists', KEYS[1], ARGV[2]) 查看 key value 是否存在。

Redis Hexists 命令用于查看哈希表的指定字段是否存在。
如果哈希表含有给定字段,返回 1 。 如果哈希表不含有给定字段,或 key 不存在,返回 0 。

127.0.0.1:6379> hexists 123 uuid(integer) 0127.0.0.1:6379> hincrby 123 uuid 1(integer) 1127.0.0.1:6379> hincrby 123 uuid 1(integer) 2127.0.0.1:6379> hincrby 123 uuid 1(integer) 3127.0.0.1:6379> hexists 123 uuid(integer) 1127.0.0.1:6379> hgetall 1231) "uuid"2) "3"127.0.0.1:6379>

当key不存在,或者已经含有给定字段(也就是已经加过锁了,这里是为了实现重入性),直接对字段的值+1
这个字段的值,也就是ARGV[2], 取得是getLockName(threadId)方法,我们再看看这个字段的值是什么

    protected String getLockName(long threadId) {        return id + ":" + threadId;    }    public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {        super(commandExecutor, name);        this.commandExecutor = commandExecutor;        this.id = commandExecutor.getServiceManager().getId();        this.internalLockLeaseTime = commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout();        this.entryName = id + ":" + name;    }    //commandExecutor.getServiceManager() 的id默认值    private final String id = UUID.randomUUID().toString();

这里就明白了,字段名称是 uuid + : + threadId

接下来我们看看锁续约的代码scheduleExpirationRenewal(threadId);

protected void scheduleExpirationRenewal(long threadId) {    ExpirationEntry entry = new ExpirationEntry();    //判断该实例是否加过锁    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);    if (oldEntry != null) {        //重入次数+1        oldEntry.addThreadId(threadId);    } else {        //第一次加锁        entry.addThreadId(threadId);        try {            //锁续约核心代码            renewExpiration();        } finally {            if (Thread.currentThread().isInterrupted()) {                //如果线程异常终止,则关闭锁续约线程                cancelExpirationRenewal(threadId);            }        }    }}

我们看看renewExpiration()方法

private void renewExpiration() {    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());    if (ee == null) {        return;    }    //新建一个线程执行    Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() {        @Override        public void run(Timeout timeout) throws Exception {            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());            if (ent == null) {                return;            }            Long threadId = ent.getFirstThreadId();            if (threadId == null) {                return;            }            //设置锁过期时间为30秒            CompletionStage<Boolean> future = renewExpirationAsync(threadId);            future.whenComplete((res, e) -> {                if (e != null) {                    log.error("Can't update lock {} expiration", getRawName(), e);                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());                    return;                }                //检查锁是还否存在                if (res) {                    // reschedule itself 10后调用自己                    renewExpiration();                } else {                    //关闭续约                    cancelExpirationRenewal(null);                }            });        }    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);    //注意上行代码internalLockLeaseTime / 3,    //internalLockLeaseTime默认30s,那么也就是10s检查一次    ee.setTimeout(task);}//设置锁过期时间为internalLockLeaseTime  也就是30s  lua脚本保证原子性protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +                    "return 1; " +                    "end; " +                    "return 0;",            Collections.singletonList(getRawName()),            internalLockLeaseTime, getLockName(threadId));}

OK,分析到这里我们已经知道了,lock(),方法会默认加30秒过期时间,并且开启一个新线程,每隔10秒检查一下,锁是否释放,如果没释放,就将锁过期时间设置为30秒,如果锁已经释放,那么就将这个新线程也关掉。

我们写个测试类看看

package com.fandf.test.redis;import org.junit.jupiter.api.Test;import org.redisson.api.RLock;import org.redisson.api.RedissonClient;import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;/** * @Description: * @author: fandongfeng * @date: 2023-3-2416:45 */@SpringBootTestclass RedissonTest {    @Resource    private RedissonClient redisson;    @Test    public void watchDog() throws InterruptedException {        RLock lock = redisson.getLock("123");        lock.lock();        Thread.sleep(1000000);    }}

查看锁的过期时间,及是否续约

127.0.0.1:6379> keys *1) "123"127.0.0.1:6379> ttl 123(integer) 30127.0.0.1:6379> ttl 123(integer) 26127.0.0.1:6379> ttl 123(integer) 24127.0.0.1:6379> ttl 123(integer) 22127.0.0.1:6379> ttl 123(integer) 21127.0.0.1:6379> ttl 123(integer) 20127.0.0.1:6379> ttl 123(integer) 30127.0.0.1:6379> ttl 123(integer) 28127.0.0.1:6379>

我们再改改代码,看看是否可重入和字段名称是否和我们预期一致

package com.fandf.test.redis;import org.junit.jupiter.api.Test;import org.redisson.api.RLock;import org.redisson.api.RedissonClient;import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;/** * @Description: * @author: fandongfeng * @date: 2023-3-24 16:45 */@SpringBootTestclass RedissonTest {    @Resource    private RedissonClient redisson;    @Test    public void watchDog() throws InterruptedException {        RLock lock = redisson.getLock("123");        lock.lock();        lock.lock();        lock.lock();        //加了三次锁,此时重入次数为3        Thread.sleep(3000);        //解锁一次,此时重入次数变为3        lock.unlock();        Thread.sleep(1000000);    }}
127.0.0.1:6379> keys *1) "123"127.0.0.1:6379>127.0.0.1:6379> ttl 123(integer) 24127.0.0.1:6379> hgetall 1231) "df7f4c71-b57b-455f-acee-936ad8475e01:12"2) "3"127.0.0.1:6379>127.0.0.1:6379> hgetall 1231) "df7f4c71-b57b-455f-acee-936ad8475e01:12"2) "2"127.0.0.1:6379>

我们加锁了三次,重入次数是3,字段值也是 uuid+:+threadId,和我们预期结果是一致的。

Redlock算法

redisson是基于Redlock算法实现的,那么什么是Redlock算法呢?

假设当前集群有5个节点,那么运行redlock算法的客户端会一次执行下面步骤

  • 1.客户端记录当前系统时间,以毫秒为单位
  • 2.依次尝试从5个redis实例中,使用相同key获取锁
    当redis请求获取锁时,客户端会设置一个网络连接和响应超时时间,避免因为网络故障等原因导致阻塞。
  • 3.客户端使用当前时间减去开始获取锁时间(步骤1的时间),得到获取锁消耗的时间
    只有当半数以上redis节点加锁成功,并且加锁消耗的时间要小于锁失效时间,才算锁获取成功
  • 4.如果获取到了锁,key的真正有效时间等于锁失效时间 减去 获取锁消耗的时间
  • 5.如果获取锁失败,所有的redis实例都会进行解锁
    防止因为服务端响应消息丢失,但是实际数据又添加成功导致数据不一致问题

这里有下面几个点需要注意:

  • 1.我们都知道单机的redis是cp的,但是集群情况下redis是ap的,所以运行Redisson的节点必须是主节点,不能有从节点,防止主节点加锁成功未同步从节点就宕机,而客户端却收到加锁成功,导致数据不一致问题。
  • 2.为了提高redis节点宕机的容错率,可以使用公式2N(n指宕机数量)+1,假设宕机一台,Redisson还要继续运行,那么至少要部署2*1+1=3台主节点。

更多编程相关知识,请访问:编程视频!!

以上就是详细聊聊redis中的分布式锁的详细内容,更多请关注Gxl网其它相关文章!

热门排行

今日推荐

热门手游