Map-Reduce转换到聚合管道

从4.4版本开始,MongoDB添加了$accumulator$function aggregation运算符。这些运算符为用户提供了定义自定义聚合表达式的能力。使用这些操作,可以大致重写map-reduce表达式,如下表所示。

注意

可以使用聚合管道操作符(如$group、$merge等)重写各种map-reduce表达式,而不需要自定义函数。

例如,请参见map-reduce示例。

Map-Reduce到聚合管道转换表

这张表只是粗略的翻译。例如,该表显示了使用$projectmapFunction的近似转换。

  • 然而,mapFunction逻辑可能需要额外的阶段,例如,如果逻辑包括对数组的迭代:

    function() {
       this.items.forEach(function(item){ emit(item.sku, 1); });
    }

    然后,聚合管道包括一个$unwind和一个$project:

    { $unwind: "$items "},
    { $project: { emits: { key: { "$items.sku" }, value: 1 } } },
  • $project中的emit字段可以被命名为其他名称。为了进行可视化比较,选择了字段名称emit。

Map-Reduce
Aggregation Pipeline

db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: <collection> } )

db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: “$emits” }, { $group: { _id: “$emits.k”}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ “$emit.v”], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: “js” }} } }, { $out: <collection> } ] )

db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { merge: <collection>, db: <db> } } )

db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: “$emits” }, { $group: { _id: “$emits.k”}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ “$emit.v”], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: “js” }} } }, { $out: { db: <db>, coll: <collection> } } ] )

db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { merge: <collection>, db: <db> } } )

db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: “$emits” }, { $group: { _id: “$emits.k”}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ “$emit.v”], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: “js” }} } }, { $merge: { into: { db: <db>, coll: <collection>}, on: “_id” whenMatched: “replace”, whenNotMatched: “insert” } }, ] )

db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { merge: <collection>, db: <db> } } )

db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: “$emits” }, { $group: { _id: “$emits.k”}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ “$emit.v”], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: “js” }} } }, { $merge: { into: { db: <db>, coll: <collection> }, on: “_id” whenMatched: [ { $project: { value: { $function: { body: <reduceFunction>, args: [ “$_id”, [ “$value”, “$$new.value” ] ], lang: “js” } } } } ] whenNotMatched: “insert” } }, ] )

db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { inline: 1 } } )

db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: “$emits” }, { $group: { _id: “$emits.k”}, value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ “$emit.v”], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: “js” }} } } ] )

例子

可以使用聚合管道操作符(如$group$merge等)重写各种map-reduce表达式,而不需要自定义函数。但是,为了说明目的,下面的例子提供了两种选择。

示例1

通过cust_id对订单集合组执行以下map-reduce操作,并计算每个cust_id的价格总和:

**备选方案1:(推荐)**您可以重写操作到聚合管道,而不将map-reduce函数转换为等效的管道阶段:

**备选方案2:(仅为说明目的)**下面的聚合管道提供了各种map-reduce函数的转换,使用$accumulator定义自定义函数:

  1. 首先,$project阶段输出带有emit字段的文档。emit字段是一个包含以下字段的文档:

    • key包含cust_id文档的值

    • value包含price文档的值

  2. 然后,$group使用$accumulator操作符来添加发出的值:

  3. 最后,$out将输出写入集合agg_alternative_2。或者,您可以使用$merge而不是$out

示例2

以下字段对orders集合组的map-reduce操作,item.sku并计算每个sku的订单数量和总订购量。然后,该操作将为每个sku值计算每个订单的平均数量,并将结果合并到输出集合中。

**备选方案1:(推荐)**您可以重写操作到聚合管道,而不将map-reduce函数转换为等效的管道阶段:

**备选方案2:(仅为说明目的)**下面的聚合管道提供了各种map-reduce函数的转换,使用$accumulator定义自定义函数:

  1. $match阶段只选择那些ord_date大于或等于new Date("2020-03-01")的文档。

  2. $unwinds阶段按items数组字段分解文档,为每个数组元素输出一个文档。例如:

  3. $project阶段输出带有emit字段的文档。emit字段是一个包含以下字段的文档:

    • key包含items.sku

    • value包含具有qty值和count值的文档

  4. $group使用$accumulator操作符来添加发出的计数和数量,并计算avg字段:

  5. 最后,$merge将输出写入集合agg_alternative_4。如果现有文档具有与新结果相同的键_id,则操作将覆盖现有文档。如果没有具有相同密钥的现有文档,操作将插入该文档。

也可以看看 聚合命令比较

译者:李冠飞

校对:

最后更新于