Version: 22.1

Data Transforms

caution

This feature is in technical preview and is not supported for production. For more information about technical previews, see Technical Preview.

Do not use Data Transforms with Tiered Storage. Combining the two is likely to cause problems with topics.

Data Transforms are not supported in Redpanda Cloud.

Redpanda Data Transforms enable users to perform simple data transformations on topics without leaving the platform, such as capitalizing strings, and filtering messages to adhere to GDPR. This eliminates data going back and forth by removing the need to consume data streams from a separate system for simple transformations.

Redpanda allows you to deploy custom Node.js programs to handle the entire consume-transform-produce cycle for you. This is handled by a transformation engine built on top of Wasm technology.

You can jump-start your data transformation development with rpk. Redpanda provides a sample Node.js project that you can use to start developing immediately. Then you can deploy the data transform to Redpanda for all of the brokers to run.

Under the hood, Redpanda reads from an immutable Apache Kafka® topic, applies a data transformation process, and then produces another immutable Kafka topic. That way, the data is transformed into a new topic and we preserve the original topic without any data loss.

note

The data transformation service will run with the same privileges that Redpanda has and it will have permission to read your data. Only run code from trusted authors.

Prerequisites

Verify the following before you configure data transformations and start the Wasm engine:

  • Data Transforms requires Redpanda version v21.12.1-wasm-beta1 or later.
  • Node.js version 12 or later must be installed on the machine where you want to run the transform bundle.
  • You must have a JavaScript script that transforms the data.
  • You must have source topics that you want to consume from. The destination topics can be generated on demand when your transformation runs.

Set up

In this section, we'll cover how to set up your transformations in Redpanda.

Enable data transforms

To enable the coprocessor that runs the data transformation, you must set the configuration properties at the cluster level with [centralized configuration](/docs/cluster-administration/cluster-property-configuration) and enable the engine in the redpanda.yaml configuration file.

On the cluster level, set the enable_coproc property to true using the [cluster configuration rpk commands](/docs/cluster-administration/cluster-property-configuration):

rpk cluster config set enable_coproc true

Also set the enable_idempotence property to false:

rpk cluster config set enable_idempotence false

Finally, on the node level, add the following flags under redpanda in the redpanda.yaml file:

note

The default location of redpanda.yaml is /etc/redpanda/redpanda.yaml.

enable_coproc: true
developer_mode: true
enable_idempotence: false

Optionally, you can change the port that the coprocessor reads from. To do this, add this section:

coproc_supervisor_server:
  address: 0.0.0.0
  port: <new_port>

The default location of the Data Transforms log file is: /var/lib/redpanda/coprocessor/wasm_engine.log. You can configure the location of the log file in redpanda.yaml with this property:

coproc_engine:
  logFilePath: <file_path>

Here’s an example redpanda.yaml file with the engine enabled:

config_file: /etc/redpanda/redpanda.yaml
node_uuid: mecCncdqAUKFoM2ojvR2Ai86kGWuhiJ95xpNCsKwe6VTgvHoJ
pandaproxy: {}
redpanda:
  admin:
  - address: 0.0.0.0
    port: 9644
  data_directory: /var/lib/redpanda/data
  developer_mode: true ## since this is a tech preview, you must set developer mode to true
  enable_coproc: true ## data transform flag
  enable_idempotence: false ## this flag is also mandatory for the tech preview
  coproc_supervisor_server:
    address: 0.0.0.0 ## if you want, you can change the IP address here
    port: <new_port> ## remember to change the port here
  kafka_api:
  - address: 0.0.0.0
    port: 9092
  node_id: 0
  rpc_server:
    address: 0.0.0.0
    port: 33145
  seed_servers: []
coproc_engine:
  logFilePath: <file_path> ## optional: change the location where logs are written
rpk:
  coredump_dir: /var/lib/redpanda/coredump
  enable_memory_locking: false
  enable_usage_stats: false
  overprovisioned: false
  tune_aio_events: false
  tune_ballast_file: false
  tune_clocksource: false
  tune_coredump: false
  tune_cpu: false
  tune_disk_irq: false
  tune_disk_nomerges: false
  tune_disk_scheduler: false
  tune_disk_write_cache: false
  tune_fstrim: false
  tune_network: false
  tune_swappiness: false
  tune_transparent_hugepages: false
schema_registry: {}

Save redpanda.yaml.

Start the Wasm engine

After the configuration is complete, you can start the Wasm engine.

To start the Wasm engine, run this command:

sudo systemctl start wasm_engine

When you start the Wasm engine, it automatically starts Redpanda as well. You can check the status of the Wasm engine with this command:

systemctl status wasm_engine

The command will return something similar to this:

● wasm_engine.service - Redpanda`s wasm engine, your on-broker programmable data transformer
Loaded: loaded (/lib/systemd/system/wasm_engine.service; enabled; vendor preset: enabled)
Active: active (running) since Mon 2021-12-13 00:49:30 -03; 1 months 15 days ago
Main PID: 865 (node)
Tasks: 11 (limit: 9468)
Memory: 9.2M
CGroup: /wasm.slice/wasm_engine.service
└─865 /opt/redpanda/bin/node /opt/wasm/main.js /etc/redpanda/redpanda.yaml

To check Redpanda’s status, run this command:

systemctl status redpanda

It will return something similar to this:

● redpanda.service - Redpanda, the fastest queue in the West.
Loaded: loaded (/lib/systemd/system/redpanda.service; enabled; vendor preset: enabled)
Active: active (running) since Mon 2021-12-13 00:49:45 -03; 1 months 15 days ago
Main PID: 1084 (redpanda)
Status: "redpanda is ready! - v21.11.1 - f314d5522ad033fd50080d3f1fe0bf5b2c5a1042-dirty"
Tasks: 8 (limit: 9468)
Memory: 426.9M
CGroup: /redpanda.slice/redpanda.service
└─1084 /opt/redpanda/bin/redpanda --redpanda-cfg /etc/redpanda/redpanda.yaml --lock-memory=false

Run the data transform

In this section, we're going to cover how to run your transformations in Redpanda.

Generate the data transform package

The data transform is packaged in a Node.js project and uses the Wasm instruction format.

To create the template project, run:

rpk wasm generate <project_name>

Replace <project_name> with the name of your project.

Example transformation

For example, let's create a project that will change the text in your events to all uppercase:

rpk wasm generate uppercase

A directory is created with the project name. The directory contains the following files:

uppercase/
├── package.json
├── src
│   └── main.js
├── test
│   └── main.test.js
└── webpack.js

Take note of the following files in the project:

  • /src/main.js - This file contains your transform logic and hooks into the API to define the event inputs.
  • /package.json - If your transform requires Node.js dependencies, you must add them to this file.

The sample project

The sample project contains this main.js file:

const {
  SimpleTransform,
  PolicyError,
  PolicyInjection
} = require("@vectorizedio/wasm-api");
const transform = new SimpleTransform();
/* Topics that fire the transform function */
transform.subscribe([["test-topic", PolicyInjection.Stored]]);
/* The strategy the transform engine will use when handling errors */
transform.errorHandler(PolicyError.SkipOnFailure);
/* Auxiliary transform function for records */
const uppercase = (record) => {
  const newRecord = {
    ...record,
    value: record.value.map((char) => {
      if (char >= 97 && char <= 122) {
        return char - 32;
      } else {
        return char;
      }
    }),
  };
  return newRecord;
};
/* Transform function */
transform.processRecord((recordBatch) => {
  const result = new Map();
  const transformedRecord = recordBatch.map(({ header, records }) => {
    return {
      header,
      records: records.map(uppercase),
    };
  });
  result.set("result-topic", transformedRecord);
  // processRecord function returns a Promise
  return Promise.resolve(result);
});
exports["default"] = transform;

Let's dissect this file to understand every line.

First, it imports these constants from the Wasm API:

const {
SimpleTransform,
PolicyError,
PolicyInjection
} = require("@vectorizedio/wasm-api");

It then creates a constant that holds a new SimpleTransform instance. This is the main object that we'll use in the project.

const transform = new SimpleTransform();

It fills the subscribe list with the topic and the policy that it will use to process new messages.

transform.subscribe([["test-topic", PolicyInjection.Stored]]);

To add multiple source topics, add the topic and policy as pairs:

transform.subscribe([[<topic1>, <policyA>], [<topic2>, <policyB>]]);

note

Run rpk topic create test-topic to create the source topic before you deploy the transformation. If the topic does not exist when the transformation is deployed, you might encounter a deployment error.

The PolicyInjection parameter can have the following values:

  • PolicyInjection.Earliest - The earliest offset. Transforms all of the events in the topic from offset 0.
  • PolicyInjection.Latest - The latest offset. Transforms only the current incoming events.
  • PolicyInjection.Stored - The stored offset. Transforms the events starting from the latest recorded offset on disk. If no offsets are recorded, the earliest offset is processed.
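
As a rough mental model of the three policies, the sketch below picks the offset that a transform would start reading from. It is not the engine's code: the startingOffset function and the offsets object are hypothetical, and the policies are represented as plain strings rather than the PolicyInjection constants.

```javascript
// Hypothetical model of the PolicyInjection values; not part of the Wasm API.
function startingOffset(policy, offsets) {
  const { earliest, latest, stored } = offsets;
  switch (policy) {
    case "Earliest":
      return earliest; // transform everything already in the topic
    case "Latest":
      return latest; // transform only events that arrive from now on
    case "Stored":
      // resume from the recorded offset, or fall back to the earliest one
      return stored !== undefined ? stored : earliest;
    default:
      throw new Error(`Unknown policy: ${policy}`);
  }
}

const offsets = { earliest: 0, latest: 120, stored: 42 };
console.log(startingOffset("Stored", offsets)); // 42
console.log(startingOffset("Stored", { earliest: 0, latest: 120 })); // 0
```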

Next, it sets the policy that tells the coprocessor how to handle errors:

transform.errorHandler(PolicyError.SkipOnFailure);

The PolicyError values are:

  • PolicyError.SkipOnFailure - If there's a failure, it skips to the next event.
  • PolicyError.Deregister - If there's a failure, the coprocessor will be removed.
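
To make the difference between the two policies concrete, here is a small simulation in plain JavaScript. It is only a sketch under stated assumptions: runWithPolicy is a hypothetical helper, the policies are plain strings, and the real engine may apply the policy at a different granularity (for example, per batch rather than per event).

```javascript
// Hypothetical simulation of the two PolicyError behaviors; not engine code.
function runWithPolicy(events, transformFn, policy) {
  const output = [];
  for (const event of events) {
    try {
      output.push(transformFn(event));
    } catch (err) {
      if (policy === "Deregister") {
        // The transform is removed, so nothing after the failure is processed.
        return { output, registered: false };
      }
      // SkipOnFailure: drop the failing event and continue with the next one.
    }
  }
  return { output, registered: true };
}

const parseId = (text) => JSON.parse(text).id; // throws on malformed JSON
const events = ['{"id":1}', "not json", '{"id":3}'];

const skipped = runWithPolicy(events, parseId, "SkipOnFailure");
const removed = runWithPolicy(events, parseId, "Deregister");
console.log(skipped.output, skipped.registered); // ids 1 and 3, still registered
console.log(removed.output, removed.registered); // id 1 only, no longer registered
```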

The next section contains the logic that applies the uppercase rule. There are multiple ways to do this, but here it subtracts 32 from the byte value of every lowercase ASCII letter (97 to 122), which yields the matching uppercase letter. All other bytes are left unchanged.

/* Auxiliary transform function for records */
const uppercase = (record) => {
  const newRecord = {
    ...record,
    value: record.value.map((char) => {
      if (char >= 97 && char <= 122) {
        return char - 32;
      } else {
        return char;
      }
    }),
  };
  return newRecord;
};
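
The byte-level rule can be tried outside Redpanda with a few lines of Node.js. This is a minimal sketch that assumes record.value is a Node.js Buffer; the input object below is made up for the example and only carries a value field.

```javascript
// Same rule as the sample transform, runnable without the Wasm API.
// Assumption: record.value is a Buffer of ASCII/UTF-8 bytes.
const uppercase = (record) => ({
  ...record,
  value: record.value.map((char) =>
    char >= 97 && char <= 122 ? char - 32 : char
  ),
});

const input = { value: Buffer.from("hello, Redpanda 22.1!") };
const output = uppercase(input);

console.log(Buffer.from(output.value).toString()); // HELLO, REDPANDA 22.1!
console.log(input.value.toString()); // hello, Redpanda 22.1! (the input is not modified)
```

Because only bytes 97 to 122 are changed, non-ASCII characters such as "é" pass through untouched.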

The logic is applied inside the callback that is passed to processRecord. The callback receives a recordBatch from the topic that we subscribed to, maps the uppercase function over the records of each batch, and stores the result in transformedRecord.

The generated transformedRecord is then assigned to the destination topic named result-topic in the result map.

Finally, the callback returns a Promise, which the API requires, and the file exports the transform:

/* Transform function */
transform.processRecord((recordBatch) => {
  const result = new Map();
  const transformedRecord = recordBatch.map(({ header, records }) => {
    return {
      header,
      records: records.map(uppercase),
    };
  });
  result.set("result-topic", transformedRecord);
  // processRecord function returns a Promise
  return Promise.resolve(result);
});
exports["default"] = transform;

You can change the result-topic to any topic name that you like.

To produce onto more than one destination topic, add another line in the following format:

result.set("<destination_topic>", transformedRecord);
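
As an illustration, the sketch below builds a result map with two destination topics. It runs without the Wasm API, so it assumes the shapes used by the sample (a recordBatch is an array of { header, records } objects and each record.value is a Buffer). The buildResult function and the audit-topic name are hypothetical.

```javascript
// Sketch of the body of a processRecord callback that writes to two topics.
const toUpper = (record) => ({
  ...record,
  value: record.value.map((c) => (c >= 97 && c <= 122 ? c - 32 : c)),
});

function buildResult(recordBatch) {
  const result = new Map();
  const transformed = recordBatch.map(({ header, records }) => ({
    header,
    records: records.map(toUpper),
  }));
  result.set("result-topic", transformed); // uppercased copy
  result.set("audit-topic", recordBatch); // untouched copy
  return result; // in a real transform, return Promise.resolve(result)
}

const batch = [{ header: {}, records: [{ value: Buffer.from("abc") }] }];
const result = buildResult(batch);
console.log([...result.keys()]); // both destination topic names
```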

The actual name of the destination topic is created with the format of: <source>._<destination>_. If the destination topic does not already exist, it is created automatically during script deployment.
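
The naming format can be expressed as a one-line helper, which is handy when a consumer needs to compute the topic to read from. The function name materializedTopicName is hypothetical and only mirrors the format stated above.

```javascript
// Hypothetical helper that mirrors the <source>._<destination>_ format.
const materializedTopicName = (source, destination) =>
  `${source}._${destination}_`;

console.log(materializedTopicName("test-topic", "result-topic"));
// test-topic._result-topic_
```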

If you have other mechanisms that auto-generate topics, for example if auto_create_topics_enabled is set to true in your Redpanda configuration file, you might run into issues. For example, if you set up a consumer on the destination topic before your transformation starts to write data into it, Redpanda creates the topic automatically for the consumer and the coprocessor won't be able to write data into it.

The batch API is Bytes-In-Bytes-Out. We highly recommend that you build deterministic functions based on the input to facilitate debugging your applications.
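
For instance, a deterministic bytes-in, bytes-out function might decode the payload, drop a personal field, and encode the result again. This is a sketch under assumptions: record.value is taken to be a Buffer that holds a UTF-8 JSON document, and the field names (name, email) are invented for the example.

```javascript
// Deterministic bytes-in, bytes-out function: the output depends only on the input.
const redactEmail = (record) => {
  const event = JSON.parse(record.value.toString("utf8")); // bytes in
  const { email, ...rest } = event; // drop the personal field
  return { ...record, value: Buffer.from(JSON.stringify(rest), "utf8") }; // bytes out
};

const record = {
  value: Buffer.from(JSON.stringify({ name: "ada", email: "ada@example.com" })),
};
const first = redactEmail(record);
const second = redactEmail(record);

console.log(first.value.toString()); // {"name":"ada"}
console.log(first.value.equals(second.value)); // true
```

Because the function reads no clock, counter, or random source, running it twice on the same record yields identical bytes, which makes a failed or replayed run easy to reproduce.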

If your transform requires Node.js dependencies, add them to the /package.json file.

Prepare the script for deployment

Because the transform is packaged in a Node.js project, you must install the dependencies and build the script that runs the transform.

To do this, run these commands in the project directory:

npm install
npm run build

The build command creates the main.js JavaScript file in the /dist directory that contains the compiled transform bundle.

Deploy the transform

To enable the transform to start consuming and producing events, you must deploy it in Redpanda with a name and description.

As with other rpk commands, you must specify the brokers in the cluster and all of the authentication parameters (including user, password, TLS) for the brokers.

note

If the source topic does not exist, the deployment fails. If the target topic already exists, the transform uses the existing topic.

To deploy the sample transform, run the following command:

rpk wasm deploy uppercase/dist/main.js --name uppercase --description "Converts lowercase text to uppercase"

Multiple Transforms

If you execute different types of transformations on different topics, it's advantageous to have multiple deployments. Keep in mind that every transformation subscribes to one topic and produces to another, so each additional deployment adds processing overhead.

After you run the transform

In this section, we're going to go over how you can verify and clean up your transformations in Redpanda.

Verify the transform

After the transform is deployed, Redpanda processes every event produced to the source topic and runs the logic that is defined by the transform.

To see the sample transform in action, follow these steps:

  1. Run: rpk topic consume test-topic._result-topic_
  2. Produce events to the source topic:
       a. In a second terminal, run: rpk topic produce test-topic
       b. Enter some text and press CTRL+D to send the event to the source topic.
  3. In the terminal that shows the consumed events, you'll see the text that you produced, now converted to uppercase characters.

Clean up

To stop a transform, you must remove the transform from the cluster brokers.

For our sample transform, run rpk wasm remove uppercase.

The transform stops processing events and is removed.

Delete output topics

Delete an output topic the way you would delete a normal Kafka topic.

Before you delete a topic, shut down the coprocessors that are producing to the topic.

You can delete a topic with any Kafka client or you can use rpk with this command:

rpk topic delete <topic>

Reference

Additional commands

If you want to see what else rpk wasm can do, run the help command:

rpk wasm -h

You can also refer to rpk's reference page for further commands.

Produce directly into the coprocessor

There is a global control topic for each Redpanda cluster with the name coprocessor_internal_topic. This allows support for any Kafka client so you are not required to use rpk. If someone publishes to the topic, it will invoke a deployer implicitly. However, you must publish with the correct headers and format.

Source code

You can view the possible export values in Redpanda’s GitHub repository.

The API package is published under npm.

Tech preview limitations

We don't use Raft to publish and replicate the data. Instead, the same transform is applied locally on every node in the cluster, so inconsistencies are possible in certain cases. For example, you may have inconsistencies if you have a topic with a replication factor greater than 1 and stateful or non-idempotent coprocessors that produce different data onto the replicas of that topic.

The coprocessor does not have system limits. For example, there is no limit for processing time, memory usage or number of topics created.

The coprocessor doesn't save your state. For example, if you keep a counter and there is a crash, you will lose that counter. The best practice is to always avoid stateful implementations. The only state that is kept and checkpointed is your offset, just like Kafka. The semantics of your materialized topics are at-least-once: Redpanda saves the offset that you were reading from, and after a crash, events are reprocessed based on the PolicyInjection policy. If you use PolicyInjection.Stored, for example, processing resumes from the offset that was saved before the crash.
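
The following simulation illustrates why at-least-once semantics can put duplicates in the destination topic. It is a hypothetical model, not engine code: the run function, the crashAfter and checkpointEvery options, and the idea that the offset is saved every few events are all assumptions made for the example.

```javascript
// Hypothetical simulation of at-least-once processing with a stored offset.
function run(events, { crashAfter, checkpointEvery }) {
  const produced = [];
  let stored = 0; // last checkpointed offset (the next event to read)

  // First run: stops when the simulated crash happens.
  for (let offset = 0; offset < crashAfter; offset++) {
    produced.push(events[offset].toUpperCase());
    if ((offset + 1) % checkpointEvery === 0) stored = offset + 1;
  }

  // Restart with stored-offset semantics: resume from the checkpoint.
  for (let offset = stored; offset < events.length; offset++) {
    produced.push(events[offset].toUpperCase());
  }
  return produced;
}

const produced = run(["a", "b", "c", "d"], { crashAfter: 3, checkpointEvery: 2 });
console.log(produced.join(",")); // A,B,C,C,D
```

Event "c" was processed after the last checkpoint and before the crash, so it is produced twice. Consumers of the destination topic should tolerate such duplicates.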

It's not possible to pipeline multiple scripts so that you can pass transforms through one another (such as A->B->C->D). The API only maps topic(s)-to-topic(s).
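
One possible workaround, assuming all of the steps can live in the same script, is to chain them as plain functions inside a single transform. The sketch below is not an API feature: pipe, trim, upper, and exclaim are hypothetical helpers, and each step is assumed to take and return a record whose value is a Buffer.

```javascript
// Chain several steps inside one transform instead of deploying A->B->C.
const pipe = (...steps) => (record) =>
  steps.reduce((current, step) => step(current), record);

const trim = (record) => ({
  ...record,
  value: Buffer.from(record.value.toString().trim()),
});
const upper = (record) => ({
  ...record,
  value: Buffer.from(record.value.toString().toUpperCase()),
});
const exclaim = (record) => ({
  ...record,
  value: Buffer.concat([record.value, Buffer.from("!")]),
});

const transformRecord = pipe(trim, upper, exclaim);
const piped = transformRecord({ value: Buffer.from("  hello ") });
console.log(piped.value.toString()); // HELLO!
```

A function such as transformRecord could then be passed to records.map in the processRecord callback, in the same place where the sample uses uppercase.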

Data Transforms are not supported in Redpanda Cloud.


Suggested reading

  • Redpanda Wasm Engine Architecture article
  • How Redpanda built its data transformation engine with the Wasm runtime article