Retrieval-Augmented Generation (RAG)
This cookbook shows you how to create a vector embeddings indexing pipeline for Retrieval-Augmented Generation (RAG), using PostgreSQL and PGVector.
Follow the cookbook to:
Compute the embeddings
Start by creating a Redpanda topic, which you can use as an input for an indexing data pipeline.
rpk topic create articles
echo '{
"type": "article",
"article": {
"id": "123foo",
"title": "Dogs Stop Barking",
"content": "The world was shocked this morning to find that all dogs have stopped barking."
}
}' | rpk topic produce articles -f '%v'
Your indexing pipeline can read from the Redpanda topic, using the kafka input:
input:
kafka:
addresses: [ "TODO" ]
topics: [ articles ]
consumer_group: rp_connect_articles_group
tls:
enabled: true
sasl:
mechanism: SCRAM-SHA-256
user: "TODO"
password: "TODO"
Use Nomic Embed to compute embeddings. Since each request only applies to a single document, you can scale this by making requests in parallel across document batches.
To send a mapped request and map the response back into the original document, use the branch processor with a child ollama_embeddings processor.
pipeline:
threads: -1
processors:
- branch:
request_map: 'root = "search_document: %s\n%s".format(this.article.title, this.article.content)'
processors:
- ollama_embeddings:
model: nomic-embed-text
result_map: 'root.article.embeddings = this'
With this pipeline, your processed documents should look something like this:
{
"type": "article",
"article": {
"id": "123foo",
"title": "Dogs Stop Barking",
"content": "The world was shocked this morning to find that all dogs have stopped barking.",
"embeddings": [0.754, 0.19283, 0.231, 0.834], # This vector will actually have 768 dimensions
}
}
Now, try sending this transformed data to PostgreSQL using the sql_insert output. You can take advantage of the init_statement functionality to set up pgvector and a table to write the data to.
output:
sql_insert:
driver: postgres
dsn: "TODO"
init_statement: |
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE IF NOT EXISTS searchable_text (
id varchar(128) PRIMARY KEY,
title text NOT NULL,
body text NOT NULL,
embeddings vector(768) NOT NULL
);
CREATE INDEX IF NOT EXISTS text_hnsw_index
ON searchable_text
USING hnsw (embeddings vector_l2_ops);
table: searchable_text
columns: ["id", "title", "body", "embeddings"]
args_mapping: "[this.article.id, this.article.title, this.article.content, this.article.embeddings.vector()]"
After deploying this pipeline using the Redpanda Console, you can verify data is being written into PostgreSQL using psql to execute SELECT count(*) FROM searchable_text;.