mysql_cdc

Beta
Available in: Cloud, Self-Managed

Streams data changes from a MySQL database, using MySQL’s binary log to capture data updates. This input is built on the mysql-canal library but uses a custom approach for streaming historical data.

Common

    # Configuration fields, showing default values
    input:
      label: ""
      mysql_cdc:
        dsn: username:password@tcp(localhost:3306)/database # No default (required)
        tables: [] # No default (required)
        checkpoint_cache: "" # No default (required)
        checkpoint_key: mysql_binlog_position
        snapshot_max_batch_size: 1000
        stream_snapshot: false # No default (required)
        auto_replay_nacks: true
        checkpoint_limit: 1024
        batching:
          count: 0
          byte_size: 0
          period: "" # No default (optional)
          check: "" # No default (optional)

Advanced

    # All configuration fields, showing default values
    input:
      label: ""
      mysql_cdc:
        dsn: username:password@tcp(localhost:3306)/database # No default (required)
        tables: [] # No default (required)
        checkpoint_cache: "" # No default (required)
        checkpoint_key: mysql_binlog_position
        snapshot_max_batch_size: 1000
        stream_snapshot: false # No default (required)
        auto_replay_nacks: true
        checkpoint_limit: 1024
        batching:
          count: 0
          byte_size: 0
          period: "" # No default (optional)
          check: "" # No default (optional)
          processors: [] # No default (optional)

The mysql_cdc input uses MySQL’s binary log (binlog) to capture changes made to a MySQL database in real time and streams them to Redpanda Connect. Redpanda Connect allows you to specify which database tables in your source database to receive changes from. There are also two replication modes to choose from.

Prerequisites

- MySQL version 8 or later
- Network access from the cluster where your Redpanda Connect pipeline is running to the source database environment. For detailed networking information, including how to set up a VPC peering connection, see Redpanda Cloud Networking.
- A MySQL instance with binary logging enabled

Configuration resources

Cloud platforms:
- Change data capture on Amazon RDS for MySQL
- Azure MySQL Database (CDC)
- Google Cloud SQL for MySQL

Self-hosted MySQL:
- Binary Logging Options and Variables

Choose a replication mode

You can run the mysql_cdc input in one of two modes, depending on whether you need a snapshot of existing data.

- Snapshot mode: Redpanda Connect first captures a snapshot of all data in the selected tables and streams the contents before processing changes from the last recorded binlog position.
- Streaming mode: Redpanda Connect skips the snapshot and processes only the most recent data changes, starting from the latest binlog position.

Snapshot mode

If you set the stream_snapshot field to true, Redpanda Connect connects to your MySQL database and does the following to capture a snapshot of all data in the selected tables:

1. Executes the FLUSH TABLES WITH READ LOCK query to write any outstanding table updates to disk, and locks the tables.
2. Runs the START TRANSACTION WITH CONSISTENT SNAPSHOT statement to create a new transaction with a consistent view of all data, capturing the state of the database at the moment the transaction started.
3. Reads the current binlog position.
4. Runs the UNLOCK TABLES statement to release the database.
5. Preserves the initial transaction for data integrity.

If the pipeline restarts during this process, Redpanda Connect must start the snapshot capture from scratch to store the current binlog position in the checkpoint_cache.

After the snapshot is taken, the input executes SELECT statements to extract data from the selected tables in two stages:

1. The input finds the primary keys of a table.
2. It selects the data ordered by primary key.

Finally, the input uses the stored binlog position to catch up with changes that occurred during snapshot processing.
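As a minimal sketch, a snapshot-mode configuration could look like the following. It combines only fields documented on this page; the table name, the cache label my_cdc_cache, and the Redis URL are placeholder values, not recommendations.

```yaml
input:
  mysql_cdc:
    dsn: username:password@tcp(localhost:3306)/database
    tables:
      - orders_table            # placeholder; each snapshotted table needs a primary key
    checkpoint_cache: my_cdc_cache
    stream_snapshot: true       # take a snapshot before streaming binlog changes
    snapshot_max_batch_size: 1000

cache_resources:
  - label: my_cdc_cache         # placeholder label referenced by checkpoint_cache
    redis:
      url: redis://:6379
```

Setting stream_snapshot to false in the same configuration would switch the input to streaming mode.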
Streaming mode

If you set the stream_snapshot field to false, Redpanda Connect connects to your MySQL database and starts processing data changes from the latest binlog position. If the pipeline restarts, Redpanda Connect resumes processing updates from the last binlog position written to the checkpoint_cache.

Binlog rotation

While the mysql_cdc input is streaming changes to Redpanda Connect, your MySQL server may rotate the binlog file. When this occurs, Redpanda Connect flushes the existing message batch and stores the new binlog position so that it can resume processing using the latest offset.

Data mappings

The following table shows how selected MySQL data types are mapped to data types supported in Redpanda Connect. All other data types are mapped to string values.

MySQL data type | Bloblang value
TEXT, VARCHAR | A string value, for example: "this data"
BINARY, VARBINARY, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | An array of byte values, for example: [byte1,byte2,byte3]
DECIMAL, NUMERIC, TINYINT, SMALLINT, MEDIUMINT, INT, BIGINT, YEAR | A standard numeric type, for example: 123
FLOAT, DOUBLE | A 64-bit floating-point number (float64), for example: 123.1234
DATETIME, TIMESTAMP | A Bloblang timestamp, for example: 1257894000000 2009-11-10 23:00:00 +0000 UTC
SET | An array of strings, for example: ["apple", "banana", "orange"]
JSON | A map object of the JSON, for example: {"red": 1, "blue": 2, "green": 3}

Metadata

This input adds the following metadata fields to each message:

- operation: The type of database operation that generated the message, such as read, insert, update, or delete. A read operation occurs when a snapshot of the database is processed.
- table: The name of the database table from which the message originated.
- binlog_position: The binary log (binlog) position of each data update streamed from the source MySQL database. No binlog_position is set for data extracted from the initial snapshot.
  The binlog values are strings that you can sort to determine the order in which data updates occurred.

Fields

dsn

The data source name (DSN) of the MySQL database from which you want to stream updates. Use the format user:password@tcp(localhost:3306)/database.

Type: string

    # Examples
    dsn: my_username:my_password@tcp(localhost:3306)/my_db

tables

A list of the database table names to stream changes from. Specify each table name as a separate item.

Type: array

    # Examples
    tables:
      - orders_table
      - customer_address_table
      - inventory_table

checkpoint_cache

Specify a cache resource to store the binlog position of the most recent data update delivered to Redpanda Connect. After a restart, Redpanda Connect can continue processing changes from this last known position, avoiding the need to reprocess all table updates.

Type: string
Default: ""

    # Examples
    input:
      mysql_cdc:
        dsn: username:password@tcp(localhost:3306)/database
        tables: [my_table]
        checkpoint_cache: "my_cdc_cache"

    cache_resources:
      - label: "my_cdc_cache"
        redis:
          url: redis://:6379

checkpoint_key

The key identifier used to store the binlog position in checkpoint_cache. If you have multiple mysql_cdc inputs sharing the same cache, you can provide an alternative key.

Type: string
Default: mysql_binlog_position

snapshot_max_batch_size

The maximum number of table rows to fetch in each batch when taking a snapshot. This option is only available when stream_snapshot is set to true.

Type: int
Default: 1000

stream_snapshot

When set to true, this input streams a snapshot of all existing data in the source database before streaming data changes. To use this setting, all database tables that you want to replicate must have a primary key.

Type: bool
Default: false

    # Examples
    stream_snapshot: true

auto_replay_nacks

Whether to automatically replay rejected messages (negative acknowledgements) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure.
Set auto_replay_nacks to false to delete rejected messages. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data is discarded immediately upon consumption and mutation.

Type: bool
Default: true

checkpoint_limit

The maximum number of messages that this input can process at a given time. Increasing this limit enables parallel processing, and batching at the output level. To preserve at-least-once guarantees, any given binlog position is not acknowledged until all messages under that offset are delivered.

Type: int
Default: 1024

batching

Allows you to configure a batching policy.

Type: object

    # Examples
    batching:
      byte_size: 5000
      count: 0
      period: 1s

    batching:
      count: 10
      period: 1s

    batching:
      check: this.contains("END BATCH")
      count: 0
      period: 1m

batching.count

The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.

Type: int
Default: 0

batching.byte_size

The number of bytes at which the batch is flushed. Set to 0 to disable size-based batching.

Type: int
Default: 0

batching.period

The period of time after which an incomplete batch is flushed regardless of its size.

Type: string
Default: ""

    # Examples
    period: 1s

    period: 1m

    period: 500ms

batching.check

A Bloblang query that returns a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

    # Examples
    check: this.type == "end_of_transaction"

batching.processors

For aggregating and archiving message batches, you can add a list of processors to apply to a batch as it is flushed. All resulting messages are flushed as a single batch even when you configure processors to split the batch into smaller batches.

Type: array

    # Examples
    processors:
      - archive:
          format: concatenate

    processors:
      - archive:
          format: lines

    processors:
      - archive:
          format: json_array
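To show how the fields and metadata described on this page could fit together, here is a minimal sketch of a streaming-mode pipeline that copies the operation, table, and binlog_position metadata into each message body. Several parts are assumptions for illustration only: the pipeline and output sections, the cache label my_cdc_cache, the output field names op, source_table, and position, and the premise that each message body is a structured object representing a row.

```yaml
input:
  mysql_cdc:
    dsn: username:password@tcp(localhost:3306)/database
    tables:
      - orders_table              # placeholder table name
    checkpoint_cache: my_cdc_cache
    stream_snapshot: false        # streaming mode: start from the latest binlog position
    batching:
      count: 10
      period: 1s

pipeline:
  processors:
    - mapping: |
        # Assumes the message body is a structured object (one row).
        root = this
        # Hypothetical field names; the values come from this input's metadata.
        root.op = @operation
        root.source_table = @table
        root.position = @binlog_position

cache_resources:
  - label: my_cdc_cache           # placeholder label referenced by checkpoint_cache
    redis:
      url: redis://:6379

output:
  stdout: {}                      # illustrative output only
```

Because binlog_position is not set on snapshot reads, a mapping like this is best suited to streaming mode, or it should tolerate a missing value when stream_snapshot is true.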