Ingest data into Snowflake

Configure a Redpanda Connect pipeline to generate and write data into a Redpanda Serverless topic, and then ingest that data into Snowflake using Snowpipe Streaming.

Set up your Redpanda cluster

In Redpanda Cloud, create a new Serverless Standard cluster.

When the cluster is ready, run rpk cloud cluster select to choose the cluster and set it as your current rpk profile.
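
To confirm that rpk is now pointing at the new Serverless cluster, you can inspect the active profile and query the cluster. A quick sanity check using standard rpk commands:

# Show the rpk profile that is currently active
rpk profile print

# Query the cluster to confirm connectivity
rpk cluster info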

Next, create a topic named demo_topic to use as the data source for ingesting data into Snowflake:

rpk topic create demo_topic
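
If you want to confirm the topic was created with the defaults you expect, a quick check:

# Show partition count and configuration for the new topic
rpk topic describe demo_topic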

Create a user with minimal ACLs to run the ingestion pipeline into Snowflake:

rpk security user create ingestion_user --password Testing1234

Now that the user exists, grant it read permission on demo_topic, as well as full control over any consumer group whose name starts with redpanda_connect:

rpk security acl create --allow-principal ingestion_user --operation read --topic demo_topic
rpk security acl create --allow-principal ingestion_user --resource-pattern-type prefixed --operation all --group redpanda_connect
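
Before wiring up the pipeline, it's worth confirming that the ACLs look right and that the new credentials authenticate. A possible check, assuming the standard rpk -X connection overrides (user, pass, sasl.mechanism) are available in your rpk version:

# Review the ACLs you just created
rpk security acl list

# Connect as ingestion_user to confirm SASL authentication works;
# the metadata returned only includes topics this user is allowed to see
rpk cluster info -X user=ingestion_user -X pass=Testing1234 -X sasl.mechanism=SCRAM-SHA-256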

Set up your Snowflake account

Log in to your Snowflake account with a user who has the ACCOUNTADMIN role, and run the following SQL commands in a worksheet. They create a dedicated user with the minimal permissions needed to stream data into a specified database and schema.

-- Set default values for multiple variables
SET PWD = 'Test1234567';
SET USER = 'STREAMING_USER';
SET DB = 'STREAMING_DB';
SET ROLE = 'REDPANDA_CONNECT';
SET WH = 'STREAMING_WH';
USE ROLE ACCOUNTADMIN;
-- Create users
CREATE USER IF NOT EXISTS IDENTIFIER($USER) PASSWORD=$PWD  COMMENT='STREAMING USER FOR REDPANDA CONNECT';
-- Create roles
CREATE OR REPLACE ROLE IDENTIFIER($ROLE);
-- Create the destination database and virtual warehouse
CREATE DATABASE IF NOT EXISTS IDENTIFIER($DB);
USE IDENTIFIER($DB);
CREATE OR REPLACE WAREHOUSE IDENTIFIER($WH) WITH WAREHOUSE_SIZE = 'SMALL';
-- Grant privileges
GRANT CREATE WAREHOUSE ON ACCOUNT TO ROLE IDENTIFIER($ROLE);
GRANT ROLE IDENTIFIER($ROLE) TO USER IDENTIFIER($USER);
GRANT OWNERSHIP ON DATABASE IDENTIFIER($DB) TO ROLE IDENTIFIER($ROLE);
GRANT USAGE ON WAREHOUSE IDENTIFIER($WH) TO ROLE IDENTIFIER($ROLE);
-- Set defaults
ALTER USER IDENTIFIER($USER) SET DEFAULT_ROLE=$ROLE;
ALTER USER IDENTIFIER($USER) SET DEFAULT_WAREHOUSE=$WH;
-- Run the following commands to find your account identifier. Copy it down for later use.
-- It will be something like `organization_name-account_name`
-- e.g. ykmxgak-wyb52636
WITH HOSTLIST AS
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$allowlist()))))
SELECT REPLACE(VALUE:host,'.snowflakecomputing.com','') AS ACCOUNT_IDENTIFIER
FROM HOSTLIST
WHERE VALUE:type = 'SNOWFLAKE_DEPLOYMENT_REGIONLESS';

Create an RSA key pair

Create an RSA key pair using openssl to authenticate Redpanda Connect to Snowflake. The following command encrypts the private key with the password supplied through -passout (Testing123 in this example); record the password you use, because you'll need it again later.

openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -passout pass:Testing123 -out rsa_key.p8
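
Optionally, verify that the encrypted private key can be read back with the password before going further. A minimal check:

# Decrypt and validate the key without writing anything to disk
openssl rsa -in rsa_key.p8 -passin pass:Testing123 -check -noout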

Next, extract the public key from the encrypted private key. The -passin flag supplies the encryption password you set in the previous step.

openssl rsa -in rsa_key.p8 -pubout -passin pass:Testing123 -out rsa_key.pub

To register the public key in Snowflake, remove the public key delimiters and output only the base64-encoded portion of the PEM file. Run the following bash command to print it:

cat rsa_key.pub | sed -e '1d' -e '$d' | tr -d '\n'

In the Snowflake worksheet, add the output of the bash command you just ran to the following SQL command and execute it:

USE ROLE ACCOUNTADMIN;
ALTER USER STREAMING_USER SET RSA_PUBLIC_KEY='< PubKeyWithoutDelimiters >';
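
To confirm that the key registered in Snowflake matches your local key pair, you can compare fingerprints. Snowflake reports the registered fingerprint in the RSA_PUBLIC_KEY_FP property of DESC USER STREAMING_USER; the following sketch computes the same SHA-256 fingerprint from your local public key:

# Compute the SHA-256 fingerprint of the local public key.
# Compare it with RSA_PUBLIC_KEY_FP in Snowflake, which is prefixed with "SHA256:"
openssl rsa -pubin -in rsa_key.pub -outform DER | openssl dgst -sha256 -binary | openssl enc -base64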

Create a schema using streaming_user

Log out of Snowflake and sign back in as streaming_user with the password you set for it (default: Test1234567). You created these credentials in Set up your Snowflake account.

Run the following SQL commands in a worksheet to create a schema (e.g. STREAMING_SCHEMA) in the default database (e.g. STREAMING_DB):

SET DB = 'STREAMING_DB';
SET SCHEMA = 'STREAMING_SCHEMA';
USE IDENTIFIER($DB);
CREATE OR REPLACE SCHEMA IDENTIFIER($SCHEMA);

Create a pipeline from your Redpanda cluster to Snowflake

You can now create the pipeline. First create secrets for the passwords and keys you created during setup.

On your Serverless cluster, go to the Connect page, select the Secrets tab and then create three secrets:

  • REDPANDA_PASS with the value Testing1234

  • SNOWFLAKE_KEY with the single-line value produced by awk '{printf "%s\\n", $0}' rsa_key.p8 (see the note after this list)

  • SNOWFLAKE_KEY_PASS with the value Testing123
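
The awk command above only reformats the key so it fits in a single secret value: it joins the multi-line PEM file into one line, appending a literal \n escape after each original line. A minimal illustration of what it produces:

# Print the encrypted private key as a single line,
# with each original newline replaced by a literal \n
awk '{printf "%s\\n", $0}' rsa_key.p8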

Select the Pipelines tab and create a pipeline called RedpandaToSnowflake. Use the following YAML configuration:

input:
  # Reads data from our `demo_topic`
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: ["demo_topic"]
    consumer_group: "redpanda_connect_to_snowflake"
    tls: {enabled: true}
    checkpoint_limit: 4096
    sasl:
      - mechanism: SCRAM-SHA-256
        username: ingestion_user
        password: ${secrets.REDPANDA_PASS}
    # Define the batching policy. This cookbook creates small batches,
    # but in a production environment use the largest file size you can.
    batching:
      count: 100 # Collect 100 messages before flushing
      period: 10s # or after 10 seconds, whichever comes first
output:
  snowflake_streaming:
    # Replace this placeholder with your account identifier
    account: "< OrgName-AccountName >"
    user: STREAMING_USER
    role: REDPANDA_CONNECT
    database: STREAMING_DB
    schema: STREAMING_SCHEMA
    table: STREAMING_TABLE
    # Inject your private key and password
    private_key: "${secrets.SNOWFLAKE_KEY}"
    private_key_pass: "${secrets.SNOWFLAKE_KEY_PASS}"
    schema_evolution:
      enabled: true
    max_in_flight: 1

You can now produce some data using rpk to test that everything works:

echo '{"animal":"redpanda","attributes":"cute","age":6}' | rpk topic produce demo_topic -f '%v\n'
echo '{"animal":"polar bear","attributes":"cool","age":13}' | rpk topic produce demo_topic -f '%v\n'
echo '{"animal":"unicorn","attributes":"rare","age":999}' | rpk topic produce demo_topic -f '%v\n'

The data produced to demo_topic is consumed and streamed into Snowflake within seconds. Go back to the Snowflake worksheet and run the following query to see the data arrive, with columns matching the fields of the JSON you produced.

SELECT * FROM STREAMING_DB.STREAMING_SCHEMA.STREAMING_TABLE LIMIT 50;