mongodb_cdc

Streams data changes from a MongoDB replica set, using MongoDB’s change streams to capture data updates.

# Common configuration fields, showing default values
input:
  label: ""
  mongodb_cdc:
    url: mongodb://localhost:27017 # No default (required)
    database: "" # No default (required)
    username: "" # Optional
    password: "" # Optional
    collections: [] # No default (required)
    checkpoint_cache: "" # No default (required)
    checkpoint_key: mongodb_cdc_checkpoint
    checkpoint_interval: 5s
    checkpoint_limit: 1000
    read_batch_size: 1000
    read_max_wait: 1s
    stream_snapshot: false
    snapshot_parallelism: 1
    auto_replay_nacks: true

# All configuration fields, showing default values
input:
  label: ""
  mongodb_cdc:
    url: mongodb://localhost:27017 # No default (required)
    database: "" # No default (required)
    username: ""
    password: ""
    collections: [] # No default (required)
    checkpoint_cache: "" # No default (required)
    checkpoint_key: mongodb_cdc_checkpoint
    checkpoint_interval: 5s
    checkpoint_limit: 1000
    read_batch_size: 1000
    read_max_wait: 1s
    stream_snapshot: false
    snapshot_parallelism: 1
    snapshot_auto_bucket_sharding: false
    document_mode: update_lookup
    json_marshal_mode: canonical
    app_name: benthos
    auto_replay_nacks: true

Prerequisites

  • MongoDB version 6 or later

  • Network access from the cluster where your Redpanda Connect pipeline is running to the source database environment. For detailed networking information, including how to set up a VPC peering connection, see Redpanda Cloud Networking.

  • A MongoDB database running as a replica set or in a sharded cluster using replica set protocol version 1.

  • A MongoDB database using the WiredTiger storage engine.

Enable connectivity from cloud-based data sources (BYOC)

To establish a secure connection between a cloud-based data source and Redpanda Connect, you must add the NAT Gateway IP address of your Redpanda cluster to the allowlist of your data source.

Data capture method

The mongodb_cdc input uses MongoDB change streams to capture data changes. On their own, change streams do not propagate all changes to Redpanda Connect: for example, in the default update_lookup mode, delete events contain only the _id field. To capture all changes in a MongoDB cluster, including deletions, enable pre- and post-image saving for the cluster and required collections. For more information, see the document_mode options and the MongoDB documentation.

Data replication

Redpanda Connect allows you to specify which database collections in your source database to receive changes from.

You can also run the mongodb_cdc input in one of two modes, depending on whether you need a snapshot of existing data before streaming updates.

  • Snapshot mode: Redpanda Connect first captures a snapshot of all data in the selected collections and streams the contents before processing changes from the last recorded operations log (oplog) position.

  • Streaming mode: Redpanda Connect skips the snapshot and processes only the most recent data changes, starting from the latest oplog position.

Snapshot mode

If you set the stream_snapshot field to true, Redpanda Connect connects to your MongoDB database and does the following to capture a snapshot of all data in the selected collections:

  1. Records the latest oplog position.

  2. Determines the strategy for splitting the snapshot data into shards or chunks for more efficient processing:

    1. If snapshot_auto_bucket_sharding is set to false, the internal $splitVector command is used to compute shards.

    2. If snapshot_auto_bucket_sharding is set to true, the $bucketAuto command is used instead. This setting is for environments, such as MongoDB Atlas, where the $splitVector command is not available.

  3. Uses the number of connections specified in snapshot_parallelism to read the selected collections.

    If the pipeline restarts during this process, Redpanda Connect must start the snapshot capture from scratch to store the current oplog position in the checkpoint_cache.

  4. Uses the stored oplog position to catch up with changes that occurred during snapshot processing.
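
As a minimal sketch of snapshot mode, the following fragment enables the initial snapshot and reads it over four connections. The database name shop, the collection names, the cache label cdc_checkpoints, and the parallelism value are illustrative assumptions, not recommendations.

```yaml
input:
  mongodb_cdc:
    url: mongodb://localhost:27017
    database: shop                     # hypothetical database name
    collections: [ orders, customers ] # hypothetical collection names
    checkpoint_cache: cdc_checkpoints  # hypothetical cache resource, defined under cache_resources
    stream_snapshot: true              # capture existing data before streaming changes
    snapshot_parallelism: 4            # read the snapshot over four connections
```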

Streaming mode

If you set the stream_snapshot field to false, Redpanda Connect connects to your MongoDB database and starts processing data changes from the latest oplog position. If the pipeline restarts, Redpanda Connect resumes processing updates from the last oplog position written to the checkpoint_cache.

Metadata

This input adds the following metadata fields to each message:

  • operation: The type of data change that generated the message: read, create, update, replace, or delete. A read operation occurs when the initial snapshot of the database is processed.

  • collection: The name of the collection from which the message originated.

  • operation_time: The time the data change was written to the operations log (oplog) in the form of a Binary JSON (BSON) timestamp: {"t": <unix seconds>, "i": <sequence number>}.
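
One way to use these metadata fields is to copy them into the message body with a mapping processor, so that downstream systems can see which operation produced each document. The sketch below assumes you want an envelope with the field names document, operation, collection, and operation_time. Those output names are illustrative choices, not part of this input.

```yaml
pipeline:
  processors:
    - mapping: |
        # Wrap the change document together with the metadata set by mongodb_cdc.
        root.document = this
        root.operation = metadata("operation")
        root.collection = metadata("collection")
        root.operation_time = metadata("operation_time")
```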

Fields

app_name

The client application name.

Type: string

Default: benthos

auto_replay_nacks

Whether to automatically replay rejected messages (negative acknowledgements) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure.

Set auto_replay_nacks to false to delete rejected messages. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data is discarded immediately upon consumption and mutation.

Type: bool

Default: true

checkpoint_cache

Specify a cache resource to store the oplog position for the most recent data update streamed to Redpanda Connect. After a restart, Redpanda Connect can continue processing changes from this position, avoiding the need to reprocess all collection updates.

Type: string
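
The following sketch shows how a cache resource and this field might be wired together. It assumes a Redis instance at redis://localhost:6379 and uses the hypothetical label cdc_checkpoints. Any cache type that persists data across restarts could be substituted, whereas an in-memory cache would lose the stored position when the pipeline stops.

```yaml
cache_resources:
  - label: cdc_checkpoints            # hypothetical label
    redis:
      url: redis://localhost:6379     # assumed local Redis instance

input:
  mongodb_cdc:
    url: mongodb://localhost:27017
    database: shop                    # hypothetical database name
    collections: [ orders ]           # hypothetical collection name
    checkpoint_cache: cdc_checkpoints # must match the cache resource label
```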

checkpoint_interval

The interval between writing checkpoints to the cache.

Type: string

Default: 5s

checkpoint_key

The key identifier used to store the oplog position in checkpoint_cache. If you have multiple mongodb_cdc inputs sharing the same cache, you can provide an alternative key.

Type: string

Default: mongodb_cdc_checkpoint
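
For example, two mongodb_cdc inputs combined with a broker input could share one cache as long as each uses its own key. This is a sketch only: the database names, collection names, cache label, and key names are all hypothetical.

```yaml
input:
  broker:
    inputs:
      - mongodb_cdc:
          url: mongodb://localhost:27017
          database: shop                         # hypothetical
          collections: [ orders ]
          checkpoint_cache: cdc_checkpoints      # shared cache resource
          checkpoint_key: shop_cdc_checkpoint    # unique key for this input
      - mongodb_cdc:
          url: mongodb://localhost:27017
          database: billing                      # hypothetical
          collections: [ invoices ]
          checkpoint_cache: cdc_checkpoints      # same cache resource
          checkpoint_key: billing_cdc_checkpoint # unique key for this input
```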

checkpoint_limit

The maximum number of in-flight messages emitted from this input. Increasing this limit enables parallel processing and batching at the output level. To preserve at-least-once guarantees, any given oplog position is not acknowledged until all messages under that position are delivered.

Type: int

Default: 1000

collections[]

A list of collections to stream changes from. Specify each collection name as a separate item.

Type: array

database

The name of the MongoDB database to stream changes from.

Type: string

document_mode

The mode in which MongoDB emits document changes to Redpanda Connect, specifically updates and deletes.

Type: string

Default: update_lookup

Option Summary

partial_update

In this mode, update operations contain only a description of the update, which has the following schema:

{
  "_id": <document_id>,
  "operations": [
    # type == set means that the value was updated like so:
    # root.foo."bar.baz" = "world"
    {"path": ["foo", "bar.baz"], "type": "set", "value": "world"},
    # type == unset means that the value was deleted like so:
    # root.qux = deleted()
    {"path": ["qux"], "type": "unset", "value": null},
    # type == truncatedArray means that the array at that path was truncated to value number of elements
    # root.array = this.array.slice(0, 2)
    {"path": ["array"], "type": "truncatedArray", "value": 2}
  ]
}

pre_and_post_images

Uses the pre- and post-image collection to emit the full documents for update and delete operations. To use and configure this mode, see the setup steps in the MongoDB documentation.
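
On the Redpanda Connect side, selecting this mode is a single field, as in the sketch below. The database name, collection name, and cache label are hypothetical, and the fragment assumes that pre- and post-images have already been enabled in MongoDB for the listed collections.

```yaml
input:
  mongodb_cdc:
    url: mongodb://localhost:27017
    database: shop                       # hypothetical database name
    collections: [ orders ]              # pre- and post-images assumed to be enabled here
    checkpoint_cache: cdc_checkpoints    # hypothetical cache resource
    document_mode: pre_and_post_images   # emit full documents for updates and deletes
```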

update_lookup

In this mode, insert, replace, and update operations emit the full document, and delete operations populate only the _id field. For update operations, the full document is looked up. This mode corresponds to the updateLookup option. For more information, see the MongoDB documentation.

json_marshal_mode

Controls the format used to convert a message from BSON to JSON when it is received by Redpanda Connect.

Type: string

Default: canonical

Option Summary

canonical

A string format that emphasizes type preservation at the expense of readability and interoperability. That is, conversion from canonical to BSON will generally preserve type information except in certain specific cases.

relaxed

A string format that emphasizes readability and interoperability at the expense of type preservation. That is, conversion from relaxed format to BSON can lose type information.
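
To illustrate the difference, the sketch below switches to relaxed mode and shows, in comments, how a 32-bit integer field might be rendered in each mode. It assumes the two modes follow MongoDB Extended JSON v2. The field name quantity and the other names are hypothetical.

```yaml
input:
  mongodb_cdc:
    url: mongodb://localhost:27017
    database: shop                     # hypothetical database name
    collections: [ orders ]            # hypothetical collection name
    checkpoint_cache: cdc_checkpoints  # hypothetical cache resource
    # canonical (default) keeps BSON types explicit, for example:
    #   {"quantity": {"$numberInt": "3"}}
    # relaxed emits plain JSON values where it can, for example:
    #   {"quantity": 3}
    json_marshal_mode: relaxed
```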

password

The password to connect to the database.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: string

Default: ""
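
One way to keep credentials out of the configuration file is environment variable interpolation, sketched below. The variable names MONGODB_USERNAME and MONGODB_PASSWORD are hypothetical. If you use a secrets store, follow the syntax described in Manage Secrets instead.

```yaml
input:
  mongodb_cdc:
    url: mongodb://localhost:27017
    database: shop                     # hypothetical database name
    username: "${MONGODB_USERNAME}"    # resolved from the environment at startup
    password: "${MONGODB_PASSWORD}"    # never written to the file in plain text
    collections: [ orders ]            # hypothetical collection name
    checkpoint_cache: cdc_checkpoints  # hypothetical cache resource
```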

read_batch_size

The number of documents to fetch in each message batch from MongoDB.

Type: int

Default: 1000

read_max_wait

The maximum duration MongoDB waits to accumulate read_batch_size documents on a change stream before returning the batch to Redpanda Connect.

Type: string

Default: 1s

snapshot_auto_bucket_sharding

Uses the $bucketAuto command instead of the default, $splitVector, to split the snapshot data into chunks for processing. This is required for environments, such as MongoDB Atlas, where the $splitVector command is not available. To enable parallel processing in these environments:

  • Set this field to true.

  • Set stream_snapshot to true.

  • Increase snapshot_parallelism to a value greater than 1.

Type: bool

Default: false
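
Put together, the three settings above might look like the following sketch for a hosted environment such as MongoDB Atlas. The MONGODB_URL environment variable, the database and collection names, the cache label, and the parallelism value are assumptions for illustration.

```yaml
input:
  mongodb_cdc:
    url: "${MONGODB_URL}"               # hypothetical variable holding the connection string
    database: shop                      # hypothetical database name
    collections: [ orders ]             # hypothetical collection name
    checkpoint_cache: cdc_checkpoints   # hypothetical cache resource
    stream_snapshot: true               # snapshot existing data first
    snapshot_parallelism: 4             # any value greater than 1 enables parallel reads
    snapshot_auto_bucket_sharding: true # use $bucketAuto where $splitVector is unavailable
```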

snapshot_parallelism

Specifies the number of connections to use when reading the initial snapshot from one or more collections. Increase this number to enable parallel processing of the snapshot.

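By default, this feature uses the $splitVector command to split snapshot data into chunks for more efficient processing. In environments where that command is not available, set snapshot_auto_bucket_sharding to true.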

This field is only applicable when stream_snapshot is set to true.

Type: int

Default: 1

stream_snapshot

When set to true, this input streams a snapshot of all existing data in the source collections before streaming data changes.

Type: bool

Default: false

url

The URL of the target MongoDB server.

Type: string

# Examples:
url: mongodb://localhost:27017

username

The username to connect to the database.

Type: string

Default: ""