为什么学习Redis作为消息队列服务器

您现在的位置: -->
--> Redis消息队列的若干实现方式
Redis消息队列的若干实现方式
最近忙着用Redis实现一个消息通知系统,今天大概总结了一下技术细节,其中演示代码如果没有特殊说明,使用的都是扩展来实现的。
比如要推送一条全局消息,如果真的给所有用户都推送一遍的话,那么会占用很大的内存,实际上不管粘性有多高的产品,活跃用户同全部用户比起来,都会小很多,所以如果只处理登录用户的话,那么至少在内存消耗上是相当划算的,至于未登录用户,可以推迟到用户下次登录时再处理,如果用户一直不登录,就一了百了了。
当大量用户同时登录的时候,如果全部都即时处理,那么很容易就崩溃了,此时可以使用一个队列来保存待处理的登录用户,如此一来顶多是反应慢点,但不会崩溃。
Redis的数据类型可以很自然的创建一个队列,代码如下:&?php
$redis = new R
$redis-&connect('/tmp/redis.sock');
$redis-&lPush('usr', &USRID&);
while ($usr = $redis-&rPop('usr')) {
var_dump($usr);
出于类似的原因,我们还需要一个队列来保存待处理的消息。当然也可以使用LIST来实现,但LIST只能按照插入的先后顺序实现类似FIFO或LIFO形式的队列,然而消息实际上是有优先级的:比如说个人消息优先级高,全局消息优先级低。此时可以使用来实现,它里面分数的概念很自然的实现了优先级。
不过ZSET没有原生的POP操作,所以我们需要模拟实现,代码如下:&?php
class RedisClient extends Redis
const POSITION_FIRST = 0;
const POSITION_LAST = -1;
public function zPop($zset)
return $this-&zsetPop($zset, self::POSITION_FIRST);
public function zRevPop($zset)
return $this-&zsetPop($zset, self::POSITION_LAST);
private function zsetPop($zset, $position)
$this-&watch($zset);
$element = $this-&zRange($zset, $position, $position);
if (!isset($element[0])) {
if ($this-&multi()-&zRem($zset, $element[0])-&exec()) {
return $element[0];
return $this-&zsetPop($zset, $position);
模拟实现了POP操作后,我们就可以使用ZSET实现队列了,代码如下:&?php
$redis = new RedisC
$redis-&connect('/tmp/redis.sock');
$redis-&zAdd('msg', &PRIORITY&, &MSGID&);
while ($msg = $redis-&zRevPop('msg')) {
var_dump($msg);
以前微博架构中推拉选择的问题已经被大家讨论过很多次了。实际上消息通知系统和微博差不多,也存在推拉选择的问题,同样答案也是类似的,那就是应该推拉结合。具体点说:在登陆用户获取消息的时候,就是一个拉消息的过程;在把消息发送给登陆用户的时候,就是一个推消息的过程。
假设要推送一百万条消息的话,那么最直白的实现就是不断的插入,代码如下:&?php
for ($msgid = 1; $msgid &= 1000000; $msgid++) {
$redis-&sAdd('usr:&USRID&:msg', $msgid);
Redis的速度是很快的,但是借助,会更快,代码如下:&?php
for ($i = 1; $i &= 100; $i++) {
$redis-&multi(Redis::PIPELINE);
for ($j = 1; $j &= 10000; $j++) {
$msgid = ($i - 1) * 10000 + $j;
$redis-&sAdd('usr:&USRID&:msg', $msgid);
$redis-&exec();
说明:所谓PIPELINE,就是省略了无谓的折返跑,把命令打包给服务端统一处理。
前后两段代码在我的测试里,使用PIPELINE的速度大概是不使用PIPELINE的十倍。
我们用Redis命令行来演示一下用户是如何查询消息的。
先插入三条消息,其&MSGID&分别是1,2,3:redis& HMSET msg:1 title title1 content content1
redis& HMSET msg:2 title title2 content content2
redis& HMSET msg:3 title title3 content content3
再把这三条消息发送给某个用户,其&USRID&是123:redis& SADD usr:123:msg 1
redis& SADD usr:123:msg 2
redis& SADD usr:123:msg 3
此时如果简单查询用户有哪些消息的话,无疑只能查到一些&MSGID&:redis& SMEMBERS usr:123:msg
如果还需要用程序根据&MSGID&再来一次查询无疑有点低效,好在Redis内置的命令可以达到事半功倍的效果,实际上它类似于SQL中的JOIN:redis& SORT usr:123:msg GET msg:*-&title
1) "title1"
2) "title2"
3) "title3"
redis& SORT usr:123:msg GET msg:*-&content
1) "content1"
2) "content2"
3) "content3"
SORT的缺点是它只能GET出字符串类型的数据,如果你想要多个数据,就要多次GET:redis& SORT usr:123:msg GET msg:*-&title GET msg:*-&content
1) "title1"
2) "content1"
3) "title2"
4) "content2"
5) "title3"
6) "content3"
很多情况下这显得不够灵活,好在我们可以采用其他一些方法平衡一下利弊,比如说新加一个字段,冗余保存完整消息的序列化,接着只GET这个字段就OK了。
实际暴露查询接口的时候,不会使用PHP等程序来封装,因为那会成倍降低RPS,推荐使用,它是一个Redis的Web代理,效率没得说。
最近发表了一篇类似的文章:,介绍了他们使用Redis实现消息通知系统的一些情况,有兴趣的不妨一起看看。
觉得文章有用?立即:
和朋友一起 共学习 共进步!
建议继续学习:
QQ技术交流群:,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
作者:&&&&来源:
发布时间: 00:08:49
建议继续学习
近3天十大热文
rightLowest
& 2009 - 2016主题信息(必填)
主题描述(最多限制在50个字符)
申请人信息(必填)
申请信息已提交审核,请注意查收邮件,我们会尽快给您反馈。
如有疑问,请联系
傻丫头和高科技产物小心翼翼的初恋
如今的编程是一场程序员和上帝的竞赛,程序员要开发出更大更好、傻瓜都会用到软件。而上帝在努力创造出更大更傻的傻瓜。目前为止,上帝是赢的。个人网站:。个人QQ群:、
编程小菜鸟
CSDN &《程序员》编辑/记者,我的邮箱
个人大数据技术博客:
Redis的作者Salvatore Sanfilippo(网名Antirez)2小时前发表了一篇,介绍了自己几个月以来在晚上和周末开发的新项目——Disque。
Disque是一个分布式的消息队列。与Redis有单结点和分布式模式不同,单一Disque结点也是只有一个结点的集群。它是一个AP系统,也就是具有Availability(可用性)和Partition tolerance(分区容错性)。另外它能在各种情形下保持高扩展性:无论是多生产者和多消费者处理多队列,还是所有生产者和消费者都在一个队列。
另外,Disque的设计中有一点重大牺牲,就是只尽力提供而不保证消息的排序。
Antirez还通过回答Adrian Colyer(AspectJ的作者,曾任SpringSource CTO)在Hacker News上某次讨论(但这一讨论没有查到)中所提问题的方式,更详细地描述了Disque的特性:
消息发送可以选择至少一次或者最多一次。
消息需要消费者确认。
如果没有确认,会一直重发,直至到期。确认信息会广播给拥有消息副本的所有结点,然后消息会被垃圾收集或者删除。
队列是持久的。
Disque默认只运行在内存里,持久性是通过同步备份实现的。
队列为了保证最大吞吐量,不是全局一致的,但会尽力提供排序。
在压力大的时候,消息不会丢弃,但会拒绝新的消息。
消费者和生产者可以通过命令查看队列中的消息。
队列尽力提供FIFO。
一组master作为中介,客户端可以与任一结点通信。
中介有命名的队列,无需消费者和生产者干预。
消息发送是事务性的,保证集群中会有所需数量的副本。
消息接收不是事务性的。
消费者默认是接收时是阻塞的,但也可以选择查看新消息。
生产者在队列满时发新消息可以得到错误信息,也可以让集群异步地复制消息。
支持延迟作业,粒度是秒,最久可以长达数年。但需要消耗内存。
消费者和生产者可以连接不同的结点。
Antirez之所以动念开发Disque,是因为看到很多人用Redis来处理队列,但这样做的优势和劣势都很明显:Redis很快、易用而且很多基础设施里已经在用;但是Redis的高可用性/集群特性的设计完全偏向可变数据结构,这与不可变的消息非常不同,并非最佳方案。
消息中介重要的功能是保证至少一次或者最多一次发送消息,而且前者更重要。Antirez开始想通过少量修改Redis来实现,但几天后发现客户端算法太复杂了。Redis已经有很多功能,再增加功能并非什么好主意。何况消息队列的运作方式与Redis很不同。
那么,是不是可以新开发一个消息队列呢?
世界上已经有很多消息队列了,新做一个有价值吗?Antirez想,既然有这么多人用Redis来处理消息队列,已有的方案看上去要么太简单要么太复杂,其中必有机会,于是他动手了。
他头一次没有直接写代码,而是花了几个星期思考设计,尝试从用户角度理解什么样的消息队列会让人更爽。主要的使用场景没变:延迟作业。Disque是通用系统,但主要针对的问题,是发送可能要处理的作业的消息。如果有什么违背了这一场景,就会被干掉。
设计有了,Antirez直接从Redis代码入手。幸运的是Redis部分就是编写C分布式系统的一个框架。协议、网络库、客户端处理、结点到结点的消息总线已经有了,无需重头再写。但他又不想影响Redis本身,于是采取了比较实际的办法:开一个Redis分支,然后将Redis专用的东西全部删掉,只剩一个框架,再开始实现设计。
到目前为止,他已经完成了80%左右的工作,还剩下AOF硬盘持久性没做,此外还需要对API做一些改善。
啥时候代码放出来啊,让我们期待吧。
上Parse.ly的CTO Andrew Montalenti(@pixelmonkey )将Disque和Kafka做了比较:
设计上AP和部分排序都很类似。但Disque会在发送完成后垃圾收集数据,而Kafka在SLA/TTL中保持所有消息,允许重新处理。Disque在服务器端处理最多一次,而Kafka是由客户端处理的。
Blekko的工程副总Chuck Mcmanis(@ChuckMcM)也给出了自己的经验:
My advice for folks building such systems are never depend on the 'time', always assume at-least-once, and build in-band error detection and correction to allow for computing the correct result from message stream 'n' where two or more invariants in your message protocol have been violated.redis消息队列取值失败分析
最近在项目开发过程中接触到了redis,遇到了一些问题,所以对redis的一般操作做了些学习。项目背景项目中使用了Jedis连接池和Redis消息队列的设计方案。queue1为任务队列,在完成业务逻辑后会将一个任务rpush到队列中。然后在项目启动时我们就开启了一个线程,不断地执行从连接池中的queue1中blpop出任务的操作。单线程示例以下代码是项目中该流程的单线程示例,即没有将取队列中任务的过程jedis.blpop在另一个线程中执行。JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();jedisPoolConfig.setMaxTotal(50);jedisPoolConfig.setMaxIdle(10);jedisPoolConfig.setMaxWaitMillis(1000);jedisPoolConfig.setTestOnBorrow(true);JedisPool pool = new JedisPool(jedisPoolConfig, "127.0.0.1",6379,1000);// 每次使用时 从连接池取Jedis jedis = pool.getResource();try {
Transaction transaction = jedis.multi();
transaction.rpush("myTest", "1");
transaction.exec();} finally {
pool.returnResource(jedis);}// 以阻塞的方式从队列上取值 , timeout 为等待时间,超过timeout取不到值返回NULLList&String& values = jedis.blpop(1000, "myTest");问题描述取任务时,有时成功有时失败。 在调试时,首先将断点打在transaction.exec()处,并在另一个取值线程的jedis.blpop处打上断点。 正常情况下,在执行了transaction.exec(),未执行jedis.blpop时,登录redis服务器,通过查看key值,应该会有该队列名的key。然而,我们并没有找到该key值。分析过程首先想到的,就是没有rpush成功。如果队列中没有值,该队列也就不存在,查看key值时当然也不会出现该队列名。但是经过反复实验,该过程是正确的,也没有出现异常和错误。 那就想到,是不是队列中的值已经被取走了。由于项目有专门的redis服务器,而该项目同时也有多人在开发。那很可能就是我这边的取值线程被阻塞了,但是别人的线程在取值。那最简单的就是把redis服务器换成本地服务器,设置为自己的电脑。解决方案在设置成本地redis后,就没有出现该问题。所以绕了一大圈,在开发过程中,应该在本地机器上做测试。Redis简单操作由于之前对redis不熟悉,所以在解决该问题的同时也做了一些简单学习。 首先,在windows系统下,要连接到远程的redis服务器或者在自己的机器搭建redis服务器,可以使用redis客户端。下载地址:/dmajkic/redis/downloads 下载下来的包里有32位和64位的,各取所需。 选择好适合自己系统的文件夹后,可以把这个文件夹复制到其它地方。连接到远程redis服务器 打开cmd窗口,进入包含redis-cli.exe的目录。命令行操作:redis-cli.exe -h 191.168.12.4 -p 6379 以上命令直接将ip和port替换成自己的即可。查看所有key值 keys * 以上命令*之前有一个空格查看key值对应的value 单一值的value:
get key1 列表值:
lrange queue1 0 -1 返回存储在 key 的列表里指定范围内的元素。 start 和 end 偏移量都是基于0的下标,即list的第一个元素下标是0(list的表头),第二个元素下标是1,以此类推。 偏移量也可以是负数,表示偏移量是从list尾部开始计数。 例如, -1 表示列表的最后一个元素,-2 是倒数第二个,以此类推。关于所有redis命令,可以搜索/commands.html本地搭建redis服务器 打开一个cmd窗口
使用cd命令切换目录,运行 redis-server.exe redis.conf 这时候再启动另外一个cmd窗口,原来的不要关闭,不然就无法访问服务端了。切换到redis目录下运行 redis-cli.exe -h 127.0.0.1 -p 6379 redis.conf配置文件解释,可以查看/lxx/archive//3116985.html
最新教程周点击榜
微信扫一扫&&国之画&&&&&&
&& &&&&&&&&&&&&&&&&&&
鲁ICP备号-4
打开技术之扣,分享程序人生!

我要回帖

 

随机推荐