Filebeat + ELK + Kafka 日志系统
Filebeat + ELK + Kafka 日志系统
1. 环境简介
因为很多日志需要采集,那么根据每个日志文件不同的格式来采集不同的日志文件,此处仅仅讲解nginx的日志(大可不必把nginx放到es中,因为很少会去es里面拿nginx的日志来分析一些请求有效或者安全性,亦或者简单的服务请求次数,那都是直接统计的,直接lua+nginx+redis统计就行,或者自己定好了格式,直接从kafka中获取,此处仅仅是简单,演示一波而已,下期会补充简单的springboot的日志收集);
为什么不直接用elk呢?因为logstash这玩意有点吃资源,所以呢,用一个小版本的beat先直接获取重要信息,然后用logstash自己过滤下就行,避免影响正在跑的服务
那为什么用kafka呢?因为一个异步队列的出现,可以解决很多问题,削峰!解耦!
(还有其他,但是这两个实在是 泰裤辣)
如果需要搭建这一套服务呢,需要的中间件很多(jdk用jdk8就行,这个自行安装,不做记录):
中间件 | 版本/链接 |
---|---|
filebeat | filebeat-8.7.1-x86_64.rpm |
es | docker pull docker.elastic.co/elasticsearch/elasticsearch:7.13.4 |
kibana | docker pull kibana:7.13.4 |
Logstash | logstash-8.7.1-x86_64.rpm |
zk | apache-zookeeper-3.7.1.tar.gz |
kafka | kafka_2.11-2.4.1.tgz(自己去官网下一个吧,这个是本人经常用的 ,懒得打开网页) |
机器简介:
比较乱哈,但是本人基本都是本地测试,切换成集群也不麻烦。本人都是按照hostname来区分的
可以使用
hostnamectl set-hostname
你想要的名字 执行完这个命令之后reboot
就能看到了其实只需要修改下hosts就行了,直接一键编辑多台机器
vim /etc/hosts
192.199.56.200 zk01 192.199.56.200 zk02 192.199.56.200 zk03 192.199.56.200 kafka01 192.199.56.200 kafka02 192.199.56.200 kafka03 192.199.56.130 es01 192.199.56.150 hadoop01
机器 | 中间件 |
---|---|
es01 | es+kibana |
mid (192.199.56.200) | kafka(单机集群)+zk(单机集群)+logstash |
hadoop01 | nginx+filebeat |
2. zk单机集群搭建
2.1. 下载
官网下载 zookeeper.apache.org
2.2. 安装
# 新建目录
mkdir -p /opt/zk
# 进入目录
cd /opt/zk
# 拷贝或者 wget下载zookeeper的包
# 修改文件名为zk01 并拷贝两份 分别为 zk02 zk03
# 进入解压目录新建文件夹
mkdir logs data
# 进入对应目录 conf 直接执行命令 vim zoo.cfg 依次复制下面代码
2.3 配置 zoo.cfg
# zk1的zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zk/zk01/data
dataLogDir=/opt/zk/zk01/logs
clientPort=2181
server.1=zk01:2881:3881
server.2=zk02:2882:3882
server.3=zk03:2883:3883
# zk2的zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zk/zk02/data
dataLogDir=/opt/zk/zk02/logs
clientPort=2182
server.1=zk01:2881:3881
server.2=zk02:2882:3882
server.3=zk03:2883:3883
# zk3的zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zk/zk03/data
dataLogDir=/opt/zk/zk03/logs
clientPort=2183
server.1=zk01:2881:3881
server.2=zk02:2882:3882
server.3=zk03:2883:3883
2.4 配置myid文件
在各自的data文件夹中添加myid文件 分别为 1 2 3
2.5. 启动
# 进入对应的目录
./zkServ./bin/zkServer.sh start
2.6. 查看状态
# 进入对应的目录 展示处当前Mode的状态即可
./bin/zkServer.sh status
2.7 工具查看
集群填写方式就是逗号隔开
3. kafka单机集群搭建
搭建完毕zk之后呢,就可以直接搭建kafka集群了(我用的版本比较老,新版本有自己的部署方式,也可以用docker)
3.1 下载
官网下载好tar包
3.2 安装与配置、启动
cd /opt
mkdir -p /opt/kafka
tar -zxf kafka_2.11-2.4.1.tgz -C /opt/kafka
# 进入目录
cd kafka
# 创建目录
mkdir kafka01 kafka02 kafka03
# 拷贝多份
cp -r kafka_2.11-2.4.1/* kafka01/
cp -r kafka_2.11-2.4.1/* kafka02/
cp -r kafka_2.11-2.4.1/* kafka03/
# 新建日志目录
mkdir -p /opt/kafka/kafka01/logs
mkdir -p /opt/kafka/logs/zklog01
mkdir -p /opt/kafka/kafka02/logs
mkdir -p /opt/kafka/logs/zklog02
mkdir -p /opt/kafka/kafka03/logs
mkdir -p /opt/kafka/logs/zklog03
# 修改配置文件 以kafka03为例,其他除了ip和端口,还有borker.id 其他都是一样的
rm -rf /opt/kafka/kafka03/config/server.properties
vim /opt/kafka/kafka03/config/server.properties
# 内容如下
#advertised_listeners是对外暴露的服务端口,真正建立连接用的是`listeners`,集群要改为自己的iP
# listeners=PLAINTEXT://kafka01:9092
# listeners=PLAINTEXT://kafka02:9093
listeners=PLAINTEXT://kafka03:9094
#broker 集群中的全局唯一编号,不能重复
# broker.id=0
# broker.id=1
broker.id=2
#删除 topic 功能
delete.topic.enable=true
#自动创建topic,false:生产者发送信息到已存在topic才没有报错
auto.create.topics.enable=false
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#修改kafka的日志目录和zookeeper数据目录,因为这两项默认放在tmp目录,而tmp目录中内容会随重启而丢失
log.dirs=/opt/kafka/kafka03/logs
#topic 在当前 broker 上的分区个数.分区数量一般与broker保持一致
num.partitions=3
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接 Zookeeper 集群地址,新版自带Zookeeper
zookeeper.connect=zk01:2181,zk02:2182,zk03:2183/kafka
# 处理好之后 需要每个都启动下
cd /opt/kafka/logs/zklog01
nohup /opt/kafka/kafka01/bin/kafka-server-start.sh /opt/kafka/kafka01/config/server.properties > /opt/kafka/logs/zklog01/kafka.log 2>1 &
cd /opt/kafka/logs/zklog02
nohup /opt/kafka/kafka02/bin/kafka-server-start.sh /opt/kafka/kafka02/config/server.properties > /opt/kafka/logs/zklog02/kafka.log 2>1 &
cd /opt/kafka/logs/zklog03
nohup /opt/kafka/kafka03/bin/kafka-server-start.sh /opt/kafka/kafka03/config/server.properties > /opt/kafka/logs/zklog03/kafka.log 2>1 &
# 通过各个日志来查看启动的情况
3.3 查看注册情况
看上面的zookeepr的那张图里面有没有一个是kafka的文件夹 如果有,说明注册上了
3.4 工具查看
本人用的比较多的就是 kafkaTools了 公司有这部分架构的都知道在云上看,我这边顶多是个记录,有些项目很蛋疼,需要自己搭建,自己记录下,免得以后再百度或者ai了(吐槽:现在啥破ai,训练半天才给我写一个垃圾c++的ConcurrentHashMap)
3.5 创建topic,因为这边没有设置自动创建topic
# 创建topic
/opt/kafka/kafka01/bin/kafka-topics.sh --create -zookeeper zk01:2181,zk02:2182,zk03:2183/kafka --topic nginx-access --partitions 3 --replication-factor 2
# 查看topic
/opt/kafka/kafka01/bin/kafka-topics.sh --list --zookeeper zk01:2181,zk02:2182,zk03:2183/kafka
4. es、kibana搭建
超级简单不写步骤了,就docker命令
# docker-es搭建 --- start
# 创建目录和配置
mkdir -p /data/es
cd /data/es
mkdir config data logs
cd config
vim elasticsearch.yml
#内容start
cluster.name: "es01"
network.host: 0.0.0.0
http.port: 9200
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-headers: Authorization
#内容end
# 运行
docker run -it -d -p 9200:9200 -p 9300:9300 --name es -e ES_JAVA_OPTS="-Xms84m -Xmx512m" -e "discovery.type=single-node" --restart=always -v /data/es/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml -v /data/es/data:/usr/share/elasticsearch/data -v /data/es/logs:/usr/share/elasticsearch/logs docker.elastic.co/elasticsearch/elasticsearch:7.13.4
# 如果想要设置密码的也可以
docker exec -it es /bin/bash
./bin/elasticsearch-setup-passwords interactive
# 输入 y 然后各个点需要的账号密码就行
exit
docker restart es
# 验证,打开网页,es01:9200 输入账号密码最后能显示正常就行
# docker-es搭建 --- end
# docker-kibana --- start
mkdir -p /data/kibana
vim /data/kibana/kibana.yml
#内容如下
server.name: "kibana"
server.host: "0.0.0.0"
server.port: 5601
# es的链接,多个就是数组,然后数组里面逗号分隔
elasticsearch.hosts: ["http://192.199.56.130:9200"]
#xpack.monitoring.ui.container.elasticsearch.enabled: true
elasticsearch.requestTimeout: 120000
# 中文显示
i18n.locale: "zh-CN"
elasticsearch.username: "自己设置的名称"
elasticsearch.password: "自己设置的密码"
# 运行
docker run -d --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kibana -p 5601:5601 -v /data/kibana/kibana.yml:/usr/share/kibana/config/kibana.yml kibana:7.13.4
# 查看日志
docker logs -f kibana
# 最后在网页上输入es01:5601 查看是否打开了,没有打开就查看日志,如果是为准备,那么就等等,如果3分钟没有打开,肯定有问题,看看日志
# docker-kibana --- end
5. nginx安装
一样很简单,有点时间搞一搞 自己去官网下载 nginx.org/download/ng…
进入对应的机器的/opt/目录
,下载完毕之后放到opt
目录中
cd /opt
mkdir nginx
tar -zxf nginx-1.18.0.tar.gz -C /opt/nginx
# 安装依赖
yum install -y gcc gcc-devel gcc-c++ gcc-c++-devel make kernel kernel-devel
# 安装nginx
cd /opt/nginx/nginx-1.18.0
./configure
make && make install
# 进入/usr/local/nginx/conf 修改nginx.conf文件
vim /usr/local/nginx/conf/nginx.conf
# 修改内容如下:(主要内容) 修改完毕之后保存退出
...
http {
...
log_format main '{"时间":"$time_iso8601",'
'"客户端外网地址":"$http_x_forwarded_for",'
'"客户端内网地址":"$remote_addr",'
'"状态码":$status,'
'"传输流量":$body_bytes_sent,'
'"跳转来源":"$http_referer",'
'"URL":"$request",'
'"浏览器":"$http_user_agent",'
'"请求响应时间":$request_time,'
'"后端地址":"$upstream_addr"}';
access_log /usr/local/nginx/logs/access.log main;
...
}
...
# 启动nginx
/usr/local/nginx/sbin/nginx
# 重启命令
/usr/local/nginx/sbin/nginx -s reload
# 验证 访问地址,然后是否返回内容
curl 127.0.0.1
6. filebeat安装
简直了 早知道这么简单 我早用了
6.1 下载
下载地址头文件上面有
6.2 安装
rpm -ivh filebeat-8.7.1-x86_64.rpm
6.3 修改配置
vim /etc/filebeat/filebeat.yml
# 内容如下
filebeat.inputs:
- type: log #类型为log
enabled: true
paths: #指定日志所在的路径
- /usr/local/nginx/logs/access.log
json.keys_under_root: true #支持json格式的日志输出
json.overwrite_keys: true
fields: #在日志中增加一个字段,字段为log_topic,值为nginx_access,logstash根据带有这个字段>的日志存储到指定的es索引库
log_topic: nginx-access
tail_files: true #开启日志监控,从日志的最后一行开始收集
output.kafka: #输出到kafka系统
enabled: true
hosts: ["192.199.56.200:9092","192.199.56.200:9093","192.199.56.200:9094"] #kafka的地址
topic: '%{[fields][log_topic]}' #指定将日志存储到kafka集群的哪个topic中,这里的topic值是引用在inputs中定义的fields>,通过这种方式可以将不同路径的日志分别存储到不同的topic中
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
6.4 运行
systemctl start filebeat
# 查看状态
systemctl status filebeat -l
6.5 检查
检测基本都是查看日志,如果你访问nginx,然后nginx打印日志了,在filebeat的日志中也能看
systemctl status filebeat -l
,不过有更好的选择,就是用工具
7. Logstash安装
7.1 下载
文件顶部有
7.2 安装、配置、运行、查看日志
# 老规矩 进入 /opt目录
cd /opt
# 安装
rpm -ivh logstash-8.7.1-x86_64.rpm
# 配置
vim /etc/logstash/conf.d/in_kafka_to_es.conf
# 内容如下
#从kafka中读取日志数据
input { #数据源端
kafka { #类型为kafka
bootstrap_servers => ["kafka01:9092,kafka02:9093,kafka03:9094"] #kafka集群地址
topics => ["nginx-access"] #要读取那些kafka topics
codec => "json" #处理json格式的数据
auto_offset_reset => "latest" #只消费最新的kafka数据
}
}
#处理数据,去掉没用的字段
filter {
if[fields][log_topic] == "nginx-access" { #如果log_topic字段为nginx-access则进行以下数据处理
json { #json格式数据处理
source => "message" #source等于message的
remove_field => ["@version","path","beat","input","log","offset","prospector","source","tags"] #删除指定的字段
}
mutate { #修改数据
remove_field => ["_index","_id","_type","_version","_score","referer","agent"] #删除没用的字段
}
}
}
#数据处理后存储es集群
output { #目标端
if[fields][log_topic] == "nginx-access" { #如果log_topic的字段值为nginx-access就存到下面的es集群里
elasticsearch {
action => "index" #类型为索引
hosts => ["es01:9200"] #es集群地址
user => "你设置的es的名称"
password => "你设置的es的密码"
index => "nginx-access-%{+YYYY.MM.dd}" #存储到es集群的哪个索引里
codec => "json" #处理json格式的解析
}
}
}
# 运行
cd /etc/logstash/conf.d
nohup /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/in_kafka_to_es.conf &
# 查看日志
tail -f nohup.out
8.测试
这是就要看界面了
8.1 打开kibana界面
8.2 创建索引模式,确认匹配成功,选择时间字段,最后创建完成
8.3 查看
请求一下nginx 等待个10s左右(看网络和配置了)
总结
没啥好总结的,实操,然后看看几个中间件架构图,看看面试小点,就可以吹牛逼了,哎,又是装13的一天,对了 520到了给我女朋友买了个包,她说:泰裤辣
转载自:https://juejin.cn/post/7234078632653193272