Streaming responses enable your agents to provide real-time feedback, making interactions feel more natural and responsive. Instead of waiting for the entire response, users see content as it’s generated.

Why Use Streaming?

Traditional (non-streaming) approach:
  • User waits for entire response
  • No feedback during processing
  • Can feel slow for long responses
Streaming approach:
  • Immediate visual feedback
  • See responses as they’re generated
  • Better user experience
  • Can see tool usage in real-time

Streaming Modes

Tyler supports three streaming modes via the stream parameter:
Mode             | Value              | Output Type        | Use Case
Non-Streaming    | False (default)    | AgentResult        | Simple request/response
Event Streaming  | True or "events"   | ExecutionEvent     | Rich observability, tool tracking
Raw Streaming    | "raw"              | Raw LiteLLM chunks | OpenAI compatibility, proxying

Event Streaming (Default)

Event streaming is the default mode when you stream and provides full observability:
import asyncio
from tyler import Agent, Thread, Message
from tyler.models.execution import ExecutionEvent, EventType

agent = Agent(
    name="streaming-assistant",
    model_name="gpt-4",
    purpose="To provide real-time responses"
)

async def stream_response():
    thread = Thread()
    message = Message(role="user", content="Tell me a story about space exploration")
    thread.add_message(message)
    
    print("🤖 Assistant: ", end="", flush=True)
    
    # Use stream=True or stream="events" (both work identically)
    async for event in agent.stream(thread):
        if event.type == EventType.LLM_STREAM_CHUNK:
            print(event.data.get("content_chunk", ""), end="", flush=True)
    
    print()  # New line at the end

asyncio.run(stream_response())

Raw Streaming (Advanced)

Raw mode is for advanced use cases. Tools ARE executed for full agentic behavior, but you only receive raw chunks (no ExecutionEvents).
Stream raw LiteLLM chunks for OpenAI compatibility:
import asyncio
from tyler import Agent, Thread, Message

agent = Agent(
    name="proxy-assistant",
    model_name="gpt-4o",
    purpose="OpenAI-compatible streaming"
)

async def raw_stream_response():
    thread = Thread()
    message = Message(role="user", content="Hello!")
    thread.add_message(message)
    
    # Get raw OpenAI-compatible chunks
    async for chunk in agent.stream(thread, mode="raw"):
        # chunk is a raw LiteLLM object
        if hasattr(chunk, 'choices') and chunk.choices:
            delta = chunk.choices[0].delta
            
            # Delta can be dict or object depending on LiteLLM version
            if isinstance(delta, dict):
                content = delta.get('content')
            else:
                content = getattr(delta, 'content', None)
            
            if content:
                print(content, end="", flush=True)
        
        # Usage info in final chunk
        if hasattr(chunk, 'usage') and chunk.usage:
            print(f"\n\nTokens: {chunk.usage.total_tokens}")

asyncio.run(raw_stream_response())
When to use raw mode:
  • Building OpenAI API proxies or gateways
  • Direct integration with OpenAI-compatible clients
  • Minimal latency requirements (no transformation overhead)
How it works:
  • ✅ Tools ARE executed (fully agentic behavior)
  • ✅ Multi-turn iteration supported
  • ✅ Frontend sees finish_reason: "tool_calls" in chunks
  • ⚠️ No ExecutionEvent telemetry (only raw chunks)
  • ⚠️ Silent during tool execution (brief pauses expected; see the sketch below)
  • ⚠️ Consumer must handle chunk formatting (SSE serialization)
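Because raw mode goes silent while tools run, a consumer will typically watch finish_reason to surface a status indicator. A minimal sketch (the consume_raw_with_status name and the status message are illustrative, not part of Tyler's API):
async def consume_raw_with_status(agent, thread):
    async for chunk in agent.stream(thread, mode="raw"):
        if not (hasattr(chunk, 'choices') and chunk.choices):
            continue
        choice = chunk.choices[0]
        
        # finish_reason == "tool_calls" marks the point where the agent pauses
        # the stream to execute tools before emitting more chunks
        if getattr(choice, 'finish_reason', None) == "tool_calls":
            print("\n⏳ Running tools...", flush=True)
            continue
        
        delta = getattr(choice, 'delta', None)
        if delta is None:
            continue
        # Delta can be dict or object depending on LiteLLM version
        content = delta.get('content') if isinstance(delta, dict) else getattr(delta, 'content', None)
        if content:
            print(content, end="", flush=True)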
Matches the pattern from OpenAI’s Agents SDK: raw chunks → finish_reason="tool_calls" → [agent executes tools] → more raw chunks → repeat.

SSE Serialization Example:
import json

def serialize_chunk_to_sse(chunk) -> str:
    """Convert raw chunk to Server-Sent Events format"""
    chunk_dict = {
        "id": getattr(chunk, 'id', 'unknown'),
        "object": getattr(chunk, 'object', 'chat.completion.chunk'),
        "created": getattr(chunk, 'created', 0),
        "model": getattr(chunk, 'model', 'unknown'),
        "choices": []
    }
    
    if hasattr(chunk, 'choices') and chunk.choices:
        for choice in chunk.choices:
            choice_dict = {
                "index": getattr(choice, 'index', 0),
                "delta": {},
                "finish_reason": getattr(choice, 'finish_reason', None)
            }
            
            if hasattr(choice, 'delta'):
                delta = choice.delta
                if isinstance(delta, dict):
                    choice_dict["delta"] = delta
                else:
                    if hasattr(delta, 'content') and delta.content:
                        choice_dict["delta"]["content"] = delta.content
                    if hasattr(delta, 'role') and delta.role:
                        choice_dict["delta"]["role"] = delta.role
            
            chunk_dict["choices"].append(choice_dict)
    
    if hasattr(chunk, 'usage') and chunk.usage:
        chunk_dict["usage"] = {
            "prompt_tokens": chunk.usage.prompt_tokens,
            "completion_tokens": chunk.usage.completion_tokens,
            "total_tokens": chunk.usage.total_tokens
        }
    
    return f"data: {json.dumps(chunk_dict)}\n\n"

# Use in a FastAPI endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    messages: list[dict]

# The Chat Completions API expects POST with a JSON body
@app.post("/v1/chat/completions")
async def openai_compatible_endpoint(request: ChatRequest):
    thread = Thread()
    for msg in request.messages:
        thread.add_message(Message(role=msg["role"], content=msg["content"]))
    
    async def generate():
        async for chunk in agent.stream(thread, mode="raw"):
            yield serialize_chunk_to_sse(chunk)
    
    return StreamingResponse(generate(), media_type="text/event-stream")
See examples/005_raw_streaming.py for a complete working example.

Understanding Execution Events

ExecutionEvent objects provide detailed information about the agent’s execution:
from tyler import EventType

async for event in agent.stream(thread):
    if event.type == EventType.LLM_STREAM_CHUNK:
        # Text being generated
        print(event.data.get("content_chunk", ""), end="", flush=True)
    
    elif event.type == EventType.TOOL_SELECTED:
        # Tool is about to be called
        print(f"\n🔧 Calling tool: {event.data['tool_name']}")
    
    elif event.type == EventType.MESSAGE_CREATED:
        # New message added to thread
        msg = event.data["message"]
        if msg.role == "tool":
            print(f"\n✅ Tool {msg.name} completed")
    
    elif event.type == EventType.EXECUTION_COMPLETE:
        # All processing complete
        print(f"\n✅ Complete in {event.data['duration_ms']:.0f}ms!")

Thinking Tokens (Reasoning Content)

Requires LiteLLM >= 1.63.0 and a reasoning-capable model like OpenAI o1 or Anthropic Claude with extended thinking.
Models like OpenAI o1 and Anthropic Claude can emit their reasoning process as separate “thinking tokens” alongside the response content. Tyler’s streaming API exposes these as dedicated LLM_THINKING_CHUNK events, allowing you to display reasoning separately from the final answer.

Why Use Thinking Tokens?

  • Transparency: Show users how the AI arrived at its answer
  • Debugging: Trace model reasoning for better agent development
  • UX: Display thinking in a collapsible section or different style
  • Trust: Users can verify the model’s reasoning process

Event Streaming with Thinking

from tyler import Agent, Thread, Message, EventType

agent = Agent(
    name="thinking-agent",
    model_name="anthropic/claude-3-7-sonnet-20250219",  # or "o1-preview"
    purpose="To demonstrate thinking tokens"
)

thread = Thread()
thread.add_message(Message(
    role="user",
    content="What's 137 * 284? Show your thinking."
))

print("💭 Thinking: ", end="", flush=True)
print("\n💬 Response: ", end="", flush=True)

async for event in agent.stream(thread):
    if event.type == EventType.LLM_THINKING_CHUNK:
        # Thinking/reasoning tokens (separate from content)
        thinking = event.data['thinking_chunk']
        thinking_type = event.data['thinking_type']  # "reasoning", "thinking", etc.
        print(f"\n[{thinking_type}] {thinking}", flush=True)
    
    elif event.type == EventType.LLM_STREAM_CHUNK:
        # Regular response content
        print(event.data['content_chunk'], end="", flush=True)
Output:
💭 Thinking:
[reasoning] Let me calculate 137 * 284 step by step...
[reasoning] 137 * 284 = 137 * (280 + 4) = 137 * 280 + 137 * 4...

💬 Response: The answer is 38,908.

Thinking in Message Object

After streaming completes, thinking content is stored as a top-level field on the message:
async for event in agent.stream(thread):
    if event.type == EventType.MESSAGE_CREATED:
        msg = event.data['message']
        if msg.role == "assistant":
            # Access complete reasoning (top-level field)
            if msg.reasoning_content:
                print(f"Full reasoning: {msg.reasoning_content}")

Raw Streaming with Thinking

Raw mode preserves all thinking fields from LiteLLM:
async for chunk in agent.stream(thread, mode="raw"):
    if hasattr(chunk, 'choices') and chunk.choices:
        delta = chunk.choices[0].delta
        
        # LiteLLM standardized field (v1.63.0+)
        if hasattr(delta, 'reasoning_content') and delta.reasoning_content:
            print(f"[Reasoning] {delta.reasoning_content}")
        
        # Anthropic-specific field
        if hasattr(delta, 'thinking') and delta.thinking:
            print(f"[Thinking] {delta.thinking}")
        
        # Regular content
        if hasattr(delta, 'content') and delta.content:
            print(delta.content, end="")

UI Pattern: Separated Display

A common pattern is showing thinking in a collapsible section:
thinking_section = []
response_section = []

async for event in agent.stream(thread):
    if event.type == EventType.LLM_THINKING_CHUNK:
        thinking_section.append(event.data['thinking_chunk'])
    elif event.type == EventType.LLM_STREAM_CHUNK:
        response_section.append(event.data['content_chunk'])

# Display in UI
print("─── Thinking Process (click to expand) ───")
print(''.join(thinking_section))
print("─── Response ───")
print(''.join(response_section))

Supported Models

OpenAI:
  • o1-preview (reasoning_content)
  • o1-mini (reasoning_content)
Anthropic:
  • claude-3-7-sonnet-20250219 with extended thinking
  • Future Claude models with thinking capability
Other Providers via LiteLLM:
  • DeepSeek
  • xAI
  • Google AI Studio
  • Perplexity (Magistral models)
  • Groq
See the LiteLLM docs for the full list.

Backward Compatibility

Models without thinking support work unchanged; no LLM_THINKING_CHUNK events are emitted:
agent = Agent(name="regular", model_name="gpt-4o")  # No thinking

async for event in agent.stream(thread):
    if event.type == EventType.LLM_THINKING_CHUNK:
        # This won't execute for non-reasoning models
        pass
    elif event.type == EventType.LLM_STREAM_CHUNK:
        # Regular content streaming works as before
        print(event.data['content_chunk'], end="")
See packages/tyler/examples/006_thinking_tokens.py for complete working examples.

Streaming with Tools

See tool usage in real-time:
from lye import WEB_TOOLS, FILES_TOOLS

agent = Agent(
    name="research-assistant",
    model_name="gpt-4",
    purpose="To research and create reports",
    tools=[*WEB_TOOLS, *FILES_TOOLS]
)

async def research_with_streaming(topic: str):
    thread = Thread()
    message = Message(
        role="user",
        content=f"Research {topic} and create a summary"
    )
    thread.add_message(message)
    
    async for event in agent.stream(thread):
        if event.type == EventType.LLM_STREAM_CHUNK:
            print(event.data.get("content_chunk", ""), end="", flush=True)
        
        elif event.type == EventType.TOOL_SELECTED:
            tool_name = event.data["tool_name"]
            print(f"\n\n🔧 Using {tool_name}...", flush=True)
        
        elif event.type == EventType.TOOL_RESULT:
            result = event.data["result"]
            # Show abbreviated tool results
            if len(result) > 100:
                print(f"   Result: {result[:100]}...")
            else:
                print(f"   Result: {result}")
            
            print("\n🤖 ", end="", flush=True)

Building interactive applications

Terminal chat interface

import asyncio
from tyler import Agent, Thread, Message, ThreadStore
from tyler.models.execution import ExecutionEvent, EventType

class StreamingChat:
    def __init__(self):
        self.agent = None
        self.thread_store = None
        self.thread = None
    
    async def initialize(self):
        self.thread_store = await ThreadStore.create("sqlite+aiosqlite:///chat.db")
        self.agent = Agent(
            name="chat-assistant",
            model_name="gpt-4",
            purpose="To have helpful conversations",
            thread_store=self.thread_store
        )
    
    async def start_session(self, session_id: str):
        try:
            self.thread = await self.thread_store.get_thread(session_id)
        except Exception:
            self.thread = None
        
        if self.thread is not None:
            print("📚 Resuming conversation...")
        else:
            self.thread = Thread(id=session_id)
            print("🆕 Starting new conversation...")
    
    async def send_message(self, content: str):
        message = Message(role="user", content=content)
        self.thread.add_message(message)
        
        print("\n🤖 ", end="", flush=True)
        
        async for update in self.agent.stream(self.thread):
            if update.type == EventType.LLM_STREAM_CHUNK:
                print(update.data.get("content_chunk", ""), end="", flush=True)
            elif update.type == EventType.EXECUTION_COMPLETE:
                # Thread is already updated in place
                await self.thread_store.save_thread(self.thread)
        
        print("\n")

# Usage
async def main():
    chat = StreamingChat()
    await chat.initialize()
    await chat.start_session("main-chat")
    
    while True:
        user_input = input("\nYou: ")
        if user_input.lower() in ['exit', 'quit']:
            break
        
        await chat.send_message(user_input)

asyncio.run(main())

Web application streaming

For web applications, you can stream over a WebSocket or via Server-Sent Events. A WebSocket example follows, with an SSE sketch after it:
# FastAPI example with WebSocket
from fastapi import FastAPI, WebSocket
import json

app = FastAPI()

@app.websocket("/ws/chat")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    
    agent = Agent(
        name="web-assistant",
        model_name="gpt-4",
        purpose="To assist web users"
    )
    
    thread = Thread()
    
    while True:
        # Receive message from client
        data = await websocket.receive_text()
        message_data = json.loads(data)
        
        # Add to thread
        message = Message(role="user", content=message_data["content"])
        thread.add_message(message)
        
        # Stream response
        async for update in agent.stream(thread):
            if update.type == EventType.LLM_STREAM_CHUNK:
                await websocket.send_json({
                    "type": "content",
                    "data": update.data
                })
            
            elif update.type == EventType.TOOL_SELECTED:
                await websocket.send_json({
                    "type": "tool",
                    "name": update.data.get("tool_name", ""),
                    "content": str(update.data.get("arguments", ""))[:100] + "..."
                })
            
            elif update.type == EventType.EXECUTION_COMPLETE:
                # Thread is already updated in place
                await websocket.send_json({"type": "complete"})

Advanced Streaming Patterns

Progress indicators

Show progress for long-running tasks:
async def stream_with_progress():
    thread = Thread()
    message = Message(
        role="user",
        content="Analyze these 10 websites and create a report"
    )
    thread.add_message(message)
    
    tool_count = 0
    content_buffer = []
    
    async for update in agent.stream(thread):
        if update.type == EventType.TOOL_SELECTED:
            tool_count += 1
            print(f"\r⏳ Processing... ({tool_count} tools used)", end="", flush=True)
        
        elif update.type == EventType.LLM_STREAM_CHUNK:
            content_buffer.append(update.data.get("content_chunk", ""))
        
        elif update.type == EventType.EXECUTION_COMPLETE:
            print("\r✅ Complete!" + " " * 30)  # Clear progress
            print("\n🤖 " + "".join(content_buffer))

Buffered streaming

For smoother output, buffer chunks:
class BufferedStreamer:
    def __init__(self, buffer_size: int = 5):
        self.buffer = []
        self.buffer_size = buffer_size
    
    async def stream(self, agent, thread):
        async for update in agent.stream(thread):
            if update.type == EventType.LLM_STREAM_CHUNK:
                self.buffer.append(update.data.get("content_chunk", ""))
                
                if len(self.buffer) >= self.buffer_size:
                    yield "".join(self.buffer)
                    self.buffer = []
            
            elif update.type == EventType.EXECUTION_COMPLETE:
                if self.buffer:
                    yield "".join(self.buffer)
                yield {"type": "complete", "thread": update.data}

# Usage
streamer = BufferedStreamer()
async for chunk in streamer.stream(agent, thread):
    if isinstance(chunk, str):
        print(chunk, end="", flush=True)
    else:
        # Handle completion
        pass

Cancellable streaming

Allow users to stop generation:
import asyncio

class CancellableStream:
    def __init__(self):
        self.cancelled = False
    
    async def stream_with_cancel(self, agent, thread):
        try:
            async for update in agent.stream(thread):
                if self.cancelled:
                    print("\n\n⚠️  Generation cancelled by user")
                    break
                
                if update.type == EventType.LLM_STREAM_CHUNK:
                    print(update.data.get("content_chunk", ""), end="", flush=True)
        except asyncio.CancelledError:
            print("\n\n⚠️  Stream interrupted")
    
    def cancel(self):
        self.cancelled = True

# Usage with keyboard interrupt
import signal

stream_handler = CancellableStream()

def signal_handler(sig, frame):
    stream_handler.cancel()

signal.signal(signal.SIGINT, signal_handler)
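To drive the stream with this handler, run it as a coroutine (reusing an agent and thread from the earlier examples); pressing Ctrl+C stops output at the next event:
async def main():
    thread = Thread()
    thread.add_message(Message(role="user", content="Write a long story about the ocean"))
    await stream_handler.stream_with_cancel(agent, thread)

asyncio.run(main())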

Streaming UI Components

Rich terminal UI

Using the rich library for better terminal output:
from rich.console import Console
from rich.live import Live
from rich.markdown import Markdown
from rich.panel import Panel

console = Console()

async def rich_streaming():
    thread = Thread()
    message = Message(role="user", content="Explain quantum computing")
    thread.add_message(message)
    
    content = ""
    
    with Live(Panel("", title="🤖 Assistant"), refresh_per_second=10) as live:
        async for update in agent.stream(thread):
            if update.type == EventType.LLM_STREAM_CHUNK:
                content += update.data.get("content_chunk", "")
                live.update(Panel(Markdown(content), title="🤖 Assistant"))
            
            elif update.type == EventType.TOOL_SELECTED:
                tool_panel = Panel(
                    f"Using: {update.data.get('tool_name', '')}",
                    title="🔧 Tool",
                    style="yellow"
                )
                console.print(tool_panel)

Token counting

Track tokens during streaming:
class TokenCounter:
    def __init__(self):
        self.total_tokens = 0
        self.chunks = []
    
    async def count_stream(self, agent, thread):
        async for update in agent.stream(thread):
            if update.type == EventType.LLM_STREAM_CHUNK:
                chunk = update.data.get("content_chunk", "")
                self.chunks.append(chunk)
                # Rough estimate: 1 token ≈ 4 characters
                self.total_tokens += len(chunk) // 4
                
                yield update
        
        print(f"\n\n📊 Approximate tokens used: {self.total_tokens}")

Performance tips

1. Chunk Size Optimization

Larger chunks reduce overhead but decrease responsiveness:
# Configure in your agent if supported
agent = Agent(
    name="optimized-streamer",
    model_name="gpt-4",
    purpose="To stream efficiently",
    # streaming_chunk_size=10  # If available
)
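If your agent version does not expose a chunk-size setting, you can coalesce chunks on the consumer side instead. A minimal time-based sketch (complementing the size-based BufferedStreamer above; the coalesce_chunks name and 50 ms interval are illustrative):
import time

async def coalesce_chunks(agent, thread, flush_interval: float = 0.05):
    """Buffer content chunks and emit them roughly every flush_interval seconds."""
    buffer = []
    last_flush = time.monotonic()
    
    async for event in agent.stream(thread):
        if event.type == EventType.LLM_STREAM_CHUNK:
            buffer.append(event.data.get("content_chunk", ""))
            if time.monotonic() - last_flush >= flush_interval:
                yield "".join(buffer)
                buffer.clear()
                last_flush = time.monotonic()
    
    # Flush whatever is left when the stream ends
    if buffer:
        yield "".join(buffer)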

2. Async Processing

Process streams asynchronously for better performance:
async def process_multiple_streams(queries: list[str]):
    tasks = []
    
    for query in queries:
        thread = Thread()
        thread.add_message(Message(role="user", content=query))
        task = asyncio.create_task(collect_stream(agent, thread))
        tasks.append(task)
    
    results = await asyncio.gather(*tasks)
    return results

async def collect_stream(agent, thread):
    content = []
    async for update in agent.stream(thread):
        if update.type == EventType.LLM_STREAM_CHUNK:
            content.append(update.data.get("content_chunk", ""))
    return "".join(content)

3. Error Handling in Streams

import asyncio
from datetime import datetime
from tyler.models.execution import ExecutionEvent, EventType

async def safe_stream(agent, thread):
    try:
        async for update in agent.stream(thread):
            yield update
    except asyncio.TimeoutError:
        yield ExecutionEvent(
            type=EventType.EXECUTION_ERROR,
            timestamp=datetime.now(),
            data={"message": "Stream timed out"}
        )
    except Exception as e:
        yield ExecutionEvent(
            type=EventType.EXECUTION_ERROR,
            timestamp=datetime.now(),
            data={"message": f"Stream error: {str(e)}"}
        )
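Consumers iterate safe_stream exactly like agent.stream, and errors arrive through the same event channel:
async def run_safely(agent, thread):
    async for update in safe_stream(agent, thread):
        if update.type == EventType.LLM_STREAM_CHUNK:
            print(update.data.get("content_chunk", ""), end="", flush=True)
        elif update.type == EventType.EXECUTION_ERROR:
            print(f"\n❌ {update.data['message']}")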

Real-World Example: Live Research Assistant

import asyncio
from datetime import datetime
from tyler import Agent, Thread, Message
from tyler.models.execution import ExecutionEvent, EventType
from lye import WEB_TOOLS, FILES_TOOLS

class LiveResearchAssistant:
    def __init__(self):
        self.agent = Agent(
            name="live-researcher",
            model_name="gpt-4",
            purpose="To conduct research and provide real-time updates",
            tools=[*WEB_TOOLS, *FILES_TOOLS]
        )
    
    async def research(self, topic: str, save_to_file: bool = True):
        thread = Thread()
        message = Message(
            role="user",
            content=f"""
            Research '{topic}' comprehensively:
            1. Search for recent information
            2. Analyze multiple sources
            3. Create a detailed report
            {"4. Save the report to a file" if save_to_file else ""}
            """
        )
        thread.add_message(message)
        
        print(f"\n{'='*50}")
        print(f"🔍 Researching: {topic}")
        print(f"⏰ Started: {datetime.now().strftime('%H:%M:%S')}")
        print(f"{'='*50}\n")
        
        content_buffer = []
        tool_uses = []
        
        async for update in self.agent.stream(thread):
            if update.type == EventType.LLM_STREAM_CHUNK:
                chunk = update.data.get("content_chunk", "")
                content_buffer.append(chunk)
                print(chunk, end="", flush=True)
            
            elif update.type == EventType.TOOL_SELECTED:
                tool_name = update.data.get("tool_name", "")
                tool_uses.append(tool_name)
                
                # Show tool use inline
                print(f"\n\n[🔧 {tool_name}]", end="")
                if tool_name == "web-search":
                    print(f" Searching for information...")
                elif tool_name == "files-write":
                    print(f" Saving report...")
                print("\n", end="")
            
            elif update.type == EventType.EXECUTION_COMPLETE:
                print(f"\n\n{'='*50}")
                print(f"✅ Research Complete!")
                print(f"📊 Tools used: {', '.join(set(tool_uses))}")
                print(f"📝 Total length: {len(''.join(content_buffer))} characters")
                print(f"⏰ Finished: {datetime.now().strftime('%H:%M:%S')}")
                print(f"{'='*50}")
                
                return update.data

# Usage
async def main():
    assistant = LiveResearchAssistant()
    
    topics = [
        "Latest breakthroughs in quantum computing",
        "Climate change solutions for 2024",
        "AI safety research progress"
    ]
    
    for topic in topics:
        await assistant.research(topic, save_to_file=True)
        print("\n" + "="*70 + "\n")

asyncio.run(main())

Next steps