Create a MongoDB Source Connector
The MongoDB Source managed connector imports collections from a MongoDB database into Redpanda topics.
Prerequisites
-
Access to the MongoDB database
-
Valid credentials with
read
permission to access the MongoDB database
Create a MongoDB Source connector
To create a MongoDB Source connector:
-
In Redpanda Cloud, click Connectors in the navigation menu, and then click Create Connector.
-
Select Import from MongoDB.
-
On the Create Connector page, specify the following required connector configuration options:
Property Description Topic prefix
Prefix to prepend to database and collection names to generate the name of the Kafka topic to which to publish data. Used by the
DefaultTopicMapper
.MongoDB Connection URL
The MongoDB connection URL string as supported by the official drivers. For example,
mongodb://locahost/
.MongoDB username
A valid MongoDB user.
MongoDB password
The password for the account associated with the MongoDB user.
Database to watch
The MongoDb database from which the connector imports data into Redpanda topics. The connector monitors changes in this database. Leave the field empty to watch all databases.
Kafka message key format
Format of the key in the Kafka topic. Default is
STRING
.Kafka message value format
Format of the value in the Kafka topic. Default is
STRING
.Collection to watch
The collection in the MongoDB database to watch. If not set, then all collections are watched.
Start up behavior when there is no source offset available
Specifies how the connector should start up when there is no source offset available. Resuming a change stream requires a resume token, which the connector stores as reads from the source offset. If no source offset is available, the connector may either ignore all or some existing source data, or may at first copy all existing source data and then continue with processing new data. Possible values are:
-
latest
(default): The connector creates a new change stream, processes change events from it and stores resume tokens from them, thus ignoring all existing source data. -
timestamp
: actuatesstartup.mode.timestamp.*
properties. If no such properties are configured, thentimestamp
is equivalent tolatest
. -
copy_existing
: actuatesstartup.mode.copy.existing.*
properties. The connector creates a new change stream and stores its resume token, copies all existing data from all the collections being used as the source, then processes new data starting from the stored resume token. Note that reads of all the data during the copy and subsequent change stream events may produce duplicated events. During the copy, clients can make changes to the source data, which may be represented both by the copying process and the change stream. However, as the change stream events are idempotent, it’s possible to apply them multiple times with the same effect as if they were applied once. Renaming a collection during the copying process is not supported.
Max Tasks
Maximum number of tasks to use for this connector. The default is
1
. Each task replicates an exclusive set of partitions assigned to it.Connector name
Globally-unique name to use for this connector.
-
-
Click Next. Review the connector properties specified, then click Create.
Advanced MongoDB Source connector configuration
In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, then specify any of the following optional advanced connector configuration properties by selecting Show advanced options on the Create Connector page:
Property | Description |
---|---|
|
Specifies whether or not to infer the schema for the value. Each Document is processed in isolation, which may lead to multiple schema definitions for the data. Only enable when Kafka message value format is set to |
|
Actuated only if |
|
Use a regular expression to define which existing namespaces data should be copied from. A namespace is the database name and collection, separated by a period (for example, |
|
An inline JSON array with objects describing the pipeline operations to run when copying existing data. Specifying this property can improve the use of indexes by the copying manager and make copying more efficient. Use this property if there is any filtering of collection data in the |
|
An inline JSON array with objects describing the pipeline operations to run. For example: |
|
Specifies what to return for update operations when using a change stream. When set to |
|
Specifies the pre-image configuration when creating a change stream. The pre-image is not available in source records published while copying existing data as a result of enabling |
|
When enabled, only publishes the actual changed document (rather than the full change stream document). Automatically sets |
|
When enabled, requires |
|
Error tolerance response during connector operation. Default value is |
Map data
-
AVRO
orJSON
for output with a preset schema. Additionally, you can setEnable Infer Schemas
for the value. Each document will be processed in isolation, which may lead to multiple schema definitions for the data. -
STRING
when your messages contain plaintext JSON. -
BYTES
when your messages contain BSON.
After the connector is created, check to ensure that:
-
There are no errors in logs and in Redpanda Console.
-
Redpanda topics contain data from relational database tables.
Troubleshoot
Most MongoDB Source connector issues are identified in the connector creation phase. Invalid Include Tables are reported in logs. Select Show Logs to view error details.
Message | Action |
---|---|
Invalid value wrong_uri for configuration connection.uri: The connection string is invalid. Connection strings must start with either 'mongodb://' or 'mongodb+srv:// |
Check to make sure the |
Unable to connect to the server. |
Check to ensure that the |
Invalid user permissions authentication failed. Exception authenticating MongoCredential{mechanism=SCRAM-SHA-1, userName='user', source='admin', password=, mechanismProperties=}. |
Check to ensure that you specified valid username and password credentials. |
MongoCommandException: Command failed with error 8000 (AtlasError): 'user is not allowed to do action [find] on [db1.characters]' on server ac-nboibsg-shard-00-01.4hagsz0.mongodb.net:27017. The full response is {"ok": 0, "errmsg": "user is not allowed to do action [find] on [db1.characters]", "code": 8000, "codeName": "AtlasError"} |
Check the permissions of the MongoDB user. Also confirm that the MongoDB server accepts connections. |