# mongodb_cdc

> For the complete documentation index, see [llms.txt](https://docs.redpanda.com/llms.txt). Component-specific: [connect-full.txt](https://docs.redpanda.com/connect-full.txt)

---
title: mongodb_cdc
latest-connect-version: 4.93.0
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-redpanda-tag: v26.1.9
docname: inputs/mongodb_cdc
page-component-name: connect
page-version: master
page-component-version: master
page-component-title: Connect
page-relative-src-path: inputs/mongodb_cdc.adoc
page-edit-url: https://github.com/redpanda-data/rp-connect-docs/edit/main/modules/components/pages/inputs/mongodb_cdc.adoc
page-git-created-date: "2025-03-11"
page-git-modified-date: "2026-05-26"
---

<!-- Source: https://docs.redpanda.com/connect/components/inputs/mongodb_cdc.md -->

**Available in:** [Cloud](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/inputs/mongodb_cdc/ "View the Cloud version of this component"), Self-Managed

**License**: This component requires an [enterprise license](https://docs.redpanda.com/redpanda-connect/get-started/licensing/). You can either [upgrade to an Enterprise Edition license](https://www.redpanda.com/upgrade), or [generate a trial license key](http://redpanda.com/try-enterprise) that's valid for 30 days.

Streams data changes from a MongoDB replica set, using MongoDB’s [change streams](https://www.mongodb.com/docs/manual/changeStreams/) to capture data updates.

Introduced in version 4.48.1.

#### Common

```yml
input:
  label: ""
  mongodb_cdc:
    url: "" # No default (required)
    database: "" # No default (required)
    username: ""
    password: ""
    collections: [] # No default (required)
    checkpoint_key: mongodb_cdc_checkpoint
    checkpoint_cache: "" # No default (required)
    checkpoint_interval: 5s
    checkpoint_limit: 1000
    read_batch_size: 1000
    read_max_wait: 1s
    stream_snapshot: false
    snapshot_parallelism: 1
    auto_replay_nacks: true
```

#### Advanced

```yml
input:
  label: ""
  mongodb_cdc:
    url: "" # No default (required)
    database: "" # No default (required)
    username: ""
    password: ""
    collections: [] # No default (required)
    checkpoint_key: mongodb_cdc_checkpoint
    checkpoint_cache: "" # No default (required)
    checkpoint_interval: 5s
    checkpoint_limit: 1000
    read_batch_size: 1000
    read_max_wait: 1s
    stream_snapshot: false
    snapshot_parallelism: 1
    snapshot_auto_bucket_sharding: false
    document_mode: update_lookup
    json_marshal_mode: canonical
    app_name: benthos
    auto_replay_nacks: true
```

## [](#prerequisites)Prerequisites

-   MongoDB version 6 or later

-   Network access from the cluster where your Redpanda Connect pipeline is running to the source database environment.

-   A MongoDB database running as a [replica set](https://www.mongodb.com/docs/manual/replication/#replication-in-mongodb) or in a [sharded cluster](https://www.mongodb.com/docs/manual/sharding/) using replica set [protocol version 1](https://www.mongodb.com/docs/manual/reference/replica-configuration/#rsconf.protocolVersion).

-   A MongoDB database using the [WiredTiger](https://www.mongodb.com/docs/manual/core/wiredtiger/#storage-wiredtiger) storage engine.


## [](#enable-connectivity-from-cloud-based-data-sources)Enable connectivity from cloud-based data sources

To establish a secure connection between a cloud-based data source and Redpanda Connect, you must add the IP addresses of your Redpanda Connect instances to your firewall rules.

## [](#data-capture-method)Data capture method

The `mongodb_cdc` input uses [change streams](https://www.mongodb.com/docs/manual/changeStreams/) to capture data changes, which does not propagate _all_ changes to Redpanda Connect. To capture all changes in a MongoDB cluster, including deletions, enable pre- and post-image saving for the cluster and [required collections](#collections). For more information, see [`document_mode` options](#document_mode) and the [MongoDB documentation](https://www.mongodb.com/docs/manual/changeStreams/#change-streams-with-document-pre--and-post-images).

## [](#data-replication)Data replication

Redpanda Connect allows you to specify which [database collections](#collections) in your source database to receive changes from.

You can also run the `mongodb_cdc` input in one of two modes, depending on whether you need a snapshot of existing data before streaming updates.

-   Snapshot mode: Redpanda Connect first captures a snapshot of all data in the selected collections and streams the contents before processing changes from the last recorded [operations log (oplog)](https://www.mongodb.com/docs/manual/core/replica-set-oplog/) position.

-   Streaming mode: Redpanda Connect skips the snapshot and processes only the most recent data changes, starting from the latest oplog position.


### [](#snapshot-mode)Snapshot mode

If you set the [`stream_snapshot` field](#stream_snapshot) to `true`, Redpanda Connect connects to your MongoDB database and does the following to capture a snapshot of all data in the selected collections:

1.  Records the latest oplog position.

2.  Determines the strategy for splitting the snapshot data into shards or chunks for more efficient processing:

    1.  If [`snapshot_auto_bucket_sharding`](#snapshot_auto_bucket_sharding) is set to `false`, the internal `$splitVector` command is used to compute shards.

    2.  If [`snapshot_auto_bucket_sharding`](#snapshot_auto_bucket_sharding) is set to `true`, the [`$bucketAuto`](https://www.mongodb.com/docs/manual/reference/operator/aggregation/bucketAuto/) command is used instead. This setting is for environments, such as MongoDB Atlas, where the `$splitVector` command is not available.


3.  This input then uses the number of connections specified in [`snapshot_parallelism`](#snapshot_parallelism) to read the selected collections.

    > 📝 **NOTE**
    >
    > If the pipeline restarts during this process, Redpanda Connect must start the snapshot capture from scratch to store the current oplog position in the [`checkpoint_cache`](#checkpoint_cache).

4.  Finally, the input uses the stored oplog position to catch up with changes that occurred during snapshot processing.
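
As a rough illustration of the steps above, the following fragment enables snapshot mode with several parallel readers. It is a minimal sketch, not a recommended configuration: the database name, collection names, and the `cdc_checkpoints` cache label are hypothetical, and the cache is assumed to be defined elsewhere under `cache_resources`.

```yaml
input:
  mongodb_cdc:
    url: mongodb://localhost:27017
    database: inventory               # hypothetical database name
    collections:
      - orders                        # hypothetical collection names
      - customers
    checkpoint_cache: cdc_checkpoints # hypothetical cache resource label
    stream_snapshot: true             # capture existing data before streaming changes
    snapshot_parallelism: 4           # illustrative value: read the snapshot over 4 connections
```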


### [](#streaming-mode)Streaming mode

If you set the [`stream_snapshot` field](#stream_snapshot) to `false`, Redpanda Connect connects to your MongoDB database and starts processing data changes from the latest oplog position. If the pipeline restarts, Redpanda Connect resumes processing updates from the last oplog position written to the [`checkpoint_cache`](#checkpoint_cache).
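
Because resuming after a restart depends on the stored oplog position, the choice of cache matters. The sketch below assumes a Redis instance at a hypothetical address so that the checkpoint outlives the pipeline process; an in-process cache such as `memory` would lose the position on restart. The database, collection, and label names are illustrative only.

```yaml
input:
  mongodb_cdc:
    url: mongodb://localhost:27017
    database: inventory               # hypothetical database name
    collections:
      - orders                        # hypothetical collection name
    checkpoint_cache: cdc_checkpoints # must match a cache resource label
    stream_snapshot: false            # skip the snapshot and stream changes only

cache_resources:
  - label: cdc_checkpoints            # hypothetical label
    redis:
      url: redis://localhost:6379     # hypothetical Redis address

output:
  stdout: {}
```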

## [](#metadata)Metadata

This input adds the following metadata fields to each message:

-   `operation`: The type of data change that generated the message: `read`, `create`, `update`, `replace`, or `delete`. A `read` operation occurs when the initial snapshot of the database is processed.

-   `collection`: The name of the collection from which the message originated.

-   `operation_time`: The time the data change was written to the [operations log (oplog)](https://www.mongodb.com/docs/manual/core/replica-set-oplog/) in the form of a Binary JSON (BSON) timestamp: `{"t": <unix seconds>, "i": <sequence number>}`.
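
One way to use these metadata fields is to copy them into the message body with a `mapping` processor. The following is a minimal sketch; the output field names (`document`, `operation`, and so on) are arbitrary choices for illustration, not part of the component.

```yaml
pipeline:
  processors:
    - mapping: |
        # Wrap the original document and attach the metadata set by mongodb_cdc.
        root.document = this
        root.operation = @operation
        root.collection = @collection
        root.operation_time = @operation_time
```

Downstream components can then route or filter on `operation`, for example to handle `delete` events differently from `read` events produced by the snapshot.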


## [](#fields)Fields

### [](#app_name)`app_name`

The client application name.

**Type**: `string`

**Default**: `benthos`

### [](#auto_replay_nacks)`auto_replay_nacks`

Whether to automatically replay rejected messages (negative acknowledgements) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure.

Set `auto_replay_nacks` to `false` to delete rejected messages. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data is discarded immediately upon consumption and mutation.

**Type**: `bool`

**Default**: `true`

### [](#checkpoint_cache)`checkpoint_cache`

Specify a [`cache` resource](https://docs.redpanda.com/connect/components/caches/about/) to store the oplog position for the most recent data update streamed to Redpanda Connect. After a restart, Redpanda Connect can continue processing changes from this position, avoiding the need to reprocess all collection updates.

**Type**: `string`

### [](#checkpoint_interval)`checkpoint_interval`

The interval between writing checkpoints to the cache.

**Type**: `string`

**Default**: `5s`

### [](#checkpoint_key)`checkpoint_key`

The key identifier used to store the oplog position in [`checkpoint_cache`](#checkpoint_cache). If you have multiple `mongodb_cdc` inputs sharing the same cache, you can provide an alternative key.

**Type**: `string`

**Default**: `mongodb_cdc_checkpoint`
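
For example, two `mongodb_cdc` inputs combined with a `broker` input could share one cache by using distinct keys. This is a sketch under assumptions: the database names, collection names, key names, and the `cdc_checkpoints` cache label are all hypothetical, and the cache is assumed to be defined under `cache_resources`.

```yaml
input:
  broker:
    inputs:
      - mongodb_cdc:
          url: mongodb://localhost:27017
          database: inventory                  # hypothetical database name
          collections: [ orders ]
          checkpoint_cache: cdc_checkpoints
          checkpoint_key: inventory_checkpoint # distinct key per input
      - mongodb_cdc:
          url: mongodb://localhost:27017
          database: billing                    # hypothetical database name
          collections: [ invoices ]
          checkpoint_cache: cdc_checkpoints
          checkpoint_key: billing_checkpoint   # distinct key per input
```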

### [](#checkpoint_limit)`checkpoint_limit`

The maximum number of in-flight messages emitted from this input. Increasing this limit enables parallel processing and batching at the output level. To preserve at-least-once guarantees, an oplog position is not acknowledged until all messages up to that position are delivered.

**Type**: `int`

**Default**: `1000`

### [](#collections)`collections[]`

A list of collections to stream changes from. Specify each collection name as a separate item.

**Type**: `array`

### [](#database)`database`

The name of the MongoDB database to stream changes from.

**Type**: `string`

### [](#document_mode)`document_mode`

The mode in which MongoDB emits document changes to Redpanda Connect, specifically updates and deletes.

**Type**: `string`

**Default**: `update_lookup`

| Option | Summary |
| --- | --- |
| partial_update | Update operations contain only a description of the change, with the schema `{"_id": <document_id>, "operations": [...]}`. Each entry in `operations` has a `path`, a `type`, and a `value`. A `set` entry means the value was updated: `{"path": ["foo", "bar.baz"], "type": "set", "value": "world"}` corresponds to `root.foo."bar.baz" = "world"`. An `unset` entry means the value was deleted: `{"path": ["qux"], "type": "unset", "value": null}` corresponds to `root.qux = deleted()`. A `truncatedArray` entry means the array at that path was truncated to `value` elements: `{"path": ["array"], "type": "truncatedArray", "value": 2}` corresponds to `root.array = this.array.slice(0, 2)`. |
| pre_and_post_images | Uses pre- and post-images to emit the full documents for update and delete operations. To use and configure this mode, see the setup steps in the [MongoDB documentation](https://www.mongodb.com/docs/manual/changeStreams/#change-streams-with-document-pre--and-post-images). |
| update_lookup | Insert, replace, and update operations emit the full document, and delete operations have only the `_id` field populated. Document updates look up the full document. This corresponds to the `updateLookup` option; see the MongoDB documentation for more information. |
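
As an illustration, the fragment below selects `pre_and_post_images`. It is a sketch only: the database, collection, and cache names are hypothetical, and the `collMod` command in the comment is the MongoDB-side setting this mode is assumed to rely on, so check the MongoDB documentation for your version before using it.

```yaml
input:
  mongodb_cdc:
    url: mongodb://localhost:27017
    database: inventory               # hypothetical database name
    collections: [ orders ]           # hypothetical collection name
    checkpoint_cache: cdc_checkpoints # hypothetical cache resource label
    # Assumes pre- and post-images are already enabled on each collection,
    # for example in mongosh:
    #   db.runCommand({ collMod: "orders",
    #                   changeStreamPreAndPostImages: { enabled: true } })
    document_mode: pre_and_post_images
```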

### [](#json_marshal_mode)`json_marshal_mode`

Controls the format used to convert a message from BSON to JSON when it is received by Redpanda Connect.

**Type**: `string`

**Default**: `canonical`

| Option | Summary |
| --- | --- |
| canonical | A string format that emphasizes type preservation at the expense of readability and interoperability. That is, conversion from canonical to BSON will generally preserve type information except in certain specific cases. |
| relaxed | A string format that emphasizes readability and interoperability at the expense of type preservation. That is, conversion from relaxed format to BSON can lose type information. |
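
To make the difference concrete, the comments in this sketch show how a single hypothetical field `qty`, stored as a 32-bit integer with the value 42, would appear in each format according to MongoDB Extended JSON. The field name and value are illustrative.

```yaml
input:
  mongodb_cdc:
    # Other required fields (url, database, collections, checkpoint_cache) omitted.
    json_marshal_mode: relaxed
    # canonical: {"qty": {"$numberInt": "42"}}
    # relaxed:   {"qty": 42}
```

The relaxed form is usually easier to consume downstream, while the canonical form lets a consumer distinguish, for instance, a 32-bit integer from a 64-bit one.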

### [](#password)`password`

The password to connect to the database.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Secrets](https://docs.redpanda.com/connect/configuration/secrets/).

**Type**: `string`

**Default**: `""`
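
One common way to keep credentials out of the configuration file is environment variable interpolation. In this sketch the variable names `MONGODB_USERNAME` and `MONGODB_PASSWORD` are hypothetical; the secrets mechanism available to you depends on your deployment, as described on the Secrets page.

```yaml
input:
  mongodb_cdc:
    # Other required fields (database, collections, checkpoint_cache) omitted.
    url: mongodb://localhost:27017
    username: ${MONGODB_USERNAME}   # hypothetical environment variable
    password: ${MONGODB_PASSWORD}   # hypothetical environment variable
```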

### [](#read_batch_size)`read_batch_size`

The number of documents to fetch in each message batch from MongoDB.

**Type**: `int`

**Default**: `1000`

### [](#read_max_wait)`read_max_wait`

The maximum duration MongoDB waits to accumulate the [`read_batch_size`](#read_batch_size) documents on a change stream before returning the batch to Redpanda Connect.

**Type**: `string`

**Default**: `1s`
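
`read_batch_size` and `read_max_wait` work together: a batch is returned when either limit is reached. The values in this sketch are illustrative assumptions for a latency-sensitive pipeline, not tuned recommendations.

```yaml
input:
  mongodb_cdc:
    # Other required fields (url, database, collections, checkpoint_cache) omitted.
    read_batch_size: 200   # illustrative value: smaller batches
    read_max_wait: 250ms   # illustrative value: return partial batches sooner
```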

### [](#snapshot_auto_bucket_sharding)`snapshot_auto_bucket_sharding`

Uses the [`$bucketAuto`](https://www.mongodb.com/docs/manual/reference/operator/aggregation/bucketAuto/) command instead of the default, `$splitVector`, to split the snapshot data into chunks for processing. This is required for environments, such as MongoDB Atlas, where the `$splitVector` command is not available. To enable parallel processing in these environments:

-   Set this field to `true`.

-   Set `stream_snapshot` to `true`.

-   Increase `snapshot_parallelism` to a value greater than `1`.


**Type**: `bool`

**Default**: `false`
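
The three settings listed above might be combined as follows for an environment where `$splitVector` is unavailable. This is a sketch: the connection string, database, collection, and cache names are hypothetical placeholders, and the parallelism value is illustrative.

```yaml
input:
  mongodb_cdc:
    url: mongodb+srv://cluster0.example.mongodb.net # hypothetical connection string
    database: inventory                             # hypothetical database name
    collections: [ orders ]                         # hypothetical collection name
    checkpoint_cache: cdc_checkpoints               # hypothetical cache resource label
    stream_snapshot: true
    snapshot_parallelism: 4                         # any value greater than 1
    snapshot_auto_bucket_sharding: true             # use $bucketAuto instead of $splitVector
```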

### [](#snapshot_parallelism)`snapshot_parallelism`

Specifies the number of connections to use when reading the initial snapshot from one or more collections. Increase this number to enable parallel processing of the snapshot.

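By default, this feature uses the `$splitVector` command to split snapshot data into chunks for more efficient processing. To use `$bucketAuto` instead, see [`snapshot_auto_bucket_sharding`](#snapshot_auto_bucket_sharding).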

This field is only applicable when `stream_snapshot` is set to `true`.

**Type**: `int`

**Default**: `1`

### [](#stream_snapshot)`stream_snapshot`

When set to `true`, this input streams a snapshot of all existing data in the source collections before streaming data changes.

**Type**: `bool`

**Default**: `false`

### [](#url)`url`

The URL of the target MongoDB server.

**Type**: `string`

```yaml
# Examples:
url: mongodb://localhost:27017
```

### [](#username)`username`

The username to connect to the database.

**Type**: `string`

**Default**: `""`