Create a JDBC Sink Connector
You can use a JDBC Sink connector to export structured data from Redpanda to a relational database.
Before you can create a JDBC Sink connector in the Redpanda Cloud, you must have a:
- Relational database instance that is acceccible from the JDBC Sink connector instance
- Database user
The JDBC Sink connector has the following limitations:
AVROformats can be used as a value converter.
Only the following databases are supported:
- MySQL 5.7 and 8.0
- PostgreSQL 8.2 and higher using the version 3.0 of the PostgreSQL® protocol
- SQL Server - Microsoft SQL versions: Azure SQL Database, Azure Synapse Analytics, Azure SQL Managed Instance, SQL Server 2014, SQL Server 2016, SQL Server 2017, SQL Server 2019
Create a JDBC Sink connector
To create the JDBC Sink connector:
In Redpanda Cloud, click Connectors in the navigation menu, and then click Create Connector.
Select Export to JDBC.
On the Create Connector page, specify the following required connector configuration options:
Topics to export
Comma-separated list of the cluster topics you want to replicate.
The database connection JDBC URL.
Name of the database user to be used when connecting to the database.
Password of the database user to be used when connecting to the database.
Kafka message key format
Format of the key in the Kafka topic.
BYTESis the default.
Kafka message value format
Format of the value in the Kafka topic.
JSONis the default.
When enabled, automatically creates the destination table (if it is missing) based on the record schema (issues a
CREATE). The default is disabled.
Maximum number of tasks to use for this connector. The default is
1. Each task replicates exclusive set of partitions assigned to it.
Globally-unique name to use for this connector.
Click Next. Review the connector properties specified, then click Create.
Advanced JDBC Sink connector configuration
In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, then specify any of the following optional advanced connector configuration properties by selecting Show advanced options on the Create Connector page:
|List of comma-separated record value field names. If the value of this property is empty, the connector uses all fields from the record to migrate to a database. Otherwise, the connector uses only the record fields that are specified (in a comma-separated format). Note that |
|Kafka topics to database tables mapping. Comma-separated list of topic to table mapping in the format: |
|A format string for the destination table name, which may contain |
|Specifies whether or not to normalize destination table names for topics. When enabled, the alphanumeric characters |
|Specifies whether or not to delimit (in most databases, a quote with double quotation marks) identifiers (for example, table names and column names) in SQL statements. By default, enabled.|
|Whether to automatically add columns in the table schema when found to be missing relative to the record schema by issuing |
|Specifies how many records to attempt to batch together for insertion into the destination table, when possible. The default is |
|Name of the JDBC timezone that should be used in the connector when querying with time-based criteria. Default is |
|The insertion mode to use. The supported modes are: |
|The primary key mode to use. Supported modes are: |
|Comma-separated list of primary key field names. The runtime interpretation of this configuration depends on the |
|The maximum number of times to retry on errors before failing the task. The default is |
|The time in milliseconds to wait before a retry attempt is made following an error. The default is |
|The name of the database dialect that should be used for this connector. By default. the connector automatically determines the dialect based upon the JDBC connection URL. Use if you want to override that behavior and specify a specific dialect.|
Use the appropriate key or value converter (input data format) for your data as follows:
- Use the default
Kafka message value format=JSONproperty in your configuration.
- Topics should contain data in JSON format with a defined JSON schema. For example:
Test the connection
After the connector is created, ensure that:
- There are no errors in logs and in Redpanda Console.
- Database tables contain data from Redpanda topics.
JDBC Sink connector issues are reported as failed tasks.
|PSQLException: FATAL: database "invalid-database" does not exist||Make sure the |
|UnknownHostException: invalid-host||Make sure the |
|PSQLException: Connection to postgres:1234 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections||Make sure the |
|PSQLException: FATAL: password authentication failed for user "postgres"||Verify that the |
|ConnectException: topic_name.Value (STRUCT) type doesn't have a mapping to the SQL database column type||The JDBC Sink connector is not compatible with the Debezium PostgreSQL Source connector. Kafka Connect JSON produced by the Debezium Connector is not compatible with what the JDBC Sink Connector is expecting. Try changing a topic name. The JDBC Source connector is compatible with the JDBC Sink connector, and can be used as an alternative for a Debezium PostgreSQL source connector.|