# Retrieval-Augmented Generation (RAG)

> For the complete documentation index, see [llms.txt](https://docs.redpanda.com/llms.txt). Component-specific: [cloud-data-platform-full.txt](https://docs.redpanda.com/cloud-data-platform-full.txt)

---
title: Retrieval-Augmented Generation (RAG)
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-connect-version: 4.93.0
latest-redpanda-tag: v26.1.9
docname: connect/cookbooks/rag
page-component-name: cloud-data-platform
page-version: master
page-component-version: master
page-component-title: Cloud
page-relative-src-path: connect/cookbooks/rag.adoc
page-edit-url: https://github.com/redpanda-data/cloud-docs/edit/main/modules/develop/pages/connect/cookbooks/rag.adoc
description: How to configure Redpanda Connect to create a RAG pipeline, using PostgreSQL and PGVector.
page-git-created-date: "2024-09-12"
page-git-modified-date: "2026-05-26"
---

<!-- Source: https://docs.redpanda.com/cloud-data-platform/develop/connect/cookbooks/rag.md -->

This cookbook shows you how to create a vector embeddings indexing pipeline for Retrieval-Augmented Generation (RAG), using PostgreSQL and [PGVector](https://github.com/pgvector/pgvector).

Follow the cookbook to:

-   Take textual data from a Redpanda topic and compute vector embeddings for it using [Ollama](https://ollama.ai)

-   Write the pipeline output into a PostgreSQL table with a [PGVector](https://github.com/pgvector/pgvector) index on the embeddings column.


## [](#compute-the-embeddings)Compute the embeddings

Start by creating a Redpanda topic, which you can use as an input for an indexing data pipeline.

```bash
rpk topic create articles
echo '{
  "type": "article",
  "article": {
    "id": "123foo",
    "title": "Dogs Stop Barking",
    "content": "The world was shocked this morning to find that all dogs have stopped barking."
  }
}' | rpk topic produce articles -f '%v'
```

Your indexing pipeline can read from the Redpanda topic, using the [`kafka`](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/inputs/kafka/) input:

```yaml
input:
  kafka:
    addresses: [ "TODO" ]
    topics: [ articles ]
    consumer_group: rp_connect_articles_group
    tls:
      enabled: true
    sasl:
      mechanism: SCRAM-SHA-256
      user: "TODO"
      password: "TODO"
```

Use [Nomic Embed](https://ollama.com/library/nomic-embed-text) to compute embeddings. Since each request only applies to a single document, you can scale this by making requests in parallel across document batches.

To send a mapped request and map the response back into the original document, use the [`branch` processor](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/processors/branch/) with a child [`ollama_embeddings`](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/processors/ollama_embeddings/) processor.

```yaml
pipeline:
  threads: -1
  processors:
    - branch:
        request_map: 'root = "search_document: %s\n%s".format(this.article.title, this.article.content)'
        processors:
          - ollama_embeddings:
              model: nomic-embed-text
        result_map: 'root.article.embeddings = this'
```

With this pipeline, your processed documents should look something like this:

```yaml
{
  "type": "article",
  "article": {
    "id": "123foo",
    "title": "Dogs Stop Barking",
    "content": "The world was shocked this morning to find that all dogs have stopped barking.",
    "embeddings": [0.754, 0.19283, 0.231, 0.834], # This vector will actually have 768 dimensions
  }
}
```

Now, try sending this transformed data to PostgreSQL using the [`sql_insert`](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/outputs/sql_insert/) output. You can take advantage of the `init_statement` functionality to set up `pgvector` and a table to write the data to.

```yaml
output:
  sql_insert:
    driver: postgres
    dsn: "TODO"
    init_statement: |
      CREATE EXTENSION IF NOT EXISTS vector;
      CREATE TABLE IF NOT EXISTS searchable_text (
        id varchar(128) PRIMARY KEY,
        title text NOT NULL,
        body text NOT NULL,
        embeddings vector(768) NOT NULL
      );
      CREATE INDEX IF NOT EXISTS text_hnsw_index
        ON searchable_text
        USING hnsw (embeddings vector_l2_ops);
    table: searchable_text
    columns: ["id", "title", "body", "embeddings"]
    args_mapping: "[this.article.id, this.article.title, this.article.content, this.article.embeddings.vector()]"
```

After deploying this pipeline using the Redpanda Console, you can verify data is being written into PostgreSQL using `psql` to execute `SELECT count(*) FROM searchable_text;`.