http_server

Type: Input

Receive messages POSTed over HTTP(S). HTTP/2 is supported when using TLS, which is enabled when key and cert files are specified.

# Common config fields, showing default values
input:
  label: ""
  http_server:
    address: ""
    path: /post
    ws_path: /post/ws
    allowed_verbs:
      - POST
    timeout: 5s
    rate_limit: ""

# All config fields, showing default values
input:
  label: ""
  http_server:
    address: ""
    path: /post
    ws_path: /post/ws
    ws_welcome_message: ""
    ws_rate_limit_message: ""
    allowed_verbs:
      - POST
    timeout: 5s
    rate_limit: ""
    cert_file: ""
    key_file: ""
    cors:
      enabled: false
      allowed_origins: []
    sync_response:
      status: "200"
      headers:
        Content-Type: application/octet-stream
      metadata_headers:
        include_prefixes: []
        include_patterns: []

If the address config field is left blank the service-wide HTTP server will be used.

The field rate_limit allows you to specify an optional rate_limit resource, which will be applied to each HTTP request made and each websocket payload received.

When the rate limit is breached, HTTP requests receive a 429 response with a Retry-After header. Websocket payloads are dropped, and an optional response payload is sent as specified by ws_rate_limit_message.
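As a minimal sketch of how this wiring might look, the config below attaches a rate limit resource to the input. The label ingest_limit and the budget of 100 requests per second are illustrative assumptions, and the local rate limit type is only one possible choice of resource:

```yaml
input:
  http_server:
    path: /post
    # References the rate limit resource declared below by its label.
    rate_limit: ingest_limit

rate_limit_resources:
  - label: ingest_limit  # hypothetical label
    local:
      count: 100    # assumed budget: 100 requests...
      interval: 1s  # ...per one-second window
```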

Responses

It’s possible to return a response for each message received using synchronous responses. When doing so you can customize headers with the sync_response field headers, which can also use function interpolation in the value based on the response message contents.
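The following sketch shows one way a synchronous response could be configured end to end. It assumes JSON request bodies and uses a hypothetical metadata key, status, set in the pipeline to drive the response status code:

```yaml
input:
  http_server:
    path: /post
    sync_response:
      # Status is resolved from metadata set in the pipeline (assumed key).
      status: '${! meta("status") }'
      headers:
        Content-Type: application/json

pipeline:
  processors:
    - mapping: |
        # Assumes the request body is JSON.
        root.received = this
        meta status = "201"

output:
  # Sends the processed message back to the waiting HTTP client.
  sync_response: {}
```

Here the sync_response output hands the processed message back to the input, which applies the configured status and headers before replying.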

Endpoints

The following fields specify endpoints that are registered for sending messages, and support path parameters of the form /{foo}, which are added to ingested messages as metadata. A path ending in / will match against all extensions of that path:

path (defaults to /post)

This endpoint expects POST requests where the entire request body is consumed as a single message.

If the request contains a multipart content-type header as per RFC 1341, then the multiple parts are consumed as a batch of messages, where each body part is a message of the batch.
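To illustrate the path parameters mentioned above, here is a small sketch. The parameter name order_id is hypothetical, and the example assumes JSON request bodies and that the parameter is exposed as metadata under its own name:

```yaml
input:
  http_server:
    # {order_id} is a hypothetical path parameter.
    path: /orders/{order_id}
  processors:
    - mapping: |
        # Copies the path parameter from metadata into the payload.
        root = this
        root.order_id = @order_id
```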

ws_path (defaults to /post/ws)

Creates a websocket connection, where payloads received on the socket are passed through the pipeline as a batch of one message.

Endpoint caveats

Components within a Redpanda Connect config will register their respective endpoints in a non-deterministic order. This means that establishing precedence of endpoints that are registered via multiple http_server inputs or outputs (either within brokers or from cohabiting streams) is not possible in a predictable way.

Because of this ambiguity, when one component registers a path ending in a slash (/), which matches all extensions of that path, and another component registers a more specific path beneath it, there is no way to guarantee that the broader path does not intercept requests intended for the more specific one.

It is therefore recommended that you ensure paths of separate components do not collide unless they are explicitly non-competing.

For example, if you were to deploy two separate http_server inputs, one with a path /foo/ and the other with a path /foo/bar, it would not be possible to ensure that the path /foo/ does not swallow requests made to /foo/bar.
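By contrast, a layout along the following lines should avoid the problem, since neither path ends in a slash and neither is an extension of the other. This is only a sketch: the paths are hypothetical, and ws_path is set explicitly on the assumption that two inputs left at the default /post/ws would otherwise compete for the same endpoint:

```yaml
input:
  broker:
    inputs:
      - http_server:
          path: /foo/items        # hypothetical path
          ws_path: /foo/items/ws  # kept distinct from the other input
      - http_server:
          path: /bar/items        # hypothetical path
          ws_path: /bar/items/ws
```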

You may specify an optional ws_welcome_message, which is a static payload to be sent to all clients once a websocket connection is first established.

It’s also possible to specify a ws_rate_limit_message, which is a static payload to be sent to clients that have triggered the server's rate limit.
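A minimal sketch of both websocket messages in use might look like this. The payload contents, the label ws_limit, and the rate limit values are all illustrative assumptions:

```yaml
input:
  http_server:
    ws_path: /post/ws
    # Sent once to each client when its connection is established.
    ws_welcome_message: '{"status":"connected"}'
    # Sent to a client whose payload was dropped by the rate limit.
    ws_rate_limit_message: '{"error":"rate limit exceeded"}'
    rate_limit: ws_limit

rate_limit_resources:
  - label: ws_limit  # hypothetical label
    local:
      count: 10      # assumed budget
      interval: 1s
```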

Metadata

This input adds the following metadata fields to each message:

- http_server_user_agent
- http_server_request_path
- http_server_verb
- http_server_remote_ip
- All headers (only first values are taken)
- All query parameters
- All path parameters
- All cookies

If HTTPS is enabled, the following fields are added as well:

- http_server_tls_version
- http_server_tls_subject
- http_server_tls_cipher_suite

You can access these metadata fields using function interpolation.
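For instance, the sketch below reads several of these fields, once through interpolation in a log message and once inside a mapping. It assumes JSON request bodies, and the target field request.user_agent is a hypothetical name:

```yaml
pipeline:
  processors:
    - log:
        level: INFO
        # Interpolation functions resolve metadata per message.
        message: '${! meta("http_server_verb") } ${! meta("http_server_request_path") } from ${! meta("http_server_remote_ip") }'
    - mapping: |
        # The @ prefix reads a metadata value within a mapping.
        root = this
        root.request.user_agent = @http_server_user_agent
```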

Examples

  • Path Switching

  • Mock OAuth 2.0 Server

This example shows an http_server input that captures all requests and processes them by switching on the request path:

input:
  http_server:
    path: /
    allowed_verbs: [ GET, POST ]
    sync_response:
      headers:
        Content-Type: application/json

  processors:
    - switch:
      - check: '@http_server_request_path == "/foo"'
        processors:
          - mapping: |
              root.title = "You Got Fooed!"
              root.result = content().string().uppercase()

      - check: '@http_server_request_path == "/bar"'
        processors:
          - mapping: 'root.title = "Bar Is Slow"'
          - sleep: # Simulate a slow endpoint
              duration: 1s

This example shows an http_server input that mocks an OAuth 2.0 Client Credentials flow server at the endpoint /oauth2_test:

input:
  http_server:
    path: /oauth2_test
    allowed_verbs: [ GET, POST ]
    sync_response:
      headers:
        Content-Type: application/json

  processors:
    - log:
        message: "Received request"
        level: INFO
        fields_mapping: |
          root = @
          root.body = content().string()

    - mapping: |
        root.access_token = "MTQ0NjJkZmQ5OTM2NDE1ZTZjNGZmZjI3"
        root.token_type = "Bearer"
        root.expires_in = 3600

    - sync_response: {}
    - mapping: 'root = deleted()'

Fields

address

An alternative address to host from. If left empty, the service-wide address is used.

Type: string

Default: ""

path

The endpoint path on which to listen for POST requests.

Type: string

Default: "/post"

ws_path

The endpoint path to create websocket connections from.

Type: string

Default: "/post/ws"

ws_welcome_message

An optional message to deliver to fresh websocket connections.

Type: string

Default: ""

ws_rate_limit_message

An optional message to deliver to websocket connections that are rate limited.

Type: string

Default: ""

allowed_verbs

An array of verbs that are allowed for the path endpoint.

Type: array

Default: ["POST"]

Requires version 3.33.0 or newer

timeout

Timeout for requests. If a consumed message takes longer than this to be delivered, the connection is closed, but the message may still be delivered.

Type: string

Default: "5s"

rate_limit

An optional rate limit to throttle requests by.

Type: string

Default: ""

cert_file

Enable TLS by specifying a certificate and key file. Only valid with a custom address.

Type: string

Default: ""

key_file

Enable TLS by specifying a certificate and key file. Only valid with a custom address.

Type: string

Default: ""
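As a rough sketch, enabling TLS could look like the following. The address and both file paths are hypothetical placeholders, and a custom address is set because these fields are only valid with one:

```yaml
input:
  http_server:
    # A custom address is required for cert_file and key_file to apply.
    address: 0.0.0.0:8443
    path: /post
    cert_file: /etc/certs/server.crt  # hypothetical path
    key_file: /etc/certs/server.key   # hypothetical path
```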

cors

Adds Cross-Origin Resource Sharing headers. Only valid with a custom address.

Type: object

Requires version 3.63.0 or newer

cors.enabled

Whether to allow CORS requests.

Type: bool

Default: false

cors.allowed_origins

An explicit list of origins that are allowed for CORS requests.

Type: array

Default: []
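A minimal sketch of a CORS setup, assuming a single trusted browser origin, might be written as follows. The address and the origin are hypothetical values:

```yaml
input:
  http_server:
    # CORS settings are only valid with a custom address.
    address: 0.0.0.0:8080
    path: /post
    cors:
      enabled: true
      allowed_origins:
        - https://app.example.com  # hypothetical origin
```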

sync_response

Customize messages returned via synchronous responses.

Type: object

sync_response.status

Specify the status code to return with synchronous responses. This is a string value, which allows you to customize it based on resulting payloads and their metadata. This field supports interpolation functions.

Type: string

Default: "200"

# Examples

status: ${! json("status") }

status: ${! meta("status") }

sync_response.headers

Specify headers to return with synchronous responses. This field supports interpolation functions.

Type: object

Default: {"Content-Type":"application/octet-stream"}

sync_response.metadata_headers

Specify criteria for which metadata values are added to the response as headers.

Type: object

sync_response.metadata_headers.include_prefixes

Provide a list of explicit metadata key prefixes to match against.

Type: array

Default: []

# Examples

include_prefixes:
  - foo_
  - bar_

include_prefixes:
  - kafka_

include_prefixes:
  - content-

sync_response.metadata_headers.include_patterns

Provide a list of explicit metadata key regular expression (re2) patterns to match against.

Type: array

Default: []

# Examples

include_patterns:
  - .*

include_patterns:
  - _timestamp_unix$