# Stream Text Embeddings with Redpanda, OpenAI, and MongoDB


---
title: Stream Text Embeddings with Redpanda, OpenAI, and MongoDB
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-connect-version: 4.93.0
latest-redpanda-tag: v26.1.9
docname: openai
page-component-name: labs
page-version: master
page-component-version: master
page-component-title: Labs
page-relative-src-path: openai.adoc
page-edit-url: https://github.com/redpanda-data/redpanda-labs/edit/main/docs/modules/connect-plugins/pages/openai.adoc
description: Build a streaming RAG pipeline with Redpanda, OpenAI, and MongoDB Atlas
page-git-created-date: "2025-05-06"
page-git-modified-date: "2025-05-06"
---


In this lab, you’ll build a [retrieval-augmented generation](https://help.openai.com/en/articles/8868588-retrieval-augmented-generation-rag-and-semantic-search-for-gpts) (RAG) pipeline to enhance natural language understanding and response generation using Redpanda, [OpenAI](https://openai.com/), [MongoDB Atlas](https://www.mongodb.com/products/platform/atlas-vector-search), and [LangChain](https://www.langchain.com/).

This RAG pipeline comprises two phases:

-   **Acquisition and persistence of new information**: In this initial phase, LangChain acquires new information and prepares it for ingestion into Redpanda. Redpanda Connect adds [OpenAI text embeddings](https://platform.openai.com/docs/guides/embeddings) to messages as they stream through Redpanda on their way to a MongoDB Atlas vector database. Redpanda handles real-time data ingestion and storage, while Redpanda Connect calls the embeddings API and writes the enriched messages to MongoDB Atlas.

    The acquired information, such as documents and webpages, is split into smaller text chunks and stored in MongoDB Atlas along with their vector embeddings. These embeddings, which encode the semantic meaning of text in a multidimensional space, enable efficient semantic search. MongoDB Atlas enables queries based on vector embeddings to retrieve texts with similar semantic meaning.

-   **Retrieval of relevant contextual information**: In this phase, contextual information relevant to the user’s question (prompt) is retrieved from MongoDB Atlas through semantic search and passed alongside the question to OpenAI’s large language model. The model uses this additional context to produce more accurate and relevant answers.
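
The retrieval phase can be illustrated with a toy sketch. It assumes three-dimensional vectors in place of real embeddings, an in-memory list in place of MongoDB Atlas, and a hypothetical prompt template. None of the names below come from the lab’s code.

```python
import math

# Toy stand-ins for stored chunks: (text, embedding). Real embeddings have
# far more dimensions and come from an embedding model.
CHUNKS = [
    ("England named a provisional squad.", [0.9, 0.1, 0.0]),
    ("The weather was sunny in Paris.", [0.0, 0.2, 0.9]),
    ("The squad was announced in May.", [0.8, 0.3, 0.1]),
]


def euclidean(a, b):
    """Straight-line distance between two vectors of equal length."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def retrieve(query_vector, chunks, k=2):
    """Return the k chunk texts whose vectors lie closest to the query."""
    ranked = sorted(chunks, key=lambda chunk: euclidean(query_vector, chunk[1]))
    return [text for text, _ in ranked[:k]]


def augment(question, context_chunks):
    """Build a prompt that places the retrieved context ahead of the question."""
    context = "\n".join(f"- {chunk}" for chunk in context_chunks)
    return f"Answer using this context:\n{context}\n\nQuestion: {question}"


if __name__ == "__main__":
    # Hypothetical embedding of the user's question.
    question_vector = [0.9, 0.15, 0.0]
    context = retrieve(question_vector, CHUNKS)
    print(augment("Who made the squad?", context))
```

Chunks whose vectors sit near the question’s vector are treated as semantically related, so the unrelated sentence is left out of the prompt.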


## [](#prerequisites)Prerequisites

You must have the following:

-   [Redpanda Cloud account](https://cloud.redpanda.com/sign-up)

-   [OpenAI developer platform account](https://platform.openai.com/signup/)

    > 📝 **NOTE**
    >
    > Make sure your account has [available credits](https://help.openai.com/en/articles/9038407-how-can-i-set-up-billing-for-my-account).

-   [MongoDB Atlas account](https://account.mongodb.com/account/register)

-   [Python 3](https://www.python.org/downloads)

-   [rpk](https://docs.redpanda.com/current/get-started/rpk-install/)


## [](#set-up-a-local-environment)Set up a local environment

1.  Clone this repository:

    ```bash
    git clone https://github.com/redpanda-data/redpanda-labs.git
    ```

2.  Change into the `redpanda-labs/connect-plugins/processor/embeddings/openai/` directory:

    ```bash
    cd redpanda-labs/connect-plugins/processor/embeddings/openai
    ```


## [](#set-up-redpanda-serverless)Set up Redpanda Serverless

1.  Log in to your Redpanda Cloud account and create a new [Serverless Standard](https://redpanda.com/redpanda-cloud/serverless) cluster.

2.  Make a note of the bootstrap server URL.

3.  Create a topic called `documents` with the default settings.

4.  Create a new user with permissions (ACLs) to access a topic named `documents` and a consumer group named `connect`.

5.  Add the cluster connection information to a local `.env` file:

    ```bash
    cat > .env<< EOF
    REDPANDA_SERVERS="<bootstrap-server-url>"
    REDPANDA_USER="<username>"
    REDPANDA_PASS="<password>"
    REDPANDA_TOPICS="documents"

    EOF
    ```


## [](#set-up-openai-api)Set up OpenAI API

1.  Log in to your OpenAI developer platform account and create a new [Project API key](https://platform.openai.com/api-keys).

2.  Add the secret key to the local `.env` file:

    ```bash
    cat >> .env<< EOF
    OPENAI_API_KEY="<secret_key>"
    OPENAI_EMBEDDING_MODEL="text-embedding-3-small"
    OPENAI_MODEL="gpt-4o"

    EOF
    ```


## [](#set-up-mongodb-atlas)Set up MongoDB Atlas

1.  Log in to your MongoDB Atlas account and deploy a new [free cluster](https://www.mongodb.com/docs/atlas/getting-started) for development purposes.

2.  Create a new database named `VectorStore`, a new collection in that database named `Embeddings`, and an Atlas Vector Search index named `vector_index` with the following JSON configuration:

    ```json
    {
      "fields": [
        {
          "numDimensions": 1536,
          "path": "embedding",
          "similarity": "euclidean",
          "type": "vector"
        }
      ]
    }
    ```

3.  Add the Atlas connection information to the local `.env` file:

    ```bash
    cat >> .env<< EOF
    # Connection string for MongoDB Driver for Go:
    ATLAS_CONNECTION_STRING="<connection-string>"
    ATLAS_DB="VectorStore"
    ATLAS_COLLECTION="Embeddings"
    ATLAS_INDEX="vector_index"

    EOF
    ```
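
The index definition expects each document to carry a 1536-element vector in its `embedding` field, which matches the default output size of `text-embedding-3-small`. As a minimal sketch, the following check compares a document against that definition. The `check_document` helper and the `text` field name are hypothetical, and the lookup assumes a top-level field rather than a nested path.

```python
import json

# Index definition copied from the step above.
INDEX_DEFINITION = json.loads("""
{
  "fields": [
    {
      "numDimensions": 1536,
      "path": "embedding",
      "similarity": "euclidean",
      "type": "vector"
    }
  ]
}
""")


def is_number(value):
    """True for ints and floats, but not for booleans."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def check_document(document, index_definition=INDEX_DEFINITION):
    """Return a list of ways `document` fails to match the vector fields."""
    problems = []
    for field in index_definition["fields"]:
        if field["type"] != "vector":
            continue
        path, expected = field["path"], field["numDimensions"]
        vector = document.get(path)  # assumes a top-level field
        if not isinstance(vector, list):
            problems.append(f"{path}: missing or not an array")
        elif len(vector) != expected:
            problems.append(
                f"{path}: expected {expected} dimensions, got {len(vector)}"
            )
        elif not all(is_number(x) for x in vector):
            problems.append(f"{path}: contains non-numeric values")
    return problems


if __name__ == "__main__":
    print(check_document({"text": "hello", "embedding": [0.0] * 1536}))
    print(check_document({"text": "hello", "embedding": [0.0] * 3}))
```

If you switch to an embedding model with a different output size, `numDimensions` in the index would need to change with it.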


## [](#set-the-environment-variables)Set the environment variables

Your `.env` file should now look like this:

```bash
REDPANDA_SERVERS="<bootstrap-server-url>"
REDPANDA_USER="<username>"
REDPANDA_PASS="<password>"
REDPANDA_TOPICS="documents"

OPENAI_API_KEY="<secret_key>"
OPENAI_EMBEDDING_MODEL="text-embedding-3-small"
OPENAI_MODEL="gpt-4o"

ATLAS_CONNECTION_STRING="<connection-string>"
ATLAS_DB="VectorStore"
ATLAS_COLLECTION="Embeddings"
ATLAS_INDEX="vector_index"
```

To check your `.env` file:

```bash
cat .env
```
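
Beyond reading the file by eye, a short script can flag variables that are missing or still hold a `<placeholder>` value. This is a minimal sketch with a deliberately simple parser (one `KEY="value"` pair per line, `#` comments ignored). The helper names are hypothetical, and the parsing rules of `rpk connect run --env-file` may differ.

```python
from pathlib import Path

# Variable names taken from the .env listing above.
REQUIRED = [
    "REDPANDA_SERVERS", "REDPANDA_USER", "REDPANDA_PASS", "REDPANDA_TOPICS",
    "OPENAI_API_KEY", "OPENAI_EMBEDDING_MODEL", "OPENAI_MODEL",
    "ATLAS_CONNECTION_STRING", "ATLAS_DB", "ATLAS_COLLECTION", "ATLAS_INDEX",
]


def parse_env(text):
    """Parse KEY="value" lines, skipping blank lines and # comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip().strip('"')
    return values


def find_problems(values):
    """List required variables that are unset or still placeholders."""
    problems = []
    for key in REQUIRED:
        value = values.get(key, "")
        if not value:
            problems.append(f"{key} is missing or empty")
        elif value.startswith("<") and value.endswith(">"):
            problems.append(f"{key} still holds a placeholder")
    return problems


if __name__ == "__main__":
    env_path = Path(".env")
    if not env_path.exists():
        print("No .env file found in the current directory")
    else:
        issues = find_problems(parse_env(env_path.read_text()))
        print("\n".join(issues) if issues else "All required variables are set")
```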

## [](#create-a-python-virtual-environment)Create a Python virtual environment

Create a Python virtual environment in the current directory, install the lab’s dependencies, and then deactivate the environment:

```bash
python3 -m venv env
source env/bin/activate
pip install -r requirements.txt
deactivate
```

## [](#run-the-lab)Run the lab

This lab has three parts:

1.  Use **LangChain’s** `WebBaseLoader` and `RecursiveCharacterTextSplitter` to generate chunks of text from the BBC Sport website and send each chunk to a Redpanda topic named `documents`.

2.  Use **Redpanda Connect** to consume the messages from the `documents` topic and pass each message through a processor that calls **OpenAI’s embeddings API** to retrieve the vector embeddings for the text. The enriched messages are then inserted into a **MongoDB Atlas** database collection that has a vector search index.

3.  Complete the RAG pipeline by using **LangChain** to retrieve similar texts from the **MongoDB Atlas** database and add that context alongside a user question to a prompt that is sent to OpenAI’s `gpt-4o` model.
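
To give a feel for the chunking in the first part, here is a simplified fixed-size splitter with overlap. It is not LangChain’s `RecursiveCharacterTextSplitter`, which prefers to break on separators such as paragraphs and sentences, and the `text` and `source` field names in the message are assumptions rather than the lab’s actual schema.

```python
import json


def split_text(text, chunk_size=200, overlap=40):
    """Cut text into chunks of at most chunk_size characters.

    Consecutive chunks share `overlap` characters so that a sentence cut
    at a boundary still appears whole in one of the two chunks.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be >= 0 and smaller than chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last chunk already reaches the end of the text
    return chunks


def to_messages(chunks, source_url):
    """Serialize each chunk as a JSON message value (hypothetical schema)."""
    return [json.dumps({"text": chunk, "source": source_url}) for chunk in chunks]


if __name__ == "__main__":
    sample = "Redpanda streams the chunks. " * 20
    for message in to_messages(split_text(sample), "https://example.com/page"):
        print(message)
```

Each JSON string would become the value of one record produced to the `documents` topic.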


### [](#start-redpanda-connect)Start Redpanda Connect

Start Redpanda Connect with the custom OpenAI processor:

```bash
rpk connect run --env-file .env --log.level debug atlas_demo.yaml
```

You should see the following in the output:

```text
INFO Running main config from specified file       @service=redpanda-connect redpanda_connect_version=v4.33.0 path=atlas_demo.yaml
INFO Listening for HTTP requests at: http://0.0.0.0:4195  @service=redpanda-connect
DEBU url: https://api.openai.com/v1/embeddings, model: text-embedding-3-small  @service=redpanda-connect label="" path=root.pipeline.processors.0
INFO Launching a Redpanda Connect instance, use CTRL+C to close  @service=redpanda-connect
INFO Input type kafka is now active                @service=redpanda-connect label="" path=root.input
DEBU Starting consumer group                       @service=redpanda-connect label="" path=root.input
INFO Output type mongodb is now active             @service=redpanda-connect label="" path=root.output
```
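
The debug line in the output shows the processor targeting `https://api.openai.com/v1/embeddings` with the `text-embedding-3-small` model. The processor’s implementation is not shown on this page, so the following is only a sketch of the kind of request and enrichment involved. The helper names and the `text` field are hypothetical; the `embedding` field matches the `path` in the Atlas index.

```python
import json
import urllib.request

EMBEDDINGS_URL = "https://api.openai.com/v1/embeddings"


def build_embeddings_request(text, api_key, model="text-embedding-3-small"):
    """Prepare (but do not send) a POST request for one text's embedding."""
    body = json.dumps({"model": model, "input": text}).encode("utf-8")
    return urllib.request.Request(
        EMBEDDINGS_URL,
        data=body,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def enrich(message, response_body):
    """Return a copy of the message with the embedding from the response."""
    embedding = response_body["data"][0]["embedding"]
    return {**message, "embedding": embedding}


if __name__ == "__main__":
    request = build_embeddings_request("hello world", api_key="<secret_key>")
    print(request.get_method(), request.full_url)
    # Sending the request needs a real key and network access, for example:
    #   with urllib.request.urlopen(request) as response:
    #       document = enrich({"text": "hello world"}, json.load(response))
    fake_response = {"data": [{"embedding": [0.1, 0.2, 0.3]}]}
    print(enrich({"text": "hello world"}, fake_response))
```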

### [](#generate-new-text-documents)Generate new text documents

In another terminal window, generate new text documents and produce them to the `documents` topic. Redpanda Connect adds the embeddings and inserts the enriched documents into Atlas:

```bash
source env/bin/activate
# Single webpage:
python produce_documents.py -u "https://www.bbc.co.uk/sport/football/articles/c3gglr8mpzdo"
# Entire sitemap:
python produce_documents.py -s "https://www.bbc.com/sport/sitemap.xml"
```

You can view the text and embeddings in the [Atlas console](https://cloud.mongodb.com).

### [](#run-the-retrieval-and-generation-chain)Run the retrieval and generation chain

Run the retrieval chain and ask OpenAI a question:

```bash
source env/bin/activate
python retrieve_generate.py -q """
  Which football players made the provisional England national squad for the Euro 2024 tournament,
  and on what date was this announced?
  """
```

After a few seconds, a response similar to the following appears in the output:

**Question**: Which football players made the provisional England national squad for the Euro 2024 tournament, and on what date was this announced?

**Initial answer**: As of my knowledge cutoff date in October 2023, the provisional England national squad for the Euro 2024 tournament has not been announced. The selection of national teams for major tournaments like the UEFA European Championship typically happens closer to the event, often just a few weeks before the tournament starts. For the most current information, I recommend checking the latest updates from the Football Association (FA) or other reliable sports news sources.

**Augmented answer**: The provisional England national squad for the Euro 2024 tournament includes the following players:

**Goalkeepers**:

-   Dean Henderson (Crystal Palace)

-   Jordan Pickford (Everton)

-   Aaron Ramsdale (Arsenal)

-   James Trafford (Burnley)


**Defenders**:

-   Jarrad Branthwaite (Everton)

-   Lewis Dunk (Brighton)

-   Joe Gomez (Liverpool)

-   Marc Guehi (Crystal Palace)

-   Ezri Konsa (Aston Villa)

-   Harry Maguire (Manchester United)

-   Jarell Quansah (Liverpool)

-   Luke Shaw (Manchester United)

-   John Stones (Manchester City)

-   Kieran Trippier (Newcastle)

-   Kyle Walker (Manchester City)


**Midfielders**:

-   Trent Alexander-Arnold (Liverpool)

-   Conor Gallagher (Chelsea)

-   Curtis Jones (Liverpool)

-   Kobbie Mainoo (Manchester United)

-   Declan Rice (Arsenal)

-   Adam Wharton (Crystal Palace)


**Forwards**:

-   Jude Bellingham (Real Madrid)

-   Jarrod Bowen (West Ham)

-   Eberechi Eze (Crystal Palace)

-   Phil Foden (Manchester City)

-   Jack Grealish (Manchester City)

-   Anthony Gordon (Newcastle)

-   Harry Kane (Bayern Munich)

-   James Maddison (Tottenham)

-   Cole Palmer (Chelsea)

-   Bukayo Saka (Arsenal)

-   Ivan Toney (Brentford)

-   Ollie Watkins (Aston Villa)


This announcement was made on May 21, 2024.

## [](#next-steps)Next steps

Learn more about [Redpanda Connect](https://docs.redpanda.com/connect/get-started/about/) and explore the other [available connectors](https://docs.redpanda.com/connect/components/about/).