mongodb_cdc

Beta

Streams data changes from a MongoDB replica set, using MongoDB’s change streams to capture data updates.

# Common configuration fields, showing default values
input:
  label: ""
  mongodb_cdc:
    url: mongodb://localhost:27017 # No default (required)
    database: "" # No default (required)
    username: "" # Optional
    password: "" # Optional
    collections: [] # No default (required)
    checkpoint_cache: "" # No default (required)
    checkpoint_key: mongodb_cdc_checkpoint
    checkpoint_interval: 5s
    checkpoint_limit: 1000
    read_batch_size: 1000
    read_max_wait: 1s
    stream_snapshot: false
    snapshot_parallelism: 1
    auto_replay_nacks: true

# All configuration fields, showing default values
input:
  label: ""
  mongodb_cdc:
    url: mongodb://localhost:27017 # No default (required)
    database: "" # No default (required)
    username: ""
    password: ""
    collections: [] # No default (required)
    checkpoint_cache: "" # No default (required)
    checkpoint_key: mongodb_cdc_checkpoint
    checkpoint_interval: 5s
    checkpoint_limit: 1000
    read_batch_size: 1000
    read_max_wait: 1s
    stream_snapshot: false
    snapshot_parallelism: 1
    snapshot_auto_bucket_sharding: false
    document_mode: update_lookup
    json_marshal_mode: canonical
    app_name: benthos
    auto_replay_nacks: true

Prerequisites

  • MongoDB version 6 or later

  • Network access from the cluster where your Redpanda Connect pipeline is running to the source database environment. For detailed networking information, including how to set up a VPC peering connection, see Redpanda Cloud Networking.

  • A MongoDB database running as a replica set or in a sharded cluster using replica set protocol version 1.

  • A MongoDB database using the WiredTiger storage engine.

Enable connectivity from cloud-based data sources (BYOC)

To establish a secure connection between a cloud-based data source and Redpanda Connect, you must add the NAT Gateway IP address of your Redpanda cluster to the allowlist of your data source.

Data capture method

The mongodb_cdc input uses MongoDB change streams to capture data changes. By default, change streams do not propagate all changes to Redpanda Connect. To capture all changes in a MongoDB cluster, including deletions, enable pre- and post-image saving for the cluster and the required collections. For more information, see the document_mode options and the MongoDB documentation.
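As a minimal sketch, the following input requests pre- and post-images through document_mode. The database, collection, and cache names are placeholders. The example assumes that pre- and post-image saving is already enabled in MongoDB for the listed collection, because this configuration does not enable it on its own.

```yaml
input:
  mongodb_cdc:
    url: mongodb://localhost:27017
    database: shop                  # hypothetical database name
    collections:
      - orders                      # hypothetical collection with pre- and post-images enabled
    checkpoint_cache: cdc_cache
    document_mode: pre_and_post_images

cache_resources:
  - label: cdc_cache                # hypothetical cache label
    redis:
      url: redis://:6379
```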

Data replication

Redpanda Connect allows you to specify which database collections in your source database to receive changes from.

You can also run the mongodb_cdc input in one of two modes, depending on whether you need a snapshot of existing data before streaming updates.

  • Snapshot mode: Redpanda Connect first captures a snapshot of all data in the selected collections and streams the contents before processing changes from the last recorded operations log (oplog) position.

  • Streaming mode: Redpanda Connect skips the snapshot and processes only the most recent data changes, starting from the latest oplog position.

Snapshot mode

If you set the stream_snapshot field to true, Redpanda Connect connects to your MongoDB database and does the following to capture a snapshot of all data in the selected collections:

  1. Records the latest oplog position.

  2. Determines the strategy for splitting the snapshot data into shards or chunks for more efficient processing:

    1. If snapshot_auto_bucket_sharding is set to false, the internal $splitVector command is used to compute shards.

    2. If snapshot_auto_bucket_sharding is set to true, the $bucketAuto command is used instead. This setting is for environments, such as MongoDB Atlas, where the $splitVector command is not available.

  3. This input then uses the number of connections specified in snapshot_parallelism to read the selected collections.

    If the pipeline restarts during this process, Redpanda Connect must start the snapshot capture from scratch to store the current oplog position in the checkpoint_cache.

  4. Finally, the input uses the stored oplog position to catch up with changes that occurred during snapshot processing.
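The steps above could be triggered with a configuration along these lines. This is a sketch only: the database, collection, and cache names are hypothetical, and the parallelism value is illustrative rather than a recommendation.

```yaml
input:
  mongodb_cdc:
    url: mongodb://localhost:27017
    database: shop                  # hypothetical database name
    collections:
      - orders                      # hypothetical collections
      - customers
    checkpoint_cache: cdc_cache
    stream_snapshot: true           # read existing documents before streaming changes
    snapshot_parallelism: 4         # illustrative: four connections read the snapshot

cache_resources:
  - label: cdc_cache                # hypothetical cache label
    redis:
      url: redis://:6379
```

Because snapshot_auto_bucket_sharding is left at its default of false, this sketch relies on $splitVector being available in the source environment.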

Streaming mode

If you set the stream_snapshot field to false, Redpanda Connect connects to your MongoDB database and starts processing data changes from the latest oplog position. If the pipeline restarts, Redpanda Connect resumes processing updates from the last oplog position written to the checkpoint_cache.

Metadata

This input adds the following metadata fields to each message:

  • operation: The type of data change that generated the message: read, create, update, replace, or delete. A read operation occurs when the initial snapshot of the database is processed.

  • collection: The name of the collection from which the message originated.

  • operation_time: The time the data change was written to the operations log (oplog) in the form of a Binary JSON (BSON) timestamp: {"t": <unix seconds>, "i": <sequence number>}.
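One way to use these fields is to copy them into the message body with a mapping processor. The sketch below assumes the Bloblang @ shorthand for reading metadata. The output field names (document, operation, collection, operation_time) are arbitrary choices for illustration.

```yaml
pipeline:
  processors:
    - mapping: |
        # Wrap the original document and attach the CDC metadata next to it.
        root.document = this
        root.operation = @operation
        root.collection = @collection
        root.operation_time = @operation_time
```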

Fields

url

The URL of the target MongoDB server.

Type: string

Default: ""

# Examples

url: mongodb://localhost:27017

database

The name of the MongoDB database to stream changes from.

Type: string

Default: ""

username

The username to connect to the database.

Type: string

Default: ""

password

The password to connect to the database.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. Before adding it to your configuration, see Manage Secrets.

Type: string

Default: ""
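As a sketch of keeping the password out of the configuration file, the fragment below reads it from an environment variable through configuration interpolation. The variable name MONGODB_PASSWORD, the username, and the other names are hypothetical, and the referenced cache resource is assumed to be defined elsewhere. Managed deployments may use a different secret reference syntax, as described in Manage Secrets.

```yaml
input:
  mongodb_cdc:
    url: mongodb://localhost:27017
    database: shop                      # hypothetical database name
    username: cdc_reader                # hypothetical user
    password: "${MONGODB_PASSWORD}"     # resolved from the environment at startup
    collections:
      - orders
    checkpoint_cache: cdc_cache         # assumed to exist under cache_resources
```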

collections

A list of collections to stream changes from. Specify each collection name as a separate item.

Type: array

Default: []

# Examples

collections:
  - orders
  - customers
  - inventory

checkpoint_cache

Specify a cache resource to store the oplog position for the most recent data update streamed to Redpanda Connect. After a restart, Redpanda Connect can continue processing changes from this position, avoiding the need to reprocess all collection updates.

Type: string

Default: ""

# Examples

input:
  mongodb_cdc:
    url: mongodb://localhost:27017
    database: my_database # Required; placeholder name
    collections: [my_collection]
    checkpoint_cache: "my_cdc_cache"
cache_resources:
  - label: "my_cdc_cache"
    redis:
      url: redis://:6379

checkpoint_key

The key identifier used to store the oplog position in checkpoint_cache. If you have multiple mongodb_cdc inputs sharing the same cache, you can provide an alternative key.

Type: string

Default: mongodb_cdc_checkpoint
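For illustration, two mongodb_cdc inputs could share one cache by giving each its own key. The sketch below combines them with a broker input. All database, collection, key, and cache names are hypothetical.

```yaml
input:
  broker:
    inputs:
      - mongodb_cdc:
          url: mongodb://localhost:27017
          database: shop
          collections: [orders]
          checkpoint_cache: cdc_cache
          checkpoint_key: shop_orders_checkpoint        # unique per input
      - mongodb_cdc:
          url: mongodb://localhost:27017
          database: billing
          collections: [invoices]
          checkpoint_cache: cdc_cache
          checkpoint_key: billing_invoices_checkpoint   # unique per input

cache_resources:
  - label: cdc_cache
    redis:
      url: redis://:6379
```

With distinct keys, neither input overwrites the oplog position stored by the other.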

checkpoint_interval

The interval between writing checkpoints to the cache.

Type: string

Default: 5s

checkpoint_limit

The maximum number of in-flight messages emitted from this input. Increasing this limit enables parallel processing and batching at the output level. To preserve at-least-once guarantees, an oplog position is not acknowledged until all messages up to that position are delivered.

Type: int

Default: 1000

read_batch_size

The number of documents to fetch in each message batch from MongoDB.

Type: int

Default: 1000

read_max_wait

The maximum duration MongoDB waits to accumulate the read_batch_size documents on a change stream before returning the batch to Redpanda Connect.

Type: string

Default: 1s
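The checkpoint and read settings are typically tuned together. The fragment below is a sketch with illustrative values only, not recommendations: it trades more frequent reads for larger batches and allows more in-flight messages. Required fields such as url, database, collections, and checkpoint_cache are omitted for brevity.

```yaml
input:
  mongodb_cdc:
    checkpoint_interval: 10s   # write the oplog position to the cache every 10 seconds
    checkpoint_limit: 5000     # allow up to 5000 in-flight messages
    read_batch_size: 2000      # fetch up to 2000 documents per batch
    read_max_wait: 500ms       # return a partial batch after 500 milliseconds
```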

stream_snapshot

When set to true, this input streams a snapshot of all existing data in the source collections before streaming data changes.

Type: bool

Default: false

# Examples

stream_snapshot: true

snapshot_parallelism

Specifies the number of connections to use when reading the initial snapshot from one or more collections. Increase this number to enable parallel processing of the snapshot.

This feature uses the $splitVector command to split snapshot data into chunks for more efficient processing.

This field is only applicable when stream_snapshot is set to true.

Type: int

Default: 1

snapshot_auto_bucket_sharding

Uses the $bucketAuto command instead of the default, $splitVector, to split the snapshot data into chunks for processing. This is required for environments, such as MongoDB Atlas, where the $splitVector command is not available. To enable parallel processing in these environments:

  • Set this field to true.

  • Set stream_snapshot to true.

  • Increase snapshot_parallelism to a value greater than 1.

Type: bool

Default: false
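The three settings in the list above might look as follows in a configuration. This is a fragment that shows only the snapshot-related fields. The parallelism value is illustrative.

```yaml
input:
  mongodb_cdc:
    stream_snapshot: true
    snapshot_parallelism: 4               # any value greater than 1
    snapshot_auto_bucket_sharding: true   # use $bucketAuto instead of $splitVector
```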

document_mode

The mode in which MongoDB emits document changes to Redpanda Connect, specifically updates and deletes.

Type: string

Default: update_lookup

Options:

  • update_lookup: Emits full documents for insert, replace, and update operations, but only the _id field for delete operations. This corresponds to the updateLookup option. For more information, see the MongoDB documentation.

  • pre_and_post_images: Emits full documents for update and delete operations by using pre- and post-image collections. For setup and configuration details, see the MongoDB documentation.

  • partial_update: Update operations contain only a description of the changes made to the document, using the following schema:

{
  "_id": "<document_id>",
  "operations": [
    # type == set: Indicates that a value was updated. For example:
    # root.name."address.city" = "New York"
    {"path": ["name", "address.city"], "type": "set", "value": "New York"},

    # type == unset: Indicates that a value was deleted. For example:
    # root.phone = deleted()
    {"path": ["phone"], "type": "unset", "value": null},

    # type == truncatedArray: Indicates that an array was truncated to a specified number of elements. For example:
    # root.items = this.items.slice(2)
    {"path": ["items"], "type": "truncatedArray", "value": 2}
  ]
}
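A downstream processor can summarize messages that follow this schema. The sketch below, which assumes a mapping processor and hypothetical output field names, collects the paths that were set and unset. The or([]) fallback keeps the mapping from failing on messages that carry no operations array, such as snapshot reads.

```yaml
pipeline:
  processors:
    - mapping: |
        # Fall back to an empty list when the message has no operations array.
        let ops = this.operations.or([])
        root.id = this._id
        root.set_paths = $ops.filter(op -> op.type == "set").map_each(op -> op.path)
        root.unset_paths = $ops.filter(op -> op.type == "unset").map_each(op -> op.path)
```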

json_marshal_mode

Controls the format used to convert a message from BSON to JSON when it is received by Redpanda Connect.

Type: string

Default: canonical

Options:

  • canonical: A string format that preserves the exact data types of values stored in MongoDB at the expense of readability and interoperability. Conversion from canonical to JSON generally preserves type information.

  • relaxed: A string format that is human-readable and compatible with most JSON parsers but may lose type fidelity. Conversion from relaxed format to JSON can lose type information.
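To make the difference concrete, the comments in the fragment below show how a 32-bit and a 64-bit integer are represented in each mode. The sketch assumes that the two modes follow MongoDB Extended JSON v2. The field names qty and total are hypothetical, and required fields are omitted for brevity.

```yaml
input:
  mongodb_cdc:
    json_marshal_mode: relaxed
    # canonical: {"qty": {"$numberInt": "42"}, "total": {"$numberLong": "9000000000"}}
    # relaxed:   {"qty": 42, "total": 9000000000}
```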

app_name

The client application name.

Type: string

Default: benthos

auto_replay_nacks

Whether to automatically replay rejected messages (negative acknowledgements) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure.

Set auto_replay_nacks to false to delete rejected messages. Disabling auto replays can greatly improve the memory efficiency of high-throughput streams, because the original shape of the data is discarded immediately upon consumption and mutation.

Type: bool

Default: true