amqp_0_9
Connects to an AMQP (0.91) queue. AMQP is a messaging protocol used by various message brokers, including RabbitMQ.
-
Common
-
Advanced
inputs:
label: ""
amqp_0_9:
urls: [] # No default (required)
queue: "" # No default (required)
consumer_tag: ""
prefetch_count: 10
inputs:
label: ""
amqp_0_9:
urls: [] # No default (required)
queue: "" # No default (required)
queue_declare:
enabled: false
durable: true
auto_delete: false
arguments: "" # No default (optional)
bindings_declare: [] # No default (optional)
consumer_tag: ""
auto_ack: false
nack_reject_patterns: []
prefetch_count: 10
prefetch_size: 0
tls:
enabled: false
skip_cert_verify: false
enable_renegotiation: false
root_cas: ""
root_cas_file: ""
client_certs: []
TLS is automatically enabled when connecting to an amqps URL. However, you can customize TLS settings if required.
Metadata
This input adds the following metadata fields to each message:
-
amqp_content_type -
amqp_content_encoding -
amqp_delivery_mode -
amqp_priority -
amqp_correlation_id -
amqp_reply_to -
amqp_expiration -
amqp_message_id -
amqp_timestamp -
amqp_type -
amqp_user_id -
amqp_app_id -
amqp_consumer_tag -
amqp_delivery_tag -
amqp_redelivered -
amqp_exchange -
amqp_routing_key -
All existing message headers, including nested headers prefixed with the key of their respective parent.
You can access these metadata fields using function interpolations.
Fields
auto_ack
Set to true to automatically acknowledge messages as soon as they are consumed rather than waiting for acknowledgments from downstream. This can improve throughput and prevent the pipeline from becoming blocked, but delivery guarantees are lost.
Type: bool
Default: false
bindings_declare[]
Passively declares the bindings of the target queue to make sure they exist and are configured correctly. If the bindings exist, then the passive declaration verifies that fields specified in this object match them.
Type: object
# Examples:
bindings_declare:
- exchange: foo
key: bar
nack_reject_patterns[]
A list of regular expression patterns to match against errors in messages that Redpanda Connect fails to deliver. When a message has an error that matches a pattern, it is dropped or delivered to a dead-letter queue (if a queue has been configured).
By default, failed messages are negatively acknowledged (nacked) and requeued.
Requires version 3.64.0 or later.
Type: array
Default: []
# Examples:
nack_reject_patterns:
- "^reject me please:.+$"
queue_declare
Passively declares the target queue to make sure a queue with the specified name exists and is configured correctly. If the queue exists, then the passive declaration verifies that fields specified in this object match the its properties.
Type: object
queue_declare.arguments
Arguments for server-specific implementations of the queue (optional). You can use arguments to configure additional parameters for queue types that require them. For more information about available arguments, see the RabbitMQ Client Library.
| Argument | Description | Accepted values |
|---|---|---|
|
Declares the type of queue. |
Options: |
|
The maximum number of messages in the queue. |
A non-negative integer. |
|
The maximum size of messages (in bytes) in the queue. |
A non-negative integer. |
|
Sets the queue’s overflow behavior. |
Options: |
|
The duration (in milliseconds) that messages remain in the queue before they expire and are discarded. |
A string that represents the number of milliseconds. For example, |
|
The duration after which the queue automatically expires. |
A positive integer. |
|
The duration (in configurable units) that streamed messages are retained on disk before they are discarded. |
Options: |
|
The maximum size (in bytes) of the segment files held on disk. |
A positive integer. Default: |
|
The version of the classic queue to use. |
Options: |
|
The duration (in milliseconds) that a consumer can remain idle before it is automatically canceled. |
A positive integer that represents the number of milliseconds. For example, |
|
When set to |
A boolean. |
Type: object
# Examples:
arguments:
x-max-length: 1000
x-max-length-bytes: 4096
x-queue-type: quorum
queue_declare.auto_delete
Whether the declared queue auto-deletes when there are no active consumers.
Type: bool
Default: false
tls
Configure Transport Layer Security (TLS) settings to secure network connections. This includes options for standard TLS as well as mutual TLS (mTLS) authentication where both client and server authenticate each other using certificates. Key configuration options include enabled to enable TLS, client_certs for mTLS authentication, root_cas/root_cas_file for custom certificate authorities, and skip_cert_verify for development environments.
Type: object
tls.client_certs[]
A list of client certificates for mutual TLS (mTLS) authentication. Configure this field to enable mTLS, authenticating the client to the server with these certificates.
You must set tls.enabled: true for the client certificates to take effect.
Certificate pairing rules: For each certificate item, provide either:
-
Inline PEM data using both
certandkeyor -
File paths using both
cert_fileandkey_file.
Mixing inline and file-based values within the same item is not supported.
Type: object
Default: []
# Examples:
client_certs:
- cert: foo
key: bar
# ---
client_certs:
- cert_file: ./example.pem
key_file: ./example.key
tls.client_certs[].key
A plain text certificate key to use.
|
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets. |
Type: string
Default: ""
tls.client_certs[].password
A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.
Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.
|
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets. |
Type: string
Default: ""
# Examples:
password: foo
# ---
password: ${KEY_PASSWORD}
tls.enable_renegotiation
Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.
Requires version 3.45.0 or later.
Type: bool
Default: false
tls.root_cas
Specify a root certificate authority to use (optional). This is a string that represents a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate. Use either this field for inline certificate data or root_cas_file for file-based certificate loading.
|
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets. |
Type: string
Default: ""
# Examples:
root_cas: |-
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
tls.root_cas_file
Specify the path to a root certificate authority file (optional). This is a file, often with a .pem extension, which contains a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate. Use either this field for file-based certificate loading or root_cas for inline certificate data.
Type: string
Default: ""
# Examples:
root_cas_file: ./root_cas.pem
tls.skip_cert_verify
Whether to skip server-side certificate verification. Set to true only for testing environments as this reduces security by disabling certificate validation. When using self-signed certificates or in development, this may be necessary, but should never be used in production. Consider using root_cas or root_cas_file to specify trusted certificates instead of disabling verification entirely.
Type: bool
Default: false
urls[]
A list of URLs to connect to. This input attempts to connect to each URL in the list, in order, until a successful connection is established. It then continues to use that URL until the connection is closed.
If an item in the list contains commas, it is split into multiple URLs.
Requires version 3.58.0 or later.
Type: array
# Examples:
urls:
- "amqp://guest:guest@127.0.0.1:5672/"
# ---
urls:
- "amqp://127.0.0.1:5672/,amqp://127.0.0.2:5672/"
# ---
urls:
- "amqp://127.0.0.1:5672/"
- "amqp://127.0.0.2:5672/"