Create a PostgreSQL (Debezium) Source Connector

You can use a PostgreSQL (Debezium) Source connector to import updates to Redpanda from PostgreSQL.

Prerequisites

Before you can create a PostgreSQL (Debezium) Source connector in the Redpanda Cloud, you must:

Limitations

The PostgreSQL (Debezium) Source connector has the following limitations:

  • Only JSON, CloudEvents or AVRO formats can be used for a Kafka message key and value format.

  • PostgreSQL (Debezium) connector can work with only a single task at a time.

Create a PostgreSQL (Debezium) Source connector

To create the PostgreSQL (Debezium) Source connector:

  1. In Redpanda Cloud, click Connectors in the navigation menu, and then click Create Connector.

  2. Select Import from PostgreSQL (Debezium).

  3. On the Create Connector page, specify the following required connector configuration options:

    Property name Property key Description

    Topic prefix

    topic.prefix

    A topic prefix that identifies and provides a namespace for the particular database server/cluster that is capturing changes. The topic prefix should be unique across all other connectors because it is used as a prefix for all Kafka topic names that receive events emitted by this connector. Only alphanumeric characters, hyphens, dots, and underscores are accepted.

    Hostname

    database.hostname

    A resolvable hostname or IP address of the PostgreSQL database server.

    Port

    database.port

    Integer port number of the PostgreSQL database server.

    User

    database.user

    Name of the PostgreSQL user to be used when connecting to the PostgreSQL database.

    Password

    database.password

    The password of the PostgreSQL database user who will be connecting to the PostgreSQL database.

    Database

    database.dbname

    The name of the database from which the connector will import changes.

    SSL mode

    database.sslmode

    Specifies whether to use an encrypted connection to the PostgreSQL server. Select disable to use an unencrypted connection. Select require to use a secure, or encrypted connection. If a secure connection cannot be established when required is selected, then the connector fails.

    Kafka message key format

    key.converter

    Format of the key in the Redpanda topic.

    Message key JSON contains schema

    key.converter.schemas.enable

    Enable to specify that the message key contains schema in the schema field.

    Kafka message value format

    value.converter

    Format of the value in the Redpanda topic.

    Message value JSON contains schema

    value.converter.schemas.enable

    Enable to specify that the message value contains schema in the schema field.

    Connector name

    name

    Globally-unique name to use for this connector.

  4. Click Next. Review the connector properties specified, then click Create.

Map data

Use the appropriate key or value converter (input data format) for your data as follows:

  • Use Include Schemas, Include Tables and Include Columns properties to define lists of columns, tables, and schemas to read from. Alternatively, use Exclude Schemas, Exclude Tables, and Exclude Columns to define lists of columns, tables, and schemas to exclude from sources list.

  • Use only JSON (org.apache.kafka.connect.json.JsonConverter), AVRO (io.confluent.connect.avro.AvroConverter) and CloudEvents (io.debezium.converters.CloudEventsConverter) formats for the Kafka message key and value format.

Test the connection

After the connector is created:

  1. Open Redpanda Console, click the Topics tab and select a topic. Check to check to confirm that it contains data migrated from PostgreSQL. Alternatively, use the rpk consume to check the topic.

  2. Click the Connectors tab to confirm no issues have been reported for the connector.

Troubleshoot

If the connector configuration is invalid, an error appears upon clicking Finish. Select Show Logs to view error details.

Additional errors and corrective actions follow.

Message Action

Missing tables or topics

The Debezium connector replicates tables one by one. Wait for other tables to be replicated. If the database is quite large, then replication takes longer to complete.

non-existing-db

Make sure the provided database name in Database is correct, and that the database exists.

The connection attempt failed / Connection to postgres:9999 refused

Check to make sure that hostname and port are correct.

Password authentication failed for user

Make sure that the User and Password credentials are valid.

The Plugin name value is invalid

Make sure that Plugin contains a valid value, either decoderbufs or pgoutput.

Postgres server wal_level property is replica

Specify wal_level as logical for your database.

RecordTooLargeException: The message is 1050766 bytes when serialized, which is larger than 1048576, the value of the max.request.size configuration.

Increase the max request size to unblock the connector and allow large messages to pass: "producer.override.max.request.size": "209715200". The connector may be reaching memory limits and failing if the amount of data to pass or your messages are too large.