请教logstash windows日志的多行日志问题,最后一行的日志总是不能获取

通过logstash filter过滤屏蔽不需要的日志 | 峰云就她了
6,229 views
今天是12-24号,又一个平安夜,祝大家赚多多钱, 泡更美的妹子. &
今天朋友问我关于elk日志过滤的问题。java程序虽然会写大量的日志(包括垃圾日志) . & 但为了精简数据需要logstash agent做日志的过滤,也就是说过滤掉不需要的日志.&
文章写的不是很严谨,欢迎来喷,另外该文后续有更新的,请到原文地址查看更新.&
下面是解决过滤屏蔽日志的logstash配置文件.
xiaorui.cc
#代码高亮有问题,大于可能会被转义成, 大家注意一下.
type =& "producer"
path =& "/data/buzzMaster/extractor.log"
if ([message] =~ "^xiaorui.cc") {
stdout { codec =& rubydebug}
123456789101112131415161718
&#blog:&&xiaorui.cc#代码高亮有问题,大于可能会被转义成, 大家注意一下. &input {&&&&file&&&&{&&&&&&&&&&&&&&&&type =& "producer"&&&&&&&&&&&&&&&&path =& "/data/buzzMaster/extractor.log"&&&&&&&&}}filter {&&&&&&&&&&if ([message] =~ "^xiaorui.cc") {&&&&&&&&&&&& drop {}&&&&&&&&&&}}output&&{&&&&stdout { codec =& rubydebug}}
写入配置文件后,我们开始测试:
首先在客户端测试下. &为什么要和xiaorui.cc这字符串较劲? &我想大家注意到我上面filter配置有个 =~ “^xiaorui.cc” . & &只有当日志是xiaorui.cc开头的才会drop屏蔽.
[ root@bj-buzz-dev01:/data/buzzMaster {extractor} ]$ echo “1xiaorui.cc” &&extractor.log
[ root@bj-buzz-dev01:/data/buzzMaster {extractor} ]$ echo “blog.xiaorui.cc” &&extractor.log
[ root@bj-buzz-dev01:/data/buzzMaster {extractor} ]$ echo “xiaorui.cc” &&extractor.log
[ root@bj-buzz-dev01:/data/buzzMaster {extractor} ]$&
我们再看下logstash server端的反应. &
[ root@bj-buzz-dev01:/data/buzzMaster {extractor} ]$ /usr/local/logstash-1.4.2/bin/logstash -f conf/agent.conf
& & & &”message” =& “1xiaorui.cc”,
& & & “@version” =& +,
& & “@timestamp” =& &#-24T07:45:58.099Z”,
& & & & & “type” =& “producer”,
& & & & & “host” =& “bj-buzz-dev01″,
& & & & & “path” =& “/data/buzzMaster/extractor.log”
& & & &”message” =& “blog.xiaorui.cc”,
& & & “@version” =& +,
& & “@timestamp” =& &#-24T07:46:12.117Z”,
& & & & & “type” =& “producer”,
& & & & & “host” =& “bj-buzz-dev01″,
& & & & & “path” =& “/data/buzzMaster/extractor.log”
通过日志我们可以分析确定drop起到效果了. &需要注意的是在logstash里所有的日志都会放入message里.&
logstash filter还可以根据日志的级别进行drop
如果你有个更加奇特的需求,上面的需求是过滤屏蔽不需要的日志,那如果你想抽样?要部分的随机日志. filter percentage有个百分比抽样参数.&
& if [loglevel] == “debug” {
& & drop {
& & & percentage =& 40
另外可以增加字段.
& & add_field =& { “foo_%{somefield}” =& “Hello world, from %{host}” }
我们可以通过filter删除指定字段.&
& & remove_field =& [ "foo_%{somefield}" ]
结论, &logstash的filter插件还是很多的.
如果大家觉得文章对你有些作用! &
帮忙点击广告. 一来能刺激我写博客的欲望,二来好维护云主机的费用.
如果想赏钱,可以用微信扫描下面的二维码. 另外再次标注博客原地址 && …… &&感谢!
您可能也喜欢:
暂无相关产品rong341233 的BLOG
用户名:rong341233
文章数:113
评论数:62
访问量:66670
注册日期:
阅读量:5863
阅读量:12276
阅读量:330926
阅读量:1037680
51CTO推荐博文
最近在使用ELKStack对系统日志进行分析,在网上也有看到有使用logstash的案例,但是发现不能正常解析出来,于是重新花时间去进行正则计算,主要代码如下:input&{
&&&&type&=&&"mysql-slow"
&&&&path&=&&"/var/lib/mysql/slow.log"
&&&&&&&&start_position&=&&beginning
&&&&&&&&&&&&&&&&sincedb_write_interval&=&&0
&&&&codec&=&&multiline&{
&&&&&&pattern&=&&"^#&User@Host:"
&&&&&&negate&=&&true
&&&&&&what&=&&"previous"
#if&[message]&=~&"^Tcp"&{
#&&&&&&&&drop&{}
#if&[message]&=~&"^Time"&{
#&&&&&&&&drop&{}
#if&[message]&=~&"^\/usr"&{
#&&&&&&&&drop&{}
&&&&match&=&&{&"message"&=&&"SELECT&SLEEP"&}
&&&&add_tag&=&&[&"sleep_drop"&]
&&&&tag_on_failure&=&&[]
&&if&"sleep_drop"&in&[tags]&{
&&&&drop&{}
&&&&grok&{
&&&&match&=&&{&"message"&=&&"(?m)^#&User@Host:&%{USER:User}\[[^\]]+\]&@&(?:(?&clienthost&\S*)&)?\[(?:%{IP:Client_IP})?\]\s.*#&Query_time:&%{NUMBER:Query_Time:float}\s+Lock_time:&%{NUMBER:Lock_Time:float}\s+Rows_sent:&%{NUMBER:Rows_Sent:int}\s+Rows_examined:&%{NUMBER:Rows_Examined:int}\s*(?:use&%{DATA:Database};\s*)?SET&timestamp=%{NUMBER:timestamp};\s*(?&Query&(?&Action&\w+)\s+.*)\n#&Time:.*$"&}
&&&&match&=&&[&"timestamp",&"UNIX"&]
&&&&remove_field&=&&[&"timestamp"&]
&&&&&&&&&&&&redis&{
&&&&&&&&&&&&&&&&host&=&&"192.168.1.2:6379"
&&&&&&&&&&&&&&&&data_type&=&&"list"
&&&&&&&&&&&&&&&&key&=&&"logstash:mysql_slow_log"
&&&&&&&&&&&&}
}说明:在使用codec/multiline搭配使用的时候,需要注意,grok和普通正则一样默认是不支持匹配回车换行的。就像你需要=~//m一样也需要单独指定,具体写法是在表达式开始位置加(?m)标记开始的时候一直没加(?m),在/调试的时候正常通过,但是到了logstash执行的时候就是不能正常解析。本文出自 “” 博客,请务必保留此出处
了这篇文章
类别:┆阅读(0)┆评论(0)storysky 的BLOG
用户名:storysky
文章数:81
评论数:398
访问量:1255999
注册日期:
阅读量:5863
阅读量:12276
阅读量:330926
阅读量:1037680
51CTO推荐博文
1、合并日志php的错误日志中常常会出现这样的日志[03-Jun-:29] PHP Fatal error:
Uncaught exception 'Leb_Exception' in /data1/www//htdocs/framework/xbox/ufo.php:68
Stack trace:
#0 /data/www//htdocs/framework/dao/abstract.php(299): Leb_Dao_Pdo-&connect(Array, 'read')
#1 /data/www//htdocs/framework/dao/pdo.php(108): Leb_Dao_Abstract-&initConnect(false)
#2 /data/www//htdocs/framework/dao/abstract.php(1123): Leb_Dao_Pdo-&query('SELECT * FROM `...')
#3 /data/www//htdocs/framework/dao/abstract.php(1217): Leb_Dao_Abstract-&select(Array)
#4 /data/www//htdocs/framework/model.php(735): Leb_Dao_Abstract-&daoSelect(Array, false)
#5 /data/www//htdocs/app/configure/model/configure.php(40): Leb_Model-&find()
#6 /data/www//htdocs/app/search/default.php(131): Configure-&get_configure_by_type('news')
#7 /data/www//htdocs/framework/dispatcher.php(291): defaultController-&indexAction()
#8 /data/www//htdocs/framework/dispatcher.php(222): Leb_Di in /data1/www//htdocs/framework/dao/pdo.php on line 68 &这个时候 logstash一般会只记录上面一行,所以这类的日志就看不全了。怎么办呢?logstash提供了一个功能解决了这个问题就是&multiline&这个filter的功能顾名思义就是对多行的日志进行处理 这个是官网上的说明multiline filterThis filter will collapse multiline messages into a single event.The multiline filter is for combining multiple events from a single source into the same event. 下面看下格式filter {
multiline {
type =& "type"
#类型,不多说
pattern =& "pattern, a regexp" #参数,也可以认为是字符,有点像grep ,如果符合什么字符就交给下面的 what 去处理
negate =& boolean
what =& "previous" or "next" #这个是符合上面 pattern 的要求后具体怎么处理,处理方法有两种,合并到上面一条日志或者下面的日志
}The 'negate' can be &true& or &false& (defaults false). If true, a message not matching the pattern will constitute a match of the multiline filter and the what will be applied. (vice-versa is also true)这个 negate 有两种 true 或者 false,默认是 true,如果选了false 的话估计就是取反的意思。看看例子filter {
multiline {
pattern =& "^[^\[]"
what =& "previous"
}这个例子是针对我上面的php日志写的,意思就是 如果不是以 &[&开头的日志 都跟上一个日志合并在一起。以此类推遇到其他的多行日志也可以按照这个方法来做合并。2、logstash 根据@message内容来触发命令&exec& &我当初的想法是这样的,如果php日志文件中出现 &PHP Fatal error&的时候将相关的错误日志发给相关开发的负责人。一开始想到了 output 的 email 功能,但我尝试了很多次这个email功能不太稳定,有时候能发出来邮件有的时候却发不出来。不知道是邮件服务器的问题还是 logstash本身的问题,具体配置如下output {
match =& [ "@message", "aaaaa" ]
from =& ".cn"
options =& [ "smtpIporHost", "",
"port", "25",
"userName", ".cn",
"starttls", "true",
"password", "opmonitor",
"authenticationType", "login"
subject =& "123"
body =& '123'
via =& smtp
}后来换了方法,改用 grep+exec来做,具体思路就是 grep 过滤到了 PHP Fatal error 之后根据域名将邮件发给具体人员,格式如下filter {
match =& [ "@message", "PHP Fatal error" ]
add_tag =& [fatal_error]
tags =& [fatal_error]
match =& [ "@message", ".*(xbox\.com|xbox\.mib\.com\.cn|supports\.game\.mib\.com\.cn)" ]
add_tag =& [xboxerror]
tags =& [xboxerror]
command =& "echo '%{@timestamp} %{@source}: %{@message}' | mail -s xbox_phplog_error_message "
}如此这般 先定义一个grep tag 为fatal_error 先把带有 PHP Fatal error 的日志过滤出来,后面的 grep承接上面的 tag fatal_error 过滤出具体的域名之后再新建一个tag 叫 xboxerror,最后的output 调用 exec(执行) 去调用一个命令,将时间,源和错误信息发到 这个邮箱里。OK 现在就能做到快速的邮件报警了。3、替换上面做到了邮件实时提醒,但有的时候我发现并没有发出去邮件,查找原因后发现如果邮件内容中出现很多 单引号 & ' & 的话,mail命令就会报错没法发送。于是就找到了mutate 这个功能,下面是介绍The mutate filter allows you to do general mutations to fields. You can rename, remove, replace, and modify fields in your events.好吧,我现在需要把@message里面的 ' 都给替换成& 这样就能正常发邮件了,什么你说开发换了个符号会看不懂?我#¥%……&*((&……%¥%格式如下:mutate {
type =& "phplog"
gsub =& [ "@message","'", "\"" ]
}好了,有了这些功能模块logstash可以工作的比较开心了~~~ 希望你们也开心本文出自 “” 博客,请务必保留此出处
了这篇文章
类别:┆阅读(0)┆评论(0)
16:34:35 10:06:51 14:48:26 15:54:56 16:16:40 16:59:31 20:40:23 08:59:24 10:31:46 17:42:34 09:48:04 16:45:02 10:54:01 23:25:46 23:28:26 &&1&
&&页数 ( 1/2 ) &公司的KafKa+Logstash+Elasticsearch日志收集系统的吞吐量存在问题,logstash的消费速度跟不上,造成数据堆积;
三者的版本分别是:0.8.2.1、1.5.3、1.4.0
数据从KafKa中消费,采用的是logstash-input-kafka插件,输出到Elasticsearch中采用的是logstash-output-Elasticsearch插件。
对于两个插件分别进行了一定的配置,参照了下面的博客:
但是问题并没有得到解决,消费的速度没有什么提升,或者说提升很小,还有数据堆积。考虑到使用的logstash版本以及插件的版本比较低,所以进行了版本升级:
在logstash的网站上下载了集成所有插件的2.3.4版本的logstash,配置的过程中遇到了以下问题:
Elasticsearch插件的配置:
1、新版本的插件没有host和port配置,改成了hosts:[&127.0.0.1:9200&]
2、新版本的配置中没有protocol配置
在运行logstash的过程中,命令中有参数-l logs,用来配置log的目录logs,我没有手动创建这个目录,所以日志一直没有生成,开始一直没有找到日志文件,一旦有了日志,其中就提示了配置文件的问题,很容易就将新版本的logstash以及两个插件配置好了。
但是配置好之后,新版本的logstash的吞吐量有所上升,但是在数据量大、上升比较快的时候仍然会有数据堆积,所以问题还是没有解决。
下面的分析思路:
1、观察logstash的消费过程发现,kafka的中的数据均衡很差,少部分节点中的数据多,增长快,大部分节点中几乎没有数据,所以logstash的多线程到节点的分区中取数据,对于性能的提升不大。由于logstash对于数据的消费采用的是fetch的方式,个人感觉:每个线程会不断的去kafka中取数据,发现没有数据之后,在过一段时间之后又会去取,虽然这些分区中没有数据,但是仍然占用了一部分cpu去取数据,这反而会影响到有数据的线程。如果可以知道哪个节点上有数据,将cpu资源都给这几个节点,针对这几个节点进行数据抓取,效率会快很多。现在的情况是,每个分区都分配了一个cpu核心,其中2/3的核心是在不断去读却读不到数据,只有1/3的cpu在读取,这对于计算资源是很浪费的。
上面的想法是傻逼的。。。并不是每个分区分配一个线程,就是将一个核心绑定给了这个线程,线程申请的是cpu的计算资源,是从所有的核心中去申请,一个线程对应的分区中没有数据,那么这个线程就不占用cpu资源,或者说占用的很少,那么剩余的cpu资源就可以给别的线程用。注意:线程绑定的是cpu的计算资源,并不是一个线程绑定一个核心。所以说某些分区中数据少,kafka的负载均衡不好,并不会怎么影响logstash从中消费数据的速度。问题可能还是存在于logstash向ES中写数据的速度。
2、通过工具观察一下Elasticsearch的索引速度,如果很慢,很可能是logstash的output环节影响到了吞吐量。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:18217次
积分:1015
积分:1015
排名:千里之外
原创:76篇
评论:12条
(1)(2)(1)(2)(1)(19)(1)(1)(1)(2)(30)(1)(1)(3)(8)(7)(1)(1)(1)

我要回帖

更多关于 logstash 错误日志 的文章

 

随机推荐