# iceberg

Write data to Apache Iceberg tables via REST catalog.

Write streaming data to Apache Iceberg tables using the REST catalog API. This output supports:

- Multiple storage backends (S3, GCS, Azure)
- Automatic table creation with schema detection
- Partition transforms (`year`, `month`, `day`, `hour`, `bucket`, `truncate`)
- Schema evolution (automatic column addition)
- Transaction retry logic for concurrent writes

This output is designed to work with REST catalog implementations such as Apache Polaris, AWS Glue Data Catalog, and Databricks Unity Catalog.

## Apache Polaris

To use with Apache Polaris:

1. Set `catalog.url` to the Polaris REST endpoint (for example, `http://localhost:8181/api/catalog`).
2. Set `catalog.warehouse` to the catalog name configured in Polaris.
3. Configure `catalog.auth.oauth2` with client credentials granted access to the catalog.

## AWS Glue Data Catalog

To use with AWS Glue Data Catalog:

1. Set `catalog.url` to `https://glue.<region>.amazonaws.com/iceberg` (the REST client appends the API version automatically).
2. Set `catalog.warehouse` to your AWS account ID (the Glue catalog identifier).
3. Set `schema_evolution.table_location` to an S3 prefix (for example, `s3://my-bucket/`), because Glue does not automatically assign table locations.
4. Configure `catalog.auth.aws_sigv4` with the appropriate region and set `service` to `glue`.
5. Configure `storage.aws_s3` with the same bucket and region.

## Azure Blob Storage (ADLS Gen2)

To use with Azure Data Lake Storage Gen2:

1. Configure `storage.azure_blob_storage` with your storage account name and container.
2. Authenticate using one of: `storage_access_key` (shared key), `storage_sas_token`, or `storage_connection_string`.

The storage account must have hierarchical namespace (HNS) enabled for ADLS Gen2 compatibility.
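As a minimal sketch, the AWS Glue steps above might combine into a config like the following. The region, bucket name, and account ID are placeholders for illustration, not values from a real deployment:

```yaml
output:
  iceberg:
    catalog:
      url: https://glue.us-east-1.amazonaws.com/iceberg
      warehouse: "123456789012"  # placeholder AWS account ID (Glue catalog identifier)
      auth:
        aws_sigv4:
          region: us-east-1
          service: glue
    namespace: analytics
    table: user_events
    storage:
      aws_s3:
        bucket: my-iceberg-bucket  # placeholder; same bucket as table_location
        region: us-east-1
    schema_evolution:
      # Glue does not assign table locations automatically, so provide a prefix.
      table_location: s3://my-iceberg-bucket/
```

With this setup, SigV4 signing authenticates both the catalog requests and, via the shared credential chain, the S3 data writes.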
## Type mapping

| Bloblang type | Iceberg type |
|---|---|
| string | string |
| bytes | binary |
| bool | boolean |
| number | double |
| timestamp | timestamp (with timezone) |
| object | struct |
| array | list |

## Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages (or message batches) with the field `max_in_flight`.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in this doc.

**Common**

```yaml
# Common config fields, showing default values
output:
  label: ""
  iceberg:
    catalog:
      url: "" # No default (required)
      warehouse: "" # No default (optional)
      auth:
        oauth2:
          server_uri: /v1/oauth/tokens
          client_id: "" # No default (required)
          client_secret: "" # No default (required)
          scope: "" # No default (optional)
        bearer: "" # No default (optional)
        aws_sigv4:
          region: "" # No default (optional)
          service: "" # No default (optional)
      headers: "" # No default (optional)
      tls_skip_verify: false
    namespace: "" # No default (required)
    table: "" # No default (required)
    storage:
      aws_s3:
        bucket: "" # No default (required)
        region: "" # No default (optional)
        endpoint: "" # No default (optional)
        force_path_style_urls: false
        tcp:
          connect_timeout: 0s
          keep_alive:
            idle: 15s
            interval: 15s
            count: 9
          tcp_user_timeout: 0s
        credentials:
          profile: "" # No default (optional)
          id: "" # No default (optional)
          secret: "" # No default (optional)
          token: "" # No default (optional)
          from_ec2_role: false
          role: "" # No default (optional)
          role_external_id: "" # No default (optional)
      gcp_cloud_storage:
        bucket: "" # No default (required)
        endpoint: "" # No default (optional)
        credentials_type: "" # No default (optional)
        credentials_file: "" # No default (optional)
        credentials_json: "" # No default (optional)
      azure_blob_storage:
        storage_account: "" # No default (required)
        container: "" # No default (required)
        endpoint: "" # No default (optional)
        storage_sas_token: "" # No default (optional)
        storage_connection_string: "" # No default (optional)
        storage_access_key: "" # No default (optional)
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    max_in_flight: 4
```

**Advanced**

```yaml
# All config fields, showing default values
output:
  label: ""
  iceberg:
    catalog:
      url: "" # No default (required)
      warehouse: "" # No default (optional)
      auth:
        oauth2:
          server_uri: /v1/oauth/tokens
          client_id: "" # No default (required)
          client_secret: "" # No default (required)
          scope: "" # No default (optional)
        bearer: "" # No default (optional)
        aws_sigv4:
          region: "" # No default (optional)
          service: "" # No default (optional)
      headers: "" # No default (optional)
      tls_skip_verify: false
    namespace: "" # No default (required)
    table: "" # No default (required)
    storage:
      aws_s3:
        bucket: "" # No default (required)
        region: "" # No default (optional)
        endpoint: "" # No default (optional)
        force_path_style_urls: false
        tcp:
          connect_timeout: 0s
          keep_alive:
            idle: 15s
            interval: 15s
            count: 9
          tcp_user_timeout: 0s
        credentials:
          profile: "" # No default (optional)
          id: "" # No default (optional)
          secret: "" # No default (optional)
          token: "" # No default (optional)
          from_ec2_role: false
          role: "" # No default (optional)
          role_external_id: "" # No default (optional)
      gcp_cloud_storage:
        bucket: "" # No default (required)
        endpoint: "" # No default (optional)
        credentials_type: "" # No default (optional)
        credentials_file: "" # No default (optional)
        credentials_json: "" # No default (optional)
      azure_blob_storage:
        storage_account: "" # No default (required)
        container: "" # No default (required)
        endpoint: "" # No default (optional)
        storage_sas_token: "" # No default (optional)
        storage_connection_string: "" # No default (optional)
        storage_access_key: "" # No default (optional)
    schema_evolution:
      enabled: false
      partition_spec: ()
      table_location: "" # No default (optional)
    commit:
      manifest_merge_enabled: true
      max_snapshot_age: 24h
      max_retries: 3
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    max_in_flight: 4
```

## Fields

### batching

Allows you to
configure a batching policy.

Type: `object`

```yaml
# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s
# ---
batching:
  count: 10
  period: 1s
# ---
batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

### batching.byte_size

The number of bytes at which the batch is flushed. Set to `0` to disable size-based batching.

Type: `int`
Default: `0`

### batching.check

A Bloblang query that returns a boolean value indicating whether a message should end a batch.

Type: `string`
Default: `""`

```yaml
# Examples:
check: this.type == "end_of_transaction"
```

### batching.count

The number of messages at which the batch is flushed. Set to `0` to disable count-based batching.

Type: `int`
Default: `0`

### batching.period

A period after which an incomplete batch is flushed regardless of its size.

Type: `string`
Default: `""`

```yaml
# Examples:
period: 1s
# ---
period: 1m
# ---
period: 500ms
```

### batching.processors[]

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Note that all resulting messages are flushed as a single batch, so splitting the batch into smaller batches using these processors is a no-op.

Type: `processor`

```yaml
# Examples:
processors:
  - archive:
      format: concatenate
# ---
processors:
  - archive:
      format: lines
# ---
processors:
  - archive:
      format: json_array
```

### catalog

REST catalog configuration.

Type: `object`

### catalog.auth

Authentication configuration for the REST catalog. Only one authentication method can be active at a time.

Type: `object`

### catalog.auth.aws_sigv4

AWS SigV4 authentication (for AWS Glue Data Catalog or API Gateway). Uses the same credentials as the storage configuration.

Type: `object`

### catalog.auth.aws_sigv4.region

AWS region for SigV4 signing. If not specified, uses the region from AWS credentials.

Type: `string`

```yaml
# Examples:
region: us-east-1
```

### catalog.auth.aws_sigv4.service

AWS service name for SigV4 signing.

Type: `string`

### catalog.auth.bearer

Static bearer token for authentication.
For testing only; not recommended for production.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: `string`

### catalog.auth.oauth2

OAuth2 authentication configuration.

Type: `object`

### catalog.auth.oauth2.client_id

OAuth2 client identifier.

Type: `string`

### catalog.auth.oauth2.client_secret

OAuth2 client secret.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: `string`

### catalog.auth.oauth2.scope

OAuth2 scope to request.

Type: `string`

### catalog.auth.oauth2.server_uri

OAuth2 token endpoint URI.

Type: `string`
Default: `/v1/oauth/tokens`

### catalog.headers

Custom HTTP headers to include in all requests to the catalog.

Type: `object`

```yaml
# Examples:
headers:
  X-Api-Key: your-api-key
```

### catalog.tls_skip_verify

Skip TLS certificate verification. Not recommended for production.

Type: `bool`
Default: `false`

### catalog.url

The REST catalog endpoint URL.

Type: `string`

```yaml
# Examples:
url: http://localhost:8181/api/catalog
# ---
url: https://polaris.example.com/api/catalog
# ---
url: https://glue.us-east-1.amazonaws.com/iceberg
```

### catalog.warehouse

The REST catalog warehouse.

Type: `string`

```yaml
# Examples:
warehouse: redpanda-catalog
```

### commit

Commit behavior configuration.

Type: `object`

### commit.manifest_merge_enabled

Merge small manifest files during commits to reduce metadata overhead.

Type: `bool`
Default: `true`

### commit.max_retries

Maximum number of times to retry a failed transaction commit.

Type: `int`
Default: `3`

### commit.max_snapshot_age

Maximum age of snapshots to retain for time-travel queries. Set to zero to disable removing old snapshots.

Type: `string`
Default: `24h`

### max_in_flight

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

Type: `int`
Default: `4`

### namespace

The Iceberg namespace for the table; dot delimiters are split as nested namespaces. This field supports interpolation functions.

Type: `string`

```yaml
# Examples:
namespace: analytics.events
# ---
namespace: production
```

### schema_evolution

Schema evolution configuration.

Type: `object`

### schema_evolution.enabled

Enable automatic schema evolution. When enabled, new columns are automatically added to the table.

Type: `bool`
Default: `false`

### schema_evolution.partition_spec

A Bloblang expression to evaluate when a new table is created to determine the table’s partition spec. The result of the mapping should be an Iceberg partition spec in the same string format as the Redpanda streaming topic property. This field supports interpolation functions.

Type: `string`
Default: `()`

```yaml
# Examples:
partition_spec: (col1)
# ---
partition_spec: (nested.col)
# ---
partition_spec: (year(my_ts_col))
# ---
partition_spec: (year(my_ts_col), col2)
# ---
partition_spec: (hour(my_ts_col), truncate(42, col2))
# ---
partition_spec: (day(my_ts_col), bucket(4, nested.col))
# ---
partition_spec: (day(my_ts_col), void(`non.nested column.with.dots`), identity(nested.column))
```

### schema_evolution.table_location

A prefix used as the location for new tables when the catalog does not automatically assign one. For example, AWS Glue requires explicit table locations. When set, table locations are derived as `{prefix}{namespace}/{table}`.

Type: `string`

```yaml
# Examples:
table_location: s3://my-iceberg-bucket/
```

### storage

Storage backend configuration for data files. Exactly one of `aws_s3`, `gcp_cloud_storage`, or `azure_blob_storage` must be specified.

Type: `object`

### storage.aws_s3

S3 storage configuration.

Type: `object`

### storage.aws_s3.bucket

The S3 bucket name.

Type: `string`

```yaml
# Examples:
bucket: my-iceberg-data
```

### storage.aws_s3.credentials

Optional manual configuration of AWS credentials to use. More information can be found in Amazon Web Services.

Type: `object`

### storage.aws_s3.credentials.from_ec2_role

Use the credentials of a host EC2 machine configured to assume an IAM role associated with the instance.

Type: `bool`
Default: `false`

### storage.aws_s3.credentials.id

The ID of credentials to use.

Type: `string`

### storage.aws_s3.credentials.profile

A profile from `~/.aws/credentials` to use.

Type: `string`

### storage.aws_s3.credentials.role

A role ARN to assume.

Type: `string`

### storage.aws_s3.credentials.role_external_id

An external ID to provide when assuming a role.

Type: `string`

### storage.aws_s3.credentials.secret

The secret for the credentials being used.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: `string`

### storage.aws_s3.credentials.token

The token for the credentials being used; required when using short-term credentials.

Type: `string`

### storage.aws_s3.endpoint

Custom endpoint for S3-compatible storage (for example, MinIO).

Type: `string`

```yaml
# Examples:
endpoint: http://localhost:9000
```

### storage.aws_s3.force_path_style_urls

Forces the client API to use path-style URLs, which is often required when connecting to custom endpoints.

Type: `bool`
Default: `false`

### storage.aws_s3.region

The AWS region.

Type: `string`

```yaml
# Examples:
region: us-west-2
```

### storage.aws_s3.tcp

TCP socket configuration.

Type: `object`

### storage.aws_s3.tcp.connect_timeout

Maximum amount of time a dial will wait for a connect to complete. Zero disables the timeout.

Type: `string`
Default: `0s`

### storage.aws_s3.tcp.keep_alive

TCP keep-alive probe configuration.

Type: `object`

### storage.aws_s3.tcp.keep_alive.count

Maximum number of unanswered keep-alive probes before dropping the connection. Zero defaults to 9.

Type: `int`
Default: `9`

### storage.aws_s3.tcp.keep_alive.idle

Duration the connection must be idle before sending the first keep-alive probe. Zero defaults to 15s. Negative values disable keep-alive probes.

Type: `string`
Default: `15s`

### storage.aws_s3.tcp.keep_alive.interval

Duration between keep-alive probes. Zero defaults to 15s.

Type: `string`
Default: `15s`

### storage.aws_s3.tcp.tcp_user_timeout

Maximum time to wait for acknowledgment of transmitted data before killing the connection. Linux-only (kernel 2.6.37+); ignored on other platforms. When enabled, `keep_alive.idle` must be greater than this value, per RFC 5482. Zero disables.

Type: `string`
Default: `0s`

### storage.azure_blob_storage

Azure Blob Storage (ADLS Gen2) configuration.

Type: `object`

### storage.azure_blob_storage.container

The Azure blob container name.

Type: `string`

```yaml
# Examples:
container: iceberg-data
```

### storage.azure_blob_storage.endpoint

Custom endpoint for Azure-compatible storage.

Type: `string`

### storage.azure_blob_storage.storage_access_key

Azure storage access key for shared key authentication.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: `string`

### storage.azure_blob_storage.storage_account

The Azure storage account name.

Type: `string`

```yaml
# Examples:
storage_account: mystorageaccount
```

### storage.azure_blob_storage.storage_connection_string

Azure storage connection string. Use this or the other authentication methods, not both.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: `string`

### storage.azure_blob_storage.storage_sas_token

SAS token for authentication. If the token is container-specific, prefix it with the container name followed by a dot.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: `string`

### storage.gcp_cloud_storage

Google Cloud Storage configuration.

Type: `object`

### storage.gcp_cloud_storage.bucket

The GCS bucket name.

Type: `string`

```yaml
# Examples:
bucket: my-iceberg-data
```

### storage.gcp_cloud_storage.credentials_file

Path to a GCP credentials JSON file.

Type: `string`

### storage.gcp_cloud_storage.credentials_json

GCP credentials JSON content. Use this or `credentials_file`, not both.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.

Type: `string`

### storage.gcp_cloud_storage.credentials_type

The type of credentials to use. Valid values: `service_account`, `authorized_user`, `impersonated_service_account`, `external_account`.

Type: `string`

```yaml
# Examples:
credentials_type: service_account
```

### storage.gcp_cloud_storage.endpoint

Custom endpoint for GCS-compatible storage.

Type: `string`

### table

The Iceberg table name. Supports interpolation functions for dynamic table names.

Type: `string`

```yaml
# Examples:
table: user_events
# ---
table: events_${!meta("topic")}
```
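Tying the fields above together, a minimal Polaris-backed output might look like the following sketch. The warehouse name, bucket, OAuth2 scope, and environment variable names are placeholders, and local MinIO stands in for S3; adapt them to your deployment:

```yaml
output:
  iceberg:
    catalog:
      url: http://localhost:8181/api/catalog
      warehouse: redpanda-catalog  # catalog name configured in Polaris (placeholder)
      auth:
        oauth2:
          # Placeholder env vars; keep secrets out of the config file.
          client_id: "${POLARIS_CLIENT_ID}"
          client_secret: "${POLARIS_CLIENT_SECRET}"
          scope: PRINCIPAL_ROLE:ALL  # placeholder; depends on your Polaris setup
    namespace: analytics.events
    table: events_${!meta("topic")}  # interpolated per-message table name
    storage:
      aws_s3:
        bucket: my-iceberg-data          # placeholder
        endpoint: http://localhost:9000  # e.g. MinIO for local testing
        force_path_style_urls: true      # often required for custom endpoints
    schema_evolution:
      enabled: true  # add new columns automatically
    batching:
      count: 500
      period: 5s
    max_in_flight: 4
```

Batching a few hundred messages per commit keeps the number of small data files and snapshots manageable, while `max_in_flight` controls how many batches are written in parallel.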