Streaming responses enable your agents to provide real-time feedback, making interactions feel more natural and responsive. Instead of waiting for the entire response, users see content as it’s generated.
Why Use Streaming?
Traditional (non-streaming) approach:
- User waits for entire response
- No feedback during processing
- Can feel slow for long responses
Streaming approach:
- Immediate visual feedback
- See responses as they’re generated
- Better user experience
- Can see tool usage in real-time
Streaming Modes
Tyler supports three streaming modes via the stream parameter:
| Mode | Value | Output Type | Use Case |
|------|-------|-------------|----------|
| Non-Streaming | False (default) | AgentResult | Simple request/response |
| Event Streaming | True or "events" | ExecutionEvent | Rich observability, tool tracking |
| Raw Streaming | "raw" | Raw LiteLLM chunks | OpenAI compatibility, proxying |
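At a glance, the two streaming entry points used in the examples on this page look like this (non-streaming calls return an AgentResult instead of an iterator):

# Event streaming (the default for agent.stream): yields ExecutionEvent objects
async for event in agent.stream(thread):
    ...

# Raw streaming: yields raw LiteLLM chunks
async for chunk in agent.stream(thread, mode="raw"):
    ...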
Event Streaming (Recommended)
The default streaming mode with full observability:
import asyncio
from tyler import Agent, Thread, Message
from tyler.models.execution import ExecutionEvent, EventType

agent = Agent(
    name="streaming-assistant",
    model_name="gpt-4",
    purpose="To provide real-time responses"
)

async def stream_response():
    thread = Thread()
    message = Message(role="user", content="Tell me a story about space exploration")
    thread.add_message(message)

    print("🤖 Assistant: ", end="", flush=True)

    # Use stream=True or stream="events" (both work identically)
    async for event in agent.stream(thread):
        if event.type == EventType.LLM_STREAM_CHUNK:
            print(event.data.get("content_chunk", ""), end="", flush=True)

    print()  # New line at the end

asyncio.run(stream_response())
Raw Streaming (Advanced)
Raw mode is for advanced use cases. Tools ARE executed for full agentic behavior, but you only receive raw chunks (no ExecutionEvents).
Stream raw LiteLLM chunks for OpenAI compatibility:
import asyncio
from tyler import Agent, Thread, Message

agent = Agent(
    name="proxy-assistant",
    model_name="gpt-4o",
    purpose="OpenAI-compatible streaming"
)

async def raw_stream_response():
    thread = Thread()
    message = Message(role="user", content="Hello!")
    thread.add_message(message)

    # Get raw OpenAI-compatible chunks
    async for chunk in agent.stream(thread, mode="raw"):
        # chunk is a raw LiteLLM object
        if hasattr(chunk, 'choices') and chunk.choices:
            delta = chunk.choices[0].delta

            # Delta can be dict or object depending on LiteLLM version
            if isinstance(delta, dict):
                content = delta.get('content')
            else:
                content = getattr(delta, 'content', None)

            if content:
                print(content, end="", flush=True)

        # Usage info in final chunk
        if hasattr(chunk, 'usage') and chunk.usage:
            print(f"\n\nTokens: {chunk.usage.total_tokens}")

asyncio.run(raw_stream_response())
When to use raw mode:
- Building OpenAI API proxies or gateways
- Direct integration with OpenAI-compatible clients
- Minimal latency requirements (no transformation overhead)
How it works:
- ✅ Tools ARE executed (fully agentic behavior)
- ✅ Multi-turn iteration supported
- ✅ Frontend sees finish_reason: "tool_calls" in chunks
- ⚠️ No ExecutionEvent telemetry (only raw chunks)
- ⚠️ Silent during tool execution (brief pauses expected)
- ⚠️ Consumer must handle chunk formatting (SSE serialization)
Matches the pattern from OpenAI’s Agents SDK:
Raw chunks → finish_reason="tool_calls" → [agent executes tools] → more raw chunks → repeat
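When driving a UI from raw chunks, you can watch finish_reason to detect the tool-execution pauses described above. A minimal sketch, reusing the chunk-handling pattern from the raw example:

async for chunk in agent.stream(thread, mode="raw"):
    if not (hasattr(chunk, 'choices') and chunk.choices):
        continue
    finish_reason = getattr(chunk.choices[0], 'finish_reason', None)
    if finish_reason == "tool_calls":
        # The agent is about to execute tools; expect a brief pause,
        # then another run of raw chunks for the follow-up turn.
        print("\n[running tools...]", flush=True)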
SSE Serialization Example:
import json

def serialize_chunk_to_sse(chunk) -> str:
    """Convert raw chunk to Server-Sent Events format"""
    chunk_dict = {
        "id": getattr(chunk, 'id', 'unknown'),
        "object": getattr(chunk, 'object', 'chat.completion.chunk'),
        "created": getattr(chunk, 'created', 0),
        "model": getattr(chunk, 'model', 'unknown'),
        "choices": []
    }

    if hasattr(chunk, 'choices') and chunk.choices:
        for choice in chunk.choices:
            choice_dict = {
                "index": getattr(choice, 'index', 0),
                "delta": {},
                "finish_reason": getattr(choice, 'finish_reason', None)
            }

            if hasattr(choice, 'delta'):
                delta = choice.delta
                if isinstance(delta, dict):
                    choice_dict["delta"] = delta
                else:
                    if hasattr(delta, 'content') and delta.content:
                        choice_dict["delta"]["content"] = delta.content
                    if hasattr(delta, 'role') and delta.role:
                        choice_dict["delta"]["role"] = delta.role

            chunk_dict["choices"].append(choice_dict)

    if hasattr(chunk, 'usage') and chunk.usage:
        chunk_dict["usage"] = {
            "prompt_tokens": chunk.usage.prompt_tokens,
            "completion_tokens": chunk.usage.completion_tokens,
            "total_tokens": chunk.usage.total_tokens
        }

    return f"data: {json.dumps(chunk_dict)}\n\n"
# Use in a FastAPI endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/v1/chat/completions")
async def openai_compatible_endpoint(messages: list):
    thread = Thread()
    for msg in messages:
        thread.add_message(Message(role=msg["role"], content=msg["content"]))

    async def generate():
        async for chunk in agent.stream(thread, mode="raw"):
            yield serialize_chunk_to_sse(chunk)

    return StreamingResponse(generate(), media_type="text/event-stream")
See examples/005_raw_streaming.py for a complete working example.
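To sanity-check an endpoint like the one above from Python, you can consume the SSE stream with an HTTP client. A sketch using httpx; the URL and payload shape are assumptions that mirror the endpoint signature shown here:

import asyncio
import httpx

async def consume_sse():
    payload = [{"role": "user", "content": "Hello!"}]
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST", "http://localhost:8000/v1/chat/completions", json=payload
        ) as response:
            async for line in response.aiter_lines():
                # Each SSE frame is a "data: {...}" line followed by a blank line
                if line.startswith("data: "):
                    print(line[len("data: "):])

asyncio.run(consume_sse())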
Understanding Execution Events
ExecutionEvent objects provide detailed information about the agent’s execution:
from tyler import EventType

async for event in agent.stream(thread):
    if event.type == EventType.LLM_STREAM_CHUNK:
        # Text being generated
        print(event.data.get("content_chunk", ""), end="", flush=True)

    elif event.type == EventType.TOOL_SELECTED:
        # Tool is about to be called
        print(f"\n🔧 Calling tool: {event.data['tool_name']}")

    elif event.type == EventType.MESSAGE_CREATED:
        # New message added to thread
        msg = event.data["message"]
        if msg.role == "tool":
            print(f"\n✅ Tool {msg.name} completed")

    elif event.type == EventType.EXECUTION_COMPLETE:
        # All processing complete
        print(f"\n✅ Complete in {event.data['duration_ms']:.0f}ms!")
Thinking Tokens (Reasoning Content)
Requires LiteLLM >= 1.63.0 and a reasoning-capable model like OpenAI o1 or Anthropic Claude with extended thinking.
Models like OpenAI o1 and Anthropic Claude can emit their reasoning process as separate “thinking tokens” alongside the response content. Tyler’s streaming API exposes these as dedicated LLM_THINKING_CHUNK events, allowing you to display reasoning separately from the final answer.
Why Use Thinking Tokens?
- Transparency: Show users how the AI arrived at its answer
- Debugging: Trace model reasoning for better agent development
- UX: Display thinking in a collapsible section or different style
- Trust: Users can verify the model’s reasoning process
Event Streaming with Thinking
from tyler import Agent, Thread, Message, EventType

agent = Agent(
    name="thinking-agent",
    model_name="anthropic/claude-3-7-sonnet-20250219",  # or "o1-preview"
    purpose="To demonstrate thinking tokens"
)

thread = Thread()
thread.add_message(Message(
    role="user",
    content="What's 137 * 284? Show your thinking."
))

print("💭 Thinking: ", end="", flush=True)
response_started = False

async for event in agent.stream(thread):
    if event.type == EventType.LLM_THINKING_CHUNK:
        # Thinking/reasoning tokens (separate from content)
        thinking = event.data['thinking_chunk']
        thinking_type = event.data['thinking_type']  # "reasoning", "thinking", etc.
        print(f"\n[{thinking_type}] {thinking}", flush=True)
    elif event.type == EventType.LLM_STREAM_CHUNK:
        # Regular response content, printed after the thinking phase
        if not response_started:
            print("\n💬 Response: ", end="", flush=True)
            response_started = True
        print(event.data['content_chunk'], end="", flush=True)
Output:
💭 Thinking:
[reasoning] Let me calculate 137 * 284 step by step...
[reasoning] 137 * 284 = 137 * (280 + 4) = 137 * 280 + 137 * 4...
💬 Response: The answer is 38,908.
Thinking in Message Object
After streaming completes, thinking content is stored as a top-level field on the message:
async for event in agent.stream(thread):
    if event.type == EventType.MESSAGE_CREATED:
        msg = event.data['message']
        if msg.role == "assistant":
            # Access complete reasoning (top-level field)
            if msg.reasoning_content:
                print(f"Full reasoning: {msg.reasoning_content}")
Raw Streaming with Thinking
Raw mode preserves all thinking fields from LiteLLM:
async for chunk in agent.stream(thread, mode="raw"):
    if hasattr(chunk, 'choices') and chunk.choices:
        delta = chunk.choices[0].delta

        # LiteLLM standardized field (v1.63.0+)
        if hasattr(delta, 'reasoning_content') and delta.reasoning_content:
            print(f"[Reasoning] {delta.reasoning_content}")

        # Anthropic-specific field
        if hasattr(delta, 'thinking') and delta.thinking:
            print(f"[Thinking] {delta.thinking}")

        # Regular content
        if hasattr(delta, 'content') and delta.content:
            print(delta.content, end="")
UI Pattern: Separated Display
A common pattern is showing thinking in a collapsible section:
thinking_section = []
response_section = []

async for event in agent.stream(thread):
    if event.type == EventType.LLM_THINKING_CHUNK:
        thinking_section.append(event.data['thinking_chunk'])
    elif event.type == EventType.LLM_STREAM_CHUNK:
        response_section.append(event.data['content_chunk'])

# Display in UI
print("─── Thinking Process (click to expand) ───")
print(''.join(thinking_section))
print("─── Response ───")
print(''.join(response_section))
Supported Models
OpenAI:
- o1-preview (reasoning_content)
- o1-mini (reasoning_content)
Anthropic:
- claude-3-7-sonnet-20250219 with extended thinking
- Future Claude models with thinking capability
Other Providers via LiteLLM:
- Deepseek
- XAI
- Google AI Studio
- Perplexity (Magistral models)
- Groq
See LiteLLM docs for full list.
Backward Compatibility
Models without thinking support work unchanged; no LLM_THINKING_CHUNK events are emitted:
agent = Agent(name="regular", model_name="gpt-4o")  # No thinking

async for event in agent.stream(thread):
    if event.type == EventType.LLM_THINKING_CHUNK:
        # This won't execute for non-reasoning models
        pass
    elif event.type == EventType.LLM_STREAM_CHUNK:
        # Regular content streaming works as before
        print(event.data['content_chunk'], end="")
See packages/tyler/examples/006_thinking_tokens.py for complete working examples.
Streaming with Tools
See tool usage in real-time as the agent works:
from tyler import Agent, Thread, Message, EventType
from lye import WEB_TOOLS, FILES_TOOLS

agent = Agent(
    name="research-assistant",
    model_name="gpt-4",
    purpose="To research and create reports",
    tools=[*WEB_TOOLS, *FILES_TOOLS]
)

async def research_with_streaming(topic: str):
    thread = Thread()
    message = Message(
        role="user",
        content=f"Research {topic} and create a summary"
    )
    thread.add_message(message)

    async for event in agent.stream(thread):
        if event.type == EventType.LLM_STREAM_CHUNK:
            print(event.data.get("content_chunk", ""), end="", flush=True)

        elif event.type == EventType.TOOL_SELECTED:
            tool_name = event.data["tool_name"]
            print(f"\n\n🔧 Using {tool_name}...", flush=True)

        elif event.type == EventType.TOOL_RESULT:
            result = str(event.data["result"])
            # Show abbreviated tool results
            if len(result) > 100:
                print(f" Result: {result[:100]}...")
            else:
                print(f" Result: {result}")
            print("\n🤖 ", end="", flush=True)
Building interactive applications
Terminal chat interface
import asyncio
from tyler import Agent, Thread, Message, ThreadStore
from tyler.models.execution import ExecutionEvent, EventType

class StreamingChat:
    def __init__(self):
        self.agent = None
        self.thread_store = None
        self.thread = None

    async def initialize(self):
        self.thread_store = await ThreadStore.create("sqlite+aiosqlite:///chat.db")
        self.agent = Agent(
            name="chat-assistant",
            model_name="gpt-4",
            purpose="To have helpful conversations",
            thread_store=self.thread_store
        )

    async def start_session(self, session_id: str):
        try:
            self.thread = await self.thread_store.get_thread(session_id)
        except Exception:
            self.thread = None

        if self.thread:
            print("📚 Resuming conversation...")
        else:
            self.thread = Thread(id=session_id)
            print("🆕 Starting new conversation...")

    async def send_message(self, content: str):
        message = Message(role="user", content=content)
        self.thread.add_message(message)

        print("\n🤖 ", end="", flush=True)

        async for update in self.agent.stream(self.thread):
            if update.type == EventType.LLM_STREAM_CHUNK:
                print(update.data.get("content_chunk", ""), end="", flush=True)
            elif update.type == EventType.EXECUTION_COMPLETE:
                # Thread is already updated in place
                await self.thread_store.save_thread(self.thread)
                print("\n")

# Usage
async def main():
    chat = StreamingChat()
    await chat.initialize()
    await chat.start_session("main-chat")

    while True:
        user_input = input("\nYou: ")
        if user_input.lower() in ['exit', 'quit']:
            break
        await chat.send_message(user_input)

asyncio.run(main())
Web application streaming
For web applications, you can stream to a WebSocket or Server-Sent Events:
# FastAPI example with WebSocket
import json
from fastapi import FastAPI, WebSocket
from tyler import Agent, Thread, Message, EventType

app = FastAPI()

@app.websocket("/ws/chat")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    agent = Agent(
        name="web-assistant",
        model_name="gpt-4",
        purpose="To assist web users"
    )
    thread = Thread()

    while True:
        # Receive message from client
        data = await websocket.receive_text()
        message_data = json.loads(data)

        # Add to thread
        message = Message(role="user", content=message_data["content"])
        thread.add_message(message)

        # Stream response
        async for update in agent.stream(thread):
            if update.type == EventType.LLM_STREAM_CHUNK:
                await websocket.send_json({
                    "type": "content",
                    "data": update.data
                })
            elif update.type == EventType.TOOL_SELECTED:
                await websocket.send_json({
                    "type": "tool",
                    "name": update.data.get("tool_name", ""),
                    "content": str(update.data.get("arguments", ""))[:100] + "..."
                })
            elif update.type == EventType.EXECUTION_COMPLETE:
                # Thread is already updated in place
                await websocket.send_json({"type": "complete"})
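For Server-Sent Events instead of WebSockets, the same event stream can be serialized into an SSE response. A minimal sketch with FastAPI, reusing the imports above; the /chat/stream route and payload shape are illustrative, not part of Tyler:

from fastapi.responses import StreamingResponse

sse_agent = Agent(
    name="sse-assistant",
    model_name="gpt-4",
    purpose="To assist web users"
)

@app.post("/chat/stream")
async def chat_stream(payload: dict):
    thread = Thread()
    thread.add_message(Message(role="user", content=payload["content"]))

    async def event_source():
        async for update in sse_agent.stream(thread):
            if update.type == EventType.LLM_STREAM_CHUNK:
                data = {"type": "content", "chunk": update.data.get("content_chunk", "")}
                yield f"data: {json.dumps(data)}\n\n"
            elif update.type == EventType.EXECUTION_COMPLETE:
                yield 'data: {"type": "complete"}\n\n'

    return StreamingResponse(event_source(), media_type="text/event-stream")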
Advanced Streaming Patterns
Progress indicators
Show progress for long-running tasks:
async def stream_with_progress():
    thread = Thread()
    message = Message(
        role="user",
        content="Analyze these 10 websites and create a report"
    )
    thread.add_message(message)

    tool_count = 0
    content_buffer = []

    async for update in agent.stream(thread):
        if update.type == EventType.TOOL_SELECTED:
            tool_count += 1
            print(f"\r⏳ Processing... ({tool_count} tools used)", end="", flush=True)
        elif update.type == EventType.LLM_STREAM_CHUNK:
            content_buffer.append(update.data.get("content_chunk", ""))
        elif update.type == EventType.EXECUTION_COMPLETE:
            print("\r✅ Complete!" + " " * 30)  # Clear progress
            print("\n🤖 " + "".join(content_buffer))
Buffered streaming
For smoother output, buffer chunks:
class BufferedStreamer:
    def __init__(self, buffer_size: int = 5):
        self.buffer = []
        self.buffer_size = buffer_size

    async def stream(self, agent, thread):
        async for update in agent.stream(thread):
            if update.type == EventType.LLM_STREAM_CHUNK:
                self.buffer.append(update.data.get("content_chunk", ""))
                if len(self.buffer) >= self.buffer_size:
                    yield "".join(self.buffer)
                    self.buffer = []
            elif update.type == EventType.EXECUTION_COMPLETE:
                if self.buffer:
                    yield "".join(self.buffer)
                yield {"type": "complete", "thread": update.data}

# Usage
streamer = BufferedStreamer()
async for chunk in streamer.stream(agent, thread):
    if isinstance(chunk, str):
        print(chunk, end="", flush=True)
    else:
        # Handle completion
        pass
Cancellable streaming
Allow users to stop generation:
import asyncio

class CancellableStream:
    def __init__(self):
        self.cancelled = False

    async def stream_with_cancel(self, agent, thread):
        try:
            async for update in agent.stream(thread):
                if self.cancelled:
                    print("\n\n⚠️ Generation cancelled by user")
                    break

                if update.type == EventType.LLM_STREAM_CHUNK:
                    print(update.data.get("content_chunk", ""), end="", flush=True)
        except asyncio.CancelledError:
            print("\n\n⚠️ Stream interrupted")

    def cancel(self):
        self.cancelled = True

# Usage with keyboard interrupt
import signal

stream_handler = CancellableStream()

def signal_handler(sig, frame):
    stream_handler.cancel()

signal.signal(signal.SIGINT, signal_handler)
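If the consumer already runs as an asyncio task, you can also cancel it directly rather than polling a flag. A brief, framework-agnostic sketch:

async def consume(agent, thread):
    async for update in agent.stream(thread):
        if update.type == EventType.LLM_STREAM_CHUNK:
            print(update.data.get("content_chunk", ""), end="", flush=True)

async def run_with_cancel(agent, thread):
    task = asyncio.create_task(consume(agent, thread))
    # ... later, e.g. from a UI callback: task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("\n\n⚠️ Generation cancelled")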
Streaming UI Components
Rich terminal UI
Using the rich library for better terminal output:
from rich.console import Console
from rich.live import Live
from rich.markdown import Markdown
from rich.panel import Panel

console = Console()

async def rich_streaming():
    thread = Thread()
    message = Message(role="user", content="Explain quantum computing")
    thread.add_message(message)

    content = ""

    with Live(Panel("", title="🤖 Assistant"), refresh_per_second=10) as live:
        async for update in agent.stream(thread):
            if update.type == EventType.LLM_STREAM_CHUNK:
                content += update.data.get("content_chunk", "")
                live.update(Panel(Markdown(content), title="🤖 Assistant"))
            elif update.type == EventType.TOOL_SELECTED:
                tool_panel = Panel(
                    f"Using: {update.data.get('tool_name', '')}",
                    title="🔧 Tool",
                    style="yellow"
                )
                console.print(tool_panel)
Token counting
Track tokens during streaming:
class TokenCounter:
    def __init__(self):
        self.total_tokens = 0
        self.chunks = []

    async def count_stream(self, agent, thread):
        async for update in agent.stream(thread):
            if update.type == EventType.LLM_STREAM_CHUNK:
                chunk = update.data.get("content_chunk", "")
                self.chunks.append(chunk)
                # Rough estimate: 1 token ≈ 4 characters
                self.total_tokens += len(chunk) // 4
            yield update

        print(f"\n\n📊 Approximate tokens used: {self.total_tokens}")
Performance Best Practices
1. Chunk Size Optimization
Larger chunks reduce overhead but decrease responsiveness:
# Configure in your agent if supported
agent = Agent(
    name="optimized-streamer",
    model_name="gpt-4",
    purpose="To stream efficiently",
    # streaming_chunk_size=10  # If available
)
2. Async Processing
Process streams asynchronously for better performance:
async def process_multiple_streams(queries: list[str]):
    tasks = []
    for query in queries:
        thread = Thread()
        thread.add_message(Message(role="user", content=query))
        task = asyncio.create_task(collect_stream(agent, thread))
        tasks.append(task)

    results = await asyncio.gather(*tasks)
    return results

async def collect_stream(agent, thread):
    content = []
    async for update in agent.stream(thread):
        if update.type == EventType.LLM_STREAM_CHUNK:
            content.append(update.data.get("content_chunk", ""))
    return "".join(content)
3. Error Handling in Streams
from datetime import datetime
from tyler.models.execution import ExecutionEvent, EventType

async def safe_stream(agent, thread):
    try:
        async for update in agent.stream(thread):
            yield update
    except asyncio.TimeoutError:
        yield ExecutionEvent(
            type=EventType.EXECUTION_ERROR,
            timestamp=datetime.now(),
            data={"message": "Stream timed out"}
        )
    except Exception as e:
        yield ExecutionEvent(
            type=EventType.EXECUTION_ERROR,
            timestamp=datetime.now(),
            data={"message": f"Stream error: {str(e)}"}
        )
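Note that asyncio.TimeoutError only surfaces if you impose a timeout yourself; safe_stream does not add one. A sketch that wraps a consumer in asyncio.wait_for:

import asyncio

async def consume_with_timeout(agent, thread, timeout_seconds: float = 60.0):
    async def _consume():
        async for update in safe_stream(agent, thread):
            if update.type == EventType.LLM_STREAM_CHUNK:
                print(update.data.get("content_chunk", ""), end="", flush=True)

    try:
        await asyncio.wait_for(_consume(), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        print("\n⚠️ Streaming timed out")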
Real-World Example: Live Research Assistant
import asyncio
from datetime import datetime
from tyler import Agent, Thread, Message
from tyler.models.execution import ExecutionEvent, EventType
from lye import WEB_TOOLS, FILES_TOOLS

class LiveResearchAssistant:
    def __init__(self):
        self.agent = Agent(
            name="live-researcher",
            model_name="gpt-4",
            purpose="To conduct research and provide real-time updates",
            tools=[*WEB_TOOLS, *FILES_TOOLS]
        )

    async def research(self, topic: str, save_to_file: bool = True):
        thread = Thread()
        message = Message(
            role="user",
            content=f"""
            Research '{topic}' comprehensively:
            1. Search for recent information
            2. Analyze multiple sources
            3. Create a detailed report
            {"4. Save the report to a file" if save_to_file else ""}
            """
        )
        thread.add_message(message)

        print(f"\n{'='*50}")
        print(f"🔍 Researching: {topic}")
        print(f"⏰ Started: {datetime.now().strftime('%H:%M:%S')}")
        print(f"{'='*50}\n")

        content_buffer = []
        tool_uses = []

        async for update in self.agent.stream(thread):
            if update.type == EventType.LLM_STREAM_CHUNK:
                chunk = update.data.get("content_chunk", "")
                content_buffer.append(chunk)
                print(chunk, end="", flush=True)

            elif update.type == EventType.TOOL_SELECTED:
                tool_name = update.data.get("tool_name", "")
                tool_uses.append(tool_name)

                # Show tool use inline
                print(f"\n\n[🔧 {tool_name}]", end="")
                if tool_name == "web-search":
                    print(" Searching for information...")
                elif tool_name == "files-write":
                    print(" Saving report...")
                print("\n", end="")

            elif update.type == EventType.EXECUTION_COMPLETE:
                print(f"\n\n{'='*50}")
                print("✅ Research Complete!")
                print(f"📊 Tools used: {', '.join(set(tool_uses))}")
                print(f"📝 Total length: {len(''.join(content_buffer))} characters")
                print(f"⏰ Finished: {datetime.now().strftime('%H:%M:%S')}")
                print(f"{'='*50}")
                return update.data

# Usage
async def main():
    assistant = LiveResearchAssistant()

    topics = [
        "Latest breakthroughs in quantum computing",
        "Climate change solutions for 2024",
        "AI safety research progress"
    ]

    for topic in topics:
        await assistant.research(topic, save_to_file=True)
        print("\n" + "="*70 + "\n")

asyncio.run(main())
Next steps