This guide walks you through integrating the Agent-to-Agent (A2A) protocol with your Tyler agents, enabling multi-agent coordination and delegation across different platforms.

Prerequisites

Before starting, ensure you have:
  • Tyler installed and working
  • Basic understanding of Tyler agents
  • Python 3.13+ environment

Installation

Install the A2A SDK dependency:
pip install a2a-sdk

# For server functionality (optional):
pip install uvicorn fastapi

The A2A integration is optional. Tyler works without it; adding it enables multi-agent coordination and delegation across platforms.
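
Because the integration is optional, it can help to confirm which optional packages are importable before you enable A2A features. The following is a minimal sketch using only the standard library. The import names `a2a`, `uvicorn`, and `fastapi` are assumptions about what the pip packages above provide, and `has_module` and `missing_optional_modules` are hypothetical helpers, not part of Tyler.

```python
from importlib.util import find_spec


def has_module(name: str) -> bool:
    """Return True if a top-level module can be found in this environment."""
    try:
        return find_spec(name) is not None
    except (ImportError, ValueError):
        return False


# Assumed import names for the packages installed above
OPTIONAL_MODULES = {
    "a2a": "pip install a2a-sdk",
    "uvicorn": "pip install uvicorn",
    "fastapi": "pip install fastapi",
}


def missing_optional_modules() -> dict[str, str]:
    """Map each missing module to the pip command that would install it."""
    return {m: cmd for m, cmd in OPTIONAL_MODULES.items() if not has_module(m)}


if __name__ == "__main__":
    for module, command in missing_optional_modules().items():
        print(f"{module} not found; install with: {command}")
```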

Part 1: Connecting to Remote A2A Agents (Client Mode)

Step 1: Create an A2A Adapter

from tyler.a2a import A2AAdapter

# Create adapter for connecting to remote agents
adapter = A2AAdapter()

Step 2: Connect to Remote Agents

import asyncio

async def connect_to_agents():
    # Connect to a research specialist agent
    research_connected = await adapter.connect(
        name="research_agent",
        base_url="https://research-ai.example.com"
    )
    
    # Connect to an analysis specialist agent
    analysis_connected = await adapter.connect(
        name="analysis_agent", 
        base_url="https://analysis-ai.example.com"
    )
    
    if research_connected and analysis_connected:
        print("✅ Connected to both remote agents")
    else:
        print("❌ Failed to connect to one or more agents")

asyncio.run(connect_to_agents())
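
If you connect to several agents, you may prefer to connect concurrently and see exactly which ones failed. The sketch below assumes only what the step above shows: `adapter.connect(name=..., base_url=...)` is awaitable and returns a truthy value on success. Whether the real adapter tolerates concurrent `connect` calls is an assumption you should verify; `connect_all` and `FakeAdapter` are hypothetical names used so the example runs without a network.

```python
import asyncio


async def connect_all(adapter, agents: dict[str, str]) -> dict[str, bool]:
    """Connect to every agent concurrently and report success per agent name."""

    async def _connect(name: str, base_url: str) -> bool:
        try:
            return bool(await adapter.connect(name=name, base_url=base_url))
        except Exception as exc:  # treat any connection error as a failed connect
            print(f"{name}: connection error: {exc}")
            return False

    names = list(agents)
    results = await asyncio.gather(*(_connect(n, agents[n]) for n in names))
    return dict(zip(names, results))


class FakeAdapter:
    """Hypothetical stand-in for A2AAdapter so the sketch runs offline."""

    async def connect(self, name: str, base_url: str) -> bool:
        if "down" in base_url:
            raise ConnectionError("unreachable")
        return True


if __name__ == "__main__":
    status = asyncio.run(connect_all(FakeAdapter(), {
        "research_agent": "https://research-ai.example.com",
        "analysis_agent": "https://down.example.com",
    }))
    print(status)
```

Returning a per-agent map instead of a single boolean lets you decide whether to proceed with a partial set of agents.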

Step 3: Create Tyler Agent with Delegation Tools

from tyler import Agent

# Get delegation tools from connected agents
delegation_tools = adapter.get_tools_for_agent()

# Create Tyler agent that can delegate tasks
coordinator = Agent(
    name="Project Coordinator",
    model_name="gpt-4o",
    purpose="""You coordinate complex projects by delegating specialized tasks to remote agents.
    
    You have access to:
    - Research agent: For web research, fact-checking, and information gathering
    - Analysis agent: For data analysis, insights, and strategic recommendations
    
    Use these agents strategically to break down complex requests.""",
    tools=delegation_tools
)

Step 4: Use the Coordinating Agent

from tyler import Thread, Message

async def coordinate_project():
    # Create a complex request
    thread = Thread()
    thread.add_message(Message(
        role="user",
        content="""I need a comprehensive market analysis for electric vehicle charging stations.
        
        Please:
        1. Research current market size, key players, and growth trends
        2. Analyze competitive landscape and identify opportunities  
        3. Provide strategic recommendations for market entry
        """
    ))
    
    # The coordinator will automatically delegate to appropriate agents
    result = await coordinator.go(thread)
    
    # Print the coordinated response
    for message in result.thread.messages:
        if message.role == "assistant":
            print(f"Coordinator: {message.content}")

asyncio.run(coordinate_project())
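
The loop above prints every assistant message, including intermediate ones produced while delegating. If you only want the final answer, a small helper can pick the last assistant message. This sketch assumes only that messages expose `role` and `content`, as the loop above does; `Msg` is a hypothetical stand-in for Tyler's message type.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class Msg:
    """Hypothetical stand-in for a thread message with `role` and `content`."""
    role: str
    content: str


def last_assistant_reply(messages: Iterable) -> Optional[str]:
    """Return the content of the most recent non-empty assistant message."""
    reply = None
    for message in messages:
        if message.role == "assistant" and message.content:
            reply = message.content
    return reply
```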

Part 2: Exposing Tyler Agents via A2A (Server Mode)

Step 1: Create a Specialized Tyler Agent

from tyler import Agent
from lye import WEB_TOOLS, FILES_TOOLS

# Create a specialized research agent
research_agent = Agent(
    name="Research Specialist",
    model_name="gpt-4o-mini",
    purpose="""You are an expert research specialist with web search and document processing capabilities.
    
    Your expertise includes:
    - Comprehensive web research and fact-finding
    - Academic and market research
    - Document analysis and summarization
    - Competitive intelligence gathering
    
    Always provide well-sourced, accurate information.""",
    tools=[*WEB_TOOLS, *FILES_TOOLS]
)

Step 2: Create A2A Server

from tyler.a2a import A2AServer

# Create server to expose the agent
server = A2AServer(
    tyler_agent=research_agent,
    agent_card={
        "name": "Tyler Research Specialist",
        "version": "1.0.0",
        "description": "AI research specialist with web search and document processing",
        "capabilities": [
            "web_research",
            "fact_checking", 
            "document_analysis",
            "market_research",
            "competitive_intelligence"
        ],
        "contact": {
            "name": "Your Organization",
            "email": "ai-support@yourorg.com"
        },
        "vendor": "Tyler Framework"
    }
)
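
Before exposing an agent, you may want to sanity-check the agent card dictionary. The sketch below is a hypothetical pre-flight check, not part of Tyler or the A2A SDK. The set of required fields is an assumption based on the example card above, so adjust it to whatever your server actually requires.

```python
REQUIRED_CARD_FIELDS = ("name", "version", "description")  # assumed minimum


def validate_agent_card(card: dict) -> list[str]:
    """Return a list of problems found in an agent card dict (empty if none)."""
    problems = []
    for field in REQUIRED_CARD_FIELDS:
        value = card.get(field)
        if not isinstance(value, str) or not value.strip():
            problems.append(f"missing or empty field: {field}")

    capabilities = card.get("capabilities", [])
    if not isinstance(capabilities, list) or not all(
        isinstance(c, str) for c in capabilities
    ):
        problems.append("capabilities must be a list of strings")
    elif len(set(capabilities)) != len(capabilities):
        problems.append("capabilities contains duplicates")
    return problems
```

An empty list means the card passed these checks; it does not guarantee the card conforms to the full A2A specification.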

Step 3: Start the A2A Server

import asyncio

async def start_research_service():
    print("🚀 Starting Tyler Research Specialist A2A Server...")
    print("📡 Other agents can connect at: http://localhost:8000")
    
    # Start the server (this will run indefinitely)
    await server.start_server(host="0.0.0.0", port=8000)

# Run the server
if __name__ == "__main__":
    try:
        asyncio.run(start_research_service())
    except KeyboardInterrupt:
        print("\n🛑 Server stopped by user")
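
Once the server is running, a quick way to confirm it is accepting connections is a plain TCP check from another terminal. This is a minimal standard-library sketch; `port_is_open` is a hypothetical helper, and it only shows that something is listening on the port, not that the A2A endpoints respond correctly.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Port 8000 matches the start_server call above
    print("server reachable:", port_is_open("127.0.0.1", 8000))
```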

Part 3: Advanced Multi-Agent Patterns

Creating Agent Networks

import asyncio
from typing import Dict, Any

from tyler import Agent
from tyler.a2a import A2AAdapter

class AgentNetwork:
    """Manages a network of specialized A2A agents."""
    
    def __init__(self):
        self.adapter = A2AAdapter()
        self.agents: Dict[str, Any] = {}
    
    async def add_agent(self, name: str, base_url: str, specialization: str):
        """Add a specialized agent to the network."""
        connected = await self.adapter.connect(name, base_url)
        if connected:
            self.agents[name] = {
                "base_url": base_url,
                "specialization": specialization,
                "status": "connected"
            }
            print(f"✅ Added {name} ({specialization})")
        else:
            print(f"❌ Failed to connect to {name}")
    
    async def create_coordinator(self) -> Agent:
        """Create a coordinator agent for this network."""
        tools = self.adapter.get_tools_for_agent()
        
        # Build purpose description based on connected agents
        specializations = [agent["specialization"] for agent in self.agents.values()]
        purpose = f"""You coordinate complex tasks across a network of specialized agents.
        
        Available specialists: {', '.join(specializations)}
        
        Break down complex requests and delegate appropriately to leverage each agent's expertise."""
        
        return Agent(
            name="Network Coordinator",
            model_name="gpt-4o",
            purpose=purpose,
            tools=tools
        )
    
    async def health_check(self):
        """Check the health of all connected agents."""
        for name, info in self.agents.items():
            try:
                status = await self.adapter.get_agent_status(name)
                if status:
                    print(f"✅ {name}: healthy ({status.get('active_tasks', 0)} active tasks)")
                else:
                    print(f"⚠️ {name}: unreachable")
            except Exception as e:
                print(f"❌ {name}: error - {e}")

# Example usage
async def setup_enterprise_network():
    network = AgentNetwork()
    
    # Add specialized agents
    await network.add_agent("research", "https://research.corp.com", "Research & Intelligence")
    await network.add_agent("analysis", "https://analytics.corp.com", "Data Analysis")
    await network.add_agent("compliance", "https://compliance.corp.com", "Legal & Compliance")
    
    # Create coordinator
    coordinator = await network.create_coordinator()
    
    # Health check
    await network.health_check()
    
    return coordinator, network

Task Streaming and Monitoring

from tyler import Thread, Message
from tyler.models.execution import EventType

# Assumes `coordinator` is the agent created in Part 1, Step 3

async def stream_coordinated_task():
    """Example of streaming responses from coordinated agents."""
    
    thread = Thread()
    thread.add_message(Message(
        role="user",
        content="Create a comprehensive business plan for a new AI startup"
    ))
    
    print("🎯 Starting coordinated task execution...")
    print("=" * 50)
    
    async for update in coordinator.go(thread, stream=True):
        if update.type == EventType.LLM_STREAM_CHUNK:
            print(update.data.get("content_chunk", ""), end="", flush=True)
        elif update.type == EventType.TOOL_SELECTED:
            tool_name = update.data.get("tool_name", "")
            if "delegate_to_" in tool_name:
                agent_name = tool_name.replace("delegate_to_", "")
                print(f"\n\n🤝 Delegating to {agent_name}...")
            print()
        elif update.type == EventType.EXECUTION_COMPLETE:
            print("\n\n✅ Task coordination complete!")
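
The streaming example identifies delegation calls by looking for `delegate_to_` anywhere in the tool name. A stricter variant, sketched below, only matches names that start with that prefix and strips it once. The `delegate_to_<agent>` naming convention is inferred from the example above rather than from a documented guarantee, and `delegated_agent` is a hypothetical helper.

```python
from typing import Optional

DELEGATION_PREFIX = "delegate_to_"  # naming convention taken from the example above


def delegated_agent(tool_name: str) -> Optional[str]:
    """Return the target agent name for a delegation tool, or None otherwise."""
    if not tool_name.startswith(DELEGATION_PREFIX):
        return None
    agent_name = tool_name.removeprefix(DELEGATION_PREFIX)
    return agent_name or None
```

Anchoring on the prefix avoids misreading an unrelated tool whose name merely contains the same substring.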

Part 4: Production Considerations

Security and Authentication

# Secure A2A connections
async def secure_connection():
    adapter = A2AAdapter()
    
    # Connect with authentication
    await adapter.connect(
        name="secure_agent",
        base_url="https://secure-agents.company.com",
        headers={
            "Authorization": "Bearer your-secure-token",
            "X-API-Version": "1.0"
        }
    )
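
Rather than hard-coding a token in source, you can read it from the environment and fail early if it is missing. The sketch below builds the same headers as the example above; the variable name `A2A_AUTH_TOKEN` and the helper `build_auth_headers` are assumptions for illustration, not part of Tyler.

```python
import os


def build_auth_headers(env_var: str = "A2A_AUTH_TOKEN") -> dict[str, str]:
    """Build request headers from a token stored in an environment variable."""
    token = os.environ.get(env_var, "").strip()
    if not token:
        raise RuntimeError(f"Set {env_var} before connecting to secured agents")
    return {
        "Authorization": f"Bearer {token}",
        "X-API-Version": "1.0",
    }
```

You would then pass the result as `headers=build_auth_headers()` in the `connect` call shown above.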

Error Handling and Resilience

import asyncio
from typing import Dict, Optional

from tyler.a2a import A2AAdapter

class ResilientA2AAdapter:
    """A2A adapter with built-in resilience patterns."""
    
    def __init__(self):
        self.adapter = A2AAdapter()
        self.fallback_agents: Dict[str, str] = {}
    
    async def connect_with_fallback(self, primary_url: str, fallback_url: Optional[str] = None):
        """Connect with automatic fallback."""
        try:
            success = await self.adapter.connect("primary", primary_url)
            if success:
                return True
        except Exception as e:
            print(f"Primary connection failed: {e}")
        
        if fallback_url:
            try:
                return await self.adapter.connect("fallback", fallback_url)
            except Exception as e:
                print(f"Fallback connection failed: {e}")
        
        return False
    
    async def robust_delegation(self, task: str, max_retries: int = 3):
        """Delegate with retry logic."""
        for attempt in range(max_retries):
            try:
                # Attempt task delegation
                # Implementation would include actual delegation logic
                return await self._execute_task(task)
            except Exception as e:
                if attempt == max_retries - 1:
                    raise
                print(f"Attempt {attempt + 1} failed: {e}. Retrying...")
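                await asyncio.sleep(2 ** attempt)  # Exponential backoff
    
    async def _execute_task(self, task: str):
        """Placeholder: replace with the delegation call for your deployment."""
        raise NotImplementedError("Implement delegation logic here")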
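
The retry loop above can also be written as a standalone helper that wraps any awaitable operation. This is a minimal sketch of retry with exponential backoff, independent of Tyler; `retry_async` and its `base_delay` parameter are hypothetical names, and in practice you would likely narrow the caught exception types to transient errors.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> T:
    """Run `operation`, retrying on any exception with exponential backoff."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            return await operation()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            # Wait base_delay, then 2x, 4x, ... before the next attempt
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")
```

Passing a zero-argument callable (for example a `lambda` that calls your delegation function) ensures a fresh coroutine is created on each attempt.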

Monitoring and Metrics

import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

@dataclass
class TaskMetrics:
    task_id: str
    agent_name: str
    start_time: float
    end_time: Optional[float] = None
    status: str = "running"
    error: Optional[str] = None

class A2AMonitor:
    """Monitor A2A agent performance and health."""
    
    def __init__(self):
        self.metrics: List[TaskMetrics] = []
    
    def start_task(self, task_id: str, agent_name: str):
        """Record task start."""
        self.metrics.append(TaskMetrics(
            task_id=task_id,
            agent_name=agent_name,
            start_time=time.time()
        ))
    
    def complete_task(self, task_id: str, status: str = "completed", error: Optional[str] = None):
        """Record task completion."""
        for metric in self.metrics:
            if metric.task_id == task_id:
                metric.end_time = time.time()
                metric.status = status
                metric.error = error
                break
    
    def get_performance_report(self) -> Dict[str, Any]:
        """Generate performance report."""
        completed_tasks = [m for m in self.metrics if m.end_time]
        
        if not completed_tasks:
            return {"message": "No completed tasks"}
        
        avg_duration = sum(
            m.end_time - m.start_time for m in completed_tasks
        ) / len(completed_tasks)
        
        success_rate = sum(
            1 for m in completed_tasks if m.status == "completed"
        ) / len(completed_tasks)
        
        return {
            "total_tasks": len(completed_tasks),
            "avg_duration": avg_duration,
            "success_rate": success_rate,
            "agent_performance": self._agent_breakdown()
        }
    
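    def _agent_breakdown(self) -> Dict[str, Dict[str, Any]]:
        """Break down performance by agent."""
        agent_stats: Dict[str, Dict[str, Any]] = {}
        durations: Dict[str, List[float]] = {}
        for metric in self.metrics:
            if metric.agent_name not in agent_stats:
                agent_stats[metric.agent_name] = {
                    "total_tasks": 0,
                    "completed_tasks": 0,
                    "failed_tasks": 0,
                    "avg_duration": 0.0
                }
                durations[metric.agent_name] = []
            
            agent_stats[metric.agent_name]["total_tasks"] += 1
            
            if metric.end_time:
                durations[metric.agent_name].append(metric.end_time - metric.start_time)
                if metric.status == "completed":
                    agent_stats[metric.agent_name]["completed_tasks"] += 1
                else:
                    agent_stats[metric.agent_name]["failed_tasks"] += 1
        
        # Average duration over each agent's finished tasks
        for name, values in durations.items():
            if values:
                agent_stats[name]["avg_duration"] = sum(values) / len(values)
        
        return agent_stats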
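
Calling a start method and a completion method by hand is easy to get wrong when a delegation raises. One option is to wrap each delegation in a context manager that records duration and status automatically. The sketch below is self-contained and stores plain dictionaries; `timed` is a hypothetical helper, and wiring it into a monitor class like the one above is left as an adaptation.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(records: list, task_id: str, agent_name: str):
    """Record duration and outcome of the wrapped block into `records`."""
    record = {
        "task_id": task_id,
        "agent_name": agent_name,
        "status": "completed",
        "error": None,
    }
    start = time.perf_counter()
    try:
        yield record
    except Exception as exc:
        # Mark the task as failed, then let the caller see the error
        record["status"] = "failed"
        record["error"] = str(exc)
        raise
    finally:
        record["duration"] = time.perf_counter() - start
        records.append(record)
```

Usage would look like `with timed(records, "task-1", "research"): ...` around the delegation call, so failures are recorded even when the exception propagates.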

Part 5: Examples and Use Cases

Research Workflow Automation

async def automated_research_pipeline():
    """Example: Automated research pipeline using multiple A2A agents."""
    
    # Set up research network
    adapter = A2AAdapter()
    await adapter.connect("researcher", "https://research-ai.example.com")
    await adapter.connect("analyzer", "https://analysis-ai.example.com") 
    await adapter.connect("writer", "https://writing-ai.example.com")
    
    # Create pipeline coordinator
    pipeline = Agent(
        name="Research Pipeline Coordinator",
        tools=adapter.get_tools_for_agent(),
        purpose="""Coordinate automated research workflows:
        1. Delegate initial research to specialist agents
        2. Have findings analyzed for insights
        3. Generate final reports with recommendations"""
    )
    
    # Execute research pipeline
    topics = ["AI market trends", "Blockchain adoption", "Green energy investments"]
    
    for topic in topics:
        thread = Thread()
        thread.add_message(Message(
            role="user",
            content=f"Execute full research pipeline for: {topic}"
        ))
        
        result = await pipeline.go(thread)
        print(f"✅ Completed research pipeline for: {topic}")
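
The pipeline above processes topics one at a time. If the topics are independent, you could run them concurrently while capping how many are in flight. The sketch below shows bounded concurrency with `asyncio.Semaphore`. `run_bounded` and `fake_pipeline` are hypothetical; the latter stands in for a call such as `pipeline.go(thread)`, and whether a single Tyler agent can safely handle concurrent threads is an assumption to verify first.

```python
import asyncio
from typing import Awaitable, Callable


async def run_bounded(
    topics: list[str],
    worker: Callable[[str], Awaitable[str]],
    max_concurrent: int = 2,
) -> dict[str, str]:
    """Run `worker` for each topic, with at most `max_concurrent` in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def _run(topic: str) -> str:
        async with semaphore:
            return await worker(topic)

    results = await asyncio.gather(*(_run(t) for t in topics))
    return dict(zip(topics, results))


async def fake_pipeline(topic: str) -> str:
    """Hypothetical stand-in for running the research pipeline on one topic."""
    await asyncio.sleep(0)
    return f"report on {topic}"


if __name__ == "__main__":
    reports = asyncio.run(run_bounded(
        ["AI market trends", "Blockchain adoption", "Green energy investments"],
        fake_pipeline,
    ))
    print(reports)
```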

Customer Support Agent Network

async def customer_support_network():
    """Example: Customer support using specialized A2A agents."""
    
    adapter = A2AAdapter()
    
    # Connect to specialized support agents
    await adapter.connect("technical_support", "https://tech-support.company.com")
    await adapter.connect("billing_support", "https://billing-support.company.com")
    await adapter.connect("product_expert", "https://product-help.company.com")
    
    # Create support coordinator
    support_coordinator = Agent(
        name="Customer Support Coordinator",
        tools=adapter.get_tools_for_agent(),
        purpose="""Route customer inquiries to specialized support agents:
        - Technical issues → technical_support agent
        - Billing questions → billing_support agent  
        - Product features → product_expert agent
        
        Always ensure customer gets expert help for their specific need."""
    )
    
    return support_coordinator

Troubleshooting

Common Issues

Best Practices Summary

A2A Integration Best Practices

Connection Management
  • Always handle connection failures gracefully
  • Implement health checks for connected agents
  • Use connection pooling for high-throughput scenarios

Task Delegation
  • Be specific in task descriptions for remote agents
  • Monitor task progress and handle timeouts
  • Implement fallback strategies for critical workflows

Security
  • Always use HTTPS in production
  • Implement proper authentication and authorization
  • Validate agent cards and capabilities

Performance
  • Monitor delegation latency and success rates
  • Use streaming for long-running tasks
  • Implement caching where appropriate

Error Handling
  • Plan for network failures and agent unavailability
  • Log delegation attempts and outcomes
  • Provide meaningful error messages to users
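
As one illustration of the timeout practice listed above, you can wrap a delegation in `asyncio.wait_for` so a slow remote agent cannot stall the whole workflow. This is a minimal sketch; `with_timeout` is a hypothetical helper, and returning `None` on timeout is a design choice that only works if `None` is not a valid result for your tasks.

```python
import asyncio
from typing import Awaitable, Optional, TypeVar

T = TypeVar("T")


async def with_timeout(task: Awaitable[T], seconds: float) -> Optional[T]:
    """Await `task`, returning None if it does not finish within `seconds`."""
    try:
        return await asyncio.wait_for(task, timeout=seconds)
    except asyncio.TimeoutError:
        # wait_for cancels the underlying task before raising
        return None
```

A caller could then fall back to another agent or report a clear error when the result is `None`.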

Next steps

Now that you have A2A integration working, explore these advanced topics: