About Iceberg Topics

Beta

This feature requires an enterprise license. To get a trial license key or extend your trial period, generate a new trial license key. To purchase a license, contact Redpanda Sales.

If Redpanda has enterprise features enabled and it cannot find a valid license, restrictions apply.

The Apache Iceberg integration for Redpanda allows you to store topic data in the cloud in the Iceberg open table format. This makes your streaming data immediately available in downstream analytical systems, including data warehouses like Snowflake, Databricks, ClickHouse, and Redshift, without setting up and maintaining additional ETL pipelines. You can also integrate your data directly into commonly-used big data processing frameworks, such as Apache Spark and Flink, standardizing and simplifying the consumption of streams as tables in a wide variety of data analytics pipelines.

The Iceberg integration uses Tiered Storage. When a cluster or topic has Tiered Storage enabled, Redpanda stores the Iceberg files in the configured Tiered Storage bucket or container.

Redpanda supports version 2 of the Iceberg table format.

Iceberg concepts

Apache Iceberg is an open source format specification for defining structured tables in a data lake. The table format lets you quickly and easily manage, query, and process huge amounts of structured and unstructured data. This is similar to the way in which you would manage and run SQL queries against relational data in a database or data warehouse. The open format lets you use many different languages, tools, and applications to process the same data in a consistent way, so you can avoid vendor lock-in. This data management system is also known as a data lakehouse.

In the Iceberg specification, tables consist of the following layers:

  • Data layer: Stores the data in data files. The Iceberg integration currently supports the Parquet file format. Parquet files are column-based and suitable for analytical workloads at scale. They come with compression capabilities that optimize files for object storage.

  • Metadata layer: Stores table metadata separately from data files. The metadata layer allows multiple writers to stage metadata changes and apply updates atomically. It also supports database snapshots, and time travel queries that query the database at a previous point in time.

    • Manifest files: Track data files and contain metadata about these files, such as record count, partition membership, and file paths.

    • Manifest list: Tracks all the manifest files belonging to a table, including file paths and upper and lower bounds for partition fields.

    • Metadata file: Stores metadata about the table, including its schema, partition information, and snapshots. Whenever a change is made to the table, a new metadata file is created and becomes the latest version of the metadata in the catalog.

    For Iceberg-enabled topics, the manifest files are in Avro format.

  • Catalog: Contains the current metadata pointer for the table. Clients reading and writing data to the table see the same version of the current state of the table. The Iceberg integration supports two catalog integration types. You can configure Redpanda to catalog files stored in the same object storage bucket or container where the Iceberg data files are located, or you can configure Redpanda to use an Iceberg REST catalog endpoint to update an externally-managed catalog when there are changes to the Iceberg data and metadata.
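
    For example, a minimal sketch of selecting a catalog integration type with rpk. The property names iceberg_catalog_type (accepting object_storage or rest) and iceberg_rest_catalog_endpoint are assumptions; check the cluster properties available in your Redpanda version:

    # Catalog the table in the same bucket or container as the data files
    rpk cluster config set iceberg_catalog_type object_storage

    # Or update an externally-managed Iceberg REST catalog
    rpk cluster config set iceberg_catalog_type rest
    rpk cluster config set iceberg_rest_catalog_endpoint https://<catalog-host>/<path>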

Redpanda’s Iceberg integration

When you enable the Iceberg integration for a Redpanda topic, Redpanda brokers store streaming data in the Iceberg-compatible format in Parquet files in object storage, in addition to the log segments uploaded using Tiered Storage. Storing the streaming data in Iceberg tables in the cloud allows you to derive real-time insights through many compatible data lakehouse, data engineering, and business intelligence tools.

Prerequisites

To enable Iceberg for Redpanda topics, you must have the following:

  • rpk: See Install or Update rpk.

  • Enterprise license: To check if you already have a license key applied to your cluster:

    rpk cluster license info
  • Tiered Storage: Enable Tiered Storage for the topics for which you want to generate Iceberg tables.
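
    For example, a minimal sketch of enabling Tiered Storage on an existing topic, assuming object storage is already configured for the cluster:

    rpk topic alter-config <topic-name> --set redpanda.remote.write=true --set redpanda.remote.read=true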

Limitations

  • You cannot append topic data to an existing Iceberg table that was not created by Redpanda.

  • If you enable the Iceberg integration on an existing Redpanda topic, Redpanda does not backfill the generated Iceberg table with topic data.

  • JSON schemas are not currently supported. If the topic data is in JSON, use the key_value mode to store the JSON in Iceberg, which can then be parsed by most query engines.

  • If you’re using Avro or Protobuf data, you must use the Schema Registry wire format, where producers include the magic byte and schema ID in the message payload header. See also: Server-Side Schema ID Validation and the Understanding Apache Kafka Schema Registry blog post to learn more about the wire format.

Enable Iceberg integration

To create an Iceberg table for a Redpanda topic, you must set the cluster configuration property iceberg_enabled to true, and also configure the topic property redpanda.iceberg.mode. You can choose to provide a schema if you need the Iceberg table to be structured with defined columns.

  1. Set the iceberg_enabled configuration option on your cluster to true. You must restart your cluster if you change this configuration for a running cluster.

    rpk cluster config set iceberg_enabled true
    Successfully updated configuration. New configuration version is 2.
  2. (Optional) Create a new topic.

    rpk topic create <new-topic-name>
    TOPIC              STATUS
    <new-topic-name>   OK

    To improve query performance, consider implementing custom partitioning for the Iceberg topic. Use the redpanda.iceberg.partition.spec topic property to define the partitioning scheme:

    # Create new topic with five topic partitions, replication factor 1, and custom table partitioning for Iceberg
    rpk topic create <new-topic-name> -p5 -r1 -c "redpanda.iceberg.partition.spec=(<partition-key1>, <partition-key2>, ...)"

    Valid <partition-key> values include a source column name or a transformation of a column. The columns referenced can be Redpanda-defined (such as redpanda.timestamp) or user-defined based on a schema that you register for the topic. The Iceberg table stores records that share the same partition key values in the same files based on this specification.

    For example:

    • To partition the table by a single key, such as a column col1, use: redpanda.iceberg.partition.spec=(col1).

    • To partition by multiple columns, use a comma-separated list: redpanda.iceberg.partition.spec=(col1, col2).

    • To partition by the year of a timestamp column ts1, and a string column col1, use: redpanda.iceberg.partition.spec=(year(ts1), col1).

    For details on the partitioning specification, including allowed transforms, see the Apache Iceberg documentation.
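
    Putting this together, a hypothetical example (the topic name and col1 are illustrative) that partitions the table hourly on the Redpanda-defined redpanda.timestamp column and a user-defined column:

    rpk topic create clicks -c "redpanda.iceberg.partition.spec=(hour(redpanda.timestamp), col1)"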

  3. Enable the integration for the topic by configuring redpanda.iceberg.mode. You can choose one of the following modes:

    • key_value: Creates an Iceberg table using a simple schema, consisting of two columns, one for the record metadata including the key, and another binary column for the record’s value.

    • value_schema_id_prefix: Creates an Iceberg table whose structure matches the Redpanda schema for this topic, with columns corresponding to each field. You must register a schema in the Schema Registry (see next step), and producers must write to the topic using the Schema Registry wire format. Redpanda parses the schema used by the record based on the schema ID encoded in the payload header, and stores the topic values in the corresponding table columns.

    • disabled (default): Disables writing to an Iceberg table for this topic.

    rpk topic alter-config <new-topic-name> --set redpanda.iceberg.mode=<topic-iceberg-mode>
    TOPIC              STATUS
    <new-topic-name>   OK
  4. Register a schema for the topic. This step is required for the value_schema_id_prefix mode, but is optional otherwise.

    rpk registry schema create <subject-name> --schema </path-to-schema> --type <format>
    SUBJECT          VERSION   ID   TYPE
    <subject-name>   1         1    PROTOBUF

The Iceberg table is inside a namespace called redpanda, and has the same name as the Redpanda topic name. As you produce records to the topic, the data also becomes available in object storage for consumption by Iceberg-compatible clients. You can use the same analytical tools to read the Iceberg topic data in a data lake as you would for a relational database.
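
For example, assuming a topic in value_schema_id_prefix mode with the Avro ClickEvent schema shown in the next section registered under the subject <new-topic-name>-value, you can produce a record in the Schema Registry wire format with rpk:

echo '{"user_id":42, "event_type":"PAGE_VIEW", "ts":"2025-01-01T00:00:00.000Z"}' | rpk topic produce <new-topic-name> --format='%v\n' --schema-id=topic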

About schema support and translation to Iceberg format

The redpanda.iceberg.mode property determines how Redpanda maps the topic data to the Iceberg table structure. You can have the generated Iceberg table match the structure of an Avro or Protobuf schema in the Schema Registry, or you can use the key_value mode, where Redpanda stores the record values as-is in the table.

The JSON Schema format is not supported. If your topic data is in JSON, it is recommended to use the key_value mode.
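
For example, to store a JSON topic's records as-is for downstream parsing, set the topic to key_value mode:

rpk topic alter-config <json-topic-name> --set redpanda.iceberg.mode=key_value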

Iceberg modes and table schemas

For both key_value and value_schema_id_prefix modes, Redpanda writes to a redpanda table column that stores a single Iceberg struct per record, containing nested columns of the metadata from each record, including the record key, headers, timestamp, the partition it belongs to, and its offset.

For example, if you produce to a topic according to the following Avro schema:

{
    "type": "record",
    "name": "ClickEvent",
    "fields": [
        {
            "name": "user_id",
            "type": "int"
        },
        {
            "name": "event_type",
            "type": "string"
        },
        {
            "name": "ts",
            "type": "string"
        }
    ]
}

The key_value mode writes to the following table format:

CREATE TABLE ClickEvent (
    redpanda struct<
        partition: integer NOT NULL,
        timestamp: timestamp NOT NULL,
        offset:    long NOT NULL,
        headers:   array<struct<key: binary NOT NULL, value: binary>>,
        key:       binary
    >,
    value binary
)

Consider this approach if the topic data is in JSON, or if you can use the Iceberg data in its semi-structured format.

The value_schema_id_prefix mode translates to the following table format:

CREATE TABLE ClickEvent (
    redpanda struct<
        partition: integer NOT NULL,
        timestamp: timestamp NOT NULL,
        offset:    long NOT NULL,
        headers:   array<struct<key: binary NOT NULL, value: binary>>,
        key:       binary
    >,
    user_id integer NOT NULL,
    event_type string,
    ts string
)

With schema integration, Redpanda uses the schema ID prefix embedded in each record to find the matching schema in the Schema Registry. Producers to the topic must use the schema ID prefix in the serialization process so Redpanda can determine the schema used for each record, parse the record according to that schema, and use the schema for the Iceberg table as well.

If Redpanda fails to translate the record to the columnar format as defined by the schema, it writes the record to a dead-letter queue (DLQ) table. See Manage dead-letter queue for more information.

Schema types translation

Redpanda supports direct translation of the following schema formats to Iceberg types:

  • Avro

  • Protobuf

Avro type   Iceberg type

boolean     boolean
int         int
long        long
float       float
double      double
bytes       binary
string      string
record      struct
array       list
map         list
fixed       fixed
decimal     decimal
uuid        uuid
date        date
time        time
timestamp   timestamp

  • Different flavors of time (such as time-millis) and timestamp (such as timestamp-millis) types are translated to the same Iceberg time and timestamp types respectively.

  • Avro unions are flattened to Iceberg structs with optional fields:

    • For example, the union ["int", "long", "float"] is represented as an Iceberg struct struct<0 INT NULLABLE, 1 LONG NULLABLE, 2 FLOAT NULLABLE>.

    • The union ["int", null, "float"] is represented as an Iceberg struct struct<0 INT NULLABLE, 1 FLOAT NULLABLE>.

  • All fields are required by default (Avro always sets a default in binary representation).

  • The Avro duration logical type is ignored.

  • The Avro null type is ignored and not represented in the Iceberg schema.

  • Recursive types are not supported.

Protobuf type   Iceberg type

bool            boolean
double          double
float           float
int32           int
sint32          int
int64           long
sint64          long
sfixed32        int
sfixed64        long
string          string
bytes           binary
map             map

  • Repeated values are translated into Iceberg array types.

  • Enums are translated into Iceberg int types based on the integer value of the enumerated type.

  • uint32 and fixed32 are translated into Iceberg long types as that is the existing semantic for unsigned 32-bit values in Iceberg.

  • uint64 and fixed64 values are translated into their Base-10 string representation.

  • The timestamp type in Protobuf is translated into timestamp in Iceberg.

  • Messages are converted into Iceberg structs.

  • Recursive types are not supported.

Schema evolution

Redpanda supports schema evolution for Avro and Protobuf schemas in accordance with the Iceberg specification. Permitted schema evolutions include adding new fields, reordering fields, and promoting field types. When you update the schema in the Schema Registry, Redpanda automatically updates the Iceberg table schema to match.

For example, if you produce records to a topic demo-topic with the following Avro schema:

schema_1.avsc
{
  "type": "record",
  "name": "ClickEvent",
  "fields": [
    {
      "name": "user_id",
      "type": "int"
    },
    {
      "name": "event_type",
      "type": "string"
    }
  ]
}
rpk registry schema create demo-topic-value --schema schema_1.avsc

echo '{"user_id":23, "event_type":"BUTTON_CLICK"}' | rpk topic produce demo-topic --format='%v\n' --schema-id=topic

Then, you update the schema to add a new field ts, and produce records with the updated schema:

schema_2.avsc
{
  "type": "record",
  "name": "ClickEvent",
  "fields": [
    {
      "name": "user_id",
      "type": "int"
    },
    {
      "name": "event_type",
      "type": "string"
    },
    {
      "name": "ts",
      "type": [
          "null",
          { "type": "string", "logicalType": "date" }
        ],
      "default": null  # Default value for the new field
    }
  ]
}

The ts field can be either null or a string representing a date. The default value is null.

rpk registry schema create demo-topic-value --schema schema_2.avsc

echo '{"user_id":858, "event_type":"BUTTON_CLICK", "ts":{"string":"2025-02-26T20:05:23.230ZZ"}}' | rpk topic produce demo-topic --format='%v\n' --schema-id=topic

Querying the Iceberg table for demo-topic now returns the new column ts:

+---------+--------------+--------------------------+
| user_id | event_type   | ts                       |
+---------+--------------+--------------------------+
| 858     | BUTTON_CLICK | 2025-02-26T20:05:23.230Z |
| 23      | BUTTON_CLICK | NULL                     |
+---------+--------------+--------------------------+

Manage dead-letter queue

Errors may occur when translating records in the value_schema_id_prefix mode to the Iceberg table format; for example, if you do not use the Schema Registry wire format with the magic byte, if the schema ID in the record is not found in the Schema Registry, or if an Avro or Protobuf data type cannot be translated to an Iceberg type.

If Redpanda encounters an error while writing a record to the Iceberg table, Redpanda writes the record to a separate dead-letter queue (DLQ) Iceberg table named <topic-name>~dlq. To disable the default behavior for a topic and drop the record, set the redpanda.iceberg.invalid.record.action topic property to drop. You can also configure the default cluster-wide behavior for invalid records by setting the iceberg_invalid_record_action property.
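
For example, to drop invalid records for a single topic, or to change the cluster-wide default:

rpk topic alter-config <topic-name> --set redpanda.iceberg.invalid.record.action=drop

rpk cluster config set iceberg_invalid_record_action drop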

The DLQ table itself uses the key_value schema, consisting of two columns: the record metadata including the key, and a binary column for the record’s value.

You can inspect the DLQ table for records that failed to write to the Iceberg table, and you can take further action on these records, such as transforming and reprocessing them, or debugging issues that occurred upstream.

Reprocess DLQ records

The following example produces a record to a topic named ClickEvent and does not use the Schema Registry wire format that includes the magic byte and schema ID:

echo '"key1" {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"}' | rpk topic produce ClickEvent --format='%k %v\n'

Querying the DLQ table returns the record that was not translated:

SELECT
    value
FROM <catalog-name>."ClickEvent~dlq"; -- Fully qualified table name
+-------------------------------------------------+
| value                                           |
+-------------------------------------------------+
| 7b 22 75 73 65 72 5f 69 64 22 3a 32 33 32 34 2c |
| 22 65 76 65 6e 74 5f 74 79 70 65 22 3a 22 42 55 |
| 54 54 4f 4e 5f 43 4c 49 43 4b 22 2c 22 74 73 22 |
| 3a 22 32 30 32 34 2d 31 31 2d 32 35 54 32 30 3a |
| 32 33 3a 35 39 2e 33 38 30 5a 22 7d             |
+-------------------------------------------------+

The data is in binary format, and the first byte is not 0x00, indicating that it was not produced with a schema.
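
To verify this from the command line, you can dump the leading bytes of a record's value; a minimal sketch using rpk and xxd (a record in the wire format starts with the 0x00 magic byte followed by a big-endian 4-byte schema ID):

# Consume one record and show its value as a hex dump
rpk topic consume ClickEvent -n 1 -f '%v' | xxd | head -n 2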

You can apply a transformation and reprocess the record in your data lakehouse to the original Iceberg table. In this case, you have a JSON value represented as a UTF-8 binary. Depending on your query engine, you might need to decode the binary value first before extracting the JSON fields. Some engines may automatically decode the binary value for you:

ClickHouse SQL example to reprocess DLQ record
SELECT
    CAST(jsonExtractString(json, 'user_id') AS Int32) AS user_id,
    jsonExtractString(json, 'event_type') AS event_type,
    jsonExtractString(json, 'ts') AS ts
FROM (
    SELECT
        CAST(value AS String) AS json
    FROM <catalog-name>.`ClickEvent~dlq` -- Ensure that the table name is properly parsed
);
+---------+--------------+--------------------------+
| user_id | event_type   | ts                       |
+---------+--------------+--------------------------+
|    2324 | BUTTON_CLICK | 2024-11-25T20:23:59.380Z |
+---------+--------------+--------------------------+

You can now insert the transformed record back into the main Iceberg table. Redpanda recommends employing a strategy for exactly-once processing to avoid duplicates when reprocessing records.