plugins/db/db.go
Star b6bb3098bc update dao
many other updates
2024-03-24 12:28:02 +08:00

492 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package db
import (
"apigo.cloud/git/apigo/plugin"
"fmt"
"github.com/ssgo/db"
"github.com/ssgo/log"
"github.com/ssgo/u"
"strings"
"sync"
)
type DB struct {
name string
pool *db.DB
logger *log.Logger
}
type Tx struct {
conn *db.Tx
}
type ExecResult struct {
Id int64
Changes int64
Version uint64
Sql string
SqlArgs []interface{}
}
type QueryResult struct {
Result []map[string]interface{}
Sql string
SqlArgs []interface{}
}
var configSet = map[string]string{}
var configSetLock = sync.RWMutex{}
func init() {
plugin.Register(plugin.Plugin{
Id: "apigo.cloud/git/apigo/plugins/db",
Name: "数据库操作",
ConfigSample: `default: mysql://root:<**encrypted_password**>@127.0.0.1:3306/1?maxIdles=0&maxLifeTime=0&maxOpens=0&logSlow=1s # set default db connection pool, used by db.xxx
configs:
conn1: sqlite3://conn1.db # set a named connection pool, used by db.get('conn1').xxx
conn2: mysql://root:@127.0.0.1:3306/1?sslCa=<**encrypted**>&sslCert=<**encrypted**>&sslKey=<**encrypted**>&sslSkipVerify=true # set ssl connection pool for mysql
conn3: mysql://root:@127.0.0.1:3306/1?timeout=90s&readTimeout=5s&writeTimeout=3s&charset=utf8mb4,utf8 # set more option for mysql
`,
Init: func(conf map[string]interface{}) {
newSet := map[string]string{}
if conf["default"] != nil {
newSet["default"] = u.String(conf["default"])
}
if conf["configs"] != nil {
confs := map[string]string{}
u.Convert(conf["configs"], &confs)
for name, url := range confs {
newSet[name] = url
}
}
configSetLock.Lock()
for k, v := range newSet {
configSet[k] = v
}
configSetLock.Unlock()
},
Objects: map[string]interface{}{
"fetch": GetDB,
"begin": DefaultBegin,
"delete": DefaultDelete,
"destroy": DefaultDestroy,
"exec": DefaultExec,
"insert": DefaultInsert,
"make": DefaultMake,
"makeDao": DefaultMakeDao,
"makeER": DefaultMakeER,
"makeId": DefaultMakeId,
"query": DefaultQuery,
"query1": DefaultQuery1,
"query11": DefaultQuery11,
"query1a": DefaultQuery1a,
"replace": DefaultReplace,
"update": DefaultUpdate,
"getDBName": DefaultGetDBName,
"inKeys": InKeys,
"DB": (*DB)(nil),
"Tx": (*Tx)(nil),
},
})
}
// GetDB 获得数据库连接
// GetDB name 连接配置名称,如果不提供名称则使用默认连接
// GetDB return 数据库连接,对象内置连接池操作,完成后无需手动关闭连接
func GetDB(ctx *plugin.Context, name *string) *DB {
logger := ctx.GetInject("*log.Logger").(*log.Logger)
if name != nil && *name != "" {
configSetLock.RLock()
connUrl := configSet[*name]
configSetLock.RUnlock()
if connUrl != "" {
return &DB{pool: db.GetDB(connUrl, logger)}
}
}
configSetLock.RLock()
defaultConnUrl := configSet["default"]
configSetLock.RUnlock()
return &DB{pool: db.GetDB(defaultConnUrl, logger)}
}
// Destroy 关闭连接池
func (db *DB) Destroy() error {
return db.pool.Destroy()
}
// InKeys 根据长度生成SQL中 IN "(?,?,...)" 引号中的部分
// InKeys return IN后面的Key变量的SQL
func InKeys(numArgs int) string {
a := make([]string, numArgs)
for i := 0; i < numArgs; i++ {
a[i] = "?"
}
return fmt.Sprintf("(%s)", strings.Join(a, ","))
}
// Make 创建表格,如果表格已经存在则更新表结构
// Make return 已执行的SQL列表
func (db *DB) Make(groups []ERGroup, ctx *plugin.Context) ([]string, error) {
outSql := make([]string, 0)
for _, group := range groups {
for _, table := range group.Tables {
//fmt.Println("=====TB", table.Name)
sql, err := MakeTable(db.pool, &table, ctx)
outSql = append(outSql, strings.Join(sql, "\n"))
if err != nil {
return outSql, err
}
}
}
return outSql, nil
}
// MakeER 创建ER图
func (db *DB) MakeER(groups []ERGroup, outputFile *string, tplFile *string, ctx *plugin.Context) error {
return MakeER(groups, outputFile, tplFile, ctx)
}
// GetDBName 获取当前数据库名称
// GetDBName return 当前数据库名称
func (db *DB) GetDBName() string {
return db.pool.Config.DB
}
// MakeDao 创建实体对象
// MakeDao outputPath 实体文件输出目录,为空时输出到当前目录下以数据库为目录名
func (db *DB) MakeDao(outputPath *string, conf *DaoConfig, ctx *plugin.Context) error {
if outputPath == nil || *outputPath == "" {
defaultOutput := db.pool.Config.DB
outputPath = &defaultOutput
}
return MakeDao(*outputPath, db.pool, conf, ctx)
}
// Begin 开始事务
// Begin return 事务对象事务中的操作都在事务对象上操作请务必在返回的事务对象上执行commit或rollback
func (db *DB) Begin() *Tx {
return db.Begin()
}
// Exec 执行SQL
// * requestSql SQL语句
// * args SQL语句中问号变量的值按顺序放在请求参数中
// Exec return 如果是INSERT到含有自增字段的表中返回插入的自增ID否则返回影响的行数
func (db *DB) Exec(requestSql string, args ...interface{}) (ExecResult, error) {
r := db.pool.Exec(requestSql, args...)
out := ExecResult{}
lSql := strings.ToLower(requestSql)
if strings.Contains(lSql, "insert into") || strings.Contains(lSql, "replace into") {
out.Id = r.Id()
}
out.Changes = r.Changes()
out.Sql = *r.Sql
out.SqlArgs = r.Args
return out, r.Error
}
// Query 查询
// Query return 返回查询到的数据,对象数组格式
func (db *DB) Query(requestSql string, args ...interface{}) (QueryResult, error) {
r := db.pool.Query(requestSql, args...)
return QueryResult{
Result: r.MapResults(),
Sql: *r.Sql,
SqlArgs: r.Args,
}, r.Error
}
// Query1 查询
// Query1 return 返回查询到的第一行数据,对象格式
func (db *DB) Query1(requestSql string, args ...interface{}) (map[string]interface{}, error) {
r := db.pool.Query(requestSql, args...)
results := r.MapResults()
if len(results) > 0 {
return results[0], r.Error
} else {
return nil, r.Error
}
}
// Query11 查询
// Query11 return 返回查询到的第一行第一列数据,字段类型对应的格式
func (db *DB) Query11(requestSql string, args ...interface{}) (interface{}, error) {
r := db.pool.Query(requestSql, args...)
results := r.SliceResults()
if len(results) > 0 {
if len(results[0]) > 0 {
return results[0][0], r.Error
} else {
return nil, r.Error
}
} else {
return nil, r.Error
}
}
// Query1a 查询
// Query1a return 返回查询到的第一列数据,数组格式
func (db *DB) Query1a(requestSql string, args ...interface{}) ([]interface{}, error) {
r := db.pool.Query(requestSql, args...)
results := r.SliceResults()
a := make([]interface{}, 0)
for _, row := range results {
if len(results[0]) > 0 {
a = append(a, row[0])
}
}
return a, r.Error
}
// Insert 插入数据
// * table 表名
// * data 数据对象Key-Value格式
// Insert return 如果是INSERT到含有自增字段的表中返回插入的自增ID否则返回影响的行数
func (db *DB) Insert(table string, data map[string]interface{}) (ExecResult, error) {
r := db.pool.Insert(table, data)
return ExecResult{
Id: r.Id(),
Changes: r.Changes(),
Sql: *r.Sql,
SqlArgs: r.Args,
}, r.Error
}
// Replace 替换数据
// Replace return 如果是REPLACE到含有自增字段的表中返回插入的自增ID否则返回影响的行数
func (db *DB) Replace(table string, data map[string]interface{}) (ExecResult, error) {
r := db.pool.Replace(table, data)
out := r.Id()
if out == 0 {
out = r.Changes()
}
return ExecResult{
Id: r.Id(),
Changes: r.Changes(),
Sql: *r.Sql,
SqlArgs: r.Args,
}, r.Error
}
// Update 更新数据
// * wheres 条件SQL中WHERE后面的部分
// Update return 返回影响的行数
func (db *DB) Update(table string, data map[string]interface{}, wheres string, args ...interface{}) (ExecResult, error) {
r := db.pool.Update(table, data, wheres, args...)
return ExecResult{
Changes: r.Changes(),
Sql: *r.Sql,
SqlArgs: r.Args,
}, r.Error
}
// Delete 删除数据
// Delete return 返回影响的行数
func (db *DB) Delete(table string, wheres string, args ...interface{}) (ExecResult, error) {
r := db.pool.Delete(table, wheres, args...)
return ExecResult{
Changes: r.Changes(),
Sql: *r.Sql,
SqlArgs: r.Args,
}, r.Error
}
// MakeId 生成指定字段不唯一的ID
// MakeId idField ID字段
// MakeId idSize ID长度
// MakeId return 新的ID
func (db *DB) MakeId(table string, idField string, idSize uint) (string, error) {
var id string
var err error
for i := 0; i < 100; i++ {
if idSize > 20 {
id = u.UniqueId()
} else if idSize > 14 {
id = u.UniqueId()[0:idSize]
} else if idSize > 12 {
id = u.ShortUniqueId()[0:idSize]
} else if idSize > 10 {
id = u.Id12()[0:idSize]
} else if idSize > 8 {
id = u.Id10()[0:idSize]
} else if idSize >= 6 {
id = u.Id8()[0:idSize]
} else {
id = u.Id6()
}
r := db.pool.Query(fmt.Sprintf("SELECT COUNT(*) FROM `%s` WHERE `%s`=?", table, idField), id)
err = r.Error
if r.IntOnR1C1() == 0 {
break
}
}
return id, err
}
// Commit 提交事务
func (tx *Tx) Commit() error {
return tx.conn.Commit()
}
// Rollback 回滚事务
func (tx *Tx) Rollback() error {
return tx.conn.Rollback()
}
// Finish 根据传入的成功标识提交或回滚事务
// Finish ok 事务是否执行成功
func (tx *Tx) Finish(ok bool) error {
return tx.conn.Finish(ok)
}
// CheckFinished 检查事务是否已经提交或回滚,如果事务没有结束则执行回滚操作
func (tx *Tx) CheckFinished() error {
return tx.conn.CheckFinished()
}
func (tx *Tx) Exec(requestSql string, args ...interface{}) (ExecResult, error) {
r := tx.conn.Exec(requestSql, args...)
out := ExecResult{}
lSql := strings.ToLower(requestSql)
if strings.Contains(lSql, "insert into") || strings.Contains(lSql, "replace into") {
out.Id = r.Id()
}
out.Changes = r.Changes()
out.Sql = *r.Sql
out.SqlArgs = r.Args
return out, r.Error
}
func (tx *Tx) Query(requestSql string, args ...interface{}) (QueryResult, error) {
r := tx.conn.Query(requestSql, args...)
return QueryResult{
Result: r.MapResults(),
Sql: *r.Sql,
SqlArgs: r.Args,
}, r.Error
}
func (tx *Tx) Query1(requestSql string, args ...interface{}) (map[string]interface{}, error) {
r := tx.conn.Query(requestSql, args...)
results := r.MapResults()
if len(results) > 0 {
return results[0], r.Error
} else {
return nil, r.Error
}
}
func (tx *Tx) Query11(requestSql string, args ...interface{}) (interface{}, error) {
r := tx.conn.Query(requestSql, args...)
results := r.SliceResults()
if len(results) > 0 {
if len(results[0]) > 0 {
return results[0][0], r.Error
} else {
return nil, r.Error
}
} else {
return nil, r.Error
}
}
func (tx *Tx) Query1a(requestSql string, args ...interface{}) ([]interface{}, error) {
r := tx.conn.Query(requestSql, args...)
results := r.SliceResults()
a := make([]interface{}, 0)
for _, row := range results {
if len(results[0]) > 0 {
a = append(a, row[0])
}
}
return a, r.Error
}
func (tx *Tx) Insert(table string, data map[string]interface{}) (ExecResult, error) {
r := tx.conn.Insert(table, data)
return ExecResult{
Id: r.Id(),
Changes: r.Changes(),
Sql: *r.Sql,
SqlArgs: r.Args,
}, r.Error
}
func (tx *Tx) Replace(table string, data map[string]interface{}) (ExecResult, error) {
r := tx.conn.Replace(table, data)
return ExecResult{
Id: r.Id(),
Changes: r.Changes(),
Sql: *r.Sql,
SqlArgs: r.Args,
}, r.Error
}
func (tx *Tx) Update(table string, data map[string]interface{}, wheres string, args ...interface{}) (ExecResult, error) {
r := tx.conn.Update(table, data, wheres, args...)
return ExecResult{
Changes: r.Changes(),
Sql: *r.Sql,
SqlArgs: r.Args,
}, r.Error
}
func (tx *Tx) Delete(table string, wheres string, args ...interface{}) (ExecResult, error) {
r := tx.conn.Delete(table, wheres, args...)
return ExecResult{
Changes: r.Changes(),
Sql: *r.Sql,
SqlArgs: r.Args,
}, r.Error
}
func DefaultBegin(ctx *plugin.Context) *Tx {
return GetDB(ctx, nil).Begin()
}
func DefaultDelete(ctx *plugin.Context, table string, wheres string, args ...interface{}) (ExecResult, error) {
return GetDB(ctx, nil).Delete(table, wheres, args...)
}
func DefaultDestroy(ctx *plugin.Context) error {
return GetDB(ctx, nil).Destroy()
}
func DefaultExec(ctx *plugin.Context, requestSql string, args ...interface{}) (ExecResult, error) {
return GetDB(ctx, nil).Exec(requestSql, args...)
}
func DefaultInsert(ctx *plugin.Context, table string, data map[string]interface{}) (ExecResult, error) {
return GetDB(ctx, nil).Insert(table, data)
}
func DefaultReplace(ctx *plugin.Context, table string, data map[string]interface{}) (ExecResult, error) {
return GetDB(ctx, nil).Replace(table, data)
}
func DefaultUpdate(ctx *plugin.Context, table string, data map[string]interface{}, wheres string, args ...interface{}) (ExecResult, error) {
return GetDB(ctx, nil).Update(table, data, wheres, args...)
}
func DefaultMake(ctx *plugin.Context, groups []ERGroup) ([]string, error) {
return GetDB(ctx, nil).Make(groups, ctx)
}
func DefaultMakeDao(ctx *plugin.Context, outputPath *string, conf *DaoConfig) error {
return GetDB(ctx, nil).MakeDao(outputPath, conf, ctx)
}
func DefaultMakeER(ctx *plugin.Context, groups []ERGroup, outputFile *string, tplFile *string) error {
return GetDB(ctx, nil).MakeER(groups, outputFile, tplFile, ctx)
}
func DefaultMakeId(ctx *plugin.Context, table string, idField string, idSize uint) (string, error) {
return GetDB(ctx, nil).MakeId(table, idField, idSize)
}
func DefaultQuery(ctx *plugin.Context, requestSql string, args ...interface{}) (QueryResult, error) {
return GetDB(ctx, nil).Query(requestSql, args...)
}
func DefaultQuery1(ctx *plugin.Context, requestSql string, args ...interface{}) (map[string]interface{}, error) {
return GetDB(ctx, nil).Query1(requestSql, args...)
}
func DefaultQuery11(ctx *plugin.Context, requestSql string, args ...interface{}) (interface{}, error) {
return GetDB(ctx, nil).Query11(requestSql, args...)
}
func DefaultQuery1a(ctx *plugin.Context, requestSql string, args ...interface{}) ([]interface{}, error) {
return GetDB(ctx, nil).Query1a(requestSql, args...)
}
func DefaultGetDBName(ctx *plugin.Context) string {
return GetDB(ctx, nil).GetDBName()
}