gomog/internal/engine/memory_store.go

396 lines
9.5 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package engine
import (
"context"
"fmt"
"log"
"strings"
"sync"
"time"
"git.kingecg.top/kingecg/gomog/internal/database"
"git.kingecg.top/kingecg/gomog/pkg/errors"
"git.kingecg.top/kingecg/gomog/pkg/types"
)
// MemoryStore is an in-memory document store organised into named collections.
// An optional database adapter can be attached; methods in this file use it to
// load collections from, and write them back to, a backing database.
type MemoryStore struct {
// mu guards the collections map.
mu sync.RWMutex
// collections maps a collection name to its in-memory contents.
collections map[string]*Collection
// adapter is the backing database. Initialize and DropCollection treat a nil
// adapter as "no database configured".
adapter database.DatabaseAdapter
}
// Collection is a single named in-memory collection of documents.
type Collection struct {
// name is the key the collection was created under.
name string
documents map[string]types.Document // document ID -> Document
// mu guards documents.
mu sync.RWMutex
}
// NewMemoryStore returns an empty store bound to adapter. The adapter is
// stored as given and may be nil.
func NewMemoryStore(adapter database.DatabaseAdapter) *MemoryStore {
	store := new(MemoryStore)
	store.adapter = adapter
	store.collections = map[string]*Collection{}
	return store
}
// Initialize loads every collection that already exists in the database into
// memory.
//
// With no adapter configured it logs and returns nil. If ListCollections fails
// with the message "not implemented" the load is skipped and nil is returned;
// any other listing error is returned wrapped. A collection whose documents
// cannot be fetched is logged and skipped rather than aborting the whole load.
func (ms *MemoryStore) Initialize(ctx context.Context) error {
	if ms.adapter == nil {
		log.Println("[INFO] No database adapter, skipping initialization")
		return nil
	}

	tables, err := ms.adapter.ListCollections(ctx)
	if err != nil {
		// NOTE(review): matching on the message text is brittle; a sentinel
		// error from the adapter package would be safer if one exists.
		if err.Error() == "not implemented" {
			log.Println("[WARN] ListCollections not implemented, skipping initialization")
			return nil
		}
		return fmt.Errorf("failed to list collections: %w", err)
	}
	log.Printf("[INFO] Found %d collections in database", len(tables))

	loadedCount := 0
	for _, tableName := range tables {
		docs, err := ms.adapter.FindAll(ctx, tableName)
		if err != nil {
			log.Printf("[WARN] Failed to load collection %s: %v", tableName, err)
			continue
		}

		// Build the collection before taking the store lock so readers are
		// only blocked for the map assignment, not for the document copy.
		coll := &Collection{
			name:      tableName,
			documents: make(map[string]types.Document, len(docs)),
		}
		for _, doc := range docs {
			coll.documents[doc.ID] = doc
		}

		// The collection is registered under the bare table name only (for
		// example "users"). GetCollection resolves "dbName.collection" lookups
		// by stripping the prefix, so no second entry is created here.
		ms.mu.Lock()
		ms.collections[tableName] = coll
		ms.mu.Unlock()

		loadedCount++
		log.Printf("[DEBUG] Loaded collection %s with %d documents", tableName, len(docs))
	}

	log.Printf("[INFO] Successfully loaded %d collections from database", loadedCount)
	return nil
}
// CreateTestCollectionForTesting registers a collection called name in store,
// replacing any existing collection with that name. It is intended for tests
// only.
//
// The documents map is used directly, not copied. A nil map is replaced with
// an empty one so that later inserts into the collection do not panic.
func CreateTestCollectionForTesting(store *MemoryStore, name string, documents map[string]types.Document) {
	if documents == nil {
		documents = make(map[string]types.Document)
	}

	// Take the store lock like every other writer of store.collections.
	store.mu.Lock()
	defer store.mu.Unlock()

	// Tolerate a zero-value MemoryStore built without NewMemoryStore.
	if store.collections == nil {
		store.collections = make(map[string]*Collection)
	}
	store.collections[name] = &Collection{
		name:      name,
		documents: documents,
	}
}
// LoadCollection loads the collection called name from the database into
// memory, creating it in the database first if it does not exist there. Any
// in-memory collection already registered under name is replaced.
//
// It returns an error if no adapter is configured, and otherwise returns
// adapter errors unchanged.
func (ms *MemoryStore) LoadCollection(ctx context.Context, name string) error {
	// Without this guard the adapter calls below would panic on a nil
	// interface value.
	if ms.adapter == nil {
		return fmt.Errorf("load collection %q: no database adapter configured", name)
	}

	exists, err := ms.adapter.CollectionExists(ctx, name)
	if err != nil {
		return err
	}
	if !exists {
		if err := ms.adapter.CreateCollection(ctx, name); err != nil {
			return err
		}
	}

	docs, err := ms.adapter.FindAll(ctx, name)
	if err != nil {
		return err
	}

	// Build the collection before locking so the store lock covers only the
	// map assignment.
	coll := &Collection{
		name:      name,
		documents: make(map[string]types.Document, len(docs)),
	}
	for _, doc := range docs {
		coll.documents[doc.ID] = doc
	}

	ms.mu.Lock()
	defer ms.mu.Unlock()
	ms.collections[name] = coll
	return nil
}
// GetCollection looks up a collection by name. The exact name is tried first
// (for example "testdb.users"). If that misses and the name contains a "."
// after its first character, the part after the first "." is tried (for
// example "users"). It returns errors.ErrCollectionNotFnd when neither matches.
func (ms *MemoryStore) GetCollection(name string) (*Collection, error) {
	ms.mu.RLock()
	defer ms.mu.RUnlock()

	if found, ok := ms.collections[name]; ok {
		return found, nil
	}

	// No exact match: fall back to the name without its database prefix.
	dot := strings.Index(name, ".")
	if dot <= 0 {
		return nil, errors.ErrCollectionNotFnd
	}
	if found, ok := ms.collections[name[dot+1:]]; ok {
		return found, nil
	}
	return nil, errors.ErrCollectionNotFnd
}
// Insert stores doc in the named collection, keyed by doc.ID, overwriting any
// document with the same ID. The collection is created under the given name
// if it cannot be found. It always returns nil.
func (ms *MemoryStore) Insert(collection string, doc types.Document) error {
	coll, err := ms.GetCollection(collection)
	if err != nil {
		// GetCollection has released its read lock, so another goroutine may
		// have created the collection in the meantime. Repeat the lookup under
		// the write lock and create only if it is still missing; otherwise the
		// concurrent creator's collection and its documents would be replaced.
		ms.mu.Lock()
		var ok bool
		coll, ok = ms.collections[collection]
		if !ok {
			if idx := strings.Index(collection, "."); idx > 0 {
				coll, ok = ms.collections[collection[idx+1:]]
			}
		}
		if !ok {
			coll = &Collection{
				name:      collection,
				documents: make(map[string]types.Document),
			}
			ms.collections[collection] = coll
		}
		ms.mu.Unlock()
	}

	coll.mu.Lock()
	defer coll.mu.Unlock()
	coll.documents[doc.ID] = doc
	return nil
}
// Find returns the documents in the named collection whose Data satisfies
// filter according to MatchFilter. The result is nil when nothing matches, and
// its order follows map iteration, so it is not stable between calls. The
// collection lookup error is returned unchanged.
func (ms *MemoryStore) Find(collection string, filter types.Filter) ([]types.Document, error) {
	coll, err := ms.GetCollection(collection)
	if err != nil {
		return nil, err
	}

	coll.mu.RLock()
	defer coll.mu.RUnlock()

	var matches []types.Document
	for _, candidate := range coll.documents {
		if !MatchFilter(candidate.Data, filter) {
			continue
		}
		matches = append(matches, candidate)
	}
	return matches, nil
}
// Update applies update to every document in the named collection whose Data
// matches filter, passing arrayFilters through to applyUpdateWithFilters.
//
// It returns the matched count, the modified count, the IDs of any upserted
// documents, and an error. Every matched document is counted as modified. When
// nothing matches and upsert is true, one new document is built by applying
// update to empty data in insert mode; that case reports matched = 1 and
// modified = 1 along with the new ID.
//
// NOTE(review): unlike Insert, a missing collection is returned as an error
// even when upsert is true; confirm that callers expect this.
func (ms *MemoryStore) Update(collection string, filter types.Filter, update types.Update, upsert bool, arrayFilters []types.Filter) (int, int, []string, error) {
	coll, err := ms.GetCollection(collection)
	if err != nil {
		return 0, 0, nil, err
	}

	coll.mu.Lock()
	defer coll.mu.Unlock()

	// Use one timestamp for the whole operation, so all documents touched by
	// this call agree and an upserted document has CreatedAt == UpdatedAt.
	now := time.Now()

	matched := 0
	modified := 0
	var upsertedIDs []string

	// Assigning to a key that already exists while ranging over the map is
	// safe in Go.
	for id, doc := range coll.documents {
		if !MatchFilter(doc.Data, filter) {
			continue
		}
		matched++
		newData := applyUpdateWithFilters(doc.Data, update, false, arrayFilters)
		coll.documents[id] = types.Document{
			ID:        doc.ID,
			Data:      newData,
			CreatedAt: doc.CreatedAt,
			UpdatedAt: now,
		}
		modified++
	}

	// Upsert: nothing matched, so insert a new document instead.
	if matched == 0 && upsert {
		newID := generateID()
		newDoc := make(map[string]interface{})
		// The "true" flag selects insert mode, so $setOnInsert takes effect.
		newData := applyUpdateWithFilters(newDoc, update, true, arrayFilters)
		coll.documents[newID] = types.Document{
			ID:        newID,
			Data:      newData,
			CreatedAt: now,
			UpdatedAt: now,
		}
		matched = 1
		modified = 1
		upsertedIDs = append(upsertedIDs, newID)
	}

	return matched, modified, upsertedIDs, nil
}
// Delete removes every document in the named collection whose Data matches
// filter according to MatchFilter, and returns how many were removed. The
// collection lookup error is returned unchanged.
func (ms *MemoryStore) Delete(collection string, filter types.Filter) (int, error) {
	coll, err := ms.GetCollection(collection)
	if err != nil {
		return 0, err
	}

	coll.mu.Lock()
	defer coll.mu.Unlock()

	removed := 0
	// Deleting entries from a map while ranging over it is safe in Go.
	for key, candidate := range coll.documents {
		if !MatchFilter(candidate.Data, filter) {
			continue
		}
		delete(coll.documents, key)
		removed++
	}
	return removed, nil
}
// SyncToDB writes every document of the named in-memory collection to the
// database through the adapter's InsertMany.
//
// The database table name is the collection name with any "dbName." prefix
// removed (for example "testdb.users" becomes "users"). The table is created
// first if CollectionExists reports it missing. If CollectionExists fails with
// the message "not implemented", creation is attempted and its error ignored.
//
// A missing collection yields the lookup error. With no adapter configured
// there is nothing to sync to, so nil is returned, in line with how Initialize
// and DropCollection treat a nil adapter.
func (ms *MemoryStore) SyncToDB(ctx context.Context, collection string) error {
	coll, err := ms.GetCollection(collection)
	if err != nil {
		return err
	}

	// Without this guard the adapter calls below would panic on a nil
	// interface value.
	if ms.adapter == nil {
		return nil
	}

	// The read lock is deliberately held for the whole sync, including the
	// database calls: copied Document values may still share their Data with
	// the stored ones, so writers must be kept out until the write finishes.
	coll.mu.RLock()
	defer coll.mu.RUnlock()

	docs := make([]types.Document, 0, len(coll.documents))
	for _, doc := range coll.documents {
		docs = append(docs, doc)
	}

	// Strip the database prefix to get the table name.
	tableName := collection
	if idx := strings.Index(collection, "."); idx > 0 {
		tableName = collection[idx+1:]
	}

	exists, err := ms.adapter.CollectionExists(ctx, tableName)
	switch {
	case err == nil && !exists:
		if err := ms.adapter.CreateCollection(ctx, tableName); err != nil {
			return err
		}
	case err != nil && err.Error() == "not implemented":
		// Existence cannot be checked, so create best-effort and ignore the
		// error, which is expected when the table already exists.
		_ = ms.adapter.CreateCollection(ctx, tableName)
	case err != nil:
		return err
	}

	// Simplification: every document is sent to InsertMany, with no
	// distinction between new and already-persisted documents.
	return ms.adapter.InsertMany(ctx, tableName, docs)
}
// GetAllDocuments returns a snapshot slice of every document in the named
// collection, for use by aggregation. The slice is non-nil even when the
// collection is empty, and its order follows map iteration. The collection
// lookup error is returned unchanged.
func (ms *MemoryStore) GetAllDocuments(collection string) ([]types.Document, error) {
	coll, err := ms.GetCollection(collection)
	if err != nil {
		return nil, err
	}

	coll.mu.RLock()
	defer coll.mu.RUnlock()

	snapshot := make([]types.Document, len(coll.documents))
	next := 0
	for _, doc := range coll.documents {
		snapshot[next] = doc
		next++
	}
	return snapshot, nil
}
// DropCollection removes the named collection from memory and, when an adapter
// is configured, asks the database to drop the matching table.
//
// The name is resolved the same way GetCollection resolves it: the exact name
// first, then the name without its "dbName." prefix. It returns
// errors.ErrCollectionNotFnd when neither is present. The database drop is
// best-effort: a failure is logged and does not fail the call.
func (ms *MemoryStore) DropCollection(name string) error {
	// The database table is always the name without its prefix, which is how
	// SyncToDB names the table when it writes the collection out.
	tableName := name
	if idx := strings.Index(name, "."); idx > 0 {
		tableName = name[idx+1:]
	}

	ms.mu.Lock()
	defer ms.mu.Unlock()

	key := name
	if _, exists := ms.collections[key]; !exists {
		key = tableName
		if _, exists := ms.collections[key]; !exists {
			return errors.ErrCollectionNotFnd
		}
	}
	delete(ms.collections, key)

	if ms.adapter != nil {
		// The method has no context parameter, so a background context is used.
		ctx := context.Background()
		if err := ms.adapter.DropCollection(ctx, tableName); err != nil {
			log.Printf("[WARN] Failed to drop collection %s from database: %v", tableName, err)
		}
	}
	return nil
}
// InsertDocument stores a single document in the named collection, keyed by
// doc.ID, replacing any document with the same ID. The collection is created
// under the given name if it cannot be found. It always returns nil.
func (ms *MemoryStore) InsertDocument(collection string, doc types.Document) error {
	target, err := ms.GetCollection(collection)
	if err != nil {
		// The lookup's read lock is already released, so the collection may
		// have been created concurrently. Resolve it again under the write
		// lock and create it only if it is still absent; creating blindly
		// would discard the other goroutine's collection and its documents.
		target = func() *Collection {
			ms.mu.Lock()
			defer ms.mu.Unlock()

			if existing, ok := ms.collections[collection]; ok {
				return existing
			}
			if idx := strings.Index(collection, "."); idx > 0 {
				if existing, ok := ms.collections[collection[idx+1:]]; ok {
					return existing
				}
			}
			created := &Collection{
				name:      collection,
				documents: make(map[string]types.Document),
			}
			ms.collections[collection] = created
			return created
		}()
	}

	target.mu.Lock()
	defer target.mu.Unlock()
	target.documents[doc.ID] = doc
	return nil
}
// UpdateDocument replaces the stored document that has the same ID as doc. It
// returns the collection lookup error if the collection cannot be found, and
// errors.ErrDocumentNotFnd if the collection holds no document with that ID.
func (ms *MemoryStore) UpdateDocument(collection string, doc types.Document) error {
	coll, err := ms.GetCollection(collection)
	if err != nil {
		return err
	}

	coll.mu.Lock()
	defer coll.mu.Unlock()

	_, present := coll.documents[doc.ID]
	if present {
		coll.documents[doc.ID] = doc
		return nil
	}
	return errors.ErrDocumentNotFnd
}