# Single Message Transforms

Single Message Transforms (SMTs) help you modify data and its characteristics as it passes through a connector, without needing additional stream processors.

> **Note:** Before using an SMT with production data, test the configuration on a smaller subset of data to verify the behavior of the SMT.

## Cast

The Cast SMT lets you change the data type of fields in a Redpanda message, updating the schema if one is present. Use the concrete transformation type designed for the record key (`org.apache.kafka.connect.transforms.Cast$Key`) or value (`org.apache.kafka.connect.transforms.Cast$Value`).

### Configuration

| Property key | Description |
|---|---|
| `spec` | Comma-separated list of field names and the type to which they should be cast; for example: `my-field1:int32,my-field2:string`. Allowed types are `int8`, `int16`, `int32`, `int64`, `float32`, `float64`, `boolean`, and `string`. |

### Example

```json
"transforms": "Cast",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "price:float64"
```

Before: `{"price": 1234, "product_id": "9987"}`

After: `{"price": 1234.0, "product_id": "9987"}`

## DropHeaders

The DropHeaders SMT removes one or more headers from each record.

### Configuration

| Property key | Description |
|---|---|
| `headers` | Comma-separated list of header names to drop. |

### Example

Sample configuration:

```json
"transforms": "DropHeader",
"transforms.DropHeader.type": "org.apache.kafka.connect.transforms.DropHeaders",
"transforms.DropHeader.headers": "source-id,conv-id"
```

## EventRouter (Debezium)

The outbox pattern is a way to safely and reliably exchange data between multiple (micro)services. An outbox pattern implementation avoids inconsistencies between a service's internal state (as typically persisted in its database) and the state in events consumed by services that need the same data.

To implement the outbox pattern in a Debezium application, configure a Debezium connector to:

- Capture changes in an outbox table
- Apply the Debezium outbox EventRouter Single Message Transformation

> **Note:** The EventRouter SMT is available for managed Debezium connectors only.

### Configuration

| Property key | Description |
|---|---|
| `route.by.field` | Specifies the name of a column in the outbox table. By default, the value in this column becomes part of the name of the topic to which the connector emits the outbox messages. |
| `route.topic.replacement` | Specifies the name of the topic to which the connector emits outbox messages. The default topic name is `outbox.event.` followed by the `aggregatetype` column value in the outbox table record. |
| `table.expand.json.payload` | Specifies whether JSON expansion of a String payload should be done. If no content is found, or if there is a parsing error, the content is kept as is. |
| `fields.additional.placement` | Specifies one or more outbox table columns to add to outbox message headers or envelopes. Specify a comma-separated list of pairs. In each pair, specify the name of a column and whether you want the value to be in the header or the envelope. |
| `table.field.event.key` | Specifies the outbox table column that contains the event key. When this column contains a value, the SMT uses that value as the key of the emitted outbox message. This is important for maintaining correct order in Kafka partitions. |
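To make the routing concrete, here is a hypothetical outbox table row, shown as JSON. The column names follow the common Debezium outbox layout and line up with the example configuration below; all values are illustrative:

```json
{
  "id": "e4b1f2c3",
  "aggregatetype": "order",
  "aggregate_id": "1057",
  "type": "order-created",
  "payload": "{\"orderId\": 1057, \"status\": \"NEW\"}"
}
```

With `route.by.field` set to `type` and `route.topic.replacement` set to `my-topic.${routedByValue}`, this row would be emitted to the topic `my-topic.order-created`, keyed by the `aggregate_id` value.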
### Example

Sample JSON configuration:

```json
"transforms": "outbox",
"transforms.outbox.route.by.field": "type",
"transforms.outbox.route.topic.replacement": "my-topic.${routedByValue}",
"transforms.outbox.table.expand.json.payload": "true",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.fields.additional.placement": "before:envelope",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter"
```

### Suggested reading

- Debezium Outbox Event Router SMT

## ExtractField

The ExtractField SMT pulls the specified field from a Struct when a schema is present, or from a Map in the case of schemaless data. Any null values are passed through unmodified. Use the concrete transformation type designed for the record key (`org.apache.kafka.connect.transforms.ExtractField$Key`) or value (`org.apache.kafka.connect.transforms.ExtractField$Value`).

### Configuration

| Property key | Description |
|---|---|
| `field` | Field name to extract. |

### Example

Sample configuration:

```json
"transforms": "ExtractField",
"transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractField.field": "product_id"
```

Before: `{"product_id":9987,"price":1234}`

After: `{"value":9987}`

## Filter

The Filter SMT drops every record it is applied to, removing it from subsequent transformations in the chain. It is intended to be used together with a predicate, so that only records matching (or not matching) a particular condition are filtered out.

### Configuration

| Property key | Description |
|---|---|
| `predicate` | Name of the predicate used to filter records. |

### Example

Sample configuration:

```json
"transforms": "Filter",
"transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.Filter.predicate": "IsMyTopic",
"predicates": "IsMyTopic",
"predicates.IsMyTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsMyTopic.pattern": "my-topic"
```

## Predicates

Managed connectors support the following predicates:

### TopicNameMatches

`org.apache.kafka.connect.transforms.predicates.TopicNameMatches`: a predicate that is true for records with a topic name that matches the configured regular expression.

| Property key | Description |
|---|---|
| `pattern` | A Java regular expression for matching against the name of a record's topic. |

### HasHeaderKey

`org.apache.kafka.connect.transforms.predicates.HasHeaderKey`: a predicate that is true for records with at least one header with the configured name.

| Property key | Description |
|---|---|
| `name` | The header name. |

### RecordIsTombstone

`org.apache.kafka.connect.transforms.predicates.RecordIsTombstone`: a predicate that is true for records that are tombstones (that is, they have null values).

## Flatten

The Flatten SMT flattens a nested data structure, generating names for each field by concatenating the field names at each level with a configurable delimiter character. It applies to a Struct when a schema is present, or to a Map in the case of schemaless data. Array fields and their contents are not modified. The default delimiter is `.`.

Use the concrete transformation type designed for the record key (`org.apache.kafka.connect.transforms.Flatten$Key`) or value (`org.apache.kafka.connect.transforms.Flatten$Value`).

### Configuration

| Property key | Description |
|---|---|
| `delimiter` | Delimiter to insert between field names from the input record when generating field names for the output record. |
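Some destination formats restrict the characters allowed in field names (Avro field names, for example, cannot contain dots), so a different delimiter is sometimes needed. A minimal sketch using an underscore instead of the default:

```json
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
```

This would produce field names such as `user_name_first` for the nested record shown in the example below.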
Before: { "user": { "id": 10, "name": { "first": "Red", "last": "Panda" } } } After: { "user.id": 10, "user.name.first": "Red", "user.name.last": "Panda" } HeaderFrom HeaderFrom SMT moves or copies fields in the key or value of a record into that record’s headers. Corresponding elements of fields and headers together identify a field and the header it should be moved or copied to. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.HeaderFrom$Key) or value (org.apache.kafka.connect.transforms.HeaderFrom$Value). Configuration Property key Description fields Comma-separated list of field names in the record whose values are to be copied or moved to headers. headers Comma-separated list of header names, in the same order as the field names listed in the fields configuration property. operation Either move if the fields are to be moved to the headers (removed from the key/value), or copy if the fields are to be copied to the headers (retained in the key/value). Example "transforms": "HeaderFrom", "transforms.HeaderFrom.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value", "transforms.HeaderFrom.fields": "id,last_login_ts", "transforms.HeaderFrom.headers": "user_id,timestamp", "transforms.HeaderFrom.operation": "move" Before: Record value: { "id": 11, "name": "Harry Wilson", "last_login_ts": 1715242380 } Record header: { "conv_id": "uier923" } After: Record value: { "name": "Harry Wilson" } Record header: { "conv_id": "uier923", "user_id": 11, "timestamp": 1715242380 } HoistField HoistField SMT wraps data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.HoistField$Key) or value (org.apache.kafka.connect.transforms.HoistField$Value). Configuration Property key Description field Field name for the single field that will be created in the resulting Struct or Map. Example "transforms": "HoistField", "transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value", "transforms.HoistField.field": "name" Message: Red Panda After: {"name":"Red"} {"name":"Panda"} InsertField InsertField SMT inserts field(s) using attributes from the record metadata or a configured static value. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.InsertField$Key) or value (org.apache.kafka.connect.transforms.InsertField$Value). Configuration Property key Description offset.field Field name for Redpanda offset. partition.field Field name for Redpanda partition. static.field Field name for static data field. static.value The static field value. timestamp.field Field name for record timestamp. topic.field Field name for Redpanda topic. Example Sample configuration: "transforms": "InsertField", "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.InsertField.static.field": "cluster_id", "transforms.InsertField.static.value": "19423" Before: {"product_id":9987,"price":1234} After: {"price":1234,"cluster_id":"19423","product_id":9987} MaskField MaskField SMT replaces the contents of fields in a record. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.MaskField$Key) or value (org.apache.kafka.connect.transforms.MaskField$Value). Configuration Property key Description fields Comma-separated list of fields to mask. 
## MaskField

The MaskField SMT replaces the contents of fields in a record. Use the concrete transformation type designed for the record key (`org.apache.kafka.connect.transforms.MaskField$Key`) or value (`org.apache.kafka.connect.transforms.MaskField$Value`).

### Configuration

| Property key | Description |
|---|---|
| `fields` | Comma-separated list of fields to mask. |
| `replacement` | Custom replacement value used to mask field values. |

### Example

```json
"transforms": "MaskField",
"transforms.MaskField.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.MaskField.fields": "metadata",
"transforms.MaskField.replacement": "***"
```

Before: `{"product_id":9987,"price":1234,"metadata":"test"}`

After: `{"metadata":"***","price":1234,"product_id":9987}`

## RegexRouter

The RegexRouter SMT updates the record topic using the configured regular expression and replacement string. Under the hood, the regex is compiled to a `java.util.regex.Pattern`. If the pattern matches the input topic, `java.util.regex.Matcher#replaceFirst()` is used with the replacement string to obtain the new topic.

### Configuration

| Property key | Description |
|---|---|
| `regex` | Regular expression to use for matching. |
| `replacement` | Replacement string. |

### Example

This configuration snippet shows how to add the prefix `prefix_` to the beginning of a topic name:

```json
"transforms": "AppendPrefix",
"transforms.AppendPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AppendPrefix.regex": ".*",
"transforms.AppendPrefix.replacement": "prefix_$0"
```

Before: `topic-name`

After: `prefix_topic-name`

## ReplaceField

The ReplaceField SMT filters or renames fields in a Redpanda record. Use the concrete transformation type designed for the record key (`org.apache.kafka.connect.transforms.ReplaceField$Key`) or value (`org.apache.kafka.connect.transforms.ReplaceField$Value`).

### Configuration

| Property key | Description |
|---|---|
| `exclude` | Fields to exclude. This takes precedence over the fields to include. |
| `include` | Fields to include. If specified, only these fields are used. |
| `renames` | Comma-separated list of `old:new` field rename pairs. For example: `foo:bar,abc:xyz`. |

### Example

Sample configuration:

```json
"transforms": "ReplaceField",
"transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceField.renames": "product_id:item_number"
```

Before: `{"product_id":9987,"price":1234}`

After: `{"item_number":9987,"price":1234}`

## ReplaceTimestamp (Redpanda)

The ReplaceTimestamp SMT supports using a record key or value field as the record timestamp, which can then be used, for example, to partition data with an S3 connector. Use the concrete transformation type designed for the record key (`com.redpanda.connectors.transforms.ReplaceTimestamp$Key`) or value (`com.redpanda.connectors.transforms.ReplaceTimestamp$Value`).

> **Note:** ReplaceTimestamp is available for sink connectors only.

### Configuration

| Property key | Description |
|---|---|
| `field` | Specifies the name of the field to be used as the source of the timestamp. |

### Example

To use the `my-timestamp` field as the source of the record timestamp, update the connector configuration with:

```json
"transforms": "ReplaceTimestamp",
"transforms.ReplaceTimestamp.type": "com.redpanda.connectors.transforms.ReplaceTimestamp$Value",
"transforms.ReplaceTimestamp.field": "my-timestamp"
```

for messages in a format such as:

```json
{
  "name": "my-name",
  ...
  "my-timestamp": 1707928150868,
  ...
}
```

The SMT needs structured data to be able to extract the field, which means either a Map in the case of schemaless data, or a Struct when a schema is present. The timestamp value should be of a numeric type (epoch milliseconds) or a Java Date object (which is the case when using `"connect.name": "org.apache.kafka.connect.data.Timestamp"` in the schema).

## SchemaRegistryReplicator (Redpanda)

The SchemaRegistryReplicator SMT replicates schemas between clusters. It is designed to be used with the MirrorMaker2 connector only. To use it, remove the `_schema` topic from the topic exclude list.
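For the exclude-list change, a minimal sketch, assuming the managed connector exposes MirrorMaker2's standard `topics.exclude` property (the value shown is illustrative; it keeps common internal-topic exclusions while leaving `_schema` out so the topic can be read):

```json
"topics.exclude": ".*[\\-\\.]internal,.*\\.replica,__consumer_offsets"
```

With the topic readable, apply the transform as shown in the example below.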
### Example

Sample configuration:

```json
"transforms": "schema-replicator",
"transforms.schema-replicator.type": "com.redpanda.connectors.transforms.SchemaRegistryReplicator"
```

## SetSchemaMetadata

The SetSchemaMetadata SMT sets the schema name, version, or both on the record's key (`org.apache.kafka.connect.transforms.SetSchemaMetadata$Key`) or value (`org.apache.kafka.connect.transforms.SetSchemaMetadata$Value`) schema.

### Configuration

| Property key | Description |
|---|---|
| `schema.name` | Schema name to set. |
| `schema.version` | Schema version to set. |

### Example

Sample configuration:

```json
"transforms": "SetSchemaMetadata",
"transforms.SetSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.SetSchemaMetadata.schema.name": "transaction-value",
"transforms.SetSchemaMetadata.schema.version": "3"
```

## TimestampConverter

The TimestampConverter SMT converts timestamps between different formats, such as Unix epoch, strings, and Connect Date/Timestamp types. It applies to individual fields or to the entire value. Use the concrete transformation type designed for the record key (`org.apache.kafka.connect.transforms.TimestampConverter$Key`) or value (`org.apache.kafka.connect.transforms.TimestampConverter$Value`).

### Configuration

| Property key | Description |
|---|---|
| `field` | The field containing the timestamp, or empty if the entire value is a timestamp. Default: `""`. |
| `target.type` | The desired timestamp representation: `string`, `unix`, `Date`, `Time`, or `Timestamp`. |
| `format` | A `SimpleDateFormat`-compatible format for the timestamp. Used to generate the output when `target.type` is `string`, or to parse the input if the input is a string. Default: `""`. |
| `unix.precision` | The desired Unix precision for the timestamp: `seconds`, `milliseconds`, `microseconds`, or `nanoseconds`. Used to generate the output when `target.type` is `unix`, or to parse the input if the input is a `Long`. Note: this SMT causes precision loss during conversions from, and to, values with sub-millisecond components. Default: `milliseconds`. |

### Example

Sample configuration:

```json
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "last_login_date",
"transforms.TimestampConverter.format": "yyyy-MM-dd",
"transforms.TimestampConverter.target.type": "string"
```

Before: `1702041416`

After: `2023-12-08`

## TimestampRouter

The TimestampRouter SMT updates the record's topic field as a function of the original topic value and the record timestamp. This is mainly useful for sink connectors, because the topic field is often used to determine the equivalent entity name in the destination system (for example, a database table or search index name).

> **Note:** The TimestampRouter SMT should be used with sink connectors only.

### Configuration

| Property key | Description |
|---|---|
| `topic.format` | Format string that can contain `${topic}` and `${timestamp}` as placeholders for the topic and timestamp, respectively. |
| `timestamp.format` | Format string for the timestamp that is compatible with `java.text.SimpleDateFormat`. |

### Example

Sample configuration:

```json
"transforms": "router",
"transforms.router.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.router.topic.format": "${topic}_${timestamp}",
"transforms.router.timestamp.format": "yyyy-MM-dd"
```

## ValueToKey

The ValueToKey SMT replaces the record key with a new key formed from a subset of fields in the record value.

### Configuration

| Property key | Description |
|---|---|
| `fields` | Comma-separated list of field names on the record value to extract as the record key. |

### Example

Sample configuration:

```json
"transforms": "valueToKey",
"transforms.valueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.valueToKey.fields": "txn-id"
```
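ValueToKey produces a structured key (a Struct, or a Map for schemaless data) containing the listed fields. To end up with a plain primitive key instead, it is commonly chained with ExtractField applied to the key; a minimal sketch (the transform aliases are illustrative):

```json
"transforms": "valueToKey,extractKey",
"transforms.valueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.valueToKey.fields": "txn-id",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "txn-id"
```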
## Error handling

By default, Error tolerance is set to `NONE`, so SMTs fail on any exception (notably, data parsing or data processing errors). To avoid crashing the connector on data issues, set Error tolerance to `ALL`, and specify a Dead Letter Queue Topic Name as the place to which failed messages are redirected.
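These UI fields typically correspond to Kafka Connect's standard error-handling properties for sink connectors. A minimal sketch of the equivalent raw configuration (the topic name is illustrative):

```json
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "my-connector-dlq",
"errors.deadletterqueue.context.headers.enable": "true"
```

With context headers enabled, each redirected record carries headers describing the failure, which helps diagnose why a message could not be processed.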