MongoDB-CN-Manual
  • MongoDB中文手册|官方文档中文版
  • MongoDB用户手册说明
  • MongoDB简介
    • 入门
    • 数据库和集合
      • 视图
      • 按需物化视图
      • 封顶集合
      • 时间序列集合
    • 文档
    • BSON类型
      • Comparison and Sort Order
      • MongoDB Extended JSON (v2)
      • MongoDB Extended JSON (v1)
  • 安装 MongoDB
    • 安装MongoDB社区版
      • 在Linux上安装MongoDB社区版
      • 在macOS上安装MongoDB社区版
      • 在Windows上安装MongoDB社区版
    • 安装MongoDB企业版
      • 在Linux上安装MongoDB企业版
      • 在Mac OS安装MongoDB企业版
      • 在Windows安装MongoDB企业版
      • 使用Docker安装MongoDB企业版
    • 将社区版MongoDB升级到企业版MongoDB
    • 验证MongoDB软件包的完整性
  • The mongo Shell
    • 配置mongo Shell
    • 使用 mongo Shell帮助
    • 为mongo Shell编写脚本
    • mongo Shell中的数据类型
    • mongo Shell 快速参考
  • MongoDB CRUD操作
    • 插入文档
      • 插入方法
    • 查询文档
      • 在mongo Shell中迭代游标
      • 从查询返回的项目字段
      • 查询嵌入式文档数组
      • 查询数组
      • 查询空字段或缺少字段
      • 查询嵌入/嵌套文档
    • 更新文档
      • 更新方法
      • 聚合管道更新
    • 删除文档
      • 删除方法
    • 地理空间查询
      • 用地理空间查询查找餐馆
      • GeoJSON对象
    • 批量写入操作
    • 可重试写入
    • 可重试读取
    • SQL到MongoDB的映射图表
    • 文本搜索
      • 文本索引
      • 文本索引操作
      • 集合管道中的文本索引
      • 文本索引语言
    • Read Concern读关注
      • 读关注 "local"
      • 读关注 "available"
      • 读关注 "majority"
      • 读关注 "linearizable"
      • 读关注 "snapshot"
    • Write Concern写关注
    • MongoDB CRUD概念
      • 原子性和事务
      • 读隔离性,一致性和近因性
        • 因果一致性和读写关注
      • 分布式查询
      • 通过findAndModify进行线性化读取
      • 查询计划
      • 查询优化
        • 评估当前操作性能
        • 优化查询性能
        • 写操作性能
        • 说明结果
      • 分析查询表现
      • Tailable 游标
  • MongoDB聚合
    • 聚合管道
      • 聚合管道优化
      • 聚合管道限制
      • 聚合管道和分片集合
      • 使用 Zip Code 数据集进行聚合
      • 使用用户首选项数据进行聚合
    • Map-Reduce
      • Map-Reduce 和分片集合
      • Map-Reduce 并发
      • Map-Reduce 示例
      • 执行增量 Map-Reduce
      • 对 Map Function 进行故障排除
      • 排除 Reduce Function 问题
      • Map-Reduce转换到聚合管道
    • 聚合参考
      • 聚合管道快速参考
      • 聚合命令
      • 聚合命令对比
      • 聚合表达式中的变量
      • SQL 到聚合映射图表
  • MongoDB数据模型
    • 数据建模介绍
    • 模式验证
    • 数据模型设计
      • 一对一嵌套关系模型
  • MongoDB事务
  • MongoDB事务
    • 驱动程序 API
    • 生产注意事项
    • 生产注意事项 (分片集群)
    • 事务操作
  • MongoDB索引
    • 单字段索引
    • 复合索引
    • 多键索引
      • 多键索引范围
    • 文本索引
      • 为文本索引指定语言
      • 指定文本索引的名称
      • 用权重控制搜索结果
      • 限制扫描条目的数量
    • 通配符索引
      • 通配符索引限制
    • 2dsphere 索引
      • 查询一个2dsphere索引
    • 2d 索引
      • 创建一个2d索引
      • 查询一个2d索引
      • 2d索引内部
      • 使用球面几何计算距离
    • geoHaystack 索引
      • 创建Haystack索引
      • 查询Haystack索引
    • 哈希索引
    • 索引特性
      • TTL 索引
        • 通过设置TTL使集合中的数据过期
      • 唯一索引
      • 部分索引
      • 不分大小写索引
      • Sparse 索引
    • 在填充的集合上建立索引
      • 在副本集上建立滚动索引
      • 在分片群集上建立滚动索引
    • 索引交集
    • 管理索引
    • 衡量索引使用
    • 索引策略
      • 创建索引来支持查询
      • 使用索引对查询结果进行排序
      • 确保索引适合RAM
      • 创建以确保选择性的查询
    • 索引参考
  • MongoDB安全
    • 安全检查列表
    • 启用访问控制
    • 身份验证
      • 用户
        • 添加用户
        • 权限认证机制
          • SCRAM
            • 用x.509证书来认证客户端
    • 审计
      • 配置审计过滤器
      • 配置审计
      • 系统事件审计消息
    • 网络和配置强化
    • 安全参考
      • system.roles集合
      • system.users集合
      • 资源文档
      • 权限操作
    • 附录
      • 附录-A-用于测试的 OpenSSl CA 证书
      • 附录-B-用于测试的OpenSSL服务器证书
      • 附录-C-用于测试的OpenSSL客户端证书
  • Change Streams变更流
    • 变更流生产建议
    • 变更事件
  • MongoDB复制
    • 副本集成员
    • 副本集日志
    • 副本集数据同步
    • 副本集部署架构
    • 副本集成员配置教程
    • 副本集维护教程
    • MongoDB复制参考
  • MongoDB分片
    • 分片集群组件
    • 分片键
    • 哈希分片
    • 范围分片
    • 区
      • 管理分片区
      • 按位置细分数据
      • 用于更改SLA或SLO的分层硬件
      • 按应用或客户细分数据
      • 仅插入工作负载的分布式本地写入
      • 管理分片区
    • 使用块进行数据分区
      • 在分片集群中拆分数据块
    • 分片管理
      • 查看集群设置
    • 重启一个分片集群
    • [把一个分片集群迁移到不同的硬件](fen-pian/migrate-a -sharded-cluster-to-different-hardware.md)
    • 分片参考
  • MongoDB管理
    • 产品说明
    • 操作检查列表
    • 开发检查列表
    • 配置和维护
    • 性能
    • 数据中心意识
      • MongoDB部署中的工作负载隔离
      • 区
        • 管理分片区
        • 按位置细分数据
        • 用于更改SLA或SLO的分层硬件
        • 按应用或客户细分数据
        • 仅插入工作负载的分布式本地写入
        • 管理分片区
    • MongoDB备份方法
    • MongoDB监控
  • MongoDB存储
    • 存储引擎
      • WiredTiger 存储引擎
      • 内存存储引擎
    • 日志记录
      • 管理日志记录
        • GridFS
        • FAQ:MongoDB 存储
  • MongoDB参考
    • 运算符
      • 查询与映射运算符
        • 比较查询运算符
          • $eq
          • $gt
          • $gte
          • $in
          • $lt
          • $lte
          • $ne
          • $nin
        • 逻辑查询运算符
          • $and
          • $not
          • $nor
          • $or
        • 元素查询运算符
        • 评估查询运算符
        • 地理空间查询运算符
        • 数组查询运算符
        • 按位查询运算符
        • $comment
        • 映射运算符
      • 更新运算符
        • 字段更新运算符
        • 数组更新运算符
        • 按位更新运算符
      • 聚合管道阶段
      • 聚合管道操作符
        • $abs (aggregation)
        • $acos (aggregation)
        • $acosh (aggregation)
        • $add (aggregation)
        • $addToSet (aggregation)
        • $allElementsTrue (aggregation)
        • $and (aggregation)
        • $anyElementTrue (aggregation)
        • $arrayElemAt (aggregation)
        • $arrayToObject (aggregation)
        • $asin (aggregation)
        • $asinh (aggregation)
        • $atan (aggregation)
        • $atan2 (aggregation)
        • $atanh (aggregation)
        • $avg (aggregation)
        • $ceil (aggregation)
        • $cmp (aggregation)
        • $concat (aggregation)
        • $concatArrays (aggregation)
        • $cond (aggregation)
        • $convert (aggregation)
        • $cos (aggregation)
        • $dateFromParts (aggregation)
        • $dateToParts (aggregation)
        • $dateFromString (aggregation)
        • $literal (aggregation)
      • 查询修饰符
    • 数据库命令
      • 聚合命令
      • 地理空间命令
      • 查询和写操作命令
      • 查询计划缓存命令
      • 认证命令
      • 用户管理命令
      • 角色管理命令
      • 复制命令
      • 分片命令
      • 会话命令
      • 管理命令
      • 诊断命令
      • 免费监控命令
      • 系统事件审计命令
    • mongo Shell 方法
      • 集合方法
        • db.collection.aggregate()
        • db.collection.bulkWrite()
        • db.collection.copyTo()
        • db.collection.count()
        • db.collection.countDocuments()
        • db.collection.estimatedDocumentCount()
        • db.collection.createIndex()
        • db.collection.createIndexes()
        • db.collection.dataSize()
        • db.collection.deleteOne()
        • db.collection.deleteMany()
        • db.collection.distinct()
        • db.collection.drop()
        • db.collection.dropIndex()
        • db.collection.dropIndexes()
        • db.collection.ensureIndex()
        • db.collection.explain()
        • db.collection.find()
        • db.collection.findAndModify()
        • db.collection.findOne()
        • db.collection.findOneAndDelete()
        • db.collection.findOneAndReplace()
        • db.collection.findOneAndUpdate()
        • db.collection.getIndexes()
        • db.collection.getShardDistribution()
        • db.collection.getShardVersion()
        • db.collection.insert()
        • db.collection.insertOne()
        • db.collection.insertMany()
        • db.collection.isCapped()
        • db.collection.latencyStats()
        • db.collection.mapReduce()
        • db.collection.reIndex()
        • db.collection.remove()
        • db.collection.renameCollection()
        • db.collection.replaceOne()
        • db.collection.save()
        • db.collection.stats()
        • db.collection.storageSize()
        • db.collection.totalIndexSize()
        • db.collection.totalSize()
        • db.collection.update()
        • db.collection.updateOne()
        • db.collection.updateMany()
        • db.collection.watch()
        • db.collection.validate()
    • MongoDB中的限制与阈值
    • MongoDB系统集合
    • 词汇表
    • 默认的MongoDB端口
    • 默认的MongoDB读/写关注
    • 服务器会话
  • MongoDB FAQ
    • FAQ: MongoDB基础知识
    • FAQ: MongoDB索引
    • FAQ: MongoDB并发
    • FAQ: MongoDB分片
    • FAQ: MongoDB复制和副本集
    • FAQ: MongoDB存储
    • FAQ: MongoDB诊断
  • MongoDB 版本管理
  • 联系我们
    • Tapdata Cloud
    • MongoDB中文社区
    • 社区合作伙伴—锦木信息
由 GitBook 提供支持
在本页
  • 可用性
  • 存储引擎。
  • 副本集协议版本。
  • 阅读关注“多数”启用。
  • 监视集合/数据库/部署
  • 打开变更流
  • 查找完整文档以进行更新操作
  • 恢复变更流
  • resumeAfter用于变更流
  • startAfter用于变更流
  • 用例
  • 访问控制
  • 事件通知
  • 比较
  • MongoDB中文社区

Change Streams变更流

在本页面

  • 可用性

  • 监视集合/数据库/部署

  • 打开变更流

  • 修改变更流输出

  • 查找完整文档以进行更新操作

  • 恢复变更流

  • 用例

  • 访问控制

  • 活动通知

  • 比较

3.6版的新功能。

变更流允许应用程序访问实时数据更改,而不会带来复杂性和拖延oplog的风险。应用程序可以使用变更流来订阅单个集合,数据库或整个部署中的所有数据变更,并立即对其做出反应。因为变更流使用聚合框架,所以应用程序还可以过滤特定的变更或随意转换通知。

可用性

变更流可用于副本集和分片群集。

存储引擎。

副本集和分片群集必须使用WiredTiger存储引擎。变更流还可用于采用MongoDB静态加密功能的部署 。

副本集协议版本。

副本集和分片群集必须使用副本集协议版本1(pv1)。

阅读关注“多数”启用。

从MongoDB 4.2开始,无论是否支持读关注,更改流都可用"majority"。也就是说,majority可以启用(默认)读取关注支持或禁用以使用更改流。

在MongoDB 4.0和更早版本中,更改流仅在"majority"启用了阅读关注支持后才可用(默认)。

监视集合/数据库/部署

您可以针对以下情况打开变更流:

目标
描述

集合

你可以打开一个变换流光标单个集合(除system收藏,或任何集合在 admin,local和config数据库)。 本页上的示例使用MongoDB驱动程序打开并使用单个集合的变更流游标。另请参见mongoshell方法 db.collection.watch()。

| 数据库 |在MongoDB中4.0开始,你可以打开一个变换流光标单个数据库(不包括admin,local和 config数据库)来监视更改其所有非系统集合。 有关MongoDB驱动程序方法,请参阅驱动程序文档。另请参见mongoshell方法 db.watch()。|

|部署 | 在MongoDB中4.0开始,你可以打开一个变换流光标部署(无论是副本集中或分片集群)来监视除了所有数据库中更改所有非系统集合admin,local和config。 有关MongoDB驱动程序方法,请参阅驱动程序文档。另请参见mongoshell方法 Mongo.watch()。| 变更流示例

此页面上的示例使用MongoDB驱动程序来说明如何打开集合的更改流游标以及如何使用变更流游标。

打开变更流

要打开变更流:

对于副本集,您可以从任何数据承载成员发出开放变更流操作。 对于分片群集,必须从中发出开放更改流操作mongos。 以下示例打开一个集合的变更流,并在光标上进行迭代以检索变更流文档。 [1]

下面的Python示例假定您已连接到MongoDB副本集并访问了包含inventory集合的数据库。

cursor = db.inventory.watch()
document = next(cursor)

要从游标检索数据更改事件,请迭代更改流游标。有关变更流事件的信息,请参见变更事件。

与MongoDB部署的连接保持打开状态时,游标保持打开状态,直到发生以下情况之一:

  • 游标已显式关闭。

  • 发生无效事件。

  • 如果部署是分片群集,则分片删除可能会导致打开的更改流游标关闭,并且关闭的更改流游标可能无法完全恢复。

注意

未封闭游标的生命周期取决于语言。

[1] 从MongoDB 4.0开始,您可以指定a startAtOperationTime 在特定时间点打开游标。如果指定的起点是过去的时间,则必须在操作日志的时间范围内。 修改变更流输出;

在配置变更流时,可以通过提供以下一个或多个以下管道阶段的数组来控制变更流的输出:

$addFields $match $project $replaceRoot $replaceWith (从MongoDB 4.2开始可用) $redact $set (从MongoDB 4.2开始可用) $unset (从MongoDB 4.2开始可用)

pipeline = [
    {'$match': {'fullDocument.username': 'alice'}},
    {'$addFields': {'newField': 'this is an added field!'}}
]
cursor = db.inventory.watch(pipeline=pipeline)
document = next(cursor)

提示:

变更流事件文档的_id字段用作恢复令牌。不要使用管道来修改或删除更改流事件的_id字段。

从MongoDB 4.2开始,如果更改流聚合管道修改了事件的_id字段,则更改流将引发异常。

有关更改流响应文档格式的更多信息,请参见更改事件。

查找完整文档以进行更新操作

默认情况下,更改流仅在更新操作期间返回字段增量。但是,您可以配置变更流以返回更新文档的最新的多数提交版本。

要返回更新文档的最新多数批准版本,请传递full_document='updateLookup'到 db.collection.watch()方法。

在下面的示例中,所有更新操作通知都包含一个full_document字段,该字段表示 受更新操作影响的文档的当前版本。

cursor = db.inventory.watch(full_document='updateLookup')
document = next(cursor)

注意

如果有一个或多个修改更新的文件多数提交的操作之后更新操作,但之前的查找,返回完整的文档可以显著从文档的更新操作的时间有所不同。

但是,变更流文档中包含的增量始终正确描述了应用于该更改流事件的监视集合更改。

有关变更流响应文档格式的更多信息,请参见更改事件。

恢复变更流

通过在打开游标时将resume令牌指定为resumeAfter或 startAfter,可以恢复变更流 。

resumeAfter用于变更流

通过resumeAfter在打开游标时将恢复令牌传递给特定事件,您可以在特定事件之后恢复更改流。对于恢复令牌,请使用更改流事件文档的 _id值。有关恢复令牌的更多信息,请参见恢复令牌。

重要

如果时间戳记是过去的,则oplog必须具有足够的历史记录来定位与令牌或时间戳记关联的操作。

resumeAfter在无效事件(例如,集合删除或重命名)关闭流之后,您不能用来恢复变更流。从MongoDB 4.2开始,您可以使用 startAfter在invalidate事件之后启动新的变更流。

您可以使用resume_after修饰符在恢复令牌中指定的操作之后恢复通知。该resume_after调节器将是必须解决的简历令牌的值,例如resume_token在下面的例子。

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = next(cursor)

startAfter用于变更流

4.2版中的新功能。

您可以通过startAfter在打开游标时将恢复令牌传递到特定事件之后,开始新的变更流。与resumeAfter不同 ,startAfter可以 通过创建新的更改流在无效事件之后恢复通知。对于恢复令牌,请使用更改流事件文档的_id值。有关恢复令牌的更多信息,请参见恢复令牌。

重要

如果时间戳记是过去的,则oplog必须具有足够的历史记录来定位与令牌或时间戳记关联的操作。

恢复令牌

变更流事件文档的_id值用作恢复令牌:

{
   "_data" : <BinData|hex string>
}

恢复令牌_data类型取决于MongoDB版本,在某些情况下,取决于更改流打开/恢复时的功能兼容性版本(fcv)(即,fcv值的更改不会影响已打开的更改流的恢复令牌。 ):

MongoDB版本 功能兼容版本 恢复令牌_data类型 MongoDB 4.2及更高版本 “ 4.2”或“ 4.0” 十六进制编码的字符串(v1) MongoDB 4.0.7及更高版本 “ 4.0”或“ 3.6” 十六进制编码的字符串(v1) MongoDB 4.0.6及更早版本 “ 4.0” 十六进制编码的字符串(v0) MongoDB 4.0.6及更早版本 “ 3.6” BinData MongoDB 3.6 “ 3.6” BinData

使用十六进制编码的字符串恢复令牌,您可以对恢复令牌进行比较和排序。

无论fcv值如何,4.0部署都可以使用BinData恢复令牌或十六进制字符串恢复令牌来恢复变更流。这样,4.0部署可以使用在3.6部署的集合中打开的变更流中的恢复令牌。

MongoDB版本中引入的新的恢复令牌格式不能被早期MongoDB版本使用。

提示

从MongoDB 4.2开始,如果变更流聚合管道修改了事件的_id字段,则变更流将引发异常。

用例

变更流可以使具有相关业务系统的体系结构受益,一旦数据变更能够持久,就可以通知下游系统。例如,更改流可以在实现提取,转换和加载(ETL)服务,跨平台同步,协作功能和通知服务时为开发人员节省时间。

访问控制

对于执行身份验证和授权的部署:

要针对特定集合打开变更流,应用程序必须具有对相应集合进行授予changeStream和 find操作的特权。

{  resource : {  db : < dbname > , collection : < collection >  }, actions : [  “ find” , “ changeStream”  ]  }

要在单个数据库上打开变更流,应用程序必须具有对数据库中所有非集合进行授予changeStream和 find操作的特权system。

{  resource : {  db : < dbname > , collection : “”  }, actions : [  “ find” , “ changeStream”  ]  }

要在整个部署上打开变更流,应用程序必须具有为部署中所有数据库的所有非集合授予权限changeStream并对其 find执行操作的特权system。

{  resource : {  db : “” , collection : “”  }, actions : [  “ find” , “ changeStream”  ]  }

事件通知

变更流仅通知已保留到副本集中大多数数据承载成员的数据更改。这样可以确保仅由多数情况下提交的更改触发通知,这些更改在故障情况下是持久的。

例如,考虑一个3成员副本集,该副本集具有针对primary打开的变更流游标。如果客户端发出插入操作,则该更改流仅在该插入一直存在于大多数承载数据的成员之后,才将数据更改通知应用程序。

如果操作与事务相关联,则更改事件文档包括 txnNumber和lsid。

比较

从MongoDB 4.2开始,更改流将使用simple二进制比较,除非提供了明确的排序规则。在早期版本中,在单个集合(db.collection.watch())上打开的变更流将继承该集合的默认排序规则。

译者: 王恒

MongoDB中文社区

资源列表推荐
资源入口

MongoDB中文社区官网

微信服务号 ——最新资讯和优质文章

Mongoing中文社区(mongoing-mongoing)

微信订阅号 ——发布文档翻译内容

MongoDB中文用户组(mongoing123)

官方微信号 —— 官方最新资讯

MongoDB数据库(MongoDB-China)

MongoDB中文社区组委会成员介绍

MongoDB中文社区翻译小组介绍

MongoDB中文社区微信技术交流群

添加社区助理小芒果微信(ID:mongoingcom),并备注 mongo

MongoDB中文社区会议及文档资源

MongoDB中文社区大咖博客

MongoDB白皮书

MongoDB初学者教程-7天入门

社区活动邮件订阅

上一页附录-C-用于测试的OpenSSL客户端证书下一页变更流生产建议

最后更新于3年前

MongoDB中文社区—MongoDB爱好者技术交流平台

https://mongoing.com/
https://mongoing.com/core-team-members
https://mongoing.com/translators
https://mongoing.com/resources
基础知识
性能优化
原理解读
运维监控
最佳实践
https://mongoing.com/mongodb-download-white-paper
https://mongoing.com/mongodb-beginner-tutorial
https://sourl.cn/spszjN