Redpanda Migrator
With Redpanda Migrator, you can move your workloads from any Apache Kafka system to Redpanda using a single command. It lets you migrate Kafka messages, schemas, and ACLs quickly and efficiently.
Redpanda Connect’s Redpanda Migrator uses the unified migrator components (available in Redpanda Connect 4.67.5+):
-
redpanda_migratorinput connects to the source Kafka cluster and Schema Registry. -
redpanda_migratoroutput handles all migration logic including topic creation, schema synchronization, and consumer group offset translation.
If you’re currently using the legacy redpanda_migrator_bundle components, see Migrate to the Unified Redpanda Migrator for migration instructions.
|
Create the Docker containers
First, you need two clusters. To keep it simple, you can run the Bitnami Kafka and Schema Registry Docker containers for the source cluster and a Redpanda Docker container for the destination cluster via Docker Compose.
docker-compose.ymlservices:
source:
image: bitnami/kafka
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_DOCKER:PLAINTEXT
KAFKA_CFG_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_DOCKER://0.0.0.0:19092,CONTROLLER://:9093
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_DOCKER://source:19092
KAFKA_CFG_AUTHORIZER_CLASS_NAME: "org.apache.kafka.metadata.authorizer.StandardAuthorizer"
KAFKA_CFG_SUPER_USERS: User:redpanda;User:ANONYMOUS
ports:
- 9092:9092
- 19092:19092
healthcheck:
test: [ "CMD", "kafka-topics.sh", "--bootstrap-server=localhost:9092", "--list" ]
start_period: 5s
interval: 3s
init_source:
image: bitnami/kafka
working_dir: /opt/bitnami/kafka/bin
entrypoint: /bin/bash
depends_on:
source:
condition: service_healthy
command: >
-c "kafka-topics.sh --bootstrap-server source:19092 --create --if-not-exists --topic foo --replication-factor=1 --partitions=2 &&
kafka-topics.sh --bootstrap-server source:19092 --create --if-not-exists --topic bar --replication-factor=1 --partitions=2 &&
echo 'Created topics:' &&
kafka-topics.sh --bootstrap-server source:19092 --list --exclude-internal &&
kafka-acls.sh --bootstrap-server source:19092 --add --allow-principal User:redpanda --operation Read --topic foo &&
kafka-acls.sh --bootstrap-server source:19092 --add --deny-principal User:redpanda --operation Read --topic bar
echo 'Created ACLs:' &&
kafka-acls.sh --bootstrap-server source:19092 --list"
source_schema_registry:
image: bitnami/schema-registry
environment:
SCHEMA_REGISTRY_KAFKA_BROKERS: PLAINTEXT://source:19092
ports:
- 8081:8081
depends_on:
source:
condition: service_healthy
destination:
image: redpandadata/redpanda
command:
- redpanda
- start
- --node-id 0
- --mode dev-container
- --set rpk.additional_start_flags=[--reactor-backend=epoll]
- --set redpanda.auto_create_topics_enabled=false
- --kafka-addr 0.0.0.0:9093
- --advertise-kafka-addr localhost:9093
- --schema-registry-addr 0.0.0.0:8081
ports:
- 8082:8081
- 9093:9093
- 9645:9644
docker compose -f docker-compose.yml up --force-recreate -V
This command uses an init container to create two topics, foo and bar, each with two partitions and an associated ACL.
|
Create schemas
When the demo clusters are up and running, use curl to create a schema for each topic in the source cluster.
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"name\": \"Foo\", \"type\": \"record\", \"fields\": [{\"name\": \"data\", \"type\": \"int\"}]}"}' http://localhost:8081/subjects/foo/versions
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"name\": \"Bar\", \"type\": \"record\", \"fields\": [{\"name\": \"data\", \"type\": \"int\"}]}"}' http://localhost:8081/subjects/bar/versions
Generate messages
Let’s simulate an application with a producer and consumer actively publishing and reading messages on the source cluster. You can use Redpanda Connect to generate some Avro-encoded messages and push them to the two topics from the source cluster.
generate_data.yamlhttp:
enabled: false
input:
sequence:
inputs:
- generate:
mapping: |
let msg = counter()
root.data = $msg
meta kafka_topic = match $msg % 2 {
0 => "foo"
1 => "bar"
}
interval: 1s
count: 0
batch_size: 1
processors:
- schema_registry_encode:
url: "http://localhost:8081"
subject: ${! metadata("kafka_topic") }
avro_raw_json: true
output:
kafka_franz:
seed_brokers: [ "localhost:9092" ]
topic: ${! @kafka_topic }
partitioner: manual
partition: ${! random_int(min:0, max:1) }
Now, run this command to start the pipeline, and leave it running:
rpk connect run generate_data.yaml
Next, add a Redpanda Connect consumer, which reads messages from the source cluster topics, and leave it running. This consumer uses the foobar consumer group, which is reused in a later step when consuming from the destination cluster.
read_data_source.yamlhttp:
enabled: false
input:
kafka_franz:
seed_brokers: [ "localhost:9092" ]
topics:
- '^[^_]' # Skip topics which start with `_`
regexp_topics: true
consumer_group: foobar
processors:
- schema_registry_decode:
url: "http://localhost:8081"
avro_raw_json: true
output:
stdout: {}
processors:
- mapping: |
root = this.merge({"count": counter(), "topic": @kafka_topic, "partition": @kafka_partition})
Launch the source consumer pipeline, and leave it running:
rpk connect run read_data_source.yaml
At this point, the source cluster has some data in both foo and bar topics, and the consumer prints the messages it reads from these topics to stdout.
Configure and start Redpanda Migrator
The unified Redpanda Migrator does the following:
-
The
redpanda_migratorinput connects to the source Kafka cluster and Schema Registry to consume messages and schema information. -
The
redpanda_migratoroutput handles all migration logic:-
Schema migration: reads schemas from the source Schema Registry and synchronizes them to the destination.
-
Topic creation: automatically creates destination topics that don’t exist with proper configurations.
-
ACL migration: migrates access control lists according to the migration rules.
-
Message streaming: processes and routes messages from source to destination topics.
-
Consumer group offset translation: maps source consumer group offsets to equivalent destination positions.
-
-
If new topics are created in the source cluster while the migrator is running, they are migrated when messages are written to them.
ACL migration for topics adheres to the following principles:
-
ALLOW WRITEACLs for topics are not migrated -
ALLOW ALLACLs for topics are downgraded toALLOW READ -
Group ACLs are not migrated
| Changing topic configurations, such as partition count, isn’t currently supported. |
Now, use the following unified Redpanda Migrator configuration. See the redpanda_migrator input and redpanda_migrator output docs for details.
redpanda_migrator.yamlinput:
label: "migration_pipeline" (1)
redpanda_migrator:
# Source Kafka settings
seed_brokers: [ "localhost:9092" ]
topics:
- '^[^_]' # Skip internal topics which start with `_`
regexp_topics: true
consumer_group: migrator
# Source Schema Registry settings
schema_registry:
url: http://localhost:8081
output:
label: "migration_pipeline" (2)
redpanda_migrator:
# Destination Redpanda settings
seed_brokers: [ "localhost:9093" ]
# Destination Schema Registry and migration settings
schema_registry:
url: http://localhost:8082
include_deleted: true
translate_ids: true
# Consumer group migration settings
consumer_groups:
enabled: true
interval: 30s
metrics:
prometheus: {}
mapping: |
meta label = if this == "input_redpanda_migrator_lag" { "source" }
| 1 | Labels are used for pairing input and output components. | ||
| 2 | Matching label pairs the input and output components.
Labels are required only when using multiple input/output pairs (for example, migrating from multiple Kafka clusters to multiple Redpanda clusters). However, it’s recommended to always use labels for clarity and consistency, even with single input/output configurations.
|
Launch the unified Redpanda Migrator pipeline, and leave it running:
rpk connect run redpanda_migrator.yaml
Check the status of migrated topics
You can use the Redpanda rpk CLI tool to check which topics and ACLs have been migrated to the destination cluster. You can quickly install rpk if you don’t already have it.
For now, users require manual migration. However, this step is not required for the current demo. Similarly, roles are specific to Redpanda and, for now, also require manual migration if the source cluster is based on Redpanda.
|
rpk -X brokers=localhost:9093 topic list
NAME PARTITIONS REPLICAS
_schemas 1 1
bar 2 1
foo 2 1
rpk -X brokers=localhost:9093 security acl list
PRINCIPAL HOST RESOURCE-TYPE RESOURCE-NAME RESOURCE-PATTERN-TYPE OPERATION PERMISSION ERROR
User:redpanda * TOPIC bar LITERAL READ DENY
User:redpanda * TOPIC foo LITERAL READ ALLOW
Check metrics to monitor progress
Redpanda Connect provides a comprehensive suite of metrics in various formats, such as Prometheus, which you can use to monitor its performance in your observability stack. Besides the standard Redpanda Connect metrics, the redpanda_migrator input also emits an input_redpanda_migrator_lag metric for monitoring the migration progress of each topic and partition.
curl http://localhost:4195/metrics
...
# HELP input_redpanda_migrator_lag Benthos Gauge metric
# TYPE input_redpanda_migrator_lag gauge
input_redpanda_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="__consumer_offsets"} 0
input_redpanda_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="bar"} 0
input_redpanda_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="foo"} 0
input_redpanda_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="__consumer_offsets"} 0
input_redpanda_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="bar"} 1
input_redpanda_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="foo"} 0
...
Read from the migrated topics
Stop the read_data_source.yaml consumer you started earlier and, afterwards, start a similar consumer for the destination cluster. Before starting the consumer up on the destination cluster, make sure you give the migrator bundle some time to replicate the translated offset.
read_data_destination.yamlhttp:
enabled: false
input:
kafka_franz:
seed_brokers: [ "localhost:9093" ]
topics:
- '^[^_]' # Skip topics which start with `_`
regexp_topics: true
consumer_group: foobar
processors:
- schema_registry_decode:
url: "http://localhost:8082"
avro_raw_json: true
output:
stdout: {}
processors:
- mapping: |
root = this.merge({"count": counter(), "topic": @kafka_topic, "partition": @kafka_partition})
Now launch the destination consumer pipeline, and leave it running:
rpk connect run read_data_destination.yaml
The source cluster consumer uses the same foobar consumer group. This consumer resumes reading messages from where the source consumer left off.
Redpanda Migrator performs offset remapping when migrating consumer group offsets to the destination cluster. While more sophisticated approaches are possible, Redpanda chose to use a simple timestamp-based approach. So, for each migrated offset, the destination cluster is queried to find the latest offset before the received offset timestamp. Redpanda Migrator then writes this offset as the destination consumer group offset for the corresponding topic and partition pair.
Although the timestamp-based approach doesn’t guarantee exactly-once delivery, it minimizes the likelihood of message duplication and avoids the need for complex and error-prone offset remapping logic.