# Retrieval-Augmented Generation (RAG)

> For the complete documentation index, see [llms.txt](https://docs.redpanda.com/llms.txt). Component-specific: [connect-full.txt](https://docs.redpanda.com/connect-full.txt)

---
title: Retrieval-Augmented Generation (RAG)
latest-connect-version: 4.93.0
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-redpanda-tag: v26.1.9
docname: rag
page-component-name: connect
page-version: master
page-component-version: master
page-component-title: Connect
page-relative-src-path: rag.adoc
page-edit-url: https://github.com/redpanda-data/rp-connect-docs/edit/main/modules/cookbooks/pages/rag.adoc
description: How to configure Redpanda Connect to create a RAG pipeline, using PostgreSQL and PGVector.
page-git-created-date: "2024-09-12"
page-git-modified-date: "2024-09-12"
---

<!-- Source: https://docs.redpanda.com/connect/cookbooks/rag.md -->

This cookbook shows you how to create a Retrieval-Augmented Generation (RAG) pipeline, using PostgreSQL and [PGVector](https://github.com/pgvector/pgvector).

Follow the cookbook to:

-   Create two Redpanda Connect data pipelines: one for creating vector embeddings with [Ollama](https://ollama.ai) and another for searching the data, using [PGVector](https://github.com/pgvector/pgvector).

-   Run the two pipelines in parallel using [streams mode](https://docs.redpanda.com/connect/guides/streams_mode/about/).


## Compute the embeddings

Start by defining the indexing pipeline, which takes textual data from Redpanda, computes vector embeddings for it, and then writes it into a PostgreSQL table with a PGVector index on the embeddings column. First, create a topic named `articles` and produce a sample article to it:

```bash
rpk topic create articles
echo '{
  "type": "article",
  "article": {
    "id": "123foo",
    "title": "Dogs Stop Barking",
    "content": "The world was shocked this morning to find that all dogs have stopped barking."
  }
}' | rpk topic produce articles -f '%v'
```

Your indexing pipeline can read from the Redpanda topic, using the [`kafka`](https://docs.redpanda.com/connect/components/inputs/kafka/) input:

```yaml
input:
  kafka:
    addresses: [ "${REDPANDA_CLUSTER}" ]
    topics: [ articles ]
    consumer_group: rp_connect_articles_group
    tls:
      enabled: true
    sasl:
      mechanism: SCRAM-SHA-256
      user: "${REDPANDA_USER}"
      password: "${REDPANDA_PASSWORD}"
```

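Use [Nomic Embed](https://ollama.com/library/nomic-embed-text) to compute embeddings. Because each embedding request covers a single document, you can scale throughput by processing documents in parallel. The pipeline below does this by setting `threads: -1`, which matches the number of processing threads to the number of logical CPUs.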

To send a mapped request and map the response back into the original document, use the [`branch` processor](https://docs.redpanda.com/connect/components/processors/branch/) with a child [`ollama_embeddings`](https://docs.redpanda.com/connect/components/processors/ollama_embeddings/) processor.

```yaml
pipeline:
  threads: -1
  processors:
    - branch:
        request_map: 'root = "search_document: %s\n%s".format(this.article.title, this.article.content)'
        processors:
          - ollama_embeddings:
              model: nomic-embed-text
        result_map: 'root.article.embeddings = this'
```
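To make the data flow of the `branch` processor concrete, the following Python sketch mimics its three steps outside of Redpanda Connect: build the request text, compute an embedding, and merge the result into the otherwise untouched original document. The `fake_embed` function is a hypothetical stand-in for the call to Ollama (it is not a real model and returns only four numbers), and the thread pool only approximates the effect of `threads: -1`.

```python
from concurrent.futures import ThreadPoolExecutor
import copy


def fake_embed(text: str) -> list[float]:
    """Hypothetical stand-in for the ollama_embeddings call (assumption).

    Returns a tiny deterministic vector so the sketch runs offline.
    """
    return [round((len(text) * (i + 1)) % 97 / 97, 4) for i in range(4)]


def branch(doc: dict) -> dict:
    # request_map: build the text that is sent to the embedding model.
    article = doc["article"]
    request = "search_document: %s\n%s" % (article["title"], article["content"])
    # processors: run on the mapped request only, not on the original document.
    embedding = fake_embed(request)
    # result_map: write the result into a copy of the original document.
    result = copy.deepcopy(doc)
    result["article"]["embeddings"] = embedding
    return result


def index_all(docs: list[dict]) -> list[dict]:
    # Documents are independent, so they can be embedded in parallel.
    with ThreadPoolExecutor() as pool:
        return list(pool.map(branch, docs))


docs = [
    {
        "type": "article",
        "article": {
            "id": "123foo",
            "title": "Dogs Stop Barking",
            "content": "The world was shocked this morning to find that all dogs have stopped barking.",
        },
    },
]
indexed = index_all(docs)
print(sorted(indexed[0]["article"].keys()))
```

The point of the sketch is that the embedding step only ever sees the mapped request string, while fields such as `id` and `title` pass through unchanged.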

With this pipeline, your processed documents should look something like this:

```yaml
{
  "type": "article",
  "article": {
    "id": "123foo",
    "title": "Dogs Stop Barking",
    "content": "The world was shocked this morning to find that all dogs have stopped barking.",
    "embeddings": [0.754, 0.19283, 0.231, 0.834], # This vector will actually have 768 dimensions
  }
}
```

Now, try sending this transformed data to PostgreSQL using the [`sql_insert`](https://docs.redpanda.com/connect/components/outputs/sql_insert/) output. You can take advantage of the `init_statement` functionality to set up `pgvector` and a table to write the data to.

```yaml
output:
  sql_insert:
    driver: postgres
    dsn: "${PG_CONNECTION_STRING}"
    init_statement: |
      CREATE EXTENSION IF NOT EXISTS vector;
      CREATE TABLE IF NOT EXISTS searchable_text (
        id varchar(128) PRIMARY KEY,
        title text NOT NULL,
        body text NOT NULL,
        embeddings vector(768) NOT NULL
      );
      CREATE INDEX IF NOT EXISTS text_hnsw_index
        ON searchable_text
        USING hnsw (embeddings vector_l2_ops);
    table: searchable_text
    columns: ["id", "title", "body", "embeddings"]
    args_mapping: "[this.article.id, this.article.title, this.article.content, this.article.embeddings.vector()]"
```
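As a rough illustration of what `args_mapping` produces, the Python sketch below builds the positional arguments for one row in the same order as the `columns` list. Note that the document's `content` field lands in the `body` column. How Redpanda Connect's `vector()` method encodes the value internally is not shown in this cookbook; the sketch assumes pgvector's bracketed text literal form, and both function names are hypothetical.

```python
def to_pgvector_literal(values: list[float]) -> str:
    # Assumption: encode the vector as pgvector's text literal, e.g. "[1.0,2.0]".
    return "[" + ",".join(repr(float(v)) for v in values) + "]"


def insert_args(doc: dict) -> list:
    # Same order as the `columns` list: id, title, body, embeddings.
    article = doc["article"]
    return [
        article["id"],
        article["title"],
        article["content"],  # stored in the `body` column
        to_pgvector_literal(article["embeddings"]),
    ]


doc = {
    "type": "article",
    "article": {
        "id": "123foo",
        "title": "Dogs Stop Barking",
        "content": "The world was shocked this morning to find that all dogs have stopped barking.",
        "embeddings": [0.754, 0.19283, 0.231, 0.834],
    },
}
print(insert_args(doc)[3])  # [0.754,0.19283,0.231,0.834]
```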

Save this pipeline as `indexing.yaml` and run it with `rpk connect run ./indexing.yaml` to make sure your PostgreSQL table is populated with embeddings.

## Generating search responses

To generate responses to questions about the dataset you're creating embeddings for, use an [HTTP server input](https://docs.redpanda.com/connect/components/inputs/http_server/) to receive the questions. Since you're running this pipeline in streams mode, the HTTP server uses [endpoints prefixed with the stream identifier](https://docs.redpanda.com/connect/guides/streams_mode/about/#http-endpoints), which in this case is the same as the filename: `search`.

```yaml
input:
  http_server:
    path: /
    allowed_verbs: [ GET ]
    sync_response:
      headers:
        Content-Type: application/json
```

The user's question is passed in the query parameter `q`, for example `http://localhost:4195/search?q=question_here`. Since query parameters are exposed as metadata by the `http_server` input, you can reference the question in Bloblang with `@q`.

```yaml
pipeline:
  processors:
    - label: compute_embeddings
      ollama_embeddings:
        model: nomic-embed-text
        text: "search_query: ${! @q }"
```

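After this processor runs, the message payload is the embeddings vector for the question. To fetch the three documents most similar to that vector, write a PostgreSQL query in the [`sql_raw`](https://docs.redpanda.com/connect/components/processors/sql_raw/) processor. The `<->` operator orders rows by L2 (Euclidean) distance, which matches the `vector_l2_ops` index created by the indexing pipeline.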

```yaml
    - sql_raw:
        driver: "postgres"
        dsn: "${PG_CONNECTION_STRING}"
        query: SELECT title, body FROM searchable_text ORDER BY embeddings <-> $1 LIMIT 3
        args_mapping: "[ this.vector() ]"
```
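For intuition, the ordering performed by `ORDER BY embeddings <-> $1 LIMIT 3` can be sketched in Python as a brute-force search over L2 distance. This is only an illustration: the rows and two-dimensional vectors are made up, and the sketch computes exact distances, whereas an HNSW index returns approximate nearest neighbors.

```python
import math


def l2_distance(a: list[float], b: list[float]) -> float:
    # Euclidean distance between two vectors of equal length.
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def top_k(rows: list[dict], query: list[float], k: int = 3) -> list[dict]:
    # Sort every row by distance to the query vector and keep the closest k.
    ranked = sorted(rows, key=lambda row: l2_distance(row["embeddings"], query))
    # Mirror `SELECT title, body`: return only those two columns.
    return [{"title": r["title"], "body": r["body"]} for r in ranked[:k]]


# Made-up rows with 2-dimensional vectors (the real table uses 768 dimensions).
rows = [
    {"title": "A", "body": "a", "embeddings": [0.0, 0.0]},
    {"title": "B", "body": "b", "embeddings": [1.0, 1.0]},
    {"title": "C", "body": "c", "embeddings": [0.2, 0.1]},
    {"title": "D", "body": "d", "embeddings": [5.0, 5.0]},
]
print([r["title"] for r in top_k(rows, [0.1, 0.1])])  # ['C', 'A', 'B']
```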

With the retrieved documents and the original question, you can use the [`ollama_chat` processor](https://docs.redpanda.com/connect/components/processors/ollama_chat/) to answer the user's question as text.

```yaml
    - label: generate_response
      ollama_chat:
        model: llama3.1
        prompt: |
          Your task is to respond to user queries using the provided information.

          The user asked: ${! @q }
          Context: ${!this.map_each(row -> "%s\n%s".format(row.title, row.body)).join("\n\n")}
          Response:
```
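The interpolations in this prompt can be hard to read inline, so here is a small Python sketch of the string they are expected to produce: each retrieved row becomes a title line followed by its body, and rows are separated by a blank line. The `build_prompt` helper is hypothetical and exists only for illustration; exact whitespace in the real prompt may differ.

```python
PROMPT_TEMPLATE = (
    "Your task is to respond to user queries using the provided information.\n"
    "\n"
    "The user asked: {question}\n"
    "Context: {context}\n"
    "Response:\n"
)


def build_prompt(question: str, rows: list[dict]) -> str:
    # Equivalent of map_each(row -> "%s\n%s".format(row.title, row.body)).join("\n\n")
    context = "\n\n".join("%s\n%s" % (row["title"], row["body"]) for row in rows)
    return PROMPT_TEMPLATE.format(question=question, context=context)


rows = [
    {
        "title": "Dogs Stop Barking",
        "body": "The world was shocked this morning to find that all dogs have stopped barking.",
    },
]
print(build_prompt("what is happening to the dogs?", rows))
```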

Now that you've generated a response, you can return it to the HTTP caller using [`sync_response`](https://docs.redpanda.com/connect/components/processors/sync_response/). Then, delete the message using a [Bloblang mapping](https://docs.redpanda.com/connect/components/processors/mapping/) so that nothing goes to the output.

```yaml
    - mapping: 'root.response = content().string()'
    - sync_response: {}
    - mapping: 'root = deleted()'
```

Both pipelines are ready. Save the search pipeline as `search.yaml`, then run both pipelines using streams mode: `rpk connect streams indexing.yaml search.yaml`.

Once some documents have been indexed, you can query the system using:

```bash
curl -G 'localhost:4195/search' --data-urlencode 'q=what is happening to the dogs?' | jq
```

The output should look something like:

```json
{
  "response": "Everyone in the world woke up today shocked as their beloved pooches were silent - unable to bark."
}
```