Develop Data Transforms

Learn how to initialize a data transforms project and write transform functions in your chosen language.

Prerequisites

You must have the following development tools installed on your host machine:

Initialize a data transforms project

To initialize a data transforms project, use the following command to set up the project files in your current directory. This command adds the latest version of the SDK as a project dependency:

rpk transform init --language=<language> --name=<name>

If you do not include the --language flag, the command will prompt you for the language. Supported languages include:

  • tinygo-no-goroutines (does not include Goroutines)

  • tinygo-with-goroutines

  • rust

  • javascript

  • typescript

For example, if you choose tinygo-no-goroutines, the following project files are created:

.
├── go.mod
├── go.sum
├── README.md
├── transform.go
└── transform.yaml

The transform.go file contains a boilerplate transform function. The transform.yaml file specifies the configuration settings for the transform function.

Build transform functions

You can develop your transform logic with one of the available SDKs that allow your transform code to interact with a Redpanda cluster.

  • Go

  • Rust

  • JavaScript

All transform functions must register a callback with the OnRecordWritten() method.

You should run any initialization steps in the main() function because it’s only run once when the transform function is first deployed. You can also use the standard predefined init() function.

package main

import (
	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

func main() {
  // Register your transform function.
  // This is a good place to perform other setup too.
  transform.OnRecordWritten(myTransform)
}

// myTransform is where you read the record that was written, and then you can
// output new records that will be written to the destination topic.
func myTransform(event transform.WriteEvent, writer transform.RecordWriter) error {
  return writer.Write(event.Record())
}

All transform functions must register a callback with the on_record_written() method.

You should run any initialization steps in the main() function because it’s only run once when the transform function is first deployed.

use redpanda_transform_sdk::*;
use std::error::Error;

fn main() {
  // Register your transform function.
  // This is a good place to perform other setup too.
  on_record_written(my_transform);
}

// my_transform is where you read the record that was written, and then you can
// return new records that will be written to the output topic
fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> Result<(), Box<dyn Error>> {
  writer.write(event.record)?;
  Ok(())
}

All transform functions must register a callback with the onRecordWritten() method.

You should run any initialization steps outside of the callback so that they are only run once when the transform function is first deployed.

// src/index.js
import { onRecordWritten } from "@redpanda-data/transform-sdk";

// This is a good place to perform setup steps.

// Register your transform function.
onRecordWritten((event, writer) => {
  // This is where you read the record that was written, and then you can
  // output new records that will be written to the destination topic
  writer.write(event.record);
});

If you need to use Node.js standard modules in your transform function, you must configure the polyfillNode plugin for esbuild. This plugin allows you to polyfill Node.js APIs that are not natively available in the Redpanda JavaScript runtime environment.

// esbuild.js
import * as esbuild from 'esbuild';
import { polyfillNode } from 'esbuild-plugin-polyfill-node';

await esbuild.build({
  plugins: [
    polyfillNode({
      globals: {
        buffer: true, // Allow a global Buffer variable if referenced.
        process: false, // Don't inject the process global, the Redpanda JavaScript runtime does that.
      },
      polyfills: {
        crypto: true, // Enable crypto polyfill
        // Add other polyfills as needed
      },
    }),
  ],
});

Error handling

Distinguish between recoverable and critical errors in your transform functions. Handle recoverable errors inside the function so that processing continues, and let critical errors escape so that the failure is visible and the transform is restarted.

Redpanda tracks the offsets of records that have been processed by transform functions. If an error escapes the Wasm virtual machine (VM), the VM will fail. When the Wasm engine detects this failure and starts a new VM, the transform function will retry processing the input topics from the last processed offset, potentially leading to repeated failures if the underlying issue is not resolved.

Handling errors internally by logging them and continuing to process subsequent records can help maintain continuous operation. However, this approach can result in silently discarding problematic records, which may lead to unnoticed data loss if the logs are not monitored closely.

  • Go

  • Rust

  • JavaScript

package main

import (
    "log"
    "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

func main() {
    transform.OnRecordWritten(myTransform)
}

func myTransform(event transform.WriteEvent, writer transform.RecordWriter) error {
  record := event.Record()
  if record.Key == nil {
    // Handle the error internally by logging it
    log.Println("Error: Record key is nil")
    // Skip this record and continue to process other records
    return nil
  }
  // Allow errors with writes to escape
  return writer.Write(record)
}

use redpanda_transform_sdk::*;
use log::error;

fn main() {
  // Set up logging
  env_logger::init();
  on_record_written(my_transform);
}

fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> anyhow::Result<()> {
  let record = event.record;
  if record.key().is_none() {
    // Handle the error internally by logging it
    error!("Error: Record key is nil");
    // Skip this record and continue to process other records
    return Ok(());
  }
  // Allow errors with writes to escape
  writer.write(record)?;
  Ok(())
}

import { onRecordWritten } from "@redpanda-data/transform-sdk";

// Register your transform function.
onRecordWritten((event, writer) => {
  const record = event.record;
  if (!record.key) {
    // Handle the error internally by logging it
    console.error("Error: Record key is nil");
    // Skip this record and continue to process other records
    return;
  }
  // Allow errors with writes to escape
  writer.write(record);
});

When you deploy this transform function and produce a message without a key, the logs contain an entry like the following:

{
  "body": {
    "stringValue": "2024/06/20 08:17:33 Error: Record key is nil\n"
  },
  "timeUnixNano": 1718871455235337000,
  "severityNumber": 13,
  "attributes": [
    {
      "key": "transform_name",
      "value": {
        "stringValue": "test"
      }
    },
    {
      "key": "node",
      "value": {
        "intValue": 0
      }
    }
  ]
}

You can view logs for transform functions using the rpk transform logs <transform-function-name> command.

To ensure that you are notified of any errors or issues in your data transforms, Redpanda provides metrics that you can use to monitor the state of your data transforms.

Avoid state management

Relying on in-memory state across transform invocations can lead to inconsistencies and unpredictable behavior. Data transforms operate with at-least-once semantics, meaning a transform function might be executed more than once for a given record. Redpanda may also restart a transform function at any point, which causes its state to be lost.

Access environment variables

You can access both built-in and custom environment variables in your transform function. In this example, environment variables are checked once during initialization:

  • Go

  • Rust

  • JavaScript

package main

import (
	"fmt"
	"os"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

func main() {
  // Check environment variables before registering the transform function.
  outputTopic1, ok := os.LookupEnv("REDPANDA_OUTPUT_TOPIC_1")
  if ok {
    fmt.Printf("Output topic 1: %s\n", outputTopic1)
  } else {
    fmt.Println("Only one output topic is set")
  }

  // Register your transform function.
  transform.OnRecordWritten(myTransform)
}

func myTransform(event transform.WriteEvent, writer transform.RecordWriter) error {
  return writer.Write(event.Record())
}

use redpanda_transform_sdk::*;
use std::env;

fn main() {
  // Set up logging
  env_logger::init();

  // Check environment variables before registering the transform function.
  match env::var("REDPANDA_OUTPUT_TOPIC_1") {
    Ok(output_topic_1) => println!("Output topic 1: {}", output_topic_1),
    Err(_) => println!("Only one output topic is set"),
  }

  // Register your transform function.
  on_record_written(my_transform);
}

fn my_transform(_event: WriteEvent, _writer: &mut RecordWriter) -> anyhow::Result<()> {
  Ok(())
}

import { onRecordWritten } from "@redpanda-data/transform-sdk";

// Check environment variables before registering the transform function.
const outputTopic1 = process.env.REDPANDA_OUTPUT_TOPIC_1;
if (outputTopic1) {
  console.log(`Output topic 1: ${outputTopic1}`);
} else {
  console.log("Only one output topic is set");
}

// Register your transform function.
onRecordWritten((event, writer) => {
  return writer.write(event.record);
});

Write to specific output topics

You can configure your transform function to write records to specific output topics. This is useful for filtering or routing messages based on certain criteria. The following example shows a filter that outputs only valid JSON from the input topic into the output topic. Invalid JSON is written to a different output topic.

  • Go

  • Rust

  • JavaScript

package main

import (
	"encoding/json"
	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

func main() {
	transform.OnRecordWritten(filterValidJson)
}

func filterValidJson(event transform.WriteEvent, writer transform.RecordWriter) error {
	if json.Valid(event.Record().Value) {
		return writer.Write(event.Record())
	}
	// Send invalid records to a separate topic
	return writer.Write(event.Record(), transform.ToTopic("invalid-json"))
}

use anyhow::Result;
use redpanda_transform_sdk::*;

fn main() {
	on_record_written(filter_valid_json);
}

fn filter_valid_json(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> {
	let value = event.record.value().unwrap_or_default();
	if serde_json::from_slice::<serde_json::Value>(value).is_ok() {
		writer.write(event.record)?;
	} else {
		// Send invalid records to separate topic
		writer.write_with_options(event.record, WriteOptions::to_topic("invalid-json"))?;
	}
	Ok(())
}

The JavaScript SDK does not support writing records to a specific output topic.

Connect to the Schema Registry

You can use the Schema Registry client library to read and write schemas as well as serialize and deserialize records. This client library is useful when working with schema-based topics in your data transforms.
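As background for deserialization, the sketch below assumes that record values use the common Schema Registry wire format: a zero magic byte, a 4-byte big-endian schema ID, and then the encoded payload. The function name splitSchemaHeader is hypothetical, and the client library may offer its own helper for this step. The sketch uses only the Go standard library.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// splitSchemaHeader separates the 5-byte header (magic byte 0 plus a
// big-endian 4-byte schema ID) from the encoded payload.
// Assumption: the value was produced with the Schema Registry wire format.
func splitSchemaHeader(value []byte) (int, []byte, error) {
	if len(value) < 5 || value[0] != 0 {
		return 0, nil, errors.New("value does not start with a schema registry header")
	}
	id := int(binary.BigEndian.Uint32(value[1:5]))
	return id, value[5:], nil
}

func main() {
	// Header for schema ID 42 followed by a two-byte payload.
	value := []byte{0, 0, 0, 0, 42, 'h', 'i'}
	id, payload, err := splitSchemaHeader(value)
	fmt.Println(id, string(payload), err)
}
```

With the schema ID in hand, a transform could look up the matching schema and decode the remaining payload.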
