
Gomog Aggregation Pipeline Reference

Version: v1.0.0-alpha
Last updated: 2026-03-14


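📖 Table of Contents

  1. Overview
  2. Data Filtering Stages
  3. Data Transformation Stages
  4. Data Grouping Stages
  5. Join Stages ($lookup, $graphLookup)
  6. Performance Optimization Stages
  7. Window Function Stages
  8. Usage Examples
  9. Best Practices
  10. Performance Tuning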

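Overview

What is an aggregation pipeline?

An aggregation pipeline chains multiple data-processing operations, called stages, into a single flow. Each stage takes the previous stage's output as its input and passes its own output to the next stage.

Basic syntax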

{
  "pipeline": [
    {"$stage1": {...}},
    {"$stage2": {...}},
    {"$stage3": {...}}
  ]
}

Execution flow

Input → [$match] → [$group] → [$sort] → [$limit] → Output
           ↓          ↓          ↓         ↓
         filter     group      sort      limit

Data Filtering Stages

$match

Filters documents, passing only those that match the specified conditions to the next stage.

Syntax:

{"$match": {"<field>": {"<operator>": <value>}}}

Example:

{
  "$match": {
    "status": "active",
    "age": {"$gte": 18, "$lte": 65},
    "score": {"$gt": 80}
  }
}

Supported query operators:

  • Comparison: $eq, $ne, $gt, $gte, $lt, $lte, $in, $nin
  • Logical: $and, $or, $not, $nor
  • Element: $exists, $type
  • Array: $all, $elemMatch, $size
  • Regular expression: $regex
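To make the operator semantics concrete, here is a minimal in-memory sketch of how a $match filter could be evaluated. It assumes Gomog follows MongoDB's matching rules. It models only top-level fields and the comparison, $in/$nin, $and and $or operators. The function names (match_stage, matches, field_matches) are hypothetical, not Gomog APIs.

```python
import operator

_MISSING = object()  # sentinel: the field is absent from the document

COMPARISONS = {
    "$eq": operator.eq, "$ne": operator.ne,
    "$gt": operator.gt, "$gte": operator.ge,
    "$lt": operator.lt, "$lte": operator.le,
}


def compare(op, value, arg):
    if value is _MISSING:
        return op == "$ne"  # a missing field is "not equal" to any value
    try:
        return COMPARISONS[op](value, arg)
    except TypeError:
        return False  # mixed types (e.g. str vs int) never satisfy an ordering here


def field_matches(value, cond):
    is_operator_doc = isinstance(cond, dict) and bool(cond) and all(k.startswith("$") for k in cond)
    if not is_operator_doc:
        return compare("$eq", value, cond)  # a bare literal means {"$eq": literal}
    for op, arg in cond.items():  # all operators on one field must hold (implicit AND)
        if op in COMPARISONS:
            ok = compare(op, value, arg)
        elif op == "$in":
            ok = value is not _MISSING and value in arg
        elif op == "$nin":
            ok = value is _MISSING or value not in arg
        else:
            raise ValueError(f"operator not modelled in this sketch: {op}")
        if not ok:
            return False
    return True


def matches(doc, query):
    for key, cond in query.items():  # top-level conditions are ANDed
        if key == "$and":
            ok = all(matches(doc, q) for q in cond)
        elif key == "$or":
            ok = any(matches(doc, q) for q in cond)
        else:
            ok = field_matches(doc.get(key, _MISSING), cond)
        if not ok:
            return False
    return True


def match_stage(docs, query):
    return [doc for doc in docs if matches(doc, query)]
```

With the example filter above, a document with "score": 80 is rejected because $gt is strict. A document whose "age" is a string never satisfies the $gte/$lte range.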

$limit

Limits the number of documents passed to the next stage.

Syntax:

{"$limit": <number>}

Example:

{"$limit": 10}

$skip

Skips the specified number of documents and passes the rest to the next stage.

Syntax:

{"$skip": <number>}

Example:

{"$skip": 20}
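$skip and $limit are usually combined for pagination, and their order matters. The hypothetical helpers below model each stage as a list slice. They assume MongoDB-style rules: $limit must be positive and $skip non-negative. They show that skipping before limiting returns a page, whereas limiting first can leave nothing to skip into. Pages are only stable when a $sort runs before them.

```python
def skip_stage(docs, n):
    if n < 0:
        raise ValueError("$skip must be non-negative")
    return docs[n:]


def limit_stage(docs, n):
    if n <= 0:
        raise ValueError("$limit must be positive")
    return docs[:n]


def page(docs, page_number, page_size):
    """1-based page: {"$skip": (page_number - 1) * page_size}, then {"$limit": page_size}."""
    return limit_stage(skip_stage(docs, (page_number - 1) * page_size), page_size)
```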

$sample

Randomly selects the specified number of documents.

Syntax:

{"$sample": {"size": <number>}}

Example:

{"$sample": {"size": 5}}
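A sketch of $sample under two assumptions. Documents are drawn without replacement. Asking for more documents than the input holds returns every document in random order. sample_stage is a hypothetical name, and Python's random.sample performs the uniform selection.

```python
import random


def sample_stage(docs, size, rng=None):
    """Return up to `size` distinct documents chosen uniformly at random."""
    if size <= 0:
        raise ValueError("$sample size must be positive")
    rng = rng or random.Random()
    # random.sample picks without replacement, so no document appears twice.
    return rng.sample(docs, min(size, len(docs)))
```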

$bucket

Groups documents into buckets according to the value of the groupBy expression and the given boundaries.

Syntax:

{
  "$bucket": {
    "groupBy": "<expression>",
    "boundaries": [<boundary1>, <boundary2>, ...],
    "default": <literal>,
    "output": {<output1>: {<expression1>}, ...}
  }
}

Example:

{
  "$bucket": {
    "groupBy": "$price",
    "boundaries": [0, 100, 500, 1000],
    "default": "Other",
    "output": {
      "count": {"$sum": 1},
      "avgPrice": {"$avg": "$price"}
    }
  }
}
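The bucket boundaries are easier to reason about with a sketch. It assumes Gomog follows MongoDB's $bucket rules:

  • Each bucket covers [lower, next boundary), and its _id is the lower boundary.
  • Out-of-range or non-numeric values go to the default bucket.
  • Empty buckets are omitted.

Under those assumptions, the hypothetical bucket_by_price below reproduces the example's count and avgPrice outputs.

```python
import bisect

_NO_DEFAULT = object()  # sentinel: no "default" bucket configured


def bucket_by_price(docs, boundaries, default=_NO_DEFAULT):
    """Mirror the example: groupBy "$price", output count and avgPrice per bucket."""
    if len(boundaries) < 2 or list(boundaries) != sorted(boundaries):
        raise ValueError("boundaries must be ascending and contain at least two values")
    buckets = {}     # lower boundary -> [count, price_total, priced_docs]
    fallback = None  # accumulator for the default bucket
    for doc in docs:
        price = doc.get("price")
        i = -1
        if isinstance(price, (int, float)):
            # bisect_right - 1 finds i with boundaries[i] <= price < boundaries[i + 1]
            i = bisect.bisect_right(boundaries, price) - 1
        if 0 <= i < len(boundaries) - 1:
            acc = buckets.setdefault(boundaries[i], [0, 0, 0])
        elif default is not _NO_DEFAULT:
            fallback = fallback or [0, 0, 0]
            acc = fallback
        else:
            raise ValueError(f"{price!r} is outside every bucket and no default is set")
        acc[0] += 1
        if isinstance(price, (int, float)):
            acc[1] += price  # $avg ignores missing and non-numeric values
            acc[2] += 1

    def emit(key, acc):
        count, total, priced = acc
        return {"_id": key, "count": count, "avgPrice": total / priced if priced else None}

    result = [emit(b, buckets[b]) for b in boundaries[:-1] if b in buckets]
    if fallback is not None:
        result.append(emit(default, fallback))
    return result
```

With boundaries [0, 100, 500, 1000], a price of exactly 100 lands in the bucket with _id 100. A price of 1000 is past the last upper bound, so it lands in "Other".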

Data Transformation Stages

$project

Reshapes documents: selects or excludes fields and adds computed fields.

Syntax:

{
  "$project": {
    "<field1>": <expression>,
    "<field2>": <expression>,
    ...
  }
}

Example:

{
  "$project": {
    "name": 1,
    "email": 1,
    "_id": 0,
    "fullName": {"$concat": ["$firstName", " ", "$lastName"]},
    "ageInMonths": {"$multiply": ["$age", 12]}
  }
}

Supported expression operators:

  • Arithmetic: $add, $subtract, $multiply, $divide, $pow, $sqrt
  • String: $concat, $substr, $trim, $split, $replaceAll
  • Comparison: $max, $min, $cmp
  • Conditional: $cond, $ifNull, $switch
  • Type conversion: $toString, $toInt, $toDouble, $toBool
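A minimal Python sketch of how the example projection could be evaluated. It assumes MongoDB-style rules:

  • _id is included unless set to 0.
  • 1 copies a field.
  • An expression computes a new field and returns null when an operand is missing.

Only $concat, $multiply and top-level field paths are modelled. project_stage and resolve are hypothetical names.

```python
import math


def resolve(doc, expr):
    """Evaluate a small subset of expressions: "$field", $concat, $multiply, literals."""
    if isinstance(expr, str) and expr.startswith("$"):
        return doc.get(expr[1:])  # top-level field reference; missing -> None
    if isinstance(expr, dict) and len(expr) == 1:
        op, args = next(iter(expr.items()))
        values = [resolve(doc, a) for a in args]
        if any(v is None for v in values):
            return None  # a null or missing operand yields null
        if op == "$concat":
            return "".join(values)
        if op == "$multiply":
            return math.prod(values)
        raise ValueError(f"operator not modelled in this sketch: {op}")
    return expr  # plain literal


def project_stage(docs, spec):
    out = []
    for doc in docs:
        new = {}
        if spec.get("_id", 1) not in (0, False) and "_id" in doc:
            new["_id"] = doc["_id"]  # _id is kept unless explicitly excluded
        for field, expr in spec.items():
            if field == "_id":
                continue
            if expr in (1, True):
                if field in doc:
                    new[field] = doc[field]  # inclusion: copy the field as-is
            elif expr in (0, False):
                raise ValueError("this sketch only supports inclusion projections (plus _id: 0)")
            else:
                new[field] = resolve(doc, expr)  # computed field
        out.append(new)
    return out
```

Fields not named in the projection, such as a stored password, are dropped from the output.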

$addFields / $set

Adds new fields or overwrites existing ones while keeping all other fields. $set is an alias of $addFields.

Syntax:

{
  "$addFields": {
    "<newField>": <expression>
  }
}

Example:

{
  "$addFields": {
    "total": {"$add": ["$price", "$tax"]},
    "discountedPrice": {"$multiply": ["$price", 0.9]}
  }
}
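$addFields differs from $project in that it keeps every existing field. The sketch below assumes MongoDB-style behavior. In this sketch, expressions read the incoming document, and a new field with an existing name overwrites it. add_fields_stage and evaluate are hypothetical helpers that model only $add, $multiply and top-level field paths.

```python
import copy
import math


def evaluate(doc, expr):
    """Tiny evaluator for "$field" references, $add and $multiply."""
    if isinstance(expr, str) and expr.startswith("$"):
        return doc.get(expr[1:])
    if isinstance(expr, dict) and len(expr) == 1:
        op, args = next(iter(expr.items()))
        values = [evaluate(doc, a) for a in args]
        if any(v is None for v in values):
            return None  # a null or missing operand yields null
        if op == "$add":
            return sum(values)
        if op == "$multiply":
            return math.prod(values)
        raise ValueError(f"operator not modelled in this sketch: {op}")
    return expr


def add_fields_stage(docs, spec):
    out = []
    for doc in docs:
        new = copy.deepcopy(doc)  # unlike an inclusion $project, every field survives
        for field, expr in spec.items():
            # Evaluate against the input document; a same-named field is overwritten.
            new[field] = evaluate(doc, expr)
        out.append(new)
    return out
```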

$unset

Removes one or more fields from documents.

Syntax:

{"$unset": "<field>"}
// or
{"$unset": ["<field1>", "<field2>", ...]}

Example:

{"$unset": ["tempField", "internalData"]}

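$replaceRoot / $replaceWith

Replaces the input document with the specified document, for example to promote an embedded document to the top level.

Syntax:

{"$replaceRoot": {"newRoot": <expression>}}
// or
{"$replaceWith": <expression>}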

Example:

{
  "$replaceRoot": {"newRoot": "$user"}
}
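A short sketch of what the example does to each document. It assumes MongoDB's rule that newRoot must resolve to a document and that the stage fails otherwise. replace_root_stage is a hypothetical helper that supports only top-level field paths.

```python
def replace_root_stage(docs, new_root):
    """Sketch of {"$replaceRoot": {"newRoot": "$<field>"}} for a top-level field path."""
    if not (isinstance(new_root, str) and new_root.startswith("$")):
        raise ValueError('this sketch only supports a field path such as "$user"')
    out = []
    for doc in docs:
        value = doc.get(new_root[1:])
        if not isinstance(value, dict):
            # Missing or non-document values are an error rather than being skipped.
            raise TypeError(f"newRoot must resolve to a document, got {value!r}")
        out.append(dict(value))  # fields outside the embedded document are dropped
    return out
```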

Data Grouping Stages

$group

Groups documents by the specified _id expression and computes aggregate values for each group.

Syntax:

{
  "$group": {
    "_id": <expression>,
    "<field1>": {<accumulator1>},
    "<field2>": {<accumulator2>},
    ...
  }
}

Example:

{
  "$group": {
    "_id": "$department",
    "employeeCount": {"$sum": 1},
    "avgSalary": {"$avg": "$salary"},
    "maxSalary": {"$max": "$salary"},
    "minSalary": {"$min": "$salary"},
    "totalSalary": {"$sum": "$salary"}
  }
}

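Accumulator operators:

  • $sum: sum
  • $avg: average
  • $min: minimum
  • $max: maximum
  • $push: appends values to an array
  • $addToSet: adds values to an array without duplicates
  • $first: first value in the group
  • $last: last value in the group
  • $count: number of documents in the group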
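A minimal sketch of how accumulators combine per group, using the example above. It assumes MongoDB-style behavior:

  • $sum and $avg ignore non-numeric values.
  • A missing group key forms a null group.
  • Output order is not guaranteed.

group_stage is hypothetical and only models $sum, $avg, $min and $max on numeric top-level fields.

```python
def group_stage(docs, group_key, spec):
    """Sketch of {"$group": {"_id": "$field", name: {"$op": arg}, ...}} for $sum/$avg/$min/$max."""
    groups = {}
    for doc in docs:
        key = doc.get(group_key[1:])  # top-level field path; missing -> None (null group)
        groups.setdefault(key, []).append(doc)
    result = []
    for key, members in groups.items():
        row = {"_id": key}
        for name, acc in spec.items():
            op, arg = next(iter(acc.items()))
            if isinstance(arg, str) and arg.startswith("$"):
                raw = (d.get(arg[1:]) for d in members)
                values = [v for v in raw if isinstance(v, (int, float))]  # skip non-numeric
            else:
                values = [arg] * len(members)  # a constant such as {"$sum": 1} counts documents
            if op == "$sum":
                row[name] = sum(values)
            elif op == "$avg":
                row[name] = sum(values) / len(values) if values else None
            elif op == "$min":
                row[name] = min(values) if values else None
            elif op == "$max":
                row[name] = max(values) if values else None
            else:
                raise ValueError(f"accumulator not modelled in this sketch: {op}")
        result.append(row)
    return result
```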

$sort

Sorts documents by one or more fields: 1 for ascending, -1 for descending.

Syntax:

{"$sort": {"<field1>": <order>, "<field2>": <order>, ...}}

Example:

{
  "$sort": {
    "createdAt": -1,  // descending
    "name": 1         // ascending
  }
}
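Multi-key sorting can be sketched with Python's stable sort. The sketch assumes MongoDB's ordering, in which missing or null values sort lowest. It only compares values of the same type. sort_stage is a hypothetical helper.

```python
def sort_stage(docs, spec):
    """Apply {"field": 1 | -1, ...}; earlier keys in `spec` take priority."""
    result = list(docs)
    # Python's sort is stable, so sorting by the lowest-priority key first and the
    # highest-priority key last produces the combined multi-key order.
    for field, order in reversed(list(spec.items())):
        if order not in (1, -1):
            raise ValueError("sort order must be 1 or -1")
        # (present, value): missing/null values sort before everything else when ascending.
        result.sort(key=lambda d: (d.get(field) is not None, d.get(field)),
                    reverse=(order == -1))
    return result
```

For the example, documents with the same createdAt keep ascending name order because the name sort runs first and the createdAt sort is stable.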

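$sortByCount

Groups documents by the value of an expression, counts the documents in each group, and sorts the groups by count in descending order.

Syntax: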

{"$sortByCount": <expression>}

Example:

{"$sortByCount": "$category"}

Equivalent to:

[
  {"$group": {"_id": "$category", "count": {"$sum": 1}}},
  {"$sort": {"count": -1}}
]
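The equivalence above can be checked with a small Python sketch built on collections.Counter. It assumes a top-level field path, and sort_by_count_stage is a hypothetical name.

```python
from collections import Counter


def sort_by_count_stage(docs, expr):
    """Same result as {"$group": {"_id": expr, "count": {"$sum": 1}}} + {"$sort": {"count": -1}}."""
    if not (isinstance(expr, str) and expr.startswith("$")):
        raise ValueError('this sketch only supports a field path such as "$category"')
    counts = Counter(doc.get(expr[1:]) for doc in docs)  # missing field -> None group
    # most_common() orders by count, highest first; ties keep first-seen order in this
    # sketch, so do not rely on tie order in real results.
    return [{"_id": value, "count": n} for value, n in counts.most_common()]
```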

Join Stages

$lookup

Performs a left outer join with another collection.

Syntax 1 (basic form):

{
  "$lookup": {
    "from": "<collection>",
    "localField": "<field>",
    "foreignField": "<field>",
    "as": "<outputField>"
  }
}

Example 1:

{
  "$lookup": {
    "from": "orders",
    "localField": "_id",
    "foreignField": "userId",
    "as": "orders"
  }
}
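Example 1 can be modelled as a hash join. This sketch assumes MongoDB-style equality matching on scalar (hashable) keys. It also follows MongoDB's rule that a missing localField matches foreign documents whose foreignField is null or missing. Array-valued keys are not handled. lookup_stage is a hypothetical helper. Users without orders still appear with an empty array, which is what makes the join a left outer join.

```python
def lookup_stage(docs, foreign_docs, local_field, foreign_field, as_field):
    """Left outer join: every input document is kept; matches are gathered in an array."""
    index = {}
    for f in foreign_docs:
        # A missing foreignField indexes under None, so it matches a missing/null localField.
        index.setdefault(f.get(foreign_field), []).append(f)
    out = []
    for doc in docs:
        joined = dict(doc)
        joined[as_field] = list(index.get(doc.get(local_field), []))  # [] when unmatched
        out.append(joined)
    return out
```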

Syntax 2 (advanced form, with a sub-pipeline):

{
  "$lookup": {
    "from": "<collection>",
    "let": {<var1>: <expression>, ...},
    "pipeline": [<stages>],
    "as": "<outputField>"
  }
}

Example 2:

{
  "$lookup": {
    "from": "orders",
    "let": {"userId": "$_id"},
    "pipeline": [
      {"$match": {"$expr": {"$eq": ["$userId", "$$userId"]}}},
      {"$sort": {"createdAt": -1}},
      {"$limit": 5}
    ],
    "as": "recentOrders"
  }
}

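$graphLookup

Recursively looks up related documents, repeatedly matching connectFromField values against connectToField in the target collection.

Syntax: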

{
  "$graphLookup": {
    "from": "<collection>",
    "startWith": <expression>,
    "connectFromField": "<field>",
    "connectToField": "<field>",
    "as": "<outputField>",
    "maxDepth": <number>,
    "depthField": "<string>",
    "restrictSearchWithMatch": <document>
  }
}

Example:

{
  "$graphLookup": {
    "from": "orgs",
    "startWith": "$_id",
    "connectFromField": "_id",
    "connectToField": "parentId",
    "as": "subOrgs",
    "maxDepth": 5,
    "depthField": "level"
  }
}
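The traversal is essentially a breadth-first search. The sketch below assumes MongoDB's documented behavior:

  • Documents matched directly from startWith have depth 0.
  • maxDepth: 0 disables recursion.
  • Each document appears at most once.

The sketch starts from the organization's own _id, so with connectToField: "parentId" each level yields the children of the previous one. graph_lookup is a hypothetical helper, not a Gomog API, and restrictSearchWithMatch is not modelled.

```python
def graph_lookup(doc, collection, start_with, connect_from, connect_to, as_field,
                 max_depth=None, depth_field=None):
    """Breadth-first sketch of $graphLookup for top-level fields."""
    def as_list(value):
        if value is None:
            return []
        return value if isinstance(value, list) else [value]

    by_target = {}  # connectToField value -> documents holding it
    for candidate in collection:
        for key in as_list(candidate.get(connect_to)):
            by_target.setdefault(key, []).append(candidate)

    found = {}        # _id -> matched document (first, i.e. shallowest, visit wins)
    searched = set()  # values already looked up, which also stops cycles
    frontier = as_list(doc.get(start_with[1:]))
    depth = 0
    while frontier and (max_depth is None or depth <= max_depth):
        next_frontier = []
        for value in frontier:
            if value in searched:
                continue
            searched.add(value)
            for match in by_target.get(value, []):
                if match["_id"] in found:
                    continue
                hit = dict(match)
                if depth_field:
                    hit[depth_field] = depth  # 0 for documents matched directly by startWith
                found[match["_id"]] = hit
                next_frontier.extend(as_list(match.get(connect_from)))
        frontier = next_frontier
        depth += 1
    result = dict(doc)
    result[as_field] = list(found.values())
    return result
```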

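Performance Optimization Stages

$facet

Multi-faceted aggregation: runs several sub-pipelines in parallel on the same input documents and returns their results as arrays in a single output document.

Syntax: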

{
  "$facet": {
    "<outputField1>": [<stages>],
    "<outputField2>": [<stages>],
    ...
  }
}

Example:

{
  "$facet": {
    "products": [
      {"$match": {"category": "electronics"}},
      {"$limit": 10}
    ],
    "categories": [
      {"$sortByCount": "$category"}
    ],
    "priceStats": [
      {"$group": {
        "_id": null,
        "avgPrice": {"$avg": "$price"},
        "maxPrice": {"$max": "$price"}
      }}
    ]
  }
}
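A sketch of the $facet data flow. Each stage is represented by a plain Python function, an assumption made for brevity, and facet_stage is hypothetical. The sketch runs the facets one after another. Because each facet only reads the shared input, running them in parallel gives the same output: a single document holding one array per facet.

```python
def facet_stage(docs, facets):
    """Run each named sub-pipeline on the same input; emit one document of arrays."""
    result = {}
    for name, stages in facets.items():
        data = list(docs)  # every facet starts from the full input, not another facet's output
        for stage in stages:
            data = stage(data)  # stages must return new lists and not mutate documents
        result[name] = data
    return [result]
```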

$indexStats

Returns index statistics for the collection.

Syntax:

{"$indexStats": {}}

Example (run it as the first stage of its own pipeline; MongoDB does not allow $indexStats inside a $facet sub-pipeline):

{
  "pipeline": [
    {"$indexStats": {}}
  ]
}

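Window Function Stages

$setWindowFields

Computes window functions over sorted partitions and adds the results as new fields to each document.

Syntax: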

{
  "$setWindowFields": {
    "partitionBy": <expression>,
    "sortBy": {<field>: <order>, ...},
    "output": {
      "<field>": {
        "<windowOperator>": <expression>,
        "window": {
          "documents": [<lower>, <upper>]
        }
      },
      ...
    }
  }
}

Example:

{
  "$setWindowFields": {
    "partitionBy": "$region",
    "sortBy": {"date": 1},
    "output": {
      "runningTotal": {
        "$sum": "$amount",
        "window": {"documents": ["unbounded", "current"]}
      },
      "movingAvg": {
        "$avg": "$amount",
        "window": {"documents": [-2, 0]}
      },
      "rank": {"$documentNumber": {}}
    }
  }
}

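Supported window operators:

  • $sum: running or windowed sum
  • $avg: moving average
  • $min: minimum within the window
  • $max: maximum within the window
  • $first: first value in the window
  • $last: last value in the window
  • $documentNumber: position of the document within its partition
  • $rank: rank
  • $denseRank: dense rank (no gaps)
  • $percentRank: percentile rank
  • $cumeDist: cumulative distribution
  • $shift: value from a document at a relative offset
  • $fill: fills missing values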
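The documents window bounds are positions relative to the current document within each sorted partition:

  • "unbounded" reaches the partition edge.
  • "current" is offset 0.
  • [-2, 0] spans the current document and the two before it.

The hypothetical set_window_fields below reproduces the example's three output fields under those MongoDB-style semantics. It is a sketch, not Gomog's implementation.

```python
def window(values, i, lower, upper):
    """values[i + lower .. i + upper] inclusive, clipped to the partition."""
    lower = 0 if lower == "current" else lower
    upper = 0 if upper == "current" else upper
    lo = 0 if lower == "unbounded" else max(0, i + lower)
    hi = len(values) - 1 if upper == "unbounded" else min(len(values) - 1, i + upper)
    return values[lo:hi + 1]


def set_window_fields(docs, partition_field, sort_field, value_field):
    """Mirror the example: runningTotal, movingAvg over [-2, 0], and $documentNumber."""
    partitions = {}
    for doc in docs:
        partitions.setdefault(doc.get(partition_field), []).append(doc)
    out = []
    for rows in partitions.values():
        rows = sorted(rows, key=lambda d: d[sort_field])
        values = [row[value_field] for row in rows]
        for i, row in enumerate(rows):
            running = window(values, i, "unbounded", "current")
            moving = window(values, i, -2, 0)
            new = dict(row)
            new["runningTotal"] = sum(running)
            new["movingAvg"] = sum(moving) / len(moving)
            new["rank"] = i + 1  # $documentNumber is 1-based within each partition
            out.append(new)
    return out
```

Near the start of a partition, the [-2, 0] window is clipped. The first document's moving average covers only itself, and the second document's covers two documents.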

Usage Examples

E-commerce order analysis

{
  "pipeline": [
    {
      "$match": {
        "status": "completed",
        "createdAt": {"$gte": ISODate("2024-01-01")}
      }
    },
    {
      "$lookup": {
        "from": "customers",
        "localField": "customerId",
        "foreignField": "_id",
        "as": "customer"
      }
    },
    {
      "$unwind": "$customer"
    },
    {
      "$group": {
        "_id": "$customer.region",
        "totalRevenue": {"$sum": "$amount"},
        "orderCount": {"$sum": 1},
        "avgOrderValue": {"$avg": "$amount"},
        "topCustomers": {
          "$push": {
            "name": "$customer.name",
            "total": "$amount"
          }
        }
      }
    },
    {
      "$sort": {"totalRevenue": -1}
    },
    {
      "$limit": 10
    }
  ]
}

User behavior analysis

{
  "pipeline": [
    {
      "$match": {
        "eventType": "page_view",
        "timestamp": {"$gte": ISODate("2024-01-01")}
      }
    },
    {
      "$setWindowFields": {
        "partitionBy": "$userId",
        "sortBy": {"timestamp": 1},
        "output": {
          "sessionNumber": {
            "$documentNumber": {}
          },
          "nextTimestamp": {
            "$shift": {"output": "$timestamp", "by": 1, "default": null}
          }
        }
      }
    },
    {
      "$addFields": {
        "timeOnPage": {"$subtract": ["$nextTimestamp", "$timestamp"]}
      }
    },
    {
      "$group": {
        "_id": "$userId",
        "totalViews": {"$sum": 1},
        "avgTimeOnPage": {"$avg": "$timeOnPage"},
        "uniquePages": {"$addToSet": "$pageId"}
      }
    },
    {
      "$project": {
        "_id": 0,
        "userId": "$_id",
        "totalViews": 1,
        "avgTimeOnPage": 1,
        "uniquePages": {"$size": "$uniquePages"}
      }
    }
  ]
}

Social network analysis

{
  "pipeline": [
    {
      "$match": {"_id": ObjectId("...")}
    },
    {
      "$graphLookup": {
        "from": "users",
        "startWith": "$friends",
        "connectFromField": "friends",
        "connectToField": "_id",
        "as": "network",
        "maxDepth": 3,
        "depthField": "degree"
      }
    },
    {
      "$unwind": "$network"
    },
    {
      "$group": {
        "_id": "$network.degree",
        "people": {"$addToSet": "$network.name"}
      }
    },
    {
      "$project": {
        "degree": "$_id",
        "count": {"$size": "$people"},
        "people": 1
      }
    },
    {
      "$sort": {"degree": 1}
    }
  ]
}

Financial report generation

{
  "pipeline": [
    {
      "$facet": {
        "revenue": [
          {"$match": {"type": "income"}},
          {"$group": {"_id": null, "total": {"$sum": "$amount"}}}
        ],
        "expenses": [
          {"$match": {"type": "expense"}},
          {"$group": {"_id": null, "total": {"$sum": "$amount"}}}
        ],
        "byCategory": [
          {
            "$group": {
              "_id": "$category",
              "total": {"$sum": "$amount"},
              "count": {"$sum": 1}
            }
          },
          {"$sort": {"total": -1}},
          {"$limit": 10}
        ]
      }
    },
    {
      "$project": {
        "netProfit": {
          "$subtract": [
            {"$arrayElemAt": ["$revenue.total", 0]},
            {"$arrayElemAt": ["$expenses.total", 0]}
          ]
        },
        "topCategories": "$byCategory"
      }
    }
  ]
}

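Best Practices

1. Filter early

// ✅ Recommended: filter first, then process
{"$match": {...}},
{"$group": {...}},
{"$sort": {...}}

// ❌ Not recommended: process first, then filter
{"$group": {...}},
{"$match": {...}}

2. Use projection to reduce data volume

Keep only the fields later stages need. An inclusion projection cannot also exclude other fields, except _id:

{"$project": {"neededField": 1, "_id": 0}}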

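3. Use indexes effectively

// Make sure the $match stage filters on indexed fields
{"$match": {"indexedField": <value>}}

4. Limit result set size

// Use $limit to cap the result set and avoid excessive memory use
{"$limit": 1000}

5. Avoid expensive operations

// ❌ Avoid $where in $match
{"$match": {"$where": "this.field > 100"}}

// ✅ Use standard operators instead
{"$match": {"field": {"$gt": 100}}}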

Performance Tuning

Explain analysis

curl -X POST http://localhost:8080/api/v1/testdb/users/aggregate \
  -H "Content-Type: application/json" \
  -d '{
    "pipeline": [...],
    "explain": true
  }'

Monitoring slow aggregations

log:
  level: "debug"
  slow_aggregation_threshold: "500ms"

Maintainer: Gomog Team
License: MIT