轻松提升Golang日志性能:无需修改代码的异步写入解决方案
我是 LEE,老李,一个在 IT 行业摸爬滚打 17 年的技术老兵。
事件背景
大概今年年头的时候我写了一篇《如何让 uber-go/zap 日志框架支持异步日志输出(Gin + Zap Async Logger)》的文章,反响还行,有小伙伴给我私信探讨这个实现方式好坏。因为当时我做这个适应也主要是为了解决公司的一个问题,所以没有想做一个项目,只是内部实现了一下,也没有考虑将其开源出来。
通过接近 1 年的公司内部多数生成项目实际运行验证,证明异步这个逻辑是可以满足我们的需求且支撑我们的业务的,所以我决定将其开源出来,供大家参考。同时也为其他的日志库做一些适配:zap
, logrus
, klog
, zerolog
。
使用同一种异步方式来适配不同的日志库,而不需要大量修改代码,对任何一种已有的日志库来说,也不需要实现复杂和独立的异步逻辑,只需要将我这个项目的代码引入即可。 就可以实现大幅度的性能提升。这个是做这个项目的初衷。
做这个项目最终目标如下:
- 标准化异步输出: 适配多种 Golang 日志库
- 无需修改代码: 小改动代价, 低学习成本
- 用户友好接口: 简单易用,高效性能
- 未来兼容性: 支持新日志库快速接入
项目简介
LAW
的全称是 Log Asynchronous Writer
,它的目标是为了让 Golang 日志库支持异步输出,而不需要修改日志库的代码,只需要引入 LAW
的代码即可。
LAW
可以适配多种 Golang 日志库,目前已经适配了 zap
, logrus
, klog
, zerolog
。 并提供完整的 demo 代码,可以直接参考使用。 未来也会适配其他的日志库。
LAW
只有两个可以用的接口,非常的简单和易用:
Write
: 写入日志信息,入参[]bytes
类型,跟io.Writer
的接口一样,也就是说可以直接用在任何一个io.Writer
的地方。Stop
: 停止异步日志输出,释放资源。
这个项目主打就是:“轻量、易用、快速”
架构设计
LAW
架构设置上需要考虑:
- 要实现能够异步,需要将投送日志行为和输出日志动作分离,是非常关键的。
- 同时每次往
io.Writer
写入数据,都会有一个syscall
的开销,所以LAW
也要尽量减少syscall
的次数。
基于上面的整体逻辑,LAW
采用了双缓冲的结构:
channel
: 解耦投送日志行为和输出日志动作。bufio
: 减少syscall
的次数。
结构图如下:
性能比对
尽然要做性能对比,那么当然就要看在实际情况下的性能表现了,所以我写了一个简单的 demo 程序,来对比同步和异步的性能差异。
这次参赛选手如下:
- HttpServer:
net/http
- Logger:
zap
- Writer:
os.Stdout
和law.WriteAsyncer
Go 版本:go1.20.11 darwin/amd64
测试结构图如下:
同步 Writer
demo 代码
package main
import (
"net/http"
"os"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
func main() {
encoderCfg := zapcore.EncoderConfig{
MessageKey: "msg",
LevelKey: "level",
NameKey: "logger",
EncodeLevel: zapcore.LowercaseLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.StringDurationEncoder,
}
zapSyncWriter := zapcore.AddSync(os.Stdout)
zapCore := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), zapSyncWriter, zapcore.DebugLevel)
zapLogger := zap.New(zapCore)
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
zapLogger.Info("hello")
})
_ = http.ListenAndServe(":8080", nil)
}
测试结果
异步 Writer
demo 代码
package main
import (
"net/http"
"os"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
x "github.com/shengyanli1982/law"
)
func main() {
aw := x.NewWriteAsyncer(os.Stdout, nil)
defer aw.Stop()
encoderCfg := zapcore.EncoderConfig{
MessageKey: "msg",
LevelKey: "level",
NameKey: "logger",
EncodeLevel: zapcore.LowercaseLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.StringDurationEncoder,
}
zapSyncWriter := zapcore.AddSync(aw)
zapCore := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), zapSyncWriter, zapcore.DebugLevel)
zapLogger := zap.New(zapCore)
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
zapLogger.Info("hello")
})
_ = http.ListenAndServe(":8080", nil)
}
测试结果
对比总结
从上面的测试结果可以看出,同步的方式在 5 次测试中 76K - 80K 的 Req/Sec,而异步的方式在 5 次测试中 83K - 86K 的 Req/Sec,相差 7K - 9K 的 Req/Sec,性能提升了 10% 左右,且异步模式 Req/Sec 的标准差也明显小于同步方式。 同时异步的方式不会阻塞调用,可以提升更多的性能,所以可以处理更多的请求。
使用举例
可以通过一个简单的例子就能知道如何使用 LAW
,而且只需要修改很少的代码,就可以实现异步日志输出。它是多么的简单和易用。
这里使用 zap
作为例子,其他的日志库也是类似的。
package main
import (
"os"
"strconv"
"time"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
law "github.com/shengyanli1982/law"
)
func main() {
aw := law.NewWriteAsyncer(os.Stdout, nil) // os.Stdout 表示写入信息的目标, nil 表示使用默认的配置
defer aw.Stop() // 释放资源
encoderCfg := zapcore.EncoderConfig{
MessageKey: "msg",
LevelKey: "level",
NameKey: "logger",
EncodeLevel: zapcore.LowercaseLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.StringDurationEncoder,
}
zapAsyncWriter := zapcore.AddSync(aw) // 将 law 的异步 writer 转换成 zap 的同步 writer
zapCore := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), zapAsyncWriter, zapcore.DebugLevel)
zapLogger := zap.New(zapCore)
for i := 0; i < 10; i++ {
zapLogger.Info(strconv.Itoa(i))
}
time.Sleep(3 * time.Second)
}
执行结果
$ go run demo.go
{"level":"info","msg":"0"}
{"level":"info","msg":"1"}
{"level":"info","msg":"2"}
{"level":"info","msg":"3"}
{"level":"info","msg":"4"}
{"level":"info","msg":"5"}
{"level":"info","msg":"6"}
{"level":"info","msg":"7"}
{"level":"info","msg":"8"}
{"level":"info","msg":"9"}
代码解析
既然说到这部分,那么就来看看 LAW
的 Double Buffer
代码是如何实现的。
这里并不花里胡哨,就是直接使用了 channel
来实现异步的投送日志行为和输出日志动作的解耦。 同时用 channel
的方式,缓冲投送的日志内容。 用 bufio
包装 io.Writer
来减少 syscall
的次数。
启动了一个 goroutine
来作为 Worker 负责将channel
中的数据写入到 bufio
包装的 io.Writer
中。同时也启动了一个goroutine
监控bufferIoWriterRefresh
,来监控缓冲区的空闲时间,如果超过默认的空闲时间(5 秒),则刷新缓冲区。
// 从队列中取出日志,写入到底层的 writer 中 (Poll log entries from the queue and write to the underlying writer)
func (wa *WriteAsyncer) poller() {
defer wa.wg.Done()
// 从队列中取出日志,写入到底层的 writer 中 (Poll log entries from the queue and write to the underlying writer)
for eb := range wa.queue {
bytes := eb.Buffer().Bytes() // 从缓冲区中获取日志 (Get log entries from the buffer)
now := time.Now().UnixMilli() // 获取当前时间戳 (Get current timestamp)
wa.config.cb.OnPopQueue(bytes, now-eb.UpdateAt()) // 回调函数 (Callback function)
wa.bufferIoLock.Lock()
_, err := wa.buffWriter(bytes) // 将日志写入到底层的 writer 中 (Write log entries to the underlying writer)
wa.bufferIoLock.Unlock()
if err != nil {
log.Printf("data write error, error: %s, message: %s", err.Error(), util.BytesToString(bytes))
}
wa.idleAt.Store(now) // 设置空闲时间 (Set idle time)
wa.bufferPool.Put(eb) // 将缓冲区放回缓冲区池中 (Put buffer back into the buffer pool)
}
}
// 刷新缓冲区 (Flush the buffer)
func (wa *WriteAsyncer) bufferIoWriterRefresh() {
heartbeat := time.NewTicker(time.Second) // 心跳 (Heartbeat)
defer func() {
heartbeat.Stop()
wa.wg.Done()
}()
for {
select {
case <-wa.stopCtx.Done():
return
case <-heartbeat.C:
wa.bufferIoLock.Lock()
// 如果缓冲区有数据,并且空闲时间超过默认空闲时间,则刷新缓冲区 (If the buffer has data and the idle time exceeds the default idle time, flush the buffer)
if wa.bufferIoWriter.Buffered() > 0 && time.Now().UnixMilli()-wa.idleAt.Load() > defaultIdleTimeout.Milliseconds() {
if err := wa.bufferIoWriter.Flush(); err != nil {
log.Printf("buffer io writer flush error, error: %s", err.Error())
}
}
wa.bufferIoLock.Unlock()
}
}
}
// 缓冲写入器 (Buffered writer)
func (wa *WriteAsyncer) buffWriter(p []byte) (int, error) {
if len(p) > wa.bufferIoWriter.Available() && wa.bufferIoWriter.Buffered() > 0 { // 如果日志长度大于缓冲区可用长度,并且缓冲区有数据 (If the log length is greater than the available length of the buffer and the buffer has data)
if err := wa.bufferIoWriter.Flush(); err != nil { // 刷新缓冲区 (Flush the buffer)
wa.config.cb.OnWrite(p) // 回调函数 (Callback function)
return wa.writer.Write(p) // 将日志写入到底层的 writer 中 (Write log entries to the underlying writer)
}
}
wa.config.cb.OnWrite(p) // 回调函数 (Callback function)
return wa.bufferIoWriter.Write(p) // 将日志写入到缓冲区 (Write log entries to the buffer)
}
// 将日志写入到队列中 (Push log entries into the queue)
func (wa *WriteAsyncer) Write(p []byte) (int, error) {
if wa.closed.Load() {
return 0, ErrorWriterIsClosed
}
eb := wa.bufferPool.Get() // 从缓冲区池中获取缓冲区 (Get buffer from the buffer pool)
eb.Buffer().Write(p) // 将日志写入到缓冲区 (Write log entries into the buffer)
eb.SetUpdateAt(time.Now().UnixMilli()) // 设置更新时间 (Set update time)
select {
case wa.queue <- eb: // 将缓冲区放入队列 (Put buffer into the queue)
wa.config.cb.OnPushQueue(p) // 回调函数 (Callback function)
default:
wa.bufferPool.Put(eb) // 将缓冲区放回缓冲区池中 (Put buffer back into the buffer pool)
return 0, ErrorQueueIsFull
}
return len(p), nil
}
总结
通过设计和实现 LAW
这个项目,将以前大家期望和我希望的东西终于实现了标准化,实现一个逻辑代码多处复用,同时也可以让其他的日志库快速接入,而不需要大量的修改代码,这样就可以实现大幅度的性能提升。从这方面来说,我觉得这个项目还是比较成功的。同时我也希望大家可以多多支持,多多使用,多多提建议,让这个项目更加的完善。
转载自:https://juejin.cn/post/7312759727993782311