http_client
Connects to a server and continuously requests single messages.
Common
input:
  label: ""
  http_client:
    url: "" # No default (required)
    verb: GET
    headers: {}
    rate_limit: "" # No default (optional)
    timeout: 5s
    payload: "" # No default (optional)
    stream:
      enabled: false
      reconnect: true
      scanner:
        lines: {}
    auto_replay_nacks: true
Advanced
input:
  label: ""
  http_client:
    url: "" # No default (required)
    verb: GET
    headers: {}
    metadata:
      include_prefixes: []
      include_patterns: []
    dump_request_log_level: ""
    oauth:
      enabled: false
      consumer_key: ""
      consumer_secret: ""
      access_token: ""
      access_token_secret: ""
    oauth2:
      enabled: false
      client_key: ""
      client_secret: ""
      token_url: ""
      scopes: []
      endpoint_params: {}
    basic_auth:
      enabled: false
      username: ""
      password: ""
    jwt:
      enabled: false
      private_key_file: ""
      signing_method: ""
      claims: {}
      headers: {}
    tls:
      enabled: false
      skip_cert_verify: false
      enable_renegotiation: false
      root_cas: ""
      root_cas_file: ""
      client_certs: []
    extract_headers:
      include_prefixes: []
      include_patterns: []
    rate_limit: "" # No default (optional)
    timeout: 5s
    retry_period: 1s
    max_retry_backoff: 300s
    retries: 3
    follow_redirects: true
    backoff_on:
      - 429
    drop_on: []
    successful_on: []
    proxy_url: "" # No default (optional)
    disable_http2: false
    payload: "" # No default (optional)
    drop_empty_bodies: true
    stream:
      enabled: false
      reconnect: true
      scanner:
        lines: {}
    auto_replay_nacks: true
Dynamic URL and header settings
You can set the url and headers values dynamically using function interpolations.
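As a minimal sketch of dynamic values, the following configuration computes a query parameter and a request ID with function interpolations. The endpoint, the `API_TOKEN` environment variable, and the `X-Request-Id` header name are assumptions made for illustration only.

```yaml
input:
  http_client:
    # Hypothetical endpoint; the `since` value is computed by a function interpolation.
    url: 'https://api.example.com/events?since=${! timestamp_unix() - 300 }'
    verb: GET
    headers:
      # Environment variable interpolation (API_TOKEN is an assumed variable name).
      Authorization: 'Bearer ${API_TOKEN}'
      # Function interpolation that generates a fresh UUID.
      X-Request-Id: '${! uuid_v4() }'
```

Quoting the values keeps YAML from misreading characters inside the interpolation.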
Pagination
You can also add function interpolations to the url and headers fields to implement basic pagination, such as page numbers or tokens, where subsequent requests need to include data from previously-consumed responses.
Example:
input:
  http_client:
    url: >-
      https://api.example.com/search?query=allmyfoos&start_time=${! (
      (timestamp_unix()-300).ts_format("2006-01-02T15:04:05Z","UTC").escape_url_query()
      ) }${! ("&next_token="+this.meta.next_token.not_null()) | "" }
    verb: GET
    rate_limit: schedule_searches
    oauth2:
      enabled: true
      token_url: https://api.example.com/oauth2/token
      client_key: "${EXAMPLE_KEY}"
      client_secret: "${EXAMPLE_SECRET}"

rate_limit_resources:
  - label: schedule_searches
    local:
      count: 1
      interval: 30s
If pagination requires more complex logic, consider using the http processor combined with a generate input, which allows you to schedule the processor.
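One possible shape for that alternative is sketched below: a generate input emits an empty message on a fixed interval, and an http processor performs the request for each message. The URL and the 30-second interval are placeholder assumptions, and any pagination logic would still need to be added around the processor.

```yaml
input:
  generate:
    # Emit one trigger message every 30 seconds (assumed schedule).
    interval: 30s
    mapping: 'root = {}'

pipeline:
  processors:
    # Each trigger message results in one request to the hypothetical endpoint.
    - http:
        url: https://api.example.com/search?query=allmyfoos
        verb: GET
```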
Streaming messages
If you enable streaming, Redpanda Connect consumes the body of the server response as a continuous stream of data, and breaks the stream down into smaller, logical messages using the specified scanner. This functionality allows you to consume APIs that provide long-lived streamed data feeds, such as stock market feeds.
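A minimal streaming configuration might look like the following sketch. The feed URL is a placeholder assumption, and the `lines` scanner shown is the default, so each line of the response body becomes one message.

```yaml
input:
  http_client:
    # Hypothetical long-lived feed endpoint.
    url: https://stream.example.com/v1/quotes
    verb: GET
    stream:
      enabled: true
      # Re-establish the connection if it is lost.
      reconnect: true
      scanner:
        # Split the response stream into one message per line.
        lines: {}
```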
Fields
auto_replay_nacks
Whether to automatically replay rejected messages (negative acknowledgements) at the output level. If the cause of rejections persists, leaving this option enabled can result in back pressure.
Set auto_replay_nacks to false to delete rejected messages. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data is discarded immediately upon consumption and mutation.
Type: bool
Default: true
backoff_on[]
A list of status codes that indicate a request failure, and trigger retries with an increasing backoff period between attempts.
Type: int
Default:
- 429
basic_auth.password
A password to authenticate with.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
Default: ""
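To keep the password out of the configuration file, one option is to reference it through environment variable interpolation, as in this sketch. The URL and the variable names `API_USER` and `API_PASSWORD` are assumptions for illustration.

```yaml
input:
  http_client:
    # Hypothetical endpoint protected by basic authentication.
    url: https://api.example.com/data
    basic_auth:
      enabled: true
      # Both values are read from assumed environment variables.
      username: "${API_USER}"
      password: "${API_PASSWORD}"
```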
drop_empty_bodies
Whether to drop empty payloads received from the target server.
Type: bool
Default: true
drop_on[]
A list of status codes that indicate a request failure, where the input should not attempt retries. This helps avoid unnecessary retries for requests that are unlikely to succeed.
In these cases, the request is dropped, but the message that triggered the request is retained.
Type: int
Default: []
dump_request_log_level
EXPERIMENTAL: Set the logging level for the request and response payloads of each HTTP request.
Type: string
Default: ""
Options: TRACE, DEBUG, INFO, WARN, ERROR, FATAL, "" (empty string)
extract_headers
Specify which response headers to add to the resulting messages as metadata. Header keys are automatically converted to lowercase before matching, so make sure that your patterns target the lowercase versions of the expected header keys.
Type: object
extract_headers.include_patterns[]
Provide a list of explicit metadata key regular expression (re2) patterns to match against.
Type: array
Default: []
# Examples:
include_patterns:
  - .*
# ---
include_patterns:
  - _timestamp_unix$
extract_headers.include_prefixes[]
Provide a list of explicit metadata key prefixes to match against.
Type: array
Default: []
# Examples:
include_prefixes:
  - foo_
  - bar_
# ---
include_prefixes:
  - kafka_
# ---
include_prefixes:
  - content-
follow_redirects
Whether to transparently follow redirects, that is, responses with 300-399 status codes. If disabled, the response message contains the body, status, and headers from the redirect response, and the input does not make a request to the URL set in the Location header of the response.
Type: bool
Default: true
headers
A map of headers to add to the request. This field supports interpolation functions.
Type: string
Default: {}
# Examples:
headers:
  Content-Type: application/octet-stream
  traceparent: ${! tracing_span().traceparent }
jwt
Beta
Configure JSON Web Token (JWT) authentication. This feature is in beta and may change in future releases. JWT tokens provide secure, stateless authentication between services.
Type: object
jwt.headers
Additional key-value pairs to include in the JWT header (optional). These headers provide extra metadata for JWT processing.
Type: object
Default: {}
jwt.private_key_file
Path to a file containing the PEM-encoded private key using PKCS#1 or PKCS#8 format. The private key must be compatible with the algorithm specified in the signing_method field.
Type: string
Default: ""
jwt.signing_method
The cryptographic algorithm used to sign the JWT token. Supported algorithms include RS256, RS384, RS512, and EdDSA. This algorithm must be compatible with the private key specified in the private_key_file field.
Type: string
Default: ""
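The JWT fields could be combined as in the following sketch. The URL, key path, claim values, and `kid` header are illustrative assumptions; the `claims` field appears in the advanced configuration above, and the claims an API expects depend on that API.

```yaml
input:
  http_client:
    # Hypothetical endpoint that accepts JWT authentication.
    url: https://api.example.com/data
    jwt:
      enabled: true
      # PEM-encoded RSA key, matching the RS256 signing method (assumed path).
      private_key_file: ./keys/service.pem
      signing_method: RS256
      claims:
        # Example claims only; use the claims your API requires.
        iss: my-service
        sub: my-service
      headers:
        # Optional extra JWT header (assumed key ID).
        kid: example-key-id
```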
metadata
Specify matching rules that determine which metadata keys to add to the HTTP request as headers (optional).
Type: object
metadata.include_patterns[]
Provide a list of explicit metadata key regular expression (re2) patterns to match against.
Type: array
Default: []
# Examples:
include_patterns:
  - .*
# ---
include_patterns:
  - _timestamp_unix$
metadata.include_prefixes[]
Provide a list of explicit metadata key prefixes to match against.
Type: array
Default: []
# Examples:
include_prefixes:
  - foo_
  - bar_
# ---
include_prefixes:
  - kafka_
# ---
include_prefixes:
  - content-
oauth.access_token
The value used to gain access to the protected resources on behalf of the user.
Type: string
Default: ""
oauth.access_token_secret
The secret that establishes ownership of the oauth.access_token in OAuth 1.0 authentication.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
Default: ""
oauth.consumer_key
A value used to identify the client to the service provider.
Type: string
Default: ""
oauth.consumer_secret
The secret that establishes ownership of the consumer key in OAuth 1.0 authentication.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
Default: ""
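The four OAuth 1.0 values are typically supplied together. In this sketch, each value is read from an environment variable so that no secret is stored in the file. The URL and all variable names are assumptions.

```yaml
input:
  http_client:
    # Hypothetical endpoint that uses OAuth 1.0.
    url: https://api.example.com/data
    oauth:
      enabled: true
      # All four values come from assumed environment variables.
      consumer_key: "${OAUTH_CONSUMER_KEY}"
      consumer_secret: "${OAUTH_CONSUMER_SECRET}"
      access_token: "${OAUTH_ACCESS_TOKEN}"
      access_token_secret: "${OAUTH_ACCESS_TOKEN_SECRET}"
```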
oauth2
Allows you to specify open authentication using OAuth version 2 and the client credentials token flow.
Type: object
oauth2.client_key
A value used to identify the client to the token provider.
Type: string
Default: ""
oauth2.client_secret
The secret used to establish ownership of the client key.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
Default: ""
oauth2.endpoint_params
A map of endpoint parameters, where each value is an array of strings (optional).
Type: object
Default: {}
# Examples:
endpoint_params:
  bar:
    - woof
  foo:
    - meow
    - quack
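A fuller OAuth 2 client credentials setup might combine these fields as sketched below. The URLs, the scope name, and the `audience` parameter are assumptions. Whether a token endpoint needs extra parameters depends on the provider.

```yaml
input:
  http_client:
    # Hypothetical protected endpoint.
    url: https://api.example.com/data
    oauth2:
      enabled: true
      token_url: https://api.example.com/oauth2/token
      client_key: "${EXAMPLE_KEY}"
      client_secret: "${EXAMPLE_SECRET}"
      scopes:
        # Assumed scope name.
        - read:data
      endpoint_params:
        # Assumed provider-specific parameter; each value is an array of strings.
        audience:
          - https://api.example.com
```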
payload
A payload to deliver for each request (optional). This field supports interpolation functions.
Type: string
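For APIs that expect a request body, the payload can be paired with a non-GET verb, as in this sketch. The URL and the JSON body are placeholder assumptions.

```yaml
input:
  http_client:
    # Hypothetical query endpoint that expects a JSON body.
    url: https://api.example.com/query
    verb: POST
    headers:
      Content-Type: application/json
    # The same body is delivered with each request.
    payload: '{"query":"status:open","limit":100}'
```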
retry_period
The initial period to wait between failed requests before retrying.
Type: string
Default: 1s
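The retry fields from the advanced configuration can be tuned together. The values below are illustrative assumptions, not recommendations, and the comments describe the intended effect of each field.

```yaml
input:
  http_client:
    # Hypothetical endpoint.
    url: https://api.example.com/data
    # Initial wait between a failed request and its retry.
    retry_period: 2s
    # Upper bound on the wait between retries.
    max_retry_backoff: 60s
    # Maximum number of retry attempts.
    retries: 5
```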
stream
Enables streaming mode, where the HTTP connection remains open and messages are processed line-by-line.
Type: object
stream.reconnect
Whether to automatically reestablish the HTTP connection if it is lost.
Type: bool
Default: true
stream.scanner
The scanner used to split the stream of bytes into individual messages. Scanners are useful for processing large data sources efficiently without holding the entire data set in memory. For example, the csv scanner processes individual rows in a CSV file without loading the entire file in memory.
Type: scanner
Default:
lines: {}
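As a sketch of swapping the default scanner, the following configuration uses the csv scanner mentioned above so that each row of a streamed CSV response becomes a message. The URL is a placeholder assumption, and the scanner is shown with its default options.

```yaml
input:
  http_client:
    # Hypothetical endpoint that returns a large CSV export.
    url: https://api.example.com/export.csv
    stream:
      enabled: true
      scanner:
        # Process the response row by row instead of loading it whole.
        csv: {}
```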
successful_on[]
A list of HTTP status codes that should be considered as successful, even if they are not 2XX codes. This is useful for handling cases where non-2XX codes indicate that the request was processed successfully, such as 303 See Other or 409 Conflict.
By default, all 2XX codes are considered successful unless they are specified in backoff_on or drop_on fields.
Type: int
Default: []
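The three status code lists can be combined to describe how the input should treat different responses. The specific codes below are example choices, assumed for illustration.

```yaml
input:
  http_client:
    # Hypothetical endpoint.
    url: https://api.example.com/data
    # Retry with increasing backoff when rate limited or temporarily unavailable.
    backoff_on: [429, 503]
    # Do not retry requests that are unlikely to succeed.
    drop_on: [400, 404]
    # Treat 409 Conflict as a successful response.
    successful_on: [409]
```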
tls
Configure Transport Layer Security (TLS) settings to secure network connections. This includes options for standard TLS as well as mutual TLS (mTLS) authentication where both client and server authenticate each other using certificates. Key configuration options include enabled to enable TLS, client_certs for mTLS authentication, root_cas/root_cas_file for custom certificate authorities, and skip_cert_verify for development environments.
Type: object
tls.client_certs[]
A list of client certificates for mutual TLS (mTLS) authentication. Configure this field to enable mTLS, authenticating the client to the server with these certificates.
You must set tls.enabled: true for the client certificates to take effect.
Certificate pairing rules: For each certificate item, provide either:
- Inline PEM data using both cert and key, or
- File paths using both cert_file and key_file.
Mixing inline and file-based values within the same item is not supported.
Type: object
Default: []
# Examples:
client_certs:
- cert: foo
key: bar
# ---
client_certs:
- cert_file: ./example.pem
key_file: ./example.key
tls.client_certs[].key
A plain text certificate key to use.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
Default: ""
tls.client_certs[].password
A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.
Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
Default: ""
# Examples:
password: foo
# ---
password: ${KEY_PASSWORD}
tls.enable_renegotiation
Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.
Type: bool
Default: false
tls.root_cas
Specify a root certificate authority to use (optional). This is a string that represents a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate. Use either this field for inline certificate data or root_cas_file for file-based certificate loading.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration.
Type: string
Default: ""
# Examples:
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----
tls.root_cas_file
Specify the path to a root certificate authority file (optional). This is a file, often with a .pem extension, which contains a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate. Use either this field for file-based certificate loading or root_cas for inline certificate data.
Type: string
Default: ""
# Examples:
root_cas_file: ./root_cas.pem
tls.skip_cert_verify
Whether to skip server-side certificate verification. Set to true only for testing environments as this reduces security by disabling certificate validation. When using self-signed certificates or in development, this may be necessary, but should never be used in production. Consider using root_cas or root_cas_file to specify trusted certificates instead of disabling verification entirely.
Type: bool
Default: false
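Putting the TLS fields together, a mutual TLS setup that trusts a custom certificate authority might look like the following sketch. The URL is a placeholder assumption, and the file paths reuse the example paths shown for the individual fields.

```yaml
input:
  http_client:
    # Hypothetical internal endpoint that requires mTLS.
    url: https://internal.example.com/feed
    tls:
      enabled: true
      # Trust a custom certificate authority instead of skipping verification.
      root_cas_file: ./root_cas.pem
      client_certs:
        # File-based pair; do not mix with inline cert/key in the same item.
        - cert_file: ./example.pem
          key_file: ./example.key
```

This keeps skip_cert_verify at its default of false, so the server certificate is still validated.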