Docs Self-Managed Develop Data Transforms Get Started Kubernetes Data Transforms in Kubernetes Quickstart Data transforms let you run common data streaming tasks, like filtering, scrubbing, and transcoding, within Redpanda. For example, you may have consumers that require you to redact credit card numbers or convert JSON to Avro. Data transforms can also interact with the Redpanda Schema Registry to work with encoded data types. Data transforms use a WebAssembly (Wasm) engine inside a Redpanda broker. A Wasm function acts on a single record in an input topic. You can develop and manage data transforms with rpk transform commands. You should build and deploy transforms from a separate, non-production machine (host machine). Using a separate host machine avoids potential resource conflicts and stability issues on the nodes that run your brokers. See also: How Data Transforms Work. Prerequisites You must have the following: A Redpanda cluster running at least version 24.3. External access to the Kafka API and the Admin API. Ensure that your Redpanda cluster has external access enabled and is accessible from your host machine using the advertised addresses. For a tutorial on setting up a Redpanda cluster with external access, see Get Started with Redpanda in Kubernetes. Development tools installed on your host machine: For Golang, you must have at least version 1.20 of Go. For Rust, you must have the latest stable version of Rust. The rpk command-line client installed on your host machine and configured to connect to your Redpanda cluster. For JavaScript and TypeScript projects, you must have the latest long-term-support release of Node.js. You can use a pre-configured rpk profile: rpk profile create --from-profile <(kubectl get configmap --namespace <namespace> redpanda-rpk -o go-template='{{ .data.profile }}') <profile-name> Replace <profile-name> with the name that you want to give this rpk profile. Enable data transforms Data transforms is disabled on all clusters by default. Before you can deploy data transforms to a cluster, you must first enable the feature. To enable data transforms, set the data_transforms_enabled cluster property to true: Helm + Operator Helm redpanda-cluster.yaml apiVersion: cluster.redpanda.com/v1alpha2 kind: Redpanda metadata: name: redpanda spec: chartRef: {} clusterSpec: config: cluster: data_transforms_enabled: true kubectl apply -f redpanda-cluster.yaml --namespace <namespace> --values --set write-caching.yaml config: cluster: data_transforms_enabled: true helm upgrade --install redpanda redpanda/redpanda --namespace <namespace> --create-namespace \ --values write-caching.yaml --reuse-values helm upgrade --install redpanda redpanda/redpanda --namespace <namespace> --create-namespace \ --set config.cluster.data_transforms_enabled=true Restart all brokers: kubectl rollout restart statefulset redpanda --namespace=<namespace> Wait for all Pods to restart: kubectl rollout status statefulset redpanda --namespace=<namespace> --watch Create a data transforms project The easiest way to create a new data transforms project is to use the rpk transform init command. This command generates template files and installs any dependencies for your chosen language. Create and initialize a data transforms project: Go Rust JavaScript rpk transform init --language=tinygo --name=data-transforms-tutorial A successful command generates project files in your current directory: . ├── go.mod ├── go.sum ├── README.md ├── transform.go └── transform.yaml The transform.go file is the source file for your transform function. The transform.yaml file is the configuration for your transform function. The transform.yaml file already contains the name of your transform function and the language that you specified in the rpk transform init command. rpk transform init --language=rust --name=data-transforms-tutorial A successful command generates project files in your current directory: . ├── Cargo.lock ├── Cargo.toml ├── README.md ├── src │ └── main.rs └── transform.yaml The src/main.rs file is the source file for your transform function. The transform.yaml file is the configuration for your transform function. The transform.yaml file already contains the name of your transform function and the language that you specified in the rpk transform init command. rpk transform init --language=javascript --name=data-transforms-tutorial A successful command generates project files in your current directory: . ├── README.md ├── esbuild.js ├── node_modules ├── package-lock.json ├── package.json ├── src │ └── index.js └── transform.yaml The src/index.js file is the source file for your transform function. The transform.yaml file is the configuration for your transform function. The transform.yaml file already contains the name of your transform function and the language that you specified in the rpk transform init command. The esbuild.js file is the build script for your project. This file configures the build process using esbuild, a fast JavaScript bundler. It ensures that your code is bundled correctly and includes any necessary polyfills for Node.js standard modules that are not natively available in the Redpanda JavaScript runtime environment. Now that you have a project set up, you can run some examples to learn how to work with data transforms. Make sure to copy the provided transform functions and paste them into your source file. For example, the transform.go file for Go projects, or the src/main.rs file for Rust. Run examples This section provides some examples of transform functions to teach you the basics of writing and deploying data transforms. It’s best to try each example in order, one after the other. Copy records from one topic to another This transform function copies the same data from an input topic to an output topic. Paste this transform function into your source file: Go Rust JavaScript package main import ( "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform" ) func main() { // Make sure to register your callback and perform other setup in main transform.OnRecordWritten(copyRecordsToOutput) } // This will be called for each record in the input topic. // The records returned will be written to the output topic. func copyRecordsToOutput(event transform.WriteEvent, writer transform.RecordWriter) error { return writer.Write(event.Record()) } use anyhow::Result; use redpanda_transform_sdk::*; fn main() { // Make sure to register your callback and perform other setup in main on_record_written(copy_records_to_output); } // This will be called for each record in the input topic. // The records returned will be written to the output topic. fn copy_records_to_output(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> { writer.write(event.record)?; Ok(()) } import { onRecordWritten } from "@redpanda-data/transform-sdk"; // Register your callback function in the entry point of your script. onRecordWritten(copyRecordsToOutput); // This function will be called for each record in the input topic. // The records returned will be written to the output topic. function copyRecordsToOutput(event, writer) { writer.write(event.record); } Build the transform into a Wasm binary: rpk transform build Create topics to apply the transform function to: rpk topic create input-topic output-topic Deploy the Wasm binary to your cluster: rpk transform deploy --input-topic=input-topic --output-topic=output-topic Produce two new records to the input topic. echo "hello\nworld" | rpk topic produce input-topic Open Redpanda Console and check the records in both the input topic and the output topic. They should be the same. You can also verify the content of the output topic in the command-line: rpk topic consume output-topic Convert CSV input to JSON output This example is a transform function that converts CSV inputs into JSON outputs. Prepare the project files: Go Rust JavaScript Paste this transform function into your source file: package main import ( "bytes" "encoding/csv" "encoding/json" "errors" "io" "strconv" "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform" ) func main() { transform.OnRecordWritten(csvToJsonTransform) } type ItemQuantity struct { Item string `json:"item"` Quantity int `json:"quantity"` } func csvToJsonTransform(event transform.WriteEvent, writer transform.RecordWriter) error { // The input data is a CSV (without a header row) that is structured as: // key, item, quantity reader := csv.NewReader(bytes.NewReader(event.Record().Value)) // Improve performance by reusing the result slice. reader.ReuseRecord = true for { row, err := reader.Read() if err == io.EOF { break } else if err != nil { return err } if len(row) != 3 { return errors.New("unexpected number of rows") } // Convert the last column into an int quantity, err := strconv.Atoi(row[2]) if err != nil { return err } // Marshall the JSON value iq := ItemQuantity{ Item: row[1], Quantity: quantity, } v, err := json.Marshal(&iq) if err != nil { return err } // Add the output record using the first column as the key. r := transform.Record{ Key: []byte(row[0]), Value: v, } if err := writer.Write(r); err != nil { return err } } return nil } Add the following dependencies to the Cargo.toml file: csv = "1.3.0" serde_json = "1.0.111" serde = { version = "1.0.195", features = ["derive"] } Run the following command to update your dependencies: cargo build Paste this transform function into your source file: use anyhow::Result; use redpanda_transform_sdk::*; use serde::{Deserialize, Serialize}; use csv::ReaderBuilder; use serde_json; #[derive(Serialize, Deserialize)] struct MarketStock { item: String, quantity: i32, } fn main() { on_record_written(csv_to_json_transform); } fn csv_to_json_transform(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> { // The input data is a CSV (without a header row) that is defined as the MarketStock structure. let mut reader = ReaderBuilder::new().has_headers(false).from_reader(event.record.value().unwrap_or_default()); // For each record in our CSV for result in reader.deserialize() { let stock: MarketStock = match result { Ok(record) => record, Err(err) => { eprintln!("CSV deserialize error: {}", err); continue; // Skip the invalid record and continue processing } }; // Convert it to JSON let value = serde_json::to_vec(&stock)?; // Then output it with the same key. writer.write(BorrowedRecord::new(event.record.key(), Some(&value)))?; } Ok(()) } Paste this transform function into your source file: import { onRecordWritten } from "@redpanda-data/transform-sdk"; onRecordWritten(csvToJsonTransform); function csvToJsonTransform(event, writer) { // The input data is a CSV (without a header row) that is structured as: // key, item, quantity const input = event.record.value.text(); const rows = input.split('\n'); for (const row of rows) { const columns = row.split(','); if (columns.length !== 2) { throw new Error('unexpected number of columns'); } const quantity = parseInt(columns[1], 10); if (isNaN(quantity)) { throw new Error('invalid quantity'); } const itemQuantity = { item: columns[0], quantity: quantity, }; event.record.value = JSON.stringify(itemQuantity); writer.write(event.record); } } Build the transform into a Wasm binary: rpk transform build Create topics to apply the transform function to: rpk topic create input-topic output-topic Deploy the Wasm binary to your cluster. If you have already deployed another example, this new transform function will replace it. rpk transform deploy --input-topic=input-topic --output-topic=output-topic Produce CSV records to the input topic. echo "apples,10\npears,11\noranges,5" | rpk topic produce input-topic -k market-stock Open Redpanda Console and check the records in both the input topic and the output topic. You should see the following values: { "item": "oranges", "quantity": 5 } { "item": "apples", "quantity": 10 } { "item": "pears", "quantity": 11 } You can also verify the content of the output topic in the command-line: rpk topic consume output-topic Validate JSON This example is a filter that outputs only valid JSON from the input topic into the output topic. Invalid JSON is written to a different output topic. Paste this transform function into your source file: Go Rust JavaScript import ( "encoding/json" "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform" ) func main() { transform.OnRecordWritten(filterValidJson) } func filterValidJson(event transform.WriteEvent, writer transform.RecordWriter) error { if json.Valid(event.Record().Value) { return w.Write(e.Record()) } // Send invalid records to separate topic return writer.Write(e.Record(), transform.ToTopic("invalid-json")) } use anyhow::Result; use redpanda_transform_sdk::*; fn main() { on_record_written(filter_valid_json); } fn filter_valid_json(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> { let value = event.record.value().unwrap_or_default(); if serde_json::from_slice::<serde_json::Value>(value).is_ok() { writer.write(event.record)?; } else { // Send invalid records to separate topic writer.write_with_options(event.record, WriteOptions::to_topic("invalid-json"))?; } Ok(()) } The JavaScript SDK does not support writing records to a specific output topic. As a result, this transform function writes only valid JSON messages to the output topic. Invalid messages are logged, instead of written to a separate output topic. import { onRecordWritten } from "@redpanda-data/transform-sdk"; onRecordWritten(filterValidJson); function filterValidJson(event, writer) { const recordValue = event.record.value.text(); if (isValidJson(recordValue)) { writer.write(event.record); } else { console.log('Invalid JSON detected') } } function isValidJson(str) { try { JSON.parse(str); return true; } catch (e) { return false; } } Build the transform into a Wasm binary: rpk transform build Create topics to apply the transform function to: rpk topic create input-topic output-topic invalid-json Deploy the Wasm binary to your cluster. If you have already deployed another example, this new transform function will replace it. rpk transform deploy --input-topic=input-topic --output-topic=output-topic --output-topic=invalid-json Produce an invalid JSON record a one valid one to the input topic. echo '{"valid":"json"}' | rpk topic produce input-topic -k json echo 'invalid json' | rpk topic produce input-topic -k json Verify the content of the output topic in the command-line: rpk topic consume output-topic You should see only the invalid JSON from the input topic. Invalid JSON messages are written to the invalid-json topic. Clean up Your transform function will continue processing new records in the input topic until you delete it. To delete the transform function: rpk transform delete data-transforms-tutorial --no-confirm Suggested reading How Data Transforms Work Golang SDK for Data Transforms Rust SDK for Data Transforms rpk transform commands Suggested labs Flatten JSON MessagesConvert JSON Messages into AvroTransform JSON Messages into a New Topic using JQFilter Messages into a New Topic using a RegexConvert Timestamps using RustRedact Information in JSON MessagesSee moreSearch all labs Back to top × Simple online edits For simple changes, such as fixing a typo, you can edit the content directly on GitHub. Edit on GitHub Or, open an issue to let us know about something that you want us to change. Open an issue Contribution guide For extensive content updates, or if you prefer to work locally, read our contribution guide . Was this helpful? thumb_up thumb_down group Ask in the community mail Share your feedback group_add Make a contribution Linux Build