Build a Chat Room Application with Redpanda and Java

Create a basic chat room application with Redpanda and Kafka Java clients.

This tutorial describes how to:

  • Start a Redpanda cluster to store and stream chat room messages.

  • Write a client application in Java to produce and consume chat room messages.

  • Build and run multiple clients to exchange chat messages streamed through Redpanda.

Demo of the application

Prerequisites

You must have

  • Java 11 or 17 (OpenJDK is recommended)

  • Maven

  • Windows/Linux

  • macOS

You can download OpenJDK from Adoptium, and can follow the installation instructions for Maven on the official Maven website.

Mac users with Homebrew installed can run the following commands to install these dependencies:

brew install openjdk@11 maven

Make sure to follow any symlinking instructions in the Caveats output.

When the prerequisites are installed, the following commands should print the version of both Java and Maven:

java --version
mvn --version

Set up a Redpanda environment

Complete the Redpanda Quickstart before continuing. This tutorial expands on the quickstart. You can choose to run either one or three brokers.

If you’re running Redpanda in a shared development environment, avoid running sudo rpk redpanda tune all or manually configuring Redpanda for production. The optimized production settings might affect your experience with other applications running on your machine.

Set up your environment

  1. Bootstrap a new Java project using the following Maven command:

    mvn archetype:generate \
      -DgroupId=com.example \
      -DartifactId=chat-room \
      -DarchetypeArtifactId=maven-archetype-quickstart \
      -DarchetypeVersion=1.4 \
      -DinteractiveMode=false
  2. Set your working directory to the project folder:

    cd chat-room
  3. Add the required dependencies to your pom.xml file:

    <dependencies>
      <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.4.0</version>
      </dependency>
      <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.9</version>
      </dependency>
    </dependencies>
    You can check for new versions of the Kafka Java client on the Sonatype website.
  4. Set the source and target Java version to 11 in your pom.xml file:

    <properties>
      <maven.compiler.source>11</maven.compiler.source>
      <maven.compiler.target>11</maven.compiler.target>
    </properties>
  5. Install the dependencies by building the project:

    mvn package

    The output is verbose, but you should see a successful build message:

    [INFO] BUILD SUCCESS

Create a topic

You need a topic named chat-room for both Redpanda and the client to use to store chat room messages. If you completed the Redpanda Quickstart, this topic already exists in your cluster.

  1. Verify that the chat-room topic exists in your cluster by listing all topics:

    docker exec -it redpanda-0 rpk topic list

    Output:

    NAME       PARTITIONS  REPLICAS
    chat-room  1           1
  2. If the topic doesn’t exist yet, use rpk to create a chat-room topic:

    docker exec -it redpanda-0 rpk topic create chat-room

    Output:

    TOPIC      STATUS
    chat-room  OK

Confirm that the topic exists on the client side

The client may not always know that the topic exists. You can verify that it exists and create it if not.

In chat-room/ create a source file called src/main/java/com/example/Admin.java with the following content.

The broker settings in this code are from the Redpanda Quickstart, where the external port for broker redpanda-0 is set to port 19092. If you’re not using the Redpanda Quickstart, make sure that the broker’s port is correct for your deployment.
src/main/java/com/example/Admin.java
package com.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Collections;
import java.util.Properties;
public class Admin {
  private final static String BOOTSTRAP_SERVERS = "localhost:19092";
  public static Properties getProducerProps() {
    Properties props = new Properties();
    props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return props;
  }
  public static Properties getConsumerProps(String groupId) {
    Properties props = new Properties();
    props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
    props.put("group.id", groupId);
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    return props;
  }
  public static boolean topicExists(String topicName) {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    try (AdminClient client = AdminClient.create(props)) {
      return client.listTopics().names().get().contains(topicName);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
  public static void createTopic(String topicName) {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    try (AdminClient client = AdminClient.create(props)) {
      NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
      client.createTopics(Collections.singletonList(newTopic));
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

This code is a utility class that provides methods for checking if a topic exists, creating a topic, and returning producer and consumer configuration properties. You will use these methods in the other classes in the chat room application.

Create a producer

A client needs a producer to publish chat-room topic messages.

To create a producer for the client, in chat-room/ create a source file src/main/java/com/example/ChatProducer.java with the following content.

src/main/java/com/example/ChatProducer.java
package com.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.google.gson.Gson;
import java.util.HashMap;
import java.util.Map;
public class ChatProducer implements AutoCloseable {
  private KafkaProducer<String, String> producer;
  private String topic;
  private Gson gson;
  public ChatProducer(String topic) {
    this.producer = new KafkaProducer<>(Admin.getProducerProps());
    this.topic = topic;
    this.gson = new Gson();
  }
  public void sendMessage(String user, String message) {
    Map<String, String> messageMap = new HashMap<>();
    messageMap.put("user", user);
    messageMap.put("message", message);
    String jsonMessage = gson.toJson(messageMap);
    producer.send(new ProducerRecord<>(topic, null, jsonMessage));
    producer.flush();
  }
  @Override
  public void close() {
    producer.close();
  }
}

You now have a working producer that sends strings entered by the user to the chat-room topic. Messages are sent as JSON encoded strings here, but keep in mind that the producer only sends buffers, so you can encode the messages however you like.

Create a consumer

A client needs a consumer to receive chat-room topic messages.

To create a consumer for the client, in chat-room/ create a source file src/main/java/com/example/ChatConsumer.java with the following content.

src/main/java/com/example/ChatConsumer.java
package com.example;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
import java.util.Map;
import java.time.Duration;
import java.util.Collections;

public class ChatConsumer implements Runnable, AutoCloseable {
  private volatile boolean running = true;
  private KafkaConsumer<String, String> consumer;
  private Gson gson;
  private Type type;
  public ChatConsumer(String topic, String groupId) {
    this.consumer = new KafkaConsumer<>(Admin.getConsumerProps(groupId));
    this.consumer.subscribe(Collections.singletonList(topic));
    this.gson = new Gson();
    this.type = new TypeToken<Map<String, String>>(){}.getType();
  }
  @Override
  public void run() {
    while (running) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
      for (ConsumerRecord<String, String> record : records) {
        Map<String, String> messageMap = gson.fromJson(record.value(), type);
        System.out.println(messageMap.get("user") + ": " + messageMap.get("message"));
      }
    }
  }
  @Override
  public void close() {
    running = false;
    consumer.close();
  }
}

You now have a consumer that reads all messages from the chat-room topic and prints them to the console. You can start as many consumer groups as you like, but remember that each group reads a message only once, which is why the example is using a generated UUID for the group ID.

Create a client application

The client needs an application that creates the topic, producer, and consumer and implements the chat logic.

To create a client application, in chat-room/ create a source file src/main/java/com/example/Main.java with the following content.

src/main/java/com/example/Main.java
package com.example;

import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
public class Main {
  private static final String TOPIC = "chat-room";
  public static void main(String[] args) {
    if (!Admin.topicExists(TOPIC)) {
      Admin.createTopic(TOPIC);
    }
    Scanner scanner = new Scanner(System.in);
    System.out.print("Enter your username: ");
    String username = scanner.nextLine();
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    try (ChatConsumer consumer = new ChatConsumer(TOPIC, UUID.randomUUID().toString());
          ChatProducer producer = new ChatProducer(TOPIC)) {
      Future<?> future = executorService.submit(consumer);
      System.out.print("Connected, press Ctrl+C to exit\n");
      while (!future.isDone()) {
        String message = scanner.nextLine();
        producer.sendMessage(username, message);
      }
    } catch (Exception e) {
        System.out.println("Closing chat...");
    } finally {
        executorService.shutdownNow();
    }
  }
}

Build and run the application

Compile the client chat application, run it from multiple client terminals, and chat between the clients.

  1. From chat-room/, compile the client application:

    mvn compile
  2. Open at least two terminals, and for each terminal:

    1. Run the client application:

      mvn exec:java -Dexec.mainClass="com.example.Main"
    2. When prompted with Enter user name:, enter a unique name for the chat room.

  3. Use the chat application: enter a message in a terminal, and verify that the message is received in the other terminals.

    For example:

    Enter user name:
    Alice
    Connected, press Ctrl+C to exit
    Alice: Hi, I'm Alice
    Bob: Hi Alice, I'm Bob, nice to meet you

Next steps

This is a basic example of a chat room application. You can improve this application by implementing additional features and components, such as:

  • A user interface to make it more interactive and user-friendly.

  • A user registration and login system to authenticate users before they can access the chat room.

  • Rate limiting and other measures to prevent spamming and abuse in the chat room.

Suggested reading

For additional resources to help you build stream processing applications that can aggregate, join, and filter your data streams, see: