如何使用redis实现分布式锁
时间:2019-07-05 16:45
使用Redis实现分布式锁 redis特性介绍 1、支持丰富的数据类型,如String、List、Map、Set、ZSet等。 2、支持数据持久化,RDB和AOF两种方式 3、支持集群工作模式,分区容错性强 4、单线程,顺序处理命令 5、支持事务 6、支持发布与订阅 Redis实现分布式锁使用了SETNX命令: SETNX key value 将key的值设为value ,当且仅当key不存在。 若给定的key已经存在,则SETNX不做任何动作。 SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。 可用版本:>= 1.0.0时间复杂度:O(1)返回值: 设置成功,返回 1 。 设置失败,返回 0 。 首先,我们需要封装一个公共的Redis访问工具类。该类需要注入RedisTemplate实例和ValueOperations实例,使用ValueOperations实例是因为Redis实现的分布式锁使用了最简单的String类型。另外,我们需要封装3个方法,分别是setIfObsent (String key, String value)、 expire (String key, long timeout, TimeUnit unit) 、delete (String key) ,分别对应Redis的SETNX、expire、del命令。以下是Redis访问工具类的具体实现: 完成了Redis访问工具类的实现,现在需要考虑的是如何去模拟竞争分布式锁。因为Redis本身就是支持分布式集群的,所以只需要模拟出多线程处理业务场景。这里采用线程池来模拟,以下是测试类的具体实现: 通过上面这段代码,可能会产生以下几个疑问: 线程如果获取分布式锁失败,为什么不尝试重新获取锁? 线程获取分布式锁成功后,设置了锁的失效时间,这个失效时间长短如何确定? 线程业务处理结束后,为什么要做删除锁的操作? 针对这几个疑问,我们可以来讨论下。 第一,Redis的SETNX命令,如果key已经存在,则不会做任何操作,所以SETNX实现的分布式锁并不是可重入锁。当然,也可以自己通过代码实现重试n次或者直至获取到分布式锁为止。但是,这不能保证竞争的公平性,某个线程会因为一直等待锁而阻塞。因此,Redis实现的分布式锁更适用于对共享资源一写多读的场景。 第二,分布式锁必须设置失效时间,而且失效时间必须大于业务处理所需的时间(保证数据一致性)。所以,在测试阶段尽可能准确的预测出业务正常处理所需的时间,设置失效时间是防止因为业务处理过程的某些原因导致死锁的情况。 第三,业务处理结束,必须要做删除锁的操作。 上面设置分布式锁和为锁设置失效时间是通过两个操作步骤完成的,更合理的方式应该是把设置分布式锁和为锁设置失效时间通过一个操作完成。要么都成功,要么都失败。实现代码如下: 运行单元测试类,结果如下: 更多Redis相关知识,请访问Redis使用教程栏目! 以上就是如何使用redis实现分布式锁的详细内容,更多请关注gxlsystem.com其它相关文章!redis> EXISTS job # job 不存在
(integer) 0
redis> SETNX job "programmer" # job 设置成功
(integer) 1
redis> SETNX job "code-farmer" # 尝试覆盖 job ,失败
(integer) 0
redis> GET job # 没有被覆盖
"programmer"
@Component
public class RedisDao {
@Autowired
private RedisTemplate redisTemplate;
@Resource(name="redisTemplate")
private ValueOperations<Object, Object> valOpsObj;
/**
* 如果key不存在,就存储一个key-value,相当于SETNX命令
* @param key 键
* @param value 值,可以为空
* @return
*/
public boolean setIfObsent (String key, String value) {
return valOpsObj.setIfAbsent(key, value);
}
/**
* 为key设置失效时间
* @param key 键
* @param timeout 时间大小
* @param unit 时间单位
*/
public boolean expire (String key, long timeout, TimeUnit unit) {
return redisTemplate.expire(key, timeout, unit);
}
/**
* 删除key
* @param key 键
*/
public void delete (String key) {
redisTemplate.delete(key);
}
}
@RestController
@RequestMapping("test")
public class TestController {
private static final Logger LOG = LoggerFactory.getLogger(TestController.class); //日志对象
@Autowired
private RedisDao redisDao;
//定义的分布式锁key
private static final String LOCK_KEY = "MyTestLock";
@RequestMapping(value={"testRedisLock"}, method=RequestMethod.GET)
public void testRedisLock () {
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
//获取分布式锁
boolean flag = redisDao.setIfObsent(LOCK_KEY, "lock");
if (flag) {
LOG.info(Thread.currentThread().getName() + ":获取Redis分布式锁成功");
//获取锁成功后设置失效时间
redisDao.expire(LOCK_KEY, 2, TimeUnit.SECONDS);
try {
LOG.info(Thread.currentThread().getName() + ":处理业务开始");
Thread.sleep(1000); //睡眠1000ms模拟处理业务
LOG.info(Thread.currentThread().getName() + ":处理业务结束");
//处理业务完成后删除锁
redisDao.delete(LOCK_KEY);
} catch (InterruptedException e) {
LOG.error("处理业务异常:", e);
}
} else {
LOG.info(Thread.currentThread().getName() + ":获取Redis分布式锁失败");
}
}
});
}
}
}
/**
* Redis访问工具类
*/
@Component
public class RedisDao {
private static Logger logger = LoggerFactory.getLogger(RedisDao.class);
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* 设置分布式锁
* @param key 键
* @param value 值
* @param timeout 失效时间
* @return
*/
public boolean setDistributeLock (String key, String value, long timeout) {
RedisConnection connection = null;
boolean flag = false;
try {
//获取一个连接
connection = stringRedisTemplate.getConnectionFactory().getConnection();
//设置分布式锁的同时为锁设置失效时间
connection.set(key.getBytes(), value.getBytes(), Expiration.seconds(timeout), RedisStringCommands.SetOption.SET_IF_ABSENT);
flag = true;
} catch (Exception e) {
logger.error("set automic lock error:", e);
} finally {
//使用后关闭连接
connection.close();
}
return flag;
}
/**
* 查询key的失效时间
* @param key 键
* @param timeUnit 时间单位
* @return
*/
public long ttl (String key, TimeUnit timeUnit) {
return stringRedisTemplate.getExpire(key, timeUnit);
}
}
/**
* 单元测试类
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo1ApplicationTests {
private static final Logger LOG = LoggerFactory.getLogger(Demo1ApplicationTests.class);
@Autowired
private RedisDao redisDao;
@Test
public void testDistributeLock () {
String key = "MyDistributeLock";
//设置分布式锁,失效时间20s
boolean result = redisDao.setDistributeLock(key, "1", 20);
if (result) {
LOG.info("设置分布式锁成功");
long ttl = redisDao.ttl(key, TimeUnit.SECONDS);
LOG.info("{}距离失效还有{}s", key, ttl);
}
}
}
2019-05-15 13:07:10.827 - 设置分布式锁成功
2019-05-15 13:07:10.838 - MyDistributeLock距离失效还有19s