Retrieval-Augmented Generation (RAG)

This cookbook shows you how to create a vector embeddings indexing pipeline for Retrieval-Augmented Generation (RAG), using PostgreSQL and pgvector.

Follow the cookbook to:

  • Take textual data from a Redpanda topic and compute vector embeddings for it using Ollama.

  • Write the pipeline output into a PostgreSQL table with a pgvector index on the embeddings column.

Compute the embeddings

Start by creating a Redpanda topic, which you can use as an input for an indexing data pipeline.

rpk topic create articles
echo '{
  "type": "article",
  "article": {
    "id": "123foo",
    "title": "Dogs Stop Barking",
    "content": "The world was shocked this morning to find that all dogs have stopped barking."
  }
}' | rpk topic produce articles -f '%v'
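
You can confirm that the message was written by reading it back (press Ctrl+C to stop consuming):

rpk topic consume articles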

Your indexing pipeline can read from the Redpanda topic using the kafka input:

input:
  kafka:
    addresses: [ "TODO" ]
    topics: [ articles ]
    consumer_group: rp_connect_articles_group
    tls:
      enabled: true
    sasl:
      mechanism: SCRAM-SHA-256
      user: "TODO"
      password: "TODO"

Use the Nomic Embed text model to compute the embeddings. Since each request applies to only a single document, you can scale this by making requests in parallel across document batches. Note that Nomic Embed expects a task prefix on each input (search_document: for text being indexed, search_query: for queries), which is why the request_map below adds the search_document: prefix.

To send a mapped request and map the response back into the original document, use the branch processor with a child ollama_embeddings processor.

pipeline:
  threads: -1
  processors:
    - branch:
        request_map: 'root = "search_document: %s\n%s".format(this.article.title, this.article.content)'
        processors:
          - ollama_embeddings:
              model: nomic-embed-text
        result_map: 'root.article.embeddings = this'
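
The ollama_embeddings processor also needs an Ollama server to talk to. With server_address unset, as above, the processor tries to download and run a local Ollama server; to use an instance you already run, you could set the address explicitly. A minimal sketch, assuming Ollama's default local address:

          - ollama_embeddings:
              model: nomic-embed-text
              server_address: http://127.0.0.1:11434  # assumption: an existing local Ollama instance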

With this pipeline, your processed documents should look something like this:

{
  "type": "article",
  "article": {
    "id": "123foo",
    "title": "Dogs Stop Barking",
    "content": "The world was shocked this morning to find that all dogs have stopped barking.",
    "embeddings": [0.754, 0.19283, 0.231, 0.834], # This vector will actually have 768 dimensions
  }
}
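
Parallelism in this pipeline comes from threads: -1, which runs one processing thread per CPU core so that separate document batches are embedded concurrently. To control how many documents each batch carries, you could add a batching policy to the kafka input. A sketch, with the count and period as assumptions to tune for your workload:

input:
  kafka:
    addresses: [ "TODO" ]
    topics: [ articles ]
    consumer_group: rp_connect_articles_group
    batching:
      count: 20   # flush a batch once it holds 20 articles
      period: 1s  # or after one second, whichever comes first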

Now, try sending this transformed data to PostgreSQL using the sql_insert output. You can use the init_statement field to install the pgvector extension and create a table to write the data to.

output:
  sql_insert:
    driver: postgres
    dsn: "TODO"
    init_statement: |
      CREATE EXTENSION IF NOT EXISTS vector;
      CREATE TABLE IF NOT EXISTS searchable_text (
        id varchar(128) PRIMARY KEY,
        title text NOT NULL,
        body text NOT NULL,
        embeddings vector(768) NOT NULL
      );
      CREATE INDEX IF NOT EXISTS text_hnsw_index
        ON searchable_text
        USING hnsw (embeddings vector_l2_ops);
    table: searchable_text
    columns: ["id", "title", "body", "embeddings"]
    args_mapping: "[this.article.id, this.article.title, this.article.content, this.article.embeddings.vector()]"

After deploying this pipeline using the Redpanda Console, you can verify that data is being written to PostgreSQL by running SELECT count(*) FROM searchable_text; in psql.
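
As a further check that the stored embeddings support retrieval, you can run a pgvector similarity search. The sketch below reuses the stored embedding of the article produced earlier as the query vector; in a real RAG application you would instead embed the user's question with the same model (using Nomic Embed's search_query: prefix) and pass that vector in:

-- Five nearest articles by L2 distance (the metric the vector_l2_ops index uses)
SELECT id, title
FROM searchable_text
ORDER BY embeddings <-> (
  SELECT embeddings FROM searchable_text WHERE id = '123foo'
)
LIMIT 5;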