之前我们部署好了ELK的基本架构,也实现了从系统日志以及nginx中收集日志,不过等待我们的问题依然很多:怎么讲收集好的日志放至临时缓存?或者怎么从缓存中提取日志?对于java应用等日志非单行的服务日志该如何收集等等。本文将继续讲解ELK的各种进阶用法。

收集tomcat日志

  收集tomcat中的日志比较简单,跟nginx一样,将日志序列化为json格式即可。
  修改tomcat配置文件,将日志格式修改为如下格式。

vim /usr/local/tomcat/conf/server.xml
        <Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
               prefix="localhost_access_log" suffix=".txt"
               pattern="{&quot;clientip&quot;:&quot;%h&quot;,&quot;ClientUser&quot;:&quot;%l&quot;,&quot;authenticated&quot;:&quot;%u&quot;,&quot;AccessTime&quot;:&quot;%t&quot;,&quot;method&quot;:&quot;%r&quot;,&quot;status&quot;:&quot;%s&quot;,&quot;SendBytes&quot;:&quot;%b&quot;,&quot;Query?string&quot;:&quot;%q&quot;,&quot;partner&quot;:&quot;%{Referer}i&quot;,&quot;AgentVersion&quot;:&quot;%{User-Agent}i&quot;}" />

  此时查看新生成的访问日志,即可看到新生成的日志已经成json格式了。

tail -f /usr/local/tomcat/logs/localhost_access_log.2020-01-03.txt
{"clientip":"192.168.32.1","ClientUser":"-","authenticated":"-","AccessTime":"[03/Jan/2020:19:34:29 +0800]","method":"GET /bg-button.png HTTP/1.1","status":"304","SendBytes":"-","Query?string":"","partner":"http://192.168.32.51:8080/tomcat.css","AgentVersion":"Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:69.0) Gecko/20100101 Firefox/69.0"}

  不过为了以防万一,可以去网上使用在线json格式校验工具,检查一下格式是否正确。
JSON格式校验
  此时就可以使用logstash工具来收集我们的tomcat日志了。

vim /etc/logstash/conf.d/tomcat-el.conf

  不过,想必你们一定也发现了,tomcat访问日志中是带有日期格式的,每天的访问日志文件名是不同的,这要怎么写到path中呢?
  哈,别慌,logstash的input-file插件也支持通配符的写法,我们可以写path => "/usr/local/tomcat/logs/localhost_access_log.*.txt",如下所示。

input {
  file {
    path => "/usr/local/tomcat/logs/localhost_access_log.*.txt"
    stat_interval => 3
    start_position => "beginning"
    codec => "json"
    type => "tomcat_accesslog"
  }
}

output {
#  stdout {
#    codec => rubydebug
#  }
  if [type] == "tomcat_accesslog" {
  elasticsearch {
    hosts => ["192.168.32.41:9200"]
    index => "tomcat_accesslog-%{+YYYY.MM.dd}"
  }}
}

  PS:可以分开至不同的配置文件,也可以合并至一个文件中,不过我们习惯不同服务用单独的配置文件来抓取,方便修改。
  重启logstash服务,然后去kibana中创建tomcat_accesslog-*的索引,就可以看到tomcat的访问日志了。

收集JAVA服务的日志

  JAVA服务中的日志如果不报错还好,还可以是一行一条,但是一旦出现报错信息,一般都是小半屏幕N多行的报错信息,而我们收集日志的很重要的目的就是为了查看这些报错信息,而如果不对这些日志进行处理,还按照一行一条来收集的话,当我们查看这个日志的时候就会很崩溃了——这乱的几乎完全没法看。所以我们要想办法将JAVA服务的日志处理成一行。
  刚好强大的logstash也支持换行匹配,我们刚好就以logstash服务本身日志为例

vim /etc/logstash/conf.d/java-el.conf
input {
  file {
    path => "/logstash/logs/logstash-plain.log"
    stat_interval => 3
    start_position => "beginning"
    type => "java_accesslog"    
    codec => multiline {
    pattern => "^\[" #当遇到[开头的行时候将多行进行合并
    negate => true #true 为匹配成功进行操作, false 为不成功进行操作
    what => "previous" #与以前的行合并,如果是下面的行合并就是 next
  }}
}
filter { #日志过滤,如果所有的日志都过滤就写这里,如果只针对某一个过滤就写在 input 里面的日志输入里面
}
output {
  if [type] == "java_accesslog" {
  elasticsearch {
    hosts => ["192.168.32.41:9200"]
    index => "java_accesslog-%{+YYYY.MM.dd}"
  }}
}

  重启logstash服务,然后去kibana中创建java_accesslog-*的索引,就可以看到java的访问日志了。

redis

  从redis读取日志和将收集到的日志储存至redis缓存中,其实使用了input的redis插件和output的redis插件来实现。官方文档地址为: https://www.elastic.co/guide/en/logstash/current/plugins-outputsredis.html

写入redis

  我们还以tomcat日志及java日志logstash日志本身为例,写入redis缓存中。
  先配置好redis服务器如下:

IP:192.168.32.31
PORT:6379
auth:123456
input {
  file {
    path => "/usr/local/tomcat/logs/localhost_access_log.*.txt"
    stat_interval => 3
    start_position => "beginning"
    codec => "json"
    type => "tomcat_redis_accesslog"
  }
  file {
    path => "/logstash/logs/logstash-plain.log"
    stat_interval => 3
    start_position => "beginning"
    type => "java_redis_accesslog"    
    codec => multiline {
    pattern => "^\[" #当遇到[开头的行时候将多行进行合并
    negate => true #true 为匹配成功进行操作, false 为不成功进行操作
    what => "previous" #与以前的行合并,如果是下面的行合并就是 next
  }}
}

output {
  if [type] == "tomcat_redis_accesslog" {
    redis {
      data_type => "list"
      key => "tomcat_redis_accesslog"
      host => "192.168.32.31"
      port => "6379"
      db => "0"
      password => "123456"
    }}
    if [type] == "java_redis_accesslog" {
    redis {
      data_type => "list"
      key => "java_redis_accesslog"
      host => "192.168.32.31"
      port => "6379"
      db => "1"
      password => "123456"
    }}
}

  将日志数据以列表的形式储存在redis中,多个不同的日志,储存在不同数据库中。

读取redis

  与写入用法大致相同,可以说是怎么写进去的就怎么读出来。形式如下

input {
  redis {
    data_type => "list"
    key => "tomcat_redis_accesslog"
    host => "192.168.32.31"
    port => "6379"
    db => "0"
    password => "123456"
    codec => "json"
  }}
  redis {
    data_type => "list"
    key => "java_redis_accesslog"
    host => "192.168.32.31"
    port => "6379"
    db => "1"
    password => "123456"
    codec => "json"
  }}
}
output {
  if [type] == "tomat_redis_accesslog" {
  elasticsearch {
    hosts => ["192.168.32.41:9200"]
    index => "tomcat_redis_accesslog-%{+YYYY.MM.dd}"
  }}
  if [type] == "java_redis_accesslog" {
  elasticsearch {
    hosts => ["192.168.32.41:9200"]
    index => "java_redis_accesslog-%{+YYYY.MM.dd}"
  }}
}

  不过需要注意的是input中记得加codec => "json",否则无法解析正确各项属性,不方便我们查看和统计日志。而且不知道大家有没有发现,当我们从redis中取出数据的时候,我们的input选项中没有像往常那样写上type => "tomcat_redis_accesslog",而是直接在output中做了type的判断。这是因为,redis中的数据是在写入的时候,已经附加了type属性,它在redis中储存时还是会保留type属性的,所以取出来的时候,还是按照之前写入时的type类型取出即可。

kafka

  在很多大型互联网公司,都喜欢使用kafka来作为缓存层,因为redis虽然效率很高,但数据不如kafka可靠,kafka更适合大数据量场景使用。这就要视业务实际情况而定了。不过使用kafa来作为中间缓冲区的企业还是大有人在的。
  我们可以新开启一个kafka主机,IP为192.168.32.36,使用默认端口9092。kafak的使用方法与redis大致相同。配置文件示例如下

input {
  file {
    path => "/apps/nginx/logs/access_json.log"
    stat_interval => 3
    start_position => "beginning"
    codec => "json"
    type => "nginx_kafka_accesslog"
  }
}
output {
  if [type] == "nginx_kafka_accesslog" {
  kafka {
    bootstrap_servers => "192.168.32.36:9092" #kafka 服务器地址
    topic_id => "nginx_kafka_accesslog"
    codec => "json"
  } }
}

  与写入redis有些区别的就是,kafka中不是key了,而是topic_id。所以读取时也有一些区别,填写的关键字为topics,而且同样做判断时,type是当初写进去的那个属性。

input {
  kafka {
    bootstrap_servers => "192.168.32.36:9092"
    topics => "nginx_kafka_accesslog"
    codec => "json"
    consumer_threads => 1  #线程数,默认值为1,一般设置与分区数量相同
    #decorate_events => true #不写默认是关闭的,开启此属性可以将当前topic、offset、group、partition等信息也带到message中
  }
}
output {
  if [type] == "nginx_kafka_accesslog" {
  elasticsearch {
    hosts => ["192.168.32.41:9200"]
    index => "nginx_kafka_accesslog-%{+YYYY.MM.dd}"
  }}
}

  kafka也支持多文件同时写入,设置不同的topic_id就可以了。


一个低调的男人