Message Routing Patterns
This cookbook demonstrates common patterns for routing and filtering messages in Redpanda Connect pipelines.
Use this cookbook to:
-
Implement message routing and filtering patterns in pipelines
-
Configure conditional routing using switch and broker outputs
-
Build dead letter queue patterns with fallback outputs
Discarding messages
There are two primary ways to discard messages in Redpanda Connect: using the drop output or the deleted() function in a mapping processor.
Using deleted() in mapping
Use deleted() to filter messages early in the pipeline, before they reach the output stage. This is more efficient when you don’t need conditional output routing:
input:
kafka:
addresses: ["kafka:9092"]
topics: ["events"]
pipeline:
processors:
- mapping: |
# Delete debug messages early in the pipeline
root = if this.type == "debug" { deleted() }
output:
kafka:
addresses: ["kafka:9092"]
topic: production_events
For more filtering patterns with deleted(), see the Filtering and Sampling cookbook.
Using drop output
Use the drop output to discard messages at the output stage, typically in conditional routing or as a fallback. This is useful when you’re routing different message types to different destinations:
output:
switch:
cases:
- check: this.type == "error"
output:
kafka:
addresses: ["kafka:9092"]
topic: errors
- check: this.type == "debug"
output:
drop: {} # Discard debug messages
- output:
kafka:
addresses: ["kafka:9092"]
topic: events
The drop output has minimal overhead and immediately acknowledges messages, making it ideal for this use case.
Conditional routing with switch
The switch output routes messages to different outputs based on conditions. Each case is evaluated in order until a match is found:
output:
switch:
cases:
- check: this.severity == "critical"
output:
label: pagerduty_alerts
http_client:
url: https://events.pagerduty.com/v2/enqueue
verb: POST
- check: this.severity == "error"
output:
label: error_topic
kafka:
addresses: ["kafka:9092"]
topic: errors
max_in_flight: 1
- check: this.severity == "warn"
output:
label: warning_topic
kafka:
addresses: ["kafka:9092"]
topic: warnings
- check: this.severity == "info"
output:
label: info_topic
kafka:
addresses: ["kafka:9092"]
topic: info
- check: this.severity == "debug"
output:
label: drop_debug
drop: {} # Don't store debug logs in production
You can label each output for monitoring and observability.
Error handling with fallback
The fallback output tries outputs in sequence. If an output fails, it moves to the next one. This is useful for implementing dead letter queue (DLQ) patterns:
Basic DLQ pattern
output:
fallback:
- kafka:
addresses: ["kafka:9092"]
topic: primary_topic
max_in_flight: 1
- kafka:
addresses: ["kafka:9092"]
topic: dlq_topic
If writing to primary_topic fails, the message is sent to dlq_topic instead.
DLQ with ultimate fallback
Add a drop output as the final fallback to prevent pipeline failures:
output:
fallback:
- kafka:
addresses: ["kafka:9092"]
topic: primary_topic
max_in_flight: 1
- kafka:
addresses: ["kafka:9092"]
topic: dlq_topic
- drop: {} # Last resort: drop if both outputs fail
This ensures messages don’t cause the pipeline to block, even if both Kafka outputs fail.
Multi-tier fallback with retries
Combine fallback with retry configuration for more sophisticated error handling:
output:
fallback:
- kafka:
addresses: ["kafka:9092"]
topic: primary_topic
max_in_flight: 10
batching:
count: 100
period: 1s
- retry:
max_retries: 3
backoff:
initial_interval: 1s
max_interval: 10s
output:
http_client:
url: https://backup-service.example.com/events
verb: POST
max_in_flight: 5
- kafka:
addresses: ["kafka:9092"]
topic: dlq_topic
- drop: {}
This configuration:
-
Tries to write to Kafka with batching
-
Falls back to an HTTP service with retries
-
Falls back to a DLQ topic
-
Ultimately drops messages if all else fails
Routing with broker
The broker output sends messages to multiple outputs simultaneously. Unlike switch, all outputs receive every message:
output:
broker:
pattern: fan_out
outputs:
- kafka:
addresses: ["kafka:9092"]
topic: events
- elasticsearch_v8:
urls: ["http://localhost:9200"]
index: events
id: ${!json("id")}
action: index
- http_client:
url: https://metrics.example.com/ingest
verb: POST
This sends each message to Kafka, Elasticsearch, and an HTTP endpoint in parallel.
Combining patterns
You can combine these patterns for sophisticated routing logic:
output:
switch:
cases:
# Critical events: send to multiple destinations
- check: this.severity == "critical"
output:
broker:
pattern: fan_out
outputs:
- kafka:
addresses: ["kafka:9092"]
topic: critical_events
- http_client:
url: https://pagerduty.example.com/alert
verb: POST
# Errors: send to Kafka with DLQ fallback
- check: this.severity == "error"
output:
fallback:
- kafka:
addresses: ["kafka:9092"]
topic: errors
- kafka:
addresses: ["kafka:9092"]
topic: dlq
# Info: send to Kafka only
- check: this.severity == "info"
output:
kafka:
addresses: ["kafka:9092"]
topic: info
# Debug: drop in production
- check: this.severity == "debug"
output:
drop: {}
Performance considerations
When choosing between routing approaches, consider:
Filtering with deleted() vs. drop output
-
deleted()in mapping: Filters messages earlier in the pipeline, reducing downstream processing -
dropoutput: Filters at output stage, useful for conditional routing but messages still flow through processors
For pure filtering without routing, use deleted():
# More efficient - filters early
pipeline:
processors:
- mapping: |
root = if this.debug { deleted() }
- log: {} # Doesn't process debug messages
output:
kafka:
addresses: ["kafka:9092"]
topic: events
For conditional routing, use drop output:
# Better for routing - processes all messages, routes selectively
pipeline:
processors:
- mapping: |
root.enriched = true
- log: {} # Processes all messages
output:
switch:
cases:
- check: this.important
output:
kafka:
addresses: ["kafka:9092"]
topic: important
- output:
drop: {} # Discard non-important messages
Output parallelism
Choose between these output types based on whether you need single-destination routing, multi-destination fan-out, or fallback behavior:
-
switch: Sends to one output (sequential evaluation) -
broker: Sends to multiple outputs in parallel -
fallback: Tries outputs sequentially until one succeeds
Testing and debugging
Use drop to test input throughput without output bottlenecks:
input:
kafka:
addresses: ["kafka:9092"]
topics: ["test"]
consumer_group: throughput_test
output:
drop: {} # Measure input consumption speed
This reveals the maximum speed your input can consume data, isolated from output performance.
Next steps
-
Filtering and Sampling - More patterns for using
deleted() -
switch output - Conditional routing reference
-
fallback output - Error handling reference
-
broker output - Fan-out patterns reference
-
drop output - Drop output reference
-
mapping processor - Bloblang mapping reference