flume hive sink可以sink到本地文件目录么

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
您的访问请求被拒绝 403 Forbidden - ITeye技术社区
您的访问请求被拒绝
亲爱的会员,您的IP地址所在网段被ITeye拒绝服务,这可能是以下两种情况导致:
一、您所在的网段内有网络爬虫大量抓取ITeye网页,为保证其他人流畅的访问ITeye,该网段被ITeye拒绝
二、您通过某个代理服务器访问ITeye网站,该代理服务器被网络爬虫利用,大量抓取ITeye网页
请您点击按钮解除封锁&1676人阅读
Flume(9)
一、HDFS Sink
Flume Sink是将事件写入到Hadoop分布式文件系统(HDFS)中。主要是Flume在Hadoop环境中的应用,即Flume采集数据输出到HDFS,适用大数据日志场景。
目前,它支持HDFS的文本和序列文件&#26684;式,以及支持两个文件类型的压缩。支持将所用的时间、数据大小、事件的数量为操作参数,对HDFS文件进行关闭(关闭当前文件,并创建一个新的)。它还可以对事源的机器名(hostname)及时间属性分离数据,即通过时间戳将数据分布到对应的文件路径。 HDFS目录路径可能包含&#26684;式转义序列用于取代由HDFS Sink生成一个目录/文件名存储的事件。
注意:Hadoop的版本需要支持sync()方法调用,当然首先得按照Hadoop。
下面是HDFS &Sinks转义符的支持目录:
Description
Substitute value of event header named “host”. Arbitrary header names are supported.
Unix time in milliseconds
locale’s short weekday name (Mon, Tue, ...)
locale’s full weekday name (Monday, Tuesday, ...)
locale’s short month name (Jan, Feb, ...)
locale’s long month name (January, February, ...)
locale’s date and time (Thu Mar 3 23:05:25 2005)
day of month (01) 每月中的第几天
same as %m/%d/%y
hour (00..23)
hour (01..12)
day of year (001..366) 一年中的第几天
hour ( 0..23)
month (01..12)
minute (00..59)
locale’s equivalent of am or pm
seconds since
00:00:00 UTC
second (00..59)
last two digits of year (00..99) &年的后两位
year (2010)
&#43;hhmm numeric timezone (for example, -0400)
下面是官网给出的HDFS &Sinks的配置,加粗的参数是必选,可选项十分丰富,这里就不一一列出来了
Description
The component type name, needs to be&hdfs
HDFS directory path (eg hdfs://namenode/flume/webdata/)
hdfs.filePrefix
Name prefixed to files created by Flume in hdfs directory 文件前缀
hdfs.fileType
SequenceFile
File format: currently&SequenceFile,&DataStream&or&CompressedStream
hdfs.useLocalTimeStamp
Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
hdfs.codeC
Compression codec. one of following : gzip, bzip2, lzo, lzop, snappy
hdfs.round
Should the timestamp be rounded down (if true, affects all time based escape sequences except %t) 定时间用
hdfs.roundValue
Rounded down to the highest multiple of this (in the unit configured using&hdfs.roundUnit), less than current time.(需要hdfs.round为true)
hdfs.roundUnit
The unit of the round down value -&second,&minute&or&hour.(同上)
下面是官网的例子,他的三个round*配置是将向下舍入到最后10分钟的时间戳记录。
假设现在是上午10时56分20秒等等,日的Flume Sinks的数据到输出到HDFS的路径为/flume/events//1050/00的。。
a1.channels=c1
a1.sinks=k1
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.path=/flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix=events-
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=10
a1.sinks.k1.hdfs.roundUnit=minute
下面是实际的例子:
#配置文件:hdfs_case9.conf
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type= syslogtcp
a1.sources.r1.bind= 192.168.233.128
a1.sources.r1.port= 50000
a1.sources.r1.channels= c1
#Describe the sink
a1.sinks.k1.type= hdfs
a1.sinks.k1.channel= c1
a1.sinks.k1.hdfs.path= hdfs://carl:9000/flume/
a1.sinks.k1.hdfs.filePrefix= carl
a1.sinks.k1.hdfs.round= true
a1.sinks.k1.hdfs.roundValue= 1
a1.sinks.k1.hdfs.roundUnit= minute
a1.sinks.k1.hdfs.fileType=DataStream
# Usea channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
这里我们偷懒拷了上节TCP的例子,然后加入sinks为HDFS中。我们设置数据是放入在HDFS的目录为hdfs://carl:9000/flume/,文件前缀为carl,其中这里有个设置要说明下:a1.sinks.k1.hdfs.fileType=DataStream,因为文件&#26684;式默认是 SequenceFile,如果直接打开是乱码,这个不方便演示,因此我们设置成普通数据&#26684;式。
flume-ng agent -cconf -f conf/hdfs_case9.conf -n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另一个终端输入,往侦听端口送数据
echo &hello looklook7hello hdfs& | nc 192.168.233.128 50000
#在启动的终端查看console输出
这里可以看到他报了一个错误,说isfileclosed不可用。。。这个是这样的,这边的Hadoop是cdh3版本的,而flume ng 是说支持cdh4版本的,所以版本不匹配。不过这个无妨,下面看他们数据已经插入进去了,一开始生成一个hdfs://carl:9000/flume//carl.4.tmp,
然后数据进去了生成文件hdfs://carl:9000/flume/carl.4
那我们看下数据文件,hdfs://carl:9000/flume/carl.4
我们看到日志文件的生成过程,最后数据已经进去了。
然后我对配置文件里的这这个参数改下,参照官网的例子
a1.sinks.k1.hdfs.path= hdfs://carl:9000/flume/%y-%m-%d/%H%M/%S
然后加上这个参数
a1.sinks.k1.hdfs.useLocalTimeStamp=true
打开另一个终端输入,往侦听端口送数据
echo &hello looklook7hello hdfs& | nc 192.168.233.128 50000
这里如果不加上面的参数a1.sinks.k1.hdfs.useLocalTimeStamp=true,会需要向事件里面明确header,否则会报错,如下
数据成功发送后,会生成数据文件
数据目录是/flume/14-10-24/1354/00
因为我们设的参数是1分钟a1.sinks.k1.hdfs.roundValue= 1
这个与官网讲的一致
二、Logger Sink
INFO级别的日志事件。通常有用的测试/调试目的。之前的测试里有些,下面就不多赘述
下面是官网配置
Property Name
Description
The component type name, needs to be&logger
三、Avro Sink
Avro Sink主要用于Flume分层结构。Flumeevent 发送给这个sink的事件都会转换成Avro事件,发送到配置好的Avro主机和端口上。这些事件可以批量传输给通道。
下面是官网配置,加粗为必须,可选项太多就不一一列了
Property Name
Default Description
The component type name, needs to be&avro.
The hostname or IP address to bind to.
The port # to listen on.
下面是官网例子
a1.channels=c1
a1.sinks=k1
a1.sinks.k1.type=avro
a1.sinks.k1.channel=c1
a1.sinks.k1.hostname=<span style="color:#BB.10.10
a1.sinks.k1.port=<span style="color:#BB
因为Avro Sink主要用于Flume分层结构,那么这边都会想到我们学习心得(二)关于集群配置的列子就是关于Avro Sink与Avro Source的一个实例,其中pull.cof是关于Avro Source的例子,而push.conf 是Avro Sink的例子,具体内容大家可以去第二节看,这里不做赘述。
三、Avro Sink
Thrift也是用来支持Flume分层结构。Flumeevent 发送给这个sink的事件都会转换成Thrift事件,发送到配置好的Thrift主机和端口上。这些事件可以批量传输给通道。和Avro Sink一模一样。这边也就略过了。
四、IRC Sink
IRC Sink 从通道中取得信息到IRCServer,这个没有IRC Server。。。无法测试,也略过吧。。。
五、File RollSink
存储到本地存储中。他有个滚动间隔的设置,设置多长时间去生成文件(默认是30秒)。
下面是官网配置
Property Name
Description
The component type name, needs to be&file_roll.
sink.directory
The directory where files will be stored
sink.rollInterval
Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file.
sink.serializer
Other possible options include&avro_event&or the FQCN of an implementation of EventSerializer.Builder interface.
接下去是官网例子
a1.channels=c1
a1.sinks=k1
a1.sinks.k1.type=file_roll
a1.sinks.k1.channel=c1
a1.sinks.k1.sink.directory=/var/log/flume
下面是测试例子:
#配置文件:fileroll_case10.conf
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type= syslogtcp
a1.sources.r1.port= 50000
a1.sources.r1.host= 192.168.233.128
a1.sources.r1.channels= c1
#Describe the sink
a1.sinks.k1.type= file_roll
a1.sinks.k1.channel= c1
a1.sinks.k1.sink.directory= /tmp/logs
# Usea channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
flume-ng agent -cconf -f conf/fileroll_case10.conf -n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另一个终端输入,往侦听端口送数据
echo &hello looklook5hello hdfs& | nc 192.168.233.128 50000
#在启动的终端查看console输出
可以看到数据传过来并生成文件,然后无论是否有数据传过来,都会每过30秒就会生成文件。
六、Null Sink
丢弃从通道接收的所有事件。。。这边就不测试了。。
下面是官网配置
Property Name
Description
The component type name, needs to be&null.
下面是官网例子
a1.channels=c1
a1.sinks=k1
a1.sinks.k1.type=null
a1.sinks.k1.channel=c1
七、HBaseSinks与AsyncHBaseSink
HBaseSinks负责将数据写入到Hbase中。Hbase的配置信息从classpath路径里面遇到的第一个hbase-site.xml文件中获取。在配置文件中指定的实现了HbaseEventSerializer 接口的类,用于将事件转换成Hbase所表示的事件或者增量。然后将这些事件和增量写入Hbase中。
Hbase Sink支持写数据到安全的Hbase。为了将数据写入安全的Hbase,用户代理运行必须对配置的table表有写权限。主要用来验证对KDC的密钥表可以在配置中指定。在Flume Agent的classpath路径下的Hbase-site.xml文件必须设置到Kerberos认证。
注意有一定很重要,就是这个sinks 对&#26684;式的规范要求非常高。
至于 AsyncHBaseSink则是异步的HBaseSinks。
这边没有HBase环境,因此也就不演示了。。
八、Custom Sink
一个自定义 Sinks其实是对Sinks接口的实现。当我们开始flume代理的时候必须将自定义Sinks和相依赖的jar包放到代理的classpath下面。自定义 Sinks的type就是我们实现Sinks接口对应的类全路径。
这里后面的内容里会详细介绍,这里不做赘述。
九、MemoryChannel
Source通过通道添加事件,Sinks通过通道取事件。所以通道类&#20284;缓存的存在。
Memory Channel是事件存储在一个内存队列中。速度快,吞吐量大。但会有代理出现故障后数据丢失的情况。
下面是官网配置
Property Name
Description
The component type name, needs to be&memory
The maximum number of events stored in the channel
transactionCapacity
The maximum number of events the channel will take from a source or give to a sink per transaction
keep-alive
Timeout in seconds for adding or removing an event
byteCapacityBufferPercentage
Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.
byteCapacity
see description
Maximum total&bytes&of memory allowed as a sum of all events in this channel. The implementation only counts the Event&body, which is the reason for providing thebyteCapacityBufferPercentage&configuration parameter as well.
Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e.
if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to&0&will cause this value to fall back to a hard internal limit of about 200 GB.
以及官网例子
a1.channels=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=<span style="color:#BB
a1.channels.c1.transactionCapacity=<span style="color:#BB
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c1.byteCapacity=<span style="color:#BB
之前的例子全部是Memory Channel。关于Channel的列子不好演示,后面就不会有例子了。
十、JDBCChannel
JDBC Channel是把事件存储在数据库。目前的JDBC Channel支持嵌入式Derby。主要是为了数据持久化,并且可恢复的特性。
Property Name
Description
The component type name, needs to be&jdbc
Database vendor, needs to be DERBY.
driver.class
org.apache.derby.jdbc.EmbeddedDriver
Class for vendor’s JDBC driver
driver.url
(constructed from other properties)
JDBC connection URL
db.username
User id for db connection
db.password
password for db connection
下面是官网例子:
a1.channels=c1
a1.channels.c1.type=jdbc
十一、FileChannel
注意默认情况下,File Channel使用检查点(checkpointDir)和在用户目录(dataDirs)上指定的数据目录。所以在一个agent下面启动多个File Channel实例,只会有一个File channel能锁住文件目录,其他的都将初始化失败。因此,有必要提供明确的路径的所有已配置的通道,同时考虑最大吞吐率,检查点与数据目录最好是在不同的磁盘上。
Property Name Default
Description
The component type name, needs to be&file.
checkpointDir
~/.flume/file-channel/checkpoint
The directory where checkp
~/.flume/file-channel/data
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
下面是官网例子
a1.channels=c1
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/mnt/flume/checkpoint
a1.channels.c1.dataDirs=/mnt/flume/data
File Channel 加密官网也给出了相应的配置
Generating a key with a password seperate from the key store password:
keytool -genseckey -alias key-0 -keypasskeyPassword -keyalg AES\
&-keysize 128 -validity 9000 -keystore test.keystore\
&-storetype jceks -storepass keyStorePassword
Generating a key with the password the same as the key store password:
keytool -genseckey -alias key-1 -keyalgAES -keysize 128 -validity 9000\
&-keystore src/test/resources/test.keystore -storetype jceks\
&-storepass keyStorePassword
a1.channels.c1.encryption.activeKey=key-0
a1.channels.c1.encryption.cipherProvider=AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider=key-provider-0
a1.channels.c1.encryption.keyProvider=JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile=/path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile=/path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys=key-0
Let’s say you have aged key-0 out and new files should be encrypted withkey-1:
a1.channels.c1.encryption.activeKey=key-1
a1.channels.c1.encryption.cipherProvider=AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider=JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile=/path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile=/path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys=key-0 key-1
The same scenerio as above, however key-0 has its own password:
a1.channels.c1.encryption.activeKey=key-1
a1.channels.c1.encryption.cipherProvider=AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider=JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile=/path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile=/path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys=key-0 key-1
a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile=/path/to/key-0.password
十二、Spillable Memory Channel 与Pseudo Transaction Channel
前者还在试验阶段。。后者仅仅用来测试目的,不是在生产环境中使用,所以略过。
十三、CustomChannel
Custom Channel是对channel接口的实现。需要在classpath中引入实现类和相关的jar文件。这Channel对应的type是该类的完整路径
下面是官网配置
Property Name
Description
The component type name, needs to be a FQCN
后面是官网例子
a1.channels=c1
a1.channels.c1.type=org.example.MyChannel
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:18591次
排名:千里之外
原创:24篇
评论:15条
(4)(2)(5)(2)(3)(10)Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了 Flume-728,对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,重构后的版本统称为 Flume NG(next generation);改动的另一原因是将 Flume 纳入 apache 旗下,cloudera Flume 改名为 Apache Flume。IBM 的这篇文章:《》,从基本组件以及用户体验的角度阐述 Flume OG 到 Flume NG 发生的革命性变化。本文就不再赘述各种细枝末节了,不过这里还是简要提下 Flume NG (1.x.x)的主要变化:
sources和sinks 使用channels 进行链接
两个主要channel 。1, &in-memory channel &非持久性支持,速度快。2 , JDBC-based channel 持久性支持。
不再区分逻辑和物理node,所有物理节点统称为 “agents”,每个agents 都能运行0个或多个sources 和sinks
不再需要master节点和对zookeeper的依赖,配置文件简单化。
插件化,一部分面对用户,工具或系统开发人员。
使用Thrift、Avro Flume sources 可以从flume0.9.4 发送 events &到flume 1.x
注:本文所使用的 Flume 版本为 flume-1.4.0-cdh4.7.0,不需要额外的安装过程,解压缩即可用。&
1、Flume 的一些核心概念:
使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。
生产数据,运行在一个独立的线程。
从Client收集数据,传递给Channel。
从Channel收集数据,运行在一个独立线程。
连接 sources 和 sinks ,这个有点像一个队列。
可以是日志记录、 avro 对象等。
1.1 数据流模型
Flume以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成,如下图:
Flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source,比如上图中的Web Server生成。当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。 很直白的设计,其中值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。 如果你以为Flume就这些能耐那就大错特错了。Flume支持用户建立多级流,也就是说,多个agent可以协同工作,并且支持Fan-in、Fan-out、Contextual Routing、Backup Routes。如下图所示:
1.2 高可靠性
作为生产环境运行的软件,高可靠性是必须的。 从单agent来看,Flume使用基于事务的数据传递方式来保证事件传递的可靠性。Source和Sink被封装进一个事务。事件被存放在Channel中直到该事件被处理,Channel中的事件才会被移除。这是Flume提供的点到点的可靠机制。 从多级流来看,前一个agent的sink和后一个agent的source同样有它们的事务来保障数据的可靠性。
1.3 可恢复性
还是靠Channel。推荐使用FileChannel,事件持久化在本地文件系统里(性能较差)。
2、Flume 整体架构介绍
Flume架构整体上看就是&source--&channel--&sink&的三层架构(参见最上面的 图一),类似生成者和消费者的架构,他们之间通过queue(channel)传输,解耦。
Source:完成对日志数据的收集,分成 transtion 和 event 打入到channel之中。& Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。& Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。& 对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。& 对于直接读取文件Source, 主要有两种方式:&
2.1 Exec source
可通过写Unix command的方式组织数据,最常用的就是tail -F [file]。 可以实现实时传输,但在flume不运行和脚本错误时,会丢数据,也不支持断点续传功能。因为没有记录上次文件读到的位置,从而没办法知道,下次再读时,从什么地方开始读。特别是在日志文件一直在增加的时候。flume的source挂了。等flume的source再次开启的这段时间内,增加的日志内容,就没办法被source读取到了。不过flume有一个execStream的扩展,可以自己写一个监控日志增加情况,把增加的日志,通过自己写的工具把增加的内容,传送给flume的node。再传送给sink的node。要是能在tail类的source中能支持,在node挂掉这段时间的内容,等下次node开启后在继续传送,那就更完美了。
2.2 Spooling Directory Source
SpoolSource:是监测配置的目录下新增的文件,并将文件中的数据读取出来,可实现准实时。需要注意两点:1、拷贝到spool目录下的文件不可以再打开编辑。2、spool目录下不可包含相应的子目录。在实际使用的过程中,可以结合log4j使用,使用log4j的时候,将log4j的文件分割机制设为1分钟一次,将文件拷贝到spool的监控目录。log4j有一个TimeRolling的插件,可以把log4j分割的文件到spool目录。基本实现了实时的监控。Flume在传完文件之后,将会修改文件的后缀,变为.COMPLETED(后缀也可以在配置文件中灵活指定)& ExecSource,SpoolSource对比:ExecSource可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法收集到日志数据,无法何证日志数据的完整性。SpoolSource虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋近于实时。如果应用无法实现以分钟切割日志文件的话,可以两种收集方式结合使用。& Channel有多种方式:有MemoryChannel, JDBC Channel, MemoryRecoverChannel, FileChannel。MemoryChannel可以实现高速的吞吐,但是无法保证数据的完整性。MemoryRecoverChannel在官方文档的建议上已经建义使用FileChannel来替换。FileChannel保证数据的完整性与一致性。在具体配置FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。& Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。&
3、常用架构、功能配置示例
3.1 先来个简单的:单节点 Flume 配置
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
将上述配置存为:example.conf
然后我们就可以启动 Flume 了:
bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
PS:-Dflume.root.logger=INFO,console 仅为 debug 使用,请勿生产环境生搬硬套,否则大量的日志会返回到终端。。。
-c/--conf 后跟配置目录,-f/--conf-file&后跟具体的配置文件,-n/--name 指定agent的名称
然后我们再开一个 shell 终端窗口,telnet 上配置中侦听的端口,就可以发消息看到效果了:
$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! &ENTER&
Flume 终端窗口此时会打印出如下信息,就表示成功了:
12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D
Hello world!. }
至此,咱们的第一个 Flume Agent 算是部署成功了!
3.2 单节点 Flume 直接写入 HDFS
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 100000
agent1.channels.ch1.transactionCapacity = 100000
agent1.channels.ch1.keep-alive = 30
# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
#agent1.sources.avro-source1.channels = ch1
#agent1.sources.avro-source1.type = avro
#agent1.sources.avro-source1.bind = 0.0.0.0
#agent1.sources.avro-source1.port = 41414
#agent1.sources.avro-source1.threads = 5
#define source monitor a file
agent1.sources.avro-source1.type = exec
agent1.sources.avro-source1.shell = /bin/bash -c
agent1.sources.mand = tail -n +0 -F /home/storm/tmp/id.txt
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.threads = 5
# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = hdfs
agent1.sinks.log-sink1.hdfs.path = hdfs://192.168.1.111:8020/flumeTest
agent1.sinks.log-sink1.hdfs.writeFormat = Text
agent1.sinks.log-sink1.hdfs.fileType = DataStream
agent1.sinks.log-sink1.hdfs.rollInterval = 0
agent1.sinks.log-sink1.hdfs.rollSize = 1000000
agent1.sinks.log-sink1.hdfs.rollCount = 0
agent1.sinks.log-sink1.hdfs.batchSize = 1000
agent1.sinks.log-sink1.hdfs.txnEventMax = 1000
agent1.sinks.log-sink1.hdfs.callTimeout = 60000
agent1.sinks.log-sink1.hdfs.appendTimeout = 60000
# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1
启动如下命令,就可以在 hdfs 上看到效果了。
../bin/flume-ng agent --conf ../conf/ -f flume_directHDFS.conf -n agent1 -Dflume.root.logger=INFO,console
PS:实际环境中有这样的需求,通过在多个agent端tail日志,发送给collector,collector再把数据收集,统一发送给HDFS存储起来,当HDFS文件大小超过一定的大小或者超过在规定的时间间隔会生成一个文件。 Flume 实现了两个Trigger,分别为SizeTriger(在调用HDFS输出流写的同时,count该流已经写入的大小总和,若超过一定大小,则创建新的文件和输出流,写入操作指向新的输出流,同时close以前的输出流)和TimeTriger(开启定时器,当到达该点时,自动创建新的文件和输出流,新的写入重定向到该流中,同时close以前的输出流)。
3.3 来一个常见架构:多 agent 汇聚写入 HDFS
3.3.1 在各个webserv日志机上配置 Flume Client
# clientMainAgent
clientMainAgent.channels = c1
clientMainAgent.sources
clientMainAgent.sinks
# clientMainAgent sinks group
clientMainAgent.sinkgroups = g1
# clientMainAgent Spooling Directory Source
clientMainAgent.sources.s1.type = spooldir
clientMainAgent.sources.s1.spoolDir
=/dsap/rawdata/
clientMainAgent.sources.s1.fileHeader = true
clientMainAgent.sources.s1.deletePolicy =immediate
clientMainAgent.sources.s1.batchSize =1000
clientMainAgent.sources.s1.channels =c1
clientMainAgent.sources.s1.deserializer.maxLineLength =1048576
# clientMainAgent FileChannel
clientMainAgent.channels.c1.type = file
clientMainAgent.channels.c1.checkpointDir = /var/flume/fchannel/spool/checkpoint
clientMainAgent.channels.c1.dataDirs = /var/flume/fchannel/spool/data
clientMainAgent.channels.c1.capacity =
clientMainAgent.channels.c1.keep-alive = 30
clientMainAgent.channels.c1.write-timeout = 30
clientMainAgent.channels.c1.checkpoint-timeout=600
# clientMainAgent Sinks
clientMainAgent.sinks.k1.channel = c1
clientMainAgent.sinks.k1.type = avro
# connect to CollectorMainAgent
clientMainAgent.sinks.k1.hostname = flume115
clientMainAgent.sinks.k1.port = 41415
clientMainAgent.sinks.k2.channel = c1
clientMainAgent.sinks.k2.type = avro
# connect to CollectorBackupAgent
clientMainAgent.sinks.k2.hostname = flume116
clientMainAgent.sinks.k2.port = 41415
# clientMainAgent sinks group
clientMainAgent.sinkgroups.g1.sinks = k1 k2
# load_balance type
clientMainAgent.sinkgroups.g1.processor.type = load_balance
clientMainAgent.sinkgroups.g1.processor.backoff
clientMainAgent.sinkgroups.g1.processor.selector
../bin/flume-ng agent --conf ../conf/ -f flume_Consolidation.conf -n clientMainAgent -Dflume.root.logger=DEBUG,console
3.3.2 在汇聚节点配置 Flume server
# collectorMainAgent
collectorMainAgent.channels = c2
collectorMainAgent.sources
collectorMainAgent.sinks
# collectorMainAgent AvroSource
collectorMainAgent.sources.s2.type = avro
collectorMainAgent.sources.s2.bind = flume115
collectorMainAgent.sources.s2.port = 41415
collectorMainAgent.sources.s2.channels = c2
# collectorMainAgent FileChannel
collectorMainAgent.channels.c2.type = file
collectorMainAgent.channels.c2.checkpointDir =/opt/var/flume/fchannel/spool/checkpoint
collectorMainAgent.channels.c2.dataDirs = /opt/var/flume/fchannel/spool/data,/work/flume/fchannel/spool/data
collectorMainAgent.channels.c2.capacity =
collectorMainAgent.channels.c2.transactionCapacity=6000
collectorMainAgent.channels.c2.checkpointInterval=60000
# collectorMainAgent hdfsSink
collectorMainAgent.sinks.k2.type = hdfs
collectorMainAgent.sinks.k2.channel = c2
collectorMainAgent.sinks.k2.hdfs.path = hdfs://db-cdh-cluster/flume%{dir}
collectorMainAgent.sinks.k2.hdfs.filePrefix =k2_%{file}
collectorMainAgent.sinks.k2.hdfs.inUsePrefix =_
collectorMainAgent.sinks.k2.hdfs.inUseSuffix =.tmp
collectorMainAgent.sinks.k2.hdfs.rollSize = 0
collectorMainAgent.sinks.k2.hdfs.rollCount = 0
collectorMainAgent.sinks.k2.hdfs.rollInterval = 240
collectorMainAgent.sinks.k2.hdfs.writeFormat = Text
collectorMainAgent.sinks.k2.hdfs.fileType = DataStream
collectorMainAgent.sinks.k2.hdfs.batchSize = 6000
collectorMainAgent.sinks.k2.hdfs.callTimeout = 60000
collectorMainAgent.sinks.k1.type = hdfs
collectorMainAgent.sinks.k1.channel = c2
collectorMainAgent.sinks.k1.hdfs.path = hdfs://db-cdh-cluster/flume%{dir}
collectorMainAgent.sinks.k1.hdfs.filePrefix =k1_%{file}
collectorMainAgent.sinks.k1.hdfs.inUsePrefix =_
collectorMainAgent.sinks.k1.hdfs.inUseSuffix =.tmp
collectorMainAgent.sinks.k1.hdfs.rollSize = 0
collectorMainAgent.sinks.k1.hdfs.rollCount = 0
collectorMainAgent.sinks.k1.hdfs.rollInterval = 240
collectorMainAgent.sinks.k1.hdfs.writeFormat = Text
collectorMainAgent.sinks.k1.hdfs.fileType = DataStream
collectorMainAgent.sinks.k1.hdfs.batchSize = 6000
collectorMainAgent.sinks.k1.hdfs.callTimeout = 60000
../bin/flume-ng agent --conf ../conf/ -f flume_Consolidation.conf -n collectorMainAgent -Dflume.root.logger=DEBUG,console
上面采用的就是类似 cs 架构,各个 flume agent 节点先将各台机器的日志汇总到&Consolidation 节点,然后再由这些节点统一写入 HDFS,并且采用了负载均衡的方式,你还可以配置高可用的模式等等。
4、可能遇到的问题:
4.1 OOM 问题:
flume 报错:
java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: Java heap space
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.OutOfMemoryError: Java heap space
Flume 启动时的最大堆内存大小默认是 20M,线上环境很容易 OOM,因此需要你在 flume-env.sh&中添加 JVM 启动参数:&
JAVA_OPTS="-Xms8192m -Xmx8192m -Xss256k -Xmn2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
然后在启动 agent 的时候一定要带上&-c conf 选项,否则&flume-env.sh 里配置的环境变量不会被加载生效。
具体参见:
4.2 JDK 版本不兼容问题:
14:44:17,902 (agent-shutdown-hook) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.stop(HDFSEventSink.java:504)] Exception while closing hdfs://192.168.1.111:8020/flumeTest/FlumeData. Exception follows.
java.lang.UnsupportedOperationException: This is supposed to be overridden by subclasses.
at com.google.protobuf.GeneratedMessage.getUnknownFields(GeneratedMessage.java:180)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetFileInfoRequestProto.getSerializedSize(ClientNamenodeProtocolProtos.java:30108)
at com.google.protobuf.AbstractMessageLite.toByteString(AbstractMessageLite.java:49)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.constructRpcRequest(ProtobufRpcEngine.java:149)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:193)
把你的 jdk7 换成 jdk6 试试。
4.3 小文件写入 HDFS 延时的问题
其实上面 3.2 中已有说明,flume 的 sink 已经实现了几种最主要的持久化触发器:
比如按大小、按间隔时间、按消息条数等等,针对你的文件过小迟迟没法写入 HDFS 持久化的问题,
那是因为你此时还没有满足持久化的条件,比如你的行数还没有达到配置的阈值或者大小还没达到等等,
可以针对上面 3.2 小节的配置微调下,例如:
agent1.sinks.log-sink1.hdfs.rollInterval = 20
当迟迟没有新日志生成的时候,如果你想很快的 flush,那么让它每隔 20s flush 持久化一下,agent 会根据多个条件,优先执行满足条件的触发器。
下面贴一些常见的持久化触发器:
# Number of seconds to wait before rolling current file (in 600 seconds)
agent.sinks.sink.hdfs.rollInterval=600
# File size to trigger roll, in bytes (256Mb)
agent.sinks.sink.hdfs.rollSize =
# never roll based on number of events
agent.sinks.sink.hdfs.rollCount = 0
# Timeout after which inactive files get closed (in seconds)
agent.sinks.sink.hdfs.idleTimeout = 3600
agent.sinks.HDFS.hdfs.batchSize = 1000
更多关于 sink 的触发机制与参数配置请参见:
注意:对于 HDFS 来说应当竭力避免小文件问题,所以请慎重对待你配置的持久化触发机制。
4.4 数据重复写入、丢失问题
Flume的HDFSsink在数据写入/读出Channel时,都有Transcation的保证。当Transaction失败时,会回滚,然后重试。但由于HDFS不可修改文件的内容,假设有1万行数据要写入HDFS,而在写入5000行时,网络出现问题导致写入失败,Transaction回滚,然后重写这10000条记录成功,就会导致第一次写入的5000行重复。这些问题是 HDFS 文件系统设计上的特性缺陷,并不能通过简单的Bugfix来解决。我们只能关闭批量写入,单条事务保证,或者启用监控策略,两端对数。
Memory和exec的方式可能会有数据丢失,file 是 end to end 的可靠性保证的,但是性能较前两者要差。
end to end、store on failure 方式 ACK 确认时间设置过短(特别是高峰时间)也有可能引发数据的重复写入。
4.5 tail 断点续传的问题:
可以在 tail 传的时候记录行号,下次再传的时候,取上次记录的位置开始传输,类似:
agent1.sources.mand = /usr/local/bin/tail
-n +$(tail -n1 /home/storm/tmp/n) --max-unchanged-stats=600 -F
/home/storm/tmp/id.txt | awk 'ARNGIND==1{i=$0;next}{i++; if($0~/文件已截断/)i=0; print i && "/home/storm/tmp/n";print $1"---"i}' /home/storm/tmp/n -
需要注意如下几点:
(1)文件被 rotation 的时候,需要同步更新你的断点记录“指针”,
(2)需要按文件名来追踪文件,
(3)flume 挂掉后需要累加断点续传“指针”
(4)flume 挂掉后,如果恰好文件被 rotation,那么会有丢数据的风险,
& & & &只能监控尽快拉起或者加逻辑判断文件大小重置指针。
(5)tail 注意你的版本,请更新&coreutils 包到最新。
4.6 在 Flume 中如何修改、丢弃、按预定义规则分类存储数据?
这里你需要利用 Flume 提供的拦截器(Interceptor)机制来满足上述的需求了,具体请参考下面几个链接:
(1)Flume-NG源码阅读之Interceptor(原创) &
(2)Flume-NG自定义拦截器
(3)Flume-ng生产环境实践(四)实现log格式化interceptor
(4)flume-ng如何根据源文件名输出到HDFS文件名
5、Refer:
(1)scribe、chukwa、kafka、flume日志系统对比 &
(2)关于Flume-ng那些事 &
& & & & &关于Flume-ng那些事(三):常见架构测试 &
(3)Flume 1.4.0 User Guide
(4)flume日志采集 &
(5)Flume-NG + HDFS + HIVE 日志收集分析
(6)【Twitter Storm系列】flume-ng+Kafka+Storm+HDFS 实时系统搭建
(7)Flume-NG + HDFS + PIG 日志收集分析
flume 示例一收集tomcat日志 &
flume-ng 多节点集群示例 &
试用flume-ng 1.1&&
(8)Flafka: Apache Flume Meets Apache Kafka for Event Processing
(9)Flume-ng的原理和使用
(10)基于Flume的美团日志收集系统(一)架构和设计
(11)基于Flume的美团日志收集系统(二)改进和优化
(12)How-to: Do Real-Time Log Analytics with Apache Kafka, Cloudera Search, and Hue
(13)Real-time analytics in Apache Flume - Part 1
http://my.oschina.net/leejun2005/blog/336397
引用来自“xrzs”的评论http://my.oschina.net/leejun2005/blog/336397麻烦问一下,这个断点续传命令,我在1.5.0.2版本配置的时候,它不会收集数据,也没有报错,是什么原因呢?
& 开源中国(OSChina.NET) |
开源中国社区(OSChina.net)是工信部
指定的官方社区

我要回帖

更多关于 flume 多个sink 的文章

 

随机推荐