Agent Patterns
Tool selection pattern
Dynamically select tools based on the task:
from typing import List, Dict
from lye import WEB_TOOLS, FILES_TOOLS, IMAGE_TOOLS, AUDIO_TOOLS
class AdaptiveAgent:
    """Build agents whose tool list is chosen from keywords found in the task text."""

    def __init__(self):
        # Category label -> tool collection imported from lye.
        self.tool_sets = dict(
            research=WEB_TOOLS,
            files=FILES_TOOLS,
            visual=IMAGE_TOOLS,
            audio=AUDIO_TOOLS,
        )

    async def create_agent_for_task(self, task: str) -> Agent:
        """Return an Agent equipped with every tool set whose keywords occur in ``task``.

        Matching is a case-insensitive substring test, so a keyword also
        matches when it appears inside a longer word.
        """
        lowered = task.lower()

        # Checked top to bottom so the resulting tool list keeps a fixed order.
        keyword_table = (
            ("research", ("search", "research", "find")),
            ("files", ("save", "write", "read", "file")),
            ("visual", ("image", "picture", "visual", "analyze")),
            ("audio", ("audio", "speech", "transcribe")),
        )

        selected = []
        for category, keywords in keyword_table:
            for keyword in keywords:
                if keyword in lowered:
                    selected.extend(self.tool_sets[category])
                    break  # one hit is enough for this category

        return Agent(
            name="adaptive-agent",
            model_name="gpt-4",
            purpose=f"To help with: {task}",
            tools=selected,
        )
# Usage
# The task text below contains "research" and "visual", which are keywords
# for the "research" and "visual" tool sets in create_agent_for_task.
adapter = AdaptiveAgent()
# NOTE(review): `await` at module level needs an async context (a coroutine
# or an async-aware REPL) — confirm how this snippet is meant to be run.
agent = await adapter.create_agent_for_task("Research AI and create a visual report")
Validation pattern
Ensure agent outputs meet requirements:
from typing import Callable, Any
import json
class ValidatedAgent:
    """Wrap an agent and check its assistant output against named validators.

    ``validators`` maps a label to a callable that receives the message
    content and returns a truthy value when the content is acceptable.
    """

    def __init__(self, agent: Agent, validators: Dict[str, Callable]):
        self.agent = agent
        self.validators = validators

    async def go_with_validation(self, thread: Thread) -> tuple[Thread, List[Message]]:
        """Run the agent and retry once if an assistant message fails validation.

        Returns:
            ``(thread, new_messages)`` from the first run when every assistant
            message validates, otherwise from the single retry. The retry's
            output is not validated again.
        """
        result = await self.agent.go(thread)
        result_thread = result.thread
        messages = result.new_messages

        for message in messages:
            if message.role != "assistant":
                continue
            validation_errors = self.validate_content(message.content)
            if not validation_errors:
                continue

            # Put the correction request on the thread returned by the run,
            # i.e. the one that carries the rejected answer.
            result_thread.add_message(Message(
                role="system",
                content=f"Please correct these issues: {', '.join(validation_errors)}"
            ))
            retry = await self.agent.go(result_thread)
            # Unpack so both code paths return the same (thread, messages) shape.
            return retry.thread, retry.new_messages

        return result_thread, messages

    def validate_content(self, content: str) -> List[str]:
        """Run every validator on ``content`` and return the failure descriptions.

        A validator that returns a falsy value or raises contributes one
        entry; an empty list means the content passed every check.
        """
        errors = []
        for name, validator in self.validators.items():
            try:
                passed = validator(content)
            except Exception as e:
                # A crashing validator is reported as a failure instead of
                # aborting the whole validation pass.
                errors.append(f"Validation error in {name}: {str(e)}")
            else:
                if not passed:
                    errors.append(f"Failed {name} validation")
        return errors
# Example validators
def has_json_output(content: str) -> bool:
    """Return True when the outermost ``{...}`` span of ``content`` parses as JSON.

    The span runs from the first ``{`` to the last ``}``. Non-string input,
    a missing or inverted brace pair, and unparsable text all yield False.
    """
    if not isinstance(content, str):
        return False

    start = content.find('{')
    end = content.rfind('}') + 1
    if start == -1 or end <= start:
        return False

    try:
        json.loads(content[start:end])
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass; other errors propagate.
        return False
    return True
def meets_length_requirement(content: str, min_length: int = 100) -> bool:
    """Report whether ``content`` has at least ``min_length`` characters."""
    too_short = len(content) < min_length
    return not too_short
# Usage
# Wrap a reporting agent so that its answers are checked by two validators:
# one for embedded JSON and one for a minimum length of 200 characters.
agent = Agent(name="reporter", model_name="gpt-4", purpose="To create reports")
validated_agent = ValidatedAgent(
agent,
validators={
"json_output": has_json_output,
# Validators are called with the content only, so the lambda pins min_length.
"length": lambda c: meets_length_requirement(c, 200)
}
)
Persistence Patterns
Context window management
Manage long conversations efficiently:
from narrator import Thread, Message, ThreadStore
class ContextManager:
    """Keep long threads small by summarizing older messages.

    Once a thread holds more than ``summary_threshold`` messages, everything
    except the newest ``max_messages`` is replaced by one summary message.
    """

    # Maximum number of characters of each message shown to the summarizer.
    _PREVIEW_CHARS = 200

    def __init__(self, max_messages: int = 50, summary_threshold: int = 100):
        self.max_messages = max_messages
        self.summary_threshold = summary_threshold

    async def manage_context(self, thread: Thread, agent: Agent) -> Thread:
        """Return ``thread`` itself, or a condensed copy when it is too long.

        The condensed copy reuses the original id and metadata, starts with a
        system message holding the summary, and then repeats the most recent
        ``max_messages`` messages. The input thread is returned unchanged
        when it is within the threshold or there is nothing older to summarize.
        """
        messages = thread.messages
        if len(messages) <= self.summary_threshold:
            return thread

        # An explicit split index avoids the negative-slice trap:
        # with max_messages == 0, messages[:-0] is empty and messages[-0:] is everything.
        keep = max(self.max_messages, 0)
        split = max(len(messages) - keep, 0)
        older, recent = messages[:split], messages[split:]
        if not older:
            return thread

        summary = await self.summarize_messages(older, agent)

        new_thread = Thread(id=thread.id, metadata=thread.metadata)
        new_thread.add_message(Message(
            role="system",
            content=f"Previous conversation summary: {summary}"
        ))
        for msg in recent:
            new_thread.add_message(msg)
        return new_thread

    async def summarize_messages(self, messages: List[Message], agent: Agent) -> str:
        """Ask ``agent`` for a concise summary of ``messages``.

        Each message is shown as ``role: content`` with the content cut to
        200 characters. Returns the content of the agent's last new message,
        or ``"No summary available"`` when the agent produced none.
        """
        conversation = "\n".join(
            f"{msg.role}: {self._preview(msg.content)}" for msg in messages
        )

        summary_thread = Thread()
        summary_thread.add_message(Message(
            role="user",
            content=f"Summarize this conversation concisely:\n\n{conversation}"
        ))

        result = await agent.go(summary_thread)
        return result.new_messages[-1].content if result.new_messages else "No summary available"

    @classmethod
    def _preview(cls, content) -> str:
        """Cut ``content`` to the preview length, marking only real truncation with an ellipsis."""
        text = content or ""
        if len(text) <= cls._PREVIEW_CHARS:
            return text
        return f"{text[:cls._PREVIEW_CHARS]}..."
Branching conversations
Support multiple conversation branches:
class ConversationTree:
    """Create and merge conversation branches stored in a thread store."""

    def __init__(self, thread_store: ThreadStore):
        self.thread_store = thread_store
        self.branches = {}  # branch_id -> parent_thread_id

    async def create_branch(self, parent_thread_id: str, branch_point: int) -> Thread:
        """Create, save and return a branch holding the parent's messages up to ``branch_point``.

        ``branch_point`` is the index of the last message to copy (inclusive);
        a negative value copies nothing.

        Raises:
            ValueError: if the store returns ``None`` for the parent id.
        """
        parent_thread = await self.thread_store.get_thread(parent_thread_id)
        if parent_thread is None:
            raise ValueError(f"Parent thread not found: {parent_thread_id}")

        branch_id = f"{parent_thread_id}-branch-{len(self.branches)}"
        branch_thread = Thread(id=branch_id)

        # Clamp at zero so a negative branch_point cannot turn into a
        # from-the-end slice.
        keep = max(branch_point + 1, 0)
        for msg in parent_thread.messages[:keep]:
            branch_thread.add_message(msg)

        await self.thread_store.save_thread(branch_thread)
        self.branches[branch_id] = parent_thread_id
        return branch_thread

    async def merge_branch(self, branch_id: str, target_thread_id: str):
        """Append the branch's messages after the shared prefix to the target and save it.

        Raises:
            ValueError: if the store returns ``None`` for either id.
        """
        branch_thread = await self.thread_store.get_thread(branch_id)
        target_thread = await self.thread_store.get_thread(target_thread_id)
        if branch_thread is None:
            raise ValueError(f"Branch thread not found: {branch_id}")
        if target_thread is None:
            raise ValueError(f"Target thread not found: {target_thread_id}")

        divergence_point = self.find_divergence_point(branch_thread, target_thread)

        # Slice first: the target's list grows while messages are appended.
        for msg in list(branch_thread.messages[divergence_point + 1:]):
            target_thread.add_message(msg)

        await self.thread_store.save_thread(target_thread)

    def find_divergence_point(self, branch_thread: Thread, target_thread: Thread) -> int:
        """Return the index of the last message both threads share, or -1 if none.

        Messages are compared position by position on ``role`` and
        ``content``, so threads reloaded from the store still match.
        """
        last_common = -1
        for index, (branch_msg, target_msg) in enumerate(
            zip(branch_thread.messages, target_thread.messages)
        ):
            same = (
                branch_msg.role == target_msg.role
                and branch_msg.content == target_msg.content
            )
            if not same:
                break
            last_common = index
        return last_common
Tool patterns
Retry with Fallback
Implement robust tool execution:
from typing import List, Callable, Any
import asyncio
class RobustToolExecutor:
    """Run async tools with retries, exponential backoff and per-tool fallbacks."""

    def __init__(self, max_retries: int = 3, fallback_tools: Dict[str, List[Callable]] = None):
        self.max_retries = max_retries
        # Tool name -> fallback callables, tried in list order.
        self.fallback_tools = fallback_tools or {}

    async def execute_with_fallback(
        self,
        primary_tool: Callable,
        tool_name: str,
        *args,
        **kwargs
    ) -> Any:
        """Await ``primary_tool`` with retries, then the fallbacks registered for ``tool_name``.

        The primary tool is attempted ``max_retries`` times (at least once),
        sleeping ``2 ** attempt`` seconds between attempts. If every attempt
        fails, each fallback is awaited once with the same arguments and the
        first successful result is returned.

        Raises:
            Exception: the primary tool's last error, when the primary tool
                and every fallback have failed.
        """
        # Always try at least once; a non-positive max_retries would
        # otherwise skip the tool entirely and return None.
        attempts = max(self.max_retries, 1)
        last_error = None

        for attempt in range(attempts):
            try:
                return await primary_tool(*args, **kwargs)
            except Exception as e:
                last_error = e
            if attempt < attempts - 1:
                # Exponential backoff, skipped after the final attempt.
                await asyncio.sleep(2 ** attempt)

        for fallback in self.fallback_tools.get(tool_name, ()):
            try:
                return await fallback(*args, **kwargs)
            except Exception:
                # Best effort: move on to the next fallback. Catching
                # Exception (not everything) lets task cancellation through.
                continue

        raise last_error
# Example usage
from lye.web import search, fetch
async def search_duckduckgo(query: str) -> str:
    """Fallback search using DuckDuckGo (placeholder: nothing is searched yet)."""
    # Implementation goes here; until then the coroutine yields None.
    return None
# Fallbacks are keyed by tool name and tried in list order.
executor = RobustToolExecutor(
    fallback_tools={
        "web-search": [search_duckduckgo],
        # Wayback Machine snapshots are addressed as web.archive.org/web/<url>.
        "web-fetch": [lambda url: fetch(f"https://web.archive.org/web/{url}")],
    }
)
# Execute with automatic fallback
# "web-search" is the key used to look up fallbacks; "AI news" is passed
# through to the tool as its positional argument.
# NOTE(review): `await` at module level needs an async context — confirm how
# this snippet is meant to be run.
result = await executor.execute_with_fallback(search, "web-search", "AI news")
Tool composition
Create complex tools from simple ones:
class CompositeTools:
    """Factories that combine simple tool functions into larger tools."""

    @staticmethod
    def create_research_tool(search_fn, fetch_fn, write_fn):
        """Create a composite research tool.

        Args:
            search_fn: awaited with the topic; its result is passed to ``extract_urls``.
            fetch_fn: awaited with one URL; returns that page's content.
            write_fn: awaited with ``(output_file, report)`` to store the report.

        Returns:
            A dict with a ``"definition"`` (name, description and JSON-schema
            style parameters) and the ``"implementation"`` coroutine function.
        """

        async def research_and_save(topic: str, output_file: str) -> str:
            """Search ``topic``, fetch up to five sources and write a report."""
            search_results = await search_fn(topic)

            # NOTE(review): extract_urls is not defined in this snippet —
            # it must be provided by the surrounding module.
            urls = extract_urls(search_results)[:5]

            contents = []
            for url in urls:
                try:
                    content = await fetch_fn(url)
                except Exception:
                    # Best effort: skip sources that fail to load. Catching
                    # Exception (not everything) lets task cancellation through.
                    continue
                contents.append({"url": url, "content": content})

            sections = [
                f"## Source: {item['url']}\n{item['content'][:500]}...\n\n"
                for item in contents
            ]
            report = f"# Research Report: {topic}\n\n" + "".join(sections)

            await write_fn(output_file, report)
            return f"Research report saved to {output_file}"

        return {
            "definition": {
                "name": "research_and_save",
                "description": "Research a topic and save findings to a file",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "topic": {"type": "string"},
                        "output_file": {"type": "string"}
                    },
                    "required": ["topic", "output_file"]
                }
            },
            "implementation": research_and_save
        }
Streaming Patterns
Buffered Streaming
Optimize streaming for better UX:
from tyler.models.execution import EventType
class StreamBuffer:
    """Group streamed content chunks into larger pieces before yielding them."""

    def __init__(self, buffer_size: int = 10, flush_interval: float = 0.5):
        self.buffer = []
        self.buffer_size = buffer_size
        self.flush_interval = flush_interval
        # The real timestamp is taken when a stream starts, so constructing
        # the buffer does not require an event loop.
        self.last_flush = 0.0

    async def process_stream(self, agent: Agent, thread: Thread):
        """Yield buffered text from ``agent.go(thread, stream=True)``.

        Chunks are joined and yielded as one string once ``buffer_size``
        chunks have accumulated or more than ``flush_interval`` seconds have
        passed since the last flush. On an EXECUTION_COMPLETE event any
        pending text is yielded, followed by
        ``{"type": "complete", "thread": update.data}``. Text still pending
        when the stream ends is yielded as well.
        """
        clock = asyncio.get_running_loop().time
        self.last_flush = clock()

        async for update in agent.go(thread, stream=True):
            if update.type == EventType.LLM_STREAM_CHUNK:
                self.buffer.append(update.data.get("content_chunk", ""))

                now = clock()
                buffer_full = len(self.buffer) >= self.buffer_size
                timed_out = now - self.last_flush > self.flush_interval
                if buffer_full or timed_out:
                    yield self._drain()
                    self.last_flush = now
            elif update.type == EventType.EXECUTION_COMPLETE:
                if self.buffer:
                    yield self._drain()
                yield {"type": "complete", "thread": update.data}

        # The stream may end without a completion event; do not lose the tail.
        if self.buffer:
            yield self._drain()

    def _drain(self) -> str:
        """Return the buffered text and reset the buffer."""
        text = "".join(self.buffer)
        self.buffer = []
        return text
Progress Tracking
Track progress for long-running operations:
class ProgressTracker:
    """Track weighted progress across sequential stages and notify async listeners."""

    def __init__(self):
        self.stages = []        # dicts with "name", "weight" and "progress"
        self.current_stage = 0  # index into self.stages
        self.listeners = []     # awaited with a progress dict on every update

    def add_stage(self, name: str, weight: float = 1.0):
        """Append a stage with the given relative ``weight`` and zero progress."""
        self.stages.append({"name": name, "weight": weight, "progress": 0})

    def add_listener(self, callback: Callable):
        """Register a callback; it is awaited with a dict on each update."""
        self.listeners.append(callback)

    async def update_progress(self, stage_progress: float):
        """Record progress for the current stage and notify every listener.

        Listeners receive ``stage`` (stage name), ``stage_progress`` and the
        weight-averaged ``total_progress``. After the last stage has been
        completed the final stage is reported with its stored progress.
        Nothing happens when no stages exist.
        """
        if not self.stages:
            return

        finished = self.current_stage >= len(self.stages)
        # Once every stage is done, keep reporting the final stage instead of
        # indexing past the end of the list.
        index = len(self.stages) - 1 if finished else self.current_stage
        stage = self.stages[index]
        if not finished:
            stage["progress"] = stage_progress

        total_weight = sum(s["weight"] for s in self.stages)
        weighted = sum(s["weight"] * s["progress"] for s in self.stages)
        # All-zero weights would divide by zero; report no progress instead.
        total_progress = weighted / total_weight if total_weight else 0.0

        for listener in self.listeners:
            await listener({
                "stage": stage["name"],
                "stage_progress": stage["progress"],
                "total_progress": total_progress
            })

    async def next_stage(self):
        """Mark the current stage complete, advance, and notify listeners."""
        if self.current_stage < len(self.stages):
            self.stages[self.current_stage]["progress"] = 1.0
            self.current_stage += 1
            await self.update_progress(0)
# Usage with agent
# Weights are relative: total progress is the weight-averaged stage progress,
# so "Analysis" (3 of 6) counts for half of the total.
tracker = ProgressTracker()
tracker.add_stage("Research", weight=2)
tracker.add_stage("Analysis", weight=3)
tracker.add_stage("Report Generation", weight=1)
async def progress_callback(update):
    """Print the stage name followed by the overall progress as a whole percentage."""
    stage_name = update['stage']
    overall = update['total_progress']
    print(f"{stage_name}: {overall:.0%}")
# The tracker awaits its listeners, so the callback must be a coroutine function.
tracker.add_listener(progress_callback)
Error handling patterns
Graceful Degradation
Handle errors without failing completely:
class GracefulAgent:
    """Wrap an agent so failures are routed to registered error handlers."""

    def __init__(self, agent: Agent):
        self.agent = agent
        self.error_handlers = {}  # exception class -> async handler(thread, error)

    def register_error_handler(self, error_type: type, handler: Callable):
        """Register ``handler`` for ``error_type``; a later registration for the same type replaces it."""
        self.error_handlers[error_type] = handler

    async def go_with_graceful_degradation(self, thread: Thread):
        """Run the agent; on failure return the matching handler's result.

        The handler registered for the most specific exception class wins,
        regardless of registration order. When none matches,
        ``default_error_handler`` is used.
        """
        try:
            return await self.agent.go(thread)
        except Exception as e:
            handler = self._find_handler(e)
            if handler is not None:
                return await handler(thread, e)
            return await self.default_error_handler(thread, e)

    def _find_handler(self, error: Exception):
        """Return the most specific registered handler for ``error``, or None."""
        # Walking the MRO visits the error's own class first, then its bases,
        # so a broad handler cannot shadow a narrower one.
        for klass in type(error).__mro__:
            handler = self.error_handlers.get(klass)
            if handler is not None:
                return handler
        # Fallback for types that match isinstance() without being in the MRO.
        for error_type, handler in self.error_handlers.items():
            if isinstance(error, error_type):
                return handler
        return None

    async def default_error_handler(self, thread: Thread, error: Exception):
        """Return a copy of ``thread`` with an apology message, plus an empty message list.

        The copy reuses the thread id and repeats every message before
        appending an assistant message that quotes the error.
        """
        error_thread = Thread(id=thread.id)
        for msg in thread.messages:
            error_thread.add_message(msg)

        error_thread.add_message(Message(
            role="assistant",
            content=f"I encountered an error: {str(error)}. Let me try a different approach."
        ))
        return error_thread, []
# Usage
# Wrap a plain agent; with no handlers registered yet, every failure would
# be routed to GracefulAgent.default_error_handler.
agent = Agent(name="worker", model_name="gpt-4", purpose="To complete tasks")
graceful = GracefulAgent(agent)
# Register specific handlers
async def handle_tool_error(thread, error):
    """Re-run ``thread`` on a fresh agent that is created without any tools."""
    # No tools argument is passed, so this agent has none to call.
    fallback_settings = dict(
        name="worker",
        model_name="gpt-4",
        purpose="To complete tasks without tools",
    )
    no_tool_agent = Agent(**fallback_settings)
    retry_result = await no_tool_agent.go(thread)
    return retry_result
# NOTE(review): ToolExecutionError is not imported anywhere in this snippet —
# confirm which module provides it.
graceful.register_error_handler(ToolExecutionError, handle_tool_error)
Production Patterns
Health Monitoring
Monitor agent health in production:
from datetime import datetime, timedelta
import statistics
class AgentHealthMonitor:
    """Collect success/error counts and response times for agent runs."""

    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.response_times = []  # seconds, newest last, at most window_size entries
        self.error_count = 0
        self.success_count = 0
        self.last_health_check = datetime.now()

    async def monitor_execution(self, agent: Agent, thread: Thread):
        """Run ``agent.go(thread)`` while recording the outcome and its duration.

        Successful runs add their response time to the sliding window;
        failures only increment ``error_count`` and are re-raised unchanged.
        """
        start_time = datetime.now()
        try:
            result = await agent.go(thread)
        except Exception:
            self.error_count += 1
            raise  # bare raise keeps the original traceback

        self.success_count += 1
        response_time = (datetime.now() - start_time).total_seconds()
        self.response_times.append(response_time)
        # Keep only the newest window_size samples.
        if len(self.response_times) > self.window_size:
            self.response_times.pop(0)
        return result

    def get_health_metrics(self) -> Dict[str, Any]:
        """Return status, success rate, counters and response-time statistics.

        ``status`` is ``"unhealthy"`` below an 80% success rate,
        ``"degraded"`` below 95% and ``"healthy"`` otherwise. A monitor that
        has not recorded any run yet reports ``"healthy"`` with a success
        rate of 0. Timing keys are present only once a response time exists.
        """
        total = self.success_count + self.error_count
        success_rate = self.success_count / total if total > 0 else 0

        metrics = {
            "status": "healthy",
            "success_rate": success_rate,
            "error_count": self.error_count,
            "success_count": self.success_count
        }

        if self.response_times:
            if len(self.response_times) >= 2:
                p95 = statistics.quantiles(self.response_times, n=20)[18]
                p99 = statistics.quantiles(self.response_times, n=100)[98]
            else:
                # quantiles() rejects a single sample before Python 3.13;
                # the only sample is every percentile.
                p95 = p99 = self.response_times[0]
            metrics.update({
                "avg_response_time": statistics.mean(self.response_times),
                "p95_response_time": p95,
                "p99_response_time": p99
            })

        # Without any recorded run there is no evidence of a problem.
        if total > 0:
            if success_rate < 0.8:
                metrics["status"] = "unhealthy"
            elif success_rate < 0.95:
                metrics["status"] = "degraded"

        return metrics