flume魅蓝手机变成flume如何恢复出

在通过Flume收集日志的业务场景中,一般都会遇到下面的情况,在日志收集服务器的某个目录下,会按照一段时间生成一个日志文件,并且日志会不断的追加到这个文件中,比如,每小时一个命名规则为log_.log的日志文件,所有10点产生的日志都会追加到这个文件中,到了11点,就会生成另一个log_.log的文件。

这种场景如果通过flume(1.6)收集,当前提供的Spooling Directory Source和Exec Source均不能满足动态实时收集的需求,在当前正在开发的flume1.7版本中,提供了一个非常好用的TaildirSource,使用这个source,可以监控一个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集。

我将TaildirSource的相关源码下载下来(需要做简单修改),然后集成到Flume1.6中,满足了上面提到的需求,获得了良好的效果。

 
  1. #-->检测点文件所存储的目录
  2. #-->数据存储所在的目录设置
  3. #-->事务容量的最大值设置

启动之后,在sink所指的/tmp/flumefiles目录下,生成了一个大小为0的目标文件,命令为时间戳-1,如:

接着往监控的目录中生成log_.log的文件:

此时,在上面tail –f目标文件的控制台中,已经可以看到写入的内容了:

再模拟生成一个新的文件(log_.log):

同样,目标文件中也正常写入:

如果在监控的目录/tmp/lxw1234-flume/中,产生和所配置的文件名正则表达式不匹配的文件,则不会被tail。

另外,如果将所监控目录/tmp/lxw1234-flume/中已经过期的文件移除,也不会影响agent的运行。

该文件中记录了所监控的每个文件的当前位置,如图中红圈圈出的pos的值,因为两个文件都已经读到了最后,因此每个pos的值就是该文件的大小。

  • 不再需要master节点和对zookeeper的依赖,配置文件简单化。
  • 插件化,一部分面对用户,工具或系统开发人员。
  • ExecSource,SpoolSource对比:ExecSource可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法收集到日志数据,无法何证日志数据的完整性。SpoolSource虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋近于实时。如果应用无法实现以分钟切割日志文件的话,可以两种收集方式结合使用。 
    FileChannel。MemoryChannel可以实现高速的吞吐,但是无法保证数据的完整性。MemoryRecoverChannel在官方文档的建议上已经建义使用FileChannel来替换。FileChannel保证数据的完整性与一致性。在具体配置FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。 
    Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。 



至此,咱们的第一个 Flume Agent 算是部署成功了!
启动如下命令,就可以在 hdfs 上看到效果了。

PS:实际环境中有这样的需求,通过在多个agent端tail日志,发送给collector,collector再把数据收集,统一发送给HDFS存储起来,当HDFS文件大小超过一定的大小或者超过在规定的时间间隔会生成一个文件。
Flume 实现了两个Trigger,分别为SizeTriger(在调用HDFS输出流写的同时,count该流已经写入的大小总和,若超过一定大小,则创建新的文件和输出流,写入操作指向新的输出流,同时close以前的输出流)和TimeTriger(开启定时器,当到达该点时,自动创建新的文件和输出流,新的写入重定向到该流中,同时close以前的输出流)。

上面采用的就是类似 cs 架构,各个 flume agent 节点先将各台机器的日志汇总到 Consolidation 节点,然后再由这些节点统一写入 HDFS,并且采用了负载均衡的方式,你还可以配置高可用的模式等等。

其实上面 3.2 中已有说明,flume 的 sink 已经实现了几种最主要的持久化触发器:

比如按大小、按间隔时间、按消息条数等等,针对你的文件过小迟迟没法写入 HDFS 持久化的问题,

那是因为你此时还没有满足持久化的条件,比如你的行数还没有达到配置的阈值或者大小还没达到等等,

可以针对上面 3.2 小节的配置微调下,例如:

当迟迟没有新日志生成的时候,如果你想很快的 flush,那么让它每隔 20s flush 持久化一下,agent 会根据多个条件,优先执行满足条件的触发器。

下面贴一些常见的持久化触发器:

更多关于 sink 的触发机制与参数配置请参见:

注意:对于 HDFS 来说应当竭力避免小文件问题,所以请慎重对待你配置的持久化触发机制。

4.4 数据重复写入、丢失问题

Flume的HDFSsink在数据写入/读出Channel时,都有Transcation的保证。当Transaction失败时,会回滚,然后重试。但由于HDFS不可修改文件的内容,假设有1万行数据要写入HDFS,而在写入5000行时,网络出现问题导致写入失败,Transaction回滚,然后重写这10000条记录成功,就会导致第一次写入的5000行重复。这些问题是 HDFS 文件系统设计上的特性缺陷,并不能通过简单的Bugfix来解决。我们只能关闭批量写入,单条事务保证,或者启用监控策略,两端对数。

Memory和exec的方式可能会有数据丢失,file 是 end to end 的可靠性保证的,但是性能较前两者要差。

end to end、store on failure 方式 ACK 确认时间设置过短(特别是高峰时间)也有可能引发数据的重复写入

可以在 tail 传的时候记录行号,下次再传的时候,取上次记录的位置开始传输,类似:

(1)文件被 rotation 的时候,需要同步更新你的断点记录“指针”,

(2)需要按文件名来追踪文件,

(3)flume 挂掉后需要累加断点续传“指针”

(4)flume 挂掉后,如果恰好文件被 rotation,那么会有丢数据的风险,

4.6 在 Flume 中如何修改、丢弃、按预定义规则分类存储数据?

这里你需要利用 Flume 提供的拦截器(Interceptor)机制来满足上述的需求了,具体请参考下面几个链接:

(4)flume-ng如何根据源文件名输出到HDFS文件名

(10)基于Flume的美团日志收集系统(一)架构和设计

(11)基于Flume的美团日志收集系统(二)改进和优化

当你看到这篇文章时,应该对flume有一个大概了解但是为照顾刚入门的同学所以还是会说下flume,刚开始使用flume时不需要理解太多里面的东西,只需要理解下面的图就可以使用flume把日志数据传入kafka中,下图中的hdfs只是有代表性的sink而以,我在实际使用中sink是kafka

我要回帖

更多关于 魅蓝手机变成flume 的文章

 

随机推荐