Patterns for Remote MCP Servers

This page provides a reference catalog of patterns designed for use with MCP servers in Redpanda Cloud. Use these patterns as building blocks for your own MCP tools. For step-by-step instructions on building, deploying, and testing MCP servers, see Build Remote MCP Servers in Redpanda Cloud.

Each pattern is a reusable example for a common MCP tool scenario. Patterns are grouped by use case. All YAML is ready to use in your MCP server project.

For a high-level overview of MCP servers, see MCP in Redpanda Cloud.

Pattern selection guide

Choose the right pattern for your use case:

Data generators

Use inputs to create tools that read data from internal or external systems or generate sample data for testing and development.

When to use: Development and testing environments where you need synthetic data, load testing scenarios, or demonstrating data flows without real data sources.

Example use cases: Mock user events, test order data, synthetic sensor readings, demo data for presentations.

This example generates a realistic user event message:

label: generate_input
generate:
  mapping: |
    let event_type = ["login", "logout", "purchase", "view_page", "click_button"].index(random_int(max:4))
    root = {
      "id": uuid_v4(),
      "timestamp": now().ts_format("2006-01-02T15:04:05.000Z"),
      "user_id": random_int(min:1, max:10000),
      "event_type": $event_type,
      "data": {
        "session_id": ksuid(),
        "ip_address": "192.168.%v.%v".format(random_int(max:255), random_int(min:1, max:254)),
        "user_agent": ["Chrome", "Firefox", "Safari", "Edge"].index(random_int(max:3)),
        "amount": if $event_type == "purchase" { random_int(min:10, max:500) } else { null }
      }
    }

meta:
  mcp:
    enabled: true
    description: "Generate an example user event message with realistic data"
    properties: []
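
The same approach covers the other use cases listed above. As a minimal sketch, assuming a hypothetical tool name `generate_sensor_reading` and illustrative field names and value ranges, a synthetic sensor reading could be generated like this:

```yaml
label: generate_sensor_reading   # hypothetical tool name
generate:
  mapping: |
    # Pick one of three illustrative sensor IDs at random
    let sensor_id = ["sensor-a", "sensor-b", "sensor-c"].index(random_int(max:2))
    # Field names and whole-number ranges below are assumptions for illustration
    root = {
      "id": uuid_v4(),
      "sensor_id": $sensor_id,
      "recorded_at": now().ts_format("2006-01-02T15:04:05.000Z"),
      "temperature_c": random_int(min:15, max:35),
      "humidity_pct": random_int(min:20, max:90)
    }

meta:
  mcp:
    enabled: true
    description: "Generate a synthetic sensor reading for testing"
    properties: []
```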

External API calls

Use processors to fetch data from external APIs, databases, or services and return formatted results. This is one of the most common patterns for MCP tools.

When to use: Integrating with third-party services, fetching real-time data, calling internal microservices, or enriching event data with external information.

Example use cases: Fetch user profile from CRM, get product pricing from inventory API, validate addresses with geocoding service, retrieve weather data.

label: get_weather
processors:
  - label: prepare_parameters
    mutation: |
      meta city_name = this.city_name
  - label: fetch_weather
    http:
      url: 'https://wttr.in/${! @city_name }?format=j1'
      verb: GET
      headers:
        Accept: "application/json"
        User-Agent: "redpanda-mcp-server/1.0"
  - label: format_response
    mutation: |
      root = {
        "city": @city_name,
        "temperature": this.current_condition.0.temp_C.number(),
        "feels_like": this.current_condition.0.FeelsLikeC.number(),
        "humidity": this.current_condition.0.humidity.number(),
        "pressure": this.current_condition.0.pressure.number(),
        "description": this.current_condition.0.weatherDesc.0.value,
        "wind_speed": this.current_condition.0.windspeedKmph.number(),
        "metadata": {
          "source": "wttr.in",
          "fetched_at": now().ts_format("2006-01-02T15:04:05.000Z")
        }
      }

meta:
  mcp:
    enabled: true
    description: "Fetch current weather information for a specified city"
    properties:
      - name: city_name
        type: string
        description: "Name of the city to get weather information for"
        required: true
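
City names that contain spaces or other reserved characters (for example, `New York`) may need escaping before they are interpolated into the URL. One way to handle this, sketched here on the assumption that wttr.in accepts the encoding produced by the Bloblang `escape_url_query` method, is to keep the original name for the response and store an escaped copy under a separate, hypothetical metadata key `city_query`:

```yaml
processors:
  - label: prepare_parameters
    mutation: |
      # Original name, used later in the formatted response
      meta city_name = this.city_name
      # Escaped copy for the URL (hypothetical metadata key)
      meta city_query = this.city_name.escape_url_query()
  - label: fetch_weather
    http:
      url: 'https://wttr.in/${! @city_query }?format=j1'
      verb: GET
      headers:
        Accept: "application/json"
```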

Database queries

Query external databases and return structured results. This pattern is essential for tools that need to access business data.

When to use: Retrieving customer records, querying analytics data, looking up configuration values, or joining streaming data with dimensional data from data warehouses.

Example use cases: Fetch customer details from PostgreSQL, query sales data from BigQuery, retrieve product catalog from MongoDB, look up reference data.

label: gcp_bigquery_select_processor
processors:
  - label: prepare_parameters
    mutation: |
      meta customer_id = this.customer_id.string().catch("12345")
      meta limit = this.limit.number().catch(10)
  - label: query_bigquery
    gcp_bigquery_select:
      project: my-gcp-project
      credentials_json: |
        ${secrets.BIGQUERY_CREDENTIALS}
      table: my_dataset.customer_orders
      columns:
        - "order_id"
        - "customer_id"
        - "order_date"
        - "total_amount"
        - "status"
      where: customer_id = ? AND order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
      suffix: "ORDER BY order_date DESC LIMIT ?"
      args_mapping: root = [ @customer_id, @limit ]
  - label: format_response
    mutation: |
      root = {
        "orders": this,
        "metadata": {
          "source": "BigQuery",
          "customer_id": @customer_id,
          "fetched_at": now().ts_format("2006-01-02T15:04:05.000Z")
        }
      }

meta:
  mcp:
    enabled: true
    description: "Query customer orders from BigQuery"
    properties:
      - name: customer_id
        type: string
        description: "Customer ID to filter orders"
        required: true
      - name: limit
        type: number
        description: "Maximum number of orders to return"
        required: false

See also: gcp_bigquery_select processor, sql_select processor (for other databases)

Data consumers

Read messages from Redpanda topics to process streaming data in your MCP tools. This pattern enables event-driven workflows and real-time data processing.

When to use: Processing events from Redpanda topics, building event-driven AI agents, consuming audit logs, or subscribing to data change streams.

Example use cases: Monitor order events, process user activity streams, consume IoT sensor data, react to system notifications.

redpanda:
  seed_brokers: [ "${REDPANDA_BROKERS}" ]
  topics: [ "user-events" ]
  consumer_group: "mcp-event-processor"
  start_from_oldest: true
  tls:
    enabled: true
  sasl:
    - mechanism: "${REDPANDA_SASL_MECHANISM}"
      username: "${REDPANDA_SASL_USERNAME}"
      password: "${REDPANDA_SASL_PASSWORD}"
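
The snippet above shows only the input configuration. To expose it as an MCP tool, it would presumably need the same `label` and `meta.mcp` block as the other patterns on this page. A minimal sketch, assuming a hypothetical label `consume_user_events` and an illustrative description:

```yaml
label: consume_user_events   # hypothetical tool name
redpanda:
  seed_brokers: [ "${REDPANDA_BROKERS}" ]
  topics: [ "user-events" ]
  consumer_group: "mcp-event-processor"
  start_from_oldest: true
  tls:
    enabled: true
  sasl:
    - mechanism: "${REDPANDA_SASL_MECHANISM}"
      username: "${REDPANDA_SASL_USERNAME}"
      password: "${REDPANDA_SASL_PASSWORD}"

meta:
  mcp:
    enabled: true
    description: "Read user events from the user-events topic"   # illustrative wording
    properties: []
```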

See also: redpanda input

Data transformation

Transform, validate, and enrich data as it flows through your MCP tools. Use the Bloblang mapping language for powerful data manipulation.

When to use: Converting data formats, validating schemas, filtering events, enriching messages with computed fields, or normalizing data structures.

Example use cases: Parse JSON payloads, validate required fields, add timestamps, convert units, mask sensitive data, aggregate nested objects.

mapping: |
  # Parse and validate incoming data
  root.user_id = this.user_id.or(throw("user_id is required"))
  root.timestamp = now().ts_format("2006-01-02T15:04:05Z07:00")

  # Transform and enrich
  root.email_domain = this.email.split("@").index(1)
  root.is_premium = this.subscription_tier == "premium"

  # Filter sensitive data
  root.profile = this.profile.without("ssn", "credit_card")
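
Two of the use cases listed above, masking sensitive data and converting units, can be sketched in the same style. The field names `email_masked`, `user_hash`, `temp_f`, and `temp_c` are assumptions made for illustration:

```yaml
mapping: |
  # Mask the local part of the email address and keep the domain
  root.email_masked = "***@" + this.email.split("@").index(1)

  # Replace the raw identifier with a SHA-256 digest, hex encoded
  root.user_hash = this.user_id.string().hash("sha256").encode("hex")

  # Convert a temperature from Fahrenheit to Celsius (hypothetical input field)
  root.temp_c = (this.temp_f.number() - 32) * 5 / 9
```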

AI/LLM integration

Integrate AI and LLM services into your MCP tools for intelligent data processing, embeddings generation, and natural language understanding.

When to use: Generating embeddings for semantic search, calling LLM APIs for text generation, building RAG (Retrieval Augmented Generation) pipelines, or analyzing sentiment.

Example use cases: Generate embeddings for documents, classify customer feedback, summarize long text, extract entities, answer questions with context.

OpenAI integration

openai_chat_completion:
  api_key: "${secrets.OPENAI_API_KEY}"
  model: "gpt-4"
  prompt: |
    Analyze this customer feedback and provide:
    1. Sentiment (positive/negative/neutral)
    2. Key themes
    3. Actionable insights

    Feedback: ${! json("feedback_text") }
  max_tokens: 500

Embeddings generation

openai_embeddings:
  api_key: "${secrets.OPENAI_API_KEY}"
  model: "text-embedding-3-small"
  text_mapping: 'root = this.content'

Redpanda integration

Use these patterns to publish data to Redpanda topics or to implement caching systems backed by Redpanda.

Data publishers

Use outputs to send data to external systems, publish events to Redpanda topics, or integrate with downstream services.

When to use: Publishing events to Redpanda for consumption by other services, creating event sourcing patterns, building audit trails, or triggering downstream workflows.

Example use cases: Publish order confirmations, emit audit events, trigger notifications, create event-driven workflows.

This example publishes a message to a Redpanda topic:

label: redpanda_output
redpanda:
  seed_brokers:
    - ${REDPANDA_BROKERS}
  topic: ${! this.topic_name.string().catch("default-topic") }
  timeout: 30s
  tls:
    enabled: true
  sasl:
    - mechanism: SCRAM-SHA-256
      username: ${secrets.REDPANDA_USERNAME}
      password: ${secrets.REDPANDA_PASSWORD}
meta:
  mcp:
    enabled: true
    description: Publishes a message to a specified Redpanda topic
    properties:
      - name: message
        type: string
        description: The message content to publish
        required: true
      - name: topic_name
        type: string
        description: The Redpanda topic to publish to
        required: true

See also: redpanda output

Outputs with processors

Output tools can include processors to transform data before publishing. This pattern is useful when you need to process data and save the result to a destination in a single tool.

When to use: Processing user input with an LLM and saving the response, transforming data before publishing to a topic, enriching events before writing to external systems.

Example use cases: Send a prompt to an LLM, then save the answer to a topic in Redpanda.

redpanda:
  seed_brokers: [ "${REDPANDA_BROKERS}" ]
  topic: "llm-responses"
  tls:
    enabled: true
  sasl:
    - mechanism: "${REDPANDA_SASL_MECHANISM}"
      username: "${REDPANDA_SASL_USERNAME}"
      password: "${REDPANDA_SASL_PASSWORD}"

processors:
  - mutation: |
      # Save the question first: the LLM response replaces the message content
      meta question = this.question
  - openai_chat_completion:
      api_key: "${secrets.OPENAI_API_KEY}"
      model: "gpt-4"
      prompt: ${! json("question") }
  - mapping: |
      root.question = @question
      root.answer = content().string()
      root.timestamp = now().ts_format("2006-01-02T15:04:05Z07:00")

An output tool can include processors, because the processors field is available in all output components and runs before the data is written. The reverse does not hold: a processor tool cannot include an output.

Caching systems

Use caching to store frequently accessed data, reduce latency, and minimize external API calls. You can implement caching using either Redpanda topics or in-memory stores.

When to use: Reducing repeated API calls, storing lookup tables, caching database query results, or maintaining session state across tool invocations.

Example use cases: Cache user profiles, store API rate limit counters, maintain configuration values, cache product catalogs.

Redpanda-backed cache

label: redpanda_cache
redpanda:
  seed_brokers: ["${REDPANDA_BROKERS}"]
  topic: "mcp-cache-topic"
  tls:
    enabled: true
  sasl:
    - mechanism: "SCRAM-SHA-512"
      username: "${secrets.MCP_REDPANDA_CREDENTIALS.username}"
      password: "${secrets.MCP_REDPANDA_CREDENTIALS.password}"

meta:
  mcp:
    enabled: true
    description: "Redpanda-backed distributed cache using Kafka topics for persistence"

In-memory cache for low-latency access to small datasets

label: memory_cache
memory:
  default_ttl: "5m"
  init_values:
    "user:1001": '{"name": "Alice", "role": "admin"}'
    "user:1002": '{"name": "Bob", "role": "user"}'
    "config:theme": "dark"
    "config:language": "en"
  shards: 4

meta:
  mcp:
    enabled: true
    description: "In-memory cache for storing user data, configuration, and temporary values"
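
A cache resource is typically read through the `cache` processor. The following is a minimal sketch, assuming the in-memory cache above is registered under the label `memory_cache` and that a processor tool can reference it by that label. The tool name `lookup_user` and the step labels are hypothetical:

```yaml
label: lookup_user   # hypothetical tool name
processors:
  - label: read_user_from_cache
    cache:
      resource: memory_cache   # label of the in-memory cache defined above
      operator: get
      key: 'user:${! this.user_id }'
  - label: handle_cache_miss
    # Runs only if the lookup above failed, for example because the key is missing
    catch:
      - mutation: |
          root = {"error": "user not found in cache"}

meta:
  mcp:
    enabled: true
    description: "Look up a user record in the in-memory cache"
    properties:
      - name: user_id
        type: string
        description: "User ID to look up, for example 1001"
        required: true
```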

See also: memory cache, Redpanda-backed cache using redpanda output

Production workflows and observability

Build enterprise-grade tools with error handling, validation, multi-step workflows, and monitoring.

Parameter validation and type coercion

Always validate and coerce input parameters to ensure your tools are robust:

processors:
  - label: validate_params
    mutation: |
      # Validate required parameters
      root = if !this.exists("user_id") {
        throw("user_id parameter is required")
      } else { this }

      # Type coercion with validation
      meta user_id = this.user_id.string()
      meta limit = this.limit.number().catch(10)
      meta start_date = this.start_date.ts_parse("2006-01-02").catch(now().ts_sub_iso8601("P1D"))
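
Validation can also cover value ranges and allowed values. The sketch below clamps a numeric parameter and rejects unknown values. The bounds, the `status` parameter, and its allowed values are assumptions chosen for illustration:

```yaml
processors:
  - label: validate_ranges   # hypothetical label
    mutation: |
      # Clamp limit to the range 1-100 (bounds are illustrative)
      let limit = this.limit.number().catch(10)
      meta limit = [ [ $limit, 1 ].max(), 100 ].min()

      # Accept only a known set of values for a hypothetical status parameter
      let status = this.status.or("open")
      root = if ["open", "closed", "pending"].contains($status) {
        this
      } else {
        throw("status must be one of: open, closed, pending")
      }
```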

Dynamic configuration

Build tools that adapt their behavior based on input parameters:

processors:
  - label: dynamic_config
    mutation: |
      # Choose data source based on environment
      meta env = this.environment | "production"
      meta table_name = match @env {
        "dev" => "dev_orders",
        "staging" => "staging_orders",
        "production" => "prod_orders",
        _ => "dev_orders"
      }

      # Adjust the selected columns based on the detailed flag
      meta columns = if this.detailed.bool().catch(false) {
        ["order_id", "customer_id", "total", "items", "shipping_address"]
      } else {
        ["order_id", "customer_id", "total"]
      }

Error handling and fallbacks

Implement error handling to make your tools reliable:

processors:
  - label: primary_fetch
    try:
      - http:
          url: "https://api.primary.com/data"
          timeout: "10s"
  # catch is a separate processor that runs only for messages that failed above
  - label: fallback_on_error
    catch:
      - log:
          message: "Primary API failed, trying fallback"
      - label: fallback_fetch
        http:
          url: "https://api.fallback.com/data"
          timeout: "15s"
      - mutation: |
          root.metadata.source = "fallback"
          root.metadata.warning = "Primary source unavailable"
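
When no fallback source exists, a tool can still return a structured error instead of failing outright. This is a minimal sketch that uses the Bloblang `error()` function inside a `catch` processor. The URL is a placeholder and the step labels are hypothetical:

```yaml
processors:
  - label: fetch_data
    http:
      url: "https://api.example.com/data"   # placeholder URL
      timeout: "10s"
  - label: report_failure
    catch:
      - mutation: |
          # error() returns the message of the failure that triggered this catch block
          root = {
            "error": error(),
            "source": "api.example.com"
          }
```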

Conditional processing

Build tools that branch based on input or data characteristics:

processors:
  - label: conditional_processing
    switch:
      - check: this.data_type == "json"
        processors:
          # Assumes the raw input arrives as a string in a `payload` field (hypothetical name)
          - mutation: 'root.parsed_data = this.payload.parse_json()'
      - check: this.data_type == "csv"
        processors:
          - mutation: 'root.parsed_data = this.payload.parse_csv()'
      - processors:
          - mutation: 'root.error = "Unsupported data type"'

Secrets and credentials

Securely handle multiple credentials and API keys.

Here is an example of using an API key secret.

  1. Create a secret in the Secrets Store with name EXTERNAL_API_KEY and your API key as the value.

  2. Reference the secret in your YAML configuration:

    processors:
      - label: call_external_api
        http:
          url: "https://api.example.com/data"
          verb: GET
          headers:
            Authorization: "Bearer ${secrets.EXTERNAL_API_KEY}"
            Accept: "application/json"

    The secret is injected at runtime, so the actual value never appears in your configuration files or logs. Never store the API key itself in your YAML configuration.

Monitoring, debugging, and observability

Use structured logging, request tracing, and performance metrics to gain insights into tool execution.

label: observable_tool
processors:
  - label: init_tracing
    mutation: |
      # Generate correlation ID for request tracing
      meta req_id = uuid_v7()
      meta start_time = now()

      # Log request start with structured data
      root.trace = {
        "request_id": @req_id,
        "timestamp": @start_time.ts_format("2006-01-02T15:04:05.000Z"),
        "tool": "observable_tool",
        "version": "1.0.0"
      }

  - label: log_request_start
    log:
      message: "MCP tool request started"
      fields:
        request_id: "${! @req_id }"
        tool_name: "observable_tool"
        input_params: "${! this.without(\"trace\") }"
        user_agent: "${! meta(\"User-Agent\").catch(\"unknown\") }"
      level: "INFO"

  - label: finalize_response
    mutation: |
      # Calculate total execution time
      meta duration = (now().ts_unix_nano() - @start_time.ts_unix_nano()) / 1000000

      # Add trace information to response
      root.metadata = {
        "request_id": @req_id,
        "execution_time_ms": @duration,
        "timestamp": now().ts_format("2006-01-02T15:04:05.000Z"),
        "tool": "observable_tool",
        "success": !this.exists("error")
      }

  - label: log_completion
    log:
      message: "MCP tool request completed"
      fields:
        request_id: "${! @req_id }"
        duration_ms: "${! this.metadata.execution_time_ms }"
        success: "${! this.metadata.success }"
        result_size: "${! content().length() }"
      level: "INFO"

meta:
  tags: [ example ]
  mcp:
    enabled: true
    description: "Example tool with comprehensive observability and error handling"
    properties:
      - name: user_id
        type: string
        description: "User ID to fetch data for"
        required: true

Observability features:

  • Correlation IDs: Use uuid_v7() to generate unique request identifiers for tracing

  • Execution timing: Track how long your tools take to execute using nanosecond precision

  • Structured logging: Include consistent fields like request_id, duration_ms, tool_name

  • Request/response metadata: Log input parameters and response characteristics

  • Success tracking: Monitor whether operations complete successfully

You can test this pattern by invoking the tool with valid and invalid parameters and checking the structured logs to trace the execution flow. For example, with a user ID of 1, the tool returns a response like:

{
  "metadata": {
    "execution_time_ms": 0.158977,
    "request_id": "019951ab-d07d-703f-aaae-7e1c9a5afa95",
    "success": true,
    "timestamp": "2025-09-16T08:37:18.589Z",
    "tool": "observable_tool"
  },
  "trace": {
    "request_id": "019951ab-d07d-703f-aaae-7e1c9a5afa95",
    "timestamp": "2025-09-16T08:37:18.589Z",
    "tool": "observable_tool",
    "version": "1.0.0"
  },
  "user_id": "1"
}

See also: log processor, try processor, Bloblang functions (for timing and ID generation)

Multi-step data enrichment

Build tools that combine data from multiple sources.

This workflow fetches customer data from a SQL database, enriches it with recent order history, and computes summary metrics.

label: customer_enrichment
processors:
  - label: fetch_customer_base
    branch:
      processors:
        - sql_select:
            driver: "postgres"
            dsn: "${POSTGRES_DSN}"
            table: "customers"
            columns: [ "*" ]
            where: "customer_id = ?"
            args_mapping: 'root = [this.customer_id]'
      result_map: 'root.customers = this'

  - label: enrich_with_orders
    branch:
      processors:
        - sql_select:
            driver: "postgres"
            dsn: "${POSTGRES_DSN}"
            table: "orders"
            columns: [ "*" ]
            where: "customer_id = ? AND created_at >= NOW() - INTERVAL '30 days'"
            args_mapping: 'root = [this.customer_id]'
      result_map: 'root.orders = this'

  - label: combine_data
    mutation: |
      let order_totals = this.orders.map_each(o -> o.total)
      root = {
        "customer": this.customers.index(0),
        "recent_orders": this.orders,
        "metrics": {
          "total_orders": this.orders.length(),
          "total_spent": $order_totals.sum(),
          "avg_order_value": if $order_totals.length() > 0 { $order_totals.sum() / $order_totals.length() } else { 0 }
        }
      }

meta:
  tags: [ example ]
  mcp:
    enabled: true
    description: "Get comprehensive customer profile with recent order history and metrics"
    properties:
      - name: customer_id
        type: string
        description: "Customer ID to analyze"
        required: true
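
The `branch` processor also accepts a `request_map`, which controls what is sent into the branch. This can be useful when an enrichment step should see only part of the message. A minimal sketch, assuming a hypothetical loyalty API at a placeholder URL:

```yaml
processors:
  - label: enrich_with_loyalty   # hypothetical step
    branch:
      request_map: |
        # Only the customer ID is sent into the branch
        root = {"customer_id": this.customer_id}
      processors:
        - http:
            url: "https://api.example.com/loyalty"   # placeholder endpoint
            verb: POST
            headers:
              Content-Type: "application/json"
      # Copy the branch result into the original message
      result_map: 'root.loyalty = this'
```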

See also: sql_select processor, Bloblang functions (for data manipulation and aggregations)

Workflow orchestration

Coordinate complex workflows with multiple steps and conditional logic.

This workflow simulates a complete order processing pipeline with mock data for inventory and processing tiers. This allows you to test the full logic without needing real external systems.

label: order_workflow
processors:
  - label: validate_order
    mutation: |
      # Validation logic
      root = if this.total <= 0 {
        throw("Invalid order total")
      } else { this }

  - label: mock_inventory_check
    mutation: |
      # Mock inventory data for testing
      let inventory = {
        "widget-001": {"quantity": 100, "name": "Standard Widget"},
        "widget-premium": {"quantity": 25, "name": "Premium Widget"},
        "widget-limited": {"quantity": 2, "name": "Limited Edition Widget"}
      }

      let product = $inventory.get(this.product_id)
      root = if $product == null {
        throw("Product not found: " + this.product_id)
      } else if $product.quantity < this.quantity {
        throw("Insufficient inventory. Available: " + $product.quantity.string())
      } else {
        this.merge({
          "inventory_check": "passed",
          "available_quantity": $product.quantity,
          "product_name": $product.name
        })
      }

  - label: route_by_priority
    switch:
      - check: 'this.total > 1000'
        processors:
          - label: mock_high_value_processing
            mutation: |
              # Mock premium processing
              root = this.merge({
                "processing_tier": "premium",
                "processing_time_estimate": "2-4 hours",
                "assigned_rep": "premium-team@company.com",
                "priority_score": 95
              })

      - check: 'this.customer_tier == "vip"'
        processors:
          - label: mock_vip_processing
            mutation: |
              # Mock VIP processing
              root = this.merge({
                "processing_tier": "vip",
                "processing_time_estimate": "1-2 hours",
                "assigned_rep": "vip-team@company.com",
                "priority_score": 90,
                "perks": ["expedited_shipping", "white_glove_service"]
              })

      - processors:
          - label: mock_standard_processing
            mutation: |
              # Mock standard processing
              root = this.merge({
                "processing_tier": "standard",
                "processing_time_estimate": "24-48 hours",
                "assigned_rep": "support@company.com",
                "priority_score": 50
              })

  - label: finalize_order
    mutation: |
      # Add final processing metadata
      # Extract the upper bound, in hours, from the processing time estimate (for example, "1-2 hours" gives 2)
      let max_hours = this.processing_time_estimate.split("-").index(1).split(" ").index(0).number()

      root = this.merge({
        "order_status": "processed",
        "processed_at": now().ts_format("2006-01-02T15:04:05.000Z"),
        "estimated_fulfillment": "TBD - calculated based on processing tier",
        "processing_time_hours": $max_hours
      })

meta:
  tags: [ example ]
  mcp:
    enabled: true
    description: "Process orders with validation, inventory check, and tiered routing (with mocks for testing)"
    properties:
      - name: order_id
        type: string
        description: "Unique order identifier"
        required: true
      - name: product_id
        type: string
        description: "Product ID (try: widget-001, widget-premium, widget-limited)"
        required: true
      - name: quantity
        type: number
        description: "Quantity to order"
        required: true
      - name: total
        type: number
        description: "Order total in dollars"
        required: true
      - name: customer_tier
        type: string
        description: "Customer tier (optional: vip, standard)"
        required: false

For the input {"order_id": "ORD001", "product_id": "widget-001", "quantity": 5, "total": 250, "customer_tier": "vip"}, the workflow produces:

{
  "assigned_rep": "vip-team@company.com",
  "available_quantity": 100,
  "customer_tier": "vip",
  "estimated_fulfillment": "TBD - calculated based on processing tier",
  "inventory_check": "passed",
  "order_id": "ORD001",
  "order_status": "processed",
  "perks": [
    "expedited_shipping",
    "white_glove_service"
  ],
  "priority_score": 90,
  "processed_at": "2025-09-16T09:05:29.138Z",
  "processing_tier": "vip",
  "processing_time_estimate": "1-2 hours",
  "processing_time_hours": 2,
  "product_id": "widget-001",
  "product_name": "Standard Widget",
  "quantity": 5,
  "total": 250
}

Notice how the workflow:

  1. Preserves original input: order_id, product_id, quantity, total, and customer_tier pass through unchanged.

  2. Adds inventory data: available_quantity, product_name, and inventory_check status from the mock lookup.

  3. Routes by order value, then customer tier: The total of 250 does not exceed 1000, so the premium case is skipped. Because customer_tier is vip, the order gets VIP processing with special perks and priority.

  4. Enriches with processing metadata: assigned_rep, priority_score, processing_tier, and time estimates.

  5. Finalizes with timestamps: order_status, processed_at, and calculated processing_time_hours.