Version: 22.3

Data Transforms

caution

Lab features are not supported for production deployments.

  • Redpanda recommends not using Data Transforms with Tiered Storage.
  • Data Transforms are not supported in Redpanda Cloud.

Redpanda Data Transforms let users perform simple data transformations on topics without leaving the platform, such as capitalizing strings or filtering messages to adhere to GDPR. Because the transformation runs inside Redpanda, you don't need to consume the data stream from a separate system, transform it there, and send it back.

Redpanda lets you deploy custom Node.js programs to handle the entire consume-transform-produce cycle for you. This is handled by a transformation engine built on top of Wasm technology.

You can jump start your data transformation development with rpk. Redpanda provides a sample Node.js project you can use to start developing immediately. Then you can deploy the data transform to Redpanda for all the brokers to run.

Under the hood, Redpanda reads from an immutable Apache Kafka® topic, applies a data transformation process, and then produces another immutable Kafka topic. That way, the data is transformed into a new topic with the original topic preserved without any data loss.
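
The flow described above can be pictured with a toy in-memory model. This is only a sketch, not Redpanda code: the "topics" are plain arrays and transformValue is a hypothetical stand-in for your transform logic.

```javascript
// Toy in-memory model of consume-transform-produce (not Redpanda code).
// A "topic" here is just an append-only array of string values.
const sourceTopic = Object.freeze(["hello", "world"]);

// Hypothetical transform: uppercase every value.
const transformValue = (value) => value.toUpperCase();

// Reading never mutates the source; results go to a separate array.
const destinationTopic = sourceTopic.map(transformValue);

console.log(sourceTopic);      // the original events are still intact
console.log(destinationTopic); // the transformed copy
```

The source array is frozen to mirror the immutability of the source topic: the transform can only read it and write its results somewhere else.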

note

The data transformation service runs with the same privileges as Redpanda, and it has permission to read your data. Only run code from trusted authors.

Prerequisites

Make sure you have the following prerequisites before you configure data transformations and start the Wasm engine:

  • Redpanda version v21.12.1-wasm-beta1 or later.
  • Node.js version 12 or later installed on the machine where you want to run the transform bundle.
  • A JavaScript script that transforms the data.
  • Source topics that you want to consume from. The destination topics can be generated on demand when your transformation runs.

Set up

Enable data transforms

To enable the coprocessor that runs the data transformation, set the cluster properties and enable the engine in the redpanda.yaml configuration file.

On the cluster level, set the enable_coproc property to true:

rpk cluster config set enable_coproc true

Also set the enable_idempotence property to false:

rpk cluster config set enable_idempotence false

Finally, on the node level, add the following flags under redpanda in the redpanda.yaml file:

note

The default location of redpanda.yaml is /etc/redpanda/redpanda.yaml.

enable_coproc: true
developer_mode: true
enable_idempotence: false

Optionally, change the port that the coprocessor reads from:

coproc_supervisor_server:
  address: 0.0.0.0
  port: <new_port>

The default location of the Data Transforms log file is: /var/lib/redpanda/coprocessor/wasm_engine.log. To configure the location of the log file in redpanda.yaml:

coproc_engine:
  logFilePath: <file_path>

Here’s an example redpanda.yaml file with the engine enabled:

config_file: /etc/redpanda/redpanda.yaml
node_uuid: mecCncdqAUKFoM2ojvR2Ai86kGWuhiJ95xpNCsKwe6VTgvHoJ
pandaproxy: {}
redpanda:
  admin:
  - address: 0.0.0.0
    port: 9644
  data_directory: /var/lib/redpanda/data
  developer_mode: true ## because this is a lab feature, you must set developer mode as true
  enable_coproc: true ## data transform flag
  enable_idempotence: false ## this flag is mandatory for lab features
  coproc_supervisor_server:
    address: 0.0.0.0 ## optionally change the ip address
    port: <new port> ## remember to change the port
  kafka_api:
  - address: 0.0.0.0
    port: 9092
  node_id: 0
  rpc_server:
    address: 0.0.0.0
    port: 33145
  seed_servers: []
coproc_engine:
  logFilePath: <file_path> ## optional: change the directory where logs are written
rpk:
  coredump_dir: /var/lib/redpanda/coredump
  enable_memory_locking: false
  enable_usage_stats: false
  overprovisioned: false
  tune_aio_events: false
  tune_ballast_file: false
  tune_clocksource: false
  tune_coredump: false
  tune_cpu: false
  tune_disk_irq: false
  tune_disk_nomerges: false
  tune_disk_scheduler: false
  tune_disk_write_cache: false
  tune_fstrim: false
  tune_network: false
  tune_swappiness: false
  tune_transparent_hugepages: false
schema_registry: {}

Save redpanda.yaml.

Start the Wasm engine

After the configuration is complete, start the Wasm engine.

To start the Wasm engine, run:

sudo systemctl start wasm_engine

The Wasm engine automatically starts Redpanda as well. To check the status of the Wasm engine:

systemctl status wasm_engine

The command returns something similar to the following:

● wasm_engine.service - Redpanda`s wasm engine, your on-broker programmable data transformer
Loaded: loaded (/lib/systemd/system/wasm_engine.service; enabled; vendor preset: enabled)
Active: active (running) since Mon 2021-12-13 00:49:30 -03; 1 months 15 days ago
Main PID: 865 (node)
Tasks: 11 (limit: 9468)
Memory: 9.2M
CGroup: /wasm.slice/wasm_engine.service
└─865 /opt/redpanda/bin/node /opt/wasm/main.js /etc/redpanda/redpanda.yaml

To check Redpanda’s status, run:

systemctl status redpanda

It returns something similar to the following:

● redpanda.service - Redpanda, the fastest queue in the West.
Loaded: loaded (/lib/systemd/system/redpanda.service; enabled; vendor preset: enabled)
Active: active (running) since Mon 2021-12-13 00:49:45 -03; 1 months 15 days ago
Main PID: 1084 (redpanda)
Status: "redpanda is ready! - v21.11.1 - f314d5522ad033fd50080d3f1fe0bf5b2c5a1042-dirty"
Tasks: 8 (limit: 9468)
Memory: 426.9M
CGroup: /redpanda.slice/redpanda.service
└─1084 /opt/redpanda/bin/redpanda --redpanda-cfg /etc/redpanda/redpanda.yaml --lock-memory=false

Run the data transform

Generate the data transform package

The data transform is packaged in a Node.js project and uses the Wasm instruction format.

To create the template project, run:

rpk wasm generate <project_name>

Replace <project_name> with the name of your project.

Example transformation

For example, to create a project that changes the text in your events to all uppercase:

rpk wasm generate uppercase

A directory is created with the project name. The directory contains the following files:

uppercase/
├── package.json
├── src
│   └── main.js
├── test
│   └── main.test.js
└── webpack.js

Take note of the following files in the project:

  • /src/main.js - This file contains your transform logic and hooks into the API to define the event inputs.
  • /package.json - If your transform requires Node.js dependencies, you must add them to this file.

The sample project

The sample project contains this main.js file:

const {
  SimpleTransform,
  PolicyError,
  PolicyInjection
} = require("@vectorizedio/wasm-api");

const transform = new SimpleTransform();

/* Topics that fire the transform function */
transform.subscribe([["test-topic", PolicyInjection.Stored]]);

/* The strategy the transform engine will use when handling errors */
transform.errorHandler(PolicyError.SkipOnFailure);

/* Auxiliary transform function for records */
const uppercase = (record) => {
  const newRecord = {
    ...record,
    value: record.value.map((char) => {
      if (char >= 97 && char <= 122) {
        return char - 32;
      } else {
        return char;
      }
    }),
  };
  return newRecord;
};

/* Transform function */
transform.processRecord((recordBatch) => {
  const result = new Map();
  const transformedRecord = recordBatch.map(({ header, records }) => {
    return {
      header,
      records: records.map(uppercase),
    };
  });
  result.set("result-topic", transformedRecord);
  // processRecord function returns a Promise
  return Promise.resolve(result);
});

exports["default"] = transform;

This file imports the following constants from the Wasm API:

const {
SimpleTransform,
PolicyError,
PolicyInjection
} = require("@vectorizedio/wasm-api");

It then creates a constant that holds a new SimpleTransform instance. This is the main object that you work with in the project.

const transform = new SimpleTransform();

It fills the subscribe list with the topic and the policy that it will use to process new messages.

transform.subscribe([["test-topic", PolicyInjection.Stored]]);

To add multiple source topics, add the topic and policy as pairs:

transform.subscribe([[<topic1>, <policyA>], [<topic2>, <policyB>]]);

note

Run rpk topic create test-topic to create the source topic before you deploy the transformation. If the topic doesn't exist when the transformation is deployed, you might encounter a deployment error.

The PolicyInjection parameter can have the following values:

  • PolicyInjection.Earliest - The earliest offset. Transforms all of the events in the topic from offset 0.
  • PolicyInjection.Latest - The latest offset. Transforms only the current incoming events.
  • PolicyInjection.Stored - The stored offset. Transforms the events starting from the latest recorded offset on disk. If no offsets are recorded, the earliest offset is processed.
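
To make the three policies concrete, here is a minimal sketch of how a starting offset could be chosen for each subscribed topic. It is an illustration only, not the engine's implementation: startOffset, the logs object, and the topic names are hypothetical, and the PolicyInjection object is a local stand-in for the one exported by the Wasm API. Whether the engine resumes at the stored offset or just after it is not covered here.

```javascript
// Stand-in for the PolicyInjection values described above. In a real
// transform these come from require("@vectorizedio/wasm-api").
const PolicyInjection = Object.freeze({
  Earliest: "Earliest",
  Latest: "Latest",
  Stored: "Stored",
});

// Hypothetical helper: pick the first offset to process for one topic.
// `log` describes the topic: its earliest and latest offsets, plus the
// offset recorded on disk by a previous run (undefined if none exists).
function startOffset(policy, log) {
  switch (policy) {
    case PolicyInjection.Earliest:
      return log.earliest;
    case PolicyInjection.Latest:
      return log.latest;
    case PolicyInjection.Stored:
      // Fall back to the earliest offset when nothing was recorded.
      return log.stored !== undefined ? log.stored : log.earliest;
    default:
      throw new Error(`Unknown policy: ${policy}`);
  }
}

// Subscriptions are [topic, policy] pairs, as passed to transform.subscribe.
const subscriptions = [
  ["orders", PolicyInjection.Earliest],
  ["payments", PolicyInjection.Stored],
];
const logs = {
  orders: { earliest: 0, latest: 120, stored: 45 },
  payments: { earliest: 0, latest: 30, stored: undefined },
};
for (const [topic, policy] of subscriptions) {
  console.log(topic, policy, startOffset(policy, logs[topic]));
}
```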

Next, it sets the policy that tells the coprocessor how to handle errors:

transform.errorHandler(PolicyError.SkipOnFailure);

The PolicyError values are:

  • PolicyError.SkipOnFailure - If there's a failure, it skips to the next event.
  • PolicyError.Deregister - If there's a failure, the coprocessor is removed.
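
The difference between the two error policies can be simulated in a few lines. This is a sketch of the behavior described above, not the engine's code: runWithPolicy and the sample events are hypothetical, and the PolicyError object is a local stand-in for the one exported by the Wasm API.

```javascript
// Stand-in for the PolicyError values; the real ones come from
// require("@vectorizedio/wasm-api").
const PolicyError = Object.freeze({
  SkipOnFailure: "SkipOnFailure",
  Deregister: "Deregister",
});

// Hypothetical simulation of the two policies, not the engine's code.
// Applies `fn` to each event and reports what would happen on a failure.
function runWithPolicy(events, fn, policy) {
  const output = [];
  for (const event of events) {
    try {
      output.push(fn(event));
    } catch (err) {
      if (policy === PolicyError.Deregister) {
        // The transform is removed; later events are not processed.
        return { output, registered: false };
      }
      // SkipOnFailure: drop this event and continue with the next one.
    }
  }
  return { output, registered: true };
}

// A transform that fails on anything that is not valid JSON.
const parse = (event) => JSON.parse(event);
const events = ['{"id":1}', "not json", '{"id":3}'];

console.log(runWithPolicy(events, parse, PolicyError.SkipOnFailure));
console.log(runWithPolicy(events, parse, PolicyError.Deregister));
```

With SkipOnFailure the third event is still processed; with Deregister processing stops at the first failure.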

The following section contains the logic that applies the uppercase rule. There are multiple ways to do this, but here it subtracts 32 from the ASCII code of every lowercase letter (codes 97 to 122), which yields the corresponding uppercase letter.

/* Auxiliary transform function for records */
const uppercase = (record) => {
  const newRecord = {
    ...record,
    value: record.value.map((char) => {
      if (char >= 97 && char <= 122) {
        return char - 32;
      } else {
        return char;
      }
    }),
  };
  return newRecord;
};
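
The helper can be tried on its own, outside Redpanda. The sketch below assumes that a record's value is a Buffer of raw bytes; the sample record is made up for illustration.

```javascript
// Same logic as the helper in the sample project, written compactly.
const uppercase = (record) => ({
  ...record,
  value: record.value.map((char) =>
    char >= 97 && char <= 122 ? char - 32 : char
  ),
});

// Assumption: a record's value is a Buffer of raw bytes.
const record = { value: Buffer.from("Hello, wörld 42", "utf8") };
const result = uppercase(record);

// Buffer.from(...) copies the bytes, so toString works whether map
// returned a Buffer or a plain Uint8Array.
console.log(Buffer.from(result.value).toString("utf8"));
// prints "HELLO, WöRLD 42"
```

Only the bytes for a to z change. The two UTF-8 bytes that encode "ö" fall outside the 97 to 122 range, so non-ASCII characters pass through untouched, and the original record is not modified.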

The logic is applied in the callback passed to processRecord. The callback receives a recordBatch from the subscribed topic, applies the uppercase function to every record, and stores the transformed batch in the transformedRecord variable.

The transformedRecord is then added to the result map under the destination topic name result-topic.

Finally, the callback returns the map wrapped in a Promise, which the API requires, and the file ends by exporting the transform:

/* Transform function */
transform.processRecord((recordBatch) => {
  const result = new Map();
  const transformedRecord = recordBatch.map(({ header, records }) => {
    return {
      header,
      records: records.map(uppercase),
    };
  });
  result.set("result-topic", transformedRecord);
  // processRecord function returns a Promise
  return Promise.resolve(result);
});

exports["default"] = transform;

You can change the result-topic to any topic name.

To produce onto more than one destination topic, add another line in the following format:

result.set("<destination_topic>", transformedRecord);
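
As a sketch of what a callback with two destinations might look like, the function below is written standalone so that it runs without the Wasm API; in a transform you would pass it to transform.processRecord. The names processBatch, passThrough, and audit-topic are hypothetical, and the batch is modeled as a plain array shaped like the one destructured in the sample.

```javascript
// Identity helper used as a placeholder for your own record transform.
const passThrough = (record) => record;

// Hypothetical callback body for transform.processRecord(...).
function processBatch(recordBatch) {
  const result = new Map();
  const transformed = recordBatch.map(({ header, records }) => ({
    header,
    records: records.map(passThrough),
  }));
  // One entry per destination topic; both names are placeholders.
  result.set("result-topic", transformed);
  result.set("audit-topic", transformed);
  return Promise.resolve(result);
}

// Minimal fake batch (assumption: an array of { header, records }).
const batch = [{ header: {}, records: [{ value: Buffer.from("a") }] }];
processBatch(batch).then((result) => {
  console.log([...result.keys()]);
});
```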

The actual name of the destination topic is created with the format <source>._<destination>_. If the destination topic doesn't already exist, it's created automatically during script deployment.
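
When you script against the output, it can help to derive the final topic name from the format above. materializedTopicName is a hypothetical helper for illustration, not part of rpk or the Wasm API.

```javascript
// Hypothetical helper that builds the materialized topic name from the
// <source>._<destination>_ format described above.
function materializedTopicName(source, destination) {
  return `${source}._${destination}_`;
}

console.log(materializedTopicName("test-topic", "result-topic"));
// prints "test-topic._result-topic_"
```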

If you have other mechanisms that auto-generate topics (such as auto_create_topics_enabled set to true in your Redpanda configuration file), you might run into issues. For example, if you start a consumer on the destination topic before your transformation writes data into it, Redpanda creates the topic automatically for the consumer and the coprocessor can't write data into it.

The batch API is Bytes-In-Bytes-Out. Redpanda highly recommends that you build deterministic functions based on the input to facilitate debugging your applications.
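
A deterministic, bytes-in-bytes-out record function might look like the following sketch. It assumes that record.value is a Buffer holding UTF-8 encoded JSON; redactEmail and the email field are hypothetical examples, in the spirit of the GDPR filtering use case.

```javascript
// Hypothetical record transform: redact one JSON field.
// Assumption: record.value is a Buffer holding UTF-8 encoded JSON.
const redactEmail = (record) => {
  const event = JSON.parse(record.value.toString("utf8"));
  if ("email" in event) {
    event.email = "REDACTED";
  }
  // Serialize back to bytes. No clocks, randomness, or external state,
  // so the same input bytes always produce the same output bytes.
  return { ...record, value: Buffer.from(JSON.stringify(event), "utf8") };
};

const input = { value: Buffer.from('{"id":7,"email":"a@example.com"}') };
const first = redactEmail(input);
const second = redactEmail(input);

console.log(first.value.toString("utf8"));     // {"id":7,"email":"REDACTED"}
console.log(first.value.equals(second.value)); // true
```

Because the output depends only on the input bytes, you can reproduce any result by replaying the same record through the function.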

If your transform requires Node.js dependencies, add them to the package.json file in the project root.

Prepare the script for deployment

Because the transform is packaged in a Node.js project, you must install the dependencies and build the script that runs the transform.

To do this, run the following commands in the project directory:

npm install
npm run build

The build command creates the main.js JavaScript file in the /dist directory that contains the compiled transform bundle.

Deploy the transform

To enable the transform to start consuming and producing events, you must deploy it in Redpanda with a name and description.

As with other rpk commands, you must specify the brokers in the cluster and all of the authentication parameters (including user, password, TLS) for the brokers.

note

If the source topic doesn't exist, the deployment fails. If the target topic already exists, it uses the existing topic.

To deploy the sample transform, run:

rpk wasm deploy uppercase/dist/main.js --name uppercase --description "Converts lowercase text to uppercase"

Multiple Transforms

If you run different types of transformations on different topics, it's advantageous to deploy them as separate transforms. Keep in mind that every deployed transform consumes from its source topic and produces to another topic, so each deployment adds processing overhead.

After you run the transform

After you run the transform, you can verify and clean your transform in Redpanda.

Verify the transform

After the transform is deployed, Redpanda processes every event produced to the source topic and runs the logic that is defined by the transform.

To see the sample transform in action:

  1. Run rpk topic consume test-topic._result-topic_.
  2. Produce events to the source topic:
    a. In a second terminal, run: rpk topic produce test-topic.
    b. Enter some text and press Ctrl+D to send the event to the source topic.
  3. In the terminal that shows the consumed events, you see the text that you produced, now converted to uppercase characters.

Clean up

To stop a transform, you must remove the transform from the cluster brokers.

For example, run rpk wasm remove uppercase.

The transform stops processing events and is removed.

Delete output topics

Delete an output topic the way you would delete a Kafka topic.

Before you delete a topic, shut down the coprocessors producing to the topic.

You can delete a topic with any Kafka client, or you can use rpk:

rpk topic delete <topic>

Reference

To see what else rpk wasm can do, run:

rpk wasm -h

See also rpk Commands.

Produce directly into the coprocessor

There's a global control topic for each Redpanda cluster with the name coprocessor_internal_topic. This allows support for any Kafka client, so you're not required to use rpk. If someone publishes to the topic, it invokes a deployer implicitly. However, you must publish with the correct headers and format.

Source code

You can view the possible export values in Redpanda’s GitHub repository.

The API package is published under npm.

Lab limitations

  • Redpanda doesn't use Raft to publish and replicate the transformed data. Instead, the same transform is applied locally on every node in the cluster, so inconsistencies are possible in certain cases. For example, you may have inconsistencies if you have a topic with a replication factor > 1 and you have stateful or non-deterministic coprocessors producing different data onto the replicas of that topic.

  • The coprocessor doesn't have system limits. For example, there's no limit for processing time, memory usage, or number of topics created.

  • The coprocessor doesn't save your state. For example, if you have a counter and there's a crash, you lose that counter. The best practice is to always avoid stateful implementations. The only state that is kept and checkpointed is your offset, just like Kafka. The semantics of your materialized topics is At-Least-Once. Redpanda saves the offset from wherever you were reading. In the case of a crash, your deployed code is reprocessed based on the PolicyInjection policy. If you use PolicyInjection.Stored, for example, it is reprocessed from whatever offset was saved before the crash.

  • It's not possible to pipeline multiple scripts so that you can pass transforms through one another (such as A->B->C->D). The API only maps topic(s)-to-topic(s).

  • Data Transforms are not supported in Redpanda Cloud.
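
The advice to avoid stateful implementations can be illustrated with a toy replay model. This is not Redpanda code: runWithCrash, the checkpoint, and the crash point are hypothetical, and the model only mimics the behavior described in the list above (in-memory state is lost on a crash and processing resumes from a saved offset).

```javascript
// Toy replay model, not Redpanda code. `events` is the source topic and
// `checkpoint` is the last offset saved before a simulated crash.
function runWithCrash(events, makeTransform, checkpoint, crashAt) {
  const output = [];
  let transform = makeTransform();
  // First run: process events until the crash.
  for (let offset = 0; offset < crashAt; offset++) {
    output.push(transform(events[offset]));
  }
  // Restart: in-memory state is lost, processing resumes at the checkpoint.
  transform = makeTransform();
  for (let offset = checkpoint; offset < events.length; offset++) {
    output.push(transform(events[offset]));
  }
  return output;
}

const events = ["a", "b", "c", "d"];

// Stateless: output depends only on the input event.
const stateless = () => (event) => event.toUpperCase();
// Stateful: output depends on an in-memory counter that a crash resets.
const stateful = () => {
  let count = 0;
  return (event) => `${++count}:${event}`;
};

console.log(runWithCrash(events, stateless, 2, 3));
// [ 'A', 'B', 'C', 'C', 'D' ]
console.log(runWithCrash(events, stateful, 2, 3));
// [ '1:a', '2:b', '3:c', '1:c', '2:d' ]
```

In both runs the event at offset 2 is emitted twice, which is what at-least-once delivery allows. The stateless transform emits the same value both times, while the stateful one emits two conflicting values for the same event.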

