# mysql_cdc

> For the complete documentation index, see [llms.txt](https://docs.redpanda.com/llms.txt). Component-specific: [cloud-data-platform-full.txt](https://docs.redpanda.com/cloud-data-platform-full.txt)

---
title: mysql_cdc
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-connect-version: 4.93.0
latest-redpanda-tag: v26.1.9
docname: connect/components/inputs/mysql_cdc
page-component-name: cloud-data-platform
page-version: master
page-component-version: master
page-component-title: Cloud
page-relative-src-path: connect/components/inputs/mysql_cdc.adoc
page-edit-url: https://github.com/redpanda-data/cloud-docs/edit/main/modules/develop/pages/connect/components/inputs/mysql_cdc.adoc
page-git-created-date: "2025-02-20"
page-git-modified-date: "2026-05-26"
---

<!-- Source: https://docs.redpanda.com/cloud-data-platform/develop/connect/components/inputs/mysql_cdc.md -->

**Available in:** Cloud, [Self-Managed](https://docs.redpanda.com/connect/components/inputs/mysql_cdc/%20%22View%20the%20Self-Managed%20version%20of%20this%20component%22)

Streams data changes from a MySQL database, using MySQL’s binary log to capture data updates.

This input is built on the [`mysql-canal` library](https://github.com/go-mysql-org/go-mysql?tab=readme-ov-file#replication) but uses a custom approach for streaming historical data.

#### Common

```yml
inputs:
  label: ""
  mysql_cdc:
    flavor: mysql
    dsn: "" # No default (required)
    tables: [] # No default (required)
    checkpoint_cache: "" # No default (required)
    checkpoint_key: mysql_binlog_position
    snapshot_max_batch_size: 1000
    stream_snapshot: "" # No default (required)
    max_parallel_snapshot_tables: 1
    auto_replay_nacks: true
    checkpoint_limit: 1024
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
```

#### Advanced

```yml
inputs:
  label: ""
  mysql_cdc:
    flavor: mysql
    dsn: "" # No default (required)
    tables: [] # No default (required)
    checkpoint_cache: "" # No default (required)
    checkpoint_key: mysql_binlog_position
    snapshot_max_batch_size: 1000
    max_reconnect_attempts: 10
    stream_snapshot: "" # No default (required)
    max_parallel_snapshot_tables: 1
    auto_replay_nacks: true
    checkpoint_limit: 1024
    tls:
      skip_cert_verify: false
      enable_renegotiation: false
      root_cas: ""
      root_cas_file: ""
      client_certs: []
    aws:
      enabled: false
      region: "" # No default (optional)
      endpoint: "" # No default (required)
      id: "" # No default (optional)
      secret: "" # No default (optional)
      token: "" # No default (optional)
      role: "" # No default (optional)
      role_external_id: "" # No default (optional)
      roles: [] # No default (optional)
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
```

The `mysql_cdc` input uses MySQL’s [binary log (`binlog`)](https://dev.mysql.com/doc/refman/8.0/en/binary-log.html) to capture changes made to a MySQL database in real time and streams them to Redpanda Connect.

Redpanda Connect allows you to specify which [database tables](#tables) in your source database to receive changes from. There are also [two replication modes](#choose-a-replication-mode) to choose from.

## [](#prerequisites)Prerequisites

-   MySQL version 8 or later

-   Network access from the cluster where your Redpanda Connect pipeline is running to the source database environment. For detailed networking information, including how to set up a VPC peering connection, see [Redpanda Cloud Networking](https://docs.redpanda.com/cloud-data-platform/networking/).

-   A MySQL instance with binary logging enabled


### [](#configuration-resources)Configuration resources

#### Cloud platforms

-   [Change data capture on Amazon RDS for MySQL](https://aws.amazon.com/blogs/database/enable-change-data-capture-on-amazon-rds-for-mysql-applications-that-are-using-xa-transactions/)

-   [Azure MySQL Database (CDC)](https://learn.microsoft.com/en-us/fabric/real-time-hub/add-source-mysql-database-cdc)

-   [Google Cloud SQL for MySQL](https://cloud.google.com/datastream/docs/configure-cloudsql-mysql)

#### Self-hosted MySQL

-   [Binary Logging Options and Variables](https://dev.mysql.com/doc/refman/8.4/en/replication-options-binary-log.html)

## [](#choose-a-replication-mode)Choose a replication mode

You can run the `mysql_cdc` input in one of two modes, depending on whether you need a snapshot of existing data.

-   Snapshot mode: Redpanda Connect first captures a snapshot of all data in the selected tables and streams the contents before processing changes from the last recorded binlog position.

-   Streaming mode: Redpanda Connect skips the snapshot and processes only the most recent data changes, starting from the latest binlog position.


### [](#snapshot-mode)Snapshot mode

If you set the [`stream_snapshot` field](#stream_snapshot) to `true`, Redpanda Connect connects to your MySQL database and does the following to capture a snapshot of all data in the selected tables:

1.  Executes the `FLUSH TABLES WITH READ LOCK` query to write any outstanding table updates to disk, and locks the tables.

2.  Runs the `START TRANSACTION WITH CONSISTENT SNAPSHOT` statement to create a new transaction with a consistent view of all data, capturing the state of the database at the moment the transaction started.

3.  Reads the current binlog position.

4.  Runs the `UNLOCK TABLES` statement to release the database.

5.  Preserves the initial transaction for data integrity.


> 📝 **NOTE**
>
> If the pipeline restarts during this process, Redpanda Connect must start the snapshot capture from scratch to store the current binlog position in the [`checkpoint_cache`](#checkpoint_cache).

After the snapshot is taken, the input executes SELECT statements to extract data from the selected tables in two stages:

1.  The input finds the primary keys of a table.

2.  It selects the data ordered by primary key.


Finally, the input uses the stored binlog position to catch up with changes that occurred during snapshot processing.

### [](#streaming-mode)Streaming mode

If you set the [`stream_snapshot` field](#stream_snapshot) to `false`, Redpanda Connect connects to your MySQL database and starts processing data changes from the latest binlog position. If the pipeline restarts, Redpanda Connect resumes processing updates from the last binlog position written to the [`checkpoint_cache`](#checkpoint_cache).

## [](#binlog-rotation)Binlog rotation

While the `mysql_cdc` input is streaming changes to Redpanda Connect, your MySQL server may rotate the binlog file. When this occurs, Redpanda Connect flushes the existing message batch and stores the new binlog position so that it can resume processing using the latest offset.

## [](#data-mappings)Data mappings

The following table shows how selected MySQL data types are mapped to data types supported in Redpanda Connect. All other data types are mapped to string values.

| MySQL data type | Bloblang value |
| --- | --- |
| TEXT, VARCHAR | A string value, for example: "this data" |
| BINARY, VARBINARY, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | An array of byte values, for example: [byte1,byte2,byte3] |
| DECIMAL, NUMERIC, TINYINT, SMALLINT, MEDIUMINT, INT, BIGINT, YEAR | A standard numeric type, for example: 123 |
| FLOAT, DOUBLE | A 64-bit decimal (float64), for example: 123.1234 |
| DATETIME, TIMESTAMP | A Bloblang timestamp, for example:1257894000000 2009-11-10 23:00:00 +0000 UTC |
| SET | An array of strings, for example: ["apple", "banana", "orange"] |
| JSON | A map object of the JSON, for example: {"red": 1, "blue": 2, "green": 3} |

## [](#metadata)Metadata

This input adds the following metadata fields to each message:

-   `operation`: The type of database operation that generated the message, such as `read`, `insert`, `update`, `delete`. A `read` operation occurs when a snapshot of the database is processed.

-   `table`: The name of the database table from which the message originated.

-   `binlog_position`: The [Binary Log (binlog)](https://dev.mysql.com/doc/refman/8.0/en/binary-log.html) position of each data update streamed from the source MySQL database. No `binlog_position` is set for data extracted from the initial snapshot. The `binlog` values are strings that you can sort to determine the order in which data updates occurred.


## [](#fields)Fields

### [](#auto_replay_nacks)`auto_replay_nacks`

Whether to automatically replay rejected messages (negative acknowledgements) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure.

Set `auto_replay_nacks` to `false` to delete rejected messages. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data is discarded immediately upon consumption and mutation.

**Type**: `bool`

**Default**: `true`

### [](#aws)`aws`

AWS IAM authentication configuration for MySQL instances. When enabled, IAM credentials are used to generate temporary authentication tokens instead of a static password.

**Type**: `object`

### [](#aws-enabled)`aws.enabled`

Enable AWS IAM authentication for MySQL. When enabled, an IAM authentication token is generated and used as the password. When using IAM authentication ensure `max_reconnect_attempts` is set to a low value to ensure it can refresh credentials.

**Type**: `bool`

**Default**: `false`

### [](#aws-endpoint)`aws.endpoint`

The MySQL endpoint hostname (e.g., mydb.abc123.us-east-1.rds.amazonaws.com).

**Type**: `string`

### [](#aws-id)`aws.id`

The ID of credentials to use.

**Type**: `string`

### [](#aws-region)`aws.region`

The AWS region where the MySQL instance is located. If no region is specified then the environment default will be used.

**Type**: `string`

### [](#aws-role)`aws.role`

Optional AWS IAM role ARN to assume for authentication. Alternatively, use `roles` array for role chaining instead.

**Type**: `string`

### [](#aws-role_external_id)`aws.role_external_id`

Optional external ID for the role assumption. Only used with the `role` field. Alternatively, use `roles` array for role chaining instead.

**Type**: `string`

### [](#aws-roles)`aws.roles[]`

Optional array of AWS IAM roles to assume for authentication. Roles can be assumed in sequence, enabling chaining for purposes such as cross-account access. Each role can optionally specify an external ID.

**Type**: `object`

### [](#aws-roles-role)`aws.roles[].role`

AWS IAM role ARN to assume.

**Type**: `string`

**Default**: `""`

### [](#aws-roles-role_external_id)`aws.roles[].role_external_id`

Optional external ID for the role assumption.

**Type**: `string`

**Default**: `""`

### [](#aws-secret)`aws.secret`

The secret for the credentials being used.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Manage Secrets](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/secret-management/) before adding it to your configuration.

**Type**: `string`

### [](#aws-token)`aws.token`

The token for the credentials being used, required when using short term credentials.

**Type**: `string`

### [](#batching)`batching`

Allows you to configure a [batching policy](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/batching/).

**Type**: `object`

```yaml
# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s

# ---

batching:
  count: 10
  period: 1s

# ---

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

### [](#batching-byte_size)`batching.byte_size`

The number of bytes at which the batch is flushed. Set to `0` to disable size-based batching.

**Type**: `int`

**Default**: `0`

### [](#batching-check)`batching.check`

A [Bloblang query](https://docs.redpanda.com/cloud-data-platform/develop/connect/guides/bloblang/about/) that returns a boolean value indicating whether a message should end a batch.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
check: this.type == "end_of_transaction"
```

### [](#batching-count)`batching.count`

The number of messages after which the batch is flushed. Set to `0` to disable count-based batching.

**Type**: `int`

**Default**: `0`

### [](#batching-period)`batching.period`

The period of time after which an incomplete batch is flushed regardless of its size. This field accepts Go duration format strings such as `100ms`, `1s`, or `5s`.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
period: 1s

# ---

period: 1m

# ---

period: 500ms
```

### [](#batching-processors)`batching.processors[]`

A list of [processors](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/processors/about/) to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. All resulting messages are flushed as a single batch, and therefore splitting the batch into smaller batches using these processors is a no-op.

**Type**: `processor`

```yaml
# Examples:
processors:
  - archive:
      format: concatenate

# ---

processors:
  - archive:
      format: lines

# ---

processors:
  - archive:
      format: json_array
```

### [](#checkpoint_cache)`checkpoint_cache`

Specify a `cache` resource to store the binlog position of the most recent data update delivered to Redpanda Connect. After a restart, Redpanda Connect can continue processing changes from this last known position, avoiding the need to reprocess all table updates.

**Type**: `string`

### [](#checkpoint_key)`checkpoint_key`

The key identifier used to store the binlog position in [`checkpoint_cache`](#checkpoint_cache). If you have multiple `mysql_cdc` inputs sharing the same cache, you can provide an alternative key.

**Type**: `string`

**Default**: `mysql_binlog_position`

### [](#checkpoint_limit)`checkpoint_limit`

The maximum number of messages that this input can process at a given time. Increasing this limit enables parallel processing, and batching at the output level. To preserve at-least-once guarantees, any given binlog position is not acknowledged until all messages under that offset are delivered.

**Type**: `int`

**Default**: `1024`

### [](#dsn)`dsn`

The data source name (DSN) of the MySQL database from which you want to stream updates. Use the format `user:password@tcp(localhost:3306)/database`.

**Type**: `string`

```yaml
# Examples:
dsn: user:password@tcp(localhost:3306)/database
```

### [](#flavor)`flavor`

The type of MySQL database to connect to.

**Type**: `string`

**Default**: `mysql`

| Option | Summary |
| --- | --- |
| mariadb | MariaDB flavored databases. |
| mysql | MySQL flavored databases. |

### [](#max_parallel_snapshot_tables)`max_parallel_snapshot_tables`

Specifies the number of tables that will be snapshotted in parallel.

**Type**: `int`

**Default**: `1`

### [](#max_reconnect_attempts)`max_reconnect_attempts`

The maximum number of attempts the MySQL driver will try to re-establish a broken connection before Connect attempts reconnection. A zero or negative number means infinite retry attempts.

**Type**: `int`

**Default**: `10`

### [](#snapshot_max_batch_size)`snapshot_max_batch_size`

The maximum number of table rows to fetch in each batch when taking a snapshot. This option is only available when `stream_snapshot` is set to `true`.

**Type**: `int`

**Default**: `1000`

### [](#stream_snapshot)`stream_snapshot`

When set to `true`, this input streams a snapshot of all existing data in the source database before streaming data changes. To use this setting, all database tables that you want to replicate _must_ have a primary key.

**Type**: `bool`

### [](#tables)`tables[]`

A list of the database table names to stream changes from. Specify each table name as a separate item.

**Type**: `array`

```yaml
# Examples:
tables:
  - table1
  - table2
```

### [](#tls)`tls`

Using this field overrides the SSL/TLS settings in the environment and DSN.

**Type**: `object`

### [](#tls-client_certs)`tls.client_certs[]`

A list of client certificates to use. For each certificate either the fields `cert` and `key`, or `cert_file` and `key_file` should be specified, but not both.

**Type**: `object`

**Default**: `[]`

```yaml
# Examples:
client_certs:
  - cert: foo
    key: bar

# ---

client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key
```

### [](#tls-client_certs-cert)`tls.client_certs[].cert`

A plain text certificate to use.

**Type**: `string`

**Default**: `""`

### [](#tls-client_certs-cert_file)`tls.client_certs[].cert_file`

The path of a certificate to use.

**Type**: `string`

**Default**: `""`

### [](#tls-client_certs-key)`tls.client_certs[].key`

A plain text certificate key to use.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Manage Secrets](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/secret-management/) before adding it to your configuration.

**Type**: `string`

**Default**: `""`

### [](#tls-client_certs-key_file)`tls.client_certs[].key_file`

The path of a certificate key to use.

**Type**: `string`

**Default**: `""`

### [](#tls-client_certs-password)`tls.client_certs[].password`

A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete `pbeWithMD5AndDES-CBC` algorithm is not supported for the PKCS#8 format.

Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Manage Secrets](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/secret-management/) before adding it to your configuration.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
password: foo

# ---

password: ${KEY_PASSWORD}
```

### [](#tls-enable_renegotiation)`tls.enable_renegotiation`

Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message `local error: tls: no renegotiation`.

**Type**: `bool`

**Default**: `false`

### [](#tls-root_cas)`tls.root_cas`

An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Manage Secrets](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/secret-management/) before adding it to your configuration.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----
```

### [](#tls-root_cas_file)`tls.root_cas_file`

An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
root_cas_file: ./root_cas.pem
```

### [](#tls-skip_cert_verify)`tls.skip_cert_verify`

Whether to skip server side certificate verification.

**Type**: `bool`

**Default**: `false`