aws_s3
Downloads objects within an Amazon S3 bucket, optionally filtered by a prefix, either by walking the items in the bucket or by streaming upload notifications in real time.
-
Common
-
Advanced
inputs:
label: ""
aws_s3:
bucket: ""
prefix: ""
scanner:
to_the_end: {}
sqs:
url: ""
endpoint: ""
key_path: Records.*.s3.object.key
bucket_path: Records.*.s3.bucket.name
envelope_path: ""
delay_period: ""
max_messages: 10
wait_time_seconds: 0
nack_visibility_timeout: 0
inputs:
label: ""
aws_s3:
bucket: ""
prefix: ""
region: "" # No default (optional)
endpoint: "" # No default (optional)
tcp:
connect_timeout: 0s
keep_alive:
idle: 15s
interval: 15s
count: 9
tcp_user_timeout: 0s
credentials:
profile: "" # No default (optional)
id: "" # No default (optional)
secret: "" # No default (optional)
token: "" # No default (optional)
from_ec2_role: "" # No default (optional)
role: "" # No default (optional)
role_external_id: "" # No default (optional)
force_path_style_urls: false
delete_objects: false
scanner:
to_the_end: {}
sqs:
url: ""
endpoint: ""
key_path: Records.*.s3.object.key
bucket_path: Records.*.s3.bucket.name
envelope_path: ""
delay_period: ""
max_messages: 10
wait_time_seconds: 0
nack_visibility_timeout: 0
Stream objects on upload with SQS
A common pattern for consuming S3 objects is to emit upload notification events from the bucket either directly to an SQS queue, or to an SNS topic that is consumed by an SQS queue, and then have your consumer listen for events that prompt it to download the newly uploaded objects. More information about this pattern and how to set it up can be found in the Amazon S3 docs.
Redpanda Connect is able to follow this pattern when you configure an sqs.url, where it consumes events from SQS and downloads only the object keys contained in those events. For this to work, Redpanda Connect needs to know where within the event the key and bucket names can be found, specified as dot paths with the fields sqs.key_path and sqs.bucket_path. The default values for these fields should already be correct when following the guide above.
If your notification events are being routed to SQS via an SNS topic, the events are enveloped by SNS, in which case you also need to specify the field sqs.envelope_path, which in the case of SNS to SQS will usually be Message.
When using SQS, make sure you have sensible values for sqs.max_messages and also the visibility timeout of the queue itself. When Redpanda Connect consumes an S3 object the SQS message that triggered it is not deleted until the S3 object has been sent onwards. This ensures at-least-once crash resiliency, but also means that if the S3 object takes longer to process than the visibility timeout of your queue, then the same objects might be processed multiple times.
Download large files
When downloading large files, process them in streamed parts to avoid loading the entire file into memory at once. To do this, specify a scanner that determines how to break the input into smaller individual messages.
Bucket and prefix
The bucket field accepts a bucket name only, not an ARN. For example, use my-bucket, not arn:aws:s3:::my-bucket.
The prefix field accepts a single string. To consume from multiple prefixes in the same bucket, use multiple aws_s3 inputs in a broker input:
input:
broker:
inputs:
- aws_s3:
bucket: my-bucket
prefix: logs/app1/
- aws_s3:
bucket: my-bucket
prefix: logs/app2/
Credentials
By default, Redpanda Connect uses a shared credentials file when connecting to AWS services. You can also set credentials explicitly at the component level to transfer data across accounts. You can find out more in AWS credentials.
S3-compatible storage
The endpoint and force_path_style_urls fields let you connect to S3-compatible storage services such as Cloudflare R2, MinIO, or DigitalOcean Spaces.
For Cloudflare R2, set endpoint to your account endpoint URL and enable force_path_style_urls:
input:
aws_s3:
bucket: r2-bucket
endpoint: https://<account-id>.r2.cloudflarestorage.com
force_path_style_urls: true
region: auto
credentials:
id: <r2-access-key-id>
secret: <r2-secret-access-key>
Find your account ID in the Cloudflare dashboard under R2 > Overview > Account Details. Generate API credentials under R2 > Manage R2 API Tokens.
Metadata
This input adds the following metadata fields to each message:
-
s3_key
-
s3_bucket
-
s3_last_modified_unix
-
s3_last_modified (RFC3339)
-
s3_content_type
-
s3_content_encoding
-
s3_version_id
-
All user defined metadata
You can access these metadata fields using function interpolation. User-defined metadata is case insensitive in AWS, so keys are often received in capitalized form. To normalize them, map all metadata keys to lowercase or uppercase using a Bloblang mapping such as meta = meta().map_each_key(key → key.lowercase()).
Fields
bucket
The bucket to consume from. If the field sqs.url is specified this field is optional.
Type: string
Default: ""
credentials
Optional manual configuration of AWS credentials to use. More information can be found in Amazon Web Services.
Type: object
credentials.from_ec2_role
Use the credentials of a host EC2 machine configured to assume an IAM role associated with the instance.
Type: bool
credentials.secret
The secret for the credentials being used.
|
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Manage Secrets before adding it to your configuration. |
Type: string
credentials.token
The token for the credentials being used, required when using short term credentials.
Type: string
delete_objects
Whether to delete downloaded objects from the bucket once they are processed.
Type: bool
Default: false
force_path_style_urls
Forces the client API to use path style URLs for downloading keys, which is often required when connecting to custom endpoints.
Type: bool
Default: false
prefix
An optional path prefix, if set only objects with the prefix are consumed when walking a bucket.
Type: string
Default: ""
scanner
The scanner by which the stream of bytes consumed will be broken out into individual messages. Scanners are useful for processing large sources of data without holding the entirety of it within memory. For example, the csv scanner allows you to process individual CSV rows without loading the entire CSV file in memory at once.
Type: scanner
Default:
to_the_end: {}
sqs.bucket_path
A dot path whereby the bucket name can be found in SQS messages.
Type: string
Default: Records.*.s3.bucket.name
sqs.delay_period
An optional period of time to wait from when a notification was originally sent to when the target key download is attempted.
Type: string
Default: ""
# Examples:
delay_period: 10s
# ---
delay_period: 5m
sqs.envelope_path
A dot path of a field to extract an enveloped JSON payload for further extracting the key and bucket from SQS messages. This is specifically useful when subscribing an SQS queue to an SNS topic that receives bucket events.
Type: string
Default: ""
# Examples:
envelope_path: Message
sqs.key_path
A dot path whereby object keys are found in SQS messages.
Type: string
Default: Records.*.s3.object.key
sqs.max_messages
The maximum number of SQS messages to consume from each request.
Type: int
Default: 10
sqs.nack_visibility_timeout
Custom SQS Nack Visibility timeout in seconds. Default is 0
Type: int
Default: 0
sqs.url
An optional SQS URL to connect to. When specified this queue will control which objects are downloaded.
Type: string
Default: ""
sqs.wait_time_seconds
Whether to set the wait time. Enabling this activates long-polling. Valid values: 0 to 20.
Type: int
Default: 0
tcp.connect_timeout
Maximum amount of time a dial will wait for a connect to complete. Zero disables.
Type: string
Default: 0s
tcp.keep_alive.count
Maximum unanswered keep-alive probes before dropping the connection. Zero defaults to 9.
Type: int
Default: 9
tcp.keep_alive.idle
Duration the connection must be idle before sending the first keep-alive probe. Zero defaults to 15s. Negative values disable keep-alive probes.
Type: string
Default: 15s