如何使用python对elasticsearch教程进行操作

运用Python操纵Elasticsearch数值索引的教程
作者:陈加兴
字体:[ ] 范例:转载 时刻:
这篇文章首要引见了运用Python操纵Elasticsearch数值索引的教程,Elasticsearch处置数值索引十分高效,要的伴侣能够参考下
Elasticsearch是一个散布式、Restful的搜刮及剖析效劳器,Apache Solr同样,它也是根据Lucence的索引效劳器,但我以为Elasticsearch比照Solr的长处在于:
&&& 轻量级:装置启动便当,下载文件以后一条号令就能够启动;
&&& Schema free:能够向效劳器上交恣意布局的JSON目标,Solr中运用schema.xml指定了索引布局;
&&& 多索引文件支援:运用相同的index参数就能创立另外一个索引文件,Solr中需求另行设置;
&&& 散布式:Solr Cloud的设置比拟杂乱。
启动Elasticsearch,拜访端口在9200,经过阅读器能够检察到前往的JSON数值,Elasticsearch上交和前往的数值体例都是JSON.
&& bin/elasticsearch -f
装置民间供给的Python API,在OS X上装置后呈现一些Python运转谬误,是由于setuptools版别太旧惹起的,删去重装后康复失常。
&& pip install elasticsearch
关于单条索引,能够挪用create或index办法。
from datetime import datetime
from elasticsearch import Elasticsearch
es = Elasticsearch() #create a localhost server connection, or Elasticsearch("ip")
es.create(index="test-index", doc_type="test-type", id=1,
body={"any":"data", "timestamp": datetime.now()})
Elasticsearch批量索引的号令是bulk,今朝Python API的文档示例较少,花了很多时刻浏览源代码才澄清楚批量索引的上交体例。
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es = Elasticsearch("10.18.13.3")
count = int(df[0].count())
actions = []
while (j & count):
action = {
"_index": "tickets-index",
"_type": "tickets",
"_id": j + 1,
"_source": {
"crawaldate":df[0][j],
"flight":df[1][j],
"price":float(df[2][j]),
"discount":float(df[3][j]),
"date":df[4][j],
"takeoff":df[5][j],
"land":df[6][j],
"source":df[7][j],
"timestamp": datetime.now()}
actions.append(action)
if (len(actions) == 500000):
helpers.bulk(es, actions)
del actions[0:len(actions)]
if (len(actions) & 0):
helpers.bulk(es, actions)
del actions[0:len(actions)]
在这里发觉Python API序列化JSON时对数值范例撑持比拟有限,原始数值运用的NumPy.Int32必需转换为int才干索引。别的,如今的bulk操纵默许是每次上交500条数值,我批改为5000乃至50000停止测验,会有索引不可功的状况。
#helpers.py source code
def streaming_bulk(client, actions, chunk_size=500, raise_on_error=False,
expand_action_callback=expand_action, **kwargs):
actions = map(expand_action_callback, actions)
# if raise on error is set, we need to collect errors per chunk before raising them
errors = []
while True:
chunk = islice(actions, chunk_size)
bulk_actions = []
for action, data in chunk:
bulk_actions.append(action)
if data is not None:
bulk_actions.append(data)
if not bulk_actions:
def bulk(client, actions, stats_only=False, **kwargs):
success, failed = 0, 0
# list of errors to be collected is not stats_only
errors = []
for ok, item in streaming_bulk(client, actions, **kwargs):
# go through request-reponse pairs and detect failures
if not ok:
if not stats_only:
errors.append(item)
failed += 1
success += 1
return success, failed if stats_only else errors
关于索引的批量删去和更新操纵,对应的文档体例以下,更新文档中的doc节点是必需的。
'_op_type': 'delete',
'_index': 'index-name',
'_type': 'document',
'_id': 42,
'_op_type': 'update',
'_index': 'index-name',
'_type': 'document',
'_id': 42,
'doc': {'question': 'The life, universe and everything.'}
&&& SerializationError:JSON数值序列化堕落,一般为由于不支援某个节点值的数值范例
&&& RequestError:上交数值体例不精确
&&& ConflictError:索引ID抵触
&&& TransportError:衔接无奈树立
下面是运用MongoDB和Elasticsearch存储雷同数值的比照,固然效劳器和操纵方法都不彻底相同,但能够看出数值库对批量写入仍是比索引效劳器更具有劣势。
Elasticsearch的索引文件是主动分块,到达万万级数值对写入速率也没有作用。但在到达磁盘时间下限时,Elasticsearch呈现了文件兼并谬误,而且很多丧失数值(共丢了100多万条),遏制客户端写入后,效劳器也无奈主动康复,必需手动遏制。在出产情况中这点比拟致命,特别是运用非Java客户端,仿佛无奈在客户端获得到效劳端的Java异样,这使得法式员必需很当心地处置效劳端的前往资讯。
您能够感趣味的文章:
各人感趣味的内容
12345678910
比来更新的内容
罕用在线小东西
&&&&&&&&&&&&&&&&&&&&&&&&主题信息(必填)
主题描述(最多限制在50个字符)
申请人信息(必填)
申请信息已提交审核,请注意查收邮件,我们会尽快给您反馈。
如有疑问,请联系
平芜尽处是春山
个人项目源码地址:http://git.oschina.net/xuliugen
既然选择了ACM了,那就好好爱它,不管风雨兼程.
CSDN内容运营,关注前端开发、用户体验技术领域,希望携手更多技术专家,共同推进两大领域的向前发展。研发心得、项目实战、前沿技术……,只要是技术干货,十分欢迎您投稿至chenqg#csdn.net。人人都是主编,这里就是你的舞台。曾负责CSDN知识库产品内容运营,致力于为大家提供精华、专业的内容服务。Python编写Oracle和Elasticsearch数据同步脚本_Oracle教程-织梦者
当前位置:&>&&>& > Python编写Oracle和Elasticsearch数据同步脚本
Python编写Oracle和Elasticsearch数据同步脚本
python版本 x64 2.7.12
Oracle(x64 12.1.0.2.0)和Elasticsearch(2.2.0)
python编辑器 PyCharm
下载安装请选择适合自己机器的版本
二、下载模块
通过官网下载和安装cx_Oracle和pyes模块,分别用于操作Oracle数据库和ES。
如果是远程连接数据库和ES,请一定注意安装的模块或包版本。务必选择相应的版本,不然会遇到问题。
cx_Oracle:https://sourceforge.net/projects/cx-oracle/files/?source=navbar
pyes:/aparo/pyes
三、安装过程中会遇到的问题
cx_Oracle在本地安装过程中出现的一些问题:
1、安装c++for python的环境
2、安装Oracle数据库(或者安装API接口中需要的文件而不必下载配置整个oracle环境)
3、打开数据库工具 oracle SQL developor 按要求创建连接,并新建用户(创建数据库用户名时以c##开头,不然会提示)
4、oracle连接不上远程的服务器,检查版本是否匹配
#-*-coding:utf-8-*-
importdatetime,time
importpyes#引入pyes模块,ES接口
importcx_Oracle#引入cx_Oracle模块,Oracle接口
os.environ['NLS_LANG']='SIMPLIFIEDCHINESE_CHINA.UTF8'#中文编码
reload(sys)#默认编码设置为utf-8
sys.setdefaultencoding('utf-8')
conn2=pyes.ES('127.0.0.1:9200') #链接ES
conn2.indices.delete_index('_all')#清除所有索引
conn2.indices.create_index_if_missing('pom')#创建索引
spiderInfo_mapping={'tableName':{'index':'not_analyzed','type':'string'},
'tableId':{'index':'not_analyzed','type':'integer'},
'title':{'index':'analyzed','type':'string'},
'author':{'index':'not_analyzed','type':'string'},
'content':{'index':'analyzed','type':'string'},
'publishTime':{'index':'not_analyzed','type':'string'},
'browseNum':{'index':'not_analyzed','type':'integer'},
'commentNum':{'index':'not_analyzed','type':'integer'},
'dataType':{'index':'not_analyzed','type':'integer'}}#除去涉我部分内容的ES映射结构
involveVideo_mapping={'tableName':{'index':'not_analyzed','type':'string'},
'tableId':{'index':'not_analyzed','type':'integer'},
'title':{'index':'analyzed','type':'string'},
'author':{'index':'not_analyzed','type':'string'},
'summary':{'index':'analyzed','type':'string'},
'publishTime':{'index':'not_analyzed','type':'string'},
'url':{'index':'not_analyzed','type':'string'},
'imgUrl':{'index':'not_analyzed','type':'string'},
'ranking':{'index':'not_analyzed','type':'integer'},
'playNum':{'index':'not_analyzed','type':'integer'},
'dataType':{'index':'not_analyzed','type':'integer'}}#涉我视音频内容的ES映射结构
involveCeefax_mapping={'tableName':{'index':'not_analyzed','type':'string'},
'tableId':{'index':'not_analyzed','type':'integer'},
'title':{'index':'analyzed','type':'string'},
'author':{'index':'not_analyzed','type':'string'},
'content':{'index':'analyzed','type':'string'},
'publishTime':{'index':'not_analyzed','type':'string'},
'keyWords':{'index':'not_analyzed','type':'string'},
'popularity':{'index':'not_analyzed','type':'integer'},
'url':{'index':'not_analyzed','type':'string'},
'dataType':{'index':'not_analyzed','type':'integer'}}#涉我图文资讯内容的ES映射结构
conn2.indices.put_mapping(&spiderInfo&,{'properties':spiderInfo_mapping},[&pom&])#在索引pom下创建spiderInfo的_type
conn2.indices.put_mapping(&involveVideo&,{'properties':involveVideo_mapping},[&pom&])#在索引pom下创建involveVideo的_type
conn2.indices.put_mapping(&involveCeefax&,{'properties':involveCeefax_mapping},[&pom&])#在索引pom下创建involveCeefax的_type
#conn1=cx_Oracle.connect('c##chenlong','','localhost:1521/ORCL')#链接本地数据库
conn1=cx_Oracle.connect(&pom&,&Bohui@123&,&172.17.7.118:1521/ORCL&) #链接远程数据库
print'ES数据同步脚本连接不上数据库,请检查connect参数是否正确,或者模块版本是否匹配'
withconn1:
cur=conn1.cursor()
result=cur.execute(&select*fromtabs&) ##执行数据库操作读取各个表名
row=result.fetchall()
#将表名取出并赋值给tablename元组
tablename=[]
forxinrow:
tablename.append(x[0])
printtablename
result2=cur.execute('selectT_SOCIAL_ID fromT_SOCIAL')
printresult2
num=result2.fetchall()
foriinnum:
print'selecttitlefromT_SOCIALwhereT_SOCIAL_ID={}'.format(i[0])
result_title=cur.execute('selecttitlefromT_SOCIALwhereT_SOCIAL_ID={}'.format(i[0]))
title=result_title.fetchone()
printtitle[0]
result_author=cur.execute('selectauthorfromT_SOCIALwhereT_SOCIAL_ID={}'.format(i[0]))
author=result_author.fetchone()
printauthor[0]
result_content=cur.execute('selectcontentfromT_SOCIALwhereT_SOCIAL_ID={}'.format(i[0]))
content=result_content.fetchone()
printcontent[0]
result_publishTime=cur.execute('selectpublishTimefromT_SOCIALwhereT_SOCIAL_ID={}'.format(i[0]))
publishTime=result_publishTime.fetchone()
datetime_publishTime=datetime.datetime.strptime(str(publishTime[0]),'%Y-%m-%d%H:%M:%S')#通过date函数将日期转化成标准格式
printdatetime_publishTime
result_browseNum=cur.execute('selectbrowseNumfromT_SOCIALwhereT_SOCIAL_ID={}'.format(i[0]))
browseNum=result_browseNum.fetchone()
printbrowseNum[0]
result_likeNum=cur.execute('selectlikeNumfromT_SOCIALwhereT_SOCIAL_ID={}'.format(i[0]))
likeNum=result_likeNum.fetchone()
printlikeNum[0]
result_forwardNum=cur.execute('selectFORWARDNUMfromT_SOCIALwhereT_SOCIAL_ID={}'.format(i[0]))
forwardNum=result_forwardNum.fetchone()
printforwardNum[0]
result_commentNum=cur.execute('selectcommentNumfromT_SOCIALwhereT_SOCIAL_ID={}'.format(i[0]))
commentNum=result_commentNum.fetchone()
printcommentNum[0]
result_accountCode=cur.execute('selectaccountCodefromT_SOCIALwhereT_SOCIAL_ID={}'.format(i[0]))
dataType=result_accountCode.fetchone()
printdataType[0]
conn2.index({'tableName':'T_SOCIAL','tableId':i[0],'title':unicode(title[0]),'author':unicode(author[0]),'content':unicode(content[0]),'publishTime':str(publishTime[0]),'browseNum':browseNum[0],'commentNum':commentNum[0],'dataType':'1'},&pom&,&spiderInfo&,)#将数据写入索引pom的spiderInfo
#根据表的个数创建不同的对象
#从记录文档中读取各个表的记录ID,判断各个表的ID是否有变化
#分别读取各个表中的相关数据
五、编译过程的问题
1、直接print游标cur.execute( )将不能得到我们想要的结果
result2 = cur.execute('select T_SOCIAL_ID from T_SOCIAL')
print result2
返回:&__builtin__.OracleCursor on &cx_Oracle.Connection to pom@172.17.7.118:1521/ORCL&&
result2=cur.execute('selectT_SOCIAL_ID fromT_SOCIAL')
printresult2
num=result2.fetchall()
for i in num:
print i[0]
返回:[(55,), (56,), (57,), (58,), (59,), (60,), (61,), (62,), (63,), (64,), (65,), (66,), (67,), (68,), (69,), (70,)]
注意:用fetchall()得到的数据为:[(55,), (56,), (57,), (58,), (59,)] 元组而不是数字。
用 变量[num] 的方式取出具体的数值
2、cx_Oracle中文编码乱码问题
显示中文乱码:??????DZ??? ???????????
或者显示未知的编码:('\xce\xd2\xd5\xe6\xb5\xc4\xca\xc7\xb1\xea\xcc\xe2',)
需要注意一下几个地方,将数据库中的中文编码转化成utf-8编码,并将中文写入elasticsearch
os.environ['NLS_LANG']='SIMPLIFIEDCHINESE_CHINA.UTF8'#中文编码
reload(sys)#默认编码设置为utf-8 一定需要reload(sys)
sys.setdefaultencoding('utf-8')
'title':unicode('中文')
3、远程连接不上Oracle数据库的问题
第一:确保connect()中各个参数的值都正确。例如
conn1=cx_Oracle.connect(&username&,&password&,&172.17.7.118:1521/ORCL&) #连接远程数据库
conn1=cx_Oracle.connect('username','password','localhost:1521/ORCL')#连接本地数据库
conn2=pyes.ES('127.0.0.1:9200') #连接ES
第二:确保安装的版本都符合要求,包括模块的版本。
4、提示TypeError: 'NoneType' object is not callable
确保mapping中的各个字段类型都设置正确
检查索引和映射是否都书写正确
------------
Python中cx_Oracle的一些函数:
commit() 提交
rollback() 回滚
cursor用来执行命令的方法:
callproc(self, procname, args):用来执行存储过程,接收的参数为存储过程名和参数列表,返回值为受影响的行数
execute(self, query, args):执行单条sql语句,接收的参数为sql语句本身和使用的参数列表,返回值为受影响的行数
executemany(self, query, args):执行单挑sql语句,但是重复执行参数列表里的参数,返回值为受影响的行数
nextset(self):移动到下一个结果集
cursor用来接收返回值的方法:
fetchall(self):接收全部的返回结果行.
fetchmany(self, size=None):接收size条返回结果行.如果size的值大于返回的结果行的数量,则会返回cursor.arraysize条数据.
fetchone(self):返回一条结果行.
scroll(self, value, mode='relative'):移动指针到某一行.如果mode='relative',则表示从当前所在行移动value条,如果 mode='absolute',则表示从结果集的第一行移动value条.
MySQL中关于中文编码的问题
conn = MySQLdb.Connect(host='localhost', user='root', passwd='root', db='python')中加一个属性:
conn = MySQLdb.Connect(host='localhost', user='root', passwd='root', db='python',charset='utf8')
charset是要跟你数据库的编码一样,如果是数据库是gb2312 ,则写charset='gb2312'。
以上就是Python编写Oracle和Elasticsearch数据同步脚本的全文介绍,希望对您学习和使用Oracle有所帮助.
这些内容可能对你也有帮助
更多可查看Oracle教程列表页。
猜您也会喜欢这些文章学会了 python 和 elasticsearch,想做一个搜索引擎 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
已注册用户请 &
Sponsored by
国内领先的实时后端云野狗 API 可用于开发即时聊天、网络游戏、实时定位等实时场景传输快!响应快!入门快!
Promoted by
学会了 python 和 elasticsearch,想做一个搜索引擎
22:34:06 +08:00 · 1160 次点击
干死百度,可行吗?
& & 23:02:54 +08:00
我也想……但是要成本啊,就算是分布存储也要好多空间
其实最想的是按照ip扫全网,然后存储,做个检索……
& & 23:41:10 +08:00
要是百度这么容易干死,其他大公司都不会去干吗,这根本不是成本的事情。
或者说不是你想的有几万台服务器就能搞定的事情。
& & 10:02:53 +08:00
百度广告加个脚本就能干死你了
& · & 1111 人在线 & 最高记录 2399 & · &
创意工作者们的社区
World is powered by solitude
VERSION: 3.9.7.5 · 51ms · UTC 16:11 · PVG 00:11 · LAX 09:11 · JFK 12:11? Do have faith in what you're doing.从源码分析python批量插入elasticsearch的实现 - 推酷
从源码分析python批量插入elasticsearch的实现
看了elasticsearch python的一些源码,现在看到bulk批量操作. &发现网上对于elasticsearch批量插入的文章有些少,我在这里就简单描述下es bulk的各个接口。&
最近爬虫太粗暴了,文章总是被转走. 这里标注下原文链接&
希望这篇文章使大家能对elasticsearch有更好的理解. python elasticsearch批量操作函数是在helpers文件里面。&
下面是helpers的用法,不太明白elasticsearch的命名手法,奇奇怪怪就用helpers….让人发愣 !
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es = Elasticsearch()
actions = []
for j in range(0, 500):
action = {
&_index&: &xiaorui_index&,
&_type&: &fengyun&,
&_source&: {
&any&:&data& + str(j),
&timestamp&: datetime.now()
actions.append(action)
if len(actions) & 0:
helpers.bulk(es, actions)
fromdatetimeimportdatetime
fromelasticsearchimportElasticsearch
fromelasticsearchimporthelpers
es=Elasticsearch()
actions=[]
forjinrange(0,500):
&_index&:&xiaorui_index&,
&_type&:&fengyun&,
&_source&:{
&any&:&data&+str(j),
&timestamp&:datetime.now()
actions.append(action)
iflen(actions)&0:
helpers.bulk(es,actions)
说了详细的用法,我们再来看看elasticsearch py源码是怎么实现的. &elasticsearch-py给我们提供了三个接口。
elasticsearch.helpers.streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=, raise_on_error=True, expand_action_callback=&function expand_action&, raise_on_exception=True, **kwargs)
streaming接口, 他把结果集组成成迭代器,用yield一个个的send出来。 def bulk调用的也是streaming_bulk函数.
client – instance of Elasticsearch to use
actions – iterable containing the actions to be executed
chunk_size – number of docs in one chunk sent to es (default: 500)
max_chunk_bytes – the maximum size of the request in bytes (default: 100MB)
elasticsearch.helpers.parallel_bulk(client, actions, thread_count=4, chunk_size=500, max_chunk_bytes=, expand_action_callback=&function expand_action&, **kwargs)
这个函数是可以实现多线程批量操作,理论来说要比bulk和streaming_bulk要快的多… 相比其他两个bulk接口的参数列表,他有个多线程数目的控制…
thread_count – size of the threadpool to use for the bulk requests
原本以为他用的是threading函数,看了源代码才得知调用的是multiprocessing.dummy
elasticsearch.helpers.bulk(client, actions, stats_only=False, **kwargs)
client – instance of Elasticsearch to use
actions – iterator containing the actions
stats_only – if True only report number of successful/failed operations instead of just number of successful and a list of error responses
def bulk(client, actions, stats_only=False, **kwargs):
success, failed = 0, 0
#当stats_only False的时候,不收集错误列表.
errors = []
#可以看到bulk调用的用streaming_bulk,他会把成功数,失败数,失败列表return过去。
for ok, item in streaming_bulk(client, actions, **kwargs):
if not ok:
if not stats_only:
errors.append(item)
failed += 1
success += 1
return success, failed if stats_only else errors
def parallel_bulk(client, actions, thread_count=4, chunk_size=500,
max_chunk_bytes=100 * 1014 * 1024,
expand_action_callback=expand_action, **kwargs):
#这是多线程的版本,我们可以看到下面引用了multiprocessing.dummy多线程库
from multiprocessing.dummy import Pool
actions = map(expand_action_callback, actions)
pool = Pool(thread_count)
for result in pool.imap(
lambda chunk: list(_process_bulk_chunk(client, chunk, **kwargs)),
_chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
for item in result:
yield item
pool.close()
pool.join()
def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1014 * 1024,
raise_on_error=True, expand_action_callback=expand_action,
raise_on_exception=True, **kwargs):
#buffer list里最大的空间占用大小 100M
#buffer list个数
#raise_on_error是否报错,False是忽略
actions = map(expand_action_callback, actions)
for bulk_actions in _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer):
for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
yield result
def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
Split actions into chunks by number or size, serialize them into strings in
the process.
bulk_actions = []
size, action_count = 0, 0
for action, data in actions:
action = serializer.dumps(action)
cur_size = len(action) + 1
if data is not None:
data = serializer.dumps(data)
cur_size += len(data) + 1
#如果chunk满了或者是chunk_bytes超过限制的空间大小,默认是100M
if bulk_actions and (size + cur_size & max_chunk_bytes or action_count == chunk_size):
yield bulk_actions
bulk_actions = []
size, action_count = 0, 0
bulk_actions.append(action)
if data is not None:
bulk_actions.append(data)
size += cur_size
action_count += 1
if bulk_actions:
yield bulk_actions
defbulk(client,actions,stats_only=False,**kwargs):
success,failed=0,0
#当stats_only False的时候,不收集错误列表.
#可以看到bulk调用的用streaming_bulk,他会把成功数,失败数,失败列表return过去。
forok,iteminstreaming_bulk(client,actions,**kwargs):
ifnotstats_only:
errors.append(item)
success+=1
returnsuccess,failedifstats_onlyelseerrors
defparallel_bulk(client,actions,thread_count=4,chunk_size=500,
max_chunk_bytes=100*,
expand_action_callback=expand_action,**kwargs):
#这是多线程的版本,我们可以看到下面引用了multiprocessing.dummy多线程库
frommultiprocessing.dummyimportPool
actions=map(expand_action_callback,actions)
pool=Pool(thread_count)
forresultinpool.imap(
lambdachunk:list(_process_bulk_chunk(client,chunk,**kwargs)),
_chunk_actions(actions,chunk_size,max_chunk_bytes,client.transport.serializer)
foriteminresult:
pool.close()
pool.join()
defstreaming_bulk(client,actions,chunk_size=500,max_chunk_bytes=100*,
raise_on_error=True,expand_action_callback=expand_action,
raise_on_exception=True,**kwargs):
#buffer list里最大的空间占用大小 100M
#buffer list个数
#raise_on_error是否报错,False是忽略
actions=map(expand_action_callback,actions)
forbulk_actionsin_chunk_actions(actions,chunk_size,max_chunk_bytes,client.transport.serializer):
forresultin_process_bulk_chunk(client,bulk_actions,raise_on_exception,raise_on_error,**kwargs):
yieldresult
def_chunk_actions(actions,chunk_size,max_chunk_bytes,serializer):
Split actions into chunks by number or size, serialize them into strings in
the process.
bulk_actions=[]
size,action_count=0,0
foraction,datainactions:
action=serializer.dumps(action)
cur_size=len(action)+1
ifdataisnotNone:
data=serializer.dumps(data)
cur_size+=len(data)+1
#如果chunk满了或者是chunk_bytes超过限制的空间大小,默认是100M
ifbulk_actionsand(size+cur_size&max_chunk_bytesoraction_count==chunk_size):
yieldbulk_actions
bulk_actions=[]
size,action_count=0,0
bulk_actions.append(action)
ifdataisnotNone:
bulk_actions.append(data)
size+=cur_size
action_count+=1
ifbulk_actions:
yieldbulk_actions
对于python elasticsearch批量插入
,我觉得这根线上的整套环境是有关联,比如client的带宽,nginx的一些调参(尤其是proxy buffer),elasticsearch本身的负载等等有关。&
我这边得到的结果是,每次批量操作的空间大小别超过20M,elasticsearch的timeout也控制到最少60秒。 虽然很多时候你测试花费的时间都很短,但现实往往不是这样.
已发表评论数()
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
标题不准确
排版有问题
主题不准确
没有分页内容
图片无法显示
视频无法显示
与原文不一致

我要回帖

更多关于 elasticsearch的使用 的文章

 

随机推荐