Create a JDBC Source Connector
You can use a JDBC Source connector to import batches of rows from MySQL, PostgreSQL, SQLite, and SQL Server relational databases into Redpanda topics.
Prerequisites
- Relational database instance that is accessible from the JDBC Source connector instance.
- Database user has been created.
Limitations
The JDBC Source connector has the following limitations:
- Only `JSON` or `AVRO` formats can be used as a value converter.
- Only the following databases are supported:
  - MySQL 5.7 and 8.0
  - PostgreSQL 8.2 and higher, using version 3.0 of the PostgreSQL® protocol
  - SQLite
  - SQL Server - Microsoft SQL versions: Azure SQL Database, Azure Synapse Analytics, Azure SQL Managed Instance, SQL Server 2014, SQL Server 2016, SQL Server 2017, SQL Server 2019
Create a JDBC Source connector
To create the JDBC Source connector:
1. In Redpanda Cloud, click **Connectors** in the navigation menu, and then click **Create Connector**.
2. Select **Import from JDBC**.
3. On the **Create Connector** page, specify the following required connector configuration options:

   | Property name | Property key | Description |
   |---|---|---|
   | Topic prefix | `topic.prefix` | Prefix to prepend to table names to generate the name of the Kafka topic to which to publish data, or in the case of a custom query, the full name of the topic to publish to. |
   | JDBC URL | `connection.url` | The database connection JDBC URL. |
   | User | `connection.user` | Name of the database user to be used when connecting to the database. |
   | Password | `connection.password` | Password of the database user to be used when connecting to the database. |
   | Redpanda message value format | `value.converter` | Format of the value in the Kafka topic. `JSON` is the default. |
   | Max Tasks | `tasks.max` | Maximum number of tasks to use for this connector. The default is `1`. Each task replicates an exclusive set of partitions assigned to it. |
   | Connector name | `name` | Globally-unique name to use for this connector. |

4. Click **Next**. Review the connector properties specified, then click **Create**.
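For reference, the following is a minimal sketch of the connector configuration that results from these required properties. The connection details, credentials, and names are placeholder values, and the JSON converter class shown is the standard Kafka Connect `JsonConverter`:

```json
{
  "name": "jdbc-source-inventory",
  "topic.prefix": "pg_",
  "connection.url": "jdbc:postgresql://db.example.com:5432/inventory",
  "connection.user": "connect_user",
  "connection.password": "secret-password",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "tasks.max": "1"
}
```

With this configuration, rows from a table named `orders` would be published to the topic `pg_orders`, because the topic name is the table name with the `topic.prefix` prepended.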
Advanced JDBC Source connector configuration
In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, then specify any of the following optional advanced connector configuration properties by selecting Show advanced options on the Create Connector page:
| Property name | Property key | Description |
|---|---|---|
| Connection attempts | `connection.attempts` | Maximum number of attempts to retrieve a valid JDBC connection. The default is `3`. |
| Connection backoff | `connection.backoff.ms` | Backoff time in milliseconds between connection attempts. The default is `10000`. |
| Kafka message key format | `key.converter` | Format of the key in the Kafka topic. |
| Kafka message headers format | `header.converter` | Format of the headers in the Kafka topic. The default is `SimpleHeaderConverter`. |
| Include tables | `table.whitelist` | List of tables to include when copying. If specified, you cannot specify the Exclude tables property. |
| Exclude tables | `table.blacklist` | List of tables to exclude when copying. If specified, you cannot specify the Include tables property. |
| Qualify table names | `table.names.qualify` | Specifies whether or not to use fully-qualified table names when querying the database. If disabled, queries are performed with unqualified table names. This property may be useful if the database has been configured with a search path that automatically directs unqualified queries to the correct table when there are multiple tables available with the same unqualified name. |
| Catalog pattern | `catalog.pattern` | Catalog pattern used to fetch table metadata from the database. |
| Schema pattern | `schema.pattern` | Schema pattern used to fetch table metadata from the database. |
| DB time zone | `db.timezone` | Name of the JDBC timezone that should be used in the connector when querying with time-based criteria. The default is `UTC`. |
| Max rows per batch | `batch.max.rows` | Maximum number of rows to include in a single batch when polling for new data. You can use this property to limit the amount of data buffered internally in the connector. The default is `100`. |
| Incrementing column name | `incrementing.column.name` | The name of the strictly incrementing column to use to detect new rows. An empty value indicates that the column should be autodetected by looking for an auto-incrementing column. This column cannot be nullable. |
| Incrementing initial value | `incrementing.initial` | For the incrementing column, consider only the rows that have a value greater than this. Specify if you need to pick up rows with negative or zero value, or if you want to skip rows. The default is `-1`. |
| Table loading mode | `mode` | The mode for updating a table each time it is polled. Options include: `bulk` (perform a bulk load of the entire table each time it is polled), `incrementing` (use a strictly incrementing column on each table to detect only new rows), `timestamp` (use a timestamp column to detect new and modified rows), and `timestamp+incrementing` (use two columns: a timestamp column that detects new and modified rows, and a strictly incrementing column that provides a globally unique ID for updates). |
| Poll interval | `poll.interval.ms` | Frequency in milliseconds used to poll for new data in each table. The default is `5000`. |
| Query | `query` | Specifies the query to use to select new or updated rows. Use to join tables, select subsets of columns in a table, or to filter data. When specified, this connector will only copy data using this query, and whole-table copying will be disabled. Different query modes may still be used for incremental updates, but to properly construct the incremental query, it must be possible to append a `WHERE` clause to this query (that is, no `WHERE` clauses may be used in the query itself). |
| Quote SQL identifiers | `quote.sql.identifiers` | Specifies whether or not to delimit (in most databases, quote with double quotation marks) identifiers (for example, table names and column names) in SQL statements. |
| Table poll interval | `table.poll.interval.ms` | Frequency in milliseconds to poll for new or removed tables, which may result in updated task configurations to start polling for data in added tables, or stop polling for data in removed tables. The default is `60000`. |
| Table types | `table.types` | By default, the JDBC connector only detects tables with type `TABLE` from the source database. This property allows a comma-separated list of table types to extract. Options include: `TABLE`, `VIEW`. |
| Timestamp column name | `timestamp.column.name` | Comma-separated list of one or more timestamp columns to detect new or modified rows using the `COALESCE` SQL function. |
| Timestamp delay interval | `timestamp.delay.interval.ms` | The amount of time to wait after a row with a certain timestamp appears before including it in the result. You can add a delay to allow transactions with an earlier timestamp to complete. The first execution fetches all available records (that is, starting at a timestamp greater than `0`) until the current time minus the delay. |
| Timestamp initial value | `timestamp.initial` | The initial value of the timestamp when selecting records. The value can be negative. Records having a timestamp greater than the value are included in the result. |
| Validate non-null | `validate.non.null` | By default, the JDBC connector validates that all incrementing and timestamp tables have `NOT NULL` set for the columns being used as their ID/timestamp. If the tables do not, the connector fails to start. Set this property to `false` to disable these checks. |
| Database dialect | `dialect.name` | The name of the database dialect that should be used for this connector. By default, the connector automatically determines the dialect based upon the JDBC connection URL. Use if you want to override that behavior and specify a specific dialect. |
| Auto-create topics | `topic.creation.enable` | Specifies whether or not to allow automatic creation of topics. The default is enabled. |
| Topic partitions | `topic.creation.default.partitions` | Specifies the number of partitions for the created topics. The default is `1`. |
| Topic replication factor | `topic.creation.default.replication.factor` | Specifies the replication factor for the created topics. The default is `-1`. |
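As an illustration of how several advanced properties combine, the following sketch assumes the standard JDBC Source property keys listed above and placeholder table and column names (`orders`, `updated_at`, `order_id`). It polls the `orders` table every five seconds and detects new and updated rows using a timestamp column plus a strictly incrementing ID:

```json
{
  "mode": "timestamp+incrementing",
  "table.whitelist": "orders",
  "timestamp.column.name": "updated_at",
  "incrementing.column.name": "order_id",
  "poll.interval.ms": "5000",
  "batch.max.rows": "100"
}
```

Using `timestamp+incrementing` rather than `timestamp` alone gives each update a globally unique ID, so rows sharing the same timestamp can still be tracked reliably across polls.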
Map data
Use the appropriate key or value converter (input data format) for your data as follows:
- You can use Schema Registry as an alternative to the JSON schema.
- Use `Kafka message value format` = `AVRO` (`io.confluent.connect.avro.AvroConverter`) to use Schema Registry with `AvroConverter`.
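For example, the value-converter settings for AVRO with Schema Registry look roughly like the following sketch. The registry URL is a placeholder, and `value.converter.schema.registry.url` is the standard `AvroConverter` property; in Redpanda Cloud, the registry endpoint for managed connectors may already be preconfigured:

```json
{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://schema-registry.example.com:8081"
}
```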
Use the following properties to select the database data set to read from:
- Include tables
- Exclude tables
- Catalog pattern
- Schema pattern
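For example, assuming `table.whitelist` and `schema.pattern` are the keys behind Include tables and Schema pattern (as in the advanced options table above), the following sketch limits the connector to two placeholder tables in the `public` schema of a PostgreSQL database:

```json
{
  "schema.pattern": "public",
  "table.whitelist": "public.orders,public.customers"
}
```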
Test the connection
After the connector is created, check to ensure that:
- There are no errors in logs and in Redpanda Console.
- Redpanda topics contain data from relational database tables.
Troubleshoot
Most JDBC Source connector issues are identified in the connector creation phase.
Invalid `Include tables` values are reported in logs. Select **Show Logs** to view error details.
| Message | Action |
|---|---|
| `PSQLException: FATAL: database "invalid-database" does not exist` | Make sure the `JDBC URL` specifies the name of an existing database. |
| `PSQLException: The connection attempt failed.` / `Couldn't open connection for configuration` / `PSQLException: Connection to postgres:1234 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections` | Make sure the `JDBC URL` specifies a valid hostname and port, and that the database instance is accessible from the connector instance. |
| `PSQLException: FATAL: password authentication failed for user "postgres"` | Verify that the `User` and `Password` properties are correct. |
| `IllegalArgumentException: Number of groups must be positive.` | Make sure the `Include tables` list specifies tables that exist in the database. |
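When verifying the `JDBC URL`, compare it against the standard JDBC URL shape for your database. For example, a well-formed PostgreSQL URL with placeholder host, port, and database name:

```json
{
  "connection.url": "jdbc:postgresql://db.example.com:5432/inventory"
}
```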