# snowflake_put

> For the complete documentation index, see [llms.txt](https://docs.redpanda.com/llms.txt). Component-specific: [cloud-data-platform-full.txt](https://docs.redpanda.com/cloud-data-platform-full.txt)

---
title: snowflake_put
latest-operator-version: v26.1.4
latest-console-tag: v3.7.3
latest-connect-version: 4.93.0
latest-redpanda-tag: v26.1.9
docname: connect/components/outputs/snowflake_put
page-component-name: cloud-data-platform
page-version: master
page-component-version: master
page-component-title: Cloud
page-relative-src-path: connect/components/outputs/snowflake_put.adoc
page-edit-url: https://github.com/redpanda-data/cloud-docs/edit/main/modules/develop/pages/connect/components/outputs/snowflake_put.adoc
page-git-created-date: "2024-09-09"
page-git-modified-date: "2026-05-26"
---

<!-- Source: https://docs.redpanda.com/cloud-data-platform/develop/connect/components/outputs/snowflake_put.md -->

**Available in:** Cloud, [Self-Managed](https://docs.redpanda.com/connect/components/outputs/snowflake_put/%20%22View%20the%20Self-Managed%20version%20of%20this%20component%22)

> 💡 **TIP**
>
> Use the [`snowflake_streaming` output](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/outputs/snowflake_streaming/) for improved performance, cost-effectiveness, and ease of use.

Sends messages to Snowflake stages and, optionally, calls Snowpipe to load this data into one or more tables.

#### Common

```yml
outputs:
  label: ""
  snowflake_put:
    account: "" # No default (required)
    region: "" # No default (optional)
    cloud: "" # No default (optional)
    user: "" # No default (required)
    password: "" # No default (optional)
    private_key: "" # No default (optional)
    private_key_file: "" # No default (optional)
    private_key_pass: "" # No default (optional)
    role: "" # No default (required)
    database: "" # No default (required)
    warehouse: "" # No default (required)
    schema: "" # No default (required)
    stage: "" # No default (required)
    path: ""
    file_name: ""
    file_extension: ""
    compression: AUTO
    request_id: ""
    snowpipe: "" # No default (optional)
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    max_in_flight: 1
```

#### Advanced

```yml
outputs:
  label: ""
  snowflake_put:
    account: "" # No default (required)
    region: "" # No default (optional)
    cloud: "" # No default (optional)
    user: "" # No default (required)
    password: "" # No default (optional)
    private_key: "" # No default (optional)
    private_key_file: "" # No default (optional)
    private_key_pass: "" # No default (optional)
    role: "" # No default (required)
    database: "" # No default (required)
    warehouse: "" # No default (required)
    schema: "" # No default (required)
    stage: "" # No default (required)
    path: ""
    file_name: ""
    file_extension: ""
    upload_parallel_threads: 4
    compression: AUTO
    request_id: ""
    snowpipe: "" # No default (optional)
    client_session_keep_alive: false
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    max_in_flight: 1
```

In order to use a different stage and / or Snowpipe for each message, you can use function interpolations as described in [Bloblang queries](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/interpolation/#bloblang-queries). When using batching, messages are grouped by the calculated stage and Snowpipe and are streamed to individual files in their corresponding stage and, optionally, a Snowpipe `insertFiles` REST API call will be made for each individual file.

## [](#credentials)Credentials

Two authentication mechanisms are supported:

-   User/password

-   Key Pair Authentication


### [](#userpassword)User/password

This is a basic authentication mechanism which allows you to PUT data into a stage. However, it is not compatible with Snowpipe.

### [](#key-pair-authentication)Key pair authentication

This authentication mechanism allows Snowpipe functionality, but it does require configuring an SSH Private Key beforehand. Please consult the [documentation](https://docs.snowflake.com/en/user-guide/key-pair-auth.html#configuring-key-pair-authentication) for details on how to set it up and assign the Public Key to your user.

Note that the Snowflake documentation [used to suggest](https://twitter.com/felipehoffa/status/1560811785606684672) using this command:

```bash
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
```

to generate an encrypted SSH private key. However, in this case, it uses an encryption algorithm called `pbeWithMD5AndDES-CBC`, which is part of the PKCS#5 v1.5 and is considered insecure. Due to this, Redpanda Connect does not support it and, if you wish to use password-protected keys directly, you must use PKCS#5 v2.0 to encrypt them by using the following command (as the current Snowflake docs suggest):

```bash
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8
```

If you have an existing key encrypted with PKCS#5 v1.5, you can re-encrypt it with PKCS#5 v2.0 using this command:

```bash
openssl pkcs8 -in rsa_key_original.p8 -topk8 -v2 des3 -out rsa_key.p8
```

Please consult the [pkcs8 command documentation](https://linux.die.net/man/1/pkcs8) for details on PKCS#5 algorithms.

## [](#batching)Batching

It’s common to want to upload messages to Snowflake as batched archives. The easiest way to do this is to batch your messages at the output level and join the batch of messages with an [`archive`](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/processors/archive/) and/or [`compress`](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/processors/compress/) processor.

For the optimal batch size, please consult the Snowflake [documentation](https://docs.snowflake.com/en/user-guide/data-load-considerations-prepare.html).

## [](#snowpipe)Snowpipe

Given a table called `BENTHOS_TBL` with one column of type `variant`:

```sql
CREATE OR REPLACE TABLE BENTHOS_DB.PUBLIC.BENTHOS_TBL(RECORD variant)
```

and the following `BENTHOS_PIPE` Snowpipe:

```sql
CREATE OR REPLACE PIPE BENTHOS_DB.PUBLIC.BENTHOS_PIPE AUTO_INGEST = FALSE AS COPY INTO BENTHOS_DB.PUBLIC.BENTHOS_TBL FROM (SELECT * FROM @%BENTHOS_TBL) FILE_FORMAT = (TYPE = JSON COMPRESSION = AUTO)
```

you can configure Redpanda Connect to use the implicit table stage `@%BENTHOS_TBL` as the `stage` and `BENTHOS_PIPE` as the `snowpipe`. In this case, you must set `compression` to `AUTO` and, if using message batching, you’ll need to configure an [`archive`](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/processors/archive/) processor with the `concatenate` format. Since the `compression` is set to `AUTO`, the [gosnowflake](https://github.com/snowflakedb/gosnowflake) client library will compress the messages automatically so you don’t need to add a [`compress`](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/processors/compress/) processor for message batches.

If you add `STRIP_OUTER_ARRAY = TRUE` in your Snowpipe `FILE_FORMAT` definition, then you must use `json_array` instead of `concatenate` as the archive processor format.

> 📝 **NOTE**
>
> Only Snowpipes with `FILE_FORMAT` `TYPE` `JSON` are currently supported.

## [](#snowpipe-troubleshooting)Snowpipe troubleshooting

Snowpipe [provides](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-apis.html) the `insertReport` and `loadHistoryScan` REST API endpoints which can be used to get information about recent Snowpipe calls. In order to query them, you’ll first need to generate a valid JWT token for your Snowflake account. There are two methods for doing so:

-   Using the `snowsql` [utility](https://docs.snowflake.com/en/user-guide/snowsql.html):


```bash
snowsql --private-key-path rsa_key.p8 --generate-jwt -a <account> -u <user>
```

-   Using the Python `sql-api-generate-jwt` [utility](https://docs.snowflake.com/en/developer-guide/sql-api/authenticating.html#generating-a-jwt-in-python):


```bash
python3 sql-api-generate-jwt.py --private_key_file_path=rsa_key.p8 --account=<account> --user=<user>
```

Once you successfully generate a JWT token and store it into the `JWT_TOKEN` environment variable, then you can, for example, query the `insertReport` endpoint using `curl`:

```bash
curl -H "Authorization: Bearer ${JWT_TOKEN}" "https://<account>.snowflakecomputing.com/v1/data/pipes/<database>.<schema>.<snowpipe>/insertReport"
```

If you need to pass in a valid `requestId` to any of these Snowpipe REST API endpoints, you can set a [uuid\_v4()](https://docs.redpanda.com/cloud-data-platform/develop/connect/guides/bloblang/functions/#uuid_v4) string in a metadata field called `request_id`, log it via the [`log`](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/processors/log/) processor and then configure `request_id: ${ @request_id }` ). Alternatively, you can [enable debug logging](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/logger/about/) and Redpanda Connect will print the Request IDs that it sends to Snowpipe.

## [](#general-troubleshooting)General troubleshooting

The underlying [`gosnowflake` driver](https://github.com/snowflakedb/gosnowflake) requires write access to the default directory to use for temporary files. Please consult the [`os.TempDir`](https://pkg.go.dev/os#TempDir) docs for details on how to change this directory via environment variables.

A silent failure can occur due to [this issue](https://github.com/snowflakedb/gosnowflake/issues/701), where the underlying [`gosnowflake` driver](https://github.com/snowflakedb/gosnowflake) doesn’t return an error and doesn’t log a failure if it can’t figure out the current username. One way to trigger this behavior is by running Redpanda Connect in a Docker container with a non-existent user ID (such as `--user 1000:1000`).

## [](#performance)Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages (or message batches) with the field `max_in_flight`.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more [in this doc](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/batching/).

## [](#examples)Examples

### [](#kafka-realtime-brokers)Kafka / realtime brokers

Upload message batches from realtime brokers such as Kafka persisting the batch partition and offsets in the stage path and filename similarly to the [Kafka Connector scheme](https://docs.snowflake.com/en/user-guide/kafka-connector-ts.html#step-1-view-the-copy-history-for-the-table) and call Snowpipe to load them into a table. When batching is configured at the input level, it is done per-partition.

```yaml
input:
  redpanda:
    seed_brokers:
      - localhost:9092
    topics:
      - foo
    consumer_group: rpcn
    max_yield_batch_bytes: 8MB
  processors:
    - mapping: |
        meta kafka_start_offset = meta("kafka_offset").from(0)
        meta kafka_end_offset = meta("kafka_offset").from(-1)
        meta batch_timestamp = if batch_index() == 0 { now() }
    - mapping: |
        meta batch_timestamp = if batch_index() != 0 { meta("batch_timestamp").from(0) }

output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
    path: benthos/BENTHOS_TBL/${! @kafka_partition }
    file_name: ${! @kafka_start_offset }_${! @kafka_end_offset }_${! meta("batch_timestamp") }
    upload_parallel_threads: 4
    compression: NONE
    snowpipe: BENTHOS_PIPE
```

### [](#no-compression)No compression

Upload concatenated messages into a `.json` file to a table stage without calling Snowpipe.

```yaml
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
    path: benthos
    upload_parallel_threads: 4
    compression: NONE
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: concatenate
```

### [](#parquet-format-with-snappy-compression)Parquet format with snappy compression

Upload concatenated messages into a `.parquet` file to a table stage without calling Snowpipe.

```yaml
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
    path: benthos
    file_extension: parquet
    upload_parallel_threads: 4
    compression: NONE
    batching:
      count: 10
      period: 3s
      processors:
        - parquet_encode:
            schema:
              - name: ID
                type: INT64
              - name: CONTENT
                type: BYTE_ARRAY
            default_compression: snappy
```

### [](#automatic-compression)Automatic compression

Upload concatenated messages compressed automatically into a `.gz` archive file to a table stage without calling Snowpipe.

```yaml
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
    path: benthos
    upload_parallel_threads: 4
    compression: AUTO
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: concatenate
```

### [](#deflate-compression)DEFLATE compression

Upload concatenated messages compressed into a `.deflate` archive file to a table stage and call Snowpipe to load them into a table.

```yaml
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
    path: benthos
    upload_parallel_threads: 4
    compression: DEFLATE
    snowpipe: BENTHOS_PIPE
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: concatenate
        - mapping: |
            root = content().compress("zlib")
```

### [](#raw_deflate-compression)RAW_DEFLATE compression

Upload concatenated messages compressed into a `.raw_deflate` archive file to a table stage and call Snowpipe to load them into a table.

```yaml
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
    path: benthos
    upload_parallel_threads: 4
    compression: RAW_DEFLATE
    snowpipe: BENTHOS_PIPE
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: concatenate
        - mapping: |
            root = content().compress("flate")
```

## [](#fields)Fields

### [](#account)`account`

Account name, which is the same as the [Account Identifier](https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#where-are-account-identifiers-used). However, when using an [Account Locator](https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#using-an-account-locator-as-an-identifier), the Account Identifier is formatted as `<account_locator>.<region_id>.<cloud>` and this field needs to be populated using the `<account_locator>` part.

**Type**: `string`

### [](#batching-2)`batching`

Allows you to configure a [batching policy](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/batching/).

**Type**: `object`

```yaml
# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s

# ---

batching:
  count: 10
  period: 1s

# ---

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

### [](#batching-byte_size)`batching.byte_size`

An amount of bytes at which the batch should be flushed. If `0` disables size based batching.

**Type**: `int`

**Default**: `0`

### [](#batching-check)`batching.check`

A [Bloblang query](https://docs.redpanda.com/cloud-data-platform/develop/connect/guides/bloblang/about/) that should return a boolean value indicating whether a message should end a batch.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
check: this.type == "end_of_transaction"
```

### [](#batching-count)`batching.count`

A number of messages at which the batch should be flushed. If `0` disables count based batching.

**Type**: `int`

**Default**: `0`

### [](#batching-period)`batching.period`

A period in which an incomplete batch should be flushed regardless of its size.

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
period: 1s

# ---

period: 1m

# ---

period: 500ms
```

### [](#batching-processors)`batching.processors[]`

A list of [processors](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/processors/about/) to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

**Type**: `processor`

```yaml
# Examples:
processors:
  - archive:
      format: concatenate

# ---

processors:
  - archive:
      format: lines

# ---

processors:
  - archive:
      format: json_array
```

### [](#client_session_keep_alive)`client_session_keep_alive`

Enable Snowflake keepalive mechanism to prevent the client session from expiring after 4 hours (error 390114).

**Type**: `bool`

**Default**: `false`

### [](#cloud)`cloud`

Optional cloud platform field which needs to be populated when using an [Account Locator](https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#using-an-account-locator-as-an-identifier) and it must be set to the `<cloud>` part of the Account Identifier (`<account_locator>.<region_id>.<cloud>`).

**Type**: `string`

```yaml
# Examples:
cloud: aws

# ---

cloud: gcp

# ---

cloud: azure
```

### [](#compression)`compression`

Compression type.

**Type**: `string`

**Default**: `AUTO`

| Option | Summary |
| --- | --- |
| AUTO | Compression (gzip) is applied automatically by the output and messages must contain plain-text JSON. Default file_extension: gz. |
| DEFLATE | Messages must be pre-compressed using the zlib algorithm (with zlib header, RFC1950). Default file_extension: deflate. |
| GZIP | Messages must be pre-compressed using the gzip algorithm. Default file_extension: gz. |
| NONE | No compression is applied and messages must contain plain-text JSON. Default file_extension: json. |
| RAW_DEFLATE | Messages must be pre-compressed using the flate algorithm (without header, RFC1951). Default file_extension: raw_deflate. |
| ZSTD | Messages must be pre-compressed using the Zstandard algorithm. Default file_extension: zst. |

### [](#database)`database`

Database.

**Type**: `string`

### [](#file_extension)`file_extension`

Stage file extension. Will be derived from the configured `compression` if not set or empty. This field supports [interpolation functions](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/interpolation/#bloblang-queries).

**Type**: `string`

**Default**: `""`

```yaml
# Examples:
file_extension: csv

# ---

file_extension: parquet
```

### [](#file_name)`file_name`

Stage file name. Will be equal to the Request ID if not set or empty. This field supports [interpolation functions](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/interpolation/#bloblang-queries).

**Type**: `string`

**Default**: `""`

### [](#max_in_flight)`max_in_flight`

The maximum number of parallel message batches to have in flight at any given time.

**Type**: `int`

**Default**: `1`

### [](#password)`password`

An optional password.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Manage Secrets](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/secret-management/) before adding it to your configuration.

**Type**: `string`

### [](#path)`path`

Stage path. This field supports [interpolation functions](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/interpolation/#bloblang-queries).

**Type**: `string`

**Default**: `""`

### [](#private_key)`private_key`

Your private SSH key. When using encrypted keys, you must also set a value for [`private_key_pass`](#private_key_pass).

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Manage Secrets](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/secret-management/) before adding it to your configuration.

**Type**: `string`

### [](#private_key_file)`private_key_file`

The path to a file containing your private SSH key. When using encrypted keys, you must also set a value for [`private_key_pass`](#private_key_pass).

**Type**: `string`

### [](#private_key_pass)`private_key_pass`

The passphrase for your private SSH key.

> ⚠️ **CAUTION**
>
> This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see [Manage Secrets](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/secret-management/) before adding it to your configuration.

**Type**: `string`

### [](#region)`region`

Optional region field which needs to be populated when using an [Account Locator](https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#using-an-account-locator-as-an-identifier) and it must be set to the `<region_id>` part of the Account Identifier (`<account_locator>.<region_id>.<cloud>`).

**Type**: `string`

```yaml
# Examples:
region: us-west-2
```

### [](#request_id)`request_id`

Request ID. Will be assigned a random UUID (v4) string if not set or empty. This field supports [interpolation functions](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/interpolation/#bloblang-queries).

**Type**: `string`

**Default**: `""`

### [](#role)`role`

Role.

**Type**: `string`

### [](#schema)`schema`

Schema.

**Type**: `string`

### [](#snowpipe-2)`snowpipe`

An optional Snowpipe name. Use the `<snowpipe>` part from `<database>.<schema>.<snowpipe>`. This field supports [interpolation functions](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/interpolation/#bloblang-queries).

**Type**: `string`

### [](#stage)`stage`

Stage name. Use either one of the [supported](https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage.html) stage types. This field supports [interpolation functions](https://docs.redpanda.com/cloud-data-platform/develop/connect/configuration/interpolation/#bloblang-queries).

**Type**: `string`

### [](#upload_parallel_threads)`upload_parallel_threads`

Specifies the number of threads to use for uploading files.

**Type**: `int`

**Default**: `4`

### [](#user)`user`

Username.

**Type**: `string`

### [](#warehouse)`warehouse`

Warehouse.

**Type**: `string`