MongoDB-CN-Manual
  • MongoDB中文手册|官方文档中文版
  • MongoDB用户手册说明
  • MongoDB简介
    • 入门
    • 数据库和集合
      • 视图
      • 按需物化视图
      • 封顶集合
      • 时间序列集合
    • 文档
    • BSON类型
      • Comparison and Sort Order
      • MongoDB Extended JSON (v2)
      • MongoDB Extended JSON (v1)
  • 安装 MongoDB
    • 安装MongoDB社区版
      • 在Linux上安装MongoDB社区版
      • 在macOS上安装MongoDB社区版
      • 在Windows上安装MongoDB社区版
    • 安装MongoDB企业版
      • 在Linux上安装MongoDB企业版
      • 在Mac OS安装MongoDB企业版
      • 在Windows安装MongoDB企业版
      • 使用Docker安装MongoDB企业版
    • 将社区版MongoDB升级到企业版MongoDB
    • 验证MongoDB软件包的完整性
  • The mongo Shell
    • 配置mongo Shell
    • 使用 mongo Shell帮助
    • 为mongo Shell编写脚本
    • mongo Shell中的数据类型
    • mongo Shell 快速参考
  • MongoDB CRUD操作
    • 插入文档
      • 插入方法
    • 查询文档
      • 在mongo Shell中迭代游标
      • 从查询返回的项目字段
      • 查询嵌入式文档数组
      • 查询数组
      • 查询空字段或缺少字段
      • 查询嵌入/嵌套文档
    • 更新文档
      • 更新方法
      • 聚合管道更新
    • 删除文档
      • 删除方法
    • 地理空间查询
      • 用地理空间查询查找餐馆
      • GeoJSON对象
    • 批量写入操作
    • 可重试写入
    • 可重试读取
    • SQL到MongoDB的映射图表
    • 文本搜索
      • 文本索引
      • 文本索引操作
      • 集合管道中的文本索引
      • 文本索引语言
    • Read Concern读关注
      • 读关注 "local"
      • 读关注 "available"
      • 读关注 "majority"
      • 读关注 "linearizable"
      • 读关注 "snapshot"
    • Write Concern写关注
    • MongoDB CRUD概念
      • 原子性和事务
      • 读隔离性,一致性和近因性
        • 因果一致性和读写关注
      • 分布式查询
      • 通过findAndModify进行线性化读取
      • 查询计划
      • 查询优化
        • 评估当前操作性能
        • 优化查询性能
        • 写操作性能
        • 说明结果
      • 分析查询表现
      • Tailable 游标
  • MongoDB聚合
    • 聚合管道
      • 聚合管道优化
      • 聚合管道限制
      • 聚合管道和分片集合
      • 使用 Zip Code 数据集进行聚合
      • 使用用户首选项数据进行聚合
    • Map-Reduce
      • Map-Reduce 和分片集合
      • Map-Reduce 并发
      • Map-Reduce 示例
      • 执行增量 Map-Reduce
      • 对 Map Function 进行故障排除
      • 排除 Reduce Function 问题
      • Map-Reduce转换到聚合管道
    • 聚合参考
      • 聚合管道快速参考
      • 聚合命令
      • 聚合命令对比
      • 聚合表达式中的变量
      • SQL 到聚合映射图表
  • MongoDB数据模型
    • 数据建模介绍
    • 模式验证
    • 数据模型设计
      • 一对一嵌套关系模型
  • MongoDB事务
  • MongoDB事务
    • 驱动程序 API
    • 生产注意事项
    • 生产注意事项 (分片集群)
    • 事务操作
  • MongoDB索引
    • 单字段索引
    • 复合索引
    • 多键索引
      • 多键索引范围
    • 文本索引
      • 为文本索引指定语言
      • 指定文本索引的名称
      • 用权重控制搜索结果
      • 限制扫描条目的数量
    • 通配符索引
      • 通配符索引限制
    • 2dsphere 索引
      • 查询一个2dsphere索引
    • 2d 索引
      • 创建一个2d索引
      • 查询一个2d索引
      • 2d索引内部
      • 使用球面几何计算距离
    • geoHaystack 索引
      • 创建Haystack索引
      • 查询Haystack索引
    • 哈希索引
    • 索引特性
      • TTL 索引
        • 通过设置TTL使集合中的数据过期
      • 唯一索引
      • 部分索引
      • 不分大小写索引
      • Sparse 索引
    • 在填充的集合上建立索引
      • 在副本集上建立滚动索引
      • 在分片群集上建立滚动索引
    • 索引交集
    • 管理索引
    • 衡量索引使用
    • 索引策略
      • 创建索引来支持查询
      • 使用索引对查询结果进行排序
      • 确保索引适合RAM
      • 创建以确保选择性的查询
    • 索引参考
  • MongoDB安全
    • 安全检查列表
    • 启用访问控制
    • 身份验证
      • 用户
        • 添加用户
        • 权限认证机制
          • SCRAM
            • 用x.509证书来认证客户端
    • 审计
      • 配置审计过滤器
      • 配置审计
      • 系统事件审计消息
    • 网络和配置强化
    • 安全参考
      • system.roles集合
      • system.users集合
      • 资源文档
      • 权限操作
    • 附录
      • 附录-A-用于测试的 OpenSSl CA 证书
      • 附录-B-用于测试的OpenSSL服务器证书
      • 附录-C-用于测试的OpenSSL客户端证书
  • Change Streams变更流
    • 变更流生产建议
    • 变更事件
  • MongoDB复制
    • 副本集成员
    • 副本集日志
    • 副本集数据同步
    • 副本集部署架构
    • 副本集成员配置教程
    • 副本集维护教程
    • MongoDB复制参考
  • MongoDB分片
    • 分片集群组件
    • 分片键
    • 哈希分片
    • 范围分片
    • 区
      • 管理分片区
      • 按位置细分数据
      • 用于更改SLA或SLO的分层硬件
      • 按应用或客户细分数据
      • 仅插入工作负载的分布式本地写入
      • 管理分片区
    • 使用块进行数据分区
      • 在分片集群中拆分数据块
    • 分片管理
      • 查看集群设置
    • 重启一个分片集群
    • [把一个分片集群迁移到不同的硬件](fen-pian/migrate-a -sharded-cluster-to-different-hardware.md)
    • 分片参考
  • MongoDB管理
    • 产品说明
    • 操作检查列表
    • 开发检查列表
    • 配置和维护
    • 性能
    • 数据中心意识
      • MongoDB部署中的工作负载隔离
      • 区
        • 管理分片区
        • 按位置细分数据
        • 用于更改SLA或SLO的分层硬件
        • 按应用或客户细分数据
        • 仅插入工作负载的分布式本地写入
        • 管理分片区
    • MongoDB备份方法
    • MongoDB监控
  • MongoDB存储
    • 存储引擎
      • WiredTiger 存储引擎
      • 内存存储引擎
    • 日志记录
      • 管理日志记录
        • GridFS
        • FAQ:MongoDB 存储
  • MongoDB参考
    • 运算符
      • 查询与映射运算符
        • 比较查询运算符
          • $eq
          • $gt
          • $gte
          • $in
          • $lt
          • $lte
          • $ne
          • $nin
        • 逻辑查询运算符
          • $and
          • $not
          • $nor
          • $or
        • 元素查询运算符
        • 评估查询运算符
        • 地理空间查询运算符
        • 数组查询运算符
        • 按位查询运算符
        • $comment
        • 映射运算符
      • 更新运算符
        • 字段更新运算符
        • 数组更新运算符
        • 按位更新运算符
      • 聚合管道阶段
      • 聚合管道操作符
        • $abs (aggregation)
        • $acos (aggregation)
        • $acosh (aggregation)
        • $add (aggregation)
        • $addToSet (aggregation)
        • $allElementsTrue (aggregation)
        • $and (aggregation)
        • $anyElementTrue (aggregation)
        • $arrayElemAt (aggregation)
        • $arrayToObject (aggregation)
        • $asin (aggregation)
        • $asinh (aggregation)
        • $atan (aggregation)
        • $atan2 (aggregation)
        • $atanh (aggregation)
        • $avg (aggregation)
        • $ceil (aggregation)
        • $cmp (aggregation)
        • $concat (aggregation)
        • $concatArrays (aggregation)
        • $cond (aggregation)
        • $convert (aggregation)
        • $cos (aggregation)
        • $dateFromParts (aggregation)
        • $dateToParts (aggregation)
        • $dateFromString (aggregation)
        • $literal (aggregation)
      • 查询修饰符
    • 数据库命令
      • 聚合命令
      • 地理空间命令
      • 查询和写操作命令
      • 查询计划缓存命令
      • 认证命令
      • 用户管理命令
      • 角色管理命令
      • 复制命令
      • 分片命令
      • 会话命令
      • 管理命令
      • 诊断命令
      • 免费监控命令
      • 系统事件审计命令
    • mongo Shell 方法
      • 集合方法
        • db.collection.aggregate()
        • db.collection.bulkWrite()
        • db.collection.copyTo()
        • db.collection.count()
        • db.collection.countDocuments()
        • db.collection.estimatedDocumentCount()
        • db.collection.createIndex()
        • db.collection.createIndexes()
        • db.collection.dataSize()
        • db.collection.deleteOne()
        • db.collection.deleteMany()
        • db.collection.distinct()
        • db.collection.drop()
        • db.collection.dropIndex()
        • db.collection.dropIndexes()
        • db.collection.ensureIndex()
        • db.collection.explain()
        • db.collection.find()
        • db.collection.findAndModify()
        • db.collection.findOne()
        • db.collection.findOneAndDelete()
        • db.collection.findOneAndReplace()
        • db.collection.findOneAndUpdate()
        • db.collection.getIndexes()
        • db.collection.getShardDistribution()
        • db.collection.getShardVersion()
        • db.collection.insert()
        • db.collection.insertOne()
        • db.collection.insertMany()
        • db.collection.isCapped()
        • db.collection.latencyStats()
        • db.collection.mapReduce()
        • db.collection.reIndex()
        • db.collection.remove()
        • db.collection.renameCollection()
        • db.collection.replaceOne()
        • db.collection.save()
        • db.collection.stats()
        • db.collection.storageSize()
        • db.collection.totalIndexSize()
        • db.collection.totalSize()
        • db.collection.update()
        • db.collection.updateOne()
        • db.collection.updateMany()
        • db.collection.watch()
        • db.collection.validate()
    • MongoDB中的限制与阈值
    • MongoDB系统集合
    • 词汇表
    • 默认的MongoDB端口
    • 默认的MongoDB读/写关注
    • 服务器会话
  • MongoDB FAQ
    • FAQ: MongoDB基础知识
    • FAQ: MongoDB索引
    • FAQ: MongoDB并发
    • FAQ: MongoDB分片
    • FAQ: MongoDB复制和副本集
    • FAQ: MongoDB存储
    • FAQ: MongoDB诊断
  • MongoDB 版本管理
  • 联系我们
    • Tapdata Cloud
    • MongoDB中文社区
    • 社区合作伙伴—锦木信息
由 GitBook 提供支持
在本页
  • 数据设置
  • 当前集合的初始 Map-Reduce
  • 后续增量 Map-Reduce
  • 聚合替代
  1. MongoDB聚合
  2. Map-Reduce

执行增量 Map-Reduce

上一页Map-Reduce 示例下一页对 Map Function 进行故障排除

最后更新于3年前

在本页面

Map-reduce 操作可以处理复杂的聚合任务。要执行 map-reduce 操作,MongoDB 提供命令,并在 shell 中提供 wrapper 方法。

如果 map-reduce 数据集不断增长,您可能希望执行增量 map-reduce 而不是每个 time 对整个数据集执行 map-reduce 操作。

执行增量 map-reduce:

  • 在当前集合上运行 map-reduce job 并将结果输出到单独的集合。

  • 如果有更多数据要进行 process,run 后续 map-reduce job:

    • query参数指定仅匹配新文档的条件。

    • out参数,指定将新结果合并到现有输出集合中的reduce操作。

请考虑以下 example,其中您在sessions集合上安排 map-reduce 操作,以在每天结束时运行 run。

数据设置

sessions集合包含 log 用户每天会话的文档,例如:

db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } );

db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } );

当前集合的初始 Map-Reduce

运行第一个 map-reduce 操作如下:

  • 定义 map function _将userid映射到包含字段userid,total_time,count和avg_time的 object:

    var mapFunction = function() {
        var key = this.userid;
        var value = {
            userid: this.userid,
            total_time: this.length,
            count: 1,
            avg_time: 0
        };
        emit( key, value );
    };
  • 使用两个 arguments key和values定义相应的 reduce function 以计算总 time 和计数。 key对应于userid,values是 array,其元素对应于映射到mapFunction中userid的各个 object。

    var reduceFunction = function(key, values) {
        var reducedObject = {
            userid: key,
            total_time: 0,
            count:0,
            avg_time:0
        };
    
        values.forEach( function(value) {
            reducedObject.total_time += value.total_time;
            reducedObject.count += value.count;
        });
        return reducedObject;
    };
  • 使用两个 arguments key和reducedValue定义 finalize function。 function 修改reducedValue文档以添加另一个字段average并返回修改后的文档。

    var finalizeFunction = function (key, reducedValue) {
        if (reducedValue.count > 0)
            reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
    
        return reducedValue;
    };
  • 使用mapFunction,reduceFunction和finalizeFunction函数在session集合上执行 map-reduce。将结果输出到集合session_stat。如果session_stat集合已存在,则操作将替换内容:

    db.sessions.mapReduce( mapFunction,
        reduceFunction,
        {
            out: "session_stat",
            finalize: finalizeFunction
        }
    )
  • 查询session_stats集合以验证结果:

    db.session_stats.find().sort( { _id: 1 } )

    该操作返回以下文档:

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }

后续增量 Map-Reduce

之后,随着sessions集合的增长,您可以运行其他 map-reduce 操作。对于 example,将新文档添加到sessions集合:

db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } );

最终,对usersessions集合执行增量map-reduce ,但使用该query字段仅选择新文档。将结果输出到collection session_stats,但是reduce将内容与增量map-reduce的结果进行比较:

db.usersessions.mapReduce(
   mapFunction,
   reduceFunction,
   {
     query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } },
     out: { reduce: "session_stats" },
     finalize: finalizeFunction
   }
);

查询session_stats集合以验证结果:

db.session_stats.find().sort( { _id: 1 } )

该操作返回以下文档:

{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
{ "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
{ "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
{ "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }

聚合替代

前提条件:将集合设置为原始状态:

db.usersessions.drop();

db.usersessions.insertMany([
   { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
   { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
   { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
   { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
   { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
   { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
   { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
   { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])

使用可用的聚合管道运算符,您可以重写map-reduce示例,而无需定义自定义函数:

db.usersessions.aggregate([
   { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
   { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
   { $merge: {
      into: "session_stats_agg",
      whenMatched: [ { $set: {
         "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
         "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
         "value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] },  { $add: [ "$value.count", "$$new.value.count" ] } ] }
      } } ],
      whenNotMatched: "insert"
   }}
])
    • total_time使用$sum操作

    • count使用$sum操作

    该操作返回以下文档:

    { "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 }
    { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 }
    { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 }
    { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 }
  1. { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
  2. 查询session_stats_agg集合以验证结果:

    db.session_stats_agg.find().sort( { _id: 1 } )

    该操作返回以下文档:

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
  3. 新文档添加到usersessions集合中:

    db.usersessions.insertMany([
       { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
       { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
       { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
       { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
    ])
  4. db.usersessions.aggregate([
       { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } },
       { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
       { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
       { $merge: {
          into: "session_stats_agg",
          whenMatched: [ { $set: {
             "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
             "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
             "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] },  { $add: [ "$value.count", "$$new.value.count" ] } ] }
          } } ],
          whenNotMatched: "insert"
       }}
    ])
  5. 查询session_stats_agg集合以验证结果:

    db.session_stats_agg.find().sort( { _id: 1 } )

    该操作返回以下文档:

    { "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
    { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
    { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
    { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
  6. updateSessionStats = function(startDate) {
       db.usersessions.aggregate([
          { $match: { ts: { $gte: startDate } } },
          { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
          { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
          { $merge: {
             into: "session_stats_agg",
             whenMatched: [ { $set: {
                "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
                "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
                "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] },  { $add: [ "$value.count", "$$new.value.count" ] } ] }
             } } ],
             whenNotMatched: "insert"
          }}
       ]);
    };

    然后,要运行,您只需将开始日期传递给该updateSessionStats()函数:

    updateSessionStats(ISODate('2020-03-05 00:00:00'))

也可以看看

译者:李冠飞

校对:

通过userid,得出:

avg_time使用操作

该阶段调整输出文档的形状以反映map-reduce的输出,该输出具有两个字段_id和 value。如果不需要镜像_idand value结构,则该阶段是可选的 。

该阶段将结果输出到 session_stats_agg集合。如果现有文档_id与新结果相同,则该操作将应用指定的管道,以根据结果和现有文档计算total_time,count和avg_time。如果是相同的,现有的文档_id中session_stats_agg,操作插入文档。

在管道的开头添加一个阶段以指定日期过滤器:

可选的。为了避免每次运行时都必须修改聚合管道的日期条件,可以在帮助函数中定义包装聚合:

$group
$avg
$project
$merge
$match
$match
$ merge示例
按需实例化视图
MapReduce
mongo
db.collection.mapReduce()
数据设置
当前集合的初始 Map-Reduce
后续增量 Map-Reduce