MongoDB Change Streams

📚 Lesson 13 of 15 ⏱️ 35 min

MongoDB change streams let applications watch for real-time data changes in collections, databases, or entire deployments and react to them immediately, making them ideal for reactive applications, real-time analytics, data synchronization, and event-driven architectures. Change streams read change events from the oplog, so they require a replica set or sharded cluster; they are not available on a standalone mongod.

Collection-level change streams watch a single collection, database-level streams watch every collection in a database, and deployment-level streams watch all databases. A stream returns change events for insert, update, replace, delete, drop, rename, and invalidate operations, among others. Choosing the right scope lets you monitor changes at the appropriate level without processing events you do not need.
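As a sketch, the three scopes correspond to three watch() call sites in the Node.js driver (collection, database, and client); the names myApp, users, and orders below are illustrative. At database or deployment scope, each event carries an ns field naming its source, which a $match stage can filter on:

```javascript
// Scope choices (Node.js driver call sites, shown as comments):
//   collection scope:  client.db("myApp").collection("users").watch(pipeline)
//   database scope:    client.db("myApp").watch(pipeline)
//   deployment scope:  client.watch(pipeline)
// At broader scopes, narrow the stream by namespace in the pipeline:
const nsFilter = [
  { $match: { "ns.db": "myApp", "ns.coll": { $in: ["users", "orders"] } } }
]
```

The same pipeline array is passed as the first argument to any of the watch() calls above.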

Each change event includes the operation type (insert, update, delete, etc.), the document key (_id), the full document (for inserts and replaces), an update description listing updatedFields and removedFields (for updates), and other metadata such as the namespace and cluster time. Change streams accept an aggregation pipeline, so stages like $match can filter the stream down to only the events you care about, which keeps event processing in your application simple.
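For concreteness, an update event has roughly the shape sketched below; all field values here are illustrative, not real server output:

```javascript
// Approximate shape of an update change event (values illustrative):
const sampleEvent = {
  _id: { _data: "8263a1f2..." },        // resume token
  operationType: "update",
  ns: { db: "myApp", coll: "users" },   // namespace the change came from
  documentKey: { _id: 42 },             // _id of the changed document
  updateDescription: {
    updatedFields: { email: "new@example.com" },  // fields set by the update
    removedFields: []                             // fields unset by the update
  }
}
```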

Resume tokens let a change stream pick up from a specific point after an interruption. Every change event carries its resume token in the _id field; persist that token and pass it back as resumeAfter to continue streaming from where you left off, even across application restarts. This makes reliable, at-least-once change processing possible.
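A minimal sketch of token persistence, using an in-memory Map where a real application would use durable storage; saveToken and resumeOptions are hypothetical helpers for illustration, not driver API:

```javascript
// Persist the latest resume token per stream; a real app would write it
// to disk or a collection so it survives restarts.
const tokenStore = new Map()

function saveToken(streamName, event) {
  tokenStore.set(streamName, event._id)  // an event's _id is its resume token
}

function resumeOptions(streamName) {
  const token = tokenStore.get(streamName)
  return token ? { resumeAfter: token } : {}  // options for a later watch() call
}

// After processing an event (event shown with an illustrative token):
saveToken("users", { _id: { _data: "8262bc01..." }, operationType: "insert" })
// On restart: db.collection("users").watch([], resumeOptions("users"))
```

Calling saveToken after each processed event, rather than after each batch, bounds how much work is repeated on restart.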

For update operations, the fullDocument: 'updateLookup' option makes the server look up and return the complete post-update document rather than just the changed fields, which is useful when you need the full document state after an update. Change streams also accept options such as batchSize and maxAwaitTimeMS, which control how many events are returned per batch and how long the server waits for new events before returning an empty batch.
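The options from this paragraph are passed as the second argument to watch(); the values below are illustrative, not recommendations:

```javascript
// Options for watch() (second argument); values are illustrative:
const watchOptions = {
  fullDocument: "updateLookup", // fetch the current document on update events
  batchSize: 100,               // events returned per server batch
  maxAwaitTimeMS: 5000          // how long the server waits for new events
}
// usage: db.collection("users").watch([], watchOptions)
```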

Best practices for change streams include handling errors and invalidation appropriately, persisting resume tokens for reliability, and filtering events in the pipeline to reduce processing overhead. Each open change stream holds a cursor against the oplog and adds load to the replica set, so open them judiciously. With these practices in place, change streams are a solid foundation for reactive, event-driven applications on MongoDB.

Key Concepts

  • Change streams enable real-time monitoring of data changes.
  • Change streams can watch collections, databases, or deployments.
  • Change events include operation type, document key, and change details.
  • Resume tokens enable resuming change streams after interruption.
  • Change streams use the oplog to stream change events.

Learning Objectives

Master

  • Implementing change streams for real-time data monitoring
  • Understanding change event structure and types
  • Using resume tokens for reliable change processing
  • Filtering change events with aggregation pipelines

Develop

  • Understanding event-driven architectures
  • Building reactive applications with MongoDB
  • Implementing real-time data synchronization

Tips

  • Watch collection: const stream = db.collection.watch().
  • Filter changes: db.collection.watch([{ $match: { operationType: 'insert' } }]).
  • Use resume tokens: { resumeAfter: resumeToken } to resume from specific point.
  • Handle errors: stream.on('error', (error) => { ... }) for error handling.

Common Pitfalls

  • Not handling errors: change events are silently missed when streams fail.
  • Not using resume tokens: change events are lost on application restart.
  • Watching too broadly: the application wastes time processing unnecessary change events.
  • Ignoring performance implications: too many open streams can degrade replica set performance.

Summary

  • Change streams enable real-time monitoring of MongoDB data changes.
  • Change streams can watch at collection, database, or deployment level.
  • Resume tokens enable reliable change processing.
  • Understanding change streams enables building reactive applications.

Exercise

Implement change streams for real-time data monitoring.

// Watch all changes in a collection
const changeStream = db.users.watch()

// Process change events
while (changeStream.hasNext()) {
  const change = changeStream.next()
  print(JSON.stringify(change, null, 2))
}

// Watch specific operations (each stream gets its own variable name,
// since `const` cannot be redeclared in the same shell scope)
const opStream = db.users.watch([
  { $match: { "operationType": { $in: ["insert", "update", "delete"] }}}
])

// Watch with a pipeline: only updates that touch the email field
const emailStream = db.users.watch([
  { $match: {
    "operationType": "update",
    "updateDescription.updatedFields.email": { $exists: true }
  }}
])

// Watch with resume token
const resumedStream = db.users.watch([], {
  resumeAfter: { _data: "resume_token_here" }
})

// Watch all collections in the database, filtered to specific collections
const dbStream = db.watch([
  { $match: { "ns.coll": { $in: ["users", "orders"] }}}
])

// Watch with full document lookup on updates
const fullDocStream = db.users.watch([], {
  fullDocument: "updateLookup"
})

// Error handling (in mongosh, wrap iteration in try/catch;
// the event-emitter .on() API belongs to the Node.js driver)
const watchedStream = db.users.watch()
try {
  while (watchedStream.hasNext()) {
    printjson(watchedStream.next())
  }
} catch (error) {
  print("Change stream error: " + error)
}

// Close change stream
watchedStream.close()

// JavaScript example with Node.js
const { MongoClient } = require("mongodb")

async function watchChanges() {
  const client = new MongoClient("mongodb://localhost:27017")
  await client.connect()
  const db = client.db("myApp")

  const changeStream = db.collection("users").watch()

  changeStream.on("change", (change) => {
    console.log("Change detected:", change)

    switch (change.operationType) {
      case "insert":
        console.log("New user added:", change.fullDocument)
        break
      case "update":
        console.log("User updated:", change.documentKey)
        break
      case "delete":
        console.log("User deleted:", change.documentKey)
        break
    }
  })

  changeStream.on("error", (error) => {
    console.error("Change stream error:", error)
  })
}

watchChanges().catch(console.error)

Exercise Tips

  • Use resume tokens for reliability: save tokens and resume after restarts.
  • Filter change events: use $match in pipeline to reduce processing overhead.
  • Handle errors: implement error handling for stream failures.
  • Use fullDocument option: { fullDocument: 'updateLookup' } for complete documents.