Docs Cloud Redpanda Connect Components Inputs mongodb_cdc mongodb_cdc Beta Available in: Cloud, Self-Managed Streams data changes from a MongoDB replica set, using MongoDB’s change streams to capture data updates. Common Advanced # Configuration fields, showing default values input: label: "" mongodb_cdc: url: mongodb://localhost:27017 # No default (required) database: "" # No default (required) username: "" # Optional password: "" # Optional collections: [] # No default (required) checkpoint_cache: "" # No default (required) checkpoint_key: mongodb_cdc_checkpoint checkpoint_interval: 5s checkpoint_limit: 1000 read_batch_size: 1000 read_max_wait: 1s stream_snapshot: false snapshot_parallelism: 1 auto_replay_nacks: true # All configuration fields, showing default values input: label: "" mongodb_cdc: url: mongodb://localhost:27017 # No default (required) database: "" # No default (required) username: "" password: "" collections: [] # No default (required) checkpoint_cache: "" # No default (required) checkpoint_key: mongodb_cdc_checkpoint checkpoint_interval: 5s checkpoint_limit: 1000 read_batch_size: 1000 read_max_wait: 1s stream_snapshot: false snapshot_parallelism: 1 snapshot_auto_bucket_sharding: false document_mode: update_lookup json_marshal_mode: canonical app_name: benthos auto_replay_nacks: true Prerequisites MongoDB version 6 or later Network access from the cluster where your Redpanda Connect pipeline is running to the source database environment. For detailed networking information, including how to set up a VPC peering connection, see Redpanda Cloud Networking. A MongoDB database running as a replica set or in a sharded cluster using replica set protocol version 1. A MongoDB database using the WiredTiger storage engine. Enable connectivity from cloud-based data sources (BYOC) To establish a secure connection between a cloud-based data source and Redpanda Connect, you must add the NAT Gateway IP address of your Redpanda cluster to the allowlist of your data source. Data capture method The mongodb_cdc input uses change streams to capture data changes, which does not propagate all changes to Redpanda Connect. To capture all changes in a MongoDB cluster, including deletions, enable pre- and post-image saving for the cluster and required collections. For more information, see document_mode options and the MongoDB documentation. Data replication Redpanda Connect allows you to specify which database collections in your source database to receive changes from. You can also run the mongodb_cdc input in one of two modes, depending on whether you need a snapshot of existing data before streaming updates. Snapshot mode: Redpanda Connect first captures a snapshot of all data in the selected collections and streams the contents before processing changes from the last recorded operations log (oplog) position. Streaming mode: Redpanda Connect skips the snapshot and processes only the most recent data changes, starting from the latest oplog position. Snapshot mode If you set the stream_snapshot field to true, Redpanda Connect connects to your MongoDB database and does the following to capture a snapshot of all data in the selected collections: Records the latest oplog position. Determines the strategy for splitting the snapshot data down into shards or chunks for more efficient processing: If snapshot_auto_bucket_sharding is set to false, the internal $splitVector command is used to compute shards. If snapshot_auto_bucket_sharding is set to true, the $bucketAuto command is used instead. This setting is for environments, such as MongoDB Atlas, where the $splitVector command is not available. This input then uses the number of connections specified in snapshot-parallelism to read the selected collections. If the pipeline restarts during this process, Redpanda Connect must start the snapshot capture from scratch to store the current oplog position in the checkpoint_cache. Finally, the input uses the stored oplog position to catch up with changes that occurred during snapshot processing. Streaming mode If you set the stream_snapshot field to false, Redpanda Connect connects to your MongoDB database and starts processing data changes from the latest oplog position. If the pipeline restarts, Redpanda Connect resumes processing updates from the last oplog position written to the checkpoint_cache. Metadata This input adds the following metadata fields to each message: operation: The type of data change that generated the message: read, create, update, replace, delete, update. A read operation occurs when the initial snapshot of the database is processed. collection: The name of the collection from which the message originated. operation_time: The time the data change was written to the operations log (oplog) in the form of a Binary JSON (BSON) timestamp: {"t": <unix seconds>, "i": <sequence number>}. Fields url The URL of the target MongoDB server. Type: string Default: "" # Examples url: mongodb://localhost:27017 database The name of the MongoDB database to stream changes from. Type: string Default: "" username The username to connect to the database. Type: string Default: "" password The password to connect to the database. This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration. Type: string Default: "" collections A list of collections to stream changes from. Specify each collection name as a separate item. Type: array Default: [] # Examples collections: - orders - customers - inventory checkpoint_cache Specify a cache resource to store the oplog position for the most recent data update streamed to Redpanda Connect. After a restart, Redpanda Connect can continue processing changes from this position, avoiding the need to reprocess all collection updates. Type: string Default: "" # Examples input: mongodb_cdc: url: mongodb://localhost:27017 collections: [my_collection] checkpoint_cache: "my_cdc_cache" cache_resources: - label: "my_cdc_cache" redis: url: redis://:6379 checkpoint_key The key identifier used to store the oplog position in checkpoint_cache. If you have multiple mongodb_cdc inputs sharing the same cache, you can provide an alternative key. Type: string Default: mongodb_cdc_checkpoint checkpoint_interval The interval between writing checkpoints to the cache. Type: string Default: 5s checkpoint_limit The maximum number of in-flight messages emitted from this input. Increasing this limit enables parallel processing, and batching at the output level. To preserve at-least-once guarantees, any given oplog position is not acknowledged until all messages under that offset are delivered. Type: int Default: 1000 read_batch_size The number of documents to fetch in each message batch from MongoDB. Type: int Default: 1000 read_max_wait The maximum duration MongoDB waits to accumulate the read_batch_size documents on a change stream before returning the batch to Redpanda Connect. Type: string Default: 1s stream_snapshot When set to true, this input streams a snapshot of all existing data in the source collections before streaming data changes. Type: bool Default: false # Examples stream_snapshot: true snapshot_parallelism Specifies the number of connections to use when reading the initial snapshot from one or more collections. Increase this number to enable parallel processing of the snapshot. This feature uses the $splitVector command to split snapshot data into chunks for more efficient processing. This field is only applicable when stream_snapshot is set to true. Type: int Default: 1 snapshot_auto_bucket_sharding Uses the $bucketAuto command instead of the default, $splitVector, to split the snapshot data into chunks for processing. This is required for environments, such as MongoDB Atlas, where the $splitVector command is not available. To enable parallel processing in these environments: Set this field to to true. Set stream_snapshot to true. Increase snapshot_parallelism to a value greater than 1. Type: bool Default: false document_mode The mode in which MongoDB emits document changes to Redpanda Connect, specifically updates and deletes. Type: string Default: update_lookup Option Description update_lookup Emits full documents for insert, replace, and update operations, but only the _id field for delete operations. This corresponds to the updateLookup option. For more information, see the MongoDB documentation. pre_and_post_images Emits full documents for update and delete operations by using pre- and post-image collections. For setup and configuration details, see the MongoDB documentation. partial_update Update operations contain only a description of the changes made to the document, using the following schema: { "_id": "<document_id>", "operations": [ # type == set: Indicates that a value was updated. For example: # root.name."address.city" = "New York" {"path": ["name", "address.city"], "type": "set", "value": "New York"}, # type == unset: Indicates that a value was deleted. For example: # root.phone = deleted() {"path": ["phone"], "type": "unset", "value": null}, # type == truncatedArray: Indicates that an array was truncated to a specified number of elements. For example: # root.items = this.items.slice(2) {"path": ["items"], "type": "truncatedArray", "value": 2} ] } json_marshal_mode Controls the format used to convert a message from BSON to JSON when it is received by Redpanda Connect. Type: string Default: canonical Option Description canonical A string format that preserves the exact data types of values stored in MongoDB at the expense of readability and interoperability. Conversion from canonical to JSON generally preserves type information. relaxed A string format that is human-readable and compatible with most JSON parsers but may lose type fidelity. Conversion from relaxed format to JSON can lose type information. app_name The client application name. Type: string Default: benthos auto_replay_nacks Whether to automatically replay rejected messages (negative acknowledgements) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure. Set auto_replay_nacks to false to delete rejected messages. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data is discarded immediately upon consumption and mutation. Type: bool Default: true Back to top × Simple online edits For simple changes, such as fixing a typo, you can edit the content directly on GitHub. Edit on GitHub Or, open an issue to let us know about something that you want us to change. Open an issue Contribution guide For extensive content updates, or if you prefer to work locally, read our contribution guide . Was this helpful? thumb_up thumb_down group Ask in the community mail Share your feedback group_add Make a contribution kafka_franz mqtt