Retrieval-Augmented Generation (RAG)

This cookbook shows you how to create a Retrieval-Augmented Generation (RAG) pipeline, using PostgreSQL and PGVector.

Follow the cookbook to:

  • Create two Redpanda Connect data pipelines: one for creating vector embeddings with Ollama and another for searching the data, using PGVector.

  • Run the two pipelines in parallel using streams mode.

Compute the embeddings

Start by defining the indexing pipeline, which takes textual data from Redpanda, computes vector embeddings for it, and then write it into a PostgreSQL table with a PGVector index on the embeddings column.

rpk topic create articles
echo '{
  "type": "article",
  "article": {
    "id": "123foo",
    "title": "Dogs Stop Barking",
    "content": "The world was shocked this morning to find that all dogs have stopped barking."
  }
}' | rpk topic produce articles -f '%v'

Your indexing pipeline can read from the Redpanda topic, using the kafka input:

input:
  kafka:
    addresses: [ "${REDPANDA_CLUSTER}" ]
    topics: [ articles ]
    consumer_group: rp_connect_articles_group
    tls:
      enabled: true
    sasl:
      mechanism: SCRAM-SHA-256
      user: "${REDPANDA_USER}"
      password: "${REDPANDA_PASSWORD}"

Use Nomic Embed to compute embeddings. Since each request only applies to a single document, you can scale this by making requests in parallel across document batches.

To send a mapped request and map the response back into the original document, use the branch processor with a child ollama_embeddings processor.

pipeline:
  threads: -1
  processors:
    - branch:
        request_map: 'root = "search_document: %s\n%s".format(this.article.title, this.article.content)'
        processors:
          - ollama_embeddings:
              model: nomic-embed-text
        result_map: 'root.article.embeddings = this'

With this pipeline, your processed documents should look something like this:

{
  "type": "article",
  "article": {
    "id": "123foo",
    "title": "Dogs Stop Barking",
    "content": "The world was shocked this morning to find that all dogs have stopped barking.",
    "embeddings": [0.754, 0.19283, 0.231, 0.834], # This vector will actually have 768 dimensions
  }
}

Now, try sending this transformed data to PostgreSQL using the sql_insert output. You can take advantage of the init_statement functionality to set up pgvector and a table to write the data to.

output:
  sql_insert:
    driver: postgres
    dsn: "${PG_CONNECTION_STRING}"
    init_statement: |
      CREATE EXTENSION IF NOT EXISTS vector;
      CREATE TABLE IF NOT EXISTS searchable_text (
        id varchar(128) PRIMARY KEY,
        title text NOT NULL,
        body text NOT NULL,
        embeddings vector(768) NOT NULL
      );
      CREATE INDEX IF NOT EXISTS text_hnsw_index
        ON searchable_text
        USING hnsw (embeddings vector_l2_ops);
    table: searchable_text
    columns: ["id", "title", "body", "embeddings"]
    args_mapping: "[this.article.id, this.article.title, this.article.content, this.article.embeddings.vector()]"

Save this pipeline as indexing.yaml and run it with rpk connect run ./indexing.yaml to make sure your PostgreSQL table is populated with embeddings.

Generating search responses

To generate responses to questions with the dataset you’re creating embeddings for, use a HTTP server input to receive the questions. Since you’re running this pipeline in streams mode, the HTTP server uses endpoints prefixed with stream identifier, which in this case is the same as the filename: search.

input:
  http_server:
    path: /
    allowed_verbs: [ GET ]
    sync_response:
      headers:
        Content-Type: application/json

The user query uses the query parameter q: http://localhost:4195/search?q=question_here. Since query parameters are exposed as metadata in the http_server input, you can reference that in bloblang with @q.

pipeline:
  processors:
    - label: compute_embeddings
      ollama_embeddings:
        model: nomic-embed-text
        text: "search_query: %s".format(@q)

The payload is the embeddings vector. To fetch the top three most similar documents to your embeddings vector, you can write a PostgreSQL query in the sql_raw processor.

    - sql_raw:
        driver: "postgres"
        dsn: "${PG_CONNECTION_STRING}"
        query: SELECT title, body FROM searchable_text ORDER BY embeddings <-> $1 LIMIT 3
        args_mapping: "[ this.vector() ]"

With your looked up information, as well as your initial query, you can use the ollama_chat processor to respond to the user’s question as text.

    - label: generate_response
      ollama_chat:
        model: llama3.1
        prompt: |
          Your task is to respond to user queries using the provided information.

          The user asked: ${! @q }
          Context: ${!this.map_each(row -> "%s\n%s".format(row.title, row.body)).join("\n\n")}
          Response:

Now that you’ve generated a response, you can send that back to the HTTP server as a response using sync_response. Then, delete the message using a bloblang mapping so that nothing goes to the output.

    - mapping: 'root.response = content().string()'
    - sync_response: {}
    - mapping: 'root = deleted()'

Both pipelines are ready. Try running both of them using streams mode: rpk connect streams indexing.yaml search.yaml.

When some documents have been indexed, you can query the system using:

curl -G 'localhost:4195/search' --data-urlencode 'q=what is happening to the dogs?' | jq

The output should look something like:

{
  "response": "Everyone in the world woke up today shocked as their beloved pooches were silent - unable to bark."
}