Why Use Streaming?
Traditional (non-streaming) approach:
- User waits for the entire response
- No feedback during processing
- Can feel slow for long responses

Streaming approach:
- Immediate visual feedback
- See responses as they’re generated
- Better user experience
- See tool usage in real-time
Basic Streaming
The simplest way to stream responses:
import asyncio
from tyler import Agent, Thread, Message
from tyler.models.execution import ExecutionEvent, EventType

agent = Agent(
    name="streaming-assistant",
    model_name="gpt-4",
    purpose="To provide real-time responses"
)

async def stream_response():
    thread = Thread()
    message = Message(role="user", content="Tell me a story about space exploration")
    thread.add_message(message)

    print("🤖 Assistant: ", end="", flush=True)

    async for event in agent.go(thread, stream=True):
        if event.type == EventType.LLM_STREAM_CHUNK:
            print(event.data.get("content_chunk", ""), end="", flush=True)

    print()  # New line at the end

asyncio.run(stream_response())
Understanding Execution Events
ExecutionEvent objects provide detailed information about the agent’s execution:
from tyler import EventType

async for event in agent.go(thread, stream=True):
    if event.type == EventType.LLM_STREAM_CHUNK:
        # Text being generated
        print(event.data.get("content_chunk", ""), end="", flush=True)
    elif event.type == EventType.TOOL_SELECTED:
        # Tool is about to be called
        print(f"\n🔧 Calling tool: {event.data['tool_name']}")
    elif event.type == EventType.MESSAGE_CREATED:
        # New message added to thread
        msg = event.data["message"]
        if msg.role == "tool":
            print(f"\n✅ Tool {msg.name} completed")
    elif event.type == EventType.EXECUTION_COMPLETE:
        # All processing complete
        print(f"\n✅ Complete in {event.data['duration_ms']:.0f}ms!")
Streaming with Tools
See tool usage in real-time:
from lye import WEB_TOOLS, FILES_TOOLS

agent = Agent(
    name="research-assistant",
    model_name="gpt-4",
    purpose="To research and create reports",
    tools=[*WEB_TOOLS, *FILES_TOOLS]
)

async def research_with_streaming(topic: str):
    thread = Thread()
    message = Message(
        role="user",
        content=f"Research {topic} and create a summary"
    )
    thread.add_message(message)

    async for event in agent.go(thread, stream=True):
        if event.type == EventType.LLM_STREAM_CHUNK:
            print(event.data.get("content_chunk", ""), end="", flush=True)
        elif event.type == EventType.TOOL_SELECTED:
            tool_name = event.data["tool_name"]
            print(f"\n\n🔧 Using {tool_name}...", flush=True)
        elif event.type == EventType.TOOL_RESULT:
            result = event.data["result"]
            # Show abbreviated tool results
            if len(result) > 100:
                print(f"   Result: {result[:100]}...")
            else:
                print(f"   Result: {result}")
            print("\n🤖 ", end="", flush=True)
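Run it like any other coroutine; the topic here is just an illustration:

asyncio.run(research_with_streaming("quantum computing"))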
Building interactive applications
Terminal chat interface
import asyncio
from tyler import Agent, Thread, Message, ThreadStore
from tyler.models.execution import ExecutionEvent, EventType

class StreamingChat:
    def __init__(self):
        self.agent = None
        self.thread_store = None
        self.thread = None

    async def initialize(self):
        self.thread_store = await ThreadStore.create("sqlite+aiosqlite:///chat.db")
        self.agent = Agent(
            name="chat-assistant",
            model_name="gpt-4",
            purpose="To have helpful conversations",
            thread_store=self.thread_store
        )

    async def start_session(self, session_id: str):
        try:
            self.thread = await self.thread_store.get_thread(session_id)
            print("📚 Resuming conversation...")
        except Exception:
            self.thread = Thread(id=session_id)
            print("🆕 Starting new conversation...")

    async def send_message(self, content: str):
        message = Message(role="user", content=content)
        self.thread.add_message(message)

        print("\n🤖 ", end="", flush=True)

        async for update in self.agent.go(self.thread, stream=True):
            if update.type == EventType.LLM_STREAM_CHUNK:
                print(update.data.get("content_chunk", ""), end="", flush=True)
            elif update.type == EventType.EXECUTION_COMPLETE:
                # Thread is already updated in place
                await self.thread_store.save_thread(self.thread)

        print("\n")

# Usage
async def main():
    chat = StreamingChat()
    await chat.initialize()
    await chat.start_session("main-chat")

    while True:
        user_input = input("\nYou: ")
        if user_input.lower() in ['exit', 'quit']:
            break
        await chat.send_message(user_input)

asyncio.run(main())
Web application streaming
For web applications, you can stream over a WebSocket or via Server-Sent Events:
# FastAPI example with WebSocket
from fastapi import FastAPI, WebSocket
import json

from tyler import Agent, Thread, Message
from tyler.models.execution import EventType

app = FastAPI()

@app.websocket("/ws/chat")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    agent = Agent(
        name="web-assistant",
        model_name="gpt-4",
        purpose="To assist web users"
    )
    thread = Thread()

    while True:
        # Receive message from client
        data = await websocket.receive_text()
        message_data = json.loads(data)

        # Add to thread
        message = Message(role="user", content=message_data["content"])
        thread.add_message(message)

        # Stream response
        async for update in agent.go(thread, stream=True):
            if update.type == EventType.LLM_STREAM_CHUNK:
                await websocket.send_json({
                    "type": "content",
                    "data": update.data
                })
            elif update.type == EventType.TOOL_SELECTED:
                await websocket.send_json({
                    "type": "tool",
                    "name": update.data.get("tool_name", ""),
                    "content": str(update.data.get("arguments", ""))[:100] + "..."
                })
            elif update.type == EventType.EXECUTION_COMPLETE:
                # Thread is already updated in place
                await websocket.send_json({"type": "complete"})
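The same events can drive Server-Sent Events. A minimal SSE sketch using FastAPI's StreamingResponse, reusing the app and imports from above; the /sse/chat route and ChatRequest model are illustrative, not part of Tyler:

# Server-Sent Events sketch; route and request model are illustrative
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

class ChatRequest(BaseModel):
    content: str

@app.post("/sse/chat")
async def sse_chat(request: ChatRequest):
    agent = Agent(
        name="web-assistant",
        model_name="gpt-4",
        purpose="To assist web users"
    )
    thread = Thread()
    thread.add_message(Message(role="user", content=request.content))

    async def event_stream():
        async for update in agent.go(thread, stream=True):
            if update.type == EventType.LLM_STREAM_CHUNK:
                payload = {"type": "content", "data": update.data.get("content_chunk", "")}
                # Each SSE frame is a "data: ..." line followed by a blank line
                yield f"data: {json.dumps(payload)}\n\n"
        yield 'data: {"type": "complete"}\n\n'

    return StreamingResponse(event_stream(), media_type="text/event-stream")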
Advanced Streaming Patterns
Progress indicators
Show progress for long-running tasks:
async def stream_with_progress():
    thread = Thread()
    message = Message(
        role="user",
        content="Analyze these 10 websites and create a report"
    )
    thread.add_message(message)

    tool_count = 0
    content_buffer = []

    async for update in agent.go(thread, stream=True):
        if update.type == EventType.TOOL_SELECTED:
            tool_count += 1
            print(f"\r⏳ Processing... ({tool_count} tools used)", end="", flush=True)
        elif update.type == EventType.LLM_STREAM_CHUNK:
            content_buffer.append(update.data.get("content_chunk", ""))
        elif update.type == EventType.EXECUTION_COMPLETE:
            print("\r✅ Complete!" + " " * 30)  # Clear the progress line
            print("\n🤖 " + "".join(content_buffer))
Buffered streaming
For smoother output, buffer chunks:
class BufferedStreamer:
    def __init__(self, buffer_size: int = 5):
        self.buffer = []
        self.buffer_size = buffer_size

    async def stream(self, agent, thread):
        async for update in agent.go(thread, stream=True):
            if update.type == EventType.LLM_STREAM_CHUNK:
                self.buffer.append(update.data.get("content_chunk", ""))
                if len(self.buffer) >= self.buffer_size:
                    yield "".join(self.buffer)
                    self.buffer = []
            elif update.type == EventType.EXECUTION_COMPLETE:
                if self.buffer:
                    yield "".join(self.buffer)
                yield {"type": "complete", "thread": update.data}

# Usage
async def demo_buffered(agent, thread):
    streamer = BufferedStreamer()
    async for chunk in streamer.stream(agent, thread):
        if isinstance(chunk, str):
            print(chunk, end="", flush=True)
        else:
            # Final item carries the completion payload
            pass
Cancellable streaming
Allow users to stop generation:
import asyncio

class CancellableStream:
    def __init__(self):
        self.cancelled = False

    async def stream_with_cancel(self, agent, thread):
        try:
            async for update in agent.go(thread, stream=True):
                if self.cancelled:
                    print("\n\n⚠️ Generation cancelled by user")
                    break
                if update.type == EventType.LLM_STREAM_CHUNK:
                    print(update.data.get("content_chunk", ""), end="", flush=True)
        except asyncio.CancelledError:
            print("\n\n⚠️ Stream interrupted")

    def cancel(self):
        self.cancelled = True

# Usage with keyboard interrupt
import signal

stream_handler = CancellableStream()

def signal_handler(sig, frame):
    stream_handler.cancel()

signal.signal(signal.SIGINT, signal_handler)
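With the handler installed, pressing Ctrl+C flips the flag and the loop exits at the next event. A minimal driver to tie it together; the prompt text is just an example:

async def main():
    thread = Thread()
    thread.add_message(Message(role="user", content="Write a long essay about the ocean"))
    await stream_handler.stream_with_cancel(agent, thread)

asyncio.run(main())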
Streaming UI Components
Rich terminal UI
Using the rich library for better terminal output:
from rich.console import Console
from rich.live import Live
from rich.markdown import Markdown
from rich.panel import Panel

console = Console()

async def rich_streaming():
    thread = Thread()
    message = Message(role="user", content="Explain quantum computing")
    thread.add_message(message)

    content = ""
    with Live(Panel("", title="🤖 Assistant"), refresh_per_second=10) as live:
        async for update in agent.go(thread, stream=True):
            if update.type == EventType.LLM_STREAM_CHUNK:
                content += update.data.get("content_chunk", "")
                live.update(Panel(Markdown(content), title="🤖 Assistant"))
            elif update.type == EventType.TOOL_SELECTED:
                tool_panel = Panel(
                    f"Using: {update.data.get('tool_name', '')}",
                    title="🔧 Tool",
                    style="yellow"
                )
                console.print(tool_panel)
Token counting
Track tokens during streaming:
class TokenCounter:
    def __init__(self):
        self.total_tokens = 0
        self.chunks = []

    async def count_stream(self, agent, thread):
        async for update in agent.go(thread, stream=True):
            if update.type == EventType.LLM_STREAM_CHUNK:
                chunk = update.data.get("content_chunk", "")
                self.chunks.append(chunk)
                # Rough estimate: 1 token ≈ 4 characters
                self.total_tokens += len(chunk) // 4
            yield update
        print(f"\n\n📊 Approximate tokens used: {self.total_tokens}")
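Because count_stream is a pass-through generator, you consume it exactly as you would agent.go. A short usage sketch; the prompt is just an example:

async def main():
    counter = TokenCounter()
    thread = Thread()
    thread.add_message(Message(role="user", content="Summarize photosynthesis"))

    async for update in counter.count_stream(agent, thread):
        if update.type == EventType.LLM_STREAM_CHUNK:
            print(update.data.get("content_chunk", ""), end="", flush=True)

asyncio.run(main())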
Performance tips
1. Chunk Size Optimization
Larger chunks reduce per-chunk overhead but make the output feel less responsive:
# Configure in your agent if supported
agent = Agent(
    name="optimized-streamer",
    model_name="gpt-4",
    purpose="To stream efficiently",
    # streaming_chunk_size=10  # If available
)
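If your version does not expose such a knob, you can get the same effect client-side by coalescing chunks on a timer. A minimal sketch; the 50 ms flush interval is an arbitrary choice:

import time

async def coalesced_stream(agent, thread, flush_interval: float = 0.05):
    """Yield accumulated text at most once per flush_interval seconds."""
    buffer = []
    last_flush = time.monotonic()

    async for update in agent.go(thread, stream=True):
        if update.type == EventType.LLM_STREAM_CHUNK:
            buffer.append(update.data.get("content_chunk", ""))
            if time.monotonic() - last_flush >= flush_interval:
                yield "".join(buffer)
                buffer = []
                last_flush = time.monotonic()

    # Flush whatever is left when the stream ends
    if buffer:
        yield "".join(buffer)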
2. Async Processing
Process several streams concurrently for better throughput:
async def process_multiple_streams(queries):
    tasks = []
    for query in queries:
        thread = Thread()
        thread.add_message(Message(role="user", content=query))
        task = asyncio.create_task(collect_stream(agent, thread))
        tasks.append(task)

    results = await asyncio.gather(*tasks)
    return results

async def collect_stream(agent, thread):
    content = []
    async for update in agent.go(thread, stream=True):
        if update.type == EventType.LLM_STREAM_CHUNK:
            content.append(update.data.get("content_chunk", ""))
    return "".join(content)
3. Error Handling in Streams
Wrap the stream in a generator that converts exceptions into error events, so consumers can handle failures inside the same async for loop:
from datetime import datetime

async def safe_stream(agent, thread):
    try:
        async for update in agent.go(thread, stream=True):
            yield update
    except asyncio.TimeoutError:
        yield ExecutionEvent(
            type=EventType.EXECUTION_ERROR,
            timestamp=datetime.now(),
            data={"message": "Stream timed out"}
        )
    except Exception as e:
        yield ExecutionEvent(
            type=EventType.EXECUTION_ERROR,
            timestamp=datetime.now(),
            data={"message": f"Stream error: {str(e)}"}
        )
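Consumers then need only one extra branch for the error event; a short sketch:

async def main():
    thread = Thread()
    thread.add_message(Message(role="user", content="Hello"))

    async for update in safe_stream(agent, thread):
        if update.type == EventType.LLM_STREAM_CHUNK:
            print(update.data.get("content_chunk", ""), end="", flush=True)
        elif update.type == EventType.EXECUTION_ERROR:
            print(f"\n❌ {update.data['message']}")

asyncio.run(main())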
Real-World Example: Live Research Assistant
import asyncio
from datetime import datetime
from tyler import Agent, Thread, Message
from tyler.models.execution import ExecutionEvent, EventType
from lye import WEB_TOOLS, FILES_TOOLS

class LiveResearchAssistant:
    def __init__(self):
        self.agent = Agent(
            name="live-researcher",
            model_name="gpt-4",
            purpose="To conduct research and provide real-time updates",
            tools=[*WEB_TOOLS, *FILES_TOOLS]
        )

    async def research(self, topic: str, save_to_file: bool = True):
        thread = Thread()
        message = Message(
            role="user",
            content=f"""
            Research '{topic}' comprehensively:
            1. Search for recent information
            2. Analyze multiple sources
            3. Create a detailed report
            {"4. Save the report to a file" if save_to_file else ""}
            """
        )
        thread.add_message(message)

        print(f"\n{'='*50}")
        print(f"🔍 Researching: {topic}")
        print(f"⏰ Started: {datetime.now().strftime('%H:%M:%S')}")
        print(f"{'='*50}\n")

        content_buffer = []
        tool_uses = []

        async for update in self.agent.go(thread, stream=True):
            if update.type == EventType.LLM_STREAM_CHUNK:
                chunk = update.data.get("content_chunk", "")
                content_buffer.append(chunk)
                print(chunk, end="", flush=True)
            elif update.type == EventType.TOOL_SELECTED:
                tool_name = update.data.get("tool_name", "")
                tool_uses.append(tool_name)
                # Show tool use inline
                print(f"\n\n[🔧 {tool_name}]", end="")
                if tool_name == "web-search":
                    print(" Searching for information...")
                elif tool_name == "files-write":
                    print(" Saving report...")
                print("\n", end="")
            elif update.type == EventType.EXECUTION_COMPLETE:
                print(f"\n\n{'='*50}")
                print("✅ Research Complete!")
                print(f"📊 Tools used: {', '.join(set(tool_uses))}")
                print(f"📝 Total length: {len(''.join(content_buffer))} characters")
                print(f"⏰ Finished: {datetime.now().strftime('%H:%M:%S')}")
                print(f"{'='*50}")
                return update.data

# Usage
async def main():
    assistant = LiveResearchAssistant()

    topics = [
        "Latest breakthroughs in quantum computing",
        "Climate change solutions for 2024",
        "AI safety research progress"
    ]

    for topic in topics:
        await assistant.research(topic, save_to_file=True)
        print("\n" + "="*70 + "\n")

asyncio.run(main())