This guide covers advanced patterns for building production-ready AI applications with Slide. These patterns have been tested in real-world scenarios and represent best practices from the community.

Agent Patterns

Tool selection pattern

Dynamically select tools based on the task:
from typing import List, Dict
from lye import WEB_TOOLS, FILES_TOOLS, IMAGE_TOOLS, AUDIO_TOOLS

class AdaptiveAgent:
    def __init__(self):
        self.tool_sets = {
            "research": WEB_TOOLS,
            "files": FILES_TOOLS,
            "visual": IMAGE_TOOLS,
            "audio": AUDIO_TOOLS
        }
    
    async def create_agent_for_task(self, task: str) -> Agent:
        """Create an agent with appropriate tools for the task"""
        
        # Analyze task to determine needed tools
        tools_needed = []
        
        task_lower = task.lower()
        if any(word in task_lower for word in ["search", "research", "find"]):
            tools_needed.extend(self.tool_sets["research"])
        
        if any(word in task_lower for word in ["save", "write", "read", "file"]):
            tools_needed.extend(self.tool_sets["files"])
        
        if any(word in task_lower for word in ["image", "picture", "visual", "analyze"]):
            tools_needed.extend(self.tool_sets["visual"])
        
        if any(word in task_lower for word in ["audio", "speech", "transcribe"]):
            tools_needed.extend(self.tool_sets["audio"])
        
        # Create agent with selected tools
        return Agent(
            name="adaptive-agent",
            model_name="gpt-4",
            purpose=f"To help with: {task}",
            tools=tools_needed
        )

# Usage
adapter = AdaptiveAgent()
agent = await adapter.create_agent_for_task("Research AI and create a visual report")

Validation pattern

Ensure agent outputs meet requirements:
from typing import Callable, Any
import json

class ValidatedAgent:
    def __init__(self, agent: Agent, validators: Dict[str, Callable]):
        self.agent = agent
        self.validators = validators
    
    async def go_with_validation(self, thread: Thread) -> tuple[Thread, List[Message]]:
        """Process thread with output validation"""
        
        # Process normally
        result = await self.agent.go(thread)
        result_thread = result.thread
        messages = result.new_messages
        
        # Validate outputs
        for message in messages:
            if message.role == "assistant":
                validation_errors = self.validate_content(message.content)
                
                if validation_errors:
                    # Add correction request
                    thread.add_message(Message(
                        role="system",
                        content=f"Please correct these issues: {', '.join(validation_errors)}"
                    ))
                    
                    # Retry
                    return await self.agent.go(thread)
        
        return result_thread, messages
    
    def validate_content(self, content: str) -> List[str]:
        """Run validators on content"""
        errors = []
        
        for name, validator in self.validators.items():
            try:
                if not validator(content):
                    errors.append(f"Failed {name} validation")
            except Exception as e:
                errors.append(f"Validation error in {name}: {str(e)}")
        
        return errors

# Example validators
def has_json_output(content: str) -> bool:
    """Check if response contains valid JSON"""
    try:
        # Find JSON in content
        start = content.find('{')
        end = content.rfind('}') + 1
        if start != -1 and end > start:
            json.loads(content[start:end])
            return True
    except:
        pass
    return False

def meets_length_requirement(content: str, min_length: int = 100) -> bool:
    """Check if response meets minimum length"""
    return len(content) >= min_length

# Usage
agent = Agent(name="reporter", model_name="gpt-4", purpose="To create reports")
validated_agent = ValidatedAgent(
    agent,
    validators={
        "json_output": has_json_output,
        "length": lambda c: meets_length_requirement(c, 200)
    }
)

Persistence Patterns

Context window management

Manage long conversations efficiently:
from narrator import Thread, Message, ThreadStore

class ContextManager:
    def __init__(self, max_messages: int = 50, summary_threshold: int = 100):
        self.max_messages = max_messages
        self.summary_threshold = summary_threshold
    
    async def manage_context(self, thread: Thread, agent: Agent) -> Thread:
        """Manage context window for long conversations"""
        
        if len(thread.messages) > self.summary_threshold:
            # Create summary of older messages
            summary = await self.summarize_messages(
                thread.messages[:-self.max_messages],
                agent
            )
            
            # Create new thread with summary
            new_thread = Thread(id=thread.id, metadata=thread.metadata)
            
            # Add summary as system message
            new_thread.add_message(Message(
                role="system",
                content=f"Previous conversation summary: {summary}"
            ))
            
            # Add recent messages
            for msg in thread.messages[-self.max_messages:]:
                new_thread.add_message(msg)
            
            return new_thread
        
        return thread
    
    async def summarize_messages(self, messages: List[Message], agent: Agent) -> str:
        """Create summary of messages"""
        
        # Format messages for summarization
        conversation = "\n".join([
            f"{msg.role}: {msg.content[:200]}..."
            for msg in messages
        ])
        
        # Create summary request
        summary_thread = Thread()
        summary_thread.add_message(Message(
            role="user",
            content=f"Summarize this conversation concisely:\n\n{conversation}"
        ))
        
        # Get summary
        result = await agent.go(summary_thread)
        return result.new_messages[-1].content if result.new_messages else "No summary available"

Branching conversations

Support multiple conversation branches:
class ConversationTree:
    def __init__(self, thread_store: ThreadStore):
        self.thread_store = thread_store
        self.branches = {}  # branch_id -> parent_thread_id
    
    async def create_branch(self, parent_thread_id: str, branch_point: int) -> Thread:
        """Create a new branch from a conversation"""
        
        # Load parent thread
        parent_thread = await self.thread_store.get_thread(parent_thread_id)
        
        # Create new thread with messages up to branch point
        branch_id = f"{parent_thread_id}-branch-{len(self.branches)}"
        branch_thread = Thread(id=branch_id)
        
        # Copy messages up to branch point
        for i, msg in enumerate(parent_thread.messages):
            if i <= branch_point:
                branch_thread.add_message(msg)
        
        # Save branch
        await self.thread_store.save_thread(branch_thread)
        self.branches[branch_id] = parent_thread_id
        
        return branch_thread
    
    async def merge_branch(self, branch_id: str, target_thread_id: str):
        """Merge a branch back into target thread"""
        
        branch_thread = await self.thread_store.get_thread(branch_id)
        target_thread = await self.thread_store.get_thread(target_thread_id)
        
        # Find divergence point
        divergence_point = self.find_divergence_point(branch_thread, target_thread)
        
        # Add new messages from branch
        for msg in branch_thread.messages[divergence_point + 1:]:
            target_thread.add_message(msg)
        
        # Save merged thread
        await self.thread_store.save_thread(target_thread)

Tool patterns

Retry with Fallback

Implement robust tool execution:
from typing import List, Callable, Any
import asyncio

class RobustToolExecutor:
    def __init__(self, max_retries: int = 3, fallback_tools: Dict[str, List[Callable]] = None):
        self.max_retries = max_retries
        self.fallback_tools = fallback_tools or {}
    
    async def execute_with_fallback(
        self, 
        primary_tool: Callable,
        tool_name: str,
        *args, 
        **kwargs
    ) -> Any:
        """Execute tool with retry and fallback logic"""
        
        # Try primary tool
        for attempt in range(self.max_retries):
            try:
                result = await primary_tool(*args, **kwargs)
                return result
            except Exception as e:
                if attempt == self.max_retries - 1:
                    # Try fallback tools
                    if tool_name in self.fallback_tools:
                        for fallback in self.fallback_tools[tool_name]:
                            try:
                                return await fallback(*args, **kwargs)
                            except:
                                continue
                    raise e
                
                # Exponential backoff
                await asyncio.sleep(2 ** attempt)

# Example usage
from lye.web import search, fetch

async def search_duckduckgo(query: str) -> str:
    """Fallback search using DuckDuckGo"""
    # Implementation
    pass

executor = RobustToolExecutor(
    fallback_tools={
        "web-search": [search_duckduckgo],
        "web-fetch": [lambda url: fetch(f"https://archive.org/wayback/{url}")]
    }
)

# Execute with automatic fallback
result = await executor.execute_with_fallback(search, "web-search", "AI news")

Tool composition

Create complex tools from simple ones:
class CompositeTools:
    @staticmethod
    def create_research_tool(search_fn, fetch_fn, write_fn):
        """Create a composite research tool"""
        
        async def research_and_save(topic: str, output_file: str) -> str:
            # Search for information
            search_results = await search_fn(topic)
            
            # Extract URLs (simplified)
            urls = extract_urls(search_results)[:5]
            
            # Fetch content
            contents = []
            for url in urls:
                try:
                    content = await fetch_fn(url)
                    contents.append({"url": url, "content": content})
                except:
                    continue
            
            # Create report
            report = f"# Research Report: {topic}\n\n"
            for item in contents:
                report += f"## Source: {item['url']}\n{item['content'][:500]}...\n\n"
            
            # Save report
            await write_fn(output_file, report)
            
            return f"Research report saved to {output_file}"
        
        return {
            "definition": {
                "name": "research_and_save",
                "description": "Research a topic and save findings to a file",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "topic": {"type": "string"},
                        "output_file": {"type": "string"}
                    },
                    "required": ["topic", "output_file"]
                }
            },
            "implementation": research_and_save
        }

Streaming Patterns

Buffered Streaming

Optimize streaming for better UX:
from tyler.models.execution import EventType

class StreamBuffer:
    def __init__(self, buffer_size: int = 10, flush_interval: float = 0.5):
        self.buffer = []
        self.buffer_size = buffer_size
        self.flush_interval = flush_interval
        self.last_flush = asyncio.get_event_loop().time()
    
    async def process_stream(self, agent: Agent, thread: Thread):
        """Process stream with intelligent buffering"""
        
        async for update in agent.go(thread, stream=True):
            if update.type == EventType.LLM_STREAM_CHUNK:
                self.buffer.append(update.data.get("content_chunk", ""))
                
                # Flush if buffer full or timeout
                current_time = asyncio.get_event_loop().time()
                if (len(self.buffer) >= self.buffer_size or 
                    current_time - self.last_flush > self.flush_interval):
                    
                    yield "".join(self.buffer)
                    self.buffer = []
                    self.last_flush = current_time
            
            elif update.type == EventType.EXECUTION_COMPLETE:
                # Flush remaining buffer
                if self.buffer:
                    yield "".join(self.buffer)
                
                # Yield completion
                yield {"type": "complete", "thread": update.data}

Progress Tracking

Track progress for long-running operations:
class ProgressTracker:
    def __init__(self):
        self.stages = []
        self.current_stage = 0
        self.listeners = []
    
    def add_stage(self, name: str, weight: float = 1.0):
        """Add a progress stage"""
        self.stages.append({"name": name, "weight": weight, "progress": 0})
    
    def add_listener(self, callback: Callable):
        """Add progress listener"""
        self.listeners.append(callback)
    
    async def update_progress(self, stage_progress: float):
        """Update current stage progress"""
        if self.current_stage < len(self.stages):
            self.stages[self.current_stage]["progress"] = stage_progress
            
            # Calculate total progress
            total_weight = sum(s["weight"] for s in self.stages)
            total_progress = sum(
                s["weight"] * s["progress"] for s in self.stages
            ) / total_weight
            
            # Notify listeners
            for listener in self.listeners:
                await listener({
                    "stage": self.stages[self.current_stage]["name"],
                    "stage_progress": stage_progress,
                    "total_progress": total_progress
                })
    
    async def next_stage(self):
        """Move to next stage"""
        if self.current_stage < len(self.stages):
            self.stages[self.current_stage]["progress"] = 1.0
            self.current_stage += 1
            await self.update_progress(0)

# Usage with agent
tracker = ProgressTracker()
tracker.add_stage("Research", weight=2)
tracker.add_stage("Analysis", weight=3)
tracker.add_stage("Report Generation", weight=1)

async def progress_callback(update):
    print(f"{update['stage']}: {update['total_progress']:.0%}")

tracker.add_listener(progress_callback)

Error handling patterns

Graceful Degradation

Handle errors without failing completely:
class GracefulAgent:
    def __init__(self, agent: Agent):
        self.agent = agent
        self.error_handlers = {}
    
    def register_error_handler(self, error_type: type, handler: Callable):
        """Register handler for specific error type"""
        self.error_handlers[error_type] = handler
    
    async def go_with_graceful_degradation(self, thread: Thread):
        """Process with graceful error handling"""
        
        try:
            return await self.agent.go(thread)
        except Exception as e:
            # Find appropriate handler
            for error_type, handler in self.error_handlers.items():
                if isinstance(e, error_type):
                    return await handler(thread, e)
            
            # Default handler
            return await self.default_error_handler(thread, e)
    
    async def default_error_handler(self, thread: Thread, error: Exception):
        """Default error handling"""
        error_thread = Thread(id=thread.id)
        
        # Copy messages
        for msg in thread.messages:
            error_thread.add_message(msg)
        
        # Add error message
        error_thread.add_message(Message(
            role="assistant",
            content=f"I encountered an error: {str(error)}. Let me try a different approach."
        ))
        
        return error_thread, []

# Usage
agent = Agent(name="worker", model_name="gpt-4", purpose="To complete tasks")
graceful = GracefulAgent(agent)

# Register specific handlers
async def handle_tool_error(thread, error):
    # Retry without tools
    no_tool_agent = Agent(
        name="worker",
        model_name="gpt-4",
        purpose="To complete tasks without tools"
    )
    return await no_tool_agent.go(thread)

graceful.register_error_handler(ToolExecutionError, handle_tool_error)

Production Patterns

Health Monitoring

Monitor agent health in production:
from datetime import datetime, timedelta
import statistics

class AgentHealthMonitor:
    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.response_times = []
        self.error_count = 0
        self.success_count = 0
        self.last_health_check = datetime.now()
    
    async def monitor_execution(self, agent: Agent, thread: Thread):
        """Execute with monitoring"""
        start_time = datetime.now()
        
        try:
            result = await agent.go(thread)
            self.success_count += 1
            
            # Record response time
            response_time = (datetime.now() - start_time).total_seconds()
            self.response_times.append(response_time)
            
            # Keep window size
            if len(self.response_times) > self.window_size:
                self.response_times.pop(0)
            
            return result
            
        except Exception as e:
            self.error_count += 1
            raise e
    
    def get_health_metrics(self) -> Dict[str, Any]:
        """Get current health metrics"""
        
        metrics = {
            "status": "healthy",
            "success_rate": self.success_count / (self.success_count + self.error_count)
                           if (self.success_count + self.error_count) > 0 else 0,
            "error_count": self.error_count,
            "success_count": self.success_count
        }
        
        if self.response_times:
            metrics.update({
                "avg_response_time": statistics.mean(self.response_times),
                "p95_response_time": statistics.quantiles(self.response_times, n=20)[18],
                "p99_response_time": statistics.quantiles(self.response_times, n=100)[98]
            })
        
        # Determine health status
        if metrics["success_rate"] < 0.95:
            metrics["status"] = "degraded"
        if metrics["success_rate"] < 0.8:
            metrics["status"] = "unhealthy"
        
        return metrics

Next steps