Create a JDBC Source Connector

You can use a JDBC Source connector to import batches of rows from MySQL, PostgreSQL, SQLite, and SQL Server relational databases into Redpanda topics.

Prerequisites

  • Relational database instance that is accessible from the JDBC Source connector instance.

  • Database user has been created.

Limitations

The JDBC Source connector has the following limitations:

  • Only JSON or AVRO formats can be used as a value converter.

  • Only the following databases are supported:

    • MySQL 5.7 and 8.0

    • PostgreSQL 8.2 and higher, using version 3.0 of the PostgreSQL® protocol

    • SQLite

    • SQL Server (Microsoft SQL versions): Azure SQL Database, Azure Synapse Analytics, Azure SQL Managed Instance, SQL Server 2014, SQL Server 2016, SQL Server 2017, and SQL Server 2019

Create a JDBC Source connector

To create the JDBC Source connector:

  1. In Redpanda Cloud, click Connectors in the navigation menu, and then click Create Connector.

  2. Select Import from JDBC.

  3. On the Create Connector page, specify the following required connector configuration options:

    Property name Property key Description

    Topic prefix

    topic.prefix

    Prefix to prepend to table names to generate the name of the Kafka topic to which to publish data, or in the case of a custom query, the full name of the topic to publish to.

    JDBC URL

    connection.url

    The database connection JDBC URL.

    User

    connection.user

    Name of the database user to be used when connecting to the database.

    Password

    connection.password

    Password of the database user to be used when connecting to the database.

    Redpanda message value format

    value.converter

    Format of the value in the Redpanda topic. JSON is the default.

    Max Tasks

    tasks.max

    Maximum number of tasks to use for this connector. The default is 1. Each task replicates an exclusive set of partitions assigned to it.

    Connector name

    name

    Globally-unique name to use for this connector.

  4. Click Next. Review the connector properties specified, then click Create.
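As a rough sketch, the required options above can be thought of as the following key/value pairs. The host name, database, credentials, prefix, and connector name are placeholders, and the JSON converter class name is an assumption about what the JSON format choice resolves to. The `topic_for_table` helper is hypothetical and only illustrates how the prefix and table name combine into a topic name.

```python
import json

# Placeholder values for illustration only; replace them with your own.
basic_config = {
    "name": "jdbc-source-inventory",
    "topic.prefix": "inventory-",
    "connection.url": "jdbc:postgresql://db.example.internal:5432/inventory",
    "connection.user": "connect_user",
    "connection.password": "change-me",
    # Assumption: the converter class behind the JSON value format.
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "tasks.max": "1",
}


def topic_for_table(prefix: str, table: str) -> str:
    """Return the topic name: the configured prefix followed by the table name."""
    return f"{prefix}{table}"


if __name__ == "__main__":
    print(json.dumps(basic_config, indent=2))
    print(topic_for_table(basic_config["topic.prefix"], "orders"))
```

With these placeholder values, rows from a table named `orders` would be published to a topic named `inventory-orders`.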

Advanced JDBC Source connector configuration

In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, then specify any of the following optional advanced connector configuration properties by selecting Show advanced options on the Create Connector page:

Property name Property key Description

JDBC connection attempts

connection.attempts

Maximum number of attempts to retrieve a valid JDBC connection. The default is 3.

JDBC connection backoff (ms)

connection.backoff.ms

Backoff time between connection attempts. The default is 10000.

Kafka message key format

key.converter

Format of the key in the Redpanda topic. BYTES is the default.

Kafka message headers format

header.converter

Format of the headers in the Kafka topic. The default is SIMPLE.

Include tables

table.whitelist

List of tables to include when copying. If specified, you cannot specify the Exclude tables property.

Exclude tables

table.blacklist

List of tables to exclude when copying. If specified, you cannot specify the Include tables property.

Qualify table names

table.names.qualify

Specifies whether or not to use fully-qualified table names when querying the database. If disabled, queries are performed with unqualified table names. This property may be useful if the database has been configured with a search path that automatically directs unqualified queries to the correct table when there are multiple tables available with the same unqualified name.

Catalog pattern

catalog.pattern

Catalog pattern used to fetch table metadata from the database. null (default) means that the catalog name is not used to narrow the search, so all table metadata is fetched regardless of the catalog. "" retrieves tables without a catalog.

Schema pattern

schema.pattern

Schema pattern used to fetch table metadata from the database:

  • "" retrieves those without a schema.

  • null (default) specifies that the schema name is not to be used to narrow the search, so that all table metadata is fetched, regardless of the schema.

DB time zone

db.timezone

Name of the JDBC timezone that should be used in the connector when querying with time-based criteria. Default is UTC.

Max rows per batch

batch.max.rows

Maximum number of rows to include in a single batch when polling for new data. You can use this property to limit the amount of data buffered internally in the connector. The default is 100.

Incrementing column name

incrementing.column.name

The name of the strictly incrementing column to use to detect new rows. An empty value indicates the column should be autodetected by looking for an auto-incrementing column. This column cannot be nullable.

Incrementing column initial value

incrementing.initial

For the incrementing column, consider only the rows that have a value greater than this. Set this property if you need to pick up rows with a negative or zero value, or if you want to skip rows. The default is -1.

Table loading mode

mode

The mode for updating a table each time it is polled. Options include:

  • bulk: perform a bulk load of the entire table each time it is polled.

  • incrementing: use a strictly incrementing column on each table to detect only new rows. Note that this does not detect modifications or deletions of existing rows.

  • timestamp: use a timestamp (or timestamp-like) column to detect new and modified rows. This mode assumes that the column is updated with each write, and that values are monotonically increasing, but not necessarily unique.

  • timestamp+incrementing: use two columns, a timestamp column that detects new and modified rows, and a strictly incrementing column, which provides a globally unique ID for updates so that each row can be assigned a unique stream offset.
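To make the difference between these modes concrete, the following sketch imitates incrementing and timestamp+incrementing polling against an in-memory SQLite table. The table, column names, and SQL are hypothetical and only approximate the idea. The queries the connector actually issues may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, updated_at INTEGER NOT NULL, status TEXT)"
)
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", [(1, 100, "new"), (2, 110, "new")])


def poll_incrementing(conn, last_id):
    """Return rows whose id is greater than the last id already seen."""
    return conn.execute(
        "SELECT id, updated_at, status FROM orders WHERE id > ? ORDER BY id",
        (last_id,),
    ).fetchall()


def poll_timestamp_incrementing(conn, last_ts, last_id):
    """Return rows with a newer timestamp, or the same timestamp and a larger id."""
    return conn.execute(
        "SELECT id, updated_at, status FROM orders "
        "WHERE updated_at > ? OR (updated_at = ? AND id > ?) "
        "ORDER BY updated_at, id",
        (last_ts, last_ts, last_id),
    ).fetchall()


# First poll: -1 mirrors the default initial value of the incrementing column.
first = poll_incrementing(conn, -1)

# Row 1 is modified and row 3 is added between polls.
conn.execute("UPDATE orders SET status = 'shipped', updated_at = 120 WHERE id = 1")
conn.execute("INSERT INTO orders VALUES (3, 130, 'new')")

# Incrementing mode sees only the new row; the change to row 1 is missed.
second_incrementing = poll_incrementing(conn, 2)

# Timestamp+incrementing sees both the modified row and the new row.
second_timestamp = poll_timestamp_incrementing(conn, 110, 2)
```

In this sketch the second incrementing poll returns only row 3, while the timestamp-based poll also returns the modified row 1. Bulk mode would simply reread all three rows on every poll.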

Map Numeric Values, Integral or Decimal, By Precision and Scale

numeric.mapping

Map NUMERIC values by precision and optionally scale to integral or decimal types:

  • none (default): use if all NUMERIC columns are to be represented by Connect’s DECIMAL logical type. This may lead to serialization issues with Avro because Connect’s DECIMAL type is mapped to its binary representation.

  • best_fit: use if NUMERIC columns should be cast to Connect’s INT8, INT16, INT32, INT64, or FLOAT64 based upon the column’s precision and scale. This option is often preferred because it maps to the most appropriate primitive type.

  • precision_only: use to map NUMERIC columns based only on the column’s precision (assuming that column’s scale is 0).

Poll interval (ms)

poll.interval.ms

Frequency used to poll for new data in each table. The default is 5000.

Query

query

Specifies the query to use to select new or updated rows. Use it to join tables, select subsets of columns in a table, or filter data. When specified, the connector copies data using only this query, and whole-table copying is disabled. Different query modes may still be used for incremental updates, but to properly construct the incremental query, it must be possible to append a WHERE clause to this query (that is, the query itself cannot contain a WHERE clause). If you do use a WHERE clause, the query must handle incremental updates itself.
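The restriction on WHERE clauses follows from how an incremental query is assembled. The following is a minimal sketch, assuming the incremental condition is simply appended to the configured query. The `append_incremental_clause` helper, the table, and the column names are hypothetical, and SQLite is used only to check whether the resulting SQL parses.

```python
import sqlite3


def append_incremental_clause(query: str, column: str) -> str:
    """Append an incremental condition on the given column to a custom query."""
    return f"{query} WHERE {column} > ? ORDER BY {column} ASC"


def is_valid(conn, query: str) -> bool:
    """Return True if the database accepts the query, False on a syntax error."""
    try:
        conn.execute(query, (0,))
        return True
    except sqlite3.OperationalError:
        return False


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, total INTEGER)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, 50), (2, 150)])

# A query without WHERE can safely take the appended condition.
ok_query = append_incremental_clause("SELECT id, total FROM orders", "id")

# A query that already has WHERE ends up with two WHERE keywords.
broken_query = append_incremental_clause(
    "SELECT id, total FROM orders WHERE total > 100", "id"
)
```

Here `ok_query` runs normally, while `broken_query` is rejected by the database because it contains two WHERE keywords.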

Quote SQL identifiers

sql.quote.identifiers

Specifies whether or not to delimit identifiers (for example, table names and column names) in SQL statements. In most databases, identifiers are delimited with double quotation marks.

Metadata change monitoring interval (ms)

table.poll.interval.ms

Frequency to poll for new or removed tables, which may result in updated task configurations to start polling for data in added tables, or stop polling for data in removed tables. The default is 60000.

Table types

table.types

By default, the JDBC connector only detects tables with type TABLE from the source database. This property allows a comma-separated list of table types to extract. Options include: TABLE (default), VIEW, SYSTEM TABLE, GLOBAL TEMPORARY, LOCAL TEMPORARY, ALIAS, and SYNONYM. In most cases, it is best to specify TABLE or VIEW.

Timestamp column name

timestamp.column.name

Comma-separated list of one or more timestamp columns to detect new or modified rows using the COALESCE SQL function. Rows whose first non-null timestamp value is greater than the largest previous timestamp value seen are discovered with each poll. At least one column should not be nullable.

Delay interval (ms)

timestamp.delay.interval.ms

The amount of time to wait after a row with a certain timestamp appears before including it in the result. You can add a delay to allow transactions with earlier timestamps to complete. The first execution fetches all available records (that is, starting at a timestamp greater than 0) until the current time minus the delay. Every following execution gets data from the last time fetched until the current time minus the delay.
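The timestamp columns and delay interval can be pictured as a polling window. The sketch below is an approximation under stated assumptions: the `events` table and its columns are hypothetical, the column order mimics a timestamp column list of `updated_at,created_at`, and whether the upper bound is exclusive in the real connector is an assumption.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events (id INTEGER PRIMARY KEY, created_at INTEGER NOT NULL, updated_at INTEGER)"
)
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [
        (1, 1000, None),  # never updated: COALESCE falls back to created_at
        (2, 1500, 4000),  # updated: COALESCE picks updated_at
        (3, 5000, None),
        (4, 9500, None),  # too recent for the first window below
    ],
)


def poll_window(conn, last_seen_ms, now_ms, delay_ms):
    """Return (id, timestamp) for rows newer than last_seen_ms but older than now minus the delay."""
    upper_bound = now_ms - delay_ms
    sql = (
        "SELECT id, COALESCE(updated_at, created_at) AS ts FROM events "
        "WHERE COALESCE(updated_at, created_at) > ? "
        "AND COALESCE(updated_at, created_at) < ? "
        "ORDER BY ts, id"
    )
    return conn.execute(sql, (last_seen_ms, upper_bound)).fetchall()


# First poll: everything after timestamp 0, up to now (10000) minus the delay (1000).
first = poll_window(conn, last_seen_ms=0, now_ms=10000, delay_ms=1000)

# Next poll: continue from the largest timestamp already seen.
second = poll_window(conn, last_seen_ms=first[-1][1], now_ms=12000, delay_ms=1000)
```

Row 4 is held back by the delay in the first poll and picked up by the second, which is how the delay gives in-flight transactions with earlier timestamps time to commit.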

Initial timestamp (ms) since epoch

timestamp.initial.ms

The initial value of the timestamp when selecting records. Value can be negative. The records having a timestamp greater than the value are included in the result.

Validate non null

validate.non.null

By default, the JDBC connector validates that all incrementing and timestamp tables have NOT NULL set for the columns being used as their ID/timestamp. If the tables don’t, then the JDBC connector will fail to start. Setting to false disables these checks.

Database dialect

dialect.name

The name of the database dialect that should be used for this connector. By default, the connector automatically determines the dialect based upon the JDBC connection URL. Use this property if you want to override that behavior and specify a particular dialect.

Topic creation enabled

topic.creation.enable

Specifies whether or not to allow automatic creation of topics. Default is enabled.

Topic creation partitions

topic.creation.default.partitions

Specifies the number of partitions for the created topics. The default is 1.

Topic creation replication factor

topic.creation.default.replication.factor

Specifies the replication factor for the created topics. The default is -1.

Map data

Use the appropriate key or value converter (input data format) for your data as follows:

  • You can use Schema Registry as an alternative to the JSON schema.

  • Use Kafka message value format = AVRO (io.confluent.connect.avro.AvroConverter) to use Schema Registry with AvroConverter.

Use the following properties to select the database data set to read from:

  • Include tables

  • Exclude tables

  • Catalog pattern

  • Schema pattern
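The data set selection properties can be sketched as a small helper that builds the corresponding property keys. The `table_filter_properties` function is hypothetical, and joining table names with commas assumes the usual comma-separated form of list-valued connector properties. It also enforces the rule that Include tables and Exclude tables cannot be combined.

```python
def table_filter_properties(include=None, exclude=None, schema_pattern=None):
    """Build table selection properties; include and exclude are mutually exclusive."""
    if include and exclude:
        raise ValueError("Specify either Include tables or Exclude tables, not both")
    props = {}
    if include:
        props["table.whitelist"] = ",".join(include)
    if exclude:
        props["table.blacklist"] = ",".join(exclude)
    if schema_pattern is not None:
        # An empty string selects tables without a schema; None leaves the key unset.
        props["schema.pattern"] = schema_pattern
    return props


if __name__ == "__main__":
    print(table_filter_properties(include=["orders", "customers"], schema_pattern="public"))
```

Leaving `schema_pattern` as `None` corresponds to the null default, where the schema name is not used to narrow the search.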

Test the connection

After the connector is created, check to ensure that:

  • There are no errors in the logs or in Redpanda Console.

  • Redpanda topics contain data from relational database tables.

Troubleshoot

Most JDBC Source connector issues are identified in the connector creation phase. Invalid Include tables values are reported in the logs. Select Show Logs to view error details.

Message Action

PSQLException: FATAL: database "invalid-database" does not exist

Make sure the JDBC URL specifies an existing database name.

PSQLException: The connection attempt failed. for configuration Couldn’t open connection / PSQLException: Connection to postgres:1234 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections

Make sure the JDBC URL specifies a valid database host name and port, and that the port is accessible.

PSQLException: FATAL: password authentication failed for user "postgres"

Verify that the User and Password are correct.

IllegalArgumentException: Number of groups must be positive.

  1. Make sure Include tables contains a valid tables list.

  2. The Include tables setting is case-sensitive, even if the underlying database is not. For example, change Include tables = tablename to Include tables = tableName.

  3. PostgreSQL occasionally refuses the first connection attempt. Retry creating the connector.
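Two of the checks above can be scripted before you create the connector. This is a minimal sketch with hypothetical helper names: `port_is_reachable` tests whether a TCP connection to the database host and port succeeds, and `case_mismatches` finds Include tables entries that differ from the real table names only by letter case. A reachability check run from your own machine does not prove that the connector instance can reach the database, so treat it as a first sanity check only.

```python
import socket


def port_is_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def case_mismatches(include_tables, actual_tables):
    """Return (configured, actual) pairs that differ only by letter case."""
    by_lower = {name.lower(): name for name in actual_tables}
    return [
        (name, by_lower[name.lower()])
        for name in include_tables
        if name not in actual_tables and name.lower() in by_lower
    ]


if __name__ == "__main__":
    # Placeholder host and port; replace with the values from your JDBC URL.
    print(port_is_reachable("db.example.internal", 5432, timeout=1.0))
    print(case_mismatches(["tablename"], ["tableName", "orders"]))
```

A non-empty result from `case_mismatches` suggests correcting the Include tables value to the exact table name shown in the second element of each pair.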