Redpanda Connect Quickstart

The Connect page provides a wizard to create pipelines for streaming data into and out of Redpanda. The wizard populates the YAML configuration automatically, so you can get started quickly.

Advanced users can skip directly to the Edit pipeline step in the wizard to configure the YAML file themselves.

This quickstart shows how Redpanda Connect can generate, transform, and handle streaming data end-to-end. It creates the following pipelines:

  • The first pipeline generates (produces) dad jokes and writes them to a topic in your cluster.

  • The second pipeline reads (consumes) those dad jokes and gives each one a random "cringe rating" from 1-10.

The producer pipeline uses the following Redpanda Connect components:

Component type Component Purpose

Input

generate

Creates jokes

Output

redpanda

Writes messages to your topic

Processor

log

Logs generated messages

Processor

catch

Catches errors

The consumer pipeline uses the following Redpanda Connect components:

Component type Component Purpose

Input

redpanda

Reads messages from your topic

Output

drop

Drops the processed messages

Processor

bloblang

Processes ratings

Processor

log

Logs processed messages

Processor

catch

Catches errors

Prerequisites

You must have a Redpanda Cloud account with a Serverless, Dedicated, or standard BYOC cluster. If you don’t already have an account, sign up for a free trial.

Serverless clusters support up to 10 pipelines.

Build a producer pipeline

Follow these steps to create the producer pipeline:

  1. Go to the Connect page for your cluster to create a pipeline.

  2. Add an input: Search for and select generate from the list of connectors. Click Next.

  3. Add an output: Search for and select redpanda from the list of connectors. Click Next.

  4. Add a topic: Create a new topic called dad-jokes. This is where Redpanda will store the generated messages. Click Next.

  5. Add permissions: Create a new user called connect. Leave the rest of the default settings. The user will be created with the necessary permissions. Click Next.

  6. Edit pipeline:

    1. Enter this name for the pipeline: joke-generator-producer.

    2. The Configuration section automatically populates the YAML with your selected components. Under Connectors, you can add the processors log and catch to log generated jokes and monitor for errors. However, for simplicity in this quickstart, replace the entire configuration with the following YAML, which includes the processors and the mapping for joke generation:

      input:
        generate:
          interval: 5s
          count: 0
          mapping: |
            let jokes = [
              "Why don't scientists trust atoms? Because they make up everything!",
              "I'm reading a book about anti-gravity. It's impossible to put down!",
              "Why did the scarecrow win an award? He was outstanding in his field!",
              "What do you call a fake noodle? An impasta!",
              "Why don't eggs tell jokes? They'd crack each other up!",
              "I used to play piano by ear, but now I use my hands.",
              "What do you call a bear with no teeth? A gummy bear!",
              "Why did the bicycle fall over? It was two tired!",
              "What do you call a fish wearing a crown? A king fish!",
              "Why don't skeletons fight each other? They don't have the guts!",
              "What do you call cheese that isn't yours? Nacho cheese!",
              "Why can't you hear a pterodactyl using the bathroom? Because the 'p' is silent!",
              "What did the ocean say to the beach? Nothing, it just waved!",
              "Why did the math book look sad? It had too many problems!",
              "What do you call a sleeping bull? A bulldozer!",
              "How do you organize a space party? You planet!",
              "What's orange and sounds like a parrot? A carrot!",
              "Why did the coffee file a police report? It got mugged!",
              "What do you call a can opener that doesn't work? A can't opener!",
              "Why don't oysters donate to charity? Because they're shellfish!"
            ]
      
            let joke_index = random_int() % $jokes.length()
      
            root.joke = $jokes.index($joke_index)
            root.id = uuid_v4()
            root.timestamp = now()
            root.source = "dad-joke-generator"
            root.joke_length = root.joke.length()
      
      
      pipeline:
        processors:
          - log:
              level: INFO
              message: "📝 Generating joke: ${! json(\"joke\") }"
      
          - catch:
              - log:
                  level: ERROR
                  message: "❌ Error generating joke: ${! error() }"
      
      output:
        redpanda:
          seed_brokers: # Optional
            - ${REDPANDA_BROKERS}
          tls:
            enabled: true # Optional (default: false)
            client_certs: []
          sasl:
            - mechanism: SCRAM-SHA-256
              username: ${secrets.KAFKA_USER_CONNECT}
              password: ${secrets.KAFKA_PASSWORD_CONNECT}
          topic: dad-jokes # Optional
    3. Click Create.

    • Notice the ${REDPANDA_BROKERS} contextual variable in the configuration. This automatically references your cluster’s bootstrap server address, so you can use it in any pipeline without hardcoding connection details.

    • Notice ${secrets.KAFKA_USER_CONNECT} and ${secrets.KAFKA_PASSWORD_CONNECT}. These were generated when you created the connect user.

    • The Brave browser does not fully support code snippets.

  7. Your pipeline details display, and after a few seconds, the status changes from Starting to Running. If you don’t see this change, refresh the page. Once running, your pipeline generates a new joke every five seconds and writes the joke to your topic.

    After a minute, select the pipeline and click Stop so you can examine the results.

Review the pipeline logs

  1. Click the Logs tab to see the pipeline’s activity log.

  2. Click through the log messages to see the startup sequence. For example, you’ll see when the output becomes active:

    {
        "instance_id": "d4548gl54smc73b65t0g",
        "label": "",
        "level": "INFO",
        "message": "Output type redpanda is now active",
        "path": "root.output",
        "pipeline_id": "d4548fihlips73dmcl80",
        "time": "2025-11-04T18:21:55.223350785Z"
    }

View the processed messages

  1. Go to the Topics page and select the dad-jokes topic.

  2. Click any message to see the structure. For example:

    {
        "id": "cf653b9e-ce96-4790-888f-6a867bed56a5",
        "joke": "I used to play piano by ear, but now I use my hands.",
        "joke_length": 52,
        "source": "dad-joke-generator",
        "timestamp": "2025-11-04T18:21:55.020574506Z"
    }

Build a consumer pipeline

This pipeline rates the jokes that you generated in the first pipeline. Follow these steps to create the consumer pipeline:

  1. On the Connect page for your cluster, click Create pipeline.

  2. Add an input: Search for and select redpanda from the list of connectors. Click Next.

  3. Add an output: Search for and select drop from the list of connectors. (Because this quickstart is just for testing, this output drops the message instead of sending it anywhere else. In a real scenario you’d replace the drop connector with your real destination.) Click Next.

  4. Add a topic: Select the existing topic called dad-jokes. Click Next.

  5. Add permissions:

    1. Select the existing user called connect.

    2. Add a consumer group: Enter dad-joke-raters as the name for the consumer group.

    3. Click Next.

  6. Edit pipeline:

    1. Enter this name for the pipeline: joke-generator-consumer.

    2. The Configuration section automatically populates the YAML with your selected components. To add the bloblang, log, and catch processors, replace the entire configuration with the following YAML. Bloblang is Redpanda Connect’s scripting language used to add logic (for example, random ratings).

      input:
        redpanda:
          seed_brokers: # Optional
            - ${REDPANDA_BROKERS}
          client_id: benthos # Optional (default: "benthos")
          tls:
            enabled: true # Optional (default: false)
            client_certs: []
          sasl:
            - mechanism: SCRAM-SHA-256
              username: ${secrets.KAFKA_USER_CONNECT}
              password: ${secrets.KAFKA_PASSWORD_CONNECT}
          metadata_max_age: 5m # Optional (default: "5m")
          request_timeout_overhead: 10s # Optional (default: "10s")
          conn_idle_timeout: 20s # Optional (default: "20s")
          topics: # Optional
            - dad-jokes
          regexp_topics: false # Optional (default: false)
          rebalance_timeout: 45s # Optional (default: "45s")
          session_timeout: 1m # Optional (default: "1m")
          heartbeat_interval: 3s # Optional (default: "3s")
          start_from_oldest: true # Optional (default: true)
          start_offset: earliest # Optional (default: "earliest")
          fetch_max_bytes: 50MiB # Optional (default: "50MiB")
          fetch_max_wait: 5s # Optional (default: "5s")
          fetch_min_bytes: 1B # Optional (default: "1B")
          fetch_max_partition_bytes: 1MiB # Optional (default: "1MiB")
          transaction_isolation_level: read_uncommitted # Optional (default: "read_uncommitted")
          consumer_group: dad-joke-raters # Optional
          commit_period: 5s # Optional (default: "5s")
          partition_buffer_bytes: 1MB # Optional (default: "1MB")
          topic_lag_refresh_period: 5s # Optional (default: "5s")
          max_yield_batch_bytes: 32KB # Optional (default: "32KB")
          auto_replay_nacks: true # Optional (default: true)
      
      
      pipeline:
        processors:
          - bloblang: |
              root = this
      
              let rating = random_int(min: 1, max: 11)
              root.cringe_rating = $rating
      
              root.cringe_level = if $rating <= 3 {
                "Mild - Almost acceptable"
              } else if $rating <= 6 {
                "Medium - Classic dad joke territory"
              } else if $rating <= 8 {
                "High - Eye-roll inducing"
              } else {
                "EXTREME - Peak dad joke achievement"
              }
      
              root.processed_at = now()
      
              root.rating_emoji = match {
                $rating <= 3 => "😐",
                $rating <= 6 => "😬",
                $rating <= 8 => "🤦",
                _ => "💀"
              }
      
              let age_seconds = (timestamp_unix() - this.timestamp.ts_parse("2006-01-02T15:04:05Z07:00").ts_unix())
              root.age_seconds = $age_seconds
      
          - log:
              level: INFO
              message: |
                🎭 JOKE RATED! ${! json("rating_emoji") }
                Joke: "${! json("joke") }"
                Cringe Rating: ${! json("cringe_rating") }/10 - ${! json("cringe_level") }
                Age: ${! json("age_seconds") } seconds old
                Processed at: ${! json("processed_at") }
      
          - catch:
              - log:
                  level: ERROR
                  message: "❌ Failed to process joke: ${! error() }"
      
      
      output:
        drop: {}
    3. Click Create to start your pipeline.

    This example explicitly includes several optional configuration fields for the redpanda input. They’re shown here for demonstration purposes, so you can see a range of available settings.
  7. Your pipeline details display, and the status changes from Starting to Running. If you don’t see this change, refresh the page.

  8. Open the logs to see a rated joke. For example:

    {
        "custom_source": "true",
        "instance_id": "d454dkn4u2is73ava480",
        "label": "",
        "level": "INFO",
        "message": "🎭 JOKE RATED! 💀\nJoke: \"I used to play piano by ear, but now I use my hands.\"\nCringe Rating: 9/10 - EXTREME - Peak dad joke achievement\nAge: 659 seconds old\nProcessed at: 2025-11-04T18:32:54.340229297Z\n",
        "path": "root.pipeline.processors.1",
        "pipeline_id": "d454djahlips73dmcll0",
        "time": "2025-11-04T18:32:54.341137527Z"
    }

Clean up

When you’ve finished experimenting with your data pipeline, you can delete the pipelines and the topic you created for this quickstart.

  1. On the Connect page, select the delete icon next to the joke-generator-producer pipeline and the joke-generator-consumer pipeline.

  2. Confirm your deletion to remove the pipelines and associated logs.

  3. On the Topics page, delete the dad-jokes topic.