基于redis分布式锁 的分布式锁到底安全吗

12:41 提问
redis分布式锁的运用和理解
redis是单进程单线程模式 ,为什么还需要分布式锁?跨jvm是指多个服务器上在运行同一个服务吗?
网上看了很多说跨jvm会需要锁 ,但还是不怎么理解
按赞数排序
----------------------同志你好,我是CSDN问答机器人小N,奉组织之命为你提供参考答案,编程尚未成功,同志仍需努力!
redis 3.0也有集群模式了,可以多个机器组成一个整体的cache
redis分布式锁和zookeeper分布式锁都在在企业服务中常用的知识。譬如:你有一个服务程序,需要部署在5台独立的机器上,才能避免高峰访问的压力。
试想用户查询query的推荐结果(经过复杂机器学习算法的计算)保留在机器1的缓存中,当新来的用户再次查询相同的query时,而服务访问的是机器2,那么机器2还需要重新计算推荐结果吗?当然不需要了,
只需要实时更新每台机器的redis缓存,就可以实现缓存信息互享,减少服务器的计算压力,提高访问的效率。当然redis缓存的更新就需要redis分布式锁,通过分布式锁锁定异步计算,监控缓存更新因 Redis 的分布式锁到底安吗(下)?
(点击上方公众号,可快捷关注)作者:张铁蕾/posts/blog-redlock-reasoning-part2.html如有好章投稿,请点击 → 这里了解详情《Redis 内部数据结构详解(1):dict》《Redis 内部数据结构详解(2):sds》《Redis 内部数据结构详解(3):robj》《Redis 内部数据结构详解(4):ziplist》《Redis 内部数据结构详解(5):quicklist》《Redis 内部数据结构详解(6):skiplist》《基于 Redis 的分布式锁到底安吗(上)?》本系列基于 Redis 3.2 分支自从我写完这话题的上半部分后,就觉头脑中出现了广大一线的鸣响,久久挥之不去。它们就像是在为了局部鸡毛蒜皮的麻烦事而相互扯皮个持续。的确,有关分布式的话题就是这样,琐碎异常,而且每个人说吧听起来像都有道理。今天,我们就此起彼伏探讨这个话题的后半部分。本文中,我们将从antirez反驳Martin Kleppmann的意见开始讲起,然后会涉及到Hacker News上出现的部分讨论内容,接下来我们还会谈论到因Zookeeper和Chubby的布式锁是何等的,并和Redlock进行部分相比。最后,我们会涉及Martin对于这一事件的下结论。还没看过上半部分的同窗,请先看:基于Redis的布式锁到底安全吗(上)antirez的辩解Martin在刊登了那篇分析分布式锁的blog (How to do distributed locking)之后,该文章在Twitter和Hacker News上掀起了普遍的讨论。但众人更想闻的是Redlock的作者antirez对此会发表什么样的意见。Martin的那篇文章是在这一天刊登的,但据Martin说,他在公然发表文章的一星期之前就把草稿发给了antirez进行review,而且他们中间经过email进行了讨论。不知情Martin有没有预想到,antirez对于此事的感应很快,就在Martin的文章刊载出来的第二天,antirez就在他的博客上贴出了他对于此事的辩解文章,名字叫"Is Redlock safe?",地址如下:/news/101这是王牌之间的过招。antirez这篇文章也条例十分清晰,并且中间涉及到大方的细节。antirez认为,Martin的文章对Redlock的批评可以包为两个地方(与Martin文章的上下两部分对应):带有自动过期功能的遍布式锁,必须提供某种fencing机制来保证对共享资源的真的排外保护。Redlock提供源源这样一种机制。Redlock构建在一个不够安全的系模型之上。它于系统的记时假设(timing assumption)有比较强的求,而这些要求在切实可行的系中是力不从心保证的。antirez对这两方面分别展开了辩解。首先,关于fencing机制。antirez对于Martin的这种论证方式提出了质疑:既然在锁失效的景象下一度是一种fencing机制能连续保持资源的排外访问了,那干什么还要以一个分布式锁又还要求它提供那么强的安全性保证呢?即使退一步讲,Redlock虽然提供源源Martin所讲的与日俱增的fencing token,但采用Redlock产生的肆意字符串(my_random_value)可以上等同的法力。这个自由字符串虽然不是递增的,但却是绝无仅有的,可以称unique token。antirez举了个例证,比如,你可用它来兑现“Check and Set”操作,原话是:When start
分享这篇日志的人也喜欢
今天二十岁生日啦~
回来了……
照片与本人不符
我回来了!
热门日志推荐
人人最热标签
北京千橡网景科技发展有限公司:
文网文[号··京公网安备号·甲测资字
文化部监督电子邮箱:wlwh@··
文明办网文明上网举报电话: 举报邮箱:&&&&&&&&&&&&
请输入手机号,完成注册
请输入验证码
密码必须由6-20个字符组成
下载人人客户端
品评校花校草,体验校园广场利用redis实现分布式锁_redis实现分布式锁_词汇网
<meta name="keywords" content="利用redis实现分布式锁redis实现分布式锁,SETNX并不难完美实现(不带过期时间),SETNX实现锁有陷阱需谨慎SETEX复写,带过期时间(原子)
利用redis实现分布式锁
责任编辑:词汇网 发表时间: 23:01:53
SETNX并不难完美实现(不带过期时间),SETNX实现锁有陷阱需谨慎SETEX复写,带过期时间(原子) 标签:
代码片段(2)[全屏查看所有代码] 1.[代码]分布式锁工具 private static Logger logger = Logger.getLogger(LockUtils.class); /** * 最长时间锁为1天 */private final static int maxExpireTime = 24 * 60 * 60;/** * 系统时间偏移量15秒,服务器间的系统时间差不可以超过15秒,避免由于时间差造成错误的解锁 */private final static int offsetTime = 15;/** * 锁只是为了解决小概率事件,最好的方式是不用,从设计上避免分布式锁 * * @param key * key * @param value * @param waitTime * 秒 - 最大等待时间,如果还无法获取,则直接失败 * @param expire * 秒- 锁生命周期时间 * @return true 成功 false失败 * @throws Exception */public static boolean Lock(String key, String value, int waitTime, int expire) {long start = System.currentTimeMillis();String lock_key = key + "_lock";("开始获取分布式锁 key:" + key + " lock_key:" + lock_key + " value:" + value);do {try {Thread.sleep(1);long ret = CacheUtils.Setnx(CacheSpacePrefixEnum.TOOLBAR_SYS.name(), lock_key, System.currentTimeMillis() + "$T$" + value, (expire > maxExpireTime) ? maxExpireTime : expire);if (ret == 1) {("成功获得分布式锁 key:" + key + " value:" + value);return Boolean.TRUE;} else { // 存在锁,并对死锁进行修复String desc = CacheUtils.GSetnx(CacheSpacePrefixEnum.TOOLBAR_SYS.name(), lock_key);// 首次锁检测if (desc.indexOf("$T$") > 0) {// 上次锁时间long lastLockTime = NumberUtils.toLong(desc.split("[$T$]")[0]);// 明确死锁,利用Setex复写,再次设定一个合理的解锁时间让系统正常解锁if (System.currentTimeMillis() - lastLockTime > (expire + offsetTime) * 1000) {// 原子操作,只需要一次,【任然会发生小概率事件,多个服务同时发现死锁同时执行此行代码(并发),// 为什么设置解锁时间为expire(而不是更小的时间),防止在解锁发送错乱造成新锁解锁】CacheUtils.Setex(CacheSpacePrefixEnum.TOOLBAR_SYS.name(), lock_key, value, expire);logger.warn("发现死锁【" + expire + "秒后解锁】key:" + key + " desc:" + desc);} else {("当前锁key:" + key + " desc:" + desc);}} else {logger.warn("死锁解锁中key:" + key + " desc:" + desc);}}if (waitTime == 0) {}Thread.sleep(500);}catch (Exception ex) {logger.error(Trace.GetTraceStackDetails("获取锁失败", ex));}}while ((System.currentTimeMillis() - start) < waitTime * 1000);logger.warn("获取分布式锁失败 key:" + key + " value:" + value);return Boolean.FALSE;}/** * 解锁 * * @param key * @return * @throws Exception */public static boolean UnLock(String key) {String lock_key = key + "_lock";try {CacheUtils.Del(CacheSpacePrefixEnum.TOOLBAR_SYS.name(), lock_key);}catch (Exception ex) {logger.error(Trace.GetTraceStackDetails("解锁锁失败key:" + key + " lock_key:" + lock_key, ex));}return Boolean.FALSE;} 2.[代码]redis操作分装部分 @Overridepublic Long Setnx(String key, String value, int expireTime) throws Exception {ShardedJedis jedis =try {jedis = pool.getResource();Long ret = jedis.setnx(key, value);if (ret == 1 && expireTime > 0) {jedis.expire(key, expireTime);}}catch (Exception e) {}finally {if (pool != null && jedis != null) {pool.returnResourceObject(jedis);}}}@Overridepublic String Setex(String key, String value, int expireTime) throws Exception {ShardedJedis jedis =try {jedis = pool.getResource(); String ret = jedis.setex(key, expireTime, value);}catch (Exception e) {}finally {if (pool != null && jedis != null) {pool.returnResourceObject(jedis);}}}@Overridepublic String GSetnx(String key) throws Exception {ShardedJedis jedis =try {jedis = pool.getResource();return jedis.get(key);}catch (Exception e) {}finally {if (pool != null && jedis != null) {pool.returnResourceObject(jedis);}}}
上一集:没有了 下一集:
相关文章:&&&&&&
最新添加资讯
24小时热门资讯
附近好友搜索基于redis分布式锁实现“秒杀” - 简书
基于redis分布式锁实现“秒杀”
最近在项目中遇到了类似“秒杀”的业务场景,在本篇博客中,我将用一个非常简单的demo,阐述实现所谓“秒杀”的基本思路。
所谓秒杀,从业务角度看,是短时间内多个用户“争抢”资源,这里的资源在大部分秒杀场景里是商品;将业务抽象,技术角度看,秒杀就是多个线程对资源进行操作,所以实现秒杀,就必须控制线程对资源的争抢,既要保证高效并发,也要保证操作的正确。
一些可能的实现
刚才提到过,实现秒杀的关键点是控制线程对资源的争抢,根据基本的线程知识,可以不加思索的想到下面的一些方法:1、秒杀在技术层面的抽象应该就是一个方法,在这个方法里可能的操作是将商品库存-1,将商品加入用户的购物车等等,在不考虑缓存的情况下应该是要操作数据库的。那么最简单直接的实现就是在这个方法上加上synchronized关键字,通俗的讲就是锁住整个方法;2、锁住整个方法这个策略简单方便,但是似乎有点粗暴。可以稍微优化一下,只锁住秒杀的代码块,比如写数据库的部分;3、既然有并发问题,那我就让他“不并发”,将所有的线程用一个队列管理起来,使之变成串行操作,自然不会有并发问题。
上面所述的方法都是有效的,但是都不好。为什么?第一和第二种方法本质上是“加锁”,但是锁粒度依然比较高。什么意思?试想一下,如果两个线程同时执行秒杀方法,这两个线程操作的是不同的商品,从业务上讲应该是可以同时进行的,但是如果采用第一二种方法,这两个线程也会去争抢同一个锁,这其实是不必要的。第三种方法也没有解决上面说的问题。
那么如何将锁控制在更细的粒度上呢?可以考虑为每个商品设置一个互斥锁,以和商品ID相关的字符串为唯一标识,这样就可以做到只有争抢同一件商品的线程互斥,不会导致所有的线程互斥。分布式锁恰好可以帮助我们解决这个问题。
何为分布式锁
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。
我们来假设一个最简单的秒杀场景:数据库里有一张表,column分别是商品ID,和商品ID对应的库存量,秒杀成功就将此商品库存量-1。现在假设有1000个线程来秒杀两件商品,500个线程秒杀第一个商品,500个线程秒杀第二个商品。我们来根据这个简单的业务场景来解释一下分布式锁。通常具有秒杀场景的业务系统都比较复杂,承载的业务量非常巨大,并发量也很高。这样的系统往往采用分布式的架构来均衡负载。那么这1000个并发就会是从不同的地方过来,商品库存就是共享的资源,也是这1000个并发争抢的资源,这个时候我们需要将并发互斥管理起来。这就是分布式锁的应用。而key-value存储系统,如redis,因为其一些特性,是实现分布式锁的重要工具。
具体的实现
先来看看一些redis的基本命令:SETNX key value如果key不存在,就设置key对应字符串value。在这种情况下,该命令和SET一样。当key已经存在时,就不做任何操作。SETNX是"SET if Not eXists"。expire KEY seconds设置key的过期时间。如果key已过期,将会被自动删除。del KEY删除key由于笔者的实现只用到这三个命令,就只介绍这三个命令,更多的命令以及redis的特性和使用,可以参考。
需要考虑的问题
1、用什么操作redis?幸亏redis已经提供了jedis客户端用于java应用程序,直接调用jedis API即可。2、怎么实现加锁?“锁”其实是一个抽象的概念,将这个抽象概念变为具体的东西,就是一个存储在redis里的key-value对,key是于商品ID相关的字符串来唯一标识,value其实并不重要,因为只要这个唯一的key-value存在,就表示这个商品已经上锁。3、如何释放锁?既然key-value对存在就表示上锁,那么释放锁就自然是在redis里删除key-value对。4、阻塞还是非阻塞?笔者采用了阻塞式的实现,若线程发现已经上锁,会在特定时间内轮询锁。5、如何处理异常情况?比如一个线程把一个商品上了锁,但是由于各种原因,没有完成操作(在上面的业务场景里就是没有将库存-1写入数据库),自然没有释放锁,这个情况笔者加入了锁超时机制,利用redis的expire命令为key设置超时时长,过了超时时间redis就会将这个key自动删除,即强制释放锁(可以认为超时释放锁是一个异步操作,由redis完成,应用程序只需要根据系统特点设置超时时间即可)。
talk is cheap,show me the code
在代码实现层面,注解有并发的方法和参数,通过动态代理获取注解的方法和参数,在代理中加锁,执行完被代理的方法后释放锁。
几个注解定义:cachelock是方法级的注解,用于注解会产生并发问题的方法:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
String lockedPrefix() default "";//redis 锁key的前缀
long timeOut() default 2000;//轮询锁的时间
int expireTime() default 1000;//key在redis里存在的时间,1000S
lockedObject是参数级的注解,用于注解商品ID等基本类型的参数:
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedObject {
//不需要值
LockedComplexObject也是参数级的注解,用于注解自定义类型的参数:
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedComplexObject {
String field() default "";//含有成员变量的复杂对象中需要加锁的成员变量,如一个商品对象的商品ID
CacheLockInterceptor实现InvocationHandler接口,在invoke方法中获取注解的方法和参数,在执行注解的方法前加锁,执行被注解的方法后释放锁:
public class CacheLockInterceptor implements InvocationHandler{
public static int ERROR_COUNT
public CacheLockInterceptor(Object proxied) {
this.proxied =
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
CacheLock cacheLock = method.getAnnotation(CacheLock.class);
//没有cacheLock注解,pass
if(null == cacheLock){
System.out.println("no cacheLock annotation");
return method.invoke(proxied, args);
//获得方法中参数的注解
Annotation[][] annotations = method.getParameterAnnotations();
//根据获取到的参数注解和参数列表获得加锁的参数
Object lockedObject = getLockedObject(annotations,args);
String objectValue = lockedObject.toString();
//新建一个锁
RedisLock lock = new RedisLock(cacheLock.lockedPrefix(), objectValue);
boolean result = lock.lock(cacheLock.timeOut(), cacheLock.expireTime());
if(!result){//取锁失败
ERROR_COUNT += 1;
throw new CacheLockException("get lock fail");
//加锁成功,执行方法
return method.invoke(proxied, args);
lock.unlock();//释放锁
* @param annotations
* @param args
* @throws CacheLockException
private Object getLockedObject(Annotation[][] annotations,Object[] args) throws CacheLockException{
if(null == args || args.length == 0){
throw new CacheLockException("方法参数为空,没有被锁定的对象");
if(null == annotations || annotations.length == 0){
throw new CacheLockException("没有被注解的参数");
//不支持多个参数加锁,只支持第一个注解为lockedObject或者lockedComplexObject的参数
int index = -1;//标记参数的位置指针
for(int i = 0;i & annotations.i++){
for(int j = 0;j & annotations[i].j++){
if(annotations[i][j] instanceof LockedComplexObject){//注解为LockedComplexObject
return args[i].getClass().getField(((LockedComplexObject)annotations[i][j]).field());
} catch (NoSuchFieldException | SecurityException e) {
throw new CacheLockException("注解对象中没有该属性" + ((LockedComplexObject)annotations[i][j]).field());
if(annotations[i][j] instanceof LockedObject){
//找到第一个后直接break,不支持多参数加锁
if(index != -1){
if(index == -1){
throw new CacheLockException("请指定被锁定参数");
return args[index];
最关键的RedisLock类中的lock方法和unlock方法:
* 使用方式为:
executeMethod();
* }finally{
* @param timeout timeout的时间范围内轮询锁
* @param expire 设置锁超时时间
* @return 成功 or 失败
public boolean lock(long timeout,int expire){
long nanoTime = System.nanoTime();
timeout *= MILLI_NANO_TIME;
//在timeout的时间范围内不断轮询锁
while (System.nanoTime() - nanoTime & timeout) {
//锁不存在的话,设置锁并设置锁过期时间,即加锁
if (this.redisClient.setnx(this.key, LOCKED) == 1) {
this.redisClient.expire(key, expire);//设置锁过期时间是为了在没有释放
//锁的情况下锁过期后消失,不会造成永久阻塞
this.lock =
return this.
System.out.println("出现锁等待");
//短暂休眠,避免可能的活锁
Thread.sleep(3, RANDOM.nextInt(30));
} catch (Exception e) {
throw new RuntimeException("locking error",e);
void unlock() {
if(this.lock){
redisClient.delKey(key);//直接删除
} catch (Throwable e) {
上述的代码是框架性的代码,现在来讲解如何使用上面的简单框架来写一个秒杀函数。先定义一个接口,接口里定义了一个秒杀方法:
public interface SeckillInterface {
*现在暂时只支持在接口方法上注解
//cacheLock注解可能产生并发的方法
@CacheLock(lockedPrefix="TEST_PREFIX")
public void secKill(String userID,@LockedObject Long commidityID);//最简单的秒杀方法,参数是用户ID和商品ID。可能有多个线程争抢一个商品,所以商品ID加上LockedObject注解
上述SeckillInterface接口的实现类,即秒杀的具体实现:
public class SecKillImpl implements SeckillInterface{
static Map&Long, Long&
inventory = new HashMap&&();
inventory.put(L, 10000l);
inventory.put(L, 10000l);
public void secKill(String arg1, Long arg2) {
//最简单的秒杀,这里仅作为demo示例
reduceInventory(arg2);
//模拟秒杀操作,姑且认为一个秒杀就是将库存减一,实际情景要复杂的多
public Long reduceInventory(Long commodityId){
inventory.put(commodityId,inventory.get(commodityId) - 1);
return inventory.get(commodityId);
模拟秒杀场景,1000个线程来争抢两个商品:
public void testSecKill(){
int threadCount = 1000;
int splitPoint = 500;
CountDownLatch endCount = new CountDownLatch(threadCount);
CountDownLatch beginCount = new CountDownLatch(1);
SecKillImpl testClass = new SecKillImpl();
Thread[] threads = new Thread[threadCount];
//起500个线程,秒杀第一个商品
for(int i= 0;i & splitPi++){
threads[i] = new Thread(new
Runnable() {
public void run() {
//等待在一个信号量上,挂起
beginCount.await();
//用动态代理的方式调用secKill方法
SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(),
new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
proxy.secKill("test", commidityId1);
endCount.countDown();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
threads[i].start();
//再起500个线程,秒杀第二件商品
for(int i= splitPi & threadCi++){
threads[i] = new Thread(new
Runnable() {
public void run() {
//等待在一个信号量上,挂起
beginCount.await();
//用动态代理的方式调用secKill方法
SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(),
new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
proxy.secKill("test", commidityId2);
//testClass.testFunc("test", L);
endCount.countDown();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
threads[i].start();
long startTime = System.currentTimeMillis();
//主线程释放开始信号量,并等待结束信号量,这样做保证1000个线程做到完全同时执行,保证测试的正确性
beginCount.countDown();
//主线程等待结束信号量
endCount.await();
//观察秒杀结果是否正确
System.out.println(SecKillImpl.inventory.get(commidityId1));
System.out.println(SecKillImpl.inventory.get(commidityId2));
System.out.println("error count" + CacheLockInterceptor.ERROR_COUNT);
System.out.println("total cost " + (System.currentTimeMillis() - startTime));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
在正确的预想下,应该每个商品的库存都减少了500,在多次试验后,实际情况符合预想。如果不采用锁机制,会出现库存减少499,498的情况。这里采用了动态代理的方法,利用注解和反射机制得到分布式锁ID,进行加锁和释放锁操作。当然也可以直接在方法进行这些操作,采用动态代理也是为了能够将锁操作代码集中在代理中,便于维护。通常秒杀场景发生在web项目中,可以考虑利用spring的AOP特性将锁操作代码置于切面中,当然AOP本质上也是动态代理。
这篇文章从业务场景出发,从抽象到实现阐述了如何利用redis实现分布式锁,完成简单的秒杀功能,也记录了笔者思考的过程,希望能给阅读到本篇文章的人一些启发。
源码仓库:

我要回帖

更多关于 基于数据库的分布式锁 的文章

 

随机推荐