python 消息队列 java 选型选型什么实用

Redis实现简单消息队列_Python开发者-爱微帮
&& &&& Redis实现简单消息队列
(点击上方公号,可快速关注)作者:人世间(@-人世间-)&链接:/p/9cba任务异步化打开浏览器,输入地址,按下回车,打开了页面。于是一个HTTP请求(request)就由客户端发送到服务器,服务器处理请求,返回响应(response)内容。我们每天都在浏览网页,发送大大小小的请求给服务器。有时候,服务器接到了请求,会发现他也需要给另外的服务器发送请求,或者服务器也需要做另外一些事情,于是最初们发送的请求就被阻塞了,也就是要等待服务器完成其他的事情。更多的时候,服务器做的额外事情,并不需要客户端等待,这时候就可以把这些额外的事情异步去做。从事异步任务的工具有很多。主要原理还是处理通知消息,针对通知消息通常采取是队列结构。生产和消费消息进行通信和业务实现。生产消费与队列上述异步任务的实现,可以抽象为生产者消费模型。如同一个餐馆,厨师在做饭,吃货在吃饭。如果厨师做了很多,暂时卖不完,厨师就会休息;如果客户很多,厨师马不停蹄的忙碌,客户则需要慢慢等待。实现生产者和消费者的方式用很多,下面使用Python标准库Queue写个小例子:import randomimport timefrom Queue import Queuefrom threading import Thread&queue = Queue(10)&class Producer(Thread):&&&&def run(self):&&&&&&&&while True:&&&&&&&&&&&&elem = random.randrange(9)&&&&&&&&&&&&queue.put(elem)&&&&&&&&&&&&print &厨师 {} 做了 {} 饭 --- 还剩 {} 饭没卖完&.format(self.name, elem, queue.qsize())&&&&&&&&&&&&time.sleep(random.random())&class Consumer(Thread):&&&&def run(self):&&&&&&&&while True:&&&&&&&&&&&&elem = queue.get()&&&&&&&&&&&&print &吃货{} 吃了 {} 饭 --- 还有 {} 饭可以吃&.format(self.name, elem, queue.qsize())&&&&&&&&&&&&time.sleep(random.random())&def main():&&&&for i in range(3):&&&&&&&&p = Producer()&&&&&&&&p.start()&&&&for i in range(2):&&&&&&&&c = Consumer()&&&&&&&&c.start()&if __name__ == '__main__':&&&&main()大概输出如下:厨师 Thread-1 做了 1 饭 --- 还剩 1 饭没卖完厨师 Thread-2 做了 8 饭 --- 还剩 2 饭没卖完厨师 Thread-3 做了 3 饭 --- 还剩 3 饭没卖完吃货Thread-4 吃了 1 饭 --- 还有 2 饭可以吃吃货Thread-5 吃了 8 饭 --- 还有 1 饭可以吃吃货Thread-4 吃了 3 饭 --- 还有 0 饭可以吃厨师 Thread-1 做了 0 饭 --- 还剩 1 饭没卖完厨师 Thread-2 做了 0 饭 --- 还剩 2 饭没卖完厨师 Thread-1 做了 1 饭 --- 还剩 3 饭没卖完厨师 Thread-1 做了 1 饭 --- 还剩 4 饭没卖完吃货Thread-4 吃了 0 饭 --- 还有 3 饭可以吃厨师 Thread-3 做了 3 饭 --- 还剩 4 饭没卖完吃货Thread-5 吃了 0 饭 --- 还有 3 饭可以吃吃货Thread-5 吃了 1 饭 --- 还有 2 饭可以吃厨师 Thread-2 做了 8 饭 --- 还剩 3 饭没卖完厨师 Thread-2 做了 8 饭 --- 还剩 4 饭没卖完Redis 队列Python内置了一个好用的队列结构。我们也可以是用redis实现类似的操作。并做一个简单的异步任务。Redis提供了两种方式来作消息队列。一个是使用生产者消费模式模式,另外一个方法就是发布订阅者模式。前者会让一个或者多个客户端监听消息队列,一旦消息到达,消费者马上消费,谁先抢到算谁的,如果队列里没有消息,则消费者继续监听。后者也是一个或多个客户端订阅消息频道,只要发布者发布消息,所有订阅者都能收到消息,订阅者都是ping的。生产消费模式主要使用了redis提供的blpop获取队列数据,如果队列没有数据则阻塞等待,也就是监听。import redis&class Task(object):&&&&def __init__(self):&&&&&&&&self.rcon = redis.StrictRedis(host='localhost', db=5)&&&&&&&&self.queue = 'task:prodcons:queue'&&&&&def listen_task(self):&&&&&&&&while True:&&&&&&&&&&&&task = self.rcon.blpop(self.queue, 0)[1]&&&&&&&&&&&&print &Task get&, task&if __name__ == '__main__':&&&&print 'listen task queue'&&&&Task().listen_task()发布订阅模式使用redis的pubsub功能,订阅者订阅频道,发布者发布消息到频道了,频道就是一个消息队列。import redis&class Task(object):&&&&&def __init__(self):&&&&&&&&self.rcon = redis.StrictRedis(host='localhost', db=5)&&&&&&&&self.ps = self.rcon.pubsub()&&&&&&&&self.ps.subscribe('task:pubsub:channel')&&&&&def listen_task(self):&&&&&&&&for i in self.ps.listen():&&&&&&&&&&&&if i['type'] == 'message':&&&&&&&&&&&&&&&&print &Task get&, i['data']&if __name__ == '__main__':&&&&print 'listen task channel'&&&&Task().listen_task()Flask 入口我们分别实现了两种异步任务的后端服务,直接启动他们,就能监听redis队列或频道的消息了。简单的测试如下:import redisimport randomimport loggingfrom flask import Flask, redirect&app = Flask(__name__)&rcon = redis.StrictRedis(host='localhost', db=5)prodcons_queue = 'task:prodcons:queue'pubsub_channel = 'task:pubsub:channel'&@app.route('/')def index():&&&&&html = &&&&br&&center&&h3&Redis Message Queue&/h3&&br&&a href=&/prodcons&&生产消费者模式&/a&&br&&br&&a href=&/pubsub&&发布订阅者模式&/a&&/center&&&&&&&&return html&@app.route('/prodcons')def prodcons():&&&&elem = random.randrange(10)&&&&rcon.lpush(prodcons_queue, elem)&&&&logging.info(&lpush {} -- {}&.format(prodcons_queue, elem))&&&&return redirect('/')&@app.route('/pubsub')def pubsub():&&&&ps = rcon.pubsub()&&&&ps.subscribe(pubsub_channel)&&&&elem = random.randrange(10)&&&&rcon.publish(pubsub_channel, elem)&&&&return redirect('/')&if __name__ == '__main__':&&&&app.run(debug=True)启动脚本,使用siege -c10 -r 5 http://127.0.0.1:5000/prodconssiege -c10 -r 5 http://127.0.0.1:5000/pubsub可以分别在监听的脚本输入中看到异步消息。在异步的任务中,可以执行一些耗时间的操作,当然目前这些做法并不知道异步的执行结果,如果需要知道异步的执行结果,可以考虑设计协程任务或者使用一些工具如RQ或者celery等。【今日微信账号推荐】
点击展开全文
悄悄告诉你
更多同类文章
还可知道有多少人阅读过此篇文章哦
阅读原文和更多同类文章
可微信扫描右侧二维码关注后
还可知道有多少人阅读过此篇文章哦
人生苦短,我用 Python。伯乐在线旗下账号「Python开发者」分享 Python 相关的技术文章、工具资源、精选课程、热点资讯等。
您的【关注和订阅】是作者不断前行的动力
本站文章来自网友的提交收录,如需删除可进入
删除,或发送邮件到 bang@ 联系我们,
(C)2014&&版权所有&&&|&&&
京ICP备号-2&&&&京公网安备34python消息队列snakemq使用总结 - pythonic - 博客园
Python 消息队列snakemq总结
最近学习消息总线zeromq,在网上搜了python实现的消息总线模块,意外发现有个消息队列snakemq,于是拿来研究一下,感觉还是很不错的,入手简单使用也简单(比ice强多了),就是资料太少了,只能自己抠。
一、关于snakemq的官方介绍
1纯python实现,跨平台
2自动重连接
3可靠发送--可配置的消息方式与消息超时方式
4持久化/临时 两种队列
5支持异步 -- poll()
6symmetrical -- 单个TCP连接可用于双工通讯
7多数据库支持 -- SQLite、MongoDB&&
8brokerless - 类似的实现原理
9扩展模块:RPC, bandwidth throttling
以上都是官话,需要自己验证,动手封装了一下,感觉萌萌哒。
二、几个主要问题说明
1支持自动重连,不需要自己动手写心跳逻辑,你只需要关注发送和接收就行
2支持数据持久化,如果开始持久化,在重连之后会自动发送数据。
3数据的接收,snakemq通过提供回调实现,你只需要写个接收方法添加到回调列表里去。
4数据的发送,在此发送的都是bytes类型(二进制),因此需要转换。我在程序中测试的都是文本字符串,使用str.encode(&utf-8&)转换成bytes,接收时再转换回来。
5术语解释,Connector:类似于socket的TcpClient,Lisenter:类似于socket的TcpServer,每个connector或者listener都一个一个ident标识,发送和接收数据时就知道是谁的数据了。
6使用sqlite持久化时,需要修改源码,sqlite3.connect(filename,check_same_thread = False),用于解决多线程访问sqlite的问题。(会不会死锁?)
7启动持久化时,如果重新连上,则会自动发送,保证可靠。8为了封装的需要,数据接收以后,我通过callback方式传送出去。
说明代码中使用了自定义的日志模块
from common import nxlogger
import snakemqlogger as logger
可替换成logging的。
回调类(callbacks.py):
# -*- coding:utf-8 -*-
#*********************************************************#
# @@FileName: callbacks.py
# @@Author:
Shelwinnee&&
# @@Create Date:
# @@Modify Date:
# @@Description: simple synchronized
callbacks helper
#*********************************************************#
'''synchronized callback'''
class Callback(object):
def __init__(self):
self.callbacks = []
def add(self, func):
self.callbacks.append(func)
def remove(self, func):
self.callbacks.remove(func)
def __call__(self, *args, **kwargs):
for callback in self.callbacks:
&&&&&&&&&&& callback(*args, **kwargs)
Connector类(snakemqConnector.py):
# -*- coding:utf-8 -*-
#*********************************************************#
# @@FileName: snakemqConnector.py
# @@Author:
Shelwinnee&&
# @@Create Date:
# @@Modify Date:
# @@Description:&
# note:if message persistent is
necessary, please make sure you know the following instruction meaning,
# 'sqlite object created in a thread can
only be used in that same thread'
# this problem's solution: modify the
connection method of sqlite3, open file ('snakemq\storage\sqlite.py') ,and
modify the following code:
# self.conn =
sqlite3.connect(filename,check_same_thread = False)
#*********************************************************#
import threading
import snakemq
import snakemq.link
import snakemq.packeter
import snakemq.messaging
import snakemq.message
from snakemq.storage.sqlite import
SqliteQueuesStorage
from snakemq.message import
FLAG_PERSISTENT
from common.callbacks import Callback
from common import nxlogger
import snakemqlogger as logger
SnakemqConnector(threading.Thread):
&&&&&&&& def
__init__(self, snakemqident = None, remoteIp = "localhost",
remotePort = 9090, persistent = False):
&&&&&&&& &&&&&&&& super(SnakemqConnector,self).__init__()
&&&&&&&& &&&&&&&& self.messaging = None
&&&&&&&& &&&&&&&& self.link = None
&&&&&&&& &&&&&&&& self.snakemqident = snakemqident
&&&&&&&& &&&&&&&& self.pktr = None
&&&&&&&& &&&&&&&& self.remoteIp = remoteIp
&&&&&&&& &&&&&&&& self.remotePort = remotePort
&&&&&&&& &&&&&&&& self.persistent = persistent
&&&&&&&& &&&&&&&& self.on_recv = Callback()
&&&&&&&& &&&&&&&& self._initConnector()
&&&&&&&& def
run(self):
&&&&&&&& &&&&&&&& ("connector
start...")
&&&&&&&& &&&&&&&&
&&&&&&&& &&&&&&&& if self.link != None:
&&&&&&&& &&&&&&&& &&&&&&&& self.link.loop()
&&&&&&&& &&&&&&&& ("connector
&&&&&&&& def
terminate(self):
&&&&&&&& &&&&&&&& ("connetor
terminating...")
&&&&&&&& &&&&&&&& if self.link != None:
&&&&&&&& &&&&&&&& &&&&&&&& self.link.stop()
&&&&&&&& &&&&&&&& &&&&&&&& self.link.cleanup()
&&&&&&&& &&&&&&&& ("connetor
terminated")
&&&&&&&& def
on_recv_message(self, conn, ident, message):
&&&&&&&& &&&&&&&& try:
&&&&&&&& &&&&&&&& &&&&&&&& self.on_recv(ident,
message.data.decode('utf-8'))#dispatch received data
&&&&&&&& &&&&&&&& except Exception as e:
&&&&&&&& &&&&&&&& &&&&&&&& logger.error("connector
recv:{0}".format(e))
&&&&&&&& &&&&&&&& &&&&&&&& print(e)
&&&&&&&& '''send
message to dest host named destIdent'''
&&&&&&&& def
sendMsg(self, destIdent, byteseq):
&&&&&&&& &&&&&&&& msg = None
&&&&&&&& &&&&&&&& if self.persistent:
&&&&&&&& &&&&&&&& &&&&&&&& msg
= snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT)
&&&&&&&& &&&&&&&& else:
&&&&&&&& &&&&&&&& &&&&&&&& msg
= snakemq.message.Message(byteseq, ttl=60)
&&&&&&&& &&&&&&&& if self.messaging == None:
&&&&&&&& &&&&&&&& &&&&&&&& logger.error("connector:messaging
is not initialized, send message failed")
&&&&&&&& &&&&&&&& &&&&&&&& return
&&&&&&&& &&&&&&&& self.messaging.send_message(destIdent,
&&&&&&&& '''
&&&&&&&& '''
&&&&&&&& def
_initConnector(self):
&&&&&&&& &&&&&&&& try:
&&&&&&&& &&&&&&&& &&&&&&&& self.link
= snakemq.link.Link()
&&&&&&&& &&&&&&&& &&&&&&&& self.link.add_connector((self.remoteIp,
self.remotePort))
&&&&&&&& &&&&&&&& &&&&&&&& self.pktr
= snakemq.packeter.Packeter(self.link)
&&&&&&&& &&&&&&&& &&&&&&&& if
self.persistent:
&&&&&&&& &&&&&&&& &&&&&&&& &&&&&&&& storage =
SqliteQueuesStorage("SnakemqStorage.db")
&&&&&&&& &&&&&&&& &&&&&&&& &&&&&&&& self.messaging = snakemq.messaging.Messaging(self.snakemqident,
"", self.pktr, storage)
&&&&&&&& &&&&&&&& &&&&&&&& else:
&&&&&&&& &&&&&&&& &&&&&&&& &&&&&&&& self.messaging =
snakemq.messaging.Messaging(self.snakemqident, "", self.pktr)
&&&&&&&& &&&&&&&& &&&&&&&&
&&&&&&&& &&&&&&&& &&&&&&&& self.messaging.on_message_recv.add(self.on_recv_message)
&&&&&&&& &&&&&&&& &&&&&&&&
&&&&&&&& &&&&&&&& except Exception as e:
&&&&&&&& &&&&&&&& &&&&&&&& logger.error("connector:{0}".format(e))
&&&&&&&& &&&&&&&& finally:
&&&&&&&& &&&&&&&& &&&&&&&& ("connector[{0}]
loop ended...".format(self.snakemqident))
Listener类(snakemqListener.py):
# -*- coding:utf-8 -*-
#*********************************************************#
# @@FileName: snakemqListener.py
# @@Author:
Shelwinnee&&
# @@Create Date:
# @@Modify Date:
# @@Description:&
# note:if message persistent is
necessary, please make sure you know the following instruction meaning,
# 'sqlite object created in a thread can
only be used in that same thread'
# this problem's solution: modify the
connection method of sqlite3, open file ('snakemq\storage\sqlite.py') ,and
modify the following code:
# self.conn =
sqlite3.connect(filename,check_same_thread = False)
#*********************************************************#
import threading
import snakemq
import snakemq.link
import snakemq.packeter
import snakemq.messaging
import snakemq.message
from common import nxlogger
import snakemqlogger as logger
from common.callbacks import Callback
class SnakemqListener(threading.Thread):
&&&&&&&& def
__init__(self, snakemqident = None, ip = "localhost", port = 9090,
persistent = False):
&&&&&&&& &&&&&&&& super(SnakemqListener,self).__init__()
&&&&&&&& &&&&&&&& self.messaging = None
&&&&&&&& &&&&&&&& self.link = None
&&&&&&&& &&&&&&&& self.pktr = None
&&&&&&&& &&&&&&&& self.snakemqident = snakemqident
&&&&&&&& &&&&&&&& self.ip =
&&&&&&&& &&&&&&&& self.port = port
&&&&&&&& &&&&&&&& self.connectors = {}
&&&&&&&& &&&&&&&& self.on_recv = Callback()
&&&&&&&& &&&&&&&& self.persistent = persistent
&&&&&&&& &&&&&&&& self._initlistener()
&&&&&&&& '''
&&&&&&&& thread
&&&&&&&& '''
&&&&&&&& def
run(self):
&&&&&&&& &&&&&&&& ("listener
start...")
&&&&&&&& &&&&&&&&
&&&&&&&& &&&&&&&& if self.link != None:
&&&&&&&& &&&&&&&& &&&&&&&& self.link.loop()
&&&&&&&& &&&&&&&& ("listener
&&&&&&&& '''
&&&&&&&& terminate
snakemq listener thread
&&&&&&&& '''
&&&&&&&& def
terminate(self):
&&&&&&&& &&&&&&&& ("listener
terminating...")
&&&&&&&& &&&&&&&& if self.link != None:
&&&&&&&& &&&&&&&& &&&&&&&& self.link.stop()
&&&&&&&& &&&&&&&& &&&&&&&& self.link.cleanup()
&&&&&&&& &&&&&&&& ("listener
terminated")
&&&&&&&& '''
&&&&&&&& receive
message from host named ident
&&&&&&&& '''
&&&&&&&& def
on_recv_message(self, conn, ident, message):
&&&&&&&& &&&&&&&& try:
&&&&&&&& &&&&&&&& &&&&&&&& self.on_recv(ident,
message.data.decode('utf-8'))#dispatch received data
&&&&&&&& &&&&&&&& &&&&&&&& self.sendMsg('bob','hello,{0}'.format(ident).encode('utf-8'))
&&&&&&&& &&&&&&&& except Exception as e:
&&&&&&&& &&&&&&&& &&&&&&&& logger.error("listener
recv:{0}".format(e))
&&&&&&&& &&&&&&&& &&&&&&&& print(e)
&&&&&&&& def
on_drop_message(self, ident, message):
&&&&&&&& &&&&&&&& print("message dropped",
ident, message)
&&&&&&&& &&&&&&&& logger.debug("listener:message
dropped,ident:{0},message:{1}".format(ident, message))
&&&&&&&& '''client
connect'''
&&&&&&&& def
on_connect(self, ident):
&&&&&&&& &&&&&&&& logger.debug("listener:{0}
connected".format(ident))
&&&&&&&& &&&&&&&& self.connectors[ident] = ident
&&&&&&&& &&&&&&&& self.sendMsg(ident,
"hello".encode('utf-8'))
&&&&&&&& '''client
disconnect'''
&&&&&&&& def
on_disconnect(self, ident):
&&&&&&&& &&&&&&&& logger.debug("listener:{0}
disconnected".format(ident))
&&&&&&&& &&&&&&&& if ident in self.connectors:
&&&&&&&& &&&&&&&& &&&&&&&& self.connectors.pop(ident)
&&&&&&&& '''
&&&&&&&& listen
start loop
&&&&&&&& '''
&&&&&&&& def
_initlistener(self):
&&&&&&&& &&&&&&&& try:
&&&&&&&& &&&&&&&& &&&&&&&& self.link
= snakemq.link.Link()
&&&&&&&& &&&&&&&& &&&&&&&& self.link.add_listener((self.ip,
self.port))
&&&&&&&& &&&&&&&& &&&&&&&& self.pktr
= snakemq.packeter.Packeter(self.link)
&&&&&&&& &&&&&&&& &&&&&&&& self.pktr.on_connect.add(self.on_connect)
&&&&&&&& &&&&&&&& &&&&&&&& self.pktr.on_disconnect.add(self.on_disconnect)
&&&&&&&& &&&&&&&& &&&&&&&& if
self.persistent:
&&&&&&&& &&&&&&&& &&&&&&&& &&&&&&&& storage =
SqliteQueuesStorage("SnakemqStorage.db")
&&&&&&&& &&&&&&&& &&&&&&&& &&&&&&&& self.messaging =
snakemq.messaging.Messaging(self.snakemqident, "", self.pktr,
&&&&&&&& &&&&&&&& &&&&&&&& else:
&&&&&&&& &&&&&&&& &&&&&&&& &&&&&&&& self.messaging =
snakemq.messaging.Messaging(self.snakemqident, "", self.pktr)
&&&&&&&& &&&&&&&& &&&&&&&&
&&&&&&&& &&&&&&&& &&&&&&&& self.messaging.on_message_recv.add(self.on_recv_message)
&&&&&&&& &&&&&&&& &&&&&&&& self.messaging.on_message_drop.add(self.on_drop_message)
&&&&&&&& &&&&&&&& except Exception as e:
&&&&&&&& &&&&&&&& &&&&&&&& logger.error("listener:{0}".format(e))
&&&&&&&& &&&&&&&& finally:
&&&&&&&& &&&&&&&& &&&&&&&& ("listener:loop
ended...")
&&&&&&&& '''send
message to dest host named destIdent'''
&&&&&&&& def
sendMsg(self, destIdent, byteseq):
&&&&&&&& &&&&&&&& msg = None
&&&&&&&& &&&&&&&& if self.persistent:
&&&&&&&& &&&&&&&& &&&&&&&& msg
= snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT)
&&&&&&&& &&&&&&&& else:
&&&&&&&& &&&&&&&& &&&&&&&& msg
= snakemq.message.Message(byteseq, ttl=60)
&&&&&&&& &&&&&&&& if self.messaging == None:
&&&&&&&& &&&&&&&& &&&&&&&& logger.error("listener:messaging
is not initialized, send message failed")
&&&&&&&& &&&&&&&& &&&&&&&& return
&&&&&&&& &&&&&&&& self.messaging.send_message(destIdent,
测试代码connector(testSnakeConnector.py):
读取本地一个1M的文件,然后发送给listener,然后listener发回一个hello的信息。
from netComm.snakemq import
snakemqConnector
import time
import sys
def received(ident, data):
&&&&&&&& print(data)
if __name__ == "__main__":
&&&&&&&& bob
= snakemqConnector.SnakemqConnector('bob',"10.16.5.45",4002,True)
&&&&&&&& bob.on_recv.add(received)
&&&&&&&& bob.start()
&&&&&&&& try:
&&&&&&&& &&&&&&&& with
open("testfile.txt",encoding='utf-8') as f:
&&&&&&&& &&&&&&&& &&&&&&&& txt
= f.read()
&&&&&&&& &&&&&&&& &&&&&&&& for
i in range(100):
&&&&&&&& &&&&&&&& &&&&&&&& &&&&&&&& bob.sendMsg("niess",txt.encode('utf-8'))
&&&&&&&& &&&&&&&& &&&&&&&& &&&&&&&& time.sleep(0.1)
&&&&&&&& except
Exception as e:
&&&&&&&& &&&&&&&& print(e)
&&&&&&&& time.sleep(5)
&&&&&&&& bob.terminate()&&&&&
测试代码listener(testSnakeListener.py):
from netComm.snakemq import
snakemqListener
import time
def received(ident, data):
&&&&&&&& filename
"log/recFile{0}.txt".format(time.strftime('%S',time.localtime()))
&&&&&&&& file
= open(filename,'w')
&&&&&&&& file.writelines(data)
&&&&&&&& file.close()
if __name__ == "__main__":
&&&&&&&& niess
snakemqListener.SnakemqListener("niess","10.16.5.45",4002)
&&&&&&&& niess.on_recv.add(received)
&&&&&&&& niess.start()
&&&&&&&& print("niess
start...")
&&&&&&&& time.sleep(60)
&&&&&&&& niess.terminate()&&&
&&&&&&&& print("niess
四、测试结果
&应该与其它消息类组件对比后出结论。应付一个小项目绝对杠杠滴。

我要回帖

更多关于 消息队列 技术选型 的文章

 

随机推荐