Redpanda Migrator
With Redpanda Migrator, you can move your workloads from any Apache Kafka system to Redpanda using a single command. It lets you migrate Kafka messages, schemas, and ACLs quickly and efficiently.
Redpanda Connect’s Redpanda Migrator uses functionality from the following components:
- the redpanda_migrator input and output
- the redpanda_migrator_offsets input and output
- the schema_registry input and output
For convenience, these components are bundled together into the redpanda_migrator_bundle input and output.
This cookbook shows how to use the bundled components.
Create the Docker containers
First, you’ll need two clusters. To keep it simple, you can run the Bitnami Kafka and Schema Registry Docker containers for the source cluster and a Redpanda Docker container for the destination cluster via Docker Compose.
docker-compose.yml
services:
  source:
    image: bitnami/kafka
    environment:
      KAFKA_CFG_NODE_ID: 0
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_DOCKER:PLAINTEXT
      KAFKA_CFG_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_DOCKER://0.0.0.0:19092,CONTROLLER://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_DOCKER://source:19092
      KAFKA_CFG_AUTHORIZER_CLASS_NAME: "org.apache.kafka.metadata.authorizer.StandardAuthorizer"
      KAFKA_CFG_SUPER_USERS: User:redpanda;User:ANONYMOUS
    ports:
      - 9092:9092
      - 19092:19092
    healthcheck:
      test: [ "CMD", "kafka-topics.sh", "--bootstrap-server=localhost:9092", "--list" ]
      start_period: 5s
      interval: 3s
  init_source:
    image: bitnami/kafka
    working_dir: /opt/bitnami/kafka/bin
    entrypoint: /bin/bash
    depends_on:
      source:
        condition: service_healthy
    command: >
      -c "kafka-topics.sh --bootstrap-server source:19092 --create --if-not-exists --topic foo --replication-factor=1 --partitions=2 &&
      kafka-topics.sh --bootstrap-server source:19092 --create --if-not-exists --topic bar --replication-factor=1 --partitions=2 &&
      echo 'Created topics:' &&
      kafka-topics.sh --bootstrap-server source:19092 --list --exclude-internal &&
      kafka-acls.sh --bootstrap-server source:19092 --add --allow-principal User:redpanda --operation Read --topic foo &&
      kafka-acls.sh --bootstrap-server source:19092 --add --deny-principal User:redpanda --operation Read --topic bar &&
      echo 'Created ACLs:' &&
      kafka-acls.sh --bootstrap-server source:19092 --list"
  source_schema_registry:
    image: bitnami/schema-registry
    environment:
      SCHEMA_REGISTRY_KAFKA_BROKERS: PLAINTEXT://source:19092
    ports:
      - 8081:8081
    depends_on:
      source:
        condition: service_healthy
  destination:
    image: redpandadata/redpanda
    command:
      - redpanda
      - start
      - --node-id 0
      - --mode dev-container
      - --set rpk.additional_start_flags=[--reactor-backend=epoll]
      - --set redpanda.auto_create_topics_enabled=false
      - --kafka-addr 0.0.0.0:9093
      - --advertise-kafka-addr localhost:9093
      - --schema-registry-addr 0.0.0.0:8081
    ports:
      - 8082:8081
      - 9093:9093
      - 9645:9644
Start both clusters with Docker Compose:
docker-compose -f docker-compose.yml up --force-recreate -V
This command uses an init container to create two topics, foo and bar, each with two partitions and the associated ACLs.
Create schemas
When the demo clusters are up and running, use curl to create a schema for each topic in the source cluster.
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"name\": \"Foo\", \"type\": \"record\", \"fields\": [{\"name\": \"data\", \"type\": \"int\"}]}"}' http://localhost:8081/subjects/foo/versions
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"name\": \"Bar\", \"type\": \"record\", \"fields\": [{\"name\": \"data\", \"type\": \"int\"}]}"}' http://localhost:8081/subjects/bar/versions
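To confirm that both schemas were registered, you can list the subjects in the source cluster’s Schema Registry:
curl http://localhost:8081/subjects
This should return a JSON array containing both subjects, such as ["bar","foo"].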
Generate messages
Let’s simulate an application with a producer and consumer actively publishing and reading messages on the source cluster. You can use Redpanda Connect to generate some Avro-encoded messages and push them to the two topics in the source cluster.
generate_data.yaml
http:
  enabled: false

input:
  sequence:
    inputs:
      - generate:
          mapping: |
            let msg = counter()
            root.data = $msg
            meta kafka_topic = match $msg % 2 {
              0 => "foo"
              1 => "bar"
            }
          interval: 1s
          count: 0
          batch_size: 1
        processors:
          - schema_registry_encode:
              url: "http://localhost:8081"
              subject: ${! metadata("kafka_topic") }
              avro_raw_json: true

output:
  kafka_franz:
    seed_brokers: [ "localhost:9092" ]
    topic: ${! @kafka_topic }
    partitioner: manual
    partition: ${! random_int(min:0, max:1) }
Now, run this command to start the pipeline, and leave it running:
rpk connect run generate_data.yaml
Next, add a Redpanda Connect consumer, which reads messages from the source cluster topics, and leave it running. This consumer uses the foobar consumer group, which will be reused in a later step when consuming from the destination cluster.
read_data_source.yaml
http:
  enabled: false

input:
  kafka_franz:
    seed_brokers: [ "localhost:9092" ]
    topics:
      - '^[^_]' # Skip topics which start with `_`
    regexp_topics: true
    start_from_oldest: true
    consumer_group: foobar
  processors:
    - schema_registry_decode:
        url: "http://localhost:8081"
        avro_raw_json: true

output:
  stdout: {}
  processors:
    - mapping: |
        root = this.merge({"count": counter(), "topic": @kafka_topic, "partition": @kafka_partition})
Launch the source consumer pipeline, and leave it running:
rpk connect run read_data_source.yaml
At this point, the source cluster should have some data in both the foo and bar topics, and the consumer should print the messages it reads from these topics to stdout.
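Each line printed by the consumer should look something like the following; the exact values and interleaving will differ, since the partitions are chosen at random:
{"count":1,"data":1,"partition":0,"topic":"bar"}
{"count":2,"data":2,"partition":1,"topic":"foo"}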
Configure and start Redpanda Migrator
The Redpanda Migrator Bundle does the following:
- On startup, it reads all the schemas from the source cluster’s Schema Registry through the REST API and pushes them to the destination cluster’s Schema Registry using the same API. Because it needs to preserve the schema IDs, the destination cluster must not have any schemas in it.
- Once the schemas have been imported, Redpanda Migrator begins migrating all the selected topics from the source cluster, along with any associated ACLs. After it finishes creating all the topics and ACLs that don’t exist in the destination cluster, it begins migrating messages and remapping consumer group offsets.
- If any new topics are created in the source cluster while Redpanda Migrator is running, they are only migrated to the destination cluster once messages are written to them.
ACL migration for topics adheres to the following principles:
- ALLOW WRITE ACLs for topics are not migrated.
- ALLOW ALL ACLs for topics are downgraded to ALLOW READ (see the optional experiment after this list).
- Group ACLs are not migrated.
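If you’d like to see the ALLOW ALL downgrade in action, one optional experiment is to add such an ACL on the source cluster before starting the migration, and then compare it against the ACLs that arrive on the destination cluster:
docker-compose exec source /opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:redpanda --operation All --topic foo
After migration, this ACL should appear on the destination cluster as ALLOW READ on topic foo.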
Changing topic configurations, such as partition count, isn’t currently supported.
Now, use the following Redpanda Migrator Bundle configuration. See the redpanda_migrator_bundle input and redpanda_migrator_bundle output docs for details.
The max_in_flight: 1 setting is required to preserve message ordering at the partition level. See the redpanda_migrator output documentation for more details.
redpanda_migrator_bundle.yaml
input:
  redpanda_migrator_bundle:
    redpanda_migrator:
      seed_brokers: [ "localhost:9092" ]
      topics:
        - '^[^_]' # Skip internal topics which start with `_`
      regexp_topics: true
      consumer_group: migrator_bundle
      start_from_oldest: true
      replication_factor_override: false
    schema_registry:
      url: http://localhost:8081
      include_deleted: true
      subject_filter: ""

output:
  redpanda_migrator_bundle:
    redpanda_migrator:
      seed_brokers: [ "localhost:9093" ]
      max_in_flight: 1
      replication_factor_override: false
    schema_registry:
      url: http://localhost:8082

metrics:
  prometheus: {}
  mapping: |
    meta label = if this == "input_redpanda_migrator_lag" { "source" }
Launch the Redpanda Migrator Bundle pipeline, and leave it running:
rpk connect run redpanda_migrator_bundle.yaml
Check the status of migrated topics
You can use the Redpanda rpk CLI to check which topics and ACLs have been migrated to the destination cluster. You can quickly install rpk if you don’t already have it.
For now, users need to be migrated manually. However, this step is not required for the current demo. Similarly, roles are specific to Redpanda and, for now, also require manual migration if the source cluster is based on Redpanda.
rpk -X brokers=localhost:9093 topic list
NAME      PARTITIONS  REPLICAS
_schemas  1           1
bar       2           1
foo       2           1
rpk -X brokers=localhost:9093 security acl list
PRINCIPAL      HOST  RESOURCE-TYPE  RESOURCE-NAME  RESOURCE-PATTERN-TYPE  OPERATION  PERMISSION  ERROR
User:redpanda  *     TOPIC          bar            LITERAL                READ       DENY
User:redpanda  *     TOPIC          foo            LITERAL                READ       ALLOW
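You can verify the migrated schemas in a similar way by listing the subjects in the destination cluster’s Schema Registry, which is exposed on port 8082:
curl http://localhost:8082/subjects
The response should match the subjects returned by the source cluster’s Schema Registry, and the schema IDs are preserved.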
Check metrics to monitor progress
Redpanda Connect provides a comprehensive suite of metrics in various formats, such as Prometheus, which you can use to monitor its performance in your observability stack. Besides the standard Redpanda Connect metrics, the redpanda_migrator input also emits an input_redpanda_migrator_lag metric, which you can use to monitor the migration progress of each topic and partition.
curl http://localhost:4195/metrics
...
# HELP input_redpanda_migrator_lag Benthos Gauge metric
# TYPE input_redpanda_migrator_lag gauge
input_redpanda_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="__consumer_offsets"} 0
input_redpanda_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="bar"} 0
input_redpanda_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="foo"} 0
input_redpanda_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="__consumer_offsets"} 0
input_redpanda_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="bar"} 1
input_redpanda_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="foo"} 0
...
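To follow just the migration lag without the rest of the metrics output, you can filter it, for example:
curl -s http://localhost:4195/metrics | grep input_redpanda_migrator_lag
Once all the gauges report 0, the migrator has caught up with the source cluster for every topic and partition.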
Read from the migrated topics
Stop the read_data_source.yaml consumer you started earlier and then start a similar consumer for the destination cluster. Before starting the destination consumer, make sure you give the Redpanda Migrator Bundle some time to replicate the translated offsets.
read_data_destination.yaml
http:
  enabled: false

input:
  kafka_franz:
    seed_brokers: [ "localhost:9093" ]
    topics:
      - '^[^_]' # Skip topics which start with `_`
    regexp_topics: true
    start_from_oldest: true
    consumer_group: foobar
  processors:
    - schema_registry_decode:
        url: "http://localhost:8082"
        avro_raw_json: true

output:
  stdout: {}
  processors:
    - mapping: |
        root = this.merge({"count": counter(), "topic": @kafka_topic, "partition": @kafka_partition})
Now launch the destination consumer pipeline, and leave it running:
rpk connect run read_data_destination.yaml
Note that this consumer uses the same foobar consumer group as the source cluster consumer. As you can see, it resumes reading messages from where the source consumer left off.
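You can also inspect the remapped consumer group offsets directly on the destination cluster:
rpk -X brokers=localhost:9093 group describe foobar
The committed offsets listed for the foo and bar topics reflect the timestamp-based remapping described below.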
And you’re all done!
Due to the mechanics of the Kafka protocol, Redpanda Migrator needs to perform offset remapping when migrating consumer group offsets to the destination cluster. While more sophisticated approaches are possible, Redpanda chose a simple timestamp-based approach: for each migrated offset, the destination cluster is queried to find the latest offset before the received offset timestamp, and Redpanda Migrator then writes this offset as the destination consumer group offset for the corresponding topic and partition pair.
Although the timestamp-based approach doesn’t guarantee exactly-once delivery, it minimises the likelihood of message duplication and avoids the need for complex and error-prone offset remapping logic.
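If you want to experiment with timestamp-based offset lookups yourself, Kafka ships a tool that performs a similar query. Note that it returns the first offset with a timestamp at or after the given time, so it only approximates the lookup Redpanda Migrator performs; the timestamp below is a placeholder in milliseconds:
docker-compose exec source /opt/bitnami/kafka/bin/kafka-get-offsets.sh --bootstrap-server localhost:9092 --topic foo --time 1700000000000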
The content from this cookbook was first introduced on the Redpanda Blog.