执行增量 Map-Reduce

在本页面

Map-reduce 操作可以处理复杂的聚合任务。要执行 map-reduce 操作,MongoDB 提供MapReduce命令,并在mongo shell 中提供db.collection.mapReduce() wrapper 方法。

如果 map-reduce 数据集不断增长,您可能希望执行增量 map-reduce 而不是每个 time 对整个数据集执行 map-reduce 操作。

执行增量 map-reduce:

  • 在当前集合上运行 map-reduce job 并将结果输出到单独的集合。

  • 如果有更多数据要进行 process,run 后续 map-reduce job:

    • query参数指定仅匹配新文档的条件。

    • out参数,指定将新结果合并到现有输出集合中的reduce操作。

请考虑以下 example,其中您在sessions集合上安排 map-reduce 操作,以在每天结束时运行 run。

数据设置

sessions集合包含 log 用户每天会话的文档,例如:

db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } );

db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } );

当前集合的初始 Map-Reduce

运行第一个 map-reduce 操作如下:

  • 定义 map function _将userid映射到包含字段useridtotal_timecountavg_time的 object:

    var mapFunction = function() {
        var key = this.userid;
        var value = {
            userid: this.userid,
            total_time: this.length,
            count: 1,
            avg_time: 0
        };
        emit( key, value );
    };
  • 使用两个 arguments keyvalues定义相应的 reduce function 以计算总 time 和计数。 key对应于useridvalues是 array,其元素对应于映射到mapFunctionuserid的各个 object。

    var reduceFunction = function(key, values) {
        var reducedObject = {
            userid: key,
            total_time: 0,
            count:0,
            avg_time:0
        };
    
        values.forEach( function(value) {
            reducedObject.total_time += value.total_time;
            reducedObject.count += value.count;
        });
        return reducedObject;
    };
  • 使用两个 arguments keyreducedValue定义 finalize function。 function 修改reducedValue文档以添加另一个字段average并返回修改后的文档。

    var finalizeFunction = function (key, reducedValue) {
        if (reducedValue.count > 0)
            reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
    
        return reducedValue;
    };
  • 使用mapFunctionreduceFunctionfinalizeFunction函数在session集合上执行 map-reduce。将结果输出到集合session_stat。如果session_stat集合已存在,则操作将替换内容:

    db.sessions.mapReduce( mapFunction,
        reduceFunction,
        {
            out: "session_stat",
            finalize: finalizeFunction
        }
    )
  • 查询session_stats集合以验证结果:

    db.session_stats.find().sort( { _id: 1 } )

    该操作返回以下文档:

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }

后续增量 Map-Reduce

之后,随着sessions集合的增长,您可以运行其他 map-reduce 操作。对于 example,将新文档添加到sessions集合:

db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } );

最终,对usersessions集合执行增量map-reduce ,但使用该query字段仅选择新文档。将结果输出到collection session_stats,但是reduce将内容与增量map-reduce的结果进行比较:

db.usersessions.mapReduce(
   mapFunction,
   reduceFunction,
   {
     query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } },
     out: { reduce: "session_stats" },
     finalize: finalizeFunction
   }
);

查询session_stats集合以验证结果:

db.session_stats.find().sort( { _id: 1 } )

该操作返回以下文档:

{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
{ "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
{ "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
{ "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }

聚合替代

前提条件:将集合设置为原始状态:

db.usersessions.drop();

db.usersessions.insertMany([
   { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
   { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
   { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
   { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
   { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
   { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
   { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
   { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])

使用可用的聚合管道运算符,您可以重写map-reduce示例,而无需定义自定义函数:

db.usersessions.aggregate([
   { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
   { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
   { $merge: {
      into: "session_stats_agg",
      whenMatched: [ { $set: {
         "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
         "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
         "value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] },  { $add: [ "$value.count", "$$new.value.count" ] } ] }
      } } ],
      whenNotMatched: "insert"
   }}
])
  1. 通过userid$group,得出:

    • total_time使用$sum操作

    • count使用$sum操作

    • avg_time使用$avg操作

    该操作返回以下文档:

    { "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 }
    { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 }
    { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 }
    { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 }
  2. $project阶段调整输出文档的形状以反映map-reduce的输出,该输出具有两个字段_idvalue。如果不需要镜像_idand value结构,则该阶段是可选的 。

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
  3. $merge阶段将结果输出到 session_stats_agg集合。如果现有文档_id与新结果相同,则该操作将应用指定的管道,以根据结果和现有文档计算total_time,count和avg_time。如果是相同的,现有的文档_idsession_stats_agg,操作插入文档。

  4. 查询session_stats_agg集合以验证结果:

    db.session_stats_agg.find().sort( { _id: 1 } )

    该操作返回以下文档:

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
  5. 新文档添加到usersessions集合中:

    db.usersessions.insertMany([
       { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
       { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
       { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
       { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
    ])
  6. $match在管道的开头添加一个阶段以指定日期过滤器:

    db.usersessions.aggregate([
       { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } },
       { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
       { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
       { $merge: {
          into: "session_stats_agg",
          whenMatched: [ { $set: {
             "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
             "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
             "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] },  { $add: [ "$value.count", "$$new.value.count" ] } ] }
          } } ],
          whenNotMatched: "insert"
       }}
    ])
  7. 查询session_stats_agg集合以验证结果:

    db.session_stats_agg.find().sort( { _id: 1 } )

    该操作返回以下文档:

    { "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
    { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
    { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
    { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
  8. 可选的。为了避免$match每次运行时都必须修改聚合管道的日期条件,可以在帮助函数中定义包装聚合:

    updateSessionStats = function(startDate) {
       db.usersessions.aggregate([
          { $match: { ts: { $gte: startDate } } },
          { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
          { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
          { $merge: {
             into: "session_stats_agg",
             whenMatched: [ { $set: {
                "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
                "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
                "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] },  { $add: [ "$value.count", "$$new.value.count" ] } ] }
             } } ],
             whenNotMatched: "insert"
          }}
       ]);
    };

    然后,要运行,您只需将开始日期传递给该updateSessionStats()函数:

    updateSessionStats(ISODate('2020-03-05 00:00:00'))

也可以看看

译者:李冠飞

校对:

最后更新于