# Create a MongoDB Sink Connector

---
title: Create a MongoDB Sink Connector
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-connect-version: 4.93.0
latest-redpanda-tag: v26.1.9
docname: managed-connectors/create-mongodb-sink-connector
page-component-name: cloud-data-platform
page-version: master
page-component-version: master
page-component-title: Cloud
page-relative-src-path: managed-connectors/create-mongodb-sink-connector.adoc
page-edit-url: https://github.com/redpanda-data/cloud-docs/edit/main/modules/develop/pages/managed-connectors/create-mongodb-sink-connector.adoc
description: Use the Redpanda Cloud UI to create a MongoDB Sink Connector.
page-git-created-date: "2024-06-06"
page-git-modified-date: "2025-08-05"
---

<!-- Source: https://docs.redpanda.com/cloud-data-platform/develop/managed-connectors/create-mongodb-sink-connector.md -->

> ❗ **IMPORTANT**
>
> -   To enable this feature, contact [Redpanda Support](https://support.redpanda.com/hc/en-us/requests/new). To disable this feature, see [Disable Kafka Connect](https://docs.redpanda.com/cloud-data-platform/develop/managed-connectors/disable-kc/).
>
> -   Redpanda Support does not manage or monitor Kafka Connect. For fully-supported connectors, consider [Redpanda Connect](https://docs.redpanda.com/cloud-data-platform/develop/connect/about/).
>
> -   When Kafka Connect is enabled, there is a dedicated node running even when no connectors are deployed.

The MongoDB Sink managed connector exports Redpanda structured data to a MongoDB database.

## [](#prerequisites)Prerequisites

-   Valid credentials with the `readWrite` role to access the MongoDB database. For more granular access, allow the `insert`, `remove`, and `update` actions on the specific databases or collections.


## [](#limitations)Limitations

If you want to use the MongoDB sink connector with the `MongoDB` CDC handler for data sourced from MongoDB (using the MongoDB source connector), you must select `STRING` or `BYTES` as the value converter for both the source and sink connectors.
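
This constraint can be checked before you deploy a source and sink pair. The following is a minimal sketch, not part of any Redpanda tooling: the function name and the plain-dictionary configuration shape are assumptions, the `"MongoDB"` handler value mirrors the label used above, and a real configuration may store converter class names rather than the short labels.

```python
# Hypothetical pre-deployment check; names and config shape are illustrative.
ALLOWED_CDC_CONVERTERS = {"STRING", "BYTES"}


def cdc_converters_ok(source_config: dict, sink_config: dict) -> bool:
    """Return True if the source/sink pair satisfies the limitation above."""
    # The limitation only applies when the sink uses the MongoDB CDC handler.
    if sink_config.get("change.data.capture.handler") != "MongoDB":
        return True
    # Both connectors must use STRING or BYTES as the value converter.
    return all(
        cfg.get("value.converter") in ALLOWED_CDC_CONVERTERS
        for cfg in (source_config, sink_config)
    )


source = {"value.converter": "STRING"}
sink = {"value.converter": "STRING", "change.data.capture.handler": "MongoDB"}
print(cdc_converters_ok(source, sink))  # True
```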

## [](#create-a-mongodb-sink-connector)Create a MongoDB Sink connector

To create a MongoDB Sink connector:

1.  In Redpanda Cloud, click **Connectors** in the navigation menu, and then click **Create Connector**.

2.  Select **Export to MongoDB Sink**.

3.  On the **Create Connector** page, specify the following required connector configuration options:

    | Property name | Property key | Description |
    | --- | --- | --- |
    | Topics to export | topics | A comma-separated list of the cluster topics you want to export to MongoDB. |
    | Topics regex | topics.regex | Java regular expression of topics to replicate. For example: specify .* to replicate all available topics in the cluster. Applicable only when Use regular expressions is selected. |
    | MongoDB Connection URL | connection.url | The MongoDB connection URI string to connect to your MongoDB instance or cluster. For example, mongodb://localhost/. |
    | MongoDB username | connection.username | A valid MongoDB user. |
    | MongoDB password | connection.password | The password for the account associated with the MongoDB user. |
    | MongoDB database name | database | The name of an existing MongoDB database to write data to. |
    | Kafka message key format | key.converter | Format of the key in the Redpanda topic. Default is STRING. |
    | Kafka message value format | value.converter | Format of the value in the Redpanda topic. Default is STRING. |
    | Default MongoDB collection name | collection | (Optional). Single sink collection name to write to. If following multiple topics, then this will be the default collection to which they are mapped. |
    | Max Tasks | tasks.max | Maximum number of tasks to use for this connector. The default is 1. Each task replicates an exclusive set of partitions assigned to it. |
    | Connector name | name | Globally-unique name to use for this connector. |

4.  Click **Next**. Review the connector properties specified, then click **Create**.
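
To see how the property keys in the table fit together, the following sketch collects them into a single configuration object and prints it as JSON. Only the keys come from the table. Every value is a placeholder, and the sketch assumes that the short format labels (`STRING`) are accepted as written, which may not hold outside the UI.

```python
import json

# Placeholder values throughout; only the property keys come from the table.
basic_config = {
    "name": "mongodb-sink-example",
    "topics": "orders,customers",
    "connection.url": "mongodb://localhost/",
    "connection.username": "example-user",
    "connection.password": "example-password",
    "database": "exampledb",
    "collection": "events",  # optional default collection
    "key.converter": "STRING",  # assumption: UI label used as the value
    "value.converter": "STRING",
    "tasks.max": "1",
}

print(json.dumps(basic_config, indent=2))
```

The sketch sets `topics` and leaves out `topics.regex`, because the table describes the regex option as applicable only when **Use regular expressions** is selected.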


### [](#advanced-mongodb-sink-connector-configuration)Advanced MongoDB Sink connector configuration

In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, then specify any of the following _optional_ advanced connector configuration properties by selecting **Show advanced options** on the **Create Connector** page:

| Property name | Property key | Description |
| --- | --- | --- |
| CDC handler | change.data.capture.handler | The CDC (change data capture) handler to use for processing. The MongoDB handler requires plain JSON or BSON format. The default is NONE. |
| Key projection type | key.projection.type | The type of key projection to use: either AllowList or BlockList. |
| Key projection list | key.projection.list | A comma-separated list of field names for key projection. |
| Value projection type | value.projection.type | Only use with Value projection list. The type of value projection to use: AllowList or BlockList. The default is NONE. |
| Value projection list | value.projection.list | A comma-separated list of field names for value projection. |
| Field renamer mapping | field.renamer.mapping | An inline JSON array with objects describing field name mappings. For example: [{"oldName":"key.fieldA","newName":"field1"},{"oldName":"value.xyz","newName":"abc"}]. |
| Field used for time | timeseries.timefield | Name of the top level field used for time. Inserted documents must specify this field, and it must be of the BSON datetime type. |
| Field describing the series | timeseries.metafield | The name of the top-level field that contains metadata in each time series document. The metadata in the specified field should be data that is used to label a unique series of documents. The metadata should rarely, if ever, change. This field is used to group related data and may be of any BSON type, except for array. The metadata field may not be the same as the timeField or _id. |
| Convert the field to a BSON datetime type | timeseries.timefield.auto.convert | Converts the timeseries field to a BSON datetime type. If the value is numeric, it is interpreted as milliseconds since the epoch, and any fractional part is discarded. If the value is a string, the timeseries.timefield.auto.convert.date.format property is used to parse the date. |
| DateTimeFormatter pattern for the date | timeseries.timefield.auto.convert.date.format | The DateTimeFormatter pattern to use when converting string dates. Defaults to support ISO style date times. A string is expected to contain both the date and time. If the string only contains date information, then the time since epoch is taken from the start of that day. If a string representation does not contain a timezone offset, then the extracted date and time is interpreted as UTC. |
| Data expiry time in seconds | timeseries.expire.after.seconds | The amount of time in seconds that the data will be kept in MongoDB before being automatically deleted. |
| Data expiry time | timeseries.granularity | The expected interval between subsequent measurements for a time series. Possible values are "seconds", "minutes" or "hours". |
| Error tolerance | errors.tolerance | Error tolerance response during connector operation. Default value is none and signals that any error will result in an immediate connector task failure. Value of all changes the behavior to skip over problematic records. |
| Dead letter queue topic name | errors.deadletterqueue.topic.name | The name of the topic to be used as the dead letter queue (DLQ) for messages that result in an error when processed by this sink connector, its transformations, or converters. The topic name is blank by default, which means that no messages are recorded in the DLQ. |
| Dead letter queue topic replication factor | errors.deadletterqueue.topic.replication.factor | Replication factor used to create the dead letter queue topic when it doesn’t already exist. |
| Enable error context headers | errors.deadletterqueue.context.headers.enable | When true, adds a header containing error context to the messages written to the dead letter queue. To avoid clashing with headers from the original record, all error context header keys start with __connect.errors. |
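
The field renamer mapping is an inline JSON array, which is easy to mistype by hand. As an illustration, the following sketch builds that string from Python objects and combines it with the error-handling options from the table. The dead letter queue topic name is a placeholder, and the sketch is not a complete connector configuration.

```python
import json

# Field renames as Python objects, serialized to the inline JSON array
# that field.renamer.mapping expects.
renames = [
    {"oldName": "key.fieldA", "newName": "field1"},
    {"oldName": "value.xyz", "newName": "abc"},
]

advanced_config = {
    "field.renamer.mapping": json.dumps(renames, separators=(",", ":")),
    "errors.tolerance": "all",  # skip problematic records instead of failing
    "errors.deadletterqueue.topic.name": "mongodb-sink-dlq",  # placeholder name
    "errors.deadletterqueue.context.headers.enable": "true",
}

print(advanced_config["field.renamer.mapping"])
```

The printed string matches the example format shown in the **Field renamer mapping** row.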

## [](#map-data)Map data

Use the appropriate key or value converter (input data format) for your data as follows:

-   `JSON` (`org.apache.kafka.connect.json.JsonConverter`) when your messages are structured JSON. Select `Message JSON contains schema` if each message includes the `schema` and `payload` fields.

-   `AVRO` (`io.confluent.connect.avro.AvroConverter`) when your messages are Avro-encoded, with the schema stored in the Schema Registry.

-   `STRING` (`org.apache.kafka.connect.storage.StringConverter`) when your messages contain plaintext JSON.

-   `BYTES` (`org.apache.kafka.connect.converters.ByteArrayConverter`) when your messages contain BSON.
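
If you keep connector settings in scripts, a small lookup table can translate the format labels above into their converter classes. This is a sketch only: the helper name is hypothetical, and whether a given interface expects the short label or the class name is an assumption to verify for your environment.

```python
# Format labels and converter classes as listed above.
CONVERTERS = {
    "JSON": "org.apache.kafka.connect.json.JsonConverter",
    "AVRO": "io.confluent.connect.avro.AvroConverter",
    "STRING": "org.apache.kafka.connect.storage.StringConverter",
    "BYTES": "org.apache.kafka.connect.converters.ByteArrayConverter",
}


def converter_class(label: str) -> str:
    """Look up the converter class for a format label (case-insensitive)."""
    try:
        return CONVERTERS[label.upper()]
    except KeyError:
        raise ValueError(
            f"Unknown format {label!r}; expected one of {sorted(CONVERTERS)}"
        ) from None


print(converter_class("bytes"))
```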


## [](#test-the-connection)Test the connection

After the connector is created, verify that your new collections appear in your MongoDB database. For example, run the following command in the MongoDB shell:

`show collections`
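
The same check can be scripted. The sketch below assumes the third-party PyMongo driver is installed and that you know which collection names to expect. The function names and sample collection names are hypothetical. PyMongo is imported only inside `check_database`, so the comparison helper runs without a database.

```python
def missing_collections(expected, existing):
    """Return the expected collection names that are not present yet, sorted."""
    return sorted(set(expected) - set(existing))


def check_database(uri, database, expected):
    """Connect with PyMongo and report expected collections that are missing."""
    from pymongo import MongoClient  # third-party; imported only when called

    client = MongoClient(uri)
    try:
        existing = client[database].list_collection_names()
        return missing_collections(expected, existing)
    finally:
        client.close()


# Offline demonstration with made-up collection names:
print(missing_collections(["orders", "customers"], ["orders"]))  # ['customers']
```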

## [](#use-the-connectors-api)Use the Connectors API

When using the Connectors API, instead of specifying a value for `connection.url`, `connection.username`, and `connection.password`, you can specify a value for `connection.uri` in the form `mongodb+srv://username:password@cluster0.xxx.mongodb.net`.
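
Because the credentials are embedded in the URI, characters such as `@`, `:`, and `/` in a username or password must be percent-encoded. The following sketch shows one way to build the string with the Python standard library. The function name and the sample credentials are illustrative, and the host is the placeholder from the example above.

```python
from urllib.parse import quote_plus


def build_connection_uri(username: str, password: str, host: str) -> str:
    """Build a mongodb+srv connection string with percent-encoded credentials."""
    return f"mongodb+srv://{quote_plus(username)}:{quote_plus(password)}@{host}"


# Sample credentials are made up; the password contains reserved characters.
uri = build_connection_uri("app-user", "p@ss/word", "cluster0.xxx.mongodb.net")
print(uri)  # mongodb+srv://app-user:p%40ss%2Fword@cluster0.xxx.mongodb.net
```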

## [](#troubleshoot)Troubleshoot

Issues are reported using a failed task error message. Select **Show Logs** to view error details.

| Message | Action |
| --- | --- |
| Invalid value wrong_uri for configuration connection.uri: The connection string is invalid. Connection strings must start with either 'mongodb://' or 'mongodb+srv:// | Check to make sure the Connection URI is a valid MongoDB URL. |
| Unable to connect to the server. | Check to ensure that the Connection URI is valid and that the MongoDB server accepts connections. |
| Invalid user permissions authentication failed. Exception authenticating MongoCredential{mechanism=SCRAM-SHA-1, userName='user', source='admin', password=, mechanismProperties=}. | Check to ensure that you specified valid username and password credentials. |
| DataException: Could not convert key into a BsonDocument. | Make sure your message keys are valid JSON or skip configuration for fields that require valid JSON keys. |
| DataException: Error: operationType field doc is missing. | Make sure the input record format is correct (produced by a MongoDB source connector if you use MongoDB CDC handler). |
| DataException: Value document is missing or CDC operation is not a string | Make sure the input record format is correct (produced by a Debezium source connector if you use Debezium CDC handler). |
| JsonParseException: Unrecognized token 'text': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false') | Make sure the input record format is JSON. |
| Unexpected documentKey field type, expecting a document but found BsonString…​: {…​} | Make sure the source data is in the plain JSON or BSON format (value converter STRING or BYTES). |
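
Two of these errors can be caught locally before a connector is deployed. The helpers below are a rough sketch of such pre-checks, not the connector's own validation logic: one mirrors the prefix rule quoted in the first message, and the other tests whether a sample key or value parses as a JSON document. The function names are hypothetical.

```python
import json

VALID_PREFIXES = ("mongodb://", "mongodb+srv://")


def has_valid_scheme(uri: str) -> bool:
    """Mirror the prefix rule quoted in the first error message."""
    return uri.startswith(VALID_PREFIXES)


def is_json_document(text: str) -> bool:
    """Return True if text parses as a JSON object (a document, not a scalar)."""
    try:
        return isinstance(json.loads(text), dict)
    except json.JSONDecodeError:
        return False


print(has_valid_scheme("wrong_uri"))  # False
print(is_json_document('{"id": 1}'))  # True
print(is_json_document("text"))  # False
```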

## [](#suggested-reading)Suggested reading

-   [MongoDB Kafka Sink Connector](https://www.mongodb.com/docs/kafka-connector/current/sink-connector/)