Create a MongoDB Source Connector
The MongoDB Source managed connector imports collections from a MongoDB database into Redpanda topics.
Prerequisites
- Access to the MongoDB database
- Valid credentials with read permission to access the MongoDB database
Create a MongoDB Source connector
To create a MongoDB Source connector:
In Redpanda Cloud, click Connectors in the navigation menu, and then click Create Connector.
Select Import from MongoDB.
On the Create Connector page, specify the following required connector configuration options:
Property | Description |
---|---|
Topic prefix | Prefix to prepend to database and collection names to generate the name of the Kafka topic to which to publish data. Used by the DefaultTopicMapper. |
MongoDB Connection URL | The MongoDB connection URL string as supported by the official drivers. For example, mongodb://localhost/. |
MongoDB username | A valid MongoDB user. |
MongoDB password | The password for the account associated with the MongoDB user. |
Database to watch | The MongoDB database from which the connector imports data into Redpanda topics. The connector monitors changes in this database. Leave the field empty to watch all databases. |
Kafka message key format | Format of the key in the Kafka topic. Default is STRING. |
Kafka message value format | Format of the value in the Kafka topic. Default is STRING. |
Collection to watch | The collection in the MongoDB database to watch. If not set, then all collections are watched. |
Start up behavior when there is no source offset available | Specifies how the connector should start up when there is no source offset available. Resuming a change stream requires a resume token, which the connector stores as, and reads from, the source offset. If no source offset is available, the connector may either ignore all or some existing source data, or may at first copy all existing source data and then continue with processing new data. Possible values are: latest (default): The connector creates a new change stream, processes change events from it and stores resume tokens from them, thus ignoring all existing source data. timestamp: Actuates the startup.mode.timestamp.* properties. If no such properties are configured, then timestamp is equivalent to latest. copy_existing: Actuates the startup.mode.copy.existing.* properties. The connector creates a new change stream and stores its resume token, copies all existing data from all the collections being used as the source, then processes new data starting from the stored resume token. Note that reads of all the data during the copy and subsequent change stream events may produce duplicated events. During the copy, clients can make changes to the source data, which may be represented both by the copying process and the change stream. However, as the change stream events are idempotent, it's possible to apply them multiple times with the same effect as if they were applied once. Renaming a collection during the copying process is not supported. |
Max Tasks | Maximum number of tasks to use for this connector. The default is 1. Each task replicates an exclusive set of partitions assigned to it. |
Connector name | Globally-unique name to use for this connector. |

Click Next. Review the connector properties specified, then click Create.
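As a rough illustration of how the Topic prefix combines with the database and collection names, the following Python sketch joins the three parts with a period. The helper name topic_name and the period separator are assumptions made for illustration, based on the default topic mapper in the upstream MongoDB Kafka connector. Check the topic names that the connector actually creates in your cluster.

```python
def topic_name(prefix: str, database: str, collection: str, separator: str = ".") -> str:
    """Join the non-empty parts into a topic name.

    Hypothetical helper: the separator and the handling of an empty
    prefix are assumptions, not confirmed connector behavior.
    """
    parts = [prefix, database, collection]
    return separator.join(part for part in parts if part)


# With a prefix, the expected topic is prefix.database.collection.
print(topic_name("mongo", "demo", "orders"))

# With an empty prefix, only the namespace remains.
print(topic_name("", "demo", "orders"))
```

Under these assumptions, a connector with the prefix mongo that watches the orders collection in the demo database would publish to a topic named mongo.demo.orders.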
Advanced MongoDB Source connector configuration
In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, then specify any of the following optional advanced connector configuration properties by selecting Show advanced options on the Create Connector page:
Property | Description |
---|---|
Enable Infer Schemas for the value | Specifies whether or not to infer the schema for the value. Each document is processed in isolation, which may lead to multiple schema definitions for the data. Only enable when Kafka message value format is set to AVRO or JSON. |
startAtOperationTime | Actuated only if startup.mode = timestamp. Specifies the starting point for the change stream. Must be one of the following: an integer number of seconds since the Epoch in decimal format (for example: 30), an instant in the ISO-8601 format with one second precision (for example: 1970-01-01T00:00:30Z), or a BSON timestamp in the canonical extended JSON (v2) format (for example: {"$timestamp": {"t": 30, "i": 0}}). You can specify 0 to start at the beginning of the oplog. Requires MongoDB 4.0 or above. For more detail, see the $changeStream definition. |
Copy existing namespace regex | Use a regular expression to define which existing namespaces data should be copied from. A namespace is the database name and collection, separated by a period (for example, database.collection). Example: The following regular expression only includes collections starting with a in the demo database: demo\.a.* |
Copy existing initial pipeline | An inline JSON array with objects describing the pipeline operations to run when copying existing data. Specifying this property can improve the use of indexes by the copying manager and make copying more efficient. Use this property if there is any filtering of collection data in the pipeline configuration to speed up the copying process. For example: [{"$match": {"closed": "false"}}] |
Pipeline to apply to the change stream | An inline JSON array with objects describing the pipeline operations to run. For example: [{"$match": {"operationType": "insert"}}, {"$addFields": {"Kafka": "Rules!"}}] |
fullDocument | Specifies what to return for update operations when using a change stream. When set to updateLookup, the change stream for partial updates includes both a delta describing the changes to the document and a copy of the entire document that was changed at some point after the change occurred. See db.collection.watch for more detail. |
fullDocumentBeforeChange | Specifies the pre-image configuration when creating a change stream. The pre-image is not available in source records published while copying existing data as a result of enabling copy.existing. The pre-image configuration has no effect on copying. Requires MongoDB 6.0 or above. For details, see possible values. |
Publish only the fullDocument | When enabled, only publishes the actual changed document (rather than the full change stream document). Automatically sets change.stream.full.document=updateLookup so updated documents will be included. |
Send a null value on a delete event | When enabled, the connector sends a null value on a delete event. Requires publish.full.document.only=true. Default is false (disabled). |
Error tolerance | Error tolerance response during connector operation. The default value, none, means that any error results in an immediate connector task failure. Set the value to all to skip over problematic records instead. |
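Several of the advanced values in the preceding table are easy to mistype, so it can help to check them locally before pasting them into the form. The following Python sketch is illustrative only: the helper names are hypothetical, and the full-match behavior assumed for the namespace regex is an assumption rather than confirmed connector behavior. It shows the three equivalent spellings of a startAtOperationTime value, tests a namespace against a copy regex, and checks that a pipeline is a JSON array of objects.

```python
import json
import re
from datetime import datetime, timezone


def start_at_forms(instant: datetime) -> dict:
    """Return the three documented spellings of one starting point."""
    seconds = int(instant.timestamp())
    return {
        "epoch_seconds": str(seconds),
        "iso_8601": instant.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "bson_timestamp": json.dumps({"$timestamp": {"t": seconds, "i": 0}}),
    }


def namespace_is_copied(regex: str, database: str, collection: str) -> bool:
    """Check database.collection against the regex (full match is assumed)."""
    return re.fullmatch(regex, f"{database}.{collection}") is not None


def parse_pipeline(text: str) -> list:
    """Parse an inline pipeline and reject anything but a JSON array of objects."""
    stages = json.loads(text)
    if not isinstance(stages, list) or not all(isinstance(s, dict) for s in stages):
        raise ValueError("pipeline must be a JSON array of objects")
    return stages


# 30 seconds after the Epoch, matching the examples in the table.
print(start_at_forms(datetime(1970, 1, 1, 0, 0, 30, tzinfo=timezone.utc)))

# demo\.a.* keeps demo.accounts and skips demo.orders.
print(namespace_is_copied(r"demo\.a.*", "demo", "accounts"))
print(namespace_is_copied(r"demo\.a.*", "demo", "orders"))

# A valid pipeline parses into a list of stage objects.
print(parse_pipeline('[{"$match": {"operationType": "insert"}}]'))
```

The checks run entirely on your machine and do not contact MongoDB, so they catch formatting mistakes only. Whether a pipeline stage is accepted by the server is still decided by MongoDB when the connector starts.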
Map data
Set the Kafka message key and value formats as follows:
- AVRO or JSON for output with a preset schema. Additionally, you can set Enable Infer Schemas for the value. Each document will be processed in isolation, which may lead to multiple schema definitions for the data.
- STRING when your messages contain plaintext JSON.
- BYTES when your messages contain BSON.
After the connector is created, check to ensure that:
- There are no errors in logs and in Redpanda Console.
- Redpanda topics contain data from the watched MongoDB collections.
Troubleshoot
Most MongoDB Source connector issues are identified in the connector creation phase. Invalid Include Tables are reported in logs.
Message | Action |
---|---|
Invalid value wrong_uri for configuration connection.uri: The connection string is invalid. Connection strings must start with either 'mongodb://' or 'mongodb+srv://' | Check to make sure the MongoDB Connection URL is a valid MongoDB URL. |
Unable to connect to the server. | Check to ensure that the MongoDB Connection URL is valid and that the MongoDB server accepts connections. |
Invalid user permissions authentication failed. Exception authenticating MongoCredential{mechanism=SCRAM-SHA-1, userName='user', source='admin', password=<hidden>, mechanismProperties=<hidden>}. | Check to ensure that you specified valid username and password credentials. |
MongoCommandException: Command failed with error 8000 (AtlasError): 'user is not allowed to do action [find] on [db1.characters]' on server ac-nboibsg-shard-00-01.4hagsz0.mongodb.net:27017. The full response is {"ok": 0, "errmsg": "user is not allowed to do action [find] on [db1.characters]", "code": 8000, "codeName": "AtlasError"} | Check the permissions of the MongoDB user. Also confirm that the MongoDB server accepts connections. |
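The first error in the table can be caught before you create the connector. The following Python sketch checks only that a connection URL starts with one of the two schemes named in the error message. The function name check_connection_url is hypothetical, and the check does not contact the server or validate credentials.

```python
from typing import Optional

# The two prefixes named in the connector's error message.
VALID_SCHEMES = ("mongodb://", "mongodb+srv://")


def check_connection_url(url: str) -> Optional[str]:
    """Return a problem description, or None if the prefix looks valid."""
    if not url.startswith(VALID_SCHEMES):
        return "Connection strings must start with either 'mongodb://' or 'mongodb+srv://'"
    if url in VALID_SCHEMES:
        return "Connection string has a scheme but no host"
    return None


for candidate in ("mongodb://localhost/", "wrong_uri"):
    problem = check_connection_url(candidate)
    print(candidate, "->", problem or "prefix looks valid")
```

A URL that passes this check can still fail with the other errors in the table, for example when the server is unreachable or the user lacks read permission.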