Docs Self-Managed Develop Data Transforms Run Data Transforms Run Data Transforms in Linux This is documentation for Self-Managed v23.3. To view the latest available version of the docs, see v24.2. Run Data Transforms in Linux Data transforms is a beta feature. It is not supported for production deployments. Runtime behavior may change, and you may not be able to upgrade data transforms without changes. Beta features are available for users to test and provide feedback. Data transforms let you run common data streaming tasks, like filtering, scrubbing, and transcoding, within Redpanda. For example, you may have consumers that require you to redact credit card numbers or convert JSON to Avro. Data transforms can also interact with the Redpanda Schema Registry to work with encoded data types. Data transforms use a WebAssembly (Wasm) engine inside a Redpanda broker. A Wasm function acts on a single record in an input topic. You can develop and manage data transforms with rpk transform commands. You should build and deploy transforms from a separate, non-production machine (host machine). Using a separate host machine avoids potential resource conflicts and stability issues on the nodes that run your brokers. See also: How Data Transforms Work and Limitations Prerequisites You must have the following: A Redpanda cluster running version 23.3. External access to the Kafka API and the Admin API. Development tools installed on your host machine: For Golang, you must have at least version 1.20 of Go. For Rust, you must have the latest stable version of Rust. The rpk command-line client. Install rpk on your host machine and configure it to connect to your Redpanda cluster. Limitations Transforms have no external access to disk or network resources. Only single record transforms is supported, but multiple output records from a single input record is supported. For aggregations, joins, or complex transformations, use Apache Flink. Only a single output topic is supported. Transforms have at-least-once delivery. When clients use the Kafka Transactions API on partitions of an input topic, transforms process only committed records. Because data transforms are powered by Wasm, transform functions can be authored in any language. However, a data transforms SDK currently is only available in Golang and Rust. Enable data transforms To enable data transforms, set the data_transforms_enabled cluster property to true: rpk cluster config set data_transforms_enabled true Restart all brokers: rpk redpanda stop rpk redpanda start Configure memory for data transforms Redpanda reserves memory for each transform function within the broker. You need enough memory for your input record and output record to be in memory at the same time. Set the following properties based on the number of functions you have and the amount of memory you anticipate needing. wasm_per_core_memory_reservation: Total amount of memory (in bytes) to reserve per shard for all Wasm VMs. Default = 20 MiB. Requires restart. wasm_per_function_memory_limit: Amount of memory (in bytes) to reserve per instance of a Wasm VM. Default = 2 MiB. Requires restart. For example, to set wasm_per_core_memory_reservation to 40 MiB: rpk cluster config set wasm_per_core_memory_reservation=41943040 The maximum number of functions that can be deployed to a cluster is equal to wasm_per_core_memory_reservation / wasm_per_function_memory_limit. When that limit is hit, Redpanda cannot allocate memory for the VM and the transforms stay in error states. See also: How Data Transforms Work Create a data transforms project Go Rust Create and initialize a data transforms project: rpk transform init --language=tinygo If you do not include the --language flag, the transform init command will prompt you for the language. A successful command generates project files in your current directory: . ├── go.mod ├── go.sum ├── README.md ├── transform.go └── transform.yaml The transform.go file contains the transform logic, and the transform.yaml file specifies the transform’s configuration. When creating a custom data transform, initialization steps can be done either in main (because it’s only run once at the start of the package) or in Go’s standard predefined init() function. Although state can be cached in global variables, Redpanda may restart a Wasm module at any point, which causes the state to be lost. Implement your project by adding transform logic. The following examples show some basic transforms. Each example can be copied into the transform.go file. Identity transform Transcoder transform Validation filter transform package main import ( "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform" ) // This example shows the basic usage of the package: // This transform does nothing but copy the same data from an // input topic to an output topic. func main() { // Make sure to register your callback and perform other setup in main transform.OnRecordWritten(identityTransform) } // This will be called for each record in the source topic. // // The output records returned will be written to the destination topic. func identityTransform(e transform.WriteEvent, w transform.RecordWriter) error { return w.Write(e.Record()) } package main import ( "bytes" "encoding/csv" "encoding/json" "errors" "io" "strconv" "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform" ) // This example shows a transform that converts CSV inputs into JSON outputs. func main() { transform.OnRecordWritten(csvToJsonTransform) } type Foo struct { A string `json:"a"` B int `json:"b"` } func csvToJsonTransform(e transform.WriteEvent, w transform.RecordWriter) error { // The input data is a CSV (without a header row) that is the structure of: // key, a, b reader := csv.NewReader(bytes.NewReader(e.Record().Value)) // Improve performance by reusing the result slice. reader.ReuseRecord = true for { row, err := reader.Read() if err == io.EOF { break } else if err != nil { return err } if len(row) != 3 { return errors.New("unexpected number of rows") } // Convert the last column into an int b, err := strconv.Atoi(row[2]) if err != nil { return err } // Marshal our JSON value f := Foo{ A: row[1], B: b, } v, err := json.Marshal(&f) if err != nil { return err } // Add our output record using the first column as the key. r := transform.Record{ Key: []byte(row[0]), Value: v, } if err := w.Write(r); err != nil { return err } } return nil } import ( "encoding/json" "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform" ) // This example shows a filter that outputs only valid JSON into the // output topic. func main() { transform.OnRecordWritten(filterValidJson) } func filterValidJson(e transform.WriteEvent, w transform.RecordWriter) error { if json.Valid(e.Record().Value) { return w.Write(e.Record()) } return nil } Create and initialize a data transforms project: rpk transform init --language=rust If you do not include the --language flag, the transform init command will prompt you for the language. A successful command generates project files in your current directory: . ├── Cargo.lock ├── Cargo.toml ├── README.md ├── src │ └── main.rs └── transform.yaml The src/main.rs file contains the transform logic, and the transform.yaml file specifies the transform’s configuration. When creating a custom data transform, initialization steps can be done in main() because it’s only run once at the start of the package. Although state can be cached in global variables, Redpanda may restart a Wasm module at any point, which causes the state to be lost. The following examples show some basic transforms. Each example can be copied into the main.rs file. Identity transform Transcoder transform Validation filter transform use anyhow::Result; use redpanda_transform_sdk::*; // This example shows the basic usage of the crate: // This transform does nothing but copy the same data from an // input topic to an output topic. fn main() { // Make sure to register your callback and perform other setup in main on_record_written(my_transform); } // This will be called for each record in the source topic. // // The output records returned will be written to the destination topic. fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> { writer.write(event.record)?; Ok(()) } use anyhow::Result; use redpanda_transform_sdk::*; use serde::{Deserialize, Serialize}; // This example shows a transform that converts CSV inputs into JSON outputs. fn main() { on_record_written(my_transform); } #[derive(Serialize, Deserialize)] struct Foo { a: String, b: i32, } fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> { // The input data is a CSV (without a header row) that is defined as our Foo structure. let mut reader = csv::Reader::from_reader(event.record.value().unwrap_or_default()); // For each record in our CSV for result in reader.deserialize() { let foo: Foo = result?; // Convert it to JSON let value = serde_json::to_vec(&foo)?; // Then output it with the same key. writer.write(BorrowedRecord::new(event.record.key(), Some(&value)))?; } Ok(()) } use anyhow::Result; use redpanda_transform_sdk::*; // This example shows a filter that outputs only valid JSON to the output topic. fn main() { on_record_written(filter_valid_json); } fn filter_valid_json(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> { let value = event.record.value().unwrap_or_default(); if serde_json::from_slice::<serde_json::Value>(value).is_ok() { writer.write(event.record)?; } Ok(()) } Build and deploy the transform Build the transform into a Wasm module with metadata: rpk transform build Create demo topics to apply the transform function to: rpk topic create demo-1 demo-2 Deploy the Wasm module to your cluster. For example, with the identity transform: rpk transform deploy --input-topic=demo-1 --output-topic=demo-2 Validate that your transform is running. For example: Produce a few records to the demo-1 topic. echo "foo\nbar" | rpk topic produce demo-1 Consume from the demo-2 topic. rpk topic consume demo-2 { "topic": "demo-2", "value": "foo", "timestamp": 1687545891433, "partition": 0, "offset": 0 } { "topic": "demo-2", "value": "bar", "timestamp": 1687545892434, "partition": 0, "offset": 1 } You can see STDOUT and STDERR for your function in the broker’s logs. Monitor data transforms You can monitor your transforms with the following metrics: transform_execution_latency_sec transform_execution_errors wasm_engine_cpu_seconds_total wasm_engine_memory_usage wasm_engine_max_memory wasm_binary_executable_memory_usage transform_read_bytes transform_write_bytes transform_lag transform_failures transform_state See Public Metrics Reference Suggested reading How Data Transforms Work Golang SDK for Data Transforms Rust SDK for Data Transforms rpk transform commands Suggested labs Flatten JSON MessagesConvert JSON Messages into AvroTransform JSON Messages into a New Topic using JQFilter Messages into a New Topic using a RegexConvert Timestamps using RustRedact Information in JSON MessagesSee moreSearch all labs Back to top × Simple online edits For simple changes, such as fixing a typo, you can edit the content directly on GitHub. Edit on GitHub Or, open an issue to let us know about something that you want us to change. Open an issue Contribution guide For extensive content updates, or if you prefer to work locally, read our contribution guide . Was this helpful? thumb_up thumb_down group Ask in the community mail Share your feedback group_add Make a contribution Run Data Transforms Run Data Transforms in Kubernetes