mysql_cdc

Available in: Cloud, Self-Managed

License: This component requires an enterprise license. You can either upgrade to an Enterprise Edition license, or generate a trial license key that's valid for 30 days.

Streams data changes from a MySQL database, using MySQL’s binary log to capture data updates. This input is built on the mysql-canal library but uses a custom approach for streaming historical data.

Introduced in version 4.46.0.

Common:

    input:
      label: ""
      mysql_cdc:
        flavor: mysql
        dsn: "" # No default (required)
        tables: [] # No default (required)
        checkpoint_cache: "" # No default (required)
        checkpoint_key: mysql_binlog_position
        snapshot_max_batch_size: 1000
        stream_snapshot: "" # No default (required)
        auto_replay_nacks: true
        checkpoint_limit: 1024
        batching:
          count: 0
          byte_size: 0
          period: ""
          check: ""
          processors: [] # No default (optional)

Advanced:

    input:
      label: ""
      mysql_cdc:
        flavor: mysql
        dsn: "" # No default (required)
        tables: [] # No default (required)
        checkpoint_cache: "" # No default (required)
        checkpoint_key: mysql_binlog_position
        snapshot_max_batch_size: 1000
        max_reconnect_attempts: 10
        stream_snapshot: "" # No default (required)
        auto_replay_nacks: true
        checkpoint_limit: 1024
        tls:
          skip_cert_verify: false
          enable_renegotiation: false
          root_cas: ""
          root_cas_file: ""
          client_certs:
            cert: ""
            key: ""
            cert_file: ""
            key_file: ""
            password: ""
        aws:
          enabled: false
          region: "" # No default (optional)
          endpoint: "" # No default (required)
          id: "" # No default (optional)
          secret: "" # No default (optional)
          token: "" # No default (optional)
          role: "" # No default (optional)
          role_external_id: "" # No default (optional)
          roles:
            role: ""
            role_external_id: ""
        batching:
          count: 0
          byte_size: 0
          period: ""
          check: ""
          processors: [] # No default (optional)

The mysql_cdc input uses MySQL’s binary log (binlog) to capture changes made
to a MySQL database in real time and streams them to Redpanda Connect. You specify which tables in your source database to receive changes from, and you choose one of two replication modes.

Prerequisites

- MySQL version 8 or later
- A MySQL instance with binary logging enabled

Configuration resources

Cloud platforms:
- Change data capture on Amazon RDS for MySQL
- Azure MySQL Database (CDC)
- Google Cloud SQL for MySQL

Self-hosted MySQL:
- Binary Logging Options and Variables

Choose a replication mode

You can run the mysql_cdc input in one of two modes, depending on whether you need a snapshot of existing data.

- Snapshot mode: Redpanda Connect first captures a snapshot of all data in the selected tables and streams the contents before processing changes from the last recorded binlog position.
- Streaming mode: Redpanda Connect skips the snapshot and processes only the most recent data changes, starting from the latest binlog position.

Snapshot mode

If you set the stream_snapshot field to true, Redpanda Connect connects to your MySQL database and does the following to capture a snapshot of all data in the selected tables:

1. Executes the FLUSH TABLES WITH READ LOCK query to write any outstanding table updates to disk, and locks the tables.
2. Runs the START TRANSACTION WITH CONSISTENT SNAPSHOT statement to create a new transaction with a consistent view of all data, capturing the state of the database at the moment the transaction started.
3. Reads the current binlog position.
4. Runs the UNLOCK TABLES statement to release the database.
5. Preserves the initial transaction for data integrity.

If the pipeline restarts during this process, Redpanda Connect must start the snapshot capture from scratch to store the current binlog position in the checkpoint_cache.

After the snapshot is taken, the input executes SELECT statements to extract data from the selected tables in two stages. First, the input finds the primary keys of a table.
Then it selects the data ordered by primary key.

Finally, the input uses the stored binlog position to catch up with changes that occurred during snapshot processing.

Streaming mode

If you set the stream_snapshot field to false, Redpanda Connect connects to your MySQL database and starts processing data changes from the latest binlog position. If the pipeline restarts, Redpanda Connect resumes processing updates from the last binlog position written to the checkpoint_cache.

Binlog rotation

While the mysql_cdc input is streaming changes to Redpanda Connect, your MySQL server may rotate the binlog file. When this occurs, Redpanda Connect flushes the existing message batch and stores the new binlog position so that it can resume processing using the latest offset.

Data mappings

The following table shows how selected MySQL data types are mapped to data types supported in Redpanda Connect. All other data types are mapped to string values.

| MySQL data type | Bloblang value |
| --- | --- |
| TEXT, VARCHAR | A string value, for example: "this data" |
| BINARY, VARBINARY, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | An array of byte values, for example: [byte1,byte2,byte3] |
| DECIMAL, NUMERIC, TINYINT, SMALLINT, MEDIUMINT, INT, BIGINT, YEAR | A standard numeric type, for example: 123 |
| FLOAT, DOUBLE | A 64-bit floating-point value (float64), for example: 123.1234 |
| DATETIME, TIMESTAMP | A Bloblang timestamp, for example: 1257894000000 2009-11-10 23:00:00 +0000 UTC |
| SET | An array of strings, for example: ["apple", "banana", "orange"] |
| JSON | A map object of the JSON, for example: {"red": 1, "blue": 2, "green": 3} |

Metadata

This input adds the following metadata fields to each message:

- operation: The type of database operation that generated the message: read, insert, update, or delete. A read operation occurs when a snapshot of the database is processed.
- table: The name of the database table from which the message originated.
- binlog_position: The binary log (binlog) position of each data update streamed from the source MySQL database.
  No binlog_position is set for data extracted from the initial snapshot. The binlog values are strings that you can sort to determine the order in which data updates occurred.

Fields

auto_replay_nacks

Whether to automatically replay rejected messages (negative acknowledgements) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure.

Set auto_replay_nacks to false to delete rejected messages. Disabling auto replays can greatly improve the memory efficiency of high-throughput streams, because the original shape of the data is discarded immediately upon consumption and mutation.

Type: bool
Default: true

aws

AWS IAM authentication configuration for MySQL instances. When enabled, IAM credentials are used to generate temporary authentication tokens instead of a static password.

Type: object

aws.enabled

Enable AWS IAM authentication for MySQL. When enabled, an IAM authentication token is generated and used as the password. When using IAM authentication, set max_reconnect_attempts to a low value so that the input can refresh credentials.

Type: bool
Default: false

aws.endpoint

The MySQL endpoint hostname (for example, mydb.abc123.us-east-1.rds.amazonaws.com).

Type: string

aws.id

The ID of credentials to use.

Type: string

aws.region

The AWS region where the MySQL instance is located. If no region is specified, the environment default is used.

Type: string

aws.role

Optional AWS IAM role ARN to assume for authentication. Alternatively, use the roles array for role chaining.

Type: string

aws.role_external_id

Optional external ID for the role assumption. Only used with the role field. Alternatively, use the roles array for role chaining.

Type: string

aws.roles[]

Optional array of AWS IAM roles to assume for authentication. Roles can be assumed in sequence, enabling chaining for purposes such as cross-account access. Each role can optionally specify an external ID.
Type: object

aws.roles[].role

AWS IAM role ARN to assume.

Type: string
Default: ""

aws.roles[].role_external_id

Optional external ID for the role assumption.

Type: string
Default: ""

aws.secret

The secret for the credentials being used.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

aws.token

The token for the credentials being used, required when using short term credentials.

Type: string

batching

Allows you to configure a batching policy.

Type: object

    # Examples:
    batching:
      byte_size: 5000
      count: 0
      period: 1s

    # ---

    batching:
      count: 10
      period: 1s

    # ---

    batching:
      check: this.contains("END BATCH")
      count: 0
      period: 1m

batching.byte_size

The number of bytes at which the batch is flushed. Set to 0 to disable size-based batching.

Type: int
Default: 0

batching.check

A Bloblang query that returns a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

    # Examples:
    check: this.type == "end_of_transaction"

batching.count

The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.

Type: int
Default: 0

batching.period

The period of time after which an incomplete batch is flushed regardless of its size. This field accepts Go duration format strings such as 100ms, 1s, or 5s.

Type: string
Default: ""

    # Examples:
    period: 1s

    # ---

    period: 1m

    # ---

    period: 500ms

batching.processors[]

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. All resulting messages are flushed as a single batch, and therefore splitting the batch into smaller batches using these processors is a no-op.
Type: processor

    # Examples:
    processors:
      - archive:
          format: concatenate

    # ---

    processors:
      - archive:
          format: lines

    # ---

    processors:
      - archive:
          format: json_array

checkpoint_cache

Specify a cache resource to store the binlog position of the most recent data update delivered to Redpanda Connect. After a restart, Redpanda Connect can continue processing changes from this last known position, avoiding the need to reprocess all table updates.

Type: string

checkpoint_key

The key identifier used to store the binlog position in checkpoint_cache. If you have multiple mysql_cdc inputs sharing the same cache, you can provide an alternative key.

Type: string
Default: mysql_binlog_position

checkpoint_limit

The maximum number of messages that this input can process at a given time. Increasing this limit enables parallel processing, and batching at the output level. To preserve at-least-once guarantees, any given binlog position is not acknowledged until all messages under that offset are delivered.

Type: int
Default: 1024

dsn

The data source name (DSN) of the MySQL database from which you want to stream updates. Use the format user:password@tcp(localhost:3306)/database.

Type: string

    # Examples:
    dsn: user:password@tcp(localhost:3306)/database

flavor

The type of MySQL database to connect to.

Type: string
Default: mysql

| Option | Summary |
| --- | --- |
| mariadb | MariaDB flavored databases. |
| mysql | MySQL flavored databases. |

max_reconnect_attempts

The maximum number of times the MySQL driver tries to re-establish a broken connection before Redpanda Connect attempts reconnection. A zero or negative number means infinite retry attempts.

Type: int
Default: 10

snapshot_max_batch_size

The maximum number of table rows to fetch in each batch when taking a snapshot. This option only applies when stream_snapshot is set to true.

Type: int
Default: 1000

stream_snapshot

When set to true, this input streams a snapshot of all existing data in the source database before streaming data changes.
To use this setting, all database tables that you want to replicate must have a primary key.

Type: bool

tables[]

A list of the database table names to stream changes from. Specify each table name as a separate item.

Type: array

    # Examples:
    tables:
      - table1
      - table2

tls

Using this field overrides the SSL/TLS settings in the environment and DSN.

Type: object

tls.client_certs[]

A list of client certificates to use. For each certificate, specify either the cert and key fields or the cert_file and key_file fields, but not both.

Type: object
Default: []

    # Examples:
    client_certs:
      - cert: foo
        key: bar

    # ---

    client_certs:
      - cert_file: ./example.pem
        key_file: ./example.key

tls.client_certs[].cert

A plain text certificate to use.

Type: string
Default: ""

tls.client_certs[].cert_file

The path of a certificate to use.

Type: string
Default: ""

tls.client_certs[].key

A plain text certificate key to use.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string
Default: ""

tls.client_certs[].key_file

The path of a certificate key to use.

Type: string
Default: ""

tls.client_certs[].password

A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format. Because that algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string
Default: ""

    # Examples:
    password: foo

    # ---

    password: ${KEY_PASSWORD}

tls.enable_renegotiation

Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.

Requires version 3.45.0 or later.
Type: bool
Default: false

tls.root_cas

An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string
Default: ""

    # Examples:
    root_cas: |-
      -----BEGIN CERTIFICATE-----
      ...
      -----END CERTIFICATE-----

tls.root_cas_file

An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.

Type: string
Default: ""

    # Examples:
    root_cas_file: ./root_cas.pem

tls.skip_cert_verify

Whether to skip server-side certificate verification.

Type: bool
Default: false
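
Example configuration

As a minimal sketch of how the required fields fit together, the following configuration runs the mysql_cdc input in snapshot mode and keeps the binlog position in a file-backed cache. The cache label binlog_cache, the ./checkpoints directory, the choice of the file cache type, the stdout output, and the DSN and table names are placeholders chosen for illustration and do not come from this page. The mapping processor is one possible way to copy the operation and table metadata fields into the message body.

```yaml
input:
  mysql_cdc:
    # Placeholder DSN in the documented format; replace with your own.
    dsn: user:password@tcp(localhost:3306)/database
    # Placeholder table names; each table needs a primary key in snapshot mode.
    tables:
      - table1
      - table2
    # Stream existing rows first, then continue from the stored binlog position.
    stream_snapshot: true
    # Must match the label of a cache resource (hypothetical label below).
    checkpoint_cache: binlog_cache
    checkpoint_key: mysql_binlog_position

cache_resources:
  # Assumption: a file-backed cache; any cache resource type could be used.
  - label: binlog_cache
    file:
      directory: ./checkpoints

pipeline:
  processors:
    # Copy the metadata set by the input into the message body.
    - mapping: |
        root = this
        root.operation = @operation
        root.source_table = @table

output:
  stdout: {}
```

Setting stream_snapshot to false in this sketch would switch the input to streaming mode, which skips the initial read of existing rows.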