feat: refactor API, add ScanDocuments and improve vector search accuracy (by AI)
This commit is contained in:
parent
ffa0f95c34
commit
02c091b976
17
CHANGELOG.md
17
CHANGELOG.md
@ -1,11 +1,10 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
## [1.0.0] - 2026-05-15
|
## [1.1.0] - 2026-05-15
|
||||||
### Added
|
### Changed
|
||||||
- Created `apigo.cc/go/indexDB` as an independent, unified hybrid search engine.
|
- Refactored `GetDB` to accept separate `fulltextDBPath` and `vectorDBPath`.
|
||||||
- Extracted and merged fulltext search (`bleve`) and vector search (`chromem-go`) from `knowbase`.
|
- Removed internal version management (`xxxVersion.txt`).
|
||||||
- Added support for RRF (Reciprocal Rank Fusion) for combined result scoring.
|
- Replaced `RebuildAll` with `ScanDocuments` for cursor-based data traversal.
|
||||||
- Implemented `Condition` filtering logic across both search engines.
|
- Improved vector search accuracy by expanding retrieval scope when filters are present.
|
||||||
- Implemented `RebuildAll` method to cleanly regenerate both fulltext and vector indices.
|
- Improved result merging by patching missing `Content` from the fulltext engine.
|
||||||
- Support user-level isolation using `Auth(userID)`.
|
- Switched `id` field to `keyword` analyzer for accurate range queries.
|
||||||
- Replaced custom dependency types and matched apigo.cc/go infrastructure.
|
|
||||||
|
|||||||
34
README.md
34
README.md
@ -2,13 +2,12 @@
|
|||||||
|
|
||||||
`apigo.cc/go/indexDB` 提供了统一的混合检索引擎,结合了基于 `bleve` 的全文检索和基于 `chromem-go` 的向量检索。
|
`apigo.cc/go/indexDB` 提供了统一的混合检索引擎,结合了基于 `bleve` 的全文检索和基于 `chromem-go` 的向量检索。
|
||||||
|
|
||||||
> **注意:** 启用向量检索需要传入 `embedding` 外部回调函数,这会增加额外的内存和计算开销,请在必要时使用。
|
|
||||||
|
|
||||||
## 特性
|
## 特性
|
||||||
- **全文+向量** 混合检索,内置倒数排序融合(RRF)进行评分合并。
|
- **全文+向量** 混合检索,内置倒数排序融合(RRF)进行评分合并。
|
||||||
- **无状态依赖**,仅接收和存储数据。不绑定特定的 LLM 或业务模型。
|
- **无状态依赖**,支持独立指定全文和向量库路径。
|
||||||
- **傻瓜化检索 API**,支持多种复杂条件 `Condition` 和 `idPrefix`。
|
- **高召回率**,在应用过滤器时会自动扩大向量检索范围,确保筛选结果的准确性。
|
||||||
- **平滑重建**,支持从 Bleve 重新导出数据生成全新的全文本及向量数据库,确保分词模型更改后能平滑过渡。
|
- **内容补全**,如果向量库命中但全文库未命中,会自动从全文库补全正文内容。
|
||||||
|
- **游标遍历**,提供 `ScanDocuments` 接口,支持基于 ID 的断点续传遍历,便于外部重建索引。
|
||||||
- **细粒度权限控制**,在引擎层进行系统和用户级别的视图隔离。
|
- **细粒度权限控制**,在引擎层进行系统和用户级别的视图隔离。
|
||||||
|
|
||||||
## 安装
|
## 安装
|
||||||
@ -35,14 +34,14 @@ func mockEmbedding(text string) ([]float32, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
// 若不传入 embedding 函数,则仅使用全文检索
|
// 向量库路径为空或 embedding 函数为 nil,则不启用向量搜索
|
||||||
dbUnauth, err := indexDB.GetDB("./data_dir", mockEmbedding, log.Default())
|
dbUnauth, err := indexDB.GetDB("./fulltext", "./vector", mockEmbedding, log.Default())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 绑定系统管理员或特定用户权限
|
// 绑定系统管理员或特定用户权限
|
||||||
db := dbUnauth.Auth(indexDB.SystemUserID) // 获取全部权限
|
db := dbUnauth.Auth(indexDB.SystemUserID)
|
||||||
|
|
||||||
// 2. 添加数据
|
// 2. 添加数据
|
||||||
db.Add("doc1", "这是一段测试文本", map[string]any{"source": "test"}, []string{"user1"})
|
db.Add("doc1", "这是一段测试文本", map[string]any{"source": "test"}, []string{"user1"})
|
||||||
@ -53,19 +52,22 @@ func main() {
|
|||||||
}
|
}
|
||||||
results, err := db.Search("", "测试", 10, filter)
|
results, err := db.Search("", "测试", 10, filter)
|
||||||
for _, r := range results {
|
for _, r := range results {
|
||||||
log.Println("ID:", r.ID, "Score:", r.Score)
|
log.Println("ID:", r.ID, "Score:", r.Score, "Content:", r.Content)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. 重建索引
|
// 4. 遍历数据(用于外部重建)
|
||||||
db.RebuildAll()
|
docs, _ := db.ScanDocuments("", 100)
|
||||||
|
for _, doc := range docs {
|
||||||
|
log.Println("Found:", doc.ID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
## API 指南
|
## API 指南
|
||||||
|
|
||||||
- `GetDB(indexDBPath string, embedding func(string) ([]float32, error), logger *log.Logger) (*IndexDBUnauthorized, error)`: 获取引擎的非授权实例。
|
- `GetDB(fulltextPath, vectorPath string, embedding func(string) ([]float32, error), logger *log.Logger)`: 获取引擎实例。
|
||||||
- `(*IndexDBUnauthorized) Auth(userId string) *IndexDB`: 获得特定用户的授权实例。
|
- `(*IndexDBUnauthorized) Auth(userId string) *IndexDB`: 获得特定用户的授权实例。
|
||||||
- `(*IndexDB) Add(id string, text string, metadata map[string]any, allowUsers []string) error`: 将数据添加到引擎,触发回调写入向量。
|
- `(*IndexDB) Add(id, text string, metadata map[string]any, allowUsers []string) error`: 添加数据。
|
||||||
- `(*IndexDB) Remove(id string) error`: 从引擎中删除。
|
- `(*IndexDB) Remove(id string) error`: 删除数据。
|
||||||
- `(*IndexDB) Search(idPrefix string, query string, topK int, filter []Condition) ([]SearchResult, error)`: 混合检索接口。
|
- `(*IndexDB) Search(idPrefix, query string, topK int, filter []Condition) ([]SearchResult, error)`: 混合检索。
|
||||||
- `(*IndexDB) RebuildAll() error`: 根据旧索引生成全新的版本以适用新的分词或模型算法。
|
- `(*IndexDB) ScanDocuments(lastID string, limit int) ([]RawDocument, error)`: 基于游标遍历数据。
|
||||||
|
|||||||
@ -167,6 +167,7 @@ func buildIndexMapping() mapping.IndexMapping {
|
|||||||
docMapping.AddFieldMappingsAt("content", contentMapping)
|
docMapping.AddFieldMappingsAt("content", contentMapping)
|
||||||
|
|
||||||
idMapping := bleve.NewTextFieldMapping()
|
idMapping := bleve.NewTextFieldMapping()
|
||||||
|
idMapping.Analyzer = "keyword"
|
||||||
idMapping.Store = true
|
idMapping.Store = true
|
||||||
idMapping.Index = true
|
idMapping.Index = true
|
||||||
docMapping.AddFieldMappingsAt("id", idMapping)
|
docMapping.AddFieldMappingsAt("id", idMapping)
|
||||||
|
|||||||
298
indexDB.go
298
indexDB.go
@ -9,8 +9,8 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/blevesearch/bleve/v2/search/query"
|
|
||||||
"github.com/blevesearch/bleve/v2"
|
"github.com/blevesearch/bleve/v2"
|
||||||
|
"github.com/blevesearch/bleve/v2/search/query"
|
||||||
|
|
||||||
"apigo.cc/go/cast"
|
"apigo.cc/go/cast"
|
||||||
)
|
)
|
||||||
@ -18,7 +18,8 @@ import (
|
|||||||
const SystemUserID = "_system"
|
const SystemUserID = "_system"
|
||||||
|
|
||||||
type IndexDBUnauthorized struct {
|
type IndexDBUnauthorized struct {
|
||||||
indexDBPath string
|
fulltextPath string
|
||||||
|
vectorPath string
|
||||||
embedding func(text string) ([]float32, error)
|
embedding func(text string) ([]float32, error)
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
|
|
||||||
@ -28,71 +29,43 @@ type IndexDBUnauthorized struct {
|
|||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetDB(indexDBPath string, embedding func(string) ([]float32, error), logger *log.Logger) (*IndexDBUnauthorized, error) {
|
func GetDB(fulltextPath, vectorPath string, embedding func(string) ([]float32, error), logger *log.Logger) (*IndexDBUnauthorized, error) {
|
||||||
if logger == nil {
|
if logger == nil {
|
||||||
logger = log.Default()
|
logger = log.Default()
|
||||||
}
|
}
|
||||||
|
|
||||||
db := &IndexDBUnauthorized{
|
db := &IndexDBUnauthorized{
|
||||||
indexDBPath: indexDBPath,
|
fulltextPath: fulltextPath,
|
||||||
|
vectorPath: vectorPath,
|
||||||
embedding: embedding,
|
embedding: embedding,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := db.load()
|
if fulltextPath != "" {
|
||||||
|
if parent := filepath.Dir(fulltextPath); parent != "." {
|
||||||
|
os.MkdirAll(parent, 0755)
|
||||||
|
}
|
||||||
|
fEngine, err := newFulltextEngine(fulltextPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("init fulltext error: %w", err)
|
||||||
|
}
|
||||||
|
db.fulltext = fEngine
|
||||||
|
}
|
||||||
|
|
||||||
|
if vectorPath != "" && embedding != nil {
|
||||||
|
if parent := filepath.Dir(vectorPath); parent != "." {
|
||||||
|
os.MkdirAll(parent, 0755)
|
||||||
|
}
|
||||||
|
vStore, err := newVectorStore(Config{StorageDir: vectorPath})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("init vector error: %w", err)
|
||||||
|
}
|
||||||
|
db.vector = vStore
|
||||||
}
|
}
|
||||||
|
|
||||||
return db, nil
|
return db, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *IndexDBUnauthorized) load() error {
|
|
||||||
os.MkdirAll(db.indexDBPath, 0755)
|
|
||||||
|
|
||||||
fvPath := filepath.Join(db.indexDBPath, "fulltextVersion.txt")
|
|
||||||
vvPath := filepath.Join(db.indexDBPath, "vectorVersion.txt")
|
|
||||||
|
|
||||||
if _, err := os.Stat(fvPath); os.IsNotExist(err) {
|
|
||||||
os.WriteFile(fvPath, []byte("1"), 0644)
|
|
||||||
}
|
|
||||||
if _, err := os.Stat(vvPath); os.IsNotExist(err) {
|
|
||||||
os.WriteFile(vvPath, []byte("1"), 0644)
|
|
||||||
}
|
|
||||||
|
|
||||||
fv, _ := os.ReadFile(fvPath)
|
|
||||||
vv, _ := os.ReadFile(vvPath)
|
|
||||||
|
|
||||||
fVersion := strings.TrimSpace(string(fv))
|
|
||||||
vVersion := strings.TrimSpace(string(vv))
|
|
||||||
if fVersion == "" {
|
|
||||||
fVersion = "1"
|
|
||||||
}
|
|
||||||
if vVersion == "" {
|
|
||||||
vVersion = "1"
|
|
||||||
}
|
|
||||||
|
|
||||||
fulltextPath := filepath.Join(db.indexDBPath, "fulltextV"+fVersion)
|
|
||||||
vectorPath := filepath.Join(db.indexDBPath, "vectorV"+vVersion)
|
|
||||||
|
|
||||||
fEngine, err := newFulltextEngine(fulltextPath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("init fulltext error: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var vStore *Store
|
|
||||||
if db.embedding != nil {
|
|
||||||
vStore, err = newVectorStore(Config{StorageDir: vectorPath})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("init vector error: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
db.fulltext = fEngine
|
|
||||||
db.vector = vStore
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *IndexDBUnauthorized) Auth(userId string) *IndexDB {
|
func (db *IndexDBUnauthorized) Auth(userId string) *IndexDB {
|
||||||
return &IndexDB{
|
return &IndexDB{
|
||||||
db: db,
|
db: db,
|
||||||
@ -109,10 +82,12 @@ func (idx *IndexDB) Add(id string, text string, metadata map[string]any, allowUs
|
|||||||
idx.db.mu.RLock()
|
idx.db.mu.RLock()
|
||||||
defer idx.db.mu.RUnlock()
|
defer idx.db.mu.RUnlock()
|
||||||
|
|
||||||
|
if idx.db.fulltext != nil {
|
||||||
err := idx.db.fulltext.AddDocument(id, text, metadata, allowUsers)
|
err := idx.db.fulltext.AddDocument(id, text, metadata, allowUsers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("fulltext add: %w", err)
|
return fmt.Errorf("fulltext add: %w", err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if idx.db.embedding != nil && idx.db.vector != nil {
|
if idx.db.embedding != nil && idx.db.vector != nil {
|
||||||
vec, err := idx.db.embedding(text)
|
vec, err := idx.db.embedding(text)
|
||||||
@ -135,10 +110,12 @@ func (idx *IndexDB) Remove(id string) error {
|
|||||||
idx.db.mu.RLock()
|
idx.db.mu.RLock()
|
||||||
defer idx.db.mu.RUnlock()
|
defer idx.db.mu.RUnlock()
|
||||||
|
|
||||||
|
if idx.db.fulltext != nil {
|
||||||
err := idx.db.fulltext.RemoveDocument(id)
|
err := idx.db.fulltext.RemoveDocument(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("fulltext remove: %w", err)
|
return fmt.Errorf("fulltext remove: %w", err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if idx.db.vector != nil {
|
if idx.db.vector != nil {
|
||||||
coll, err := idx.db.vector.GetOrCreateCollection("main")
|
coll, err := idx.db.vector.GetOrCreateCollection("main")
|
||||||
@ -158,11 +135,13 @@ func (idx *IndexDB) Search(idPrefix string, queryStr string, topK int, filter []
|
|||||||
var vResults []SearchResult
|
var vResults []SearchResult
|
||||||
var fErr, vErr error
|
var fErr, vErr error
|
||||||
|
|
||||||
|
if idx.db.fulltext != nil {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
fResults, fErr = idx.db.fulltext.Search(idPrefix, queryStr, topK*5, idx.userId, filter)
|
fResults, fErr = idx.db.fulltext.Search(idPrefix, queryStr, topK*5, idx.userId, filter)
|
||||||
}()
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
if idx.db.vector != nil && idx.db.embedding != nil && queryStr != "" {
|
if idx.db.vector != nil && idx.db.embedding != nil && queryStr != "" {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@ -193,7 +172,34 @@ func (idx *IndexDB) Search(idPrefix string, queryStr string, topK int, filter []
|
|||||||
return nil, fmt.Errorf("vector search: %w", vErr)
|
return nil, fmt.Errorf("vector search: %w", vErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return mergeAndRRF(fResults, vResults, topK), nil
|
merged := mergeAndRRF(fResults, vResults, topK)
|
||||||
|
|
||||||
|
// Patch content for vector-only results
|
||||||
|
if idx.db.fulltext != nil {
|
||||||
|
for i := range merged {
|
||||||
|
if merged[i].Content == "" {
|
||||||
|
q := query.NewDocIDQuery([]string{merged[i].ID})
|
||||||
|
req := bleve.NewSearchRequest(q)
|
||||||
|
req.Fields = []string{"content", "metadata"}
|
||||||
|
res, _ := idx.db.fulltext.index.Search(req)
|
||||||
|
if res != nil && len(res.Hits) > 0 {
|
||||||
|
merged[i].Content, _ = res.Hits[0].Fields["content"].(string)
|
||||||
|
// Metadata might also be missing if it was vector-only hit
|
||||||
|
if merged[i].Metadata == nil {
|
||||||
|
metaIfc := make(map[string]any)
|
||||||
|
for k, v := range res.Hits[0].Fields {
|
||||||
|
if strings.HasPrefix(k, "metadata.") {
|
||||||
|
metaIfc[strings.TrimPrefix(k, "metadata.")] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
merged[i].Metadata = metaIfc
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return merged, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func mergeAndRRF(fResults []SearchResult, vResults []SearchResult, topK int) []SearchResult {
|
func mergeAndRRF(fResults []SearchResult, vResults []SearchResult, topK int) []SearchResult {
|
||||||
@ -233,6 +239,67 @@ func mergeAndRRF(fResults []SearchResult, vResults []SearchResult, topK int) []S
|
|||||||
return merged
|
return merged
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (idx *IndexDB) ScanDocuments(lastID string, limit int) ([]RawDocument, error) {
|
||||||
|
idx.db.mu.RLock()
|
||||||
|
defer idx.db.mu.RUnlock()
|
||||||
|
|
||||||
|
if idx.db.fulltext == nil {
|
||||||
|
return nil, fmt.Errorf("fulltext engine not initialized")
|
||||||
|
}
|
||||||
|
|
||||||
|
var q query.Query
|
||||||
|
if lastID == "" {
|
||||||
|
q = query.NewMatchAllQuery()
|
||||||
|
} else {
|
||||||
|
rq := query.NewTermRangeQuery(lastID, "")
|
||||||
|
f := false
|
||||||
|
rq.InclusiveMin = &f
|
||||||
|
rq.SetField("id")
|
||||||
|
q = rq
|
||||||
|
}
|
||||||
|
|
||||||
|
req := bleve.NewSearchRequest(q)
|
||||||
|
req.Size = limit
|
||||||
|
req.SortBy([]string{"id"})
|
||||||
|
req.Fields = []string{"*"}
|
||||||
|
|
||||||
|
res, err := idx.db.fulltext.index.Search(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("scan search error: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var docs []RawDocument
|
||||||
|
for _, hit := range res.Hits {
|
||||||
|
idVal, _ := hit.Fields["id"].(string)
|
||||||
|
if idVal == "" {
|
||||||
|
idVal = hit.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
content, _ := hit.Fields["content"].(string)
|
||||||
|
metaIfc := make(map[string]any)
|
||||||
|
var allowUsers []string
|
||||||
|
|
||||||
|
for k, v := range hit.Fields {
|
||||||
|
if strings.HasPrefix(k, "metadata.") {
|
||||||
|
metaIfc[strings.TrimPrefix(k, "metadata.")] = v
|
||||||
|
} else if strings.HasPrefix(k, "U-") && k != "U-_system" {
|
||||||
|
if cast.To[string](v) == "1" {
|
||||||
|
allowUsers = append(allowUsers, strings.TrimPrefix(k, "U-"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
docs = append(docs, RawDocument{
|
||||||
|
ID: idVal,
|
||||||
|
Text: content,
|
||||||
|
Metadata: metaIfc,
|
||||||
|
AllowUsers: allowUsers,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return docs, nil
|
||||||
|
}
|
||||||
|
|
||||||
func evalCondition(metadata map[string]any, id string, idPrefix string, filters []Condition) bool {
|
func evalCondition(metadata map[string]any, id string, idPrefix string, filters []Condition) bool {
|
||||||
if idPrefix != "" && !strings.HasPrefix(id, idPrefix) {
|
if idPrefix != "" && !strings.HasPrefix(id, idPrefix) {
|
||||||
return false
|
return false
|
||||||
@ -293,128 +360,3 @@ func evalCondition(metadata map[string]any, id string, idPrefix string, filters
|
|||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (idx *IndexDB) RebuildAll() error {
|
|
||||||
idx.db.mu.Lock()
|
|
||||||
defer idx.db.mu.Unlock()
|
|
||||||
|
|
||||||
// 1. Get current versions
|
|
||||||
fvPath := filepath.Join(idx.db.indexDBPath, "fulltextVersion.txt")
|
|
||||||
vvPath := filepath.Join(idx.db.indexDBPath, "vectorVersion.txt")
|
|
||||||
|
|
||||||
fv, _ := os.ReadFile(fvPath)
|
|
||||||
vv, _ := os.ReadFile(vvPath)
|
|
||||||
|
|
||||||
fVersion := cast.To[int](strings.TrimSpace(string(fv)))
|
|
||||||
vVersion := cast.To[int](strings.TrimSpace(string(vv)))
|
|
||||||
|
|
||||||
if fVersion <= 0 { fVersion = 1 }
|
|
||||||
if vVersion <= 0 { vVersion = 1 }
|
|
||||||
|
|
||||||
newFVersion := fVersion + 1
|
|
||||||
newVVersion := vVersion + 1
|
|
||||||
|
|
||||||
newFPath := filepath.Join(idx.db.indexDBPath, "fulltextV"+cast.To[string](newFVersion))
|
|
||||||
newVPath := filepath.Join(idx.db.indexDBPath, "vectorV"+cast.To[string](newVVersion))
|
|
||||||
|
|
||||||
// 2. Initialize new engines
|
|
||||||
newFEngine, err := newFulltextEngine(newFPath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("rebuild new fulltext err: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var newVStore *Store
|
|
||||||
if idx.db.embedding != nil {
|
|
||||||
newVStore, err = newVectorStore(Config{StorageDir: newVPath})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("rebuild new vector err: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. Read all data from old fulltext engine
|
|
||||||
// Bleve search MatchAll with large size or pagination
|
|
||||||
req := bleve.NewSearchRequest(query.NewMatchAllQuery())
|
|
||||||
req.Fields = []string{"*"}
|
|
||||||
req.Size = 1000
|
|
||||||
|
|
||||||
from := 0
|
|
||||||
for {
|
|
||||||
req.From = from
|
|
||||||
res, err := idx.db.fulltext.index.Search(req)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("rebuild search fulltext err: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(res.Hits) == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, hit := range res.Hits {
|
|
||||||
idVal, _ := hit.Fields["id"].(string)
|
|
||||||
if idVal == "" {
|
|
||||||
idVal = hit.ID
|
|
||||||
}
|
|
||||||
|
|
||||||
content, _ := hit.Fields["content"].(string)
|
|
||||||
|
|
||||||
// Extract metadata and allowUsers
|
|
||||||
metaIfc := make(map[string]any)
|
|
||||||
allowUsers := []string{}
|
|
||||||
|
|
||||||
for k, v := range hit.Fields {
|
|
||||||
if strings.HasPrefix(k, "metadata.") {
|
|
||||||
metaIfc[strings.TrimPrefix(k, "metadata.")] = v
|
|
||||||
} else if strings.HasPrefix(k, "U-") && k != "U-_system" {
|
|
||||||
if cast.To[string](v) == "1" {
|
|
||||||
allowUsers = append(allowUsers, strings.TrimPrefix(k, "U-"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add to new engines
|
|
||||||
err = newFEngine.AddDocument(idVal, content, metaIfc, allowUsers)
|
|
||||||
if err != nil {
|
|
||||||
idx.db.logger.Println("Rebuild fulltext add err:", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if newVStore != nil && idx.db.embedding != nil {
|
|
||||||
vec, err := idx.db.embedding(content)
|
|
||||||
if err == nil {
|
|
||||||
coll, _ := newVStore.GetOrCreateCollection("main")
|
|
||||||
err = coll.AddRecord(idVal, vec, metaIfc, allowUsers)
|
|
||||||
if err != nil {
|
|
||||||
idx.db.logger.Println("Rebuild vector add err:", err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
idx.db.logger.Println("Rebuild vector embed err:", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
from += len(res.Hits)
|
|
||||||
if from >= int(res.Total) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 4. Swap and cleanup
|
|
||||||
oldFEngine := idx.db.fulltext
|
|
||||||
oldVStore := idx.db.vector
|
|
||||||
|
|
||||||
idx.db.fulltext = newFEngine
|
|
||||||
idx.db.vector = newVStore
|
|
||||||
|
|
||||||
os.WriteFile(fvPath, []byte(cast.To[string](newFVersion)), 0644)
|
|
||||||
os.WriteFile(vvPath, []byte(cast.To[string](newVVersion)), 0644)
|
|
||||||
|
|
||||||
// Close old engines and remove dirs in background
|
|
||||||
go func() {
|
|
||||||
oldFEngine.Close()
|
|
||||||
os.RemoveAll(filepath.Join(idx.db.indexDBPath, "fulltextV"+cast.To[string](fVersion)))
|
|
||||||
if oldVStore != nil {
|
|
||||||
os.RemoveAll(filepath.Join(idx.db.indexDBPath, "vectorV"+cast.To[string](vVersion)))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|||||||
@ -15,10 +15,12 @@ func mockEmbedding(text string) ([]float32, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestIndexDB(t *testing.T) {
|
func TestIndexDB(t *testing.T) {
|
||||||
dbPath := "test_db"
|
fPath := "test_fulltext"
|
||||||
defer os.RemoveAll(dbPath)
|
vPath := "test_vector"
|
||||||
|
defer os.RemoveAll(fPath)
|
||||||
|
defer os.RemoveAll(vPath)
|
||||||
|
|
||||||
dbUnauth, err := GetDB(dbPath, mockEmbedding, nil)
|
dbUnauth, err := GetDB(fPath, vPath, mockEmbedding, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to create engine: %v", err)
|
t.Fatalf("Failed to create engine: %v", err)
|
||||||
}
|
}
|
||||||
@ -74,19 +76,20 @@ func TestIndexDB(t *testing.T) {
|
|||||||
t.Fatalf("User2 should NOT see doc1")
|
t.Fatalf("User2 should NOT see doc1")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rebuild Test
|
// Scan Test
|
||||||
err = db.RebuildAll()
|
docs, err := db.ScanDocuments("", 10)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Rebuild failed: %v", err)
|
t.Fatalf("Scan failed: %v", err)
|
||||||
|
}
|
||||||
|
if len(docs) != 2 {
|
||||||
|
t.Fatalf("Expected 2 docs in scan, got %d", len(docs))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait and test search after rebuild
|
docsPart, err := db.ScanDocuments("1", 10)
|
||||||
resultsRebuilt, err := db.Search("", "火星探测", 10, nil)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Search after rebuild failed: %v", err)
|
t.Fatalf("Scan from 1 failed: %v", err)
|
||||||
}
|
}
|
||||||
if len(resultsRebuilt) == 0 || resultsRebuilt[0].ID != "1" {
|
if len(docsPart) != 1 || docsPart[0].ID != "2" {
|
||||||
t.Fatalf("Expected doc1 after rebuild")
|
t.Fatalf("Expected doc 2 in scan from 1, got %v", docsPart)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
7
types.go
7
types.go
@ -13,3 +13,10 @@ type SearchResult struct {
|
|||||||
Preview string
|
Preview string
|
||||||
Metadata map[string]any
|
Metadata map[string]any
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type RawDocument struct {
|
||||||
|
ID string
|
||||||
|
Text string
|
||||||
|
Metadata map[string]any
|
||||||
|
AllowUsers []string
|
||||||
|
}
|
||||||
|
|||||||
13
vector.go
13
vector.go
@ -81,14 +81,19 @@ func (c *Collection) Search(queryVector []float32, topK int, userID string, idPr
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
nResults := topK * 10
|
|
||||||
count := c.coll.Count()
|
count := c.coll.Count()
|
||||||
if nResults > count {
|
if count == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
nResults := topK * 10
|
||||||
|
if len(filter) > 0 || idPrefix != "" {
|
||||||
|
// If there are filters, we should fetch as much as possible to ensure accuracy.
|
||||||
nResults = count
|
nResults = count
|
||||||
}
|
}
|
||||||
|
|
||||||
if nResults == 0 {
|
if nResults > count {
|
||||||
return nil, nil
|
nResults = count
|
||||||
}
|
}
|
||||||
|
|
||||||
// We query more and filter in memory since Chromem-go only supports exact map filter.
|
// We query more and filter in memory since Chromem-go only supports exact map filter.
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user