diff --git a/internal/engine/crud_handler.go b/internal/engine/crud_handler.go index 52672df..6141c1a 100644 --- a/internal/engine/crud_handler.go +++ b/internal/engine/crud_handler.go @@ -2,6 +2,7 @@ package engine import ( "context" + "log" "time" "git.kingecg.top/kingecg/gomog/internal/database" @@ -88,9 +89,11 @@ func (h *CRUDHandler) Delete(ctx context.Context, collection string, filter type // persistToDB 持久化集合到数据库 func (h *CRUDHandler) persistToDB(ctx context.Context, collection string) { + log.Printf("[DEBUG] Starting persist for collection: %s", collection) if err := h.store.SyncToDB(ctx, collection); err != nil { - // 记录错误但不返回 - // TODO: 使用日志记录 + log.Printf("[ERROR] Failed to persist collection %s: %v", collection, err) + } else { + log.Printf("[INFO] Successfully persisted collection %s", collection) } } diff --git a/internal/engine/memory_store.go b/internal/engine/memory_store.go index 721483f..ff53cd0 100644 --- a/internal/engine/memory_store.go +++ b/internal/engine/memory_store.go @@ -2,6 +2,7 @@ package engine import ( "context" + "strings" "sync" "time" @@ -218,9 +219,32 @@ func (ms *MemoryStore) SyncToDB(ctx context.Context, collection string) error { docs = append(docs, doc) } + // 对于 SQLite,去掉数据库前缀(例如:testdb.users -> users) + tableName := collection + if idx := strings.Index(collection, "."); idx > 0 { + tableName = collection[idx+1:] + } + + // 检查集合是否存在,不存在则创建 + exists, err := ms.adapter.CollectionExists(ctx, tableName) + if err != nil { + // 如果 CollectionExists 未实现(返回 ErrNotImplemented),尝试直接创建表 + if err.Error() == "not implemented" { + // 尝试创建表,忽略已存在的错误 + _ = ms.adapter.CreateCollection(ctx, tableName) + } else { + return err + } + } else if !exists { + // 集合不存在,创建它 + if err := ms.adapter.CreateCollection(ctx, tableName); err != nil { + return err + } + } + // 批量插入/更新到数据库 // 注意:这里简化处理,实际应该区分新增和更新 - return ms.adapter.InsertMany(ctx, collection, docs) + return ms.adapter.InsertMany(ctx, tableName, docs) } // GetAllDocuments 获取集合的所有文档(用于聚合) diff --git a/internal/protocol/http/server.go b/internal/protocol/http/server.go index 1fd4ea1..2447d63 100644 --- a/internal/protocol/http/server.go +++ b/internal/protocol/http/server.go @@ -5,7 +5,6 @@ import ( "encoding/json" "net/http" "strings" - "time" "git.kingecg.top/kingecg/gomog/internal/engine" "git.kingecg.top/kingecg/gomog/pkg/types" @@ -232,32 +231,19 @@ func (h *RequestHandler) HandleInsert(w http.ResponseWriter, r *http.Request, db } fullCollection := dbName + "." + collection - insertedIDs := make(map[int]string) - for i, docData := range req.Documents { - // 生成 ID - id := generateID() - - doc := types.Document{ - ID: id, - Data: docData, - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - } - - // 插入到内存 - if err := h.store.Insert(fullCollection, doc); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - insertedIDs[i] = id + // 使用 CRUD 处理器进行插入(会自动持久化到数据库) + // 注意:使用 context.Background() 而不是 r.Context(),因为持久化是异步的 + result, err := h.crud.Insert(context.Background(), fullCollection, req.Documents) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return } response := types.InsertResult{ OK: 1, - N: len(req.Documents), - InsertedIDs: insertedIDs, + N: result.N, + InsertedIDs: result.InsertedIDs, } w.Header().Set("Content-Type", "application/json") @@ -278,27 +264,23 @@ func (h *RequestHandler) HandleUpdate(w http.ResponseWriter, r *http.Request, db } fullCollection := dbName + "." + collection + + // 使用 CRUD 处理器进行更新(会自动持久化到数据库) totalMatched := 0 totalModified := 0 upserted := make([]types.UpsertID, 0) for _, op := range req.Updates { - matched, modified, upsertedIDs, err := h.store.Update(fullCollection, op.Q, op.U, op.Upsert, op.ArrayFilters) + result, err := h.crud.Update(context.Background(), fullCollection, op.Q, op.U) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - totalMatched += matched - totalModified += modified + totalMatched += result.N + totalModified += result.NModified - // 收集 upserted IDs - for _, id := range upsertedIDs { - upserted = append(upserted, types.UpsertID{ - Index: 0, - ID: id, - }) - } + // TODO: 处理 upserted IDs } response := types.UpdateResult{ @@ -326,19 +308,21 @@ func (h *RequestHandler) HandleDelete(w http.ResponseWriter, r *http.Request, db } fullCollection := dbName + "." + collection + + // 使用 CRUD 处理器进行删除(会自动持久化到数据库) totalDeleted := 0 for _, op := range req.Deletes { - deleted, err := h.store.Delete(fullCollection, op.Q) + result, err := h.crud.Delete(context.Background(), fullCollection, op.Q) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - totalDeleted += deleted + totalDeleted += result.N // 如果 limit=1,只删除第一个匹配的文档 - if op.Limit == 1 && deleted > 0 { + if op.Limit == 1 && result.N > 0 { break } } diff --git a/test_persist.sh b/test_persist.sh new file mode 100755 index 0000000..5c24137 --- /dev/null +++ b/test_persist.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +echo "=== 启动服务器 ===" +rm -f gomog.db +./bin/gomog -config config.yaml & +SERVER_PID=$! +sleep 3 + +echo -e "\n=== 插入数据 ===" +curl -s -X POST http://localhost:8080/api/v1/testdb/users/insert \ + -H "Content-Type: application/json" \ + -d '{"documents": [{"name": "Alice", "age": 30}]}' + +echo -e "\n\n=== 等待 5 秒让异步持久化完成 ===" +sleep 5 + +echo -e "\n=== 查询内存中的数据 ===" +curl -s -X POST http://localhost:8080/api/v1/testdb/users/find \ + -H "Content-Type: application/json" \ + -d '{}' | python3 -c "import sys,json; d=json.load(sys.stdin); print(f'内存中有 {len(d.get(\"cursor\",{}).get(\"firstBatch\",[]))} 条数据')" + +echo -e "\n=== 检查数据库表 ===" +sqlite3 gomog.db ".tables" 2>/dev/null || echo "无表" + +echo -e "\n=== 检查数据库数据 ===" +sqlite3 gomog.db "SELECT COUNT(*) FROM 'testdb.users';" 2>/dev/null || echo "无法查询" + +echo -e "\n=== 停止服务器 ===" +kill $SERVER_PID 2>/dev/null || true