snowflake_streaming

Allows Snowflake to ingest data from your data pipeline using Snowpipe Streaming.

To help you configure your own snowflake_streaming output, this page includes example data pipelines.

# Common configuration fields, showing default values
output:
  label: ""
  snowflake_streaming:
    account: ORG-ACCOUNT # No default (required)
    user: "" # No default (required)
    role: ACCOUNTADMIN # No default (required)
    database: MYDATABASE # No default (required)
    schema: PUBLIC # No default (required)
    table: MYTABLE # No default (required)
    private_key: "" # No default (optional)
    private_key_file: "" # No default (optional)
    private_key_pass: "" # No default (optional)
    mapping: "" # No default (optional)
    init_statement: | # No default (optional)
      CREATE TABLE IF NOT EXISTS mytable (amount NUMBER);
    schema_evolution:
      enabled: false # No default (required)
      processors: [] # No default (optional)
    batching:
      count: 0
      byte_size: 0
      period: "" # No default (optional)
      check: "" # No default (optional)
    max_in_flight: 4

Supported data formats for Snowflake columns

The message data from your output must match the columns in the Snowflake table that you want to write data to. The following table shows you the column data types supported by Snowflake and how they correspond to the Bloblang data types in Redpanda Connect.

| Snowflake column data type | Bloblang data types |
| --- | --- |
| CHAR, VARCHAR | string |
| BINARY | string or bytes |
| NUMBER | number, or string where the string is parsed into a number |
| FLOAT | number |
| BOOLEAN | bool, or number where a non-zero number is true |
| TIME, DATE, TIMESTAMP | timestamp, or number where the number is converted to a Unix timestamp, or string where the string is parsed using RFC 3339 format |
| VARIANT, ARRAY, OBJECT | Any data type converted into JSON |
| GEOGRAPHY, GEOMETRY | Not supported |

Authentication

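You can authenticate with Snowflake using an RSA key pair. Specify either the PEM-encoded private key in the private_key field, or the path to a file that contains the key in the private_key_file field. If the key is encrypted, also specify its passphrase in the private_key_pass field.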
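A minimal sketch of the two ways to supply the key, using the private_key_file and private_key fields described under Fields. The file path and the environment variable names SNOWFLAKE_KEY_PASS and SNOWFLAKE_PRIVATE_KEY are placeholders chosen for illustration, and the other required fields are omitted.

```yaml
output:
  snowflake_streaming:
    # Option 1: load the key from a PEM-encoded .p8 file (placeholder path).
    private_key_file: "my/private/key.p8"
    # Only needed if the key is encrypted. The variable name is an assumption.
    private_key_pass: "${SNOWFLAKE_KEY_PASS}"
    # Option 2 (use instead of private_key_file): pass the key itself, for
    # example from an environment variable, so that it is not stored in the
    # configuration file. The variable name is an assumption.
    # private_key: "${SNOWFLAKE_PRIVATE_KEY}"
```

Set only one of private_key and private_key_file.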

Performance

For improved performance, this output:

  • Sends multiple messages in parallel. You can tune the maximum number of in-flight messages (or message batches) with the field max_in_flight.

  • Sends messages as a batch. You can configure batches at both the input and output level. For more information, see Message Batching.

Batch sizes

Redpanda recommends that every message batch writes at least 16 MiB of compressed output to Snowflake. You can monitor batch sizes using the snowflake_compressed_output_size_bytes metric.
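As a rough starting point, a batching policy along the following lines favors large batches. The values are illustrative (they mirror the example pipeline on this page) and are not tuned recommendations. The batching thresholds apply to messages as they are batched, so the compressed size reported by the metric may differ and should be checked after deployment.

```yaml
output:
  snowflake_streaming:
    # Other required fields (account, user, and so on) are omitted.
    batching:
      # Flush when any one of the enabled limits is reached.
      count: 50000   # illustrative value
      byte_size: 0   # 0 disables size-based flushing
      period: 45s    # illustrative value
```

If snowflake_compressed_output_size_bytes stays well below the recommended size, consider raising count or period.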

Metrics

This output emits the following metrics.

| Metric name | Description |
| --- | --- |
| snowflake_compressed_output_size_bytes | The size in bytes of each message batch uploaded to Snowflake. |
| snowflake_convert_latency_ns | The time taken to convert messages into the Snowflake column data types. |
| snowflake_serialize_latency_ns | The time taken to serialize the converted columnar data into a file for upload to Snowflake. |
| snowflake_build_output_latency_ns | The time taken to build the file that is uploaded to Snowflake. This metric is the sum of snowflake_convert_latency_ns and snowflake_serialize_latency_ns. |
| snowflake_upload_latency_ns | The time taken to upload the output file to Snowflake. |
| snowflake_register_latency_ns | The time taken to register the uploaded output file with Snowflake. |
| snowflake_commit_latency_ns | The time taken to commit the uploaded data updates to the target Snowflake table. |
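To inspect these metrics, the pipeline needs a metrics exporter. The following is a minimal sketch that assumes a Prometheus exporter suits your environment. The top-level metrics block belongs to the Redpanda Connect configuration as a whole and is not a field of this output.

```yaml
# Export pipeline metrics, including the snowflake_* metrics emitted by this
# output, in Prometheus format.
metrics:
  prometheus: {}
```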

Fields

account

Use the format <orgname>-<account_name> where:

  • The <orgname> is the name of your Snowflake organization.

  • The <account_name> is the unique name of your account with your Snowflake organization.

To find the correct value for this field, run the following query in Snowflake:

WITH HOSTLIST AS
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$allowlist()))))
SELECT REPLACE(VALUE:host,'.snowflakecomputing.com','') AS ACCOUNT_IDENTIFIER
FROM HOSTLIST
WHERE VALUE:type = 'SNOWFLAKE_DEPLOYMENT_REGIONLESS';

Type: string

# Examples

account: ORG-ACCOUNT

url

Specify a custom URL to connect to Snowflake. This parameter overrides the default URL, which is automatically generated from the value of output.snowflake_streaming.account. By default, the URL is constructed as follows: https://<output.snowflake_streaming.account>.snowflakecomputing.com.

Type: string

# Examples

url: https://org-account.privatelink.snowflakecomputing.com

user

Specify a user to run the Snowpipe Stream. To learn how to create a user, see the Snowflake documentation.

Type: string

role

The role of the user specified in the user field. The user’s role must have the required privileges to call the Snowpipe Streaming APIs. For more information about user roles, see the Snowflake documentation.

Type: string

# Examples

role: ACCOUNTADMIN

database

The Snowflake database you want to write data to.

Type: string

# Examples

database: MY_DATABASE

schema

The schema of the Snowflake database you want to write data to.

Type: string

# Examples

schema: PUBLIC

table

The Snowflake table you want to write data to. This field supports interpolation functions.

Type: string

# Examples

table: MY_TABLE

private_key

The PEM-encoded private RSA key to use for authentication with Snowflake. You must specify a value for this field or the private_key_file field.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. Before adding it to your configuration, see Manage Secrets.

Type: string

private_key_file

A .p8, PEM-encoded file to load the private RSA key from. You must specify a value for this field or the private_key field.

Type: string

private_key_pass

If the RSA key is encrypted, specify the RSA key passphrase.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. Before adding it to your configuration, see Manage Secrets.

Type: string

mapping

The Bloblang mapping to execute on each message.

Type: string
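For instance, a mapping can shape each message so that its fields line up with the table's columns. The following sketch uses hypothetical field names (amount, created_at, debug_info). The output already accepts several of these conversions on its own, so the mapping only makes them explicit.

```yaml
output:
  snowflake_streaming:
    # Other required fields are omitted. All field names are hypothetical.
    mapping: |
      root = this
      # Parse a numeric string into a number for a NUMBER column.
      root.amount = this.amount.number()
      # Parse an RFC 3339 string into a timestamp for a TIMESTAMP column.
      root.created_at = this.created_at.ts_parse("2006-01-02T15:04:05Z07:00")
      # Drop a field that has no matching column.
      root.debug_info = deleted()
```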

init_statement

Optional SQL statements to execute immediately after this output connects to Snowflake for the first time. This is a useful way to initialize tables before processing data.

Make sure your SQL statements are idempotent, so they do not cause issues when run multiple times after service restarts.

Type: string

# Examples

init_statement: |2
  CREATE TABLE IF NOT EXISTS mytable (amount NUMBER);

init_statement: |2
  ALTER TABLE t1 ALTER COLUMN c1 DROP NOT NULL;
  ALTER TABLE t1 ADD COLUMN a2 NUMBER;

schema_evolution

Options to control schema updates when messages are written to the Snowflake table.

Type: object

schema_evolution.enabled

Whether schema evolution is enabled. When set to true, the Snowflake table is automatically created based on the schema of the first message written to it, if the table does not already exist. As new fields are added to subsequent messages in the pipeline, new columns are created in the Snowflake table. Any required columns are marked as nullable if new messages do not include data for them.

Type: bool

schema_evolution.processors

A series of processors to execute when new columns are added to the Snowflake table. You can use these processors to:

  • Run side effects when the schema evolves.

  • Enrich the message with additional information to guide the schema changes.

For example, a processor could read the schema from the schema registry that a message was produced with and use that schema to determine the data type of the new column in Snowflake.

The input to these processors is an object with the value and name of the new column, the original message, and details of the Snowflake table the output writes to.

For example: {"value": 42.3, "name": "new_data_field", "message": {"existing_data_field": 42, "new_data_field": "db_field_name"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}

The output from the processors must be a valid message, which contains a string that specifies the column type for the new column in Snowflake. The metadata remains the same as in the original message that triggered the schema update.

Type: array
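As a sketch of how such a processor might look, the following mapping inspects the value field of the input object described above and returns a column type as a string. The choice of Snowflake type for each Bloblang type is an assumption made for illustration, so adjust it to your data.

```yaml
output:
  snowflake_streaming:
    # Other required fields are omitted.
    schema_evolution:
      enabled: true
      processors:
        - mapping: |
            # Choose a column type from the Bloblang type of the new value.
            root = match this.value.type() {
              this == "string" => "STRING"
              this == "bytes" => "BINARY"
              this == "number" => "DOUBLE"
              this == "bool" => "BOOLEAN"
              this == "timestamp" => "TIMESTAMP"
              _ => "VARIANT"
            }
```

With the example input shown earlier, where value is 42.3, this mapping would return DOUBLE.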

build_options

Options for optimizing the build of the output data that is sent to Snowflake. Monitor the snowflake_build_output_latency_ns metric to assess whether you need to update these options.

Type: object

# Examples

build_options:
  parallelism: 4
  chunk_size: 10000

build_options.parallelism

The maximum amount of parallel processing to use when building the output for Snowflake.

Type: int

Default: 1

build_options.chunk_size

The number of table rows to submit in each chunk for processing.

Type: int

Default: 50000

batching

Lets you configure a batching policy.

Type: object

# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.count

The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.

Type: int

Default: 0

batching.byte_size

The number of bytes at which the batch is flushed. Set to 0 to disable size-based batching.

Type: int

Default: 0

batching.period

The period after which an incomplete batch is flushed regardless of its size.

Type: string

Default: ""

# Examples

period: 1s

period: 1m

period: 500ms

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples

check: this.type == "end_of_transaction"

batching.processors

For aggregating and archiving message batches, you can add a list of processors to apply to a batch as it is flushed. All resulting messages are flushed as a single batch even when you configure processors to split the batch into smaller batches.

Type: array

# Examples

processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

max_in_flight

The maximum number of messages to have in flight at a given time. Increase this number to improve throughput until performance plateaus.

Type: int

Default: 4

channel_prefix

The prefix to use when creating a channel name for connecting to a Snowflake table. Adding a channel_prefix avoids the creation of duplicate channel names, which result in errors and prevent multiple instances of Redpanda Connect from writing at the same time.

You can specify either the channel_prefix or channel_name, but not both. If neither field is populated, this output creates a channel name based on a table’s fully-qualified name, which results in a single stream per table.

The maximum number of channels open at any time is determined by the value in the max_in_flight field.

Snowflake limits the number of streams per table to 10,000. If you need to use more than 10,000 streams, contact Snowflake support.

Type: string

# Examples

channel_prefix: channel-${HOST}

channel_name

The channel name to use when connecting to a Snowflake table. Duplicate channel names cause errors and prevent multiple instances of Redpanda Connect from writing at the same time. To help you generate unique names, this field supports interpolation functions.

Redpanda Connect assumes that a message batch contains messages for a single channel, which means that interpolation is only executed on the first message in each batch. If your pipeline uses an input that is partitioned, such as an Apache Kafka topic, batch messages at the input level to make sure all messages are processed by the same channel.
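A sketch of this arrangement, assuming a kafka_franz input, which batches messages per topic partition. The broker address, topic, and consumer group are placeholders, and the batch sizes are illustrative.

```yaml
input:
  kafka_franz:
    seed_brokers: ["localhost:9092"]    # placeholder
    topics: ["my_topic"]                # placeholder
    consumer_group: "snowflake_ingest"  # placeholder
    # Batch at the input level so that each batch holds messages from a
    # single partition.
    batching:
      count: 50000
      period: 45s
output:
  snowflake_streaming:
    # Other required fields are omitted.
    # Evaluated on the first message of each batch only.
    channel_name: partition-${!@kafka_partition}
```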

You can specify either the channel_name or channel_prefix, but not both. If neither field is populated, this output creates a channel name based on a table’s fully-qualified name, which results in a single stream per table.

Snowflake limits the number of streams per table to 10,000. If you need to use more than 10,000 streams, contact Snowflake support.

Type: string

# Examples

channel_name: partition-${!@kafka_partition}

offset_token

The offset token to use for exactly-once delivery of data to a Snowflake table. This field supports interpolation functions.

This output assumes that messages within a batch are in increasing order by offset token. When data is sent on a channel, the offset token of each message in the batch is compared to the latest token processed by the channel. If the offset token is lexicographically less than the latest token, it’s assumed the message is a duplicate and is dropped. Messages must be delivered to the output in order, otherwise they are processed as duplicates and dropped.

To avoid dropping retried messages if later messages have succeeded in the meantime, use a dead-letter queue to process failed messages. See the Ingesting data exactly once from Redpanda example.

If you’re using a numeric value as an offset token, pad the value so that it’s lexicographically ordered in its string representation because offset tokens are compared in string form. For more details, see the Ingesting data exactly once from Redpanda example.

For more information about offset tokens, see the Snowflake documentation.
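One way to set up the dead-letter queue described above is a fallback output. The following is a sketch rather than the referenced example: it assumes a Kafka-based input that sets the kafka_partition and kafka_offset metadata, and it uses a local file, with a hypothetical path, as the dead-letter destination.

```yaml
output:
  fallback:
    - snowflake_streaming:
        # Other required fields are omitted.
        channel_name: partition-${!@kafka_partition}
        # Zero-padded hexadecimal keeps offsets in order when they are
        # compared as strings.
        offset_token: offset-${!"%016X".format(@kafka_offset)}
    # Messages that fail in the first output are written here instead of
    # being retried out of order.
    - file:
        path: ./dead_letter_queue.jsonl  # hypothetical path
        codec: lines
```

Messages that land in the dead-letter destination need to be reprocessed separately.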

Type: string

# Examples

offset_token: offset-${!"%016X".format(@kafka_offset)}

offset_token: postgres-${!@lsn}

commit_timeout

The maximum duration to wait while data updates from a message batch are asynchronously committed to Snowflake.

Type: string

Default: 60s

# Examples

commit_timeout: 10s

commit_timeout: 10m

Example pipelines

The following examples show you how to ingest, process, and write data to Snowflake from:

  • A PostgreSQL table using change data capture (CDC)

  • A Redpanda cluster

  • A REST API that posts JSON payloads to a HTTP server

Send data from a PostgreSQL table and write it to Snowflake exactly once using PostgreSQL logical replication.

This example includes some important features:

  • To make sure that a Snowflake streaming channel does not assume that older data is already committed, the configuration sets a 45-second interval between message batches. This interval prevents a message batch from being sent while another batch is retried.

  • The log sequence number of each data update from the Write-Ahead Log (WAL) in PostgreSQL makes sure that data is only uploaded once to the snowflake_streaming output, and that messages sent to the output are already lexicographically ordered.

To do exactly-once data delivery, it’s important that records are delivered in order to the output, and are correctly partitioned. Before you start, read the offset_token field description. Alternatively, remove the offset_token field to use Redpanda Connect’s default at-least-once delivery model.
input:
  postgres_cdc:
    dsn: postgres://foouser:foopass@localhost:5432/foodb
    schema: "public"
    tables: ["my_pg_table"]
    # Use very large batches. Each batch is sent to Snowflake individually,
    # so to optimize query performance, use the largest file size
    # your memory allows
    batching:
      count: 50000
      period: 45s
    # Set an interval between message batches to prevent multiple batches
    # from being in flight at once
    checkpoint_limit: 1
output:
  snowflake_streaming:
    # Using the log sequence number makes sure data is only updated exactly once
    offset_token: "${!@lsn}"
    # Sending a single ordered log means you can only send one update
    # at a time and properly increment the offset_token
    # and use only a single channel.
    max_in_flight: 1
    account: "MYSNOW-ACCOUNT"
    user: MYUSER
    role: ACCOUNTADMIN
    database: "MYDATABASE"
    schema: "PUBLIC"
    table: "MY_PG_TABLE"
    private_key_file: "my/private/key.p8"