# snowflake_streaming

---
title: snowflake_streaming
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-connect-version: 4.93.0
latest-redpanda-tag: v26.1.9
docname: connect/components/outputs/snowflake_streaming
page-component-name: cloud-data-platform
page-version: master
page-component-version: master
page-component-title: Cloud
page-relative-src-path: connect/components/outputs/snowflake_streaming.adoc
page-edit-url: https://github.com/redpanda-data/cloud-docs/edit/main/modules/develop/pages/connect/components/outputs/snowflake_streaming.adoc
page-git-created-date: "2024-11-19"
page-git-modified-date: "2026-05-26"
---

<!-- Source: https://docs.redpanda.com/cloud-data-platform/develop/connect/components/outputs/snowflake_streaming.md -->

**Available in:** Cloud, [Self-Managed](https://docs.redpanda.com/connect/components/outputs/snowflake_streaming/ "View the Self-Managed version of this component")

Allows Snowflake to ingest data from your data pipeline using [Snowpipe Streaming](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview).

To help you configure your own `snowflake_streaming` output, this page includes [example data pipelines](#example-pipelines).

#### Common

```yml
output:
  label: ""
  snowflake_streaming:
    account: "" # No default (required)
    user: "" # No default (required)
    role: "" # No default (required)
    database: "" # No default (required)
    schema: "" # No default (required)
    table: "" # No default (required)
    private_key: "" # No default (optional)
    private_key_file: "" # No default (optional)
    private_key_pass: "" # No default (optional)
    mapping: "" # No default (optional)
    init_statement: "" # No default (optional)
    schema_evolution:
      enabled: "" # No default (required)
      ignore_nulls: true
      processors: [] # No default (optional)
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    max_in_flight: 4
```

#### Advanced

```yml
output:
  label: ""
  snowflake_streaming:
    account: "" # No default (required)
    url: "" # No default (optional)
    user: "" # No default (required)
    role: "" # No default (required)
    database: "" # No default (required)
    schema: "" # No default (required)
    table: "" # No default (required)
    private_key: "" # No default (optional)
    private_key_file: "" # No default (optional)
    private_key_pass: "" # No default (optional)
    mapping: "" # No default (optional)
    init_statement: "" # No default (optional)
    schema_evolution:
      enabled: "" # No default (required)
      ignore_nulls: true
      processors: [] # No default (optional)
    build_options:
      parallelism: 1
      chunk_size: 50000
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    max_in_flight: 4
    channel_prefix: "" # No default (optional)
    channel_name: "" # No default (optional)
    offset_token: "" # No default (optional)
    commit_backoff:
      initial_interval: 32ms
      max_interval: 512ms
      max_elapsed_time: 60s
      multiplier: 2
    message_format: object
    timestamp_format: 2006-01-02T15:04:05.999999999Z07:00
```

## [](#conversion-of-message-data-into-snowflake-table-rows)Conversion of message data into Snowflake table rows

How message data is converted into Snowflake table rows depends on three factors:

-   Output message contents.

-   [Schema evolution settings](#schema_evolution).

-   Schema of the [target Snowflake table](#table).


The following scenarios highlight how these three factors affect data written to the target table.

> 📝 **NOTE**
>
> For reduced complexity, consider [turning on schema evolution](#schema_evolution), which automatically creates and updates the Snowflake table schema based on message contents.

### [](#scenario-data-and-table-schema-match-schema-evolution-turned-on-or-off)Scenario: Data and table schema match (schema evolution turned on or off)

An output message matches the existing table schema, and the `schema_evolution.enabled` field is set to `true` or `false`.

The target Snowflake table has two columns:

-   `product_id` (NUMBER)

-   `product_code` (STRING)


A pipeline generates the following message:

```json
{"product_id": 521, "product_code": "EST-PR"}
```

In this scenario:

-   The JSON keys in the message (`"product_id"` and `"product_code"`) match column names in the target Snowflake table.

-   The message values match the column data types. (If there were a data type mismatch, the message would be rejected.)

-   Redpanda Connect inserts the message values into a new row in the target Snowflake table.

    | product_id | product_code |
    | --- | --- |
    | 521 | EST-PR |


### [](#scenario-data-and-table-schema-mismatch-schema-evolution-turned-on)Scenario: Data and table schema mismatch (schema evolution turned on)

An output message includes schema updates, and the `schema_evolution.enabled` field is set to `true`.

The target Snowflake table has the same two columns as the [previous scenario](#scenario-data-and-table-schema-match-schema-evolution-turned-on-or-off):

-   `product_id` (NUMBER)

-   `product_code` (STRING)


This time, the pipeline generates the following message:

```json
{"product_batch": 11111, "product_color": "yellow"}
```

In this scenario:

-   The JSON keys (`"product_batch"` and `"product_color"`) do not match column names in the target Snowflake table.

-   As schema evolution is enabled, Redpanda Connect adds two new columns to the target table with data types derived from the output message values. For more information about the mapping of data types, see [Supported data formats for Snowflake columns](#supported-data-formats-for-snowflake-columns).

-   Redpanda Connect inserts the message values into a new table row.

    | product_id | product_code | product_batch | product_color |
    | --- | --- | --- | --- |
    | (null) | (null) | 11111 | yellow |

    > 📝 **NOTE**
    >
    > You can [configure processors](#schema_evolution-processors) to override the schema updates derived from the message values.


### [](#scenario-data-and-table-schema-mismatch-schema-evolution-turned-off)Scenario: Data and table schema mismatch (schema evolution turned off)

An output message includes schema updates, and the `schema_evolution.enabled` field is set to `false`.

The target Snowflake table has the same two columns:

-   `product_id` (NUMBER)

-   `product_code` (STRING)


The pipeline generates the same message as the [previous scenario](#scenario-data-and-table-schema-mismatch-schema-evolution-turned-on):

```json
{"product_batch": 11111, "product_color": "yellow"}
```

In this scenario:

-   The JSON keys (`"product_batch"` and `"product_color"`) do not match any existing column names.

-   Because schema evolution is turned off, Redpanda Connect ignores the extra column names and values and inserts a row of null values.

    | product_id | product_code |
    | --- | --- |
    | (null) | (null) |


## [](#supported-data-formats-for-snowflake-columns)Supported data formats for Snowflake columns

The message data from your output must match the columns in the Snowflake table that you want to write data to. The following table shows you the [column data types supported by Snowflake](https://docs.snowflake.com/en/sql-reference/intro-summary-data-types) and how they correspond to the [Bloblang data types](https://docs.redpanda.com/cloud-data-platform/develop/connect/guides/bloblang/methods/#type) in Redpanda Connect.

| Snowflake column data type | Bloblang data types |
| --- | --- |
| CHAR, VARCHAR | string |
| BINARY | string or bytes |
| NUMBER | number, or string where the string is parsed into a number |
| FLOAT, including special values, such as NaN (Not a Number), -inf (negative infinity), and inf (positive infinity) | number |
| BOOLEAN | bool, or number where a non-zero number is true |
| TIME, DATE, TIMESTAMP | timestamp, or number where the number is converted to a Unix timestamp, or string where the string is parsed using RFC 3339 format |
| VARIANT, ARRAY, OBJECT | Any data type converted into JSON |
| GEOGRAPHY, GEOMETRY | Not supported |

## [](#authentication)Authentication

You can authenticate with Snowflake using an [RSA key pair](https://docs.snowflake.com/en/user-guide/key-pair-auth). Specify either:

-   A PEM-encoded private key, in the [`private_key` field](#private_key).

-   The path to a file from which the output can load the private RSA key, in the [`private_key_file` field](#private_key_file).
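
As a minimal sketch, the two options might be configured as follows. The account, user, role, and object names are placeholders. The `${secrets.…}` references assume that secrets with the hypothetical names `SNOWFLAKE_PRIVATE_KEY` and `SNOWFLAKE_KEY_PASS` already exist in your secret store.

```yaml
output:
  snowflake_streaming:
    account: "MYORG-MYACCOUNT" # Placeholder
    user: MYUSER               # Placeholder
    role: MYROLE               # Placeholder
    database: "MYDATABASE"
    schema: "PUBLIC"
    table: "MYTABLE"
    # Option 1: supply the PEM-encoded key through a secret reference
    # (hypothetical secret name).
    private_key: "${secrets.SNOWFLAKE_PRIVATE_KEY}"
    # Option 2 (use instead of private_key): load the key from a file.
    # private_key_file: "my/private/key.p8"
    # Only needed if the key is encrypted (hypothetical secret name).
    private_key_pass: "${secrets.SNOWFLAKE_KEY_PASS}"
```

Set only one of `private_key` or `private_key_file`, and add `private_key_pass` only when the key is passphrase-protected.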


## [](#performance)Performance

For improved performance, this output:

-   Sends multiple messages in parallel. You can tune the maximum number of in-flight messages (or message batches) with the field `max_in_flight`.

-   Sends messages as a batch. You can configure batches at both the input and output level. For more information, see [Message Batching](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/batching/).


### [](#batch-sizes)Batch sizes

Redpanda recommends that every message batch writes at least 16 MiB of compressed output to Snowflake. You can monitor batch sizes using the `snowflake_compressed_output_size_bytes` metric.
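
The following fragment is one possible starting point for reaching that batch size at the output level. The values are illustrative assumptions rather than recommendations from this page: `byte_size` counts message bytes before compression, and the compression ratio depends on your data, so adjust the values while watching the metric.

```yaml
output:
  snowflake_streaming:
    # Required connection fields omitted for brevity.
    max_in_flight: 4
    batching:
      # 64 MiB of message data before compression (illustrative value).
      byte_size: 67108864
      # Flush an incomplete batch after this period (illustrative value).
      period: 30s
      # Disable count-based batching so size and period decide the flush.
      count: 0
```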

### [](#metrics)Metrics

This output emits the following metrics.

| Metric name | Description |
| --- | --- |
| snowflake_compressed_output_size_bytes | The size in bytes of each message batch uploaded to Snowflake. |
| snowflake_convert_latency_ns | The time taken to convert messages into the Snowflake column data types. |
| snowflake_serialize_latency_ns | The time taken to serialize the converted columnar data into a file for upload to Snowflake. |
| snowflake_build_output_latency_ns | The time taken to build the file that is uploaded to Snowflake. This metric is the sum of snowflake_convert_latency_ns + snowflake_serialize_latency_ns. |
| snowflake_upload_latency_ns | The time taken to upload the output file to Snowflake. |
| snowflake_register_latency_ns | The time taken to register the uploaded output file with Snowflake. |
| snowflake_commit_latency_ns | The time taken to commit the uploaded data updates to the target Snowflake table. |

## [](#fields)Fields

### [](#account)`account`

The [Snowflake account name to use](https://docs.snowflake.com/en/user-guide/admin-account-identifier#account-name).

Use the format `<orgname>-<account_name>` where:

-   The `<orgname>` is the name of your Snowflake organization.

-   The `<account_name>` is the unique name of your account with your Snowflake organization.


To find the correct value for this field, run the following query in Snowflake:

```sql
WITH HOSTLIST AS
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$allowlist()))))
SELECT REPLACE(VALUE:host,'.snowflakecomputing.com','') AS ACCOUNT_IDENTIFIER
FROM HOSTLIST
WHERE VALUE:type = 'SNOWFLAKE_DEPLOYMENT_REGIONLESS';
```

**Type**: `string`

```yaml
# Examples:
account: ORG-ACCOUNT
```

### [](#batching)`batching`

Lets you configure a [batching policy](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/batching/).

**Type**: `object`

```yaml
# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s

# ---

batching:
  count: 10
  period: 1s

# ---

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

### [](#batching-byte_size)`batching.byte_size`

The number of bytes at which the batch is flushed. Set to `0` to disable size-based batching.

**Type**: `int`

**Default**: `0`

### [](#batching-check)`batching.check`

A [Bloblang query](https://docs.redpanda.com/cloud-data-platform/develop/connect/guides/bloblang/about/) that should return a boolean value indicating whether a message should end a batch.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
check: this.type == "end_of_transaction"
```

### [](#batching-count)`batching.count`

The number of messages after which the batch is flushed. Set to `0` to disable count-based batching.

**Type**: `int`

**Default**: `0`

### [](#batching-period)`batching.period`

The period of time after which an incomplete batch is flushed regardless of its size. This field accepts Go duration format strings such as `100ms`, `1s`, or `5s`.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
period: 1s

# ---

period: 1m

# ---

period: 500ms
```

### [](#batching-processors)`batching.processors[]`

A list of [processors](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/processors/about/) to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. All resulting messages are flushed as a single batch, and therefore splitting the batch into smaller batches using these processors is a no-op.

**Type**: `processor`

```yaml
# Examples:
processors:
  - archive:
      format: concatenate

# ---

processors:
  - archive:
      format: lines

# ---

processors:
  - archive:
      format: json_array
```

### [](#build_options)`build_options`

Options for optimizing the build of the output data that is sent to Snowflake. Monitor the `snowflake_build_output_latency_ns` metric to assess whether you need to update these options.

**Type**: `object`

### [](#build_options-chunk_size)`build_options.chunk_size`

The number of table rows to submit in each chunk for processing.

**Type**: `int`

**Default**: `50000`

### [](#build_options-parallelism)`build_options.parallelism`

The maximum amount of parallel processing to use when building the output for Snowflake.

**Type**: `int`

**Default**: `1`
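
As a rough sketch, the following fragment raises the build parallelism while keeping the default chunk size. The value `4` is an assumption for a host with several spare CPU cores, not a tuned recommendation. Compare `snowflake_build_output_latency_ns` before and after the change.

```yaml
output:
  snowflake_streaming:
    # Required connection fields omitted for brevity.
    build_options:
      parallelism: 4    # Illustrative; the default is 1
      chunk_size: 50000 # Default
```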

### [](#channel_name)`channel_name`

The channel name to use when connecting to a Snowflake table. Duplicate channel names cause errors and prevent multiple instances of Redpanda Connect from writing at the same time, and so this field supports [interpolation functions](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/interpolation/#bloblang-queries).

Redpanda Connect assumes that a message batch contains messages for a single channel, which means that interpolation is only executed on the first message in each batch. If your pipeline uses an input that is partitioned, such as an Apache Kafka topic, batch messages at the input level to make sure all messages are processed by the same channel.

You can specify either the `channel_name` or `channel_prefix`, but not both. If neither field is populated, this output creates a channel name based on a table’s fully-qualified name, which results in a single stream per table.

> 📝 **NOTE**
>
> Snowflake limits the number of streams per table to 10,000. If you need to use more than 10,000 streams, contact [Snowflake support](https://www.snowflake.com/en/support/).

**Type**: `string`

```yaml
# Examples:
channel_name: partition-${!@kafka_partition}
```

### [](#channel_prefix)`channel_prefix`

The prefix to use when creating a channel name for connecting to a Snowflake table. Adding a `channel_prefix` avoids the creation of duplicate channel names, which result in errors and prevent multiple instances of Redpanda Connect from writing at the same time.

You can specify either the `channel_prefix` or `channel_name`, but not both. If neither field is populated, this output creates a channel name based on a table’s fully-qualified name, which results in a single stream per table.

The maximum number of channels open at any time is determined by the value in the `max_in_flight` field.

> 📝 **NOTE**
>
> Snowflake limits the number of streams per table to 10,000. If you need to use more than 10,000 streams, contact [Snowflake support](https://www.snowflake.com/en/support/).

**Type**: `string`

```yaml
# Examples:
channel_prefix: channel-${HOST}
```

### [](#commit_backoff)`commit_backoff`

Control how frequently Snowflake is polled to check if data has been committed.

**Type**: `object`

### [](#commit_backoff-initial_interval)`commit_backoff.initial_interval`

The initial period to wait between status polls.

**Type**: `string`

**Default**: `32ms`

### [](#commit_backoff-max_elapsed_time)`commit_backoff.max_elapsed_time`

The maximum total time to wait for data to be committed. If set to zero, no limit is applied.

**Type**: `string`

**Default**: `60s`

### [](#commit_backoff-max_interval)`commit_backoff.max_interval`

The maximum period to wait between status polls.

**Type**: `string`

**Default**: `512ms`

### [](#commit_backoff-multiplier)`commit_backoff.multiplier`

The factor by which the poll interval grows on each attempt.

**Type**: `float`

**Default**: `2`
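
To show how the four `commit_backoff` fields interact, here is the default policy written out with comments. The sequence of waits in the final comment assumes a plain exponential backoff with no jitter, which this page does not confirm.

```yaml
commit_backoff:
  initial_interval: 32ms # First wait between status polls
  multiplier: 2          # Each wait is twice the previous one
  max_interval: 512ms    # Waits stop growing once they reach this cap
  max_elapsed_time: 60s  # Give up waiting for the commit after 60s in total
# Waits under these defaults, assuming no jitter:
# 32ms, 64ms, 128ms, 256ms, 512ms, 512ms, ...
```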

### [](#database)`database`

The Snowflake database you want to write data to.

**Type**: `string`

```yaml
# Examples:
database: MY_DATABASE
```

### [](#init_statement)`init_statement`

Optional SQL statements to execute immediately after this output connects to Snowflake for the first time. This is a useful way to initialize tables before processing data.

> 📝 **NOTE**
>
> Make sure your SQL statements are idempotent, so they do not cause issues when run multiple times after service restarts.

**Type**: `string`

```yaml
# Examples:
init_statement: |-

  CREATE TABLE IF NOT EXISTS mytable (amount NUMBER);

# ---

init_statement: |-

  ALTER TABLE t1 ALTER COLUMN c1 DROP NOT NULL;
  ALTER TABLE t1 ADD COLUMN a2 NUMBER;
```

### [](#mapping)`mapping`

The [Bloblang `mapping`](https://docs.redpanda.com/cloud-data-platform/develop/connect/guides/bloblang/about/) to execute on each message.

**Type**: `string`
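
For illustration, the following sketch uses `mapping` to rename message fields so that they match the `product_id` and `product_code` columns used earlier on this page. The incoming field names `id` and `code` are hypothetical.

```yaml
output:
  snowflake_streaming:
    # Required connection fields omitted for brevity.
    mapping: |
      # Keep only the fields that correspond to table columns.
      # The source fields `id` and `code` are hypothetical.
      root.product_id = this.id
      root.product_code = this.code
```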

### [](#max_in_flight)`max_in_flight`

The maximum number of messages to have in flight at a given time. Increase this number to improve throughput until performance plateaus.

**Type**: `int`

**Default**: `4`

### [](#message_format)`message_format`

The format in which this output expects incoming messages from the rest of the pipeline.

**Type**: `string`

**Default**: `object`

| Option | Summary |
| --- | --- |
| array | Messages are an array of values where each position matches the ordinal of the column in Snowflake. |
| object | Messages are JSON or Bloblang objects where each key is the Snowflake column name and the value is the column value. |

```yaml
# Examples:
message_format: array
```
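
As a hedged sketch of the `array` option, the following fragment builds each row as an array in column order. It assumes the two-column table from the earlier scenarios (`product_id` first, `product_code` second) and assumes that `mapping` runs before the message is converted into a row.

```yaml
output:
  snowflake_streaming:
    # Required connection fields omitted for brevity.
    message_format: array
    # Build the array in column order: product_id first, product_code second.
    mapping: |
      root = [ this.product_id, this.product_code ]
```

With `message_format: object`, the same message could be passed through unchanged because its keys already name the columns.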

### [](#offset_token)`offset_token`

The offset token to use for exactly-once delivery of data to a Snowflake table. This field supports [interpolation functions](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/interpolation/#bloblang-queries).

This output assumes that messages within a batch are in increasing order by offset token. When data is sent on a channel, the offset token of each message in the batch is compared to the latest token processed by the channel. If the offset token is lexicographically less than the latest token, it’s assumed the message is a duplicate and is dropped. Messages must be delivered to the output in order, otherwise they are processed as duplicates and dropped.

To avoid dropping retried messages if later messages have succeeded in the meantime, use a dead-letter queue to process failed messages. See the [Ingest data exactly once from Redpanda](#example-pipelines) example.

> 📝 **NOTE**
>
> If you’re using a numeric value as an offset token, pad the value so that it’s lexicographically ordered in its string representation, because offset tokens are compared in string form. For more details, see the [Ingest data exactly once from Redpanda](#example-pipelines) example.

For more information about offset tokens, see [Snowflake Documentation](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview#offset-tokens).

**Type**: `string`

```yaml
# Examples:
offset_token: offset-${!"%016X".format(@kafka_offset)}

# ---

offset_token: postgres-${!@lsn}
```

### [](#private_key)`private_key`

The PEM-encoded private RSA key to use for authentication with Snowflake. You must specify a value for this field or the `private_key_file` field.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Manage Secrets](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/secret-management/) before adding it to your configuration.

**Type**: `string`

### [](#private_key_file)`private_key_file`

A `.p8`, PEM-encoded file to load the private RSA key from. You must specify a value for this field or the `private_key` field.

**Type**: `string`

### [](#private_key_pass)`private_key_pass`

If the RSA key is encrypted, specify the RSA key passphrase.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Manage Secrets](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/secret-management/) before adding it to your configuration.

**Type**: `string`

### [](#role)`role`

The role of the user specified in the `user` field. The user’s role must have the [required privileges](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview#required-access-privileges) to call the Snowpipe Streaming APIs. For more information about user roles, see the [Snowflake documentation](https://docs.snowflake.com/en/user-guide/admin-user-management#user-roles).

**Type**: `string`

```yaml
# Examples:
role: ACCOUNTADMIN
```

### [](#schema)`schema`

The schema of the Snowflake database you want to write data to.

**Type**: `string`

```yaml
# Examples:
schema: PUBLIC
```

### [](#schema_evolution)`schema_evolution`

Options to control schema updates when messages are written to the Snowflake table.

**Type**: `object`

### [](#schema_evolution-enabled)`schema_evolution.enabled`

Whether schema evolution is enabled. When set to `true`, the Snowflake table is automatically created based on the schema of the first message written to it, if the table does not already exist. As new fields are added to subsequent messages in the pipeline, new columns are created in the Snowflake table. Any required columns are marked as `nullable` if new messages do not include data for them.

**Type**: `bool`

### [](#schema_evolution-ignore_nulls)`schema_evolution.ignore_nulls`

When set to `true` and schema evolution is enabled, new columns that have `null` values _are not_ added to the Snowflake table. This behavior:

-   Prevents unnecessary schema changes caused by placeholder or incomplete data.

-   Avoids creating table columns with incorrect data types.


> 📝 **NOTE**
>
> Redpanda does not recommend updating the default setting unless you are confident about the data type of `null` columns in advance.

**Type**: `bool`

**Default**: `true`
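
The following fragment sketches schema evolution with the default `ignore_nulls` behavior. The `product_size` field in the comment is a hypothetical example, and the statement that the column is added once a non-null value arrives is inferred from the description above rather than stated by it.

```yaml
output:
  snowflake_streaming:
    # Required connection fields omitted for brevity.
    schema_evolution:
      enabled: true
      # Default. A message such as {"product_id": 1, "product_size": null}
      # does not add a product_size column. The column would be added
      # when a later message carries a non-null product_size value.
      ignore_nulls: true
```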

### [](#schema_evolution-processors)`schema_evolution.processors[]`

A series of processors to execute when new columns are added to the Snowflake table. You can use these processors to:

-   Run side effects when the schema evolves.

-   Enrich the message with additional information to guide the schema changes.


For example, a processor could read the schema from the schema registry that a message was produced with and use that schema to determine the data type of the new column in Snowflake.

The input to these processors is an object with the value and name of the new column, the original message, and details of the Snowflake table the output writes to.

For example: `{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "db_field_name"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`

The output from the processors must be a valid message, which contains a string that specifies the column type for the new column in Snowflake. The metadata remains the same as in the original message that triggered the schema update.

**Type**: `processor`

```yaml
# Examples:
processors:
  - mapping: |-
      root = match this.value.type() {
        this == "string" => "STRING"
        this == "bytes" => "BINARY"
        this == "number" => "DOUBLE"
        this == "bool" => "BOOLEAN"
        this == "timestamp" => "TIMESTAMP"
        _ => "VARIANT"
      }
```

### [](#table)`table`

The Snowflake table you want to write data to. This field supports [interpolation functions](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/interpolation/#bloblang-queries).

**Type**: `string`

```yaml
# Examples:
table: MY_TABLE
```

### [](#timestamp_format)`timestamp_format`

The format used to parse string values for `TIMESTAMP`, `TIMESTAMP_LTZ`, and `TIMESTAMP_NTZ` columns. The value must be a layout accepted by [time.Parse](https://pkg.go.dev/time#Parse) in Go.

**Type**: `string`

**Default**: `2006-01-02T15:04:05.999999999Z07:00`
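
As a small example, the following fragment switches to a layout without a time zone or fractional seconds. It assumes your messages carry timestamp strings in the form `2024-11-19 08:30:00`. Go layouts are written using the reference time `2006-01-02 15:04:05`.

```yaml
output:
  snowflake_streaming:
    # Required connection fields omitted for brevity.
    # Go reference-time layout matching strings such as "2024-11-19 08:30:00".
    timestamp_format: "2006-01-02 15:04:05"
```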

### [](#url)`url`

Specify a custom URL to connect to Snowflake. This parameter overrides the default URL, which is automatically generated from the value of `output.snowflake_streaming.account`. By default, the URL is constructed as follows: `https://<output.snowflake_streaming.account>.snowflakecomputing.com`.

**Type**: `string`

```yaml
# Examples:
url: https://org-account.privatelink.snowflakecomputing.com
```

### [](#user)`user`

The Snowflake user that runs Snowpipe Streaming. To learn how to create a user, see the [Snowflake documentation](https://docs.snowflake.com/en/user-guide/admin-user-management).

**Type**: `string`

## [](#example-pipelines)Example pipelines

The following examples show you how to ingest, process, and write data to Snowflake from:

-   A PostgreSQL table using change data capture (CDC)

-   A Redpanda cluster

-   A REST API that posts JSON payloads to an HTTP server


See also: [Ingest data into Snowflake cookbook](https://docs.redpanda.com/cloud-data-platform/develop/connect/cookbooks/snowflake_ingestion/)

### Write data exactly once to a Snowflake table using CDC

Send data from a PostgreSQL table and write it to Snowflake exactly once using PostgreSQL logical replication.

This example includes some important features:

-   To make sure that a Snowflake streaming channel does not assume that older data is already committed, the configuration sets a 45-second interval between message batches. This interval prevents a message batch from being sent while another batch is retried.

-   The log sequence number of each data update from the Write-Ahead Log (WAL) in PostgreSQL makes sure that data is only uploaded once to the `snowflake_streaming` output, and that messages sent to the output are already lexicographically ordered.


> 📝 **NOTE**
>
> To do exactly-once data delivery, it’s important that records are delivered in order to the output, and are correctly partitioned. Before you start, read the [`offset_token`](#offset_token) field description. Alternatively, remove the `offset_token` field to use Redpanda Connect’s default at-least-once delivery model.

```yaml
input:
  postgres_cdc:
    dsn: postgres://foouser:foopass@localhost:5432/foodb
    schema: "public"
    tables: ["my_pg_table"]
    # Use very large batches. Each batch is sent to Snowflake individually,
    # so to optimize query performance, use the largest file size
    # your memory allows
    batching:
      count: 50000
      period: 45s
    # Allow only one unacknowledged batch at a time, so that a new batch
    # is not sent while another batch is being retried
    checkpoint_limit: 1
output:
  snowflake_streaming:
    # Using the log sequence number as the offset token makes sure that
    # each update is written exactly once
    offset_token: "${!@lsn}"
    # A single ordered log means that only one batch can be sent at a time,
    # on a single channel, so that the offset_token always increases.
    max_in_flight: 1
    account: "MYSNOW-ACCOUNT"
    user: MYUSER
    role: ACCOUNTADMIN
    database: "MYDATABASE"
    schema: "PUBLIC"
    table: "MY_PG_TABLE"
    private_key_file: "my/private/key.p8"
```

### Ingest data exactly once from Redpanda

Ingest data from Redpanda using consumer groups, decode the schema using the schema registry, then write the corresponding data into Snowflake.

This example includes some important features:

-   To write an ordered stream of messages, each partition of the topic needs its own channel. The `channel_name` field constructs a unique channel name for each partition using the `kafka_partition` metadata field.

-   To prevent message failures from being retried and changing the order of delivered messages, a dead-letter queue processes them.


> 📝 **NOTE**
>
> To do exactly-once data delivery, it’s important that records are delivered in order to the output, and are correctly partitioned. Before you start, read the [`channel_name`](#channel_name) and [`offset_token`](#offset_token) field descriptions. Alternatively, remove the `offset_token` field to use Redpanda Connect’s default at-least-once delivery model.

```yaml
input:
  redpanda_common:
    topics: ["my_topic_going_to_snow"]
    consumer_group: "redpanda_connect_to_snowflake"
    # Use very large batches. Each batch is sent to Snowflake individually,
    # so to optimize query performance, use the largest file size
    # your memory allows
    fetch_max_bytes: 100MiB
    fetch_min_bytes: 50MiB
    partition_buffer_bytes: 100MiB
pipeline:
  processors:
    - schema_registry_decode:
        url: "redpanda.example.com:8081"
        basic_auth:
          enabled: true
          username: MY_USER_NAME
          password: "${TODO}"
output:
  fallback:
    - snowflake_streaming:
        # To write an ordered stream of messages, each partition in
        # Apache Kafka gets its own channel.
        channel_name: "partition-${!@kafka_partition}"
        # Offsets are lexicographically sorted in string form by padding with
        # leading zeros
        offset_token: offset-${!"%016X".format(@kafka_offset)}
        account: "MYSNOW-ACCOUNT"
        user: MYUSER
        role: ACCOUNTADMIN
        database: "MYDATABASE"
        schema: "PUBLIC"
        table: "MYTABLE"
        private_key_file: "my/private/key.p8"
        schema_evolution:
          enabled: true
    # To prevent delivery failures from changing the order of
    # delivered records, it's important that they are immediately
    # sent to a dead-letter queue.
    - retry:
        output:
          redpanda_common:
            topic: "dead_letter_queue"
```

### HTTP server to push data to Snowflake

Create an HTTP server input that receives HTTP POST requests with JSON payloads.

The payloads are buffered locally then written to Snowflake in batches. To create multiple Redpanda Connect streams to write to each output table, you need a unique channel prefix per stream. In this example, the `channel_prefix` field constructs a unique prefix for each stream using the host name.

> 📝 **NOTE**
>
> Using a buffer to immediately respond to the HTTP requests may result in data loss if there are delivery failures between the output and Snowflake.

For more information about the configuration of buffers, see [buffers](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/buffers/memory/). Alternatively, remove the buffer entirely to respond to the HTTP request only once the data is written to Snowflake.

```yaml
input:
  http_server:
    path: /snowflake
buffer:
  memory:
    # Maximum in-flight data before applying backpressure
    limit: 524288000 # 500MiB
    # Batching policy, which determines the size of the files sent to Snowflake
    batch_policy:
      enabled: true
      byte_size: 33554432 # 32MiB
      period: "10s"
output:
  snowflake_streaming:
    account: "MYSNOW-ACCOUNT"
    user: MYUSER
    role: ACCOUNTADMIN
    database: "MYDATABASE"
    schema: "PUBLIC"
    table: "MYTABLE"
    private_key_file: "my/private/key.p8"
    channel_prefix: "snowflake-channel-for-${HOST}"
    schema_evolution:
      enabled: true
```