Lanyon 记录下日常工作与学习哈~,还有技术分享哦。。🎉

golang sql体系及orm实现

golang sql标准库研究

抽象接口定义

database/sql/driver/driver.go关于数据库驱动模块下各核心interface主要包括:

  • Connector: 抽象的数据库连接器,需要具备创建数据库连接以及返回从属的数据库驱动的能力;
  • Driver: 抽象的数据库驱动,具备创建数据库连接的能力;
  • Conn: 抽象的数据库连接,具备预处理sql以及开启事务的能力;
  • Tx: 抽象的事务,具备提交和回滚的能力;
  • Statement: 抽象的请求预处理状态. 具备实际执行sql并返回执行结果的能力;
  • Result/Row: 抽象的sql执行结果;

database/sql/sql.go中定义的几个核心实体类. 核心内容主要是对于数据库连接池的实现以及对第三方数据库驱动能力的再封装.

  • DB: 对应为数据库的具象化实例,其中包含如下几个核心字段:connector(用于创建数据库连接的抽象连接器,由第三方数据库提供具体实现)、mufreeConnconnRequests等。
  • driverConn: 其核心属性是由第三方驱动实现的driver.Conn,在此之上添加了时间属性、回调函数、状态标识等辅助信息;
  • driverStmt: 在抽象的driver.Stmt基础上,添加了互斥锁、关闭状态标识等信息;
  • Tx: 在抽象的driver.TX基础上,额外添加了互斥锁、数据库连接、连接释放函数、上下文等辅助属性;

创建数据库

沿着sql.Open方法向下追溯,查看一下创建数据库实例的流程细节:

  • 首先校验对应的 driver 是否已注册;
  • 接下来调用OpenDB方法执行真正的db实例创建操作,方法中会创建一个DB,启动一个connectionOpener协程,连接池资源不足时,用于补充创建连接;
  • connectionOpener方法中,通过for + select多路复用的形式,保持协程的运行;
// 创建数据库
func Open(driverName, dataSourceName string) (*DB, error) {
    // 首先根据驱动类型获取数据库驱动, 导入mysql驱动时,会自动在drivers中注册,_ "github.com/go-sql-driver/mysql"
    driversMu.RLock()
    driveri, ok := drivers[driverName]
    driversMu.RUnlock()
    if !ok {
        return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
    }
    // 若驱动实现了对应的连接器 connector,则获取之并进行 db 实例创建
    if driverCtx, ok := driveri.(driver.DriverContext); ok {
        connector, err := driverCtx.OpenConnector(dataSourceName)
        if err != nil {
            return nil, err
        }
        return OpenDB(connector), nil
    }
    // 默认使用 dsn 数据库连接器,进行 db 创建
    return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
}

执行请求

在执行一次db.Query()请求中,其中核心步骤包括:获取数据库连接(通过调用conn方法完成),执行sql(通过调用queryDC方法完成)、归还/释放连接(通过在queryDC方法中调用releaseConn方法完成);

const maxBadConnRetries = 2
// 执行查询类 sql
func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
    var rows *Rows
    var err error
    var isBadConn bool

    // 最多可以因为 BadConn 类型的错误重试两次
    for i := 0; i < maxBadConnRetries; i++ {
        // 执行 sql,此时采用的是 连接池有缓存连接优先复用 的策略
        rows, err = db.query(ctx, query, args, cachedOrNewConn)
        // 属于 badConn 类型的错误可以重试
        isBadConn = errors.Is(err, driver.ErrBadConn)
        if !isBadConn {
            break
        }
    }
    // 重试了两轮 badConn 错误后,第三轮会采用
    if isBadConn {
        return db.query(ctx, query, args, alwaysNewConn)
    }
    return rows, err
}

conn方法获取数据库连接:

  • 倘若启用了连接池策略且连接池中有可用的连接,则会优先获取该连接进行返回;
  • 倘若当前连接数已达上限,则会将当前协程挂起,建立对应的channel添加到connRequests map中,等待有连接释放时被唤醒;
  • 倘若连接数未达上限,则会调用第三方驱动的connector完成新连接的创建;

归还数据库连接,使用完数据库连接后,需要尝试将其放还连接池中,入口方法为releaseConn

func (dc *driverConn) releaseConn(err error) {
    dc.db.putConn(dc, err, true)
}

清理任务

接下来是cleaner协程的运行流程,整体是通过for + select的方式常驻运行. 其中,cleaner创建了一个定时器ticker,定时时间间隔会在maxIdleTimemaxLifeTime中取较小值,并基于秒级向上取整. 每一轮ticker触发后,会执行:

  • 判断当前db是否已关闭或者存活连接数是否为零,是的话退出当前cleaner协程
  • 调用connectionCleanerRunLocked对连接池中过期的连接进行清理

mysql驱动实现

go-sql-driver/mysql的核心功能是,遵循database/sql标准库中预留的接口协议,提供出对应于mysql的实现版本,将和mysql服务端的数据传输、通信协议,预处理模式、事务操作等内容封装实现在其中.

驱动加载,数据库驱动. mysql driver时,只需要匿名导入go-sql-driver/mysqllib包,即可完成driver的注册操作。其原理是:会默认调用mysql包的init方法。

驱动类定义位于driver.go,名称为MySQLDriver,对应实现Open方法用于创建数据库连接,核心步骤包括: 解析dsn,转为配置类实例、 构造连接器实例、 通过连接器完成连接创建操作;

import (
    // 注册 mysql 数据库驱动
    _ "github.com/go-sql-driver/mysql"
)

// mysql#driver.go, This variable can be replaced with -ldflags like below:
// go build "-ldflags=-X github.com/go-sql-driver/mysql.driverName=custom"
var driverName = "mysql"
func init() {
    if driverName != "" {
        sql.Register(driverName, &MySQLDriver{})
    }
}
// MySQL 版本的数据库驱动
type MySQLDriver struct{}

连接器的实现位于connecto.go,其需实现database/sql connector接口定义的ConnectDriver()方法:

type connector struct {
    cfg               *Config // immutable private copy.
    encodedAttributes string  // Encoded connection attributes.
}

// Connect implements driver.Connector interface.
// Connect returns a connection to the database.
func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
    // New mysqlConn
    mc := &mysqlConn{
        maxAllowedPacket: maxPacketSize,
        maxWriteSize:     maxPacketSize - 1,
        closech:          make(chan struct{}),
        cfg:              cfg,
        connector:        c,
	}
	// ...
}

// Driver implements driver.Connector interface.
// Driver returns &MySQLDriver{}.
func (c *connector) Driver() driver.Driver {
    return &MySQLDriver{}
}

Connect方法的实现主要包含如下几个核心步骤,与mysql连接配置有关的内容被聚合在dsn.go

  • 创建连接(net.Dialer.DialContext)、设置为tcp长连接(net.TCPConn.KeepAlive)、创建连接缓冲区(mc.buf = newBuffer
  • 设置连接超时配置(mc.buf.timeout = mc.cfg.ReadTimeoutmc.writeTimeout = mc.cfg.WriteTimeout
  • 接收来自服务端的握手请求(mc.readHandshakePacket)、向服务端发起鉴权请求(mc.writeHandshakeResponsePacket
  • 处理鉴权结果(mc.handleAuthResult)、设置dsn中的参数变量(mc.handleParams

数据库连接接口,值得一提的是,在使用mysqlConn的过程中,在文件connection.go中,mysqlConn对外可以通过公开方法Close实现关闭:

type mysqlConn struct {
    // 缓冲区数据
    buf              buffer
    // 网络连接
    netConn          net.Conn
    rawConn          net.Conn    // underlying connection when netConn is TLS connection.
    result           mysqlResult // sql 执行结果
	// ...
}

func (mc *mysqlConn) Close() (err error) {
    // Makes Close idempotent
    if !mc.closed.Load() {
        err = mc.writeCommandPacket(comQuit)
    }
    mc.cleanup()
    return
}

下面是通过mysqlConn执行查询类请求的流程,对于query方法,入参中的query字段为sql模板,args字段为用于填充占位符的参数。

query方法的出参类型为textRows,其首先会读取响应报文中第一部分,填充各个列的信息,后续内容会保留在内置的conn中,通过使用方调用rowsNext方法时再进行读取操作.

func (mc *mysqlConn) Query(query string, args []driver.Value) (driver.Rows, error) {
    return mc.query(query, args)
}

func (mc *mysqlConn) query(query string, args []driver.Value) (*textRows, error) {
    handleOk := mc.clearResult()
    // 连接已关闭?
    if mc.closed.Load() {
        mc.cfg.Logger.Print(ErrInvalidConn)
        return nil, driver.ErrBadConn
    }
	// ...
}

sql预处理,go-sql-driver/mysql 库实现的statement类如下,对应的代码位于statement.go文件中,prepare statement是通过调用mysqlConnprepare方法开启的,对应流程及源码如下:

type mysqlStmt struct {
    // 关联的 mysql 连接
    mc         *mysqlConn
    // 预处理语句的标识 id
    id         uint32
    // 预处理状态中多少待填充参数
    paramCount int
}

gorm框架原理分析

gorm框架通过一个gorm.DB实例来指代我们所操作的数据库. 使用gorm的第一步就是要通过Open方法创建出一个gorm.DB实例,其中首个入参为连接器dialector,本身是个抽象的interface,其实现类关联了具体数据库类型.

import (
    "gorm.io/driver/mysql"
    "gorm.io/gorm"
)
var (
    dsn := "root:123456@tcp(127.0.0.1:3306)/douban_datahub?charset=utf8mb4&parseTime=True&loc=Local"
    db *gorm.DB
    dbOnce sync.Once
)
func getDB() (*gorm.DB, error) {
    var err error
    dbOnce.Do(func(){
        // 创建 db 实例
        db, err = gorm.Open(mysql.Open(dsn),&gorm.Config{})
    })
    return db,err
}

创建gorm.DB实例流程,gorm.Open方法是创建DB实例的入口方法,其中包含如下几项核心步骤:

  • 完成gorm.Config配置的创建和注入,完成连接器dialector的注入,本篇使用的是mysql版本;
  • 完成callbackscrud等几类processor的创建 (通过initializeCallbacks(...) 方法 )
  • 完成connPool的创建以及各类processor fns函数的注册(通过dialector.Initialize(...)方法)
  • 倘若启用了prepare模式,需要使用preparedStmtDB进行connPool的平替 ,构造statement实例
  • 根据策略,决定是否通过ping请求测试连接,返回创建好的db实例;
    // Open initialize db session based on dialector
    func Open(dialector Dialector, opts ...Option) (db *DB, err error) {
      config := &Config{}
      // ...
      if config.NamingStrategy == nil {   // 表、列命名策略
          config.NamingStrategy = schema.NamingStrategy{IdentifierMaxLength: 64} // Default Identifier length is 64
      }
      if dialector != nil {   // 连接器
          config.Dialector = dialector
      }
      db = &DB{Config: config, clone: 1}
      db.callbacks = initializeCallbacks(db)  // 初始化 callback 当中的各个 processor
    
      if config.PrepareStmt { // 是否启用 prepare 模式
          preparedStmt := NewPreparedStmtDB(db.ConnPool)
          db.cacheStore.Store(preparedStmtDBKey, preparedStmt)
          db.ConnPool = preparedStmt
      }
      // ...
    }
    

初始化dialectorgormmysql版本的dialector实现在代码仓库 https://github.com/go-gorm/mysql 中,使用者通过Open方法,将传入的dsn解析成配置,然后返回mysql版本的Dialector实例.

// go-gorm/mysql/mysql.go
func Open(dsn string) gorm.Dialector {
    dsnConf, _ := mysql.ParseDSN(dsn)
    return &Dialector{Config: &Config{DSN: dsn, DSNConfig: dsnConf}}
}

// 在gorm.Open中,当dialector不为空时,会调用config.Dialector.Initialize(db),对应实现在 go-gorm/mysql/mysql.go中
func (dialector Dialector) Initialize(db *gorm.DB) (err error) {
    if dialector.DriverName == "" {
        dialector.DriverName = DefaultDriverName
    }
	// connPool 初始化
    if dialector.Conn != nil {
        db.ConnPool = dialector.Conn
    } else {
        db.ConnPool, err = sql.Open(dialector.DriverName, dialector.DSN)
        if err != nil {
        return err
        }
    }
    // register callbacks
    callbackConfig := &callbacks.Config{
        CreateClauses: CreateClauses,
        QueryClauses:  QueryClauses,
        UpdateClauses: UpdateClauses,
        DeleteClauses: DeleteClauses,
    }
    // ...完成 crud 类操作 callback 函数的注册
    callbacks.RegisterDefaultCallbacks(db, callbackConfig)
	return
}

查询,以db.First方法作为入口,展示数据库查询的方法链路,在db.First方法当中:

  • 遵循First的语义,通过limitorder追加clause,限制只取满足条件且主键最小的一笔数据;
  • 追加用户传入的一系列condition,进行clause追加;
  • FirstTakeLast等方法中,会设置RaiseErrorOnNotFound标识为true,倘若未找到记录,则会抛出ErrRecordNotFound错误;

添加条件,执行查询类操作时,通常会通过链式调用的方式,传入一些查询限制条件,比如 Where、Group By、Order、Limit 之类. 我们以 Limit 为例,进行展开介绍:

  • 首先调用 db.getInstance() 方法,克隆出一份 DB 会话实例
  • 调用 statement.AddClause 方法,将 limit 条件追加到 statement 的 Clauses map 中
    func (db *DB) Limit(limit int) (tx *DB) {
      tx = db.getInstance()
      tx.Statement.AddClause(clause.Limit{Limit: &limit})
      return
    }
    func (stmt *Statement) AddClause(v clause.Interface) {
      name := v.Name()
      c := stmt.Clauses[name]
      c.Name = name
      v.MergeClause(&c)
      stmt.Clauses[name] = c
    }