# Run Data Transforms in Kubernetes


---
title: Run Data Transforms in Kubernetes
latest-operator-version: v26.1.4
# EOL = End-of-Life (support lifecycle status)
page-is-nearing-eol: "false"
page-is-past-eol: "true"
page-eol-date: April 30, 2025
latest-console-tag: v3.7.3
latest-connect-version: 4.93.0
docname: data-transforms/k-run-transforms
page-component-name: streaming
page-version: "24.1"
page-component-version: "24.1"
page-component-title: Streaming
page-relative-src-path: data-transforms/k-run-transforms.adoc
description: Learn how to build and deploy WebAssembly data transforms in Kubernetes deployments.
page-git-created-date: "2023-12-22"
page-git-modified-date: "2024-05-01"
support-status: past end-of-life
---

<!-- Source: https://docs.redpanda.com/streaming/24.1/develop/data-transforms/k-run-transforms.md -->

Data transforms let you run common data streaming tasks, like filtering, scrubbing, and transcoding, within Redpanda. For example, you may have consumers that require you to redact credit card numbers or convert JSON to Avro. Data transforms can also interact with the Redpanda Schema Registry to work with encoded data types.

Data transforms use a WebAssembly (Wasm) engine inside a Redpanda broker. A Wasm function acts on a single record in an input topic. You can develop and manage data transforms with [`rpk transform`](https://docs.redpanda.com/streaming/24.1/reference/rpk/rpk-transform/rpk-transform/) commands.

You should build and deploy transforms from a separate, non-production machine (host machine). Using a separate host machine avoids potential resource conflicts and stability issues on the nodes that run your brokers.

See also: [How Data Transforms Work](https://docs.redpanda.com/streaming/24.1/develop/data-transforms/how-transforms-work/) and [Limitations](#limitations)

## [](#prerequisites)Prerequisites

You must have the following:

-   [A Redpanda cluster](https://docs.redpanda.com/streaming/24.1/deploy/) running version 24.1.

-   External access to the Kafka API and the Admin API.

    Ensure that your Redpanda cluster has [external access](https://docs.redpanda.com/streaming/24.1/manage/kubernetes/networking/external/) enabled and is accessible from your host machine using the advertised addresses.

    > 💡 **TIP**
    >
    > For a tutorial on setting up a Redpanda cluster with external access, see [Get Started with Redpanda in Kubernetes](https://docs.redpanda.com/streaming/24.1/deploy/deployment-option/self-hosted/kubernetes/get-started-dev/).

-   Development tools installed on your host machine:

    -   For Golang, you must have at least version 1.20 of [Go](https://go.dev/doc/install).

    -   For Rust, you must have the latest stable version of [Rust](https://rustup.rs/).


-   The [`rpk` command-line client](https://docs.redpanda.com/streaming/24.1/get-started/rpk-install/).

    Install `rpk` on your host machine and configure it to connect to your Redpanda cluster.

    You can use a [pre-configured `rpk` profile](https://docs.redpanda.com/streaming/24.1/manage/kubernetes/networking/k-connect-to-redpanda/#rpk-profile):

    ```bash
    rpk profile create --from-profile <(kubectl get configmap --namespace <namespace> redpanda-rpk -o go-template='{{ .data.profile }}') <profile-name>
    ```

    Replace `<namespace>` with the namespace in which Redpanda is deployed, and replace `<profile-name>` with the name that you want to give this `rpk` profile.


## [](#limitations)Limitations

-   Transforms have no external access to disk or network resources.

-   Transforms operate on a single record at a time, although one input record can produce multiple output records. For aggregations, joins, or other complex transformations, use Apache Flink.

-   Up to 8 output topics are supported.

-   Transforms have at-least-once delivery.

-   When clients use the Kafka Transactions API on partitions of an input topic, transforms process only committed records.

-   Because data transforms run on Wasm, you can write transform functions in any language that compiles to Wasm. However, data transforms SDKs are currently available only for [Go](https://docs.redpanda.com/streaming/24.1/reference/data-transform-golang-sdk/) and [Rust](https://docs.redpanda.com/streaming/24.1/reference/data-transform-rust-sdk/).


## [](#enable-data-transforms)Enable data transforms

1.  To enable data transforms, set the `data_transforms_enabled` cluster property to `true`:

    ```bash
    kubectl exec redpanda-0 -c redpanda -n <namespace> -- rpk cluster config set data_transforms_enabled true
    ```

2.  Restart all brokers:

    ```bash
    kubectl rollout restart statefulset redpanda --namespace=<namespace>
    ```

3.  Wait for all Pods to restart:

    ```bash
    kubectl rollout status statefulset redpanda --namespace=<namespace> --watch
    ```


## [](#configure-memory)Configure memory for data transforms

Redpanda reserves memory for each transform function within the broker. You need enough memory for your input record and output record to be in memory at the same time.

Set the following properties based on the number of functions you have and the amount of memory you anticipate needing.

-   `wasm_per_core_memory_reservation`: Total amount of memory (in bytes) to reserve per shard for all Wasm VMs. Default = 20 MiB. Requires restart.

-   `wasm_per_function_memory_limit`: Amount of memory (in bytes) to reserve per instance of a Wasm VM. Default = 2 MiB. Requires restart.


For example, to set `wasm_per_core_memory_reservation` to 40 MiB:

```bash
rpk cluster config set wasm_per_core_memory_reservation 41943040
```

The maximum number of functions that can be deployed to a cluster is equal to `wasm_per_core_memory_reservation` / `wasm_per_function_memory_limit`. When that limit is reached, Redpanda cannot allocate memory for additional VMs, and the affected transforms remain in an error state.
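
The limit is plain integer arithmetic. The following Go sketch applies it to the default values listed above and to the 40 MiB example. The helper names are hypothetical, and the sketch only restates the formula in this section; it does not model how Redpanda actually allocates memory.

```go
package main

import "fmt"

// mib is one mebibyte in bytes.
const mib = 1 << 20

// maxTransformFunctions restates the rule above:
// wasm_per_core_memory_reservation / wasm_per_function_memory_limit.
// Integer division rounds down, since only whole VMs fit in the reservation.
func maxTransformFunctions(perCoreReservation, perFunctionLimit int64) int64 {
	if perFunctionLimit <= 0 {
		return 0 // guard against division by zero for invalid input
	}
	return perCoreReservation / perFunctionLimit
}

// reservationFor inverts the rule: the reservation (in bytes) needed
// to fit n functions at the given per-function limit.
func reservationFor(n, perFunctionLimit int64) int64 {
	return n * perFunctionLimit
}

func main() {
	// Defaults: 20 MiB reservation and 2 MiB per function.
	fmt.Println(maxTransformFunctions(20*mib, 2*mib)) // 10
	// The 40 MiB example above (41943040 bytes).
	fmt.Println(maxTransformFunctions(41943040, 2*mib)) // 20
	// Reservation needed for 15 functions at the default 2 MiB limit.
	fmt.Println(reservationFor(15, 2*mib)) // 31457280
}
```

For example, to run 15 functions at the default per-function limit, the sketch suggests a reservation of at least 31457280 bytes (30 MiB).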

See also: [How Data Transforms Work](https://docs.redpanda.com/streaming/24.1/develop/data-transforms/how-transforms-work/)

## [](#create-a-data-transforms-project)Create a data transforms project

### Go

1.  Create and initialize a data transforms project:

    ```bash
    rpk transform init --language=tinygo
    ```

    If you do not include the `--language` flag, the `transform init` command will prompt you for the language.

    A successful command generates project files in your current directory:

        .
        ├── go.mod
        ├── go.sum
        ├── README.md
        ├── transform.go
        └── transform.yaml

    The `transform.go` file contains the transform logic, and the `transform.yaml` file specifies the transform’s configuration.

    > 💡 **TIP**
    >
    > When creating a custom data transform, you can perform initialization steps either in `main` (which runs only once, when the module starts) or in Go’s standard `init()` function. Although you can cache state in global variables, Redpanda may restart a Wasm module at any point, which causes that state to be lost.

2.  Implement your project by adding transform logic.

    The following examples show some basic transforms. Each example can be copied into the `transform.go` file.

#### Identity transform

```go
package main

import (
	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

// This example shows the basic usage of the package:
// This transform does nothing but copy the same data from an
// input topic to an output topic.
func main() {
	// Make sure to register your callback and perform other setup in main
	transform.OnRecordWritten(identityTransform)
}

// This will be called for each record in the source topic.
//
// The output records returned will be written to the destination topic.
func identityTransform(e transform.WriteEvent, w transform.RecordWriter) error {
	return w.Write(e.Record())
}
```

#### Transcoder transform

```go
package main

import (
	"bytes"
	"encoding/csv"
	"encoding/json"
	"errors"
	"io"
	"strconv"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

// This example shows a transform that converts CSV inputs into JSON outputs.
func main() {
	transform.OnRecordWritten(csvToJsonTransform)
}

type Foo struct {
	A string `json:"a"`
	B int    `json:"b"`
}

func csvToJsonTransform(e transform.WriteEvent, w transform.RecordWriter) error {
	// The input data is a CSV (without a header row) that is the structure of:
	// key, a, b
	reader := csv.NewReader(bytes.NewReader(e.Record().Value))
	// Improve performance by reusing the result slice.
	reader.ReuseRecord = true
	for {
		row, err := reader.Read()
		if err == io.EOF {
			break
		} else if err != nil {
			return err
		}
		if len(row) != 3 {
			return errors.New("unexpected number of columns")
		}
		// Convert the last column into an int
		b, err := strconv.Atoi(row[2])
		if err != nil {
			return err
		}
		// Marshal our JSON value
		f := Foo{
			A: row[1],
			B: b,
		}
		v, err := json.Marshal(&f)
		if err != nil {
			return err
		}
		// Add our output record using the first column as the key.
		r := transform.Record{
			Key:   []byte(row[0]),
			Value: v,
		}
		if err := w.Write(r); err != nil {
			return err
		}
	}
	return nil
}
```

#### Validation filter transform

```go
package main

import (
	"encoding/json"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

// This example shows a filter that outputs only valid JSON into the
// output topic.
func main() {
	transform.OnRecordWritten(filterValidJson)
}

func filterValidJson(e transform.WriteEvent, w transform.RecordWriter) error {
	if json.Valid(e.Record().Value) {
		return w.Write(e.Record())
	}
	// Send invalid records to a separate topic. The "invalid-json" topic must
	// also be an output topic of this transform when you deploy it.
	return w.Write(e.Record(), transform.ToTopic("invalid-json"))
}
```
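
The tip in step 1 notes that setup can run in Go’s `init()` function and that cached state can be lost when Redpanda restarts the module. The following standard-library sketch shows that pattern with a hypothetical card-number redaction helper (one of the use cases mentioned at the top of this page): the regular expression is compiled once in `init()` and reused for every record. The pattern and the `redactCards` name are illustrative assumptions, and the regex is not a complete card-number detector.

```go
package main

import (
	"fmt"
	"regexp"
)

// cardPattern is built once per module instance. Redpanda may restart the
// Wasm module at any time, so treat it as a cache that init() rebuilds on
// startup, not as durable state.
var cardPattern *regexp.Regexp

// init runs once, before main, when the module starts.
func init() {
	// Hypothetical pattern: 16 digits, optionally grouped in fours by '-' or ' '.
	cardPattern = regexp.MustCompile(`\b(?:\d{4}[- ]?){3}\d{4}\b`)
}

// redactCards replaces anything that looks like a card number.
// Hypothetical helper, not part of the SDK: a transform callback could
// call it on e.Record().Value before writing the record.
func redactCards(value []byte) []byte {
	return cardPattern.ReplaceAll(value, []byte("[REDACTED]"))
}

func main() {
	// In a real transform, main would register the callback with
	// transform.OnRecordWritten; here it only demonstrates the helper.
	fmt.Println(string(redactCards([]byte(`{"card":"4111-1111-1111-1111"}`))))
}
```

Compiling the pattern once at startup avoids recompiling it for every record, and because `init()` runs again whenever the module restarts, losing the cached value is harmless.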

### Rust

1.  Create and initialize a data transforms project:

    ```bash
    rpk transform init --language=rust
    ```

    If you do not include the `--language` flag, the `transform init` command will prompt you for the language.

    A successful command generates project files in your current directory:

        .
        ├── Cargo.lock
        ├── Cargo.toml
        ├── README.md
        ├── src
        │   └── main.rs
        └── transform.yaml

    The `src/main.rs` file contains the transform logic, and the `transform.yaml` file specifies the transform’s configuration.

    > 💡 **TIP**
    >
    > When creating a custom data transform, you can perform initialization steps in `main()`, which runs only once, when the module starts. Although you can cache state in global variables, Redpanda may restart a Wasm module at any point, which causes that state to be lost.

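2.  Implement your project by adding transform logic.

    The following examples show some basic transforms. Each example can be copied into the `src/main.rs` file. Some examples also use the `serde` (with the `derive` feature), `serde_json`, and `csv` crates; add any that are missing to the `[dependencies]` section of `Cargo.toml`.
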
#### Identity transform

```rust
use anyhow::Result;
use redpanda_transform_sdk::*;

// This example shows the basic usage of the crate:
// This transform does nothing but copy the same data from an
// input topic to an output topic.
fn main() {
    // Make sure to register your callback and perform other setup in main
    on_record_written(my_transform);
}

// This will be called for each record in the source topic.
//
// The output records returned will be written to the destination topic.
fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> {
    writer.write(event.record)?;
    Ok(())
}
```

#### Transcoder transform

```rust
use anyhow::Result;
use redpanda_transform_sdk::*;
use serde::{Deserialize, Serialize};

// This example shows a transform that converts CSV inputs into JSON outputs.
fn main() {
    on_record_written(my_transform);
}

#[derive(Serialize, Deserialize)]
struct Foo {
    a: String,
    b: i32,
}

fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> {
    // The input data is a CSV without a header row, so disable header handling
    // and deserialize each row by position into our Foo structure (a, b).
    let mut reader = csv::ReaderBuilder::new()
        .has_headers(false)
        .from_reader(event.record.value().unwrap_or_default());
    // For each record in our CSV
    for result in reader.deserialize() {
        let foo: Foo = result?;
        // Convert it to JSON
        let value = serde_json::to_vec(&foo)?;
        // Then output it with the same key.
        writer.write(BorrowedRecord::new(event.record.key(), Some(&value)))?;
    }
    Ok(())
}
```

#### Validation filter transform

```rust
use anyhow::Result;
use redpanda_transform_sdk::*;

// This example shows a filter that outputs only valid JSON to the output topic.
fn main() {
    on_record_written(filter_valid_json);
}

fn filter_valid_json(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> {
    let value = event.record.value().unwrap_or_default();
    if serde_json::from_slice::<serde_json::Value>(value).is_ok() {
        writer.write(event.record)?;
    } else {
        // Send invalid records to a separate topic. The "invalid-json" topic must
        // also be an output topic of this transform when you deploy it.
        writer.write_with_options(event.record, WriteOptions::to_topic("invalid-json"))?;
    }
    Ok(())
}
```

## [](#build-and-deploy-the-transform)Build and deploy the transform

1.  Build the transform into a Wasm module with metadata:

    ```bash
    rpk transform build
    ```

2.  Create demo topics to apply the transform function to:

    ```bash
    rpk topic create demo-1 demo-2
    ```

3.  Deploy the Wasm module to your cluster. For example, with the identity transform:

    ```bash
    rpk transform deploy --input-topic=demo-1 --output-topic=demo-2
    ```

4.  Validate that your transform is running. For example:

    1.  Produce a few records to the `demo-1` topic.

        ```bash
        printf 'foo\nbar\n' | rpk topic produce demo-1
        ```

    2.  Consume from the `demo-2` topic.

        ```bash
        rpk topic consume demo-2
        ```

        ```json
        {
          "topic": "demo-2",
          "value": "foo",
          "timestamp": 1687545891433,
          "partition": 0,
          "offset": 0
        }
        {
          "topic": "demo-2",
          "value": "bar",
          "timestamp": 1687545892434,
          "partition": 0,
          "offset": 1
        }
        ```
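
To script the check in step 4 instead of reading the output by eye, you can pipe the consumer output into a small program. The following Go sketch assumes the output format shown above (one JSON object per record with a `value` field) and that you stop the consumer after two records, for example with `rpk topic consume demo-2 -n 2 | go run check_identity.go`. The file name and helper names are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
)

// consumedRecord holds the fields of the rpk topic consume output shown above.
// Fields that are not listed here (such as timestamp) are ignored when decoding.
type consumedRecord struct {
	Topic  string `json:"topic"`
	Value  string `json:"value"`
	Offset int64  `json:"offset"`
}

// parseValues decodes a stream of concatenated JSON objects and returns
// the record values in the order they were printed.
func parseValues(data []byte) ([]string, error) {
	dec := json.NewDecoder(bytes.NewReader(data))
	var values []string
	for {
		var rec consumedRecord
		if err := dec.Decode(&rec); errors.Is(err, io.EOF) {
			return values, nil
		} else if err != nil {
			return nil, err
		}
		values = append(values, rec.Value)
	}
}

func main() {
	data, err := io.ReadAll(os.Stdin)
	if err != nil {
		fmt.Fprintln(os.Stderr, "read error:", err)
		os.Exit(1)
	}
	got, err := parseValues(data)
	if err != nil {
		fmt.Fprintln(os.Stderr, "decode error:", err)
		os.Exit(1)
	}
	// The identity transform should copy the produced values unchanged.
	want := []string{"foo", "bar"}
	if len(got) != len(want) {
		fmt.Fprintf(os.Stderr, "got %d records, want %d\n", len(got), len(want))
		os.Exit(1)
	}
	for i := range want {
		if got[i] != want[i] {
			fmt.Fprintf(os.Stderr, "record %d: got %q, want %q\n", i, got[i], want[i])
			os.Exit(1)
		}
	}
	fmt.Println("identity transform OK")
}
```

`json.Decoder` is used instead of `json.Unmarshal` because the consumer prints several JSON objects back to back rather than a single JSON array.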



## [](#view-data-transform-logs)View data transform logs

Runtime logs for transforms are written to an internal topic called `_redpanda.transform_logs`.

To view the logs, run the following `rpk` command:

```bash
rpk transform logs <transform_name>
```

Alternatively, in Redpanda Console, go to **Topics** and select **Show internal topics** to see `_redpanda.transform_logs` in the topic list.

For usage details, see the [`rpk transform logs` reference](https://docs.redpanda.com/streaming/24.1/reference/rpk/rpk-transform/rpk-transform-logs/).

## [](#monitor-data-transforms)Monitor data transforms

You can monitor your transforms with the following metrics:

-   `transform_execution_latency_sec`

-   `transform_execution_errors`

-   `wasm_engine_cpu_seconds_total`

-   `wasm_engine_memory_usage`

-   `wasm_engine_max_memory`

-   `wasm_binary_executable_memory_usage`

-   `transform_read_bytes`

-   `transform_write_bytes`

-   `transform_lag`

-   `transform_failures`

-   `transform_state`


For details about each metric, see [Public Metrics](https://docs.redpanda.com/streaming/24.1/reference/public-metrics-reference/).
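
If you scrape these metrics yourself instead of through a Prometheus server, a small program can pull a single number out of the text output, for example the total transform lag summed across partitions. The following Go sketch assumes the Prometheus text exposition format and reads it from standard input. The exact exported metric names (including any prefix) and their labels are not shown on this page, so take them from the Public Metrics reference rather than from this example; the `sumMetric` name is hypothetical.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strconv"
	"strings"
)

// sumMetric adds up every sample of the metric named exactly `name` in
// Prometheus text exposition format, across all label sets. The bool result
// reports whether at least one sample was found.
func sumMetric(text, name string) (float64, bool, error) {
	var total float64
	found := false
	for _, line := range strings.Split(text, "\n") {
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") {
			continue // skip blank lines and # HELP / # TYPE comments
		}
		var series, rest string
		if i := strings.IndexByte(line, '{'); i >= 0 {
			// name{label="value",...} sample [timestamp]
			j := strings.LastIndexByte(line, '}')
			if j < i {
				continue // malformed line
			}
			series, rest = line[:i], line[j+1:]
		} else {
			// name sample [timestamp]
			sp := strings.IndexAny(line, " \t")
			if sp < 0 {
				continue
			}
			series, rest = line[:sp], line[sp:]
		}
		if series != name {
			continue
		}
		fields := strings.Fields(rest)
		if len(fields) == 0 {
			continue
		}
		v, err := strconv.ParseFloat(fields[0], 64)
		if err != nil {
			return 0, false, fmt.Errorf("parse %q: %w", line, err)
		}
		total += v
		found = true
	}
	return total, found, nil
}

func main() {
	if len(os.Args) != 2 {
		fmt.Fprintln(os.Stderr, "usage: summetric <metric-name> < metrics.txt")
		os.Exit(2)
	}
	data, err := io.ReadAll(os.Stdin)
	if err != nil {
		fmt.Fprintln(os.Stderr, "read error:", err)
		os.Exit(1)
	}
	total, found, err := sumMetric(string(data), os.Args[1])
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	if !found {
		fmt.Fprintf(os.Stderr, "metric %q not found\n", os.Args[1])
		os.Exit(1)
	}
	fmt.Println(total)
}
```

For example, assuming the Admin API is reachable from your host machine at `<admin-api-address>`, you could run `curl -s http://<admin-api-address>/public_metrics | go run summetric.go <metric-name>`. Summing across label sets makes sense for counters and lag-style gauges, but not for state gauges such as `transform_state`.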

## [](#suggested-reading)Suggested reading

-   [How Data Transforms Work](https://docs.redpanda.com/streaming/24.1/develop/data-transforms/how-transforms-work/)

-   [Golang SDK for Data Transforms](https://docs.redpanda.com/streaming/24.1/reference/data-transform-golang-sdk/)

-   [Rust SDK for Data Transforms](https://docs.redpanda.com/streaming/24.1/reference/data-transform-rust-sdk/)

-   [`rpk transform` commands](https://docs.redpanda.com/streaming/24.1/reference/rpk/rpk-transform/rpk-transform/)


## Suggested labs

-   [Flatten JSON Messages](https://docs.redpanda.com/labs/data-transforms/flatten-go/)
-   [Convert JSON Messages into Avro](https://docs.redpanda.com/labs/data-transforms/issdemo-go/)
-   [Filter Messages into a New Topic using a Regex](https://docs.redpanda.com/labs/data-transforms/regex-go/)
-   [Convert Timestamps using Rust](https://docs.redpanda.com/labs/data-transforms/ts-converter-rust/)
-   [Redact Information in JSON Messages](https://docs.redpanda.com/labs/data-transforms/redaction-go/)

[Search all labs](https://docs.redpanda.com/labs)