feat: industrial-grade SQLite optimization and high-concurrency support v1.0.12

This commit is contained in:
AI Engineer 2026-05-18 13:54:20 +08:00
parent 7b1e5054d9
commit f8ede4fdc7
8 changed files with 155 additions and 119 deletions

10
Base.go
View File

@ -68,7 +68,10 @@ func basePrepare(db *sql.DB, tx *sql.Tx, query string) *Stmt {
}
func baseExec(db *sql.DB, tx *sql.Tx, query string, args ...any) *ExecResult {
args = flatArgs(args)
return baseExecRaw(db, tx, query, flatArgs(args)...)
}
func baseExecRaw(db *sql.DB, tx *sql.Tx, query string, args ...any) *ExecResult {
var r sql.Result
var err error
startTime := time.Now()
@ -107,8 +110,10 @@ func flatArgs(args []any) []any {
}
func baseQuery(db *sql.DB, tx *sql.Tx, query string, args ...any) *QueryResult {
args = flatArgs(args)
return baseQueryRaw(db, tx, query, flatArgs(args)...)
}
func baseQueryRaw(db *sql.DB, tx *sql.Tx, query string, args ...any) *QueryResult {
var rows *sql.Rows
var err error
startTime := time.Now()
@ -128,6 +133,7 @@ func baseQuery(db *sql.DB, tx *sql.Tx, query string, args ...any) *QueryResult {
return &QueryResult{Sql: &query, Args: args, usedTime: usedTime, rows: rows}
}
func quote(quoteTag string, text string) string {
a := strings.Split(text, ".")
for i, v := range a {

View File

@ -1,5 +1,17 @@
# 变更记录 - @go/db
## [1.0.12] - 2026-05-17
- **SQLite 极致优化 (超高并发支持)**:
- **读写分离与零锁读取**: 读操作 (`Query`) 实现零锁定,配合 WAL 模式彻底解决读写互斥问题;写操作由应用层 `sync.Mutex` 统一排队,规避 `database is locked` 错误。
- **临界区最小化**: 将 FTS 重写、参数 JSON 化 (`flatArgs`) 及日志记录移出锁保护区,极大缩短了写锁持有时间。
- **工业级默认配置**:
- 自动启用 `WAL` 模式、`NORMAL` 同步、`MEMORY` 临时存储及 `busy_timeout(5000)`
- 引入 **动态 Mmap**:根据系统内存自动设置 `mmap_size` (最大 30GB 或物理内存的 1/4),使大数据量访问接近内存速度。
- 默认 `MaxOpenConns` 提升至 100优化多线程只读性能。
- **稳定性**:
- 为 `Stmt` (预处理语句) 增加写锁保护。
- 优化事务锁机制,支持事务内的锁自动追踪与释放。
## [1.0.11] - 2026-05-13
- **基础设施对齐**:
- 移除 `encoding/json` 原生依赖,全面切换至 `apigo.cc/go/cast.UnmarshalJSON` 以增强类型兼容性。

141
DB.go
View File

@ -19,7 +19,6 @@ import (
"apigo.cc/go/crypto"
"apigo.cc/go/id"
"apigo.cc/go/log"
"apigo.cc/go/rand"
"apigo.cc/go/redis"
"apigo.cc/go/safe"
)
@ -194,6 +193,7 @@ type DB struct {
QuoteTag string
tables map[string]*TableStruct
tablesLock *sync.RWMutex
sqliteMu *sync.Mutex // Serial lock for SQLite writers
}
type TableStruct struct {
@ -318,7 +318,7 @@ func (db *DB) NextID(table string) string {
func (db *DB) syncVersionFromDB(table, versionField string) {
query := fmt.Sprintf("SELECT MAX(%s) FROM %s", db.Quote(versionField), db.Quote(table))
maxVer := db.Query(query).IntOnR1C1()
maxVer := db.rawQuery(query).IntOnR1C1()
if db.Config.Redis != "" {
r := redis.GetRedis(db.Config.Redis, db.logger.logger)
@ -537,6 +537,9 @@ func getDB(name string, logger *log.Logger, useCache bool) *DB {
db.conn = conn
db.tables = make(map[string]*TableStruct)
db.tablesLock = new(sync.RWMutex)
if conf.Type == "sqlite" || conf.Type == "sqlite3" {
db.sqliteMu = new(sync.Mutex)
}
if conf.ReadonlyHosts != nil {
readonlyConnections := make([]*sql.DB, 0)
@ -555,6 +558,9 @@ func getDB(name string, logger *log.Logger, useCache bool) *DB {
db.Error = nil
db.Config = conf
if (conf.Type == "sqlite" || conf.Type == "sqlite3") && conf.MaxOpens == 0 {
conf.MaxOpens = 100
}
if conf.MaxIdles > 0 {
conn.SetMaxIdleConns(conf.MaxIdles)
}
@ -567,6 +573,25 @@ func getDB(name string, logger *log.Logger, useCache bool) *DB {
if conf.LogSlow == 0 {
conf.LogSlow = config.Duration(1000 * time.Millisecond)
}
if conf.Type == "sqlite" || conf.Type == "sqlite3" {
baseExecRaw(conn, nil, "PRAGMA journal_mode=WAL")
baseExecRaw(conn, nil, "PRAGMA synchronous=NORMAL")
baseExecRaw(conn, nil, "PRAGMA busy_timeout=5000")
baseExecRaw(conn, nil, "PRAGMA temp_store=MEMORY")
baseExecRaw(conn, nil, "PRAGMA cache_size=-2000")
// Dynamic mmap_size: 1/4 of system memory, max 30GB
mmapLimit := int64(30000000000)
sysMemStr := runShell("sysctl -n hw.memsize || free -b | awk '/Mem:/ {print $2}'")
if sysMem := cast.Int64(sysMemStr); sysMem > 0 {
if mmapLimit > sysMem/4 {
mmapLimit = sysMem / 4
}
}
baseExecRaw(conn, nil, fmt.Sprintf("PRAGMA mmap_size=%d", mmapLimit))
}
if useCache {
dbInstancesLock.Lock()
dbInstances[name] = db
@ -591,6 +616,13 @@ func getPoolForHost(conf *Config, host string) (*sql.DB, error) {
if connector := connectors[conf.Type]; connector != nil {
return sql.OpenDB(connector(conf, conf.pwd, conf.tls)), nil
} else {
if (conf.Type == "sqlite" || conf.Type == "sqlite3") && !strings.Contains(conf.Args, "journal_mode") {
if conf.Args != "" {
conf.Args += "&"
}
conf.Args += "_journal_mode=WAL&_busy_timeout=5000&_pragma=synchronous(1)&_pragma=cache_size(-2000)"
}
dsn := ""
args := make([]string, 0)
if conf.SSL != "" {
@ -624,6 +656,7 @@ func (db *DB) CopyByLogger(logger *log.Logger) *DB {
newDB.Config = db.Config
newDB.tables = db.tables
newDB.tablesLock = db.tablesLock
newDB.sqliteMu = db.sqliteMu
if logger == nil {
logger = log.DefaultLogger
}
@ -660,6 +693,7 @@ func (db *DB) GetOriginDB() *sql.DB {
func (db *DB) Prepare(query string) *Stmt {
stmt := basePrepare(db.conn, nil, query)
stmt.logger = db.logger
stmt.sqliteMu = db.sqliteMu
if stmt.Error != nil {
db.logger.LogError(stmt.Error.Error())
}
@ -676,19 +710,22 @@ func (db *DB) Quotes(texts []string) string {
func (db *DB) Begin() *Tx {
if db.conn == nil {
return &Tx{db: db, QuoteTag: db.QuoteTag, logSlow: db.Config.LogSlow.TimeDuration(), Error: errors.New("operate on a bad connection"), logger: db.logger}
return &Tx{db: db, sqliteMu: db.sqliteMu, QuoteTag: db.QuoteTag, logSlow: db.Config.LogSlow.TimeDuration(), Error: errors.New("operate on a bad connection"), logger: db.logger}
}
sqlTx, err := db.conn.Begin()
if err != nil {
db.logger.LogError(err.Error())
return &Tx{db: db, QuoteTag: db.QuoteTag, logSlow: db.Config.LogSlow.TimeDuration(), Error: err, logger: db.logger}
return &Tx{db: db, sqliteMu: db.sqliteMu, QuoteTag: db.QuoteTag, logSlow: db.Config.LogSlow.TimeDuration(), Error: err, logger: db.logger}
}
return &Tx{db: db, QuoteTag: db.QuoteTag, logSlow: db.Config.LogSlow.TimeDuration(), conn: sqlTx, logger: db.logger}
return &Tx{db: db, sqliteMu: db.sqliteMu, QuoteTag: db.QuoteTag, logSlow: db.Config.LogSlow.TimeDuration(), conn: sqlTx, logger: db.logger}
}
func (db *DB) Exec(query string, args ...any) *ExecResult {
query, args = db.rewriteFTS(query, args)
r := baseExec(db.conn, nil, query, args...)
args = flatArgs(args)
db.lock()
r := baseExecRaw(db.conn, nil, query, args...)
db.unlock()
r.logger = db.logger
if r.Error != nil {
db.logger.LogQueryError(r.Error.Error(), query, args, r.usedTime)
@ -700,20 +737,14 @@ func (db *DB) Exec(query string, args ...any) *ExecResult {
return r
}
func (db *DB) rawExec(query string, args ...any) *ExecResult {
return db.Exec(query, args...)
}
func (db *DB) Query(query string, args ...any) *QueryResult {
query, args = db.rewriteFTS(query, args)
conn := db.conn
if db.readonlyConnections != nil {
connNum := len(db.readonlyConnections)
if connNum == 1 {
conn = db.readonlyConnections[0]
} else {
p := rand.Int(0, connNum-1)
conn = db.readonlyConnections[p]
}
}
r := baseQuery(conn, nil, query, args...)
args = flatArgs(args)
r := baseQueryRaw(db.conn, nil, query, args...)
r.logger = db.logger
if r.Error != nil {
db.logger.LogQueryError(r.Error.Error(), query, args, r.usedTime)
@ -725,6 +756,24 @@ func (db *DB) Query(query string, args ...any) *QueryResult {
return r
}
func (db *DB) rawQuery(query string, args ...any) *QueryResult {
return db.Query(query, args...)
}
func (db *DB) lock() {
if db.sqliteMu != nil {
db.sqliteMu.Lock()
}
}
func (db *DB) unlock() {
if db.sqliteMu != nil {
db.sqliteMu.Unlock()
}
}
var identifierRegex = `(?:['"` + "`" + `][^'"` + "`" + `]+['"` + "`" + `]|[\w\-]+)`
var likeFieldReg = regexp.MustCompile(`(?i)(` + identifierRegex + `(?:\.` + identifierRegex + `)*)\s+LIKE\s*$`)
var likeLiteralReg = regexp.MustCompile(`(?i)(` + identifierRegex + `(?:\.` + identifierRegex + `)*)\s+LIKE\s+(['"])(%?[^'"]*?%?)(['"])`)
@ -917,44 +966,17 @@ func (db *DB) extractTableName(query string, field string) string {
func (db *DB) Insert(table string, data any) *ExecResult {
query, values := db.MakeInsertSql(table, data, false)
r := baseExec(db.conn, nil, query, values...)
r.logger = db.logger
if r.Error != nil {
db.logger.LogQueryError(r.Error.Error(), query, values, r.usedTime)
} else {
if db.Config.LogSlow > 0 && r.usedTime >= float32(db.Config.LogSlow.TimeDuration()/time.Millisecond) {
db.logger.LogQuery(query, values, r.usedTime)
}
}
return r
return db.Exec(query, values...)
}
func (db *DB) Replace(table string, data any) *ExecResult {
query, values := db.MakeInsertSql(table, data, true)
r := baseExec(db.conn, nil, query, values...)
r.logger = db.logger
if r.Error != nil {
db.logger.LogQueryError(r.Error.Error(), query, values, r.usedTime)
} else {
if db.Config.LogSlow > 0 && r.usedTime >= float32(db.Config.LogSlow.TimeDuration()/time.Millisecond) {
db.logger.LogQuery(query, values, r.usedTime)
}
}
return r
return db.Exec(query, values...)
}
func (db *DB) Update(table string, data any, conditions string, args ...any) *ExecResult {
query, values := db.MakeUpdateSql(table, data, conditions, args...)
r := baseExec(db.conn, nil, query, values...)
r.logger = db.logger
if r.Error != nil {
db.logger.LogQueryError(r.Error.Error(), query, values, r.usedTime)
} else {
if db.Config.LogSlow > 0 && r.usedTime >= float32(db.Config.LogSlow.TimeDuration()/time.Millisecond) {
db.logger.LogQuery(query, values, r.usedTime)
}
}
return r
return db.Exec(query, values...)
}
func (db *DB) Delete(table string, conditions string, args ...any) *ExecResult {
@ -964,16 +986,7 @@ func (db *DB) Delete(table string, conditions string, args ...any) *ExecResult {
conditions = " where " + conditions
}
query := fmt.Sprintf("delete from %s%s", db.Quote(table), conditions)
r := baseExec(db.conn, nil, query, args...)
r.logger = db.logger
if r.Error != nil {
db.logger.LogQueryError(r.Error.Error(), query, args, r.usedTime)
} else {
if db.Config.LogSlow > 0 && r.usedTime >= float32(db.Config.LogSlow.TimeDuration()/time.Millisecond) {
db.logger.LogQuery(query, args, r.usedTime)
}
}
return r
return db.Exec(query, args...)
}
// Shadow delete
@ -1007,7 +1020,7 @@ func (db *DB) getTable(table string) *TableStruct {
var query string
if db.Config.Type == "mysql" {
query = "SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, COLUMN_KEY FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?"
res := db.Query(query, db.Config.DB, table)
res := db.rawQuery(query, db.Config.DB, table)
rows := res.MapResults()
for _, row := range rows {
col := cast.String(row["COLUMN_NAME"])
@ -1026,7 +1039,7 @@ func (db *DB) getTable(table string) *TableStruct {
}
} else if db.Config.Type == "postgres" || db.Config.Type == "pgx" {
query = "SELECT column_name, data_type, character_maximum_length FROM information_schema.columns WHERE table_schema = current_schema() AND table_name = ?"
res := db.Query(query, table)
res := db.rawQuery(query, table)
rows := res.MapResults()
for _, row := range rows {
col := cast.String(row["column_name"])
@ -1047,7 +1060,7 @@ func (db *DB) getTable(table string) *TableStruct {
} else if isFileDB(db.Config.Type) {
// For SQLite
query := fmt.Sprintf("PRAGMA table_info(%s)", db.Quote(table))
res := db.Query(query)
res := db.rawQuery(query)
rows := res.MapResults()
for _, row := range rows {
colName := cast.String(row["name"])
@ -1078,19 +1091,19 @@ func (db *DB) getTable(table string) *TableStruct {
shadowTable := table + "_deleted"
if db.Config.Type == "mysql" {
query = "SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?"
res := db.Query(query, db.Config.DB, shadowTable)
res := db.rawQuery(query, db.Config.DB, shadowTable)
if res.StringOnR1C1() != "" {
ts.HasShadowTable = true
}
} else if db.Config.Type == "postgres" || db.Config.Type == "pgx" {
query = "SELECT table_name FROM information_schema.tables WHERE table_schema = current_schema() AND table_name = ?"
res := db.Query(query, shadowTable)
res := db.rawQuery(query, shadowTable)
if res.StringOnR1C1() != "" {
ts.HasShadowTable = true
}
} else if isFileDB(db.Config.Type) {
query = "SELECT name FROM sqlite_master WHERE type='table' AND name=?"
res := db.Query(query, shadowTable)
res := db.rawQuery(query, shadowTable)
if res.StringOnR1C1() != "" {
ts.HasShadowTable = true
}

View File

@ -307,6 +307,8 @@ func (db *DB) Sync(desc string) error {
return outErr
}
// CheckTable 检查并同步单个表结构
func (db *DB) CheckTable(table *TableStruct) error {
fieldSets := make([]string, 0)

View File

@ -3,11 +3,13 @@ package db
import (
"database/sql"
"errors"
"sync"
"time"
)
type Stmt struct {
conn *sql.Stmt
sqliteMu *sync.Mutex
lastSql *string
lastArgs []any
Error error
@ -15,6 +17,10 @@ type Stmt struct {
}
func (stmt *Stmt) Exec(args ...any) *ExecResult {
if stmt.sqliteMu != nil {
stmt.sqliteMu.Lock()
defer stmt.sqliteMu.Unlock()
}
stmt.lastArgs = args
if stmt.conn == nil {
return &ExecResult{Sql: stmt.lastSql, Args: stmt.lastArgs, usedTime: -1, logger: stmt.logger, Error: errors.New("operate on a bad connection")}

85
Tx.go
View File

@ -5,12 +5,15 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"
)
type Tx struct {
conn *sql.Tx
db *DB
sqliteMu *sync.Mutex
hasLock bool
lastSql *string
lastArgs []any
Error error
@ -29,6 +32,7 @@ func (tx *Tx) Quotes(texts []string) string {
}
func (tx *Tx) Commit() error {
defer tx.unlock()
if tx.isCommittedOrRollbacked {
return nil
}
@ -45,6 +49,7 @@ func (tx *Tx) Commit() error {
}
func (tx *Tx) Rollback() error {
defer tx.unlock()
if tx.isCommittedOrRollbacked {
return nil
}
@ -78,6 +83,7 @@ func (tx *Tx) CheckFinished() error {
}
func (tx *Tx) Prepare(query string) *Stmt {
tx.lock()
tx.lastSql = &query
r := basePrepare(nil, tx.conn, query)
r.logger = tx.logger
@ -89,9 +95,11 @@ func (tx *Tx) Prepare(query string) *Stmt {
func (tx *Tx) Exec(query string, args ...any) *ExecResult {
query, args = tx.db.rewriteFTS(query, args)
args = flatArgs(args)
tx.lock()
tx.lastSql = &query
tx.lastArgs = args
r := baseExec(nil, tx.conn, query, args...)
r := baseExecRaw(nil, tx.conn, query, args...)
r.logger = tx.logger
if r.Error != nil {
tx.logger.LogQueryError(r.Error.Error(), *tx.lastSql, tx.lastArgs, r.usedTime)
@ -105,9 +113,11 @@ func (tx *Tx) Exec(query string, args ...any) *ExecResult {
func (tx *Tx) Query(query string, args ...any) *QueryResult {
query, args = tx.db.rewriteFTS(query, args)
args = flatArgs(args)
// Query in Tx doesn't acquire lock unless it's already held by a previous write
tx.lastSql = &query
tx.lastArgs = args
r := baseQuery(nil, tx.conn, query, args...)
r := baseQueryRaw(nil, tx.conn, query, args...)
r.logger = tx.logger
if r.Error != nil {
tx.logger.LogQueryError(r.Error.Error(), *tx.lastSql, tx.lastArgs, r.usedTime)
@ -119,52 +129,20 @@ func (tx *Tx) Query(query string, args ...any) *QueryResult {
return r
}
func (tx *Tx) Insert(table string, data any) *ExecResult {
query, values := tx.MakeInsertSql(table, data, false)
tx.lastSql = &query
tx.lastArgs = values
r := baseExec(nil, tx.conn, query, values...)
r.logger = tx.logger
if r.Error != nil {
tx.logger.LogQueryError(r.Error.Error(), *tx.lastSql, tx.lastArgs, r.usedTime)
} else {
if tx.logSlow > 0 && r.usedTime >= float32(tx.logSlow/time.Millisecond) {
tx.logger.LogQuery(*tx.lastSql, tx.lastArgs, r.usedTime)
}
}
return r
return tx.Exec(query, values...)
}
func (tx *Tx) Replace(table string, data any) *ExecResult {
query, values := tx.MakeInsertSql(table, data, true)
tx.lastSql = &query
tx.lastArgs = values
r := baseExec(nil, tx.conn, query, values...)
r.logger = tx.logger
if r.Error != nil {
tx.logger.LogQueryError(r.Error.Error(), *tx.lastSql, tx.lastArgs, r.usedTime)
} else {
if tx.logSlow > 0 && r.usedTime >= float32(tx.logSlow/time.Millisecond) {
tx.logger.LogQuery(*tx.lastSql, tx.lastArgs, r.usedTime)
}
}
return r
return tx.Exec(query, values...)
}
func (tx *Tx) Update(table string, data any, conditions string, args ...any) *ExecResult {
query, values := tx.MakeUpdateSql(table, data, conditions, args...)
tx.lastSql = &query
tx.lastArgs = values
r := baseExec(nil, tx.conn, query, values...)
r.logger = tx.logger
if r.Error != nil {
tx.logger.LogQueryError(r.Error.Error(), *tx.lastSql, tx.lastArgs, r.usedTime)
} else {
if tx.logSlow > 0 && r.usedTime >= float32(tx.logSlow/time.Millisecond) {
tx.logger.LogQuery(*tx.lastSql, tx.lastArgs, r.usedTime)
}
}
return r
return tx.Exec(query, values...)
}
func (tx *Tx) Delete(table string, conditions string, args ...any) *ExecResult {
@ -187,24 +165,31 @@ func (tx *Tx) Delete(table string, conditions string, args ...any) *ExecResult {
colList = " select *"
}
moveQuery := fmt.Sprintf("insert into %s%s from %s%s", tx.Quote(table+"_deleted"), colList, tx.Quote(table), where)
r := baseExec(nil, tx.conn, moveQuery, args...)
// Use Exec to handle locking
r := tx.Exec(moveQuery, args...)
if r.Error != nil {
tx.logger.LogQueryError(r.Error.Error(), moveQuery, args, r.usedTime)
return r
}
}
query := fmt.Sprintf("delete from %s%s", tx.Quote(table), where)
tx.lastSql = &query
tx.lastArgs = args
r := baseExec(nil, tx.conn, query, args...)
r.logger = tx.logger
if r.Error != nil {
tx.logger.LogQueryError(r.Error.Error(), *tx.lastSql, tx.lastArgs, r.usedTime)
} else {
if tx.logSlow > 0 && r.usedTime >= float32(tx.logSlow/time.Millisecond) {
tx.logger.LogQuery(*tx.lastSql, tx.lastArgs, r.usedTime)
return tx.Exec(query, args...)
}
func (tx *Tx) lock() {
if tx.sqliteMu == nil || tx.hasLock {
return
}
return r
tx.sqliteMu.Lock()
tx.hasLock = true
}
func (tx *Tx) unlock() {
if tx.sqliteMu == nil || !tx.hasLock {
return
}
tx.sqliteMu.Unlock()
tx.hasLock = false
}

2
go.mod
View File

@ -9,7 +9,6 @@ require (
apigo.cc/go/file v1.3.2
apigo.cc/go/id v1.3.1
apigo.cc/go/log v1.3.4
apigo.cc/go/rand v1.3.1
apigo.cc/go/redis v1.3.2
apigo.cc/go/safe v1.3.1
apigo.cc/go/shell v1.3.1
@ -21,6 +20,7 @@ require (
require (
apigo.cc/go/encoding v1.3.1 // indirect
apigo.cc/go/rand v1.3.1 // indirect
filippo.io/edwards25519 v1.2.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/gomodule/redigo v2.0.0+incompatible // indirect

12
utils.go Normal file
View File

@ -0,0 +1,12 @@
package db
import (
"os/exec"
"strings"
)
func runShell(command string) string {
cmd := exec.Command("bash", "-c", command)
out, _ := cmd.CombinedOutput()
return strings.TrimSpace(string(out))
}