Redpanda Connect Patterns for MCP Servers

This page provides a reference catalog of configuration patterns designed for use with Redpanda Connect MCP servers. Use these patterns as building blocks for your own MCP tools. For step-by-step instructions on building, deploying, and testing MCP servers, see Build an MCP Server in Redpanda Connect.

Each pattern is a reusable example for a common MCP tool scenario. Patterns are grouped by use case. All YAML is ready to use in your MCP server project.

For a high-level overview of MCP servers, see About Redpanda Connect MCP Server.

External API calls

Use processors to fetch data from external APIs, databases, or services and return formatted results. This is one of the most common patterns for MCP tools.

label: fetch-weather
processors:
  - label: prepare_parameters
    mutation: |
      meta city_name = this.city_name
  - label: fetch_weather
    http:
      url: 'https://wttr.in/${! @city_name }?format=j1'
      verb: GET
      headers:
        Accept: "application/json"
        User-Agent: "redpanda-mcp-server/1.0"
  - label: format_response
    mutation: |
      root = {
        "city": @city_name,
        "temperature": this.current_condition.0.temp_C.number(),
        "feels_like": this.current_condition.0.FeelsLikeC.number(),
        "humidity": this.current_condition.0.humidity.number(),
        "pressure": this.current_condition.0.pressure.number(),
        "description": this.current_condition.0.weatherDesc.0.value,
        "wind_speed": this.current_condition.0.windspeedKmph.number(),
        "metadata": {
          "source": "wttr.in",
          "fetched_at": now().ts_format("2006-01-02T15:04:05.000Z")
        }
      }

meta:
  tags: [ example, weather, api ]
  mcp:
    enabled: true
    description: "Fetch current weather information for a specified city"
    properties:
      - name: city_name
        type: string
        description: "Name of the city to get weather information for"
        required: true

Database queries

Query external databases and return structured results. This pattern is essential for tools that need to access business data.

This example requires the DATABASE_URL environment variable to be set to your PostgreSQL connection string. For example, export DATABASE_URL="postgres://user:password@localhost:5432/dbname".

label: user_orders
processors:
  - label: prepare_parameters
    mutation: |
      meta user_id = this.user_id.string()
      meta limit = this.limit.number().catch(10)
  - label: query_database
    sql_select:
      driver: "postgres"
      dsn: "${DATABASE_URL}"
      table: "orders"
      columns: ["id", "total", "status", "created_at"]
      where: "user_id = ? AND created_at > NOW() - INTERVAL '30 days'"
      suffix: "ORDER BY created_at DESC LIMIT ?"
      args_mapping: root = [@user_id, @limit]
  - label: format_response
    mutation: |
      root = {
        "user_id": @user_id,
        "orders": this,
        "total_count": this.length(),
        "metadata": {
          "source": "PostgreSQL",
          "fetched_at": now().ts_format("2006-01-02T15:04:05.000Z")
        }
      }

meta:
  tags: [ database, orders, example ]
  mcp:
    enabled: true
    description: "Get recent orders for a user"
    properties:
      - name: user_id
        type: string
        description: "User ID to fetch orders for"
        required: true
      - name: limit
        type: number
        description: "Maximum number of orders to return (default: 10)"
        required: false

Redpanda integration and data publishing

Build tools that interact with Redpanda topics to publish data, consume events, or stream processing results back to topics for other systems to consume.

The examples in this section require setting the REDPANDA_BROKERS environment variable with your Redpanda broker addresses. For example, export REDPANDA_BROKERS="localhost:19092".

Publishing to Redpanda topics

Create tools that write data to Redpanda topics using the redpanda output:

label: publish_order_events
redpanda:
  seed_brokers: [ "${REDPANDA_BROKERS}" ]
  topic: "order-events"
  key: "${! this.order_id }"
  partitioner: "manual"
  partition: "${! this.customer_id.hash(algorithm:\"xxhash64\") % 8 }"
  max_in_flight: 10
meta:
  tags: [ publishing, orders, example ]
  mcp:
    enabled: true
    description: "Publish order events to Redpanda for downstream processing"
    properties:
      - name: order_id
        type: string
        description: "Unique order identifier"
        required: true
      - name: customer_id
        type: string
        description: "Customer ID for partitioning"
        required: true
      - name: order_data
        type: object
        description: "Order details (items, total, etc.)"
        required: true

Consuming from Redpanda topics

Build tools that read data from topics and return processed results:

label: get_recent_events
redpanda:
  seed_brokers: [ "${REDPANDA_BROKERS}" ]
  topics: [ "user-events" ]
  consumer_group: "mcp-event-reader"

meta:
  tags: [ consuming, events, example ]
  mcp:
    enabled: true
    description: "Get recent events for a specific user from Redpanda"

Stream processing with Redpanda Connect

Create tools that consume streaming data so that it can be processed and aggregated:

label: stream_analytics
redpanda:
  seed_brokers: [ "${REDPANDA_BROKERS}" ]
  topics: [ "click-events" ]
  consumer_group: "mcp-analytics"

meta:
  tags: [ analytics, streaming, example ]
  mcp:
    enabled: true
    description: "Consume click stream data for analytics processing"

Event-driven workflows

Build tools that trigger workflows based on Redpanda events:

label: event_triggered_workflow
redpanda:
  seed_brokers: [ "${REDPANDA_BROKERS}" ]
  topics: [ "order-events" ]
  consumer_group: "workflow-trigger"

meta:
  tags: [ workflows, events, example ]
  mcp:
    enabled: true
    description: "Consume order events to trigger workflows"
    properties:
      - name: event_type
        type: string
        description: "Type of event to process (order_created, order_cancelled, etc.)"
        required: false
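
The event_type property above is declared but the input itself does not use it. As a minimal sketch, assuming the input accepts a processors list in the same way as other Redpanda Connect inputs, the following variant drops every event whose type does not match. The label, the hard-coded order_created value, and the event_type field on each message are assumptions for illustration, and the meta block is omitted because it is unchanged.

```yaml
label: filtered_order_events
redpanda:
  seed_brokers: [ "${REDPANDA_BROKERS}" ]
  topics: [ "order-events" ]
  consumer_group: "workflow-trigger"
processors:
  - label: filter_by_event_type
    mapping: |
      # Assumes each event carries an event_type field; the value is illustrative
      root = if this.event_type != "order_created" { deleted() } else { this }
```

Events that lack an event_type field are also dropped, because a null value does not equal the expected string.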

See also: redpanda input

Production workflows and observability

Build enterprise-grade tools with error handling, validation, multi-step workflows, and monitoring.

Parameter validation and type coercion

Always validate and coerce input parameters to ensure your tools are robust:

processors:
  - label: validate_params
    mutation: |
      # Validate required parameters
      root = if !this.exists("user_id") {
        throw("user_id parameter is required")
      } else { this }

      # Type coercion with validation
      meta user_id = this.user_id.string()
      meta limit = this.limit.number().catch(10)
      meta start_date = this.start_date.ts_parse("2006-01-02").catch(now().ts_sub_iso8601("P1D"))
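
Coercion alone does not bound a value. As a minimal sketch, the following fragment also rejects a limit outside an allowed range. The label and the 1 to 100 bounds are assumptions chosen for illustration.

```yaml
processors:
  - label: validate_limit_range
    mutation: |
      # Fall back to 10 when limit is missing or not numeric
      let limit = this.limit.number().catch(10)

      # Reject out-of-range values (bounds are illustrative)
      root = if $limit < 1 || $limit > 100 {
        throw("limit must be between 1 and 100")
      } else { this }

      meta limit = $limit
```

Throwing early keeps an oversized limit from reaching a database query or an external API.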

Dynamic configuration

Build tools that adapt their behavior based on input parameters:

processors:
  - label: dynamic_config
    mutation: |
      # Choose data source based on environment
      meta env = this.environment | "production"
      meta table_name = match @env {
        "dev" => "dev_orders",
        "staging" => "staging_orders",
        "production" => "prod_orders",
        _ => "dev_orders"
      }

      # Adjust query complexity based on urgency
      meta columns = if this.detailed.bool().catch(false) {
        ["order_id", "customer_id", "total", "items", "shipping_address"]
      } else {
        ["order_id", "customer_id", "total"]
      }
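
Metadata set in one processor stays available to later processors through the @ prefix. As a minimal sketch, the following step echoes the selected configuration into the response, which can help when checking which branch was taken. The label and the query_plan field are hypothetical.

```yaml
processors:
  - label: describe_selection
    mutation: |
      # Reads the metadata written by the dynamic_config step
      root.query_plan = {
        "environment": @env,
        "table": @table_name,
        "columns": @columns
      }
```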

Error handling and fallbacks

Implement error handling to make your tools reliable:

processors:
  - label: primary_fetch
    try:
      - http:
          url: "https://api.primary.com/data"
          timeout: "10s"
  # catch is a separate processor that runs only for messages that failed earlier
  - label: fallback_on_failure
    catch:
      - log:
          message: "Primary API failed, trying fallback"
      - label: fallback_fetch
        http:
          url: "https://api.fallback.com/data"
          timeout: "15s"
      - mutation: |
          root.metadata.source = "fallback"
          root.metadata.warning = "Primary source unavailable"
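
When no fallback source exists, a tool can instead return the failure as structured data. The following is a minimal sketch of that approach, assuming the caller prefers an error object over a failed request. The labels and the response shape are hypothetical.

```yaml
processors:
  - label: fetch_with_error_report
    try:
      - http:
          url: "https://api.primary.com/data"
          timeout: "10s"
      # Skipped automatically if the http step fails
      - mutation: |
          root.metadata.source = "primary"
  - label: report_failure
    catch:
      - mutation: |
          # error() returns the message of the failure being handled
          root = {
            "error": error(),
            "success": false
          }
```

Because try stops at the first failing step, the catch block sees the original error rather than a secondary one raised by a later step.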

Conditional processing

Build tools that branch based on input or data characteristics:

processors:
  - label: conditional_processing
    switch:
      - check: this.data_type == "json"
        processors:
          # Assumes the raw payload arrives as a string in a `data` field
          - mutation: 'root.parsed_data = this.data.parse_json()'
      - check: this.data_type == "csv"
        processors:
          - mutation: 'root.parsed_data = this.data.parse_csv()'
      - processors:
          - mutation: 'root.error = "Unsupported data type"'

Secrets and credentials

Securely handle multiple credentials and API keys using environment variables.

Here is an example of using an API key from environment variables.

  1. Set an environment variable with your API key:

    export EXTERNAL_API_KEY="your-api-key-here"
  2. Reference the environment variable in your configuration:

    processors:
      - label: call_external_api
        http:
          url: "https://api.example.com/data"
          verb: GET
          headers:
            Authorization: "Bearer ${EXTERNAL_API_KEY}"  # Injected at runtime
            Accept: "application/json"

    The environment variable is injected at runtime, so the secret value never appears in your configuration files. Never store the actual API key in your YAML.
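
The same approach extends to several credentials. As a minimal sketch, the following configuration calls two services, each with its own key. The hostnames, the BILLING_API_KEY and CRM_API_KEY variable names, the header choices, and the result field names are all hypothetical.

```yaml
processors:
  - label: call_billing_api
    branch:
      processors:
        - http:
            url: "https://billing.example.com/v1/invoices"
            verb: GET
            headers:
              Authorization: "Bearer ${BILLING_API_KEY}"
      result_map: 'root.invoices = this'

  - label: call_crm_api
    branch:
      processors:
        - http:
            url: "https://crm.example.com/v1/contacts"
            verb: GET
            headers:
              X-API-Key: "${CRM_API_KEY}"
      result_map: 'root.contacts = this'
```

Keeping one variable per service lets you rotate or revoke a single key without touching the others.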

Monitoring, debugging, and observability

Use structured logging, request tracing, and performance metrics to gain insights into tool execution.

label: observable_tool
processors:
  - label: init_tracing
    mutation: |
      # Generate correlation ID for request tracing
      meta req_id = uuid_v7()
      meta start_time = now()

      # Log request start with structured data
      root.trace = {
        "request_id": @req_id,
        "timestamp": @start_time.ts_format("2006-01-02T15:04:05.000Z"),
        "tool": "observable_tool",
        "version": "1.0.0"
      }

  - label: log_request_start
    log:
      message: "MCP tool request started"
      fields:
        request_id: "${! @req_id }"
        tool_name: "observable_tool"
        input_params: "${! this.without(\"trace\") }"
        user_agent: "${! meta(\"User-Agent\").or(\"unknown\") }"
      level: "INFO"

  - label: finalize_response
    mutation: |
      # Calculate total execution time
      meta duration = (now().ts_unix_nano() - @start_time.ts_unix_nano()) / 1000000

      # Add trace information to response
      root.metadata = {
        "request_id": @req_id,
        "execution_time_ms": @duration,
        "timestamp": now().ts_format("2006-01-02T15:04:05.000Z"),
        "tool": "observable_tool",
        "success": !this.exists("error")
      }

  - label: log_completion
    log:
      message: "MCP tool request completed"
      fields:
        request_id: "${! @req_id }"
        duration_ms: "${! this.metadata.execution_time_ms }"
        success: "${! this.metadata.success }"
        result_size: "${! content().length() }"
      level: "INFO"

meta:
  tags: [ example ]
  mcp:
    enabled: true
    description: "Example tool with comprehensive observability and error handling"
    properties:
      - name: user_id
        type: string
        description: "User ID to fetch data for"
        required: true

Observability features:

  • Correlation IDs: Use uuid_v7() to generate unique request identifiers for tracing

  • Execution timing: Track how long your tools take to execute using nanosecond precision

  • Structured logging: Include consistent fields like request_id, duration_ms, tool_name

  • Request/response metadata: Log input parameters and response characteristics

  • Success tracking: Monitor whether operations complete successfully

You can test this pattern by invoking the tool with valid and invalid parameters and observing the structured logs to trace the execution flow. For example, with a user ID of 1, the tool returns a response like:

{
  "metadata": {
    "execution_time_ms": 0.158977,
    "request_id": "019951ab-d07d-703f-aaae-7e1c9a5afa95",
    "success": true,
    "timestamp": "2025-09-16T08:37:18.589Z",
    "tool": "observable_tool"
  },
  "trace": {
    "request_id": "019951ab-d07d-703f-aaae-7e1c9a5afa95",
    "timestamp": "2025-09-16T08:37:18.589Z",
    "tool": "observable_tool",
    "version": "1.0.0"
  },
  "user_id": "1"
}

See also: log processor, try processor, Bloblang functions (for timing and ID generation)

Multi-step data enrichment

Build tools that combine data from multiple sources.

This workflow fetches customer data from a SQL database, enriches it with recent order history, and computes summary metrics.

This example requires the POSTGRES_DSN environment variable to be set to your PostgreSQL connection string. For example, export POSTGRES_DSN="postgres://user:password@localhost:5432/dbname".

label: customer_enrichment
processors:
  - label: fetch_customer_base
    branch:
      processors:
        - sql_select:
            driver: "postgres"
            dsn: "${POSTGRES_DSN}"
            table: "customers"
            columns: ["*"]
            where: "customer_id = ?"
            args_mapping: 'root = [this.customer_id]'
      result_map: 'root.customers = this'

  - label: enrich_with_orders
    branch:
      processors:
        - sql_select:
            driver: "postgres"
            dsn: "${POSTGRES_DSN}"
            table: "orders"
            columns: ["*"]
            where: "customer_id = ? AND created_at >= NOW() - INTERVAL '30 days'"
            args_mapping: 'root = [this.customer_id]'
      result_map: 'root.orders = this'

  - label: combine_data
    mutation: |
      let order_totals = this.orders.map_each(o -> o.total)
      root = {
        "customer": this.customers.index(0),
        "recent_orders": this.orders,
        "metrics": {
          "total_orders": this.orders.length(),
          "total_spent": $order_totals.sum(),
          "avg_order_value": if $order_totals.length() > 0 { $order_totals.sum() / $order_totals.length() } else { 0 }
        }
      }

meta:
  tags: [ example ]
  mcp:
    enabled: true
    description: "Get comprehensive customer profile with recent order history and metrics"
    properties:
      - name: customer_id
        type: string
        description: "Customer ID to analyze"
        required: true

See also: sql_select processor, Bloblang functions (for data manipulation and aggregations)

Workflow orchestration

Coordinate complex workflows with multiple steps and conditional logic.

This workflow simulates a complete order processing configuration with mock data for inventory and processing tiers. This allows you to test the full logic without needing real external systems.

label: order_workflow
processors:
  - label: validate_order
    mutation: |
      # Validation logic
      root = if this.total <= 0 {
        throw("Invalid order total")
      } else { this }

  - label: mock_inventory_check
    mutation: |
      # Mock inventory data for testing
      let inventory = {
        "widget-001": {"quantity": 100, "name": "Standard Widget"},
        "widget-premium": {"quantity": 25, "name": "Premium Widget"},
        "widget-limited": {"quantity": 2, "name": "Limited Edition Widget"}
      }

      let product = $inventory.get(this.product_id)
      root = if $product == null {
        throw("Product not found: " + this.product_id)
      } else if $product.quantity < this.quantity {
        throw("Insufficient inventory. Available: " + $product.quantity.string())
      } else {
        this.merge({
          "inventory_check": "passed",
          "available_quantity": $product.quantity,
          "product_name": $product.name
        })
      }

  - label: route_by_priority
    switch:
      - check: 'this.total > 1000'
        processors:
          - label: mock_high_value_processing
            mutation: |
              # Mock premium processing
              root = this.merge({
                "processing_tier": "premium",
                "processing_time_estimate": "2-4 hours",
                "assigned_rep": "premium-team@company.com",
                "priority_score": 95
              })

      - check: 'this.customer_tier == "vip"'
        processors:
          - label: mock_vip_processing
            mutation: |
              # Mock VIP processing
              root = this.merge({
                "processing_tier": "vip",
                "processing_time_estimate": "1-2 hours",
                "assigned_rep": "vip-team@company.com",
                "priority_score": 90,
                "perks": ["expedited_shipping", "white_glove_service"]
              })

      - processors:
          - label: mock_standard_processing
            mutation: |
              # Mock standard processing
              root = this.merge({
                "processing_tier": "standard",
                "processing_time_estimate": "24-48 hours",
                "assigned_rep": "support@company.com",
                "priority_score": 50
              })

  - label: finalize_order
    mutation: |
      # Add final processing metadata
      # Calculate estimated fulfillment by parsing processing time
      let max_hours = this.processing_time_estimate.split("-").index(1).split(" ").index(0).number()

      root = this.merge({
        "order_status": "processed",
        "processed_at": now().ts_format("2006-01-02T15:04:05.000Z"),
        "estimated_fulfillment": "TBD - calculated based on processing tier",
        "processing_time_hours": $max_hours
      })

meta:
  tags: [ example ]
  mcp:
    enabled: true
    description: "Process orders with validation, inventory check, and tiered routing (with mocks for testing)"
    properties:
      - name: order_id
        type: string
        description: "Unique order identifier"
        required: true
      - name: product_id
        type: string
        description: "Product ID (try: widget-001, widget-premium, widget-limited)"
        required: true
      - name: quantity
        type: number
        description: "Quantity to order"
        required: true
      - name: total
        type: number
        description: "Order total in dollars"
        required: true
      - name: customer_tier
        type: string
        description: "Customer tier (optional: vip, standard)"
        required: false

For the input {"order_id": "ORD001", "product_id": "widget-001", "quantity": 5, "total": 250, "customer_tier": "vip"}, the workflow produces:

{
  "assigned_rep": "vip-team@company.com",
  "available_quantity": 100,
  "customer_tier": "vip",
  "estimated_fulfillment": "TBD - calculated based on processing tier",
  "inventory_check": "passed",
  "order_id": "ORD001",
  "order_status": "processed",
  "perks": [
    "expedited_shipping",
    "white_glove_service"
  ],
  "priority_score": 90,
  "processed_at": "2025-09-16T09:05:29.138Z",
  "processing_tier": "vip",
  "processing_time_estimate": "1-2 hours",
  "processing_time_hours": 2,
  "product_id": "widget-001",
  "product_name": "Standard Widget",
  "quantity": 5,
  "total": 250
}

Notice how the workflow:

  1. Preserves original input: order_id, product_id, quantity, total, and customer_tier pass through unchanged.

  2. Adds inventory data: available_quantity, product_name, and inventory_check status from the mock lookup.

  3. Routes by customer tier: Since customer_tier is vip, it gets VIP processing with special perks and priority.

  4. Enriches with processing metadata: assigned_rep, priority_score, processing_tier, and time estimates.

  5. Finalizes with timestamps: order_status, processed_at, and calculated processing_time_hours.