# Create an Iceberg Sink Connector

---
title: Create an Iceberg Sink Connector
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-connect-version: 4.93.0
latest-redpanda-tag: v26.1.9
docname: managed-connectors/create-iceberg-sink-connector
page-component-name: cloud-data-platform
page-version: master
page-component-version: master
page-component-title: Cloud
page-relative-src-path: managed-connectors/create-iceberg-sink-connector.adoc
page-edit-url: https://github.com/redpanda-data/cloud-docs/edit/main/modules/develop/pages/managed-connectors/create-iceberg-sink-connector.adoc
description: Use the Redpanda Cloud UI to create an Iceberg Sink Connector.
page-git-created-date: "2024-06-06"
page-git-modified-date: "2026-03-31"
---

<!-- Source: https://docs.redpanda.com/cloud-data-platform/develop/managed-connectors/create-iceberg-sink-connector.md -->

> ❗ **IMPORTANT**
>
> -   To enable this feature, contact [Redpanda Support](https://support.redpanda.com/hc/en-us/requests/new). To disable this feature, see [Disable Kafka Connect](https://docs.redpanda.com/cloud-data-platform/develop/managed-connectors/disable-kc/).
>
> -   Redpanda Support does not manage or monitor Kafka Connect. For fully-supported connectors, consider [Redpanda Connect](https://docs.redpanda.com/cloud-data-platform/develop/connect/about/).
>
> -   When Kafka Connect is enabled, there is a dedicated node running even when no connectors are deployed.

The Iceberg Sink connector supports the following:

-   Writing data into Iceberg tables

-   Commit coordination for centralized Iceberg commits

-   Exactly-once delivery semantics

-   Multi-table fan-out

-   Row mutations (updating and deleting rows) and upsert mode

-   Automatic table creation and schema evolution

-   Field name mapping through Iceberg’s column mapping functionality


## Prerequisites

Before you can create an Iceberg Sink connector in Redpanda Cloud, you must:

1.  [Set up an Iceberg catalog](https://iceberg.apache.org/concepts/catalog/).

2.  Create the Iceberg connector control topic, which cannot be used by other connectors. For details, see [Create a Topic](https://docs.redpanda.com/cloud-data-platform/develop/topics/create-topic/).


## Limitations

-   Each Iceberg Sink connector requires its own control topic, which you must create before you create the connector.


## Create an Iceberg Sink connector

To create the Iceberg Sink connector:

1.  In Redpanda Cloud, click **Connectors** in the navigation menu and then click **Create Connector**.

2.  Select **Export to Iceberg**.

3.  On the **Create Connector** page, specify the following required connector configuration options:

    | Property name | Property key | Description |
    | --- | --- | --- |
    | Topics to export | `topics` | Comma-separated list of the cluster topics to export to Iceberg. |
    | Topics regex | `topics.regex` | Java regular expression that matches the topics to export. For example, specify `.*` to export all available topics in the cluster. Applicable only when **Use regular expressions** is selected. |
    | Iceberg control topic | `iceberg.control.topic` | The name of the control topic. You must create this topic before you create the Iceberg connector. Other Iceberg connectors cannot use it. |
    | Iceberg catalog type | `iceberg.catalog.type` | The type of Iceberg catalog. Allowed options are REST, HIVE, and HADOOP. |
    | Iceberg tables | `iceberg.tables` | Comma-separated list of Iceberg table names, each specified in the format `{namespace}.{table}`. |

4.  Click **Next**. Review the connector properties specified, then click **Create**.
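
If you work in the JSON editor rather than the form, the required properties from the preceding table might look like the following minimal sketch. The connector name, topic, table, control topic, and catalog URI are hypothetical placeholders, not defaults. The `connector.class` value and the `iceberg.catalog.uri` key are assumptions taken from the upstream Iceberg-Kafka-Connect project rather than from this page, so verify them against your deployment.

```json
{
    "name": "iceberg-sink-example",
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "topics": "orders",
    "iceberg.control.topic": "iceberg-sink-example-control",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://catalog.example.com",
    "iceberg.tables": "sales.orders"
}
```

To select topics by pattern instead, replace `topics` with `topics.regex`. Kafka Connect does not accept both properties in the same sink connector configuration.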


### Advanced Iceberg Sink connector configuration

In most cases, the preceding basic configuration properties are sufficient. If you need additional settings, select **Show advanced options** on the **Create Connector** page and specify any of the following _optional_ advanced connector configuration properties:

| Property name | Property key | Description |
| --- | --- | --- |
| Iceberg commit timeout | `iceberg.control.commit.timeout-ms` | Commit timeout interval in milliseconds. The default is `30000` (30 seconds). |
| Iceberg tables route field | `iceberg.tables.route-field` | For multi-table fan-out, the name of the field used to route records to tables. |
| Iceberg tables CDC field | `iceberg.tables.cdc-field` | Name of the field that contains the CDC operation: `I` (insert), `U` (update), or `D` (delete). The default is none. |
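
As an illustration of multi-table fan-out, the following fragment sketches how a route field could send records to two tables based on the value of a `type` field, with a longer commit timeout. The table names and the `type` field are hypothetical. The per-table `route-regex` keys are not listed in the preceding table; they are assumed from the upstream Iceberg-Kafka-Connect README, so confirm that your connector version supports them before you rely on this.

```json
{
    "iceberg.tables": "default.events_list,default.events_create",
    "iceberg.tables.route-field": "type",
    "iceberg.table.default.events_list.route-regex": "list",
    "iceberg.table.default.events_create.route-regex": "create",
    "iceberg.control.commit.timeout-ms": "60000"
}
```

With this sketch, a record whose `type` field is `list` would be written to `default.events_list`, and a record whose `type` field is `create` would be written to `default.events_create`.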

## Map data

Select the key or value converter (input data format) that matches your data:

-   `JSON` when your messages are JSON-encoded. Select `Message JSON contains schema` if each message includes the `schema` and `payload` fields. If your messages do not contain a schema, create the Iceberg tables manually.

-   `AVRO` when your messages are Avro-encoded, with the schema stored in the Schema Registry.
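
For reference, a JSON message that contains its own schema follows the Kafka Connect JSON envelope, with the record structure under `schema` and the data under `payload`. The following is a minimal sketch; the struct name, field names, and values are hypothetical.

```json
{
    "schema": {
        "type": "struct",
        "name": "example.Order",
        "optional": false,
        "fields": [
            { "field": "id", "type": "int64", "optional": false },
            { "field": "customer", "type": "string", "optional": true },
            { "field": "amount", "type": "double", "optional": true }
        ]
    },
    "payload": {
        "id": 1001,
        "customer": "alice",
        "amount": 42.5
    }
}
```

A message that carries only the contents of `payload`, without the surrounding envelope, is a message without a schema.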


An Iceberg table’s schema is a list of named columns. All data types are either primitives or nested types, which are maps, lists, or structs. A table schema is also a struct type.

See also: [Schemas and Data Types](https://iceberg.apache.org/spec/#schemas-and-data-types)

## Sinking data produced by Debezium source connector

Debezium connectors produce data in CDC format. You can flatten the message structure with Debezium's built-in New Record State Extraction single message transformation (SMT). Add the following properties to the Debezium connector configuration to make it produce flat messages:

```json
{
    ...
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    ...
}
```

Depending on your particular use case, you can apply the SMT to a Debezium connector, or to a sink connector that consumes messages that the Debezium connector produces. To enable Apache Kafka to retain the Debezium change event messages in their original format, configure the SMT for a sink connector.
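
To illustrate what the SMT changes, the following sketch shows a simplified Debezium change event value for an insert. The table, column names, and values are hypothetical, and the `source` block is trimmed; real events carry more metadata fields.

```json
{
    "before": null,
    "after": {
        "id": 1001,
        "customer": "alice"
    },
    "source": {
        "connector": "postgresql",
        "db": "shop",
        "table": "orders"
    },
    "op": "c",
    "ts_ms": 1700000000000
}
```

After the New Record State Extraction SMT is applied, the record value for this event contains only the contents of `after`, that is, the `id` and `customer` fields.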

See also: [Debezium New Record State Extraction SMT](https://debezium.io/documentation/reference/stable/transformations/event-flattening.html)

## Use analytical tools with Iceberg

Iceberg serves as a single storage layer for analytical data. Tools such as AWS Athena, Snowflake, and Apache Spark can read from it inexpensively.

Traditionally, data import involved pushing data to every tool, incurring high costs for data transfer and storage. Alternatively, you could use plain S3 buckets with Avro or CSV files, but this struggles with schema evolution. [Apache Iceberg](https://iceberg.apache.org) addresses all of these challenges: cost of data transfer, multiple data copies in storage, and support for schema evolution.

![Iceberg sink connector diagram](https://docs.redpanda.com/cloud-data-platform/shared/_images/iceberg_sink_connector_diagram.png)

The following Docker Compose example uses:

-   An Iceberg REST catalog

-   An AWS S3 bucket as the storage for Iceberg files

-   Apache Spark, which reads the Iceberg data from the S3 bucket


```yaml
version: '3'
services:
  redpanda:
    image: docker.redpanda.com/redpandadata/redpanda:latest
    command:
      - redpanda start
      - --smp 1
      - --overprovisioned
      - --node-id 0
      - --reserve-memory 0M
      - --check=false
      - --set redpanda.auto_create_topics_enabled=false
      - --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      - --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
      - --pandaproxy-addr 0.0.0.0:8082
      - --advertise-pandaproxy-addr localhost:8082
    ports:
      - 8081:8081
      - 8082:8082
      - 9092:9092
      - 9644:9644
      - 29092:29092

  console:
    image: docker.redpanda.com/redpandadata/console:latest
    restart: on-failure
    entrypoint: /bin/sh
    command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
    environment:
      CONFIG_FILEPATH: /tmp/config.yml
      CONSOLE_CONFIG_FILE: |
        kafka:
          brokers: ["redpanda:29092"]
          schemaRegistry:
            enabled: true
            urls: ["http://redpanda:8081"]
        connect:
          enabled: true
          clusters:
            - name: connectors
              url: http://connect:8083
    ports:
      - "8090:8080"
    depends_on:
      - redpanda

  connect:
    image: docker.redpanda.com/redpandadata/connectors:latest
    hostname: connect
    depends_on:
      - redpanda
      - spark-iceberg
    ports:
      - "8083:8083"
      - "9404:9404"
    environment:
      CONNECT_CONFIGURATION: |
        key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
        value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
        group.id=connectors-cluster
        offset.storage.topic=_internal_connectors_offsets
        config.storage.topic=_internal_connectors_configs
        status.storage.topic=_internal_connectors_status
        config.storage.replication.factor=-1
        offset.storage.replication.factor=-1
        status.storage.replication.factor=-1
        producer.linger.ms=1
        producer.batch.size=131072
        config.providers=file
        config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
      CONNECT_BOOTSTRAP_SERVERS: redpanda:29092
      SCHEMA_REGISTRY_URL: http://redpanda:8081
      CONNECT_GC_LOG_ENABLED: "false"
      CONNECT_HEAP_OPTS: -Xms512M -Xmx512M
      CONNECT_LOG_LEVEL: info
      CONNECT_TOPIC_LOG_ENABLED: "true"
      CONNECT_PLUGIN_PATH: "/opt/kafka/connect-plugins"

  spark-iceberg:
    image: tabulario/spark-iceberg:3.4.1_1.3.1
    build: spark/
    depends_on:
      - rest
    volumes:
      - ./warehouse:/home/iceberg/warehouse
    environment:
      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
      - AWS_REGION=${AWS_REGION}
    ports:
      - 8888:8888
      - 8080:8080
      - 10000:10000
      - 10001:10001

  rest:
    image: tabulario/iceberg-rest:0.6.0
    ports:
      - 8181:8181
    environment:
      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
      - AWS_REGION=${AWS_REGION}
      - CATALOG_WAREHOUSE=s3://bucket-name/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
```
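
With this stack running, a connector configuration along the following lines could point the sink at the `rest` catalog service. This is a sketch under several assumptions: the topic name `testtopic`, the control topic name, the region, and the credential placeholders are hypothetical, and the `connector.class` value and the `iceberg.catalog.*` keys are taken from the upstream Iceberg-Kafka-Connect project. Because the worker in this example defaults to `ByteArrayConverter`, the sketch overrides the value converter for JSON messages that contain a schema.

```json
{
    "name": "iceberg-sink-local",
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "topics": "testtopic",
    "iceberg.tables": "testdb.testtable",
    "iceberg.control.topic": "control-iceberg",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "http://rest:8181",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.catalog.client.region": "us-east-1",
    "iceberg.catalog.s3.access-key-id": "<access-key-id>",
    "iceberg.catalog.s3.secret-access-key": "<secret-access-key>",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
}
```

The Compose file sets `redpanda.auto_create_topics_enabled=false`, so create both the source topic and the control topic before you create the connector. The sketch also assumes that the `testdb.testtable` table already exists in the catalog.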

Use Spark-SQL to:

-   List databases:

    ```none
    spark-sql ()> show databases;
    testdb
    ```

-   Show tables in database:

    ```none
    spark-sql ()> show tables in testdb;
    testtable
    ```

-   Select data from table:

    ```none
    spark-sql ()> select * from testdb.testtable;
    ```


## Use with AWS Glue Data Catalog and AWS Lake Formation

You can use the connector with the AWS Glue Data Catalog and the AWS Lake Formation service. AWS Lake Formation supports authentication only through an IAM role. Because the connectors UI does not support Lake Formation-specific properties, use the JSON editor instead. Sample configuration:

```json
{
    ...
    "iceberg.catalog.client.assume-role.region": "the-region",
    "iceberg.catalog.client.assume-role.arn": "arn:aws:iam::account-number:role/role-name",
    "iceberg.catalog.glue.account-id": "NNN",
    "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "iceberg.catalog.client.assume-role.tags.LakeFormationAuthorizedCaller": "iceberg-connect",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.catalog": "catalog_name",
    "iceberg.catalog.warehouse": "s3://bucket-name/my/data",
    "iceberg.catalog.s3.path-style-access": "true"
}
```

## Test the connection

After you create the connector, run a `SELECT` query on the Iceberg table to verify that data is arriving. It may take a couple of minutes for records to become visible in Iceberg. Check the connector state and logs for errors.
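
Where the Kafka Connect REST API is reachable, for example on port 8083 of a local deployment, a `GET /connectors/{name}/status` request returns the connector state as JSON. The following sketch shows the shape of a healthy response; the connector name and worker ID are hypothetical.

```json
{
    "name": "iceberg-sink-example",
    "connector": {
        "state": "RUNNING",
        "worker_id": "connect:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "connect:8083"
        }
    ],
    "type": "sink"
}
```

A task in the `FAILED` state also includes a `trace` field with the stack trace, which you can compare against the messages in the troubleshooting table.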

## Troubleshoot

Iceberg connection settings are validated only when the connector first processes data. As a result, you can successfully create a connector with an incorrect configuration, and it fails only when the source topic contains messages to process.

| Message | Action |
| --- | --- |
| NoSuchTableException: Table does not exist | Make sure the Iceberg table exists and that the connector's `iceberg.tables` configuration contains the correct table name in `{namespace}.{table}` format. |
| UnknownHostException: incorrectcatalog: Name or service not known | The connector cannot connect to the Iceberg catalog. Check that the Iceberg catalog URI is correct and accessible. |
| DataException: An error occurred converting record, topic: topicName, partition, 0, offset: 0 | The connector cannot read the message format. Ensure the connector mapping configuration and data format are correct. |
| NullPointerException: Cannot invoke "java.lang.Long.longValue()" because "value" is null | The connector cannot read the message format. Ensure the connector mapping configuration and data format are correct. |

## Suggested reading

-   For details about the Iceberg Sink connector configuration properties, see [Iceberg-Kafka-Connect](https://github.com/tabular-io/iceberg-kafka-connect).

-   For details about the Iceberg Sink connector internals, see [Iceberg-Kafka-Connect documentation](https://github.com/tabular-io/iceberg-kafka-connect/tree/main/docs).