Convert JSON Messages into Avro

This example shows you how to query live tracking data from the International Space Station and convert it from JSON to Avro using Redpanda data transforms.

This example uses cURL to query data from api.open-notify.org and sends it to Redpanda using the rpk command-line client. Once the data is in Redpanda, a data transform function converts it from JSON to Avro. You can then view the converted data in Redpanda Console.

Architectural Overview

Prerequisites

You must have the following:

Run the lab

  1. Clone this repository:

    git clone https://github.com/redpanda-data/redpanda-labs.git
  2. Change into the data-transforms/go/iss_demo/ directory:

    cd redpanda-labs/data-transforms/go/iss_demo
  3. Set the REDPANDA_VERSION environment variable to version 23.3.1 or later. Data transforms were introduced in version 23.3.1. For all available versions, see the GitHub releases.

    For example:

    export REDPANDA_VERSION=24.3.2
  4. Set the REDPANDA_CONSOLE_VERSION environment variable to the version of Redpanda Console that you want to run. For all available versions, see the GitHub releases.

    For example:

    export REDPANDA_CONSOLE_VERSION=2.8.1
  5. Start Redpanda in Docker by running the following command:

    docker compose up -d --wait
  6. Post the Avro schema to the Schema Registry using a cURL command:

    ./post-schema.sh

    Note the schema ID that this command returns. In a clean environment, the ID is 1.

  7. Set up your rpk profile:

    rpk profile create iss_demo --from-profile profile.yml
    Created and switched to new profile "iss_demo".
  8. Create the required topics iss_json and iss_avro:

    rpk topic create iss_json iss_avro
  9. Deploy the transforms function:

    rpk transform build
    rpk transform deploy --var=SCHEMA_ID=1 --input-topic=iss_json --output-topic=iss_avro

    This example accepts the following environment variables:

    • SCHEMA_ID (required): The ID of the Avro schema stored in the Redpanda schema registry.
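
Step 6 registers the schema and returns the ID that step 9 passes as SCHEMA_ID. As a rough sketch of that exchange, the Go code below builds the JSON body that a Schema Registry `POST /subjects/<subject>/versions` request expects and reads the ID from a reply of the form `{"id":1}`. It makes no network call. The contents of post-schema.sh are not shown here, so the helper names and the stand-in schema are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// registerRequest is the JSON body for POST /subjects/<subject>/versions.
// The Avro schema is sent as an escaped JSON string.
type registerRequest struct {
	Schema string `json:"schema"`
}

// registerResponse is the reply, which carries the assigned schema ID.
type registerResponse struct {
	ID int `json:"id"`
}

// buildRequest wraps an Avro schema definition in the request body.
func buildRequest(avsc string) ([]byte, error) {
	return json.Marshal(registerRequest{Schema: avsc})
}

// parseSchemaID extracts the schema ID from the registry's reply.
func parseSchemaID(body []byte) (int, error) {
	var resp registerResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return 0, err
	}
	return resp.ID, nil
}

func main() {
	// Hypothetical stand-in schema; the lab's real schema lives in iss.avsc.
	avsc := `{"type":"record","name":"example","fields":[{"name":"timestamp","type":"long"}]}`
	body, err := buildRequest(avsc)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))

	// Example reply for the first schema registered in a clean environment.
	id, err := parseSchemaID([]byte(`{"id":1}`))
	if err != nil {
		panic(err)
	}
	fmt.Println("schema ID:", id)
}
```

The ID printed here is the value you would pass to `rpk transform deploy` with `--var=SCHEMA_ID=...`.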

Now, you can test that the data can be converted from JSON to Avro.

  1. Get a single record representing the location of the ISS:

    curl http://api.open-notify.org/iss-now.json

    Example output:

    {"message": "success", "timestamp": 1695753164, "iss_position": {"latitude": "-12.8784", "longitude": "92.2935"}}
  2. Run rpk topic produce:

    rpk topic produce iss_json
  3. Paste the output of the cURL command into the prompt, press Enter to send the record, and then press Ctrl+C to exit the prompt.

  4. Consume the Avro topic using rpk topic consume and observe that the transforms function has converted it to Avro:

    rpk topic consume iss_avro --num 1

    Example output:

    {
      "topic": "iss_avro",
      "value": "\u0000\u0000\u0000\u0000\u0001\ufffd\ufffd\u0011\ufffd\ufffd\ufffd)\ufffd\u0010X9\ufffd\ufffd\u0012W@\ufffd\ufffd\ufffd\ufffd\u000c",
      "timestamp": 1695753212929,
      "partition": 0,
      "offset": 0
    }
  5. Open Redpanda Console to view the decoded data.

    Redpanda Console showing the decoded message
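
The value in the example output of step 4 begins with `\u0000\u0000\u0000\u0000\u0001`. Those five bytes are the header that the transform prepends: the magic byte 0x0 followed by schema ID 1 as a big-endian uint32. A minimal Go sketch of how a consumer could split that header from the Avro payload follows. The function name `splitHeader` is hypothetical, and the trailing payload bytes in `main` are placeholders rather than real Avro data.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// splitHeader separates the 5-byte header (magic byte plus schema ID)
// from the Avro-encoded payload that follows it.
func splitHeader(value []byte) (schemaID uint32, payload []byte, err error) {
	if len(value) < 5 {
		return 0, nil, errors.New("value is shorter than the 5-byte header")
	}
	if value[0] != 0x0 {
		return 0, nil, fmt.Errorf("unexpected magic byte 0x%x", value[0])
	}
	// Bytes 1 through 4 hold the schema ID in big-endian order.
	return binary.BigEndian.Uint32(value[1:5]), value[5:], nil
}

func main() {
	// Header from the example record, followed by two placeholder bytes.
	value := []byte{0x00, 0x00, 0x00, 0x00, 0x01, 0xAA, 0xBB}
	id, payload, err := splitHeader(value)
	if err != nil {
		panic(err)
	}
	fmt.Printf("schema ID %d, %d payload bytes\n", id, len(payload))
}
```

Redpanda Console performs the equivalent lookup when it decodes the message: it reads the schema ID from the header and fetches the matching schema from the Schema Registry.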

Files in the example

  • iss.avsc: Avro schema used for conversion.

  • profile.yml: Used to configure rpk with the rpk profile command.

  • transform.go: The Go code that is compiled to WebAssembly. This code:

    • Initializes the transform, including getting the schema from the Schema Registry and creating the goavro codec object (both stored in global variables).

    • Registers the callback toAvro.

    • Uses toAvro to parse each JSON record into an iss_now struct, convert the struct into a map, and convert the map to Avro binary using the goavro codec.

    • Prepends a five-byte header to the Avro binary: the magic byte 0x0 followed by the schema ID as a big-endian uint32.

    • Appends the resulting record to the output slice.
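
The steps that transform.go performs can be sketched in plain Go using only the standard library. This is an illustration under stated assumptions, not the lab's actual code: the struct layout is inferred from the example JSON, the map keys and the conversion of latitude and longitude to float64 are assumptions about iss.avsc, and the Avro encoding step (done by the goavro codec in the lab) is replaced by placeholder bytes.

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
	"strconv"
)

// issNow mirrors the JSON returned by api.open-notify.org/iss-now.json.
type issNow struct {
	Message     string `json:"message"`
	Timestamp   int64  `json:"timestamp"`
	IssPosition struct {
		Latitude  string `json:"latitude"`
		Longitude string `json:"longitude"`
	} `json:"iss_position"`
}

// toMap parses a JSON record and builds the map that an Avro codec would
// encode. The map keys are assumptions; the real field names are in iss.avsc.
func toMap(raw []byte) (map[string]interface{}, error) {
	var rec issNow
	if err := json.Unmarshal(raw, &rec); err != nil {
		return nil, err
	}
	// The API returns coordinates as strings, so convert them to numbers.
	lat, err := strconv.ParseFloat(rec.IssPosition.Latitude, 64)
	if err != nil {
		return nil, err
	}
	lon, err := strconv.ParseFloat(rec.IssPosition.Longitude, 64)
	if err != nil {
		return nil, err
	}
	return map[string]interface{}{
		"latitude":  lat,
		"longitude": lon,
		"timestamp": rec.Timestamp,
	}, nil
}

// frame prepends the magic byte 0x0 and the big-endian schema ID
// to an already-encoded Avro payload.
func frame(schemaID uint32, avro []byte) []byte {
	out := make([]byte, 5, 5+len(avro))
	out[0] = 0x0
	binary.BigEndian.PutUint32(out[1:], schemaID)
	return append(out, avro...)
}

func main() {
	raw := []byte(`{"message": "success", "timestamp": 1695753164, "iss_position": {"latitude": "-12.8784", "longitude": "92.2935"}}`)
	native, err := toMap(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(native["latitude"], native["longitude"], native["timestamp"])

	// Placeholder payload; in the lab, the goavro codec produces these bytes.
	framed := frame(1, []byte{0xAA, 0xBB})
	fmt.Printf("% x\n", framed)
}
```

Keeping the framing in its own function separates the Schema Registry wire format from the Avro encoding, so either part can change without touching the other.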

Clean up

To shut down and delete the containers along with all your cluster data:

docker compose down -v

Next steps

You could set up a loop to poll the location of the ISS and produce it to the iss_json topic. For example:

while true
do
line=$(curl -s http://api.open-notify.org/iss-now.json)
echo "$line" | rpk topic produce iss_json
sleep 1
done