mysql_cdc

Streams data changes from a MySQL database, using MySQL’s binary log to capture data updates.

This input is built on the mysql-canal library but uses a custom approach for streaming historical data.

Introduced in version 4.46.0.

  • Common

  • Advanced

inputs:
  label: ""
  mysql_cdc:
    flavor: mysql
    dsn: "" # No default (required)
    tables: [] # No default (required)
    checkpoint_cache: "" # No default (required)
    checkpoint_key: mysql_binlog_position
    snapshot_max_batch_size: 1000
    stream_snapshot: "" # No default (required)
    auto_replay_nacks: true
    checkpoint_limit: 1024
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
inputs:
  label: ""
  mysql_cdc:
    flavor: mysql
    dsn: "" # No default (required)
    tables: [] # No default (required)
    checkpoint_cache: "" # No default (required)
    checkpoint_key: mysql_binlog_position
    snapshot_max_batch_size: 1000
    max_reconnect_attempts: 10
    stream_snapshot: "" # No default (required)
    auto_replay_nacks: true
    checkpoint_limit: 1024
    tls:
      skip_cert_verify: false
      enable_renegotiation: false
      root_cas: ""
      root_cas_file: ""
      client_certs:
        cert: ""
        key: ""
        cert_file: ""
        key_file: ""
        password: ""
    aws:
      enabled: false
      region: "" # No default (optional)
      endpoint: "" # No default (required)
      id: "" # No default (optional)
      secret: "" # No default (optional)
      token: "" # No default (optional)
      role: "" # No default (optional)
      role_external_id: "" # No default (optional)
      roles:
        role: ""
        role_external_id: ""
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)

The mysql_cdc input uses MySQL’s binary log (binlog) to capture changes made to a MySQL database in real time and streams them to Redpanda Connect.

Redpanda Connect allows you to specify which database tables in your source database to receive changes from. There are also two replication modes to choose from.

Prerequisites

  • MySQL version 8 or later

  • A MySQL instance with binary logging enabled

Choose a replication mode

You can run the mysql_cdc input in one of two modes, depending on whether you need a snapshot of existing data.

  • Snapshot mode: Redpanda Connect first captures a snapshot of all data in the selected tables and streams the contents before processing changes from the last recorded binlog position.

  • Streaming mode: Redpanda Connect skips the snapshot and processes only the most recent data changes, starting from the latest binlog position.

Snapshot mode

If you set the stream_snapshot field to true, Redpanda Connect connects to your MySQL database and does the following to capture a snapshot of all data in the selected tables:

  1. Executes the FLUSH TABLES WITH READ LOCK query to write any outstanding table updates to disk, and locks the tables.

  2. Runs the START TRANSACTION WITH CONSISTENT SNAPSHOT statement to create a new transaction with a consistent view of all data, capturing the state of the database at the moment the transaction started.

  3. Reads the current binlog position.

  4. Runs the UNLOCK TABLES statement to release the database.

  5. Preserves the initial transaction for data integrity.

If the pipeline restarts during this process, Redpanda Connect must start the snapshot capture from scratch to store the current binlog position in the checkpoint_cache.

After the snapshot is taken, the input executes SELECT statements to extract data from the selected tables in two stages:

  1. The input finds the primary keys of a table.

  2. It selects the data ordered by primary key.

Finally, the input uses the stored binlog position to catch up with changes that occurred during snapshot processing.

Streaming mode

If you set the stream_snapshot field to false, Redpanda Connect connects to your MySQL database and starts processing data changes from the latest binlog position. If the pipeline restarts, Redpanda Connect resumes processing updates from the last binlog position written to the checkpoint_cache.

Binlog rotation

While the mysql_cdc input is streaming changes to Redpanda Connect, your MySQL server may rotate the binlog file. When this occurs, Redpanda Connect flushes the existing message batch and stores the new binlog position so that it can resume processing using the latest offset.

Data mappings

The following table shows how selected MySQL data types are mapped to data types supported in Redpanda Connect. All other data types are mapped to string values.

MySQL data type Bloblang value

TEXT, VARCHAR

A string value, for example: "this data"

BINARY, VARBINARY, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB

An array of byte values, for example: [byte1,byte2,byte3]

DECIMAL, NUMERIC, TINYINT, SMALLINT, MEDIUMINT, INT, BIGINT, YEAR

A standard numeric type, for example: 123

FLOAT, DOUBLE

A 64-bit decimal (float64), for example: 123.1234

DATETIME, TIMESTAMP

A Bloblang timestamp, for example:

1257894000000 2009-11-10 23:00:00 +0000 UTC

SET

An array of strings, for example: ["apple", "banana", "orange"]

JSON

A map object of the JSON, for example: {"red": 1, "blue": 2, "green": 3}

Metadata

This input adds the following metadata fields to each message:

  • operation: The type of database operation that generated the message, such as read, insert, update, delete. A read operation occurs when a snapshot of the database is processed.

  • table: The name of the database table from which the message originated.

  • binlog_position: The Binary Log (binlog) position of each data update streamed from the source MySQL database. No binlog_position is set for data extracted from the initial snapshot. The binlog values are strings that you can sort to determine the order in which data updates occurred.

Fields

auto_replay_nacks

Whether to automatically replay rejected messages (negative acknowledgements) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure.

Set auto_replay_nacks to false to delete rejected messages. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data is discarded immediately upon consumption and mutation.

Type: bool

Default: true

aws

AWS IAM authentication configuration for MySQL instances. When enabled, IAM credentials are used to generate temporary authentication tokens instead of a static password.

Type: object

aws.enabled

Enable AWS IAM authentication for MySQL. When enabled, an IAM authentication token is generated and used as the password. When using IAM authentication ensure max_reconnect_attempts is set to a low value to ensure it can refresh credentials.

Type: bool

Default: false

aws.endpoint

The MySQL endpoint hostname (e.g., mydb.abc123.us-east-1.rds.amazonaws.com).

Type: string

aws.id

The ID of credentials to use.

Type: string

aws.region

The AWS region where the MySQL instance is located. If no region is specified then the environment default will be used.

Type: string

aws.role

Optional AWS IAM role ARN to assume for authentication. Alternatively, use roles array for role chaining instead.

Type: string

aws.role_external_id

Optional external ID for the role assumption. Only used with the role field. Alternatively, use roles array for role chaining instead.

Type: string

aws.roles[]

Optional array of AWS IAM roles to assume for authentication. Roles can be assumed in sequence, enabling chaining for purposes such as cross-account access. Each role can optionally specify an external ID.

Type: object

aws.roles[].role

AWS IAM role ARN to assume.

Type: string

Default: ""

aws.roles[].role_external_id

Optional external ID for the role assumption.

Type: string

Default: ""

aws.secret

The secret for the credentials being used.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

aws.token

The token for the credentials being used, required when using short term credentials.

Type: string

batching

Allows you to configure a batching policy.

Type: object

# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s

# ---

batching:
  count: 10
  period: 1s

# ---

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.byte_size

The number of bytes at which the batch is flushed. Set to 0 to disable size-based batching.

Type: int

Default: 0

batching.check

A Bloblang query that returns a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples:
check: this.type == "end_of_transaction"

batching.count

The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.

Type: int

Default: 0

batching.period

The period of time after which an incomplete batch is flushed regardless of its size. This field accepts Go duration format strings such as 100ms, 1s, or 5s.

Type: string

Default: ""

# Examples:
period: 1s

# ---

period: 1m

# ---

period: 500ms

batching.processors[]

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. All resulting messages are flushed as a single batch, and therefore splitting the batch into smaller batches using these processors is a no-op.

Type: processor

# Examples:
processors:
  - archive:
      format: concatenate


# ---

processors:
  - archive:
      format: lines


# ---

processors:
  - archive:
      format: json_array

checkpoint_cache

Specify a cache resource to store the binlog position of the most recent data update delivered to Redpanda Connect. After a restart, Redpanda Connect can continue processing changes from this last known position, avoiding the need to reprocess all table updates.

Type: string

checkpoint_key

The key identifier used to store the binlog position in checkpoint_cache. If you have multiple mysql_cdc inputs sharing the same cache, you can provide an alternative key.

Type: string

Default: mysql_binlog_position

checkpoint_limit

The maximum number of messages that this input can process at a given time. Increasing this limit enables parallel processing, and batching at the output level. To preserve at-least-once guarantees, any given binlog position is not acknowledged until all messages under that offset are delivered.

Type: int

Default: 1024

dsn

The data source name (DSN) of the MySQL database from which you want to stream updates. Use the format user:password@tcp(localhost:3306)/database.

Type: string

# Examples:
dsn: user:password@tcp(localhost:3306)/database

flavor

The type of MySQL database to connect to.

Type: string

Default: mysql

Option Summary

mariadb

MariaDB flavored databases.

mysql

MySQL flavored databases.

max_reconnect_attempts

The maximum number of attempts the MySQL driver will try to re-establish a broken connection before Connect attempts reconnection. A zero or negative number means infinite retry attempts.

Type: int

Default: 10

snapshot_max_batch_size

The maximum number of table rows to fetch in each batch when taking a snapshot. This option is only available when stream_snapshot is set to true.

Type: int

Default: 1000

stream_snapshot

When set to true, this input streams a snapshot of all existing data in the source database before streaming data changes. To use this setting, all database tables that you want to replicate must have a primary key.

Type: bool

tables[]

A list of the database table names to stream changes from. Specify each table name as a separate item.

Type: array

# Examples:
tables:
  - table1
  - table2

tls

Using this field overrides the SSL/TLS settings in the environment and DSN.

Type: object

tls.client_certs[]

A list of client certificates to use. For each certificate either the fields cert and key, or cert_file and key_file should be specified, but not both.

Type: object

Default: []

# Examples:
client_certs:
  - cert: foo
    key: bar


# ---

client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key

tls.client_certs[].cert

A plain text certificate to use.

Type: string

Default: ""

tls.client_certs[].cert_file

The path of a certificate to use.

Type: string

Default: ""

tls.client_certs[].key

A plain text certificate key to use.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

tls.client_certs[].key_file

The path of a certificate key to use.

Type: string

Default: ""

tls.client_certs[].password

A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.

Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

# Examples:
password: foo

# ---

password: ${KEY_PASSWORD}

tls.enable_renegotiation

Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.

Requires version 3.45.0 or later.

Type: bool

Default: false

tls.root_cas

An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

# Examples:
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----

tls.root_cas_file

An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.

Type: string

Default: ""

# Examples:
root_cas_file: ./root_cas.pem

tls.skip_cert_verify

Whether to skip server side certificate verification.

Type: bool

Default: false