# Pipeline Integration Patterns

Build Redpanda Connect pipelines that invoke agents for automated, event-driven processing. Pipelines use the `a2a_message` processor to call agents for each event in a stream when you need AI reasoning, classification, or enrichment at scale.

The Agentic Data Plane is supported on BYOC clusters running with AWS and Redpanda version 25.3 and later.

After reading this page, you will be able to:

- Identify when pipelines should call agents for stream processing
- Design event-driven agent invocation using the `a2a_message` processor
- Implement streaming enrichment with AI-generated fields

This page focuses on pipelines calling agents (pipeline-initiated integration). For agents invoking MCP tools, see Agent needs capabilities. For external applications calling agents, see External system calls agent.

## How pipelines invoke agents

Pipelines use the `a2a_message` processor to invoke agents for each event in a stream. The processor uses the A2A protocol to discover and communicate with agents.

When the `a2a_message` processor receives an event, it sends the event data to the specified agent along with any prompt you provide. The agent processes the event using its reasoning capabilities and returns a response. The processor then adds the agent’s response to the event for further processing or output.

The pipeline determines when to invoke agents based on events, not agent reasoning.

## When to use this pattern

Use the `a2a_message` processor when pipelines need AI reasoning for every event in a stream.

The `a2a_message` processor is appropriate when:

- Every event needs AI analysis: Each message requires reasoning, classification, or decision-making.
- You need streaming enrichment: Add AI-generated fields to events at scale.
- Processing is fully automated: No human in the loop, event-driven workflows.
- Batch latency is acceptable: Agent reasoning time is tolerable for your use case.
- You’re handling high-volume streams: Processing thousands or millions of events.

## Use cases

Use the `a2a_message` processor in pipelines for these common patterns.

### Event-driven agent invocation

Invoke agents automatically for each event:

```yaml
# Event-driven agent invocation pipeline
# Invokes an agent for each event in a stream
input:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: [transactions]
    consumer_group: fraud-detector
    tls:
      enabled: true
    sasl:
      - mechanism: SCRAM-SHA-256
        username: "${REDPANDA_USERNAME}"
        password: "${REDPANDA_PASSWORD}"

pipeline:
  processors:
    - a2a_message:
        agent_card_url: "${AGENT_CARD_URL}"
        prompt: "Analyze this transaction: ${!content()}"

output:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topic: fraud-alerts
    tls:
      enabled: true
    sasl:
      - mechanism: SCRAM-SHA-256
        username: "${REDPANDA_USERNAME}"
        password: "${REDPANDA_PASSWORD}"
```

Replace AGENT_CARD_URL with your actual agent card URL. See Agent card location.

Use case: Real-time fraud detection on every transaction.

### Streaming data enrichment

Add AI-generated metadata to events:

```yaml
processors:
  - branch:
      request_map: 'root = this.text'
      processors:
        - a2a_message:
            agent_card_url: "${AGENT_CARD_URL}"
      result_map: 'root.sentiment = content()'
```

Replace AGENT_CARD_URL with your actual agent card URL. See Agent card location.

Use case: Add sentiment scores to every customer review in real time.

### Asynchronous workflows

Process events in the background:

```yaml
input:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: [daily-reports]
    consumer_group: report-analyzer
    tls:
      enabled: true
    sasl:
      - mechanism: SCRAM-SHA-256
        username: "${REDPANDA_USERNAME}"
        password: "${REDPANDA_PASSWORD}"

pipeline:
  processors:
    - a2a_message:
        agent_card_url: "${AGENT_CARD_URL}"
        prompt: "Summarize this report: ${!content()}"
```

Replace AGENT_CARD_URL with your actual agent card URL.
See Agent card location.

Use case: Nightly batch summarization of reports where latency is acceptable.

### Multi-agent pipeline orchestration

Chain multiple agents in sequence:

```yaml
processors:
  - a2a_message:
      agent_card_url: "${TRANSLATOR_AGENT_URL}"
  - a2a_message:
      agent_card_url: "${SENTIMENT_AGENT_URL}"
  - a2a_message:
      agent_card_url: "${ROUTER_AGENT_URL}"
```

Replace the agent URL variables with your actual agent card URLs. See Agent card location.

Use case: Translate feedback, analyze sentiment, then route to the appropriate team.

### Agent as transformation node

Use agent reasoning for complex transformations:

```yaml
processors:
  - a2a_message:
      agent_card_url: "${AGENT_CARD_URL}"
      prompt: "Convert to SQL: ${!this.natural_language_query}"
```

Replace AGENT_CARD_URL with your actual agent card URL. See Agent card location.

Use case: Convert natural language queries to SQL for downstream processing.

## When not to use this pattern

Do not use the `a2a_message` processor when:

- Users need interactive access to agents.
- The transformation is simple and does not require AI reasoning.
- Agents need to dynamically decide what data to fetch based on context.

For a detailed comparison between pipeline-initiated and agent-initiated integration patterns, see Pattern comparison.

## Example: Real-time fraud detection

This example shows a complete pipeline that analyzes every transaction with an agent.
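The routing rule this example applies can be sketched in plain Python. This is an illustration only and not part of the pipeline: the helper name `route_topic` is hypothetical, while the thresholds (80 and 50) and the topic names are taken from the example's `switch` output. The sketch assumes the agent returned a numeric `fraud_score` and makes no claim about how the pipeline itself treats a missing or malformed score.

```python
# Illustrative sketch only: mirrors the ordered cases of the example's switch output.
# `route_topic` is a hypothetical helper, not part of Redpanda Connect.

# (minimum score, topic) pairs, checked in order; the first match wins.
CASES = [
    (80, "fraud-alerts-high"),
    (50, "fraud-alerts-medium"),
]

# Topic used by the final case, which has no check and so always matches.
DEFAULT_TOPIC = "transactions-cleared"


def route_topic(fraud_score: float) -> str:
    """Return the topic a transaction with this score would be routed to."""
    for minimum, topic in CASES:
        if fraud_score >= minimum:
            return topic
    # No check matched, so fall through to the case without a check.
    return DEFAULT_TOPIC


if __name__ == "__main__":
    for score in (95, 80, 65, 10):
        print(score, "->", route_topic(score))
```

Because the cases are checked in order, a score of 95 is routed only to the high-severity topic even though it also satisfies the second check. This assumes the `switch` output's default first-match behavior.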
### Pipeline configuration

```yaml
# Fraud detection pipeline with score-based routing
# Analyzes every transaction and routes to different topics based on fraud score
input:
  redpanda:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: [transactions]
    consumer_group: fraud-detector
    tls:
      enabled: true
    sasl:
      - mechanism: SCRAM-SHA-256
        username: "${REDPANDA_USERNAME}"
        password: "${REDPANDA_PASSWORD}"

pipeline:
  processors:
    - branch:
        request_map: |
          root.transaction_id = this.id
          root.amount = this.amount
          root.merchant = this.merchant
          root.user_id = this.user_id
        processors:
          - a2a_message:
              agent_card_url: "${AGENT_CARD_URL}"
              prompt: |
                Analyze this transaction for fraud:
                Amount: ${! json("amount") }
                Merchant: ${! json("merchant") }
                User: ${! json("user_id") }

                Return JSON:
                {
                  "fraud_score": 0-100,
                  "reason": "explanation",
                  "recommend_block": true/false
                }
        result_map: |
          root = this
          root.fraud_analysis = content().parse_json().catch({})
    - mapping: |
        root = this
        meta fraud_score = this.fraud_analysis.fraud_score

output:
  switch:
    cases:
      - check: 'meta("fraud_score") >= 80'
        output:
          redpanda:
            seed_brokers: ["${REDPANDA_BROKERS}"]
            topic: fraud-alerts-high
            tls:
              enabled: true
            sasl:
              - mechanism: SCRAM-SHA-256
                username: "${REDPANDA_USERNAME}"
                password: "${REDPANDA_PASSWORD}"
      - check: 'meta("fraud_score") >= 50'
        output:
          redpanda:
            seed_brokers: ["${REDPANDA_BROKERS}"]
            topic: fraud-alerts-medium
            tls:
              enabled: true
            sasl:
              - mechanism: SCRAM-SHA-256
                username: "${REDPANDA_USERNAME}"
                password: "${REDPANDA_PASSWORD}"
      - output:
          redpanda:
            seed_brokers: ["${REDPANDA_BROKERS}"]
            topic: transactions-cleared
            tls:
              enabled: true
            sasl:
              - mechanism: SCRAM-SHA-256
                username: "${REDPANDA_USERNAME}"
                password: "${REDPANDA_PASSWORD}"
```

Replace AGENT_CARD_URL with your agent card URL. See Agent card location.

This pipeline:

- Consumes every transaction from the `transactions` topic.
- Sends each transaction to the fraud detection agent using `a2a_message`.
- Routes transactions to different topics based on fraud score.
- Runs continuously, analyzing every transaction in real time.

## Next steps

- MCP Tool Patterns
- Integration Patterns Overview
- A2A Protocol
- Processors