Docs Self-Managed Develop Use Redpanda with the HTTP Proxy API Use Redpanda with the HTTP Proxy API Redpanda HTTP Proxy (pandaproxy) allows access to your data through a REST API. For example, you can list topics or brokers, get events, produce events, subscribe to events from topics using consumer groups, and commit offsets for a consumer. Prerequisites Start Redpanda The first step is to start up Redpanda. HTTP Proxy is enabled by default on port 8082. To change the proxy port, edit redpanda.yaml: redpanda.yaml Kubernetes Cluster Resource ... pandaproxy: pandaproxy_api: - address: 0.0.0.0 port: 8082 ... apiVersion: redpanda.vectorized.io/v1alpha1 kind: Cluster ... spec: ... resources: pandaproxyApi: - port: 8082 ... The remainder of this section is based on the assumption that the HTTP Proxy port is 8082, your container (or Pod in Kubernetes) is named redpanda-0, and your namespace is panda-ns (in Kubernetes). Configure rpk Make sure rpk is configured for your Redpanda deployment, so you can use it to create a topic: Docker Kubernetes alias rpk="docker exec -ti redpanda-0 rpk" alias rpk="kubectl -n panda-ns exec -ti redpanda-0 -c redpanda -- rpk" Create a topic Create a topic to use with HTTP Proxy: rpk topic create test_topic -p 3 For more information, see rpk Commands. Set up libraries You need an app that calls the HTTP Proxy endpoint. This app can be curl (or a similar CLI), or it could be your own custom app written in any language. Below are curl, JavaScript and Python examples. curl NodeJS Python Curl is likely already installed on your system. If not, see curl download instructions. This is based on the assumption that you’re in the root directory of an existing NodeJS project. See Build a Chat Room Application with Redpanda and Node.js for an example of a NodeJS project. In a terminal window, run: npm install axios Import the library into your code: const axios = require('axios'); const base_uri = 'http://<host-address>:8082'; In a terminal window, run: pip install requests Import the library into your code: import requests import json def pretty(text): print(json.dumps(text, indent=2)) base_uri = "http://<host-address>:8082" Access your data Here are some sample commands to produce and consume streams: Get list of topics curl NodeJS Python curl -s "<host-address>:8082/topics" axios .get(`${base_uri}/topics`) .then(response => console.log(response.data)) .catch(error => console.log); Run the application. If your file name is index.js for example, you would run the following command: node index.js res = requests.get(f"{base_uri}/topics").json() pretty(res) Expected output: ["test_topic"] Send events to a topic Use POST to send events in the REST endpoint query. The header must include the following line: Content-Type:application/vnd.kafka.json.v2+json The following commands show how to send events to test_topic: curl NodeJS Python curl -s \ -X POST \ "http://<host-address>:8082/topics/test_topic" \ -H "Content-Type: application/vnd.kafka.json.v2+json" \ -d '{ "records":[ { "value":"Redpanda", "partition":0 }, { "value":"HTTP proxy", "partition":1 }, { "value":"Test event", "partition":2 } ] }' let payload = { records: [ { "value":"Redpanda", "partition": 0 }, { "value":"HTTP proxy", "partition": 1 }, { "value":"Test event", "partition": 2 } ]}; let options = { headers: { "Content-Type" : "application/vnd.kafka.json.v2+json" }}; axios .post(`${base_uri}/topics/test_topic`, payload, options) .then(response => console.log(response.data)) .catch(error => console.log); Run the application: node index.js res = requests.post( url=f"{base_uri}/topics/test_topic", data=json.dumps( dict(records=[ dict(value="Redpanda", partition=0), dict(value="HTTP Proxy", partition=1), dict(value="Test Event", partition=2) ])), headers={"Content-Type": "application/vnd.kafka.json.v2+json"}).json() pretty(res) Expected output (may be formatted differently depending on the chosen application): {"offsets":[{"partition":0,"offset":0},{"partition":2,"offset":0},{"partition":1,"offset":0}]} Get events from a topic After events have been sent to the topic, you can retrieve these same events. curl NodeJS Python curl -s \ "http://<host-address>:8082/topics/test_topic/partitions/0/records?offset=0&timeout=1000&max_bytes=100000"\ -H "Accept: application/vnd.kafka.json.v2+json" let options = { headers: { accept: "application/vnd.kafka.json.v2+json" }, params: { offset: 0, timeout: "1000", max_bytes: "100000", }, }; axios .get(`${base_uri}/topics/test_topic/partitions/0/records`, options) .then(response => console.log(response.data)) .catch(error => console.log); Run the application: node index.js res = requests.get( url=f"{base_uri}/topics/test_topic/partitions/0/records", params={"offset": 0, "timeout":1000,"max_bytes":100000}, headers={"Accept": "application/vnd.kafka.json.v2+json"}).json() pretty(res) Expected output: [{"topic":"test_topic","key":null,"value":"Redpanda","partition":0,"offset":0}] Create a consumer To retrieve events from a topic using consumers, you must create a consumer and a consumer group, and then subscribe the consumer instance to a topic. Each action involves a different endpoint and method. The first endpoint is: /consumers/<test_group_name>. For this REST call, the payload is the group information. curl NodeJS Python curl -s \ -X POST \ "http://<host-address>:8082/consumers/test_group"\ -H "Content-Type: application/vnd.kafka.v2+json" \ -d '{ "format":"json", "name":"test_consumer", "auto.offset.reset":"earliest", "auto.commit.enable":"false", "fetch.min.bytes": "1", "consumer.request.timeout.ms": "10000" }' let payload = { "name": "test_consumer", "format": "json", "auto.offset.reset": "earliest", "auto.commit.enable": "false", "fetch.min.bytes": "1", "consumer.request.timeout.ms": "10000" }; let options = { headers: { "Content-Type": "application/vnd.kafka.v2+json" }}; axios .post(`${base_uri}/consumers/test_group`, payload, options) .then(response => console.log(response.data)) .catch(error => console.log); Run the application: node index.js res = requests.post( url=f"{base_uri}/consumers/test_group", data=json.dumps({ "name": "test_consumer", "format": "json", "auto.offset.reset": "earliest", "auto.commit.enable": "false", "fetch.min.bytes": "1", "consumer.request.timeout.ms": "10000" }), headers={"Content-Type": "application/vnd.kafka.v2+json"}).json() pretty(res) Expected output: {"instance_id":"test_consumer","base_uri":"http://127.0.0.1:8082/consumers/test_group/instances/test_consumer"} Consumers expire after five minutes of inactivity. To prevent this from happening, try consuming events within a loop. If the consumer has expired, you can create a new one with the same name. The output base_uri will be used in subsequent curl requests. Subscribe to the topic After creating the consumer, subscribe to the topic that you created. curl NodeJS Python curl -s -o /dev/null -w "%{http_code}" \ -X POST \ "<base-uri>/subscription"\ -H "Content-Type: application/vnd.kafka.v2+json" \ -d '{ "topics": [ "test_topic" ] }' let payload = { topics: ["test_topic"]}; let options = { headers: { "Content-Type": "application/vnd.kafka.v2+json" }}; axios .post(`${base_uri}/consumers/test_group/instances/test_consumer/subscription`, payload, options) .then(response => console.log(response.data)) .catch(error => console.log); Run the application: node index.js res = requests.post( url=f"{base_uri}/consumers/test_group/instances/test_consumer/subscription", data=json.dumps({"topics": ["test_topic"]}), headers={"Content-Type": "application/vnd.kafka.v2+json"}) Expected response is an HTTP 204, without a body. Now you can get the events from test_topic. Retrieve events Retrieve the events from the topic: curl NodeJS Python curl -s \ "<base-uri>/records?timeout=1000&max_bytes=100000"\ -H "Accept: application/vnd.kafka.json.v2+json" let options = { headers: { Accept: "application/vnd.kafka.json.v2+json" }, params: { timeout: "1000", max_bytes: "100000", }, }; axios .get(`${base_uri}/consumers/test_group/instances/test_consumer/records`, options) .then(response => console.log(response.data)) .catch(error => console.log); Run the application: node index.js res = requests.get( url=f"{base_uri}/consumers/test_group/instances/test_consumer/records", params={"timeout":1000,"max_bytes":100000}, headers={"Accept": "application/vnd.kafka.json.v2+json"}).json() pretty(res) Expected output: [{"topic":"test_topic","key":null,"value":"Redpanda","partition":0,"offset":0},{"topic":"test_topic","key":null,"value":"HTTP proxy","partition":1,"offset":0},{"topic":"test_topic","key":null,"value":"Test event","partition":2,"offset":0}] Get offsets from consumer curl Python curl -s \ -X 'GET' \ '<base-uri>/offsets' \ -H 'accept: application/vnd.kafka.v2+json' \ -H 'Content-Type: application/vnd.kafka.v2+json' \ -d '{ "partitions": [ { "topic": "test_topic", "partition": 0 }, { "topic": "test_topic", "partition": 1 }, { "topic": "test_topic", "partition": 2 } ] }' res = requests.get( url=f"{base_uri}/consumers/test_group/instances/test_consumer/offsets", data=json.dumps( dict(partitions=[ dict(topic="test_topic", partition=p) for p in [0, 1, 2] ])), headers={"Content-Type": "application/vnd.kafka.v2+json"}).json() pretty(res) Expected output: { "offsets": [{ "topic": "test_topic", "partition": 0, "offset": 0, "metadata": "" },{ "topic": "test_topic", "partition": 1, "offset": 0, "metadata": "" }, { "topic": "test_topic", "partition": 2, "offset": 0, "metadata": "" }] } Commit offsets for consumer After events have been handled by a consumer, the offsets can be committed, so that the consumer group won’t retrieve them again. curl NodeJS Python curl -s -o /dev/null -w "%{http_code}" \ -X 'POST' \ '<base-uri>/offsets' \ -H 'accept: application/vnd.kafka.v2+json' \ -H 'Content-Type: application/vnd.kafka.v2+json' \ -d '{ "partitions": [ { "topic": "test_topic", "partition": 0, "offset": 0 }, { "topic": "test_topic", "partition": 1, "offset": 0 }, { "topic": "test_topic", "partition": 2, "offset": 0 } ] }' let options = { headers: { accept: "application/vnd.kafka.v2+json", "Content-Type": "application/vnd.kafka.v2+json", } }; let payload = { partitions: [ { topic: "test_topic", partition: 0, offset: 0 }, { topic: "test_topic", partition: 1, offset: 0 }, { topic: "test_topic", partition: 2, offset: 0 }, ]}; axios .post(`${base_uri}/consumers/test_group/instances/test_consumer/offsets`, payload, options) .then(response => console.log(response.data)) .catch(error => console.log); Run the application: node index.js res = requests.post( url=f"{base_uri}/consumers/test_group/instances/test_consumer/offsets", data=json.dumps( dict(partitions=[ dict(topic="test_topic", partition=p, offset=0) for p in [0, 1, 2] ])), headers={"Content-Type": "application/vnd.kafka.v2+json"}) Expected output: none. Get list of brokers curl NodeJS Python curl "http://<host-address>:8082/brokers" axios .get(`${base_uri}/brokers`) .then(response => console.log(response.data)) .catch(error => console.log); res = requests.get(f"{base_uri}/brokers").json() pretty(res) Expected output: {brokers: [0]} Delete a consumer To remove a consumer from a group, send a DELETE request as shown below: curl NodeJS Python curl -s -o /dev/null -w "%{http_code}" \ -X 'DELETE' \ '<base-uri>' \ -H 'Content-Type: application/vnd.kafka.v2+json' let options = { headers: { "Content-Type": "application/vnd.kafka.v2+json" }}; axios .delete(`${base_uri}/consumers/test_group/instances/test_consumer`, options) .then(response => console.log(response.data)) .catch(error => console.log); res = requests.delete( url=f"{base_uri}/consumers/test_group/instances/test_consumer", headers={"Content-Type": "application/vnd.kafka.v2+json"}) Authenticate with HTTP Proxy If the Redpanda HTTP Proxy is configured to use SASL, you can provide the SCRAM username and password as part of the Basic Authentication header in your request. For example, to list topics as an authenticated user: curl NodeJS Python curl -s -u "<username>:<password>" "<host-address>:8082/topics" let options = { auth: { username: "<username>", password: "<password>" }, }; axios .get(`${base_uri}/topics`, options) .then(response => console.log(response.data)) .catch(error => console.log); auth = ("<username>", "<password>") res = requests.get(f"{base_uri}/topics", auth=auth).json() pretty(res) Use Swagger with HTTP Proxy You can use Swagger UI to test and interact with Redpanda HTTP Proxy endpoints. Use Docker to start Swagger UI: docker run -p 80:8080 -d swaggerapi/swagger-ui Verify that the Swagger container is available: docker ps Verify that the Docker container has been added and is running: swaggerapi/swagger-ui with Up… status In a browser, enter <host-address> in the address bar to open the Swagger console. Change the URL to http://<host-address>:8082/v1, and click Explore to update the page with Redpanda HTTP Proxy endpoints. You can call the endpoints in any application and language that supports web interactions. The following examples show how to call the endpoints using curl, NodeJS, and Python. Suggested labs Stream Stock Market Data from a CSV file Using PythonStream Stock Market Data from a CSV file Using Node.jsBuild a Chat Room Application with Redpanda and GolangBuild a Chat Room Application with Redpanda and JavaBuild a Chat Room Application with Redpanda and Node.jsBuild a Chat Room Application with Redpanda and PythonBuild a Chat Room Application with Redpanda and RustSee moreSearch all labs Back to top × Simple online edits For simple changes, such as fixing a typo, you can edit the content directly on GitHub. Edit on GitHub Or, open an issue to let us know about something that you want us to change. Open an issue Contribution guide For extensive content updates, or if you prefer to work locally, read our contribution guide . Was this helpful? thumb_up thumb_down group Ask in the community mail Share your feedback group_add Make a contribution Benchmark Redpanda Manage Topics