http_server
Receive messages sent over HTTP using POST requests. HTTP 2.0 is supported when using TLS, which is enabled when key and cert files are specified.
-
Common
-
Advanced
inputs:
label: ""
http_server:
address: ""
path: /post
ws_path: /post/ws
allowed_verbs:
- "POST"
timeout: 5s
rate_limit: ""
inputs:
label: ""
http_server:
address: ""
path: /post
ws_path: /post/ws
ws_welcome_message: ""
ws_rate_limit_message: ""
allowed_verbs:
- "POST"
timeout: 5s
rate_limit: ""
cert_file: ""
key_file: ""
cors:
enabled: false
allowed_origins: []
sync_response:
status: 200
headers:
Content-Type: "application/octet-stream"
metadata_headers:
include_prefixes: []
include_patterns: []
tcp:
reuse_addr: false
reuse_port: false
The field rate_limit allows you to specify an optional rate_limit resource, which will be applied to each HTTP request made and each websocket payload received.
When the rate limit is breached HTTP requests will have a 429 response returned with a Retry-After header. Websocket payloads will be dropped and an optional response payload will be sent as per ws_rate_limit_message.
Responses
It’s possible to return a response for each message received using synchronous responses. When doing so you can customize headers with the sync_response field headers, which can also use function interpolation in the value based on the response message contents.
Endpoints
The following fields specify endpoints that are registered for sending messages, and support path parameters of the form /{foo}, which are added to ingested messages as metadata. A path ending in / will match against all extensions of that path:
path (defaults to /post)
This endpoint expects POST requests where the entire request body is consumed as a single message.
If the request contains a multipart content-type header as per RFC1341 then the multiple parts are consumed as a batch of messages, where each body part is a message of the batch.
ws_path (defaults to /post/ws)
Creates a websocket connection, where payloads received on the socket are passed through the pipeline as a batch of one message.
|
Endpoint caveats
Components within a Redpanda Connect config will register their respective endpoints in a non-deterministic order. This means that establishing precedence of endpoints that are registered via multiple This ambiguity makes it difficult to ensure that paths which are both a subset of a path registered by a separate component, and end in a slash ( It is therefore recommended that you ensure paths of separate components do not collide unless they are explicitly non-competing. For example, if you were to deploy two separate |
You may specify an optional ws_welcome_message, which is a static payload to be sent to all clients once a websocket connection is first established.
It’s also possible to specify a ws_rate_limit_message, which is a static payload to be sent to clients that have triggered the servers rate limit.
Metadata
This input adds the following metadata fields to each message:
- http_server_user_agent
- http_server_request_path
- http_server_verb
- http_server_remote_ip
- All headers (only first values are taken)
- All query parameters
- All path parameters
- All cookies
If HTTPS is enabled, the following fields are added as well:
- http_server_tls_version
- http_server_tls_subject
- http_server_tls_cipher_suite
You can access these metadata fields using function interpolation.
Examples
Path Switching
This example shows an http_server input that captures all requests and processes them by switching on that path:
input:
http_server:
path: /
allowed_verbs: [ GET, POST ]
sync_response:
headers:
Content-Type: application/json
processors:
- switch:
- check: '@http_server_request_path == "/foo"'
processors:
- mapping: |
root.title = "You Got Fooed!"
root.result = content().string().uppercase()
- check: '@http_server_request_path == "/bar"'
processors:
- mapping: 'root.title = "Bar Is Slow"'
- sleep: # Simulate a slow endpoint
duration: 1s
Mock OAuth 2.0 Server
This example shows an http_server input that mocks an OAuth 2.0 Client Credentials flow server at the endpoint /oauth2_test:
input:
http_server:
path: /oauth2_test
allowed_verbs: [ GET, POST ]
sync_response:
headers:
Content-Type: application/json
processors:
- log:
message: "Received request"
level: INFO
fields_mapping: |
root = @
root.body = content().string()
- mapping: |
root.access_token = "MTQ0NjJkZmQ5OTM2NDE1ZTZjNGZmZjI3"
root.token_type = "Bearer"
root.expires_in = 3600
- sync_response: {}
- mapping: 'root = deleted()'
Fields
address
An alternative address to host from. If left empty the service wide address is used.
Type: string
Default: ""
allowed_verbs[]
An array of verbs that are allowed for the path endpoint.
Type: array
Default:
- "POST"
cert_file
Enable TLS by specifying a certificate and key file. Only valid with a custom address.
Type: string
Default: ""
cors.allowed_origins[]
An explicit list of origins that are allowed for CORS requests.
Type: array
Default: []
key_file
Enable TLS by specifying a certificate and key file. Only valid with a custom address.
Type: string
Default: ""
sync_response.headers
Specify headers to return with synchronous responses. This field supports interpolation functions.
Type: string
Default:
Content-Type: "application/octet-stream"
sync_response.metadata_headers
Specify criteria for which metadata values are added to the response as headers.
Type: object
sync_response.metadata_headers.include_patterns[]
Provide a list of explicit metadata key regular expression (re2) patterns to match against.
Type: array
Default: []
# Examples:
include_patterns:
- .*
# ---
include_patterns:
- _timestamp_unix$
sync_response.metadata_headers.include_prefixes[]
Provide a list of explicit metadata key prefixes to match against.
Type: array
Default: []
# Examples:
include_prefixes:
- foo_
- bar_
# ---
include_prefixes:
- kafka_
# ---
include_prefixes:
- content-
sync_response.status
Specify the status code to return with synchronous responses. This is a string value, which allows you to customize it based on resulting payloads and their metadata. This field supports interpolation functions.
Type: string
Default: 200
# Examples:
status: ${! json("status") }
# ---
status: ${! meta("status") }
tcp.reuse_addr
Enable SO_REUSEADDR, allowing binding to ports in TIME_WAIT state. Useful for graceful restarts and config reloads where the server needs to rebind to the same port immediately after shutdown.
Type: bool
Default: false
tcp.reuse_port
Enable SO_REUSEPORT, allowing multiple sockets to bind to the same port for load balancing across multiple processes/threads.
Type: bool
Default: false
timeout
Timeout for requests. If a consumed messages takes longer than this to be delivered the connection is closed, but the message may still be delivered.
Type: string
Default: 5s