Data Transforms in Linux Quickstart

Data transforms are generally available for all Redpanda Community and Redpanda Enterprise Edition users. To unlock this feature in Redpanda Cloud, contact Redpanda support.

Data transforms let you run common data streaming tasks, like filtering, scrubbing, and transcoding, within Redpanda. For example, you may have consumers that require you to redact credit card numbers or convert JSON to Avro. Data transforms can also interact with the Redpanda Schema Registry to work with encoded data types.
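
As an illustration of the scrubbing use case, here is a minimal sketch in plain Go, using only the standard library. The redactCardNumbers helper and the cardPattern expression are hypothetical names introduced for this example, and the pattern is deliberately loose (13 to 16 digits with optional spaces or hyphens), so treat it as a starting point rather than a complete card-number detector.

```go
package main

import (
	"fmt"
	"regexp"
)

// cardPattern matches 13 to 16 digits, optionally separated by single
// spaces or hyphens. Illustrative only: it does not validate checksums.
var cardPattern = regexp.MustCompile(`\b\d(?:[ -]?\d){12,15}\b`)

// redactCardNumbers is a hypothetical helper that replaces anything that
// looks like a card number with a fixed placeholder.
func redactCardNumbers(value []byte) []byte {
	return cardPattern.ReplaceAll(value, []byte("[REDACTED]"))
}

func main() {
	in := []byte(`{"user":"ana","card":"4111 1111 1111 1111"}`)
	// Prints: {"user":"ana","card":"[REDACTED]"}
	fmt.Println(string(redactCardNumbers(in)))
}
```

Inside a transform function, a helper like this could be applied to the record value before the record is written to the output topic.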

Data transforms use a WebAssembly (Wasm) engine inside a Redpanda broker. A Wasm function acts on a single record in an input topic. You can develop and manage data transforms with rpk transform commands.

You should build and deploy transforms from a separate, non-production machine (host machine). Using a separate host machine avoids potential resource conflicts and stability issues on the nodes that run your brokers.

Prerequisites

You must have the following:

  • A Redpanda cluster running at least version 24.2.

  • External access to the Kafka API and the Admin API.

  • Development tools installed on your host machine:

    • For Go, you must have Go 1.20 or later.

    • For Rust, you must have the latest stable version of Rust.

  • The rpk command-line client installed on your host machine and configured to connect to your Redpanda cluster.

Enable data transforms

Data transforms are disabled on all clusters by default. Before you can deploy data transforms to a cluster, you must enable the feature.

  1. To enable data transforms, set the data_transforms_enabled cluster property to true:

    rpk cluster config set data_transforms_enabled true
  2. Restart all brokers:

    rpk redpanda stop
    rpk redpanda start

Create a data transforms project

The easiest way to create a new data transforms project is to use the rpk transform init command. This command generates template files and installs any dependencies for your chosen language.

Create and initialize a data transforms project:

rpk transform init --language=tinygo --name=data-transforms-tutorial

A successful command generates project files in your current directory:

.
├── go.mod
├── go.sum
├── README.md
├── transform.go
└── transform.yaml

The transform.go file is the source file for your transform function.

The transform.yaml file is the configuration for your transform function. The transform.yaml file already contains the name of your transform function and the language that you specified in the rpk transform init command.

Now that you have a project set up, you can run some examples to learn how to work with data transforms. For each example, copy the provided transform function and paste it into your source file: transform.go for Go projects, or src/main.rs for Rust projects.

Run examples

This section provides some examples of transform functions to teach you the basics of writing and deploying data transforms. It’s best to try each example in order, one after the other.

Copy records from one topic to another

This transform function copies the same data from an input topic to an output topic.

  1. Paste this transform function into your source file:

    package main
    
    import (
    	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
    )
    
    func main() {
    	// Make sure to register your callback and perform other setup in main
    	transform.OnRecordWritten(copyRecordsToOutput)
    }
    
    // This will be called for each record in the input topic.
    // The records returned will be written to the output topic.
    func copyRecordsToOutput(event transform.WriteEvent, writer transform.RecordWriter) error {
    	return writer.Write(event.Record())
    }
  2. Build the transform into a Wasm binary:

    rpk transform build
  3. Create topics to apply the transform function to:

    rpk topic create input-topic output-topic
  4. Deploy the Wasm binary to your cluster:

    rpk transform deploy --input-topic=input-topic --output-topic=output-topic
  5. Produce two new records to the input topic. This command uses printf because echo does not expand \n in every shell:

    printf 'hello\nworld\n' | rpk topic produce input-topic
  6. Open Redpanda Console and check the records in both the input topic and the output topic. They should be the same.

    You can also verify the content of the output topic on the command line:

    rpk topic consume output-topic

Convert CSV input to JSON output

This example is a transform function that converts CSV inputs into JSON outputs.

  1. Prepare the project files:

    Paste this transform function into your source file:

    package main
    
    import (
    	"bytes"
    	"encoding/csv"
    	"encoding/json"
    	"errors"
    	"io"
    	"strconv"
    	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
    )
    
    func main() {
    	transform.OnRecordWritten(csvToJsonTransform)
    }
    
    type ItemQuantity struct {
    	Item     string `json:"item"`
    	Quantity int    `json:"quantity"`
    }
    
    func csvToJsonTransform(event transform.WriteEvent, writer transform.RecordWriter) error {
    	// The input data is a CSV (without a header row) that is structured as:
    	// key, item, quantity
    	reader := csv.NewReader(bytes.NewReader(event.Record().Value))
    	// Improve performance by reusing the result slice.
    	reader.ReuseRecord = true
    	for {
    		row, err := reader.Read()
    		if err == io.EOF {
    			break
    		} else if err != nil {
    			return err
    		}
    		if len(row) != 3 {
    			return errors.New("unexpected number of columns")
    		}
    		// Convert the last column into an int
    		quantity, err := strconv.Atoi(row[2])
    		if err != nil {
    			return err
    		}
    		// Marshal the JSON value
    		iq := ItemQuantity{
    			Item:     row[1],
    			Quantity: quantity,
    		}
    		v, err := json.Marshal(&iq)
    		if err != nil {
    			return err
    		}
    		// Add the output record using the first column as the key.
    		r := transform.Record{
    			Key:   []byte(row[0]),
    			Value: v,
    		}
    		if err := writer.Write(r); err != nil {
    			return err
    		}
    	}
    	return nil
    }
  2. Build the transform into a Wasm binary:

    rpk transform build
  3. Create topics to apply the transform function to. If the topics already exist from the previous example, you can skip this step:

    rpk topic create input-topic output-topic
  4. Deploy the Wasm binary to your cluster.

    If you have already deployed another example, this new transform function will replace it.

    rpk transform deploy --input-topic=input-topic --output-topic=output-topic
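  5. Produce CSV records to the input topic. Each line must contain the three columns that the transform function expects (key, item, quantity):

    printf 'market-stock,apples,10\nmarket-stock,pears,11\nmarket-stock,oranges,5\n' | rpk topic produce input-topic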
  6. Open Redpanda Console and check the records in both the input topic and the output topic. You should see the following values:

    {
    	"item": "oranges",
    	"quantity": 5
    }
    {
    	"item": "apples",
    	"quantity": 10
    }
    {
    	"item": "pears",
    	"quantity": 11
    }

    You can also verify the content of the output topic on the command line:

    rpk topic consume output-topic
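
To check the conversion logic without deploying anything, the following sketch reproduces the per-row step in plain Go with only the standard library. It assumes the key, item, quantity layout described in the transform function's comment. The convertLine helper and the lowercase itemQuantity type are hypothetical names used for this example and are not part of the transform SDK.

```go
package main

import (
	"encoding/csv"
	"encoding/json"
	"errors"
	"fmt"
	"strconv"
	"strings"
)

type itemQuantity struct {
	Item     string `json:"item"`
	Quantity int    `json:"quantity"`
}

// convertLine is a hypothetical helper: it parses one "key,item,quantity"
// line and returns the record key and the JSON-encoded value.
func convertLine(line string) (string, []byte, error) {
	row, err := csv.NewReader(strings.NewReader(line)).Read()
	if err != nil {
		return "", nil, err
	}
	if len(row) != 3 {
		return "", nil, errors.New("unexpected number of columns")
	}
	quantity, err := strconv.Atoi(row[2])
	if err != nil {
		return "", nil, err
	}
	value, err := json.Marshal(itemQuantity{Item: row[1], Quantity: quantity})
	if err != nil {
		return "", nil, err
	}
	return row[0], value, nil
}

func main() {
	key, value, err := convertLine("market-stock,apples,10")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// Prints: market-stock {"item":"apples","quantity":10}
	fmt.Printf("%s %s\n", key, value)
}
```

A line with only two columns, such as apples,10, returns the "unexpected number of columns" error, which is the same check that the transform function applies to each row.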

Validate JSON

This example is a filter that outputs only valid JSON from the input topic into the output topic. Invalid JSON is written to a different output topic.

  1. Paste this transform function into your source file:

    package main
    
    import (
    	"encoding/json"
    	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
    )
    
    func main() {
    	transform.OnRecordWritten(filterValidJson)
    }
    
    // Valid JSON goes to the output topic; everything else goes to invalid-json.
    func filterValidJson(event transform.WriteEvent, writer transform.RecordWriter) error {
    	if json.Valid(event.Record().Value) {
    		return writer.Write(event.Record())
    	}
    	// Send invalid records to a separate topic
    	return writer.Write(event.Record(), transform.ToTopic("invalid-json"))
    }
  2. Build the transform into a Wasm binary:

    rpk transform build
  3. Create topics to apply the transform function to:

    rpk topic create input-topic output-topic invalid-json
  4. Deploy the Wasm binary to your cluster.

    If you have already deployed another example, this new transform function will replace it.

    rpk transform deploy --input-topic=input-topic --output-topic=output-topic --output-topic=invalid-json
  5. Produce one valid JSON record and one invalid one to the input topic:

    echo '{"valid":"json"}' | rpk topic produce input-topic -k json
    echo 'invalid json' | rpk topic produce input-topic -k json
  6. Verify the content of the output topic on the command line:

    rpk topic consume output-topic

You should see only the valid JSON record from the input topic. Invalid JSON records are written to the invalid-json topic, which you can consume in the same way.
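
The routing decision in this example depends entirely on json.Valid from the Go standard library. The sketch below isolates that decision so you can see which inputs count as valid. The routeFor helper is a hypothetical name introduced for this example, and it returns the topic names used in this quickstart.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// routeFor is a hypothetical helper that names the topic a record value
// would be sent to, using the same check as the transform function.
func routeFor(value []byte) string {
	if json.Valid(value) {
		return "output-topic"
	}
	return "invalid-json"
}

func main() {
	for _, v := range []string{`{"valid":"json"}`, "invalid json", "42"} {
		fmt.Printf("%q -> %s\n", v, routeFor([]byte(v)))
	}
}
```

Note that json.Valid accepts any well-formed JSON value, not only objects, so a bare number such as 42 is routed to output-topic. If your consumers expect JSON objects only, you would need a stricter check than the one shown here.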

Clean up

Your transform function will continue processing new records in the input topic until you delete it. To delete the transform function:

rpk transform delete data-transforms-tutorial --no-confirm