mysql_cdc

Beta

Streams data changes from a MySQL database, using MySQL’s binary log to capture data updates.

This input is built on the mysql-canal library but uses a custom approach for streaming historical data.

# Common configuration fields, showing default values
input:
  label: ""
  mysql_cdc:
    dsn: username:password@tcp(localhost:3306)/database # No default (required)
    tables: [] # No default (required)
    checkpoint_cache: "" # No default (required)
    checkpoint_key: mysql_binlog_position
    snapshot_max_batch_size: 1000
    stream_snapshot: false # No default (required)
    auto_replay_nacks: true
    checkpoint_limit: 1024
    batching:
      count: 0
      byte_size: 0
      period: "" # No default (optional)
      check: "" # No default (optional)
# All configuration fields, showing default values
input:
  label: ""
  mysql_cdc:
    dsn: username:password@tcp(localhost:3306)/database # No default (required)
    tables: [] # No default (required)
    checkpoint_cache: "" # No default (required)
    checkpoint_key: mysql_binlog_position
    snapshot_max_batch_size: 1000
    stream_snapshot: false # No default (required)
    auto_replay_nacks: true
    checkpoint_limit: 1024
    batching:
      count: 0
      byte_size: 0
      period: "" # No default (optional)
      check: "" # No default (optional)
      processors: [] # No default (optional)

The mysql_cdc input uses MySQL’s binary log (binlog) to capture changes made to a MySQL database in real time and streams them to Redpanda Connect.

Redpanda Connect allows you to specify which tables in your source database to receive changes from. There are also two replication modes to choose from.

Prerequisites

  • MySQL version 8 or later

  • Network access from the cluster where your Redpanda Connect pipeline is running to the source database environment. For detailed networking information, including how to set up a VPC peering connection, see Redpanda Cloud Networking.

  • A MySQL instance with binary logging enabled

Choose a replication mode

You can run the mysql_cdc input in one of two modes, depending on whether you need a snapshot of existing data.

  • Snapshot mode: Redpanda Connect first captures a snapshot of all data in the selected tables and streams the contents before processing changes from the last recorded binlog position.

  • Streaming mode: Redpanda Connect skips the snapshot and processes only the most recent data changes, starting from the latest binlog position.

Snapshot mode

If you set the stream_snapshot field to true, Redpanda Connect connects to your MySQL database and does the following to capture a snapshot of all data in the selected tables:

  1. Executes the FLUSH TABLES WITH READ LOCK statement to write any outstanding table updates to disk and lock the tables.

  2. Runs the START TRANSACTION WITH CONSISTENT SNAPSHOT statement to create a new transaction with a consistent view of all data, capturing the state of the database at the moment the transaction started.

  3. Reads the current binlog position.

  4. Runs the UNLOCK TABLES statement to release the database.

  5. Keeps the initial transaction open to preserve the consistent view of the data.

If the pipeline restarts during this process, Redpanda Connect must start the snapshot capture from scratch so that it can store the current binlog position in the checkpoint_cache.

After the snapshot is taken, the input executes SELECT statements to extract data from the selected tables in two stages:

  1. The input finds the primary key of each table.

  2. It selects the table data ordered by primary key, in batches of up to snapshot_max_batch_size rows.

Finally, the input uses the stored binlog position to catch up with changes that occurred during snapshot processing.
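
Putting these steps together, a snapshot-mode configuration might look like the following sketch. The table name and cache label are placeholders, and the Redis cache mirrors the checkpoint_cache example later on this page:

input:
  mysql_cdc:
    dsn: username:password@tcp(localhost:3306)/database
    tables: [ orders_table ]
    stream_snapshot: true            # Capture existing rows before streaming changes
    snapshot_max_batch_size: 1000    # Rows fetched per batch during the snapshot
    checkpoint_cache: "my_cdc_cache" # Stores binlog positions across restarts
cache_resources:
  - label: "my_cdc_cache"
    redis:
      url: redis://:6379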

Streaming mode

If you set the stream_snapshot field to false, Redpanda Connect connects to your MySQL database and starts processing data changes from the latest binlog position. If the pipeline restarts, Redpanda Connect resumes processing updates from the last binlog position written to the checkpoint_cache.

Binlog rotation

While the mysql_cdc input is streaming changes to Redpanda Connect, your MySQL server may rotate the binlog file. When this occurs, Redpanda Connect flushes the existing message batch and stores the new binlog position so that it can resume processing using the latest offset.

Data mappings

The following list shows how selected MySQL data types are mapped to data types supported in Redpanda Connect. All other data types are mapped to string values.

  • TEXT, VARCHAR: a string value, for example "this data"

  • BINARY, VARBINARY, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB: an array of byte values, for example [byte1,byte2,byte3]

  • DECIMAL, NUMERIC, TINYINT, SMALLINT, MEDIUMINT, INT, BIGINT, YEAR: a standard numeric type, for example 123

  • FLOAT, DOUBLE: a 64-bit floating-point number (float64), for example 123.1234

  • DATETIME, TIMESTAMP: a Bloblang timestamp, for example 1257894000000 (2009-11-10 23:00:00 +0000 UTC)

  • SET: an array of strings, for example ["apple", "banana", "orange"]

  • JSON: a map object of the JSON data, for example {"red": 1, "blue": 2, "green": 3}
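
As a sketch of how these mapped values surface in a pipeline, the following mapping processor assumes a hypothetical table with a JSON column named attributes and a DATETIME column named created_at, and reshapes both:

pipeline:
  processors:
    - mapping: |
        # attributes is delivered as a map object, so its keys can be read directly.
        root.red_count = this.attributes.red
        # created_at is delivered as a Bloblang timestamp and can be reformatted.
        root.created_at = this.created_at.ts_format("2006-01-02")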

Metadata

This input adds the following metadata fields to each message:

  • operation: The type of database operation that generated the message, such as read, insert, update, delete. A read operation occurs when a snapshot of the database is processed.

  • table: The name of the database table from which the message originated.

  • binlog_position: The binary log (binlog) position of each data update streamed from the source MySQL database. No binlog_position is set for data extracted from the initial snapshot. The binlog position values are strings that you can sort to determine the order in which data updates occurred.
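
For example, a mapping processor could copy these metadata fields into the message payload for downstream routing or auditing (a sketch; the payload field names are arbitrary):

pipeline:
  processors:
    - mapping: |
        root = this
        # Copy the CDC metadata into the payload.
        root.change_type = @operation
        root.source_table = @table
        # binlog_position is unset for messages produced by the initial snapshot.
        root.position = @binlog_position.or("")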

Fields

dsn

The data source name (DSN) of the MySQL database from which you want to stream updates. Use the format user:password@tcp(host:port)/database.

Type: string

# Examples

dsn: my_username:my_password@tcp(localhost:3306)/my_db

tables

A list of the database table names to stream changes from. Specify each table name as a separate item.

Type: array

# Examples

tables:
  - orders_table
  - customer_address_table
  - inventory_table

checkpoint_cache

Specify a cache resource to store the binlog position of the most recent data update delivered to Redpanda Connect. After a restart, Redpanda Connect can continue processing changes from this last known position, avoiding the need to reprocess all table updates.

Type: string

Default: ""

# Examples

input:
  mysql_cdc:
    dsn: username:password@tcp(localhost:3306)/database
    tables: [my_table]
    checkpoint_cache: "my_cdc_cache"
cache_resources:
  - label: "my_cdc_cache"
    redis:
      url: redis://:6379

checkpoint_key

The key identifier used to store the binlog position in checkpoint_cache. If you have multiple mysql_cdc inputs sharing the same cache, you can provide an alternative key.

Type: string

Default: mysql_binlog_position
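
If multiple mysql_cdc inputs share one cache resource, you could give each input its own key. For example (the key name is a placeholder):

checkpoint_key: orders_binlog_position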

snapshot_max_batch_size

The maximum number of table rows to fetch in each batch when taking a snapshot. This option is only available when stream_snapshot is set to true.

Type: int

Default: 1000

stream_snapshot

When set to true, this input streams a snapshot of all existing data in the source database before streaming data changes. To use this setting, all database tables that you want to replicate must have a primary key.

Type: bool

Default: false

# Examples

stream_snapshot: true

auto_replay_nacks

Whether to automatically replay rejected messages (negative acknowledgements) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure.

Set auto_replay_nacks to false to delete rejected messages. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data is discarded immediately upon consumption and mutation.

Type: bool

Default: true

checkpoint_limit

The maximum number of messages that this input can process at a given time. Increasing this limit enables parallel processing and batching at the output level. To preserve at-least-once guarantees, any given binlog position is not acknowledged until all messages under that offset are delivered.

Type: int

Default: 1024
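
As a sketch, you could raise this limit and add a batching policy so that more change events are in flight at once. The values shown are illustrative only:

input:
  mysql_cdc:
    dsn: username:password@tcp(localhost:3306)/database
    tables: [ orders_table ]
    checkpoint_cache: "my_cdc_cache"
    checkpoint_limit: 4096
    batching:
      count: 100
      period: 1s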

batching

Allows you to configure a batching policy.

Type: object

# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.count

The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.

Type: int

Default: 0

batching.byte_size

The number of bytes at which the batch is flushed. Set to 0 to disable size-based batching.

Type: int

Default: 0

batching.period

The period of time after which an incomplete batch is flushed regardless of its size.

Type: string

Default: ""

# Examples

period: 1s

period: 1m

period: 500ms

batching.check

A Bloblang query that returns a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples

check: this.type == "end_of_transaction"

batching.processors

For aggregating and archiving message batches, you can add a list of processors to apply to a batch as it is flushed. All resulting messages are flushed as a single batch even when you configure processors to split the batch into smaller batches.

Type: array

# Examples

processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array