snowflake_put

Available in: Cloud, Self-Managed

License: This component requires an Enterprise license. To upgrade, contact Redpanda sales.

Sends messages to Snowflake stages and, optionally, calls Snowpipe to load this data into one or more tables.

    # Common config fields, showing default values
    output:
      label: ""
      snowflake_put:
        account: "" # No default (required)
        region: us-west-2 # No default (optional)
        cloud: aws # No default (optional)
        user: "" # No default (required)
        password: "" # No default (optional)
        private_key_file: "" # No default (optional)
        private_key_pass: "" # No default (optional)
        role: "" # No default (required)
        database: "" # No default (required)
        warehouse: "" # No default (required)
        schema: "" # No default (required)
        stage: "" # No default (required)
        path: ""
        file_name: ""
        file_extension: ""
        compression: AUTO
        request_id: ""
        snowpipe: "" # No default (optional)
        batching:
          count: 0
          byte_size: 0
          period: ""
          check: ""
        max_in_flight: 1

    # All config fields, showing default values
    output:
      label: ""
      snowflake_put:
        account: "" # No default (required)
        region: us-west-2 # No default (optional)
        cloud: aws # No default (optional)
        user: "" # No default (required)
        password: "" # No default (optional)
        private_key_file: "" # No default (optional)
        private_key_pass: "" # No default (optional)
        role: "" # No default (required)
        database: "" # No default (required)
        warehouse: "" # No default (required)
        schema: "" # No default (required)
        stage: "" # No default (required)
        path: ""
        file_name: ""
        file_extension: ""
        upload_parallel_threads: 4
        compression: AUTO
        request_id: ""
        snowpipe: "" # No default (optional)
        client_session_keep_alive: false
        batching:
          count: 0
          byte_size: 0
          period: ""
          check: ""
          processors: [] # No default (optional)
        max_in_flight: 1

In order to use a different stage and/or Snowpipe for each message, you can use function interpolations as described in Bloblang queries.
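As a rough illustration of per-message interpolation, the sketch below picks the table stage and the Snowpipe from message metadata. The metadata keys table and pipe are hypothetical and would have to be set upstream (for example, in a mapping processor); the connection values are placeholders borrowed from the examples on this page.

```yaml
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    # Hypothetical metadata keys: "table" and "pipe" must be set on each message upstream.
    stage: '@%${! meta("table") }'
    snowpipe: '${! meta("pipe") }'
    compression: AUTO
```

Key pair authentication is used here because Snowpipe is not compatible with user/password authentication.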
When using batching, messages are grouped by the calculated stage and Snowpipe and streamed to individual files in their corresponding stage. Optionally, a Snowpipe insertFiles REST API call is made for each individual file.

Credentials

Two authentication mechanisms are supported:

- User/password
- Key pair authentication

User/password

This is a basic authentication mechanism which allows you to PUT data into a stage. However, it is not compatible with Snowpipe.

Key pair authentication

This authentication mechanism allows Snowpipe functionality, but it does require configuring an SSH Private Key beforehand. Please consult the Snowflake documentation for details on how to set it up and assign the Public Key to your user.

Note that the Snowflake documentation used to suggest using this command:

    openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8

to generate an encrypted SSH private key. However, in this case, it uses an encryption algorithm called pbeWithMD5AndDES-CBC, which is part of PKCS#5 v1.5 and is considered insecure. For this reason, Redpanda Connect does not support it and, if you wish to use password-protected keys directly, you must use PKCS#5 v2.0 to encrypt them by using the following command (as the current Snowflake docs suggest):

    openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8

If you have an existing key encrypted with PKCS#5 v1.5, you can re-encrypt it with PKCS#5 v2.0 using this command:

    openssl pkcs8 -in rsa_key_original.p8 -topk8 -v2 des3 -out rsa_key.p8

Please consult the pkcs8 command documentation for details on PKCS#5 algorithms.

Batching

It’s common to want to upload messages to Snowflake as batched archives. The easiest way to do this is to batch your messages at the output level and join the batch of messages with an archive and/or compress processor.

For the optimal batch size, please consult the Snowflake documentation.
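A minimal sketch of output-level batching that joins each batch with an archive processor and then compresses it with a compress processor. The count and period values are arbitrary placeholders, not recommended batch sizes, and the connection values are the same placeholders used in the examples on this page.

```yaml
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
    path: benthos
    # GZIP tells the output that messages arrive pre-compressed with gzip.
    compression: GZIP
    batching:
      count: 10   # placeholder value
      period: 3s  # placeholder value
      processors:
        # Join the batch into a single message, then compress it.
        - archive:
            format: concatenate
        - compress:
            algorithm: gzip
```

With compression set to AUTO instead, the compress step would be dropped, since the output applies gzip itself in that mode.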
Snowpipe

Given a table called BENTHOS_TBL with one column of type variant:

    CREATE OR REPLACE TABLE BENTHOS_DB.PUBLIC.BENTHOS_TBL(RECORD variant)

and the following BENTHOS_PIPE Snowpipe:

    CREATE OR REPLACE PIPE BENTHOS_DB.PUBLIC.BENTHOS_PIPE AUTO_INGEST = FALSE AS
      COPY INTO BENTHOS_DB.PUBLIC.BENTHOS_TBL
      FROM (SELECT * FROM @%BENTHOS_TBL)
      FILE_FORMAT = (TYPE = JSON COMPRESSION = AUTO)

you can configure Redpanda Connect to use the implicit table stage @%BENTHOS_TBL as the stage and BENTHOS_PIPE as the snowpipe. In this case, you must set compression to AUTO and, if using message batching, you’ll need to configure an archive processor with the concatenate format. Since the compression is set to AUTO, the gosnowflake client library will compress the messages automatically so you don’t need to add a compress processor for message batches.

If you add STRIP_OUTER_ARRAY = TRUE in your Snowpipe FILE_FORMAT definition, then you must use json_array instead of concatenate as the archive processor format.

Only Snowpipes with FILE_FORMAT TYPE JSON are currently supported.

Snowpipe troubleshooting

Snowpipe provides the insertReport and loadHistoryScan REST API endpoints which can be used to get information about recent Snowpipe calls. In order to query them, you’ll first need to generate a valid JWT token for your Snowflake account.
There are two methods for doing so:

- Using the snowsql utility:

      snowsql --private-key-path rsa_key.p8 --generate-jwt -a <account> -u <user>

- Using the Python sql-api-generate-jwt utility:

      python3 sql-api-generate-jwt.py --private_key_file_path=rsa_key.p8 --account=<account> --user=<user>

Once you have generated a JWT token and stored it in the JWT_TOKEN environment variable, you can, for example, query the insertReport endpoint using curl:

    curl -H "Authorization: Bearer ${JWT_TOKEN}" "https://<account>.snowflakecomputing.com/v1/data/pipes/<database>.<schema>.<snowpipe>/insertReport"

If you need to pass in a valid requestId to any of these Snowpipe REST API endpoints, you can set a uuid_v4() string in a metadata field called request_id, log it via the log processor and then configure request_id: ${! @request_id }. Alternatively, you can enable debug logging and Redpanda Connect will print the Request IDs that it sends to Snowpipe.

General troubleshooting

The underlying gosnowflake driver requires write access to the default directory used for temporary files. Please consult the os.TempDir docs for details on how to change this directory via environment variables.

A silent failure can occur due to a known issue in which the underlying gosnowflake driver doesn’t return an error and doesn’t log a failure if it can’t figure out the current username. One way to trigger this behavior is by running Redpanda Connect in a Docker container with a non-existent user ID (such as --user 1000:1000).

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the maximum number of in-flight messages (or message batches) with the field max_in_flight.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in the message batching documentation.
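The two performance settings described above can be combined, as in the sketch below. The values for max_in_flight, count and period are illustrative assumptions only and should be tuned for your workload; the connection values are the placeholders used elsewhere on this page.

```yaml
output:
  snowflake_put:
    account: benthos
    user: test@benthos.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTHOS_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTHOS_TBL"
    compression: AUTO
    # Allow up to four message batches to be uploaded in parallel (illustrative value).
    max_in_flight: 4
    batching:
      count: 100  # illustrative value
      period: 5s  # illustrative value
      processors:
        - archive:
            format: concatenate
```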
Examples

Kafka / realtime brokers

Upload message batches from realtime brokers such as Kafka, persisting the batch partition and offsets in the stage path and filename similarly to the Kafka Connector scheme, and call Snowpipe to load them into a table. When batching is configured at the input level, it is done per-partition.

    input:
      kafka:
        addresses:
          - localhost:9092
        topics:
          - foo
        consumer_group: benthos
        batching:
          count: 10
          period: 3s
          processors:
            - mapping: |
                meta kafka_start_offset = meta("kafka_offset").from(0)
                meta kafka_end_offset = meta("kafka_offset").from(-1)
                meta batch_timestamp = if batch_index() == 0 { now() }
            - mapping: |
                meta batch_timestamp = if batch_index() != 0 { meta("batch_timestamp").from(0) }

    output:
      snowflake_put:
        account: benthos
        user: test@benthos.dev
        private_key_file: path_to_ssh_key.pem
        role: ACCOUNTADMIN
        database: BENTHOS_DB
        warehouse: COMPUTE_WH
        schema: PUBLIC
        stage: "@%BENTHOS_TBL"
        path: benthos/BENTHOS_TBL/${! @kafka_partition }
        file_name: ${! @kafka_start_offset }_${! @kafka_end_offset }_${! meta("batch_timestamp") }
        upload_parallel_threads: 4
        compression: NONE
        snowpipe: BENTHOS_PIPE

No compression

Upload concatenated messages into a .json file to a table stage without calling Snowpipe.

    output:
      snowflake_put:
        account: benthos
        user: test@benthos.dev
        private_key_file: path_to_ssh_key.pem
        role: ACCOUNTADMIN
        database: BENTHOS_DB
        warehouse: COMPUTE_WH
        schema: PUBLIC
        stage: "@%BENTHOS_TBL"
        path: benthos
        upload_parallel_threads: 4
        compression: NONE
        batching:
          count: 10
          period: 3s
          processors:
            - archive:
                format: concatenate

Parquet format with snappy compression

Upload concatenated messages into a .parquet file to a table stage without calling Snowpipe.

    output:
      snowflake_put:
        account: benthos
        user: test@benthos.dev
        private_key_file: path_to_ssh_key.pem
        role: ACCOUNTADMIN
        database: BENTHOS_DB
        warehouse: COMPUTE_WH
        schema: PUBLIC
        stage: "@%BENTHOS_TBL"
        path: benthos
        file_extension: parquet
        upload_parallel_threads: 4
        compression: NONE
        batching:
          count: 10
          period: 3s
          processors:
            - parquet_encode:
                schema:
                  - name: ID
                    type: INT64
                  - name: CONTENT
                    type: BYTE_ARRAY
                default_compression: snappy

Automatic compression

Upload concatenated messages compressed automatically into a .gz archive file to a table stage without calling Snowpipe.

    output:
      snowflake_put:
        account: benthos
        user: test@benthos.dev
        private_key_file: path_to_ssh_key.pem
        role: ACCOUNTADMIN
        database: BENTHOS_DB
        warehouse: COMPUTE_WH
        schema: PUBLIC
        stage: "@%BENTHOS_TBL"
        path: benthos
        upload_parallel_threads: 4
        compression: AUTO
        batching:
          count: 10
          period: 3s
          processors:
            - archive:
                format: concatenate

DEFLATE compression

Upload concatenated messages compressed into a .deflate archive file to a table stage and call Snowpipe to load them into a table.

    output:
      snowflake_put:
        account: benthos
        user: test@benthos.dev
        private_key_file: path_to_ssh_key.pem
        role: ACCOUNTADMIN
        database: BENTHOS_DB
        warehouse: COMPUTE_WH
        schema: PUBLIC
        stage: "@%BENTHOS_TBL"
        path: benthos
        upload_parallel_threads: 4
        compression: DEFLATE
        snowpipe: BENTHOS_PIPE
        batching:
          count: 10
          period: 3s
          processors:
            - archive:
                format: concatenate
            - mapping: |
                root = content().compress("zlib")

RAW_DEFLATE compression

Upload concatenated messages compressed into a .raw_deflate archive file to a table stage and call Snowpipe to load them into a table.

    output:
      snowflake_put:
        account: benthos
        user: test@benthos.dev
        private_key_file: path_to_ssh_key.pem
        role: ACCOUNTADMIN
        database: BENTHOS_DB
        warehouse: COMPUTE_WH
        schema: PUBLIC
        stage: "@%BENTHOS_TBL"
        path: benthos
        upload_parallel_threads: 4
        compression: RAW_DEFLATE
        snowpipe: BENTHOS_PIPE
        batching:
          count: 10
          period: 3s
          processors:
            - archive:
                format: concatenate
            - mapping: |
                root = content().compress("flate")

Fields

account

Account name, which is the same as the Account Identifier. However, when using an Account Locator, the Account Identifier is formatted as <account_locator>.<region_id>.<cloud> and this field needs to be populated using the <account_locator> part.

Type: string

region

Optional region field which needs to be populated when using an Account Locator. It must be set to the <region_id> part of the Account Identifier (<account_locator>.<region_id>.<cloud>).

Type: string

    # Examples
    region: us-west-2

cloud

Optional cloud platform field which needs to be populated when using an Account Locator. It must be set to the <cloud> part of the Account Identifier (<account_locator>.<region_id>.<cloud>).

Type: string

    # Examples
    cloud: aws

    cloud: gcp

    cloud: azure

user

Username.

Type: string

password

An optional password.

This field contains sensitive information. Review your cluster security before adding it to your configuration.

Type: string

private_key_file

The path to a file containing the private SSH key.

Type: string

private_key_pass

An optional private SSH key passphrase.

This field contains sensitive information. Review your cluster security before adding it to your configuration.

Type: string

role

Role.

Type: string

database

Database.

Type: string

warehouse

Warehouse.

Type: string

schema

Schema.

Type: string

stage

Stage name. Use one of the supported stage types. This field supports interpolation functions.

Type: string

path

Stage path. This field supports interpolation functions.

Type: string
Default: ""

file_name

Stage file name.
Will be equal to the Request ID if not set or empty. This field supports interpolation functions.

Type: string
Default: ""

file_extension

Stage file extension. Will be derived from the configured compression if not set or empty. This field supports interpolation functions.

Type: string
Default: ""

    # Examples
    file_extension: csv

    file_extension: parquet

upload_parallel_threads

Specifies the number of threads to use for uploading files.

Type: int
Default: 4

compression

Compression type.

Type: string
Default: "AUTO"

Option        Summary
AUTO          Compression (gzip) is applied automatically by the output and messages must contain plain-text JSON. Default file_extension: gz.
DEFLATE       Messages must be pre-compressed using the zlib algorithm (with zlib header, RFC1950). Default file_extension: deflate.
GZIP          Messages must be pre-compressed using the gzip algorithm. Default file_extension: gz.
NONE          No compression is applied and messages must contain plain-text JSON. Default file_extension: json.
RAW_DEFLATE   Messages must be pre-compressed using the flate algorithm (without header, RFC1951). Default file_extension: raw_deflate.
ZSTD          Messages must be pre-compressed using the Zstandard algorithm. Default file_extension: zst.

request_id

Request ID. Will be assigned a random UUID (v4) string if not set or empty. This field supports interpolation functions.

Type: string
Default: ""

snowpipe

An optional Snowpipe name. Use the <snowpipe> part from <database>.<schema>.<snowpipe>. This field supports interpolation functions.

Type: string

client_session_keep_alive

Enable the Snowflake keepalive mechanism to prevent the client session from expiring after 4 hours (error 390114).

Type: bool
Default: false

batching

Allows you to configure a batching policy.

Type: object

    # Examples
    batching:
      byte_size: 5000
      count: 0
      period: 1s

    batching:
      count: 10
      period: 1s

    batching:
      check: this.contains("END BATCH")
      count: 0
      period: 1m

batching.count

A number of messages at which the batch should be flushed.
If 0, count-based batching is disabled.

Type: int
Default: 0

batching.byte_size

An amount of bytes at which the batch should be flushed. If 0, size-based batching is disabled.

Type: int
Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

    # Examples
    period: 1s

    period: 1m

    period: 500ms

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

    # Examples
    check: this.type == "end_of_transaction"

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: array

    # Examples
    processors:
      - archive:
          format: concatenate

    processors:
      - archive:
          format: lines

    processors:
      - archive:
          format: json_array

max_in_flight

The maximum number of parallel message batches to have in flight at any given time.

Type: int
Default: 1