Patterns for MCP Servers

This page provides a reference catalog of configuration patterns designed for use with Redpanda Connect MCP servers. Use these patterns as building blocks for your own MCP tools. For step-by-step instructions on building, deploying, and testing MCP servers, see Build an MCP Server in Redpanda Connect.

Each pattern is a reusable example for a common MCP tool scenario. Patterns are grouped by use case. All YAML is ready to use in your MCP server project.

For a high-level overview of MCP servers, see About Redpanda Connect MCP Server.

External API calls

Use processors to fetch data from external APIs, databases, or services and return formatted results. This is one of the most common patterns for MCP tools.

    label: fetch-weather
    processors:
      - label: prepare_parameters
        mutation: |
          meta city_name = this.city_name
      - label: fetch_weather
        http:
          url: 'https://wttr.in/${! @city_name }?format=j1'
          verb: GET
          headers:
            Accept: "application/json"
            User-Agent: "redpanda-mcp-server/1.0"
      - label: format_response
        mutation: |
          root = {
            "city": @city_name,
            "temperature": this.current_condition.0.temp_C.number(),
            "feels_like": this.current_condition.0.FeelsLikeC.number(),
            "humidity": this.current_condition.0.humidity.number(),
            "pressure": this.current_condition.0.pressure.number(),
            "description": this.current_condition.0.weatherDesc.0.value,
            "wind_speed": this.current_condition.0.windspeedKmph.number(),
            "metadata": {
              "source": "wttr.in",
              "fetched_at": now().ts_format("2006-01-02T15:04:05.000Z")
            }
          }

    meta:
      tags: [ example, weather, api ]
      mcp:
        enabled: true
        description: "Fetch current weather information for a specified city"
        properties:
          - name: city_name
            type: string
            description: "Name of the city to get weather information for"
            required: true

See also: http processor, mutation processor

Database queries

Query external databases and return structured results.
This pattern is essential for tools that need to access business data.

This example requires setting the DATABASE_URL environment variable with your PostgreSQL connection string. For example, export DATABASE_URL="postgres://user:password@localhost:5432/dbname".

    label: user_orders
    processors:
      - label: prepare_parameters
        mutation: |
          meta user_id = this.user_id.string()
          meta limit = this.limit.number().catch(10)
      - label: query_database
        sql_select:
          driver: "postgres"
          dsn: "${DATABASE_URL}"
          table: "orders"
          columns: ["id", "total", "status", "created_at"]
          where: "user_id = ? AND created_at > NOW() - INTERVAL '30 days'"
          suffix: "ORDER BY created_at DESC LIMIT ?"
          args_mapping: root = [@user_id, @limit]
      - label: format_response
        mutation: |
          root = {
            "user_id": @user_id,
            "orders": this,
            "total_count": this.length(),
            "metadata": {
              "source": "PostgreSQL",
              "fetched_at": now().ts_format("2006-01-02T15:04:05.000Z")
            }
          }

    meta:
      tags: [ database, orders, example ]
      mcp:
        enabled: true
        description: "Get recent orders for a user"
        properties:
          - name: user_id
            type: string
            description: "User ID to fetch orders for"
            required: true
          - name: limit
            type: number
            description: "Maximum number of orders to return (default: 10)"
            required: false

See also: sql_select processor

Redpanda integration and data publishing

Build tools that interact with Redpanda topics to publish data, consume events, or stream processing results back to topics for other systems to consume.

The examples in this section require setting the REDPANDA_BROKERS environment variable with your Redpanda broker addresses. For example, export REDPANDA_BROKERS="localhost:19092".

Publishing to Redpanda topics

Create tools that write data to Redpanda topics using the redpanda output:

    label: publish_order_events
    redpanda:
      seed_brokers: [ "${REDPANDA_BROKERS}" ]
      topic: "order-events"
      key: "${! this.order_id }"
      partitioner: "manual"
      partition: "${! this.customer_id.hash(algorithm:\"xxhash64\") % 8 }"
      max_in_flight: 10

    meta:
      tags: [ publishing, orders, example ]
      mcp:
        enabled: true
        description: "Publish order events to Redpanda for downstream processing"
        properties:
          - name: order_id
            type: string
            description: "Unique order identifier"
            required: true
          - name: customer_id
            type: string
            description: "Customer ID for partitioning"
            required: true
          - name: order_data
            type: object
            description: "Order details (items, total, etc.)"
            required: true

Consuming from Redpanda topics

Build tools that read data from topics and return processed results:

    label: get_recent_events
    redpanda:
      seed_brokers: [ "${REDPANDA_BROKERS}" ]
      topics: [ "user-events" ]
      consumer_group: "mcp-event-reader"

    meta:
      tags: [ consuming, events, example ]
      mcp:
        enabled: true
        description: "Get recent events for a specific user from Redpanda"

Stream processing with Redpanda Connect

Create tools that process streaming data and return aggregated results:

    label: stream_analytics
    redpanda:
      seed_brokers: [ "${REDPANDA_BROKERS}" ]
      topics: [ "click-events" ]
      consumer_group: "mcp-analytics"

    meta:
      tags: [ analytics, streaming, example ]
      mcp:
        enabled: true
        description: "Consume click stream data for analytics processing"

Event-driven workflows

Build tools that trigger workflows based on Redpanda events:

    label: event_triggered_workflow
    redpanda:
      seed_brokers: [ "${REDPANDA_BROKERS}" ]
      topics: [ "order-events" ]
      consumer_group: "workflow-trigger"

    meta:
      tags: [ workflows, events, example ]
      mcp:
        enabled: true
        description: "Consume order events to trigger workflows"
        properties:
          - name: event_type
            type: string
            description: "Type of event to process (order_created, order_cancelled, etc.)"
            required: false

See also: redpanda input

Production workflows and observability

Build enterprise-grade tools with error handling, validation, multi-step workflows, and monitoring.
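As a rough orientation before the individual patterns, the concerns named above can be layered in a single tool. The following is a minimal sketch, not a definitive layout: the tool name lookup_item, the item_id property, and the api.example.com endpoint are all hypothetical placeholders. It relies on the documented behavior that the try processor runs its children only for messages that have not already failed, and that catch runs its children only for messages that have.

```yaml
label: lookup_item                      # hypothetical tool name
processors:
  # Validation: throw() fails the mapping and flags the message as errored.
  - label: validate_input
    mutation: |
      root = if !this.exists("item_id") {
        throw("item_id parameter is required")
      } else { this }
      meta item_id = this.item_id.string()

  # Main work: skipped for messages that already failed validation.
  - label: fetch_item
    try:
      - http:
          url: 'https://api.example.com/items/${! @item_id }'   # placeholder endpoint
          verb: GET
          timeout: "10s"

  # Error handling and monitoring: runs only for failed messages.
  - label: handle_failure
    catch:
      - log:
          level: "ERROR"
          message: "lookup_item failed: ${! error() }"
      - mutation: |
          root.error = error()

meta:
  tags: [ example ]
  mcp:
    enabled: true
    description: "Look up an item by ID (illustrative skeleton)"
    properties:
      - name: item_id
        type: string
        description: "ID of the item to look up"
        required: true
```

Keeping validation ahead of the try block means a malformed request never reaches the external call, and the single catch block gives one place to log and shape the error returned to the caller.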
Parameter validation and type coercion

Always validate and coerce input parameters to ensure your tools are robust:

    processors:
      - label: validate_params
        mutation: |
          # Validate required parameters
          root = if !this.exists("user_id") {
            throw("user_id parameter is required")
          } else { this }

          # Type coercion with validation
          meta user_id = this.user_id.string()
          meta limit = this.limit.number().catch(10)
          meta start_date = this.start_date.parse_timestamp("2006-01-02").catch(now() - duration("24h"))

Dynamic configuration

Build tools that adapt their behavior based on input parameters:

    processors:
      - label: dynamic_config
        mutation: |
          # Choose data source based on environment
          meta env = this.environment | "production"
          meta table_name = match @env {
            "dev" => "dev_orders",
            "staging" => "staging_orders",
            "production" => "prod_orders",
            _ => "dev_orders"
          }

          # Adjust query complexity based on urgency
          meta columns = if this.detailed.bool().catch(false) {
            ["order_id", "customer_id", "total", "items", "shipping_address"]
          } else {
            ["order_id", "customer_id", "total"]
          }

Error handling and fallbacks

Implement error handling to make your tools reliable:

    processors:
      - label: primary_fetch
        try:
          - http:
              url: "https://api.primary.com/data"
              timeout: "10s"
      # catch is its own processor; its children run only for messages that failed above
      - catch:
          - log:
              message: "Primary API failed, trying fallback"
          - label: fallback_fetch
            http:
              url: "https://api.fallback.com/data"
              timeout: "15s"
          - mutation: |
              root.metadata.source = "fallback"
              root.metadata.warning = "Primary source unavailable"

Conditional processing

Build tools that branch based on input or data characteristics:

    processors:
      - label: conditional_processing
        switch:
          - check: this.data_type == "json"
            processors:
              - json:
                  operator: "parse"
              - mutation: 'root.parsed_data = this'
          - check: this.data_type == "csv"
            processors:
              - csv:
                  parse: true
              - mutation: 'root.parsed_data = this'
          - processors:
              - mutation: 'root.error = "Unsupported data type"'

Secrets and credentials

Securely handle multiple credentials and API keys using environment
variables. Here is an example of using an API key from environment variables.

Set an environment variable with your API key:

    export EXTERNAL_API_KEY="your-api-key-here"

Reference the environment variable in your configuration:

    processors:
      - label: call_external_api
        http:
          url: "https://api.example.com/data"
          verb: GET
          headers:
            Authorization: "Bearer ${EXTERNAL_API_KEY}"  # (1)
            Accept: "application/json"

(1) The environment variable is injected at runtime. Never store the actual API key in your YAML.

The actual secret value never appears in your configuration files or logs.

Monitoring, debugging, and observability

Use structured logging, request tracing, and performance metrics to gain insights into tool execution.

    label: observable_tool
    processors:
      - label: init_tracing
        mutation: |
          # Generate correlation ID for request tracing
          meta req_id = uuid_v7()
          meta start_time = now()

          # Log request start with structured data
          root.trace = {
            "request_id": @req_id,
            "timestamp": @start_time.ts_format("2006-01-02T15:04:05.000Z"),
            "tool": "observable_tool",
            "version": "1.0.0"
          }
      - label: log_request_start
        log:
          message: "MCP tool request started"
          fields:
            request_id: "${! @req_id }"
            tool_name: "observable_tool"
            input_params: "${! this.without(\"trace\") }"
            user_agent: "${! meta(\"User-Agent\").catch(\"unknown\") }"
          level: "INFO"
      - label: finalize_response
        mutation: |
          # Calculate total execution time
          meta duration = (now().ts_unix_nano() - @start_time.ts_unix_nano()) / 1000000

          # Add trace information to response
          root.metadata = {
            "request_id": @req_id,
            "execution_time_ms": @duration,
            "timestamp": now().ts_format("2006-01-02T15:04:05.000Z"),
            "tool": "observable_tool",
            "success": !this.exists("error")
          }
      - label: log_completion
        log:
          message: "MCP tool request completed"
          fields:
            request_id: "${! @req_id }"
            duration_ms: "${! this.metadata.execution_time_ms }"
            success: "${! this.metadata.success }"
            result_size: "${! content().length() }"
          level: "INFO"

    meta:
      tags: [ example ]
      mcp:
        enabled: true
        description: "Example tool with comprehensive observability and error handling"
        properties:
          - name: user_id
            type: string
            description: "User ID to fetch data for"
            required: true

Observability features:

- Correlation IDs: Use uuid_v7() to generate unique request identifiers for tracing
- Execution timing: Track how long your tools take to execute using nanosecond precision
- Structured logging: Include consistent fields like request_id, duration_ms, tool_name
- Request/response metadata: Log input parameters and response characteristics
- Success tracking: Monitor whether operations complete successfully

You can test this pattern by invoking the tool with valid and invalid parameters, and observe the structured logs for tracing execution flow. For example, with a user ID of 1, you might see logs like:

    {
      "metadata": {
        "execution_time_ms": 0.158977,
        "request_id": "019951ab-d07d-703f-aaae-7e1c9a5afa95",
        "success": true,
        "timestamp": "2025-09-16T08:37:18.589Z",
        "tool": "observable_tool"
      },
      "trace": {
        "request_id": "019951ab-d07d-703f-aaae-7e1c9a5afa95",
        "timestamp": "2025-09-16T08:37:18.589Z",
        "tool": "observable_tool",
        "version": "1.0.0"
      },
      "user_id": "1"
    }

See also: log processor, try processor, Bloblang functions (for timing and ID generation)

Multi-step data enrichment

Build tools that combine data from multiple sources. This workflow fetches customer data from a SQL database, enriches it with recent order history, and computes summary metrics.

This example requires setting the POSTGRES_DSN environment variable with your PostgreSQL connection string. For example, export POSTGRES_DSN="postgres://user:password@localhost:5432/dbname".

    label: customer_enrichment
    processors:
      - label: fetch_customer_base
        branch:
          processors:
            - sql_select:
                driver: "postgres"
                dsn: "${POSTGRES_DSN}"
                table: "customers"
                where: "customer_id = ?"
                args_mapping: 'root = [this.customer_id]'
          result_map: 'root.customers = this'
      - label: enrich_with_orders
        branch:
          processors:
            - sql_select:
                driver: "postgres"
                dsn: "${POSTGRES_DSN}"
                table: "orders"
                # Plain single quotes: '' is not an escape inside a double-quoted YAML string
                where: "customer_id = ? AND created_at >= NOW() - INTERVAL '30 days'"
                args_mapping: 'root = [this.customer_id]'
          result_map: 'root.orders = this'
      - label: combine_data
        mutation: |
          let order_totals = this.orders.map_each(o -> o.total)
          root = {
            "customer": this.customers.index(0),
            "recent_orders": this.orders,
            "metrics": {
              "total_orders": this.orders.length(),
              "total_spent": $order_totals.sum(),
              "avg_order_value": if $order_totals.length() > 0 { $order_totals.sum() / $order_totals.length() } else { 0 }
            }
          }

    meta:
      tags: [ example ]
      mcp:
        enabled: true
        description: "Get comprehensive customer profile with recent order history and metrics"
        properties:
          - name: customer_id
            type: string
            description: "Customer ID to analyze"
            required: true

See also: sql_select processor, Bloblang functions (for data manipulation and aggregations)

Workflow orchestration

Coordinate complex workflows with multiple steps and conditional logic.

This workflow simulates a complete order processing configuration with mock data for inventory and processing tiers. This allows you to test the full logic without needing real external systems.

    label: order_workflow
    processors:
      - label: validate_order
        mutation: |
          # Validation logic
          root = if this.total <= 0 {
            throw("Invalid order total")
          } else { this }
      - label: mock_inventory_check
        mutation: |
          # Mock inventory data for testing
          let inventory = {
            "widget-001": {"quantity": 100, "name": "Standard Widget"},
            "widget-premium": {"quantity": 25, "name": "Premium Widget"},
            "widget-limited": {"quantity": 2, "name": "Limited Edition Widget"}
          }

          let product = $inventory.get(this.product_id)

          root = if $product == null {
            throw("Product not found: " + this.product_id)
          } else if $product.quantity < this.quantity {
            throw("Insufficient inventory. Available: " + $product.quantity.string())
          } else {
            this.merge({
              "inventory_check": "passed",
              "available_quantity": $product.quantity,
              "product_name": $product.name
            })
          }
      - label: route_by_priority
        switch:
          - check: 'this.total > 1000'
            processors:
              - label: mock_high_value_processing
                mutation: |
                  # Mock premium processing
                  root = this.merge({
                    "processing_tier": "premium",
                    "processing_time_estimate": "2-4 hours",
                    "assigned_rep": "premium-team@company.com",
                    "priority_score": 95
                  })
          - check: 'this.customer_tier == "vip"'
            processors:
              - label: mock_vip_processing
                mutation: |
                  # Mock VIP processing
                  root = this.merge({
                    "processing_tier": "vip",
                    "processing_time_estimate": "1-2 hours",
                    "assigned_rep": "vip-team@company.com",
                    "priority_score": 90,
                    "perks": ["expedited_shipping", "white_glove_service"]
                  })
          - processors:
              - label: mock_standard_processing
                mutation: |
                  # Mock standard processing
                  root = this.merge({
                    "processing_tier": "standard",
                    "processing_time_estimate": "24-48 hours",
                    "assigned_rep": "support@company.com",
                    "priority_score": 50
                  })
      - label: finalize_order
        mutation: |
          # Add final processing metadata
          # Calculate estimated fulfillment by parsing processing time
          let max_hours = this.processing_time_estimate.split("-").index(1).split(" ").index(0).number()

          root = this.merge({
            "order_status": "processed",
            "processed_at": now().ts_format("2006-01-02T15:04:05.000Z"),
            "estimated_fulfillment": "TBD - calculated based on processing tier",
            "processing_time_hours": $max_hours
          })

    meta:
      tags: [ example ]
      mcp:
        enabled: true
        description: "Process orders with validation, inventory check, and tiered routing (with mocks for testing)"
        properties:
          - name: order_id
            type: string
            description: "Unique order identifier"
            required: true
          - name: product_id
            type: string
            description: "Product ID (try: widget-001, widget-premium, widget-limited)"
            required: true
          - name: quantity
            type: number
            description: "Quantity to order"
            required: true
          - name: total
            type: number
            description: "Order total in dollars"
            required: true
          - name: customer_tier
            type: string
            description: "Customer tier (optional: vip, standard)"
            required: false

For the input {"order_id": "ORD001", "product_id": "widget-001", "quantity": 5, "total": 250, "customer_tier": "vip"}, the workflow produces:

    {
      "assigned_rep": "vip-team@company.com",
      "available_quantity": 100,
      "customer_tier": "vip",
      "estimated_fulfillment": "TBD - calculated based on processing tier",
      "inventory_check": "passed",
      "order_id": "ORD001",
      "order_status": "processed",
      "perks": [
        "expedited_shipping",
        "white_glove_service"
      ],
      "priority_score": 90,
      "processed_at": "2025-09-16T09:05:29.138Z",
      "processing_tier": "vip",
      "processing_time_estimate": "1-2 hours",
      "processing_time_hours": 2,
      "product_id": "widget-001",
      "product_name": "Standard Widget",
      "quantity": 5,
      "total": 250
    }

Notice how the workflow:

- Preserves original input: order_id, product_id, quantity, total, and customer_tier pass through unchanged.
- Adds inventory data: available_quantity, product_name, and inventory_check status from the mock lookup.
- Routes by customer tier: The total of 250 does not exceed 1000, so the high-value check is skipped. Since customer_tier is vip, the order gets VIP processing with special perks and priority.
- Enriches with processing metadata: assigned_rep, priority_score, processing_tier, and time estimates.
- Finalizes with timestamps: order_status, processed_at, and calculated processing_time_hours.

Suggested reading

- Build an MCP Server in Redpanda Connect
- About Redpanda Connect MCP Server
- Redpanda Connect components reference