Streams Via REST API

By using the Redpanda Connect streams mode REST API you can dynamically control which streams are active at runtime. The full spec for the Redpanda Connect streams mode REST API is documented in the Streams API reference.

Note that stream configs created and updated using this API do not benefit from environment variable interpolation (function interpolation will still work).

Walkthrough

Start by running Redpanda Connect in streams mode:

    rpk connect streams

In a separate terminal we can add our first stream foo by POSTing a JSON or YAML config to the /streams/foo endpoint:

    $ curl http://localhost:4195/streams/foo -X POST --data-binary @- <<EOF
    input:
      http_server: {}
    buffer:
      memory: {}
    pipeline:
      threads: 4
      processors:
        - mapping: |
            root = {
              "id": this.user.id,
              "content": this.body.content
            }
    output:
      http_server: {}
    EOF

Now we can check the full set of streams loaded by GETing the /streams endpoint:

    $ curl http://localhost:4195/streams | jq '.'
    {
      "foo": {
        "active": true,
        "uptime": 7.223545951,
        "uptime_str": "7.223545951s"
      }
    }

And we can send data to our new stream via its namespaced URL:

    $ curl http://localhost:4195/foo/post -d '{"user":{"id":"foo"},"body":{"content":"bar"}}'

Good, now let’s add another stream bar the same way:

    $ curl http://localhost:4195/streams/bar -X POST --data-binary @- <<EOF
    input:
      kafka:
        addresses:
          - localhost:9092
        topics:
          - my_topic
    pipeline:
      threads: 1
      processors:
        - mapping: 'root = this.uppercase()'
    output:
      elasticsearch:
        urls:
          - http://localhost:9200
    EOF

And check the set again:

    $ curl http://localhost:4195/streams | jq '.'
    {
      "bar": {
        "active": true,
        "uptime": 10.121344484,
        "uptime_str": "10.121344484s"
      },
      "foo": {
        "active": true,
        "uptime": 19.380582951,
        "uptime_str": "19.380583306s"
      }
    }

It’s also possible to get the configuration of a loaded stream by GETing the path /streams/{id}:

    $ curl http://localhost:4195/streams/foo | jq '.'
    {
      "active": true,
      "uptime": 30.123488951,
      "uptime_str": "30.123488951s",
      "config": {
        "input": {
          "http_server": {
            "address": "",
            "cert_file": "",
            "key_file": "",
            "path": "/post",
            "timeout": "5s"
          }
        },
        "buffer": {
          "memory": {
            "limit": 10000000
          }
        },
        ... etc ...
      }
    }

Next, we might want to update stream foo by PUTing a new config to the path /streams/foo:

    $ curl http://localhost:4195/streams/foo -X PUT --data-binary @- <<EOF
    input:
      http_server: {}
    pipeline:
      threads: 4
      processors:
        - mapping: |
            root = {
              "id": this.user.id,
              "content": this.body.content
            }
    output:
      http_server: {}
    EOF

We have removed the memory buffer with this change. Let’s check that the config has actually been updated:

    $ curl http://localhost:4195/streams/foo | jq '.'
    {
      "active": true,
      "uptime": 12.328482951,
      "uptime_str": "12.328482951s",
      "config": {
        "input": {
          "http_server": {
            "address": "",
            "cert_file": "",
            "key_file": "",
            "path": "/post",
            "timeout": "5s"
          }
        },
        "buffer": {
          "type": "none"
        },
        ... etc ...
      }
    }

Good, we are done with stream bar now, so let’s delete it by DELETEing the /streams/bar endpoint:

    $ curl http://localhost:4195/streams/bar -X DELETE

And let’s GET the /streams endpoint to see the new set:

    $ curl http://localhost:4195/streams | jq '.'
    {
      "foo": {
        "active": true,
        "uptime": 31.872448851,
        "uptime_str": "31.872448851s"
      }
    }

Great. Another useful feature is POSTing to /streams, which allows us to set the entire set of streams with a single request. The payload is a map of stream ids to configurations, and this becomes the exclusive set of active streams. Any existing streams that are not in the payload are removed.

    $ curl http://localhost:4195/streams -X POST --data-binary @- <<EOF
    bar:
      input:
        http_client:
          url: http://localhost:4195/baz/get
      output:
        stdout: {}
    baz:
      input:
        http_server: {}
      output:
        http_server: {}
    EOF

Let’s check our new set of streams:

    $ curl http://localhost:4195/streams | jq '.'
    {
      "bar": {
        "active": true,
        "uptime": 3.183883444,
        "uptime_str": "3.183883444s"
      },
      "baz": {
        "active": true,
        "uptime": 3.183883449,
        "uptime_str": "3.183883449s"
      }
    }

Done.
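
The requests in the walkthrough all follow one pattern: an HTTP method, either /streams or /streams/{id}, and an optional config body. As a rough sketch of that pattern (not part of Redpanda Connect itself), the helpers below wrap the same curl calls. The function names streams_url and streams_call and the variables STREAMS_API_BASE and STREAMS_DRY_RUN are hypothetical names chosen for illustration, and the config is assumed to live in a local file rather than a heredoc.

```shell
# Hypothetical helpers around the streams mode REST calls shown in the walkthrough.
# STREAMS_API_BASE and STREAMS_DRY_RUN are illustrative names, not Redpanda Connect settings.
STREAMS_API_BASE="${STREAMS_API_BASE:-http://localhost:4195}"

# Print the URL of the whole stream set, or of one stream when an id is given.
streams_url() {
  if [ -n "${1:-}" ]; then
    printf '%s/streams/%s\n' "$STREAMS_API_BASE" "$1"
  else
    printf '%s/streams\n' "$STREAMS_API_BASE"
  fi
}

# streams_call METHOD [ID] [CONFIG_FILE]
# Sends METHOD to /streams (empty ID) or /streams/ID.
# CONFIG_FILE, when given, is sent unmodified as the request body.
# With STREAMS_DRY_RUN=1 the curl command is printed instead of executed.
streams_call() {
  method="$1"
  url="$(streams_url "${2:-}")"
  file="${3:-}"
  if [ "${STREAMS_DRY_RUN:-0}" = "1" ]; then
    if [ -n "$file" ]; then
      printf 'curl -sS -X %s %s --data-binary @%s\n' "$method" "$url" "$file"
    else
      printf 'curl -sS -X %s %s\n' "$method" "$url"
    fi
  elif [ -n "$file" ]; then
    curl -sS -X "$method" "$url" --data-binary "@$file"
  else
    curl -sS -X "$method" "$url"
  fi
}
```

With these defined, streams_call POST foo foo.yaml would mirror the first request of the walkthrough (assuming foo.yaml holds the stream config), streams_call DELETE bar removes a stream, and streams_call POST "" all.yaml replaces the entire set. Setting STREAMS_DRY_RUN=1 beforehand prints each command so it can be reviewed before anything is sent.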