Kafka Migrator

With Kafka Migrator, you can move your workloads from any Kafka system to Redpanda using a single command. It lets you migrate Kafka messages, schemas, and ACLs quickly and efficiently.

Redpanda Connect’s Kafka Migrator combines functionality from several Redpanda Connect components. For convenience, these components are bundled together into the kafka_migrator_bundle input and the kafka_migrator_bundle output.

This cookbook shows how to use the bundled components.

Create the Docker containers

First, you’ll need two clusters. To keep it simple, you can run the Bitnami Kafka and Schema Registry Docker containers for the source cluster and a Redpanda Docker container for the destination cluster via Docker Compose.

docker-compose.yml
services:
  source:
    image: bitnami/kafka
    environment:
      KAFKA_CFG_NODE_ID: 0
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_DOCKER:PLAINTEXT
      KAFKA_CFG_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_DOCKER://0.0.0.0:19092,CONTROLLER://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_DOCKER://source:19092
      KAFKA_CFG_AUTHORIZER_CLASS_NAME: "org.apache.kafka.metadata.authorizer.StandardAuthorizer"
      KAFKA_CFG_SUPER_USERS: User:redpanda;User:ANONYMOUS
    ports:
      - 9092:9092
      - 19092:19092
    healthcheck:
      test: [ "CMD", "kafka-topics.sh", "--bootstrap-server=localhost:9092", "--list" ]
      start_period: 5s
      interval: 3s

  init_source:
    image: bitnami/kafka
    working_dir: /opt/bitnami/kafka/bin
    entrypoint: /bin/bash
    depends_on:
      source:
        condition: service_healthy
    command: >
      -c "kafka-topics.sh --bootstrap-server source:19092 --create --if-not-exists --topic foo --replication-factor=1 --partitions=2 &&
          kafka-topics.sh --bootstrap-server source:19092 --create --if-not-exists --topic bar --replication-factor=1 --partitions=2 &&
          echo 'Created topics:' &&
          kafka-topics.sh --bootstrap-server source:19092 --list --exclude-internal &&
          kafka-acls.sh --bootstrap-server source:19092 --add --allow-principal User:redpanda --operation Read --topic foo &&
          kafka-acls.sh --bootstrap-server source:19092 --add --deny-principal User:redpanda --operation Read --topic bar &&
          echo 'Created ACLs:' &&
          kafka-acls.sh --bootstrap-server source:19092 --list"

  source_schema_registry:
    image: bitnami/schema-registry
    environment:
      SCHEMA_REGISTRY_KAFKA_BROKERS: PLAINTEXT://source:19092
    ports:
      - 8081:8081
    depends_on:
      source:
        condition: service_healthy

  destination:
    image: redpandadata/redpanda
    command:
      - redpanda
      - start
      - --node-id 0
      - --mode dev-container
      - --set rpk.additional_start_flags=[--reactor-backend=epoll]
      - --set redpanda.auto_create_topics_enabled=false
      - --kafka-addr 0.0.0.0:9093
      - --advertise-kafka-addr localhost:9093
      - --schema-registry-addr 0.0.0.0:8081
    ports:
      - 8082:8081
      - 9093:9093
      - 9645:9644

Start both clusters with Docker Compose:

docker-compose -f docker-compose.yml up --force-recreate -V

The init_source container creates two topics, foo and bar, each with two partitions and an associated ACL.

Create schemas

When the demo clusters are up and running, use curl to create a schema for each topic in the source cluster.

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"name\": \"Foo\", \"type\": \"record\", \"fields\": [{\"name\": \"data\", \"type\": \"int\"}]}"}' http://localhost:8081/subjects/foo/versions

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"name\": \"Bar\", \"type\": \"record\", \"fields\": [{\"name\": \"data\", \"type\": \"int\"}]}"}' http://localhost:8081/subjects/bar/versions
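
The escaping in the curl commands above is easier to follow once you see that the Schema Registry expects the Avro schema as a JSON-encoded string nested inside a JSON object. The following Python sketch builds the same request body without sending it. The helper name build_schema_payload is hypothetical and only serves this illustration.

```python
import json


def build_schema_payload(record_name: str) -> str:
    """Return a Schema Registry request body for a one-field Avro record.

    build_schema_payload is a hypothetical helper used only for illustration.
    """
    avro_schema = {
        "name": record_name,
        "type": "record",
        "fields": [{"name": "data", "type": "int"}],
    }
    # The registry expects the schema itself as a JSON-encoded string,
    # so it is serialised once here and again as part of the outer object.
    return json.dumps({"schema": json.dumps(avro_schema)})


if __name__ == "__main__":
    # Body that the first curl command sends to /subjects/foo/versions
    print(build_schema_payload("Foo"))
```

The double call to json.dumps is what produces the backslash-escaped quotes seen in the --data argument.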

Generate messages

Let’s simulate an application with a producer and a consumer actively publishing and reading messages on the source cluster. You can use Redpanda Connect to generate some Avro-encoded messages and push them to the two topics in the source cluster.

generate_data.yaml
http:
  enabled: false

input:
  sequence:
    inputs:
      - generate:
          mapping: |
            let msg = counter()
            root.data = $msg

            meta kafka_topic = match $msg % 2 {
              0 => "foo"
              1 => "bar"
            }
          interval: 1s
          count: 0
          batch_size: 1

        processors:
          - schema_registry_encode:
              url: "http://localhost:8081"
              subject: ${! metadata("kafka_topic") }
              avro_raw_json: true

output:
  kafka_franz:
    seed_brokers: [ "localhost:9092" ]
    topic: ${! @kafka_topic }
    partitioner: manual
    partition: ${! random_int(min:0, max:1) }

Now, run this command to start the pipeline, and leave it running:

redpanda-connect run generate_data.yaml
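
To make the routing in generate_data.yaml explicit, here is a small Python model of what the mapping and the output settings do with each generated message. It is only an illustration of the logic, not how Redpanda Connect implements it, and the function name route is hypothetical.

```python
import random


def route(msg: int) -> tuple[str, int]:
    """Model of the topic and partition selection in generate_data.yaml.

    route is a hypothetical name; the real work is done by the Bloblang
    mapping and the kafka_franz output settings.
    """
    # Even counter values go to foo, odd values go to bar.
    topic = "foo" if msg % 2 == 0 else "bar"
    # Each topic has two partitions; one of them is picked at random.
    partition = random.randint(0, 1)
    return topic, partition


if __name__ == "__main__":
    for msg in range(1, 5):
        print(msg, route(msg))
```

Because the partition is chosen at random, messages for a topic are spread across both partitions rather than keyed by content.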

Next, add a Redpanda Connect consumer, which reads messages from the source cluster topics, and leave it running. This consumer uses the foobar consumer group, which will be reused in a later step when consuming from the destination cluster.

read_data_source.yaml
http:
  enabled: false

input:
  kafka_franz:
    seed_brokers: [ "localhost:9092" ]
    topics:
      - '^[^_]' # Skip topics which start with `_`
    regexp_topics: true
    start_from_oldest: true
    consumer_group: foobar

  processors:
    - schema_registry_decode:
        url: "http://localhost:8081"
        avro_raw_json: true

output:
  stdout: {}
  processors:
    - mapping: |
        root = this.merge({"count": counter(), "topic": @kafka_topic, "partition": @kafka_partition})

Launch the source consumer pipeline, and leave it running:

redpanda-connect run read_data_source.yaml

At this point, the source cluster should have some data in both the foo and bar topics, and the consumer should print the messages it reads from these topics to stdout.

Configure and start Kafka Migrator

You’re ready to start the new Kafka Migrator Bundle, which will do the following:

  • On startup, it reads all the schemas from the source cluster Schema Registry through the REST API and pushes them to the destination cluster Schema Registry using the same API. It needs to preserve the schema IDs, so the destination cluster must not have any schemas in it.

  • Once the schemas have been imported, Kafka Migrator begins the migration of all the selected topics from the source cluster, and any associated ACLs. After it finishes creating all the topics and ACLs that don’t exist in the destination cluster, it begins the migration of messages and performs consumer group offsets remapping.

  • If any new topics are created in the source cluster while Kafka Migrator is running, they are only migrated to the destination cluster if messages are written to them.
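
Since schema IDs can only be preserved when the destination Schema Registry starts out empty, it can be worth checking that before launching the migration. The sketch below assumes the standard Schema Registry GET /subjects endpoint, which returns a JSON array of subject names. The function names are hypothetical, and the check only sees the subjects that the endpoint lists by default, so treat it as a quick sanity check.

```python
import json
from typing import Callable
from urllib.request import urlopen


def http_get(url: str) -> str:
    """Fetch a URL and return the response body as text."""
    with urlopen(url) as resp:
        return resp.read().decode("utf-8")


def registry_is_empty(base_url: str, fetch: Callable[[str], str] = http_get) -> bool:
    """Return True when the Schema Registry at base_url lists no subjects.

    The fetch parameter is injectable so the check can be exercised
    without a running registry.
    """
    subjects = json.loads(fetch(base_url.rstrip("/") + "/subjects"))
    return len(subjects) == 0
```

In this demo the destination registry is exposed on port 8082, so the call would be registry_is_empty("http://localhost:8082").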

ACL migration for topics adheres to the following principles:

  • ALLOW WRITE ACLs for topics are not migrated

  • ALLOW ALL ACLs for topics are downgraded to ALLOW READ

  • Group ACLs are not migrated
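
The three rules above can be expressed as a small filter. The following Python sketch is only a model of the stated rules, not the migrator's actual code. The Acl class and migrate_acl function are hypothetical names, and passing other resource types through unchanged is an assumption, since the rules above do not cover them.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Acl:
    """Hypothetical, simplified ACL record used only for this illustration."""
    resource_type: str  # for example "TOPIC" or "GROUP"
    resource_name: str
    principal: str
    operation: str      # for example "READ", "WRITE" or "ALL"
    permission: str     # "ALLOW" or "DENY"


def migrate_acl(acl: Acl) -> Optional[Acl]:
    """Return the ACL to create on the destination, or None to skip it."""
    if acl.resource_type == "GROUP":
        return None  # Group ACLs are not migrated.
    if acl.resource_type == "TOPIC" and acl.permission == "ALLOW":
        if acl.operation == "WRITE":
            return None  # ALLOW WRITE is not migrated.
        if acl.operation == "ALL":
            return replace(acl, operation="READ")  # ALLOW ALL becomes ALLOW READ.
    # Everything else is returned unchanged (an assumption for resource
    # types that the rules above do not mention).
    return acl
```

Applied to the demo ACLs, both the ALLOW READ rule on foo and the DENY READ rule on bar pass through unchanged.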

Changing topic configurations, such as partition count, isn’t currently supported.

Now, use the following Kafka Migrator Bundle configuration. See the kafka_migrator_bundle input and kafka_migrator_bundle output docs for details.

The max_in_flight: 1 setting is required to preserve message ordering at the partition level. See the kafka_migrator output documentation for more details.

kafka_migrator_bundle.yaml
input:
  kafka_migrator_bundle:
    kafka_migrator:
      seed_brokers: [ "localhost:9092" ]
      topics:
        - '^[^_]' # Skip internal topics which start with `_`
      regexp_topics: true
      consumer_group: migrator_bundle
      start_from_oldest: true

    schema_registry:
      url: http://localhost:8081
      include_deleted: true
      subject_filter: ""

output:
  kafka_migrator_bundle:
    kafka_migrator:
      seed_brokers: [ "localhost:9093" ]
      max_in_flight: 1

    schema_registry:
      url: http://localhost:8082

metrics:
  prometheus: {}
  mapping: |
    meta label = if this == "input_kafka_migrator_lag" { "source" }

Launch the Kafka Migrator Bundle pipeline, and leave it running:

redpanda-connect run kafka_migrator_bundle.yaml

Check the status of migrated topics

You can use the Redpanda rpk CLI tool to check which topics and ACLs have been migrated to the destination cluster. You can quickly install rpk if you don’t already have it.

For now, users need to be migrated manually. However, this step is not required for the current demo. Similarly, roles are specific to Redpanda and, for now, will also require manual migration if the source cluster is based on Redpanda.

rpk -X brokers=localhost:9093 topic list
NAME      PARTITIONS  REPLICAS
_schemas  1           1
bar       2           1
foo       2           1

rpk -X brokers=localhost:9093 security acl list
PRINCIPAL      HOST  RESOURCE-TYPE  RESOURCE-NAME  RESOURCE-PATTERN-TYPE  OPERATION  PERMISSION  ERROR
User:redpanda  *     TOPIC          bar            LITERAL                READ       DENY
User:redpanda  *     TOPIC          foo            LITERAL                READ       ALLOW
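
If you want to check the result in a script rather than by eye, the topic table can be compared against what you expect. The sketch below parses text in the same shape as the rpk topic list output shown above. The function names are hypothetical, and the parser assumes the whitespace-separated NAME, PARTITIONS, REPLICAS layout from this example.

```python
def parse_topic_list(output: str) -> dict[str, int]:
    """Map topic name to partition count from rpk topic list style text."""
    rows = [line.split() for line in output.splitlines() if line.strip()]
    # The first row is the header (NAME PARTITIONS REPLICAS), so skip it.
    return {row[0]: int(row[1]) for row in rows[1:]}


def check_topics(expected: dict[str, int], output: str) -> list[str]:
    """Return a list of human-readable problems; empty means all good."""
    actual = parse_topic_list(output)
    problems = []
    for name, partitions in expected.items():
        if name not in actual:
            problems.append(f"topic {name} is missing")
        elif actual[name] != partitions:
            problems.append(
                f"topic {name} has {actual[name]} partitions, expected {partitions}"
            )
    return problems
```

For this demo, the expected mapping would be {"foo": 2, "bar": 2}.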

Check metrics to monitor progress

Redpanda Connect provides a comprehensive suite of metrics in various formats, such as Prometheus, which you can use to monitor its performance in your observability stack. Besides the standard Redpanda Connect metrics, the kafka_migrator input also emits an input_kafka_migrator_lag metric for monitoring the migration progress of each topic and partition.

curl http://localhost:4195/metrics
...
# HELP input_kafka_migrator_lag Benthos Gauge metric
# TYPE input_kafka_migrator_lag gauge
input_kafka_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="__consumer_offsets"} 0
input_kafka_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="bar"} 0
input_kafka_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="foo"} 0
input_kafka_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="__consumer_offsets"} 0
input_kafka_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="bar"} 1
input_kafka_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="foo"} 0
...
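
To turn this output into a per-topic summary, the lag lines can be parsed and summed across partitions. The following Python sketch works on metrics text in the format shown above. The function name lag_by_topic is hypothetical, and the regular expressions assume the label layout from this example rather than the full Prometheus exposition format.

```python
import re
from collections import defaultdict

# Matches lines such as: input_kafka_migrator_lag{...labels...} 1
LAG_LINE = re.compile(
    r'^input_kafka_migrator_lag\{(?P<labels>[^}]*)\}\s+(?P<value>\S+)$'
)
# Matches individual key="value" label pairs.
LABEL = re.compile(r'(\w+)="([^"]*)"')


def lag_by_topic(metrics_text: str) -> dict[str, float]:
    """Sum input_kafka_migrator_lag over all partitions of each topic."""
    totals = defaultdict(float)
    for line in metrics_text.splitlines():
        match = LAG_LINE.match(line.strip())
        if match is None:
            continue  # Skips comments, other metrics and "..." lines.
        labels = dict(LABEL.findall(match.group("labels")))
        totals[labels.get("topic", "")] += float(match.group("value"))
    return dict(totals)
```

A total of 0 for every topic you care about suggests the migrator has caught up with the source at the time of the scrape.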

Read from the migrated topics

Stop the read_data_source.yaml consumer you started earlier, then start a similar consumer for the destination cluster. Before starting the consumer on the destination cluster, give the migrator bundle some time to replicate the translated consumer group offsets.

read_data_destination.yaml
http:
  enabled: false

input:
  kafka_franz:
    seed_brokers: [ "localhost:9093" ]
    topics:
      - '^[^_]' # Skip topics which start with `_`
    regexp_topics: true
    start_from_oldest: true
    consumer_group: foobar

  processors:
    - schema_registry_decode:
        url: "http://localhost:8082"
        avro_raw_json: true

output:
  stdout: {}
  processors:
    - mapping: |
        root = this.merge({"count": counter(), "topic": @kafka_topic, "partition": @kafka_partition})

Now launch the destination consumer pipeline, and leave it running:

redpanda-connect run read_data_destination.yaml

This consumer uses the same foobar consumer group as the source cluster consumer. As you can see, it resumes reading messages from where the source consumer left off.

And you’re all done!

Due to the mechanics of the Kafka protocol, Kafka Migrator needs to perform offset remapping when migrating consumer group offsets to the destination cluster. While more sophisticated approaches are possible, Redpanda chose to use a simple timestamp-based approach. So, for each migrated offset, the destination cluster is queried to find the latest offset before the received offset timestamp. Kafka Migrator then writes this offset as the destination consumer group offset for the corresponding topic and partition pair.

Although the timestamp-based approach doesn’t guarantee exactly-once delivery, it minimises the likelihood of message duplication and avoids the need for complex and error-prone offset remapping logic.
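
The timestamp-based lookup described above can be sketched with a binary search. This is only an in-memory model: the list of timestamps stands in for the query that Kafka Migrator sends to the destination cluster, the function name remap_offset is hypothetical, and treating "before" as strictly earlier is an assumption.

```python
from bisect import bisect_left
from typing import Optional, Sequence


def remap_offset(dest_timestamps: Sequence[int], source_commit_ts: int) -> Optional[int]:
    """Find the latest destination offset written before a given timestamp.

    dest_timestamps[i] is the timestamp of the record at offset i in one
    destination partition, and is assumed to be sorted in ascending order.
    Returns None when no record is older than source_commit_ts.
    """
    # bisect_left returns the index of the first timestamp >= source_commit_ts,
    # so the entry just before it is the last one strictly earlier.
    idx = bisect_left(dest_timestamps, source_commit_ts)
    return idx - 1 if idx > 0 else None


if __name__ == "__main__":
    timestamps = [100, 200, 300]  # offsets 0, 1 and 2
    print(remap_offset(timestamps, 250))
```

Records that share a timestamp with the committed offset are the ones that may be read twice after the switch, which is why this approach reduces duplication without ruling it out.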

The content from this cookbook was first introduced on the Redpanda Blog.