Synchronous Responses

In a regular Redpanda Connect pipeline, messages flow in one direction and acknowledgements in the other:

    ----------- Message ------------->

Input (AMQP) -> Processors -> Output (AMQP)

    <------- Acknowledgement ---------

However, Redpanda Connect supports bidirectional protocols like HTTP and WebSocket, which allow responses to be returned directly from the pipeline.

For example, HTTP is a request/response protocol, and inputs like http_server (Self-Managed) or gateway (Redpanda Cloud) support returning response payloads to the requester.

           --------- Request Body -------->

Input (HTTP) -> Processors -> Output (Sync Response)

           <--- Response Body (and ack) ---

Routing processed messages back

To return a processed response, use the sync_response output.

Use the gateway input in Redpanda Cloud:

input:
  gateway: {}

pipeline:
  processors:
    - mapping: |
        root = {
          city: json("location"),
          forecast: "Clear skies with light winds",
          temperature_c: 22
        }

output:
  sync_response: {}

Sending this request:

{ "location": "Berlin" }

Returns:

{
  "city": "Berlin",
  "forecast": "Clear skies with light winds",
  "temperature_c": 22
}

Combine with other outputs

You can route processed messages to storage and return a response using a broker output.

input:
  gateway: {}

output:
  broker:
    pattern: fan_out
    outputs:
      - redpanda:
          seed_brokers:
            - ${REDPANDA_BROKERS}
          topic: weather.requests
          tls:
            enabled: true
          sasl:
            - mechanism: SCRAM-SHA-256
              username: ${secrets.USERNAME}
              password: ${secrets.PASSWORD}
      - sync_response:
          processors:
            - mapping: |
                root = {
                  status: "received",
                  received_at: now()
                }

Returning partially processed messages

You can return a response before the message is fully processed by using the sync_response processor.

This allows continued processing after the response is set.

pipeline:
  processors:
    - mapping: root = "Received weather report for %s".format(json("location"))
    - sync_response: {}
    - mapping: root.reported_at = now()

This returns "Received weather report for Berlin" to the client, but continues modifying the message before storing or forwarding it.

Due to delivery guarantees, the response is not sent until all downstream processing and acknowledgements are complete.