Use Redpanda with the HTTP Proxy API
Redpanda HTTP Proxy (pandaproxy) allows access to your data through a REST API. For example, you can list topics or brokers, get events, produce events, subscribe to events from topics using consumer groups, and commit offsets for a consumer.
See the HTTP Proxy API reference for a full list of available endpoints.
Prerequisites
Start Redpanda
The first step is to start up Redpanda. HTTP Proxy is enabled by default on port 8082. To change the proxy port, edit redpanda.yaml:
redpanda.yaml
...
pandaproxy:
  pandaproxy_api:
    - address: 0.0.0.0
      port: 8082
...
Kubernetes Cluster Resource
apiVersion: redpanda.vectorized.io/v1alpha1
kind: Cluster
...
spec:
  ...
  resources:
    pandaproxyApi:
    - port: 8082
  ...
Note: The remainder of this guide assumes that the HTTP Proxy port is 8082.
Authenticate with HTTP Proxy
HTTP Proxy supports authentication using SCRAM credentials or OIDC tokens.
The authentication method depends on the authentication_method broker property and the cluster’s http_authentication settings.
SCRAM Authentication
If HTTP Proxy is configured to support SASL, you can provide the SCRAM username and password as part of the Basic Authentication header in your request. For example, to list topics as an authenticated user:
The examples below show the same request in curl, NodeJS, and Python, in that order.
curl -s -u "<username>:<password>" "http://<host-address>:8082/topics"
let options = {
auth: { username: "<username>", password: "<password>" },
};
axios
.get("http://<host-address>:8082/topics", options)
.then(response => console.log(response.data))
.catch(error => console.error(error));
auth = ("<username>", "<password>")
res = requests.get("http://<host-address>:8082/topics", auth=auth).json()
pretty(res)
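Behind the -u flag and the auth options above, the client sends a standard Basic Authorization header. The following is a minimal sketch of how that header is built, for HTTP clients that do not build it for you. The function name basic_auth_header is hypothetical and not part of any Redpanda library.

```python
import base64


def basic_auth_header(username: str, password: str) -> dict:
    """Build an HTTP Basic Authorization header from SCRAM credentials."""
    # Basic auth joins the credentials with a colon, then base64-encodes them.
    raw = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return {"Authorization": f"Basic {token}"}


# Placeholder credentials, for illustration only.
print(basic_auth_header("user", "pass"))
# {'Authorization': 'Basic dXNlcjpwYXNz'}
```

Base64 is an encoding, not encryption, so this header exposes the credentials to anyone who can read the request.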
OIDC Authentication
If HTTP Proxy is configured to support OIDC, you can provide an OIDC token in the Authorization header. For example:
The examples below show the same request in curl, NodeJS, and Python, in that order.
curl -s -H "Authorization: Bearer <oidc-token>" "http://<host-address>:8082/topics"
let options = {
headers: { Authorization: `Bearer <oidc-token>` },
};
axios
.get("http://<host-address>:8082/topics", options)
.then(response => console.log(response.data))
.catch(error => console.error(error));
headers = {"Authorization": "Bearer <oidc-token>"}
res = requests.get("http://<host-address>:8082/topics", headers=headers).json()
pretty(res)
For details about configuring OIDC authentication, see OIDC Authentication.
Set up libraries
You need an app that calls the HTTP Proxy endpoint. This app can be curl (or a similar CLI), or it could be your own custom app written in any language. Below are curl, JavaScript and Python examples.
In the examples, <host-address> refers to your Redpanda cluster’s hostname or IP address. All following examples use a base_uri variable that combines the protocol, host, and port for consistency across curl, JavaScript, and Python examples.
curl
Curl is likely already installed on your system. If not, see curl download instructions.
Set the base URI for your HTTP Proxy:
base_uri="http://<host-address>:8082"
NodeJS
Note: These steps assume that you’re in the root directory of an existing NodeJS project. See Build a Chat Room Application with Redpanda and Node.js for an example of a NodeJS project.
In a terminal window, run:
npm install axios
Import the library into your code:
const axios = require('axios');
const base_uri = 'http://<host-address>:8082';
Python
In a terminal window, run:
pip install requests
Import the library into your code:
import requests
import json
def pretty(text):
    print(json.dumps(text, indent=2))
base_uri = "http://<host-address>:8082"
Create a topic
To create a test topic for this guide, use rpk. You can configure rpk for your Redpanda deployment, using profiles, flags, or environment variables.
To create a topic named test_topic with three partitions, run:
rpk topic create test_topic -p 3
For more information, see the rpk topic create reference.
Access your data
Here are some sample commands to produce and consume streams:
Get list of topics
The examples below show the same request in curl, NodeJS, and Python, in that order.
curl -s "$base_uri/topics"
axios
.get(`${base_uri}/topics`)
.then(response => console.log(response.data))
.catch(error => console.error(error));
Run the application. For example, if your file is named index.js, run:
node index.js
res = requests.get(f"{base_uri}/topics").json()
pretty(res)
Expected output:
["test_topic"]
Send events to a topic
To send events, make a POST request to the topic's endpoint. The request must include the following header:
Content-Type: application/vnd.kafka.json.v2+json
The following commands show how to send events to test_topic:
The examples below show the same request in curl, NodeJS, and Python, in that order.
curl -s \
-X POST \
"$base_uri/topics/test_topic" \
-H "Content-Type: application/vnd.kafka.json.v2+json" \
-d '{
"records":[
{
"value":"Redpanda",
"partition":0
},
{
"value":"HTTP proxy",
"partition":1
},
{
"value":"Test event",
"partition":2
}
]
}'
let payload = { records: [
{
"value":"Redpanda",
"partition": 0
},
{
"value":"HTTP proxy",
"partition": 1
},
{
"value":"Test event",
"partition": 2
}
]};
let options = { headers: { "Content-Type" : "application/vnd.kafka.json.v2+json" }};
axios
.post(`${base_uri}/topics/test_topic`, payload, options)
.then(response => console.log(response.data))
.catch(error => console.error(error));
Run the application:
node index.js
res = requests.post(
url=f"{base_uri}/topics/test_topic",
data=json.dumps(
dict(records=[
dict(value="Redpanda", partition=0),
dict(value="HTTP proxy", partition=1),
dict(value="Test event", partition=2)
])),
headers={"Content-Type": "application/vnd.kafka.json.v2+json"}).json()
pretty(res)
Expected output (may be formatted differently depending on the chosen application):
{"offsets":[{"partition":0,"offset":0},{"partition":2,"offset":0},{"partition":1,"offset":0}]}
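The produce request above has three parts: the topic URL, the Content-Type header, and a JSON body with a records array. The sketch below separates building those parts from sending them, using only the Python standard library. The names build_produce_request and send are hypothetical helpers, and each event is passed as a (value, partition) pair to mirror the examples in this guide.

```python
import json
import urllib.request

PRODUCE_CONTENT_TYPE = "application/vnd.kafka.json.v2+json"


def build_produce_request(base_uri, topic, events):
    """Return (url, headers, body) for a produce call.

    `events` is a list of (value, partition) pairs.
    """
    records = [{"value": value, "partition": partition}
               for value, partition in events]
    url = f"{base_uri}/topics/{topic}"
    headers = {"Content-Type": PRODUCE_CONTENT_TYPE}
    body = json.dumps({"records": records}).encode("utf-8")
    return url, headers, body


def send(url, headers, body):
    """POST the prepared request and return the parsed JSON response."""
    req = urllib.request.Request(url, data=body, headers=headers, method="POST")
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


url, headers, body = build_produce_request(
    "http://<host-address>:8082", "test_topic",
    [("Redpanda", 0), ("HTTP proxy", 1), ("Test event", 2)])
print(url)
```

To send the events, call send(url, headers, body) against a running cluster. Keeping the builder free of network calls makes it easy to check the payload before anything is sent.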
Get events from a topic
After events have been sent to the topic, you can retrieve these same events.
The examples below show the same request in curl, NodeJS, and Python, in that order.
curl -s \
"$base_uri/topics/test_topic/partitions/0/records?offset=0&timeout=1000&max_bytes=100000" \
-H "Accept: application/vnd.kafka.json.v2+json"
let options = {
headers: { accept: "application/vnd.kafka.json.v2+json" },
params: {
offset: 0,
timeout: "1000",
max_bytes: "100000",
},
};
axios
.get(`${base_uri}/topics/test_topic/partitions/0/records`, options)
.then(response => console.log(response.data))
.catch(error => console.error(error));
Run the application:
node index.js
res = requests.get(
url=f"{base_uri}/topics/test_topic/partitions/0/records",
params={"offset": 0, "timeout":1000,"max_bytes":100000},
headers={"Accept": "application/vnd.kafka.json.v2+json"}).json()
pretty(res)
Expected output:
[{"topic":"test_topic","key":null,"value":"Redpanda","partition":0,"offset":0}]
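The fetch URL above combines a topic, a partition, and three query parameters (offset, timeout, max_bytes). As a sketch, a small helper can assemble it so that the parameters are encoded consistently. The function name partition_records_url and its default values are assumptions taken from the example request, not API defaults.

```python
from urllib.parse import quote, urlencode


def partition_records_url(base_uri, topic, partition,
                          offset=0, timeout_ms=1000, max_bytes=100000):
    """Build the URL for reading records from one partition."""
    # urlencode keeps the dictionary's insertion order and escapes values.
    query = urlencode({
        "offset": offset,
        "timeout": timeout_ms,
        "max_bytes": max_bytes,
    })
    # quote() escapes any characters in the topic name that are unsafe in a path.
    return (f"{base_uri}/topics/{quote(topic, safe='')}"
            f"/partitions/{partition}/records?{query}")


print(partition_records_url("http://<host-address>:8082", "test_topic", 0))
```

Remember to send the Accept: application/vnd.kafka.json.v2+json header with the request, as in the examples above.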
Get list of brokers
The examples below show the same request in curl, NodeJS, and Python, in that order.
curl "$base_uri/brokers"
axios
.get(`${base_uri}/brokers`)
.then(response => console.log(response.data))
.catch(error => console.error(error));
res = requests.get(f"{base_uri}/brokers").json()
pretty(res)
Expected output:
{"brokers":[0]}
Create a consumer
To retrieve events from a topic using consumers, you must create a consumer and a consumer group, and then subscribe the consumer instance to a topic. Each action involves a different endpoint and method.
The first endpoint is /consumers/<group-name>; the examples below use test_group as the group name. For this REST call, the payload holds the consumer's name and settings.
The examples below show the same request in curl, NodeJS, and Python, in that order.
curl -s \
-X POST \
"$base_uri/consumers/test_group" \
-H "Content-Type: application/vnd.kafka.v2+json" \
-d '{
"format":"json",
"name":"test_consumer",
"auto.offset.reset":"earliest",
"auto.commit.enable":"false",
"fetch.min.bytes": "1",
"consumer.request.timeout.ms": "10000"
}'
let payload = {
"name": "test_consumer",
"format": "json",
"auto.offset.reset": "earliest",
"auto.commit.enable": "false",
"fetch.min.bytes": "1",
"consumer.request.timeout.ms": "10000"
};
let options = { headers: { "Content-Type": "application/vnd.kafka.v2+json" }};
axios
.post(`${base_uri}/consumers/test_group`, payload, options)
.then(response => console.log(response.data))
.catch(error => console.error(error));
Run the application:
node index.js
res = requests.post(
url=f"{base_uri}/consumers/test_group",
data=json.dumps({
"name": "test_consumer",
"format": "json",
"auto.offset.reset": "earliest",
"auto.commit.enable": "false",
"fetch.min.bytes": "1",
"consumer.request.timeout.ms": "10000"
}),
headers={"Content-Type": "application/vnd.kafka.v2+json"}).json()
pretty(res)
Expected output:
{"instance_id":"test_consumer","base_uri":"http://127.0.0.1:8082/consumers/test_group/instances/test_consumer"}
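The response includes a base_uri for the new consumer instance, and the subscription, records, and offsets endpoints used later in this guide all hang off that path. A minimal sketch, assuming a hypothetical helper named consumer_endpoints, derives those URLs from the response:

```python
def consumer_endpoints(create_response: dict) -> dict:
    """Derive per-instance endpoint URLs from a create-consumer response."""
    # Drop any trailing slash so the joined paths stay well-formed.
    instance_uri = create_response["base_uri"].rstrip("/")
    return {
        "instance": instance_uri,
        "subscription": f"{instance_uri}/subscription",
        "records": f"{instance_uri}/records",
        "offsets": f"{instance_uri}/offsets",
    }


response = {
    "instance_id": "test_consumer",
    "base_uri": "http://127.0.0.1:8082/consumers/test_group/instances/test_consumer",
}
print(consumer_endpoints(response)["subscription"])
# http://127.0.0.1:8082/consumers/test_group/instances/test_consumer/subscription
```

In the sample output the returned address is 127.0.0.1, which may not be reachable from your client. The examples in this guide build the same paths from their own base_uri variable instead, which avoids that problem.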
Subscribe to the topic
After creating the consumer, subscribe to the topic that you created.
The examples below show the same request in curl, NodeJS, and Python, in that order.
curl -s -o /dev/null -w "%{http_code}" \
-X POST \
"$base_uri/consumers/test_group/instances/test_consumer/subscription" \
-H "Content-Type: application/vnd.kafka.v2+json" \
-d '{
"topics": [
"test_topic"
]
}'
let payload = { topics: ["test_topic"]};
let options = { headers: { "Content-Type": "application/vnd.kafka.v2+json" }};
axios
.post(`${base_uri}/consumers/test_group/instances/test_consumer/subscription`, payload, options)
.then(response => console.log(response.data))
.catch(error => console.error(error));
Run the application:
node index.js
res = requests.post(
url=f"{base_uri}/consumers/test_group/instances/test_consumer/subscription",
data=json.dumps({"topics": ["test_topic"]}),
headers={"Content-Type": "application/vnd.kafka.v2+json"})
Expected response is an HTTP 204, without a body. Now you can get the events from test_topic.
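Because the subscription call answers with HTTP 204 and no body, parsing every response as JSON can fail. The sketch below shows one way to guard against that. The helper name decode_body is hypothetical, and it assumes your HTTP client gives you the status code and the raw response bytes.

```python
import json


def decode_body(status: int, raw: bytes):
    """Parse a JSON response body, returning None when there is no content."""
    # 204 No Content (or any empty body) has nothing to parse.
    if status == 204 or not raw:
        return None
    return json.loads(raw)


print(decode_body(204, b""))
# None
print(decode_body(200, b'["test_topic"]'))
# ['test_topic']
```

This is why the Python subscription example above does not call .json() on the response.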
Retrieve events
Retrieve the events from the topic:
The examples below show the same request in curl, NodeJS, and Python, in that order.
curl -s \
"$base_uri/consumers/test_group/instances/test_consumer/records?timeout=1000&max_bytes=100000" \
-H "Accept: application/vnd.kafka.json.v2+json"
let options = {
headers: { Accept: "application/vnd.kafka.json.v2+json" },
params: {
timeout: "1000",
max_bytes: "100000",
},
};
axios
.get(`${base_uri}/consumers/test_group/instances/test_consumer/records`, options)
.then(response => console.log(response.data))
.catch(error => console.error(error));
Run the application:
node index.js
res = requests.get(
url=f"{base_uri}/consumers/test_group/instances/test_consumer/records",
params={"timeout":1000,"max_bytes":100000},
headers={"Accept": "application/vnd.kafka.json.v2+json"}).json()
pretty(res)
Expected output:
[{"topic":"test_topic","key":null,"value":"Redpanda","partition":0,"offset":0},{"topic":"test_topic","key":null,"value":"HTTP proxy","partition":1,"offset":0},{"topic":"test_topic","key":null,"value":"Test event","partition":2,"offset":0}]
Get offsets from consumer
The examples below show the same request in curl and Python, in that order.
curl -s \
-X 'GET' \
"$base_uri/consumers/test_group/instances/test_consumer/offsets" \
-H 'accept: application/vnd.kafka.v2+json' \
-H 'Content-Type: application/vnd.kafka.v2+json' \
-d '{
"partitions": [
{
"topic": "test_topic",
"partition": 0
},
{
"topic": "test_topic",
"partition": 1
},
{
"topic": "test_topic",
"partition": 2
}
]
}'
res = requests.get(
url=f"{base_uri}/consumers/test_group/instances/test_consumer/offsets",
data=json.dumps(
dict(partitions=[
dict(topic="test_topic", partition=p) for p in [0, 1, 2]
])),
headers={"Content-Type": "application/vnd.kafka.v2+json"}).json()
pretty(res)
Expected output:
{ "offsets": [{ "topic": "test_topic", "partition": 0, "offset": 0, "metadata": "" },{ "topic": "test_topic", "partition": 1, "offset": 0, "metadata": "" }, { "topic": "test_topic", "partition": 2, "offset": 0, "metadata": "" }] }
Commit offsets for consumer
After events have been handled by a consumer, the offsets can be committed, so that the consumer group won’t retrieve them again.
The examples below show the same request in curl, NodeJS, and Python, in that order.
curl -s -o /dev/null -w "%{http_code}" \
-X 'POST' \
"$base_uri/consumers/test_group/instances/test_consumer/offsets" \
-H 'accept: application/vnd.kafka.v2+json' \
-H 'Content-Type: application/vnd.kafka.v2+json' \
-d '{
"partitions": [
{
"topic": "test_topic",
"partition": 0,
"offset": 0
},
{
"topic": "test_topic",
"partition": 1,
"offset": 0
},
{
"topic": "test_topic",
"partition": 2,
"offset": 0
}
]
}'
let options = {
headers: {
accept: "application/vnd.kafka.v2+json",
"Content-Type": "application/vnd.kafka.v2+json",
}
};
let payload = { partitions: [
{ topic: "test_topic", partition: 0, offset: 0 },
{ topic: "test_topic", partition: 1, offset: 0 },
{ topic: "test_topic", partition: 2, offset: 0 },
]};
axios
.post(`${base_uri}/consumers/test_group/instances/test_consumer/offsets`, payload, options)
.then(response => console.log(response.data))
.catch(error => console.error(error));
Run the application:
node index.js
res = requests.post(
url=f"{base_uri}/consumers/test_group/instances/test_consumer/offsets",
data=json.dumps(
dict(partitions=[
dict(topic="test_topic", partition=p, offset=0) for p in [0, 1, 2]
])),
headers={"Content-Type": "application/vnd.kafka.v2+json"})
Expected output: none.
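The commit payload above is written by hand, but it can also be derived from the records that the consumer returned. The following sketch builds the partitions list from a records response. The function name commit_payload is hypothetical. It commits the highest offset seen per partition, which mirrors this guide's example (offset 0 after reading the records at offset 0). Whether your application should commit that offset or a different one is an assumption to check against the HTTP Proxy API reference.

```python
def commit_payload(records):
    """Build an offsets-commit body from records returned by a consumer."""
    highest = {}
    for record in records:
        key = (record["topic"], record["partition"])
        # Track the largest offset seen for each topic/partition pair.
        highest[key] = max(record["offset"], highest.get(key, record["offset"]))
    return {
        "partitions": [
            {"topic": topic, "partition": partition, "offset": offset}
            for (topic, partition), offset in sorted(highest.items())
        ]
    }


records = [
    {"topic": "test_topic", "key": None, "value": "Redpanda", "partition": 0, "offset": 0},
    {"topic": "test_topic", "key": None, "value": "HTTP proxy", "partition": 1, "offset": 0},
    {"topic": "test_topic", "key": None, "value": "Test event", "partition": 2, "offset": 0},
]
print(commit_payload(records))
```

For the three sample records, the result matches the hand-written payload in the examples above.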
Delete a consumer
To remove a consumer from a group, send a DELETE request as shown below:
The examples below show the same request in curl, NodeJS, and Python, in that order.
curl -s -o /dev/null -w "%{http_code}" \
-X 'DELETE' \
"$base_uri/consumers/test_group/instances/test_consumer" \
-H 'Content-Type: application/vnd.kafka.v2+json'
let options = { headers: { "Content-Type": "application/vnd.kafka.v2+json" }};
axios
.delete(`${base_uri}/consumers/test_group/instances/test_consumer`, options)
.then(response => console.log(response.data))
.catch(error => console.error(error));
res = requests.delete(
url=f"{base_uri}/consumers/test_group/instances/test_consumer",
headers={"Content-Type": "application/vnd.kafka.v2+json"})
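The consumer steps in this guide form a lifecycle: create the instance, subscribe it, use it, then delete it. As a sketch of one way to make sure the delete always happens, the context manager below wraps those calls. The name consumer_session is hypothetical, and http is an assumed callable of the form http(method, url, headers, body) that you supply, for example a thin wrapper around requests or urllib.

```python
import json
from contextlib import contextmanager

KAFKA_V2 = "application/vnd.kafka.v2+json"


@contextmanager
def consumer_session(http, base_uri, group, config, topics):
    """Create and subscribe a consumer, then delete it on exit.

    `config` is the create-consumer payload and must contain "name".
    """
    headers = {"Content-Type": KAFKA_V2}
    # Step 1: create the consumer instance inside the group.
    http("POST", f"{base_uri}/consumers/{group}", headers, json.dumps(config))
    instance_uri = f"{base_uri}/consumers/{group}/instances/{config['name']}"
    try:
        # Step 2: subscribe the instance to the requested topics.
        http("POST", f"{instance_uri}/subscription", headers,
             json.dumps({"topics": topics}))
        yield instance_uri
    finally:
        # Step 3: delete the instance, even if the caller's code raised.
        http("DELETE", instance_uri, headers, None)


def log_only_http(method, url, headers, body):
    """Stand-in transport that prints each call instead of sending it."""
    print(method, url)


config = {"name": "test_consumer", "format": "json"}
with consumer_session(log_only_http, "http://<host-address>:8082",
                      "test_group", config, ["test_topic"]) as uri:
    print("consuming from", uri)
```

Passing the transport in as a parameter keeps the lifecycle logic independent of any one HTTP library and lets you try it out without a running cluster.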
Use Swagger with HTTP Proxy
You can use Swagger UI to test and interact with Redpanda HTTP Proxy endpoints.
Use Docker to start Swagger UI:
docker run -p 80:8080 -d swaggerapi/swagger-ui
Verify that the Swagger container is running:
docker ps
The output should list swaggerapi/swagger-ui with an Up… status.
In a browser, enter <host-address> in the address bar to open the Swagger console.
Change the URL to http://<host-address>:8082/v1, and click Explore to update the page with Redpanda HTTP Proxy endpoints.
You can call the endpoints in any application and language that supports web interactions.