
Pushing go-zero Logs to ELK


Sample code for this article: gitee.com/dn-jinmin/z…

1. Implementation Analysis

There are two ways to integrate go-zero with ELK:

  1. Write go-zero's logs to local log files and have Filebeat collect and push them to ELK for analysis.

  2. Customize where go-zero writes its logs and have ELK collect the log entries from that location.

In terms of implementation difficulty, the first approach is the simplest and the second is relatively harder. A Redis-based variant of the second approach generally performs better, but you first need to understand go-zero's logging mechanism, because the implementation is built on top of it.
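
To make the two options concrete, here is a minimal sketch of the two go-zero hook points they rely on: file output configured through logx.MustSetup, and a custom destination plugged in through logx.SetWriter. The customWriter parameter is a placeholder for any io.Writer implementation, such as the Redis writer built in section 3.

package zlog

import (
   "io"

   "github.com/zeromicro/go-zero/core/logx"
)

// Option 1: write logs to local files; Filebeat then ships them to ELK.
func setupFileOutput() {
   logx.MustSetup(logx.LogConf{
      ServiceName: "file.log",
      Mode:        "file",   // write to files instead of the console
      Encoding:    "json",   // JSON entries are easy for Logstash to parse
      Path:        "./zlog", // directory for access.log, error.log, ...
   })
}

// Option 2: send logs to a custom destination (Redis, a socket, ...) by
// wrapping any io.Writer and registering it with logx.
func setupCustomOutput(customWriter io.Writer) {
   logx.SetWriter(logx.NewWriter(customWriter))
}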

2. Approach 1: Local Log Files

Logging test case, zlog_test.go

package zlog

import (
   "testing"

   "github.com/zeromicro/go-zero/core/logx"
)

func TestFile_Log(t *testing.T) {
   // write JSON-encoded logs into the ./zlog directory
   if err := logx.SetUp(logx.LogConf{
      ServiceName: "file.log",
      Mode:        "file",
      Encoding:    "json",
      Path:        "./zlog",
   }); err != nil {
      t.Fatal(err)
   }

   // -------------------------------- info --------------------------

   logx.Info("测试", "info")
   logx.Infof("测试  %v", "info")

   // -------------------------------- error --------------------------

   logx.Error("测试", "error")
   logx.Errorf("测试  %v", "error")

   // -------------------------------- debug --------------------------

   logx.Debug("测试", "debug")
   logx.Debugf("测试  %v", "debug")
}

After running the test (for example with go test -run TestFile_Log), the corresponding log files are generated alongside the code:

~zlog
  - zlog_test.go
  - zlog
    - access.log
    - error.log
    - severe.log
    - slow.log
    - stat.log

Regardless of which log file you look at, the entries share roughly the following format:

{"@timestamp":"2024-03-15T15:11:31.230+08:00","caller":"zlog/zlog_test.go:26","content":"测试info","level":"info"}

So the next step is to write the Filebeat and Logstash configurations.

Filebeat configuration

zlog.yml

filebeat.inputs:
  - type: log
    enabled: true
    paths:
      # the go-zero log directory generated above (adjust to your environment)
      - /root/easy-im/pkg/zlog/zlog/*.log

    # if true, the custom fields are stored at the top level of the output document
    fields_under_root: true
    fields:
      type: zlog.file

output.logstash:
  hosts: ["192.168.117.80:5044"]

Logstash configuration

logstash.conf

input{
    beats {
        port => 5044
    }
}

filter {
    grok {
         match => {
            "message" => "^{"@timestamp":"%{DATA:timestamp}","caller":"%{DATA:caller}","content":"%{DATA:content}","level":"%{DATA:level}"}"
        }
    }
}
output{
    # print to the console (standard output)
    stdout{
        codec => rubydebug
    }

    # optional: also send to Elasticsearch as needed
    # elasticsearch {
    #    hosts => ["http://192.168.117.80:9200"]
    #    index => "zlog.file"
    #    user => "admin"
    #    password => "000000"
    # }
}

Result

Below is the Logstash output:

{
     "timestamp" => "2024-03-15T16:09:02.210+08:00",
         "input" => {
        "type" => "log"
    },
        "caller" => "zlog/zlog_test.go:26",
          "type" => "zlog.file",
       "message" => "{"@timestamp":"2024-03-15T16:09:02.210+08:00","caller":"zlog/zlog_test.go:26","content":"测试info","level":"info"}",
           "log" => {
          "file" => {
            "path" => "/root/easy-im/pkg/zlog/zlog/access.log"
        },
        "offset" => 234
    },
           "ecs" => {
        "version" => "1.12.0"
    },
       "content" => "测试info",
      "@version" => "1",
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
    "@timestamp" => 2024-03-07T11:32:08.019Z,
         "level" => "info",
         "agent" => {
            "hostname" => "localhost.localdomain",
                  "id" => "a7b9c31e-9955-4044-9900-70b8a2b0fb4a",
                "type" => "filebeat",
             "version" => "7.17.4",
        "ephemeral_id" => "4c935724-a2bd-4c65-96a2-9a969b6cf9fb",
                "name" => "localhost.localdomain"
    },
          "host" => {
        "name" => "localhost.localdomain"
    }
}

3. Custom Log Output

In this approach, go-zero pushes its logs to Redis and Logstash then collects them from Redis. The hard part is that you need to understand how go-zero handles logging internally; that analysis is skipped here and deferred to section 4.

3.1 Core Code

Directory structure: redis_io_writer and redis_logx_writer are two implementations built on top of go-zero. The redis_io_writer approach is recommended, since it only has to implement the standard io.Writer interface instead of the full logx.Writer interface.

- zlog
  - redis_io_writer.go
  - redis_logx_writer.go
  - tool.go

redis_io_writer.go

package zlog

import (
   "io"

   "github.com/zeromicro/go-zero/core/stores/redis"
)

type redisIoWriter struct {
   redisKey string
   redis    *redis.Redis
}

func NewRedisIoWriter(redisKey string, redisCfg redis.RedisConf) io.Writer {
   return &redisIoWriter{
      redisKey: redisKey,
      redis:    redis.MustNewRedis(redisCfg),
   }
}

func (r *redisIoWriter) Write(p []byte) (n int, err error) {
   // string(p) copies the buffer before the goroutine runs, and the push is
   // asynchronous so logging never blocks on Redis
   go r.redis.Rpush(r.redisKey, string(p))

   // report the full length so the io.Writer contract is satisfied
   return len(p), nil
}
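
Because redisIoWriter satisfies the standard io.Writer interface, it can simply be wrapped with logx.NewWriter and registered via logx.SetWriter, as the test cases in section 3.2 show. The trade-off of pushing inside a goroutine is that a failed Rpush is silently dropped.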

redis_logx_writer.go

package zlog

import (
   "encoding/json"
   "fmt"
   "github.com/zeromicro/go-zero/core/logx"
   "github.com/zeromicro/go-zero/core/stores/redis"
   "time"
)

const (
   levelAlert  = "alert"
   levelInfo   = "info"
   levelError  = "error"
   levelSevere = "severe"
   levelFatal  = "fatal"
   levelSlow   = "slow"
   levelStat   = "stat"
   levelDebug  = "debug"
)

const (
   callerKey    = "caller"
   contentKey   = "content"
   durationKey  = "duration"
   levelKey     = "level"
   spanKey      = "span"
   timestampKey = "@timestamp"
   traceKey     = "trace"
   truncatedKey = "truncated"
)

type redisLogxWriter struct {
   redisKey string
   redis    *redis.Redis
}

func NewRedisLogxWriter(redisKey string, redisCfg redis.RedisConf) logx.Writer {
   return &redisLogxWriter{
      redisKey: redisKey,
      redis:    redis.MustNewRedis(redisCfg),
   }
}

func (n *redisLogxWriter) Alert(v any) {
   n.outPut(v, levelAlert)
}

func (n *redisLogxWriter) Close() error {
   return nil
}

func (n *redisLogxWriter) Debug(v any, fields ...logx.LogField) {
   n.outPut(v, levelDebug, fields...)
}

func (n *redisLogxWriter) Error(v any, fields ...logx.LogField) {
   n.outPut(v, levelError, fields...)
}

func (n *redisLogxWriter) Info(v any, fields ...logx.LogField) {
   n.outPut(v, levelInfo, fields...)
}

func (n *redisLogxWriter) Severe(v any) {
   n.outPut(v, levelSevere)
}

func (n *redisLogxWriter) Slow(v any, fields ...logx.LogField) {
   n.outPut(v, levelSlow, fields...)
}

func (n *redisLogxWriter) Stack(v any) {
   n.outPut(v, levelError)
}

func (n *redisLogxWriter) Stat(v any, fields ...logx.LogField) {
   n.outPut(v, levelStat, fields...)
}

func (n *redisLogxWriter) outPut(v any, level string, fields ...logx.LogField) {
   // build the log entry from the value, level and optional fields

   // log content
   content := make(map[string]any)

   content[contentKey] = v
   for _, field := range fields {
      t := logx.Field(field.Key, field.Value)
      content[t.Key] = t.Value
   }

   // timestamp, using the same layout as go-zero's file output so the
   // grok pattern in the Logstash config can parse it
   content[timestampKey] = time.Now().Format("2006-01-02T15:04:05.000Z07:00")
   // log level
   content[levelKey] = level

   f, ok := getCallerFrame(6)
   if ok {
      content[callerKey] = fmt.Sprintf("%v:%v", f.File, f.Line)
   }

   // serialize to JSON

   b, err := json.Marshal(content)
   if err != nil {
      return
   }

   data := string(b)

   // push to Redis asynchronously
   go n.redis.Rpush(n.redisKey, data)
}
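
Compared with redis_io_writer, this variant implements the full logx.Writer interface, so it has to build the JSON entry itself (level, timestamp, caller via getCallerFrame) instead of receiving lines already formatted by logx. In exchange, it gets complete control over the fields and their format.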

tool.go

package zlog

import "runtime"

// getCallerFrame returns the stack frame (file and line) skip levels above this call.
func getCallerFrame(skip int) (frame runtime.Frame, ok bool) {
   pc := make([]uintptr, 1)
   numFrames := runtime.Callers(skip, pc)
   if numFrames < 1 {
      return
   }

   frame, _ = runtime.CallersFrames(pc).Next()
   return frame, frame.PC != 0
}

3.2 Logstash Configuration

Test cases

package zlog

import (
   "io"
   "testing"
   "time"

   "github.com/zeromicro/go-zero/core/logx"
   "github.com/zeromicro/go-zero/core/stores/redis"
)

func getRedisIoWriter() io.Writer {
   return NewRedisIoWriter("redis.io.writer", redis.RedisConf{
      Host:        "192.168.117.80:16379",
      Type:        "node",
      Pass:        "",
      Tls:         false,
      NonBlock:    false,
      PingTimeout: 0,
   })
}

func Test_redisIoWriter_Write(t *testing.T) {
   w := getRedisIoWriter()
   logx.SetWriter(logx.NewWriter(w))

   // -------------------------------- info --------------------------

   logx.Info("测试", "info")
   logx.Infof("测试  %v", "info")

   // give the asynchronous Rpush goroutines time to finish before the test exits
   time.Sleep(time.Second)
}

func getRedisLogxWrite() logx.Writer {
   return NewRedisLogxWriter("redis.logx.writer", redis.RedisConf{
      Host: "192.168.117.80:16379",
      Type: "node",
      Pass: "",
   })
}

func TestNewRedisLogxWriter(t *testing.T) {
   writer := getRedisLogxWrite()
   logx.SetWriter(writer)

   // -------------------------------- info --------------------------

   logx.Info("测试", "info")
   logx.Infof("测试  %v", "info")

   // give the asynchronous Rpush goroutines time to finish before the test exits
   time.Sleep(time.Second)
}

After running the test cases above, the corresponding log entries can be seen in Redis.
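
As a quick check without a GUI client, a minimal sketch like the one below reads the pushed entries back with go-zero's Redis client (assuming its Lrange method; the list key matches the io-writer above, and the connection settings should be adjusted to your environment). Note that once Logstash starts consuming the list, it will be drained.

package main

import (
   "fmt"

   "github.com/zeromicro/go-zero/core/stores/redis"
)

func main() {
   rds := redis.MustNewRedis(redis.RedisConf{
      Host: "192.168.117.80:16379",
      Type: "node",
   })

   // read every entry currently stored under the writer's list key
   entries, err := rds.Lrange("redis.io.writer", 0, -1)
   if err != nil {
      fmt.Println("lrange failed:", err)
      return
   }

   for _, entry := range entries {
      fmt.Println(entry)
   }
}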


Then update the Logstash configuration accordingly:

input{
    beats {
        port => 5044
    }

    redis {
         data_type => "list"
         key => "redis.io.writer"
         host => "192.168.117.80"
         password => ""
         port => 16379
         db => 0
         threads => 2
         type => "redis.io.writer"
    }
    redis {
         data_type => "list"
         key => "redis.logx.writer"
         host => "192.168.117.80"
         password => ""
         port => 16379
         db => 0
         threads => 2
         type => "redis.logx.writer"
    }
}

filter {
    grok {
         match => {
            "message" => "^\{\"@timestamp\":\"%{DATA:timestamp}\",\"caller\":\"%{DATA:caller}\",\"content\":\"%{DATA:content}\",\"level\":\"%{DATA:level}\"}"

        }
    }
}
output{
    # print to the console (standard output)
    stdout{
        codec => rubydebug
    }

    # optional: also send to Elasticsearch as needed
    # elasticsearch {
    #    hosts => ["http://192.168.117.80:9200"]
    #    index => "zlog.file"
    #    user => "admin"
    #    password => "000000"
    # }
}

4. How to Extend go-zero's Logging?

If there's interest, give this post a like and I'll add a write-up later. It is not that hard; the key is to read the source of core/logx/writer.go in go-zero.
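
As a starting point, here is a minimal sketch of the two extension points that the code in section 3 already relies on: wrapping any io.Writer with logx.NewWriter, or implementing the logx.Writer interface directly and registering it with logx.SetWriter. Using os.Stdout here is purely illustrative.

package main

import (
   "os"

   "github.com/zeromicro/go-zero/core/logx"
)

func main() {
   // Route 1: wrap any io.Writer (a file, a socket, the Redis writer from
   // section 3.1, ...) and let go-zero handle formatting and levels.
   logx.SetWriter(logx.NewWriter(os.Stdout))

   // Route 2: implement the logx.Writer interface yourself (Alert, Close,
   // Debug, Error, Info, Severe, Slow, Stack, Stat) for full control over
   // the output format, as redis_logx_writer.go in section 3.1 does, and
   // register it the same way:
   //   logx.SetWriter(NewRedisLogxWriter("redis.logx.writer", redisCfg))

   logx.Info("hello from a custom writer")
}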

5. About grok in Logstash

Before filling in the grok filter, go to grokconstructor.appspot.com/do/match#re… and build and test the pattern against a sample of your log entries.
