Create a MongoDB Sink Connector
The MongoDB Sink managed connector exports Redpanda structured data to a MongoDB database.
Create a MongoDB Sink connector
To create a MongoDB Sink connector:
- In Redpanda Cloud, click Connectors in the navigation menu, and then click Create Connector.
- Select Export to MongoDB Sink.
- On the Create Connector page, specify the following required connector configuration options:
Property | Description |
---|---|
Topics to export | A comma-separated list of the cluster topics you want to export to MongoDB. |
MongoDB Connection URL | The MongoDB connection URI string used to connect to your MongoDB instance or cluster. For example, `mongodb://localhost/`. |
MongoDB username | A valid MongoDB user. |
MongoDB password | The password for the account associated with the MongoDB user. |
MongoDB database name | The name of an existing MongoDB database to store the exported data in. |
Kafka message key format | Format of the key in the Kafka topic. Default is `STRING`. |
Kafka message value format | Format of the value in the Kafka topic. Default is `STRING`. |
Default MongoDB collection name | (Optional) The single sink collection to write to. If the connector consumes multiple topics, this is the default collection to which they are mapped. |
Max Tasks | Maximum number of tasks to use for this connector. The default is `1`. Each task replicates an exclusive set of partitions assigned to it. |
Connector name | Globally unique name to use for this connector. |
- Click Next. Review the connector properties specified, then click Create.
Advanced MongoDB Sink connector configuration
In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, then specify any of the following optional advanced connector configuration properties by selecting Show advanced options on the Create Connector page:
Property | Description |
---|---|
|
The CDC (change data capture) handler to use for processing. The MongoDB handler requires plain JSON or BSON format. The default is |
|
The type of key projection to use: either |
|
A comma-separated list of field names for key projection. |
|
Only use with |
|
A comma-separated list of field names for value projection. |
|
An inline JSON array with objects describing field name mappings. For example: |
|
Name of the top level field used for time. Inserted documents must specify this field, and it must be of the |
|
The name of the top-level field that contains metadata in each time series document. The metadata in the specified field should be data that is used to label a unique series of documents. The metadata should rarely, if ever, change. This field is used to group related data and may be of any |
|
Converts the timeseries field to a |
|
The DateTimeFormatter pattern to use when converting string dates. Defaults to support ISO style date times. A string is expected to contain both the date and time. If the string only contains date information, then the time since epoch is taken from the start of that day. If a string representation does not contain a timezone offset, then the extracted date and time is interpreted as UTC. |
|
The amount of time in seconds that the data will be kept in MongoDB before being automatically deleted. |
|
The expected interval between subsequent measurements for a time series. Possible values are |
|
Error tolerance response during connector operation. Default value is |
|
The name of the topic to be used as the dead letter queue (DLQ) for messages that result in an error when processed by this sink connector, its transformations, or converters. The topic name is blank by default, which means that no messages are recorded in the DLQ. |
|
Replication factor used to create the dead letter queue topic when it doesn’t already exist. |
|
When |
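
The string-date rules described in the table above (a date-only string resolves to the start of that day, and a string without a timezone offset is read as UTC) can be sketched in Python. This is only an approximation for illustration: the connector uses a Java DateTimeFormatter pattern, whereas this sketch relies on Python's ISO parser, and the function name `to_epoch_millis` is hypothetical.

```python
from datetime import datetime, timezone


def to_epoch_millis(text: str) -> int:
    """Approximate the string-date rules described above (not connector code)."""
    # fromisoformat parses "YYYY-MM-DD" as midnight at the start of that day.
    parsed = datetime.fromisoformat(text)
    # A value without a timezone offset is interpreted as UTC.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)


# All three strings describe the same instant, so they print the same value.
print(to_epoch_millis("2023-01-15"))
print(to_epoch_millis("2023-01-15T00:00:00"))
print(to_epoch_millis("2023-01-15T02:00:00+02:00"))
```

Checking a few sample strings this way before producing data can help confirm that the timestamps in your messages will be interpreted as you expect.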
Map data
Use the appropriate key or value converter (input data format) for your data as follows:
- JSON when your messages are structured JSON. Select Message JSON contains schema, with the `schema` and `payload` fields.
- AVRO when your messages contain AVRO-encoded messages, with schema stored in the Schema Registry.
- STRING when your messages contain plaintext JSON.
- BYTES when your messages contain BSON.
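
To illustrate the JSON option with a schema, the following minimal sketch builds a message value in the envelope layout that the Kafka Connect JSON converter expects when schemas are enabled: a top-level `schema` object describing the fields and a `payload` object holding the data. The helper name `wrap_with_schema`, the schema name `example.Reading`, and the sample fields are hypothetical.

```python
import json


def wrap_with_schema(payload: dict) -> str:
    """Wrap a flat record in a schema/payload envelope (illustrative only)."""
    # Map Python types to Kafka Connect JSON schema type names.
    type_names = {str: "string", int: "int64", float: "double", bool: "boolean"}
    fields = [
        {"field": name, "type": type_names[type(value)], "optional": False}
        for name, value in payload.items()
    ]
    envelope = {
        "schema": {
            "type": "struct",
            "name": "example.Reading",  # hypothetical schema name
            "optional": False,
            "fields": fields,
        },
        "payload": payload,
    }
    return json.dumps(envelope)


message = wrap_with_schema({"device_id": "sensor-1", "temperature": 21.5})
print(message)
```

Messages that are plain JSON objects without this envelope would instead fit the STRING option described above.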
Test the connection
After the connector is created, verify that your new collections appear in your MongoDB database:
show collections
Troubleshoot
Issues are reported using a failed task error message.
Message | Action |
---|---|
Invalid value wrong_uri for configuration connection.uri: The connection string is invalid. Connection strings must start with either 'mongodb://' or 'mongodb+srv://' | Check to make sure the |
Unable to connect to the server. | Check to ensure that the |
Invalid user permissions authentication failed. Exception authenticating MongoCredential{mechanism=SCRAM-SHA-1, userName='user', source='admin', password=, mechanismProperties=}. | Check to ensure that you specified valid username and password credentials. |
DataException: Could not convert key `` into a BsonDocument. | Make sure your message keys are valid JSON, or skip configuration for fields that require valid JSON keys. |
DataException: Error: | Make sure the input record format is correct (produced by a MongoDB source connector if you use the MongoDB CDC handler). |
DataException: Value document is missing or CDC operation is not a string | Make sure the input record format is correct (produced by a Debezium source connector if you use the Debezium CDC handler). |
JsonParseException: Unrecognized token 'text': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false') | Make sure the input record format is JSON. |
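
Two of the errors above can be caught before the connector is created. The following sketch, which is not part of the connector, checks that a connection string starts with one of the two prefixes named in the error message and that a sample key or value parses as a JSON object. The helper names `check_connection_uri` and `is_json_document` are hypothetical.

```python
import json

# Prefixes taken from the connection.uri error message above.
VALID_PREFIXES = ("mongodb://", "mongodb+srv://")


def check_connection_uri(uri: str) -> bool:
    """Return True when the URI starts with one of the accepted prefixes."""
    return uri.startswith(VALID_PREFIXES)


def is_json_document(text: str) -> bool:
    """Return True when the text parses as a JSON object."""
    try:
        return isinstance(json.loads(text), dict)
    except json.JSONDecodeError:
        return False


print(check_connection_uri("wrong_uri"))
print(check_connection_uri("mongodb://localhost/"))
print(is_json_document("text"))
print(is_json_document('{"id": 1}'))
```

A prefix check confirms only the format of the connection string. It does not verify that the server is reachable or that the credentials are valid.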