Patterns for Remote MCP Servers

This page provides a reference catalog of patterns designed for use with Remote MCP servers in Redpanda Cloud. Use these patterns as building blocks for your own MCP tools. For step-by-step instructions on building, deploying, and testing MCP servers, see Build Remote MCP Servers in Redpanda Cloud.

Each pattern is a reusable example for a common MCP tool scenario, and patterns are grouped by use case. Copy the YAML into your MCP server project and replace placeholder values, such as project IDs, API URLs, and secret names, with your own.

For a high-level overview of MCP servers, see MCP in Redpanda Cloud.

Data generators

Use inputs to create tools that read data from internal or external systems or generate sample data for testing and development.

This example generates a realistic user event message:

label: generate_input
generate:
  mapping: |
    let event_type = ["login", "logout", "purchase", "view_page", "click_button"].index(random_int(max:4))
    root = {
      "id": uuid_v4(),
      "timestamp": now().ts_format("2006-01-02T15:04:05.000Z"),
      "user_id": random_int(min:1, max:10000),
      "event_type": $event_type,
      "data": {
        "session_id": ksuid(),
        "ip_address": "192.168.%v.%v".format(random_int(max:255), random_int(min:1, max:254)),
        "user_agent": ["Chrome", "Firefox", "Safari", "Edge"].index(random_int(max:3)),
        "amount": if $event_type == "purchase" { random_int(min:10, max:500) } else { null }
      }
    }

meta:
  mcp:
    enabled: true
    description: "Generate an example user event message with realistic data"
    properties: []
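The generate input above produces one event at a time. When the caller should decide how many events to receive, you can build the batch inside a processor instead. This is a sketch under assumptions: the tool name generate_user_events, the count property, and the cap of 100 events are hypothetical choices. range() and map_each() are standard Bloblang.

label: generate_user_events
processors:
  - label: build_events
    mutation: |
      # Default to 5 events and cap the batch at 100 (arbitrary limits for this sketch)
      let requested = this.count.number().catch(5)
      let count = if $requested > 100 { 100 } else { $requested }
      root.events = range(0, $count).map_each(i -> {
        "sequence": i,
        "id": uuid_v4(),
        "user_id": random_int(min:1, max:10000),
        "event_type": ["login", "logout", "purchase", "view_page", "click_button"].index(random_int(max:4))
      })

meta:
  mcp:
    enabled: true
    description: "Generate a batch of example user events"
    properties:
      - name: count
        type: number
        description: "Number of events to generate (default 5, maximum 100)"
        required: false

Because the batch is returned as a single JSON array, the calling agent receives all events in one tool response.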

External API calls

Use processors to fetch data from external APIs, databases, or services and return formatted results. This is one of the most common patterns for MCP tools.

label: fetch-weather
processors:
  - label: prepare_parameters
    mutation: |
      meta city_name = this.city_name
  - label: fetch_weather
    http:
      url: 'https://wttr.in/${! @city_name }?format=j1'
      verb: GET
      headers:
        Accept: "application/json"
        User-Agent: "redpanda-mcp-server/1.0"
  - label: format_response
    mutation: |
      root = {
        "city": @city_name,
        "temperature": this.current_condition.0.temp_C.number(),
        "feels_like": this.current_condition.0.FeelsLikeC.number(),
        "humidity": this.current_condition.0.humidity.number(),
        "pressure": this.current_condition.0.pressure.number(),
        "description": this.current_condition.0.weatherDesc.0.value,
        "wind_speed": this.current_condition.0.windspeedKmph.number(),
        "metadata": {
          "source": "wttr.in",
          "fetched_at": now().ts_format("2006-01-02T15:04:05.000Z")
        }
      }

meta:
  mcp:
    enabled: true
    description: "Fetch current weather information for a specified city"
    properties:
      - name: city_name
        type: string
        description: "Name of the city to get weather information for"
        required: true
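City names that contain spaces, such as "New York", must be encoded before they are placed in the URL, and public APIs like wttr.in can fail transiently. A hardened variant of the fetch_weather step might look like the following sketch. The timeout and retry values are arbitrary assumptions, not values recommended by wttr.in.

  - label: fetch_weather
    http:
      # escape_url_query() encodes spaces and special characters (for example, "New York" becomes "New+York")
      url: 'https://wttr.in/${! @city_name.escape_url_query() }?format=j1'
      verb: GET
      headers:
        Accept: "application/json"
        User-Agent: "redpanda-mcp-server/1.0"
      # Fail fast, then retry a limited number of times before surfacing an error
      timeout: 10s
      retries: 2
      retry_period: 1s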

Database queries

Query external databases and return structured results. This pattern is essential for tools that need to access business data.

label: gcp_bigquery_select_processor
processors:
  - label: prepare_parameters
    mutation: |
      meta customer_id = this.customer_id.string().catch("12345")
      meta limit = this.limit.number().catch(10)
  - label: query_bigquery
    gcp_bigquery_select:
      project: my-gcp-project
      credentials_json: |
        ${secrets.BIGQUERY_CREDENTIALS}
      table: my_dataset.customer_orders
      columns:
        - "order_id"
        - "customer_id"
        - "order_date"
        - "total_amount"
        - "status"
      where: customer_id = ? AND order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
      suffix: "ORDER BY order_date DESC LIMIT ?"
      args_mapping: root = [ @customer_id, @limit ]
  - label: format_response
    mutation: |
      root = {
        "orders": this,
        "metadata": {
          "source": "BigQuery",
          "customer_id": @customer_id,
          "fetched_at": now().ts_format("2006-01-02T15:04:05.000Z")
        }
      }

meta:
  mcp:
    enabled: true
    description: "Query customer orders from BigQuery"
    properties:
      - name: customer_id
        type: string
        description: "Customer ID to filter orders"
        required: true
      - name: limit
        type: number
        description: "Maximum number of orders to return"
        required: false

See also: gcp_bigquery_select processor, sql_select processor (for other databases)
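For databases other than BigQuery, the same tool shape works with sql_select. This sketch assumes a PostgreSQL table named customer_orders with the same columns as the BigQuery example and a hypothetical POSTGRES_DSN secret. Reuse the meta.mcp block from the BigQuery example (this sketch uses a fixed limit, so the limit property can be dropped).

label: postgres_customer_orders
processors:
  - label: prepare_parameters
    mutation: |
      meta customer_id = this.customer_id.string()
  - label: query_postgres
    sql_select:
      driver: postgres
      dsn: "${secrets.POSTGRES_DSN}"
      table: customer_orders
      columns: [ "order_id", "customer_id", "order_date", "total_amount", "status" ]
      # sql_select placeholders are always "?"; they are converted to $1, $2, ... for postgres
      where: "customer_id = ? AND order_date >= NOW() - INTERVAL '30 days'"
      suffix: "ORDER BY order_date DESC LIMIT 10"
      args_mapping: 'root = [ @customer_id ]'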

Redpanda integration and data publishing

Use these patterns when you need to integrate with Redpanda infrastructure, publish data to topics, or implement caching systems.

Data publishers (output patterns)

Use outputs to send data to external systems or build caching systems.

This example publishes a message to a Redpanda topic:

label: redpanda_output
redpanda:
  seed_brokers:
    - ${REDPANDA_BROKERS}
  topic: ${! this.topic_name.string().catch("default-topic") }
  timeout: 30s
  tls:
    enabled: true
  sasl:
    - mechanism: SCRAM-SHA-256
      username: ${secrets.REDPANDA_USERNAME}
      password: ${secrets.REDPANDA_PASSWORD}
meta:
  mcp:
    enabled: true
    description: Publishes a message to a specified Redpanda topic
    properties:
      - name: message
        type: string
        description: The message content to publish
        required: true
      - name: topic_name
        type: string
        description: The Redpanda topic to publish to
        required: true

See also: redpanda output
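The output above publishes the entire tool input, including the topic_name field, as the message body. If you want only the message text on the topic, you can add output-level processors, as in this sketch. It keeps the same meta.mcp block as above; moving the topic name into metadata is one possible approach, not a required convention.

label: redpanda_output
redpanda:
  seed_brokers:
    - ${REDPANDA_BROKERS}
  topic: ${! @topic_name }
  timeout: 30s
  tls:
    enabled: true
  sasl:
    - mechanism: SCRAM-SHA-256
      username: ${secrets.REDPANDA_USERNAME}
      password: ${secrets.REDPANDA_PASSWORD}
processors:
  - mutation: |
      # Keep the topic name in metadata so the topic interpolation can still read it
      meta topic_name = this.topic_name.string().catch("default-topic")
      # Replace the payload with just the message text
      root = this.message

Output processors run before the message is written, so the topic interpolation reads the metadata value set here.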

Caching systems

Use caching to store frequently accessed data, reduce latency, and minimize external API calls. You can implement caching using either Redpanda topics or in-memory stores.

This example uses a Redpanda topic as durable storage for cache entries:

label: redpanda_cache
redpanda:
  seed_brokers: ["${REDPANDA_BROKERS}"]
  topic: "mcp-cache-topic"
  tls:
    enabled: true
  sasl:
    - mechanism: "SCRAM-SHA-512"
      username: "${secrets.MCP_REDPANDA_CREDENTIALS.username}"
      password: "${secrets.MCP_REDPANDA_CREDENTIALS.password}"

meta:
  mcp:
    enabled: true
    description: "Redpanda-backed distributed cache using Kafka topics for persistence"

This example implements an in-memory cache for low-latency access to small datasets:

label: memory_cache
memory:
  default_ttl: "5m"
  init_values:
    "user:1001": '{"name": "Alice", "role": "admin"}'
    "user:1002": '{"name": "Bob", "role": "user"}'
    "config:theme": "dark"
    "config:language": "en"
  shards: 4

meta:
  mcp:
    enabled: true
    description: "In-memory cache for storing user data, configuration, and temporary values"

See also: memory cache, Redpanda-backed cache using redpanda output
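To use a cache from another tool, you can wrap an expensive step in the cached processor. This sketch assumes the memory_cache resource defined above and reuses the wttr.in call from the weather example; the tool name cached_weather, the key format, and the 10-minute TTL are hypothetical choices.

label: cached_weather
processors:
  - label: fetch_weather_cached
    cached:
      resource: memory_cache
      # Normalize the key so "London" and "london" share one cache entry
      key: 'weather:${! this.city_name.lowercase() }'
      ttl: 10m
      processors:
        - http:
            url: 'https://wttr.in/${! this.city_name.escape_url_query() }?format=j1'
            verb: GET
            headers:
              Accept: "application/json"

meta:
  mcp:
    enabled: true
    description: "Fetch current weather for a city, caching results for 10 minutes"
    properties:
      - name: city_name
        type: string
        description: "Name of the city to get weather information for"
        required: true

On a cache hit, the child processors are skipped and the stored response is returned, so repeated calls for the same city avoid a new HTTP request until the TTL expires.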

Production workflows and observability

Build production-ready tools with error handling, validation, multi-step workflows, and monitoring.

Parameter validation and type coercion

Always validate and coerce input parameters to ensure your tools are robust:

processors:
  - label: validate_params
    mutation: |
      # Validate required parameters
      root = if !this.exists("user_id") {
        throw("user_id parameter is required")
      } else { this }

      # Type coercion with validation
      meta user_id = this.user_id.string()
      meta limit = this.limit.number().catch(10)
      meta start_date = this.start_date.ts_parse("2006-01-02").catch(now().ts_sub_iso8601("P1D"))
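When a tool has several parameters, you can declare the rules once as a JSON Schema instead of writing individual checks. This sketch uses the json_schema processor; the property names and the 1 to 100 range for limit are assumptions chosen to match the mutation above. Messages that fail validation are flagged with an error, which a later catch step can turn into a readable response.

processors:
  - label: validate_schema
    json_schema:
      schema: |
        {
          "type": "object",
          "required": ["user_id"],
          "properties": {
            "user_id": { "type": "string" },
            "limit": { "type": "integer", "minimum": 1, "maximum": 100 },
            "start_date": { "type": "string" }
          }
        }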

Dynamic configuration

Build tools that adapt their behavior based on input parameters:

processors:
  - label: dynamic_config
    mutation: |
      # Choose data source based on environment
      meta env = this.environment | "production"
      meta table_name = match @env {
        "dev" => "dev_orders",
        "staging" => "staging_orders",
        "production" => "prod_orders",
        _ => "dev_orders"
      }

      # Return more columns when the caller asks for a detailed result
      meta columns = if this.detailed.bool().catch(false) {
        ["order_id", "customer_id", "total", "items", "shipping_address"]
      } else {
        ["order_id", "customer_id", "total"]
      }
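The metadata set above only has an effect once a later step reads it. As a sketch, a sql_raw step could build its query from @table_name and @columns. This assumes a PostgreSQL database, a hypothetical POSTGRES_DSN secret, a hypothetical customer_id input field, and a Redpanda Connect version where metadata can hold structured values such as arrays.

  - label: query_orders
    sql_raw:
      driver: postgres
      dsn: "${secrets.POSTGRES_DSN}"
      # Required for interpolation in the query. This is acceptable here only because
      # table_name and columns come from the fixed values set in dynamic_config.
      unsafe_dynamic_query: true
      query: 'SELECT ${! @columns.join(", ") } FROM ${! @table_name } WHERE customer_id = $1'
      args_mapping: 'root = [ this.customer_id ]'

Never interpolate raw caller input into the query; map it to a known set of values first, as the match expression does.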

Error handling and fallbacks

Implement error handling to make your tools reliable:

processors:
  - label: primary_fetch
    try:
      - http:
          url: "https://api.primary.com/data"
          verb: GET
          timeout: "10s"

  # catch runs only for messages that failed a previous step,
  # and clears the error flag once its processors have run
  - label: fallback_fetch
    catch:
      - log:
          level: WARN
          message: 'Primary API failed, trying fallback: ${! error() }'
      - http:
          url: "https://api.fallback.com/data"
          verb: GET
          timeout: "15s"
      - mutation: |
          root.metadata.source = "fallback"
          root.metadata.warning = "Primary source unavailable"
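When no fallback source exists, returning a structured error is often more useful to the calling agent than a raw failure. A minimal sketch, assuming the same hypothetical primary URL; the error, source, and failed_at field names are illustrative:

processors:
  - label: primary_fetch
    try:
      - http:
          url: "https://api.primary.com/data"
          verb: GET
          timeout: "10s"
  - label: error_response
    catch:
      - mutation: |
          # error() returns the message describing why the previous step failed
          root = {
            "error": error(),
            "source": "primary",
            "failed_at": now().ts_format("2006-01-02T15:04:05.000Z", "UTC")
          }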

Conditional processing

Build tools that branch based on input or data characteristics:

processors:
  - label: conditional_processing
    switch:
      # These cases assume the raw payload arrives as a string in a "data" field
      - check: this.data_type == "json"
        processors:
          - mutation: 'root.parsed_data = this.data.parse_json()'
      - check: this.data_type == "csv"
        processors:
          # parse_csv() treats the first row as a header and returns an array of objects
          - mutation: 'root.parsed_data = this.data.parse_csv()'
      - processors:
          - mutation: 'root.error = "Unsupported data type"'
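When each branch is a single mapping, a Bloblang match expression can replace the switch processor. This sketch makes the same assumption about a hypothetical data field, and it differs in one way: an unsupported type fails the message with throw() instead of setting an error field, so pair it with a catch step if you want a structured response.

processors:
  - label: conditional_parsing
    mutation: |
      # Assumes the raw payload is a string in a hypothetical "data" field
      root.parsed_data = match this.data_type {
        "json" => this.data.parse_json(),
        "csv" => this.data.parse_csv(),
        _ => throw("Unsupported data type")
      }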

Secrets and credentials

Securely handle multiple credentials and API keys.

Here is an example of using an API key secret.

  1. Create a secret in the Secrets Store named EXTERNAL_API_KEY, with your API key as the value.

  2. Reference the secret in your YAML configuration:

    processors:
      - label: call_external_api
        http:
          url: "https://api.example.com/data"
          verb: GET
          headers:
            Authorization: "Bearer ${secrets.EXTERNAL_API_KEY}"  (1)
            Accept: "application/json"
    1 The secret is injected at runtime, so the actual API key never appears in your configuration files. Never paste the key itself into your YAML.
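APIs that need more than one credential follow the same approach: reference each value as its own secret. This sketch uses the http processor's basic_auth block together with an API key header; the URL and the secret names PARTNER_API_USER, PARTNER_API_PASSWORD, and PARTNER_API_KEY are hypothetical.

processors:
  - label: call_partner_api
    http:
      url: "https://partner.example.com/v1/records"
      verb: GET
      # Each credential is a separate secret in the Secrets Store
      basic_auth:
        enabled: true
        username: "${secrets.PARTNER_API_USER}"
        password: "${secrets.PARTNER_API_PASSWORD}"
      headers:
        X-Api-Key: "${secrets.PARTNER_API_KEY}"
        Accept: "application/json"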

Monitoring, debugging, and observability

Use structured logging, request tracing, and performance metrics to gain insights into tool execution.

label: observable_tool
processors:
  - label: init_tracing
    mutation: |
      # Generate correlation ID for request tracing
      meta req_id = uuid_v7()
      meta start_time = now()

      # Log request start with structured data
      root.trace = {
        "request_id": @req_id,
        "timestamp": @start_time.ts_format("2006-01-02T15:04:05.000Z"),
        "tool": "observable_tool",
        "version": "1.0.0"
      }

  - label: log_request_start
    log:
      message: "MCP tool request started"
      fields:
        request_id: "${! @req_id }"
        tool_name: "observable_tool"
        input_params: "${! this.without(\"trace\") }"
        user_agent: "${! meta(\"User-Agent\").catch(\"unknown\") }"
      level: "INFO"

  - label: finalize_response
    mutation: |
      # Calculate total execution time
      meta duration = (now().ts_unix_nano() - @start_time.ts_unix_nano()) / 1000000

      # Add trace information to response
      root.metadata = {
        "request_id": @req_id,
        "execution_time_ms": @duration,
        "timestamp": now().ts_format("2006-01-02T15:04:05.000Z"),
        "tool": "observable_tool",
        "success": !this.exists("error")
      }

  - label: log_completion
    log:
      message: "MCP tool request completed"
      fields:
        request_id: "${! @req_id }"
        duration_ms: "${! this.metadata.execution_time_ms }"
        success: "${! this.metadata.success }"
        result_size: "${! content().length() }"
      level: "INFO"

meta:
  tags: [ example ]
  mcp:
    enabled: true
    description: "Example tool with structured logging, request tracing, and execution timing"
    properties:
      - name: user_id
        type: string
        description: "User ID to fetch data for"
        required: true

Observability features:

  • Correlation IDs: Use uuid_v7() to generate unique request identifiers for tracing

  • Execution timing: Track how long your tools take to execute using nanosecond precision

  • Structured logging: Include consistent fields like request_id, duration_ms, tool_name

  • Request/response metadata: Log input parameters and response characteristics

  • Success tracking: Monitor whether operations complete successfully
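The log steps above use fields, where every value is an interpolated string. The log processor also accepts fields_mapping, a Bloblang mapping that keeps values structured, so input_params is logged as an object rather than a JSON string. A sketch of the request-start step rewritten this way, with the same field names as above:

  - label: log_request_start
    log:
      level: INFO
      message: "MCP tool request started"
      # Each assignment becomes a structured log field
      fields_mapping: |
        root.request_id = @req_id
        root.tool_name = "observable_tool"
        root.input_params = this.without("trace")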

You can test this pattern by invoking the tool with valid and invalid parameters and inspecting the structured log entries to trace the execution flow. For example, invoking the tool with a user ID of 1 returns a response like this:

{
  "metadata": {
    "execution_time_ms": 0.158977,
    "request_id": "019951ab-d07d-703f-aaae-7e1c9a5afa95",
    "success": true,
    "timestamp": "2025-09-16T08:37:18.589Z",
    "tool": "observable_tool"
  },
  "trace": {
    "request_id": "019951ab-d07d-703f-aaae-7e1c9a5afa95",
    "timestamp": "2025-09-16T08:37:18.589Z",
    "tool": "observable_tool",
    "version": "1.0.0"
  },
  "user_id": "1"
}

Multi-step data enrichment

Build tools that combine data from multiple sources.

This workflow fetches customer data from a SQL database, enriches it with recent order history, and computes summary metrics.

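label: customer_enrichment
processors:
  # Each query runs inside a branch so its rows are merged back into the
  # original message instead of replacing it. This keeps customer_id
  # available to later steps.
  - label: fetch_customer_base
    branch:
      processors:
        - sql_select:
            driver: "postgres"
            dsn: "${secrets.POSTGRES_DSN}"
            table: "customers"
            columns: [ "*" ]
            where: "customer_id = ?"
            args_mapping: 'root = [ this.customer_id ]'
      result_map: 'root.customers = this'

  - label: enrich_with_orders
    branch:
      processors:
        - sql_select:
            driver: "postgres"
            dsn: "${secrets.POSTGRES_DSN}"
            table: "orders"
            columns: [ "*" ]
            where: "customer_id = ? AND created_at >= NOW() - INTERVAL '30 days'"
            args_mapping: 'root = [ this.customer_id ]'
      result_map: 'root.orders = this'

  - label: combine_data
    mutation: |
      # Coerce totals to numbers, because some drivers may return NUMERIC columns as strings
      let totals = this.orders.map_each(o -> o.total.number())
      let order_count = $totals.length()
      let total_spent = $totals.sum()
      root = {
        "customer": this.customers.index(0).catch(null),
        "recent_orders": this.orders,
        "metrics": {
          "total_orders": $order_count,
          "total_spent": $total_spent,
          "avg_order_value": if $order_count > 0 { $total_spent / $order_count } else { 0 }
        }
      }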

meta:
  mcp:
    enabled: true
    description: "Get comprehensive customer profile with recent order history and metrics"
    properties:
      - name: customer_id
        type: string
        description: "Customer ID to analyze"
        required: true

See also: sql_select processor, Bloblang functions (for data manipulation and aggregations)
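The customer and order queries do not depend on each other, so they could run concurrently. This sketch uses the workflow processor, where branches listed in the same inner list of order run in parallel; the branch names customer and orders are arbitrary. It produces the same customers and orders fields, so a combine_data step such as the one above can follow it. By default the workflow processor also records execution details in the message under a meta_path field; see the workflow processor reference for details.

label: customer_enrichment_parallel
processors:
  - workflow:
      # Both branches appear in the same inner list, so they run in parallel
      order: [ [ customer, orders ] ]
      branches:
        customer:
          processors:
            - sql_select:
                driver: "postgres"
                dsn: "${secrets.POSTGRES_DSN}"
                table: "customers"
                columns: [ "*" ]
                where: "customer_id = ?"
                args_mapping: 'root = [ this.customer_id ]'
          result_map: 'root.customers = this'
        orders:
          processors:
            - sql_select:
                driver: "postgres"
                dsn: "${secrets.POSTGRES_DSN}"
                table: "orders"
                columns: [ "*" ]
                where: "customer_id = ? AND created_at >= NOW() - INTERVAL '30 days'"
                args_mapping: 'root = [ this.customer_id ]'
          result_map: 'root.orders = this'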

Workflow orchestration

Coordinate complex workflows with multiple steps and conditional logic.

This workflow simulates a complete order processing pipeline with mock data for inventory and processing tiers. This allows you to test the full logic without needing real external systems.

label: order_workflow
processors:
  - label: validate_order
    mutation: |
      # Validation logic
      root = if this.total <= 0 {
        throw("Invalid order total")
      } else { this }

  - label: mock_inventory_check
    mutation: |
      # Mock inventory data for testing
      let inventory = {
        "widget-001": {"quantity": 100, "name": "Standard Widget"},
        "widget-premium": {"quantity": 25, "name": "Premium Widget"},
        "widget-limited": {"quantity": 2, "name": "Limited Edition Widget"}
      }

      let product = $inventory.get(this.product_id)
      root = if $product == null {
        throw("Product not found: " + this.product_id)
      } else if $product.quantity < this.quantity {
        throw("Insufficient inventory. Available: " + $product.quantity.string())
      } else {
        this.merge({
          "inventory_check": "passed",
          "available_quantity": $product.quantity,
          "product_name": $product.name
        })
      }

  - label: route_by_priority
    switch:
      - check: 'this.total > 1000'
        processors:
          - label: mock_high_value_processing
            mutation: |
              # Mock premium processing
              root = this.merge({
                "processing_tier": "premium",
                "processing_time_estimate": "2-4 hours",
                "assigned_rep": "premium-team@company.com",
                "priority_score": 95
              })

      - check: 'this.customer_tier == "vip"'
        processors:
          - label: mock_vip_processing
            mutation: |
              # Mock VIP processing
              root = this.merge({
                "processing_tier": "vip",
                "processing_time_estimate": "1-2 hours",
                "assigned_rep": "vip-team@company.com",
                "priority_score": 90,
                "perks": ["expedited_shipping", "white_glove_service"]
              })

      - processors:
          - label: mock_standard_processing
            mutation: |
              # Mock standard processing
              root = this.merge({
                "processing_tier": "standard",
                "processing_time_estimate": "24-48 hours",
                "assigned_rep": "support@company.com",
                "priority_score": 50
              })

  - label: finalize_order
    mutation: |
      # Add final processing metadata
      # Parse the upper bound of the processing time estimate (for example, 2 from "1-2 hours")
      let max_hours = this.processing_time_estimate.split("-").index(1).split(" ").index(0).number()

      root = this.merge({
        "order_status": "processed",
        "processed_at": now().ts_format("2006-01-02T15:04:05.000Z"),
        "estimated_fulfillment": "TBD - calculated based on processing tier",
        "processing_time_hours": $max_hours
      })

meta:
  mcp:
    enabled: true
    description: "Process orders with validation, inventory check, and tiered routing (with mocks for testing)"
    properties:
      - name: order_id
        type: string
        description: "Unique order identifier"
        required: true
      - name: product_id
        type: string
        description: "Product ID (try: widget-001, widget-premium, widget-limited)"
        required: true
      - name: quantity
        type: number
        description: "Quantity to order"
        required: true
      - name: total
        type: number
        description: "Order total in dollars"
        required: true
      - name: customer_tier
        type: string
        description: "Customer tier (optional: vip, standard)"
        required: false

For the input {"order_id": "ORD001", "product_id": "widget-001", "quantity": 5, "total": 250, "customer_tier": "vip"}, the workflow produces:

{
  "assigned_rep": "vip-team@company.com",
  "available_quantity": 100,
  "customer_tier": "vip",
  "estimated_fulfillment": "TBD - calculated based on processing tier",
  "inventory_check": "passed",
  "order_id": "ORD001",
  "order_status": "processed",
  "perks": [
    "expedited_shipping",
    "white_glove_service"
  ],
  "priority_score": 90,
  "processed_at": "2025-09-16T09:05:29.138Z",
  "processing_tier": "vip",
  "processing_time_estimate": "1-2 hours",
  "processing_time_hours": 2,
  "product_id": "widget-001",
  "product_name": "Standard Widget",
  "quantity": 5,
  "total": 250
}

Notice how the workflow:

  1. Preserves original input: order_id, product_id, quantity, total, and customer_tier pass through unchanged.

  2. Adds inventory data: available_quantity, product_name, and inventory_check status from the mock lookup.

  3. Routes by customer tier: The total (250) does not exceed the 1000 threshold checked first, so the customer_tier value "vip" selects VIP processing with special perks and a higher priority score.

  4. Enriches with processing metadata: assigned_rep, priority_score, processing_tier, and time estimates.

  5. Finalizes with timestamps: order_status, processed_at, and calculated processing_time_hours.
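The estimated_fulfillment field is still a placeholder. As a sketch, the finalize_order step could turn processing_time_hours into a concrete timestamp by adding it to the current time as an ISO 8601 duration (for example, PT2H). This assumes a Redpanda Connect version that includes the ts_add_iso8601 method.

  - label: finalize_order
    mutation: |
      let max_hours = this.processing_time_estimate.split("-").index(1).split(" ").index(0).number()
      # Build an ISO 8601 duration such as "PT2H" and add it to the current time
      let fulfillment = now().ts_add_iso8601("PT%vH".format($max_hours))

      root = this.merge({
        "order_status": "processed",
        "processed_at": now().ts_format("2006-01-02T15:04:05.000Z", "UTC"),
        "estimated_fulfillment": $fulfillment.ts_format("2006-01-02T15:04:05.000Z", "UTC"),
        "processing_time_hours": $max_hours
      })

Passing "UTC" to ts_format keeps the literal Z suffix in the format string accurate regardless of the server's local time zone.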