# Create a MySQL (Debezium) Source Connector

> For the complete documentation index, see [llms.txt](https://docs.redpanda.com/llms.txt). Component-specific: [cloud-data-platform-full.txt](https://docs.redpanda.com/cloud-data-platform-full.txt)

---
title: Create a MySQL (Debezium) Source Connector
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-connect-version: 4.93.0
latest-redpanda-tag: v26.1.9
docname: managed-connectors/create-mysql-source-connector
page-component-name: cloud-data-platform
page-version: master
page-component-version: master
page-component-title: Cloud
page-relative-src-path: managed-connectors/create-mysql-source-connector.adoc
page-edit-url: https://github.com/redpanda-data/cloud-docs/edit/main/modules/develop/pages/managed-connectors/create-mysql-source-connector.adoc
description: Use the Redpanda Cloud UI to create a MySQL (Debezium) Source Connector.
page-git-created-date: "2024-06-06"
page-git-modified-date: "2025-08-05"
---

<!-- Source: https://docs.redpanda.com/cloud-data-platform/develop/managed-connectors/create-mysql-source-connector.md -->

> ❗ **IMPORTANT**
>
> -   To enable this feature, contact [Redpanda Support](https://support.redpanda.com/hc/en-us/requests/new). To disable this feature, see [Disable Kafka Connect](https://docs.redpanda.com/cloud-data-platform/develop/managed-connectors/disable-kc/).
>
> -   Redpanda Support does not manage or monitor Kafka Connect. For fully-supported connectors, consider [Redpanda Connect](https://docs.redpanda.com/cloud-data-platform/develop/connect/about/).
>
> -   When Kafka Connect is enabled, there is a dedicated node running even when no connectors are deployed.

You can use a MySQL (Debezium) Source connector to import a stream of changes from MySQL, Amazon RDS, and Amazon Aurora.

## [](#prerequisites)Prerequisites

-   A MySQL database that is accessible from the connector instance.

-   A MySQL user for the Debezium connector. This user must have the `LOCK TABLES` privilege. For details, see [MySQL Creating a user](https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-creating-user).

-   The [binlog must be enabled](https://debezium.io/documentation/reference/stable/connectors/mysql.html#enable-mysql-binlog) for the source MySQL cluster.


## [](#limitations)Limitations

-   Only the `JSON`, `CloudEvents`, or `AVRO` formats can be used as the Kafka message key and value format.

-   The MySQL (Debezium) Source connector can work with only a single task at a time.


## [](#create-a-mysql-debezium-source-connector)Create a MySQL (Debezium) Source connector

To create the MySQL (Debezium) Source connector:

1.  In Redpanda Cloud, click **Connectors** in the navigation menu, and then click **Create Connector**.

2.  Select **Import from MySQL (Debezium)**.

3.  On the **Create Connector** page, specify the following required connector configuration options:

    | Property name | Property key | Description |
    | --- | --- | --- |
    | Topic prefix | topic.prefix | A topic prefix that identifies and provides a namespace for the particular database server/cluster that is capturing changes. The topic prefix should be unique across all other connectors because it is used as a prefix for all Kafka topic names that receive events emitted by this connector. Only alphanumeric characters, hyphens, dots, and underscores are accepted. |
    | Hostname | database.hostname | A resolvable hostname or IP address of the MySQL database server. |
    | Port | database.port | Integer port number of the MySQL database server. |
    | User | database.user | Name of the MySQL user to be used when connecting to the MySQL database. |
    | Password | database.password | The password of the MySQL database user who will be connecting to the MySQL database. |
    | SSL mode | database.ssl.mode | Specifies whether to use an encrypted connection to the MySQL server. Select `disable` to use an unencrypted connection. Select `preferred` to use an encrypted connection if the server supports secure connections; if the server does not support secure connections, the connector falls back to an unencrypted connection. Select `require` to use a secure (encrypted) connection. If a secure connection cannot be established when `require` is selected, the connector fails. |
    | Kafka message key format | key.converter | Format of the key in the Redpanda topic. |
    | Message key JSON contains schema | key.converter.schemas.enable | Enable to specify that the message key contains schema in the schema field. |
    | Kafka message value format | value.converter | Format of the value in the Redpanda topic. |
    | Message value JSON contains schema | value.converter.schemas.enable | Enable to specify that the message value contains schema in the schema field. |
    | Connector name | name | Globally-unique name to use for this connector. |

4.  Click **Next**. Review the connector properties specified, then click **Create**.
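
As a rough illustration of how the required properties fit together, the following Python sketch collects the property keys from the table into one flat mapping and checks the topic prefix against the allowed characters. The function name `build_connector_config`, the connector name, hostname, user, and password placeholder are hypothetical values chosen for this example, and the sketch does not call any Redpanda API.

```python
import json
import re

# Allowed characters for topic.prefix, per the table above:
# alphanumeric characters, hyphens, dots, and underscores.
TOPIC_PREFIX_PATTERN = re.compile(r"[A-Za-z0-9._-]+")


def build_connector_config(name, topic_prefix, hostname, port, user, password):
    """Collect the required property keys into one flat mapping (hypothetical helper)."""
    if not TOPIC_PREFIX_PATTERN.fullmatch(topic_prefix):
        raise ValueError(f"invalid topic prefix: {topic_prefix!r}")
    return {
        "name": name,
        "topic.prefix": topic_prefix,
        "database.hostname": hostname,
        "database.port": str(port),
        "database.user": user,
        "database.password": password,
        "database.ssl.mode": "preferred",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
    }


config = build_connector_config(
    name="mysql-source-example",      # hypothetical connector name
    topic_prefix="frommysql",
    hostname="mysql.example.com",     # hypothetical hostname
    port=3306,
    user="debezium",                  # hypothetical MySQL user
    password="<password>",            # placeholder, not a real secret
)
print(json.dumps(config, indent=2))
```

Validating the prefix before submitting the form can catch a rejected value early. The JSON converter is used here only as an example; the other formats listed under Limitations use different converter classes.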


## [](#map-data)Map data

Use `Include databases`, `Include tables`, and `Include columns` to define data mapping. Alternatively, use `Exclude databases`, `Exclude tables`, and `Exclude columns`.

The following is an example table in the `db` database:

```sql
CREATE TABLE IF NOT EXISTS Persons
(
    Id        int PRIMARY KEY,
    FirstName varchar(255),
    LastName  varchar(255)
);
```

The table has one record:

```sql
INSERT INTO Persons (Id, FirstName, LastName) VALUES (1, 'Winnie', 'the Pooh');
```

The connector configuration for the table:

```bash
column.include.list = db\\.Persons\\.(Id|FirstName|LastName)
table.include.list = db\\.Persons
database.include.list = db
topic.prefix = frommysql
```

The connector configuration will create the Redpanda topic `frommysql.db.Persons`.
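
To make the mapping concrete, the following Python sketch approximates how an include list selects a table and how the topic name is derived from the prefix, database, and table. It assumes that each comma-separated entry is a regular expression that must match the entire fully qualified name, and that the doubled backslash in the configuration above is an escaping layer for a single regex backslash. The helper names `matches_include_list` and `topic_name` are hypothetical and are not part of Debezium.

```python
import re


def matches_include_list(qualified_name, include_list):
    """Return True if any comma-separated regex matches the whole name."""
    patterns = [p.strip() for p in include_list.split(",") if p.strip()]
    return any(re.fullmatch(p, qualified_name) for p in patterns)


def topic_name(topic_prefix, database, table):
    """Build the topic name as <prefix>.<database>.<table>."""
    return f"{topic_prefix}.{database}.{table}"


# Single-backslash forms of the patterns from the configuration above.
table_include_list = r"db\.Persons"
column_include_list = r"db\.Persons\.(Id|FirstName|LastName)"

print(matches_include_list("db.Persons", table_include_list))
print(matches_include_list("db.Orders", table_include_list))
print(matches_include_list("db.Persons.LastName", column_include_list))
print(topic_name("frommysql", "db", "Persons"))
```

Because the match is anchored in this sketch, `db\.Persons` does not select a table such as `db.PersonsArchive`.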

For `Kafka message value format` = `JSON` (`org.apache.kafka.connect.json.JsonConverter`), the connector produces JSON messages with a schema like the following:

```json
{
   "payload": {
      "schema": {
         // schema definition
      },
      "payload": {
         "before": null,
         "after": {
            "Id": 1,
            "FirstName": "Winnie",
            "LastName": "the Pooh"
         },
         ...
      }
   },
   "encoding": "json",
   "schemaId": 0
}
```
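
A consumer usually only needs the row state from such a message. The following Python sketch extracts the `after` field, assuming the record value that a consumer receives is the inner object with `schema` and `payload` keys (or just the change event when the schema is not included). The function name `extract_after` and the sample value are illustrative only.

```python
import json

# Sample record value shaped like the inner object shown above.
raw_value = json.dumps({
    "schema": {},  # schema definition omitted in this sample
    "payload": {
        "before": None,
        "after": {"Id": 1, "FirstName": "Winnie", "LastName": "the Pooh"},
    },
})


def extract_after(value):
    """Return the row state after the change, or None if there is none."""
    if value is None:
        return None
    event = json.loads(value)
    # When the schema is included, the change event sits under "payload";
    # otherwise the change event is assumed to be the top-level object.
    payload = event.get("payload", event)
    return payload.get("after")


row = extract_after(raw_value)
print(row["FirstName"], row["LastName"])
```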

For `Kafka message value format` = `AVRO` (`io.confluent.connect.avro.AvroConverter`), the connector registers a `frommysql.db.Persons-value` schema in Schema Registry and produces messages like the following:

```js
{
   "payload": {
      "before": null,
      "after": {
         "frommysql.db.Persons.Value": {
            "Id": 1,
            "FirstName": {
               "string": "Winnie"
            },
            "LastName": {
               "string": "the Pooh"
            }
         }
      },
      ...
   },
   "encoding": "avro",
   "schemaId": 2
}
```

For `Kafka message value format` = `CloudEvents` (`io.debezium.converters.CloudEventsConverter`), the connector uses either the `JSON` or the `AVRO` data serializer.

-   For the `JSON` data serializer, enable `Message value CloudEvents JSON contains schema` to include the JSON schema in each message.

-   For the `AVRO` data serializer, the connector creates a schema in Schema Registry and produces messages in the CloudEvents data format.


## [](#test-the-connection)Test the connection

After the connector is created:

-   Check the connector status, and confirm that there are no errors in the logs or in Redpanda Console.

-   Review the Redpanda topic to confirm that it contains the expected data.


## [](#troubleshoot)Troubleshoot

If the connector configuration is invalid, an error appears when you click **Create**. If the connector fails, check the error message or select **Show Logs** to view error details.

-   **Topics not created by the connector**

    Create the topic manually, or let the connector create it by setting the following properties. Use the number of partitions and the replication factor that you need:

    -   Topic creation enabled: `true`

    -   Topic creation partitions: `1`

    -   Topic creation replication factor: `-1`

    Or in JSON:

    ```json
    "topic.creation.enable": true,
    "topic.creation.default.partitions": "1",
    "topic.creation.default.replication.factor": "-1"
    ```

-   **Connector requires binlog file 'mysql-bin-changelog.257116', but MySQL only has mysql-bin-changelog.257123**

    The connector log shows an error like the following:

    > Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
    >
    > Connector requires binlog file 'mysql-bin-changelog.257116', but MySQL only has mysql-bin-changelog.257123, mysql-bin-changelog.257124, mysql-bin-changelog.257125

    The connector needs a binlog file that was already purged. Change the `Snapshot mode` property from the default to `when_needed`.


Additional errors and corrective actions follow.

| Message | Action |
| --- | --- |
| Unable to connect: Public Key Retrieval is not allowed | Set the `Allow public key retrieval` property to `true`. |
| Unable to connect: Communications link failure | Confirm that `Hostname` and `Port` are correct. |
| Access denied for user | Confirm that the `User` and `Password` credentials are valid. |
| Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Invalid schema Invalid namespace: from-mysql.db.Persons; error code: 422 | The Schema Registry namespace is invalid. Change the `Topic prefix` value to remove characters that are not allowed in a namespace, such as the hyphen in `from-mysql`. |
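
The `Invalid namespace` error in the last row can be anticipated before the connector is created. The following Python sketch checks a topic prefix against the Avro naming rule, under which each dot-separated part of a namespace must start with a letter or underscore and contain only letters, digits, and underscores. The function name `is_avro_safe_prefix` is hypothetical, and the check is an approximation of what Schema Registry enforces, not its actual implementation.

```python
import re

# Avro name rule applied to each dot-separated part of a namespace.
AVRO_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_avro_safe_prefix(topic_prefix):
    """Return True if every dot-separated part is a valid Avro name."""
    return all(AVRO_NAME.fullmatch(part) for part in topic_prefix.split("."))


for prefix in ("from-mysql", "frommysql", "from_mysql"):
    print(prefix, is_avro_safe_prefix(prefix))
```

A prefix such as `from-mysql` is accepted by the connector form, which allows hyphens, but it fails this check. If you plan to use the `AVRO` format, prefer underscores over hyphens in the topic prefix.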

## [](#suggested-reading)Suggested reading

-   [Debezium connector for MySQL](https://debezium.io/documentation/reference/stable/connectors/mysql.html)