Compare commits
No commits in common. "main" and "v1.3.4" have entirely different histories.
10
Base.go
10
Base.go
@ -68,10 +68,7 @@ func basePrepare(db *sql.DB, tx *sql.Tx, query string) *Stmt {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func baseExec(db *sql.DB, tx *sql.Tx, query string, args ...any) *ExecResult {
|
func baseExec(db *sql.DB, tx *sql.Tx, query string, args ...any) *ExecResult {
|
||||||
return baseExecRaw(db, tx, query, flatArgs(args)...)
|
args = flatArgs(args)
|
||||||
}
|
|
||||||
|
|
||||||
func baseExecRaw(db *sql.DB, tx *sql.Tx, query string, args ...any) *ExecResult {
|
|
||||||
var r sql.Result
|
var r sql.Result
|
||||||
var err error
|
var err error
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
@ -110,10 +107,8 @@ func flatArgs(args []any) []any {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func baseQuery(db *sql.DB, tx *sql.Tx, query string, args ...any) *QueryResult {
|
func baseQuery(db *sql.DB, tx *sql.Tx, query string, args ...any) *QueryResult {
|
||||||
return baseQueryRaw(db, tx, query, flatArgs(args)...)
|
args = flatArgs(args)
|
||||||
}
|
|
||||||
|
|
||||||
func baseQueryRaw(db *sql.DB, tx *sql.Tx, query string, args ...any) *QueryResult {
|
|
||||||
var rows *sql.Rows
|
var rows *sql.Rows
|
||||||
var err error
|
var err error
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
@ -133,7 +128,6 @@ func baseQueryRaw(db *sql.DB, tx *sql.Tx, query string, args ...any) *QueryResul
|
|||||||
return &QueryResult{Sql: &query, Args: args, usedTime: usedTime, rows: rows}
|
return &QueryResult{Sql: &query, Args: args, usedTime: usedTime, rows: rows}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func quote(quoteTag string, text string) string {
|
func quote(quoteTag string, text string) string {
|
||||||
a := strings.Split(text, ".")
|
a := strings.Split(text, ".")
|
||||||
for i, v := range a {
|
for i, v := range a {
|
||||||
|
|||||||
12
CHANGELOG.md
12
CHANGELOG.md
@ -1,17 +1,5 @@
|
|||||||
# 变更记录 - @go/db
|
# 变更记录 - @go/db
|
||||||
|
|
||||||
## [1.0.12] - 2026-05-17
|
|
||||||
- **SQLite 极致优化 (超高并发支持)**:
|
|
||||||
- **读写分离与零锁读取**: 读操作 (`Query`) 实现零锁定,配合 WAL 模式彻底解决读写互斥问题;写操作由应用层 `sync.Mutex` 统一排队,规避 `database is locked` 错误。
|
|
||||||
- **临界区最小化**: 将 FTS 重写、参数 JSON 化 (`flatArgs`) 及日志记录移出锁保护区,极大缩短了写锁持有时间。
|
|
||||||
- **工业级默认配置**:
|
|
||||||
- 自动启用 `WAL` 模式、`NORMAL` 同步、`MEMORY` 临时存储及 `busy_timeout(5000)`。
|
|
||||||
- 引入 **动态 Mmap**:根据系统内存自动设置 `mmap_size` (最大 30GB 或物理内存的 1/4),使大数据量访问接近内存速度。
|
|
||||||
- 默认 `MaxOpenConns` 提升至 100,优化多线程只读性能。
|
|
||||||
- **稳定性**:
|
|
||||||
- 为 `Stmt` (预处理语句) 增加写锁保护。
|
|
||||||
- 优化事务锁机制,支持事务内的锁自动追踪与释放。
|
|
||||||
|
|
||||||
## [1.0.11] - 2026-05-13
|
## [1.0.11] - 2026-05-13
|
||||||
- **基础设施对齐**:
|
- **基础设施对齐**:
|
||||||
- 移除 `encoding/json` 原生依赖,全面切换至 `apigo.cc/go/cast.UnmarshalJSON` 以增强类型兼容性。
|
- 移除 `encoding/json` 原生依赖,全面切换至 `apigo.cc/go/cast.UnmarshalJSON` 以增强类型兼容性。
|
||||||
|
|||||||
141
DB.go
141
DB.go
@ -19,6 +19,7 @@ import (
|
|||||||
"apigo.cc/go/crypto"
|
"apigo.cc/go/crypto"
|
||||||
"apigo.cc/go/id"
|
"apigo.cc/go/id"
|
||||||
"apigo.cc/go/log"
|
"apigo.cc/go/log"
|
||||||
|
"apigo.cc/go/rand"
|
||||||
"apigo.cc/go/redis"
|
"apigo.cc/go/redis"
|
||||||
"apigo.cc/go/safe"
|
"apigo.cc/go/safe"
|
||||||
)
|
)
|
||||||
@ -193,7 +194,6 @@ type DB struct {
|
|||||||
QuoteTag string
|
QuoteTag string
|
||||||
tables map[string]*TableStruct
|
tables map[string]*TableStruct
|
||||||
tablesLock *sync.RWMutex
|
tablesLock *sync.RWMutex
|
||||||
sqliteMu *sync.Mutex // Serial lock for SQLite writers
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type TableStruct struct {
|
type TableStruct struct {
|
||||||
@ -318,7 +318,7 @@ func (db *DB) NextID(table string) string {
|
|||||||
|
|
||||||
func (db *DB) syncVersionFromDB(table, versionField string) {
|
func (db *DB) syncVersionFromDB(table, versionField string) {
|
||||||
query := fmt.Sprintf("SELECT MAX(%s) FROM %s", db.Quote(versionField), db.Quote(table))
|
query := fmt.Sprintf("SELECT MAX(%s) FROM %s", db.Quote(versionField), db.Quote(table))
|
||||||
maxVer := db.rawQuery(query).IntOnR1C1()
|
maxVer := db.Query(query).IntOnR1C1()
|
||||||
|
|
||||||
if db.Config.Redis != "" {
|
if db.Config.Redis != "" {
|
||||||
r := redis.GetRedis(db.Config.Redis, db.logger.logger)
|
r := redis.GetRedis(db.Config.Redis, db.logger.logger)
|
||||||
@ -537,9 +537,6 @@ func getDB(name string, logger *log.Logger, useCache bool) *DB {
|
|||||||
db.conn = conn
|
db.conn = conn
|
||||||
db.tables = make(map[string]*TableStruct)
|
db.tables = make(map[string]*TableStruct)
|
||||||
db.tablesLock = new(sync.RWMutex)
|
db.tablesLock = new(sync.RWMutex)
|
||||||
if conf.Type == "sqlite" || conf.Type == "sqlite3" {
|
|
||||||
db.sqliteMu = new(sync.Mutex)
|
|
||||||
}
|
|
||||||
|
|
||||||
if conf.ReadonlyHosts != nil {
|
if conf.ReadonlyHosts != nil {
|
||||||
readonlyConnections := make([]*sql.DB, 0)
|
readonlyConnections := make([]*sql.DB, 0)
|
||||||
@ -558,9 +555,6 @@ func getDB(name string, logger *log.Logger, useCache bool) *DB {
|
|||||||
|
|
||||||
db.Error = nil
|
db.Error = nil
|
||||||
db.Config = conf
|
db.Config = conf
|
||||||
if (conf.Type == "sqlite" || conf.Type == "sqlite3") && conf.MaxOpens == 0 {
|
|
||||||
conf.MaxOpens = 100
|
|
||||||
}
|
|
||||||
if conf.MaxIdles > 0 {
|
if conf.MaxIdles > 0 {
|
||||||
conn.SetMaxIdleConns(conf.MaxIdles)
|
conn.SetMaxIdleConns(conf.MaxIdles)
|
||||||
}
|
}
|
||||||
@ -573,25 +567,6 @@ func getDB(name string, logger *log.Logger, useCache bool) *DB {
|
|||||||
if conf.LogSlow == 0 {
|
if conf.LogSlow == 0 {
|
||||||
conf.LogSlow = config.Duration(1000 * time.Millisecond)
|
conf.LogSlow = config.Duration(1000 * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
if conf.Type == "sqlite" || conf.Type == "sqlite3" {
|
|
||||||
baseExecRaw(conn, nil, "PRAGMA journal_mode=WAL")
|
|
||||||
baseExecRaw(conn, nil, "PRAGMA synchronous=NORMAL")
|
|
||||||
baseExecRaw(conn, nil, "PRAGMA busy_timeout=5000")
|
|
||||||
baseExecRaw(conn, nil, "PRAGMA temp_store=MEMORY")
|
|
||||||
baseExecRaw(conn, nil, "PRAGMA cache_size=-2000")
|
|
||||||
|
|
||||||
// Dynamic mmap_size: 1/4 of system memory, max 30GB
|
|
||||||
mmapLimit := int64(30000000000)
|
|
||||||
sysMemStr := runShell("sysctl -n hw.memsize || free -b | awk '/Mem:/ {print $2}'")
|
|
||||||
if sysMem := cast.Int64(sysMemStr); sysMem > 0 {
|
|
||||||
if mmapLimit > sysMem/4 {
|
|
||||||
mmapLimit = sysMem / 4
|
|
||||||
}
|
|
||||||
}
|
|
||||||
baseExecRaw(conn, nil, fmt.Sprintf("PRAGMA mmap_size=%d", mmapLimit))
|
|
||||||
}
|
|
||||||
|
|
||||||
if useCache {
|
if useCache {
|
||||||
dbInstancesLock.Lock()
|
dbInstancesLock.Lock()
|
||||||
dbInstances[name] = db
|
dbInstances[name] = db
|
||||||
@ -616,13 +591,6 @@ func getPoolForHost(conf *Config, host string) (*sql.DB, error) {
|
|||||||
if connector := connectors[conf.Type]; connector != nil {
|
if connector := connectors[conf.Type]; connector != nil {
|
||||||
return sql.OpenDB(connector(conf, conf.pwd, conf.tls)), nil
|
return sql.OpenDB(connector(conf, conf.pwd, conf.tls)), nil
|
||||||
} else {
|
} else {
|
||||||
if (conf.Type == "sqlite" || conf.Type == "sqlite3") && !strings.Contains(conf.Args, "journal_mode") {
|
|
||||||
if conf.Args != "" {
|
|
||||||
conf.Args += "&"
|
|
||||||
}
|
|
||||||
conf.Args += "_journal_mode=WAL&_busy_timeout=5000&_pragma=synchronous(1)&_pragma=cache_size(-2000)"
|
|
||||||
}
|
|
||||||
|
|
||||||
dsn := ""
|
dsn := ""
|
||||||
args := make([]string, 0)
|
args := make([]string, 0)
|
||||||
if conf.SSL != "" {
|
if conf.SSL != "" {
|
||||||
@ -656,7 +624,6 @@ func (db *DB) CopyByLogger(logger *log.Logger) *DB {
|
|||||||
newDB.Config = db.Config
|
newDB.Config = db.Config
|
||||||
newDB.tables = db.tables
|
newDB.tables = db.tables
|
||||||
newDB.tablesLock = db.tablesLock
|
newDB.tablesLock = db.tablesLock
|
||||||
newDB.sqliteMu = db.sqliteMu
|
|
||||||
if logger == nil {
|
if logger == nil {
|
||||||
logger = log.DefaultLogger
|
logger = log.DefaultLogger
|
||||||
}
|
}
|
||||||
@ -693,7 +660,6 @@ func (db *DB) GetOriginDB() *sql.DB {
|
|||||||
func (db *DB) Prepare(query string) *Stmt {
|
func (db *DB) Prepare(query string) *Stmt {
|
||||||
stmt := basePrepare(db.conn, nil, query)
|
stmt := basePrepare(db.conn, nil, query)
|
||||||
stmt.logger = db.logger
|
stmt.logger = db.logger
|
||||||
stmt.sqliteMu = db.sqliteMu
|
|
||||||
if stmt.Error != nil {
|
if stmt.Error != nil {
|
||||||
db.logger.LogError(stmt.Error.Error())
|
db.logger.LogError(stmt.Error.Error())
|
||||||
}
|
}
|
||||||
@ -710,22 +676,19 @@ func (db *DB) Quotes(texts []string) string {
|
|||||||
|
|
||||||
func (db *DB) Begin() *Tx {
|
func (db *DB) Begin() *Tx {
|
||||||
if db.conn == nil {
|
if db.conn == nil {
|
||||||
return &Tx{db: db, sqliteMu: db.sqliteMu, QuoteTag: db.QuoteTag, logSlow: db.Config.LogSlow.TimeDuration(), Error: errors.New("operate on a bad connection"), logger: db.logger}
|
return &Tx{db: db, QuoteTag: db.QuoteTag, logSlow: db.Config.LogSlow.TimeDuration(), Error: errors.New("operate on a bad connection"), logger: db.logger}
|
||||||
}
|
}
|
||||||
sqlTx, err := db.conn.Begin()
|
sqlTx, err := db.conn.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
db.logger.LogError(err.Error())
|
db.logger.LogError(err.Error())
|
||||||
return &Tx{db: db, sqliteMu: db.sqliteMu, QuoteTag: db.QuoteTag, logSlow: db.Config.LogSlow.TimeDuration(), Error: err, logger: db.logger}
|
return &Tx{db: db, QuoteTag: db.QuoteTag, logSlow: db.Config.LogSlow.TimeDuration(), Error: err, logger: db.logger}
|
||||||
}
|
}
|
||||||
return &Tx{db: db, sqliteMu: db.sqliteMu, QuoteTag: db.QuoteTag, logSlow: db.Config.LogSlow.TimeDuration(), conn: sqlTx, logger: db.logger}
|
return &Tx{db: db, QuoteTag: db.QuoteTag, logSlow: db.Config.LogSlow.TimeDuration(), conn: sqlTx, logger: db.logger}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) Exec(query string, args ...any) *ExecResult {
|
func (db *DB) Exec(query string, args ...any) *ExecResult {
|
||||||
query, args = db.rewriteFTS(query, args)
|
query, args = db.rewriteFTS(query, args)
|
||||||
args = flatArgs(args)
|
r := baseExec(db.conn, nil, query, args...)
|
||||||
db.lock()
|
|
||||||
r := baseExecRaw(db.conn, nil, query, args...)
|
|
||||||
db.unlock()
|
|
||||||
r.logger = db.logger
|
r.logger = db.logger
|
||||||
if r.Error != nil {
|
if r.Error != nil {
|
||||||
db.logger.LogQueryError(r.Error.Error(), query, args, r.usedTime)
|
db.logger.LogQueryError(r.Error.Error(), query, args, r.usedTime)
|
||||||
@ -737,14 +700,20 @@ func (db *DB) Exec(query string, args ...any) *ExecResult {
|
|||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) rawExec(query string, args ...any) *ExecResult {
|
|
||||||
return db.Exec(query, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *DB) Query(query string, args ...any) *QueryResult {
|
func (db *DB) Query(query string, args ...any) *QueryResult {
|
||||||
query, args = db.rewriteFTS(query, args)
|
query, args = db.rewriteFTS(query, args)
|
||||||
args = flatArgs(args)
|
conn := db.conn
|
||||||
r := baseQueryRaw(db.conn, nil, query, args...)
|
if db.readonlyConnections != nil {
|
||||||
|
connNum := len(db.readonlyConnections)
|
||||||
|
if connNum == 1 {
|
||||||
|
conn = db.readonlyConnections[0]
|
||||||
|
} else {
|
||||||
|
p := rand.Int(0, connNum-1)
|
||||||
|
conn = db.readonlyConnections[p]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
r := baseQuery(conn, nil, query, args...)
|
||||||
r.logger = db.logger
|
r.logger = db.logger
|
||||||
if r.Error != nil {
|
if r.Error != nil {
|
||||||
db.logger.LogQueryError(r.Error.Error(), query, args, r.usedTime)
|
db.logger.LogQueryError(r.Error.Error(), query, args, r.usedTime)
|
||||||
@ -756,24 +725,6 @@ func (db *DB) Query(query string, args ...any) *QueryResult {
|
|||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) rawQuery(query string, args ...any) *QueryResult {
|
|
||||||
return db.Query(query, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
func (db *DB) lock() {
|
|
||||||
if db.sqliteMu != nil {
|
|
||||||
db.sqliteMu.Lock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *DB) unlock() {
|
|
||||||
if db.sqliteMu != nil {
|
|
||||||
db.sqliteMu.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
var identifierRegex = `(?:['"` + "`" + `][^'"` + "`" + `]+['"` + "`" + `]|[\w\-]+)`
|
var identifierRegex = `(?:['"` + "`" + `][^'"` + "`" + `]+['"` + "`" + `]|[\w\-]+)`
|
||||||
var likeFieldReg = regexp.MustCompile(`(?i)(` + identifierRegex + `(?:\.` + identifierRegex + `)*)\s+LIKE\s*$`)
|
var likeFieldReg = regexp.MustCompile(`(?i)(` + identifierRegex + `(?:\.` + identifierRegex + `)*)\s+LIKE\s*$`)
|
||||||
var likeLiteralReg = regexp.MustCompile(`(?i)(` + identifierRegex + `(?:\.` + identifierRegex + `)*)\s+LIKE\s+(['"])(%?[^'"]*?%?)(['"])`)
|
var likeLiteralReg = regexp.MustCompile(`(?i)(` + identifierRegex + `(?:\.` + identifierRegex + `)*)\s+LIKE\s+(['"])(%?[^'"]*?%?)(['"])`)
|
||||||
@ -966,17 +917,44 @@ func (db *DB) extractTableName(query string, field string) string {
|
|||||||
|
|
||||||
func (db *DB) Insert(table string, data any) *ExecResult {
|
func (db *DB) Insert(table string, data any) *ExecResult {
|
||||||
query, values := db.MakeInsertSql(table, data, false)
|
query, values := db.MakeInsertSql(table, data, false)
|
||||||
return db.Exec(query, values...)
|
r := baseExec(db.conn, nil, query, values...)
|
||||||
|
r.logger = db.logger
|
||||||
|
if r.Error != nil {
|
||||||
|
db.logger.LogQueryError(r.Error.Error(), query, values, r.usedTime)
|
||||||
|
} else {
|
||||||
|
if db.Config.LogSlow > 0 && r.usedTime >= float32(db.Config.LogSlow.TimeDuration()/time.Millisecond) {
|
||||||
|
db.logger.LogQuery(query, values, r.usedTime)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) Replace(table string, data any) *ExecResult {
|
func (db *DB) Replace(table string, data any) *ExecResult {
|
||||||
query, values := db.MakeInsertSql(table, data, true)
|
query, values := db.MakeInsertSql(table, data, true)
|
||||||
return db.Exec(query, values...)
|
r := baseExec(db.conn, nil, query, values...)
|
||||||
|
r.logger = db.logger
|
||||||
|
if r.Error != nil {
|
||||||
|
db.logger.LogQueryError(r.Error.Error(), query, values, r.usedTime)
|
||||||
|
} else {
|
||||||
|
if db.Config.LogSlow > 0 && r.usedTime >= float32(db.Config.LogSlow.TimeDuration()/time.Millisecond) {
|
||||||
|
db.logger.LogQuery(query, values, r.usedTime)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) Update(table string, data any, conditions string, args ...any) *ExecResult {
|
func (db *DB) Update(table string, data any, conditions string, args ...any) *ExecResult {
|
||||||
query, values := db.MakeUpdateSql(table, data, conditions, args...)
|
query, values := db.MakeUpdateSql(table, data, conditions, args...)
|
||||||
return db.Exec(query, values...)
|
r := baseExec(db.conn, nil, query, values...)
|
||||||
|
r.logger = db.logger
|
||||||
|
if r.Error != nil {
|
||||||
|
db.logger.LogQueryError(r.Error.Error(), query, values, r.usedTime)
|
||||||
|
} else {
|
||||||
|
if db.Config.LogSlow > 0 && r.usedTime >= float32(db.Config.LogSlow.TimeDuration()/time.Millisecond) {
|
||||||
|
db.logger.LogQuery(query, values, r.usedTime)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) Delete(table string, conditions string, args ...any) *ExecResult {
|
func (db *DB) Delete(table string, conditions string, args ...any) *ExecResult {
|
||||||
@ -986,7 +964,16 @@ func (db *DB) Delete(table string, conditions string, args ...any) *ExecResult {
|
|||||||
conditions = " where " + conditions
|
conditions = " where " + conditions
|
||||||
}
|
}
|
||||||
query := fmt.Sprintf("delete from %s%s", db.Quote(table), conditions)
|
query := fmt.Sprintf("delete from %s%s", db.Quote(table), conditions)
|
||||||
return db.Exec(query, args...)
|
r := baseExec(db.conn, nil, query, args...)
|
||||||
|
r.logger = db.logger
|
||||||
|
if r.Error != nil {
|
||||||
|
db.logger.LogQueryError(r.Error.Error(), query, args, r.usedTime)
|
||||||
|
} else {
|
||||||
|
if db.Config.LogSlow > 0 && r.usedTime >= float32(db.Config.LogSlow.TimeDuration()/time.Millisecond) {
|
||||||
|
db.logger.LogQuery(query, args, r.usedTime)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shadow delete
|
// Shadow delete
|
||||||
@ -1020,7 +1007,7 @@ func (db *DB) getTable(table string) *TableStruct {
|
|||||||
var query string
|
var query string
|
||||||
if db.Config.Type == "mysql" {
|
if db.Config.Type == "mysql" {
|
||||||
query = "SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, COLUMN_KEY FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?"
|
query = "SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, COLUMN_KEY FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?"
|
||||||
res := db.rawQuery(query, db.Config.DB, table)
|
res := db.Query(query, db.Config.DB, table)
|
||||||
rows := res.MapResults()
|
rows := res.MapResults()
|
||||||
for _, row := range rows {
|
for _, row := range rows {
|
||||||
col := cast.String(row["COLUMN_NAME"])
|
col := cast.String(row["COLUMN_NAME"])
|
||||||
@ -1039,7 +1026,7 @@ func (db *DB) getTable(table string) *TableStruct {
|
|||||||
}
|
}
|
||||||
} else if db.Config.Type == "postgres" || db.Config.Type == "pgx" {
|
} else if db.Config.Type == "postgres" || db.Config.Type == "pgx" {
|
||||||
query = "SELECT column_name, data_type, character_maximum_length FROM information_schema.columns WHERE table_schema = current_schema() AND table_name = ?"
|
query = "SELECT column_name, data_type, character_maximum_length FROM information_schema.columns WHERE table_schema = current_schema() AND table_name = ?"
|
||||||
res := db.rawQuery(query, table)
|
res := db.Query(query, table)
|
||||||
rows := res.MapResults()
|
rows := res.MapResults()
|
||||||
for _, row := range rows {
|
for _, row := range rows {
|
||||||
col := cast.String(row["column_name"])
|
col := cast.String(row["column_name"])
|
||||||
@ -1060,7 +1047,7 @@ func (db *DB) getTable(table string) *TableStruct {
|
|||||||
} else if isFileDB(db.Config.Type) {
|
} else if isFileDB(db.Config.Type) {
|
||||||
// For SQLite
|
// For SQLite
|
||||||
query := fmt.Sprintf("PRAGMA table_info(%s)", db.Quote(table))
|
query := fmt.Sprintf("PRAGMA table_info(%s)", db.Quote(table))
|
||||||
res := db.rawQuery(query)
|
res := db.Query(query)
|
||||||
rows := res.MapResults()
|
rows := res.MapResults()
|
||||||
for _, row := range rows {
|
for _, row := range rows {
|
||||||
colName := cast.String(row["name"])
|
colName := cast.String(row["name"])
|
||||||
@ -1091,19 +1078,19 @@ func (db *DB) getTable(table string) *TableStruct {
|
|||||||
shadowTable := table + "_deleted"
|
shadowTable := table + "_deleted"
|
||||||
if db.Config.Type == "mysql" {
|
if db.Config.Type == "mysql" {
|
||||||
query = "SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?"
|
query = "SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?"
|
||||||
res := db.rawQuery(query, db.Config.DB, shadowTable)
|
res := db.Query(query, db.Config.DB, shadowTable)
|
||||||
if res.StringOnR1C1() != "" {
|
if res.StringOnR1C1() != "" {
|
||||||
ts.HasShadowTable = true
|
ts.HasShadowTable = true
|
||||||
}
|
}
|
||||||
} else if db.Config.Type == "postgres" || db.Config.Type == "pgx" {
|
} else if db.Config.Type == "postgres" || db.Config.Type == "pgx" {
|
||||||
query = "SELECT table_name FROM information_schema.tables WHERE table_schema = current_schema() AND table_name = ?"
|
query = "SELECT table_name FROM information_schema.tables WHERE table_schema = current_schema() AND table_name = ?"
|
||||||
res := db.rawQuery(query, shadowTable)
|
res := db.Query(query, shadowTable)
|
||||||
if res.StringOnR1C1() != "" {
|
if res.StringOnR1C1() != "" {
|
||||||
ts.HasShadowTable = true
|
ts.HasShadowTable = true
|
||||||
}
|
}
|
||||||
} else if isFileDB(db.Config.Type) {
|
} else if isFileDB(db.Config.Type) {
|
||||||
query = "SELECT name FROM sqlite_master WHERE type='table' AND name=?"
|
query = "SELECT name FROM sqlite_master WHERE type='table' AND name=?"
|
||||||
res := db.rawQuery(query, shadowTable)
|
res := db.Query(query, shadowTable)
|
||||||
if res.StringOnR1C1() != "" {
|
if res.StringOnR1C1() != "" {
|
||||||
ts.HasShadowTable = true
|
ts.HasShadowTable = true
|
||||||
}
|
}
|
||||||
|
|||||||
@ -307,8 +307,6 @@ func (db *DB) Sync(desc string) error {
|
|||||||
return outErr
|
return outErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// CheckTable 检查并同步单个表结构
|
// CheckTable 检查并同步单个表结构
|
||||||
func (db *DB) CheckTable(table *TableStruct) error {
|
func (db *DB) CheckTable(table *TableStruct) error {
|
||||||
fieldSets := make([]string, 0)
|
fieldSets := make([]string, 0)
|
||||||
|
|||||||
6
Stmt.go
6
Stmt.go
@ -3,13 +3,11 @@ package db
|
|||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Stmt struct {
|
type Stmt struct {
|
||||||
conn *sql.Stmt
|
conn *sql.Stmt
|
||||||
sqliteMu *sync.Mutex
|
|
||||||
lastSql *string
|
lastSql *string
|
||||||
lastArgs []any
|
lastArgs []any
|
||||||
Error error
|
Error error
|
||||||
@ -17,10 +15,6 @@ type Stmt struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (stmt *Stmt) Exec(args ...any) *ExecResult {
|
func (stmt *Stmt) Exec(args ...any) *ExecResult {
|
||||||
if stmt.sqliteMu != nil {
|
|
||||||
stmt.sqliteMu.Lock()
|
|
||||||
defer stmt.sqliteMu.Unlock()
|
|
||||||
}
|
|
||||||
stmt.lastArgs = args
|
stmt.lastArgs = args
|
||||||
if stmt.conn == nil {
|
if stmt.conn == nil {
|
||||||
return &ExecResult{Sql: stmt.lastSql, Args: stmt.lastArgs, usedTime: -1, logger: stmt.logger, Error: errors.New("operate on a bad connection")}
|
return &ExecResult{Sql: stmt.lastSql, Args: stmt.lastArgs, usedTime: -1, logger: stmt.logger, Error: errors.New("operate on a bad connection")}
|
||||||
|
|||||||
85
Tx.go
85
Tx.go
@ -5,15 +5,12 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Tx struct {
|
type Tx struct {
|
||||||
conn *sql.Tx
|
conn *sql.Tx
|
||||||
db *DB
|
db *DB
|
||||||
sqliteMu *sync.Mutex
|
|
||||||
hasLock bool
|
|
||||||
lastSql *string
|
lastSql *string
|
||||||
lastArgs []any
|
lastArgs []any
|
||||||
Error error
|
Error error
|
||||||
@ -32,7 +29,6 @@ func (tx *Tx) Quotes(texts []string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (tx *Tx) Commit() error {
|
func (tx *Tx) Commit() error {
|
||||||
defer tx.unlock()
|
|
||||||
if tx.isCommittedOrRollbacked {
|
if tx.isCommittedOrRollbacked {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -49,7 +45,6 @@ func (tx *Tx) Commit() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (tx *Tx) Rollback() error {
|
func (tx *Tx) Rollback() error {
|
||||||
defer tx.unlock()
|
|
||||||
if tx.isCommittedOrRollbacked {
|
if tx.isCommittedOrRollbacked {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -83,7 +78,6 @@ func (tx *Tx) CheckFinished() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (tx *Tx) Prepare(query string) *Stmt {
|
func (tx *Tx) Prepare(query string) *Stmt {
|
||||||
tx.lock()
|
|
||||||
tx.lastSql = &query
|
tx.lastSql = &query
|
||||||
r := basePrepare(nil, tx.conn, query)
|
r := basePrepare(nil, tx.conn, query)
|
||||||
r.logger = tx.logger
|
r.logger = tx.logger
|
||||||
@ -95,11 +89,9 @@ func (tx *Tx) Prepare(query string) *Stmt {
|
|||||||
|
|
||||||
func (tx *Tx) Exec(query string, args ...any) *ExecResult {
|
func (tx *Tx) Exec(query string, args ...any) *ExecResult {
|
||||||
query, args = tx.db.rewriteFTS(query, args)
|
query, args = tx.db.rewriteFTS(query, args)
|
||||||
args = flatArgs(args)
|
|
||||||
tx.lock()
|
|
||||||
tx.lastSql = &query
|
tx.lastSql = &query
|
||||||
tx.lastArgs = args
|
tx.lastArgs = args
|
||||||
r := baseExecRaw(nil, tx.conn, query, args...)
|
r := baseExec(nil, tx.conn, query, args...)
|
||||||
r.logger = tx.logger
|
r.logger = tx.logger
|
||||||
if r.Error != nil {
|
if r.Error != nil {
|
||||||
tx.logger.LogQueryError(r.Error.Error(), *tx.lastSql, tx.lastArgs, r.usedTime)
|
tx.logger.LogQueryError(r.Error.Error(), *tx.lastSql, tx.lastArgs, r.usedTime)
|
||||||
@ -113,11 +105,9 @@ func (tx *Tx) Exec(query string, args ...any) *ExecResult {
|
|||||||
|
|
||||||
func (tx *Tx) Query(query string, args ...any) *QueryResult {
|
func (tx *Tx) Query(query string, args ...any) *QueryResult {
|
||||||
query, args = tx.db.rewriteFTS(query, args)
|
query, args = tx.db.rewriteFTS(query, args)
|
||||||
args = flatArgs(args)
|
|
||||||
// Query in Tx doesn't acquire lock unless it's already held by a previous write
|
|
||||||
tx.lastSql = &query
|
tx.lastSql = &query
|
||||||
tx.lastArgs = args
|
tx.lastArgs = args
|
||||||
r := baseQueryRaw(nil, tx.conn, query, args...)
|
r := baseQuery(nil, tx.conn, query, args...)
|
||||||
r.logger = tx.logger
|
r.logger = tx.logger
|
||||||
if r.Error != nil {
|
if r.Error != nil {
|
||||||
tx.logger.LogQueryError(r.Error.Error(), *tx.lastSql, tx.lastArgs, r.usedTime)
|
tx.logger.LogQueryError(r.Error.Error(), *tx.lastSql, tx.lastArgs, r.usedTime)
|
||||||
@ -129,20 +119,52 @@ func (tx *Tx) Query(query string, args ...any) *QueryResult {
|
|||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (tx *Tx) Insert(table string, data any) *ExecResult {
|
func (tx *Tx) Insert(table string, data any) *ExecResult {
|
||||||
query, values := tx.MakeInsertSql(table, data, false)
|
query, values := tx.MakeInsertSql(table, data, false)
|
||||||
return tx.Exec(query, values...)
|
tx.lastSql = &query
|
||||||
|
tx.lastArgs = values
|
||||||
|
r := baseExec(nil, tx.conn, query, values...)
|
||||||
|
r.logger = tx.logger
|
||||||
|
if r.Error != nil {
|
||||||
|
tx.logger.LogQueryError(r.Error.Error(), *tx.lastSql, tx.lastArgs, r.usedTime)
|
||||||
|
} else {
|
||||||
|
if tx.logSlow > 0 && r.usedTime >= float32(tx.logSlow/time.Millisecond) {
|
||||||
|
tx.logger.LogQuery(*tx.lastSql, tx.lastArgs, r.usedTime)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tx *Tx) Replace(table string, data any) *ExecResult {
|
func (tx *Tx) Replace(table string, data any) *ExecResult {
|
||||||
query, values := tx.MakeInsertSql(table, data, true)
|
query, values := tx.MakeInsertSql(table, data, true)
|
||||||
return tx.Exec(query, values...)
|
tx.lastSql = &query
|
||||||
|
tx.lastArgs = values
|
||||||
|
r := baseExec(nil, tx.conn, query, values...)
|
||||||
|
r.logger = tx.logger
|
||||||
|
if r.Error != nil {
|
||||||
|
tx.logger.LogQueryError(r.Error.Error(), *tx.lastSql, tx.lastArgs, r.usedTime)
|
||||||
|
} else {
|
||||||
|
if tx.logSlow > 0 && r.usedTime >= float32(tx.logSlow/time.Millisecond) {
|
||||||
|
tx.logger.LogQuery(*tx.lastSql, tx.lastArgs, r.usedTime)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tx *Tx) Update(table string, data any, conditions string, args ...any) *ExecResult {
|
func (tx *Tx) Update(table string, data any, conditions string, args ...any) *ExecResult {
|
||||||
query, values := tx.MakeUpdateSql(table, data, conditions, args...)
|
query, values := tx.MakeUpdateSql(table, data, conditions, args...)
|
||||||
return tx.Exec(query, values...)
|
tx.lastSql = &query
|
||||||
|
tx.lastArgs = values
|
||||||
|
r := baseExec(nil, tx.conn, query, values...)
|
||||||
|
r.logger = tx.logger
|
||||||
|
if r.Error != nil {
|
||||||
|
tx.logger.LogQueryError(r.Error.Error(), *tx.lastSql, tx.lastArgs, r.usedTime)
|
||||||
|
} else {
|
||||||
|
if tx.logSlow > 0 && r.usedTime >= float32(tx.logSlow/time.Millisecond) {
|
||||||
|
tx.logger.LogQuery(*tx.lastSql, tx.lastArgs, r.usedTime)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tx *Tx) Delete(table string, conditions string, args ...any) *ExecResult {
|
func (tx *Tx) Delete(table string, conditions string, args ...any) *ExecResult {
|
||||||
@ -165,31 +187,24 @@ func (tx *Tx) Delete(table string, conditions string, args ...any) *ExecResult {
|
|||||||
colList = " select *"
|
colList = " select *"
|
||||||
}
|
}
|
||||||
moveQuery := fmt.Sprintf("insert into %s%s from %s%s", tx.Quote(table+"_deleted"), colList, tx.Quote(table), where)
|
moveQuery := fmt.Sprintf("insert into %s%s from %s%s", tx.Quote(table+"_deleted"), colList, tx.Quote(table), where)
|
||||||
// Use Exec to handle locking
|
r := baseExec(nil, tx.conn, moveQuery, args...)
|
||||||
r := tx.Exec(moveQuery, args...)
|
|
||||||
if r.Error != nil {
|
if r.Error != nil {
|
||||||
|
tx.logger.LogQueryError(r.Error.Error(), moveQuery, args, r.usedTime)
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
query := fmt.Sprintf("delete from %s%s", tx.Quote(table), where)
|
query := fmt.Sprintf("delete from %s%s", tx.Quote(table), where)
|
||||||
return tx.Exec(query, args...)
|
tx.lastSql = &query
|
||||||
|
tx.lastArgs = args
|
||||||
|
r := baseExec(nil, tx.conn, query, args...)
|
||||||
|
r.logger = tx.logger
|
||||||
|
if r.Error != nil {
|
||||||
|
tx.logger.LogQueryError(r.Error.Error(), *tx.lastSql, tx.lastArgs, r.usedTime)
|
||||||
|
} else {
|
||||||
|
if tx.logSlow > 0 && r.usedTime >= float32(tx.logSlow/time.Millisecond) {
|
||||||
|
tx.logger.LogQuery(*tx.lastSql, tx.lastArgs, r.usedTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tx *Tx) lock() {
|
|
||||||
if tx.sqliteMu == nil || tx.hasLock {
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
tx.sqliteMu.Lock()
|
return r
|
||||||
tx.hasLock = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tx *Tx) unlock() {
|
|
||||||
if tx.sqliteMu == nil || !tx.hasLock {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
tx.sqliteMu.Unlock()
|
|
||||||
tx.hasLock = false
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
2
go.mod
2
go.mod
@ -9,6 +9,7 @@ require (
|
|||||||
apigo.cc/go/file v1.3.2
|
apigo.cc/go/file v1.3.2
|
||||||
apigo.cc/go/id v1.3.1
|
apigo.cc/go/id v1.3.1
|
||||||
apigo.cc/go/log v1.3.4
|
apigo.cc/go/log v1.3.4
|
||||||
|
apigo.cc/go/rand v1.3.1
|
||||||
apigo.cc/go/redis v1.3.2
|
apigo.cc/go/redis v1.3.2
|
||||||
apigo.cc/go/safe v1.3.1
|
apigo.cc/go/safe v1.3.1
|
||||||
apigo.cc/go/shell v1.3.1
|
apigo.cc/go/shell v1.3.1
|
||||||
@ -20,7 +21,6 @@ require (
|
|||||||
|
|
||||||
require (
|
require (
|
||||||
apigo.cc/go/encoding v1.3.1 // indirect
|
apigo.cc/go/encoding v1.3.1 // indirect
|
||||||
apigo.cc/go/rand v1.3.1 // indirect
|
|
||||||
filippo.io/edwards25519 v1.2.0 // indirect
|
filippo.io/edwards25519 v1.2.0 // indirect
|
||||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||||
github.com/gomodule/redigo v2.0.0+incompatible // indirect
|
github.com/gomodule/redigo v2.0.0+incompatible // indirect
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user