Redpanda Migrator

With Redpanda Migrator, you can move your workloads from any Apache Kafka system to Redpanda using a single command. It lets you migrate Kafka messages, schemas, and ACLs quickly and efficiently.

Redpanda Connect’s Redpanda Migrator uses functionality from the following components:

- redpanda_migrator input
- redpanda_migrator output
- kafka_franz input
- redpanda_migrator_offsets output
- schema_registry input
- schema_registry output

For convenience, these components are bundled together into the following:

- redpanda_migrator_bundle input
- redpanda_migrator_bundle output

This cookbook shows how to use the bundled components.

Create a Kafka cluster and a Redpanda Cloud cluster

First, you’ll need to provision two clusters: a Kafka cluster called source and a Redpanda Cloud cluster called destination. We’ll use the following sample connection details throughout the rest of this cookbook:

- source
  - broker: source.cloud.kafka.com:9092
  - schema registry: https://schema-registry-source.cloud.kafka.com:30081
  - username: kafka
  - password: testpass
- destination
  - broker: destination.cloud.redpanda.com:9092
  - schema registry: https://schema-registry-destination.cloud.redpanda.com:30081
  - username: redpanda
  - password: testpass

Then create two topics, foo and bar, in the source Kafka cluster, and an ACL for each topic:

    cat > ./config.properties <<EOF
    security.protocol=SASL_SSL
    sasl.mechanism=SCRAM-SHA-256
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="testpass";
    EOF

    kafka-topics.sh --bootstrap-server source.cloud.kafka.com:9092 --command-config config.properties --create --if-not-exists --topic foo --replication-factor=3 --partitions=2
    kafka-topics.sh --bootstrap-server source.cloud.kafka.com:9092 --command-config config.properties --create --if-not-exists --topic bar --replication-factor=3 --partitions=2
    kafka-topics.sh --bootstrap-server source.cloud.kafka.com:9092 --command-config config.properties --list --exclude-internal

    kafka-acls.sh --bootstrap-server source.cloud.kafka.com:9092 --command-config config.properties --add --allow-principal User:redpanda --operation Read --topic foo
    kafka-acls.sh --bootstrap-server source.cloud.kafka.com:9092 --command-config config.properties --add --deny-principal User:redpanda --operation Read --topic bar
    kafka-acls.sh --bootstrap-server source.cloud.kafka.com:9092 --command-config config.properties --list

Create schemas

When the demo clusters are up and running, use curl to create a schema for each topic in the source cluster:

    curl -X POST -u "kafka:testpass" -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"name\": \"Foo\", \"type\": \"record\", \"fields\": [{\"name\": \"data\", \"type\": \"int\"}]}"}' https://schema-registry-source.cloud.kafka.com:30081/subjects/foo/versions
    curl -X POST -u "kafka:testpass" -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"name\": \"Bar\", \"type\": \"record\", \"fields\": [{\"name\": \"data\", \"type\": \"int\"}]}"}' https://schema-registry-source.cloud.kafka.com:30081/subjects/bar/versions

Generate messages

Let’s simulate an application with a producer and a consumer that are actively publishing and reading messages on the source cluster. You can use Redpanda Connect to generate some Avro-encoded messages and push them to the two topics in the source cluster.

generate_data.yaml

    http:
      enabled: false

    input:
      sequence:
        inputs:
          - generate:
              mapping: |
                let msg = counter()
                root.data = $msg

                meta kafka_topic = match $msg % 2 {
                  0 => "foo"
                  1 => "bar"
                }
              interval: 1s
              count: 0
              batch_size: 1

            processors:
              - schema_registry_encode:
                  url: "https://schema-registry-source.cloud.kafka.com:30081"
                  subject: ${! metadata("kafka_topic") }
                  avro_raw_json: true
                  basic_auth:
                    enabled: true
                    username: kafka
                    password: testpass

    output:
      kafka_franz:
        seed_brokers: [ "source.cloud.kafka.com:9092" ]
        topic: ${! @kafka_topic }
        partitioner: manual
        partition: ${! random_int(min:0, max:1) }
        tls:
          enabled: true
        sasl:
          - mechanism: SCRAM-SHA-256
            username: kafka
            password: testpass

Now, run this command to start the pipeline, and leave it running:

    rpk connect run generate_data.yaml

Next, add a Redpanda Connect consumer that reads messages from the source cluster topics, and leave it running. This consumer uses the foobar consumer group, which is reused in a later step when consuming from the destination cluster.

read_data_source.yaml

    http:
      enabled: false

    input:
      kafka_franz:
        seed_brokers: [ "source.cloud.kafka.com:9092" ]
        topics:
          - '^[^_]' # Skip topics which start with `_`
        regexp_topics: true
        start_from_oldest: true
        consumer_group: foobar
        tls:
          enabled: true
        sasl:
          - mechanism: SCRAM-SHA-256
            username: kafka
            password: testpass

      processors:
        - schema_registry_decode:
            url: "https://schema-registry-source.cloud.kafka.com:30081"
            avro_raw_json: true
            basic_auth:
              enabled: true
              username: kafka
              password: testpass

    output:
      stdout: {}
      processors:
        - mapping: |
            root = this.merge({"count": counter(), "topic": @kafka_topic, "partition": @kafka_partition})

Launch the source consumer pipeline, and leave it running:

    rpk connect run read_data_source.yaml

At this point, the source cluster should have some data in both the foo and bar topics, and the consumer should print the messages it reads from these topics to stdout.

Configure and start Redpanda Migrator

The Redpanda Migrator Bundle does the following:

- On startup, it reads all the schemas from the source cluster Schema Registry through the REST API and pushes them to the destination cluster Schema Registry using the same API. It needs to preserve the schema IDs, so the destination cluster must not have any schemas in it.
- Once the schemas have been imported, Redpanda Migrator begins migrating all the selected topics from the source cluster, along with any associated ACLs.
- After it finishes creating all the topics and ACLs that don’t exist in the destination cluster, it begins migrating messages and remapping consumer group offsets.
- If any new topics are created in the source cluster while Redpanda Migrator is running, they are only migrated to the destination cluster if messages are written to them.

ACL migration for topics adheres to the following principles:

- ALLOW WRITE ACLs for topics are not migrated.
- ALLOW ALL ACLs for topics are downgraded to ALLOW READ.
- Group ACLs are not migrated.

Changing topic configurations, such as the partition count, isn’t currently supported.

Now, use the following Redpanda Migrator Bundle configuration. See the redpanda_migrator_bundle input and redpanda_migrator_bundle output docs for details.

The max_in_flight: 1 setting is required to preserve message ordering at the partition level. See the redpanda_migrator output documentation for more details.

redpanda_migrator_bundle.yaml

    input:
      redpanda_migrator_bundle:
        redpanda_migrator:
          seed_brokers: [ "source.cloud.kafka.com:9092" ]
          topics:
            - '^[^_]' # Skip internal topics which start with `_`
          regexp_topics: true
          consumer_group: migrator_bundle
          start_from_oldest: true
          tls:
            enabled: true
          sasl:
            - mechanism: SCRAM-SHA-256
              username: kafka
              password: testpass

        schema_registry:
          url: "https://schema-registry-source.cloud.kafka.com:30081"
          include_deleted: true
          subject_filter: ""
          basic_auth:
            enabled: true
            username: kafka
            password: testpass

    output:
      redpanda_migrator_bundle:
        redpanda_migrator:
          seed_brokers: [ "destination.cloud.redpanda.com:9092" ]
          max_in_flight: 1
          tls:
            enabled: true
          sasl:
            - mechanism: SCRAM-SHA-256
              username: redpanda
              password: testpass

        schema_registry:
          url: https://schema-registry-destination.cloud.redpanda.com:30081
          basic_auth:
            enabled: true
            username: redpanda
            password: testpass

    metrics:
      prometheus: {}
      mapping: |
        meta label = if this == "input_redpanda_migrator_lag" { "source" }

Launch the Redpanda Migrator Bundle pipeline, and leave it running:

    rpk connect run redpanda_migrator_bundle.yaml

Check the status of migrated topics

You can use the Redpanda rpk CLI tool to check which topics and ACLs have been migrated to the destination cluster. You can quickly install rpk if you don’t already have it.

For now, users need to be migrated manually, although this step isn’t required for this demo. Similarly, roles are specific to Redpanda and also require manual migration if the source cluster is a Redpanda cluster.

    rpk -X brokers=destination.cloud.redpanda.com:9092 -X tls.enabled=true -X sasl.mechanism=SCRAM-SHA-256 -X user=redpanda -X pass=testpass topic list
    NAME      PARTITIONS  REPLICAS
    _schemas  1           1
    bar       2           1
    foo       2           1

    rpk -X brokers=destination.cloud.redpanda.com:9092 -X tls.enabled=true -X sasl.mechanism=SCRAM-SHA-256 -X user=redpanda -X pass=testpass security acl list
    PRINCIPAL      HOST  RESOURCE-TYPE  RESOURCE-NAME  RESOURCE-PATTERN-TYPE  OPERATION  PERMISSION  ERROR
    User:redpanda  *     TOPIC          bar            LITERAL                READ       DENY
    User:redpanda  *     TOPIC          foo            LITERAL                READ       ALLOW

Check metrics to monitor progress

Redpanda Connect provides a comprehensive suite of metrics in various formats, such as Prometheus, which you can use to monitor its performance in your observability stack. Besides the standard Redpanda Connect metrics, the redpanda_migrator input also emits an input_redpanda_migrator_lag metric for monitoring the migration progress of each topic and partition.

The generator and consumer pipelines disable their HTTP servers (http: enabled: false). The endpoint below is therefore served by the Redpanda Migrator Bundle pipeline, on the default Redpanda Connect HTTP port, 4195.

    curl http://localhost:4195/metrics
    ...
    # HELP input_redpanda_migrator_lag Benthos Gauge metric
    # TYPE input_redpanda_migrator_lag gauge
    input_redpanda_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="__consumer_offsets"} 0
    input_redpanda_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="bar"} 0
    input_redpanda_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="foo"} 0
    input_redpanda_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="__consumer_offsets"} 0
    input_redpanda_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="bar"} 1
    input_redpanda_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="foo"} 0
    ...

Read from the migrated topics

Stop the read_data_source.yaml consumer you started earlier, then start a similar consumer for the destination cluster. Before starting the destination consumer, give the migrator bundle some time to replicate the translated consumer group offsets.

read_data_destination.yaml

    http:
      enabled: false

    input:
      kafka_franz:
        seed_brokers: [ "destination.cloud.redpanda.com:9092" ]
        topics:
          - '^[^_]' # Skip topics which start with `_`
        regexp_topics: true
        start_from_oldest: true
        consumer_group: foobar
        tls:
          enabled: true
        sasl:
          - mechanism: SCRAM-SHA-256
            username: redpanda
            password: testpass

      processors:
        - schema_registry_decode:
            url: "https://schema-registry-destination.cloud.redpanda.com:30081"
            avro_raw_json: true
            basic_auth:
              enabled: true
              username: redpanda
              password: testpass

    output:
      stdout: {}
      processors:
        - mapping: |
            root = this.merge({"count": counter(), "topic": @kafka_topic, "partition": @kafka_partition})

Now launch the destination consumer pipeline, and leave it running:

    rpk connect run read_data_destination.yaml

This destination consumer uses the same foobar consumer group as the source cluster consumer.
As you can see from its output, this consumer resumes reading messages from where the source consumer left off. And you’re all done!

Kafka brokers assign offsets themselves when messages are written. As a result, a migrated message can have a different offset in the destination cluster than it had in the source cluster, so Redpanda Migrator needs to remap consumer group offsets when migrating them to the destination cluster. While more sophisticated approaches are possible, Redpanda chose a simple timestamp-based approach. For each migrated offset, Redpanda Migrator queries the destination cluster to find the latest offset before the timestamp of the received offset. It then writes this offset as the destination consumer group offset for the corresponding topic and partition pair. Although the timestamp-based approach doesn’t guarantee exactly-once delivery, it minimises the likelihood of message duplication and avoids the need for complex and error-prone offset remapping logic.

The content from this cookbook was first introduced on the Redpanda Blog.
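The timestamp-based offset remapping described above can be illustrated with a short Python sketch. This is not Redpanda Migrator's implementation, and it rests on several assumptions:

- `Record` and `remap_offset` are hypothetical names.
- The destination partition is an in-memory list rather than a live cluster.
- Record timestamps are assumed to be non-decreasing within the partition.

A real implementation would more likely ask the destination brokers directly, for example with a Kafka ListOffsets request, which returns the earliest offset whose timestamp is at or after a given timestamp.

```python
from bisect import bisect_left
from dataclasses import dataclass


@dataclass(frozen=True)
class Record:
    """Hypothetical stand-in for one record in a destination partition."""
    offset: int
    timestamp_ms: int


def remap_offset(destination: list[Record], source_timestamp_ms: int) -> int:
    """Return the destination offset a consumer group should resume from.

    Assumes `destination` holds one destination topic partition in offset
    order with non-decreasing timestamps. `source_timestamp_ms` is the
    timestamp of the next record the source consumer group would have read.
    """
    timestamps = [record.timestamp_ms for record in destination]
    # bisect_left finds the first record whose timestamp is >= the source
    # timestamp. The record just before it is the latest record older than
    # the source timestamp, so everything up to it counts as consumed.
    i = bisect_left(timestamps, source_timestamp_ms)
    if i == len(destination):
        # Nothing this recent has been replicated yet: resume at the log end.
        # An empty partition is assumed to start at offset 0.
        return destination[-1].offset + 1 if destination else 0
    return destination[i].offset


# Hypothetical destination partition in which two records share a timestamp.
dest = [Record(0, 1000), Record(1, 1001), Record(2, 1002), Record(3, 1002), Record(4, 1003)]

# Suppose the source group's next record is the second of the two records
# produced at 1002 (destination offset 3). The timestamp alone can't tell the
# two apart, so the group resumes at offset 2 and reads that record twice.
print(remap_offset(dest, 1002))  # 2
print(remap_offset(dest, 1004))  # 5: resume at the end of the replicated log
```

The sketch resumes at the first record that isn't older than the source timestamp. Under the ordering assumption above, this means records that share a timestamp can be redelivered, but none are skipped. That is why the approach reduces duplication rather than eliminating it.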