Docs Connect Cookbooks Message Routing Patterns Message Routing Patterns Page options Copy as Markdown Copied! View as plain text Ask AI about this topic Add MCP server to VS Code This cookbook demonstrates common patterns for routing and filtering messages in Redpanda Connect pipelines. Use this cookbook to: Implement message routing and filtering patterns in pipelines Configure conditional routing using switch and broker outputs Build dead letter queue patterns with fallback outputs Discarding messages There are two primary ways to discard messages in Redpanda Connect: using the drop output or the deleted() function in a mapping processor. Using deleted() in mapping Use deleted() to filter messages early in the pipeline, before they reach the output stage. This is more efficient when you don’t need conditional output routing: input: kafka: addresses: ["kafka:9092"] topics: ["events"] pipeline: processors: - mapping: | # Delete debug messages early in the pipeline root = if this.type == "debug" { deleted() } output: kafka: addresses: ["kafka:9092"] topic: production_events For more filtering patterns with deleted(), see the Filtering and Sampling cookbook. Using drop output Use the drop output to discard messages at the output stage, typically in conditional routing or as a fallback. This is useful when you’re routing different message types to different destinations: output: switch: cases: - check: this.type == "error" output: kafka: addresses: ["kafka:9092"] topic: errors - check: this.type == "debug" output: drop: {} # Discard debug messages - output: kafka: addresses: ["kafka:9092"] topic: events The drop output has minimal overhead and immediately acknowledges messages, making it ideal for this use case. Conditional routing with switch The switch output routes messages to different outputs based on conditions. Each case is evaluated in order until a match is found: output: switch: cases: - check: this.severity == "critical" output: label: pagerduty_alerts http_client: url: https://events.pagerduty.com/v2/enqueue verb: POST - check: this.severity == "error" output: label: error_topic kafka: addresses: ["kafka:9092"] topic: errors max_in_flight: 1 - check: this.severity == "warn" output: label: warning_topic kafka: addresses: ["kafka:9092"] topic: warnings - check: this.severity == "info" output: label: info_topic kafka: addresses: ["kafka:9092"] topic: info - check: this.severity == "debug" output: label: drop_debug drop: {} # Don't store debug logs in production You can label each output for monitoring and observability. Error handling with fallback The fallback output tries outputs in sequence. If an output fails, it moves to the next one. This is useful for implementing dead letter queue (DLQ) patterns: Basic DLQ pattern output: fallback: - kafka: addresses: ["kafka:9092"] topic: primary_topic max_in_flight: 1 - kafka: addresses: ["kafka:9092"] topic: dlq_topic If writing to primary_topic fails, the message is sent to dlq_topic instead. DLQ with ultimate fallback Add a drop output as the final fallback to prevent pipeline failures: output: fallback: - kafka: addresses: ["kafka:9092"] topic: primary_topic max_in_flight: 1 - kafka: addresses: ["kafka:9092"] topic: dlq_topic - drop: {} # Last resort: drop if both outputs fail This ensures messages don’t cause the pipeline to block, even if both Kafka outputs fail. Multi-tier fallback with retries Combine fallback with retry configuration for more sophisticated error handling: output: fallback: - kafka: addresses: ["kafka:9092"] topic: primary_topic max_in_flight: 10 batching: count: 100 period: 1s - retry: max_retries: 3 backoff: initial_interval: 1s max_interval: 10s output: http_client: url: https://backup-service.example.com/events verb: POST max_in_flight: 5 - kafka: addresses: ["kafka:9092"] topic: dlq_topic - drop: {} This configuration: Tries to write to Kafka with batching Falls back to an HTTP service with retries Falls back to a DLQ topic Ultimately drops messages if all else fails Routing with broker The broker output sends messages to multiple outputs simultaneously. Unlike switch, all outputs receive every message: output: broker: pattern: fan_out outputs: - kafka: addresses: ["kafka:9092"] topic: events - elasticsearch_v8: urls: ["http://localhost:9200"] index: events id: ${!json("id")} action: index - http_client: url: https://metrics.example.com/ingest verb: POST This sends each message to Kafka, Elasticsearch, and an HTTP endpoint in parallel. Combining patterns You can combine these patterns for sophisticated routing logic: output: switch: cases: # Critical events: send to multiple destinations - check: this.severity == "critical" output: broker: pattern: fan_out outputs: - kafka: addresses: ["kafka:9092"] topic: critical_events - http_client: url: https://pagerduty.example.com/alert verb: POST # Errors: send to Kafka with DLQ fallback - check: this.severity == "error" output: fallback: - kafka: addresses: ["kafka:9092"] topic: errors - kafka: addresses: ["kafka:9092"] topic: dlq # Info: send to Kafka only - check: this.severity == "info" output: kafka: addresses: ["kafka:9092"] topic: info # Debug: drop in production - check: this.severity == "debug" output: drop: {} Performance considerations When choosing between routing approaches, consider: Filtering with deleted() vs. drop output deleted() in mapping: Filters messages earlier in the pipeline, reducing downstream processing drop output: Filters at output stage, useful for conditional routing but messages still flow through processors For pure filtering without routing, use deleted(): # More efficient - filters early pipeline: processors: - mapping: | root = if this.debug { deleted() } - log: {} # Doesn't process debug messages output: kafka: addresses: ["kafka:9092"] topic: events For conditional routing, use drop output: # Better for routing - processes all messages, routes selectively pipeline: processors: - mapping: | root.enriched = true - log: {} # Processes all messages output: switch: cases: - check: this.important output: kafka: addresses: ["kafka:9092"] topic: important - output: drop: {} # Discard non-important messages Output parallelism Choose between these output types based on whether you need single-destination routing, multi-destination fan-out, or fallback behavior: switch: Sends to one output (sequential evaluation) broker: Sends to multiple outputs in parallel fallback: Tries outputs sequentially until one succeeds Testing and debugging Use drop to test input throughput without output bottlenecks: input: kafka: addresses: ["kafka:9092"] topics: ["test"] consumer_group: throughput_test output: drop: {} # Measure input consumption speed This reveals the maximum speed your input can consume data, isolated from output performance. Next steps Filtering and Sampling - More patterns for using deleted() switch output - Conditional routing reference fallback output - Error handling reference broker output - Fan-out patterns reference drop output - Drop output reference mapping processor - Bloblang mapping reference Back to top × Simple online edits For simple changes, such as fixing a typo, you can edit the content directly on GitHub. Edit on GitHub Or, open an issue to let us know about something that you want us to change. Open an issue Contribution guide For extensive content updates, or if you prefer to work locally, read our contribution guide . Was this helpful? thumb_up thumb_down group Ask in the community mail Share your feedback group_add Make a contribution 🎉 Thanks for your feedback! Filtering and Sampling Work with Jira Issues