azure_blob_storage
Downloads objects within an Azure Blob Storage container, optionally filtered by a prefix.
Introduced in version 3.36.0.
# Common config fields, showing default values
input:
  label: ""
  azure_blob_storage:
    storage_account: ""
    storage_access_key: ""
    storage_connection_string: ""
    storage_sas_token: ""
    container: "" # No default (required)
    prefix: ""
    scanner:
      to_the_end: {}
    targets_input: null # No default (optional)
# All config fields, showing default values
input:
  label: ""
  azure_blob_storage:
    storage_account: ""
    storage_access_key: ""
    storage_connection_string: ""
    storage_sas_token: ""
    container: "" # No default (required)
    prefix: ""
    scanner:
      to_the_end: {}
    delete_objects: false
    targets_input: null # No default (optional)
Supports multiple authentication methods, but only one of the following is required:

- storage_connection_string
- storage_account and storage_access_key
- storage_account and storage_sas_token
- storage_account to access via DefaultAzureCredential

If multiple are set, then storage_connection_string is given priority.

If the storage_connection_string does not contain the AccountName parameter, please specify it in the storage_account field.
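As an illustrative sketch (the connection string and account name here are placeholders, not values from this documentation), a connection-string configuration might look like the following, with storage_account supplied in case the connection string omits AccountName:

# A sketch: authenticate with a connection string read from an
# environment variable. storage_account is only consulted when the
# connection string does not include the AccountName parameter.
input:
  azure_blob_storage:
    storage_connection_string: "${AZURE_STORAGE_CONNECTION_STRING}"
    storage_account: myaccount
    container: my-container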
Download large files

When downloading large files it’s often necessary to process them in streamed parts in order to avoid loading an entire file in memory at once. To do this, specify a scanner that determines how to break the input into smaller individual messages.
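For example, a minimal sketch (credentials and names are placeholders) that uses the lines scanner to emit one message per line of each blob rather than one message per blob:

# A sketch: the lines scanner streams each blob as individual
# line-delimited messages instead of loading the whole file at once.
input:
  azure_blob_storage:
    storage_connection_string: "${AZURE_STORAGE_CONNECTION_STRING}"
    container: my-container
    scanner:
      lines: {}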
Stream new files

By default this input will consume all files found within the target container and will then gracefully terminate. This is referred to as a "batch" mode of operation. However, it’s possible to instead configure a container as an Event Grid source and then use this as a targets_input, in which case new files are consumed as they’re uploaded and Redpanda Connect will continue listening for and downloading files as they arrive. This is referred to as a "streamed" mode of operation.
Metadata

This input adds the following metadata fields to each message:

- blob_storage_key
- blob_storage_container
- blob_storage_last_modified
- blob_storage_last_modified_unix
- blob_storage_content_type
- blob_storage_content_encoding
- All user-defined metadata

You can access these metadata fields using function interpolation.
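For example, a minimal sketch (the file output and path are assumptions for illustration, not part of this input) that routes each downloaded blob to a local file named after its original blob key:

# A sketch: use function interpolation on blob_storage_key to derive
# a local path for each message written by a file output.
output:
  file:
    path: './downloads/${! meta("blob_storage_key") }'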
Fields
storage_account

The storage account to access. This field is ignored if storage_connection_string is set.

Type: string
Default: ""

storage_access_key

The storage account access key. This field is ignored if storage_connection_string is set.

Type: string
Default: ""

storage_connection_string

A storage account connection string. This field is required if storage_account and storage_access_key / storage_sas_token are not set.

Type: string
Default: ""

storage_sas_token

The storage account SAS token. This field is ignored if storage_connection_string or storage_access_key are set.

Type: string
Default: ""
container

The container from which blobs are downloaded.

Type: string

prefix

An optional path prefix; if set, only objects with the prefix are consumed.

Type: string
Default: ""
scanner

The scanner by which the stream of bytes consumed will be broken out into individual messages. Scanners are useful for processing large sources of data without holding the entirety of it within memory. For example, the csv scanner allows you to process individual CSV rows without loading the entire CSV file in memory at once.

Type: scanner
Default: {"to_the_end":{}}
Requires version 4.25.0 or newer
delete_objects

Whether to delete downloaded objects from the container once they are processed.

Type: bool
Default: false
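For instance, a minimal sketch (names and credentials are placeholders) that drains a prefix once and removes each blob after processing:

# A sketch: consume every blob under backups/ once and delete each
# object from the container after it has been processed.
input:
  azure_blob_storage:
    storage_connection_string: "${AZURE_STORAGE_CONNECTION_STRING}"
    container: archive
    prefix: backups/
    delete_objects: true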
targets_input

This is an experimental field that provides an optional source of download targets, configured as a regular Redpanda Connect input. Each message yielded by this input should be a single structured object containing a field name, which represents the blob to be downloaded.

This requires setting up Azure Blob Storage as an Event Grid source and an associated event handler that a Redpanda Connect input can read from. For example, use either one of the following:

- Azure Event Hubs using the kafka input
- Namespace topics using the mqtt input

Type: input
Requires version 4.27.0 or newer
Event Hubs
targets_input:
  kafka:
    addresses:
      - <event-hub-hostname>:9093
    topics: [ <event-hub-name> ]
    tls:
      enabled: true
      skip_cert_verify: false
    sasl:
      mechanism: "PLAIN"
      user: "$ConnectionString"
      password: <eventhub-connection-string>
    consumer_group: <consumer-group>
    start_from_oldest: true
    processors:
      - unarchive:
          format: json_array
      - mapping: |-
          if this.subject.contains("/containers/<container-name>/") && this.eventType == "Microsoft.Storage.BlobCreated" {
            root.name = this.data.url.parse_url().path.trim_prefix("/<container-name>/")
          } else {
            root = deleted()
          }
Namespace Topics

targets_input:
  mqtt:
    topics:
      - <topic-name>
    urls:
      - <url>.eventgrid.azure.net:8883
    processors:
      - unarchive:
          format: json_array
      - mapping: |-
          if this.eventType == "Microsoft.Storage.BlobCreated" {
            root.name = this.data.url.parse_url().path.trim_prefix("/<container-name>/")
          } else {
            root = deleted()
          }