database/sql是go里面操作数据库的,这里我们主要讲解databas/sql和mysql使用
DB是数据库结构,是并发安全的
type DB struct {
// 等待新连接的总时间(放在头部保证内存对齐)
waitDuration int64 // Total time waited for new connections.
connector driver.Connector
// 连接关闭的次数
numClosed uint64
// 锁
mu sync.Mutex
// 空闲的连接
freeConn []*driverConn
// 处理连接请求,用自增id
connRequests map[uint64]chan connRequest
// 下一个连接请求使用的id
nextRequest uint64 // Next key to use in connRequests.
// 打开和pending的连接数量
numOpen int // number of opened and pending open connections
// Used to signal the need for new connections
// a goroutine running connectionOpener() reads on this chan and
// maybeOpenNewConnections sends on the chan (one send per needed connection)
// It is closed during db.Close(). The close tells the connectionOpener
// goroutine to exit.
// 打开ch,
//connectionOpener 读取数据, maybeOpenNewConnections 发送数据,缓存1000000
openerCh chan struct{}
resetterCh chan *driverConn
// 是否close
closed bool
dep map[finalCloser]depSet
lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
// 最大空闲连接数,0和负数都是defaultMaxIdleConns, 默认是2
maxIdle int // zero means defaultMaxIdleConns; negative means 0
// 最大连接打开数量, 0是无限制
maxOpen int // <= 0 means unlimited
// 连接重用的最大时间
maxLifetime time.Duration // maximum amount of time a connection may be reused
cleanerCh chan struct{}
// 等待连接的总次数
waitCount int64 // Total number of connections waited for.
// 由于空闲而关闭的连接数
maxIdleClosed int64 // Total number of connections closed due to idle.
// 达到最大空闲数量而关闭的连接数
maxLifetimeClosed int64 // Total number of connections closed due to max free limit.
// stop会取消
stop func() // stop cancels the connection opener and the session resetter.
}driverConn 是driver.Conn的包装,包括对数据库的操作
type driverConn struct {
db *DB
// 创建的时间
createdAt time.Time
// 保证下面的并发处理
sync.Mutex // guards following
ci driver.Conn
// 是否close
closed bool
// 最终关闭
finalClosed bool // ci.Close has been called
openStmt map[*driverStmt]bool
// 捕获最后一次执行结果的error
lastErr error // lastError captures the result of the session resetter.
// 通过db.mu 提供并发支持
inUse bool
onPut []func() // code (with db.mu held) run when conn is next returned
dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked
}Conn是一个单独的数据库连接
type Conn struct {
db *DB
// closemu prevents the connection from closing while there
// is an active query. It is held for read during queries
// and exclusively during close.
closemu sync.RWMutex
// dc is owned until close, at which point
// it's returned to the connection pool.
dc *driverConn
// done transitions from 0 to 1 exactly once, on close.
// Once done, all operations fail with ErrConnDone.
// Use atomic operations on value when checking value.
done int32
}DBStats数据库的状态
type DBStats struct {
// 最大打开连接数
MaxOpenConnections int // Maximum number of open connections to the database.
// 连接池状态
// 连接使用中和空闲的数量
OpenConnections int // The number of established connections both in use and idle.
// 连接使用中的数量
InUse int // The number of connections currently in use.
// 连接的数量
Idle int // The number of idle connections.
// 统计过
// 等待连接的总数
WaitCount int64 // The total number of connections waited for.
// 等待连接阻塞的总时间
WaitDuration time.Duration // The total time blocked waiting for a new connection.
// 连接空闲被关闭的总数
MaxIdleClosed int64 // The total number of connections closed due to SetMaxIdleConns.
// 由于达到MaxLifetime而关闭的总数
MaxLifetimeClosed int64 // The total number of connections closed due to SetConnMaxLifetime.
}下面进入源码解读阶段
Open会打开一个数据库, 返回一个DB指针,是并发安全的。
func Open(driverName, dataSourceName string) (*DB, error) {
// 根据driverName获取driver
driversMu.RLock()
driveri, ok := drivers[driverName]
driversMu.RUnlock()
if !ok {
return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
}
// DriverContext是一个实现了OpenConnector方法的接口,返回是一个Connector
if driverCtx, ok := driveri.(driver.DriverContext); ok {
connector, err := driverCtx.OpenConnector(dataSourceName)
if err != nil {
return nil, err
}
return OpenDB(connector), nil
}
return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
}打开一个数据库链接
func OpenDB(c driver.Connector) *DB {
ctx, cancel := context.WithCancel(context.Background())
db := &DB{
connector: c,
openerCh: make(chan struct{}, connectionRequestQueueSize),
resetterCh: make(chan *driverConn, 50),
lastPut: make(map[*driverConn]string),
connRequests: make(map[uint64]chan connRequest),
stop: cancel,
}
// 启动一个单独的goroutine去处理新建连接请求
go db.connectionOpener(ctx)
go db.connectionResetter(ctx)
return db
}func (db *DB) connectionOpener(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-db.openerCh:
db.openNewConnection(ctx)
}
}
}打开一个连接
func (db *DB) openNewConnection(ctx context.Context) {
// maybeOpenNewConnctions has already executed db.numOpen++ before it sent
// on db.openerCh. This function must execute db.numOpen-- if the
// connection fails or is closed before returning.
// 获取一个新的连接
ci, err := db.connector.Connect(ctx)
db.mu.Lock()
defer db.mu.Unlock()
if db.closed {
// 数据库已经关闭了,需要关闭刚刚创建的连接
if err == nil {
ci.Close()
}
db.numOpen--
return
}
if err != nil {
db.numOpen--
// 处理这个连接
db.putConnDBLocked(nil, err)
db.maybeOpenNewConnections()
return
}
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
}
if db.putConnDBLocked(dc, err) {
db.addDepLocked(dc, dc)
} else {
db.numOpen--
ci.Close()
}
}如果err!=nil, dc会被忽略,并且返回false 如果err==nil, dc必须不为nil
连接处理有两种情况:
- 如果同时有连接请求,则直接将dc会发给这个连接请求方
- 否则如果当前空闲连接小于最大空闲连接,则将连接加入空闲连接,否则将maxIdleClosed++
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
if db.closed {
return false
}
// 超过最大连接
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
// 如果同时有连接请求
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
for reqKey, req = range db.connRequests {
break
}
// 从待连接的map中删除这个key
delete(db.connRequests, reqKey) // Remove from pending requests.
if err == nil {
// 将dc标记为使用中
dc.inUse = true
}
// 直接发送过去
req <- connRequest{
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed {
if db.maxIdleConnsLocked() > len(db.freeConn) {
// 计入到空闲连接池
db.freeConn = append(db.freeConn, dc)
// 开始清理处理
db.startCleanerLocked()
return true
}
db.maxIdleClosed++
}
return false
}空闲连接放入free list,判断是否有连接需要清理
func (db *DB) startCleanerLocked() {
if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil {
db.cleanerCh = make(chan struct{}, 1)
go db.connectionCleaner(db.maxLifetime)
}
}按照maxLifetime清理过期连接
func (db *DB) connectionCleaner(d time.Duration) {
const minInterval = time.Second
if d < minInterval {
d = minInterval
}
t := time.NewTimer(d)
for {
select {
case <-t.C:
case <-db.cleanerCh: // maxLifetime was changed or db was closed.
}
db.mu.Lock()
d = db.maxLifetime
// 检查环境
if db.closed || db.numOpen == 0 || d <= 0 {
db.cleanerCh = nil
db.mu.Unlock()
return
}
// 过期时间
expiredSince := nowFunc().Add(-d)
var closing []*driverConn
for i := 0; i < len(db.freeConn); i++ {
c := db.freeConn[i]
if c.createdAt.Before(expiredSince) {
// 早于过期时间,加入到待清理队列
closing = append(closing, c)
last := len(db.freeConn) - 1
// 将最后一个空闲连接放入到索引为i的位置,并且删除
db.freeConn[i] = db.freeConn[last]
db.freeConn[last] = nil
db.freeConn = db.freeConn[:last]
i--
}
}
// 计数++
db.maxLifetimeClosed += int64(len(closing))
db.mu.Unlock()
// 开始关闭
for _, c := range closing {
c.Close()
}
if d < minInterval {
d = minInterval
}
// 重置定时器
t.Reset(d)
}
}打开一个连接,阻塞获取
func (db *DB) Conn(ctx context.Context) (*Conn, error) {
var dc *driverConn
var err error
// 最大失败次数,默认是2
for i := 0; i < maxBadConnRetries; i++ {
// cachedOrNewConn是获取策略,有两种cachedOrNewConn和alwaysNewConn
dc, err = db.conn(ctx, cachedOrNewConn)
if err != driver.ErrBadConn {
break
}
}
if err == driver.ErrBadConn {
dc, err = db.conn(ctx, alwaysNewConn)
}
if err != nil {
return nil, err
}
conn := &Conn{
db: db,
dc: dc,
}
return conn, nil
}conn获取一个新连接或者从idle中获取
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
db.mu.Lock()
// 数据库已关闭
if db.closed {
db.mu.Unlock()
return nil, errDBClosed
}
// 检查ctx是否过去
select {
default:
case <-ctx.Done():
db.mu.Unlock()
return nil, ctx.Err()
}
lifetime := db.maxLifetime
// 优先从空闲队列中读取
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
// 获取空闲连接,并且从freeConn中删除
conn := db.freeConn[0]
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
db.mu.Unlock()
// 如果连接过期了,则返回ErrBadConn
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
// Lock around reading lastErr to ensure the session resetter finished.
// 加锁读取lastErr
conn.Lock()
err := conn.lastErr
conn.Unlock()
// ErrBadConn 返回
if err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
// 获取成功,返回
return conn, nil
}
// 超过打开最大连接数
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
// 创建连接channel,有缓存
req := make(chan connRequest, 1)
// 获取下一个key,自增
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
db.waitCount++
db.mu.Unlock()
// 等待开始时间
waitStart := time.Now()
// 到达context的时间会超时
select {
case <-ctx.Done():
// Remove the connection request and ensure no value has been sent
// on it after removing.
// 超时或者被取消,删除这个连接请求
db.mu.Lock()
delete(db.connRequests, reqKey)
db.mu.Unlock()
// 更新
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
select {
default:
case ret, ok := <-req:
// 在获取一下
if ok && ret.conn != nil {
db.putConn(ret.conn, ret.err, false)
}
}
return nil, ctx.Err()
case ret, ok := <-req:
// 获取成功,更新时间
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
// 连接关闭, db.Close
if !ok {
return nil, errDBClosed
}
// 达到maxLifeTime,返回ErrBadConn
if ret.err == nil && ret.conn.expired(lifetime) {
ret.conn.Close()
return nil, driver.ErrBadConn
}
// 连接错误
if ret.conn == nil {
return nil, ret.err
}
// Lock around reading lastErr to ensure the session resetter finished.
ret.conn.Lock()
err := ret.conn.lastErr
ret.conn.Unlock()
if err == driver.ErrBadConn {
ret.conn.Close()
return nil, driver.ErrBadConn
}
// 获取成功
return ret.conn, ret.err
}
}
// 依然没有获取到,到这里都++
db.numOpen++ // optimistically
db.mu.Unlock()
// 连接
ci, err := db.connector.Connect(ctx)
if err != nil {
db.mu.Lock()
db.numOpen-- // correct for earlier optimism
db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
db.mu.Lock()
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
inUse: true,
}
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil
}如果没有达到maxOpen,打开一个新的连接
func (db *DB) maybeOpenNewConnections() {
numRequests := len(db.connRequests)
if db.maxOpen > 0 {
numCanOpen := db.maxOpen - db.numOpen
if numRequests > numCanOpen {
numRequests = numCanOpen
}
}
for numRequests > 0 {
db.numOpen++ // optimistically
numRequests--
if db.closed {
return
}
// 写入连接请求
db.openerCh <- struct{}{}
}
}connectionOpener运行一个单独的goroutine
func (db *DB) connectionOpener(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-db.openerCh:
db.openNewConnection(ctx)
}
}
}总结: 数据库连接源码比较简单,接下来我们看下go使用数据库连接另一个要注意的结构
Rows 是一个查询的结构,游标从结果的第一行开始,用Next方法可以从一行到下一行
type Rows struct {
// 当前的dc,关闭后会调用releaseConn释放
dc *driverConn // owned; must call releaseConn when closed to release
// 释放连接函数
releaseConn func(error)
rowsi driver.Rows
// 当Rows 关闭后调用
cancel func() // called when Rows is closed, may be nil.
closeStmt *driverStmt // if non-nil, statement to Close on close
// closemu 锁
closemu sync.RWMutex
// 是否关闭
closed bool
// 只有在closed的情况是非nil
lasterr error // non-nil only if closed is true
// lastcols is only used in Scan, Next, and NextResultSet which are expected
// not to be called concurrently.
lastcols []driver.Value
}Rows查询完需要Close, 为什么需要Close? 接下来我们看下
Rows的Close会调用close去关闭
func (rs *Rows) close(err error) error {
rs.closemu.Lock()
defer rs.closemu.Unlock()
if rs.closed {
return nil
}
// 标记为关闭
rs.closed = true
if rs.lasterr == nil {
rs.lasterr = err
}
// 关闭
withLock(rs.dc, func() {
err = rs.rowsi.Close()
})
if fn := rowsCloseHook(); fn != nil {
fn(rs, &err)
}
if rs.cancel != nil {
rs.cancel()
}
if rs.closeStmt != nil {
rs.closeStmt.Close()
}
// 释放连接
rs.releaseConn(err)
return err
}releaseConn 就是driverConn.releaseConn函数
func (dc *driverConn) releaseConn(err error) {
dc.db.putConn(dc, err, true)
}释放连接
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
db.mu.Lock()
if !dc.inUse {
if debugGetPut {
fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
}
panic("sql: connection returned that was never out")
}
if debugGetPut {
db.lastPut[dc] = stack()
}
// 修改为未使用
dc.inUse = false
for _, fn := range dc.onPut {
fn()
}
dc.onPut = nil
// 当前连接是ErrBadConn, 重新打开一个连接
if err == driver.ErrBadConn {
// Don't reuse bad connections.
// Since the conn is considered bad and is being discarded, treat it
// as closed. Don't decrement the open count here, finalClose will
// take care of that.
db.maybeOpenNewConnections()
db.mu.Unlock()
dc.Close()
return
}
if putConnHook != nil {
putConnHook(db, dc)
}
if db.closed {
// Connections do not need to be reset if they will be closed.
// Prevents writing to resetterCh after the DB has closed.
resetSession = false
}
if resetSession {
if _, resetSession = dc.ci.(driver.SessionResetter); resetSession {
// Lock the driverConn here so it isn't released until
// the connection is reset.
// The lock must be taken before the connection is put into
// the pool to prevent it from being taken out before it is reset.
dc.Lock()
}
}
// 加入队列
added := db.putConnDBLocked(dc, nil)
db.mu.Unlock()
if !added {
if resetSession {
dc.Unlock()
}
dc.Close()
return
}
if !resetSession {
return
}
select {
default:
// If the resetterCh is blocking then mark the connection
// as bad and continue on.
dc.lastErr = driver.ErrBadConn
dc.Unlock()
case db.resetterCh <- dc:
}
}看了上面应该明白为什么Rows需要关闭了,如果不关闭,连接就不会释放,这样打开的连接就是一直增加,如果一旦超过了maxOpen,就会导致后面的数据库操作阻塞。