snowflake_streaming

Beta

Allows Snowflake to ingest data from your data pipeline using Snowpipe Streaming.

To help you configure your own snowflake_streaming output, this page includes two example data pipelines.

Introduced in version 4.39.0.

  • Common

  • Advanced

# Common configuration fields, showing default values
output:
  label: ""
  snowflake_streaming:
    account: ORG-ACCOUNT # No default (required)
    user: "" # No default (required)
    role: ACCOUNTADMIN # No default (required)
    database: "" # No default (required)
    schema: "" # No default (required)
    table: "" # No default (required)
    private_key: "" # No default (optional)
    private_key_file: "" # No default (optional)
    private_key_pass: "" # No default (optional)
    mapping: "" # No default (optional)
    init_statement: | # No default (optional)
      CREATE TABLE IF NOT EXISTS mytable (amount NUMBER);
    schema_evolution:
      enabled: false # No default (required)
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
    max_in_flight: 64
# All configuration fields, showing default values
output:
  label: ""
  snowflake_streaming:
    account: ORG-ACCOUNT # No default (required)
    user: "" # No default (required)
    role: ACCOUNTADMIN # No default (required)
    database: "" # No default (required)
    schema: "" # No default (required)
    table: "" # No default (required)
    private_key: "" # No default (optional)
    private_key_file: "" # No default (optional)
    private_key_pass: "" # No default (optional)
    mapping: "" # No default (optional)
    init_statement: | # No default (optional)
      CREATE TABLE IF NOT EXISTS <table-name> (amount NUMBER);
    schema_evolution:
      enabled: false # No default (required)
      new_column_type_mapping: |-
        root = match this.value.type() {
          this == "string" => "STRING"
          this == "bytes" => "BINARY"
          this == "number" => "DOUBLE"
          this == "bool" => "BOOLEAN"
          this == "timestamp" => "TIMESTAMP"
          _ => "VARIANT"
        }
    build_parallelism: 1
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    max_in_flight: 64
    channel_prefix: "" # No default (optional)

Supported data formats for Snowflake columns

The message data from your output must match the columns in the Snowflake table that you want to write data to. The following table shows you the column data types supported by Snowflake and how they correspond to the Bloblang data types in Redpanda Connect.

Snowflake column data type Bloblang data types

CHAR, VARCHAR

string

BINARY

string or bytes

NUMBER

number, or string where the string is parsed into a number

FLOAT

number

BOOLEAN

bool, or number where a non-zero number is true

TIME, DATE, TIMESTAMP

timestamp, or number where the number is a converted to a Unix timestamp, or string where the string is parsed using RFC 3339 format

VARIANT, ARRAY, OBJECT

Any data type converted into JSON

GEOGRAPHY,GEOMETRY

Not supported

Authentication

You can authenticate with Snowflake using an RSA key pair. Either specify:

Performance

For improved performance, this output:

  • Sends multiple messages in parallel. You can tune the maximum number of in-flight messages (or message batches) with the field max_in_flight.

  • Sends messages as a batch. You can configure batches at both the input and output level. For more information, see Message Batching.

Batch sizes

Redpanda recommends that every message batch writes at least 16 MiB of compressed output to Snowflake. You can monitor batch sizes using the snowflake_compressed_output_size_bytes metric.

Metrics

This output emits the following metrics.

Metric name Description

snowflake_convert_latency_ns

The time taken to convert messages into the Snowflake column data types.

snowflake_serialize_latency_ns

The time taken to serialize the converted columnar data into a file to send to Snowflake.

snowflake_build_output_latency_ns

The time taken to build the output file that is sent to Snowflake. This metric is the sum of snowflake_convert_latency_ns + snowflake_serialize_latency_ns.

snowflake_upload_latency_ns

The time taken to upload the output file to Snowflake.

snowflake_compressed_output_size_bytes

The size in bytes of each message batch sent to Snowflake.

Fields

account

The Snowflake account name to use, also known as the account identifier.

If you use an account locator to identify an account, for example: <account_locator>.<region_id>.<cloud>, specify only the <account locator> element in the account field.

Type: string

# Examples
account: AAAAAAA-AAAAAAA

user

Specify a user to run the Snowpipe Stream. To learn how to create a user, see the Snowflake documentation.

Type: string

role

The role of the user specified in the user field. The user’s role must have the required privileges to call the Snowpipe Streaming APIs. For more information about user roles, see the Snowflake documentation.

Type: string

# Examples
role: ACCOUNTADMIN

database

The Snowflake database you want to write data to.

Type: string

schema

The schema of the Snowflake database you want to write data to.

Type: string

table

The Snowflake table you want to write data to.

Type: string

private_key

The PEM-encoded private RSA key to use for authentication with Snowflake. You must specify a value for this field or the private_key_file field.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

private_key_file

A .p8, PEM-encoded file to load the private RSA key from. You must specify a value for this field or the private_key field.

Type: string

private_key_pass

If the RSA key is encrypted, specify the RSA key passphrase.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

mapping

The Bloblang mapping to execute on each message.

Type: string

init_statement

Optional SQL statements to execute immediately after this output connects to Snowflake for the first time. This is a useful way to initialize tables before processing data.

Make sure your SQL statements are idempotent, so they do not cause issues when run multiple times after service restarts.

Type: string

# Examples

init_statement: |2
  CREATE TABLE IF NOT EXISTS mytable (amount NUMBER);

init_statement: |2
  ALTER TABLE t1 ALTER COLUMN c1 DROP NOT NULL;
  ALTER TABLE t1 ADD COLUMN a2 NUMBER;

schema_evolution

Options to control schema updates as new columns are added to the pipeline.

Type: object

schema_evolution.enabled

Whether schema evolution is enabled.

Type: bool

schema_evolution.new_column_type_mapping

A mapping function from the Redpanda Connect data types to column data types in Snowflake. This mapping should result in the assignment of a string to the root variable with the data type for the new column in Snowflake.

The input to this mapping is an object with the value and the name of the new column. For example: {"value": 42.3, "name":"new_data_field"}".

You can also override the mapping function to customize data types.

Type: string

Default:

root = match this.value.type() {
    this == "string" => "STRING"
    this == "bytes" => "BINARY"
    this == "number" => "DOUBLE"
    this == "bool" => "BOOLEAN"
    this == "timestamp" => "TIMESTAMP"
    _ => "VARIANT"
  }

build_parallelism

The maximum amount of parallel processing to use when building the output for Snowflake. Monitor the snowflake_build_output_latency_ns metric to assess whether you need to increase this value.

Type: int

Default: 1

batching

Lets you configure a batching policy.

Type*: object

# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s
batching:
  count: 10
  period: 1s
batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.count

The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.

Type: int

Default: 0

batching.byte_size

The amount of bytes at which the batch is flushed. Set to 0 to disable size-based batching.

Type: int

Default: 0

batching.period

The period after which an incomplete batch is flushed regardless of its size.

Type: string

Default: ""

# Examples

period: 1s

period: 1m

period: 500ms

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples

check: this.type == "end_of_transaction"

batching.processors

For aggregating and archiving message batches, you can add a list of processors to apply to a batch as it is flushed. All resulting messages are flushed as a single batch even when you configure processors to split the batch into smaller batches.

Type: array

# Examples

processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

max_in_flight

The maximum number of messages to have in flight at a given time. Increase this number to improve throughput until performance plateaus.

Type: int

Default: 64

channel_prefix

The prefix to use when creating a channel name for connecting to a Snowflake table. Without a value for channel_prefix, this output creates a channel name based on a table’s fully qualified name, which results in a single stream per table.

Adding a channel_prefix also avoids the creation of duplicate channel names, which can cause errors and prevent multiple instances of Redpanda Connect from writing at the same time.

The maximum number of channels open at any time is determined by the value in the max_in_flight field.

There is a hard limit of 10,000 streams per table. If you need to use more than 10,000 streams, contact Snowflake support.

Type: string

Example pipelines

The following examples show you how to ingest, process, and write data to Snowflake from:

  • A Redpanda cluster

  • A REST API that posts JSON payloads to a HTTP server

  • Ingesting data from Redpanda

  • HTTP server to push data to Snowflake

Ingest data from Redpanda using consumer groups, decode the schema using the schema registry, then write the corresponding data into Snowflake.

To create multiple Redpanda Connect streams to write to each output table, you need a unique channel prefix per stream. In this example, the channel_prefix field constructs a unique prefix for each stream using the host name.
input:
  kafka_franz:
    seed_brokers: ["redpanda.example.com:9092"]
    topics: ["my_topic_going_to_snow"]
    consumer_group: "redpanda_connect_to_snowflake"
    tls: {enabled: true}
    sasl:
      - mechanism: SCRAM-SHA-256
        username: MY_USER_NAME
        password: "${TODO}"
pipeline:
  processors:
    - schema_registry_decode:
        url: "redpanda.example.com:8081"
        basic_auth:
          enabled: true
          username: MY_USER_NAME
          password: "${TODO}"
output:
  snowflake_streaming:
    account: "MYSNOW-ACCOUNT"
    user: MYUSER
    role: ACCOUNTADMIN
    database: "MYDATABASE"
    schema: "PUBLIC"
    table: "MYTABLE"
    private_key_file: "my/private/key.p8"
    channel_prefix: "snowflake-channel-for-${HOST}"

Create a HTTP server input that receives HTTP PUT requests with JSON payloads. The payloads are buffered locally then written to Snowflake in batches.

This example uses a buffer to immediately respond to the HTTP requests. This may result in data loss if there are delivery failures between the output and Snowflake.

For more information about the configuration of buffers, see buffers. Alternatively, remove the buffer entirely to respond to the HTTP request only once the data is written to Snowflake.

input:
  http_server:
    path: /snowflake
buffer:
  memory:
    # Max inflight data before applying backpressure
    limit: 524288000 # 50MiB
    # Batching policy the size of the files sent to Snowflake
    batch_policy:
      enabled: true
      byte_size: 33554432 # 32MiB
      period: "10s"
output:
  snowflake_streaming:
    account: "MYSNOW-ACCOUNT"
    user: MYUSER
    role: ACCOUNTADMIN
    database: "MYDATABASE"
    schema: "PUBLIC"
    table: "MYTABLE"
    private_key_file: "my/private/key.p8"