
Create a JDBC Source Connector

You can use a JDBC Source connector to import batches of rows from MySQL, PostgreSQL, SQLite, and SQL Server relational databases into Redpanda topics.

Prerequisites

  • A relational database instance that is accessible from the JDBC Source connector instance.
  • A database user has been created (see the example below).
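
For example, on PostgreSQL you might create a dedicated read-only user for the connector. This is a minimal sketch: the user name, password, and schema are placeholders, and the privileges you grant should match the tables you plan to import.

    -- Placeholder names: adjust the user, password, and schema for your database.
    CREATE USER redpanda_connect WITH PASSWORD 'changeme';
    GRANT SELECT ON ALL TABLES IN SCHEMA public TO redpanda_connect;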

Limitations

The JDBC Source connector has the following limitations:

  • Only JSON or AVRO formats can be used as a value converter.

  • Only the following databases are supported:

    • MySQL 5.7 and 8.0
    • PostgreSQL 8.2 and later, using version 3.0 of the PostgreSQL® protocol
    • SQLite
    • SQL Server - Microsoft SQL versions: Azure SQL Database, Azure Synapse Analytics, Azure SQL Managed Instance, SQL Server 2014, SQL Server 2016, SQL Server 2017, SQL Server 2019

Create a JDBC Source connector

To create the JDBC Source connector:

  1. In Redpanda Cloud, click Connectors in the navigation menu, and then click Create Connector.

  2. Select Import from JDBC.

  3. On the Create Connector page, specify the following required connector configuration options:

    • Topic prefix: Prefix to prepend to table names to generate the name of the Kafka topic to which to publish data, or in the case of a custom query, the full name of the topic to publish to.
    • JDBC URL: The database connection JDBC URL (see the example formats after these steps).
    • User: Name of the database user to be used when connecting to the database.
    • Password: Password of the database user to be used when connecting to the database.
    • Kafka message value format: Format of the value in the Kafka topic. JSON is the default.
    • Max Tasks: Maximum number of tasks to use for this connector. The default is 1. Each task replicates an exclusive set of partitions assigned to it.
    • Connector name: Globally-unique name to use for this connector.
  4. Click Next. Review the connector properties specified, then click Create.
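
The JDBC URL format depends on the database. The following examples show common JDBC URL formats; the hostnames, ports, paths, and database names are placeholders to replace with your own values:

    jdbc:postgresql://db.example.com:5432/exampledb
    jdbc:mysql://db.example.com:3306/exampledb
    jdbc:sqlserver://db.example.com:1433;databaseName=exampledb
    jdbc:sqlite:/path/to/exampledb.db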

Advanced JDBC Source connector configuration

In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, then specify any of the following optional advanced connector configuration properties by selecting Show advanced options on the Create Connector page:

• JDBC connection attempts: Maximum number of attempts to retrieve a valid JDBC connection. The default is 3.
• JDBC connection backoff (ms): Backoff time between connection attempts. The default is 10000.
• Kafka message key format: Format of the key in the Kafka topic. BYTES is the default.
• Kafka message headers format: Format of the headers in the Kafka topic. The default is SIMPLE.
• Include tables: List of tables to include when copying. If specified, you cannot specify the Exclude tables property.
• Exclude tables: List of tables to exclude when copying. If specified, you cannot specify the Include tables property.
• Qualify table names: Specifies whether to use fully-qualified table names when querying the database. If disabled, queries are performed with unqualified table names. This property may be useful if the database has been configured with a search path that automatically directs unqualified queries to the correct table when there are multiple tables available with the same unqualified name.
• Catalog pattern: Catalog pattern used to fetch table metadata from the database. null (default) specifies that the catalog name is not used to narrow the search, so all table metadata is fetched regardless of the catalog. "" retrieves metadata for tables without a catalog.
• Schema pattern: Schema pattern used to fetch table metadata from the database. null (default) specifies that the schema name is not used to narrow the search, so all table metadata is fetched regardless of the schema. "" retrieves metadata for tables without a schema.
• DB time zone: Name of the JDBC time zone to use in the connector when querying with time-based criteria. The default is UTC.
• Max rows per batch: Maximum number of rows to include in a single batch when polling for new data. You can use this property to limit the amount of data buffered internally in the connector. The default is 100.
• Incrementing column name: The name of the strictly incrementing column to use to detect new rows. An empty value indicates that the column should be autodetected by looking for an auto-incrementing column. This column must not be nullable.
• Incrementing column initial value: For the incrementing column, consider only rows that have a value greater than this. Specify this if you need to pick up rows with a negative or zero value, or if you want to skip rows. The default is -1.
• Table loading mode: The mode for updating a table each time it is polled (see the example after this list). Options include:
  • bulk: perform a bulk load of the entire table each time it is polled.
  • incrementing: use a strictly incrementing column on each table to detect only new rows. This does not detect modifications or deletions of existing rows.
  • timestamp: use a timestamp (or timestamp-like) column to detect new and modified rows. This assumes that the column is updated with each write, and that values are monotonically incrementing, but not necessarily unique.
  • timestamp+incrementing: use two columns, a timestamp column that detects new and modified rows, and a strictly incrementing column, which provides a globally unique ID for updates so that each row can be assigned a unique stream offset.
• Map Numeric Values, Integral or Decimal, By Precision and Scale: Map NUMERIC values by precision and optionally scale to integral or decimal types:
  • none (default): use if all NUMERIC columns are to be represented by Connect's DECIMAL logical type. This may lead to serialization issues with Avro because Connect's DECIMAL type is mapped to its binary representation.
  • best_fit: use if NUMERIC columns should be cast to Connect's INT8, INT16, INT32, INT64, or FLOAT64 based upon the column's precision and scale. This is often preferred because it maps to the most appropriate primitive type.
  • precision_only: use to map NUMERIC columns based only on the column's precision (assuming that the column's scale is 0).
• Poll interval (ms): Frequency used to poll for new data in each table. The default is 5000.
• Query: Specifies the query to use to select new or updated rows. Use this to join tables, select subsets of columns in a table, or filter data. When specified, the connector copies data using only this query, and whole-table copying is disabled. Different query modes may still be used for incremental updates, but to properly construct the incremental query, it must be possible to append a WHERE clause to this query (that is, the query itself cannot contain a WHERE clause). If you do use a WHERE clause, the query must handle incremental updates itself. See the example after this list.
• Quote SQL identifiers: Specifies whether to delimit identifiers (for example, table names and column names) in SQL statements. In most databases, identifiers are delimited with double quotation marks.
• Metadata change monitoring interval (ms): Frequency to poll for new or removed tables, which may result in updated task configurations to start polling for data in added tables or stop polling for data in removed tables. The default is 60000.
• Table types: By default, the JDBC connector only detects tables with type TABLE from the source database. This property accepts a comma-separated list of table types to extract. Options include: TABLE (default), VIEW, SYSTEM TABLE, GLOBAL TEMPORARY, LOCAL TEMPORARY, ALIAS, SYNONYM. In most cases, it is best to specify TABLE or VIEW.
• Timestamp column name: Comma-separated list of one or more timestamp columns to detect new or modified rows using the COALESCE SQL function. Rows whose first non-null timestamp value is greater than the largest previous timestamp value seen are discovered with each poll. At least one column should not be nullable.
• Delay interval (ms): The amount of time to wait after a row with a certain timestamp appears before including it in the result. You can add a delay to allow transactions with an earlier timestamp to complete. The first execution fetches all available records (that is, starting at a timestamp greater than 0) until the current time minus the delay. Every following execution gets data from the last time fetched until the current time, minus the delay.
• Initial timestamp (ms) since epoch: The initial value of the timestamp when selecting records. The value can be negative. Records with a timestamp greater than this value are included in the result.
• Validate non null: By default, the JDBC connector validates that all incrementing and timestamp tables have NOT NULL set for the columns being used as their ID/timestamp. If they do not, the connector fails to start. Setting this to false disables these checks.
• Database dialect: The name of the database dialect to use for this connector. By default, the connector automatically determines the dialect based upon the JDBC connection URL. Use this if you want to override that behavior and specify a specific dialect.
• Topic creation enabled: Specifies whether to allow automatic creation of topics. The default is enabled.
• Topic creation partitions: Specifies the number of partitions for the created topics. The default is 1.
• Topic creation replication factor: Specifies the replication factor for the created topics. The default is -1.
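
As an illustration of how these options combine, the following sketch configures incremental loading using an auto-increment ID column plus an update timestamp. It is written using the conventional Kafka Connect JDBC Source property names (connection.url, mode, incrementing.column.name, timestamp.column.name, poll.interval.ms), which are assumed here; in Redpanda Cloud you set the equivalent fields described above. The connection details, table, and column names are placeholders:

    # Hypothetical incremental-load settings; id and updated_at are placeholder column names.
    topic.prefix=pg_
    connection.url=jdbc:postgresql://db.example.com:5432/exampledb
    mode=timestamp+incrementing
    incrementing.column.name=id
    timestamp.column.name=updated_at
    poll.interval.ms=5000

If you set the Query property instead of copying whole tables, supply a statement that the connector can extend with its own incremental filter, for example SELECT id, customer, updated_at FROM public.orders (no trailing WHERE clause), and set Topic prefix to the full name of the destination topic.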

Map data

Use the appropriate key or value converter (input data format) for your data as follows:

  • You can use Schema Registry as an alternative to the JSON schema.
  • Use Kafka message value format = AVRO to use Schema Registry with AvroConverter.
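
For example, with the default Kafka message value format = JSON, each imported row is produced as a JSON document containing the row's column values. A hypothetical row from an orders table might look like the following (the exact layout depends on the converter settings, such as whether a schema is embedded):

    {"id": 42, "customer": "acme", "amount": 19.99}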

Use the following properties to select the database data set to read from:

  • Include tables
  • Exclude tables
  • Catalog pattern
  • Schema pattern

Test the connection

After the connector is created, check to ensure that:

  • There are no errors in logs and in Redpanda Console.
  • Redpanda topics contain data from relational database tables.
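
For a quick check, you can consume records from one of the generated topics with rpk. This assumes rpk is configured for your cluster; the topic name below is a placeholder built from a Topic prefix of pg_ and a table named orders:

    rpk topic consume pg_orders

Press Ctrl+C to stop consuming.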

Troubleshoot

Most JDBC Source connector issues are identified during connector creation. Invalid Include tables values are reported in the logs.

• Message: PSQLException: FATAL: database "invalid-database" does not exist
  Action: Make sure the JDBC URL specifies an existing database name.
• Message: PSQLException: The connection attempt failed. / Couldn't open connection / PSQLException: Connection to postgres:1234 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections
  Action: Make sure the JDBC URL specifies a valid database host name and port, and that the port is accessible.
• Message: PSQLException: FATAL: password authentication failed for user "postgres"
  Action: Verify that the User and Password are correct.
• Message: IllegalArgumentException: Number of groups must be positive.
  Action:
  1. Make sure Include tables contains a valid list of tables.
  2. The Include tables setting is case-sensitive, even when the underlying database isn't. Revise Include tables = tablename to Include tables = tableName.
  3. Postgres occasionally refuses a connection on the first attempt. Retry creating the connector.
• Message: Error while fetching metadata with correlation id 412 : {topicName=UNKNOWN_TOPIC_OR_PARTITION}
  Action: The specified topic does not exist. Create the topic manually, or let the connector create it by setting "topic.creation.default.partitions": "1" and "topic.creation.default.replication.factor": "-1".
