查看原文
其他

一文轻松搞定ELK日志实时采集分析平台

一航 一行Java 2022-08-09

file

ELK简介

ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。

此文中使用了FileBeat进行日志采集,它是一个轻量级的日志收集处理工具(Agent),Filebeat占用资源少,适合于在各个服务器上搜集日志后传输给Logstash,官方也推荐此工具。

Elasticsearch是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能。它的特点有:分布式零配置自动发现索引自动分片索引副本机制restful风格接口多数据源自动搜索负载等。主要负责将日志索引并存储起来,方便业务方检索查询。

Logstash 主要是用来日志的搜集分析过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。是一个日志收集、过滤、转发的中间件,主要负责将各条业务线的各类日志统一收集、过滤后,转发给 Elasticsearch 进行下一步处理。

Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助汇总、分析和搜索重要数据日志;最终将数据以直观的、图形化的方式展示出来

准备工作

虚拟机搭建

便于测试,环境的搭建

参考 基于VirtualBox搭建Linux(CentOS)虚拟机环境(学习必备技能)

Elasticsearch安装

参考 Elasticsearch 6.6.0 集群搭建:https://blog.lupf.cn/articles/2020/04/22/1587535463629.html

kafka安装

参考 并发(七)之分布式异步更新:Zookeeper+Kafka实现数据分布式异步更新 : https://blog.lupf.cn/articles/2020/04/17/1587096190497.html

JDK安装

以上文章中都有包含JDK的安装

ELK架构图

file

ELK流程图

file
  • 日志生产;服务通过日志框架输出的日志,Nginx产生的日志;也可以是任何形式输出的日志文件。
  • 日志抓取(filebeat);通过配置,监控抓取符合规则的日志文件,并将抓取到的每条数据发送给kafka
  • kafka;主要起到削峰填谷,ELK高可用的关键作用;当流量过大,kafka可以起到很好的缓冲作用,降低下游的压力;当下游的logstash或ES故障,也可以很好保证数据的完整性
  • logstash;消费kafka的数据,并将数据持久化到ES或者其他持久化框架
  • Elasticsearch;持久化数据,并提供分布式检索功能
  • Kibana;展示ES中的数据
  • xpack watch;监控异常
  • 通知模块;将监控到的告警通知给相关责任人

项目日志规范

为什么要规范?只有规范之后的日志,在后续的抓取、整理同步至ES以及查看都会带来很多便利 , 请参考SpringBoot日志的链路追踪 : https://lupf.cn/articles/2020/06/04/1591273110824.html ; 建议优先阅读一下这篇文章 , 后续关于日志的拦截及解析都是基于这里的日志规则进行的; 具体格式如下:

// 完整格式
[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%X{requestId}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{localIp}] [%X{clientIp}] [%X{applicationName}] [%X{requestUri}] [%F,%L,%C,%M] [%m] ## '%ex'%n

// 以下是详细说明
// 其中%X打头的都是自定义的日志  需要通过DMC设置
[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}]  //当前的时间
[%X{requestId}]  // 本次请求的唯一ID
[%level{length=5}]  // 日志级别
[%thread-%tid]  // 线程id
[%logger]  // loger对应的class
[%X{hostName}]  // 服务部署的主机名
[%X{localIp}]  // 服务部署的主机ip
[%X{clientIp}]  //客户端请求的ip
[%X{applicationName}]  //应用名称
[%X{requestUri}]  // 调用的地址
[%F,%L,%C,%M]  // 当前日志所处的类的信息 
[%m]  // 打印的消息
## '%ex'  // 异常信息  使用单引号包裹起来是够了方便后续的logstash的
%n  // 换行

filebeat安装

软件包安装
  • 下载

    // 迅雷下载(比较的快 wget太慢了)
    https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.6.0-linux-x86_64.tar.gz
  • 上传至服务器解压

    cd /usr/local/src
    tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz

    mv filebeat-6.6.0-linux-x86_64 /usr/local/filebeat-6.6.0
    cd /usr/local/filebeat-6.6.0
  • filebeat.yml配置

    #============== Filebeat prospectors ===========
    filebeat.prospectors:
    typelog
      paths:
        - /usr/local/src/logs/log4j2-demo/all.log
    #    - /usr/local/src/logs/*/*.log
      document_type: "all-log"
      fields:
        logbiz: log4j2-demo            # 项目名称
        log_topic: all-log-log4j2-demo     # kafka的topic
        evn: dev
      multiline:
        pattern: '^\['   #指定多行匹配的表达式 以[开头的标识一行新的数据
        negate: true     # 是否匹配到 
        match: after     # 没有匹配上正则合并到上一行末尾
        max_lines: 1000  # 最大未匹配上的行数
        timeout: 2s      # 指定时间没有新的日志 就不等待后面的日志输入

    typelog          #定义多个输入
      paths:
        - /usr/local/src/logs/log4j2-demo/error.log
    #    - /usr/local/src/logs/*/*.log
      document_type: "err-log"
      fields:
        logbiz: log4j2-demo
        log_topic: err-log-log4j2-demo     # kafka的topic
        evn: dev
      multiline:
        pattern: '^\['   #指定多行匹配的表达式 以[开头的标识一行新的数据
        negate: true     # 是否匹配到 
        match: after     # 没有匹配上正则合并到上一行末尾
        max_lines: 1000  # 最大未匹配上的行数
        timeout: 2s      # 指定时间没有新的日志 就不等待后面的日志输入

    ## 定义输出的方式
    output.kafka:
      enabled: true    # 启动
      hosts: ["192.168.1.160:9092"]    #kafka的ip和port
      topic: '%{[fields.log_topic]}'   #指定输出到的topicname 也就是上面定义的变量
      partition.hash:                  # 分区规则 hash           
        reachable_only: true
      compression: gzip                # 数据压缩
      max_message_bytes: 1000000       # 最大的消息字节数
      required_acks: 1                 # kafka ack的方式 0:丢出去就好了  1:有一个响应就好了  -1:所有节点响应才算成功
    logging.to_files: true             #
  • 测试配置

    cd /usr/local/filebeat-6.6.0
    ./filebeat -c filebeat.yml -configtest
    file
  • kafka创建topic

    cd /usr/local/kafka/
    bin/kafka-topics.sh --zookeeper cache1000:2181,cache1001:2181,cache1002:2181 --topic all-log-log4j2-demo --replication-factor 1 --partitions 1 --create
    bin/kafka-topics.sh --zookeeper cache1000:2181,cache1001:2181,cache1002:2181 --topic err-log-log4j2-demo --replication-factor 1 --partitions 1 --create
    // 查看topic的详情
    bin/kafka-topics.sh --zookeeper cache1000:2181,cache1001:2181,cache1002:2181 --topic  all-log-log4j2-demo --describe
    file
  • 启动filebeat

    cd /usr/local/filebeat-6.6.0
    ./filebeat 1>/dev/null 2>&1 &

    ps -ef | grep filebeat
    file
  • 产生日志数据并确定kafka是否拿到了数据

    cd /tmp/kafka-logs/err-log-log4j2-demo-0
    ll
    cd /tmp/kafka-logs/all-log-log4j2-demo-0
    ll
    // 看到文件不为空,说明filebeat生产的数据kafka已经收到
    file
Docker安装
  • 第一步,创建一个同上的filebeat.yml

  • 第二步

    docker run --name filebeat -v /你的路径/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro -v /日志所在的目录:/日志所在的目录 -d elastic/filebeat:6.8.9
    如 docker run --name filebeat -v /var/local/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro -v /usr/local/src/logs:/usr/local/src/logs -d elastic/filebeat:6.8.9
docker compose安装
  • 创建docker compose文件

    # Use root/example as user/password credentials
    version: '3.1'

    services:
      filebeat:
        image: elastic/filebeat:6.8.9
        user: root
        container_name: filebeat-6.8.9
        restart: always
        volumes:
          - /var/local/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
          - /usr/local/src/logs:/usr/local/src/logs
  • 启动

    docker-compose up -d

logstash

  • 下载

    // 迅雷下载
    https://artifacts.elastic.co/downloads/logstash/logstash-6.6.0.tar.gz

    // 上传至任意主机 /usr/local/src 目录下
  • 解压

    cd /usr/local/src
    tar -zxvf logstash-6.6.0.tar.gz
    mv logstash-6.6.0 /usr/local/logstash-6.6.0
  • 添加配置

    mkdir /usr/local/logstash-6.6.0/script
    vim logstash-script.conf

    # 添加以下配置
    # 输入从kafka
    input {
      kafka {
        topics_pattern => "all-log-.*"         # 也可以模糊匹配 如:all-log.*
        bootstrap_servers => "192.168.1.160:9092"   # kafka的ip 端口
        codec => json                               # 数据格式
        consumer_threads => 1                       # 对应partition的数量
        decorate_events => true
        #auto_offset_rest => "latest"               # 默认值就是这个
        group_id => "all-logs-group"                # kafka的消费组
      }

      kafka {
        topics_pattern => "err-log-.*"               # 也可以模糊匹配 如:err-log-*   这样就可以匹配到 err-log-product err-log-user
        bootstrap_servers => "192.168.1.160:9092"   # kafka的ip 端口
        codec => json                               # 数据格式
        consumer_threads => 1                       # 对应partition的数量
        decorate_events => true
        #auto_offset_rest => "latest"               # 默认值就是这个
        group_id => "err-logs-group"                # kafka的消费组
      }
    }

    filter {
      ruby {
        code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
      }

      ## 这里匹配的是filebeat中的fields.log_topic字段
      if "all-log" in [fields][log_topic] {
        grok {
          ## 表达式
          ## 这里的表达式的定义需要和真实的lo4j2中定义的规则保持一致;否则将无法匹配上
          match => ["message","\[%{NOTSPACE:currentDataTime}\] \[%{NOTSPACE:requestId}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:logger}\] \[%{DATA:hostName}\] \[%{DATA:localIp}\] \[%{DATA:clientIp}\] \[%{DATA:applicationName}\] \[%{DATA:requestUrl}\] \[%{DATA:location}\] \[%{DATA:loginfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
        }
      }

      if "err-log" in [fields][log_topic] {
        grok {
          ## 表达式
          match => ["message","\[%{NOTSPACE:currentDataTime}\] \[%{NOTSPACE:requestId}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:logger}\] \[%{DATA:hostName}\] \[%{DATA:localIp}\] \[%{DATA:clientIp}\] \[%{DATA:applicationName}\] \[%{DATA:requestUrl}\] \[%{DATA:location}\] \[%{DATA:loginfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
        }
      }
    }

    ## 输出方式
    ## 到控制台 当测试无误之后,可以将此输出关闭
    output {
      stdout { codec => rubydebug }
    }

    output {
      if "all-log" in [fields][log_topic] {
        elasticsearch {
          hosts => ["192.168.1.170:9200"]        # es的ip 端口
          # 用户名 密码
          # user => "admin"
          # password => "admin"

          # 索引名称(index)
          # 索引格式:all-log-应用名称-按天分组
          # [fields][logbiz]为filebeat中定义的变量
          index => "all-log-%{[fields][logbiz]}-%{index_time}"
          # 是否嗅探集群
          # 通过嗅探机制解析es集群的负载均衡发送日志
          sniffing => true 

          # logstash默认自带一个mapping模版,进行模版覆盖
          template_overwrite => true
        }
      }

      if "err-log" in [fields][log_topic] {
        elasticsearch {
          hosts => ["192.168.1.170:9200"]        # es的ip 端口
          # 用户名 密码
          # user => "admin"
          # password => "admin"

          # 索引名称(index)
          # 索引格式:all-log-应用名称-按天分组
          # [fields][logbiz]为filebeat中定义的变量
          index => "err-log-%{[fields][logbiz]}-%{index_time}"
          # 是否嗅探集群
          # 通过嗅探机制解析es集群的负载均衡发送日志
          sniffing => true

          # logstash默认自带一个mapping模版,进行模版覆盖
          template_overwrite => true
        }
      }
    }
  • 启动

    cd /usr/local/logstash-6.6.0
    // 前台启动 方便观察数据及日志
    ./bin/logstash -f ./script/logstash-script.conf &

    // 后台启动
    nohup ./bin/logstash -f ./script/logstash-script.conf 1>/dev/null 2>&1 &
    // jps查看是否有Logstash的进程
    jps
  • logstash的输出

    // 以下是产生的日志数据
    [2020-04-22T21:35:28.164+08:00] [4562f064-a59f-420d-8673-f9810ef1361c] [INFO] [http-nio-8080-exec-3-21] [com.lupf.log4j2demo.controller.Log4j2Controller] [elasticsearch0000] [192.168.1.170] [192.168.1.82] [log4j2-demo] [/info] [Log4j2Controller.java,14,com.lupf.log4j2demo.controller.Log4j2Controller,info] [我是info日志!!!] ## ''
    // logstash 接收到kafka的消息并输出的对象
    {
                    "log" => {
            "file" => {
                "path" => "/usr/local/src/logs/log4j2-demo/all.log"
            }
        },
               "@version" => "1",
        "currentDataTime" => "2020-04-23T13:51:58.396+08:00",
                  "input" => {
            "type" => "log"
        },
                   "host" => {
            "name" => "elasticsearch0000"
        },
                   "beat" => {
                "name" => "elasticsearch0000",
            "hostname" => "elasticsearch0000",
             "version" => "6.6.0"
        },
                 "source" => "/usr/local/src/logs/log4j2-demo/all.log",
             "index_time" => "2020.04.23",
             "requestUrl" => "/info",
                 "offset" => 69870,
                "message" => "[2020-04-23T13:51:58.396+08:00] [54452aa5-8a5c-4518-8e43-be347c55ff09] [INFO] [http-nio-8080-exec-7-25] [com.lupf.log4j2demo.controller.Log4j2Controller] [elasticsearch0000] [192.168.1.170] [192.168.1.82] [log4j2-demo] [/info] [Log4j2Controller.java,14,com.lupf.log4j2demo.controller.Log4j2Controller,info] [我是info日志!!!] ## ''",
        "applicationName" => "log4j2-demo",
                 "logger" => "com.lupf.log4j2demo.controller.Log4j2Controller",
             "prospector" => {
            "type" => "log"
        },
                "loginfo" => "我是info日志!!!",
              "thread-id" => "http-nio-8080-exec-7-25",
             "@timestamp" => 2020-04-23T05:51:59.763Z,
               "clientIp" => "192.168.1.82",
                  "level" => "INFO",
               "location" => "Log4j2Controller.java,14,com.lupf.log4j2demo.controller.Log4j2Controller,info",
               "hostName" => "elasticsearch0000",
                 "fields" => {
            "log_topic" => "all-log-log4j2-demo",
               "logbiz" => "log4j2-demo",
                  "evn" => "dev"
        },
              "requestId" => "54452aa5-8a5c-4518-8e43-be347c55ff09",
                "localIp" => "192.168.1.170"
    }
  • 查看kafka的消费情况

    cd /usr/local/kafka/
    bin/kafka-consumer-groups.sh --bootstrap-server cache1000:9092,cache1001:9092,cache1002:9092 --describe --group err-logs-group
    bin/kafka-consumer-groups.sh --bootstrap-server cache1000:9092,cache1001:9092,cache1002:9092 --describe --group all-logs-group
    file
logstash输入、过滤器,输出测试

当如果上面的 logstash配置导致输入、过滤器、输出的不对,就可以通过以下的方式进行配置测试,快速定位问题或者配置匹配调整;

  • 定义一个手动输入的配置文件

    cd /usr/local/logstash-6.6.0
    vim script/test.conf

    // 加入以下配置
    # 手动输入
    input {
      stdin {
      }
    }

    # 过滤器
    filter{
      grok{
        # 测试匹配IP
        match => {"message" => "%{IPV4:ip}"}
      }
    }

    # 输出
    output {
      stdout {
      }
    }
  • 通过以上的配置文件启动logstash

    cd /usr/local/logstash-6.6.0
    ./bin/logstash -f ./script/test.conf

    // 然后直接输入想要测试的文本,回车就可以看见结果;如:
    192.168.1.123 aabbcc 11223344
    file

Kibana日志信息查看

  • 进入kibana

    // kibana监听的是5601端口
    http://elasticsearch:5601
  • 优先查看索引是否是正常的

    GET /_cat/indices?v
    file
  • 配置索引管理

  • 日志查看

到此!一个从0搭建的ELK技术栈即完成!!!

- END -


推荐阅读

手摸手一小时从0搭建专属个人博客(视频教程)

SpringBoot!你的请求、响应、异常规范了吗?(文末红包)

开发人员常用工具最全锦集(持续更新)

jvm运行时数据区内存结构



您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存