iceberg
Fan out Redpanda topics to Apache Iceberg tables using the REST catalog API.
Introduced in version 4.80.0.
This output is well suited for migrating fanout pipelines from Kafka Connect to Redpanda Connect, and supports:
- Multiple storage backends (S3, GCS, Azure)
- Automatic table creation with schema detection
- Partition transforms (year, month, day, hour, bucket, truncate)
- Schema evolution (automatic column addition)
- Transaction retry logic for concurrent writes
Requirements
Ensure that your environment meets the following requirements:
- This component is only available in cgo-enabled builds of Redpanda Connect (not in the Redpanda CLI or Docker image).
Performance
This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the maximum number of in-flight messages (or message batches) with the field max_in_flight.
This output also benefits from sending messages as a batch. Batches can be formed at both the input and output level. You can find out more in this doc.
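As a rough sketch of the two tuning knobs described above, the fragment below raises max_in_flight and forms batches at the output level. The values shown (8, 1000, 5s) are illustrative assumptions rather than recommendations, the other required fields are omitted, and the top-level `output` key assumes a standard Redpanda Connect pipeline configuration.

```yaml
output:
  iceberg:
    # catalog, namespace, table, and storage omitted for brevity
    max_in_flight: 8      # assumed value; the default is 4
    batching:
      count: 1000         # flush once 1000 messages have accumulated (assumed value)
      period: 5s          # also flush an incomplete batch after 5 seconds
```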
Common
outputs:
  label: ""
  iceberg:
    catalog:
      url: "" # No default (required)
      warehouse: "" # No default (optional)
      auth:
        oauth2:
          server_uri: /v1/oauth/tokens
          client_id: "" # No default (required)
          client_secret: "" # No default (required)
          scope: "" # No default (optional)
        bearer: "" # No default (optional)
        aws_sigv4:
          region: "" # No default (optional)
          endpoint: "" # No default (optional)
          tcp:
            connect_timeout: 0s
            keep_alive:
              idle: 15s
              interval: 15s
              count: 9
            tcp_user_timeout: 0s
          credentials:
            profile: "" # No default (optional)
            id: "" # No default (optional)
            secret: "" # No default (optional)
            token: "" # No default (optional)
            from_ec2_role: "" # No default (optional)
            role: "" # No default (optional)
            role_external_id: "" # No default (optional)
          service: "" # No default (optional)
      headers: "" # No default (optional)
      tls_skip_verify: false
    namespace: "" # No default (required)
    table: "" # No default (required)
    storage:
      aws_s3:
        bucket: "" # No default (required)
        region: "" # No default (optional)
        endpoint: "" # No default (optional)
        force_path_style_urls: false
        credentials:
          id: "" # No default (optional)
          secret: "" # No default (optional)
          token: "" # No default (optional)
      gcp_cloud_storage:
        bucket: "" # No default (required)
        endpoint: "" # No default (optional)
        credentials_type: "" # No default (optional)
        credentials_file: "" # No default (optional)
        credentials_json: "" # No default (optional)
      azure_blob_storage:
        storage_account: "" # No default (required)
        container: "" # No default (required)
        endpoint: "" # No default (optional)
        storage_sas_token: "" # No default (optional)
        storage_connection_string: "" # No default (optional)
        storage_access_key: "" # No default (optional)
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    max_in_flight: 4
Advanced
outputs:
  label: ""
  iceberg:
    catalog:
      url: "" # No default (required)
      warehouse: "" # No default (optional)
      auth:
        oauth2:
          server_uri: /v1/oauth/tokens
          client_id: "" # No default (required)
          client_secret: "" # No default (required)
          scope: "" # No default (optional)
        bearer: "" # No default (optional)
        aws_sigv4:
          region: "" # No default (optional)
          endpoint: "" # No default (optional)
          tcp:
            connect_timeout: 0s
            keep_alive:
              idle: 15s
              interval: 15s
              count: 9
            tcp_user_timeout: 0s
          credentials:
            profile: "" # No default (optional)
            id: "" # No default (optional)
            secret: "" # No default (optional)
            token: "" # No default (optional)
            from_ec2_role: "" # No default (optional)
            role: "" # No default (optional)
            role_external_id: "" # No default (optional)
          service: "" # No default (optional)
      headers: "" # No default (optional)
      tls_skip_verify: false
    namespace: "" # No default (required)
    table: "" # No default (required)
    case_sensitive_columns: true
    storage:
      aws_s3:
        bucket: "" # No default (required)
        region: "" # No default (optional)
        endpoint: "" # No default (optional)
        force_path_style_urls: false
        credentials:
          id: "" # No default (optional)
          secret: "" # No default (optional)
          token: "" # No default (optional)
      gcp_cloud_storage:
        bucket: "" # No default (required)
        endpoint: "" # No default (optional)
        credentials_type: "" # No default (optional)
        credentials_file: "" # No default (optional)
        credentials_json: "" # No default (optional)
      azure_blob_storage:
        storage_account: "" # No default (required)
        container: "" # No default (required)
        endpoint: "" # No default (optional)
        storage_sas_token: "" # No default (optional)
        storage_connection_string: "" # No default (optional)
        storage_access_key: "" # No default (optional)
    schema_evolution:
      enabled: false
      partition_spec: ()
      table_location: "" # No default (optional)
      schema_metadata: ""
      new_column_type_mapping: "" # No default (optional)
    commit:
      manifest_merge_enabled: true
      max_snapshot_age: 24h
      max_retries: 3
    parquet:
      string_encoding: delta_length_byte_array
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    max_in_flight: 4
Catalog integration
This output works with REST catalog implementations including Apache Polaris, AWS Glue Data Catalog, and Databricks Unity Catalog.
Apache Polaris
To use with Apache Polaris:
- Set catalog.url to the Polaris REST endpoint (e.g., http://localhost:8181/api/catalog).
- Set catalog.warehouse to the catalog name configured in Polaris.
- Configure catalog.auth.oauth2 with client credentials granted access to the catalog.
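The Polaris steps above might translate into a configuration along these lines. This is a minimal sketch, not a verified setup: the catalog name, namespace, table, bucket, and environment variable names are all hypothetical placeholders, S3 is chosen arbitrarily as the storage backend, and the top-level `output` key assumes a standard Redpanda Connect pipeline configuration.

```yaml
output:
  iceberg:
    catalog:
      url: http://localhost:8181/api/catalog
      warehouse: my_catalog                      # hypothetical Polaris catalog name
      auth:
        oauth2:
          client_id: ${POLARIS_CLIENT_ID}         # hypothetical environment variables
          client_secret: ${POLARIS_CLIENT_SECRET}
    namespace: analytics                         # hypothetical namespace
    table: events                                # hypothetical table
    storage:
      aws_s3:
        bucket: my-bucket                        # hypothetical bucket
        region: us-east-1
```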
AWS Glue Data Catalog
To use with AWS Glue Data Catalog:
- Set catalog.url to https://glue.<region>.amazonaws.com/iceberg (the REST client appends the API version automatically).
- Set catalog.warehouse to your AWS account ID (the Glue catalog identifier).
- Set schema_evolution.table_location to an S3 prefix (e.g., s3://my-bucket/) since Glue does not automatically assign table locations.
- Configure catalog.auth.aws_sigv4 with the appropriate region and set service to glue.
- Configure storage.aws_s3 with the same bucket and region.
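Put together, the Glue steps above could look roughly like the following. Treat it as a sketch under stated assumptions: the account ID, region, bucket, namespace, and table are placeholders, credentials are assumed to come from the environment, and the top-level `output` key assumes a standard Redpanda Connect pipeline configuration.

```yaml
output:
  iceberg:
    catalog:
      url: https://glue.us-east-1.amazonaws.com/iceberg
      warehouse: "123456789012"        # placeholder AWS account ID, quoted to keep it a string
      auth:
        aws_sigv4:
          region: us-east-1            # placeholder region
          service: glue
    namespace: analytics               # hypothetical namespace
    table: events                      # hypothetical table
    storage:
      aws_s3:
        bucket: my-bucket              # same bucket as table_location below
        region: us-east-1              # same region as the catalog
    schema_evolution:
      table_location: s3://my-bucket/  # Glue does not assign table locations itself
```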
Azure Blob Storage (ADLS Gen2)
To use with Azure Data Lake Storage Gen2:
- Configure storage.azure_blob_storage with your storage account name and container.
- Authenticate using one of: storage_access_key (shared key), storage_sas_token, or storage_connection_string.
- The storage account must have hierarchical namespace (HNS) enabled for ADLS Gen2 compatibility.
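A storage block following the Azure steps above might look like this sketch, which uses shared key authentication. The account and container names are placeholders, the environment variable name is hypothetical, and the catalog, namespace, and table fields are left out.

```yaml
output:
  iceberg:
    # catalog, namespace, and table omitted for brevity
    storage:
      azure_blob_storage:
        storage_account: mystorageaccount                # placeholder; HNS must be enabled on this account
        container: iceberg-data                          # placeholder container
        storage_access_key: ${AZURE_STORAGE_ACCESS_KEY}  # hypothetical environment variable
```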
Type mapping
| Bloblang type | Iceberg type |
|---|---|
| string | string |
| bytes | binary |
| bool | boolean |
| number | double |
| timestamp | timestamp (with timezone) |
| object | struct |
| array | list |
Fields
batching
Allows you to configure a batching policy.
Type: object
# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s
# ---
batching:
  count: 10
  period: 1s
# ---
batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
batching.byte_size
The number of bytes at which the batch should be flushed. Set to 0 to disable size-based batching.
Type: int
Default: 0
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples:
check: this.type == "end_of_transaction"
batching.count
The number of messages at which the batch should be flushed. Set to 0 to disable count-based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
# Examples:
period: 1s
# ---
period: 1m
# ---
period: 500ms
batching.processors[]
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
Type: processor
# Examples:
processors:
  - archive:
      format: concatenate
# ---
processors:
  - archive:
      format: lines
# ---
processors:
  - archive:
      format: json_array
case_sensitive_columns
Controls how message field names are matched against table column names, and how column references in the partition spec are resolved. When true (the default), names must match exactly. When false, matching is case-insensitive. Set this to false when your downstream catalog or query engine treats column names as case-insensitive (the Iceberg specification’s recommended convention) so that, for example, a message keyed "COLUMN" lands in an existing column rather than triggering schema evolution. Ambiguous case-only duplicates in the input are rejected.
Type: bool
Default: true
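For illustration, a fragment that switches to case-insensitive matching alongside schema evolution might look like the sketch below. All other fields are omitted, and combining the two settings here is only an assumption made to show where the behavior matters.

```yaml
output:
  iceberg:
    # catalog, namespace, table, and storage omitted for brevity
    case_sensitive_columns: false   # match message fields to columns ignoring case
    schema_evolution:
      enabled: true                 # only genuinely new names would add columns
```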
catalog.auth
Authentication configuration for the REST catalog. Only one authentication method can be active at a time.
Type: object
catalog.auth.aws_sigv4
AWS SigV4 authentication (for AWS Glue Data Catalog or API Gateway).
Type: object
catalog.auth.aws_sigv4.credentials
Optional manual configuration of AWS credentials to use. More information can be found in Amazon Web Services.
Type: object
catalog.auth.aws_sigv4.credentials.from_ec2_role
Use the credentials of a host EC2 machine configured to assume an IAM role associated with the instance.
Requires version 4.2.0 or later.
Type: bool
catalog.auth.aws_sigv4.credentials.role_external_id
An external ID to provide when assuming a role.
Type: string
catalog.auth.aws_sigv4.credentials.secret
The secret for the credentials being used.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.
Type: string
catalog.auth.aws_sigv4.credentials.token
The token for the credentials being used, required when using short term credentials.
Type: string
catalog.auth.aws_sigv4.endpoint
Allows you to specify a custom endpoint for the AWS API.
Type: string
catalog.auth.aws_sigv4.tcp.connect_timeout
Maximum amount of time a dial will wait for a connect to complete. Zero disables.
Type: string
Default: 0s
catalog.auth.aws_sigv4.tcp.keep_alive.count
Maximum unanswered keep-alive probes before dropping the connection. Zero defaults to 9.
Type: int
Default: 9
catalog.auth.aws_sigv4.tcp.keep_alive.idle
Duration the connection must be idle before sending the first keep-alive probe. Zero defaults to 15s. Negative values disable keep-alive probes.
Type: string
Default: 15s
catalog.auth.aws_sigv4.tcp.keep_alive.interval
Duration between keep-alive probes. Zero defaults to 15s.
Type: string
Default: 15s
catalog.auth.aws_sigv4.tcp.tcp_user_timeout
Maximum time to wait for acknowledgment of transmitted data before killing the connection. Linux-only (kernel 2.6.37+), ignored on other platforms. When enabled, keep_alive.idle must be greater than this value per RFC 5482. Zero disables.
Type: string
Default: 0s
catalog.auth.bearer
Static bearer token for authentication. For testing only, not recommended for production.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.
Type: string
catalog.auth.oauth2.client_secret
OAuth2 client secret.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.
Type: string
catalog.headers
Custom HTTP headers to include in all requests to the catalog.
Type: object
# Examples:
headers:
  X-Api-Key: your-api-key
catalog.tls_skip_verify
Skip TLS certificate verification. Not recommended for production.
Type: bool
Default: false
catalog.url
The REST catalog endpoint URL.
Type: string
# Examples:
url: http://localhost:8181/api/catalog
# ---
url: https://polaris.example.com/api/catalog
# ---
url: https://glue.us-east-1.amazonaws.com/iceberg
commit.manifest_merge_enabled
Merge small manifest files during commits to reduce metadata overhead.
Type: bool
Default: true
commit.max_retries
Maximum number of times to retry a failed transaction commit.
Type: int
Default: 3
commit.max_snapshot_age
Maximum age of snapshots to retain for time-travel queries. Set to zero to disable removing old snapshots.
Type: string
Default: 24h
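As a sketch of how the commit fields fit together, the fragment below keeps manifest merging on while allowing more retries and a longer snapshot retention window. The values 5 and 72h are illustrative assumptions, not recommendations.

```yaml
output:
  iceberg:
    # catalog, namespace, table, and storage omitted for brevity
    commit:
      manifest_merge_enabled: true
      max_retries: 5          # assumed value; the default is 3
      max_snapshot_age: 72h   # assumed value; the default is 24h
```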
max_in_flight
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
Type: int
Default: 4
namespace
The Iceberg namespace for the table. Dot-delimited segments are split into nested namespaces.
This field supports interpolation functions.
Type: string
# Examples:
namespace: analytics.events
# ---
namespace: production
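Because this field supports interpolation functions, the namespace can vary per message. The sketch below assumes each message carries a metadata key named tenant, which is a hypothetical name chosen for illustration, and uses a placeholder table name.

```yaml
output:
  iceberg:
    # catalog and storage omitted for brevity
    # The dot splits the value into nested namespaces: analytics, then the tenant value.
    namespace: 'analytics.${! metadata("tenant") }'
    table: events   # hypothetical table name
```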
parquet.string_encoding
The encoding to use for string and binary columns. Use plain for compatibility with readers that do not support DELTA_LENGTH_BYTE_ARRAY encoding, such as AWS Redshift Spectrum.
Type: string
Default: delta_length_byte_array
Options: plain, delta_length_byte_array
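A fragment selecting the plain encoding for readers that cannot handle DELTA_LENGTH_BYTE_ARRAY could look like this sketch, with all other fields omitted.

```yaml
output:
  iceberg:
    # catalog, namespace, table, and storage omitted for brevity
    parquet:
      string_encoding: plain   # the default is delta_length_byte_array
```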
schema_evolution.enabled
Enable automatic schema evolution. When enabled, new columns will be automatically added to the table.
Type: bool
Default: false
schema_evolution.new_column_type_mapping
An optional Bloblang mapping to customize column types during schema evolution. This mapping is executed for each new column and can override the inferred or schema-metadata-derived type. The mapping receives an object with fields name (column name), path (dot-separated path), value (sample value), inferred_type (the type that would be used without this mapping), message (the full message body), namespace, and table. It must return a string with a valid Iceberg type name: boolean, int, long, float, double, string, binary, date, time, timestamp, timestamptz, uuid, decimal(p,s), or fixed[n].
Type: string
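A mapping of this kind might look like the sketch below, which stores new columns whose names end in _id as long instead of the inferred double and leaves every other column unchanged. The _id naming rule is a hypothetical convention, and the sketch assumes inferred_type arrives as a plain type name such as double.

```yaml
output:
  iceberg:
    # catalog, namespace, table, and storage omitted for brevity
    schema_evolution:
      enabled: true
      new_column_type_mapping: |
        # Hypothetical rule: numeric ID columns become long, everything else keeps its inferred type.
        root = if this.name.has_suffix("_id") && this.inferred_type == "double" {
          "long"
        } else {
          this.inferred_type
        }
```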
schema_evolution.partition_spec
A Bloblang expression to evaluate when a new table is created to determine the table’s partition spec. The result of the mapping should be an Iceberg partition spec in the same string format as the Redpanda Streaming Topic Property.
This field supports interpolation functions.
Type: string
Default: ()
# Examples:
partition_spec: (col1)
# ---
partition_spec: (nested.col)
# ---
partition_spec: (year(my_ts_col))
# ---
partition_spec: (year(my_ts_col), col2)
# ---
partition_spec: (hour(my_ts_col), truncate(42, col2))
# ---
partition_spec: (day(my_ts_col), bucket(4, nested.col))
# ---
partition_spec: (day(my_ts_col), void(`non.nested column.with.dots`), identity(nested.column))
schema_evolution.schema_metadata
The name of a message metadata field containing a schema definition. When set, the schema is used to determine column types during schema evolution and table creation instead of inferring types from values. The schema must be in the standard common schema format (the same format used by the parquet_encode processor’s schema_metadata field). For batches of messages, the first message’s schema is used. Record presence drives schema shape: fields declared in the schema metadata that are absent from the record are not added to the table, while the metadata controls column ordering, naming, and types for fields that are present. In case-insensitive mode, top-level column names use the metadata’s casing: record keys are matched by case-folding, and the metadata’s name is what lands in the table.
Type: string
Default: ""
schema_evolution.table_location
A prefix used as the location for new tables when the catalog does not automatically assign one. For example, AWS Glue requires explicit table locations. When set, table locations are derived as {prefix}{namespace}/{table}.
Type: string
# Examples:
table_location: s3://my-iceberg-bucket/
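To make the {prefix}{namespace}/{table} rule concrete, the sketch below pairs a prefix with a single-level namespace and a table name. The namespace and table values are placeholders, and the resulting path in the comment is simply the formula applied to them.

```yaml
output:
  iceberg:
    # catalog and storage omitted for brevity
    namespace: production
    table: events          # hypothetical table name
    schema_evolution:
      table_location: s3://my-iceberg-bucket/
      # Applying {prefix}{namespace}/{table}, a new table would be located at:
      #   s3://my-iceberg-bucket/production/events
```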
storage
Storage backend configuration for data files. Exactly one of aws_s3, gcp_cloud_storage, or azure_blob_storage must be specified.
Type: object
storage.aws_s3.credentials
Static AWS credentials for S3 access. When not specified, credentials are loaded from the default AWS credential chain.
Type: object
storage.aws_s3.credentials.secret
The AWS secret access key.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.
Type: string
storage.aws_s3.credentials.token
The AWS session token, required when using short term credentials.
Type: string
storage.aws_s3.endpoint
Custom endpoint for S3-compatible storage (e.g., MinIO).
Type: string
# Examples:
endpoint: http://localhost:9000
storage.aws_s3.force_path_style_urls
Forces the client API to use path style URLs, which is often required when connecting to custom endpoints.
Type: bool
Default: false
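For an S3-compatible store such as MinIO, the endpoint and path-style settings are typically used together. The following is a sketch only: the bucket name, region value, and environment variable names are hypothetical, and whether a region is needed depends on the store.

```yaml
output:
  iceberg:
    # catalog, namespace, and table omitted for brevity
    storage:
      aws_s3:
        bucket: iceberg-data              # hypothetical bucket
        region: us-east-1                 # placeholder region
        endpoint: http://localhost:9000   # local MinIO endpoint
        force_path_style_urls: true       # often required for custom endpoints
        credentials:
          id: ${MINIO_ACCESS_KEY}         # hypothetical environment variables
          secret: ${MINIO_SECRET_KEY}
```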
storage.azure_blob_storage.container
The Azure blob container name.
Type: string
# Examples:
container: iceberg-data
storage.azure_blob_storage.storage_access_key
Azure storage access key for shared key authentication.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.
Type: string
storage.azure_blob_storage.storage_account
The Azure storage account name.
Type: string
# Examples:
storage_account: mystorageaccount
storage.azure_blob_storage.storage_connection_string
Azure storage connection string. Use this or other auth methods, not both.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.
Type: string
storage.azure_blob_storage.storage_sas_token
SAS token for authentication. Prefix with the container name followed by a dot if container-specific.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.
Type: string
storage.gcp_cloud_storage.bucket
The GCS bucket name.
Type: string
# Examples:
bucket: my-iceberg-data
storage.gcp_cloud_storage.credentials_json
GCP credentials JSON content. Use this or credentials_file, not both.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.
Type: string
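A GCS storage block that supplies credentials inline, without also setting credentials_file, might look like the sketch below. The bucket is a placeholder and the environment variable name is hypothetical; reading the JSON from the environment keeps it out of the configuration file.

```yaml
output:
  iceberg:
    # catalog, namespace, and table omitted for brevity
    storage:
      gcp_cloud_storage:
        bucket: my-iceberg-data
        credentials_json: ${GCP_CREDENTIALS_JSON}   # hypothetical environment variable
```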