Create a MongoDB Source Connector

The MongoDB Source managed connector imports collections from a MongoDB database into Redpanda topics.


  • Valid credentials with the read role to access the MongoDB database. For more granular access, you need to allow find and changeStream actions for specific databases or collections.

Create a MongoDB Source connector

To create a MongoDB Source connector:

  1. In Redpanda Cloud, click Connectors in the navigation menu, and then click Create Connector.

  2. Select Import from MongoDB.

  3. On the Create Connector page, specify the following required connector configuration options:

    Property name Property key Description

    Topic prefix


    Prefix to prepend to database and collection names to generate the name of the Kafka topic to which to publish data. Used by the DefaultTopicMapper.

    MongoDB Connection URL


    The MongoDB connection URL string as supported by the official drivers. For example, mongodb://locahost/.

    MongoDB username


    A valid MongoDB user.

    MongoDB password


    The password for the account associated with the MongoDB user.

    Database to watch


    The MongoDb database from which the connector imports data into Redpanda topics. The connector monitors changes in this database. Leave the field empty to watch all databases.

    Kafka message key format


    Format of the key in the Redpanda topic. Default is STRING. Use AVRO or JSON for schematic output, STRING for plain JSON, or BYTES for BSON.

    Kafka message value format


    Format of the value in the Redpanda topic. Default is STRING. Use AVRO or JSON for schematic output, STRING for plain JSON, or BYTES for BSON.

    Collection to watch


    The collection in the MongoDB database to watch. If not set, then all collections are watched.

    Start up behavior when there is no source offset available


    Specifies how the connector should start up when there is no source offset available. Resuming a change stream requires a resume token, which the connector stores as reads from the source offset. If no source offset is available, the connector may either ignore all or some existing source data, or may at first copy all existing source data and then continue with processing new data. Possible values are:

    • latest (default): The connector creates a new change stream, processes change events from it and stores resume tokens from them, thus ignoring all existing source data.

    • timestamp: actuates startup.mode.timestamp.* properties. If no such properties are configured, then timestamp is equivalent to latest.

    • copy_existing: actuates startup.mode.copy.existing.* properties. The connector creates a new change stream and stores its resume token, copies all existing data from all the collections being used as the source, then processes new data starting from the stored resume token. Note that reads of all the data during the copy and subsequent change stream events may produce duplicated events. During the copy, clients can make changes to the source data, which may be represented both by the copying process and the change stream. However, as the change stream events are idempotent, it’s possible to apply them multiple times with the same effect as if they were applied once. Renaming a collection during the copying process is not supported.

    Max Tasks


    Maximum number of tasks to use for this connector. The default is 1. Each task replicates an exclusive set of partitions assigned to it.

    Connector name


    Globally-unique name to use for this connector.

  4. Click Next. Review the connector properties specified, then click Create.

Advanced MongoDB Source connector configuration

In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, then specify any of the following optional advanced connector configuration properties by selecting Show advanced options on the Create Connector page:

Property name Property key Description

Enable Infer Schemas for the value


Specifies whether or not to infer the schema for the value. Each Document is processed in isolation, which may lead to multiple schema definitions for the data. Only enable when Kafka message value format is set to AVRO or JSON.



Actuated only if startup.mode = timestamp specifies the starting point for the change stream. Must be either an integer number of seconds because the Epoch is in the decimal format (for example: 30), or an instant in the ISO-8601 format with one second precision (for example: 1970-01-01T00:00:30Z), or a BSON timestamp in the canonical extended JSON (v2) format (for example: {"$timestamp": {"t": 30, "i": 0}}). You can specify 0 to start at the beginning of the oplog. Requires MongoDB 4.0 or above. For more detail, see the $changeStream definition.

Copy existing namespace regex

startup.mode.copy.existing .namespace.regex

Use a regular expression to define which existing namespaces data should be copied from. A namespace is the database name and collection, separated by a period (for example, database.collection). Example: The following regular expression only includes collections starting with a in the demo database: demo\.a.*.

Copy existing initial pipeline

startup.mode.copy.existing .pipeline

An inline JSON array with objects describing the pipeline operations to run when copying existing data. Specifying this property can improve the use of indexes by the copying manager and make copying more efficient. Use this property if there is any filtering of collection data in the pipeline configuration to speed up the copying process. For example: [{"$match": {"closed": "false"}}].

Pipeline to apply to the change stream


An inline JSON array with objects describing the pipeline operations to run. For example: [{"$match": {"operationType": "insert"}}, {"$addFields": {"Kafka": "Rules!"}}].


Specifies what to return for update operations when using a change stream. When set to updateLookup, the change stream for partial updates will include both a delta describing the changes to the document, and a copy of the entire document that was changed _ at some point_ after the change occurred. See for more detail.

fullDocumentBeforeChange .before.change

Specifies the pre-image configuration when creating a change stream. The pre-image is not available in source records published while copying existing data as a result of enabling copy.existing. The pre-image configuration has no effect on copying. Requires MongoDB 6.0 or above. For details, see possible values.

Publish only the fullDocument


When enabled, only publishes the actual changed document (rather than the full change stream document). Automatically sets so updated documents will be included.

Send a null value on a delete event

publish.full.document.only .tombstone.on.delete

When enabled, requires publish.full.document.only=true. Default is false (disabled).

Error tolerance


Error tolerance response during connector operation. Default value is none and signals that any error will result in an immediate connector task failure. Value of all changes the behavior to skip over problematic records.

Map data

  • AVRO (io.confluent.connect.avro.AvroConverter) or JSON (org.apache.kafka.connect.json.JsonConverter) for output with a preset schema. Additionally, you can set Enable Infer Schemas for the value. Each document will be processed in isolation, which may lead to multiple schema definitions for the data.

  • STRING ( when your messages contain plaintext JSON.

  • BYTES (org.apache.kafka.connect.converters.ByteArrayConverter) when your messages contain BSON.

After the connector is created, check to ensure that:

  • There are no errors in logs and in Redpanda Console.

  • Redpanda topics contain data from relational database tables.


Most MongoDB Source connector issues are identified in the connector creation phase. Invalid Include Tables are reported in logs. Select Show Logs to view error details.

Message Action

Invalid value wrong_uri for configuration connection.uri: The connection string is invalid. Connection strings must start with either 'mongodb://' or 'mongodb+srv://

Check to make sure the MongoDB Connection URL is a valid MongoDB URL.

Unable to connect to the server.

Check to ensure that the MongoDB Connection URL is valid and that the MongoDB server accepts connections.

Invalid user permissions authentication failed. Exception authenticating MongoCredential{mechanism=SCRAM-SHA-1, userName='user', source='admin', password=, mechanismProperties=}.

Check to ensure that you specified valid username and password credentials.

MongoCommandException: Command failed with error 8000 (AtlasError): 'user is not allowed to do action [find] on [db1.characters]' on server The full response is {"ok": 0, "errmsg": "user is not allowed to do action [find] on [db1.characters]", "code": 8000, "codeName": "AtlasError"}

Check the permissions of the MongoDB user. Also confirm that the MongoDB server accepts connections.

Suggested reading