执行增量 Map-Reduce
在本页面
Map-reduce 操作可以处理复杂的聚合任务。要执行 map-reduce 操作,MongoDB 提供MapReduce命令,并在mongo shell 中提供db.collection.mapReduce() wrapper 方法。
如果 map-reduce 数据集不断增长,您可能希望执行增量 map-reduce 而不是每个 time 对整个数据集执行 map-reduce 操作。
执行增量 map-reduce:
在当前集合上运行 map-reduce job 并将结果输出到单独的集合。
如果有更多数据要进行 process,run 后续 map-reduce job:
query参数指定仅匹配新文档的条件。out参数,指定将新结果合并到现有输出集合中的reduce操作。
请考虑以下 example,其中您在sessions集合上安排 map-reduce 操作,以在每天结束时运行 run。
数据设置
sessions集合包含 log 用户每天会话的文档,例如:
db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } );
db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } );当前集合的初始 Map-Reduce
运行第一个 map-reduce 操作如下:
定义 map function _将
userid映射到包含字段userid,total_time,count和avg_time的 object:使用两个 arguments
key和values定义相应的 reduce function 以计算总 time 和计数。key对应于userid,values是 array,其元素对应于映射到mapFunction中userid的各个 object。使用两个 arguments
key和reducedValue定义 finalize function。 function 修改reducedValue文档以添加另一个字段average并返回修改后的文档。使用
mapFunction,reduceFunction和finalizeFunction函数在session集合上执行 map-reduce。将结果输出到集合session_stat。如果session_stat集合已存在,则操作将替换内容:查询
session_stats集合以验证结果:该操作返回以下文档:
后续增量 Map-Reduce
之后,随着sessions集合的增长,您可以运行其他 map-reduce 操作。对于 example,将新文档添加到sessions集合:
最终,对usersessions集合执行增量map-reduce ,但使用该query字段仅选择新文档。将结果输出到collection session_stats,但是reduce将内容与增量map-reduce的结果进行比较:
查询session_stats集合以验证结果:
该操作返回以下文档:
聚合替代
前提条件:将集合设置为原始状态:
使用可用的聚合管道运算符,您可以重写map-reduce示例,而无需定义自定义函数:
该
$project阶段调整输出文档的形状以反映map-reduce的输出,该输出具有两个字段_id和value。如果不需要镜像_idandvalue结构,则该阶段是可选的 。该
$merge阶段将结果输出到session_stats_agg集合。如果现有文档_id与新结果相同,则该操作将应用指定的管道,以根据结果和现有文档计算total_time,count和avg_time。如果是相同的,现有的文档_id中session_stats_agg,操作插入文档。查询
session_stats_agg集合以验证结果:该操作返回以下文档:
新文档添加到
usersessions集合中:$match在管道的开头添加一个阶段以指定日期过滤器:查询
session_stats_agg集合以验证结果:该操作返回以下文档:
可选的。为了避免
$match每次运行时都必须修改聚合管道的日期条件,可以在帮助函数中定义包装聚合:然后,要运行,您只需将开始日期传递给该
updateSessionStats()函数:
也可以看看
译者:李冠飞
校对:
最后更新于