文章目录
环境准备
准备、logstash 、es app开发定制环境并启动服务,app开发定制参考之前的文章这里不重复
app开发定制日志读取及保存
启动zookeeper:[root@localhost kafka]# bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
启动kafka:[root@localhost kafka]# bin/kafka-server-start.sh -daemon config/server.properties
app开发定制假设日志格式如下,app开发定制包括基本字段(时间、IP、级别等)app开发定制和业务字段(字段1,字段2,字段3)。
2021-05-21 15:28:50.093 192.168.8.96 1 error 4 [#字段1#][#字段2#][#字段3#]
- 1
app开发定制消费消息并打印
app开发定制配置文件如下,从kafka获取日志并直接打印:
input { kafka { bootstrap_servers => "192.168.195.11:9092" topics => ["guardlog"] group_id => "GuardLogGroup" client_id => "guard_cli" security_protocol => "SSL" }}filter {}output { stdout { } # 控制台输出}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
启动
[root@localhost logstash]# bin/logstash -f config/logstash.conf --config.reload.automatic
- 1
向kakfa发送消息后,logstash接收并打印了消息。
消息输出到ES
启动ES[es@localhost es7.12]$ bin/elasticsearch -d
修改配置文件.output 增加 es输出如下:
output { stdout { } # 控制台输出 elasticsearch { hosts => ["192.168.195.12:9200"] index => "guardlog" }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
插入kafka消息控制台有输出
es中也有了这条消息,postman查询结果如下
日志解析
上边我们从kafka读取日志并保存到了es 中但是可以看到,日志格式是一段文本,很多时候我们需要结构化的数据方便数据查询过滤等,因此我们在将日志保存到es前要对日志做解析和转换。
logstash提供了很多插件可以用来解析日志,对于常见的apache 等日志有现成的解析插件,这里我们使用grok插件来解析基本日志格式内容,并通过正则表达式和字符串分割解析出业务字段。
可以直接调用grok内置的正则格式,来解析日志内容,如下:通过IP 来解析日志中的请求者IP":%{IP:sag_ip}"
默认提供的格式见:
groke基于正则表达式库是Oniguruma,你可以在Oniguruma网站上看到完整支持的regexp语法。
中文学习正则表达式网站:http://www.runoob.com/regexp/regexp-tutorial.html
调试工具:
观察输出内容可以看到,当日志内容中有ip时会解析出来并在日志中增加一个字段sag_ip 来保存这个值,但是如果日志中没有找到IP那么就会增加tags字段,提示groke解析失败。测试发现如果有一个表达式没有匹配到内容那么就是报错(提示解析失败)其他字段也无法正确解析。实际中可以通过是否存在tags字段来判断是否处理成功,如果未处理成功那么可以将数据单独保存到一个地方方便分析
如下,我们使用 grok自带的表达式来解析时间、IP、tag、loglevel等内容,最后一个字段是使用我们自定义的正则表达式(\[#?)(?<source>.*(?=#\]$))
来解析出 [##]中的全部内容,并保存到source字段
filter { grok { match => { "message" => "%{TIMESTAMP_ISO8601:op_date} \s*%{IP:sag_ip} \s*%{DATA:sag_tag} \s*%{LOGLEVEL:log_level} \s*%{DATA:log_src} \s*(\[#?)(?<source>.*(?=#\]$))" } }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
输入测试数据可以看到 ,数据被正常解析
{ "sag_tag" => "1", "@version" => "1", "message" => "2021-05-21 15:28:50.093 192.168.8.96 1 error 4 [#字段1#][#字段2#][#字段3#]", "sag_ip" => "192.168.8.96", "log_level" => "error", "op_date" => "2021-05-21 15:28:50.093", "log_src" => "4", "@timestamp" => 2022-05-08T03:33:16.271Z, "source" => "字段1#][#字段2#][#字段3"}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
可以在souce字段的基础上根据#][# 进行字段分割就可以得到 所有的业务字段,如下使用来完成这个功能
filter { grok { pattern_definitions =>{ # MYNUM => "\d+", FIELD => "(?:\[#).*(?:#\])" SAGTAG => "\w+" } #patterns_dir =>["/root/logstash/patterns"] match => { "message" => "%{TIMESTAMP_ISO8601:op_date} \s*%{IP:sag_ip} \s*%{DATA:sag_tag} \s*%{LOGLEVEL:log_level} \s*%{DATA:log_src} \s*(\[#?)(?<source>.*(?=#\]$))" } } ruby{ code =>" ipStr = event.get('source') ipArray = ipStr.split('#][#',-1) length = ipArray.length path = '' if length != 3 event.set('tags','mainMsgLengthError') else event.set('f1',ipArray[0]) event.set('f2',ipArray[1]) event.set('f3',ipArray[2]) end " }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
再次测试可以发现业务字段已经都完成了解析
[2022-05-08T11:48:23,133][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=guard_cli-0, groupId=GuardLogGroup] Setting newly assigned partitions [guardlog-0]{ "sag_tag" => "1", "@version" => "1", "f2" => "字段2", "op_date" => "2021-05-21 15:28:50.093", "source" => "字段1#][#字段2#][#字段3", "f3" => "字段3", "message" => "2021-05-21 15:28:50.093 192.168.8.96 1 error 4 [#字段1#][#字段2#][#字段3#]", "sag_ip" => "192.168.8.96", "log_level" => "error", "log_src" => "4", "@timestamp" => 2022-05-08T03:48:40.473Z, "f1" => "字段1"}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
最后删除不必要的字段,同时增加输出判断 如果包含tags字段则输出到异常数据索引
filter { grok { pattern_definitions =>{ # MYNUM => "\d+", FIELD => "(?:\[#).*(?:#\])" SAGTAG => "\w+" } #patterns_dir =>["/root/logstash/patterns"] match => { "message" => "%{TIMESTAMP_ISO8601:op_date} \s*%{IP:sag_ip} \s*%{DATA:sag_tag} \s*%{LOGLEVEL:log_level} \s*%{DATA:log_src} \s*(\[#?)(?<source>.*(?=#\]$))" } } ruby{ code =>" ipStr = event.get('source') elasticsearch { hosts => ["192.168.195.12:9200"] index => "guardlog" } ipArray = ipStr.split('#][#',-1) length = ipArray.length path = '' if length != 3 event.set('tags','mainMsgLengthError') else event.set('f1',ipArray[0]) event.set('f2',ipArray[1]) event.set('f3',ipArray[2]) end " } if [tags] { #如果出错了,不删除内容直接保存到 } else { #如果没有出错择删除不必要的字段 并保存到es mutate {remove_field => ["source","message","@timestamp","@version"]} } }output { stdout { } # 控制台输出 if [tags] { elasticsearch { hosts => ["192.168.195.12:9200"] index => "guarderrlog" } } else { elasticsearch { hosts => ["192.168.195.12:9200"] index => "guardlog" } }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
再次测试,没有了多余的字段,同时一旦解析出错就会保存到guarderrlog索引中
[2022-05-08T11:55:47,875][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=guard_cli-0, groupId=GuardLogGroup] Setting newly assigned partitions [guardlog-0]{ "sag_tag" => "1", "f2" => "字段2", "op_date" => "2021-05-21 15:28:50.093", "f3" => "字段3", "sag_ip" => "192.168.8.96", "log_level" => "error", "log_src" => "4", "f1" => "字段1"}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
grok调试工具
上述过程中我们每次都通过运行logstash来验证我们解析的正确性,很繁琐,实际中我们有必要搞一套grok测试工具。以方便对解析过程的测试。网络上的工具有时候不稳定,可以在本地启动调试服务。
docker环境下运行一下命令就可以运行本地的测试服务,
[root@localhost ~]# docker pull qiudev/grokdebugger[root@localhost ~]# docker run -d --name grokdebugger -p 19999:9999
- 1
- 2
启动后我们访问 http://192.168.195.10:19999/ 就可以使用本地调试服务
grok自定义格式
如果grok自带的表达式不能满足需求,还可以自定义表达式,在logstash 中自定义表达式有多种方式
通过pattern_definitions 属性来定义
grok { pattern_definitions =>{ # MYNUM => "\d+", FIELD => "(?:\[#).*(?:#\])" SAGTAG => "\w+" } match => { "message" => "%{TIMESTAMP_ISO8601:op_date} \s*%{IP:sag_ip} \s*%{DATA:sag_tag} \s*%{LOGLEVEL:log_level} \s*%{DATA:log_src} \s*(\[#?)(?<source>.*(?=#\]$))" }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
通过文件来定义
定义一个模式文件里边每行都是一个模式
[root@192 patterns]# pwd/root/logstashacc/patterns[root@192 patterns]# cat pattern #LOG_TIME (?>\d\d){1,2}[/-](?:0[1-9]|1[0-2])[/-](?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])MYNUM [-]\d+
- 1
- 2
- 3
- 4
- 5
然后再grok配置文件中通过patterns_dir 属性配置好模式文件所在的目录即可
grok { patterns_dir =>["/root/logstash/patterns"] match => { "message" => "%{TIMESTAMP_ISO8601:op_date} \s*%{IP:sag_ip} \s*%{DATA:sag_tag} \s*%{LOGLEVEL:log_level} \s*%{DATA:log_src} \s*(\[#?)(?<source>.*(?=#\]$))" }}
- 1
- 2
- 3
- 4
- 5
- 6
直接在配置文件中使用
如上述配置中的 (\[#?)(?<source>.*(?=#\]$))
就是直接书写正则表达式
关于表达式的详细规则和语法 请参考这个不错的学习网站