Kafka Client Code Examples
Redpanda is Kafka API-compatible: even though Redpanda is a newer streaming data platform, you can leverage the countless client libraries that have been created for Kafka. If you find something that isn’t supported, reach out to the Redpanda team in the community Slack.
This example walks you through how to get started with a variety of Kafka client libraries by creating a topic, producing some data, and consuming it back.
Set up Redpanda
Follow the Quickstart guide to spin up a development environment in Docker. If you already have a Redpanda Cloud cluster, then you’re good to go; this example will show you how to connect to that too.
If you’re running Redpanda on your laptop, or in a shared development environment, avoid using Redpanda’s optimized production settings. Running sudo rpk redpanda tune all or manually configuring Redpanda for production might affect your experience with other applications running on your machine.
Prepare the client environment
- Go
- Node.js
- Python
Download and install Go. This example uses the franz-go library.
# Create and enter the project folder
mkdir redpanda-go; cd redpanda-go
# Initialize the project
go mod init com/redpanda/example
# Install required dependencies
go get github.com/twmb/franz-go
go get github.com/twmb/franz-go/pkg/kadm
go get github.com/twmb/tlscfg
go get github.com/twmb/franz-go/pkg/sasl/scram@v1.9.0
Download and install Node.js. This example uses the KafkaJS library.
# Create and enter the project folder
mkdir redpanda-node; cd redpanda-node
# Generate package.json (the default values are fine)
npm init
# Install required dependencies
npm i -D typescript
npm i -D @types/node
npm i kafkajs
# Generate tsconfig.json (typescript is installed locally, so run it via npx)
npx tsc --init
Download and install Python 3 from python.org. This example uses the kafka-python library.
# Create and enter the project folder
mkdir redpanda-python; cd redpanda-python
# Create virtual environment
python3 -m venv .env
source .env/bin/activate
# Install dependencies
(.env) pip install kafka-python
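To confirm that the library installed correctly inside the virtual environment, you can print its version (an optional check, not part of the example files):
# Optional: verify that kafka-python is importable
import kafka
print(kafka.__version__)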
Create topic
You can create a topic using Redpanda’s command-line interface, rpk:
- Local
- Cloud
rpk topic create demo
TOPIC  STATUS
demo   OK
rpk topic list
NAME  PARTITIONS  REPLICAS
demo  1           1
# Set the REDPANDA_BROKERS environment variable with the comma-separated list
# of cluster hosts. You can copy the list from the Overview tab in Redpanda
# Cloud.
export REDPANDA_BROKERS="<TODO: change this to your cluster hosts>"
# Before running this command, create a service account in the Security tab in
# Redpanda Cloud and add the necessary ACLs to allow the service account to
# create, write to, and read from a topic with the name `demo`.
rpk topic create demo \
  --tls-enabled \
  --sasl-mechanism SCRAM-SHA-256 \
  --user "<TODO: change this to your service account name>" \
  --password "<TODO: change this to your service account password>"
TOPIC  STATUS
demo   OK
You can also create a topic programmatically:
- Go
- Node.js
- Python
admin.go
package main

import (
    "context"
    "fmt"

    "github.com/twmb/franz-go/pkg/kadm"
    "github.com/twmb/franz-go/pkg/kgo"
)

func main() {
    topic := "demo"
    seeds := []string{"localhost:9092"}
    client, err := kgo.NewClient(
        kgo.SeedBrokers(seeds...),
    )
    if err != nil {
        panic(err)
    }
    defer client.Close()

    admin := kadm.NewClient(client)
    defer admin.Close()

    ctx := context.Background()
    // Create a topic with a single partition and single replica
    resp, err := admin.CreateTopics(ctx, 1, 1, nil, topic)
    if err != nil {
        panic(err)
    }
    for _, ctr := range resp {
        if ctr.Err != nil {
            fmt.Printf("Unable to create topic '%s': %s\n", ctr.Topic, ctr.Err)
        } else {
            fmt.Printf("Created topic '%s'\n", ctr.Topic)
        }
    }
}
admin.ts
const {Kafka} = require("kafkajs")

const redpanda = new Kafka({brokers: ["localhost:9092"]})
const admin = redpanda.admin()

admin.connect().then(() => {
  admin.createTopics({
    topics: [{
      topic: "demo",
      numPartitions: 1,
      replicationFactor: 1
    }]
  })
  .then((resp) => {
    resp ? console.log("Created topic") : console.log("Failed to create topic")
  })
  .finally(() => admin.disconnect())
})
admin.py
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
try:
    demo_topic = NewTopic(name="demo", num_partitions=1, replication_factor=1)
    admin.create_topics(new_topics=[demo_topic])
    print("Created topic")
except TopicAlreadyExistsError:
    print("Topic already exists")
finally:
    admin.close()
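If you want to double-check the result, kafka-python’s admin client can also list the topics the cluster knows about. A quick sketch, assuming the same local broker as above:
from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
# list_topics() returns the topic names visible to this client, so "demo"
# should appear once the create call above succeeds
print(admin.list_topics())
admin.close()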
Connecting to Redpanda Cloud
Connecting to a local Redpanda cluster (or a cluster with no security) is as simple as specifying a list of brokers; connecting to Redpanda Cloud works differently.
You can configure Redpanda Cloud to use SASL/SCRAM (username and password) or mTLS-based authentication. These security modes require additional parameters when creating a client connection.
Redpanda Cloud environments use certificates signed by Let’s Encrypt. Most programming languages load Let’s Encrypt’s root certificate authority (ISRG Root X1) by default, so you shouldn’t need to provide a custom CA.
- Go
- Node.js
- Python
package main

import (
    "crypto/tls"

    "github.com/twmb/franz-go/pkg/kgo"
    "github.com/twmb/franz-go/pkg/sasl/scram"
)

func main() {
    seeds := []string{"<TODO: change this to your cluster hosts>"}
    opts := []kgo.Opt{}
    opts = append(opts,
        kgo.SeedBrokers(seeds...),
    )

    // Initialize public CAs for TLS
    opts = append(opts, kgo.DialTLSConfig(new(tls.Config)))

    /* Initialize mTLS (also requires importing github.com/twmb/tlscfg)
    tc, err := tlscfg.New(
        // Custom CA is only required if you are using self-signed certificates
        tlscfg.MaybeWithDiskCA("ca.crt", tlscfg.ForClient),
        tlscfg.MaybeWithDiskKeyPair("client.crt", "client.key"),
    )
    if err != nil {
        panic(err)
    }
    opts = append(opts, kgo.DialTLSConfig(tc))
    */

    // Initialize SASL/SCRAM
    opts = append(opts, kgo.SASL(scram.Auth{
        User: "<TODO: change this to your service account name>",
        Pass: "<TODO: change this to your service account password>",
    }.AsSha256Mechanism()))

    client, err := kgo.NewClient(opts...)
    if err != nil {
        panic(err)
    }
    defer client.Close()
}
const {Kafka} = require("kafkajs")
// const fs = require("fs") // needed if you uncomment the mTLS options below

const redpanda = new Kafka({
  brokers: ["<TODO: change this to your cluster hosts>"],
  ssl: {
    // mTLS client certificate and private key can be downloaded from the
    // Overview tab in the Redpanda Cloud UI:
    // cert: fs.readFileSync("client.crt", "utf8"),
    // key: fs.readFileSync("client.key", "utf8"),
    // Custom CA is only required if you are using self-signed certificates
    // ca: fs.readFileSync("ca.crt", "utf8")
  },
  sasl: {
    mechanism: "scram-sha-256",
    username: "<TODO: change this to your service account name>",
    password: "<TODO: change this to your service account password>"
  }
})
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="<TODO: change this to your cluster hosts>",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="<TODO: change this to your service account name>",
    sasl_plain_password="<TODO: change this to your service account password>",
    # mTLS client certificate and private key can be downloaded from the
    # Overview tab in the Redpanda Cloud UI:
    # ssl_certfile="client.crt",
    # ssl_keyfile="client.key",
    # Custom CA is only required if you are using self-signed certificates
    # ssl_cafile="ca.crt"
)
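The same security parameters apply to the other kafka-python clients; for example, a consumer for the same Redpanda Cloud cluster would look like this (a sketch using the same placeholder credentials):
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers="<TODO: change this to your cluster hosts>",
    group_id="demo-group",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="<TODO: change this to your service account name>",
    sasl_plain_password="<TODO: change this to your service account password>"
)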
Create producer
After you have a topic, you can create a producer and send some messages:
- Go
- Node.js
- Python
producer.go
package main

import (
    "context"
    "fmt"
    "os"
    "sync"

    "github.com/twmb/franz-go/pkg/kgo"
)

func main() {
    topic := "demo"
    hostname, _ := os.Hostname()
    ctx := context.Background()
    seeds := []string{"localhost:9092"}
    client, err := kgo.NewClient(
        kgo.SeedBrokers(seeds...),
    )
    if err != nil {
        panic(err)
    }
    defer client.Close()

    // Produce messages asynchronously; the callback fires on completion
    var wg sync.WaitGroup
    for i := 1; i < 100; i++ {
        wg.Add(1)
        record := &kgo.Record{
            Topic: topic,
            Key:   []byte(hostname),
            Value: []byte(fmt.Sprintf("Message %d", i)),
        }
        client.Produce(ctx, record, func(record *kgo.Record, err error) {
            defer wg.Done()
            if err != nil {
                fmt.Printf("Error sending message: %v\n", err)
            } else {
                fmt.Printf("Message sent: topic: %s, offset: %d, value: %s\n",
                    topic, record.Offset, record.Value)
            }
        })
    }
    wg.Wait()

    // Alternatively, produce messages synchronously
    for i := 100; i < 200; i++ {
        record := &kgo.Record{
            Topic: topic,
            Key:   []byte(hostname),
            Value: []byte(fmt.Sprintf("Synchronous message %d", i)),
        }
        results := client.ProduceSync(ctx, record)
        for _, pr := range results {
            if pr.Err != nil {
                fmt.Printf("Error sending synchronous message: %v\n", pr.Err)
            } else {
                fmt.Printf("Message sent: topic: %s, offset: %d, value: %s\n",
                    topic, pr.Record.Offset, pr.Record.Value)
            }
        }
    }
}
producer.ts
const os = require("os")
const {Kafka, CompressionTypes} = require("kafkajs")

const redpanda = new Kafka({brokers: ["localhost:9092"]})
const producer = redpanda.producer()

const sendMessage = (msg: string) => {
  return producer.send({
    topic: "demo",
    compression: CompressionTypes.GZIP,
    messages: [{
      // Messages with the same key are sent to the same topic partition for
      // guaranteed ordering
      key: os.hostname(),
      value: JSON.stringify(msg)
    }]
  })
  .catch((e) => {
    console.error(`Unable to send message: ${e.message}`, e)
  })
}

const run = async () => {
  await producer.connect()
  for (let i = 0; i < 100; i++) {
    sendMessage(`message ${i}`).then((resp) => {
      console.log(`Message sent: ${JSON.stringify(resp)}`)
    })
  }
}

run().catch(console.error)

process.once("SIGINT", async () => {
  try {
    await producer.disconnect()
    console.log("Producer disconnected")
  } finally {
    process.kill(process.pid, "SIGINT")
  }
})
producer.py
import socket

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers="localhost:9092")
hostname = str.encode(socket.gethostname())

# Produce asynchronously
for i in range(100):
    msg = f"message #{i}"
    producer.send(
        "demo",
        key=hostname,
        value=str.encode(msg)
    )
producer.flush()

def on_success(metadata):
    print(f"Sent to topic '{metadata.topic}' at offset {metadata.offset}")

def on_error(e):
    print(f"Error sending message: {e}")

# Produce asynchronously with callbacks
for i in range(100, 200):
    msg = f"message with callbacks #{i}"
    future = producer.send(
        "demo",
        key=hostname,
        value=str.encode(msg)
    )
    future.add_callback(on_success)
    future.add_errback(on_error)
producer.flush()

# Wait for every future to produce synchronously
for i in range(200, 300):
    msg = f"synchronous message #{i}"
    future = producer.send(
        "demo",
        key=hostname,
        value=str.encode(msg)
    )
    try:
        metadata = future.get(timeout=5)
        print(f"Sent to topic '{metadata.topic}' at offset {metadata.offset}")
    except KafkaError as e:
        print(f"Error sending message: {e}")
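As an aside, kafka-python can also serialize values for you. This sketch (an alternative to the str.encode calls above, not part of the example files) configures a value_serializer so the producer accepts plain Python objects:
import json

from kafka import KafkaProducer

# value_serializer runs on every value before it is sent, so the
# producer can accept Python objects directly
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)
producer.send("demo", value={"id": 1, "text": "hello"})
producer.flush()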
Create consumer
You can create a consumer to read the data back out of the topic:
- Go
- Node.js
- Python
consumer.go
package main

import (
    "context"
    "fmt"

    "github.com/twmb/franz-go/pkg/kgo"
)

func main() {
    topic := "demo"
    ctx := context.Background()
    seeds := []string{"localhost:9092"}
    client, err := kgo.NewClient(
        kgo.SeedBrokers(seeds...),
        kgo.ConsumerGroup("demo-group"),
        kgo.ConsumeTopics(topic),
        kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
    )
    if err != nil {
        panic(err)
    }
    defer client.Close()

    for {
        fetches := client.PollFetches(ctx)
        if errs := fetches.Errors(); len(errs) > 0 {
            // All errors are retried internally when fetching, but non-retriable
            // errors are returned from polls so that users can notice and take
            // action.
            panic(fmt.Sprint(errs))
        }
        iter := fetches.RecordIter()
        for !iter.Done() {
            record := iter.Next()
            topicInfo := fmt.Sprintf("topic: %s (%d|%d)",
                record.Topic, record.Partition, record.Offset)
            messageInfo := fmt.Sprintf("key: %s, value: %s",
                record.Key, record.Value)
            fmt.Printf("Message consumed: %s, %s\n", topicInfo, messageInfo)
        }
    }
}
consumer.ts
const {Kafka} = require("kafkajs")

const redpanda = new Kafka({brokers: ["localhost:9092"]})
const consumer = redpanda.consumer({groupId: "demo-group"})

const run = async () => {
  await consumer.connect()
  await consumer.subscribe({
    topic: "demo",
    fromBeginning: true
  })
  await consumer.run({
    eachMessage: async ({topic, partition, message}) => {
      const topicInfo = `topic: ${topic} (${partition}|${message.offset})`
      const messageInfo = `key: ${message.key}, value: ${message.value}`
      console.log(`Message consumed: ${topicInfo}, ${messageInfo}`)
    },
  })
}

run().catch(console.error)

process.once("SIGINT", async () => {
  try {
    await consumer.disconnect()
    console.log("Consumer disconnected")
  } finally {
    process.kill(process.pid, "SIGINT")
  }
})
consumer.py
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=["localhost:9092"],
    group_id="demo-group",
    auto_offset_reset="earliest",
    enable_auto_commit=False,
    consumer_timeout_ms=1000
)
consumer.subscribe(["demo"])

for message in consumer:
    topic_info = f"topic: {message.topic} ({message.partition}|{message.offset})"
    message_info = f"key: {message.key}, value: {message.value}"
    print(f"Message consumed: {topic_info}, {message_info}")
Running
- Go
- Node.js
- Python
# Create the topic
go run admin.go
# Produce some data
go run producer.go
# Consume it back
go run consumer.go
# Create the topic
npx tsc admin.ts && node admin.js
# Produce some data
npx tsc producer.ts && node producer.js
# Consume it back
npx tsc consumer.ts && node consumer.js
# Create the topic
(.env) python3 admin.py
# Produce some data
(.env) python3 producer.py
# Consume it back
(.env) python3 consumer.py
Wrapping up
In this example you developed the building blocks of a Redpanda client application: creating a topic, producing messages to it, and consuming them back from a Redpanda cluster running locally or in Redpanda Cloud.
The code provided here is intentionally simple to help you get started. For additional resources to help you build stream processing applications that can aggregate, join, and filter your data streams, see: