Data Transforms Sandbox
Technical preview features are not supported for production deployments.
Redpanda data transforms provide a framework to create, build, and deploy inline data transformations on data written to Redpanda topics. You can develop custom data functions, which run asynchronously using a WebAssembly (Wasm) engine inside a Redpanda broker. A transform function processes every message produced to an input topic and returns zero or more messages that are then produced to an output topic.
With data transforms, you can perform common data streaming tasks like filtering, scrubbing (cleaning or redacting), normalizing, and transcoding (for example, converting JSON to Avro). This is done efficiently within Redpanda, without the overhead of sending data to a separate stream processing environment or pushing this work onto consuming applications. Data transforms dramatically simplify the effort of delivering validated and pre-processed data to consumers in the form they expect. Data transforms can also interact with the Redpanda Schema Registry to work with encoded data types.
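As a sketch of the programming model, a normalizing transform is just a callback that receives each record and returns the cleaned-up version. The example below uses the same Go SDK registration call, redpanda.OnRecordWritten, that appears in the full examples later on this page; the normalizeTransform function name is illustrative.

package main

import (
	"bytes"

	"github.com/redpanda-data/redpanda/src/go/transform-sdk"
)

// A minimal normalization sketch: every record produced to the input topic
// is copied to the output topic with its value trimmed and lowercased.
func main() {
	redpanda.OnRecordWritten(normalizeTransform)
}

func normalizeTransform(e redpanda.WriteEvent) ([]redpanda.Record, error) {
	r := e.Record()
	// Normalize the value; the key passes through unchanged.
	r.Value = bytes.ToLower(bytes.TrimSpace(r.Value))
	return []redpanda.Record{r}, nil
}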
This technical preview provides a Docker container with a single broker to test Redpanda data transforms. You can develop data transforms with rpk transform commands. This page provides steps to initialize a transform project with rpk transform init, edit the transform logic in transform.go, and then build and deploy the transform to a Redpanda cluster.
Share your feedback in the #wasm-transforms Redpanda Community Slack channel or in the feedback form.
Run data transforms
Follow these steps to set up, create, build, and deploy a Redpanda data transforms project in Golang.
Limitations
- Golang WebAssembly transforms are compiled using TinyGo. Some library modules may not work in this environment; for example, generated protocol buffer code is not supported.

- The network and local disk are not accessible in this technical preview version.
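Because the module cannot reach the network or local disk, any reference data a transform needs must be compiled into the Wasm module itself. The following minimal sketch shows that approach with an allow-list baked in as a Go map; the keys shown are hypothetical.

package main

import (
	"github.com/redpanda-data/redpanda/src/go/transform-sdk"
)

// Reference data compiled into the module, since no network or disk access
// is available. The keys here are purely illustrative.
var allowedKeys = map[string]bool{
	"orders":   true,
	"payments": true,
}

func main() {
	redpanda.OnRecordWritten(filterByKey)
}

// Drops any record whose key is not in the compiled-in allow-list.
func filterByKey(e redpanda.WriteEvent) ([]redpanda.Record, error) {
	if allowedKeys[string(e.Record().Key)] {
		return []redpanda.Record{e.Record()}, nil
	}
	return []redpanda.Record{}, nil
}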
Install rpk
- Download the technical preview of rpk that supports data transforms for your architecture:

  - Apple silicon:

    curl -SLO https://dl.redpanda.com/i5iqIdFA2acjP1CQ/redpanda-wasm-transforms-tech-preview/raw/versions/0.0.0-20230830git604fcce/rpk-darwin-arm64.tar.gz

  - Apple x86-64:

    curl -SLO https://dl.redpanda.com/i5iqIdFA2acjP1CQ/redpanda-wasm-transforms-tech-preview/raw/versions/0.0.0-20230830git604fcce/rpk-darwin-amd64.tar.gz

  - Linux x86-64:

    curl -SLO https://dl.redpanda.com/i5iqIdFA2acjP1CQ/redpanda-wasm-transforms-tech-preview/raw/versions/0.0.0-20230830git604fcce/rpk-linux-amd64.tar.gz

  - Linux Arm64:

    curl -SLO https://dl.redpanda.com/i5iqIdFA2acjP1CQ/redpanda-wasm-transforms-tech-preview/raw/versions/0.0.0-20230830git604fcce/rpk-linux-arm64.tar.gz

- Unzip the downloaded bundle to get the custom rpk binary.

- Set the custom rpk binary in your PATH.
Set up Redpanda
- Use the custom rpk binary to start a Redpanda container with data transforms enabled:

  rpk container start

  Checking for a local image...
  Creating network redpanda
  Starting cluster...
  Waiting for the cluster to be ready...
  Cluster started!
  NODE-ID  STATUS   KAFKA-ADDRESS    ADMIN-ADDRESS    PROXY-ADDRESS
  0        running  127.0.0.1:56086  127.0.0.1:56085  127.0.0.1:56087

  You can use rpk to interact with this cluster. E.g:

    rpk cluster info -X brokers=127.0.0.1:56086
    rpk cluster health -X admin.hosts=127.0.0.1:56085

  You may also set an environment variable with the comma-separated list of broker and admin API addresses:

    export RPK_BROKERS="127.0.0.1:56086"
    export RPK_ADMIN_HOSTS="127.0.0.1:56085"

    rpk cluster info
    rpk cluster health

  - rpk expects Docker to use its default socket. To configure this in Docker Desktop, go to Settings > Advanced, and enable Allow the default Docker socket to be used.

  - Set the RPK_* environment variables listed in the command's output. The rpk commands use them when deploying the transform's Wasm module for this container.
- Create the demo topics to produce to and consume from:

  rpk topic create demo-1 demo-2
Create a data transforms project
- Create and initialize a data transforms project:

  rpk transform init

  A successful command generates project files in your current directory:

  .
  ├── go.mod
  ├── go.sum
  ├── README.md
  ├── transform.go
  └── transform.yaml

  The transform.go file contains the transform logic, and the transform.yaml file configures the project.

  When creating a custom data transform, initialization steps can be done either in main (because it is only run once at the start of the package) or in Golang's standard predefined init() function. Although state can be cached in global variables, Redpanda may restart a Wasm module at any point, which causes that state to be lost. See the initialization sketch after the examples in the next step.
- Implement your project by adding transform logic to transform.go. The following examples show some basic transforms. Each example can be copied into the transform.go file.
  Identity transform:

package main

import (
	"github.com/redpanda-data/redpanda/src/go/transform-sdk"
)

// This example shows the basic usage of the package:
// This transform does nothing but copy the same data from an
// input topic to an output topic.
func main() {
	// Make sure to register your callback and perform other setup in main
	redpanda.OnRecordWritten(identityTransform)
}

// This will be called for each record in the source topic.
//
// The output records returned will be written to the destination topic.
func identityTransform(e redpanda.WriteEvent) ([]redpanda.Record, error) {
	return []redpanda.Record{e.Record()}, nil
}
  Transcoder transform:

package main

import (
	"bytes"
	"encoding/csv"
	"encoding/json"
	"errors"
	"io"
	"strconv"

	"github.com/redpanda-data/redpanda/src/go/transform-sdk"
)

// This example shows a transform that converts CSV inputs into JSON outputs.
func main() {
	redpanda.OnRecordWritten(csvToJsonTransform)
}

type Foo struct {
	A string `json:"a"`
	B int    `json:"b"`
}

func csvToJsonTransform(e redpanda.WriteEvent) ([]redpanda.Record, error) {
	// The input data is a CSV (without a header row) that is the structure of:
	// key, a, b
	reader := csv.NewReader(bytes.NewReader(e.Record().Value))
	// Improve performance by reusing the result slice.
	reader.ReuseRecord = true
	output := []redpanda.Record{}
	for {
		row, err := reader.Read()
		if err == io.EOF {
			break
		} else if err != nil {
			return nil, err
		}
		if len(row) != 3 {
			return nil, errors.New("unexpected number of rows")
		}
		// Convert the last column into an int
		b, err := strconv.Atoi(row[2])
		if err != nil {
			return nil, err
		}
		// Marshal our JSON value
		f := Foo{
			A: row[1],
			B: b,
		}
		v, err := json.Marshal(&f)
		if err != nil {
			return nil, err
		}
		// Add our output record using the first column as the key.
		output = append(output, redpanda.Record{
			Key:   []byte(row[0]),
			Value: v,
		})
	}
	return output, nil
}
  Validation filter transform:

package main

import (
	"encoding/json"

	"github.com/redpanda-data/redpanda/src/go/transform-sdk"
)

// This example shows a filter that outputs only valid JSON into the
// output topic.
func main() {
	redpanda.OnRecordWritten(filterValidJson)
}

func filterValidJson(e redpanda.WriteEvent) ([]redpanda.Record, error) {
	v := []redpanda.Record{}
	if json.Valid(e.Record().Value) {
		v = append(v, e.Record())
	}
	return v, nil
}
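The following minimal sketch shows the initialization pattern described in the previous step: one-time setup runs in main (it could equally go in Go's init() function), and the result is cached in a package-level variable. The email-redaction logic and names are illustrative only; because Redpanda may restart the module, the global holds only a value that can be recomputed.

package main

import (
	"regexp"

	"github.com/redpanda-data/redpanda/src/go/transform-sdk"
)

// Cached in a package-level variable so it is compiled only once per module
// start. Redpanda may restart the Wasm module at any point, so treat globals
// as a cache of derived values, never as durable state.
var emailPattern *regexp.Regexp

func main() {
	// One-time initialization; this could also live in Go's init() function.
	emailPattern = regexp.MustCompile(`[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+`)
	redpanda.OnRecordWritten(redactEmails)
}

// Replaces anything that looks like an email address in the record value.
func redactEmails(e redpanda.WriteEvent) ([]redpanda.Record, error) {
	r := e.Record()
	r.Value = emailPattern.ReplaceAll(r.Value, []byte("<redacted>"))
	return []redpanda.Record{r}, nil
}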
Build and deploy the transform
- Build the transform into a WebAssembly module:

  rpk transform build

- Deploy the WebAssembly module to your cluster:

  rpk transform deploy --input-topic=demo-1 --output-topic=demo-2
- Validate that your transform is running:

  - Produce a few records to the demo-1 topic:

    echo "foo\nbar" | rpk topic produce demo-1

  - Consume from the demo-2 topic:

    rpk topic consume demo-2

    {
      "topic": "demo-2",
      "value": "foo",
      "timestamp": 1687545891433,
      "partition": 0,
      "offset": 0
    }
    {
      "topic": "demo-2",
      "value": "bar",
      "timestamp": 1687545892434,
      "partition": 0,
      "offset": 1
    }
You can see stdout and stderr output from your transform in the broker's logs. In the Docker container, use rpk container logs --filter=transform. Otherwise, see the broker's stderr output stream.
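As a debugging aid, a transform can emit its own log lines. This minimal sketch (the function name and message format are illustrative) copies each record unchanged and writes one line to stderr per record, which then shows up in the broker logs described above.

package main

import (
	"fmt"
	"os"

	"github.com/redpanda-data/redpanda/src/go/transform-sdk"
)

// Copies every record unchanged and writes a log line to stderr for each one.
func main() {
	redpanda.OnRecordWritten(loggingIdentity)
}

func loggingIdentity(e redpanda.WriteEvent) ([]redpanda.Record, error) {
	fmt.Fprintf(os.Stderr, "processing record with key %q\n", e.Record().Key)
	return []redpanda.Record{e.Record()}, nil
}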
Update to new release
Follow these steps to update your project and container as new technical previews of Redpanda data transforms are released:
- Repeat the steps to install rpk.

- Purge your running container, then start the newly downloaded container:

  rpk container purge
  rpk container start