Streaming responses enable your agents to provide real-time feedback, making interactions feel more natural and responsive. Instead of waiting for the entire response, users see content as it’s generated.

Why Use Streaming?

Traditional (non-streaming) approach:
  • User waits for entire response
  • No feedback during processing
  • Can feel slow for long responses

Streaming approach:
  • Immediate visual feedback
  • See responses as they’re generated
  • Better user experience
  • Can see tool usage in real-time
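
In code, the difference is a single flag on the same call. A minimal sketch, assuming the agent and thread setup from the Basic Streaming example below:
# Non-streaming: nothing is shown until the full response is ready
await agent.go(thread)

# Streaming: chunks are printed while the model is still generating
async for event in agent.go(thread, stream=True):
    if event.type == EventType.LLM_STREAM_CHUNK:
        print(event.data.get("content_chunk", ""), end="", flush=True)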

Basic Streaming

The simplest way to stream responses:
import asyncio
from tyler import Agent, Thread, Message
from tyler.models.execution import ExecutionEvent, EventType

agent = Agent(
    name="streaming-assistant",
    model_name="gpt-4",
    purpose="To provide real-time responses"
)

async def stream_response():
    thread = Thread()
    message = Message(role="user", content="Tell me a story about space exploration")
    thread.add_message(message)
    
    print("🤖 Assistant: ", end="", flush=True)
    
    async for event in agent.go(thread, stream=True):
        if event.type == EventType.LLM_STREAM_CHUNK:
            print(event.data.get("content_chunk", ""), end="", flush=True)
    
    print()  # New line at the end

asyncio.run(stream_response())

Understanding Execution Events

ExecutionEvent objects provide detailed information about the agent’s execution:
from tyler import EventType

async for event in agent.go(thread, stream=True):
    if event.type == EventType.LLM_STREAM_CHUNK:
        # Text being generated
        print(event.data.get("content_chunk", ""), end="", flush=True)
    
    elif event.type == EventType.TOOL_SELECTED:
        # Tool is about to be called
        print(f"\n🔧 Calling tool: {event.data['tool_name']}")
    
    elif event.type == EventType.MESSAGE_CREATED:
        # New message added to thread
        msg = event.data["message"]
        if msg.role == "tool":
            print(f"\n✅ Tool {msg.name} completed")
    
    elif event.type == EventType.EXECUTION_COMPLETE:
        # All processing complete
        print(f"\n✅ Complete in {event.data['duration_ms']:.0f}ms!")

Streaming with Tools

See tool usage in real-time:
from lye import WEB_TOOLS, FILES_TOOLS

agent = Agent(
    name="research-assistant",
    model_name="gpt-4",
    purpose="To research and create reports",
    tools=[*WEB_TOOLS, *FILES_TOOLS]
)

async def research_with_streaming(topic: str):
    thread = Thread()
    message = Message(
        role="user",
        content=f"Research {topic} and create a summary"
    )
    thread.add_message(message)
    
    async for event in agent.go(thread, stream=True):
        if event.type == EventType.LLM_STREAM_CHUNK:
            print(event.data.get("content_chunk", ""), end="", flush=True)
        
        elif event.type == EventType.TOOL_SELECTED:
            tool_name = event.data["tool_name"]
            print(f"\n\n🔧 Using {tool_name}...", flush=True)
        
        elif event.type == EventType.TOOL_RESULT:
            result = event.data["result"]
            # Show abbreviated tool results
            if len(result) > 100:
                print(f"   Result: {result[:100]}...")
            else:
                print(f"   Result: {result}")
            
            print("\n🤖 ", end="", flush=True)
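
# Usage (hypothetical topic, for illustration)
asyncio.run(research_with_streaming("quantum computing"))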

Building interactive applications

Terminal chat interface

import asyncio
from tyler import Agent, Thread, Message, ThreadStore
from tyler.models.execution import ExecutionEvent, EventType

class StreamingChat:
    def __init__(self):
        self.agent = None
        self.thread_store = None
        self.thread = None
    
    async def initialize(self):
        self.thread_store = await ThreadStore.create("sqlite+aiosqlite:///chat.db")
        self.agent = Agent(
            name="chat-assistant",
            model_name="gpt-4",
            purpose="To have helpful conversations",
            thread_store=self.thread_store
        )
    
    async def start_session(self, session_id: str):
        try:
            self.thread = await self.thread_store.get_thread(session_id)
            print("📚 Resuming conversation...")
        except Exception:
            self.thread = Thread(id=session_id)
            print("🆕 Starting new conversation...")
    
    async def send_message(self, content: str):
        message = Message(role="user", content=content)
        self.thread.add_message(message)
        
        print("\n🤖 ", end="", flush=True)
        
        async for update in self.agent.go(self.thread, stream=True):
            if update.type == EventType.LLM_STREAM_CHUNK:
                print(update.data.get("content_chunk", ""), end="", flush=True)
            elif update.type == EventType.EXECUTION_COMPLETE:
                # Thread is already updated in place
                await self.thread_store.save_thread(self.thread)
        
        print("\n")

# Usage
async def main():
    chat = StreamingChat()
    await chat.initialize()
    await chat.start_session("main-chat")
    
    while True:
        user_input = input("\nYou: ")
        if user_input.lower() in ['exit', 'quit']:
            break
        
        await chat.send_message(user_input)

asyncio.run(main())

Web application streaming

For web applications, you can stream over a WebSocket or via Server-Sent Events:
# FastAPI example with WebSocket
import json
from fastapi import FastAPI, WebSocket
from tyler import Agent, Thread, Message
from tyler.models.execution import EventType

app = FastAPI()

@app.websocket("/ws/chat")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    
    agent = Agent(
        name="web-assistant",
        model_name="gpt-4",
        purpose="To assist web users"
    )
    
    thread = Thread()
    
    while True:
        # Receive message from client
        data = await websocket.receive_text()
        message_data = json.loads(data)
        
        # Add to thread
        message = Message(role="user", content=message_data["content"])
        thread.add_message(message)
        
        # Stream response
        async for update in agent.go(thread, stream=True):
            if update.type == EventType.LLM_STREAM_CHUNK:
                await websocket.send_json({
                    "type": "content",
                    "data": update.data.get("content_chunk", "")
                })
            
            elif update.type == EventType.TOOL_SELECTED:
                await websocket.send_json({
                    "type": "tool",
                    "name": update.data.get("tool_name", ""),
                    "content": str(update.data.get("arguments", ""))[:100] + "..."
                })
            
            elif update.type == EventType.EXECUTION_COMPLETE:
                # Thread is already updated in place
                await websocket.send_json({"type": "complete"})
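
The same event loop also works with Server-Sent Events. A minimal sketch using FastAPI's StreamingResponse, reusing the imports above; the /sse/chat route and payload shape are illustrative, not part of Tyler:
from fastapi.responses import StreamingResponse

@app.get("/sse/chat")
async def sse_endpoint(prompt: str):
    agent = Agent(
        name="web-assistant",
        model_name="gpt-4",
        purpose="To assist web users"
    )
    thread = Thread()
    thread.add_message(Message(role="user", content=prompt))
    
    async def event_stream():
        # Each SSE frame is a "data: ..." line followed by a blank line
        async for update in agent.go(thread, stream=True):
            if update.type == EventType.LLM_STREAM_CHUNK:
                chunk = update.data.get("content_chunk", "")
                yield f"data: {json.dumps({'type': 'content', 'data': chunk})}\n\n"
            elif update.type == EventType.EXECUTION_COMPLETE:
                yield 'data: {"type": "complete"}\n\n'
    
    return StreamingResponse(event_stream(), media_type="text/event-stream")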

Advanced Streaming Patterns

Progress indicators

Show progress for long-running tasks:
async def stream_with_progress():
    thread = Thread()
    message = Message(
        role="user",
        content="Analyze these 10 websites and create a report"
    )
    thread.add_message(message)
    
    tool_count = 0
    content_buffer = []
    
    async for update in agent.go(thread, stream=True):
        if update.type == EventType.TOOL_SELECTED:
            tool_count += 1
            print(f"\r⏳ Processing... ({tool_count} tools used)", end="", flush=True)
        
        elif update.type == EventType.LLM_STREAM_CHUNK:
            content_buffer.append(update.data.get("content_chunk", ""))
        
        elif update.type == EventType.EXECUTION_COMPLETE:
            print("\r✅ Complete!" + " " * 30)  # Clear progress
            print("\n🤖 " + "".join(content_buffer))

Buffered streaming

For smoother output, buffer chunks:
class BufferedStreamer:
    def __init__(self, buffer_size: int = 5):
        self.buffer = []
        self.buffer_size = buffer_size
    
    async def stream(self, agent, thread):
        async for update in agent.go(thread, stream=True):
            if update.type == EventType.LLM_STREAM_CHUNK:
                self.buffer.append(update.data.get("content_chunk", ""))
                
                if len(self.buffer) >= self.buffer_size:
                    yield "".join(self.buffer)
                    self.buffer = []
            
            elif update.type == EventType.EXECUTION_COMPLETE:
                if self.buffer:
                    yield "".join(self.buffer)
                yield {"type": "complete", "thread": update.data}

# Usage
streamer = BufferedStreamer()
async for chunk in streamer.stream(agent, thread):
    if isinstance(chunk, str):
        print(chunk, end="", flush=True)
    else:
        # Handle completion
        pass

Cancellable streaming

Allow users to stop generation:
import asyncio

class CancellableStream:
    def __init__(self):
        self.cancelled = False
    
    async def stream_with_cancel(self, agent, thread):
        try:
            async for update in agent.go(thread, stream=True):
                if self.cancelled:
                    print("\n\n⚠️  Generation cancelled by user")
                    break
                
                if update.type == EventType.LLM_STREAM_CHUNK:
                    print(update.data.get("content_chunk", ""), end="", flush=True)
        except asyncio.CancelledError:
            print("\n\n⚠️  Stream interrupted")
    
    def cancel(self):
        self.cancelled = True

# Usage with keyboard interrupt
import signal

stream_handler = CancellableStream()

def signal_handler(sig, frame):
    stream_handler.cancel()

signal.signal(signal.SIGINT, signal_handler)
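
# Run the stream; Ctrl+C now sets the flag instead of killing the process
# (assumes the agent and thread from the earlier examples)
asyncio.run(stream_handler.stream_with_cancel(agent, thread))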

Streaming UI Components

Rich terminal UI

Using the rich library for better terminal output:
from rich.console import Console
from rich.live import Live
from rich.markdown import Markdown
from rich.panel import Panel

console = Console()

async def rich_streaming():
    thread = Thread()
    message = Message(role="user", content="Explain quantum computing")
    thread.add_message(message)
    
    content = ""
    
    with Live(Panel("", title="🤖 Assistant"), refresh_per_second=10) as live:
        async for update in agent.go(thread, stream=True):
            if update.type == EventType.LLM_STREAM_CHUNK:
                content += update.data.get("content_chunk", "")
                live.update(Panel(Markdown(content), title="🤖 Assistant"))
            
            elif update.type == EventType.TOOL_SELECTED:
                tool_panel = Panel(
                    f"Using: {update.data.get('tool_name', '')}",
                    title="🔧 Tool",
                    style="yellow"
                )
                console.print(tool_panel)

Token counting

Track tokens during streaming:
class TokenCounter:
    def __init__(self):
        self.total_tokens = 0
        self.chunks = []
    
    async def count_stream(self, agent, thread):
        async for update in agent.go(thread, stream=True):
            if update.type == EventType.LLM_STREAM_CHUNK:
                chunk = update.data.get("content_chunk", "")
                self.chunks.append(chunk)
                # Rough estimate: 1 token ≈ 4 characters
                self.total_tokens += len(chunk) // 4
                
                yield update
        
        print(f"\n\n📊 Approximate tokens used: {self.total_tokens}")

Performance tips

1. Chunk Size Optimization

Larger chunks reduce overhead but decrease responsiveness:
# Configure in your agent if supported
agent = Agent(
    name="optimized-streamer",
    model_name="gpt-4",
    purpose="To stream efficiently",
    # streaming_chunk_size=10  # If available
)

2. Async Processing

Process streams asynchronously for better performance:
async def process_multiple_streams(queries):
    tasks = []
    
    for query in queries:
        thread = Thread()
        thread.add_message(Message(role="user", content=query))
        task = asyncio.create_task(collect_stream(agent, thread))
        tasks.append(task)
    
    results = await asyncio.gather(*tasks)
    return results

async def collect_stream(agent, thread):
    content = []
    async for update in agent.go(thread, stream=True):
        if update.type == EventType.LLM_STREAM_CHUNK:
            content.append(update.data.get("content_chunk", ""))
    return "".join(content)

3. Error Handling in Streams

from datetime import datetime

async def safe_stream(agent, thread):
    try:
        async for update in agent.go(thread, stream=True):
            yield update
    except asyncio.TimeoutError:
        yield ExecutionEvent(
            type=EventType.EXECUTION_ERROR,
            timestamp=datetime.now(),
            data={"message": "Stream timed out"}
        )
    except Exception as e:
        yield ExecutionEvent(
            type=EventType.EXECUTION_ERROR,
            timestamp=datetime.now(),
            data={"message": f"Stream error: {str(e)}"}
        )
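
Consuming the wrapped stream looks the same as consuming agent.go directly, with error events surfaced to the user. A sketch, assuming the EXECUTION_ERROR events emitted above:
async def run_safely(agent, thread):
    async for update in safe_stream(agent, thread):
        if update.type == EventType.LLM_STREAM_CHUNK:
            print(update.data.get("content_chunk", ""), end="", flush=True)
        elif update.type == EventType.EXECUTION_ERROR:
            print(f"\n❌ {update.data['message']}")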

Real-World Example: Live Research Assistant

import asyncio
from datetime import datetime
from tyler import Agent, Thread, Message
from tyler.models.execution import ExecutionEvent, EventType
from lye import WEB_TOOLS, FILES_TOOLS

class LiveResearchAssistant:
    def __init__(self):
        self.agent = Agent(
            name="live-researcher",
            model_name="gpt-4",
            purpose="To conduct research and provide real-time updates",
            tools=[*WEB_TOOLS, *FILES_TOOLS]
        )
    
    async def research(self, topic: str, save_to_file: bool = True):
        thread = Thread()
        message = Message(
            role="user",
            content=f"""
            Research '{topic}' comprehensively:
            1. Search for recent information
            2. Analyze multiple sources
            3. Create a detailed report
            {"4. Save the report to a file" if save_to_file else ""}
            """
        )
        thread.add_message(message)
        
        print(f"\n{'='*50}")
        print(f"🔍 Researching: {topic}")
        print(f"⏰ Started: {datetime.now().strftime('%H:%M:%S')}")
        print(f"{'='*50}\n")
        
        content_buffer = []
        tool_uses = []
        
        async for update in self.agent.go(thread, stream=True):
            if update.type == EventType.LLM_STREAM_CHUNK:
                chunk = update.data.get("content_chunk", "")
                content_buffer.append(chunk)
                print(chunk, end="", flush=True)
            
            elif update.type == EventType.TOOL_SELECTED:
                tool_name = update.data.get("tool_name", "")
                tool_uses.append(tool_name)
                
                # Show tool use inline
                print(f"\n\n[🔧 {tool_name}]", end="")
                if tool_name == "web-search":
                    print(f" Searching for information...")
                elif tool_name == "files-write":
                    print(f" Saving report...")
                print("\n", end="")
            
            elif update.type == EventType.EXECUTION_COMPLETE:
                print(f"\n\n{'='*50}")
                print(f"✅ Research Complete!")
                print(f"📊 Tools used: {', '.join(set(tool_uses))}")
                print(f"📝 Total length: {len(''.join(content_buffer))} characters")
                print(f"⏰ Finished: {datetime.now().strftime('%H:%M:%S')}")
                print(f"{'='*50}")
                
                return update.data

# Usage
async def main():
    assistant = LiveResearchAssistant()
    
    topics = [
        "Latest breakthroughs in quantum computing",
        "Climate change solutions for 2024",
        "AI safety research progress"
    ]
    
    for topic in topics:
        await assistant.research(topic, save_to_file=True)
        print("\n" + "="*70 + "\n")

asyncio.run(main())

Next steps