Data Transforms
Lab features are not supported for production deployments.
- Redpanda recommends not using Data Transforms with Tiered Storage.
- Data Transforms are not supported in Redpanda Cloud.
Redpanda Data Transforms let you perform simple data transformations on topics without leaving the platform, such as capitalizing strings or filtering messages to comply with GDPR. Because you no longer need a separate system to consume, transform, and re-produce the data, you avoid moving data back and forth for simple transformations.
Redpanda lets you deploy custom Node.js programs that handle the entire consume-transform-produce cycle for you. A transformation engine built on top of Wasm technology runs these programs.
You can jump start your data transformation development with rpk. Redpanda provides a sample Node.js project you can use to start developing immediately. Then you can deploy the data transform to Redpanda for all the brokers to run.
Under the hood, Redpanda reads from an immutable Apache Kafka® topic, applies the data transformation, and produces the results to another immutable Kafka topic. That way, the data is transformed into a new topic while the original topic is preserved without any data loss.
The data transformation service runs with the same privileges as Redpanda, and it has permission to read your data. Only run code from trusted authors.
Prerequisites
Make sure you have the following prerequisites before you configure data transformations and start the Wasm engine:
- Redpanda version v21.12.1-wasm-beta1 or later.
- Node.js version 12 or later installed on the machine where you want to run the transform bundle.
- A JavaScript script that transforms the data.
- Source topics that you want to consume from. The destination topics can be generated on demand when your transformation runs.
Set up
Enable data transforms
To enable the coprocessor that runs the data transformation, set the cluster properties and enable the engine in the redpanda.yaml configuration file.
On the cluster level, set the enable_coproc property to true:
rpk cluster config set enable_coproc true
Also set the enable_idempotence property to false:
rpk cluster config set enable_idempotence false
Finally, on the node level, add the following flags under redpanda in the redpanda.yaml file:
The default location of redpanda.yaml is /etc/redpanda/redpanda.yaml.
enable_coproc: true
developer_mode: true
enable_idempotence: false
Optionally, change the port that the coprocessor reads from:
coproc_supervisor_server:
address: 0.0.0.0
port: <new_port>
The default location of the Data Transforms log file is /var/lib/redpanda/coprocessor/wasm_engine.log. To configure the location of the log file in redpanda.yaml:
coproc_engine:
logFilePath: <file_path>
Here’s an example redpanda.yaml file with the engine enabled:
config_file: /etc/redpanda/redpanda.yaml
node_uuid: mecCncdqAUKFoM2ojvR2Ai86kGWuhiJ95xpNCsKwe6VTgvHoJ
pandaproxy: {}
redpanda:
admin:
- address: 0.0.0.0
port: 9644
data_directory: /var/lib/redpanda/data
developer_mode: true ## because this is a lab feature, you must set developer mode as true
enable_coproc: true ## data transform flag
enable_idempotence: false ## this flag is mandatory for lab features
coproc_supervisor_server:
address: 0.0.0.0 ## optionally change the ip address
port: <new_port> ## remember to change the port
kafka_api:
- address: 0.0.0.0
port: 9092
node_id: 0
rpc_server:
address: 0.0.0.0
port: 33145
seed_servers: []
coproc_engine:
logFilePath: <file_path> ## optional: change the directory where logs are written
rpk:
coredump_dir: /var/lib/redpanda/coredump
enable_memory_locking: false
enable_usage_stats: false
overprovisioned: false
tune_aio_events: false
tune_ballast_file: false
tune_clocksource: false
tune_coredump: false
tune_cpu: false
tune_disk_irq: false
tune_disk_nomerges: false
tune_disk_scheduler: false
tune_disk_write_cache: false
tune_fstrim: false
tune_network: false
tune_swappiness: false
tune_transparent_hugepages: false
schema_registry: {}
Save redpanda.yaml.
Start the Wasm engine
After the configuration is complete, start the Wasm engine.
To start the Wasm engine, run:
sudo systemctl start wasm_engine
The Wasm engine automatically starts Redpanda as well. To check the status of the Wasm engine:
systemctl status wasm_engine
The command returns something similar to the following:
● wasm_engine.service - Redpanda's wasm engine, your on-broker programmable data transformer
Loaded: loaded (/lib/systemd/system/wasm_engine.service; enabled; vendor preset: enabled)
Active: active (running) since Mon 2021-12-13 00:49:30 -03; 1 months 15 days ago
Main PID: 865 (node)
Tasks: 11 (limit: 9468)
Memory: 9.2M
CGroup: /wasm.slice/wasm_engine.service
└─865 /opt/redpanda/bin/node /opt/wasm/main.js /etc/redpanda/redpanda.yaml
To check Redpanda’s status, run:
systemctl status redpanda
It returns something similar to the following:
● redpanda.service - Redpanda, the fastest queue in the West.
Loaded: loaded (/lib/systemd/system/redpanda.service; enabled; vendor preset: enabled)
Active: active (running) since Mon 2021-12-13 00:49:45 -03; 1 months 15 days ago
Main PID: 1084 (redpanda)
Status: "redpanda is ready! - v21.11.1 - f314d5522ad033fd50080d3f1fe0bf5b2c5a1042-dirty"
Tasks: 8 (limit: 9468)
Memory: 426.9M
CGroup: /redpanda.slice/redpanda.service
└─1084 /opt/redpanda/bin/redpanda --redpanda-cfg /etc/redpanda/redpanda.yaml --lock-memory=false
Run the data transform
Generate the data transform package
The data transform is packaged in a Node.js project and uses the Wasm instruction format.
To create the template project, run:
rpk wasm generate <project_name>
Remember to change project_name to your own project name.
Example transformation
For example, to create a project that changes the text in your events to all uppercase:
rpk wasm generate uppercase
A directory is created with the project name. The directory contains the following files:
uppercase/
├── package.json
├── src
│   └── main.js
├── test
│   └── main.test.js
└── webpack.js
Take note of the following files in the project:
- /src/main.js - This file contains your transform logic and hooks into the API to define the event inputs.
- package.json - If your transform requires Node.js dependencies, you must add them to this file.
The sample project
The sample project contains this main.js file:
const {
SimpleTransform,
PolicyError,
PolicyInjection
} = require("@redpanda-data/wasm-api");
const transform = new SimpleTransform();
/* Topics that fire the transform function */
transform.subscribe([["test-topic", PolicyInjection.Stored]]);
/* The strategy the transform engine will use when handling errors */
transform.errorHandler(PolicyError.SkipOnFailure);
/* Auxiliary transform function for records */
const uppercase = (record) => {
const newRecord = {
...record,
value: record.value.map((char) => {
if (char >= 97 && char <= 122) {
return char - 32;
} else {
return char;
}
}),
};
return newRecord;
}
/* Transform function */
transform.processRecord((recordBatch) => {
const result = new Map();
const transformedRecord = recordBatch.map(({ header, records }) => {
return {
header,
records: records.map(uppercase),
};
});
result.set("result-topic", transformedRecord);
// processRecord function returns a Promise
return Promise.resolve(result);
});
exports["default"] = transform;
This file imports the following constants from the Wasm API:
const {
SimpleTransform,
PolicyError,
PolicyInjection
} = require("@redpanda-data/wasm-api");
It then creates a constant that holds a new SimpleTransform instance. This is the main object used in the project.
const transform = new SimpleTransform();
It fills the subscribe list with the topic and the policy that it will use to process new messages.
transform.subscribe([["test-topic", PolicyInjection.Stored]]);
To add multiple source topics, add topic and policy pairs to the list:
transform.subscribe([["<topic1>", <policyA>], ["<topic2>", <policyB>]]);
Run rpk topic create test-topic to create the source topics before you deploy the transformation. If the topic doesn't exist when the transformation is deployed, you might encounter a deployment error.
The PolicyInjection parameter can have the following values:
- PolicyInjection.Earliest - The earliest offset. Transforms all of the events in the topic from offset 0.
- PolicyInjection.Latest - The latest offset. Transforms only the current incoming events.
- PolicyInjection.Stored - The stored offset. Transforms the events starting from the latest recorded offset on disk. If no offsets are recorded, the earliest offset is processed.
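For example, here is a minimal sketch of subscribing to two source topics with different offset policies. The topic names orders and clicks are placeholders for illustration, not part of the sample project:
const { SimpleTransform, PolicyInjection } = require("@redpanda-data/wasm-api");
const transform = new SimpleTransform();
/* Replay "orders" from offset 0, but only process new events on "clicks".
   Both topic names are placeholders. */
transform.subscribe([
  ["orders", PolicyInjection.Earliest],
  ["clicks", PolicyInjection.Latest],
]);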
Next, it sets the policy that tells the coprocessor how to handle errors:
transform.errorHandler(PolicyError.SkipOnFailure);
The PolicyError values are:
- PolicyError.SkipOnFailure - If there's a failure, it skips to the next event.
- PolicyError.Deregister - If there's a failure, the coprocessor is removed.
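For example, if you prefer a transform to be removed from the brokers rather than skipping failed events, a sketch using the same API would be:
const { SimpleTransform, PolicyError } = require("@redpanda-data/wasm-api");
const transform = new SimpleTransform();
/* Deregister removes this coprocessor from the brokers on the first
   processing failure instead of skipping the failed event. */
transform.errorHandler(PolicyError.Deregister);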
The following section contains the logic to apply the uppercase rule. There are multiple ways to do this, but here each lowercase ASCII character code (97-122) is shifted down by 32 to its uppercase equivalent.
/* Auxiliary transform function for records */
const uppercase = (record) => {
const newRecord = {
...record,
value: record.value.map((char) => {
if (char >= 97 && char <= 122) {
return char - 32;
} else {
return char;
}
}),
};
return newRecord;
}
The logic is applied in the processRecord function. The transformedRecord variable takes a recordBatch from the subscribed topic, applies the uppercase function to its records, and the results are stored in a map keyed by destination topic. The generated transformedRecord is set on the topic named result-topic.
It then creates a promise that is required by the API to process, and it ends by exporting this transform:
/* Transform function */
transform.processRecord((recordBatch) => {
const result = new Map();
const transformedRecord = recordBatch.map(({ header, records }) => {
return {
header,
records: records.map(uppercase),
};
});
result.set("result-topic", transformedRecord);
// processRecord function returns a Promise
return Promise.resolve(result);
});
exports["default"] = transform;
You can change result-topic to any topic name.
To produce onto more than one destination topic, add another line in the following format:
result.set("<destination_topic>", transformedRecord);
The actual name of the destination topic is created with the format <source>._<destination>_. If the destination topic doesn't already exist, it's created automatically during script deployment.
If you have other mechanisms that auto-create topics (for example, if auto_create_topics_enabled is set to true in your Redpanda configuration file), you might run into issues. In this example, if you set up a consumer before your transformation starts to write data into the destination topic, Redpanda creates the topic automatically for the consumer, and the coprocessor won't be able to write data into it.
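Putting this together, here is a sketch of a processRecord callback that fans the same transformed batches out to two destination topics. It reuses the uppercase helper from the sample project, and the second topic name, audit-topic, is a placeholder:
transform.processRecord((recordBatch) => {
  const result = new Map();
  const transformedRecord = recordBatch.map(({ header, records }) => ({
    header,
    records: records.map(uppercase),
  }));
  // Each entry in the map becomes its own materialized topic,
  // named <source>._<destination>_ as described above.
  result.set("result-topic", transformedRecord);
  result.set("audit-topic", transformedRecord); // placeholder destination
  return Promise.resolve(result);
});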
The batch API is bytes-in, bytes-out. Redpanda highly recommends that you build deterministic functions based on the input to make your applications easier to debug.
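For example, here is a deterministic helper in the same style as the uppercase sample, sketched under the assumption that record values behave like arrays of byte values, as they do in the sample. It redacts every ASCII digit:
/* Replace every ASCII digit (bytes 48-57) with '*' (byte 42). The output
   depends only on the input bytes, so replaying the same batch always
   produces the same result, which makes debugging easier. */
const redactDigits = (record) => ({
  ...record,
  value: record.value.map((byte) => (byte >= 48 && byte <= 57 ? 42 : byte)),
});
You would plug redactDigits into processRecord exactly where the sample plugs in uppercase.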
If your transform requires Node.js dependencies, add them to the project's package.json file.
Prepare the script for deployment
Because the transform is packaged in a Node.js project, you must install the dependencies and build the script that runs the transform.
To do this, run the following commands in the project directory:
npm install
npm run build
The build command creates the main.js JavaScript file in the /dist directory, which contains the compiled transform bundle.
Deploy the transform
To enable the transform to start consuming and producing events, you must deploy it in Redpanda with a name and description.
As with other rpk commands, you must specify the brokers in the cluster and all of the authentication parameters (including user, password, and TLS) for the brokers.
If the source topic doesn't exist, the deployment fails. If the target topic already exists, it uses the existing topic.
To deploy the sample transform, run:
rpk wasm deploy uppercase/dist/main.js --name uppercase --description "Converts lowercase text to uppercase"
Multiple Transforms
If you run different types of transformations on different topics, it's advantageous to split them into multiple deployments. Every transformation subscribes to one topic and produces to another, which creates processing overhead.
After you run the transform
After you run the transform, you can verify and clean your transform in Redpanda.
Verify the transform
After the transform is deployed, Redpanda processes every event produced to the source topic and runs the logic that is defined by the transform.
To see the sample transform in action:
1. Run rpk topic consume test-topic._result-topic_.
2. Produce events to the source topic:
   a. In a second terminal, run rpk topic produce test-topic.
   b. Enter some text and press CTRL+D to send the event to the source topic.
3. In the terminal that shows the consumed events, you see the text that you produced, now converted to uppercase characters.
Clean up
To stop a transform, you must remove the transform from the cluster brokers.
For example, run rpk wasm remove uppercase.
The transform stops processing events and is removed.
Delete output topics
Delete an output topic the way you would delete a Kafka topic.
Before you delete a topic, shut down the coprocessors producing to the topic.
You can delete a topic with any Kafka client, or you can use rpk:
rpk topic delete <topic_name>
Reference
To see what else rpk wasm can do, run:
rpk wasm -h
See also rpk Commands.
Produce directly into the coprocessor
Each Redpanda cluster has a global control topic named coprocessor_internal_topic. Any Kafka client can publish to it, so you're not required to use rpk. Publishing to this topic implicitly invokes the deployer, but you must publish with the correct headers and format.
Source code
You can view the possible export values in Redpanda’s GitHub repository.
The API package is published under npm.
Lab limitations
Redpanda doesn't use Raft to publish and replicate the transformed data. Because the same transform is applied locally on all nodes in the cluster, it's possible to have inconsistencies in certain cases. For example, you may see inconsistencies if you have a topic with a replication factor greater than 1 and stateful or idempotent coprocessors producing different data onto those replicated topics.
The coprocessor doesn't have system limits. For example, there's no limit for processing time, memory usage, or number of topics created.
The coprocessor doesn't save your state. For example, if you keep a counter and there's a crash, you lose that counter, so the best practice is to avoid stateful implementations (a sketch of this anti-pattern appears at the end of this section). The only state that is kept and checkpointed is your offset, just like Kafka. The semantics of your materialized topics are at-least-once: Redpanda saves the offset from wherever you were reading, and in the case of a crash, your deployed code reprocesses events based on the PolicyInjection policy. If you use PolicyInjection.Stored, for example, processing resumes from whatever offset was saved before the crash.
It's not possible to pipeline multiple scripts so that transforms pass through one another (such as A->B->C->D). The API only maps topic(s)-to-topic(s).
Data Transforms are not supported in Redpanda Cloud.
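For example, here is a sketch of the kind of stateful implementation to avoid. The counter lives only in the coprocessor's memory, so a crash or restart silently resets it:
const { SimpleTransform } = require("@redpanda-data/wasm-api");
const transform = new SimpleTransform();
/* Anti-pattern: module-level state is not persisted anywhere. After a crash
   it resets to 0 and, depending on the PolicyInjection setting,
   already-counted events may be reprocessed. */
let seen = 0;
transform.processRecord((recordBatch) => {
  const result = new Map();
  const counted = recordBatch.map(({ header, records }) => {
    seen += records.length;
    return { header, records };
  });
  result.set("result-topic", counted);
  return Promise.resolve(result);
});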