skype 在S3之后弹出错误对话框 make sure the patch or intelnet address ipandas corrrent

python知识点(13)
Twsited网络框架
Twisted是一个事件驱动的网络框架,其中包含了诸多功能,例如:网络协议、线程、数据库管理、网络操作、电子邮件等等
简而言之,事件驱动分为两个部分,第一、注册事件;第二、出发事件。
自定义事件驱动框架
event_list = []
def run():
for event in event_list:
obj = event()
obj.execute()
class BeasHandler(object):
def execute(self):
raise Exception('你必须自己创建该方法')
import Twsited_test01
class MyHandler(Twsited_test01.BeasHandler):
def execute(self):
print('自定义的事件驱动——MyHandler')
Twsited_test01.event_list.append(MyHandler)
Twsited_test01.run()
Protocol买描述了如何以异步的方式处理网络中的事件。HTTP、DNS以及IMAP是应用层协议中例子。
Protocols实现了IProtocols接口,它包含如下的方法:
- makeConnection
在transport对象和服务器之间建立一条连接
- connectionMade
链接建立起来后调用
- dataReceived
接收数据时调用
- connectionLost
关闭链接时调用
Transport代表网络中两个通信节点之间的连接。Transport负责描述连接的细节,比如连接是面向流式的还是面向数据包的,流控以及可靠性。TCP、UDP和Unix套接字可作为transport的例子。它们被设计为“满足最小功能单元,同时具有最大程度的可复用性“,而且从协议实现中分离出来,这让许多协议可以采用相同类型的传输。Transport实现了ITransport接口,它包含如下方法:
以非阻塞的方式按顺序依次将数据写到物理连接上
- writeSequence
将一个字符串列表写到物理连接上
- loseConnection
将所有挂起的数据写入,然后关闭连接
取得连接中对端的地址信息
取得连接中本端的地址信息
将transport从协议中分离出来也是的这两个层次的测试变得更加简单。可以通过简单地写入一个字符串来模拟传输,用这种方式来检查。
一个学说话服务器的例子
from twisted.internet import protocol
from twisted.internet import reactor
class Echo(protocol.Protocol):
def dataReceived(self, data):
self.transport.write(data)
def main():
factory = protocol.ServerFactory()
factory.protocol = Echo
reactor.listenTCP(12345, factory)
reactor.run()
if __name__ == '__main__':
from twisted.internet import reactor, protocol
class EchoClient(protocol.Protocol):
def connectionMade(self):
self.transport.write(bytes("hello dingyi", encoding='utf8'))
def dataReceived(self, data):
print("服务端返回的信息:%s" % str(data, encoding = 'utf8'))
self.transport.loseConnection()
def connectionLost(self, reason):
print('链接断开')
class EchoFactory(protocol.ClientFactory):
protocol = EchoClient
def clientConnectionFailed(self, connector, reason):
print('连接失败')
reactor.stop()
def clientConnectionLost(self, connector, reason):
print('链接丢失')
reactor.stop()
def main():
f = EchoFactory()
reactor.connectTCP("127.0.0.1", 12345, f)
reactor.run()
if __name__ == '__main__':
运行服务器脚本将会启动一个TCP服务器,监听端口12345上的连接。服务器采用的是Echo协议,数据经TCP transport对象写出。运行客户端脚本对服务器发起一个TCP连接,回显服务端的回应后终止连接并停止reactor事件循环。这的Factory用来对连接的双方生成protocol对象实例。两端的通信是异步的,connectTCP负责注册回调函数到reactor事件循环中,当socket上有数据可读时通知回调处理。
一个传送文件的例子
import optparse, os
from twisted.internet.protocol import ServerFactory, Protocol
def parse_args():
usage = '''usage: %prog [options] poetry-file
This is the Fast Poetry Server, Twisted edition.
Run it like this:
python fastpoetry.py &path-to-poetry-file&
If you are in the base directory of the twisted-intro package,
you could run it like this:
python twisted-server-1/fastpoetry.py poetry/ecstasy.txt
to serve up John Donne's Ecstasy, which I know you want to do.
parser = optparse.OptionParser(usage)
help = 'The port to listen on. Default to a random available port.'
parser.add_option('--port', type='int', help = help)
help = "The interface to listen on. Default is localhost."
parser.add_option('--iface', help=help, default='localhost')
options, args = parser.parse_args()
print('--args:',options, args)
if len(args) != 1:
parser.error('只能传送一个文件')
poetry_file = args[0]
if not os.path.exists(args[0]):
parser.error("找不到这个文件%s" % poetry_file)
return options, poetry_file
class PoetryProtocol(Protocol):
def connectionMade(self):
self.transport.write(bytes(self.factory.poem, encoding = 'utf8'))
self.transport.loseConnection()
class PoetryFactory(ServerFactory):
protocol = PoetryProtocol
def __init__(self, poem):
self.poem = poem
def main():
options, poetry_file = parse_args()
poem = open(poetry_file).read()
factory = PoetryFactory(poem)
from twisted.internet import reactor
port = reactor.listenTCP(options.port or 9000, factory, interface = options.iface)
print('serving %s on %s.' % (poetry_file, port.getHost()))
reactor.run()
if __name__ == '__main__':
import optparse
from twisted.internet.protocol import Protocol, ClientFactory
def parse_args():
usage = '''usage: %prog [options] [hostname]:port ...
This is the Get Poetry Now! client, Twisted version 3.0
Run it like this:
python get-poetry-1.py port1 port2 port3 ...
parser = optparse.OptionParser(usage)
_,addresses = parser.parse_args()
if not addresses:
print(parser.format_help())
parser.exit()
def parse_address(addr):
if ':' not in addr:
host = '127.0.0.1'
port = addr
host, port = addr.split(':', 1)
if not port.isdigit():
parser.error('端口必须是数字')
return host, int(port)
return map(parse_address, addresses)
class PoetryProtocol(Protocol):
def dataReceived(self, data):
data = str(data, encoding='utf8')
self.poem += data
def connectionLost(self, reason):
self.poemReceived(self.poem)
def poemReceived(self, poem):
self.factory.poem_finished(poem)
class PoetryClientFactory(ClientFactory):
protocol = PoetryProtocol
def __init__(self, callback):
self.callback = callback
def poem_finished(self, poem):
self.callback(poem)
def get_poetry(host, port, callback):
Download a poem from the given host and port and invoke callback(poem)
when the poem is complete.
from twisted.internet import reactor
factory = PoetryClientFactory(callback)
reactor.connectTCP(host, port, factory)
def poetry_main():
addresses = []
get_map = parse_args()
for i in get_map:
addresses.append(i)
from twisted.internet import reactor
poems = []
def got_poem(poem):
poems.append(poem)
if len(poems) == len(addresses):
reactor.stop()
for address in addresses:
host, port = address
get_poetry(host, port, got_poem)
reactor.run()
for poem in poems:
print(poem)
if __name__ == '__main__':
poetry_main()
Twsited深入
twsited很深奥,需要很长事件的学习,这里只是介绍了部分的内容,更具体的部分请参看下面的博客
缓存数据库介绍
NoSQL(NoSQL = Not Only SQL),即“不仅仅只是SQL”,泛指非关系行的数据库,随着互联网web2.0网站的兴起,传统的关系型数据库在应付web2.0网站,特别是超大规模和高并发的SNS类型的web2.0存动态网站已经显得力不从心了,暴露了很多难以克服的问题,而非关系型的数据库由于其本身的特点,得到了非常迅速的发展。NoSQL数据库的产生就是为了解决大规模数据集合多种数据种类带来的挑战,尤其是大数据应用难题。
NoSQL数据库的四大分类
键值(key-value)存储数据库
这一类数据库主要会使用到一个哈希表,这个表中有一个特定的键和一个指针指向特定的数据。Key/value模型对于IT系统来说有优势在于简单,易部署。但是DBA支队部分值进行查询或更新的时候,Key-Value就显得效率低下了。例如:Tokyo Cabinet/Tyrant, Redis, Voldemort, Oracle BDB.
典型应用场景:内容缓存,主要用于处理大量数据的高访问负载,也用于一些日志系统等。
数据模型:Key指向Value的键值对,通常是hash table来实现
优点:查找速度快
缺点:数据无结构化,通常指被当做字符串或者二进制数据
列存储数据库
这部分数据通常是用来应对分布式存储的海量数据。键仍然存在,但是他的特点是指向了多个列。这些列是由列家族安排的。如:Cassandra, HBase, Riak.
典型应用场景:分布式文件系统
数据模型:以列簇式存储,将同一列数据存在一起
优点:查找速度快,可扩展性强,更容易进行分布式扩展
缺点:功能相对局限
文档型数据库
文档型书库的灵感来自于Lotus Notes办公软件,而且他同第一种键值存储类似。该模型的数据模型是版本化的文档,半结构化的文档以特定的格式存储,比如JSON。文档型数据库可以看做是键值型数据库的升级版,允许之间嵌套键值。而且文档型数据库比键值型数据的查询效率更高。如:CouchDB, MongoDb. 国内也有文档型数据库SequoiaDB,已经开源。
典型应用场景:Web应用(与Key-Value类似,Value是结构化的,不同的是数据库能够了解Value的内容)
数据模型:Key-Value对应的键值对,Value为结构化数据
优点:数据结构要求不严格,表结构可变,不需要像关系型数据库那样需要预先定义表结构
缺点:查询性能不高,而且缺乏统一的查询语法
图形(Graph)数据库
图形结构的数据库同其他行列以及刚性结构的SQL数据库不同,它是使用灵活的图形模型,并且能够扩展到多个服务器上。NoSQL数据库没有标准的查询语言(SQL),因此进行数据库查询需要定制数据模型。许多NoSQL数据库都有REST式的数据接口或者查询API。例如:Neo4J, InfoGrid, Infinite Graph.
典型应用场景:社交网络、推荐系统等。专注于构建关系图谱
数据模型:图结构
优点:利用图结构相关算法,比如最短路径寻址,N度关系查找等
缺点:很多时候需要对整个图做计算才能得出需要的信息,而且这种结构不好做分布式的集群方案。
NoSQL数据库在以下情况下比较试用:
1. 数据模型比较简单
2. 需要灵活性更强的IT系统
3. 对数据库性能要求高
4. 不需要高度的数据一致性
5. 对给定Key,比较容易映射复杂值的环境
Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载。他通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态、数据库驱动网站的速度。Memcached基于一个存储键/值对的hashmap。其守护进程(daemon)是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信。
安装memcached
sudo apt-get install memcached
启动Memcached
memcached -d -m 10 -u root -l 127.0.0.1 -p 11100 -c 128 -P /tmp/memcached.pid
参数说明:
-d 是启动一个守护进程
-m 是分配给Memcache使用的内存数量,单位是MB
-u 是运行mencached的用户
-l 是监听的服务器IP地址
-p 是设置Memcached监听的端口,最好是1024以上的端口
-c 是最大运行的并发连接数,默认是1024,按照尼福取其的负载量来设定
-P 是设置保存Memcached的pid文件
memcached命令
- 存储命令:set,add,replace,append,prepend,cas
- 获取命令:get,gets
- 其他命令:delete、stats..
python操作memcached
sudo pip3 install Python-memcached
1、第一个小例子
import memcache
mc = memcache.Client(['127.0.0.1:11100',], debug=True)
mc.set('foo', 'bar')
ret = mc.get('foo')
print(ret)
py@py-dy:~/桌面/memcached$ python3 memcached_test01.py
2、自带集群功能
Python-memcached模块原声支持集群操作,其原理实在内存维护一个主机列表,且集群中主机的权重值和主机在列表中重复出现的次数成正比
那么在内存中主机列表为:
host_list = ["1.1.1.1", "1.1.1.2", "1.1.1.2", "1.1.1.3", ]
如果用户根据需要在内存中创建一个键值对(如:k1 = ‘v1’),那么要执行下一步骤:
- 根据算法价将K1转化成一个数字
- 将数字和主机列表长度求余数,得到一个值N(0&=N&=列表长度)
- 在主机列表中根据第2步得到的值为索引获取主机,例如:host_list[N]
- 连接 将第3步中获取的主机,将k1=’v1’放置在该服务器的内存中
代码实现如下:
mc = memcache。Client([('192.168.0.1:12000',1), ('192.168.0.2:12000', 2), ('192.168.0.3:12000',1)], debug = True)
mc.set('k1':'v1')
添加一条键值对,如果key存在则报错
#!/usr/bin/env python3
import memcache
mc = memcache.Client(['127.0.0.1:11100'], debug = True)
mc.add('k2','v2')
#第二次执行时报的错
#MemCached: while expecting 'STORED', got unexpected response 'NOT_STORED'
4、relpace
修改某个KEY的值,如果key不存在,则异常
#!/usr/bin/env python3
import memcache
mc = memcache.Client(['127.0.0.1:11100'], debug = True)
mc.replace('k2','v11')
#报错的异常
#MemCached: while expecting 'STORED', got unexpected response 'NOT_STORED'
5、set和set_multi
设置一个键值对,如果key不存在,则创建,如果key存在则修改
设置对个键值对,如果key不存在,则创建,如果key存在则修改
import memcache
mc = memcache.Client(['127.0.0.1:11100'], debug = True)
mc.set('k4','v4')
mc.set_multi({'k01':'v01', 'k02':'v02'})
delete和delete_mnlti
在memcached中删除一个指定的键值对
delete_multi
在memcached中删除多个指定的键值对
import memcache
mc = memcache.Client(['127.0.0.1:11100'], debug = True)
mc.delete('k4')
mc.delete_multi(['k01', 'k02'])
get和get_multi
获取一个键值对
获取多个键值对
import memcache
mc = memcache.Client(['127.0.0.1:11100'], debug=True)
val = mc.get('k2')
print('val:', val)
vals = mc.get_multi(['k1', 'k2'])
print('vals:', vals)
append和prepend
修改指定key的值,在该值后面追加内容
修改指定key的值,在该值前面插入内容
import memcache
mc = memcache.Client(['127.0.0.1:11100'], debug = True)
mc.add('k1', 'v1')
mc.append('k1', 'after')
mc.prepend('k1', 'before')
a = mc.get('k1')
decr和incr
自增,将memcached中的某一个值增加N(N默认为1)
自减,将memcached中的某一个值减少N(N默认为1)
#!/usr/bin/env python3
import memcache
mc = memcache.Client(['127.0.0.1:11100'], debug= True)
mc.set('k1', '777')
mc.incr('k1')
mc.incr('k1', 10)
mc.decr('k1')
mc.decr('k1', 10)
类比商城商品剩余个数,假设改值保存在memcache中,product_count = 900
A用户刷新页面从memcache中读取到product_count = 900
B用户刷新页面从memcache中读取到product_count = 900
如果A、B均购买商品
A用户修改商品剩余个数,product_count = 899
B用户修改商品剩余个数,product_count = 899
如此一来缓存内的数据就不正确了,两个用户购买商品后剩余个数是899
如果使用python的set和get来操作上述过程,就会得到上述结果
如果想要避免这种情况,就需要使用gets和cas
import memcache
mc = memcache.Client(['127.0.0.1:11100'], debug = True)
mc.set('product_count', '900')
v = mc.gets('product_count')
mc.cas('product_count', '889')
v = mc.get('product_count')
注:本质上每次执行gets时,会从memcache中获取一个自增数字,通过cas去修改gets的值时,会携带之前获取的自增值和memcache中的自增值进行比较,如果相等,则可以提交,如果不相等,那表示在gets和cas执行之间,又有其他人修改了gets(获取了缓冲的指定值),这就有可能产生不正确的数据,所以就不能修改该值了
redis是业界主流的key-vlaue nosql数据库之一。和memcached类似,它支持存储的v安略类型相对更多,保罗string(字符),list(列表),set(集合),zset(有序集合)和hash(哈希类型),这些数据类型都支持push/pop、add/remove及取交集和差集以及更丰富的操作,而且这些操作都是原子性的(要么成功要么失败,不会只完成一半)。在此基础上,redis支持各种不同的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别是redis会周期性的吧更新的数据写入磁盘或者把修改操作写入追加的记录文件,并在此基础上实现了主从同步。
- 异常快速:Redis是非常快的,每秒可执行大约110000次设置操作,81000个/每秒的读取操作。
- 支持丰富的数据类型:Redis支持最大多数开发人员已经知道的如列表、集合、可排序集和、哈希等数据类型。这使得在应用中很容易解决各种问题。
- 操作都是原子的:所有Redis的操作都是原子的,从而确保当两个客户同事访问Redis服务器得到的是更新后的值(最新值)。
- MultiUtility工具:Redis是一个多功能实用工具,可以在很多如:缓存、消息传递队列中使用(Redis原生支持发布/订阅)。在应用程序中,如:Web应用程序会话,网站页面点击数等任何短暂的数据。
Redis安装环境
sudo apt-get update
sudo apt-get install redis-server
redis-server
查看/进入redis
py@py-dy:~/桌面/memcached$ redis-cli
127.0.0.1:6379&
python操作Redis
sudo pip3 install redis
图形管理工具
要在Ubuntu 上安装 Redis桌面管理,可以从 http:
Redis 桌面管理器会给你用户界面来管理 Redis 键和数据。
Redis API使用
redis-py的API使用可以分类为:
- 连接方式
- string操作
- Hash操作
- List操作
- Sort Set操作
- 发布订阅
1、操作模式
redis-py提供两类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py
import redis
r = redis.Redis(host='127.0.0.1', port=6379)
r.set('foo', 'bar')
print(r.get('foo'))
py@py-dy:~/桌面/redis$ python3 redis_test01.py
redis-py 使用connectoin poll来管理,对一个redis server的所有连接,避免每次建立、释放链接的开销。默认,每次Redis实力都会维护自己的一个连接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实力共享一个连接池。
import redis
pool = redis.ConnectionPool(host='10.211.55.4', port=6379)
r = redis.Redis(connection_pool=pool)
r.set('foo', 'Bar')
print r.get('foo')
1、string操作
在redis中的string在内存中按照一个name对应一个value来存储。
n1 ————& v1
n2 ————& v2
n3 ————& v3
set(name, value, ex=None, px=None, nx=False, xx=False)
在redis中设置值,默认,不存在则创建、存在则修改
ex,过期时间(秒)
px,过期时间(毫秒)
nx,如果设置为True,则只有name不存在时,当前set操作才执行
xx,如果设置为True,则只有name存在时,当前set操作才执行
setnx(name, value)
设置值,只有name不存在时,执行设置操作(添加)
setex(name, value, time)
设置值,time参数是过期时间(数字秒或timedelta对象)
psetsx(name, time_ms, value)
设置值,参数time_ms是 过期时间(数字毫秒或timedelta对象)
mset(*args, **kwargs)
批量设置值
mset(k1 = 'v1', k2 = 'v2')
mset({'k1':'v1', 'k2':'v2'})
mget(keys, *args)
mget('k1', 'k2')
mget(['k1', 'k2'])
getset(name, value)
设置新值并获取原来的值
getrange(key, start, end)
获取子序列(根据字节获取,非字符)
name,Redis 的name
start,其实位置(字节)
end, 结束位置(字节)
如:'奥术大师', 0-3表示'奥'
setrange(name, offset, value)
修改字符串内容,从指定的字符串索引开始向后替换(新值太长是则向后添加)
offset:字符串的索引,字节(一个汉子三个字节)
vlaue:要设的值
setbit(name, offset, vlaue)
对name对应值的二进制表示的位进行操作
name,redis的name
offset,位的索引(将值变换成二进制后在进行索引)
value,值只能是0或1
注:如果在redis中有一个对应:n1 = 'foo'
那么字符串foo的二进制表示为:
所以,如果执行setbit('n1', 7, 1),就会将第七位设置为1(从0开始),变成
扩展,转换二进制表示:
source = 'foo'
for i in source:
num = ord(i)
print(bin(num),replace('b',''))
如果source是汉字怎么办
对于utf-8,每一个汉字占 3 个字节
对于汉字,for循环时候会按照 字节 迭代,那么在迭代时,将每一个字节转换 十进制数,然后再将十进制数转换成二进制
用途举例:用最省空间的方式,存储在线用户以分别是那些用户在线
getbit(name, offset)
获取name对应的值的二进制便是中的某位的值(0或1)
bitcount(key, start=None, end=None)
获取name对应的值的二进制表示中1的个数
key, Redis的name
start,起始位置
end,结束位置
strlen(name)
返回name对应值的字节长度(汉字为三个字节)
incr(self, name, amount=1)
自增name对应的值,当name不存在是,则创建name = amount,否则自增
name,Redis对应的那么
amount,自增数(必须是整数)
注:同incrby
incrbyfloat(self, name, amount=1.0)
自增name对应的值,当name不存在是,则创建name = amount,否则自增
name,Redis对应的那么
amount,自增数(浮点型)
decr(self, name, amount=1)
自减name对应的值,当name不存在时,则创建name = amount,否则自减
name,Redis的name
amount,自减数(整数)
append(key, value)
在redis 那么队形的之后面追加内容
key,redis的name
value, 要追加的字符串
2、Hash操作
hash表现形式上有些像python中的dict,可以存储一组关联性较强的数据。在redis中Hash在内存中的存储格式如下图
hset(name, key, value)
name对应的hash中设置一个键值对(不存在,则创建;否则,修改)
name,redis的name
key,name对应的hash中的key
value,name对应的hash的value
hsetnx(name, key, value),当那么队形的hash中不存在当前key是则创建(相当于添加)
hmset(name, mapping)
在name对应的hash中批量设置键值对
name:redis的name
mapping,字典,如:{"k1":"v1", "k2":"v2"}
r.hmset('xx', {'k1':'v1', "k2":'v2'})
hget(name, key)
在neme中对应的hash中获取根据key获取的value
hmget(name, key, *args)
在那么对应的hash中获取多个key的值
name,redis对应的name
keys,要获取key的集合。如['k1', 'k2', 'k3']
*args,要获取的key,如:k1, k2, k3
r.mget('xx', ['k1', 'k2'])
print(r.hmget('xx','k1','k2'))
hgetall(name)
获取name对应hash的所有键值
hlen(name)
获取name对应的hash中键值对的个数
hkeys(name)
获取name对应的所有的key的值
hvals(home)
获取name对应的hash中的所有value的值
hexiste(name, key)
检查name对应的hash中是否存在当前传入的key
hdle(name, *key)
将name对应的hash中指定key的键值对删除
hincrby(name, key, amount=1)
自增name对应的hash中指定key的值,不存在则创建key=amount
name, redis中的那么
key, hash对应的key
amount, 自增数(整数)
hincrbyfloat(name, key, amount=1.0)
自增name对应的hash中指定的key的值,不存在则创建key=amount
name,redis中对应的name
key,hash对应的key
amount, 自增数(浮点数)
自增name对应的hash中指定key的值,不存在则创建key=amount
hscan(name, cursor=0, match=None, count=None)
自增式迭代获取,对于数据大的数据非常有用。hscan可以实现分片的获取数据,并非一次性将数据全部取完,从而导致内存溢出
name,redis的name
cursor,游标(基于游标分批获取数据)
match,匹配指定key。默认None表示所有key
count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数
第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)
第二次:cursor2, data1 = r.hscan('xx',cursor=cursor1, match=None, count=None)
直到返回值cursor的值为0时,表示数据已经通过分片获取完毕
hscan_iter(name, match=None, count=None)
利用yield封装hash创建生成器,实现分批去redis中获取数据
match,匹配指定key,默认None表示所有的key
count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数
for item in r.hscan_iter('XX'):
print(item)
List操作,redis中的list在内存中按照一个name对应一个List来存储。
l1 ————& [v1,v2,v3…]
l2 ————& [v4,v5,v6…]
lpush(name, values)
在name对应的list中添加元素,每个新的元素都添加到列表的最左边
r.lpush('oo', 11,22,33)
保存的顺序位:33,22,11
rpush(name, values) 表示从右向左操作
lpushx(name, value)
在name对应的list中添加元素,只有name已经存在时,值添加到列表的最左端
rpushx(name, value)表示从右向左操作
llen(name)
name对应的list元素个数
linsert(name, where, refvalue, value)
在name对应的列表的某一个值前或后插入一个新值
name,redis的name
where, BEFORE或AFTER
refvalue,标杆值(列表中元素的值),在他前后插入
value,要插入的数据
lset(name, index, value)
对name对应的list中的某一个索引位置重新赋值
name,reedis的name
index,List的索引位置
value,要设置的值
lrem(name, value, num)
在name对应的list中删除指定的值
name,redis的name
value,要删除的值
num,num=0,删除 列表中所有的指定值
num=2, 从前到后删除两个指定值
num=-2,从后到前删除两个指定值
lpop(name)
在name对应的列表的左侧获取第一个元素并在列表中移除,返回值是第一个元素
rpop(name) 表示从右向左操作
lindex(name, index)
在name对应的列表中根据索引获取列表元素
lrange(name, start, end)
在name对应的列表分片获取数据
name,redis的name
start,索引的起始位置
end,索引结束的位置
ltrim(name, start, end)
在name对应的列表中移除没有在start-end索引之间的值
name,redis的name
start,索引的起始位置
end,索引结束位置
rpoplpush(src, dst)
从一个列表取出最右边的元素,同时将其添加至另一个列表的最左边
src,要取数据的列表的name
dst,要添加数据的列表的name
blpop(keys, timeout)
将多个列表排列,按照从左到右去pop对应列表的元素
keys,redis的name的集合
timeout,超时时间,当元素所有列表的元素获取完之后,阻塞等待列表内有数据的时间(秒), 0 表示永远阻塞
&&& r.lpush('gg',44,33,22,11)
&&& r.lpush('hh',44,33,22,11)
&&& r.blpop(('gg','hh'))
(b'gg', b'11')
&&& r.blpop(('gg','hh'))
(b'gg', b'22')
&&& r.blpop(('gg','hh'))
(b'gg', b'33')
&&& r.blpop(('gg','hh'))
(b'gg', b'44')
&&& r.blpop(('gg','hh'))
(b'hh', b'11')
&&& r.blpop(('gg','hh'))
(b'hh', b'22')
&&& r.blpop(('gg','hh'))
(b'hh', b'33')
&&& r.blpop(('gg','hh'))
(b'hh', b'44')
&&& r.blpop(('gg','hh'))
r.brpop(keys, timeout),从右向左获取数据
brpoplpush(src, dst, timeout=0)
从一个列表的右侧移除一个元素并将其添加到另一个列表的左侧
src,取出并要移除元素的列表对应的name
dst,要插入元素的列表对应的name
timeout,当src对应的列表中没有数据时,阻塞等待其有数据的超时时间(秒),0 表示永远阻塞
自定义增量迭代
由于redis类库中没有提供对列表元素的增量迭代,如果想要循环name对应的列表的所有元素,那么就需要:
1、获取name对应的所有列表
2、循环列表
但是,如果列表非常大,那么就有可能在第一步时就将程序的内容撑爆,所有有必要自定义一个增量迭代的功能:
def list_iter(name):
自定义redis列表增量迭代
:param name: redis中的name,即:迭代name对应的列表
:return: yield 返回 列表元素
list_count = r.llen(name)
for index in xrange(list_count):
yield r.lindex(name, index)
for item in list_iter('pp'):
print item
4、set集合操作
sadd(name,values)
name对应的集合中添加元素
scard(name)
获取name对应的集合中元素个数
sdiff(keys, *args)
在第一个name对应的集合中且不在其他name对应的集合的元素集合
sdiffstore(dest, keys, *args)
获取第一个name对应的集合中且不在其他name对应的集合,再将其新加入到dest对应的集合中
sinter(keys, *args)
获取第一个name对应集合的并集
sinterstore(dest, keys, *args)
获取多一个name对应集合的并集,再讲其加入到dest对应的集合中
sismember(name, value)
检查value是否是name对应的集合的成员
smembers(name)
获取name对应的集合的所有成员
smove(src, dst, value)
将某个成员从一个集合中移动到另外一个集合
spop(name)
从集合的右侧(尾部)移除一个成员,并将其返回
srandmember(name, numbers)
从name对应的集合中随机获取 numbers 个元素
srem(name, values)
在name对应的集合中删除某些值
sunion(keys, *args)
获取多一个name对应的集合的并集
sunionstore(dest,keys, *args)
获取多一个name对应的集合的并集,并将结果保存到dest对应的集合中
sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)
同字符串的操作,用于增量迭代分批获取元素,避免内存消耗太大
5、有序集合
有序集合,在集合的基础上,为每元素排序;元素的排序需要根据另外一个值来进行比较,所以,对于有序集合,每一个元素有两个值,即:值和分数,分数专门用来做排序。
zadd(name, *args, **kwargs)
在name对应的有序集合中添加元素
zadd('zz', 'n1', 1, 'n2', 2)
zadd('zz', n1=11, n2=22)
zcard(name)
获取name对应的有序集合元素的数量
zcount(name, min, max)
获取name对应的有序集合中分数 在 [min,max] 之间的个数
zincrby(name, value, amount)
自增name对应的有序集合的value对应的分数
r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)
按照索引范围获取name对应的有序集合的元素
name,redis的name
start,有序集合索引起始位置,从0开始(非分数)
end,有序集合索引结束位置(非分数)
desc,排序规则,默认按照分数从小到大排序
withscores,是否获取元素的分数,默认只获取元素的值
score_cast_func,对分数进行数据转换的函数
从大到小排序
zrevrange(name, start, end, withscores=False, score_cast_func=float)
按照分数范围获取name对应的有序集合的元素
zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)
从大到小排序
zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)
zrank(name, value)
获取某个值在 name对应的有序集合中的排行(从 0 开始)
zrevrank(name, value),从大到小排序
zrem(name, values)
删除name对应的有序集合中值是values的成员
如:zrem('zz', ['s1', 's2'])
zremrangebyrank(name, min, max)
根据排行范围删除
zremrangebyscore(name, min, max)
根据分数范围删除
zscore(name, value)
获取name对应有序集合中 value 对应的分数
zinterstore(dest, keys, aggregate=None)
获取两个有序集合的交集,如果遇到相同值不同分数,则按照aggregate进行操作
aggregate的值为:
zunionstore(dest, keys, aggregate=None)
获取两个有序集合的并集,如果遇到相同值不同分数,则按照aggregate进行操作
aggregate的值为:
zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)
同字符串相似,相较于字符串新增score_cast_func,用来对分数进行操作
6、其他操作
delete(*names)
删除redis中的任意数据类型
检测redis的name是否存在
keys(pattern=’*’)
根据模型获取的
* 匹配数据库中所有
,但不匹配
expire(name, time)
为一个redis的某个name设置超时时间
rename(src, dst)
对redis的name重命名为
move(name, db)
将redis的某个值移动到指定的db下
randomkey()
随机获取一个redis的name(不删除)
type(name)
获取name对应值的类型
scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)
同字符串操作,用于增量迭代获取key
redis-py默认在执行每次请求都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline是原子性操作。
import redis
pool = redis.ConnectionPool(host='127.0.0.1', port=6379)
r=redis.Redis(connection_pool=pool)
pipe = r.pipeline(transaction=True)
pipe.set('d01','dingyi')
pipe.set('lalala', 'hahaha')
pipe.execute()
发布者:服务器
订阅者:Dashboad和数据处理
Demo如下:
import redis
class RedisHelper(object):
def __init__(self):
self.__conn = redis.Redis(host='127.0.0.1')
self.chan_sub = 'fm99.9'
self.chan_pub = 'fm99.9'
def public(self, msg):
self.__conn.publish(self.chan_pub, msg)
return True
def subscribe(self):
pub = self.__conn.pubsub()
pub.subscribe(self.chan_sub)
pub.parse_response()
return pub
from RedisHelper import RedisHelper
obj = RedisHelper()
redis_sub = obj.subscribe()
while True:
msg = redis_sub.parse_response()
print(msg)
#!/usr/bin/env python3
from RedisHelper import RedisHelper
obj = RedisHelper()
obj.public('hello')
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统,他遵循Mozilla Public License开源协议。
MQ全称为Message Queue,消息队列(MQ)是一种应用程序对应程序的通信方法。应用程序应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来连接他们。消息传递指的是程序之间通过在消息队列中发送数据进行通信而不是直接点用彼此来通信,直接调用通常是用于例如远程过程调用的技术。排队指的是应用程序通过队列来通信队列的使用除了接收和发送应用程序同时执行的要求。
RabbitMQ安装
安装RabbitMQ
apt-get install rabbitmq-server
pip3 install pika
使用API操作RabbitMQ
基于Queue实现生产者消费者模型
import queue
import threading
message = queue.Queue(10)
def producer(i):
message.put(i)
print('放入:',i)
def consumer(i):
a = message.get()
print('取出:',a)
for i in range(12):
t = threading.Thread(target=producer, args=(i,))
for i in range(10):
t = threading.Thread(target=consumer, args=(i,))
对于RabbitMQ来说,生产和消费不在针对内存里的一个queue对象而是某台服务器上的RabbitMQ-server实现的消息队列
#!/usr/bin/env python3
import pika
#建立一个阻塞的连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1'))
#生成一个管道,在管道中声明不同的队列
channel = connection.channel()
#在管道中创建一个队列,队列名是hello
channel.queue_declare(queue='hello')
#向队列里发数据,routing_key是队列名,body是内容
#RabbitMQ不能把消息直接发送到队列里,exchange相当于路由器的功能,将内容转发到队列中,exchange起到过滤数据额作用。控制那些数据转发到那些队列中。这里设置为空就是不过滤的意思。
channel.basic_publish(
exchange='',
routing_key='hello',
body='Hello World!'
print('[x] sent hello world!')
connection.close()
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()
错,再次创建一下这个队列,避免引发错误。
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print('[x] Received %r' % body)
channel.basic_consume(callback, queue='hello', no_ack=True)
print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
Work Queues
这种模式下,RabbitMQ默认会把P打的消息依次分发给各个消费者(c),类似负载均衡
import pika
import time
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
channel = connection.channel()
channel.queue_declare(queue='task_queue')
message = ''.join(sys.argv[1:]) or "hello world %s" %time.time()
channel.basic_publish(
exchange = '',
routing_key='task_queue',
body = message,
properties=pika.BasicProperties(
delivery_mode=2,
print("[x] sent %r" % message)
connection.close()
#!/usr/bin/env python3
import pika
import time
#建立一个阻塞的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
channel = connection.channel()
#生成一个队列,避免报错
channel.queue_declare(queue='task_queue')
#设置callback方法
def callback(ch, method, properties, body):
print('[x] Receive %r' % body)
time.sleep(20)
print('[x] Done')
print('method.delivery_tag', method.delivery_tag)
#对应生产者端设置消息持久化
ch.basic_ack(delivery_tag=method.delivert_tag)
#消费操作的一些定义
channel.basic_consume(
queue='task_queue',
no_ack=True,
print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
消息不丢失
为了确保信息不会丢失,RabbitMQ支持消息确认功能,让生产者向消费者发送信息的时候,消费者会向生产者返回一个信息,告诉生产者消息已经被处理了。
如果消费者这边有异常(死亡了),在发送消息的时候,RabbitMQ会将没有处理的信息从新放入到队列中。如果这时候有其他的消费者,RabbitMQ就会将消息发送到其他的消费者那里去。这样即使是其中一个消费者有异常(死亡)了,数据也不会消失
在没有设置超时时间的时候,RabbitMQ将在接受者死亡的时候从新传递消息,即使处理信息需要很长很长的时间。
在默认情况下,消息确认功能是被打开的
消息持久化
如果RabbitMQ-server挂掉了,就会在成数据的丢失,避免这种情况,可以再定义队列的时候这样定义
channel。queue_declare(queue='hello', durable=True)
一旦先生成了一个不支持持久化的队列,它是不能再改成支持持久化的。持久化队列必须一开始就要定义好。
在生产者和消费者的代码中,有需要声明一个可以持久化的队列。
上述只是将队列持久化。要实现消息持久化还是需要下面的设置
channel.basic_publish(
exchange = '',
routing_key='task_queue',
body = message,
properties=pika.BasicProperties(
#设置数据持久化
delivery_mode=2,
ch.basic_ack(delivery_tag=method.delivert_tag)
消息公平分发
如果Rabbit只管按顺序把消息发到各个消费者上,不考虑消费者负载的话,很可能出现,一个机器配置不高,消息处理不完,另一台配置高的确跟轻松。为了解决此问题,可以设置perfetch=1,意思就是消费者当前的消息还没有处理完,不要再给消费者发送新的消息了。
channel.basic_qos(prefetch_count=1)
带消息持久化+公平分发的完整代码
#!/usr/bin/env python3
import pika, sys
#创建一个阻塞的连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()
#定义队列,设置为持久化队列,持久化队列必须是新定义的,不能再重新定义之前已经存在的
channel.queue_declare(queue='task_queue_01', durable=True)
message = ''.join(sys.argv[1:]) or 'Hello world!'
#发送消息的操作
channel.basic_publish(
exchange = '',
routing_key='task_queue',
body=message,
#设置消息是持久化的
properties=pika.BasicProperties(
delivery_mode=2,
print('[x] Sent %r' % message)
connection.close()
import pika, time
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1'))
channel = connection.channel()
channel.queue_declare(queue='task_queue_01', durable=True)
print('[*] Waiting for message. To exit press CTRL+C')
def callback(ch, method, properties, body):
print('[x] Received %r' % body)
time.sleep(body.count(b'.'))
print('[x] Dnoe')
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='task_queue')
channel.start_consuming()
消息发布与订阅
要实现消息发布的功能,就要用到exchange。
exchange在定义的时候是有类型的,以决定到底是那些Queue符合条件,可以接受消息
fanout:所有bind到此exchange的queue都可以接受消息
direct;通过routingKey和exchange决定的那个唯一的queue可以接受消息
topic:所有符合routingKey(此时可以是个表达式)的routingKey所bind的queue可以接受消息
表达式符号说明:
#代表一个或多个字符
*代表任何字符
#.a会匹配a.a,aa.a, aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
headers:通过headers来决定吧消息发给哪些queue
1、发向所有和全部接收
消息发布方
import pika, sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', type='fanout')
message = ''.join(sys.argv[1:]) or 'hello world'
channel.basic_publish(
exchange='logs',
routing_key='',
body=message,
print('[x] Sent %r ' % message)
connection.close()
消息订阅方
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',queue=queue_name)
print('[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print("[x] %r" % body)
channel.basic_consume(
queue=queue_name,
no_ack=True
channel.start_consuming()
2、有选择的接收消息
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据很具关键字发送到消息exchange,exchange根据关键字判定应该将数据发送至指定队列。
#!/usr/bin/env python3
import pika, sys
#创建一个阻塞的连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
#声明一个管道
channel = connection.channel()
#设置exchange的类型,名字
channel.exchange_declare(exchange='direct_log', type='direct')
#在脚本的后面第一个参数是针对哪个关键字发送的信息,如果没有则默认是info关键字
severity = sys.argv[1] if len(sys.argv) & 1 else 'info'
#将脚本的第二个参数设定为发送的内容,没有默认为hello world
message = ''.join(sys.argv[2:]) or 'hello world'
#发送信息的动作
channel.basic_publish(
exchange = 'direct_log',
routing_key = severity,#添加关键字
body=message,
print("[x] Sent %r:%r" % (severity, message))
connection.close()
import pika, sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_log', type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write('usage: %s [info] [warring] [error] \n' % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(
exchange='direct_log',
queue = queue_name,
routing_key=severity
print('[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print('[x] %r:%r' % (method.routing_key, body))
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
3、更细致的消息过滤
虽然上面介绍了一些过滤,但是它们仍然后局限性,它们不能标准的进行过滤。在日志系统中,我们不能光注意日志的级别(info、warring、error),也要注意日志原(mysql的或是Apache的)。
#!/usr/bin/env python3
import pika, sys
#创建一个阻塞的连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
#声明一个管道
channel = connection.channel()
#声明exchange的名字和种类
channel.exchange_declare(
exchange = 'topic_logs',
type='topic',
#设置绑定的关键字,没有的话默认
routing_key = sys.argv[1] if len(sys.argv) & 1 else ''
#获取发送的内容
message = ''.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='topic_logs',
routing_key=routing_key,
body=message,
print("[x] Sent %r:%r" % (routing_key, message))
connection.close()
import pika, sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()
channel.exchange_declare(
exchange = 'topic_logs',
type = 'topic'
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write('Usage: %s [binding_key] ...\n' % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(
exchange = 'topic_logs',
queue = queue_name,
routing_key = binding_key
print('[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print("[x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
queue = queue_name,
no_ack=True,
channel.start_consuming()
发布方可以通过通配符来选择,来让特定的关键字接收数据,通配符在上面有讲解
Remote procedure call (RPC)
RPC类似一个远程方法调用。客户端发一个请求给服务器端,调用服务器端的一个命令,服务器端返回命令的执行结果。不单要发消息还要返回结果。
import pika, time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
elif n == 1:
return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
n = int(body)
print("[.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(
exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(
correlation_id=props.correlation_id),
body=str(response),
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print('[x] Awaiting RPC requests')
channel.start_consuming()
import pika, uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(
self.on_response,
no_ack=True,
queue = self.callback_queue,
def on_response(self, sh, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties = pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
body = str(n)
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print('[x] Requesting fib(30)')
response = fibonacci_rpc.call(30)
print('[.] Got %r' % response
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:4450次
排名:千里之外
原创:28篇
(3)(2)(2)(3)(5)(3)(1)(6)(3)
(window.slotbydup = window.slotbydup || []).push({
id: '4740887',
container: s,
size: '250,250',
display: 'inlay-fix'

我要回帖

更多关于 intel lan address 的文章

 

随机推荐