如何通过redis实现队列简单队列

问题对人有帮助,内容完整,我也想知道答案
问题没有实际价值,缺少关键内容,没有改进余地
我有一个php代码,接受任意客户端的请求,现在想在后端加上redis的队列将消息传递到别的处理模块。
大致结构如下
broker.php -& redis -& worker.php
1,broker.php如何获取后端worker.php的处理结果(是引入一个响应队列么)
2,如果引入响应队列,broker.php如何将不同的结果对应返回给abc...客户端
3,如果php无法实现这种场景,有其它什么建议方法没,给点参考资料,thanks!
答案对人有帮助,有参考价值
答案没帮助,是错误的答案,答非所问
redis只是提供一个高性能的、原子操作的分布式队列实现。具体的业务还是得需要你自己定制。
你的需求实际上是一个变形的生产者-消费者实现。对于此类需求,主要是将请求和实际的处理过程解耦,一般都是采取异步的方式来通知请求方,这跟用不用redis其实没有多大的关系。一般的实现方法是你需要将用户的请求封装成一个Task,然后将这个Task再push到redis队列,然后后端的worker.php完全可以多进程、多线程的并发处理Task并将处理结果回调给请求方。这里唯一麻烦点的就是这个Task的设计,需要能够包含请求信息(请求内容,请求方标识等等).
答案对人有帮助,有参考价值
答案没帮助,是错误的答案,答非所问
redis.blpush模块:
redis.brpop文档:
//PHP我不太知道,这类问题应该都是这个思路吧?可以先用命令模拟出机制,然后调用库实现
答案对人有帮助,有参考价值
答案没帮助,是错误的答案,答非所问
模式:安全的队列
Redis通常都被用做一个处理各种后台工作或消息任务的消息服务器。 一个简单的队列模式就是:生产者把消息放入一个列表中,等待消息的消费者用 RPOP 命令(用轮询方式), 或者用 BRPOP 命令(如果客户端使用阻塞操作会更好)来得到这个消息。
然而,因为消息有可能会丢失,所以这种队列并是不安全的。例如,当接收到消息后,出现了网络问题或者消费者端崩溃了, 那么这个消息就丢失了。
RPOPLPUSH (或者其阻塞版本的 BRPOPLPUSH) 提供了一种方法来避免这个问题:消费者端取到消息的同时把该消息放入一个正在处理中的列表。 当消息被处理了之后,该命令会使用 LREM 命令来移除正在处理中列表中的对应消息。
另外,可以添加一个客户端来监控这个正在处理中列表,如果有某些消息已经在这个列表中存在很长时间了(即超过一定的处理时限), 那么这个客户端会把这些超时消息重新加入到队列中。
答案对人有帮助,有参考价值
答案没帮助,是错误的答案,答非所问
答案对人有帮助,有参考价值
答案没帮助,是错误的答案,答非所问
楼主是想要实现异步响应客户端的请求吧。把耗时的请求交给worker处理。建议加callback 实现。
同步到新浪微博
分享到微博?
你好!看起来你挺喜欢这个内容,但是你还没有注册帐号。 当你创建了帐号,我们能准确地追踪你关注的问题,在有新答案或内容的时候收到网页和邮件通知。还能直接向作者咨询更多细节。如果上面的内容有帮助,记得点赞 (????)? 表示感谢。
明天提醒我
关闭理由:
删除理由:
忽略理由:
推广(招聘、广告、SEO 等)方面的内容
与已有问题重复(请编辑该提问指向已有相同问题)
答非所问,不符合答题要求
宜作评论而非答案
带有人身攻击、辱骂、仇恨等违反条款的内容
无法获得确切结果的问题
非开发直接相关的问题
非技术提问的讨论型问题
其他原因(请补充说明)
我要该,理由是:3164人阅读
java(295)
redis(29)
系统中引入消息队列机制是对系统一个非常大的改善。例如一个web系统中,用户做了某项操作后需要发送邮件通知到用户邮箱中。你可以使用同步方式让用户等待邮件发送完成后反馈给用户,但是这样可能会因为网络的不确定性造成用户长时间的等待从而影响用户体验。
有些场景下是不可能使用同步方式等待完成的,那些需要后台花费大量时间的操作。例如极端例子,一个在线编译系统任务,后台编译完成需要30分钟。这种场景的设计不可能同步等待后在回馈,必须是先反馈用户随后异步处理完成,再等待处理完成后根据情况再此反馈用户与否。
另外适用消息队列的情况是那些系统处理能力有限的情况下,先使用队列机制把任务暂时存放起来,系统再一个个轮流处理掉排队的任务。这样在系统吞吐量不足的情况下也能稳定的处理掉高并发的任务。
消息队列可以用来做排队机制,只要系统需要用到排队机制的地方就可以使用消息队列来作。
使用redis怎么做消息队列
首先redis它的设计是用来做缓存的,但是由于它自身的某种特性使得他可以用来做消息队列。它有几个阻塞式的API可以使用,正是这些阻塞式的API让他有做消息队列的能力。
redis能做消息队列得益于他list对象blpop brpop接口以及Pub/Sub(发布/订阅)的某些接口。他们都是阻塞版的,所以可以用来做消息队列。
Redis实现先进先出队列
Redis实现FIFO很容易,只需要一个List对象从头取数据,从尾部塞数据即可实现。例如lpush存数据,brpop取数据。
Redis实现优先级队列
首先brpop和blpop是支持多list读取的,比如brpop lista listb 0 命令就可以实现先从lista读取数据,读取完lista的数据再去读取listb的数据。
那么我们就可以通过如下方式实现了:
127.0.0.1:6379& lpush a 1
(integer) 1
127.0.0.1:6379& lpush a 2
(integer) 2
127.0.0.1:6379& lpush a 3
(integer) 3
127.0.0.1:6379& lpush b 1
(integer) 1
127.0.0.1:6379& lpush b 2
(integer) 2
127.0.0.1:6379& lpush b 3
(integer) 3
127.0.0.1:6379& brpop a b 0
127.0.0.1:6379& brpop a b 0
127.0.0.1:6379& brpop a b 0
127.0.0.1:6379& brpop a b 0
127.0.0.1:6379& brpop a b 0
127.0.0.1:6379& brpop a b 0
127.0.0.1:6379& brpop a b 0
这种方案我们可以支持不同阶段的优先级队列,例如高中低三个级别或者更多的级别都可以。
多优先级问题解决
如果优先级级别很多的情况,假设有个这样的需求,优先级不是简单的高中低或者0-10这些固定的级别。而是类&#99这么多级别。那么我们第三种方案将不太合适了。
虽然redis有sorted set这样的可以排序的数据类型,看是很可惜它没有阻塞版的接口。于是我们还是只能使用list类型通过其他方式来完成目的。
有个简单的做法我们可以只设置一个队列,并保证它是按照优先级排序号的。然后通过二分查找法查找一个任务合适的位置,并通过 lset 命令插入到相应的位置。&
例如队列里面包含着写优先级的任务[1, 3, 6, 8, 9, 14],当有个优先级为7的任务过来,我们通过自己的二分算法一个个从队列里面取数据出来反和目标数据比对,计算出相应的位置然后插入到指定地点即可。
因为二分查找是比较快的,并且redis本身也都在内存中,理论上速度是可以保证的。但是如果说数据量确实很大的话我们也可以通过一些方式来调优。
把上面的方案结合起来就会很大程度上减少开销。例如数据量十万的队列,它们的优先级也是随机0-十万的区间。我们可以设置 10个或者100个不同的队列,0-一万的优先级任务投放到1号队列,一万-二万的任务投放到2号队列。这样将一个队列按不同等级拆分后它单个队列的数据 就减少许多,这样二分查找匹配的效率也会高一点。但是数据所占的资源基本是不变的,十万数据该占多少内存还是多少。只是系统里面多了一些队列而已。
redis实现定时消息队列
由于Redis排序集合(Sorted Sets)没有实现阻塞功能,所以只能通过程序自己实现。score字段存入时间戳,由于时间戳较长我们用三位数字代替。
127.0.0.1:6379& zadd seta 100 a
(integer) 1
127.0.0.1:6379& zadd seta 200 b
(integer) 1
127.0.0.1:6379& zadd seta 300 c
(integer) 1
127.0.0.1:6379& zadd seta 300 d
(integer) 1
首先我们插入4条数据。
然后我们获取0到当前时间的数据。比如当前时间戳为200,那么我们执行如下命令
127.0.0.1:6379& zrangebyscore seta 0 200 limit 0 1
127.0.0.1:6379& zrem seta a
(integer) 1
127.0.0.1:6379& zrangebyscore seta 0 201 limit 0 1
127.0.0.1:6379& zrem seta b
(integer) 1
127.0.0.1:6379& zrangebyscore seta 0 202 limit 0 1
(empty list or set)
如果取到空数据,阻塞一段时间,然后继续取数据,循环执行即可。
这里我们为什么没有采用zremrangebyscore命令而是采用zrangebyscore和zrem组合,因为zremrangebyscore没有limit参数,可能取到多行数据(例如两个数据socore一样等),由于并发问题可能导致zrem返回0,这样也没事,我们继续取即可。
java代码片段:
public String getData() throws Exception {
& & Jedis jedis = getResource();
& & while (true) {
& & & & Set&String& seta = jedis.zrangeByScore(&seta&, 0, System.currentTimeMillis(), 0, 1);
& & & & if (seta != null && seta.size() & 0) {
& & & & & & String data = seta.toArray(new String[] {})[0];
& & & & & & Long res = jedis.zrem(&seta&, data);
& & & & & & if (res & 0) {
& & & & & & & &
& & & & & & }
& & & & Thread.sleep(1000L);
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:519175次
积分:10434
积分:10434
排名:第1581名
原创:452篇
转载:541篇
评论:30条
(14)(43)(29)(13)(10)(30)(10)(12)(5)(6)(97)(88)(42)(49)(16)(25)(18)(36)(43)(23)(36)(18)(33)(40)(66)(171)(22)(1)redis 怎么实现队列写入数据库【php吧】_百度贴吧
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&签到排名:今日本吧第个签到,本吧因你更精彩,明天继续来努力!
本吧签到人数:0成为超级会员,使用一键签到本月漏签0次!成为超级会员,赠送8张补签卡连续签到:天&&累计签到:天超级会员单次开通12个月以上,赠送连续签到卡3张
关注:135,952贴子:
redis 怎么实现队列写入数据库收藏
百度了半天都没找到思路,是获取缓存,然后写入吗
php学习选达内,O基础入学,名师指导1-4个月从入门到精通,先就业后付款「需高中以上!南昌php培训,达内培训O基础变php高手!免费培训7天试学
不获取怎么写
redis队列没有长度一说,只要不超限就可以一直写入。然后启动一个定时脚本,每次从队列里pop出来10条数据,进行mysql执行。
队列不就是你把数据pop进去,然后push出来不就好了吗
redis是单线程的,所以不会有队列长度,我这里的做法是,先把各种写请求push到redis,后端会有定时服务启动一个php去读取redis信息出来执行,定时5分钟启动一次,脚本循环时间大概是295秒,中间还是会有几秒的停顿。
登录百度帐号推荐应用redis list实现消息队列以及事件模块
你好,想跟你请教个问题:
您好,我是redis的初学者,想利用redis做消息队列,再看了您的文章后,我的理解还是不够透彻,所有想向您请教下。不知道您还有redis做消息队列这方面的资源或者文章没有?我查阅百度和谷歌没有找到合适的文章。特别是在消息处理这块,我不是很清楚如何处理消息,比如数据存储失败,我加到消息队列后,怎么根据该消息调用相应的处理方法。的在此先谢过了!
lpush-&rpop (你也可以右进左出。。)
存储失败和消息调用是在你的逻辑上处理的,不关redis
redis会丢失队列
× 不要点名谁来回答,否则你丢掉的是oschina千千万万的回答者
http://www.oschina.net/question/12_21851
你懂的不是消息队列,而是Redis能做什么。消息队列,就是队列,就是放数据,取数据的地方。什么时候放,时候取,怎么放,怎么取,都是你应该处理的。
我是刚刚看完http://my.oschina.net/yybear/blog/101493这篇文章来提问题的,起始消息队列我搞顶啦,用redis的sub/pub功能就好啦,但是我关心的是怎么处理,怎么根据取得的数据作相应的处理,比如,数据储存出错了,将相关数据加入消息队列后,在取得消息时,怎么再次存储一边。
--- 共有 1 条评论 ---
不是太懂你的意思,具体处理就是根据你的业务决定的。如果数据错误,处理失败可以记录一个日志,以便后面跟踪处理后使用快捷导航没有帐号?
查看: 1774|回复: 6
基于Redis实现分布式消息队列
金牌会员, 积分 1525, 距离下一级还需 1475 积分
论坛徽章:6
1、为什么需要消息队列?
当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异。举个例子:业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力。
再举个例子:调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送。
再举个栗子,交互模块5:00到24:00和电商系统联通,和内部ERP断开。1:00到4:00和ERP联通,和电商系统断开。
再举个例子,服务员点菜快,厨师做菜慢。
再举个例子,到银行办事的人多,提供服务的窗口少。
乖乖排队吧。2、使用消息队列有什么好处?
2.1、提高系统响应速度
使用了消息队列,生产者一方,把消息往队列里一扔,就可以立马返回,响应用户了。无需等待处理结果。处理结果可以让用户稍后自己来取,如医院取化验单。也可以让生产者订阅(如:留下手机号码或让生产者实现listener接口、加入监听队列),有结果了通知。获得约定将结果放在某处,无需通知。2.2、提高系统稳定性
考虑电商系统下订单,发送数据给生产系统的情况。
电商系统和生产系统之间的网络有可能掉线,生产系统可能会因维护等原因暂停服务。如果不使用消息队列,电商系统数据发布出去,顾客无法下单,影响业务开展。
两个系统间不应该如此紧密耦合。应该通过消息队列解耦。同时让系统更健壮、稳定。3、为什么需要分布式?
3.1、多系统协作需要分布式
消息队列中的数据需要在多个系统间共享数据才能发挥价值。
所以必须提供分布式通信机制、协同机制。3.2、单系统内部署环境需要分布式
单系统内部,为了更好的性能、为了避免单点故障,多为集群环境。
集群环境中,应用运行在多台服务器的多个JVM中;数据也保存在各种类型的数据库或非数据库的多个节点上。
为了满足多节点协作需要,需要提供分布式的解决方案。4、分布式环境下需要解决哪些问题
4.1、并发问题
需进行良好的并发控制。确保“线程安全“。不要出现一个订单被出货两次。不要出现顾客A下的单,发货发给了顾客B等情况。4.2、简单的、统一的操作机制
需定义简单的,语义明确的,业务无关的,恰当稳妥的统一的访问方式。4.3、容错
控制好单点故障,确保数据安全。4.4、可横向扩展
可便捷扩容。5、如何实现?
成熟的消息队列中间件产品太多了,族繁不及备载。成熟产品经过验证,接口规范,可扩展性强。结合事业环境因素、组织过程遗产、实施运维考虑、技术路线考虑、开发人员情况等原因综合考虑,基于自己做一个是最可行的选择。
1、消息队列需提供哪些功能?
在功能设计上,我崇尚奥卡姆剃刀法则。
对于消息队列,只需要两个方法: 生产 和 消费。
具体的业务场景是任务队列,代码设计如下:public abstract class TaskQueue{
& & private final S
& & public String getName(){return this.}& & public abstract void addTask(Serializable taskId);
& & public abstract Serializable popTask();
}同时支持多个队列,每个队列都应该有个名字。final确保TaskQueue是线程安全的。TaskQueue的实现类也应该确保线程安全。addTask向队列中添加一个任务。队列中仅保存任务的id,不存储任务的业务数据。popTask从队列中取出一个任务来执行。
这种设计不是特别友好,因为她需要调用者自行保证任务执行成功,如果执行失败,自行确保重新把任务放回队列。 无论如何,这种机制是可以工作的。想想奥卡姆剃刀法则,我们先按照这个设计实现出来看看。
如果调用者把业务数据存在数据库中,业务数据中包含“状态“列,标识任务是否被执行,调用者需要自行管理这个状态,并控制事务。popTask采用阻塞方式,还是非阻塞方式呢?
如果采用阻塞方式,队列中没任务的时候,客户端不会断开连接,只是等。
一般情况下,客户端会有多个worker抢着干活儿,几条狼一起等一个肉包子,画面太美。连接是重要资源,如果一直没活儿干,先放回池里,也不错。
先采用非阻塞的方式吧,如果队列是空的,popTask返回null,立即返回。2、后续可能提供的功能
2.1、引入Task生命周期概念
应用场景不同,需求也不同。
在严格的应用场景中,需要确保每个Task执行“成功“了。
对于上面提到的popTask后不管的“模式“,这是另外一种“运行模式“,两种模式可以并行存在。在这种新模式下,Task状态有3种:新创建(new,刚调用addTask加到队列中)、正在执行(in-process,调用popTask后,调用finish前)、完成(done,执行OK了,调用finishTask后)。
调整后的代码如下:public abstract class TaskQueue{& & private final S
& & public String getName(){return this.}& & public abstract int getMode();& & public abstract void addTask(Serializable taskId);
& & public abstract Serializable popTask();
& & public abstract void finishTask(Serializable taskId);
}2.2、增加批量取出任务的功能
popTask()一次取出一个任务,太磨叽了。
好比我们要买5瓶水,开车去超市买,每去一次买1瓶,有点儿啥。
我们需要一个一次取多个任务的方法。public abstract class TaskQueue{
& & ... ...
& & public abstract Serializable[] popTasks(long cnt);
2.3、增加阻塞等待机制
想象一种场景:
小明同学,取出一个任务,发现干不了,放回队列,再去取,取出来发现还是干不了,又放回去。反反复复。
小明童鞋肿么了?可能是他干活需要网络,网络断了。可能是他做任务需要写磁盘,磁盘满了。如果小明像邻居家的孩子一样优秀,当他发现哪里不对的时候,他应该冷静下来,歇会儿。但他万一不是呢?只有我们能帮他了。假如队列中有10000个待办任务。
这时候小明来了。他失败100次后,我们应该拦他吗?不应该,除非他主动要求(在系统参数中配置)。5000次后呢?也不应该,除非他主动要求。我们的原则是:我们做的所有事情,对于调用者,都是可以预期的。我们可以在系统参数中要求调用者设置一个阀值N,如果不设置,默认为100。连续失败N次后,让调用者睡一会儿,睡多长时间,让调用者配置。假如我们的底层实现中包含待办子队列、重做子队列和完成子队列(这种设计好复杂!pop的时候先pop重做,还是先pop待办,复杂死了!但愿不需要这样)。
待办子队列中有10000个任务。在小明失败10000次后,所有的任务都在重做子队列了。这时候我们应该拦他吗?
重做子队列要不要设置大小,超过之后,让下一个访问者等。
等的话就会涉及超时,超时后,任务也不能丢弃。
太复杂 了!设置一个连续失败次数的限制就够了!2.4、考虑增加Task类
不保存任务的相关数据是基本原则,绝对不动摇。
增加Task类可以管理下生命周期,更有用的是,可以把Task本身设计成Listener,代码大概时这样的:public abstract class Task{& & public Serializable getId();
& & public int getState();& & pubic void doTask();& & public void whenAdded(final TaskQueue tq);
& & public void whenPoped(final TaskQueue tq);
& & // public void whenFaild(final TaskQueue tq);
& & public void whenFinished(final TaskQueue tq);
}通过Task接口,我们可以对调用过程进行更强势的管理(如进行事务控制),对调用者施加更强的控制,用户也可以获得更多的交互机会,同TaskQueue有更好的交互(如在whenFinished中做持久化工作)。但这些真的有必要吗?是不是太侵入了?注解的方式会好些吗?
再考虑吧。2.5、增加系统参数
貌似需要个Config类了,不爽!
本来想做一个很小很精致的小东西的,如果必须再加吧。
如果做的话,需要支持properties、注解设置、api方式设置、Spring注入式设置,烦。次回预告:Redis本身机制和TaskQueue的契合。
1、Redis是什么鬼?
Redis是一个简单的,高效的,分布式的,基于内存的缓存工具。
假设好服务器后,通过网络连接(类似数据库),提供Key-Value式缓存服务。简单,是Redis突出的特色。
简单可以保证核心功能的稳定和优异。2、性能
性能方面:Redis是足够高效的。
和Memecached对比,在数据量较小大情况下,Redis性能更优秀。
数据量大到一定程度的时候,Memecached性能稍好。简单结论:但总体上讲Redis性能已经足够好。// Ref: Redis性能测试
原则:Value大小不要超过1390Byte。经实验得知:
List操作和字符串操作性能相当,略差,几乎可以忽略。
使用Jedis自带pool,“每次从pool中取用完放回“ 和 “重用单个连接“ 相比,平均用时是3倍。这部分需要继续研究底层机制,采用更合理的实验方法进一步获得数据。
使用Jedis自带pool,性能上是满足当前访问量需要的,等有时间了再进一步深入。3、数据类型
Redis支持5种数据类型:字符串、Map、List、Set、Sorted Set。
List特别适合用于实现队列。提供的操作包括:
从左侧(或右侧)放入一个元素,从右侧(或左侧)取出一个元素,读取某个范围的元素,删除某个范围的元素。Sorted Set中元素是唯一的,可以通过名字找。
Map可以高效地通过key找。
假如我们需要实现finishTash(taskId),需要通过名字在队列中找元素,上面两个可能会用到。4、原子操作
实现分布式队列首要问题是:不能出现并发问题。Redis是底层是单线程的,命令执行是原子操作,支持事务,契合了我们的需求。Redis直接提供的命令都是原子操作,包括lpush、rpop、blpush、brpop等。Redis支持事务。通过类似 begin…[cancel]…commit的语法,提供begin…commit之间的命令为原子操作的功能,之间命令对数据的改变对其他操作是不可见的。类似关系型数据库中的存储过程,同时提供了最高级别的事务隔离级别。Redis支持脚本,每个脚本的执行是原子性的。做了一下并发测试:
写了个小程序,随机对List做push或pop操作,push的比pop的稍多。
记录每次处理的详细信息到数据库。
最后把List中数据都pop出来,详细记录每次pop详细信息。
统计push和pop是否相等,统计针对每条数据是否都有push和pop。
500并发,没有出现并发问题。5、集群
实现分布式队列另一个重要问题是:不能出现单点故障。Redis支持Master-Slave数据复制,从服务器设置 slave-of master-ip:port 即可。
集群功能可以由客户端提供。
客户端使用哨兵,可自动切换主服务器。由于队列操作都是写操作,从服务器主要目的是备份数据,保证数据安全。如果想基于 sharding 做多master集群,可以结合 zookeeper 自己做。Redis 3.0支持集群了,还没细看,应该是个好消息,等大家都用起来,没什么问题的话,可以考虑试试看。如果 master 宕掉,怎么办?
“哨兵”会选出一个新的master来。产生过程中,消息队列暂停服务。
最极端的情况,所有Redis都停了,当消息队列发现Redis停止响应时,对业务系统的请求应抛出异常,停止队列服务。
这样会影响业务,业务系统下订单、审批等操作会失败。如果可以接受,这是一种方案。
Redis整个集群宕掉,这种情况很少发生,如果真发生了,业务系统停止服务也是可以理解的。如果想要在Redis整个集群宕掉的情况下,消息队列仍继续提供服务。
方法是这样的:
启用备用存储机制,可以是zookeeper、可以是关系型数据库、可以是另外可用的Memecached等。
本地内存存储是不可取的,首先,同步多个客户端虚拟机内存数据太复杂,相当于自己实现了一个Redis,其次,保证内存数据存储安全太复杂。
备用存储机制相当于实现了另外一个版本的消息队列,逻辑一致,底层存储不同。这个实现可以性能低一些,保证最基本的原则即可。
想要保证不出现并发问题,由于消息队列程序同时运行在多个虚拟机中,对象锁、方法锁无效。需要有一个独立于虚拟机的锁机制,zookeeper是个好选择。
将关系型数据库设置为最高级别的事务隔离级别,太傻了。除了zk有其他好办法吗?Redis集群整个宕掉的同时Zookeeper也全军覆没怎么办?
这个问题是没有尽头的,提供了第二备用存储、第三备用存储、第四备用存储、…,理论上也会同时宕掉,那时候怎么办?
有钱任性的土豪可以继续,预算有限的情况,能做到哪步就做到哪步。6、持久化
分布式队列的应用场景和缓存的应用场景是不一样的。如果有没来得及持久化的数据怎么办?
从业务系统的角度,已经成功发送给消息队列了。
消息队列也以为Redis妥妥地收好了。
可Redis还没写到日记里,更没有及时通知小伙伴,挂了。可能是断电了,可能是进程被kill了。后果会怎样?
已经执行过的任务会再次执行一遍。
已经放到队列中的任务,消失了。
标记为已经完成的任务,状态变为“进行中”了,然后又被执行了一遍。
后果不可接受。分布式队列不允许丢数据。
从业务角度,哪怕丢1条数据也是无法接受的。
从运维角度,Redis丢数据后,如果可以及时发现并补救,也是可以接受的。从架构角度,队列保存在Redis中,业务数据(包括任务状态)保存在关系型数据库中。
任务状态是从业务角度确定的,消息队列不应该干涉。如果业务状态没有统一的规范和定义,从业务数据比对任务队列是否全面正确,就只能交给业务开发方来做。
从分工上来看,任务队列的目的是管理任务执行的状态,业务系统把这个职责交给了任务队列,业务系统自身的任务状态维护未必准确。
结论:任务队列不能推卸责任,不能丢数据是核心功能,不能打折扣。采用 Master-Slave 数据复制模式,配置bgsave,追加存储到aof。在从服务器上配置bgsave,不影响master性能。队列操作都是写操作,master任务繁重,能让slave分担的持久化工作,就不要master做。rdb和aof两种方法都用上,多重保险。
appendfsync设为always。// 单节点测性能,连续100000次算平均时间,和per second比对,性能损失不大。
性能会有些许损失,但任务执行为异步操作,无需用户同步等待,为了保证数据安全,这样是值得的。当运维需要重启Master服务器的时候,采取这样的顺序:
1. 通过 cli shutdown 停止master服务器, master交代完后事后,关掉自己。这时候“哨兵”会找一个新的master出来。
万万不可以直接kill或者直接打开防火墙中断master和slave之间的连接。
master 对外防火墙,停止对外服务,Master 自动切换到其他服务器上, 原 Master 继续持久化 aof,发送到原来各从服务器。
2. 在原 master 上进行运维操作。
3. 启动原 master,这时候它已经是从服务器了。耐心等待它从新 master 获取最新数据。观察 redis 日志输出,确认数据安全。
4. 对新的 master 重复1-3的操作。
5. 将以上操作写成脚本,自动化执行,避免人为错误。
1、访问Redis的工具类
public class RedisManager {& & private static Pool&Jedis&& & protected final static Logger logger = Logger.getLogger(RedisManager.class);& & static{
& && &&&try {
& && && && &init();
& && &&&} catch (Exception e) {
& && && && &e.printStackTrace();
& & }& & public static void init() throws Exception {& && &&&Properties props = ConfigManager.getProperties(&redis&);
& && &&&logger.debug(&初始化Redis连接池。&);
& && &&&if(props==null){
& && && && &throw new RuntimeException(&没有找到redis配置文件&);
& && &&&// 创建jedis池配置实例
& && &&&JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
& && &&&// 设置池配置项值
& && &&&int poolMaxTotal = Integer.valueOf(props.getProperty(&redis.pool.maxTotal&).trim());
& && &&&jedisPoolConfig.setMaxTotal(poolMaxTotal);& && &&&int poolMaxIdle = Integer.valueOf(props.getProperty(&redis.pool.maxIdle&).trim());
& && &&&jedisPoolConfig.setMaxIdle(poolMaxIdle);& && &&&long poolMaxWaitMillis = Long.valueOf(props.getProperty(&redis.pool.maxWaitMillis&).trim());
& && &&&jedisPoolConfig.setMaxWaitMillis(poolMaxWaitMillis);& && &&&logger.debug(String.format(&poolMaxTotal: %s , poolMaxIdle : %s , poolMaxWaitMillis : %s &,
& && && && && & poolMaxTotal,poolMaxIdle,poolMaxWaitMillis));& && &&&// 根据配置实例化jedis池
& && &&&String connectMode = props.getProperty(&redis.connectMode&);
& && &&&String hostPortStr = props.getProperty(&redis.hostPort&);& && &&&logger.debug(String.format(&host : %s &,hostPortStr));
& && &&&logger.debug(String.format(&mode : %s &,connectMode));& && &&&if(StringUtils.isEmpty(hostPortStr)){
& && && && &throw new OptimusException(&redis配置文件未配置主机-端口集&);
& && &&&String[] hostPortSet = hostPortStr.split(&,&);
& && &&&if(&single&.equals(connectMode)){
& && && && &String[] hostPort = hostPortSet[0].split(&:&);
& && && && &pool = new JedisPool(jedisPoolConfig, hostPort[0], Integer.valueOf(hostPort[1].trim()));
& && &&&}else if(&sentinel&.equals(connectMode)){
& && && && &Set&String& sentinels = new HashSet&String&();& &
& && && && &for(String hostPort : hostPortSet){
& && && && && & sentinels.add(hostPort);
& && && && &}
& && && && &pool = new JedisSentinelPool(&mymaster&, sentinels, jedisPoolConfig);
& & }& & /**
& & * 使用完成后,必须调用 returnResource 还回。
& & * @return 获取Jedis对象
& & public static Jedis getResource(){
& && &&&Jedis jedis = pool.getResource();
& && &&&if(logger.isDebugEnabled()){
& && && && &logger.debug(&获得链接:& + jedis);
& & }& & /**
& & * 获取Jedis对象。
& & * 用完后,需要调用returnResource放回连接池。
& & * @param db 数据库序号
& & * @return
& & public static Jedis getResource(int db){
& && &&&Jedis jedis = pool.getResource();
& && &&&jedis.select(db);
& && &&&if(logger.isDebugEnabled()){
& && && && &logger.debug(&获得链接:& + jedis);
& & }& & /**
& & * @param jedis
& & public static void returnResource(Jedis jedis){
& && &&&if(jedis!=null){
& && && && &pool.returnResource(jedis);
& && && && &if(logger.isDebugEnabled()){
& && && && && & logger.debug(&放回链接:& + jedis);
& && && && &}
& & }& & /**
& & * 需要通过Spring确认这个方法被调用。
& & * @throws Exception
& & public static void destroy() throws Exception {
& && &&&pool.destroy();
}这个类没有通过技术手段强制调用returnResource和destroy,需要想想办法。2、队列接口
public interface TaskQueue {& & /**
& & * 获取队列名
& & * @return
& & String getName();& & /**
& & * 往队列中添加任务
& & * @param task
& & void pushTask(String task);& & /**
& & * 从队列中取出一个任务
& & * @return
& & String popTask();}用String类型描述任务,也可以考虑byte[],要求对每个任务描述的数据尽可能短。3、队列的Redis实现类
* 任务队列Redis实现。
* 采用每次获取Jedis并放回pool的方式。
* 如果获得Jedis后一直不放手,反复重用,两个操作耗时可以降低1/3。
* 暂时先忍受这种低性能,不明确Jedis是否线程安全。
public class TaskQueueRedisImpl implements TaskQueue {& & private final static int REDIS_DB_IDX = 9;& & private final static Logger logger = Logger.getLogger(TaskQueueRedisImpl.class);& & private final S& & /**
& & * 构造函数。
& & * @param name
& & public TaskQueueRedisImpl(String name) {
& && &&&this.name =
& & }& & /* (non-doc)
& & * @see mon.mq.TaskQueue#getName()
& & public String getName() {
& && &&&return this.
& & /* (non-Javadoc)
& & * @see mon.mq.TaskQueue#pushTask(String)
& & public void pushTask(String task) {
& && &&&Jedis jedis =
& && &&&try{
& && && && &jedis = RedisManager.getResource(REDIS_DB_IDX);
& && && && &jedis.lpush(this.name, task);
& && &&&}catch(Throwable e){
& && && && &logger.error(e.getMessage(),e);
& && &&&}finally{
& && && && &if(jedis!=null){
& && && && && & RedisManager.returnResource(jedis);
& && && && &}
& & }& & /* (non-Javadoc)
& & * @see mon.mq.TaskQueue#popTask()
& & public String popTask() {
& && &&&Jedis jedis =
& && &&&String task =
& && &&&try{
& && && && &jedis = RedisManager.getResource(REDIS_DB_IDX);
& && && && &task = jedis.rpop(this.name);
& && &&&}catch(Throwable e){
& && && && &logger.error(e.getMessage(),e);
& && &&&}finally{
& && && && &if(jedis!=null){
& && && && && & RedisManager.returnResource(jedis);
& && && && &}
& & }}4、获取队列实例的工具类
*&&// 获得队列
*&&TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
*&&// 添加任务到队列
*&&String task = &task id&;
*&&tq.pushTask(task);
*&&// 从队列中取出任务执行
*&&String taskToDo = tq.popTask();
* @author liuhailong
public class TaskQueueManager {& & protected final static Logger logger = Logger.getLogger(TaskQueueManager.class);& & private static Map&String, TaskQueueRedisImpl& queneMap = new ConcurrentHashMap&String, TaskQueueRedisImpl&();& & /**
& & * 短信队列名。
& & public static final String SMS_QUEUE = &SMS_QUEUE&;& & /**
& & * 规则队列名。
& & public static final String RULE_QUEUE = &RULE_QUEUE&;& & private static void initQueneMap() {
& && &&&logger.debug(&初始化任务队列...&);
& && &&&queneMap.put(RULE_QUEUE, new TaskQueueRedisImpl(RULE_QUEUE));
& && &&&logger.debug(&建立队列:&+RULE_QUEUE);
& && &&&queneMap.put(SMS_QUEUE, new TaskQueueRedisImpl(SMS_QUEUE));
& && &&&logger.debug(&建立队列:&+SMS_QUEUE);
& & }& & static {
& && &&&initQueneMap();
& & }& & public static TaskQueue get(String name){
& && &&&return getRedisTaskQueue(name);
& & }& & public static TaskQueue getRedisTaskQueue(String name){
& && &&&return queneMap.get(name);
& & }}和具体的队列过于紧耦合,但简单好用。
先跑起来再说。5、向队列中添加任务的代码
TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
tq.pushTask(smsMessageId);1
6、从队列中取出任务执行的代码
public class SmsSendTask{& & protected final static Logger logger = Logger.getLogger(SmsSendTask.class);& & protected static SmsSendService smsSendService = new SmsSendServiceUnicomImpl();
& & * 入口方法。
& & public void execute()&&{
& && &&&TaskQueue taskQueue =
& && &&&String task =
& && &&&try {
& && && && &taskQueue = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);& && && && &// 非线程安全
& && && && &Set&Serializable& executedTaskSet = new HashSet&Serializable&();& && && && &task = taskQueue.popTask();
& && && && &while(task!=null){
& && && && && & // 判断是否把所有任务都执行一遍了,避免死循环
& && && && && & if(executedTaskSet.contains(task)){
& && && && && && &&&taskQueue.pushTask(task);
& && && && && && &&&
& && && && && & }& && && && && & executeSingleTask(taskQueue,task);& && && && && & task = taskQueue.popTask();
& && && && &}
& && &&&}catch(Throwable e){
& && && && &logger.error(e.getMessage(),e);
& && && && &e.printStackTrace();
& & }& & /**
& & * 发送单条短信。
& & * 取出任务并执行,如果失败,放回任务列表。
& & * @param taskQueue
& & * @param task
& & @SuppressWarnings({ &rawtypes&, &unchecked& })
& & private void executeSingleTask(TaskQueue taskQueue, String task) {
& && &&&try {
& && && && &// do the job
& && && && &String smsId =
& && && && &Map&String,String& sms = smsSendService.getSmsList(smsId);& && && && &smsSendService.send(sms);& && && && &smsSendService.updateSmsStatus(task,SmsSendService.STATUS_SENT);& && && && &String opType = &2&;
& && && && &TaskQueueUtil.taskLog(taskQueue.getName(), opType, task);
& && &&&} catch (Throwable e) {
& && && && &if(task!=null){
& && && && && & taskQueue.pushTask(task);
& && && && && & smsSendService.updateSmsStatus(task,SmsSendService.STATUS_WAIT);
& && && && && & if(logger.isDebugEnabled()){
& && && && && && &&&logger.error(String.format(&任务%s执行失败:%s,重新放回队列&, task, e.getMessage()));
& && && && && & }
& && && && &}else {
& && && && && & e.printStackTrace();
& && && && &}
& & }}这部分代码是固定模式,而且不这样做存在重大缺陷,会有任务执行失败,被丢弃,这部分代码应该写到队列实现中。
有空再改。
中级会员, 积分 372, 距离下一级还需 128 积分
论坛徽章:2
我们可以使用Redis来缓存一些经常会被用到、或者需要耗费大量资源的内容,通过将这些内容放到Redis里面(也即是内存里面),程序可以以极快的速度取得这些内容。
& & 举个例子,对于一个网站来说,如果某个页面经常会被访问到,或者创建页面时耗费的资源比较多(比如需要多次访问数据库、生成时间比较长,等等),那么我们可以使用Redis将这个页面缓存起来,减轻网站的负担,降低网站的延迟值。
中级会员, 积分 279, 距离下一级还需 221 积分
论坛徽章:8
讲的很好,学习了~~~~~~~
新手上路, 积分 36, 距离下一级还需 14 积分
论坛徽章:1
好文章,学习了。。。。
金牌会员, 积分 1000, 距离下一级还需 2000 积分
论坛徽章:8
redis master-slave。
注册会员, 积分 112, 距离下一级还需 88 积分
论坛徽章:1
很全面,收了~~~
高级会员, 积分 796, 距离下一级还需 204 积分
论坛徽章:8
不错的资料 分享&&很好&&good!

我要回帖

更多关于 redis如何实现队列 的文章

 

随机推荐