Create a MongoDB Source Connector

The MongoDB Source managed connector imports collections from a MongoDB database into Redpanda topics.

Prerequisites

  • Valid credentials with the read role to access the MongoDB database. For more granular access, you need to allow find and changeStream actions for specific databases or collections.

Create a MongoDB Source connector

To create a MongoDB Source connector:

  1. In Redpanda Cloud, click Connectors in the navigation menu, and then click Create Connector.

  2. Select Import from MongoDB.

  3. On the Create Connector page, specify the following required connector configuration options:

    Property name Property key Description

    Topic prefix

    topic.prefix

    Prefix to prepend to database and collection names to generate the name of the Kafka topic to which to publish data. Used by the DefaultTopicMapper.

    MongoDB Connection URL

    connection.url

    The MongoDB connection URL string as supported by the official drivers. For example, mongodb://locahost/.

    MongoDB username

    connection.username

    A valid MongoDB user.

    MongoDB password

    connection.password

    The password for the account associated with the MongoDB user.

    Database to watch

    database

    The MongoDb database from which the connector imports data into Redpanda topics. The connector monitors changes in this database. Leave the field empty to watch all databases.

    Kafka message key format

    key.converter

    Format of the key in the Redpanda topic. Default is STRING. Use AVRO or JSON for schematic output, STRING for plain JSON, or BYTES for BSON.

    Kafka message value format

    value.converter

    Format of the value in the Redpanda topic. Default is STRING. Use AVRO or JSON for schematic output, STRING for plain JSON, or BYTES for BSON.

    Collection to watch

    collection

    The collection in the MongoDB database to watch. If not set, then all collections are watched.

    Start up behavior when there is no source offset available

    startup.mode

    Specifies how the connector should start up when there is no source offset available. Resuming a change stream requires a resume token, which the connector stores as reads from the source offset. If no source offset is available, the connector may either ignore all or some existing source data, or may at first copy all existing source data and then continue with processing new data. Possible values are:

    • latest (default): The connector creates a new change stream, processes change events from it and stores resume tokens from them, thus ignoring all existing source data.

    • timestamp: actuates startup.mode.timestamp.* properties. If no such properties are configured, then timestamp is equivalent to latest.

    • copy_existing: actuates startup.mode.copy.existing.* properties. The connector creates a new change stream and stores its resume token, copies all existing data from all the collections being used as the source, then processes new data starting from the stored resume token. Note that reads of all the data during the copy and subsequent change stream events may produce duplicated events. During the copy, clients can make changes to the source data, which may be represented both by the copying process and the change stream. However, as the change stream events are idempotent, it’s possible to apply them multiple times with the same effect as if they were applied once. Renaming a collection during the copying process is not supported.

    Connector name

    name

    Globally-unique name to use for this connector.

  4. Click Next. Review the connector properties specified, then click Create.

Advanced MongoDB Source connector configuration

In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, then specify any of the following optional advanced connector configuration properties by selecting Show advanced options on the Create Connector page:

Property name Property key Description

Enable Infer Schemas for the value

output.schema.infer.value

Specifies whether or not to infer the schema for the value. Each Document is processed in isolation, which may lead to multiple schema definitions for the data. Only enable when Kafka message value format is set to AVRO or JSON.

startAtOperationTime

startup.mode.timestamp .start.at.operation.time

Actuated only if startup.mode = timestamp specifies the starting point for the change stream. Must be either an integer number of seconds because the Epoch is in the decimal format (for example: 30), or an instant in the ISO-8601 format with one second precision (for example: 1970-01-01T00:00:30Z), or a BSON timestamp in the canonical extended JSON (v2) format (for example: {"$timestamp": {"t": 30, "i": 0}}). You can specify 0 to start at the beginning of the oplog. Requires MongoDB 4.0 or above. For more detail, see the $changeStream definition.

Copy existing namespace regex

startup.mode.copy.existing .namespace.regex

Use a regular expression to define which existing namespaces data should be copied from. A namespace is the database name and collection, separated by a period (for example, database.collection). Example: The following regular expression only includes collections starting with a in the demo database: demo\.a.*.

Copy existing initial pipeline

startup.mode.copy.existing .pipeline

An inline JSON array with objects describing the pipeline operations to run when copying existing data. Specifying this property can improve the use of indexes by the copying manager and make copying more efficient. Use this property if there is any filtering of collection data in the pipeline configuration to speed up the copying process. For example: [{"$match": {"closed": "false"}}].

Pipeline to apply to the change stream

pipeline

An inline JSON array with objects describing the pipeline operations to run. For example: [{"$match": {"operationType": "insert"}}, {"$addFields": {"Kafka": "Rules!"}}].

fullDocument

change.stream.full.document

Specifies what to return for update operations when using a change stream. When set to updateLookup, the change stream for partial updates will include both a delta describing the changes to the document, and a copy of the entire document that was changed _ at some point_ after the change occurred. See db.collection.watch for more detail.

fullDocumentBeforeChange

change.stream.full.document .before.change

Specifies the pre-image configuration when creating a change stream. The pre-image is not available in source records published while copying existing data as a result of enabling copy.existing. The pre-image configuration has no effect on copying. Requires MongoDB 6.0 or above. For details, see possible values.

Publish only the fullDocument

publish.full.document.only

When enabled, only publishes the actual changed document (rather than the full change stream document). Automatically sets change.stream.full.document=updateLookup so updated documents will be included.

Send a null value on a delete event

publish.full.document.only .tombstone.on.delete

When enabled, requires publish.full.document.only=true. Default is false (disabled).

Error tolerance

mongo.errors.tolerance

Error tolerance response during connector operation. Default value is none and signals that any error will result in an immediate connector task failure. Value of all changes the behavior to skip over problematic records.

Heartbeat interval milliseconds

heartbeat.interval.ms

The length of time it takes when sending heartbeat messages to record the post-batch resume token when no source records have been published. Improves the resumability of the connector for low volume namespaces. Specify 0 to disable.

heartbeat topic name

heartbeat.topic.name

The name of the topic to publish heartbeats to. Defaults to __mongodb_heartbeats.

Offset partition name

offset.partition.name

Use to specify a custom offset partition name. If blank, the default partition name based on the connection details is used.

Topic creation enabled

topic.creation.enable

Specifies whether or not to allow automatic creation of topics. Default is true.

Topic creation partitions

topic.creation.default. partitions

Specifies the number of partitions for the created topics. The default is 1.

Topic creation replication factor

topic.creation.default. replication.factor

Specifies the replication factor for the created topics. The default is -1.

Map data

  • AVRO (io.confluent.connect.avro.AvroConverter) or JSON (org.apache.kafka.connect.json.JsonConverter) for output with a preset schema. Additionally, you can set Enable Infer Schemas for the value. Each document will be processed in isolation, which may lead to multiple schema definitions for the data.

  • STRING (org.apache.kafka.connect.storage.StringConverter) when your messages contain plaintext JSON.

  • BYTES (org.apache.kafka.connect.converters.ByteArrayConverter) when your messages contain BSON.

After the connector is created, check to ensure that:

  • There are no errors in logs and in Redpanda Console.

  • Redpanda topics contain data from relational database tables.

Use the Connectors API

When using the Connectors API, instead of specifying a value for connection.url, connection.username, and connection.password, you can specify a value for connection.uri in the form mongodb+srv://username:password@cluster0.xxx.mongodb.net.

Troubleshoot

Most MongoDB Source connector issues are identified in the connector creation phase. Invalid Include Tables are reported in logs. Select Show Logs to view error details.

Message Action

Invalid value wrong_uri for configuration connection.uri: The connection string is invalid. Connection strings must start with either 'mongodb://' or 'mongodb+srv://

Check to make sure the MongoDB Connection URL is a valid MongoDB URL.

Unable to connect to the server.

Check to ensure that the MongoDB Connection URL is valid and that the MongoDB server accepts connections.

Invalid user permissions authentication failed. Exception authenticating MongoCredential{mechanism=SCRAM-SHA-1, userName='user', source='admin', password=, mechanismProperties=}.

Check to ensure that you specified valid username and password credentials.

MongoCommandException: Command failed with error 8000 (AtlasError): 'user is not allowed to do action [find] on [db1.characters]' on server ac-nboibsg-shard-00-01.4hagsz0.mongodb.net:27017. The full response is {"ok": 0, "errmsg": "user is not allowed to do action [find] on [db1.characters]", "code": 8000, "codeName": "AtlasError"}

Check the permissions of the MongoDB user. Also confirm that the MongoDB server accepts connections.

Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog

See Troubleshoot invalid resume token

Suggested reading