关于连接池的那些事儿
背景
某一日,葫芦正在写一个简单的CURD代码,十三不知何时悄悄的来到身后
十三: 葫芦,你知道db的 conn 如何管理的
葫芦一哆嗦
葫芦: 通过池化技术,通过maxIdleConns 和 maxOpenConns 参数来决定池子的大小
十三: 那你知道超时的空闲连接如何处理的吗
葫芦: 布吉岛,TAT
葫芦GG
database/sql 连接池
什么是连接池
连接池是一个存储数据库连接的缓冲区,这些数据库连接已经初始化并准备就绪,以便快速使用。这提高了应用程序的性能,因为它可以避免为每个新的数据库请求创建一个新的数据库连接。可以重复使用现有的连接来处理新的数据库请求,从而减少了连接数据库所需的时间和资源。同时,它也可以控制数据库连接的数量,以避免过多的连接造成数据库的负担。
database实现
DB
结构
// can be controlled with SetMaxIdleConns.
type DB struct {
// Total time waited for new connections.
waitDuration atomic.Int64
connector driver.Connector
// numClosed is an atomic counter which represents a total number of
// closed connections. Stmt.openStmt checks it before cleaning closed
// connections in Stmt.css.
numClosed atomic.Uint64
mu sync.Mutex // protects following fields
freeConn []*driverConn // free connections ordered by returnedAt oldest to newest
connRequests map[uint64]chan connRequest
nextRequest uint64 // Next key to use in connRequests.
numOpen int // number of opened and pending open connections
// Used to signal the need for new connections
// a goroutine running connectionOpener() reads on this chan and
// maybeOpenNewConnections sends on the chan (one send per needed connection)
// It is closed during db.Close(). The close tells the connectionOpener
// goroutine to exit.
openerCh chan struct{}
closed bool
dep map[finalCloser]depSet
lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
maxIdleCount int // zero means defaultMaxIdleConns; negative means 0
maxOpen int // <= 0 means unlimited
maxLifetime time.Duration // maximum amount of time a connection may be reused
maxIdleTime time.Duration // maximum amount of time a connection may be idle before being closed
cleanerCh chan struct{}
waitCount int64 // Total number of connections waited for.
maxIdleClosed int64 // Total number of connections closed due to idle count.
maxIdleTimeClosed int64 // Total number of connections closed due to idle time.
maxLifetimeClosed int64 // Total number of connections closed due to max connection lifetime limit.
stop func() // stop cancels the connection opener.
}
- waitDuration: 等待连接的最长时间
- connector: 数据库连接的创建工厂
- numClosed: 已关闭的连接数
- freeConn: 未使用的连接集合
- connRequests: 用于存储连接请求,每个连接请求都有一个唯一的请求 ID,通过这个 ID 可以在 connRequests 中找到相应的连接请求通道
- nextRequest: 表示下一个连接请求的 ID
- numOpen: 表示当前已经打开的连接数量
- openerCh: 一个 chan struct{} 类型的通道,用于阻塞池的 opener,直到 freeConn 不为空
- closed: 表示连接池是否已经关闭
- dep: 一个
map[finalCloser]depSet
类型的映射,用于处理连接的依赖关系,维护连接的关闭顺序 - lastPut: 用于存储连接最后一次被放回连接池的时间,主要用于连接回收策略
- maxIdleCount: 表示连接池中最多可以保留的空闲连接数
- maxOpen: 表示连接池中最多可以打开的连接数量。如果 maxOpen 为 0,则表示没有限制
- maxLifetime: 表示一个连接的最大生命周期
- maxIdleTime: 表示一个连接可以在连接池中空闲的最长时间
- cleanerCh: 一个 chan struct{} 类型的通道,用于定期清理连接池中的空闲连接
- waitCount: 表示当前正在等待连接的数量
- maxIdleClosed: 表示因为空闲连接数超过限制而被关闭的连接数量
- maxIdleTimeClosed: 表示因为空闲时间超过限制而被关闭的连接数量
- maxLifetimeClosed: 表示因为生命周期超过限制而被关闭的连接数量
- stop: 一个类型为 func() 的函数变量,用于关闭连接池并释放资源
Open
打开db连接
func Open(driverName, dataSourceName string) (*DB, error) {
driversMu.RLock()
driveri, ok := drivers[driverName]
driversMu.RUnlock()
if !ok {
return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
}
if driverCtx, ok := driveri.(driver.DriverContext); ok {
connector, err := driverCtx.OpenConnector(dataSourceName)
if err != nil {
return nil, err
}
return OpenDB(connector), nil
}
return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
}
检查驱动是否存在,如果存在则调用OpenDB方法来连接数据库
func OpenDB(c driver.Connector) *DB {
ctx, cancel := context.WithCancel(context.Background())
db := &DB{
connector: c,
openerCh: make(chan struct{}, connectionRequestQueueSize),
lastPut: make(map[*driverConn]string),
connRequests: make(map[uint64]chan connRequest),
stop: cancel,
}
go db.connectionOpener(ctx)
return db
}
// Runs in a separate goroutine, opens new connections when requested.
func (db *DB) connectionOpener(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-db.openerCh:
db.openNewConnection(ctx)
}
}
}
- openerCh: 表示连接请求队列
- lastPut: 表示最后一次分配的数据库连接的goroutine ID
- connRequests: 表示用于处理连接请求的goroutine ID
如果db.openerCh通道被触发,就意味着有新的连接请求需要处理,就会调用db.openNewConnection(ctx)函数去建立新的数据库连接
Open new conn
// Open one new connection
func (db *DB) openNewConnection(ctx context.Context) {
// maybeOpenNewConnections has already executed db.numOpen++ before it sent
// on db.openerCh. This function must execute db.numOpen-- if the
// connection fails or is closed before returning.
ci, err := db.connector.Connect(ctx)
db.mu.Lock()
defer db.mu.Unlock()
if db.closed {
if err == nil {
ci.Close()
}
db.numOpen--
return
}
if err != nil {
db.numOpen--
db.putConnDBLocked(nil, err)
db.maybeOpenNewConnections()
return
}
dc := &driverConn{
db: db,
createdAt: nowFunc(),
returnedAt: nowFunc(),
ci: ci,
}
if db.putConnDBLocked(dc, err) {
db.addDepLocked(dc, dc)
} else {
db.numOpen--
ci.Close()
}
}
函数将从连接器的 Connect 方法中获取数据库连接。Connect 方法的返回值是一个 driver.Conn 接口实例,表示一个数据库连接。
如果连接获取失败,则会将 db.numOpen 减一,回收连接并终止该函数的执行。否则,将创建一个 driverConn 类型的结构体实例 dc 代表数据库连接,其中包括连接信息、连接器、创建时间等属性。 之后,函数调用 db 的 putConnDBLocked 方法将连接添加到连接池中,同时调用 addDepLocked 方法将其添加到依赖池中。如果添加失败,则将 dc 关闭,并将 db.numOpen 减一。添加成功后,函数维护 db.numOpen 计数器,表示当前拥有的活动连接数。值得注意的是,如果数据库连接池已关闭,则将回收已创建的连接,并将 db.numOpen 计数器减一。
也就是说,该函数实现了连接池的管理和资源回收,保证了不会因为连接泄露资源而导致系统性能下降
Close
func (db *DB) Close() error {
db.mu.Lock()
if db.closed { // Make DB.Close idempotent
db.mu.Unlock()
return nil
}
if db.cleanerCh != nil {
close(db.cleanerCh)
}
var err error
fns := make([]func() error, 0, len(db.freeConn))
for _, dc := range db.freeConn {
fns = append(fns, dc.closeDBLocked())
}
db.freeConn = nil
db.closed = true
for _, req := range db.connRequests {
close(req)
}
db.mu.Unlock()
for _, fn := range fns {
err1 := fn()
if err1 != nil {
err = err1
}
}
db.stop()
if c, ok := db.connector.(io.Closer); ok {
err1 := c.Close()
if err1 != nil {
err = err1
}
}
return err
}
Close用于关闭连接,并将连接池中的连接全部归还到驱动连接池中,最后关闭连接
连接保活
// startCleanerLocked starts connectionCleaner if needed.
func (db *DB) startCleanerLocked() {
if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {
db.cleanerCh = make(chan struct{}, 1)
go db.connectionCleaner(db.shortestIdleTimeLocked())
}
}
func (db *DB) connectionCleaner(d time.Duration) {
const minInterval = time.Second
if d < minInterval {
d = minInterval
}
t := time.NewTimer(d)
for {
select {
case <-t.C:
case <-db.cleanerCh: // maxLifetime was changed or db was closed.
}
db.mu.Lock()
d = db.shortestIdleTimeLocked()
if db.closed || db.numOpen == 0 || d <= 0 {
db.cleanerCh = nil
db.mu.Unlock()
return
}
d, closing := db.connectionCleanerRunLocked(d)
db.mu.Unlock()
for _, c := range closing {
c.Close()
}
if d < minInterval {
d = minInterval
}
if !t.Stop() {
select {
case <-t.C:
default:
}
}
t.Reset(d)
}
}
connectionCleaner 会默认已1s的间隔,去扫描空闲的连接队列,关闭超时的空闲连接,并从连接池中移除
参考资料
tutorial: github.com/VividCortex…
database/sql: github.com/golang/go/t…
转载自:https://juejin.cn/post/7219141666237530168