# Golang SDK for Data Transforms

> For the complete documentation index, see [llms.txt](https://docs.redpanda.com/llms.txt). Component-specific: [streaming-full.txt](https://docs.redpanda.com/streaming-full.txt)

---
title: Golang SDK for Data Transforms
latest-operator-version: v26.1.4
# EOL = End-of-Life (support lifecycle status)
page-is-nearing-eol: "false"
page-is-past-eol: "true"
page-eol-date: December 22, 2024
latest-console-tag: v3.7.3
latest-connect-version: 4.93.0
docname: data-transform-golang-sdk
page-component-name: streaming
page-version: "23.3"
page-component-version: "23.3"
page-component-title: Streaming
page-relative-src-path: data-transform-golang-sdk.adoc
page-edit-url: https://github.com/redpanda-data/docs/edit/v/23.3/modules/reference/pages/data-transform-golang-sdk.adoc
description: Work with data transform APIs in Redpanda using Go.
page-git-created-date: "2024-02-05"
page-git-modified-date: "2024-02-05"
support-status: past end-of-life
---

<!-- Source: https://docs.redpanda.com/streaming/23.3/reference/data-transform-golang-sdk.md -->

API reference documentation for Redpanda data transforms.

## [](#data-transforms)Data transforms

Package `transform` contains the core functions and types for transforming data within Redpanda.

### [](#data-transforms-functions)Data transforms functions

#### [](#onrecordwritten)OnRecordWritten

```go
func OnRecordWritten(fn OnRecordWrittenCallback)
```

The `OnRecordWritten` function registers a callback of type [`OnRecordWrittenCallback`](#onrecordwrittencallback), which is invoked when a record is written to the input topic.

The function should be called in a package’s `main` function to register the transform function that will be applied.

* * *

### [](#data-transforms-types)Data transforms types

#### [](#onrecordwrittencallback)OnRecordWrittenCallback

```go
type OnRecordWrittenCallback func(e WriteEvent, w RecordWriter) error
```

The `OnRecordWrittenCallback` type is a callback to transform records after a write event happens in the input topic. It’s the type of the parameter for the [`OnRecordWritten`](#onrecordwritten) function.

#### [](#record)Record

```go
type Record struct {
	// Key is an optional field.
	Key []byte
	// Value is the blob of data that is written to Redpanda.
	Value []byte
	// Headers are client specified key/value pairs that are
	// attached to a record.
	Headers []RecordHeader
	// Attrs is the attributes of a record.
	//
	// Output records should leave these unset.
	Attrs RecordAttrs
	// The timestamp associated with this record.
	//
	// For output records this can be left unset as it will
	// always be the same value as the input record.
	Timestamp time.Time
	// The offset of this record in the partition.
	//
	// For output records this field is left unset,
	// as it will be set by Redpanda.
	Offset int64
}
```

The `Record` type is a record that has been written to Redpanda.

#### [](#recordattrs)RecordAttrs

```go
type RecordAttrs struct {
	// contains filtered or unexported fields
}
```

#### [](#recordheader)RecordHeader

```go
type RecordHeader struct {
	Key   []byte
	Value []byte
}
```

The `RecordHeader` type is an optional key/value pair that is passed along with records.

#### [](#writeevent)WriteEvent

```go
type WriteEvent interface {
	// Access the record associated with this event
	Record() Record
}
```

The `WriteEvent` type contains information about a record that was written.

#### [](#recordwriter)RecordWriter

```go
type RecordWriter interface {
      // Write writes a record to the output topic.
      //
      // When writing a record, only the key, value and headers are
      // used other information like the timestamp will be overridden
      // by the broker.
      Write(Record) error
}
```

`RecordWriter` is an interface for writing transformed records to the destination topic.

## [](#schema-registry-client)Schema Registry client

Package `sr` is a Schema Registry client for usage within Redpanda data transforms.

Data transforms support reading schemas and writing schemas to Redpanda’s Schema Registry.

After a schema is obtained, a record may be decoded using the appropriate library. Using the TinyGo compiler and runtime, the following libraries for Apache Avro and Protocol Buffers are known to work:

-   [goavro](https://github.com/linkedin/goavro)

-   [`vtprotobuf`, the Vitess Protocol Buffers compiler](https://github.com/planetscale/vtprotobuf)


### [](#schema-registry-variables)Schema Registry variables

```go
var (
	// ErrNotRegistered is returned from Serde when attempting to encode a
	// value or decode an ID that has not been registered, or when using
	// Decode with a missing new value function.
	ErrNotRegistered = errors.New("registration is missing for encode/decode")

	// ErrBadHeader is returned from Decode when the input slice is shorter
	// than five bytes, or if the first byte is not the magic 0 byte.
	ErrBadHeader = errors.New("5 byte header for value is missing or does not have the 0 magic byte")
)
```

### [](#schema-registry-functions)Schema Registry functions

#### [](#extractid)ExtractID

```go
func ExtractID(b []byte) (int, error)
```

Extract the ID from the header of a Schema Registry encoded value.

Returns `ErrBadHeader` if the array is missing the leading magic byte or is too small.

### [](#schema-registry-types)Schema Registry types

#### [](#clientopt)ClientOpt

```go
type ClientOpt interface {
	// contains filtered or unexported methods
}
```

`ClientOpt` is an option to configure a [`SchemaRegistryClient`](#schemaregistryclient)

#### [](#maxcacheentries)MaxCacheEntries

```go
func MaxCacheEntries(size int) ClientOpt
```

`MaxCacheEntries` configures how many entries to cache within the client.

By default the cache is unbounded. Use 0 to disable the cache.

#### [](#reference)Reference

```go
type Reference struct {
	Name    string
	Subject string
	Version int
}
```

`SchemaReference` is a way for one schema to reference another schema. The details for how referencing is done are type specific; for example, JSON objects that use the key "$ref" can refer to another schema via URL. See [Reference a schema](https://docs.redpanda.com/streaming/23.3/manage/schema-reg/schema-reg-api/#reference-a-schema).

#### [](#schema)Schema

```go
type Schema struct {
	Schema     string
	Type       SchemaType
	References []Reference
}
```

`Schema` is a schema that can be registered within the Schema Registry.

#### [](#schemaregistryclient)SchemaRegistryClient

```go
type SchemaRegistryClient interface {
	// LookupSchemaById looks up a schema via its global ID.
	LookupSchemaById(id int) (s *Schema, err error)
	// LookupSchemaByVersion looks up a schema via a subject for a specific version.
	//
	// Use version -1 to get the latest version.
	LookupSchemaByVersion(subject string, version int) (s *SubjectSchema, err error)
	// CreateSchema creates a schema under the given subject, returning the version and ID.
	//
	// If an equivalent schema already exists globally, that schema ID will be reused.
	// If an equivalent schema already exists within that subject, this will be a noop and the
	// existing schema will be returned.
	CreateSchema(subject string, schema Schema) (s *SubjectSchema, err error)
}
```

`SchemaRegistryClient` is a client for interacting with Redpanda’s Schema Registry.

The client provides caching out of the box, which can be configured with options.

#### [](#newclient)NewClient

```go
func NewClient(opts ...ClientOpt) (c SchemaRegistryClient)
```

`NewClient` creates a new [`SchemaRegistryClient`](#schemaregistryclient) with the specified options applied.

#### [](#schematype)SchemaType

```go
type SchemaType int
```

`SchemaType` is an enum for the different types of schemas that can be stored in the Schema Registry.

```go
const (
	TypeAvro SchemaType = iota
	TypeProtobuf
	TypeJSON
)
```

#### [](#serde)Serde

```go
type Serde[T any] struct {
	// contains filtered or unexported fields
}
```

`Serde` encodes and decodes values according to the Schema Registry wire format. A `Serde` itself does not perform schema auto-discovery and type auto-decoding. To aid in strong typing and validated encoding/decoding, you must register IDs and values.

To use a `Serde` for encoding, you must first preregister the schema IDs and values that you will encode. The latest registered ID that supports encoding is used to encode.

To use a `Serde` for decoding, you can either preregister the schema IDs and values that you will consume, or you can discover the schema every time you receive an `ErrNotRegistered` error from decode.

#### [](#serdet-appendencode)(\*Serde\[T\]) AppendEncode

```go
func (s *Serde[T]) AppendEncode(b []byte, v T) ([]byte, error)
```

`AppendEncode` appends an encoded value to `b` according to the schema registry wire format and returns it. If [`EncodeFn`](#encodefn) was not used, this returns `ErrNotRegistered`.

#### [](#serdet-decode)(\*Serde\[T\]) Decode

```go
func (s *Serde[T]) Decode(b []byte, v T) error
```

`Decode` decodes `b` into `v`. If the [`DecodeFn`](#decodefn) option was not used, this returns `ErrNotRegistered`.

Serde does not handle references in schemas. You must register the full decode function for any top-level ID, regardless of how many other schemas are referenced in the top-level ID.

#### [](#serdet-encode)(\*Serde\[T\]) Encode

```go
func (s *Serde[T]) Encode(v T) ([]byte, error)
```

`Encode` encodes a value according to the Schema Registry wire format and returns it. If [`EncodeFn`](#encodefn) was not used, this returns `ErrNotRegistered`.

#### [](#serdet-mustappendencode)(\*Serde\[T\]) MustAppendEncode

```go
func (s *Serde[T]) MustAppendEncode(b []byte, v T) []byte
```

`MustAppendEncode` returns the value of [`AppendEncode`](#serdet-append-encode), panicking on error. This is a shortcut for when your encode function cannot error.

#### [](#serdet-mustencode)(\*Serde\[T\]) MustEncode

```go
func (s *Serde[T]) MustEncode(v T) []byte
```

`MustEncode` returns the value of [`Encode`](#serdet-encode), panicking on error. This is a shortcut for when your encode function cannot error.

#### [](#serdet-register)(\*Serde\[T\]) Register

```go
func (s *Serde[T]) Register(id int, opts ...SerdeOpt[T])
```

`Register` registers a schema ID and the value it corresponds to, as well as the encoding or decoding functions. Register functions depending on whether you are only encoding, only decoding, or both.

#### [](#serdet-setdefaults)(\*Serde\[T\]) SetDefaults

```go
func (s *Serde[T]) SetDefaults(opts ...SerdeOpt[T])
```

`SetDefaults` sets default options to apply to every registered type. These options are always applied first, so you can override them as necessary when registering.

This can be useful if you always want to use the same encoding or decoding functions.

#### [](#serdeopt)SerdeOpt

```go
type SerdeOpt[T any] interface {
	// contains filtered or unexported methods
}
```

`SerdeOpt` is an option to configure a [`Serde`](#serde).

#### [](#appendencodefn)AppendEncodeFn

```go
func AppendEncodeFn[T any](fn func([]byte, T) ([]byte, error)) SerdeOpt[T]
```

`AppendEncodeFn` allows [`Serde`](#serde) to encode a value to an existing slice. This can be more efficient than [`EncodeFn`](#encodefn); this function is used if it exists.

#### [](#decodefn)DecodeFn

```go
func DecodeFn[T any](fn func([]byte, T) error) SerdeOpt[T]
```

`DecodeFn` allows [`Serde`](#serde) to decode into a value.

#### [](#encodefn)EncodeFn

```go
func EncodeFn[T any](fn func(T) ([]byte, error)) SerdeOpt[T]
```

`EncodeFn` allows [`Serde`](#serde) to encode a value.

#### [](#subjectschema)SubjectSchema

```go
type SubjectSchema struct {
	Schema

	Subject string
	Version int
	ID      int
}
```

`SchemaSubject` is a schema along with the subject, version, and ID of the schema.

## [](#related-topics)Related topics

-   [Run Data Transforms in Linux](https://docs.redpanda.com/streaming/23.3/develop/data-transforms/run-transforms/)