Single Message Transforms

Single Message Transforms (SMTs) help you modify data and its characteristics as it passes through a connector, without needing additional stream processors.

Before using an SMT with production data, test the configuration on a small subset of your data to verify the SMT's behavior.

Cast

Cast SMT lets you change the data type of fields in a Redpanda message, updating the schema if one is present.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.Cast$Key) or value (org.apache.kafka.connect.transforms.Cast$Value).

Configuration

Property key Description

spec

Comma-separated list of field names and the type to which they should be cast; for example: my-field1:int32,my-field2:string. Allowed types are: int8, int16, int32, int64, float32, float64, boolean, and string.

Example

"transforms": "Cast",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "price:float64"

Before:

{"price": 1234, "product_id": "9987"}

After:

{"price": 1234.0,"product_id": "9987"}

DropHeaders

DropHeaders SMT removes one or more headers from each record.

Configuration

Property key Description

headers

Comma-separated list of header names to drop.

Example

Sample configuration:

"transforms": "DropHeader",
"transforms.DropHeader.type": "org.apache.kafka.connect.transforms.DropHeaders",
"transforms.DropHeader.headers": "source-id,conv-id"

EventRouter (Debezium)

The outbox pattern is a way to safely and reliably exchange data between multiple (micro) services. An outbox pattern implementation avoids inconsistencies between a service’s internal state (as typically persisted in its database) and state in events consumed by services that need the same data.

To implement the outbox pattern in a Debezium application, configure a Debezium connector to:

  • Capture changes in an outbox table

  • Apply the Debezium outbox EventRouter Single Message Transformation

EventRouter SMT is available for managed Debezium connectors only.
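
For orientation, an outbox table conventionally contains columns such as id, aggregatetype, aggregate_id, type, and payload; exact column names vary by implementation. A hypothetical row:

id | aggregatetype | aggregate_id | type         | payload
1  | order         | 42           | OrderCreated | {"id": 42, "total": 100}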

Configuration

Property key Description

route.by.field

Specifies the name of a column in the outbox table. By default, the value in this column becomes part of the name of the topic to which the connector emits the outbox messages.

route.topic.replacement

Specifies the name of the topic to which the connector emits outbox messages. The default topic name is outbox.event. followed by the aggregatetype column value in the outbox table record.

table.expand.json.payload

Specifies whether a String payload should be expanded into JSON. If no content is found, or if a parsing error occurs, the content is kept as is.

fields.additional.placement

Specifies one or more outbox table columns to add to outbox message headers or envelopes. Specify a comma-separated list of pairs. In each pair, specify the name of a column and whether you want the value to be in the header or the envelope.

table.field.event.key

Specifies the outbox table column that contains the event key. When this column contains a value, the SMT uses that value as the key in the emitted outbox message. This is important for maintaining the correct order in Kafka partitions.

Example

Sample JSON configuration:

"transforms": "outbox",
"transforms.outbox.route.by.field": "type",
"transforms.outbox.route.topic.replacement": "my-topic.${routedByValue}",
"transforms.outbox.table.expand.json.payload": "true",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.fields.additional.placement": "before:envelope",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter"

ExtractField

ExtractField SMT pulls the specified field from a Struct when a schema is present, or a Map for schemaless data. Any null values are passed through unmodified.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.ExtractField$Key) or value (org.apache.kafka.connect.transforms.ExtractField$Value).

Configuration

Property key Description

field

Field name to extract.

Example

Sample configuration:

"transforms": "ExtractField",
"transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractField.field": "product_id"

Before:

{"product_id":9987,"price":1234}

After:

{"value":9987}

Filter

Filter SMT drops every record it is applied to, excluding it from subsequent transformations in the chain. It is intended to be used together with a predicate, to conditionally filter out records that match (or do not match) the predicate.

Configuration

Property key Description

predicate

Name of the predicate that determines which records the filter applies to.

Example

Sample configuration:

"transforms": "Filter",
"transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.Filter.predicate": "IsMyTopic",
"predicates": "IsMyTopic",
"predicates.IsMyTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsMyTopic.pattern": "my-topic"

Predicates

Managed connectors support the following predicates:

TopicNameMatches

org.apache.kafka.connect.transforms.predicates.TopicNameMatches - A predicate that is true for records with a topic name that matches the configured regular expression.

Property key Description

pattern

A Java regular expression for matching against the name of a record’s topic.

HasHeaderKey

org.apache.kafka.connect.transforms.predicates.HasHeaderKey - A predicate that is true for records with at least one header with the configured name.

Property key Description

name

The header name.

RecordIsTombstone

org.apache.kafka.connect.transforms.predicates.RecordIsTombstone - A predicate that is true for records that are tombstones (that is, they have null values).
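
Predicates take effect when referenced from a transform, as in the Filter example above. As a sketch, the following configuration (the aliases dropTombstones and IsTombstone are arbitrary) drops tombstone records:

"transforms": "dropTombstones",
"transforms.dropTombstones.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.dropTombstones.predicate": "IsTombstone",
"predicates": "IsTombstone",
"predicates.IsTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"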

Flatten

Flatten SMT flattens a nested data structure, generating names for each field by concatenating the field names at each level with a configurable delimiter character. It applies to a Struct when a schema is present, or a Map for schemaless data. Array fields and their contents are not modified. The default delimiter is a period (.).

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.Flatten$Key) or value (org.apache.kafka.connect.transforms.Flatten$Value).

Configuration

Property key Description

delimiter

Delimiter to insert between field names from the input record when generating field names for the output record.

Example

"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "."

Before:

{
  "user": {
    "id": 10,
    "name": {
      "first": "Red",
      "last": "Panda"
    }
  }
}

After:

{
  "user.id": 10,
  "user.name.first": "Red",
  "user.name.last": "Panda"
}

HeaderFrom

HeaderFrom SMT moves or copies fields in the key or value of a record into that record’s headers. Corresponding elements of fields and headers together identify a field and the header it should be moved or copied to.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.HeaderFrom$Key) or value (org.apache.kafka.connect.transforms.HeaderFrom$Value).

Configuration

Property key Description

fields

Comma-separated list of field names in the record whose values are to be copied or moved to headers.

headers

Comma-separated list of header names, in the same order as the field names listed in the fields configuration property.

operation

Either move if the fields are to be moved to the headers (removed from the key/value), or copy if the fields are to be copied to the headers (retained in the key/value).

Example

"transforms": "HeaderFrom",
"transforms.HeaderFrom.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.HeaderFrom.fields": "id,last_login_ts",
"transforms.HeaderFrom.headers": "user_id,timestamp",
"transforms.HeaderFrom.operation": "move"

Before:

  • Record value:

{
    "id": 11,
    "name": "Harry Wilson",
    "last_login_ts": 1715242380
}
  • Record header:

{
    "conv_id": "uier923"
}

After:

  • Record value:

{
    "name": "Harry Wilson"
}
  • Record header:

{
    "conv_id": "uier923",
    "user_id": 11,
    "timestamp": 1715242380
}

HoistField

HoistField SMT wraps data using the specified field name in a Struct when a schema is present, or a Map in the case of schemaless data.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.HoistField$Key) or value (org.apache.kafka.connect.transforms.HoistField$Value).

Configuration

Property key Description

field

Field name for the single field that will be created in the resulting Struct or Map.

Example

"transforms": "HoistField",
"transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.HoistField.field": "name"

Before (two records, each with a plain string value):

Red
Panda

After:

{"name":"Red"}
{"name":"Panda"}

InsertField

InsertField SMT inserts field(s) using attributes from the record metadata or a configured static value.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.InsertField$Key) or value (org.apache.kafka.connect.transforms.InsertField$Value).

Configuration

Property key Description

offset.field

Field name for Redpanda offset.

partition.field

Field name for Redpanda partition.

static.field

Field name for static data field.

static.value

The static field value.

timestamp.field

Field name for record timestamp.

topic.field

Field name for Redpanda topic.

Example

Sample configuration:

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "cluster_id",
"transforms.InsertField.static.value": "19423"

Before:

{"product_id":9987,"price":1234}

After:

{"price":1234,"cluster_id":"19423","product_id":9987}

MaskField

MaskField SMT replaces the contents of fields in a record.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.MaskField$Key) or value (org.apache.kafka.connect.transforms.MaskField$Value).

Configuration

Property key Description

fields

Comma-separated list of fields to mask.

replacement

Custom value replacement used to mask field values.

Example

"transforms": "MaskField",
"transforms.MaskField.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.MaskField.fields": "metadata",
"transforms.MaskField.replacement": "***"

Before:

{"product_id":9987,"price":1234,"metadata":"test"}

After:

{"metadata":"***","price":1234,"product_id":9987}

RegexRouter

RegexRouter SMT updates the record topic using the configured regular expression and replacement string. Under the hood, the regex is compiled to a java.util.regex.Pattern. If the pattern matches the input topic, java.util.regex.Matcher#replaceFirst() is used with the replacement string to obtain the new topic.

Configuration

Property key Description

regex

Regular expression to use for matching.

replacement

Replacement string.

Example

This configuration snippet shows how to add the prefix prefix_ to the beginning of a topic name.

"transforms": "AppendPrefix",
"transforms.AppendPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AppendPrefix.regex": ".*",
"transforms.AppendPrefix.replacement": "prefix_$0"

Before: topic-name

After: prefix_topic-name

ReplaceField

ReplaceField SMT filters or renames fields in a Redpanda record.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.ReplaceField$Key) or value (org.apache.kafka.connect.transforms.ReplaceField$Value).

Configuration

Property key Description

exclude

Fields to exclude. This takes precedence over the fields to include.

include

Fields to include. If specified, only these fields are used.

renames

Comma-separated list of rename mappings, each in the form old:new. For example: foo:bar,abc:xyz renames the field foo to bar, and abc to xyz.

Example

Sample configuration:

"transforms": "ReplaceField",
"transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceField.renames": "product_id:item_number"

Before:

{"product_id":9987,"price":1234}

After:

{"item_number":9987,"price":1234}

ReplaceTimestamp (Redpanda)

ReplaceTimestamp (Redpanda) SMT supports using a record key or value field as the record timestamp, which can then be used to partition data with an S3 connector.

Use the concrete transformation type designed for the record key (com.redpanda.connectors.transforms.ReplaceTimestamp$Key) or value (com.redpanda.connectors.transforms.ReplaceTimestamp$Value).

ReplaceTimestamp is available for sink connectors only.

Configuration

Property key Description

field

Specifies the name of the field to use as the source of the timestamp.

Example

To use the my-timestamp field as the source of the record timestamp, update the connector configuration with:

"transforms": "ReplaceTimestamp",
"transforms.ReplaceTimestamp.type": "com.redpanda.connectors.transforms.ReplaceTimestamp$Value",
"transforms.ReplaceTimestamp.field": "my-timestamp"

for messages in the following format:

{
  "name": "my-name",
  ...
  "my-timestamp": 1707928150868,
  ...
}

The SMT requires structured data to extract the field from: either a Map in the case of schemaless data, or a Struct when a schema is present. The timestamp value must be of a numeric type (epoch milliseconds), or a Java Date object (which is the case when using "connect.name":"org.apache.kafka.connect.data.Timestamp" in the schema).

SchemaRegistryReplicator (Redpanda)

SchemaRegistryReplicator (Redpanda) SMT is a transform that replicates schemas.

SchemaRegistryReplicator SMT is designed to be used with the MirrorMaker2 connector only. To use it, remove the _schemas topic from the topic exclude list.

Example

Sample configuration:

"transforms": "schema-replicator",
"transforms.schema-replicator.type": "com.redpanda.connectors.transforms.SchemaRegistryReplicator"

SetSchemaMetadata

SetSchemaMetadata SMT sets the schema name, version, or both on the record’s key (org.apache.kafka.connect.transforms.SetSchemaMetadata$Key) or value (org.apache.kafka.connect.transforms.SetSchemaMetadata$Value) schema.

Configuration

Property key Description

schema.name

Schema name to set.

schema.version

Schema version to set.

Example

Sample configuration:

"transforms": "SetSchemaMetadata",
"transforms.SetSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.SetSchemaMetadata.schema.name": "transaction-value"
"transforms.SetSchemaMetadata.schema.version": "3"

TimestampConverter

TimestampConverter SMT converts timestamps between different formats, such as Unix epoch, strings, and Connect Date/Timestamp types. It applies to individual fields or to the entire value.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.TimestampConverter$Key) or value (org.apache.kafka.connect.transforms.TimestampConverter$Value).

Configuration

Property key Description

field

The field containing the timestamp, or empty if the entire value is a timestamp. Default: "".

target.type

The desired timestamp representation: string, unix, Date, Time, or Timestamp.

format

A SimpleDateFormat-compatible format for the timestamp. Used to generate the output when target.type=string or used to parse the input if the input is a string. Default: "".

unix.precision

The desired Unix precision for the timestamp: seconds, milliseconds, microseconds, or nanoseconds. Used to generate the output when type=unix or used to parse the input if the input is a Long. Note: This SMT causes precision loss during conversions from, and to, values with sub-millisecond components. Default: milliseconds.

Example

Sample configuration:

"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "last_login_date",
"transforms.TimestampConverter.format": "yyyy-MM-dd",
"transforms.TimestampConverter.target.type": "string"

Before: 1702041416

After: 2023-12-08

TimestampRouter

TimestampRouter SMT updates the record’s topic field as a function of the original topic value and the record timestamp. This is mainly useful for sink connectors, because the topic field is often used to determine the equivalent entity name in the destination system (for example, a database table or search index name).

TimestampRouter SMT should be used with sink connectors only.

Configuration

Property key Description

topic.format

Format string that can contain ${topic} and ${timestamp} as placeholders for the topic and timestamp, respectively.

timestamp.format

Format string for the timestamp that is compatible with java.text.SimpleDateFormat.

Example

Sample configuration:

"transforms": "router",
"transforms.router.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.router.topic.format": "${topic}_${timestamp}",
"transforms.router.timestamp.format": "YYYY-MM-dd"

ValueToKey

ValueToKey SMT replaces the record key with a new key formed from a subset of fields in the record value.

Configuration

Property key Description

fields

Comma-separated list of field names on the record value to extract as the record key.

Example

Sample configuration:

"transforms": "valueToKey",
"transforms.valueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.valueToKey.fields": "txn-id"

Error handling

By default, Error tolerance is set to NONE, so any exception thrown by an SMT (notably, a data parsing or data processing error) fails the connector task. To keep the connector running despite data issues, set Error tolerance to ALL, and specify a Dead Letter Queue Topic Name as the topic to which failed messages are redirected.
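
In raw Kafka Connect configuration terms, these settings correspond to connector properties along the following lines (a sketch; my-dlq is a placeholder topic name, and the dead letter queue properties apply to sink connectors):

"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "my-dlq"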