Develop Data Transforms

Learn how to initialize a data transforms project and write transform functions in your chosen language.

Prerequisites

You must have the following development tools installed on your host machine:

- The rpk command-line client, configured to connect to your Redpanda cluster.
- For Go projects, Go version 1.20 or later.
- For Rust projects, the latest stable version of Rust.
- For JavaScript and TypeScript projects, the latest long-term-support (LTS) release of Node.js.

Initialize a data transforms project

To initialize a data transforms project, run the following command to create the project files in your current directory. This command adds the latest version of the SDK as a project dependency:

    rpk transform init --language=<language> --name=<name>

If you omit the --language flag, the command prompts you for the language. Supported languages include:

- tinygo-no-goroutines (does not include Goroutines)
- tinygo-with-goroutines
- rust
- javascript
- typescript

For example, if you choose tinygo-no-goroutines, the following project files are created:

    .
    ├── go.mod
    ├── go.sum
    ├── README.md
    ├── transform.go
    └── transform.yaml

The transform.go file contains a boilerplate transform function. The transform.yaml file specifies the configuration settings for the transform function.

See also: Configure Data Transforms

Build transform functions

You can develop your transform logic with one of the available SDKs, which allow your transform code to interact with a Redpanda cluster.

Go:

All transform functions must register a callback with the OnRecordWritten() method. Run any initialization steps in the main() function, because it's only run once when the transform function is first deployed. You can also use the standard predefined init() function.

    package main

    import (
        "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
    )

    func main() {
        // Register your transform function.
        // This is a good place to perform other setup too.
        transform.OnRecordWritten(myTransform)
    }

    // myTransform is where you read the record that was written, and then you can
    // output new records that will be written to the destination topic.
    func myTransform(event transform.WriteEvent, writer transform.RecordWriter) error {
        return writer.Write(event.Record())
    }

Rust:

All transform functions must register a callback with the on_record_written() method. Run any initialization steps in the main() function, because it's only run once when the transform function is first deployed.

    use redpanda_transform_sdk::*;
    // Needed for the Box<dyn Error> return type below.
    use std::error::Error;

    fn main() {
        // Register your transform function.
        // This is a good place to perform other setup too.
        on_record_written(my_transform);
    }

    // my_transform is where you read the record that was written, and then you can
    // write new records to the output topic.
    fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> Result<(), Box<dyn Error>> {
        writer.write(event.record)?;
        Ok(())
    }

JavaScript:

All transform functions must register a callback with the onRecordWritten() method. Run any initialization steps outside of the callback so that they are only run once when the transform function is first deployed.

    // src/index.js
    import { onRecordWritten } from "@redpanda-data/transform-sdk";

    // This is a good place to perform setup steps.

    // Register your transform function.
    onRecordWritten((event, writer) => {
      // This is where you read the record that was written, and then you can
      // output new records that will be written to the destination topic.
      writer.write(event.record);
    });

If you need to use Node.js standard modules in your transform function, you must configure the polyfillNode plugin for esbuild.
This plugin allows you to polyfill Node.js APIs that are not natively available in the Redpanda JavaScript runtime environment.

esbuild.js:

    import * as esbuild from 'esbuild';
    import { polyfillNode } from 'esbuild-plugin-polyfill-node';

    // Only the plugin configuration is shown here.
    await esbuild.build({
      plugins: [
        polyfillNode({
          globals: {
            buffer: true, // Allow a global Buffer variable if referenced.
            process: false, // Don't inject the process global, the Redpanda JavaScript runtime does that.
          },
          polyfills: {
            crypto: true, // Enable crypto polyfill
            // Add other polyfills as needed
          },
        }),
      ],
    });

Error handling

Distinguish between recoverable and critical errors to keep your transform functions resilient. Handling recoverable errors internally maintains continuous operation, while allowing critical errors to escape lets the system respond to severe issues.

Redpanda tracks the offsets of records that have been processed by transform functions. If an error escapes the Wasm virtual machine (VM), the VM fails. When the Wasm engine detects this failure, it starts a new VM, and the transform function retries processing the input topics from the last processed offset. If the underlying issue is not resolved, this can lead to repeated failures.

Handling errors internally, by logging them and continuing to process subsequent records, keeps the transform running. However, this approach silently discards problematic records, which can lead to unnoticed data loss if the logs are not monitored closely.

Go:

    package main

    import (
        "log"

        "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
    )

    func main() {
        transform.OnRecordWritten(myTransform)
    }

    func myTransform(event transform.WriteEvent, writer transform.RecordWriter) error {
        record := event.Record()
        if record.Key == nil {
            // Handle the error internally by logging it
            log.Println("Error: Record key is nil")
            // Skip this record and continue to process other records
            return nil
        }
        // Allow errors with writes to escape
        return writer.Write(record)
    }

Rust:

    use redpanda_transform_sdk::*;
    use log::error;

    fn main() {
        // Set up logging
        env_logger::init();
        on_record_written(my_transform);
    }

    fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> anyhow::Result<()> {
        let record = event.record;
        if record.key().is_none() {
            // Handle the error internally by logging it
            error!("Error: Record key is nil");
            // Skip this record and continue to process other records
            return Ok(());
        }
        // Allow errors with writes to escape; `?` converts them into anyhow::Error
        writer.write(record)?;
        Ok(())
    }

JavaScript:

    import { onRecordWritten } from "@redpanda-data/transform-sdk";

    // Register your transform function.
    onRecordWritten((event, writer) => {
      const record = event.record;
      if (!record.key) {
        // Handle the error internally by logging it
        console.error("Error: Record key is nil");
        // Skip this record and continue to process other records
        return;
      }
      // Allow errors with writes to escape
      writer.write(record);
    });

When you deploy this transform function and produce a message without a key, a log entry like the following appears in the logs:

    {
      "body": {
        "stringValue": "2024/06/20 08:17:33 Error: Record key is nil\n"
      },
      "timeUnixNano": 1718871455235337000,
      "severityNumber": 13,
      "attributes": [
        {
          "key": "transform_name",
          "value": {
            "stringValue": "test"
          }
        },
        {
          "key": "node",
          "value": {
            "intValue": 0
          }
        }
      ]
    }

You can view logs for transform functions using the rpk transform logs <transform-function-name> command.
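If you have log entries available as JSON objects shaped like the sample above, you can post-process them with ordinary tooling. The following is a minimal sketch in plain Go (standard library only), not part of any Redpanda SDK: the logEntry type and the summarize function are hypothetical names introduced for illustration, and the struct covers only the fields that appear in the sample.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// logEntry mirrors only the fields shown in the sample log entry.
// It is a hypothetical type for this sketch, not an SDK type.
type logEntry struct {
	Body struct {
		StringValue string `json:"stringValue"`
	} `json:"body"`
	SeverityNumber int `json:"severityNumber"`
	Attributes     []struct {
		Key   string `json:"key"`
		Value struct {
			StringValue string `json:"stringValue"`
		} `json:"value"`
	} `json:"attributes"`
}

// summarize decodes one log entry and returns "<transform_name>: <message>".
// If no transform_name attribute is present, the name is "unknown".
func summarize(raw []byte) (string, error) {
	var e logEntry
	if err := json.Unmarshal(raw, &e); err != nil {
		return "", fmt.Errorf("decode log entry: %w", err)
	}
	name := "unknown"
	for _, a := range e.Attributes {
		if a.Key == "transform_name" {
			name = a.Value.StringValue
		}
	}
	// The message body ends with a newline in the sample, so trim it.
	return name + ": " + strings.TrimSpace(e.Body.StringValue), nil
}

func main() {
	raw := []byte(`{"body":{"stringValue":"2024/06/20 08:17:33 Error: Record key is nil\n"},
		"timeUnixNano":1718871455235337000,"severityNumber":13,
		"attributes":[{"key":"transform_name","value":{"stringValue":"test"}},
		{"key":"node","value":{"intValue":0}}]}`)
	line, err := summarize(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(line) // test: 2024/06/20 08:17:33 Error: Record key is nil
}
```

A summary line like this can feed an alerting script, which helps with the risk of skipped records going unnoticed.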
To ensure that you are notified of errors or issues in your data transforms, Redpanda provides metrics that you can use to monitor their state.

See also:

- View logs for transform functions
- Monitor data transforms
- Configure transform logging
- rpk transform logs

Avoid state management

Relying on in-memory state across transform invocations can lead to inconsistencies and unpredictable behavior. Data transforms operate with at-least-once semantics, meaning a transform function might be executed more than once for a given record. Redpanda may also restart a transform function at any point, which causes its state to be lost.

Access environment variables

You can access both built-in and custom environment variables in your transform function. In this example, environment variables are checked once during initialization:

Go:

    package main

    import (
        "fmt"
        "os"

        "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
    )

    func main() {
        // Check environment variables before registering the transform function.
        outputTopic1, ok := os.LookupEnv("REDPANDA_OUTPUT_TOPIC_1")
        if ok {
            fmt.Printf("Output topic 1: %s\n", outputTopic1)
        } else {
            fmt.Println("Only one output topic is set")
        }
        // Register your transform function.
        transform.OnRecordWritten(myTransform)
    }

    func myTransform(event transform.WriteEvent, writer transform.RecordWriter) error {
        return writer.Write(event.Record())
    }

Rust:

    use redpanda_transform_sdk::*;
    use std::env;

    fn main() {
        // Set up logging
        env_logger::init();
        // Check environment variables before registering the transform function.
        match env::var("REDPANDA_OUTPUT_TOPIC_1") {
            Ok(output_topic_1) => println!("Output topic 1: {}", output_topic_1),
            Err(_) => println!("Only one output topic is set"),
        }
        // Register your transform function.
        on_record_written(my_transform);
    }

    // This callback does not write any records.
    fn my_transform(_event: WriteEvent, _writer: &mut RecordWriter) -> anyhow::Result<()> {
        Ok(())
    }

JavaScript:

    import { onRecordWritten } from "@redpanda-data/transform-sdk";

    // Check environment variables before registering the transform function.
    const outputTopic1 = process.env.REDPANDA_OUTPUT_TOPIC_1;
    if (outputTopic1) {
      console.log(`Output topic 1: ${outputTopic1}`);
    } else {
      console.log("Only one output topic is set");
    }

    // Register your transform function.
    onRecordWritten((event, writer) => {
      return writer.write(event.record);
    });

Write to specific output topics

You can configure your transform function to write records to specific output topics. This is useful for filtering or routing messages based on certain criteria. The following example shows a filter that writes valid JSON records from the input topic to the output topic. Records that are not valid JSON are written to a different output topic.

Go:

    package main

    import (
        "encoding/json"

        "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
    )

    func main() {
        transform.OnRecordWritten(filterValidJson)
    }

    func filterValidJson(event transform.WriteEvent, writer transform.RecordWriter) error {
        if json.Valid(event.Record().Value) {
            // Valid JSON goes to the default output topic
            return writer.Write(event.Record())
        }
        // Send invalid records to separate topic
        return writer.Write(event.Record(), transform.ToTopic("invalid-json"))
    }

Rust:

    use anyhow::Result;
    use redpanda_transform_sdk::*;

    fn main() {
        on_record_written(filter_valid_json);
    }

    fn filter_valid_json(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> {
        let value = event.record.value().unwrap_or_default();
        if serde_json::from_slice::<serde_json::Value>(value).is_ok() {
            writer.write(event.record)?;
        } else {
            // Send invalid records to separate topic
            writer.write_with_options(event.record, WriteOptions::to_topic("invalid-json"))?;
        }
        Ok(())
    }

JavaScript:

The JavaScript SDK does not support writing records to a specific output topic.
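One way to keep routing logic easy to unit-test is to isolate the topic decision in a pure function that does not depend on the SDK. The sketch below illustrates that idea under stated assumptions: topicFor is a hypothetical helper (not an SDK function), the empty string is a convention chosen here to mean "use the default output topic", and the invalid-json topic name is taken from the example above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// invalidTopic is the topic name used for invalid records in the example above.
const invalidTopic = "invalid-json"

// topicFor returns the output topic for a record value.
// An empty string means "write to the default output topic".
// This is a hypothetical helper for illustration, not part of the SDK.
func topicFor(value []byte) string {
	if json.Valid(value) {
		return ""
	}
	return invalidTopic
}

func main() {
	// Try a valid document, a truncated document, and an empty value.
	for _, v := range []string{`{"id": 1}`, `{"id": `, ``} {
		fmt.Printf("%q -> %q\n", v, topicFor([]byte(v)))
	}
}
```

In a transform callback, you would call the helper with the record value and pass the result to transform.ToTopic only when it is non-empty. Note that json.Valid reports false for an empty value, so records without a value are routed to invalid-json under this sketch.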

Connect to the Schema Registry

You can use the Schema Registry client library to read and write schemas, and to serialize and deserialize records. This client library is useful when working with schema-based topics in your data transforms.

See also:

- Redpanda Schema Registry
- Go Schema Registry client reference
- Rust Schema Registry client reference
- JavaScript Schema Registry client reference

Next steps

- Configure Data Transforms

Suggested reading

- How Data Transforms Work
- Data Transforms SDKs
- rpk transform commands

Suggested labs

- Flatten JSON Messages
- Transform JSON Messages into a New Topic using JQ
- Convert JSON Messages into Avro
- Filter Messages into a New Topic using a Regex
- Convert Timestamps using Rust
- Redact Information in JSON Messages