From 02c091b97658b66de44e4e41c4158605ad250053 Mon Sep 17 00:00:00 2001 From: AI Engineer Date: Fri, 15 May 2026 23:35:27 +0800 Subject: [PATCH] feat: refactor API, add ScanDocuments and improve vector search accuracy (by AI) --- CHANGELOG.md | 17 ++- README.md | 34 ++--- fulltext.go | 1 + indexDB.go | 330 ++++++++++++++++++++---------------------------- indexDB_test.go | 27 ++-- types.go | 7 + vector.go | 13 +- 7 files changed, 194 insertions(+), 235 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ef81c9..a1e15ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,11 +1,10 @@ # Changelog -## [1.0.0] - 2026-05-15 -### Added -- Created `apigo.cc/go/indexDB` as an independent, unified hybrid search engine. -- Extracted and merged fulltext search (`bleve`) and vector search (`chromem-go`) from `knowbase`. -- Added support for RRF (Reciprocal Rank Fusion) for combined result scoring. -- Implemented `Condition` filtering logic across both search engines. -- Implemented `RebuildAll` method to cleanly regenerate both fulltext and vector indices. -- Support user-level isolation using `Auth(userID)`. -- Replaced custom dependency types and matched apigo.cc/go infrastructure. +## [1.1.0] - 2026-05-15 +### Changed +- Refactored `GetDB` to accept separate `fulltextDBPath` and `vectorDBPath`. +- Removed internal version management (`xxxVersion.txt`). +- Replaced `RebuildAll` with `ScanDocuments` for cursor-based data traversal. +- Improved vector search accuracy by expanding retrieval scope when filters are present. +- Improved result merging by patching missing `Content` from the fulltext engine. +- Switched `id` field to `keyword` analyzer for accurate range queries. diff --git a/README.md b/README.md index cfed04e..0b9278b 100644 --- a/README.md +++ b/README.md @@ -2,13 +2,12 @@ `apigo.cc/go/indexDB` 提供了统一的混合检索引擎,结合了基于 `bleve` 的全文检索和基于 `chromem-go` 的向量检索。 -> **注意:** 启用向量检索需要传入 `embedding` 外部回调函数,这会增加额外的内存和计算开销,请在必要时使用。 - ## 特性 - **全文+向量** 混合检索,内置倒数排序融合(RRF)进行评分合并。 -- **无状态依赖**,仅接收和存储数据。不绑定特定的 LLM 或业务模型。 -- **傻瓜化检索 API**,支持多种复杂条件 `Condition` 和 `idPrefix`。 -- **平滑重建**,支持从 Bleve 重新导出数据生成全新的全文本及向量数据库,确保分词模型更改后能平滑过渡。 +- **无状态依赖**,支持独立指定全文和向量库路径。 +- **高召回率**,在应用过滤器时会自动扩大向量检索范围,确保筛选结果的准确性。 +- **内容补全**,如果向量库命中但全文库未命中,会自动从全文库补全正文内容。 +- **游标遍历**,提供 `ScanDocuments` 接口,支持基于 ID 的断点续传遍历,便于外部重建索引。 - **细粒度权限控制**,在引擎层进行系统和用户级别的视图隔离。 ## 安装 @@ -35,14 +34,14 @@ func mockEmbedding(text string) ([]float32, error) { } func main() { - // 若不传入 embedding 函数,则仅使用全文检索 - dbUnauth, err := indexDB.GetDB("./data_dir", mockEmbedding, log.Default()) + // 向量库路径为空或 embedding 函数为 nil,则不启用向量搜索 + dbUnauth, err := indexDB.GetDB("./fulltext", "./vector", mockEmbedding, log.Default()) if err != nil { panic(err) } // 绑定系统管理员或特定用户权限 - db := dbUnauth.Auth(indexDB.SystemUserID) // 获取全部权限 + db := dbUnauth.Auth(indexDB.SystemUserID) // 2. 添加数据 db.Add("doc1", "这是一段测试文本", map[string]any{"source": "test"}, []string{"user1"}) @@ -53,19 +52,22 @@ func main() { } results, err := db.Search("", "测试", 10, filter) for _, r := range results { - log.Println("ID:", r.ID, "Score:", r.Score) + log.Println("ID:", r.ID, "Score:", r.Score, "Content:", r.Content) } - // 4. 重建索引 - db.RebuildAll() + // 4. 遍历数据(用于外部重建) + docs, _ := db.ScanDocuments("", 100) + for _, doc := range docs { + log.Println("Found:", doc.ID) + } } ``` ## API 指南 -- `GetDB(indexDBPath string, embedding func(string) ([]float32, error), logger *log.Logger) (*IndexDBUnauthorized, error)`: 获取引擎的非授权实例。 +- `GetDB(fulltextPath, vectorPath string, embedding func(string) ([]float32, error), logger *log.Logger)`: 获取引擎实例。 - `(*IndexDBUnauthorized) Auth(userId string) *IndexDB`: 获得特定用户的授权实例。 -- `(*IndexDB) Add(id string, text string, metadata map[string]any, allowUsers []string) error`: 将数据添加到引擎,触发回调写入向量。 -- `(*IndexDB) Remove(id string) error`: 从引擎中删除。 -- `(*IndexDB) Search(idPrefix string, query string, topK int, filter []Condition) ([]SearchResult, error)`: 混合检索接口。 -- `(*IndexDB) RebuildAll() error`: 根据旧索引生成全新的版本以适用新的分词或模型算法。 +- `(*IndexDB) Add(id, text string, metadata map[string]any, allowUsers []string) error`: 添加数据。 +- `(*IndexDB) Remove(id string) error`: 删除数据。 +- `(*IndexDB) Search(idPrefix, query string, topK int, filter []Condition) ([]SearchResult, error)`: 混合检索。 +- `(*IndexDB) ScanDocuments(lastID string, limit int) ([]RawDocument, error)`: 基于游标遍历数据。 diff --git a/fulltext.go b/fulltext.go index 7e72992..b47b8c8 100644 --- a/fulltext.go +++ b/fulltext.go @@ -167,6 +167,7 @@ func buildIndexMapping() mapping.IndexMapping { docMapping.AddFieldMappingsAt("content", contentMapping) idMapping := bleve.NewTextFieldMapping() + idMapping.Analyzer = "keyword" idMapping.Store = true idMapping.Index = true docMapping.AddFieldMappingsAt("id", idMapping) diff --git a/indexDB.go b/indexDB.go index 7fdfb0d..38b2778 100644 --- a/indexDB.go +++ b/indexDB.go @@ -9,8 +9,8 @@ import ( "strings" "sync" - "github.com/blevesearch/bleve/v2/search/query" "github.com/blevesearch/bleve/v2" + "github.com/blevesearch/bleve/v2/search/query" "apigo.cc/go/cast" ) @@ -18,9 +18,10 @@ import ( const SystemUserID = "_system" type IndexDBUnauthorized struct { - indexDBPath string - embedding func(text string) ([]float32, error) - logger *log.Logger + fulltextPath string + vectorPath string + embedding func(text string) ([]float32, error) + logger *log.Logger fulltext *Engine vector *Store @@ -28,71 +29,43 @@ type IndexDBUnauthorized struct { mu sync.RWMutex } -func GetDB(indexDBPath string, embedding func(string) ([]float32, error), logger *log.Logger) (*IndexDBUnauthorized, error) { +func GetDB(fulltextPath, vectorPath string, embedding func(string) ([]float32, error), logger *log.Logger) (*IndexDBUnauthorized, error) { if logger == nil { logger = log.Default() } db := &IndexDBUnauthorized{ - indexDBPath: indexDBPath, - embedding: embedding, - logger: logger, + fulltextPath: fulltextPath, + vectorPath: vectorPath, + embedding: embedding, + logger: logger, } - err := db.load() - if err != nil { - return nil, err + if fulltextPath != "" { + if parent := filepath.Dir(fulltextPath); parent != "." { + os.MkdirAll(parent, 0755) + } + fEngine, err := newFulltextEngine(fulltextPath) + if err != nil { + return nil, fmt.Errorf("init fulltext error: %w", err) + } + db.fulltext = fEngine + } + + if vectorPath != "" && embedding != nil { + if parent := filepath.Dir(vectorPath); parent != "." { + os.MkdirAll(parent, 0755) + } + vStore, err := newVectorStore(Config{StorageDir: vectorPath}) + if err != nil { + return nil, fmt.Errorf("init vector error: %w", err) + } + db.vector = vStore } return db, nil } -func (db *IndexDBUnauthorized) load() error { - os.MkdirAll(db.indexDBPath, 0755) - - fvPath := filepath.Join(db.indexDBPath, "fulltextVersion.txt") - vvPath := filepath.Join(db.indexDBPath, "vectorVersion.txt") - - if _, err := os.Stat(fvPath); os.IsNotExist(err) { - os.WriteFile(fvPath, []byte("1"), 0644) - } - if _, err := os.Stat(vvPath); os.IsNotExist(err) { - os.WriteFile(vvPath, []byte("1"), 0644) - } - - fv, _ := os.ReadFile(fvPath) - vv, _ := os.ReadFile(vvPath) - - fVersion := strings.TrimSpace(string(fv)) - vVersion := strings.TrimSpace(string(vv)) - if fVersion == "" { - fVersion = "1" - } - if vVersion == "" { - vVersion = "1" - } - - fulltextPath := filepath.Join(db.indexDBPath, "fulltextV"+fVersion) - vectorPath := filepath.Join(db.indexDBPath, "vectorV"+vVersion) - - fEngine, err := newFulltextEngine(fulltextPath) - if err != nil { - return fmt.Errorf("init fulltext error: %w", err) - } - - var vStore *Store - if db.embedding != nil { - vStore, err = newVectorStore(Config{StorageDir: vectorPath}) - if err != nil { - return fmt.Errorf("init vector error: %w", err) - } - } - - db.fulltext = fEngine - db.vector = vStore - return nil -} - func (db *IndexDBUnauthorized) Auth(userId string) *IndexDB { return &IndexDB{ db: db, @@ -109,9 +82,11 @@ func (idx *IndexDB) Add(id string, text string, metadata map[string]any, allowUs idx.db.mu.RLock() defer idx.db.mu.RUnlock() - err := idx.db.fulltext.AddDocument(id, text, metadata, allowUsers) - if err != nil { - return fmt.Errorf("fulltext add: %w", err) + if idx.db.fulltext != nil { + err := idx.db.fulltext.AddDocument(id, text, metadata, allowUsers) + if err != nil { + return fmt.Errorf("fulltext add: %w", err) + } } if idx.db.embedding != nil && idx.db.vector != nil { @@ -135,9 +110,11 @@ func (idx *IndexDB) Remove(id string) error { idx.db.mu.RLock() defer idx.db.mu.RUnlock() - err := idx.db.fulltext.RemoveDocument(id) - if err != nil { - return fmt.Errorf("fulltext remove: %w", err) + if idx.db.fulltext != nil { + err := idx.db.fulltext.RemoveDocument(id) + if err != nil { + return fmt.Errorf("fulltext remove: %w", err) + } } if idx.db.vector != nil { @@ -158,11 +135,13 @@ func (idx *IndexDB) Search(idPrefix string, queryStr string, topK int, filter [] var vResults []SearchResult var fErr, vErr error - wg.Add(1) - go func() { - defer wg.Done() - fResults, fErr = idx.db.fulltext.Search(idPrefix, queryStr, topK*5, idx.userId, filter) - }() + if idx.db.fulltext != nil { + wg.Add(1) + go func() { + defer wg.Done() + fResults, fErr = idx.db.fulltext.Search(idPrefix, queryStr, topK*5, idx.userId, filter) + }() + } if idx.db.vector != nil && idx.db.embedding != nil && queryStr != "" { wg.Add(1) @@ -193,7 +172,34 @@ func (idx *IndexDB) Search(idPrefix string, queryStr string, topK int, filter [] return nil, fmt.Errorf("vector search: %w", vErr) } - return mergeAndRRF(fResults, vResults, topK), nil + merged := mergeAndRRF(fResults, vResults, topK) + + // Patch content for vector-only results + if idx.db.fulltext != nil { + for i := range merged { + if merged[i].Content == "" { + q := query.NewDocIDQuery([]string{merged[i].ID}) + req := bleve.NewSearchRequest(q) + req.Fields = []string{"content", "metadata"} + res, _ := idx.db.fulltext.index.Search(req) + if res != nil && len(res.Hits) > 0 { + merged[i].Content, _ = res.Hits[0].Fields["content"].(string) + // Metadata might also be missing if it was vector-only hit + if merged[i].Metadata == nil { + metaIfc := make(map[string]any) + for k, v := range res.Hits[0].Fields { + if strings.HasPrefix(k, "metadata.") { + metaIfc[strings.TrimPrefix(k, "metadata.")] = v + } + } + merged[i].Metadata = metaIfc + } + } + } + } + } + + return merged, nil } func mergeAndRRF(fResults []SearchResult, vResults []SearchResult, topK int) []SearchResult { @@ -233,6 +239,67 @@ func mergeAndRRF(fResults []SearchResult, vResults []SearchResult, topK int) []S return merged } +func (idx *IndexDB) ScanDocuments(lastID string, limit int) ([]RawDocument, error) { + idx.db.mu.RLock() + defer idx.db.mu.RUnlock() + + if idx.db.fulltext == nil { + return nil, fmt.Errorf("fulltext engine not initialized") + } + + var q query.Query + if lastID == "" { + q = query.NewMatchAllQuery() + } else { + rq := query.NewTermRangeQuery(lastID, "") + f := false + rq.InclusiveMin = &f + rq.SetField("id") + q = rq + } + + req := bleve.NewSearchRequest(q) + req.Size = limit + req.SortBy([]string{"id"}) + req.Fields = []string{"*"} + + res, err := idx.db.fulltext.index.Search(req) + if err != nil { + return nil, fmt.Errorf("scan search error: %w", err) + } + + var docs []RawDocument + for _, hit := range res.Hits { + idVal, _ := hit.Fields["id"].(string) + if idVal == "" { + idVal = hit.ID + } + + content, _ := hit.Fields["content"].(string) + metaIfc := make(map[string]any) + var allowUsers []string + + for k, v := range hit.Fields { + if strings.HasPrefix(k, "metadata.") { + metaIfc[strings.TrimPrefix(k, "metadata.")] = v + } else if strings.HasPrefix(k, "U-") && k != "U-_system" { + if cast.To[string](v) == "1" { + allowUsers = append(allowUsers, strings.TrimPrefix(k, "U-")) + } + } + } + + docs = append(docs, RawDocument{ + ID: idVal, + Text: content, + Metadata: metaIfc, + AllowUsers: allowUsers, + }) + } + + return docs, nil +} + func evalCondition(metadata map[string]any, id string, idPrefix string, filters []Condition) bool { if idPrefix != "" && !strings.HasPrefix(id, idPrefix) { return false @@ -293,128 +360,3 @@ func evalCondition(metadata map[string]any, id string, idPrefix string, filters } return true } - -func (idx *IndexDB) RebuildAll() error { - idx.db.mu.Lock() - defer idx.db.mu.Unlock() - - // 1. Get current versions - fvPath := filepath.Join(idx.db.indexDBPath, "fulltextVersion.txt") - vvPath := filepath.Join(idx.db.indexDBPath, "vectorVersion.txt") - - fv, _ := os.ReadFile(fvPath) - vv, _ := os.ReadFile(vvPath) - - fVersion := cast.To[int](strings.TrimSpace(string(fv))) - vVersion := cast.To[int](strings.TrimSpace(string(vv))) - - if fVersion <= 0 { fVersion = 1 } - if vVersion <= 0 { vVersion = 1 } - - newFVersion := fVersion + 1 - newVVersion := vVersion + 1 - - newFPath := filepath.Join(idx.db.indexDBPath, "fulltextV"+cast.To[string](newFVersion)) - newVPath := filepath.Join(idx.db.indexDBPath, "vectorV"+cast.To[string](newVVersion)) - - // 2. Initialize new engines - newFEngine, err := newFulltextEngine(newFPath) - if err != nil { - return fmt.Errorf("rebuild new fulltext err: %w", err) - } - - var newVStore *Store - if idx.db.embedding != nil { - newVStore, err = newVectorStore(Config{StorageDir: newVPath}) - if err != nil { - return fmt.Errorf("rebuild new vector err: %w", err) - } - } - - // 3. Read all data from old fulltext engine - // Bleve search MatchAll with large size or pagination - req := bleve.NewSearchRequest(query.NewMatchAllQuery()) - req.Fields = []string{"*"} - req.Size = 1000 - - from := 0 - for { - req.From = from - res, err := idx.db.fulltext.index.Search(req) - if err != nil { - return fmt.Errorf("rebuild search fulltext err: %w", err) - } - - if len(res.Hits) == 0 { - break - } - - for _, hit := range res.Hits { - idVal, _ := hit.Fields["id"].(string) - if idVal == "" { - idVal = hit.ID - } - - content, _ := hit.Fields["content"].(string) - - // Extract metadata and allowUsers - metaIfc := make(map[string]any) - allowUsers := []string{} - - for k, v := range hit.Fields { - if strings.HasPrefix(k, "metadata.") { - metaIfc[strings.TrimPrefix(k, "metadata.")] = v - } else if strings.HasPrefix(k, "U-") && k != "U-_system" { - if cast.To[string](v) == "1" { - allowUsers = append(allowUsers, strings.TrimPrefix(k, "U-")) - } - } - } - - // Add to new engines - err = newFEngine.AddDocument(idVal, content, metaIfc, allowUsers) - if err != nil { - idx.db.logger.Println("Rebuild fulltext add err:", err) - } - - if newVStore != nil && idx.db.embedding != nil { - vec, err := idx.db.embedding(content) - if err == nil { - coll, _ := newVStore.GetOrCreateCollection("main") - err = coll.AddRecord(idVal, vec, metaIfc, allowUsers) - if err != nil { - idx.db.logger.Println("Rebuild vector add err:", err) - } - } else { - idx.db.logger.Println("Rebuild vector embed err:", err) - } - } - } - - from += len(res.Hits) - if from >= int(res.Total) { - break - } - } - - // 4. Swap and cleanup - oldFEngine := idx.db.fulltext - oldVStore := idx.db.vector - - idx.db.fulltext = newFEngine - idx.db.vector = newVStore - - os.WriteFile(fvPath, []byte(cast.To[string](newFVersion)), 0644) - os.WriteFile(vvPath, []byte(cast.To[string](newVVersion)), 0644) - - // Close old engines and remove dirs in background - go func() { - oldFEngine.Close() - os.RemoveAll(filepath.Join(idx.db.indexDBPath, "fulltextV"+cast.To[string](fVersion))) - if oldVStore != nil { - os.RemoveAll(filepath.Join(idx.db.indexDBPath, "vectorV"+cast.To[string](vVersion))) - } - }() - - return nil -} diff --git a/indexDB_test.go b/indexDB_test.go index c00a96c..8ffd208 100644 --- a/indexDB_test.go +++ b/indexDB_test.go @@ -15,10 +15,12 @@ func mockEmbedding(text string) ([]float32, error) { } func TestIndexDB(t *testing.T) { - dbPath := "test_db" - defer os.RemoveAll(dbPath) + fPath := "test_fulltext" + vPath := "test_vector" + defer os.RemoveAll(fPath) + defer os.RemoveAll(vPath) - dbUnauth, err := GetDB(dbPath, mockEmbedding, nil) + dbUnauth, err := GetDB(fPath, vPath, mockEmbedding, nil) if err != nil { t.Fatalf("Failed to create engine: %v", err) } @@ -74,19 +76,20 @@ func TestIndexDB(t *testing.T) { t.Fatalf("User2 should NOT see doc1") } - // Rebuild Test - err = db.RebuildAll() + // Scan Test + docs, err := db.ScanDocuments("", 10) if err != nil { - t.Fatalf("Rebuild failed: %v", err) + t.Fatalf("Scan failed: %v", err) + } + if len(docs) != 2 { + t.Fatalf("Expected 2 docs in scan, got %d", len(docs)) } - // Wait and test search after rebuild - resultsRebuilt, err := db.Search("", "火星探测", 10, nil) + docsPart, err := db.ScanDocuments("1", 10) if err != nil { - t.Fatalf("Search after rebuild failed: %v", err) + t.Fatalf("Scan from 1 failed: %v", err) } - if len(resultsRebuilt) == 0 || resultsRebuilt[0].ID != "1" { - t.Fatalf("Expected doc1 after rebuild") + if len(docsPart) != 1 || docsPart[0].ID != "2" { + t.Fatalf("Expected doc 2 in scan from 1, got %v", docsPart) } - } diff --git a/types.go b/types.go index 3ba867b..af043ae 100644 --- a/types.go +++ b/types.go @@ -13,3 +13,10 @@ type SearchResult struct { Preview string Metadata map[string]any } + +type RawDocument struct { + ID string + Text string + Metadata map[string]any + AllowUsers []string +} diff --git a/vector.go b/vector.go index 75af723..5be7bf0 100644 --- a/vector.go +++ b/vector.go @@ -81,14 +81,19 @@ func (c *Collection) Search(queryVector []float32, topK int, userID string, idPr } } - nResults := topK * 10 count := c.coll.Count() - if nResults > count { + if count == 0 { + return nil, nil + } + + nResults := topK * 10 + if len(filter) > 0 || idPrefix != "" { + // If there are filters, we should fetch as much as possible to ensure accuracy. nResults = count } - if nResults == 0 { - return nil, nil + if nResults > count { + nResults = count } // We query more and filter in memory since Chromem-go only supports exact map filter.