提到ELK,就不得不提到EFK,通常意义上说,EFK是指用filebeat代替logstash形成的新组合。(哈,也有是指Fluentd的,这个我们之后再说)
  Filebeat 是基于原先 logstash-forwarder 的源码改造出来的,无需依赖 Java 环境就能运行,安装包10M不到。
  而且如果日志的量很大,Logstash 会遇到资源占用高的问题,为解决这个问题,我们引入了Filebeat。Filebeat 是基于 logstash-forwarder 的源码改造而成,用 Golang 编写,无需依赖 Java 环境,效率高,占用内存和 CPU 比较少,非常适合作为 Agent 跑在服务器上,来实现日志转发的功能。

  还是去官网下载https://www.elastic.co/cn/downloads/beats/filebeat。本次演示还是以最新版的filebeat7.5.1为例(以前版本的filebeat配置文件格式参数上可能有一些改变,不过大同小异)。

cd /usr/local/src/
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.5.1-amd64.deb
dpkg -i filebeat-7.5.1-amd64.deb

基础配置

  配置文件也很简单,如果想要写入文件,则配置如下

grep -v "#" /etc/filebeat/filebeat.yml | grep -v "^$"
filebeat.inputs:
- type: log
  paths:
    - /var/log/syslog
  exclude_lines: ["^DBG"]
  exclude_files: [".gz$"]
  tags: "syslog-filebeat"
output.file: 
  path: "/tmp"
  filename: "filebeat.txt"

  paths路径支持正则通配符写法,exclude是设置不匹配的文件格式。而且filebeat也支持同时从多个路径收集写成如下配置

filebeat.inputs:
- type: log
  paths:
    - /var/log/syslog
  exclude_lines: ["^DBG"]
  exclude_files: [".gz$"]
  tags: "syslog-filebeat"
- type: log
  paths:
    - /var/log/nginx/access.log
  exclude_lines: ["^DBG"]
  exclude_files: [".gz$"]
  document_type: "nginx-accesslog-filebeat
output.file: 
  path: "/tmp"
  filename: "filebeat.txt"

  同样,filebeat支持写入redis和kafka

filebeat.inputs:
- type: log
  paths:
    - /var/log/syslog
  exclude_lines: ["^DBG"]
  exclude_files: [".gz$"]
    tags: "filebeat-redis-syslog"
output.redis:
  hosts: ["192.168.32.31:6379"]
  key: "filebeat-system-log" #为了后期日志处理,建议自定义 key 名称
  db: 1 #使用第几个库
  timeout: 5 #超时时间
  password: 123456 #redis 密码

  想要写入kafka则添加output插件,配置如下

filebeat.inputs:
- type: log
  paths:
    - /var/log/syslog
  exclude_lines: ["^DBG"]
  exclude_files: [".gz$"]
  tags: "filebeat-kafka-syslog"
output.kafka: #写入 kafka
  hosts: ["192.168.15.11:9092","192.168.15.12:9092","192.168.15.13:9092"]
  topic: "systemlog-1512-filebeat"
  partition.round_robin:
    reachable_only: true
  required_acks: 1 #本地写入完成
  compression: gzip #开启压缩
  max_message_bytes: 1000000 #消息最大值

配置详解

input

  也就是设置日志收集的来源,需要的属性有type,path,根据官方文档,现在版本常用写法为,

filebeat.inputs:
- type: log
  paths:
    - /var/log/system.log
    - /var/log/wifi.log
- type: log
  paths:
    - "/var/log/apache2/*"
  fields:
    apache: true
  fields_under_root: true

  其中type的类型很多

  • Log 日志文件,必须有PATH,官方示例如下:
filebeat.inputs:
- type: log 
  paths:
    - /var/log/system.log
    - /var/log/wifi.log
    - /var/log/*.log
- type: log 
  paths:
    - "/var/log/apache2/*"
  fields:
    apache: true
  fields_under_root: true
  • Stdin 标准输入,没有PATH,官方示例如下:
filebeat.inputs:
- type: stdin
  • Container 容器中日志,必须有PATH,官方示例如下:
filebeat.inputs:
- type: container
  paths: 
    - '/var/lib/docker/containers/*/*.log'
  • Kafka 从kafka中读取数据,官方示例如下:
filebeat.inputs:
- type: kafka
  hosts: ["<your event hub namespace>.servicebus.windows.net:9093"]
  topics: ["<your event hub instance>"]
  group_id: "<your consumer group>"

  username: "$ConnectionString"
  password: "<your connection string>"
  ssl.enabled: true
  • Redis 从redis中读取数据,官方示例如下:
filebeat.inputs:
- type: redis
  hosts: ["localhost:6379"]
  password: "${redis_pwd}"
  • UDP 开放UDP端口来接受数据,可设置单条最大数据上限,不定义默认为20MiB。
filebeat.inputs:
- type: udp
  max_message_size: 10KiB
  host: "localhost:8080"
  • Docker 也支持直接从容器中读取数据, containers.ids是必须定义说明,可以用*代表所有容器。
filebeat.inputs:
- type: docker
  containers.ids: 
    - '8b6fe7dc9e067b58476dc57d6986dd96d7100430c5de3b109a99cd56ac655347'
  • TCP 用法与UDP相同,设置监听的主机和端口。
filebeat.inputs:
- type: tcp
  max_message_size: 10MiB
  host: "localhost:9000"
  • Syslog 监听系统日志,指定传输协议即可。类似TCP和UDP。
filebeat.inputs:
- type: syslog
  protocol.udp:
    host: "localhost:9000"
filebeat.inputs:
- type: syslog
  protocol.tcp:
    host: "localhost:9000"
  • s3 AWS的对象存储日志,不过多介绍
  • NetFlow Cisco设备网络设备的日志,不过多介绍
  • Google Pub/Sub google云的订阅发布模式协议数据,不过多介绍。

  一般来说,我们都写log,就可以满足我们绝大多数场景的使用了。除了typepath这俩常用的input属性外,还有两个设置属性,我们也经常会用到,就是include_linesexclude_lines,顾名思义,就是包括和排除,配合path中的通配符,可以帮助我们更灵活的指定要收集的日志文件。
  还有一个很常用的属性就是tags,可以写多个,用[]括起来就可以。因为在filebeat中因为有自带type关键字,所以我们在之后筛选日志的时候,无法通过type字段来区分不同的日志源了,所以我们可以通过自定义tags字段,来实现之前在logstash上type的功能,这样在我们收集到的日志中,会自动加入tags 标签属性,然后通过logstash的筛选时,就可以对tags关键字做判断了。

output

  输出选项有ElasticsearchLogstashKafkaRedisFileConsoleElastic Cloud

  • File
    输出到文件中是最简单的设置了,一般用于测试。
    output.file:
    path: "/tmp/filebeat"     #输出文件路径
    filename: filebeat        #输出日志名称,超过大小限制后会自动添加数字后缀
    #rotate_every_kb: 10000   #每个日志文件大小限制
    #number_of_files: 7   #路径下最大的储存日志文件数量,超过此值后自动删除最早的日志文件,默认为7。
    #permissions: 0600    #创建的日志文件的权限
  • Logstash
    filebeat支持直接将数据输出值logstash主机。
    output.logstash:
    hosts: ["127.0.0.1:5044"]
      而logstash主机需要设置输入为beats,才可以顺利接收filebeat的数据。
    input {
    beats {
      port => 5044
    }
    }
    

output {
elasticsearch {
hosts => [“http://localhost:9200"]
index => “%{[@metadata][beat]}-%{[@metadata][version]}”
}
}

- redis
输出值redis,上面有说明,这里就不详细介绍了。

output.redis:
hosts: [“localhost”]
password: “my_password”
key: “filebeat”
db: 0
timeout: 5

- kafka
输出至kafka。

output.kafka:

initial brokers for reading cluster metadata

hosts: [“kafka1:9092”, “kafka2:9092”, “kafka3:9092”]

message topic selection + partitioning

topic: ‘%{[fields.log_topic]}’
partition.round_robin:
reachable_only: false

required_acks: 1
compression: gzip
max_message_bytes: 1000000

&emsp;&emsp;也可以输出至kafka的不同的topic中

>Rule settings:
> topic
>    The topic format string to use. If this string contains field references, such as %{[fields.name]}, the fields must exist, or the rule fails. 
>mappings
>    A dictionary that takes the value returned by topic and maps it to a new >name. 
>default
>    The default string value to use if mappings does not find a match. 
>when
>    A condition that must succeed in order to execute the current rule. All the conditions supported by processors are also supported here. 

&emsp;&emsp;官方示例如下:

output.kafka:
hosts: [“localhost:9092”]
topic: “logs-%{[beat.version]}”
topics:
- topic: “critical-%{[beat.version]}”
when.contains:
message: “CRITICAL”
- topic: “error-%{[beat.version]}”
when.contains:
message: “ERR”

- Elaticsearch
可以直接将数据输出给elaticsearch服务器,不过一般来说我们不会这样做,一般是会经过logstash来筛选之后再传入elaticsearch。官方示例如下:

output.elasticsearch:
hosts: [“https://localhost:9200"]
username: “filebeat_internal”
password: “YOUR_PASSWORD”


- Console
输出至屏幕终端显示。pretty官方的介绍为``If pretty is set to true, events written to stdout will be nicely formatted. The default is false``,示例如下:

output.console:
pretty: true


一个低调的男人