db.collection.watch()
åšæ¬é¡µé¢
db.collection.
watch
(管éïŒé项)ä» éçšäºå¯æ¬éååç矀éåšéåäžæåŒæ¹åæµæžžæ ã
åæ° | ç±»å | æè¿° |
---|---|---|
pipeline | array | 以äžäžäžªæå€äžªèåé¶æ®µçåºåïŒ $match $project $addFields $replaceRoot $replaceWith ïŒä»MongoDB 4.2åŒå§å¯çšïŒ $redact $set ïŒä»MongoDB 4.2åŒå§å¯çšïŒ $unset ïŒä»MongoDB 4.2åŒå§å¯çšïŒ æå®ç®¡éä»¥è¿æ»€/ä¿®æ¹åæŽäºä»¶èŸåºã ä»MongoDB 4.2åŒå§ïŒåŠææŽæ¹æµèå管éä¿®æ¹äºäºä»¶ç_idåæ®µïŒåæŽæ¹æµå°åŒååŒåžžã |
options | document | å¯éçãä¿®æ¹watch()è¡äžºçå
¶ä»é项ã åŠææªæå®ç®¡éäœäŒ é options ææ¡£ïŒåå¿
é¡»å°ç©º array [] äŒ éç»pipeline åæ°ã |
options
ææ¡£å¯ä»¥å
å«ä»¥äžå段ååŒïŒå段 | ç±»å | æè¿° |
---|---|---|
resumeAfter | document | å¯éçãæç€ºwatchå°è¯åšæ¢å€ä»€çäžæå®çæäœä¹åéæ°åŒå§éç¥ã æ¯äžªæŽæ¹æµ event ææ¡£éœå
å«äžäžªæ¢å€æ è®°äœäžº _id åæ®µãäŒ é change event ææ¡£çæŽäžª_id åæ®µïŒè¯¥å段衚瀺æšèŠåšä¹åæ¢å€çæäœã resumeAfter äžstartAfter å äºæ¥startAtOperationTime ã |
startAfter | document | å¯éçãæç€º watch åšæ¢å€ä»€çäžæå®çæäœä¹åå°è¯å¯åšæ°çæŽæ¹æµãå
讞åšäºä»¶æ æåæ¢å€éç¥ã æ¯äžªåæŽæµäºä»¶ææ¡£éœå
æ¬äžäžªæ¢å€ä»€çäœäžº _id åæ®µãäŒ éæŽæ¹äºä»¶ææ¡£ç_æŽäžª_ _id åæ®µïŒè¯¥å段代衚æšèŠæ¢å€çæäœã startAfter äžresumeAfter å äºæ¥startAtOperationTime ã 4.2çäžçæ°åèœã |
fullDocument | string | å¯éçãé»è®€æ
åµäžïŒwatch()è¿åç±æŽæ°æäœä¿®æ¹çåæ®µçå¢éïŒèäžæ¯æŽäžªæŽæ°çææ¡£ã 讟眮 fullDocument 䞺â "updateLookup" çŽæ¥â watch() ä»¥æ¥æŸæŽæ°ææ¡£çææ°å€æ°æ¹åçæ¬ã watch() è¿åäžäžªfullDocument åæ®µïŒå
¶äžå
å«é€äºupdateDescription å¢é以å€çææ¡£æ¥æŸã |
batchSize | int | å¯éçãæå® MongoDBéçŸ€çæ¯æ¹ååºäžè¿åçæå€§æŽæ¹æ¬¡æ°ã å
·æäžcursor.batchSize()çåèœçžåã |
maxAwaitTimeMS | int | å¯éçãæå¡åšçåŸ
æ°æ°æ®æŽæ¹ä»¥åšè¿å空æ¹å€çä¹ååæŽæ¹æµæžžæ æ¥åçæå€§æ¶éŽ(以毫ç§äžºåäœ)ã é»è®€äžº 1000 毫ç§ã |
collation | document | å¯éçãäŒ éæåºè§åæä»¶äžºæŽæ¹æµæžžæ æå®æåºã ä»MongoDB 4.2åŒå§ïŒ simple åŠæçç¥ïŒé»è®€äžºäºè¿å¶æ¯èŸãåšæ©æçæ¬äžïŒåšå䞪éåäžæåŒçæŽæ¹æµå°ç»§æ¿è¯¥éåçé»è®€æåºè§åã |
startAtOperationTime | Timestamp | å¯éçãåæŽæµçèµ·ç¹ãåŠææå®çèµ·ç¹æ¯è¿å»çæ¶éŽïŒåå¿
é¡»åšæäœæ¥å¿çæ¶éŽèåŽå
ãèŠæ¥çæäœæ¥å¿çæ¶éŽèåŽïŒè¯·åé
rs.printReplicationInfo() ã startAtOperationTime äžresumeAfter åäºæ¥startAfter ã çæ¬4.0äžçæ°åèœã |
è¿ååŒïŒ | äžäžªæžžæ æ¯ä¿æè¢«æåŒïŒä»¥MongoDBçéšçœ²çè¿æ¥ä¿ææåŒç¶æïŒå¹¶æ¶éååšãæå
³åæŽäºä»¶ææ¡£ç瀺äŸïŒè¯·åè§åæŽäºä»¶ã |
ä¹å¯ä»¥ççdb.watch()
åMongo.watch()
db.collection.watch()
å¯çšäºå¯æ¬éååç矀ééšçœ²ïŒ- 对äºå¯æ¬éïŒæšå¯ä»¥
db.collection.watch()
åšä»»äœæ°æ®æ¿èœœæåäžåè¡ã - 对äºåç矀éïŒå¿ é¡»
db.collection.watch()
åšmongos
å®äŸäžååºã
æšåªèœ
db.collection.watch()
äžWired TigerååšåŒæäžèµ·äœ¿çšãä»MongoDB 4.2åŒå§ïŒæ 论æ¯åŠæ¯æè¯»å
³æ³šïŒæŽæ¹æµéœå¯çš
"majority"
ãä¹å°±æ¯è¯ŽïŒmajority
å¯ä»¥å¯çšïŒé»è®€ïŒè¯»åå
³æ³šæ¯ææçŠçš ä»¥äœ¿çšæŽæ¹æµãåšMongoDB 4.0åæŽæ©çæ¬äžïŒæŽæ¹æµä»
åš
"majority"
å¯çšäºé
读å
³æ³šæ¯æåæå¯çšïŒé»è®€ïŒã- db.collection.watch()ä» éç¥æç»ååšäºå€§å€æ° data-bearing æåçæ°æ®æŽæ¹ã
- æ¹åæµæžžæ ä¿ææåŒç¶æïŒçŽå°åºç°ä»¥äžæ åµä¹äžïŒ
- æžžæ æŸåŒå ³éã
- åçæ æäºä»¶ïŒäŸåŠïŒéåå 逿éåœåã
- äž MongoDB éšçœ²çè¿æ¥å·²å ³éã
- åŠæéšçœ²æ¯åçé矀ïŒåå é€åçå¯èœäŒå¯ŒèŽæåŒæŽæ¹æµæžžæ å ³éïŒå¹¶äžå ³éçæŽæ¹æµæžžæ å¯èœæ æ³å®å šæ¢å€ã
äž MongoDB 驱åšçšåºäžåïŒmongo shell åšåçé误åäžäŒèªåšå°è¯æ¢å€æŽæ¹æµæžžæ ã MongoDB 驱åšçšåºå°è¯åšæäºé误åèªåšæ¢å€æŽæ¹æµæžžæ ã
db.collection.watch()䜿çšååšåš oplog äžçä¿¡æ¯æ¥çææŽæ¹ event æè¿°å¹¶çæäžè¯¥æäœå
³èçæ¢å€æ è®°ãåŠæç±äŒ éç»
resumeAfter
or startAfter
éé¡¹çæ¢å€ä»€çæ è¯çæäœå·²ç»ä»oplogäžå é€ïŒdb.collection.watch()
åæ æ³æ¢å€æŽæ¹æµãæå
³æ¢å€æŽæ¹æµçæŽå€ä¿¡æ¯ïŒè¯·åé
æ¢å€åæŽæµã
泚æ
resumeAfter
åšæ æäºä»¶ïŒäŸåŠïŒéåå 逿éåœåïŒå ³éæµä¹åïŒæšäžèœç𿥿¢å€æŽæ¹ æµãä»MongoDB 4.2åŒå§ïŒæšå¯ä»¥äœ¿çš startAfteråšinvalidateäºä»¶ä¹åå¯åšæ°çæŽæ¹æµã åŠæéšçœ²æ¯åçé矀ïŒååçå é€å¯èœäŒå¯ŒèŽæåŒçæŽæ¹æµæžžæ å ³éïŒå¹¶äžå ³éçæŽæ¹æµæžžæ å¯èœæ æ³å®å šæ¢å€ã
æ¢å€ä»€ç
æ¢å€ä»€ç
_data
ç±»ååå³äºMongoDBçæ¬ïŒåšæäºæ
åµäžïŒåå³äºæŽæ¹æµæåŒ/æ¢å€æ¶çåèœå
Œå®¹æ§çæ¬ïŒfcvïŒïŒå³ïŒfcvåŒçæŽæ¹äžäŒåœ±åå·²æåŒçæŽæ¹æµçæ¢å€ä»€çã ïŒïŒMongoDBçæ¬ | åèœå
Œå®¹çæ¬ | æ¢å€ä»€ç _data ç±»å |
MongoDB 4.2åæŽé«çæ¬ | â 4.2âæâ 4.0â | åå
è¿å¶çŒç çåç¬Šäž²ïŒ v1 ïŒ |
MongoDB 4.0.7åæŽé«çæ¬ | â 4.0âæâ 3.6â | åå
è¿å¶çŒç çåç¬Šäž²ïŒ v1 ïŒ |
MongoDB 4.0.6åæŽæ©çæ¬ | â 4.0â | åå
è¿å¶çŒç çåç¬Šäž²ïŒ v0 ïŒ |
MongoDB 4.0.6åæŽæ©çæ¬ | â 3.6â | BinData |
MongoDB 3.6 | â 3.6â | BinData |
䜿çšåå
è¿å¶çŒç çå笊䞲æ¢å€ä»€çïŒæšå¯ä»¥å¯¹æ¢å€ä»€çè¿è¡æ¯èŸåæåºã
æ 论fcvåŒåŠäœïŒ4.0éšçœ²éœå¯ä»¥äœ¿çšBinDataæ¢å€ä»€çæåå
è¿å¶å笊䞲æ¢å€ä»€çæ¥æ¢å€æŽæ¹æµãè¿æ ·ïŒ4.0éšçœ²å¯ä»¥äœ¿çšåš3.6éšçœ²çéåäžæåŒçæŽæ¹æµäžçæ¢å€ä»€çã
MongoDBçæ¬äžåŒå
¥çæ°çæ¢å€ä»€çæ ŒåŒäžèœè¢«æ©æMongoDBçæ¬äœ¿çšã
é»è®€æ
åµäžïŒæŽæ¹æµæžžæ è¿åçšäºæŽæ°æäœçç¹å®åæ®µæŽæ¹/å¢éãæšè¿å¯ä»¥é
çœ®æŽæ¹æµä»¥æ¥æŸå¹¶è¿åæŽæ¹ææ¡£çåœå倿°æäº€çæ¬ãæ ¹æ®æŽæ°åæ¥æŸä¹éŽå¯èœåççå
¶ä»åå
¥æäœïŒè¿åçææ¡£å¯èœäžæŽæ°æ¶çææ¡£æåŸå€§äžåã
æ ¹æ®æŽæ°æäœæéŽåºçšçæŽæ¹æ°éåæŽäžªææ¡£ç倧å°ïŒååšæŽæ°æäœçæŽæ¹äºä»¶ææ¡£ç倧å°å€§äº16MB BSONææ¡£éå¶çé£é©ãåŠæåçè¿ç§æ
åµïŒæå¡åšå°å
³éæŽæ¹æµæžžæ å¹¶è¿åé误ã
䜿çšè®¿é®æ§å¶è¿è¡æ¶ïŒçšæ·å¿
须对éåèµæºå
·æ
find
åchangeStream
ç¹ææäœãä¹å°±æ¯è¯ŽïŒçšæ·å¿
é¡»å
·ææäºä»¥äžç¹æçè§è²ïŒ{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
å
眮
read
è§è²æäŸéåœçç¹æãä»¥äžæäœå°é对
data.sensors
éåæåŒæŽæ¹æµæžžæ ïŒwatchCursor = db.getSiblingDB("data").sensors.watch()
è¿ä»£å
æ ä»¥æ£æ¥æ° ç eventsã䜿çš
cursor.isExhausted()
æ¹æ³ç¡®ä¿åŸªç¯ä»
åšæŽæ¹æµæžžæ å
³éäžææ°æ¹æ¬¡äžæ²¡æ objects æ¶éåºïŒwhile (!watchCursor.isExhausted()){
if (watchCursor.hasNext()){
watchCursor.next();
}
}
æå
³æŽæ¹æµèŸåºçå®æŽææ¡£ïŒè¯·åé
åæŽäºä»¶ã
讟眮
fullDocument
é项以"updateLookup"
æç€ºæŽæ¹æµæžžæ æ¥æŸäžæŽæ°æŽæ¹æµäºä»¶çžå
³èçææ¡£çææ°ç倿°æäº€çæ¬ãä»¥äžæäœäœ¿çš
fullDocument : "updateLookup"
该é项é对éå data.sensors
æåŒæŽæ¹æµæžžæ ãwatchCursor = db.getSiblingDB("data").sensors.watch(
[],
{ fullDocument : "updateLookup" }
)
è¿ä»£å
æ ä»¥æ£æ¥æ°ç eventsã䜿çš
cursor.isExhausted()
æ¹æ³ç¡®ä¿åŸªç¯ä»
åšæŽæ¹æµæžžæ å
³éäžææ°æ¹æ¬¡äžæ²¡æ objects æ¶éåºïŒwhile (!watchCursor.isExhausted()){
if (watchCursor.hasNext()){
watchCursor.next();
}
}
对äºä»»äœæŽæ°æäœïŒchangeäºä»¶éœäŒåš
fullDocument
åæ®µäžè¿åææ¡£æ¥æŸçç»æãæå
³å®æŽææ¡£æŽæ°èŸåºç瀺äŸïŒè¯·åé
æŽæ¹æµæŽæ°äºä»¶ã
æå
³æŽæ¹æµèŸåºçå®æŽææ¡£ïŒè¯·åé
æ¹åäºä»¶ã
泚æä»MongoDB 4.2åŒå§ïŒåŠææŽæ¹æµèå管éä¿®æ¹äºäºä»¶ç_idåæ®µïŒåæŽæ¹æµå°åŒååŒåžžã
ä»¥äžæäœäœ¿çšèå管éæåŒé对
data.sensors
éåçæŽæ¹æµæžžæ ïŒwatchCursor = db.getSiblingDB("data").sensors.watch(
[
{ $match : {"operationType" : "insert" } }
]
)
è¿ä»£å
æ ä»¥æ£æ¥æ°çäºä»¶ã䜿çš
cursor.isExhausted()
æ¹æ³ç¡®ä¿åŸªç¯ä»
åšæŽæ¹æµæžžæ å
³éäžææ°æ¹æ¬¡äžæ²¡æ objects æ¶éåºïŒwhile (!watchCursor.isExhausted()){
if (watchCursor.hasNext()){
watchCursor.next();
}
}
æŽæ¹æµæžžæ ä»
è¿å䞺
insert
ç change eventsãæå
³æŽæ¹æµèŸåºçå®æŽææ¡£ïŒè¯·åé
åæŽäºä»¶ãæŽæ¹æµæžžæ è¿åçæ¯äžªææ¡£éœå
å«äžäžªæ¢å€æ è®°äœäžº
_id
åæ®µãèŠæ¢å€æŽæ¹æµïŒè¯·å°èŠæ¢å€çæŽæ¹äºä»¶çæŽäžª_id
ææ¡£äŒ éç»watch()çresumeAfter
æstartAfter
é项ãä»¥äžæäœ
data.sensors
äœ¿çšæ¢å€ä»€çæ¢å€é对éåçæŽæ¹æµæžžæ ãåè®Ÿçææ¢å€ä»€ ççæäœå°æªè±çŠ»é矀çæäœæ¥å¿ãlet watchCursor = db.getSiblingDB("data").sensors.watch();
let firstChange;
â
while (!watchCursor.isExhausted()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}
â
watchCursor.close();
â
let resumeToken = firstChange._id;
â
resumedWatchCursor = db.getSiblingDB("data").sensors.watch(
[],
{ resumeAfter : resumeToken }
)
è¿ä»£å
æ ä»¥æ£æ¥æ°çäºä»¶ã䜿çš
cursor.isExhausted()
æ¹æ³ç¡®ä¿åŸªç¯ä»
åšæŽæ¹æµæžžæ å
³éäžææ°æ¹æ¬¡äžæ²¡æ objects æ¶éåºïŒwhile (!resumedWatchCursor.isExhausted()){
if (resumedWatchCursor.hasNext()){
resumedWatchCursor.next();
}
}
æå
³æ¢å€æŽæ¹æµçå®æŽææ¡£ïŒè¯·åé
æ¢å€åæŽæµã
è¯è
ïŒæå é£
æ ¡å¯¹ïŒ
æè¿æŽæ° 1yr ago