Data Transforms Sandbox

Technical Preview

Technical preview features are not supported for production deployments.

Redpanda data transforms provide a framework to create, build, and deploy inline transformations on data written to Redpanda topics. You develop custom transform functions, which run asynchronously in a WebAssembly (Wasm) engine inside a Redpanda broker. A transform function processes every message produced to an input topic and returns zero or more messages, which are then produced to an output topic.
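
To make the input-to-output flow concrete, the following is a minimal in-memory model of it in plain Go. It is only a sketch: the record type, the transformFn type, and the applyTransform and upperCase functions are hypothetical stand-ins and are not part of the Redpanda SDK. Stopping at the first error is an assumption of the sketch, not a statement about how the broker behaves.

```go
package main

import (
	"bytes"
	"fmt"
)

// record and transformFn are hypothetical stand-ins, not Redpanda SDK types.
type record struct {
	Key, Value []byte
}

type transformFn func(record) ([]record, error)

// applyTransform calls fn once per input record and collects everything it
// returns, in order, as the contents of the modeled output topic.
// Assumption: the sketch stops at the first error.
func applyTransform(input []record, fn transformFn) ([]record, error) {
	output := []record{}
	for _, r := range input {
		out, err := fn(r)
		if err != nil {
			return nil, err
		}
		output = append(output, out...)
	}
	return output, nil
}

// upperCase emits one record per input record, with the value upper-cased.
func upperCase(r record) ([]record, error) {
	return []record{{Key: r.Key, Value: bytes.ToUpper(r.Value)}}, nil
}

func main() {
	in := []record{{Value: []byte("foo")}, {Value: []byte("bar")}}
	out, err := applyTransform(in, upperCase)
	if err != nil {
		panic(err)
	}
	for _, r := range out {
		fmt.Println(string(r.Value))
	}
}
```

In the real system the broker plays the role of applyTransform; you only write the per-record function.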

With data transforms, you can perform common data streaming tasks like filtering, scrubbing/cleaning/redacting, and normalizing and transcoding (for example, converting JSON to Avro). This is done efficiently within Redpanda, without the overhead of sending data to a separate stream processing environment or pushing this work onto consuming applications. Data transforms dramatically simplify the effort of delivering validated and pre-processed data to consumers in the form they expect. Data transforms can also interact with the Redpanda Schema Registry to work with encoded data types.
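
As an illustration of the redaction use case, here is a small sketch in plain Go that blanks out one top-level field of a JSON value. The redactField function, the "REDACTED" placeholder, and the field names are assumptions made for this example; none of them come from the Redpanda SDK. In a real transform, logic like this would run on each record's value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// redactField replaces the value of one top-level JSON field with a fixed
// placeholder. The function and the placeholder are hypothetical.
// Note: decoding into a map turns JSON numbers into float64, so this sketch
// is not suitable for values that carry very large integers.
func redactField(value []byte, field string) ([]byte, error) {
	var doc map[string]any
	if err := json.Unmarshal(value, &doc); err != nil {
		return nil, err
	}
	if _, ok := doc[field]; ok {
		doc[field] = "REDACTED"
	}
	// json.Marshal writes map keys in sorted order, so the output is stable.
	return json.Marshal(doc)
}

func main() {
	out, err := redactField([]byte(`{"user":"ada","ssn":"123-45-6789"}`), "ssn")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // prints {"ssn":"REDACTED","user":"ada"}
}
```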

This technical preview provides a Docker container with a single broker to test Redpanda data transforms. You can develop data transforms with rpk transform commands. This page provides steps to initialize a transform project with rpk transform init, edit the transform logic in transform.go, then build and deploy the transform to a Redpanda cluster.

Share your feedback in the #wasm-transforms Redpanda Community Slack channel or in the feedback form.

Run data transforms

Follow these steps to set up, create, build, and deploy a Redpanda data transforms project in Golang.

Prerequisites

  1. Install Golang (latest version: 1.20).

  2. Install Docker.

Limitations

  • Golang WebAssembly transforms are compiled using TinyGo. Some library modules may not work in this environment; for example, generated protocol buffer code is not supported.

  • The network and local disk are not accessible in this technical preview version.

Install rpk

  1. Download the technical preview of rpk that supports data transforms for your architecture.

    • Apple silicon

    curl -SLO https://dl.redpanda.com/i5iqIdFA2acjP1CQ/redpanda-wasm-transforms-tech-preview/raw/versions/0.0.0-20230830git604fcce/rpk-darwin-arm64.tar.gz

    • Apple x86-64

    curl -SLO https://dl.redpanda.com/i5iqIdFA2acjP1CQ/redpanda-wasm-transforms-tech-preview/raw/versions/0.0.0-20230830git604fcce/rpk-darwin-amd64.tar.gz

    • Linux x86-64

    curl -SLO https://dl.redpanda.com/i5iqIdFA2acjP1CQ/redpanda-wasm-transforms-tech-preview/raw/versions/0.0.0-20230830git604fcce/rpk-linux-amd64.tar.gz

    • Linux Arm64

    curl -SLO https://dl.redpanda.com/i5iqIdFA2acjP1CQ/redpanda-wasm-transforms-tech-preview/raw/versions/0.0.0-20230830git604fcce/rpk-linux-arm64.tar.gz
  2. Extract the downloaded archive (for example, with tar -xzf) to get the custom rpk binary.

  3. Add the directory that contains the custom rpk binary to your PATH.

Set up Redpanda

  1. Use the custom rpk binary to start a Redpanda container with data transforms enabled:

    rpk container start
    Checking for a local image...
    Creating network redpanda
    Starting cluster...
    Waiting for the cluster to be ready...
    
    Cluster started!
    NODE-ID  STATUS   KAFKA-ADDRESS    ADMIN-ADDRESS    PROXY-ADDRESS
    0        running  127.0.0.1:56086  127.0.0.1:56085  127.0.0.1:56087
    
    You can use rpk to interact with this cluster. E.g:
    
        rpk cluster info -X brokers=127.0.0.1:56086
        rpk cluster health -X admin.hosts=127.0.0.1:56085
    
    You may also set an environment variable with the comma-separated list of
    broker and admin API addresses:
    
        export RPK_BROKERS="127.0.0.1:56086"
        export RPK_ADMIN_HOSTS="127.0.0.1:56085"
        rpk cluster info
        rpk cluster health
    • rpk expects Docker to use its default socket. To configure in Docker Desktop, go to Settings > Advanced, and enable Allow the default Docker socket to be used.

    • Set the RPK_* environment variables listed in the command’s output. The rpk commands use them when deploying the transform’s Wasm module for this container.

  2. Create the demo topics to produce to and consume from:

    rpk topic create demo-1 demo-2

Create a data transforms project

  1. Create and initialize a data transforms project:

    rpk transform init

    A successful command generates project files in your current directory:

    .
    ├── go.mod
    ├── go.sum
    ├── README.md
    ├── transform.go
    └── transform.yaml

    The transform.go file contains the transform logic, and the transform.yaml file configures the project.

    When creating a custom data transform, you can do initialization either in main (which runs only once, when the module starts) or in Golang’s standard predefined init() function. Although you can cache state in global variables, Redpanda may restart a Wasm module at any point, which causes that state to be lost.
  2. Implement your project by adding transform logic to transform.go. The following examples show some basic transforms. Each example can be copied into the transform.go file.

    • Identity transform

    • Transcoder transform

    • Validation filter transform

    Identity transform:

    package main
    
    import (
    	"github.com/redpanda-data/redpanda/src/go/transform-sdk"
    )
    
    // This example shows the basic usage of the package:
    // This transform does nothing but copy the same data from an
    // input topic to an output topic.
    func main() {
    	// Make sure to register your callback and perform other setup in main
    	redpanda.OnRecordWritten(identityTransform)
    }
    
    // This will be called for each record in the source topic.
    //
    // The output records returned will be written to the destination topic.
    func identityTransform(e redpanda.WriteEvent) ([]redpanda.Record, error) {
    	return []redpanda.Record{e.Record()}, nil
    }

    Transcoder transform:

    package main
    
    import (
    	"bytes"
    	"encoding/csv"
    	"encoding/json"
    	"errors"
    	"io"
    	"strconv"
    
    	"github.com/redpanda-data/redpanda/src/go/transform-sdk"
    )
    
    // This example shows a transform that converts CSV inputs into JSON outputs.
    func main() {
    	redpanda.OnRecordWritten(csvToJsonTransform)
    }
    
    type Foo struct {
    	A string `json:"a"`
    	B int    `json:"b"`
    }
    
    func csvToJsonTransform(e redpanda.WriteEvent) ([]redpanda.Record, error) {
    	// The input data is a CSV (without a header row) that is the structure of:
    	// key, a, b
    	reader := csv.NewReader(bytes.NewReader(e.Record().Value))
    	// Improve performance by reusing the result slice.
    	reader.ReuseRecord = true
    	output := []redpanda.Record{}
    	for {
    		row, err := reader.Read()
    		if err == io.EOF {
    			break
    		} else if err != nil {
    			return nil, err
    		}
    		if len(row) != 3 {
    			return nil, errors.New("unexpected number of columns")
    		}
    		// Convert the last column into an int
    		b, err := strconv.Atoi(row[2])
    		if err != nil {
    			return nil, err
    		}
    		// Marshal our JSON value
    		f := Foo{
    			A: row[1],
    			B: b,
    		}
    		v, err := json.Marshal(&f)
    		if err != nil {
    			return nil, err
    		}
    		// Add our output record using the first column as the key.
    		output = append(output, redpanda.Record{
    			Key:   []byte(row[0]),
    			Value: v,
    		})
    
    	}
    	return output, nil
    }

    Validation filter transform:

    package main
    
    import (
    	"encoding/json"
    
    	"github.com/redpanda-data/redpanda/src/go/transform-sdk"
    )
    
    // This example shows a filter that outputs only valid JSON into the
    // output topic.
    func main() {
    	redpanda.OnRecordWritten(filterValidJson)
    }
    
    func filterValidJson(e redpanda.WriteEvent) ([]redpanda.Record, error) {
    	v := []redpanda.Record{}
    	if json.Valid(e.Record().Value) {
    		v = append(v, e.Record())
    	}
    	return v, nil
    }
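
The note in step 1 about initialization and global state can be sketched in plain Go as follows. This is an illustration under stated assumptions: the regionNames table, the seen cache, and the describe function are hypothetical and do not use the Redpanda SDK. The point is that anything built in init() is rebuilt after a restart, while anything accumulated in globals at run time starts over, so a transform should treat it as a best-effort cache only.

```go
package main

import "fmt"

// regionNames is lookup data built once each time the module starts.
// The table and its contents are hypothetical.
var regionNames map[string]string

// seen is a best-effort cache. It is empty again after any module restart,
// so nothing should depend on its contents for correctness.
var seen = map[string]int{}

// init runs once before main, every time the module is (re)started.
func init() {
	regionNames = map[string]string{"eu": "Europe", "us": "United States"}
}

// describe returns a display name for code and how many times the code has
// been seen since the module last started.
func describe(code string) (string, int) {
	seen[code]++
	name, ok := regionNames[code]
	if !ok {
		name = "unknown"
	}
	return name, seen[code]
}

func main() {
	for _, code := range []string{"eu", "eu", "xx"} {
		name, count := describe(code)
		fmt.Println(code, name, count)
	}
}
```

If per-record logic needs data that must survive restarts, this sketch suggests deriving it from the record itself rather than from accumulated globals.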

Build and deploy the transform

  1. Build the transform into a WebAssembly module.

    rpk transform build
  2. Deploy the WebAssembly module to your cluster.

    rpk transform deploy --input-topic=demo-1 --output-topic=demo-2
  3. Validate that your transform is running:

    1. Produce a few records to the demo-1 topic.

      printf 'foo\nbar\n' | rpk topic produce demo-1
    2. Consume from the demo-2 topic.

      rpk topic consume demo-2
      {
        "topic": "demo-2",
        "value": "foo",
        "timestamp": 1687545891433,
        "partition": 0,
        "offset": 0
      }
      {
        "topic": "demo-2",
        "value": "bar",
        "timestamp": 1687545892434,
        "partition": 0,
        "offset": 1
      }
Output that a transform writes to stdout and stderr appears in the broker’s logs. In the Docker container, view it with rpk container logs --filter=transform. Otherwise, check the broker’s stderr output stream.

Update to a new release

Follow these steps to update your project and container as new technical previews of Redpanda data transforms are released:

  1. Repeat the step to install rpk.

  2. Purge your running container, then start a new container with the newly downloaded rpk.

    rpk container purge
    rpk container start