Pipeline Integration Patterns

Build Redpanda Connect pipelines that invoke agents for automated, event-driven processing. Pipelines use the a2a_message processor to call agents for each event in a stream when you need AI reasoning, classification, or enrichment at scale.

The Agentic Data Plane is supported on BYOC clusters on AWS running Redpanda version 25.3 or later.

After reading this page, you will be able to:

  • Identify when pipelines should call agents for stream processing

  • Design event-driven agent invocation using the a2a_message processor

  • Implement streaming enrichment with AI-generated fields

This page focuses on pipelines calling agents (pipeline-initiated integration). For agents invoking MCP tools, see Agent needs capabilities. For external applications calling agents, see External system calls agent.

How pipelines invoke agents

Pipelines use the a2a_message processor to invoke agents for each event in a stream. The processor uses the A2A protocol to discover and communicate with agents.

When the a2a_message processor receives an event, it sends the event data to the specified agent along with any prompt you provide. The agent processes the event using its reasoning capabilities and returns a response. The processor then adds the agent’s response to the event for further processing or output.
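
As a minimal sketch of this flow, the following configuration emits one sample event, sends it to an agent, and prints whatever the processor returns. The generate input, the sample field values, and the stdout output are illustrative assumptions. The a2a_message fields are the same ones used in the examples on this page.

```yaml
# Sketch only: one hypothetical event in, one agent response out.
input:
  generate:
    count: 1                      # emit a single event, then stop
    mapping: |
      # Hypothetical sample event, not real data
      root = {"id": "txn-001", "amount": 4200, "merchant": "example-store"}

pipeline:
  processors:
    - a2a_message:
        agent_card_url: "${AGENT_CARD_URL}"
        prompt: "Analyze this transaction: ${!content()}"

output:
  stdout: {}                      # print the processor's output for inspection
```

Inspecting the printed output is a quick way to see the exact shape of the agent's response before you write mappings against it.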

The pipeline, not the agent, decides when an invocation happens: each event that reaches the processor triggers one agent call.
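
Because the pipeline controls invocation, ordinary pipeline logic can decide which events reach the agent. The following sketch uses the switch processor to call the agent only for events that match a condition. The amount field and the 1000 threshold are hypothetical.

```yaml
pipeline:
  processors:
    - switch:
        # Hypothetical condition: only larger transactions reach the agent.
        - check: this.amount >= 1000
          processors:
            - a2a_message:
                agent_card_url: "${AGENT_CARD_URL}"
                prompt: "Analyze this transaction: ${!content()}"
        # Events that match no case pass through the switch unchanged.
```

With this layout, matched events carry the agent's output while unmatched events keep their original content, so downstream steps should expect both shapes.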

When to use this pattern

Use the a2a_message processor when pipelines need AI reasoning for every event in a stream.

The a2a_message processor is appropriate when:

  • Every event needs AI analysis: Each message requires reasoning, classification, or decision-making.

  • You need streaming enrichment: Add AI-generated fields to events at scale.

  • Processing is fully automated: Workflows are event-driven, with no human in the loop.

  • Batch latency is acceptable: Agent reasoning time is tolerable for your use case.

  • You’re handling high-volume streams: Processing thousands or millions of events.
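
For high-volume streams, it can help to bound how fast the pipeline calls the agent. The following sketch combines a local rate limit with parallel processing threads. The label agent_calls and the numbers are placeholders to tune for your agent, and the available thread count may depend on your deployment.

```yaml
rate_limit_resources:
  - label: agent_calls            # hypothetical resource name
    local:
      count: 50                   # placeholder: at most 50 calls...
      interval: 1s                # ...per second

pipeline:
  threads: 4                      # placeholder: process events in parallel
  processors:
    - rate_limit:
        resource: agent_calls     # wait here when the limit is reached
    - a2a_message:
        agent_card_url: "${AGENT_CARD_URL}"
        prompt: "Classify this event: ${!content()}"
```

The rate limit applies backpressure to the input rather than dropping events, so consumer lag grows when events arrive faster than the configured rate.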

Use cases

Use the a2a_message processor in pipelines for these common patterns.

Event-driven agent invocation

Invoke agents automatically for each event:

# Event-driven agent invocation pipeline
# Invokes an agent for each event in a stream
input:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: [transactions]
    consumer_group: fraud-detector
    tls:
      enabled: true
    sasl:
      - mechanism: SCRAM-SHA-256
        username: "${REDPANDA_USERNAME}"
        password: "${REDPANDA_PASSWORD}"

pipeline:
  processors:
    - a2a_message:
        agent_card_url: "${AGENT_CARD_URL}"
        prompt: "Analyze this transaction: ${!content()}"

output:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topic: fraud-alerts
    tls:
      enabled: true
    sasl:
      - mechanism: SCRAM-SHA-256
        username: "${REDPANDA_USERNAME}"
        password: "${REDPANDA_PASSWORD}"

Replace AGENT_CARD_URL with your actual agent card URL. See Agent card location.

Use case: Real-time fraud detection on every transaction.

Streaming data enrichment

Add AI-generated metadata to events:

processors:
  - branch:
      request_map: 'root = this.text'
      processors:
        - a2a_message:
            agent_card_url: "${AGENT_CARD_URL}"
      result_map: 'root.sentiment = content().string()'

Replace AGENT_CARD_URL with your actual agent card URL. See Agent card location.

Use case: Add sentiment scores to every customer review in real time.
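
Agent calls can fail, for example because of a timeout. The following sketch extends the enrichment pattern with a catch processor so that a failed call still produces an event downstream. The sentiment_error field name is an assumption for illustration.

```yaml
processors:
  - branch:
      request_map: 'root = this.text'
      processors:
        - a2a_message:
            agent_card_url: "${AGENT_CARD_URL}"
      # Convert the raw response bytes to text before storing them.
      result_map: 'root.sentiment = content().string()'
  # Runs only for events whose previous processing step failed.
  - catch:
      - mapping: |
          root = this
          root.sentiment = null
          root.sentiment_error = error()   # hypothetical field holding the failure reason
```

When the branch fails, its result_map is not applied, so the original event reaches the catch block intact.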

Asynchronous workflows

Process events in the background:

input:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: [daily-reports]
    consumer_group: report-analyzer
    tls:
      enabled: true
    sasl:
      - mechanism: SCRAM-SHA-256
        username: "${REDPANDA_USERNAME}"
        password: "${REDPANDA_PASSWORD}"

pipeline:
  processors:
    - a2a_message:
        agent_card_url: "${AGENT_CARD_URL}"
        prompt: "Summarize this report: ${!content()}"

Replace AGENT_CARD_URL with your actual agent card URL. See Agent card location.

Use case: Nightly batch summarization of reports where latency is acceptable.

Multi-agent pipeline orchestration

Chain multiple agents in sequence:

processors:
  - a2a_message:
      agent_card_url: "${TRANSLATOR_AGENT_URL}"
  - a2a_message:
      agent_card_url: "${SENTIMENT_AGENT_URL}"
  - a2a_message:
      agent_card_url: "${ROUTER_AGENT_URL}"

Replace the agent URL variables with your actual agent card URLs. See Agent card location.

Use case: Translate feedback, analyze sentiment, then route to the appropriate team.
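
In a plain chain, each agent receives only the previous agent's output. If you want to keep the original event and every intermediate result, one option is to wrap each call in a branch. This sketch assumes the processor emits the agent's response as the message content. The feedback, translated, and sentiment field names are hypothetical.

```yaml
processors:
  - branch:
      request_map: 'root = this.feedback'      # send only the feedback text
      processors:
        - a2a_message:
            agent_card_url: "${TRANSLATOR_AGENT_URL}"
      result_map: 'root.translated = content().string()'
  - branch:
      request_map: 'root = this.translated'    # reuse the first agent's result
      processors:
        - a2a_message:
            agent_card_url: "${SENTIMENT_AGENT_URL}"
      result_map: 'root.sentiment = content().string()'
```

Each result_map writes into the original event, so the event accumulates one field per agent as it moves through the chain.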

Agent as transformation node

Use agent reasoning for complex transformations:

processors:
  - a2a_message:
      agent_card_url: "${AGENT_CARD_URL}"
      prompt: "Convert to SQL: ${!this.natural_language_query}"

Replace AGENT_CARD_URL with your actual agent card URL. See Agent card location.

Use case: Convert natural language queries to SQL for downstream processing.
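
Agent output is free-form text, so it is worth checking before downstream systems rely on it. The following sketch stores the generated statement in a hypothetical sql field and rejects anything that does not start with SELECT. The check is deliberately crude and does not replace proper SQL validation.

```yaml
processors:
  - branch:
      # No request_map: the branch starts from a copy of the original event.
      processors:
        - a2a_message:
            agent_card_url: "${AGENT_CARD_URL}"
            prompt: "Convert to SQL: ${!this.natural_language_query}"
      result_map: 'root.sql = content().string().trim()'
  - mapping: |
      # Flag the event as failed unless the statement looks like a read-only query.
      root = if this.sql.uppercase().has_prefix("SELECT") {
        this
      } else {
        throw("agent returned a non-SELECT statement")
      }
```

Events that fail the check are flagged as errored and can be handled with the pipeline's usual error handling.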

When not to use this pattern

Do not use the a2a_message processor when:

  • Users need to interact with agents directly.

  • The transformation is simple and does not require AI reasoning.

  • Agents need to dynamically decide what data to fetch based on context.
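
For the simple-transformation case, a plain mapping processor is usually enough and avoids the latency of an agent call. The following sketch shows a deterministic transformation with no agent involved. The amount_cents, amount_usd, and merchant field names are hypothetical.

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Deterministic rules like these need no AI reasoning.
        root.amount_usd = this.amount_cents / 100
        root.merchant = this.merchant.trim().lowercase()
```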

For a detailed comparison between pipeline-initiated and agent-initiated integration patterns, see Pattern comparison.

Example: Real-time fraud detection

This example shows a complete pipeline that analyzes every transaction with an agent.

Pipeline configuration

# Fraud detection pipeline with score-based routing
# Analyzes every transaction and routes to different topics based on fraud score
input:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: [transactions]
    consumer_group: fraud-detector
    tls:
      enabled: true
    sasl:
      - mechanism: SCRAM-SHA-256
        username: "${REDPANDA_USERNAME}"
        password: "${REDPANDA_PASSWORD}"

pipeline:
  processors:
    - branch:
        request_map: |
          root.transaction_id = this.id
          root.amount = this.amount
          root.merchant = this.merchant
          root.user_id = this.user_id
        processors:
          - a2a_message:
              agent_card_url: "${AGENT_CARD_URL}"
              prompt: |
                Analyze this transaction for fraud:
                Amount: ${! json("amount") }
                Merchant: ${! json("merchant") }
                User: ${! json("user_id") }

                Return JSON: { "fraud_score": 0-100, "reason": "explanation", "recommend_block": true/false }
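        result_map: |
          # In result_map, root is the original transaction and content() is the agent's response,
          # so assign only the new field to keep the transaction intact.
          root.fraud_analysis = content().parse_json().catch({})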

    - mapping: |
        root = this
        meta fraud_score = this.fraud_analysis.fraud_score

output:
  switch:
    cases:
      - check: 'meta("fraud_score").number() >= 80'
        output:
          redpanda:
            seed_brokers: ["${REDPANDA_BROKERS}"]
            topic: fraud-alerts-high
            tls:
              enabled: true
            sasl:
              - mechanism: SCRAM-SHA-256
                username: "${REDPANDA_USERNAME}"
                password: "${REDPANDA_PASSWORD}"
      - check: 'meta("fraud_score").number() >= 50'
        output:
          redpanda:
            seed_brokers: ["${REDPANDA_BROKERS}"]
            topic: fraud-alerts-medium
            tls:
              enabled: true
            sasl:
              - mechanism: SCRAM-SHA-256
                username: "${REDPANDA_USERNAME}"
                password: "${REDPANDA_PASSWORD}"
      - output:
          redpanda:
            seed_brokers: ["${REDPANDA_BROKERS}"]
            topic: transactions-cleared
            tls:
              enabled: true
            sasl:
              - mechanism: SCRAM-SHA-256
                username: "${REDPANDA_USERNAME}"
                password: "${REDPANDA_PASSWORD}"

Replace AGENT_CARD_URL with your agent card URL. See Agent card location.

This pipeline:

  • Consumes every transaction from the transactions topic.

  • Sends each transaction to the fraud detection agent using a2a_message.

  • Routes transactions to different topics based on fraud score.

  • Runs continuously, analyzing every transaction in real time.