Message Routing Patterns

This cookbook demonstrates common patterns for routing and filtering messages in Redpanda Connect pipelines.

Use this cookbook to:

  • Implement message routing and filtering patterns in pipelines

  • Configure conditional routing using switch and broker outputs

  • Build dead letter queue patterns with fallback outputs

Discarding messages

There are two primary ways to discard messages in Redpanda Connect: using the drop output or the deleted() function in a mapping processor.

Using deleted() in mapping

Use deleted() to filter messages early in the pipeline, before they reach the output stage. This is more efficient when you don’t need conditional output routing:

input:
  kafka:
    addresses: ["kafka:9092"]
    topics: ["events"]

pipeline:
  processors:
    - mapping: |
        # Delete debug messages early in the pipeline
        root = if this.type == "debug" { deleted() }

output:
  kafka:
    addresses: ["kafka:9092"]
    topic: production_events

For more filtering patterns with deleted(), see the Filtering and Sampling cookbook.

Using drop output

Use the drop output to discard messages at the output stage, typically in conditional routing or as a fallback. This is useful when you’re routing different message types to different destinations:

output:
  switch:
    cases:
      - check: this.type == "error"
        output:
          kafka:
            addresses: ["kafka:9092"]
            topic: errors
      - check: this.type == "debug"
        output:
          drop: {}  # Discard debug messages
      - output:
          kafka:
            addresses: ["kafka:9092"]
            topic: events

The drop output has minimal overhead and immediately acknowledges messages, making it ideal for this use case.

Conditional routing with switch

The switch output routes messages to different outputs based on conditions. Each case is evaluated in order until a match is found:

output:
  switch:
    cases:
      - check: this.severity == "critical"
        output:
          label: pagerduty_alerts
          http_client:
            url: https://events.pagerduty.com/v2/enqueue
            verb: POST

      - check: this.severity == "error"
        output:
          label: error_topic
          kafka:
            addresses: ["kafka:9092"]
            topic: errors
            max_in_flight: 1

      - check: this.severity == "warn"
        output:
          label: warning_topic
          kafka:
            addresses: ["kafka:9092"]
            topic: warnings

      - check: this.severity == "info"
        output:
          label: info_topic
          kafka:
            addresses: ["kafka:9092"]
            topic: info

      - check: this.severity == "debug"
        output:
          label: drop_debug
          drop: {}  # Don't store debug logs in production

Labeling each output makes it easier to distinguish outputs in metrics and logs, which helps with monitoring and observability.
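By default, evaluation stops at the first matching case. The switch output also supports a per-case continue field; a sketch, assuming (as in upstream Benthos) that continue: true lets a matching message fall through and be tested against later cases as well:

output:
  switch:
    cases:
      # Critical events go to their own topic and also fall through
      - check: this.severity == "critical"
        continue: true
        output:
          kafka:
            addresses: ["kafka:9092"]
            topic: critical_events

      # Both critical and error events end up in the errors topic
      - check: this.severity == "critical" || this.severity == "error"
        output:
          kafka:
            addresses: ["kafka:9092"]
            topic: errors

This avoids nesting a broker output inside a case when only some messages need multi-destination delivery.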

Error handling with fallback

The fallback output tries outputs in sequence. If an output fails, it moves to the next one. This is useful for implementing dead letter queue (DLQ) patterns:

Basic DLQ pattern

output:
  fallback:
    - kafka:
        addresses: ["kafka:9092"]
        topic: primary_topic
        max_in_flight: 1

    - kafka:
        addresses: ["kafka:9092"]
        topic: dlq_topic

If writing to primary_topic fails, the message is sent to dlq_topic instead.
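When a fallback output hands a rejected message to the next tier, upstream Benthos documents that the delivery error is attached as metadata under fallback_error. Assuming that metadata key, you can capture the failure reason in the DLQ message by attaching processors to the DLQ output:

output:
  fallback:
    - kafka:
        addresses: ["kafka:9092"]
        topic: primary_topic

    - kafka:
        addresses: ["kafka:9092"]
        topic: dlq_topic
      processors:
        - mapping: |
            # Wrap the failed payload with the delivery error for triage
            root.payload = content().string()
            root.error = meta("fallback_error")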

DLQ with ultimate fallback

Add a drop output as the final fallback to prevent pipeline failures:

output:
  fallback:
    - kafka:
        addresses: ["kafka:9092"]
        topic: primary_topic
        max_in_flight: 1

    - kafka:
        addresses: ["kafka:9092"]
        topic: dlq_topic

    - drop: {}  # Last resort: drop if both outputs fail

This ensures messages don’t cause the pipeline to block, even if both Kafka outputs fail.
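If you only need to drop messages on delivery errors or sustained back pressure, the drop_on output is a more direct alternative to ending a fallback chain with drop. A sketch, assuming the error and back_pressure fields documented for upstream Benthos:

output:
  drop_on:
    error: true          # Drop messages the child output rejects
    back_pressure: 30s   # Drop if delivery blocks for 30 seconds
    output:
      kafka:
        addresses: ["kafka:9092"]
        topic: primary_topic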

Multi-tier fallback with retries

Combine fallback with retry configuration for more sophisticated error handling:

output:
  fallback:
    - kafka:
        addresses: ["kafka:9092"]
        topic: primary_topic
        max_in_flight: 10
        batching:
          count: 100
          period: 1s

    - retry:
        max_retries: 3
        backoff:
          initial_interval: 1s
          max_interval: 10s
        output:
          http_client:
            url: https://backup-service.example.com/events
            verb: POST
            max_in_flight: 5

    - kafka:
        addresses: ["kafka:9092"]
        topic: dlq_topic

    - drop: {}

This configuration:

  1. Tries to write to Kafka with batching

  2. Falls back to an HTTP service with retries

  3. Falls back to a DLQ topic

  4. Ultimately drops messages if all else fails

Routing with broker

The broker output sends messages to multiple outputs simultaneously. Unlike switch, which routes each message to a single matching case, the fan_out pattern shown here delivers every message to all outputs:

output:
  broker:
    pattern: fan_out
    outputs:
      - kafka:
          addresses: ["kafka:9092"]
          topic: events

      - elasticsearch_v8:
          urls: ["http://localhost:9200"]
          index: events
          id: ${!json("id")}
          action: index

      - http_client:
          url: https://metrics.example.com/ingest
          verb: POST

This sends each message to Kafka, Elasticsearch, and an HTTP endpoint in parallel.
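fan_out is one of several broker patterns; upstream Benthos also documents fan_out_sequential, round_robin, and greedy. To distribute load across destinations rather than duplicate messages, a round_robin sketch (assuming that pattern name; the broker addresses are placeholders):

output:
  broker:
    pattern: round_robin   # Each message goes to exactly one output, in rotation
    outputs:
      - kafka:
          addresses: ["kafka-a:9092"]
          topic: events
      - kafka:
          addresses: ["kafka-b:9092"]
          topic: events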

Combining patterns

You can combine these patterns for sophisticated routing logic:

output:
  switch:
    cases:
      # Critical events: send to multiple destinations
      - check: this.severity == "critical"
        output:
          broker:
            pattern: fan_out
            outputs:
              - kafka:
                  addresses: ["kafka:9092"]
                  topic: critical_events
              - http_client:
                  url: https://pagerduty.example.com/alert
                  verb: POST

      # Errors: send to Kafka with DLQ fallback
      - check: this.severity == "error"
        output:
          fallback:
            - kafka:
                addresses: ["kafka:9092"]
                topic: errors
            - kafka:
                addresses: ["kafka:9092"]
                topic: dlq

      # Info: send to Kafka only
      - check: this.severity == "info"
        output:
          kafka:
            addresses: ["kafka:9092"]
            topic: info

      # Debug: drop in production
      - check: this.severity == "debug"
        output:
          drop: {}

Performance considerations

When choosing between routing approaches, consider:

Filtering with deleted() vs. drop output

  • deleted() in mapping: Filters messages earlier in the pipeline, reducing downstream processing

  • drop output: Filters at output stage, useful for conditional routing but messages still flow through processors

For pure filtering without routing, use deleted():

# More efficient - filters early
pipeline:
  processors:
    - mapping: |
        root = if this.debug { deleted() }
    - log: {}  # Doesn't process debug messages

output:
  kafka:
    addresses: ["kafka:9092"]
    topic: events

For conditional routing, use drop output:

# Better for routing - processes all messages, routes selectively
pipeline:
  processors:
    - mapping: |
        root.enriched = true
    - log: {}  # Processes all messages

output:
  switch:
    cases:
      - check: this.important
        output:
          kafka:
            addresses: ["kafka:9092"]
            topic: important
      - output:
          drop: {}  # Discard non-important messages

Output parallelism

Choose between these output types based on whether you need single-destination routing, multi-destination fan-out, or fallback behavior:

  • switch: Routes each message to the first output whose check matches (cases are evaluated in order)

  • broker: Sends every message to multiple outputs in parallel

  • fallback: Tries outputs sequentially until one succeeds

Testing and debugging

Use drop to test input throughput without output bottlenecks:

input:
  kafka:
    addresses: ["kafka:9092"]
    topics: ["test"]
    consumer_group: throughput_test

output:
  drop: {}  # Measure input consumption speed

This reveals the maximum speed your input can consume data, isolated from output performance.
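Conversely, to measure output throughput in isolation, you can synthesize load with the generate input. A sketch, assuming the generate input's mapping and interval fields, where an empty interval produces data as fast as downstream components can consume it:

input:
  generate:
    interval: ""   # Generate as fast as downstream can consume
    mapping: |
      root.id = uuid_v4()
      root.severity = "info"

output:
  kafka:
    addresses: ["kafka:9092"]
    topic: throughput_test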

Next steps