# Migrate to Iceberg Topics

Migrate existing Iceberg pipelines to Redpanda Iceberg topics to simplify your architecture and reduce operational overhead. After reading this page, you will be able to:

- Compare external Iceberg integrations with Iceberg Topics architectures
- Implement data merge strategies using SQL patterns
- Execute validation checks and perform cutover procedures

## Why migrate to Iceberg Topics

Redpanda’s built-in Iceberg-enabled topics offer a simpler alternative to external Iceberg integrations for writing streaming data to Iceberg tables. This page focuses on migrating from the Kafka Connect Iceberg Sink. The migration patterns and SQL examples can be adapted for other Iceberg sources, such as Apache Flink or Spark.

### Kafka Connect Iceberg Sink comparison

The following table compares the Kafka Connect Iceberg Sink with Redpanda Iceberg Topics:

| Aspect | Kafka Connect Iceberg Sink | Iceberg Topics |
|---|---|---|
| Infrastructure | Requires external Kafka Connect cluster | Built into Redpanda brokers |
| Dependencies | Separate service to manage | No external dependencies |
| Setup time | Medium (deploy connector) | Fast (enable topic property and post schema) |

## Prerequisites

To migrate from an existing Iceberg integration to Iceberg Topics, you must have:

- Iceberg Topics enabled on your Redpanda cluster.
- Understanding of your current schema format (Avro, Protobuf, or JSON Schema).
- For Kafka Connect migrations, knowledge of your Kafka Connect configuration, especially if you use `iceberg.tables.route-field` for multi-table routing.
- If migrating multi-table fan-out patterns, data transforms enabled on your cluster.
- Access to both the source and target (Iceberg Topics) tables in your query engine.
- Query engine access (Snowflake, Databricks, ClickHouse, or Spark) for data merging.
## Migration steps

Redpanda recommends a phased approach to ensure data consistency and minimize risk:

1. Enable Iceberg on target topics and verify new data flows.
2. Run both systems concurrently during the transition.
3. Choose a strategy to combine historical and new data.
4. Verify data completeness and accuracy.
5. Disable the external Iceberg integration.

Iceberg Topics cannot append to existing Iceberg tables that were not created by Redpanda. You must create new Iceberg tables and merge historical data separately.

### Enable Iceberg Topics

For simple migrations (one topic mapping to one Iceberg table), enable the Iceberg integration for your Redpanda topics. Set the `iceberg_enabled` configuration property on your cluster to `true`.

Using rpk:

```bash
rpk cloud login
rpk profile create --from-cloud <cluster-id>
rpk cluster config set iceberg_enabled true
```

Using the Cloud API:

```bash
# Store your cluster ID in a variable
export RP_CLUSTER_ID=<cluster-id>

# Retrieve a Redpanda Cloud access token
export RP_CLOUD_TOKEN=$(curl -X POST "https://auth.prd.cloud.redpanda.com/oauth/token" \
  -H "content-type: application/x-www-form-urlencoded" \
  -d "grant_type=client_credentials" \
  -d "client_id=<client-id>" \
  -d "client_secret=<client-secret>")

# Update cluster configuration to enable Iceberg topics
curl -H "Authorization: Bearer ${RP_CLOUD_TOKEN}" -X PATCH \
  "https://api.cloud.redpanda.com/v1/clusters/${RP_CLUSTER_ID}" \
  -H 'accept: application/json' \
  -H 'content-type: application/json' \
  -d '{"cluster_configuration":{"custom_properties": {"iceberg_enabled":true}}}'
```

Configure the `redpanda.iceberg.mode` property for the topic:

```bash
rpk topic alter-config <topic-name> --set redpanda.iceberg.mode=<mode>
```

Choose the mode based on your message format and schema configuration.
For Kafka Connect migrations, use this mapping:

| Kafka Connect Converter | Recommended Iceberg Mode |
|---|---|
| `io.confluent.connect.avro.AvroConverter` | `value_schema_id_prefix` (messages already use Schema Registry wire format) |
| `io.confluent.connect.protobuf.ProtobufConverter` | `value_schema_id_prefix` (messages already use Schema Registry wire format) |
| `org.apache.kafka.connect.json.JsonConverter` with schemas | `value_schema_latest` (Schema Registry resolves the schema automatically) |
| `org.apache.kafka.connect.json.JsonConverter` with embedded schemas | `key_value` (schema included with each message) |

See Specify Iceberg Schema to learn more about the different Iceberg modes.

If using the `value_schema_id_prefix` or `value_schema_latest` modes, register a schema for the topic:

```bash
rpk registry schema create <topic-name>-value --schema <path-to-schema> --type <format>
```

If using the `value_schema_id_prefix` mode, schema subjects must use the `<topic-name>-value` naming convention (TopicNameStrategy). Note the schema ID returned, in case you need it for troubleshooting.

Verify that new records are being written to the Iceberg table:

- Check that data appears in your query engine.
- Validate that the schema translation is correct.
- Confirm that record counts are increasing.

### Multi-table fan-out pattern

If your existing integration routes records to multiple Iceberg tables based on a field value (for example, Kafka Connect’s `iceberg.tables.route-field` property), you need to implement equivalent routing logic. Create separate Iceberg-enabled topics for each target table, and Redpanda automatically creates the corresponding Iceberg tables. Use either of the following approaches to route records to the correct topic.

#### Option 1: Data transforms with separate topics (recommended)

Use a data transform to read the routing field from each message and write records to separate Iceberg-enabled topics. This approach keeps routing logic within Redpanda and avoids external dependencies.
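The routing decision itself is a simple lookup on the routing field. The sketch below illustrates that logic in Python for clarity only; the actual transform runs as WebAssembly inside Redpanda and is built with the data transforms SDK. The `event_type` field and the topic names are hypothetical examples:

```python
import json
from typing import Optional

# Hypothetical mapping from routing-field values to Iceberg-enabled output topics
ROUTE_TABLE = {
    "orders": "output-topic-1",
    "payments": "output-topic-2",
    "shipments": "output-topic-3",
}

def route_topic(value: bytes, route_field: str = "event_type") -> Optional[str]:
    """Return the output topic for a record, or None if it cannot be routed
    (invalid JSON, missing routing field, or unknown routing value)."""
    try:
        payload = json.loads(value)
    except ValueError:
        return None  # Not valid JSON, so the routing field cannot be read
    if not isinstance(payload, dict):
        return None
    return ROUTE_TABLE.get(payload.get(route_field))

print(route_topic(b'{"event_type": "payments", "amount": 42}'))  # output-topic-2
```

Records that return `None` here correspond to records your real transform would drop or dead-letter, so decide that behavior explicitly before deploying.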
When using Iceberg modes that require schema validation, the transform can register schemas dynamically and encode messages with the appropriate format.

1. Enable data transforms on your cluster:

   ```bash
   rpk cluster config set data_transforms_enabled true
   ```

2. Create output topics and enable Iceberg with Schema Registry validation:

   ```bash
   rpk topic create <output-topic-1> <output-topic-2> <output-topic-3>
   rpk topic alter-config <output-topic-1> --set redpanda.iceberg.mode=value_schema_id_prefix
   rpk topic alter-config <output-topic-2> --set redpanda.iceberg.mode=value_schema_id_prefix
   rpk topic alter-config <output-topic-3> --set redpanda.iceberg.mode=value_schema_id_prefix
   ```

3. Implement a transform function that:

   - Reads the routing field from each input message.
   - If using Schema Registry validation, registers schemas dynamically and encodes messages with the appropriate format.
   - Writes to a specific output topic based on the routing field.

4. Deploy the transform, specifying multiple output topics:

   ```bash
   rpk transform deploy \
     --file transform.wasm \
     --name <transform-name> \
     --input-topic <input-topic> \
     --output-topic <output-topic-1> \
     --output-topic <output-topic-2> \
     --output-topic <output-topic-3>
   ```

5. Validate the fan-out by checking that each output topic receives the correct records.

For a complete implementation example with dynamic schema registration, see Multi-topic fan-out with Schema Registry. The example demonstrates Schema Registry wire-format encoding for use with `value_schema_id_prefix` mode.

#### Option 2: External stream processor

For complex routing logic, use an external stream processor (Redpanda Connect or Flink) to split records and write to separate Iceberg-enabled topics. This approach is more complex but offers more flexibility for advanced routing requirements that data transforms don't support.

### Validate Schema Registry integration

If using `value_schema_id_prefix` mode, verify that messages use the Schema Registry wire format.
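Messages in this format carry a five-byte prefix: the magic byte `0x00` followed by a four-byte big-endian schema ID (the standard Confluent wire-format framing). A minimal sketch of decoding that prefix, using a hypothetical helper for illustration:

```python
import struct

def parse_wire_format(message: bytes):
    """Return the schema ID if the message uses the Schema Registry wire
    format (magic byte 0x00 + 4-byte big-endian schema ID), else None."""
    if len(message) < 5 or message[0] != 0x00:
        return None
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id

# A framed message with schema ID 7, followed by the serialized payload
framed = b"\x00" + (7).to_bytes(4, "big") + b"payload"
print(parse_wire_format(framed))         # 7
print(parse_wire_format(b"plain json"))  # None
```

You can also check the framing directly on the cluster by dumping the first bytes of a record: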
```bash
rpk topic consume <topic-name> --num=1 --format='%v\n' | xxd | head -n 1
```

If the first byte is not `00` (the magic byte), you must configure your producer to use the wire format.

The `value_schema_id_prefix` mode also requires that schema subjects follow the TopicNameStrategy: `<topic-name>-value`. Verify that your schemas use the correct naming:

```bash
rpk registry schema list
```

### Verify no records in DLQ

Check that no records failed validation and were written to the dead-letter queue. If records are present, see Records in DLQ table for resolution steps.

```sql
SELECT COUNT(*) FROM <catalog>."<topic-name>~dlq";
```

### Run systems in parallel

Keep your existing Iceberg integration running while Iceberg Topics is enabled. This provides a safety net during the transition period:

- New data flows to both the source tables and the new Iceberg Topics tables.
- You can validate data consistency between both systems.
- You have a fallback option if issues arise.

Run a query to compare record counts between systems:

```sql
-- Source table
SELECT COUNT(*) AS source_count FROM <catalog>.<source-table>;

-- Iceberg Topics table
SELECT COUNT(*) AS iceberg_topics_count FROM <catalog>.<iceberg-topics-table>;
```

Record counts should increase at similar rates, accounting for the time since Iceberg Topics was enabled. Check for DLQ records (see Records in DLQ table).
Monitor Iceberg topic metrics to validate that data is flowing at the expected rates:

- `redpanda_iceberg_translation_parquet_rows_added`: Rows written to Iceberg tables (compare with the source write rate)
- `redpanda_iceberg_translation_translations_finished`: Number of completed translation executions
- `redpanda_iceberg_translation_invalid_records`: Records that failed validation
- `redpanda_iceberg_translation_dlq_files_created`: Dead-letter queue activity
- `redpanda_iceberg_rest_client_num_commit_table_update_requests_failed`: Failed table commits to the catalog

If using data transforms for multi-table fan-out, also monitor:

- `redpanda_transform_processor_lag`: Records pending processing in the transform input topic

For a complete list of Iceberg metrics, see the Iceberg metrics reference.

Run both systems for at least 24-48 hours to ensure stability before proceeding with the data merge.

## Merge historical data

Choose a strategy to combine your historical data with new Iceberg Topics data.

### Option 1: INSERT INTO pattern (recommended)

Use this approach to create a unified table with all data if:

- You want a single table for queries.
- You can afford the one-time data copy cost.
- You need optimal query performance.

This SQL pattern uses partition and offset metadata to identify and copy only records not yet in the target table:

```sql
-- Step 1: Find the latest offset per partition in the target (Iceberg Topics) table
WITH latest_offsets AS (
  SELECT partition, MAX(offset) AS max_offset
  FROM target_iceberg_topics_table
  GROUP BY partition
)
-- Step 2: Insert records from the source table that don't exist in the target
INSERT INTO target_iceberg_topics_table
SELECT s.*
FROM source_table AS s
LEFT JOIN latest_offsets AS t
  ON s.partition = t.partition
WHERE t.max_offset IS NULL   -- Partition not seen before in target
   OR s.offset > t.max_offset;  -- Record is newer than target's latest offset
```

The `latest_offsets` CTE finds the highest offset in the target table for each partition.
The `LEFT JOIN` ensures that partitions never seen before in the target are included (`t.max_offset IS NULL`). The `WHERE` clause filters to only records with offsets greater than the target's latest. This avoids duplicates by using the Kafka partition and offset as the deduplication key.

This approach may take significant time for large datasets. Consider running it during low-query periods. You can also run it incrementally, for example by date or partition ranges, to ease the load on your query engine.

### Option 2: View-based query federation

Use this approach to query both tables without copying data if:

- You cannot afford the data copy time or cost.
- You need immediate access to a unified view.
- Query complexity and performance are acceptable with federated queries.
- You may consolidate data later.

Create a view that queries both tables and deduplicates on the fly. Because the view returns everything already in the target table, the historical branch must select only source records older than the earliest offset the target holds for each partition; otherwise, the overlapping offset range would appear twice:

```sql
CREATE VIEW unified_iceberg_view AS
WITH earliest_offsets AS (
  SELECT partition, MIN(offset) AS min_offset
  FROM target_iceberg_topics_table
  GROUP BY partition
),
historical_data AS (
  SELECT s.*
  FROM source_table AS s
  LEFT JOIN earliest_offsets AS t
    ON s.partition = t.partition
  WHERE t.min_offset IS NULL    -- Partition not present in target
     OR s.offset < t.min_offset -- Only historical records not already in target
),
new_data AS (
  SELECT * FROM target_iceberg_topics_table
)
SELECT * FROM historical_data
UNION ALL
SELECT * FROM new_data;
```

Most Iceberg-compatible query engines support views, including Snowflake, Databricks, ClickHouse, and Spark.
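The partition/offset deduplication behind the INSERT INTO pattern can be illustrated outside SQL. A minimal Python sketch over hypothetical in-memory `(partition, offset, payload)` records:

```python
# Hypothetical sample data: the target table lags the source on partition 0
# and has never seen partition 1.
source = [(0, 1, "a"), (0, 2, "b"), (0, 3, "c"), (1, 1, "x")]
target = [(0, 1, "a"), (0, 2, "b")]

def records_to_copy(source, target):
    """Select source records newer than the target's latest offset per
    partition, or whose partition the target has never seen. This mirrors
    the latest_offsets CTE plus the WHERE clause of the SQL pattern."""
    latest = {}
    for partition, offset, _ in target:
        latest[partition] = max(latest.get(partition, -1), offset)
    return [
        (p, o, v) for (p, o, v) in source
        if p not in latest or o > latest[p]
    ]

print(records_to_copy(source, target))  # [(0, 3, 'c'), (1, 1, 'x')]
```

Because the (partition, offset) pair uniquely identifies a Kafka record, re-running the selection is idempotent: records already copied are filtered out on the next pass.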
## Validate the migration

After completing the data merge, verify the migration before cutting over:

1. Record counts match between source and target:

   ```sql
   -- Compare record counts
   SELECT 'Source' AS table_name, COUNT(*) AS record_count FROM <catalog>.<source-table>
   UNION ALL
   SELECT 'Target', COUNT(*) FROM <catalog>.<iceberg-topics-table>;
   ```

2. All partitions are represented in the target:

   ```sql
   -- Check for missing partitions
   SELECT DISTINCT partition FROM <catalog>.<source-table>
   EXCEPT
   SELECT DISTINCT partition FROM <catalog>.<iceberg-topics-table>;
   -- Should return no rows
   ```

3. Date ranges cover the full historical period. Compare `MIN(timestamp)` and `MAX(timestamp)` between the source and target tables to ensure the target covers the same time range.

4. No gaps in offset sequences:

   ```sql
   -- Check for offset gaps (may indicate missing data)
   WITH offset_check AS (
     SELECT partition, offset,
       LAG(offset) OVER (PARTITION BY partition ORDER BY offset) AS prev_offset
     FROM <catalog>.<iceberg-topics-table>
   )
   SELECT * FROM offset_check WHERE offset - prev_offset > 1;
   -- Should return no rows
   ```

5. Sample queries return the expected results. Spot check specific records by ID to verify data accuracy.

6. Schema translation is correct. Run `DESCRIBE` on both tables and verify that all fields are present with the correct data types.

7. New records are flowing to Iceberg Topics. Check the record count for a recent time window (for example, the last hour).

8. Query performance is acceptable.

9. Monitoring and alerts are configured.

10. No records in the DLQ (see Records in DLQ table).

## Troubleshoot common migration issues

### Records in DLQ table

Iceberg Topics writes records that fail validation to a dead-letter queue (DLQ) table. Records may appear in the DLQ due to:

- Schema Registry issues. For example, using the wrong schema subject name, or Redpanda cannot find the embedded schema ID in Schema Registry.
- When using `value_schema_id_prefix` mode: messages not encoded with the Schema Registry wire format.
- Incompatible schema changes.
  For example, changing field types or removing required fields.
- Data type translation failures.

To check for DLQ records during the migration:

```sql
SELECT COUNT(*) FROM <catalog>."<topic-name>~dlq";
```

If the count is greater than zero, inspect the failed records. See Troubleshoot errors for steps to inspect and reprocess DLQ records.

### Multi-table fan-out transform issues

If the transform does not process messages, check whether:

- The specified output topics don't exist or aren't Iceberg-enabled.
- The routing logic in the transform is incorrect, or the routing field is missing from input messages.
- (When using Schema Registry validation) The schema registration failed during initialization, preventing the transform from starting.

To check the transform status:

```bash
rpk transform list
```

To view logs and check for errors:

```bash
rpk transform logs <transform-name>
```

To check for routing errors:

```bash
rpk transform logs <transform-name> | grep -i "unknown\|error"
```

If using Schema Registry validation, verify schema registration:

```bash
# Check transform logs for schema registration messages
rpk transform logs <transform-name> | grep -i "schema"

# List registered schemas
rpk registry schema list
```

## Plan for rollback

Before cutting over, ensure you have a rollback strategy. See the Pre-cutover checklist in the cutover section to verify that you're ready.

### Rollback during parallel operation

If you discover issues while both systems are running:

1. Keep producing to both systems.
2. Point consumers back to the source tables.
3. Investigate the Iceberg Topics issues using the troubleshooting section.
4. Fix the issues and re-validate.
5. Attempt the cutover again when ready.

### Rollback after the external integration is disabled

Rolling back after stopping your external Iceberg integration may result in data loss or gaps. If you must roll back after disabling the external integration:

1. Restart your external Iceberg integration immediately.
2. Identify data written only to Iceberg Topics during the gap.
3. Export that data from the Iceberg Topics tables:

   ```sql
   SELECT * FROM iceberg_topics_table
   WHERE timestamp > '<integration_stop_time>';
   ```

4. Write the exported data back to the source system (for example, Kafka Connect input topics or directly to the source tables).

5. Verify data completeness across both systems.

6. Resume operations on the external integration.

Redpanda recommends maintaining the ability to roll back for at least seven days after cutover to allow for issue discovery.

## Cut over to Iceberg Topics

### Pre-cutover checklist

Before disabling your external Iceberg integration, ensure that you have completed all validation steps:

- All historical data is successfully merged (see Merge historical data).
- Parallel operation is complete and stable for at least 24-48 hours.
- All validation queries pass (see Validate the migration).
- No records remain in DLQ tables, or all DLQ records are investigated and resolved.
- Query performance meets requirements.
- Downstream consumers are successfully tested with Iceberg Topics tables.
- Monitoring and alerts are configured.
- A rollback plan is verified and documented.

### Cutover procedure

1. Set an appropriate maintenance window, ideally during low-traffic periods.

2. Stop your external Iceberg integration. For Kafka Connect:

   ```bash
   # Stop the connector
   curl -X PUT http://<dataplane-api-url>/kafka-connect/clusters/iceberg-sink-connector/stop

   # Or delete the connector (permanent)
   curl -X DELETE http://<dataplane-api-url>/kafka-connect/clusters/iceberg-sink-connector
   ```

3. Monitor Iceberg Topics to ensure that data continues flowing.

4. Verify that no new records are being written to the source tables:

   ```sql
   SELECT MAX(timestamp) FROM <catalog>.<source-table>;
   -- Should not change after the integration is stopped
   ```

5. Run the validation queries from Validate the migration after 1-2 hours of operation.

6. Wait for a short period, such as 24-48 hours, to monitor and validate stability.
7. If migrating to a unified table of historical plus new data, optionally delete the old source tables after an extended validation period (for example, at least seven days). Ensure you have backups before deleting historical data; some organizations keep the old tables for compliance or audit purposes.

   ```sql
   DROP TABLE <catalog>.<source-table>;
   ```

8. Decommission the external Iceberg infrastructure after an extended safety period (for example, 30+ days).

If any issues arise during cutover, see Plan for rollback.

## Next steps

- Query Iceberg Topics
- About Iceberg Topics