Create an Iceberg Sink Connector
You can use the Iceberg Sink connector to write data into Iceberg tables.
Prerequisites
Before you can create an Iceberg Sink connector in Redpanda Cloud, you must:
- Create the Iceberg connector control topic, which cannot be used by other connectors. For details, see Create a Topic.
Create an Iceberg Sink connector
To create the Iceberg Sink connector:
- In Redpanda Cloud, click Connectors in the navigation menu and then click Create Connector.
- Select Export to Iceberg.
- On the Create Connector page, specify the following required connector configuration options:
| Property | Description |
|---|---|
| Topics to export | Comma-separated list of the cluster topics you want to replicate. |
| Iceberg control topic | The name of the control topic. You must create this topic before creating the Iceberg connector. It cannot be used by other Iceberg connectors. |
| Iceberg catalog type | The type of Iceberg catalog. Allowed options are: REST, HIVE. |
| Iceberg tables | Comma-separated list of Iceberg table names, which are specified using the format {namespace}.{table}. |
- Click Next. Review the connector properties specified, then click Create.
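Because incorrect settings may only surface once the connector processes data, it can help to sanity-check the required values before entering them. The following is a minimal sketch, not part of Redpanda Cloud: the function name, the argument names, and the rule that a table name has at least one dot-separated namespace are assumptions made for illustration.

```python
import re

# Catalog types listed as allowed in the required options.
ALLOWED_CATALOG_TYPES = {"REST", "HIVE"}

# Assumption: namespace and table parts are non-empty and contain no
# whitespace, commas, or dots; multi-level namespaces are accepted.
TABLE_NAME = re.compile(r"^[^\s,.]+(\.[^\s,.]+)+$")


def split_list(value):
    """Split a comma-separated option into trimmed, non-empty items."""
    return [item.strip() for item in value.split(",") if item.strip()]


def check_required_options(topics, control_topic, catalog_type, tables):
    """Return a list of problems found; an empty list means none were found."""
    problems = []
    if not split_list(topics):
        problems.append("Topics to export is empty")
    if not control_topic.strip():
        problems.append("Iceberg control topic is empty")
    if catalog_type not in ALLOWED_CATALOG_TYPES:
        problems.append(f"Unsupported catalog type: {catalog_type}")
    table_list = split_list(tables)
    if not table_list:
        problems.append("Iceberg tables is empty")
    for name in table_list:
        if not TABLE_NAME.match(name):
            problems.append(f"Table name is not in namespace.table form: {name}")
    return problems


# Hypothetical values, for illustration only.
print(check_required_options("orders", "iceberg-control", "REST", "sales.orders"))
```

This check only looks at the shape of the values. It does not confirm that the topics, the catalog, or the tables actually exist.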
Advanced Iceberg Sink connector configuration
In most instances, the preceding basic configuration properties are sufficient. If you require additional property settings, then specify any of the following optional advanced connector configuration properties by selecting Show advanced options on the Create Connector page:
| Property | Description |
|---|---|
| | Commit timeout interval in ms. The default is 30000 (30 sec). |
| | For multi-table fan-out, the name of the field used to route records to tables. |
| | Name of the field containing the CDC operation, |
Map data
Use the appropriate key or value converter (input data format) for your data as follows:
- JSON when your messages are JSON-encoded. Select Message JSON contains schema if your messages include the schema and payload fields. If your messages do not contain a schema, create the Iceberg tables manually.
- AVRO when your messages are AVRO-encoded, with the schema stored in the Schema Registry.
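To make the schema and payload fields concrete, here is a minimal sketch of a JSON message in the envelope form that the Kafka Connect JSON converter reads when schemas are enabled. The helper name, the record name, and the sample fields are hypothetical; only the overall shape (a struct schema next to a payload) is the point.

```python
import json


def with_schema_envelope(payload, field_types, name="example.Record"):
    """Wrap a flat record in a schema/payload envelope and return it as JSON text.

    field_types maps each field name to a Kafka Connect JSON type such as
    "int64" or "string". All fields are marked as required in this sketch.
    """
    fields = [
        {"field": field, "type": type_name, "optional": False}
        for field, type_name in field_types.items()
    ]
    envelope = {
        "schema": {
            "type": "struct",
            "name": name,
            "optional": False,
            "fields": fields,
        },
        "payload": payload,
    }
    return json.dumps(envelope)


# Hypothetical record, for illustration only.
message = with_schema_envelope(
    {"id": 42, "status": "paid"},
    {"id": "int64", "status": "string"},
)
print(message)
```

A message that carries only the payload part (for example, {"id": 42, "status": "paid"}) has no schema, which is the case where the Iceberg tables are created manually.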
Sinking data produced by Debezium source connector
Debezium connectors produce data in CDC format. You can flatten the message structure by using the Debezium built-in New Record State Extraction single message transformation (SMT). Add the following properties to the Debezium connector configuration to make it produce flat messages:
{
...
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
...
}
Depending on your particular use case, you can apply the SMT to a Debezium connector, or to a sink connector that consumes messages that the Debezium connector produces. To enable Apache Kafka to retain the Debezium change event messages in their original format, configure the SMT for a sink connector.
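To illustrate what flattening means here, the sketch below mimics the core idea of the transformation in plain Python: a Debezium change event wraps the row in an envelope, and the flat message is the row state after the change. This is a conceptual illustration only, not the SMT's implementation. The function name and sample values are hypothetical, and the real SMT has further options (for example, for delete handling) that are not modeled.

```python
def flatten_change_event(event):
    """Return the row state after the change.

    Returns None when the event carries no "after" state, which is the
    case for a delete event in the Debezium envelope.
    """
    return event.get("after")


# Hypothetical update event in the Debezium envelope shape.
change_event = {
    "before": {"id": 1, "status": "new"},
    "after": {"id": 1, "status": "paid"},
    "op": "u",
    "source": {"table": "orders"},
    "ts_ms": 1700000000000,
}

flat = flatten_change_event(change_event)
print(flat)
```

The flat record contains only the row's columns, which is the shape that maps directly onto the columns of an Iceberg table.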
See also: Debezium New Record State Extraction SMT
Test the connection
After the connector is created, run a SELECT query on the Iceberg table to verify that data is arriving. It may take a couple of minutes for records to become visible in Iceberg. If no data appears, check the connector state and logs for errors.
Troubleshoot
Iceberg connection settings are validated only when the connector first processes data. As a result, a connector with an incorrect configuration can be created successfully and then fail only once there are messages in the source topic to process.
| Message | Action |
|---|---|
| NoSuchTableException: Table does not exist | Make sure the Iceberg table exists and that the connector iceberg.tables configuration contains the correct table name in the {namespace}.{table} format. |
| UnknownHostException: incorrectcatalog: Name or service not known | Cannot connect to the Iceberg catalog. Check that the Iceberg catalog URI is correct and accessible. |
| DataException: An error occurred converting record, topic: topicName, partition, 0, offset: 0 | The connector cannot read the message format. Ensure the connector mapping configuration and data format are correct. |
| NullPointerException: Cannot invoke "java.lang.Long.longValue()" because "value" is null | The connector cannot read the message format. Ensure the connector mapping configuration and data format are correct. |
Suggested reading
- For details about the Iceberg Sink connector configuration properties, see Iceberg-Kafka-Connect.
- For details about the Iceberg Sink connector internals, see Iceberg-Kafka-Connect documentation.