# Create a JDBC Sink Connector

> For the complete documentation index, see [llms.txt](https://docs.redpanda.com/llms.txt). Component-specific: [cloud-data-platform-full.txt](https://docs.redpanda.com/cloud-data-platform-full.txt)

---
title: Create a JDBC Sink Connector
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-connect-version: 4.93.0
latest-redpanda-tag: v26.1.9
docname: managed-connectors/create-jdbc-sink-connector
page-component-name: cloud-data-platform
page-version: master
page-component-version: master
page-component-title: Cloud
page-relative-src-path: managed-connectors/create-jdbc-sink-connector.adoc
page-edit-url: https://github.com/redpanda-data/cloud-docs/edit/main/modules/develop/pages/managed-connectors/create-jdbc-sink-connector.adoc
description: Use the Redpanda Cloud UI to create a JDBC Sink Connector.
page-git-created-date: "2024-06-06"
page-git-modified-date: "2025-08-05"
---

<!-- Source: https://docs.redpanda.com/cloud-data-platform/develop/managed-connectors/create-jdbc-sink-connector.md -->

> ❗ **IMPORTANT**
>
> -   To enable this feature, contact [Redpanda Support](https://support.redpanda.com/hc/en-us/requests/new). To disable this feature, see [Disable Kafka Connect](https://docs.redpanda.com/cloud-data-platform/develop/managed-connectors/disable-kc/).
>
> -   Redpanda Support does not manage or monitor Kafka Connect. For fully-supported connectors, consider [Redpanda Connect](https://docs.redpanda.com/cloud-data-platform/develop/connect/about/).
>
> -   When Kafka Connect is enabled, there is a dedicated node running even when no connectors are deployed.

You can use a JDBC Sink connector to export structured data from Redpanda to a relational database.

## [](#prerequisites)Prerequisites

Before you can create a JDBC Sink connector in Redpanda Cloud, you must have:

-   A relational database instance that is accessible from the JDBC Sink connector instance

-   A database user


## [](#limitations)Limitations

The JDBC Sink connector has the following limitations:

-   Only `JSON` or `AVRO` formats can be used as a value converter.

-   Only the following databases are supported:

    -   MySQL 5.7 and 8.0

    -   PostgreSQL 8.2 and higher, using version 3.0 of the PostgreSQL® protocol

    -   SQLite

    -   Microsoft SQL Server, in the following versions: Azure SQL Database, Azure Synapse Analytics, Azure SQL Managed Instance, SQL Server 2014, SQL Server 2016, SQL Server 2017, SQL Server 2019



## [](#create-a-jdbc-sink-connector)Create a JDBC Sink connector

To create the JDBC Sink connector:

1.  In Redpanda Cloud, click **Connectors** in the navigation menu, and then click **Create Connector**.

2.  Select **Export to JDBC**.

3.  On the **Create Connector** page, specify the following required connector configuration options:

    | Property name | Property key | Description |
    | --- | --- | --- |
    | Topics to export | topics | Comma-separated list of the cluster topics you want to replicate. |
    | Topics regex | topics.regex | Java regular expression of topics to replicate. For example, specify `.*` to replicate all available topics in the cluster. Applicable only when **Use regular expressions** is selected. |
    | JDBC URL | connection.url | The database connection JDBC URL. |
    | User | connection.user | Name of the database user to be used when connecting to the database. |
    | Password | connection.password | Password of the database user to be used when connecting to the database. |
    | Redpanda message key format | key.converter | Format of the key in the Redpanda topic. BYTES is the default. |
    | Redpanda message value format | value.converter | Format of the value in the Redpanda topic. JSON is the default. |
    | Auto-create | auto.create | When enabled, automatically creates the destination table (if it is missing) based on the record schema (issues a CREATE). The default is disabled. |
    | Max Tasks | tasks.max | Maximum number of tasks to use for this connector. The default is 1. Each task replicates an exclusive set of partitions assigned to it. |
    | Connector name | name | Globally-unique name to use for this connector. |

4.  Click **Next**. Review the connector properties specified, then click **Create**.
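
The settings from the table above can also be pictured as a flat set of key-value pairs. The following is a minimal sketch in Python, not an official client or API: the property keys come from the table, while every value (connector name, topics, host, database, user) and the `missing_keys` helper are hypothetical and exist only for illustration. The converters are left out so that the documented defaults apply.

```python
import json

# Hypothetical values for illustration; only the property keys come from the table above.
connector_config = {
    "name": "jdbc-sink-orders",
    "topics": "orders,customers",
    "connection.url": "jdbc:postgresql://db.example.com:5432/shop",
    "connection.user": "connect_user",
    "connection.password": "<password>",
    "auto.create": "true",
    "tasks.max": "1",
}

# Keys this sketch treats as mandatory (an assumption made for the example).
REQUIRED_KEYS = {"name", "connection.url", "connection.user", "connection.password"}


def missing_keys(config):
    """Return a sorted list of settings that are absent from the config."""
    missing = REQUIRED_KEYS - set(config)
    # Topics are selected either by an explicit list or by a regular expression.
    if not (config.get("topics") or config.get("topics.regex")):
        missing.add("topics or topics.regex")
    return sorted(missing)


if __name__ == "__main__":
    print("Missing settings:", missing_keys(connector_config))
    print(json.dumps(connector_config, indent=2))
```

Checking a configuration like this before entering it in the UI can catch an omitted JDBC URL or credential early, which otherwise surfaces only as a failed task.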


### [](#advanced-jdbc-sink-connector-configuration)Advanced JDBC Sink connector configuration

In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, then specify any of the following _optional_ advanced connector configuration properties by selecting **Show advanced options** on the **Create Connector** page:

| Property name | Property key | Description |
| --- | --- | --- |
| Include fields | fields.whitelist | List of comma-separated record value field names. If the value of this property is empty, the connector uses all fields from the record to migrate to a database. Otherwise, the connector uses only the record fields that are specified (in a comma-separated format). Note that Primary Key Fields is applied independently in the context of which fields form the primary key columns in the destination database, while this configuration is applicable for the other columns. |
| Topics to tables mapping | topics.to.tables.mapping | Kafka topics to database tables mapping. Comma-separated list of topic to table mapping in the format: topic_name:table_name. If the destination table is found in the mapping, then it overrides the generated one defined in table.name.format. |
| Table name format | table.name.format | A format string for the destination table name, which may contain ${topic} as a placeholder for the original topic name. For example, kafka_${topic} for the topic orders maps to the table name kafka_orders. The default is ${topic}. |
| Table name normalize | table.name.normalize | Specifies whether or not to normalize destination table names for topics. When enabled, alphanumeric characters (a-z, A-Z, 0-9) and `_` remain as is, and other characters (such as `.`) are replaced with `_`. Disabled by default. |
| Quote SQL identifiers | sql.quote.identifiers | Specifies whether or not to delimit identifiers (for example, table names and column names) in SQL statements. In most databases, identifiers are delimited with double quotation marks. Enabled by default. |
| Auto-evolve | auto.evolve | Whether to automatically add columns in the table schema when found to be missing relative to the record schema by issuing ALTER. |
| Batch size | batch.size | Specifies how many records to attempt to batch together for insertion into the destination table, when possible. The default is 3000. |
| DB time zone | db.timezone | Name of the JDBC timezone that should be used in the connector when querying with time-based criteria. Default is UTC. |
| Insert mode | insert.mode | The insertion mode to use. The supported modes are: `INSERT`: standard SQL INSERT statements. `MULTI`: multi-row INSERT statements. `UPSERT`: use the appropriate upsert semantics for the target database if it is supported by the connector; for example, `INSERT .. ON CONFLICT .. DO UPDATE SET ..`. `UPDATE`: use the appropriate update semantics for the target database if it is supported by the connector; for example, `UPDATE`. |
| Primary key mode | pk.mode | The primary key mode to use. The supported modes are: `NONE`: no keys are used. `kafka`: Kafka coordinates (the topic, partition, and offset) are used as the primary key. `RECORD_KEY`: fields from the record key are used, which may be a primitive or a struct. `RECORD_VALUE`: fields from the record value are used, which must be a struct. |
| Primary key fields | pk.fields | Comma-separated list of primary key field names. The runtime interpretation of this configuration depends on `pk.mode`. The supported modes are: `none`: ignored, because no fields are used as the primary key in this mode. `kafka`: must be a trio representing the Kafka coordinates (the topic, partition, and offset). Defaults to `__connect_topic,__connect_partition,__connect_offset` if empty. `record_key`: if empty, all fields from the key struct are used; otherwise, the listed names are used to extract the desired fields. For a primitive key, only a single field name must be configured. `record_value`: if empty, all fields from the value struct are used; otherwise, the listed names are used to extract the desired fields. |
| Maximum retries | max.retries | The maximum number of times to retry on errors before failing the task. The default is 10. |
| Retry backoff (ms) | retry.backoff.ms | The time in milliseconds to wait before a retry attempt is made following an error. The default is 3000. |
| Database dialect | dialect.name | The name of the database dialect that should be used for this connector. By default, the connector automatically determines the dialect based on the JDBC connection URL. Use this property to override that behavior and specify a particular dialect. |
| Error tolerance | errors.tolerance | Error tolerance response during connector operation. The default value is `none`, which means that any error results in an immediate connector task failure. A value of `all` changes the behavior to skip over problematic records. |
| Dead letter queue topic name | errors.deadletterqueue.topic.name | The name of the topic to be used as the dead letter queue (DLQ) for messages that result in an error when processed by this sink connector, its transformations, or converters. The topic name is blank by default, which means that no messages are recorded in the DLQ. |
| Dead letter queue topic replication factor | errors.deadletterqueue.topic.replication.factor | Replication factor used to create the dead letter queue topic when it doesn’t already exist. |
| Enable error context headers | errors.deadletterqueue.context.headers.enable | When true, adds a header containing error context to the messages written to the dead letter queue. To avoid clashing with headers from the original record, all error context header keys start with `__connect.errors`. |
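
The three table-naming properties interact, so it can help to see them applied together. The following Python sketch is one reading of the descriptions of `topics.to.tables.mapping`, `table.name.format`, and `table.name.normalize`, not the connector's actual implementation. The function name `resolve_table_name` is hypothetical, and two behaviors are assumptions: that normalization replaces characters with an underscore, and that explicitly mapped table names are used as is, without normalization.

```python
import re


def resolve_table_name(topic, name_format="${topic}", mapping="", normalize=False):
    """Illustrative only: derive a destination table name for a topic."""
    # Parse "topic_name:table_name" pairs from the comma-separated mapping.
    explicit = {}
    for pair in mapping.split(","):
        if pair.strip():
            source, table = pair.split(":", 1)
            explicit[source.strip()] = table.strip()

    # An explicit mapping overrides the name generated from the format string.
    if topic in explicit:
        return explicit[topic]

    table = name_format.replace("${topic}", topic)
    if normalize:
        # Assumption: anything other than a-z, A-Z, 0-9, and _ becomes _.
        table = re.sub(r"[^a-zA-Z0-9_]", "_", table)
    return table


if __name__ == "__main__":
    print(resolve_table_name("orders", name_format="kafka_${topic}"))
    print(resolve_table_name("shop.orders", normalize=True))
    print(resolve_table_name("orders", mapping="orders:sales_orders"))
```

Working through the expected table name this way is useful when **Auto-create** is disabled, because the destination table must then already exist under exactly the name the connector derives.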

## [](#map-data)Map data

Use the appropriate key or value converter (input data format) for your data as follows:

-   Use the default `Redpanda message value format` = `JSON` (`org.apache.kafka.connect.json.JsonConverter`) property in your configuration.

-   Topics should contain data in JSON format with a defined JSON schema. For example:

    ```json
    {
       "schema": {
         "type": "struct",
         "fields": [
         ]
       },
       "payload": {
       }
    }
    ```
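
The skeleton above leaves `fields` and `payload` empty. As a minimal sketch of how a producer might fill it in, the following Python helper wraps a record in the `schema` and `payload` envelope. The helper name `to_envelope`, the schema name `order`, and the sample fields (`id`, `customer`, `total`) are hypothetical. The type names used (`int64`, `string`, `double`) are Kafka Connect JSON schema types.

```python
import json


def to_envelope(record, field_types, name="order"):
    """Wrap a record in a schema/payload envelope (illustrative helper)."""
    fields = [
        {"field": field, "type": type_name, "optional": False}
        for field, type_name in field_types.items()
    ]
    return {
        "schema": {
            "type": "struct",
            "name": name,
            "optional": False,
            "fields": fields,
        },
        "payload": record,
    }


if __name__ == "__main__":
    # Hypothetical record and field types, for illustration only.
    envelope = to_envelope(
        {"id": 1001, "customer": "alice", "total": 42.5},
        {"id": "int64", "customer": "string", "total": "double"},
    )
    # The serialized string is what would be produced as the message value.
    print(json.dumps(envelope, indent=2))
```

Each entry in `fields` describes one key of `payload`, so the two must stay in sync. A payload key with no matching field definition is a likely source of conversion errors.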


## [](#test-the-connection)Test the connection

After the connector is created, ensure that:

-   There are no errors in the logs or in Redpanda Console.

-   Database tables contain data from Redpanda topics.


## [](#troubleshoot)Troubleshoot

JDBC Sink connector issues are reported as failed tasks. Select **Show Logs** to view error details.

| Message | Action |
| --- | --- |
| PSQLException: FATAL: database "invalid-database" does not exist | Make sure the JDBC URL specifies an existing database name. |
| UnknownHostException: invalid-host | Make sure the JDBC URL specifies a valid database host name. |
| PSQLException: Connection to postgres:1234 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections | Make sure the JDBC URL specifies a valid database host name and port, and that the port is accessible. |
| PSQLException: FATAL: password authentication failed for user "postgres" | Verify that the User and Password are correct. |
| ConnectException: topic_name.Value (STRUCT) type doesn’t have a mapping to the SQL database column type | The JDBC Sink connector is not compatible with the Debezium PostgreSQL Source connector. Kafka Connect JSON produced by the Debezium Connector is not compatible with what the JDBC Sink Connector is expecting. Try changing a topic name. The JDBC Source connector is compatible with the JDBC Sink connector, and can be used as an alternative for a Debezium PostgreSQL source connector. |