Run Data Transforms in Kubernetes

Data transforms is a beta feature. It is not supported for production deployments. Runtime behavior may change, and you may not be able to upgrade data transforms without changes. Beta features are available for users to test and provide feedback.

Data transforms let you run common data streaming tasks, like filtering, scrubbing, and transcoding, within Redpanda. For example, you may have consumers that require you to redact credit card numbers or convert JSON to Avro. Data transforms can also interact with the Redpanda Schema Registry to work with encoded data types.

Data transforms use a WebAssembly (Wasm) engine inside a Redpanda broker. A Wasm function acts on a single record in an input topic. You can develop and manage data transforms with rpk transform commands.

You should build and deploy transforms from a separate, non-production machine (host machine). Using a separate host machine avoids potential resource conflicts and stability issues on the nodes that run your brokers.

Prerequisites

You must have the following:

  • A Redpanda cluster running version 23.3.

  • External access to the Kafka API and the Admin API.

    Ensure that your Redpanda cluster has external access enabled and is accessible from your host machine using the advertised addresses.

    For a tutorial on setting up a Redpanda cluster with external access, see Get Started with Redpanda in Kubernetes.
  • Development tools installed on your host machine:

    • For Golang, you must have at least version 1.20 of Go.

    • For Rust, you must have the latest stable version of Rust.

  • The rpk command-line client.

    Install rpk on your host machine and configure it to connect to your Redpanda cluster.

    rpk profile create --from-profile <(kubectl get configmap --namespace <namespace> redpanda-rpk -o go-template='{{ .data.profile }}') <profile-name>

    Replace <profile-name> with the name that you want to give this rpk profile.

Limitations

  • Transforms have no external access to disk or network resources.

  • Only single record transforms is supported, but multiple output records from a single input record is supported. For aggregations, joins, or complex transformations, use Apache Flink.

  • Only a single output topic is supported.

  • Transforms have at-least-once delivery.

  • When clients use the Kafka Transactions API on partitions of an input topic, transforms process only committed records.

  • Because data transforms are powered by Wasm, transform functions can be authored in any language. However, a data transforms SDK currently is only available in Golang and Rust.

Enable data transforms

  1. To enable data transforms, set the data_transforms_enabled cluster property to true:

    kubectl exec redpanda-0 -c redpanda -n <namespace> -- rpk cluster config set data_transforms_enabled true
  2. Restart all brokers:

    kubectl rollout restart statefulset redpanda --namespace=<namespace>
  3. Wait for all Pods to restart:

    kubectl rollout status statefulset redpanda --namespace=<namespace> --watch

Configure memory for data transforms

Redpanda reserves memory for each transform function within the broker. You need enough memory for your input record and output record to be in memory at the same time.

Set the following properties based on the number of functions you have and the amount of memory you anticipate needing.

  • wasm_per_core_memory_reservation: Total amount of memory (in bytes) to reserve per shard for all Wasm VMs. Default = 20 MiB. Requires restart.

  • wasm_per_function_memory_limit: Amount of memory (in bytes) to reserve per instance of a Wasm VM. Default = 2 MiB. Requires restart.

For example, to set wasm_per_core_memory_reservation to 40 MiB:

rpk cluster config set wasm_per_core_memory_reservation=41943040

The maximum number of functions that can be deployed to a cluster is equal to wasm_per_core_memory_reservation / wasm_per_function_memory_limit. When that limit is hit, Redpanda cannot allocate memory for the VM and the transforms stay in error states.

Create a data transforms project

  • Go

  • Rust

  1. Create and initialize a data transforms project:

    rpk transform init --language=tinygo

    If you do not include the --language flag, the transform init command will prompt you for the language.

    A successful command generates project files in your current directory:

    .
    ├── go.mod
    ├── go.sum
    ├── README.md
    ├── transform.go
    └── transform.yaml

    The transform.go file contains the transform logic, and the transform.yaml file specifies the transform’s configuration.

    When creating a custom data transform, initialization steps can be done either in main (because it’s only run once at the start of the package) or in Go’s standard predefined init() function. Although state can be cached in global variables, Redpanda may restart a Wasm module at any point, which causes the state to be lost.
  2. Implement your project by adding transform logic.

    The following examples show some basic transforms. Each example can be copied into the transform.go file.

    • Identity transform

    • Transcoder transform

    • Validation filter transform

    package main
    
    import (
    	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
    )
    
    // This example shows the basic usage of the package:
    // This transform does nothing but copy the same data from an
    // input topic to an output topic.
    func main() {
    	// Make sure to register your callback and perform other setup in main
    	transform.OnRecordWritten(identityTransform)
    }
    
    // This will be called for each record in the source topic.
    //
    // The output records returned will be written to the destination topic.
    func identityTransform(e transform.WriteEvent, w transform.RecordWriter) error {
    	return w.Write(e.Record())
    }
    package main
    
    import (
    	"bytes"
    	"encoding/csv"
    	"encoding/json"
    	"errors"
    	"io"
    	"strconv"
    
    	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
    )
    
    // This example shows a transform that converts CSV inputs into JSON outputs.
    func main() {
    	transform.OnRecordWritten(csvToJsonTransform)
    }
    
    type Foo struct {
    	A string `json:"a"`
    	B int    `json:"b"`
    }
    
    func csvToJsonTransform(e transform.WriteEvent, w transform.RecordWriter) error {
    	// The input data is a CSV (without a header row) that is the structure of:
    	// key, a, b
    	reader := csv.NewReader(bytes.NewReader(e.Record().Value))
    	// Improve performance by reusing the result slice.
    	reader.ReuseRecord = true
    	for {
    		row, err := reader.Read()
    		if err == io.EOF {
    			break
    		} else if err != nil {
    			return err
    		}
    		if len(row) != 3 {
    			return errors.New("unexpected number of rows")
    		}
    		// Convert the last column into an int
    		b, err := strconv.Atoi(row[2])
    		if err != nil {
    			return err
    		}
    		// Marshal our JSON value
    		f := Foo{
    			A: row[1],
    			B: b,
    		}
    		v, err := json.Marshal(&f)
    		if err != nil {
    			return err
    		}
    		// Add our output record using the first column as the key.
    		r := transform.Record{
    			Key:   []byte(row[0]),
    			Value: v,
    		}
    		if err := w.Write(r); err != nil {
    			return err
    		}
    	}
    	return nil
    }
    import (
    	"encoding/json"
    
    	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
    )
    
    // This example shows a filter that outputs only valid JSON into the
    // output topic.
    func main() {
    	transform.OnRecordWritten(filterValidJson)
    }
    
    func filterValidJson(e transform.WriteEvent, w transform.RecordWriter) error {
    	if json.Valid(e.Record().Value) {
    		return w.Write(e.Record())
    	}
    	return nil
    }
  1. Create and initialize a data transforms project:

    rpk transform init --language=rust

    If you do not include the --language flag, the transform init command will prompt you for the language.

    A successful command generates project files in your current directory:

    .
    ├── Cargo.lock
    ├── Cargo.toml
    ├── README.md
    ├── src
    │  └── main.rs
    └── transform.yaml

    The src/main.rs file contains the transform logic, and the transform.yaml file specifies the transform’s configuration.

    When creating a custom data transform, initialization steps can be done in main() because it’s only run once at the start of the package. Although state can be cached in global variables, Redpanda may restart a Wasm module at any point, which causes the state to be lost.

    The following examples show some basic transforms. Each example can be copied into the main.rs file.

    • Identity transform

    • Transcoder transform

    • Validation filter transform

    use anyhow::Result;
    use redpanda_transform_sdk::*;
    
    // This example shows the basic usage of the crate:
    // This transform does nothing but copy the same data from an
    // input topic to an output topic.
    fn main() {
        // Make sure to register your callback and perform other setup in main
        on_record_written(my_transform);
    }
    
    // This will be called for each record in the source topic.
    //
    // The output records returned will be written to the destination topic.
    fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> {
        writer.write(event.record)?;
        Ok(())
    }
    use anyhow::Result;
    use redpanda_transform_sdk::*;
    use serde::{Deserialize, Serialize};
    
    // This example shows a transform that converts CSV inputs into JSON outputs.
    fn main() {
        on_record_written(my_transform);
    }
    
    #[derive(Serialize, Deserialize)]
    struct Foo {
        a: String,
        b: i32,
    }
    
    fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> {
        // The input data is a CSV (without a header row) that is defined as our Foo structure.
        let mut reader = csv::Reader::from_reader(event.record.value().unwrap_or_default());
        // For each record in our CSV
        for result in reader.deserialize() {
            let foo: Foo = result?;
            // Convert it to JSON
            let value = serde_json::to_vec(&foo)?;
            // Then output it with the same key.
            writer.write(BorrowedRecord::new(event.record.key(), Some(&value)))?;
        }
        Ok(())
    }
    use anyhow::Result;
    use redpanda_transform_sdk::*;
    
    // This example shows a filter that outputs only valid JSON to the output topic.
    fn main() {
        on_record_written(filter_valid_json);
    }
    
    fn filter_valid_json(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> {
        let value = event.record.value().unwrap_or_default();
        if serde_json::from_slice::<serde_json::Value>(value).is_ok() {
            writer.write(event.record)?;
        }
        Ok(())
    }

Build and deploy the transform

  1. Build the transform into a Wasm module with metadata:

    rpk transform build
  2. Create demo topics to apply the transform function to:

    rpk topic create demo-1 demo-2
  3. Deploy the Wasm module to your cluster. For example, with the identity transform:

    rpk transform deploy --input-topic=demo-1 --output-topic=demo-2
  4. Validate that your transform is running. For example:

    1. Produce a few records to the demo-1 topic.

      echo "foo\nbar" | rpk topic produce demo-1
    2. Consume from the demo-2 topic.

      rpk topic consume demo-2
      {
        "topic": "demo-2",
        "value": "foo",
        "timestamp": 1687545891433,
        "partition": 0,
        "offset": 0
      }
      {
        "topic": "demo-2",
        "value": "bar",
        "timestamp": 1687545892434,
        "partition": 0,
        "offset": 1
      }

You can see STDOUT and STDERR for your function in the broker’s logs.

Monitor data transforms

You can monitor your transforms with the following metrics:

  • transform_execution_latency_sec

  • transform_execution_errors

  • wasm_engine_cpu_seconds_total

  • wasm_engine_memory_usage

  • wasm_engine_max_memory

  • wasm_binary_executable_memory_usage

  • transform_read_bytes

  • transform_write_bytes

  • transform_lag

  • transform_failures

  • transform_state