Version: 22.3

Use Redpanda with Kafka Client Libraries

Redpanda is compatible with the Kafka API, so although Redpanda is a newer streaming data platform, you can use the many client libraries that were created for Kafka. If you find something that is not supported, reach out to our team in the community Slack.

This example walks you through how to get started with a variety of Kafka client libraries by creating a topic, producing some data, and consuming it back.

Set up Redpanda

Follow the Redpanda Quickstart guide to spin up a development environment in Docker. If you already have a Redpanda Cloud cluster, then you're good to go; this example will show you how to connect to that too.

caution

If you're running Redpanda on your laptop, or in a shared development environment, then avoid using Redpanda's optimized production settings. Running sudo rpk redpanda tune all or manually configuring Redpanda for production might affect your experience with other applications running on your machine.

Prepare the client environment

Download and install Go from go.dev. This Kafka client code example uses the franz-go library.

# Create and enter the project folder
mkdir redpanda-go; cd redpanda-go
# Initialize the project
go mod init com/redpanda/example
# Install required dependencies
go get github.com/twmb/franz-go
go get github.com/twmb/franz-go/pkg/kadm
go get github.com/twmb/tlscfg
go get github.com/twmb/franz-go/pkg/sasl/scram@v1.9.0

Create topic

You can create a topic using Redpanda's command line interface rpk:

rpk topic create demo
TOPIC STATUS
demo OK

rpk topic list
NAME PARTITIONS REPLICAS
demo 1 1

You can also create a topic programmatically:

admin.go
package main

import (
	"context"
	"fmt"
	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	topic := "demo"
	seeds := []string{"localhost:9092"}

	client, err := kgo.NewClient(
		kgo.SeedBrokers(seeds...),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	admin := kadm.NewClient(client)
	defer admin.Close()

	ctx := context.Background()
	// Create a topic with a single partition and single replica
	resp, err := admin.CreateTopics(ctx, 1, 1, nil, topic)
	if err != nil {
		// The request itself failed (for example, no broker was reachable)
		panic(err)
	}
	// Each requested topic reports its own result
	for _, ctr := range resp {
		if ctr.Err != nil {
			fmt.Printf("Unable to create topic '%s': %s\n", ctr.Topic, ctr.Err)
		} else {
			fmt.Printf("Created topic '%s'\n", ctr.Topic)
		}
	}
}

Connecting to Redpanda Cloud

Connecting to a local Redpanda cluster (or a cluster with no security) is as simple as specifying a list of brokers; however, this is done differently in Redpanda Cloud.

You can configure Redpanda Cloud to use SASL/SCRAM (username and password) or mTLS based authentication. These modes of security require some additional parameters to be specified when creating a client connection.

important

Redpanda Cloud environments use certificates signed by Let's Encrypt. Most programming languages trust the Let's Encrypt root certificate authority (ISRG Root X1) by default, so you shouldn't need to provide a custom CA.

package main

import (
	"crypto/tls"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/sasl/scram"
)

func main() {
	seeds := []string{"<TODO: change this to your cluster hosts>"}

	opts := []kgo.Opt{}
	opts = append(opts,
		kgo.SeedBrokers(seeds...),
	)

	// Initialize public CAs for TLS
	opts = append(opts, kgo.DialTLSConfig(new(tls.Config)))

	/* Initialize mTLS (requires importing "github.com/twmb/tlscfg")
	tc, err := tlscfg.New(
		// Custom CA is only required if you are using self-signed certificates
		tlscfg.MaybeWithDiskCA("ca.crt", tlscfg.ForClient),
		tlscfg.MaybeWithDiskKeyPair("client.crt", "client.key"),
	)
	if err != nil {
		panic(err)
	}
	opts = append(opts, kgo.DialTLSConfig(tc))
	*/

	// Initialize SASL/SCRAM
	opts = append(opts, kgo.SASL(scram.Auth{
		User: "<TODO: change this to your service account name>",
		Pass: "<TODO: change this to your service account password>",
	}.AsSha256Mechanism()))

	client, err := kgo.NewClient(opts...)
	if err != nil {
		panic(err)
	}
	defer client.Close()
}
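The example above hard-codes its placeholders. One way to keep credentials out of source code is to read them from the environment. The following is a minimal sketch that uses only the Go standard library. The environment variable names (`REDPANDA_BROKERS`, `REDPANDA_SASL_USER`, `REDPANDA_SASL_PASSWORD`), the `settings` type, and the `loadSettings` function are hypothetical names chosen for illustration; they are not defined by Redpanda or franz-go.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strings"
)

// settings holds the values that the connection example hard-codes.
// The type and its field names are illustrative, not part of franz-go.
type settings struct {
	Brokers  []string
	User     string
	Password string
}

// loadSettings reads connection settings through getenv (for example,
// os.Getenv). The variable names are assumptions made for this sketch.
func loadSettings(getenv func(string) string) (settings, error) {
	var s settings
	// Accept a comma-separated broker list and ignore empty entries.
	for _, b := range strings.Split(getenv("REDPANDA_BROKERS"), ",") {
		if b = strings.TrimSpace(b); b != "" {
			s.Brokers = append(s.Brokers, b)
		}
	}
	if len(s.Brokers) == 0 {
		return s, errors.New("REDPANDA_BROKERS is empty")
	}
	s.User = getenv("REDPANDA_SASL_USER")
	s.Password = getenv("REDPANDA_SASL_PASSWORD")
	if s.User == "" || s.Password == "" {
		return s, errors.New("REDPANDA_SASL_USER and REDPANDA_SASL_PASSWORD must both be set")
	}
	return s, nil
}

func main() {
	s, err := loadSettings(os.Getenv)
	if err != nil {
		fmt.Fprintln(os.Stderr, "configuration error:", err)
		os.Exit(1)
	}
	// Report only values that are safe to log; never print the password.
	fmt.Printf("brokers: %v, user: %s\n", s.Brokers, s.User)
}
```

The loaded values could then be passed to `kgo.SeedBrokers(s.Brokers...)` and to the `User` and `Pass` fields of `scram.Auth` in place of the placeholders.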

Create producer

After you have a topic, you can create a producer and send some messages:

producer.go
package main

import (
	"context"
	"fmt"
	"os"
	"sync"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	topic := "demo"
	hostname, _ := os.Hostname()
	ctx := context.Background()
	seeds := []string{"localhost:9092"}

	client, err := kgo.NewClient(
		kgo.SeedBrokers(seeds...),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Produce messages asynchronously; the callback reports each result
	var wg sync.WaitGroup
	for i := 1; i < 100; i++ {
		wg.Add(1)
		record := &kgo.Record{
			Topic: topic,
			Key:   []byte(hostname),
			Value: []byte(fmt.Sprintf("Message %d", i)),
		}
		client.Produce(ctx, record, func(record *kgo.Record, err error) {
			defer wg.Done()
			if err != nil {
				fmt.Printf("Error sending message: %v \n", err)
			} else {
				fmt.Printf("Message sent: topic: %s, offset: %d, value: %s \n",
					topic, record.Offset, record.Value)
			}
		})
	}
	// Wait until every callback has run
	wg.Wait()

	// Alternatively, produce messages synchronously
	for i := 100; i < 200; i++ {
		record := &kgo.Record{
			Topic: topic,
			Key:   []byte(hostname),
			Value: []byte(fmt.Sprintf("Synchronous message %d", i)),
		}
		results := client.ProduceSync(ctx, record)
		for _, pr := range results {
			if pr.Err != nil {
				fmt.Printf("Error sending synchronous message: %v \n", pr.Err)
			} else {
				fmt.Printf("Message sent: topic: %s, offset: %d, value: %s \n",
					topic, pr.Record.Offset, pr.Record.Value)
			}
		}
	}
}
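The producer above sends plain strings as record values. Applications often send structured data instead, and because a record value is a byte slice, any encoding can be used. The following is a minimal sketch of encoding a value as JSON with the Go standard library. The `reading` type and the `encodeReading` and `decodeReading` helpers are hypothetical and exist only for this illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// reading is a hypothetical payload type used only in this sketch.
type reading struct {
	Sensor string  `json:"sensor"`
	Value  float64 `json:"value"`
}

// encodeReading returns the JSON bytes that could be used as a record value.
func encodeReading(r reading) ([]byte, error) {
	return json.Marshal(r)
}

// decodeReading parses bytes read back from a record value.
func decodeReading(b []byte) (reading, error) {
	var r reading
	err := json.Unmarshal(b, &r)
	return r, err
}

func main() {
	b, err := encodeReading(reading{Sensor: "s1", Value: 21.5})
	if err != nil {
		panic(err)
	}
	fmt.Printf("encoded: %s\n", b)

	r, err := decodeReading(b)
	if err != nil {
		panic(err)
	}
	fmt.Printf("decoded: %+v\n", r)
}
```

In producer.go, the encoded bytes would take the place of `[]byte(fmt.Sprintf("Message %d", i))` in the `Value` field, and a consumer would pass `record.Value` to the decoding helper.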

Create consumer

You can create a consumer to read the data back out of the topic:

consumer.go
package main

import (
	"context"
	"fmt"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	topic := "demo"
	ctx := context.Background()
	seeds := []string{"localhost:9092"}

	client, err := kgo.NewClient(
		kgo.SeedBrokers(seeds...),
		kgo.ConsumerGroup("demo-group"),
		kgo.ConsumeTopics(topic),
		kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	for {
		fetches := client.PollFetches(ctx)
		if errs := fetches.Errors(); len(errs) > 0 {
			// All errors are retried internally when fetching, but non-retriable
			// errors are returned from polls so that users can notice and take
			// action.
			panic(fmt.Sprint(errs))
		}

		iter := fetches.RecordIter()
		for !iter.Done() {
			record := iter.Next()
			topicInfo := fmt.Sprintf("topic: %s (%d|%d)",
				record.Topic, record.Partition, record.Offset)
			messageInfo := fmt.Sprintf("key: %s, value: %s",
				record.Key, record.Value)
			fmt.Printf("Message consumed: %s, %s \n", topicInfo, messageInfo)
		}
	}
}
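The consumer above polls in an endless loop, so it only stops when the process is killed, and in that case the deferred `client.Close()` does not run. A common Go pattern is to derive the context from `signal.NotifyContext` so that Ctrl+C cancels the context and the loop can return normally. The following is a minimal sketch of that pattern using only the standard library. The `pollFunc` type is a hypothetical stand-in for a blocking call such as `client.PollFetches(ctx)`; how franz-go itself reports a cancelled context is not shown here and should be checked against its documentation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
)

// pollFunc is a hypothetical stand-in for a blocking poll call. It returns
// a batch of values, or an error once ctx has been cancelled.
type pollFunc func(ctx context.Context) ([]string, error)

// consume polls until ctx is cancelled and returns how many values it handled.
// Cancellation is treated as a normal exit; any other error is returned.
func consume(ctx context.Context, poll pollFunc, handle func(string)) (int, error) {
	handled := 0
	for {
		batch, err := poll(ctx)
		if errors.Is(err, context.Canceled) {
			return handled, nil
		}
		if err != nil {
			return handled, err
		}
		for _, v := range batch {
			handle(v)
			handled++
		}
	}
}

// demo runs consume against an in-memory poll function. Calling stop after
// the third batch cancels the context, which simulates pressing Ctrl+C.
func demo() (int, error) {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	calls := 0
	poll := func(ctx context.Context) ([]string, error) {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		calls++
		if calls == 3 {
			stop() // takes effect on the next poll
		}
		return []string{fmt.Sprintf("message %d", calls)}, nil
	}
	return consume(ctx, poll, func(v string) { fmt.Println("consumed:", v) })
}

func main() {
	n, err := demo()
	fmt.Println("handled:", n, "error:", err)
}
```

Because the loop returns instead of being killed, deferred cleanup in the calling function gets a chance to run.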

Running

# Create the topic
go run admin.go
# Produce some data
go run producer.go
# Consume it back
go run consumer.go

Wrapping up

In this example, you developed the building blocks of a Redpanda client application that creates a topic, produces messages to it, and consumes messages from it, on a Redpanda cluster running either in a local environment or in Redpanda Cloud.

The code provided here is intentionally simple to help you get started. For additional resources to help you build stream processing applications that can aggregate, join, and filter your data streams, see:
