Build a Chat Room Application with Redpanda Cloud and Python

Create a basic chat room application with Redpanda Cloud and Kafka clients developed with kafka-python.

This tutorial describes how to:

  • Start a Redpanda cluster to store and stream chat room messages.

  • Write a client application in Python to produce and consume chat room messages.

  • Build and run multiple clients to exchange chat messages streamed through Redpanda Cloud.

Demo of the application

Prerequisites

Download and install Python 3 from python.org.

Redpanda Cloud environments use certificates signed by Let’s Encrypt. Most programming languages will Load their root certificate authority (CA), ISRG Root X1, by default so you shouldn’t need to provide a custom CA.

Set up Redpanda Cloud

Complete the Redpanda Cloud Quickstart before continuing. This tutorial expands on the quickstart.

Set up your environment

  1. Create and set your working directory to the project folder, chat-room:

    mkdir chat-room
    cd chat-room
  2. Create a virtual environment:

    python3 -m venv .env
    source .env/bin/activate
  3. Install the required dependencies:

    pip3 install kafka-python

Create a topic

You need a topic named chat-room for both Redpanda and the client to use to store chat room messages. If you completed the Redpanda Cloud Quickstart, this topic already exists in your cluster.

  1. Verify that the chat-room topic exists in your cluster by listing all topics:

    rpk topic list --tls-enabled

    Output:

    NAME       PARTITIONS  REPLICAS
    chat-room  1           1
  2. If the topic doesn’t exist yet, use rpk to create a chat-room topic:

    rpk topic create chat-room --tls-enabled

    Output:

    TOPIC      STATUS
    chat-room  OK

Confirm that the topic exists on the client side

The client may not always know that the topic exists. You can verify that it exists and create it if not. In chat-room/ create a source file called admin.py with the following content.

Replace the placeholders with the same values that you used in the Redpanda Cloud Quickstart.

admin.py
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.admin import KafkaAdminClient, NewTopic
class ChatAdmin:
  def __init__(self, brokers):
    self.admin = KafkaAdminClient(
      bootstrap_servers=brokers,
      sasl_mechanism="SCRAM-SHA-256",
      security_protocol="SASL_SSL",
      sasl_plain_username="redpanda-chat-account",
      sasl_plain_password="<password>",
    )
  def topic_exists(self, topic_name):
    topics_metadata = self.admin.list_topics()
    return topic_name in topics_metadata
  def create_topic(self, topic_name, num_partitions=1, replication_factor=1):
    if not self.topic_exists(topic_name):
      new_topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
      self.admin.create_topics([new_topic])
      print(f"Topic {topic_name} created.")
    else:
      print(f"Topic {topic_name} already exists.")
  def close(self):
    self.admin.close()

Create a producer

A client needs a producer to publish chat-room topic messages.

To create a producer for the client, in chat-room/ create a source file producer.py with the following content.

Replace the placeholders with the same values that you used in the Redpanda Cloud Quickstart.

producer.py
from kafka import KafkaProducer
import json
class ChatProducer:
  def __init__(self, brokers, topic):
    self.topic = topic
    self.producer = KafkaProducer(
      bootstrap_servers=brokers,
      sasl_mechanism="SCRAM-SHA-256",
      security_protocol="SASL_SSL",
      sasl_plain_username="redpanda-chat-account",
      sasl_plain_password="<password>",
      value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
  def send_message(self, user, message):
    self.producer.send(self.topic, {"user": user, "message": message})
    self.producer.flush()
  def close(self):
    self.producer.close()

You now have a working producer that sends strings entered by the user to the chat-room topic. Messages are sent as JSON encoded strings here, but keep in mind that the producer only sends buffers, so you can encode the messages however you like.

Create a consumer

A client needs a consumer to receive chat-room topic messages.

To create a consumer for the client, in chat-room/ create a source file consumer.py with the following content.

Replace the placeholders with the same values that you used in the Redpanda Cloud Quickstart.

consumer.py
from kafka import KafkaConsumer
import json
import uuid
class ChatConsumer:
  def __init__(self, brokers, topic, group_id=None):
    if group_id is None:
      group_id = str(uuid.uuid4())
    self.consumer = KafkaConsumer(
      topic,
      bootstrap_servers=brokers,
      sasl_mechanism="SCRAM-SHA-256",
      security_protocol="SASL_SSL",
      sasl_plain_username="redpanda-chat-account",
      sasl_plain_password="<password>",
      group_id=group_id,
      value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    )
  def print_messages(self):
    for msg in self.consumer:
      print(f"{msg.value['user']}: {msg.value['message']}")
  def close(self):
    self.consumer.close()

You now have a consumer that reads all messages from the chat-room topic and prints them to the console. You can start as many consumer groups as you like, but remember that each group reads a message only once, which is why the example is using a generated UUID for the group ID.

Create a client application

The client needs an application that creates the topic, producer, and consumer and implements the chat logic.

To create a client application, in chat-room/ create a source file app.py with the following content.

Replace the placeholders with the same values that you used in the Redpanda Cloud Quickstart.

app.py
import threading
from producer import ChatProducer
from consumer import ChatConsumer
from admin import ChatAdmin
brokers = ["<bootstrap-server-address>"]
topic = "chat-room"
def consumer_thread(consumer):
  consumer.print_messages()
if __name__ == "__main__":
  admin = ChatAdmin(brokers)
  if not admin.topic_exists(topic):
    print(f"Creating topic: {topic}")
    admin.create_topic(topic)
  username = input("Enter your username: ")
  producer = ChatProducer(brokers, topic)
  consumer = ChatConsumer(brokers, topic)
  consumer_t = threading.Thread(target=consumer_thread, args=(consumer,))
  consumer_t.daemon = True
  consumer_t.start()
  print("Connected. Press Ctrl+C to exit")
  try:
    while True:
      message = input()
      producer.send_message(username, message)
  except KeyboardInterrupt:
    pass
  finally:
    print("\nClosing chat...")
    producer.close()
    consumer.close()
    admin.close()
    consumer_t.join(1)

Build and run the application

Build the client chat application, run it from multiple client terminals, and chat between the clients.

  1. Open at least two terminals, and for each terminal:

  2. Run the client application:

    python app.py
  3. When prompted with Enter user name:, enter a unique name for the chat room.

  4. Use the chat application: enter a message in a terminal, and verify that the message is received in the other terminals.

    For example:

    Enter user name:
    Alice
    Connected, press Ctrl+C to exit
    Alice: Hi, I'm Alice
    Bob: Hi Alice, I'm Bob, nice to meet you

Next steps

This is a basic example of a chat room application. You can improve this application by implementing additional features and components, such as:

  • A user interface to make it more interactive and user-friendly.

  • A user registration and login system to authenticate users before they can access the chat room.

  • Rate limiting and other measures to prevent spamming and abuse in the chat room.

Suggested reading

For additional resources to help you build stream processing applications that can aggregate, join, and filter your data streams, see: