Data Transforms in Kubernetes Quickstart

Data transforms let you run common data streaming tasks, like filtering, scrubbing, and transcoding, within Redpanda. For example, you may have consumers that require you to redact credit card numbers or convert JSON to Avro. Data transforms can also interact with the Redpanda Schema Registry to work with encoded data types.

Data transforms use a WebAssembly (Wasm) engine inside a Redpanda broker. A Wasm function acts on a single record in an input topic. You can develop and manage data transforms with rpk transform commands.

You should build and deploy transforms from a separate, non-production machine (host machine). Using a separate host machine avoids potential resource conflicts and stability issues on the nodes that run your brokers.

Prerequisites

You must have the following:

  • A Redpanda cluster running at least version 24.3.

  • External access to the Kafka API and the Admin API.

    Ensure that your Redpanda cluster has external access enabled and is accessible from your host machine using the advertised addresses.

    For a tutorial on setting up a Redpanda cluster with external access, see Get Started with Redpanda in Kubernetes.
  • Development tools installed on your host machine:

    • For Go, you must have at least version 1.20 of Go.

    • For Rust, you must have the latest stable version of Rust.

    • For JavaScript and TypeScript projects, you must have the latest long-term-support release of Node.js.

  • The rpk command-line client installed on your host machine and configured to connect to your Redpanda cluster.

    If you deployed Redpanda with the Helm chart, you can configure rpk by creating a profile from the ConfigMap that the chart generates:

      rpk profile create --from-profile <(kubectl get configmap --namespace <namespace> redpanda-rpk -o go-template='{{ .data.profile }}') <profile-name>

    Replace <profile-name> with the name that you want to give this rpk profile.
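
    To confirm that rpk can reach your cluster through this profile, check the cluster status:

      rpk cluster info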

Enable data transforms

Data transforms are disabled on all clusters by default. Before you can deploy a transform to a cluster, you must first enable the feature.

  1. To enable data transforms, set the data_transforms_enabled cluster property to true:

    • Helm + Operator

    • Helm

    redpanda-cluster.yaml
    apiVersion: cluster.redpanda.com/v1alpha2
    kind: Redpanda
    metadata:
      name: redpanda
    spec:
      chartRef: {}
      clusterSpec:
        config:
          cluster:
            data_transforms_enabled: true
    kubectl apply -f redpanda-cluster.yaml --namespace <namespace>
    • --values

    • --set

    data-transforms.yaml
    config:
      cluster:
        data_transforms_enabled: true
    helm upgrade --install redpanda redpanda/redpanda --namespace <namespace> --create-namespace \
      --values data-transforms.yaml --reuse-values
    helm upgrade --install redpanda redpanda/redpanda --namespace <namespace> --create-namespace \
      --set config.cluster.data_transforms_enabled=true
  2. Restart all brokers:

    kubectl rollout restart statefulset redpanda --namespace=<namespace>
  3. Wait for all Pods to restart:

    kubectl rollout status statefulset redpanda --namespace=<namespace> --watch
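
After all Pods are ready, you can confirm that the property took effect:

rpk cluster config get data_transforms_enabled

The command should print true.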

Create a data transforms project

The easiest way to create a new data transforms project is to use the rpk transform init command. This command generates template files and installs any dependencies for your chosen language.

Create and initialize a data transforms project:

  • Go

  • Rust

  • JavaScript

rpk transform init --language=tinygo --name=data-transforms-tutorial

A successful command generates project files in your current directory:

.
├── go.mod
├── go.sum
├── README.md
├── transform.go
└── transform.yaml

The transform.go file is the source file for your transform function.

The transform.yaml file is the configuration for your transform function. The transform.yaml file already contains the name of your transform function and the language that you specified in the rpk transform init command.
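
For reference, the generated transform.yaml looks similar to the following (the exact fields can vary by rpk version):

transform.yaml
name: data-transforms-tutorial
description:
language: tinygo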

rpk transform init --language=rust --name=data-transforms-tutorial

A successful command generates project files in your current directory:

.
├── Cargo.lock
├── Cargo.toml
├── README.md
├── src
│  └── main.rs
└── transform.yaml

The src/main.rs file is the source file for your transform function.

The transform.yaml file is the configuration for your transform function. The transform.yaml file already contains the name of your transform function and the language that you specified in the rpk transform init command.

rpk transform init --language=javascript --name=data-transforms-tutorial

A successful command generates project files in your current directory:

.
├── README.md
├── esbuild.js
├── node_modules
├── package-lock.json
├── package.json
├── src
│   └── index.js
└── transform.yaml

The src/index.js file is the source file for your transform function.

The transform.yaml file is the configuration for your transform function. The transform.yaml file already contains the name of your transform function and the language that you specified in the rpk transform init command.

The esbuild.js file is the build script for your project. This file configures the build process using esbuild, a fast JavaScript bundler. It ensures that your code is bundled correctly and includes any necessary polyfills for Node.js standard modules that are not natively available in the Redpanda JavaScript runtime environment.

Now that you have a project set up, you can run some examples to learn how to work with data transforms. Make sure to copy the provided transform functions into your source file: transform.go for Go projects, src/main.rs for Rust, or src/index.js for JavaScript.

Run examples

This section provides some examples of transform functions to teach you the basics of writing and deploying data transforms. It’s best to try each example in order, one after the other.

Copy records from one topic to another

This transform function copies records unchanged from an input topic to an output topic.

  1. Paste this transform function into your source file:

    • Go

    • Rust

    • JavaScript

    package main
    
    import (
    	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
    )
    
    func main() {
    	// Make sure to register your callback and perform other setup in main
    	transform.OnRecordWritten(copyRecordsToOutput)
    }
    
    // This will be called for each record in the input topic.
    // The records returned will be written to the output topic.
    func copyRecordsToOutput(event transform.WriteEvent, writer transform.RecordWriter) error {
    	return writer.Write(event.Record())
    }
    use anyhow::Result;
    use redpanda_transform_sdk::*;
    
    fn main() {
    	// Make sure to register your callback and perform other setup in main
    	on_record_written(copy_records_to_output);
    }
    
    // This will be called for each record in the input topic.
    // The records returned will be written to the output topic.
    fn copy_records_to_output(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> {
    	writer.write(event.record)?;
    	Ok(())
    }
    import { onRecordWritten } from "@redpanda-data/transform-sdk";
    
    // Register your callback function in the entry point of your script.
    onRecordWritten(copyRecordsToOutput);
    
    // This function will be called for each record in the input topic.
    // The records returned will be written to the output topic.
    function copyRecordsToOutput(event, writer) {
      writer.write(event.record);
    }
  2. Build the transform into a Wasm binary:

    rpk transform build
  3. Create topics to apply the transform function to:

    rpk topic create input-topic output-topic
  4. Deploy the Wasm binary to your cluster:

    rpk transform deploy --input-topic=input-topic --output-topic=output-topic
  5. Produce two new records to the input topic:

    printf "hello\nworld\n" | rpk topic produce input-topic
  6. Open Redpanda Console and check the records in both the input topic and the output topic. They should be the same.

    You can also verify the content of the output topic from the command line:

    rpk topic consume output-topic
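
You can also confirm that the transform is deployed and running on your cluster:

rpk transform list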

Convert CSV input to JSON output

This example is a transform function that converts CSV inputs into JSON outputs.

  1. Prepare the project files:

    • Go

    • Rust

    • JavaScript

    Paste this transform function into your source file:

    package main
    
    import (
    	"bytes"
    	"encoding/csv"
    	"encoding/json"
    	"errors"
    	"io"
    	"strconv"
    	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
    )
    
    func main() {
    	transform.OnRecordWritten(csvToJsonTransform)
    }
    
    type ItemQuantity struct {
    	Item     string `json:"item"`
    	Quantity int    `json:"quantity"`
    }
    
    func csvToJsonTransform(event transform.WriteEvent, writer transform.RecordWriter) error {
    	// The input data is a CSV (without a header row) in which each row is
    	// structured as: item, quantity
    	reader := csv.NewReader(bytes.NewReader(event.Record().Value))
    	// Improve performance by reusing the result slice.
    	reader.ReuseRecord = true
    	for {
    		row, err := reader.Read()
    		if err == io.EOF {
    			break
    		} else if err != nil {
    			return err
    		}
    		if len(row) != 2 {
    			return errors.New("unexpected number of columns")
    		}
    		// Convert the last column into an int
    		quantity, err := strconv.Atoi(row[1])
    		if err != nil {
    			return err
    		}
    		// Marshal the JSON value
    		iq := ItemQuantity{
    			Item:     row[0],
    			Quantity: quantity,
    		}
    		v, err := json.Marshal(&iq)
    		if err != nil {
    			return err
    		}
    		// Write the output record, reusing the input record's key.
    		r := transform.Record{
    			Key:   event.Record().Key,
    			Value: v,
    		}
    		if err := writer.Write(r); err != nil {
    			return err
    		}
    	}
    	return nil
    }
    1. Add the following dependencies to the [dependencies] section of the Cargo.toml file:

      csv = "1.3.0"
      serde_json = "1.0.111"
      serde = { version = "1.0.195", features = ["derive"] }
    2. Run the following command to update your dependencies:

      cargo build
    3. Paste this transform function into your source file:

      use anyhow::Result;
      use redpanda_transform_sdk::*;
      use serde::{Deserialize, Serialize};
      use csv::ReaderBuilder;
      use serde_json;
      
      #[derive(Serialize, Deserialize)]
      struct MarketStock {
      	item: String,
      	quantity: i32,
      }
      
      fn main() {
      	on_record_written(csv_to_json_transform);
      }
      
      fn csv_to_json_transform(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> {
      	// The input data is a CSV (without a header row) that is defined as the MarketStock structure.
      	let mut reader = ReaderBuilder::new().has_headers(false).from_reader(event.record.value().unwrap_or_default());
      	// For each record in our CSV
      	for result in reader.deserialize() {
      		let stock: MarketStock = match result {
      			Ok(record) => record,
      			Err(err) => {
      				eprintln!("CSV deserialize error: {}", err);
      				continue; // Skip the invalid record and continue processing
      			}
      		};
      		// Convert it to JSON
      		let value = serde_json::to_vec(&stock)?;
      		// Then output it with the same key.
      		writer.write(BorrowedRecord::new(event.record.key(), Some(&value)))?;
      	}
      	Ok(())
      }

    Paste this transform function into your source file:

    import { onRecordWritten } from "@redpanda-data/transform-sdk";
    
    onRecordWritten(csvToJsonTransform);
    
    function csvToJsonTransform(event, writer) {
      // The input data is a CSV (without a header row) that is structured as:
      // item, quantity
      const input = event.record.value.text();
      const rows = input.split('\n');
    
      for (const row of rows) {
        const columns = row.split(',');
    
        if (columns.length !== 2) {
          throw new Error('unexpected number of columns');
        }
    
        const quantity = parseInt(columns[1], 10);
        if (isNaN(quantity)) {
          throw new Error('invalid quantity');
        }
    
        const itemQuantity = {
          item: columns[0],
          quantity: quantity,
        };
        event.record.value = JSON.stringify(itemQuantity);
        writer.write(event.record);
      }
    }
  2. Build the transform into a Wasm binary:

    rpk transform build
  3. Create topics to apply the transform function to:

    rpk topic create input-topic output-topic
  4. Deploy the Wasm binary to your cluster.

    If you have already deployed another example, this new transform function will replace it.

    rpk transform deploy --input-topic=input-topic --output-topic=output-topic
  5. Produce CSV records to the input topic:

    printf "apples,10\npears,11\noranges,5\n" | rpk topic produce input-topic -k market-stock
  6. Open Redpanda Console and check the records in both the input topic and the output topic. You should see the following values (the order in which they appear may differ):

    {
    	"item": "oranges",
    	"quantity": 5
    }
    {
    	"item": "apples",
    	"quantity": 10
    }
    {
    	"item": "pears",
    	"quantity": 11
    }

    You can also verify the content of the output topic from the command line:

    rpk topic consume output-topic
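
If the transformed records don't appear in the output topic, you can inspect the transform's logs, using the transform name from transform.yaml:

rpk transform logs data-transforms-tutorial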

Validate JSON

This example is a filter that outputs only valid JSON from the input topic into the output topic. Invalid JSON is written to a different output topic.

  1. Paste this transform function into your source file:

    • Go

    • Rust

    • JavaScript

    package main
    
    import (
    	"encoding/json"
    	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
    )
    
    func main() {
    	transform.OnRecordWritten(filterValidJson)
    }
    
    func filterValidJson(event transform.WriteEvent, writer transform.RecordWriter) error {
    	if json.Valid(event.Record().Value) {
    		return writer.Write(event.Record())
    	}
    	// Send invalid records to a separate topic
    	return writer.Write(event.Record(), transform.ToTopic("invalid-json"))
    }
    use anyhow::Result;
    use redpanda_transform_sdk::*;
    
    fn main() {
    	on_record_written(filter_valid_json);
    }
    
    fn filter_valid_json(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> {
    	let value = event.record.value().unwrap_or_default();
    	if serde_json::from_slice::<serde_json::Value>(value).is_ok() {
    		writer.write(event.record)?;
    	} else {
    		// Send invalid records to separate topic
    		writer.write_with_options(event.record, WriteOptions::to_topic("invalid-json"))?;
    	}
    	Ok(())
    }

    The JavaScript SDK does not support writing records to a specific output topic. As a result, this transform function writes only valid JSON messages to the output topic. Invalid messages are logged, instead of written to a separate output topic.

    import { onRecordWritten } from "@redpanda-data/transform-sdk";
    
    onRecordWritten(filterValidJson);
    
    function filterValidJson(event, writer) {
      const recordValue = event.record.value.text();
    
      if (isValidJson(recordValue)) {
        writer.write(event.record);
      } else {
        console.log('Invalid JSON detected')
      }
    }
    
    function isValidJson(str) {
      try {
        JSON.parse(str);
        return true;
      } catch (e) {
        return false;
      }
    }
  2. Build the transform into a Wasm binary:

    rpk transform build
  3. Create topics to apply the transform function to:

    rpk topic create input-topic output-topic invalid-json
  4. Deploy the Wasm binary to your cluster.

    If you have already deployed another example, this new transform function will replace it.

    rpk transform deploy --input-topic=input-topic --output-topic=output-topic --output-topic=invalid-json
  5. Produce one valid JSON record and one invalid one to the input topic:

    echo '{"valid":"json"}' | rpk topic produce input-topic -k json
    echo 'invalid json' | rpk topic produce input-topic -k json
  6. Verify the content of the output topic from the command line:

    rpk topic consume output-topic

You should see only the valid JSON from the input topic. Invalid JSON records are written to the invalid-json topic.
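
If you deployed the Go or Rust version of this transform, you can see the invalid record by consuming the invalid-json topic:

rpk topic consume invalid-json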

Clean up

Your transform function will continue processing new records in the input topic until you delete it. To delete the transform function:

rpk transform delete data-transforms-tutorial --no-confirm
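
If you no longer need the tutorial topics, you can delete those as well:

rpk topic delete input-topic output-topic invalid-json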