iceberg

Write data to Apache Iceberg tables via REST catalog.

Write streaming data to Apache Iceberg tables using the REST catalog API. This output supports:

  • Multiple storage backends (S3, GCS, Azure)

  • Automatic table creation with schema detection

  • Partition transforms (year, month, day, hour, bucket, truncate)

  • Schema evolution (automatic column addition)

  • Transaction retry logic for concurrent writes

This output is designed to work with REST catalog implementations like Apache Polaris, AWS Glue Data Catalog, and the Databricks Unity Catalog.

Apache Polaris

To use with Apache Polaris:

  • Set catalog.url to the Polaris REST endpoint (e.g., http://localhost:8181/api/catalog).

  • Set catalog.warehouse to the catalog name configured in Polaris.

  • Configure catalog.auth.oauth2 with client credentials granted access to the catalog.
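
The steps above can be sketched as a minimal configuration. The endpoint, catalog name, namespace, table, storage bucket, and credentials shown here are placeholders; substitute your own values:

```yaml
outputs:
  label: ""
  iceberg:
    catalog:
      url: http://localhost:8181/api/catalog
      warehouse: my_polaris_catalog # the catalog name configured in Polaris
      auth:
        oauth2:
          client_id: "${POLARIS_CLIENT_ID}"
          client_secret: "${POLARIS_CLIENT_SECRET}"
          scope: PRINCIPAL_ROLE:ALL # a common Polaris scope; adjust to your setup
    namespace: analytics
    table: events
    storage:
      aws_s3:
        bucket: my-iceberg-data
        region: us-east-1
```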

AWS Glue Data Catalog

To use with AWS Glue Data Catalog:

  • Set catalog.url to https://glue.<region>.amazonaws.com/iceberg (the REST client appends the API version automatically).

  • Set catalog.warehouse to your AWS account ID (the Glue catalog identifier).

  • Set schema_evolution.table_location to an S3 prefix (e.g., s3://my-bucket/) since Glue does not automatically assign table locations.

  • Configure catalog.auth.aws_sigv4 with the appropriate region and set service to glue.

  • Configure storage.aws_s3 with the same bucket and region.
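
Putting the checklist above together, a sketch of a Glue configuration might look like the following (the account ID, bucket, region, namespace, and table are placeholders):

```yaml
outputs:
  label: ""
  iceberg:
    catalog:
      url: https://glue.us-east-1.amazonaws.com/iceberg
      warehouse: "123456789012" # your AWS account ID
      auth:
        aws_sigv4:
          region: us-east-1
          service: glue
    namespace: analytics
    table: events
    storage:
      aws_s3:
        bucket: my-iceberg-bucket
        region: us-east-1
    schema_evolution:
      enabled: true
      table_location: s3://my-iceberg-bucket/ # Glue does not assign table locations
```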

Azure Blob Storage (ADLS Gen2)

To use with Azure Data Lake Storage Gen2:

  • Configure storage.azure_blob_storage with your storage account name and container.

  • Authenticate using one of: storage_access_key (shared key), storage_sas_token, or storage_connection_string.

  • The storage account must have hierarchical namespace (HNS) enabled for ADLS Gen2 compatibility.
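
A hedged sketch of an ADLS Gen2 setup, assuming an OAuth2-authenticated REST catalog (all names and credentials are placeholders; pick exactly one of the three Azure auth fields):

```yaml
outputs:
  label: ""
  iceberg:
    catalog:
      url: https://polaris.example.com/api/catalog # any REST catalog endpoint
      auth:
        oauth2:
          client_id: "${CATALOG_CLIENT_ID}"
          client_secret: "${CATALOG_CLIENT_SECRET}"
    namespace: analytics
    table: events
    storage:
      azure_blob_storage:
        storage_account: mystorageaccount
        container: iceberg-data
        storage_access_key: "${AZURE_STORAGE_KEY}" # or storage_sas_token / storage_connection_string
```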

Type mapping

Bloblang values map to Iceberg types as follows:

  Bloblang type | Iceberg type
  --------------|--------------------------
  string        | string
  bytes         | binary
  bool          | boolean
  number        | double
  timestamp     | timestamp (with timezone)
  object        | struct
  array         | list
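
As an illustration of this mapping, a message shaped like the following (shown as YAML for readability; field names and values are purely illustrative) would yield these column types under automatic schema detection:

```yaml
user_id: "u-123"    # string -> string
active: true        # bool   -> boolean
score: 4.5          # number -> double
tags: ["a", "b"]    # array  -> list
details:            # object -> struct
  ip: "1.2.3.4"     #   nested string -> string
```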

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the maximum number of in-flight messages (or message batches) with the max_in_flight field.

This output also benefits from sending messages as a batch. Batches can be formed at both the input and output level.

Common

outputs:
  label: ""
  iceberg:
    catalog:
      url: "" # No default (required)
      warehouse: "" # No default (optional)
      auth:
        oauth2:
          server_uri: /v1/oauth/tokens
          client_id: "" # No default (required)
          client_secret: "" # No default (required)
          scope: "" # No default (optional)
        bearer: "" # No default (optional)
        aws_sigv4:
          region: "" # No default (optional)
          service: "" # No default (optional)
      headers: "" # No default (optional)
      tls_skip_verify: false
    namespace: "" # No default (required)
    table: "" # No default (required)
    storage:
      aws_s3:
        bucket: "" # No default (required)
        region: "" # No default (optional)
        endpoint: "" # No default (optional)
        force_path_style_urls: false
        tcp:
          connect_timeout: 0s
          keep_alive:
            idle: 15s
            interval: 15s
            count: 9
          tcp_user_timeout: 0s
        credentials:
          profile: "" # No default (optional)
          id: "" # No default (optional)
          secret: "" # No default (optional)
          token: "" # No default (optional)
          from_ec2_role: false
          role: "" # No default (optional)
          role_external_id: "" # No default (optional)
      gcp_cloud_storage:
        bucket: "" # No default (required)
        endpoint: "" # No default (optional)
        credentials_type: "" # No default (optional)
        credentials_file: "" # No default (optional)
        credentials_json: "" # No default (optional)
      azure_blob_storage:
        storage_account: "" # No default (required)
        container: "" # No default (required)
        endpoint: "" # No default (optional)
        storage_sas_token: "" # No default (optional)
        storage_connection_string: "" # No default (optional)
        storage_access_key: "" # No default (optional)
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    max_in_flight: 4

Advanced
outputs:
  label: ""
  iceberg:
    catalog:
      url: "" # No default (required)
      warehouse: "" # No default (optional)
      auth:
        oauth2:
          server_uri: /v1/oauth/tokens
          client_id: "" # No default (required)
          client_secret: "" # No default (required)
          scope: "" # No default (optional)
        bearer: "" # No default (optional)
        aws_sigv4:
          region: "" # No default (optional)
          service: "" # No default (optional)
      headers: "" # No default (optional)
      tls_skip_verify: false
    namespace: "" # No default (required)
    table: "" # No default (required)
    storage:
      aws_s3:
        bucket: "" # No default (required)
        region: "" # No default (optional)
        endpoint: "" # No default (optional)
        force_path_style_urls: false
        tcp:
          connect_timeout: 0s
          keep_alive:
            idle: 15s
            interval: 15s
            count: 9
          tcp_user_timeout: 0s
        credentials:
          profile: "" # No default (optional)
          id: "" # No default (optional)
          secret: "" # No default (optional)
          token: "" # No default (optional)
          from_ec2_role: false
          role: "" # No default (optional)
          role_external_id: "" # No default (optional)
      gcp_cloud_storage:
        bucket: "" # No default (required)
        endpoint: "" # No default (optional)
        credentials_type: "" # No default (optional)
        credentials_file: "" # No default (optional)
        credentials_json: "" # No default (optional)
      azure_blob_storage:
        storage_account: "" # No default (required)
        container: "" # No default (required)
        endpoint: "" # No default (optional)
        storage_sas_token: "" # No default (optional)
        storage_connection_string: "" # No default (optional)
        storage_access_key: "" # No default (optional)
    schema_evolution:
      enabled: false
      partition_spec: ()
      table_location: "" # No default (optional)
    commit:
      manifest_merge_enabled: true
      max_snapshot_age: 24h
      max_retries: 3
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    max_in_flight: 4

Fields

batching

Allows you to configure a batching policy.

Type: object

# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s

# ---

batching:
  count: 10
  period: 1s

# ---

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.byte_size

The number of bytes at which the batch should be flushed. If set to 0, size-based batching is disabled.

Type: int

Default: 0

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples:
check: this.type == "end_of_transaction"

batching.count

The number of messages at which the batch should be flushed. If set to 0, count-based batching is disabled.

Type: int

Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string

Default: ""

# Examples:
period: 1s

# ---

period: 1m

# ---

period: 500ms

batching.processors[]

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: processor

# Examples:
processors:
  - archive:
      format: concatenate


# ---

processors:
  - archive:
      format: lines


# ---

processors:
  - archive:
      format: json_array

catalog

REST catalog configuration.

Type: object

catalog.auth

Authentication configuration for the REST catalog. Only one authentication method can be active at a time.

Type: object

catalog.auth.aws_sigv4

AWS SigV4 authentication (for AWS Glue Data Catalog or API Gateway). Uses the same credentials as the storage configuration.

Type: object

catalog.auth.aws_sigv4.region

AWS region for SigV4 signing. If not specified, uses the region from AWS credentials.

Type: string

# Examples:
region: us-east-1

catalog.auth.aws_sigv4.service

AWS service name for SigV4 signing.

Type: string

catalog.auth.bearer

Static bearer token for authentication. For testing only, not recommended for production.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: string

catalog.auth.oauth2

OAuth2 authentication configuration.

Type: object

catalog.auth.oauth2.client_id

OAuth2 client identifier.

Type: string

catalog.auth.oauth2.client_secret

OAuth2 client secret.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: string

catalog.auth.oauth2.scope

OAuth2 scope to request.

Type: string

catalog.auth.oauth2.server_uri

OAuth2 token endpoint URI.

Type: string

Default: /v1/oauth/tokens

catalog.headers

Custom HTTP headers to include in all requests to the catalog.

Type: object

# Examples:
headers:
  X-Api-Key: your-api-key

catalog.tls_skip_verify

Skip TLS certificate verification. Not recommended for production.

Type: bool

Default: false

catalog.url

The REST catalog endpoint URL.

Type: string

# Examples:
url: http://localhost:8181/api/catalog

# ---

url: https://polaris.example.com/api/catalog

# ---

url: https://glue.us-east-1.amazonaws.com/iceberg

catalog.warehouse

The REST catalog warehouse.

Type: string

# Examples:
warehouse: redpanda-catalog

commit

Commit behavior configuration.

Type: object

commit.manifest_merge_enabled

Merge small manifest files during commits to reduce metadata overhead.

Type: bool

Default: true

commit.max_retries

Maximum number of times to retry a failed transaction commit.

Type: int

Default: 3

commit.max_snapshot_age

Maximum age of snapshots to retain for time-travel queries. Set to zero to disable removing old snapshots.

Type: string

Default: 24h

max_in_flight

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

Type: int

Default: 4

namespace

The Iceberg namespace for the table. Dots are treated as delimiters for nested namespaces.

This field supports interpolation functions.

Type: string

# Examples:
namespace: analytics.events

# ---

namespace: production

schema_evolution

Schema evolution configuration.

Type: object

schema_evolution.enabled

Enable automatic schema evolution. When enabled, new columns will be automatically added to the table.

Type: bool

Default: false

schema_evolution.partition_spec

A Bloblang expression to evaluate when a new table is created to determine the table’s partition spec. The result of the mapping should be an Iceberg partition spec in the same string format as Redpanda’s Iceberg topic partition spec property.

This field supports interpolation functions.

Type: string

Default: ()

# Examples:
partition_spec: (col1)

# ---

partition_spec: (nested.col)

# ---

partition_spec: (year(my_ts_col))

# ---

partition_spec: (year(my_ts_col), col2)

# ---

partition_spec: (hour(my_ts_col), truncate(42, col2))

# ---

partition_spec: (day(my_ts_col), bucket(4, nested.col))

# ---

partition_spec: (day(my_ts_col), void(`non.nested column.with.dots`), identity(nested.column))

schema_evolution.table_location

A prefix used as the location for new tables when the catalog does not automatically assign one. For example, AWS Glue requires explicit table locations. When set, table locations are derived as {prefix}{namespace}/{table}.

Type: string

# Examples:
table_location: s3://my-iceberg-bucket/

storage

Storage backend configuration for data files. Exactly one of aws_s3, gcp_cloud_storage, or azure_blob_storage must be specified.

Type: object

storage.aws_s3

S3 storage configuration.

Type: object

storage.aws_s3.bucket

The S3 bucket name.

Type: string

# Examples:
bucket: my-iceberg-data

storage.aws_s3.credentials

Optional manual configuration of AWS credentials to use. More information can be found in Amazon Web Services.

Type: object

storage.aws_s3.credentials.from_ec2_role

Use the credentials of a host EC2 machine configured to assume an IAM role associated with the instance.

Type: bool

Default: false

storage.aws_s3.credentials.id

The ID of credentials to use.

Type: string

storage.aws_s3.credentials.profile

A profile from ~/.aws/credentials to use.

Type: string

storage.aws_s3.credentials.role

A role ARN to assume.

Type: string

storage.aws_s3.credentials.role_external_id

An external ID to provide when assuming a role.

Type: string

storage.aws_s3.credentials.secret

The secret for the credentials being used.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: string

storage.aws_s3.credentials.token

The token for the credentials being used, required when using short term credentials.

Type: string

storage.aws_s3.endpoint

Custom endpoint for S3-compatible storage (e.g., MinIO).

Type: string

# Examples:
endpoint: http://localhost:9000

storage.aws_s3.force_path_style_urls

Forces the client API to use path style URLs, which is often required when connecting to custom endpoints.

Type: bool

Default: false

storage.aws_s3.region

The AWS region.

Type: string

# Examples:
region: us-west-2

storage.aws_s3.tcp

TCP socket configuration.

Type: object

storage.aws_s3.tcp.connect_timeout

Maximum amount of time a dial will wait for a connect to complete. Zero disables.

Type: string

Default: 0s

storage.aws_s3.tcp.keep_alive

TCP keep-alive probe configuration.

Type: object

storage.aws_s3.tcp.keep_alive.count

Maximum unanswered keep-alive probes before dropping the connection. Zero defaults to 9.

Type: int

Default: 9

storage.aws_s3.tcp.keep_alive.idle

Duration the connection must be idle before sending the first keep-alive probe. Zero defaults to 15s. Negative values disable keep-alive probes.

Type: string

Default: 15s

storage.aws_s3.tcp.keep_alive.interval

Duration between keep-alive probes. Zero defaults to 15s.

Type: string

Default: 15s

storage.aws_s3.tcp.tcp_user_timeout

Maximum time to wait for acknowledgment of transmitted data before killing the connection. Linux-only (kernel 2.6.37+), ignored on other platforms. When enabled, keep_alive.idle must be greater than this value per RFC 5482. Zero disables.

Type: string

Default: 0s

storage.azure_blob_storage

Azure Blob Storage (ADLS Gen2) configuration.

Type: object

storage.azure_blob_storage.container

The Azure blob container name.

Type: string

# Examples:
container: iceberg-data

storage.azure_blob_storage.endpoint

Custom endpoint for Azure-compatible storage.

Type: string

storage.azure_blob_storage.storage_access_key

Azure storage access key for shared key authentication.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: string

storage.azure_blob_storage.storage_account

The Azure storage account name.

Type: string

# Examples:
storage_account: mystorageaccount

storage.azure_blob_storage.storage_connection_string

Azure storage connection string. Use either this or one of the other authentication fields, not both.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: string

storage.azure_blob_storage.storage_sas_token

SAS token for authentication. Prefix with the container name followed by a dot if container-specific.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: string

storage.gcp_cloud_storage

Google Cloud Storage configuration.

Type: object

storage.gcp_cloud_storage.bucket

The GCS bucket name.

Type: string

# Examples:
bucket: my-iceberg-data

storage.gcp_cloud_storage.credentials_file

Path to a GCP credentials JSON file.

Type: string

storage.gcp_cloud_storage.credentials_json

GCP credentials JSON content. Use this or credentials_file, not both.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: string

storage.gcp_cloud_storage.credentials_type

The type of credentials to use. Valid values: service_account, authorized_user, impersonated_service_account, external_account.

Type: string

# Examples:
credentials_type: service_account

storage.gcp_cloud_storage.endpoint

Custom endpoint for GCS-compatible storage.

Type: string

table

The Iceberg table name. Supports interpolation functions for dynamic table names.

Type: string

# Examples:
table: user_events

# ---

table: events_${!meta("topic")}