Iceberg Topics

Beta

This feature requires an enterprise license. To get a trial license key or extend your trial period, generate a new trial license key. To purchase a license, contact Redpanda Sales.

If Redpanda has enterprise features enabled and it cannot find a valid license, restrictions apply.

The Apache Iceberg integration for Redpanda allows you to store topic data in the cloud in the Iceberg open table format. This makes your streaming data immediately available in downstream analytical systems, including data warehouses like Snowflake, Databricks, ClickHouse, and Redshift, without setting up and maintaining additional ETL pipelines. You can also integrate your data directly into commonly used big data processing frameworks, such as Apache Spark and Flink, standardizing and simplifying the consumption of streams as tables in a wide variety of data analytics pipelines.

The Iceberg integration uses Tiered Storage. When a cluster or topic has Tiered Storage enabled, Redpanda stores the Iceberg files in the configured Tiered Storage bucket or container.

Prerequisites

  • Install rpk.

  • To check if you already have a license key applied to your cluster:

    rpk cluster license info
  • Enable Tiered Storage for the topics for which you want to generate Iceberg tables.
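
    For example, the following is a minimal sketch of enabling Tiered Storage on an existing topic with rpk, assuming object storage is already configured for your cluster (redpanda.remote.write and redpanda.remote.read are the standard Tiered Storage topic properties):

    rpk topic alter-config <topic-name> --set redpanda.remote.write=true --set redpanda.remote.read=true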

Limitations

  • It is not possible to append topic data to an existing Iceberg table that was not created by Redpanda.

  • If you enable the Iceberg integration on an existing Redpanda topic, Redpanda does not backfill the generated Iceberg table with topic data.

  • JSON schemas are not currently supported. If the topic data is in JSON, use the key_value mode to store the JSON in Iceberg, which can then be parsed by most query engines.

  • If you are using Avro or Protobuf data, you must use the Schema Registry wire format, where producers include the magic byte and schema ID in the message payload header. See also: Server-Side Schema ID Validation and the Understanding Apache Kafka Schema Registry blog post to learn more about the wire format.

  • You can only use one schema per topic. Schema versioning as well as upcasting (where a value is cast into its more generic data type) are not supported. See Schema types translation for more details.

  • Remote read replicas and topic recovery are not supported for Iceberg-enabled topics.

Iceberg concepts

Apache Iceberg is an open source format specification for defining structured tables in a data lake. The table format lets you quickly and easily manage, query, and process huge amounts of structured and unstructured data. This is similar to the way in which you would manage and run SQL queries against relational data in a database or data warehouse. The open format lets you use many different languages, tools, and applications to process the same data in a consistent way, so you can avoid vendor lock-in. This data management system is referred to as a data lakehouse.

In the Iceberg specification, tables consist of the following layers:

  • Data layer

    • Data files: Store the data. The Iceberg integration currently supports the Parquet file format. Parquet files are column-based and suitable for analytical workloads at scale. They come with compression capabilities that optimize files for object storage.

  • Metadata layer: Stores table metadata separately from data files. The metadata layer allows multiple writers to stage metadata changes and apply updates atomically. It also supports table snapshots and time travel queries that read a table as it was at a previous point in time.

    • Manifest files: Track data files and contain metadata about these files, such as record count, partition membership, and file paths.

    • Manifest list: Tracks all the manifest files belonging to a table, including file paths and upper and lower bounds for partition fields.

    • Metadata file: Stores metadata about the table, including its schema, partition information, and snapshots. Whenever a change is made to the table, a new metadata file is created and becomes the latest version of the metadata in the catalog.

    In the Redpanda Iceberg integration, the manifest files are in JSON format.

  • Catalog: Contains the current metadata pointer for the table. Clients reading and writing data to the table see the same version of the current state of the table. The Iceberg integration supports two catalog integration types. You can configure Redpanda to catalog files stored in the same object storage bucket or container where the Iceberg data files are located, or you can configure Redpanda to use an Iceberg REST catalog endpoint to update an externally-managed catalog when there are changes to the Iceberg data and metadata.


When you enable the Iceberg integration for a Redpanda topic, Redpanda brokers store streaming data in the Iceberg-compatible format in Parquet files in object storage, in addition to the log segments uploaded using Tiered Storage. Storing the streaming data in Iceberg tables in the cloud allows you to derive real-time insights through many compatible data lakehouse, data engineering, and business intelligence tools.

Enable Iceberg integration

To create an Iceberg table for a Redpanda topic, you must set the cluster configuration property iceberg_enabled to true, and also configure the topic property redpanda.iceberg.mode. You can choose to provide a schema if you need the Iceberg table to be structured with defined columns.

  1. Set the iceberg_enabled configuration option on your cluster to true. You must restart your cluster if you change this configuration for a running cluster.

    rpk cluster config set iceberg_enabled true
    Successfully updated configuration. New configuration version is 2.
  2. Create a new topic.

    rpk topic create <new-topic-name>
    TOPIC            STATUS
    <new-topic-name>   OK
  3. Enable the integration for the topic by configuring redpanda.iceberg.mode. You can choose one of the following modes:

    • key_value: Creates an Iceberg table using a simple schema, consisting of two columns, one for the record metadata including the key, and another binary column for the record’s value.

    • value_schema_id_prefix: Creates an Iceberg table whose structure matches the Redpanda schema for this topic, with columns corresponding to each field. You must register a schema in the Schema Registry (see next step), and producers must write to the topic using the Schema Registry wire format. Redpanda parses the schema used by the record based on the schema ID encoded in the payload header, and stores the topic values in the corresponding table columns.

    • disabled (default): Disables writing to an Iceberg table for this topic.

    rpk topic alter-config <new-topic-name> --set redpanda.iceberg.mode=<topic-iceberg-mode>
    TOPIC            STATUS
    <new-topic-name>   OK
  4. Register a schema for the topic. This step is required for the value_schema_id_prefix mode, but is optional otherwise.

    rpk registry schema create <subject-name> --schema </path-to-schema> --type <format>
    SUBJECT                VERSION   ID   TYPE
    <subject-name>   1         1    PROTOBUF

The Iceberg table is inside a namespace called redpanda and has the same name as the Redpanda topic. As you produce records to the topic, the data also becomes available in object storage for consumption by Iceberg-compatible clients.

Schema support and mapping

The redpanda.iceberg.mode property determines how Redpanda maps the topic data to the Iceberg table structure. You can either have the generated Iceberg table match the structure of an Avro or Protobuf schema in the Schema Registry, or use the key_value mode, where Redpanda stores the record values as-is in the table.

The JSON Schema format is not supported. If your topic data is in JSON, use the key_value mode.

Iceberg modes and table schemas

For both key_value and value_schema_id_prefix modes, Redpanda writes to a redpanda table column that stores a single Iceberg struct per record, containing nested columns of the metadata from each record, including the record key, headers, timestamp, the partition it belongs to, and its offset.

For example, if you produce to a topic according to the following Avro schema:

{
    "type": "record",
    "name": "ClickEvent",
    "fields": [
        {
            "name": "user_id",
            "type": "int"
        },
        {
            "name": "event_type",
            "type": "string"
        },
        {
            "name": "ts",
            "type": "string"
        }
    ]
}

The key_value mode writes to the following table format:

CREATE TABLE ClickEvent (
    redpanda struct<
        partition: integer NOT NULL,
        timestamp: timestamp NOT NULL,
        offset:    long NOT NULL,
        headers:   array<struct<key: binary NOT NULL, value: binary>>,
        key:       binary
    >,
    value binary
)

Consider this approach if the topic data is in JSON, or if you can use the Iceberg data in its semi-structured format.

The value_schema_id_prefix mode translates to the following table format:

CREATE TABLE ClickEvent (
    redpanda struct<
        partition: integer NOT NULL,
        timestamp: timestamp NOT NULL,
        offset:    long NOT NULL,
        headers:   array<struct<key: binary NOT NULL, value: binary>>,
        key:       binary
    >,
    user_id integer NOT NULL,
    event_type string,
    ts string
)

With schema integration, Redpanda uses the schema ID prefix embedded in each record to find the matching schema in the Schema Registry. Producers to the topic must use the schema ID prefix in the serialization process so Redpanda can determine the schema used for each record, parse the record according to that schema, and use the schema for the Iceberg table as well.

Schema types translation

Redpanda supports direct translation of types from the following schema formats to Iceberg types:

  • Avro

  • Protobuf

Avro type    Iceberg type
boolean      boolean
int          int
long         long
float        float
double       double
bytes        binary
string       string
record       struct
array        list
map          list
fixed        fixed
decimal      decimal
uuid         uuid
date         date
time         time
timestamp    timestamp

  • Different flavors of time (such as time-millis) and timestamp (such as timestamp-millis) types are translated to the same Iceberg time and timestamp types respectively.

  • Avro unions are flattened to Iceberg structs with optional fields, as shown in the sketch after this list:

    • For example, the union ["int", "long", "float"] is represented as an Iceberg struct struct<0 INT NULLABLE, 1 LONG NULLABLE, 2 FLOAT NULLABLE>.

    • The union ["int", null, "float"] is represented as an Iceberg struct struct<0 INT NULLABLE, 1 FLOAT NULLABLE>.

  • All fields are required by default (Avro always sets a default in binary representation).

  • The Avro duration logical type is ignored.

  • The Avro null type is ignored and not represented in the Iceberg schema.

  • Recursive types are not supported.
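
For example, as a sketch of the union flattening rule, a hypothetical topic Measurement whose Avro schema declares a field reading with the union type ["int", "long", "float"] would produce a table along these lines (Measurement and reading are illustrative names, and nullable columns are shown without NOT NULL; the exact output may differ):

CREATE TABLE Measurement (
    redpanda struct<
        partition: integer NOT NULL,
        timestamp: timestamp NOT NULL,
        offset:    long NOT NULL,
        headers:   array<struct<key: binary NOT NULL, value: binary>>,
        key:       binary
    >,
    reading struct<
        0: integer,
        1: long,
        2: float
    >
)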

Protobuf type    Iceberg type
bool             boolean
double           double
float            float
int32            int
sint32           int
int64            long
sint64           long
sfixed32         int
sfixed64         long
string           string
bytes            binary
map              map

  • Repeated values are translated into Iceberg array types.

  • Enums are translated into Iceberg int types based on the integer value of the enumerated type.

  • uint32 and fixed32 are translated into Iceberg long types as that is the existing semantic for unsigned 32-bit values in Iceberg.

  • uint64 and fixed64 values are translated into their Base-10 string representation.

  • The timestamp type in Protobuf is translated into timestamp in Iceberg.

  • Messages are converted into Iceberg structs.

  • Recursive types are not supported.
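
As an illustration of this mapping, consider a hypothetical Protobuf message definition (the message and field names are illustrative):

syntax = "proto3";

message PageView {
    int32 user_id = 1;
    string url = 2;
    repeated string tags = 3;
    map<string, int64> counts = 4;
}

In value_schema_id_prefix mode, this would translate to roughly the following table, based on the mapping above (the redpanda metadata struct is the same as in the earlier examples, and nullability markers are omitted for the value columns):

CREATE TABLE PageView (
    redpanda struct<
        partition: integer NOT NULL,
        timestamp: timestamp NOT NULL,
        offset:    long NOT NULL,
        headers:   array<struct<key: binary NOT NULL, value: binary>>,
        key:       binary
    >,
    user_id integer,
    url string,
    tags array<string>,
    counts map<string, long>
)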

Set up catalog integration

You can configure the Iceberg integration to either store the metadata in HadoopCatalog format in the same object storage bucket or container, or connect to a REST-based catalog.

Set the cluster configuration property iceberg_catalog_type with one of the following values:

  • rest: Connect to and update an Iceberg catalog using a REST API. See the Iceberg REST Catalog API specification.

  • object_storage: Write catalog files to the same object storage bucket as the data files. Use the object storage URL with an Iceberg client to access the catalog and data files for your Redpanda Iceberg tables.

Once you have enabled the Iceberg integration for a topic and selected a catalog type, you cannot switch to another catalog type.

For production use cases, Redpanda recommends the rest option with REST-enabled Iceberg catalog services such as Tabular, Databricks Unity, and Apache Polaris.

For an Iceberg REST catalog, set the following additional cluster configuration properties:

  • iceberg_rest_catalog_endpoint: The endpoint URL for your Iceberg catalog, which you either manage directly or which is managed by an external catalog service.

  • iceberg_rest_catalog_client_id: The ID to connect to the REST server.

  • iceberg_rest_catalog_client_secret: The secret data to connect to the REST server.

    For REST catalogs that use self-signed certificates, also configure these properties:

    • iceberg_rest_catalog_trust_file: The path to a file containing a certificate chain to trust for the REST catalog.

    • iceberg_rest_catalog_crl_file: The path to the certificate revocation list for the specified trust file.
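
For example, you can apply these settings with rpk cluster config set (a sketch with placeholder values; substitute the endpoint and credentials for your own catalog service):

rpk cluster config set iceberg_catalog_type rest
rpk cluster config set iceberg_rest_catalog_endpoint <catalog-endpoint-url>
rpk cluster config set iceberg_rest_catalog_client_id <client-id>
rpk cluster config set iceberg_rest_catalog_client_secret <client-secret>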

See Cluster Configuration Properties for the full list of cluster properties to configure for a catalog integration.

Example catalog integration

You can use the catalog to load, query, or refresh the Iceberg data as you produce to the Redpanda topic. Refer to the official documentation of your query engine or Iceberg-compatible tool for guidance on integrating your Iceberg catalog.

REST catalog

For example, if you have Redpanda cluster configuration properties set to connect to a REST catalog named streaming:

iceberg_catalog_type: rest
iceberg_rest_catalog_endpoint: http://catalog-service:8181
iceberg_rest_catalog_client_id: <rest-connection-user>
iceberg_rest_catalog_client_secret: <rest-connection-password>

And you use Spark as a processing engine, configured to use the streaming catalog:

spark.sql.catalog.streaming = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.streaming.type = rest
spark.sql.catalog.streaming.uri = http://catalog-service:8181

Using Spark SQL, you can query the Iceberg table directly by specifying the catalog name:

SELECT * FROM streaming.redpanda.ClickEvent;

Spark can use the REST catalog to automatically discover the topic’s Iceberg table.

File system-based catalog (object_storage)

If you are using the object_storage catalog type, you must set up the catalog integration in your processing engine accordingly. For example, if using AWS S3 for object storage, you can configure Spark to use a file system-based catalog with at least the following properties:

spark.sql.catalog.streaming = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.streaming.type = hadoop
spark.sql.catalog.streaming.warehouse = s3a://<bucket-name>/path/to/redpanda-iceberg-table

Depending on your processing engine, you may also need to create a new table in your data warehouse or lakehouse for the Iceberg data.

Access data in Iceberg tables

You can use the same analytical tools to access table data in a data lake as you would for a relational database.

For example, suppose you produce the same stream of events to a topic ClickEvent, which uses a schema, and another topic ClickEvent_key_value, which uses the key_value mode. The topics have Tiered Storage configured to an AWS S3 bucket. A sample record contains the following data:

{"user_id": 2324, "event_type": "BUTTON_CLICK", "ts": "2024-11-25T20:23:59.380Z"}

When you point your Iceberg-compatible tool or framework to the object storage location of the Iceberg tables, how you consume the data depends on the topic Iceberg mode and whether you’ve registered a schema on which to derive the table format. Depending on the processing engine, you may need to first create a new table that gets added to your Iceberg catalog implementation.

In either mode, you do not need to rely on complex ETL jobs or pipelines to access real-time data from Redpanda in your data lakehouse.

Query topic with schema (value_schema_id_prefix mode)

In this example, it is assumed you have created the ClickEvent topic, set redpanda.iceberg.mode to value_schema_id_prefix, and are connecting to a REST-based Iceberg catalog. The following is an Avro schema for ClickEvent:

schema.avsc
{
    "type" : "record",
    "namespace" : "com.redpanda.examples.avro",
    "name" : "ClickEvent",
    "fields" : [
       { "name": "user_id", "type" : "int" },
       { "name": "event_type", "type" : "string" },
       { "name": "ts", "type": "string" }
    ]
 }

You can register the schema under the ClickEvent-value subject:

rpk registry schema create ClickEvent-value --schema path/to/schema.avsc --type avro

You can then produce to the ClickEvent topic using the following format:

echo '"key1" {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"}' | rpk topic produce ClickEvent --format='%k %v\n' --schema-id=topic

The following SQL query returns values from columns in the ClickEvent table, with the table structure derived from the schema, and column names matching the schema fields. If you’ve integrated a catalog, query engines such as Spark SQL provide Iceberg integrations that allow easy discovery and access to existing Iceberg tables in object storage.

SELECT user_id,
    event_type,
    ts
FROM <catalog-name>.redpanda.ClickEvent;
+---------+--------------+--------------------------+
| user_id | event_type   | ts                       |
+---------+--------------+--------------------------+
| 2324    | BUTTON_CLICK | 2024-11-25T20:23:59.380Z |
+---------+--------------+--------------------------+

Query topic in key-value mode

In key_value mode, you do not associate the topic with a schema in the Schema Registry, so the data is stored in Iceberg in semi-structured form.

In this example, it is assumed you have created the ClickEvent_key_value topic, set redpanda.iceberg.mode to key_value, and are also connecting to a REST-based Iceberg catalog.

You can produce to the ClickEvent_key_value topic using the following format:

echo 'key1 {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"}' | rpk topic produce ClickEvent_key_value --format='%k %v\n'

This example queries the semi-structured data in the ClickEvent_key_value table, which also includes a redpanda column containing the record key and other metadata:

SELECT
    value
FROM <catalog-name>.redpanda.ClickEvent_key_value;
+------------------------------------------------------------------------------+
| value                                                                        |
+------------------------------------------------------------------------------+
| {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"} |
+------------------------------------------------------------------------------+
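
Because the value column stores the raw record bytes, you typically parse the JSON in your query engine. The following is a minimal sketch using Spark SQL (get_json_object and the cast from binary to string are Spark-specific; other engines provide similar JSON functions):

SELECT
    get_json_object(CAST(value AS STRING), '$.user_id')    AS user_id,
    get_json_object(CAST(value AS STRING), '$.event_type') AS event_type,
    get_json_object(CAST(value AS STRING), '$.ts')         AS ts
FROM <catalog-name>.redpanda.ClickEvent_key_value;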