# Integrate with Redpanda Pipelines

> For the complete documentation index, see [llms.txt](https://docs.redpanda.com/llms.txt). Component-specific: [agentic-data-plane-full.txt](https://docs.redpanda.com/agentic-data-plane-full.txt)

---
title: Integrate with Redpanda Pipelines
latest-operator-version: v26.1.5
latest-console-tag: v3.7.4
latest-connect-version: 4.96.1
latest-redpanda-tag: v26.1.10
docname: pipeline-integration-patterns
page-component-name: agentic-data-plane
page-version: master
page-component-version: master
page-component-title: Agentic Data Plane
page-relative-src-path: pipeline-integration-patterns.adoc
page-edit-url: https://github.com/redpanda-data/adp-docs/edit/main/modules/connect/pages/pipeline-integration-patterns.adoc
description: Build Redpanda Connect pipelines that invoke agents for event-driven processing and streaming enrichment.
page-topic-type: best-practices
personas: agent_builder, platform_engineer
learning-objective-1: Identify when pipelines should call agents for stream processing
learning-objective-2: Design event-driven agent invocation using the <code>a2a_message</code> processor
learning-objective-3: Implement streaming enrichment with AI-generated fields
page-git-created-date: "2026-05-28"
page-git-modified-date: "2026-06-10"
---

<!-- Source: https://docs.redpanda.com/agentic-data-plane/connect/pipeline-integration-patterns.md -->

Build Redpanda Connect pipelines that invoke agents for automated, event-driven processing. Pipelines use the `a2a_message` processor to call agents for each event in a stream when you need AI reasoning, classification, or enrichment at scale.

After reading this page, you will be able to:

-   Identify when pipelines should call agents for stream processing

-   Design event-driven agent invocation using the `a2a_message` processor

-   Implement streaming enrichment with AI-generated fields


This page focuses on pipelines calling agents (pipeline-initiated integration). For agents invoking MCP tools, see [Agent needs capabilities](https://docs.redpanda.com/agentic-data-plane/connect/integration-overview/#agent-needs-capabilities). For external applications calling agents, see [External system calls agent](https://docs.redpanda.com/agentic-data-plane/connect/integration-overview/#external-system-calls-agent).

## [](#how-pipelines-invoke-agents)How pipelines invoke agents

Pipelines use the [`a2a_message`](https://docs.redpanda.com/cloud-data-platform/develop/connect/components/processors/a2a_message/) processor to invoke agents for each event in a stream. The processor uses the [A2A protocol](https://docs.redpanda.com/agentic-data-plane/connect/a2a-concepts/) to discover and communicate with agents.

When the `a2a_message` processor receives an event, it sends the event data to the specified agent along with any prompt you provide. The agent processes the event using its reasoning capabilities and returns a response. The processor then adds the agent’s response to the event for further processing or output.

The pipeline determines when to invoke agents based on events, not agent reasoning.

## [](#when-to-use-this-pattern)When to use this pattern

Use the `a2a_message` processor when pipelines need AI reasoning for every event in a stream.

The `a2a_message` processor is appropriate when:

-   **Every event needs AI analysis:** Each message requires reasoning, classification, or decision-making.

-   **You need streaming enrichment:** Add AI-generated fields to events at scale.

-   **Processing is fully automated:** No human in the loop, event-driven workflows.

-   **Batch latency is acceptable:** Agent reasoning time is tolerable for your use case.

-   **You’re handling high-volume streams:** Processing thousands or millions of events.


## [](#use-cases)Use cases

Use the `a2a_message` processor in pipelines for these common patterns.

### [](#event-driven-agent-invocation)Event-driven agent invocation

Use this pattern for real-time fraud detection on every transaction.

Invoke agents automatically for each event:

```yaml
# Event-driven agent invocation pipeline
# Invokes an agent for each event in a stream
input:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: [transactions]
    consumer_group: fraud-detector
    tls:
      enabled: true
    sasl:
      - mechanism: SCRAM-SHA-256
        username: "${REDPANDA_USERNAME}"
        password: "${REDPANDA_PASSWORD}"

pipeline:
  processors:
    - a2a_message:
        agent_card_url: "${AGENT_CARD_URL}"
        prompt: "Analyze this transaction: ${!content()}"

output:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topic: fraud-alerts
    tls:
      enabled: true
    sasl:
      - mechanism: SCRAM-SHA-256
        username: "${REDPANDA_USERNAME}"
        password: "${REDPANDA_PASSWORD}"
```

Replace `AGENT_CARD_URL` with your actual agent card URL. See [Agent card location](https://docs.redpanda.com/agentic-data-plane/connect/a2a-concepts/#agent-card-location).

### [](#streaming-data-enrichment)Streaming data enrichment

Use this pattern to add sentiment scores to every customer review in real time.

Add AI-generated metadata to events:

```yaml
processors:
  - branch:
      request_map: 'root = this.text'
      processors:
        - a2a_message:
            agent_card_url: "${AGENT_CARD_URL}"
      result_map: 'root.sentiment = content()'
```

Replace `AGENT_CARD_URL` with your actual agent card URL. See [Agent card location](https://docs.redpanda.com/agentic-data-plane/connect/a2a-concepts/#agent-card-location).

### [](#asynchronous-workflows)Asynchronous workflows

Use this pattern for nightly batch summarization of reports where latency is acceptable.

Process events in the background:

```yaml
input:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: [daily-reports]
    consumer_group: report-analyzer
    tls:
      enabled: true
    sasl:
      - mechanism: SCRAM-SHA-256
        username: "${REDPANDA_USERNAME}"
        password: "${REDPANDA_PASSWORD}"

pipeline:
  processors:
    - a2a_message:
        agent_card_url: "${AGENT_CARD_URL}"
        prompt: "Summarize this report: ${!content()}"
```

Replace `AGENT_CARD_URL` with your actual agent card URL. See [Agent card location](https://docs.redpanda.com/agentic-data-plane/connect/a2a-concepts/#agent-card-location).

### [](#multi-agent-pipeline-orchestration)Multi-agent pipeline orchestration

Use this pattern to translate feedback, analyze sentiment, and route it to the appropriate team.

Chain multiple agents in sequence:

```yaml
processors:
  - a2a_message:
      agent_card_url: "${TRANSLATOR_AGENT_URL}"
  - a2a_message:
      agent_card_url: "${SENTIMENT_AGENT_URL}"
  - a2a_message:
      agent_card_url: "${ROUTER_AGENT_URL}"
```

Replace the agent URL variables with your actual agent card URLs. See [Agent card location](https://docs.redpanda.com/agentic-data-plane/connect/a2a-concepts/#agent-card-location).

### [](#agent-as-transformation-node)Agent as transformation node

Use this pattern to convert natural language queries to SQL for downstream processing.

Use agent reasoning for complex transformations:

```yaml
processors:
  - a2a_message:
      agent_card_url: "${AGENT_CARD_URL}"
      prompt: "Convert to SQL: ${!this.natural_language_query}"
```

Replace `AGENT_CARD_URL` with your actual agent card URL. See [Agent card location](https://docs.redpanda.com/agentic-data-plane/connect/a2a-concepts/#agent-card-location).

## [](#when-not-to-use-this-pattern)When not to use this pattern

Do not use the `a2a_message` processor when:

-   Users need to interact with agents interactively.

-   The transformation is simple and does not require AI reasoning.

-   Agents need to dynamically decide what data to fetch based on context.


For a detailed comparison between pipeline-initiated and agent-initiated integration patterns, see [Pattern comparison](https://docs.redpanda.com/agentic-data-plane/connect/integration-overview/#pattern-comparison).

## [](#example-real-time-fraud-detection)Example: Real-time fraud detection

This example shows a complete pipeline that analyzes every transaction with an agent.

### [](#pipeline-configuration)Pipeline configuration

```yaml
# Fraud detection pipeline with score-based routing
# Analyzes every transaction and routes to different topics based on fraud score
input:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: [transactions]
    consumer_group: fraud-detector
    tls:
      enabled: true
    sasl:
      - mechanism: SCRAM-SHA-256
        username: "${REDPANDA_USERNAME}"
        password: "${REDPANDA_PASSWORD}"

pipeline:
  processors:
    - branch:
        request_map: |
          root.transaction_id = this.id
          root.amount = this.amount
          root.merchant = this.merchant
          root.user_id = this.user_id
        processors:
          - a2a_message:
              agent_card_url: "${AGENT_CARD_URL}"
              prompt: |
                Analyze this transaction for fraud:
                Amount: ${! json("amount") }
                Merchant: ${! json("merchant") }
                User: ${! json("user_id") }

                Return JSON: { "fraud_score": 0-100, "reason": "explanation", "recommend_block": true/false }
        result_map: |
          root = this
          root.fraud_analysis = content().parse_json().catch({})

    - mapping: |
        root = this
        meta fraud_score = this.fraud_analysis.fraud_score

output:
  switch:
    cases:
      - check: 'meta("fraud_score") >= 80'
        output:
          redpanda:
            seed_brokers: ["${REDPANDA_BROKERS}"]
            topic: fraud-alerts-high
            tls:
              enabled: true
            sasl:
              - mechanism: SCRAM-SHA-256
                username: "${REDPANDA_USERNAME}"
                password: "${REDPANDA_PASSWORD}"
      - check: 'meta("fraud_score") >= 50'
        output:
          redpanda:
            seed_brokers: ["${REDPANDA_BROKERS}"]
            topic: fraud-alerts-medium
            tls:
              enabled: true
            sasl:
              - mechanism: SCRAM-SHA-256
                username: "${REDPANDA_USERNAME}"
                password: "${REDPANDA_PASSWORD}"
      - output:
          redpanda:
            seed_brokers: ["${REDPANDA_BROKERS}"]
            topic: transactions-cleared
            tls:
              enabled: true
            sasl:
              - mechanism: SCRAM-SHA-256
                username: "${REDPANDA_USERNAME}"
                password: "${REDPANDA_PASSWORD}"
```

Replace `AGENT_CARD_URL` with your agent card URL. See [Agent card location](https://docs.redpanda.com/agentic-data-plane/connect/a2a-concepts/#agent-card-location).

This pipeline:

-   Consumes every transaction from the `transactions` topic.

-   Sends each transaction to the fraud detection agent using `a2a_message`.

-   Routes transactions to different topics based on fraud score.

-   Runs continuously, analyzing every transaction in real time.


## [](#next-steps)Next steps

-   [How MCP Servers Work](https://docs.redpanda.com/agentic-data-plane/connect/mcp-overview/)

-   [Choose an Integration Pattern](https://docs.redpanda.com/agentic-data-plane/connect/integration-overview/)

-   [Connect Agents with A2A](https://docs.redpanda.com/agentic-data-plane/connect/a2a-concepts/)