# Create a MongoDB Source Connector

The MongoDB Source managed connector imports collections from a MongoDB database into Redpanda topics.

## Prerequisites

- Valid credentials with the `read` role to access the MongoDB database. For more granular access, you need to allow the `find` and `changeStream` actions for specific databases or collections.

## Create a MongoDB Source connector

To create a MongoDB Source connector:

1. In Redpanda Cloud, click **Connectors** in the navigation menu, and then click **Create Connector**.
2. Select **Import from MongoDB**.
3. On the **Create Connector** page, specify the following required connector configuration options:

   - **Topic prefix** (`topic.prefix`): Prefix to prepend to database and collection names to generate the name of the Kafka topic to which to publish data. Used by the `DefaultTopicMapper`.
   - **MongoDB Connection URL** (`connection.url`): The MongoDB connection URL string, as supported by the official drivers. For example, `mongodb://localhost/`.
   - **MongoDB username** (`connection.username`): A valid MongoDB user.
   - **MongoDB password** (`connection.password`): The password for the account associated with the MongoDB user.
   - **Database to watch** (`database`): The MongoDB database from which the connector imports data into Redpanda topics. The connector monitors changes in this database. Leave the field empty to watch all databases.
   - **Kafka message key format** (`key.converter`): Format of the key in the Redpanda topic. Default is `STRING`. Use `AVRO` or `JSON` for schematic output, `STRING` for plain JSON, or `BYTES` for BSON.
   - **Kafka message value format** (`value.converter`): Format of the value in the Redpanda topic. Default is `STRING`. Use `AVRO` or `JSON` for schematic output, `STRING` for plain JSON, or `BYTES` for BSON.
   - **Collection to watch** (`collection`): The collection in the MongoDB database to watch. If not set, all collections are watched.
   - **Start up behavior when there is no source offset available** (`startup.mode`): Specifies how the connector starts up when there is no source offset available. Resuming a change stream requires a resume token, which the connector stores in, and reads from, the source offset. If no source offset is available, the connector may either ignore all or some existing source data, or may first copy all existing source data and then continue with processing new data. Possible values are:
     - `latest` (default): The connector creates a new change stream, processes change events from it, and stores resume tokens from them, ignoring all existing source data.
     - `timestamp`: Actuates the `startup.mode.timestamp.*` properties. If no such properties are configured, then `timestamp` is equivalent to `latest`.
     - `copy_existing`: Actuates the `startup.mode.copy.existing.*` properties. The connector creates a new change stream and stores its resume token, copies all existing data from all the collections being used as the source, then processes new data starting from the stored resume token. Note that reads of all the data during the copy, plus subsequent change stream events, may produce duplicated events. During the copy, clients can make changes to the source data, which may be represented both by the copying process and the change stream. However, because change stream events are idempotent, applying them multiple times has the same effect as applying them once. Renaming a collection during the copying process is not supported.
   - **Connector name** (`name`): Globally-unique name to use for this connector.

4. Click **Next**.
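With the required properties filled in, the configuration corresponds to key-value pairs like the following. This is a hypothetical sketch: the host, credentials, database, collection, and prefix are placeholder values, and the converter classes shown are the `STRING` converters listed in the Map data section below.

```json
{
  "name": "mongodb-source-demo",
  "topic.prefix": "mongo",
  "connection.url": "mongodb://localhost/",
  "connection.username": "redpanda",
  "connection.password": "secret",
  "database": "demo",
  "collection": "orders",
  "startup.mode": "latest",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
```

With these example values, documents from the `demo.orders` collection would be published to a topic whose name is generated by prepending the prefix to the database and collection names (for example, `mongo.demo.orders`).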
Review the connector properties specified, then click **Create**.

## Advanced MongoDB Source connector configuration

In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, specify any of the following optional advanced connector configuration properties by selecting **Show advanced options** on the **Create Connector** page:

- **Enable Infer Schemas for the value** (`output.schema.infer.value`): Specifies whether or not to infer the schema for the value. Each document is processed in isolation, which may lead to multiple schema definitions for the data. Only enable when **Kafka message value format** is set to `AVRO` or `JSON`.
- **startAtOperationTime** (`startup.mode.timestamp.start.at.operation.time`): Actuated only if `startup.mode = timestamp`. Specifies the starting point for the change stream. Must be either an integer number of seconds since the Epoch in the decimal format (for example: `30`), an instant in the ISO-8601 format with one-second precision (for example: `1970-01-01T00:00:30Z`), or a BSON timestamp in the canonical extended JSON (v2) format (for example: `{"$timestamp": {"t": 30, "i": 0}}`). You can specify `0` to start at the beginning of the oplog. Requires MongoDB 4.0 or above. For more detail, see the `$changeStream` definition.
- **Copy existing namespace regex** (`startup.mode.copy.existing.namespace.regex`): Use a regular expression to define which existing namespaces data should be copied from. A namespace is the database name and collection, separated by a period (for example, `database.collection`). Example: the following regular expression only includes collections starting with `a` in the `demo` database: `demo\.a.*`.
- **Copy existing initial pipeline** (`startup.mode.copy.existing.pipeline`): An inline JSON array with objects describing the pipeline operations to run when copying existing data. Specifying this property can improve the use of indexes by the copying manager and make copying more efficient. Use this property if there is any filtering of collection data in the pipeline configuration, to speed up the copying process. For example: `[{"$match": {"closed": "false"}}]`.
- **Pipeline to apply to the change stream** (`pipeline`): An inline JSON array with objects describing the pipeline operations to run. For example: `[{"$match": {"operationType": "insert"}}, {"$addFields": {"Kafka": "Rules!"}}]`.
- **fullDocument** (`change.stream.full.document`): Specifies what to return for update operations when using a change stream. When set to `updateLookup`, the change stream for partial updates includes both a delta describing the changes to the document and a copy of the entire document that was changed at some point after the change occurred. See `db.collection.watch` for more detail.
- **fullDocumentBeforeChange** (`change.stream.full.document.before.change`): Specifies the pre-image configuration when creating a change stream. The pre-image is not available in source records published while copying existing data as a result of enabling copy existing, and the pre-image configuration has no effect on copying. Requires MongoDB 6.0 or above. For details, see the possible values.
- **Publish only the fullDocument** (`publish.full.document.only`): When enabled, publishes only the actual changed document (rather than the full change stream document). Automatically sets `change.stream.full.document=updateLookup` so that updated documents are included.
- **Send a null value on a delete event** (`publish.full.document.only.tombstone.on.delete`): When enabled, publishes a null value (tombstone) on a delete event. Requires `publish.full.document.only=true`. Default is `false` (disabled).
- **Error tolerance** (`mongo.errors.tolerance`): Error tolerance response during connector operation. The default value is `none` and signals that any error results in an immediate connector task failure. A value of `all` changes the behavior to skip over problematic records.
- **Heartbeat interval milliseconds** (`heartbeat.interval.ms`): The number of milliseconds between heartbeat messages, which record the post-batch resume token when no source records have been published. Improves the resumability of the connector for low-volume namespaces. Specify `0` to disable.
- **Heartbeat topic name** (`heartbeat.topic.name`): The name of the topic to publish heartbeats to. Defaults to `__mongodb_heartbeats`.
- **Offset partition name** (`offset.partition.name`): Use to specify a custom offset partition name. If blank, the default partition name based on the connection details is used.
- **Topic creation enabled** (`topic.creation.enable`): Specifies whether or not to allow automatic creation of topics. Default is `true`.
- **Topic creation partitions** (`topic.creation.default.partitions`): Specifies the number of partitions for the created topics. The default is `1`.
- **Topic creation replication factor** (`topic.creation.default.replication.factor`): Specifies the replication factor for the created topics. The default is `-1`.

## Map data

Use the appropriate key or value converter for your data:

- `AVRO` (`io.confluent.connect.avro.AvroConverter`) or `JSON` (`org.apache.kafka.connect.json.JsonConverter`) for output with a preset schema. Additionally, you can set **Enable Infer Schemas for the value**. Each document is processed in isolation, which may lead to multiple schema definitions for the data.
- `STRING` (`org.apache.kafka.connect.storage.StringConverter`) when your messages contain plaintext JSON.
- `BYTES` (`org.apache.kafka.connect.converters.ByteArrayConverter`) when your messages contain BSON.

After the connector is created, check to ensure that:

- There are no errors in logs and in Redpanda Console.
- Redpanda topics contain data from the MongoDB collections.

## Use the Connectors API

When using the Connectors API, instead of specifying a value for `connection.url`, `connection.username`, and `connection.password`, you can specify a value for `connection.uri` in the form `mongodb+srv://username:password@cluster0.xxx.mongodb.net`. An example request body is sketched at the end of this page.

## Troubleshoot

Most MongoDB Source connector issues are identified in the connector creation phase. Invalid configuration values are reported in the logs. Select **Show Logs** to view error details.

| Message | Action |
|---|---|
| `Invalid value wrong_uri for configuration connection.uri: The connection string is invalid. Connection strings must start with either 'mongodb://' or 'mongodb+srv://'` | Check to make sure the MongoDB Connection URL is a valid MongoDB URL. |
| `Unable to connect to the server.` | Check to ensure that the MongoDB Connection URL is valid and that the MongoDB server accepts connections. |
| `Invalid user permissions authentication failed. Exception authenticating MongoCredential{mechanism=SCRAM-SHA-1, userName='user', source='admin', password=, mechanismProperties=}` | Check to ensure that you specified valid username and password credentials. |
| `MongoCommandException: Command failed with error 8000 (AtlasError): 'user is not allowed to do action [find] on [db1.characters]' on server ac-nboibsg-shard-00-01.4hagsz0.mongodb.net:27017. The full response is {"ok": 0, "errmsg": "user is not allowed to do action [find] on [db1.characters]", "code": 8000, "codeName": "AtlasError"}` | Check the permissions of the MongoDB user. Also confirm that the MongoDB server accepts connections. |
| `Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog` | See Troubleshoot invalid resume token. |

## Suggested reading

- MongoDB Kafka Source Connector
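As noted in the Use the Connectors API section, `connection.uri` can replace the separate URL and credential properties. The following is a hypothetical sketch of a request body, assuming a Kafka Connect-style `name`/`config` request shape; the `connector.class` value (the MongoDB Kafka source connector class) is an assumption, since the Cloud UI sets it for you, and all other values are placeholders to adjust for your environment.

```json
{
  "name": "mongodb-source-demo",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "topic.prefix": "mongo",
    "connection.uri": "mongodb+srv://username:password@cluster0.xxx.mongodb.net",
    "database": "demo",
    "collection": "orders",
    "startup.mode": "copy_existing",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
```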