azure_blob_storage

Downloads objects within an Azure Blob Storage container, optionally filtered by a prefix.

  • Common

  • Advanced

# Common config fields, showing default values
input:
  label: ""
  azure_blob_storage:
    storage_account: ""
    storage_access_key: ""
    storage_connection_string: ""
    storage_sas_token: ""
    container: "" # No default (required)
    prefix: ""
    scanner:
      to_the_end: {}
    targets_input: null # No default (optional)
# All config fields, showing default values
input:
  label: ""
  azure_blob_storage:
    storage_account: ""
    storage_access_key: ""
    storage_connection_string: ""
    storage_sas_token: ""
    container: "" # No default (required)
    prefix: ""
    scanner:
      to_the_end: {}
    delete_objects: false
    targets_input: null # No default (optional)

Supports multiple authentication methods but only one of the following is required:

  • storage_connection_string

  • storage_account and storage_access_key

  • storage_account and storage_sas_token

  • storage_account to access via DefaultAzureCredential

If multiple are set then the storage_connection_string is given priority.

If the storage_connection_string does not contain the AccountName parameter, please specify it in the storage_account field.

Download large files

When downloading large files it’s often necessary to process it in streamed parts in order to avoid loading the entire file in memory at a given time. In order to do this a scanner can be specified that determines how to break the input into smaller individual messages.

Stream new files

By default this input will consume all files found within the target container and will then gracefully terminate. This is referred to as a "batch" mode of operation. However, it’s possible to instead configure a container as an Event Grid source and then use this as a targets_input, in which case new files are consumed as they’re uploaded and Redpanda Connect will continue listening for and downloading files as they arrive. This is referred to as a "streamed" mode of operation.

Metadata

This input adds the following metadata fields to each message:

  • blob_storage_key

  • blob_storage_container

  • blob_storage_last_modified

  • blob_storage_last_modified_unix

  • blob_storage_content_type

  • blob_storage_content_encoding

  • All user defined metadata

You can access these metadata fields using function interpolation.

Fields

storage_account

The storage account to access. This field is ignored if storage_connection_string is set.

Type: string

Default: ""

storage_access_key

The storage account access key. This field is ignored if storage_connection_string is set.

Type: string

Default: ""

storage_connection_string

A storage account connection string. This field is required if storage_account and storage_access_key / storage_sas_token are not set.

Type: string

Default: ""

storage_sas_token

The storage account SAS token. This field is ignored if storage_connection_string or storage_access_key are set.

Type: string

Default: ""

container

The name of the container from which to download blobs.

Type: string

prefix

An optional path prefix, if set only objects with the prefix are consumed.

Type: string

Default: ""

scanner

The scanner by which the stream of bytes consumed will be broken out into individual messages. Scanners are useful for processing large sources of data without holding the entirety of it within memory. For example, the csv scanner allows you to process individual CSV rows without loading the entire CSV file in memory at once.

Type: scanner

Default: {"to_the_end":{}}

delete_objects

Whether to delete downloaded objects from the blob once they are processed.

Type: bool

Default: false

targets_input

This is an experimental field that provides an optional source of download targets, configured as a regular Redpanda Connect input. Each message yielded by this input should be a single structured object containing a field name, which represents the blob to be downloaded.

This requires setting up Azure Blob Storage as an Event Grid source and an associated event handler that a Redpanda Connect input can read from. For example, use either one of the following:

Type: input

  • Event Hubs

  • Namespace Topics

targets_input:
  kafka:
    addresses:
      - <event-hub-hostname>:9093
    topics: [ <event-hub-name> ]
    tls:
      enabled: true
      skip_cert_verify: false
    sasl:
      mechanism: "PLAIN"
      user: "$ConnectionString"
      password: <eventhub-connection-string>
    consumer_group: <consumer-group>
    start_from_oldest: true
  processors:
    - unarchive:
        format: json_array
    - mapping: |-
        if this.subject.contains("/containers/<container-name>/") && this.eventType == "Microsoft.Storage.BlobCreated" {
          root.name = this.data.url.parse_url().path.trim_prefix("/<container-name>/")
        }
        else {
          root = deleted()
        }
targets_input:
  mqtt:
    topics:
      - <topic-name>
    urls:
      - <url>.eventgrid.azure.net:8883
  processors:
    - unarchive:
        format: json_array
    - mapping: |-
        if this.eventType == "Microsoft.Storage.BlobCreated" {
          root.name = this.data.url.parse_url().path.trim_prefix("/<container-name>/")
        } else {
          root = deleted()
        }