likes
comments
collection
share

关于连接池的那些事儿

作者站长头像
站长
· 阅读数 13

背景

某一日,葫芦正在写一个简单的CURD代码,十三不知何时悄悄的来到身后

十三: 葫芦,你知道db的 conn 如何管理的

葫芦一哆嗦

葫芦: 通过池化技术,通过maxIdleConns 和 maxOpenConns 参数来决定池子的大小

十三: 那你知道超时的空闲连接如何处理的吗

葫芦: 布吉岛,TAT

葫芦GG

database/sql 连接池

什么是连接池

连接池是一个存储数据库连接的缓冲区,这些数据库连接已经初始化并准备就绪,以便快速使用。这提高了应用程序的性能,因为它可以避免为每个新的数据库请求创建一个新的数据库连接。可以重复使用现有的连接来处理新的数据库请求,从而减少了连接数据库所需的时间和资源。同时,它也可以控制数据库连接的数量,以避免过多的连接造成数据库的负担。

database实现

DB结构

// can be controlled with SetMaxIdleConns.
type DB struct {
   // Total time waited for new connections.
   waitDuration atomic.Int64

   connector driver.Connector
   // numClosed is an atomic counter which represents a total number of
   // closed connections. Stmt.openStmt checks it before cleaning closed
   // connections in Stmt.css.
   numClosed atomic.Uint64

   mu           sync.Mutex    // protects following fields
   freeConn     []*driverConn // free connections ordered by returnedAt oldest to newest
   connRequests map[uint64]chan connRequest
   nextRequest  uint64 // Next key to use in connRequests.
   numOpen      int    // number of opened and pending open connections
   // Used to signal the need for new connections
   // a goroutine running connectionOpener() reads on this chan and
   // maybeOpenNewConnections sends on the chan (one send per needed connection)
   // It is closed during db.Close(). The close tells the connectionOpener
   // goroutine to exit.
   openerCh          chan struct{}
   closed            bool
   dep               map[finalCloser]depSet
   lastPut           map[*driverConn]string // stacktrace of last conn's put; debug only
   maxIdleCount      int                    // zero means defaultMaxIdleConns; negative means 0
   maxOpen           int                    // <= 0 means unlimited
   maxLifetime       time.Duration          // maximum amount of time a connection may be reused
   maxIdleTime       time.Duration          // maximum amount of time a connection may be idle before being closed
   cleanerCh         chan struct{}
   waitCount         int64 // Total number of connections waited for.
   maxIdleClosed     int64 // Total number of connections closed due to idle count.
   maxIdleTimeClosed int64 // Total number of connections closed due to idle time.
   maxLifetimeClosed int64 // Total number of connections closed due to max connection lifetime limit.

   stop func() // stop cancels the connection opener.
}
  • waitDuration: 等待连接的最长时间
  • connector: 数据库连接的创建工厂
  • numClosed: 已关闭的连接数
  • freeConn: 未使用的连接集合
  • connRequests: 用于存储连接请求,每个连接请求都有一个唯一的请求 ID,通过这个 ID 可以在 connRequests 中找到相应的连接请求通道
  • nextRequest: 表示下一个连接请求的 ID
  • numOpen: 表示当前已经打开的连接数量
  • openerCh: 一个 chan struct{} 类型的通道,用于阻塞池的 opener,直到 freeConn 不为空
  • closed: 表示连接池是否已经关闭
  • dep: 一个 map[finalCloser]depSet 类型的映射,用于处理连接的依赖关系,维护连接的关闭顺序
  • lastPut: 用于存储连接最后一次被放回连接池的时间,主要用于连接回收策略
  • maxIdleCount: 表示连接池中最多可以保留的空闲连接数
  • maxOpen: 表示连接池中最多可以打开的连接数量。如果 maxOpen 为 0,则表示没有限制
  • maxLifetime: 表示一个连接的最大生命周期
  • maxIdleTime: 表示一个连接可以在连接池中空闲的最长时间
  • cleanerCh: 一个 chan struct{} 类型的通道,用于定期清理连接池中的空闲连接
  • waitCount: 表示当前正在等待连接的数量
  • maxIdleClosed: 表示因为空闲连接数超过限制而被关闭的连接数量
  • maxIdleTimeClosed: 表示因为空闲时间超过限制而被关闭的连接数量
  • maxLifetimeClosed: 表示因为生命周期超过限制而被关闭的连接数量
  • stop: 一个类型为 func() 的函数变量,用于关闭连接池并释放资源

Open 打开db连接

func Open(driverName, dataSourceName string) (*DB, error) {
   driversMu.RLock()
   driveri, ok := drivers[driverName]
   driversMu.RUnlock()
   if !ok {
      return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
   }

   if driverCtx, ok := driveri.(driver.DriverContext); ok {
      connector, err := driverCtx.OpenConnector(dataSourceName)
      if err != nil {
         return nil, err
      }
      return OpenDB(connector), nil
   }

   return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
}

检查驱动是否存在,如果存在则调用OpenDB方法来连接数据库

func OpenDB(c driver.Connector) *DB {
   ctx, cancel := context.WithCancel(context.Background())
   db := &DB{
      connector:    c,
      openerCh:     make(chan struct{}, connectionRequestQueueSize),
      lastPut:      make(map[*driverConn]string),
      connRequests: make(map[uint64]chan connRequest),
      stop:         cancel,
   }

   go db.connectionOpener(ctx)

   return db
}

// Runs in a separate goroutine, opens new connections when requested.
func (db *DB) connectionOpener(ctx context.Context) {
   for {
      select {
      case <-ctx.Done():
         return
      case <-db.openerCh:
         db.openNewConnection(ctx)
      }
   }
}
  • openerCh: 表示连接请求队列
  • lastPut: 表示最后一次分配的数据库连接的goroutine ID
  • connRequests: 表示用于处理连接请求的goroutine ID

如果db.openerCh通道被触发,就意味着有新的连接请求需要处理,就会调用db.openNewConnection(ctx)函数去建立新的数据库连接

Open new conn

// Open one new connection
func (db *DB) openNewConnection(ctx context.Context) {
   // maybeOpenNewConnections has already executed db.numOpen++ before it sent
   // on db.openerCh. This function must execute db.numOpen-- if the
   // connection fails or is closed before returning.
   ci, err := db.connector.Connect(ctx)
   db.mu.Lock()
   defer db.mu.Unlock()
   if db.closed {
      if err == nil {
         ci.Close()
      }
      db.numOpen--
      return
   }
   if err != nil {
      db.numOpen--
      db.putConnDBLocked(nil, err)
      db.maybeOpenNewConnections()
      return
   }
   dc := &driverConn{
      db:         db,
      createdAt:  nowFunc(),
      returnedAt: nowFunc(),
      ci:         ci,
   }
   if db.putConnDBLocked(dc, err) {
      db.addDepLocked(dc, dc)
   } else {
      db.numOpen--
      ci.Close()
   }
}

函数将从连接器的 Connect 方法中获取数据库连接。Connect 方法的返回值是一个 driver.Conn 接口实例,表示一个数据库连接。

如果连接获取失败,则会将 db.numOpen 减一,回收连接并终止该函数的执行。否则,将创建一个 driverConn 类型的结构体实例 dc 代表数据库连接,其中包括连接信息、连接器、创建时间等属性。 之后,函数调用 db 的 putConnDBLocked 方法将连接添加到连接池中,同时调用 addDepLocked 方法将其添加到依赖池中。如果添加失败,则将 dc 关闭,并将 db.numOpen 减一。添加成功后,函数维护 db.numOpen 计数器,表示当前拥有的活动连接数。值得注意的是,如果数据库连接池已关闭,则将回收已创建的连接,并将 db.numOpen 计数器减一。

也就是说,该函数实现了连接池的管理和资源回收,保证了不会因为连接泄露资源而导致系统性能下降

Close

func (db *DB) Close() error {
   db.mu.Lock()
   if db.closed { // Make DB.Close idempotent
      db.mu.Unlock()
      return nil
   }
   if db.cleanerCh != nil {
      close(db.cleanerCh)
   }
   var err error
   fns := make([]func() error, 0, len(db.freeConn))
   for _, dc := range db.freeConn {
      fns = append(fns, dc.closeDBLocked())
   }
   db.freeConn = nil
   db.closed = true
   for _, req := range db.connRequests {
      close(req)
   }
   db.mu.Unlock()
   for _, fn := range fns {
      err1 := fn()
      if err1 != nil {
         err = err1
      }
   }
   db.stop()
   if c, ok := db.connector.(io.Closer); ok {
      err1 := c.Close()
      if err1 != nil {
         err = err1
      }
   }
   return err
}

Close用于关闭连接,并将连接池中的连接全部归还到驱动连接池中,最后关闭连接

连接保活

// startCleanerLocked starts connectionCleaner if needed.
func (db *DB) startCleanerLocked() {
   if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {
      db.cleanerCh = make(chan struct{}, 1)
      go db.connectionCleaner(db.shortestIdleTimeLocked())
   }
}

func (db *DB) connectionCleaner(d time.Duration) {
   const minInterval = time.Second

   if d < minInterval {
      d = minInterval
   }
   t := time.NewTimer(d)

   for {
      select {
      case <-t.C:
      case <-db.cleanerCh: // maxLifetime was changed or db was closed.
      }

      db.mu.Lock()

      d = db.shortestIdleTimeLocked()
      if db.closed || db.numOpen == 0 || d <= 0 {
         db.cleanerCh = nil
         db.mu.Unlock()
         return
      }

      d, closing := db.connectionCleanerRunLocked(d)
      db.mu.Unlock()
      for _, c := range closing {
         c.Close()
      }

      if d < minInterval {
         d = minInterval
      }

      if !t.Stop() {
         select {
         case <-t.C:
         default:
         }
      }
      t.Reset(d)
   }
}

connectionCleaner 会默认已1s的间隔,去扫描空闲的连接队列,关闭超时的空闲连接,并从连接池中移除

参考资料

tutorial: github.com/VividCortex…

database/sql: github.com/golang/go/t…