Synchronous Responses
In a regular Redpanda Connect pipeline, messages flow in one direction and acknowledgements in the other:
----------- Message ------------->
Input (AMQP) -> Processors -> Output (AMQP)
<------- Acknowledgement ---------
However, Redpanda Connect supports bidirectional protocols like HTTP and WebSocket, which allow responses to be returned directly from the pipeline.
For example, HTTP is a request/response protocol, and inputs like http_server (Self-Managed) or gateway (Redpanda Cloud) support returning response payloads to the requester.
--------- Request Body -------->
Input (HTTP) -> Processors -> Output (Sync Response)
<--- Response Body (and ack) ---
Routing processed messages back
To return a processed response, use the sync_response output.
Use the http_server input in Self-Managed deployments:
input:
http_server:
path: /weather
pipeline:
processors:
- mapping: |
root = {
city: json("location"),
forecast: "Clear skies with light winds",
temperature_c: 22
}
output:
sync_response: {}
Sending this request:
{ "location": "Berlin" }
Returns:
{
"city": "Berlin",
"forecast": "Clear skies with light winds",
"temperature_c": 22
}
Combine with other outputs
You can route processed messages to storage and return a response using a broker output.
input:
http_server:
path: /weather
output:
broker:
pattern: fan_out
outputs:
- kafka:
addresses: [localhost:9092]
topic: weather.requests
- sync_response:
processors:
- mapping: |
root = {
status: "received",
received_at: now()
}
Returning partially processed messages
You can return a response before the message is fully processed by using the sync_response processor.
This allows continued processing after the response is set.
pipeline:
processors:
- mapping: root = "Received weather report for %s".format(json("location"))
- sync_response: {}
- mapping: root.reported_at = now()
This returns "Received weather report for Berlin" to the client, but continues modifying the message before storing or forwarding it.
|
Due to delivery guarantees, the response is not sent until all downstream processing and acknowledgements are complete. |
Routing output responses back
Some outputs, such as http_client, support returning a downstream response back to the input.
input:
http_server:
path: /proxy-weather
output:
http_client:
url: https://api.example.com/forecast
verb: POST
propagate_response: true
This forwards the request to an external weather API and returns its response to the original requester.
You can combine this with other outputs:
input:
http_server:
path: /proxy-weather
output:
broker:
pattern: fan_out
outputs:
- kafka:
addresses: [localhost:9092]
topic: weather.proxy
- http_client:
url: https://api.example.com/forecast
verb: POST
propagate_response: true