MCP Tool Patterns

When building tools, use these patterns as starting points for common scenarios. For step-by-step instructions, see Create an MCP Tool. For design guidelines, see MCP Tool Design.

After reading this page, you will be able to:

  • Find reusable patterns for common MCP tool scenarios

  • Apply validation and error handling patterns for production robustness

  • Format responses consistently for AI client consumption

Read data

Use inputs to create tools that read from data sources or generate sample data.

Generate test data

When to use: Development and testing environments where you need synthetic data, load testing scenarios, or demonstrating data flows without real data sources.

Example use cases: Mock user events, test order data, synthetic sensor readings, demo data for presentations.

label: generate_input
generate:
  mapping: |
    let event_type = ["login", "logout", "purchase", "view_page", "click_button"].index(random_int(max:4))
    root = {
      "id": uuid_v4(),
      "timestamp": now().ts_format("2006-01-02T15:04:05.000Z"),
      "user_id": random_int(min:1, max:10000),
      "event_type": $event_type,
      "data": {
        "session_id": ksuid(),
        "ip_address": "192.168.%v.%v".format(random_int(max:255), random_int(min:1, max:254)),
        "user_agent": ["Chrome", "Firefox", "Safari", "Edge"].index(random_int(max:3)),
        "amount": if $event_type == "purchase" { random_int(min:10, max:500) } else { null }
      }
    }

meta:
  mcp:
    enabled: true
    description: "Generate an example user event message with realistic data"
    properties: []

Consume from Redpanda topics

When to use: Processing events from Redpanda topics, building event-driven AI agents, consuming audit logs, or subscribing to data change streams.

Example use cases: Monitor order events, process user activity streams, consume IoT sensor data, react to system notifications.

redpanda:
  seed_brokers: [ "${REDPANDA_BROKERS}" ]
  topics: [ "user-events" ]
  consumer_group: "mcp-event-processor"
  start_from_oldest: true
  tls:
    enabled: true
  sasl:
    - mechanism: "${REDPANDA_SASL_MECHANISM}"
      username: "${REDPANDA_SASL_USERNAME}"
      password: "${REDPANDA_SASL_PASSWORD}"

See also: redpanda input

Process streaming data

When to use: Real-time analytics, windowed aggregations, computing metrics over time, or building streaming dashboards.

Example use cases: Calculate rolling averages, count events per time window, detect anomalies in streams, aggregate metrics.

redpanda:
  seed_brokers: [ "${REDPANDA_BROKERS}" ]
  topics: [ "sensor-readings" ]
  consumer_group: "analytics-processor"
  tls:
    enabled: true
  sasl:
    - mechanism: "${REDPANDA_SASL_MECHANISM}"
      username: "${REDPANDA_SASL_USERNAME}"
      password: "${REDPANDA_SASL_PASSWORD}"
  processors:
    - mapping: |
        root.sensor_id = this.sensor_id
        root.avg_temperature = this.readings.map_each(r -> r.temperature).mean()
        root.max_temperature = this.readings.map_each(r -> r.temperature).max()
        root.reading_count = this.readings.length()
        root.window_end = now()

See also: redpanda input

Call external services

Use processors to fetch data from external APIs, databases, or AI services.

Call REST APIs

When to use: Integrating with third-party services, fetching real-time data, calling internal microservices, or enriching event data with external information.

Example use cases: Fetch user profile from CRM, get product pricing from inventory API, validate addresses with geocoding service, retrieve weather data.

label: fetch-weather
processors:
  - label: prepare_parameters
    mutation: |
      meta city_name = this.city_name
  - label: fetch_weather
    http:
      url: 'https://wttr.in/${! @city_name }?format=j1'
      verb: GET
      headers:
        Accept: "application/json"
        User-Agent: "redpanda-mcp-server/1.0"
  - label: format_response
    mutation: |
      root = {
        "city": @city_name,
        "temperature": this.current_condition.0.temp_C.number(),
        "feels_like": this.current_condition.0.FeelsLikeC.number(),
        "humidity": this.current_condition.0.humidity.number(),
        "pressure": this.current_condition.0.pressure.number(),
        "description": this.current_condition.0.weatherDesc.0.value,
        "wind_speed": this.current_condition.0.windspeedKmph.number(),
        "metadata": {
          "source": "wttr.in",
          "fetched_at": now().ts_format("2006-01-02T15:04:05.000Z")
        }
      }

meta:
  mcp:
    enabled: true
    description: "Fetch current weather information for a specified city"
    properties:
      - name: city_name
        type: string
        description: "Name of the city to get weather information for"
        required: true

Query databases

When to use: Retrieving customer records, querying analytics data, looking up configuration values, or joining streaming data with dimensional data from data warehouses.

Example use cases: Fetch customer details from PostgreSQL, query sales data from BigQuery, retrieve product catalog from MongoDB, look up reference data.

label: gcp_bigquery_select_processor
processors:
  - label: prepare_parameters
    mutation: |
      meta customer_id = this.customer_id.string().catch("12345")
      meta limit = this.limit.number().catch(10)
  - label: query_bigquery
    gcp_bigquery_select:
      project: my-gcp-project
      credentials_json: |
        ${secrets.BIGQUERY_CREDENTIALS}
      table: my_dataset.customer_orders
      columns:
        - "order_id"
        - "customer_id"
        - "order_date"
        - "total_amount"
        - "status"
      where: customer_id = ? AND order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
      suffix: "ORDER BY order_date DESC LIMIT ?"
      args_mapping: root = [ @customer_id, @limit ]
  - label: format_response
    mutation: |
      root = {
        "orders": this,
        "metadata": {
          "source": "BigQuery",
          "customer_id": @customer_id,
          "fetched_at": now().ts_format("2006-01-02T15:04:05.000Z")
        }
      }

meta:
  mcp:
    enabled: true
    description: "Query customer orders from BigQuery"
    properties:
      - name: customer_id
        type: string
        description: "Customer ID to filter orders"
        required: true
      - name: limit
        type: number
        description: "Maximum number of orders to return"
        required: false

Integrate with AI/LLM services

When to use: Generating embeddings for semantic search, calling LLM APIs for text generation, building RAG pipelines, or analyzing sentiment.

Example use cases: Generate embeddings for documents, classify customer feedback, summarize long text, extract entities, answer questions with context.

OpenAI chat completion

openai_chat_completion:
  api_key: "${secrets.OPENAI_API_KEY}"
  model: "gpt-4"
  prompt: |
    Analyze this customer feedback and provide:
    1. Sentiment (positive/negative/neutral)
    2. Key themes
    3. Actionable insights

    Feedback: ${! json("feedback_text") }
  max_tokens: 500

Generate embeddings

openai_embeddings:
  api_key: "${secrets.OPENAI_API_KEY}"
  model: "text-embedding-3-small"
  text: ${! json("content") }
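
Like the other examples on this page, these processor snippets become complete MCP tools once you wrap them with a label and mcp metadata. The following is a minimal sketch for the embeddings processor above, assuming a single string property named content (the label and description are illustrative):

label: embed_content
processors:
  - openai_embeddings:
      api_key: "${secrets.OPENAI_API_KEY}"
      model: "text-embedding-3-small"
      text: ${! json("content") }

meta:
  mcp:
    enabled: true
    description: "Generate an embedding vector for the supplied text"
    properties:
      - name: content
        type: string
        description: "Text to generate an embedding for"
        required: true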

Write data

Use outputs to write data to Redpanda topics or cache stores.

Publish to Redpanda topics

When to use: Publishing events to Redpanda for consumption by other services, creating event sourcing patterns, building audit trails, or triggering downstream workflows.

Example use cases: Publish order confirmations, emit audit events, trigger notifications, create event-driven workflows.

label: redpanda_output
redpanda:
  seed_brokers:
    - ${REDPANDA_BROKERS}
  topic: ${! this.topic_name.string().catch("default-topic") }
  timeout: 30s
  tls:
    enabled: true
  sasl:
    - mechanism: SCRAM-SHA-256
      username: ${secrets.REDPANDA_USERNAME}
      password: ${secrets.REDPANDA_PASSWORD}
meta:
  mcp:
    enabled: true
    description: Publishes a message to a specified Redpanda topic
    properties:
      - name: message
        type: string
        description: The message content to publish
        required: true
      - name: topic_name
        type: string
        description: The Redpanda topic to publish to
        required: true

See also: redpanda output

Outputs with processors

Output tools can include processors to transform data before publishing. This pattern is useful when you need to process data and save the result to a destination in a single tool.

When to use: Processing user input with an LLM and saving the response, transforming data before publishing to a topic, enriching events before writing to external systems.

label: summarize_and_publish

processors:
  - openai_chat_completion:
      api_key: "${secrets.OPENAI_API_KEY}"
      model: "gpt-4"
      prompt: ${! json("question") }
  - mapping: |
      root.question = this.question
      root.answer = this.content
      root.timestamp = now().ts_format("2006-01-02T15:04:05Z07:00")

redpanda:
  seed_brokers: [ "${REDPANDA_BROKERS}" ]
  topic: "llm-responses"
  tls:
    enabled: true
  sasl:
    - mechanism: SCRAM-SHA-256
      username: "${secrets.MCP_USERNAME}"
      password: "${secrets.MCP_PASSWORD}"

meta:
  mcp:
    enabled: true
    description: "Process a question through an LLM and publish the response to Redpanda"
    properties:
      - name: question
        type: string
        description: "The question to send to the LLM"
        required: true

Cache data

When to use: Reducing repeated API calls, storing lookup tables, caching database query results, or maintaining session state across tool invocations.

Example use cases: Cache user profiles, store API rate limit counters, maintain configuration values, cache product catalogs.

Redpanda-backed cache

label: redpanda_cache
redpanda:
  seed_brokers: ["${REDPANDA_BROKERS}"]
  topic: "mcp-cache-topic"
  tls:
    enabled: true
  sasl:
    - mechanism: "SCRAM-SHA-512"
      username: "${secrets.MCP_REDPANDA_CREDENTIALS.username}"
      password: "${secrets.MCP_REDPANDA_CREDENTIALS.password}"

meta:
  mcp:
    enabled: true
    description: "Redpanda-backed distributed cache using Kafka topics for persistence"

In-memory cache

label: memory_cache
memory:
  default_ttl: "5m"
  init_values:
    "user:1001": '{"name": "Alice", "role": "admin"}'
    "user:1002": '{"name": "Bob", "role": "user"}'
    "config:theme": "dark"
    "config:language": "en"
  shards: 4

meta:
  mcp:
    enabled: true
    description: "In-memory cache for storing user data, configuration, and temporary values"

Transform data

Use Bloblang and processors to transform, validate, and route data.

Transform and validate

When to use: Converting data formats, validating schemas, filtering events, enriching messages with computed fields, or normalizing data structures.

Example use cases: Parse JSON payloads, validate required fields, add timestamps, convert units, mask sensitive data, aggregate nested objects.

mapping: |
  # Parse and validate incoming data
  root.user_id = this.user_id.or(throw("user_id is required"))
  root.timestamp = now().ts_format("2006-01-02T15:04:05Z07:00")

  # Transform and enrich
  root.email_domain = this.email.split("@").index(1)
  root.is_premium = this.subscription_tier == "premium"

  # Filter sensitive data
  root.profile = this.profile.without("ssn", "credit_card")

Build event-driven workflows

When to use: Orchestrating multi-step processes, responding to business events, implementing saga patterns, or coordinating microservices.

Example use cases: Order fulfillment workflows, approval processes, notification cascades, data pipeline orchestration.

redpanda:
  seed_brokers: [ "${REDPANDA_BROKERS}" ]
  topics: [ "order-events" ]
  consumer_group: "workflow-orchestrator"
  tls:
    enabled: true
  sasl:
    - mechanism: "${REDPANDA_SASL_MECHANISM}"
      username: "${REDPANDA_SASL_USERNAME}"
      password: "${REDPANDA_SASL_PASSWORD}"
  processors:
    - switch:
        - check: this.event_type == "order_created"
          processors:
            - http:
                url: "${secrets.INVENTORY_API}/reserve"
                verb: POST
                body: '{"order_id": "${! this.order_id }", "items": ${! json("items") }}'
        - check: this.event_type == "payment_confirmed"
          processors:
            - http:
                url: "${secrets.FULFILLMENT_API}/ship"
                verb: POST
                body: '{"order_id": "${! this.order_id }"}'

See also: redpanda input

Production readiness

Build production-ready tools with proper input validation, error handling, and response formatting.

Validate input

AI clients may send unexpected or malformed input. Validate early to return helpful error messages instead of cryptic failures from downstream components.

Always validate inputs before processing. This prevents errors and provides clear feedback to the AI client. The following example shows a basic validation pattern:

- label: validate_input
  mutation: |
    let city = this.city.or("").trim()
    root = if $city == "" {
      {"error": "City name is required"}
    } else {
      {"city": $city}
    }

This validation does three things: first, .or("") provides an empty-string default if the city field is missing, which prevents null errors; next, .trim() removes whitespace so " " doesn’t pass as a valid city; finally, the if expression returns either an error object or the validated data. The AI client receives clear feedback either way.

Essential validation methods

Use these Bloblang methods for input validation:

Method             | Purpose                             | Example
.or(default)       | Provide fallback for missing fields | this.city.or("unknown")
.trim()            | Remove leading/trailing whitespace  | this.name.trim()
.exists("field")   | Check if a field is present         | this.exists("email")
.type()            | Get the type of a value             | this.count.type() == "number"
.length()          | Check string or array length        | this.items.length() > 0
.re_match(pattern) | Validate against regex              | this.email.re_match("^[^@]+@[^@]+$")
.number()          | Convert and validate as number      | this.quantity.number()

Sanitize string inputs

Remove potentially dangerous characters from user inputs. This is especially important when inputs will be used in URLs, database queries, or shell commands:

- label: sanitize_input
  mutation: |
    let clean_city = this.city.or("").trim().re_replace_all("[^a-zA-Z\\s-]", "")
    root = if $clean_city == "" {
      {"error": "City name contains only invalid characters"}
    } else {
      {"city": $clean_city}
    }

The regex [^a-zA-Z\s-] matches any character that is not a letter, space, or hyphen, and re_replace_all removes all matches. An input like "New York!@#$" becomes "New York". The let keyword stores the cleaned value in the $clean_city variable, keeping it separate from the output until validation passes.

For regex replacement syntax, see re_replace_all.

Validate numeric ranges

Check that numeric inputs fall within acceptable bounds:

- label: validate_quantity
  mutation: |
    let qty = this.quantity.or(0).number()
    root = if $qty < 1 {
      {"error": "Quantity must be at least 1", "received": $qty}
    } else if $qty > 1000 {
      {"error": "Quantity cannot exceed 1000", "received": $qty}
    } else {
      {"quantity": $qty, "valid": true}
    }

This example chains .or(0) with .number() to handle both missing values and type conversion. The chained if/else if checks both lower and upper bounds. Including the received value in error responses helps AI clients understand what went wrong and correct their input.

Validate multiple fields

For forms or complex inputs, collect all errors before returning. This gives AI clients a complete list of problems to fix rather than failing on the first error:

- label: validate_order
  mutation: |
    let errors = []
    let errors = if !this.exists("order_id") || this.order_id == "" {
      $errors.append("order_id is required")
    } else { $errors }
    let errors = if !this.exists("items") || this.items.length() == 0 {
      $errors.append("at least one item is required")
    } else { $errors }
    let errors = if this.exists("email") && !this.email.contains("@") {
      $errors.append("invalid email format")
    } else { $errors }
    root = if $errors.length() > 0 {
      {"valid": false, "errors": $errors}
    } else {
      {"valid": true, "order_id": this.order_id}
    }

The pattern uses variable reassignment (let errors = ...) to accumulate errors into an array. Each check appends to the array if validation fails, or returns the unchanged array if it passes. At the end, if any errors were collected, the response includes all of them. Notice that the email validation only runs if the field exists. This allows optional fields that, when provided, must be valid.

Validate enum values

Restrict inputs to a set of allowed values. This prevents invalid states and provides helpful feedback when the input doesn’t match:

- label: validate_status
  mutation: |
    let allowed = ["pending", "approved", "rejected"]
    let status = this.status.or("").lowercase()
    root = if $status == "" {
      {"error": "status is required", "allowed": $allowed}
    } else if !$allowed.contains($status) {
      {"error": "invalid status", "received": $status, "allowed": $allowed}
    } else {
      {"status": $status, "valid": true}
    }

The lowercase() call normalizes the input so "PENDING", "Pending", and "pending" all match. When validation fails, the error response includes the list of allowed values. This helps AI clients self-correct without needing to look up valid options.

For more details, see contains and lowercase.

Use throw for validation failures

Use throw() to stop processing with an error message. This is useful when validation failure should stop the entire tool execution:

- label: require_auth
  mutation: |
    root = if !this.exists("api_key") || this.api_key == "" {
      throw("API key is required for this operation")
    } else {
      this
    }

Unlike returning an error object, throw() immediately stops the processor chain and triggers any catch block that follows. Use throw() for critical validation failures where continuing would be pointless or dangerous. The else branch returns this unchanged, passing all input fields to the next processor.
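
A minimal sketch of this interplay, pairing the validation above with a catch processor that converts the thrown message into a structured error response:

processors:
  - label: require_auth
    mutation: |
      root = if !this.exists("api_key") || this.api_key == "" {
        throw("API key is required for this operation")
      } else {
        this
      }
  - label: handle_validation_failure
    catch:
      - mutation: |
          root = {
            "success": false,
            "error": error()
          }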

Handle errors

External services fail. Databases go down. APIs return unexpected responses. Wrap risky operations in error handling so your tool returns useful error messages instead of crashing.

Wrap operations that can fail in try/catch blocks. This ensures the tool returns useful errors instead of failing silently.

processors:
  - try:
      - http:
          url: "https://api.example.com/data"
          verb: GET

  - catch:
      - mutation: |
          root.error = true
          root.message = "Request failed: " + error()

For full configuration options, see try processor and catch processor.

Return error details

The error() function returns the error message from the most recent failure. Use it in catch blocks to capture what went wrong:

- label: handle_errors
  catch:
    - mutation: |
        root = {
          "success": false,
          "error_message": error(),
          "timestamp": now().format_timestamp("2006-01-02T15:04:05Z07:00")
        }

Set timeouts

Always set explicit timeouts on external calls to prevent tools from hanging indefinitely:

- label: fetch_with_timeout
  try:
    - http:
        url: "https://httpbin.org/get"
        verb: GET
        timeout: "10s"  # Fail after 10 seconds
        retries: 2      # Retry twice before failing
        retry_period: "1s"

For all timeout and retry options, see http processor.

Handle specific error types

Create different responses based on error type:

- catch:
    - mutation: |
        let err = error()
        let error_type = if $err.contains("timeout") {
          "TIMEOUT"
        } else if $err.contains("connection refused") {
          "CONNECTION_ERROR"
        } else if $err.contains("404") {
          "NOT_FOUND"
        } else {
          "UNKNOWN"
        }
        root.error = true
        root.error_type = $error_type
        root.message = $err
        root.retry_suggested = $error_type == "TIMEOUT" || $error_type == "CONNECTION_ERROR"

Log errors for debugging

Add logging inside catch blocks to aid troubleshooting:

- catch:
    - log:
        message: "Tool failed: ${! error() }"
        level: ERROR
        fields_mapping: |
          root.input = this
          root.error = error()

    - mutation: |
        root.error = true
        root.message = error()

For log level options, see log processor.

Preserve input context in errors

Include original input data in error responses to help AI clients retry with corrections:

- label: validate_and_fetch
  try:
    - mutation: |
        meta original_input = this
    - mutation: |
        root = throw("User not found in database")

- label: handle_errors
  catch:
    - mutation: |
        root = {
          "error": "Failed to fetch user",
          "details": error(),
          "input_received": @original_input,
          "suggestion": "Verify the user_id exists"
        }

Format responses

AI clients work best with clean, predictable response structures. Transform raw component output into consistent formats.

Structure responses consistently so AI clients can interpret them reliably. The following example takes a raw weather API response and transforms it into a clean, predictable format:

- label: format_response
  mapping: |
    root = {
      "city": this.location.name,
      "temperature_c": this.current.temp_c.number(),
      "description": this.current.condition.text,
      "timestamp": now().ts_format("2006-01-02T15:04:05Z")
    }

This mapping does four things:

  • Extracts the city name from a nested location.name field

  • Converts temp_c to a number type (APIs sometimes return numbers as strings)

  • Pulls out the weather description text

  • Adds a timestamp so the AI client knows when the data was fetched

The result is a flat JSON object with predictable field names and types, rather than the raw API response which might have deeply nested structures or inconsistent formatting.

Type coercion methods

Use type coercion methods to ensure fields have the correct data types:

Method     | Purpose                   | Example
.string()  | Convert to string         | this.id.string() becomes "123"
.number()  | Convert to number         | this.price.number() becomes 19.99
.bool()    | Convert to boolean        | this.active.bool() becomes true
.int64()   | Convert to 64-bit integer | this.count.int64() becomes 42
.float64() | Convert to 64-bit float   | this.ratio.float64() becomes 0.75
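
Coercion fails when a value can’t be converted, so pair these methods with .catch() to fall back to a safe default. A minimal sketch (the field names are illustrative):

- label: coerce_fields
  mutation: |
    root.id = this.id.string()
    root.price = this.price.number().catch(0.0)
    root.quantity = this.quantity.int64().catch(1)
    root.active = this.active.bool().catch(false)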

Format timestamps

Use now() with format_timestamp() for consistent time formatting:

- label: add_timestamps
  mutation: |
    root = this
    root.timestamp = now().format_timestamp("2006-01-02T15:04:05Z07:00")
    root.date = now().format_timestamp("2006-01-02")
    root.time = now().format_timestamp("15:04:05")

This example preserves all existing fields (root = this) and adds three timestamp fields. The now() function returns the current time, and format_timestamp() converts it to a string. Each field uses a different format: full ISO 8601 timestamp, date only, and time only.

The format string uses Go’s reference time layout. Common patterns:

Format                      | Output example
"2006-01-02T15:04:05Z07:00" | 2024-03-15T14:30:00-07:00 (ISO 8601)
"2006-01-02"                | 2024-03-15
"15:04:05"                  | 14:30:00
"Mon, 02 Jan 2006"          | Fri, 15 Mar 2024
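
The same layouts also work for reformatting timestamps that arrive in your input. A minimal sketch, assuming the input includes a created_at field in YYYY-MM-DD format:

- label: normalize_timestamp
  mutation: |
    root = this
    root.created_at = this.created_at.ts_parse("2006-01-02").ts_format("2006-01-02T15:04:05Z07:00")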

Extract nested fields

API responses often have deeply nested structures. Extract only the fields your AI client needs and flatten them into a simple object:

- label: extract_user_data
  mapping: |
    root = {
      "user_id": this.data.user.id.string(),
      "name": this.data.user.profile.display_name,
      "email": this.data.user.contact.email,
      "is_verified": this.data.user.status.verified.bool(),
      "created_at": this.data.user.metadata.created_at
    }

This mapping navigates a nested structure like {"data": {"user": {"id": 123, "profile": {"display_name": "..."}}}} and creates a flat response. The dot notation (this.data.user.id) drills down through nested objects. Type coercion (.string(), .bool()) ensures consistent output types.

For navigating nested structures, see dot notation.

Handle arrays

When your data contains arrays, you can transform each element, extract specific items, or compute aggregates:

- label: format_items
  mapping: |
    root = {
      "total_items": this.items.length(),
      "items": this.items.map_each(item -> {
        "id": item.id.string(),
        "name": item.name,
        "price": item.price.number()
      }),
      "first_item": this.items.index(0).name,
      "item_names": this.items.map_each(i -> i.name)
    }

This example demonstrates four array operations:

  • length() returns the array size for the total_items count

  • map_each() transforms each item into a new object with only the fields you need

  • index(0) accesses the first element (zero-indexed) to get the first item’s name

  • A second map_each() extracts just the names into a simple string array

For array operations, see map_each() and index().

Include fields conditionally

Sometimes you want to include fields only when they have meaningful values. This avoids returning null or empty fields that clutter the response:

- mutation: |
    root.id = this.id
    root.name = this.name
    root.email = if this.exists("email") && this.email != "" { this.email } else { deleted() }
    root.phone = if this.exists("phone") { this.phone } else { deleted() }

This mapping starts with required fields (id, name), then conditionally adds optional fields. The exists() check prevents errors when accessing missing fields. When the condition is false, deleted() removes the field entirely from the output. The AI client won’t see "email": null. The field simply won’t exist.

The deleted() function removes the field from the output entirely.

Filter sensitive data

When your data source contains sensitive fields, strip them before returning responses to the AI client:

- mutation: |
    root = this.without("password", "ssn", "api_key", "internal_notes")

The without() method creates a copy of the object with the specified fields removed. This is safer than manually selecting fields because new fields added to the source data are included automatically, so you only need to maintain the exclusion list. Use this when returning database records or API responses that might contain credentials or personal information.

For field removal, see without().

Wrap responses in a success envelope

When AI clients call multiple tools, they need a predictable way to check if the call succeeded. Wrapping responses in a consistent envelope structure makes this easy:

- label: format_success
  mapping: |
    root = {
      "success": true,
      "data": {
        "user_id": this.id,
        "name": this.name
      },
      "timestamp": now().format_timestamp("2006-01-02T15:04:05Z07:00")
    }

Both success and error responses share the same top-level structure: a success boolean, a payload field (data or error), and a timestamp. The AI client can check success first, then access the appropriate field. The error response uses the catch processor to handle failures and the error() function to capture the error message.
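
A minimal sketch of the matching error envelope, produced in a catch block with the error() function:

- label: format_error
  catch:
    - mapping: |
        root = {
          "success": false,
          "error": error(),
          "timestamp": now().format_timestamp("2006-01-02T15:04:05Z07:00")
        }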

Advanced workflows

Build multi-step workflows with dynamic configuration, conditional logic, and observability.

Dynamic configuration

Build tools that adapt their behavior based on input parameters:

processors:
  - label: dynamic_config
    mutation: |
      # Choose data source based on environment
      meta env = this.environment | "production"
      meta table_name = match @env {
        "dev" => "dev_orders",
        "staging" => "staging_orders",
        "production" => "prod_orders",
        _ => "dev_orders"
      }

      # Adjust query detail based on the detailed flag
      meta columns = if this.detailed.bool().catch(false) {
        ["order_id", "customer_id", "total", "items", "shipping_address"]
      } else {
        ["order_id", "customer_id", "total"]
      }
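
Downstream processors can then read this metadata with @name in Bloblang or ${! @name } in interpolated fields. A minimal sketch that continues the processors list above, assuming a hypothetical environment-specific orders API:

  - label: fetch_orders
    http:
      url: "https://api.example.com/${! @env }/orders"
      verb: GET
      headers:
        Accept: "application/json"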

Conditional processing

Build tools that branch based on input or data characteristics:

processors:
  - label: conditional_processing
    switch:
      - check: this.data_type == "json"
        processors:
          - mapping: |
              root.parsed_data = this.content.parse_json()
              root.format = "json"
      - check: this.data_type == "csv"
        processors:
          - mapping: |
              root.parsed_data = this.content.parse_csv()
              root.format = "csv"
      - processors:
          - mapping: |
              root.error = "Unsupported data type"
              root.supported_types = ["json", "csv"]

Secrets and credentials

Securely handle multiple credentials and API keys.

Here is an example of using an API key secret.

  1. Create a secret in the Secrets Store with name EXTERNAL_API_KEY and your API key as the value.

  2. Reference the secret in your YAML configuration:

    processors:
      - label: call_external_api
        http:
          url: "https://api.example.com/data"
          verb: GET
          headers:
            Authorization: "Bearer ${secrets.EXTERNAL_API_KEY}"  (1)
            Accept: "application/json"
    1 The secret is injected at runtime. Never store the actual API key in your YAML configuration. The actual secret value never appears in your configuration files or logs.
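
The same pattern extends to tools that need several credentials. A minimal sketch, assuming two hypothetical secrets named CRM_API_KEY and BILLING_API_KEY and illustrative service endpoints:

processors:
  - label: prepare_parameters
    mutation: |
      meta customer_id = this.customer_id.string()
  - label: fetch_crm_profile
    http:
      url: "https://crm.example.com/customers/${! @customer_id }"
      verb: GET
      headers:
        Authorization: "Bearer ${secrets.CRM_API_KEY}"
  - label: fetch_billing_status
    http:
      url: "https://billing.example.com/accounts/${! @customer_id }"
      verb: GET
      headers:
        Authorization: "Bearer ${secrets.BILLING_API_KEY}"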

Monitoring, debugging, and observability

Use structured logging, request tracing, and performance metrics to gain insights into tool execution.

label: observable_tool
processors:
  - label: init_tracing
    mutation: |
      # Generate correlation ID for request tracing
      meta req_id = uuid_v7()
      meta start_time = now()

      # Log request start with structured data
      root.trace = {
        "request_id": @req_id,
        "timestamp": @start_time.ts_format("2006-01-02T15:04:05.000Z"),
        "tool": "observable_tool",
        "version": "1.0.0"
      }

  - label: log_request_start
    log:
      message: "MCP tool request started"
      fields:
        request_id: "${! @req_id }"
        tool_name: "observable_tool"
        input_params: "${! this.without(\"trace\") }"
        user_agent: "${! meta(\"User-Agent\").catch(\"unknown\") }"
      level: "INFO"

  - label: finalize_response
    mutation: |
      # Calculate total execution time
      meta duration = (now().ts_unix_nano() - @start_time.ts_unix_nano()) / 1000000

      # Add trace information to response
      root.metadata = {
        "request_id": @req_id,
        "execution_time_ms": @duration,
        "timestamp": now().ts_format("2006-01-02T15:04:05.000Z"),
        "tool": "observable_tool",
        "success": !this.exists("error")
      }

  - label: log_completion
    log:
      message: "MCP tool request completed"
      fields:
        request_id: "${! @req_id }"
        duration_ms: "${! this.metadata.execution_time_ms }"
        success: "${! this.metadata.success }"
        result_size: "${! content().length() }"
      level: "INFO"

meta:
  tags: [ example ]
  mcp:
    enabled: true
    description: "Example tool with comprehensive observability and error handling"
    properties:
      - name: user_id
        type: string
        description: "User ID to fetch data for"
        required: true

Observability features:

  • Correlation IDs: Use uuid_v7() to generate unique request identifiers for tracing

  • Execution timing: Track how long your tools take to execute using nanosecond precision

  • Structured logging: Include consistent fields like request_id, duration_ms, tool_name

  • Request/response metadata: Log input parameters and response characteristics

  • Success tracking: Monitor whether operations complete successfully

You can test this pattern by invoking the tool with valid and invalid parameters and observing the structured logs to trace execution flow. For example, with a user ID of 1, the tool returns a response like the following:

{
  "metadata": {
    "execution_time_ms": 0.158977,
    "request_id": "019951ab-d07d-703f-aaae-7e1c9a5afa95",
    "success": true,
    "timestamp": "2025-09-16T08:37:18.589Z",
    "tool": "observable_tool"
  },
  "trace": {
    "request_id": "019951ab-d07d-703f-aaae-7e1c9a5afa95",
    "timestamp": "2025-09-16T08:37:18.589Z",
    "tool": "observable_tool",
    "version": "1.0.0"
  },
  "user_id": "1"
}

See also: log processor, try processor, Bloblang functions (for timing and ID generation)

Multi-step data enrichment

Build tools that combine data from multiple sources.

This workflow fetches customer data from a SQL database, enriches it with recent order history, and computes summary metrics.

label: customer_enrichment
processors:
  - label: fetch_customer_base
    branch:
      processors:
        - sql_select:
            driver: "postgres"
            dsn: "${POSTGRES_DSN}"
            table: "customers"
            where: "customer_id = ?"
            args_mapping: 'root = [this.customer_id]'
      result_map: 'root.customers = this'

  - label: enrich_with_orders
    branch:
      processors:
        - sql_select:
            driver: "postgres"
            dsn: "${POSTGRES_DSN}"
            table: "orders"
            where: "customer_id = ? AND created_at >= NOW() - INTERVAL ''30 days''"
            args_mapping: 'root = [this.customer_id]'
      result_map: 'root.orders = this'

  - label: combine_data
    mutation: |
      let order_totals = this.orders.map_each(o -> o.total)
      root = {
        "customer": this.customers.index(0),
        "recent_orders": this.orders,
        "metrics": {
          "total_orders": this.orders.length(),
          "total_spent": $order_totals.sum(),
          "avg_order_value": if $order_totals.length() > 0 { $order_totals.sum() / $order_totals.length() } else { 0 }
        }
      }

meta:
  tags: [ example ]
  mcp:
    enabled: true
    description: "Get comprehensive customer profile with recent order history and metrics"
    properties:
      - name: customer_id
        type: string
        description: "Customer ID to analyze"
        required: true

See also: sql_select processor, Bloblang functions (for data manipulation and aggregations)

Workflow orchestration

Coordinate complex workflows with multiple steps and conditional logic.

This workflow simulates a complete order processing pipeline with mock data for inventory and processing tiers. This allows you to test the full logic without needing real external systems.

label: order_workflow
processors:
  - label: validate_order
    mutation: |
      # Validation logic
      root = if this.total <= 0 {
        throw("Invalid order total")
      } else { this }

  - label: mock_inventory_check
    mutation: |
      # Mock inventory data for testing
      let inventory = {
        "widget-001": {"quantity": 100, "name": "Standard Widget"},
        "widget-premium": {"quantity": 25, "name": "Premium Widget"},
        "widget-limited": {"quantity": 2, "name": "Limited Edition Widget"}
      }

      let product = $inventory.get(this.product_id)
      root = if $product == null {
        throw("Product not found: " + this.product_id)
      } else if $product.quantity < this.quantity {
        throw("Insufficient inventory. Available: " + $product.quantity.string())
      } else {
        this.merge({
          "inventory_check": "passed",
          "available_quantity": $product.quantity,
          "product_name": $product.name
        })
      }

  - label: route_by_priority
    switch:
      - check: 'this.total > 1000'
        processors:
          - label: mock_high_value_processing
            mutation: |
              # Mock premium processing
              root = this.merge({
                "processing_tier": "premium",
                "processing_time_estimate": "2-4 hours",
                "assigned_rep": "premium-team@company.com",
                "priority_score": 95
              })

      - check: 'this.customer_tier == "vip"'
        processors:
          - label: mock_vip_processing
            mutation: |
              # Mock VIP processing
              root = this.merge({
                "processing_tier": "vip",
                "processing_time_estimate": "1-2 hours",
                "assigned_rep": "vip-team@company.com",
                "priority_score": 90,
                "perks": ["expedited_shipping", "white_glove_service"]
              })

      - processors:
          - label: mock_standard_processing
            mutation: |
              # Mock standard processing
              root = this.merge({
                "processing_tier": "standard",
                "processing_time_estimate": "24-48 hours",
                "assigned_rep": "support@company.com",
                "priority_score": 50
              })

  - label: finalize_order
    mutation: |
      # Add final processing metadata
      # Parse the maximum hours from the processing time estimate
      let max_hours = this.processing_time_estimate.split("-").index(1).split(" ").index(0).number()

      root = this.merge({
        "order_status": "processed",
        "processed_at": now().ts_format("2006-01-02T15:04:05.000Z"),
        "estimated_fulfillment": "TBD - calculated based on processing tier",
        "processing_time_hours": $max_hours
      })

meta:
  tags: [ example ]
  mcp:
    enabled: true
    description: "Process orders with validation, inventory check, and tiered routing (with mocks for testing)"
    properties:
      - name: order_id
        type: string
        description: "Unique order identifier"
        required: true
      - name: product_id
        type: string
        description: "Product ID (try: widget-001, widget-premium, widget-limited)"
        required: true
      - name: quantity
        type: number
        description: "Quantity to order"
        required: true
      - name: total
        type: number
        description: "Order total in dollars"
        required: true
      - name: customer_tier
        type: string
        description: "Customer tier (optional: vip, standard)"
        required: false

For the input {"order_id": "ORD001", "product_id": "widget-001", "quantity": 5, "total": 250, "customer_tier": "vip"}, the workflow produces:

{
  "assigned_rep": "vip-team@company.com",
  "available_quantity": 100,
  "customer_tier": "vip",
  "estimated_fulfillment": "TBD - calculated based on processing tier",
  "inventory_check": "passed",
  "order_id": "ORD001",
  "order_status": "processed",
  "perks": [
    "expedited_shipping",
    "white_glove_service"
  ],
  "priority_score": 90,
  "processed_at": "2025-09-16T09:05:29.138Z",
  "processing_tier": "vip",
  "processing_time_estimate": "1-2 hours",
  "processing_time_hours": 2,
  "product_id": "widget-001",
  "product_name": "Standard Widget",
  "quantity": 5,
  "total": 250
}

Notice how the workflow:

  1. Preserves original input: order_id, product_id, quantity, total, and customer_tier pass through unchanged.

  2. Adds inventory data: available_quantity, product_name, and inventory_check status from the mock lookup.

  3. Routes by customer tier: Since customer_tier is vip, it gets VIP processing with special perks and priority.

  4. Enriches with processing metadata: assigned_rep, priority_score, processing_tier, and time estimates.

  5. Finalizes with timestamps: order_status, processed_at, and calculated processing_time_hours.

Next steps