常用聚合管道、单用途聚合方法、MapReduce
一、聚合简述
在日常开发中,我们通常需要对存储数据进行聚合分析后,再返回给客户端。MongoDB提供了三种聚合的方式,分别是聚合管道,map-reduce 函数和单用途聚合方法。
二、聚合管道
MongoDB 的聚合操作类似于流水线处理,文档会依次进入多个管道阶段并执行相应的操作。这里先插入部分演示数据:
db.employees.insertMany([
{
emp_no: 10001,
name: {firstName:"Georgi",lastName:"Facello"},
age: 26,
gender: "F",
hobby: ["basketball", "football"]
},
{
emp_no: 10002,
name: {firstName:"Bezalel",lastName:"Simmel"},
age: 32,
gender: "M",
hobby: ["basketball", "tennis"]
},
{
emp_no: 10003,
name: {firstName:"Parto",lastName:"Bamford"},
age: 46,
gender: "M",
hobby: []
},
{
emp_no: 10004,
name: {firstName:"Chirstian",lastName:"Koblick"},
age: 40,
gender: "F",
hobby: ["football", "tennis"]
}
])
一个简单的聚合操作如下,这个聚合操作会经过两个阶段的数据处理:
- 第一个管道阶段为
$match
:会筛选出所有性别值为 F 的雇员的文档,然后输出到下一个管道操作中; - 第二个管道阶段为
$project
:用于定义返回的字段内容,这里返回 fullname 字段,它由firstName + lastName
组成。
db.employees.aggregate([
{ $match: { gender: "F" } },
{ $project:
{ fullName:
{ $concat: ["$name.firstName", "$name.lastName"]}
}
}
])
所以最后的输出结果如下:
{
"_id" : ObjectId("5d3fe6488ba16934ccce999d"),
"fullName" : "GeorgiFacello"
},
{
"_id" : ObjectId("5d3fe6488ba16934ccce99a0"),
"fullName" : "ChirstianKoblick"
}
在当前最新的 MongoDB 4.x 中,MongoDB 提供了将近 30 个管道阶段,用于满足不同数据处理的需求。以下主要介绍常用几个管道阶段,如果想要了解全部的管道阶段,可以参见官方文档:Aggregation Pipeline Stages 。
1.1 $match
$match
主要用于筛选符合条件的数据,通常应该把 $match
放在尽量靠前的位置,这时候它会利用索引来优化查询,同时还可以降低后续阶段所需要处理的数据量。示例如下:
db.employees.aggregate([
{ $match: { gender: "F" } }
])
1.2 $project
$project
主要用于定义需要返回的字段,1 代表包含该字段,0 代表不包含,除了可以作用于顶层字段外,还可以作用于内嵌字段。同时 $project
还支持使用表达式将多个字段或变量进行组合,并作为新的字段返回。示例如下:
db.employees.aggregate([
{
$project: {
_id: 0,
"name.firstName": 1,
gender: 1,
fullName: { $concat: ["$name.firstName", "$name.lastName"] }
}
}
])
从 MongoDB 3.6 开始,还可以在聚合表达式中使用 $project + REMOVE
来按照条件过滤返回字段,设置为 REMOVE 的字段将会从 $projection
的输出中排除。示例如下:
db.employees.aggregate([
{
$project: {
hobby: {
$cond: {
if: { $eq: [ [], "$hobby" ] },
then: "$$REMOVE",
else: "$hobby"
}
}
}
}
])
这里判断当文档的 hobby 属性为空数组时,其 hobby 属性不会被输出到下一个管道阶段。
1.3 $group
$group
管道阶段和大多数关系型数据库中的 group by 字句功能类似,都是用于分组计算。示例如下:
db.employees.aggregate(
[
{ $group : {
_id : "$gender",
totalAge: { $sum: "$age"},
avgAge: { $avg: "$age" },
count: { $sum: 1 }
}
}
]
)
上面的语句会按照性别进行分组,并计算分组后两组人的总年龄、平均年龄和总人数,输出如下:
{
"_id" : "M",
"totalAge" : 78,
"avgAge" : 39,
"count" : 2
},
{
"_id" : "F",
"totalAge" : 66,
"avgAge" : 33,
"count" : 2
}
如果你想计算所有员工的年龄总和、平均年龄、以及员工总数,则可以将 $group
管道阶段的 _id 字段设置为 null ,如下:
db.employees.aggregate(
[
{ $group : {
_id : null,
totalAge: { $sum: "$age"},
avgAge: { $avg: "$age" },
count: { $sum: 1 }
}
}
]
)
# 输出如下
{
"_id" : null,
"totalAge" : 144,
"avgAge" : 36,
"count" : 4
}
1.4 $unwind
$unwind
将文档按照数组中的每一个元素进行拆分,类似于大多数流式计算中的 flatMap 算子。其语法格式如下:
{
$unwind:
{
path: <field path>,
includeArrayIndex: <string>,
preserveNullAndEmptyArrays: <boolean>
}
}
- path :用于展开的数组字段;
- includeArrayIndex :用于显示对应元素在原数组的位置信息;
- preserveNullAndEmptyArrays :如果用于展开的字段值为 null 或空数组时,则对应的文档不会被输出到下一阶段。如果想要输出到下一阶段则需要将该属性设置为 true。示例语句如下:
db.employees.aggregate( [
{$project: {_id: 0, emp_no: 1, hobby:1}},
{ $unwind:
{ path: "$hobby",
includeArrayIndex: "arrayIndex",
preserveNullAndEmptyArrays: true
}
}
] )
此时输出内容如下。如果 preserveNullAndEmptyArrays 的值为 false 或者没有设置,则 10003 这条数据不会被输出:
{"emp_no":10001,"hobby":"basketball","arrayIndex":0},
{"emp_no":10001,"hobby":"football","arrayIndex":1},
{"emp_no":10002,"hobby":"basketball","arrayIndex":0},
{"emp_no":10002,"hobby":"tennis","arrayIndex":1},
{"emp_no":10003,"arrayIndex":null},
{"emp_no":10004,"hobby":"football","arrayIndex":0},
{"emp_no":10004,"hobby":"tennis","arrayIndex":1}
1.5 $sort
$sort
主要用于排序操作,需要注意的是如果可以,应当尽量将该操作放置在管道的第一阶段,从而可以利用索引进行排序,否则就需要使用内存进行排序,这时排序操作就会变得相当昂贵,需要额外消耗内存和计算资源。示例如下:
db.employees.aggregate([
{$skip: 2} ,
{$sort: {age: 1}},
{$limit: 10}
])
1.6 $limit
限制返回文档的数量。
1.7 $skip
跳过一定数量的文档。
1.8 $lookup
1. 关联查询
$lookup
类似与大多数关系型数据库中的 left outer join ,用于实现不同集合之间的连接。其基本的语法如下:
{
$lookup:
{
from: <collection to join>,
localField: <field from the input documents>,
foreignField: <field from the documents of the "from" collection>,
as: <output array field>
}
}
- from :指定同一数据库中的集合以进行连接操作;
- localField :连接集合中用于进行连接的字段;
- foreignField :待连接集合中用于进行连接的字段;
- as :指定用于存放匹配文档的新数组字段的名称。如果指定的字段已存在,则进行覆盖。
为了 $lookup
管道,我们需要额外添加一个集合,用于储存员工的职位信息 (一个员工可以有多个职位头衔) ,语法如下:
db.titles.insertMany([
{
emp_no: 10001,
title: "Senior Engineer"
},
{
emp_no: 10002,
title: "Staff"
},
{
emp_no: 10003,
title: "Senior Engineer"
},
{
emp_no: 10004,
title: "Engineer"
},
{
emp_no: 10004,
title: "Senior Engineer"
}
])
执行连接查询,连接条件为员工工号:
db.employees.aggregate([
{
$lookup:
{
from: "titles",
localField: "emp_no",
foreignField: "emp_no",
as: "emp_title"
}
}
])
输出结果如下,为节省篇幅和突出重点,这里只显示部分内容,下文亦同。从输出中可以看到员工的职位信息都作为内嵌文档插入到指定的 emp_title 字段下,等价于执行了 left outer join 操作:
{
"_id" : ObjectId("5d3ffaaa8ba16934ccce99a1"),
"emp_no" : 10001,
........
"emp_title" : [
{
"_id" : ObjectId("5d4011728ba16934ccce99a5"),
"emp_no" : 10001,
"title" : "Senior Engineer"
}
]
},
.........
{
"_id" : ObjectId("5d3ffaaa8ba16934ccce99a4"),
"emp_no" : 10004,
........
"emp_title" : [
{
"_id" : ObjectId("5d4011728ba16934ccce99a8"),
"emp_no" : 10004,
"title" : "Engineer"
},
{
"_id" : ObjectId("5d4011728ba16934ccce99a9"),
"emp_no" : 10004,
"title" : "Senior Engineer"
}
]
}
2. 非相关查询
除了关联查询外,MongoDB 从 3.6 开始,还支持非相关查询。其基本语法如下:
{
$lookup:
{
from: <collection to join>,
let: { <var_1>: <expression>, …, <var_n>: <expression> },
pipeline: [ <pipeline to execute on the collection to join> ],
as: <output array field>
}
}
- from :指定同一数据库中的集合以进行连接操作。
- let :可选操作。用于指定在管道阶段中使用的变量,需要注意的是访问 let 变量必须使用 $expr 运算符。
- pipeline :必选值,用于指定要在已连接集合上运行的管道。需要注意的是该管道无法直接访问输入文档中的字段,需要先在 let 中进行定义,然后再引用。
- as :指定用于存放匹配文档的新数组字段的名称。如果指定的字段已存在,则进行覆盖。
这里我们分别基于三种场景来讲解不相关查询:
场景一 :假设每个员工都需要了解其他员工的职位信息,以便进行沟通,则对应的查询语法如下:
db.employees.aggregate([
{
$lookup:
{
from: "titles",
pipeline: [],
as: "emps_title"
}
}
])
此时输出如下,可以看到每个员工的 emps_title 字段都是相同的,都包含了所有员工的职位信息。因为查询中并没有指定任何字段进行关联,所以这种查询也被称为非相关查询。
{
"_id" : ObjectId("5d3ffaaa8ba16934ccce99a1"),
"emp_no" : 10001,
.......
"emps_title" : [
{
"_id" : ObjectId("5d4011728ba16934ccce99a5"),
"emp_no" : 10001,
"title" : "Senior Engineer"
},
{
"_id" : ObjectId("5d4011728ba16934ccce99a6"),
"emp_no" : 10002,
"title" : "Staff"
},
{
"_id" : ObjectId("5d4011728ba16934ccce99a7"),
"emp_no" : 10003,
"title" : "Senior Engineer"
},
{
"_id" : ObjectId("5d4011728ba16934ccce99a8"),
"emp_no" : 10004,
"title" : "Engineer"
},
{
"_id" : ObjectId("5d4011728ba16934ccce99a9"),
"emp_no" : 10004,
"title" : "Senior Engineer"
}
]
},
{
"_id" : ObjectId("5d3ffaaa8ba16934ccce99a2"),
"emp_no" : 10002,
.......
"emps_title" : [
{
"_id" : ObjectId("5d4011728ba16934ccce99a5"),
"emp_no" : 10001,
"title" : "Senior Engineer"
},
{
"_id" : ObjectId("5d4011728ba16934ccce99a6"),
"emp_no" : 10002,
"title" : "Staff"
},
{
"_id" : ObjectId("5d4011728ba16934ccce99a7"),
"emp_no" : 10003,
"title" : "Senior Engineer"
},
{
"_id" : ObjectId("5d4011728ba16934ccce99a8"),
"emp_no" : 10004,
"title" : "Engineer"
},
{
"_id" : ObjectId("5d4011728ba16934ccce99a9"),
"emp_no" : 10004,
"title" : "Senior Engineer"
}
]
},
..........
场景二 :假设每个员工都只需要知道办公室职员 (即职位为 Staff ) 的员工的信息,此时就需要利用 pipeline 管道对 titles 集合中的数据进行过滤,对应的查询语法为:
db.employees.aggregate([
{
$lookup:
{
from: "titles",
pipeline: [
{ $match:
{ $expr: { $eq: [ "$title","Staff"]}}
}
],
as: "emps_title"
}
}
])
场景三 :假设只有男员工需要知道其他职员的信息,此时就需要利用 pipeline 管道对 employees 集合中的数据进行过滤。由于 employees 集合是输入集合,管道无法直接访问其中的字段,此时需要先在 let 中进行定义,然后再引用。语句如下:
db.employees.aggregate([
{
$lookup:
{
from: "titles",
let: { gender: "$gender"},
pipeline: [
{ $match:
{ $expr: { $eq: [ "$$gender","M"]}}
}
],
as: "emps_title"
}
}
])
这里需要使用两个 $
符号对 gender 变量进行引用,因为使用 let 定义后,其类似于系统变量。
1.9 $out
$out
用于将数据写入指定的集合,它必须是管道中的最后一个阶段。如果指定的集合不存在,则会自动新建;如果指定的集合存在,它会覆盖原有集合的数据。其实际的步骤如下:
- 创建临时集合;
- 将索引从现有集合复制到临时集合;
- 将文档插入临时集合中;
- 调用
db.collection.renameCollection(target, true)
方法将临时集合重命名为目标集合。
$out
的使用示例如下:
db.employees.aggregate([
{ $out: "emps"}
])
1.10 自动优化
在大多数情况下 MongoDB 会按照我们定义管道的先后顺序执行管道操作,但是某些情况下,MongoDB 会在不影响结果的前提下,改变管道执行顺序,从而获得更好的性能表现。常见的优化策略如下:
$project or $addFields + $match
当投影操作后面有匹配操作时,MongoDB 会将 $match
阶段中不需要进行投影操作的字段的过滤条件提前到投影操作前执行。
$sort + $match
当排序操作后面有匹配操作时,会将匹配操作提前,以减少需要排序的数据量。
$project + $skip
当投影操作后面有跳过操作时,会先执行跳过操作,从而减少需要进行投影操作的数据量。
$sort + $limit
当排序操作在限制操作之前时,如果没有中间阶段会修改文档数量 (例如 $unwind
,$group
),优化器会将 $limit
合并到$sort
中。如果在$sort
和$limit
之间存在修改文档数量的管道阶段,MongoDB 将不会执行合并。
了解这些优化策略可以有助于我们在开发中合理设置管道的顺序。想要了解全部的优化策略,可以参阅 MongoDB 的官方文档:Aggregation Pipeline Optimization 。
三、MapReduce
MongoDB 的 MapReduce 在概念上与 Hadoop MapReduce 类似,MongoDB 将 map 阶段应用于每个符合条件的输入文档,map 阶段的输出结果是 key-value 键值对。对于具有多个值的 key,MongoDB 会执行 reduce 阶段,该阶段负责收集并聚合数据,然后将结果存储在集合中。
这里我们以按照性别分组计算员工的平均年龄为例,演示 MapReduce 的基本使用,语句如下:
db.employees.mapReduce(
function(){
emit(this.gender,this.age)
},
function(key,values){
return Array.avg(values)
},
{ out:"age_avg"}
)
对应的计算结果会被输出到 age_avg 集合中,其内容如下:
{
"_id" : "F",
"value" : 33
},
{
"_id" : "M",
"value" : 39
}
当前 MongoDB 支持在分片集合执行 map-reduce 操作,也支持将 Map-reduce 操作的结果输出到分片集合。map-reduce 使得开发者可以灵活控制聚合行为,但其性能不如聚合管道,所以两者应适场景而选用。
四、单用途聚合方法
出于易用性考虑,MongoDB 提供了部分 API,用于实现对常见聚合过程的简单访问。如下:
4.1 count
用于计算符合条件的文档的数量,如果想要计算集合中所有文档的数量,则传入空对象即可:
db.employees.count({gender:"M"})
4.2 estimatedDocumentCount
官方更加推荐使用 estimatedDocumentCount 计算集合中所有文档的数量,它会直接获取元数据信息并返回,因此它不支持传入查询条件,示例如下:
db.employees.estimatedDocumentCount({})
4.3 distinct
和大多数数据库中的 distinct 关键字类似,用于返回不重复的数据:
db.titles.distinct("title")
参考资料
下一节:复制功能、故障发现、优先选举、投票成员、副本集搭建