Streaming responses enable your agents to provide real-time feedback, making interactions feel more natural and responsive. Instead of waiting for the entire response, users see content as it's generated.

💻 Code Examples
The default streaming mode with full observability:
```python
import asyncio
from tyler import Agent, Thread, Message
from tyler.models.execution import ExecutionEvent, EventType

agent = Agent(
    name="streaming-assistant",
    model_name="gpt-4",
    purpose="To provide real-time responses"
)

async def stream_response():
    thread = Thread()
    message = Message(role="user", content="Tell me a story about space exploration")
    thread.add_message(message)

    print("🤖 Assistant: ", end="", flush=True)

    # Use stream=True or stream="events" (both work identically)
    async for event in agent.stream(thread):
        if event.type == EventType.LLM_STREAM_CHUNK:
            print(event.data.get("content_chunk", ""), end="", flush=True)

    print()  # New line at the end

asyncio.run(stream_response())
```
OpenAI mode is for advanced use cases. Tools ARE executed for full agentic behavior, but you only receive raw chunks (no ExecutionEvents).
Stream raw LiteLLM chunks for OpenAI compatibility:
```python
import asyncio
from tyler import Agent, Thread, Message

agent = Agent(
    name="proxy-assistant",
    model_name="gpt-4o",
    purpose="OpenAI-compatible streaming"
)

async def openai_stream_response():
    thread = Thread()
    message = Message(role="user", content="Hello!")
    thread.add_message(message)

    # Get raw OpenAI-compatible chunks
    async for chunk in agent.stream(thread, mode="openai"):
        # chunk is a raw LiteLLM object
        if hasattr(chunk, 'choices') and chunk.choices:
            delta = chunk.choices[0].delta
            # Delta can be dict or object depending on LiteLLM version
            if isinstance(delta, dict):
                content = delta.get('content')
            else:
                content = getattr(delta, 'content', None)
            if content:
                print(content, end="", flush=True)

        # Usage info in final chunk
        if hasattr(chunk, 'usage') and chunk.usage:
            print(f"\n\nTokens: {chunk.usage.total_tokens}")

asyncio.run(openai_stream_response())
```
When to use openai mode:
Building OpenAI API proxies or gateways
Direct integration with OpenAI-compatible clients
Minimal latency requirements (no transformation overhead)
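On the client side of such an integration, each Server-Sent Event arrives as a `data:` line followed by a blank line. A minimal sketch of a parser for that wire format (the `parse_sse_events` helper below is hypothetical, not part of Tyler):

```python
import json

def parse_sse_events(raw: str) -> list:
    """Parse a text/event-stream payload into chunk dicts.

    Each event is a "data: {json}" line; events are separated by blank
    lines, and the stream may end with a "data: [DONE]" sentinel.
    """
    events = []
    for block in raw.strip().split("\n\n"):
        for line in block.splitlines():
            if line.startswith("data: "):
                payload = line[len("data: "):]
                if payload != "[DONE]":  # Skip the end-of-stream sentinel
                    events.append(json.loads(payload))
    return events
```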
How it works:
✅ Tools ARE executed (fully agentic behavior)
✅ Multi-turn iteration supported
✅ Frontend sees finish_reason: "tool_calls" in chunks
⚠️ No ExecutionEvent telemetry (only raw chunks)
⚠️ Silent during tool execution (brief pauses expected)
⚠️ Consumer must handle chunk formatting (SSE serialization)
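Because the stream goes quiet while tools run, an SSE consumer may want to emit keepalive comments during those pauses so proxies and clients do not time out. A minimal sketch, assuming the chunks are already serialized as SSE strings (`with_keepalive` is a hypothetical wrapper, not a Tyler API):

```python
import asyncio

async def with_keepalive(sse_stream, interval=1.0):
    """Yield SSE comment lines while the underlying stream is silent.

    sse_stream is any async iterator of serialized SSE strings; during
    tool execution no chunks arrive, so we emit ": keepalive" comment
    lines (ignored by SSE clients) to keep the connection alive.
    """
    iterator = sse_stream.__aiter__()
    while True:
        next_chunk = asyncio.ensure_future(iterator.__anext__())
        while True:
            done, _ = await asyncio.wait({next_chunk}, timeout=interval)
            if done:
                break
            yield ": keepalive\n\n"  # SSE comment line
        try:
            yield next_chunk.result()
        except StopAsyncIteration:
            return  # Underlying stream is exhausted
```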
Matches the pattern from OpenAI's Agents SDK:

Raw chunks → finish_reason="tool_calls" → [agent executes tools] → more raw chunks → repeat

SSE Serialization Example:
```python
import json

def serialize_chunk_to_sse(chunk) -> str:
    """Convert raw chunk to Server-Sent Events format"""
    chunk_dict = {
        "id": getattr(chunk, 'id', 'unknown'),
        "object": getattr(chunk, 'object', 'chat.completion.chunk'),
        "created": getattr(chunk, 'created', 0),
        "model": getattr(chunk, 'model', 'unknown'),
        "choices": []
    }

    if hasattr(chunk, 'choices') and chunk.choices:
        for choice in chunk.choices:
            choice_dict = {
                "index": getattr(choice, 'index', 0),
                "delta": {},
                "finish_reason": getattr(choice, 'finish_reason', None)
            }
            if hasattr(choice, 'delta'):
                delta = choice.delta
                if isinstance(delta, dict):
                    choice_dict["delta"] = delta
                else:
                    if hasattr(delta, 'content') and delta.content:
                        choice_dict["delta"]["content"] = delta.content
                    if hasattr(delta, 'role') and delta.role:
                        choice_dict["delta"]["role"] = delta.role
            chunk_dict["choices"].append(choice_dict)

    if hasattr(chunk, 'usage') and chunk.usage:
        chunk_dict["usage"] = {
            "prompt_tokens": chunk.usage.prompt_tokens,
            "completion_tokens": chunk.usage.completion_tokens,
            "total_tokens": chunk.usage.total_tokens
        }

    return f"data: {json.dumps(chunk_dict)}\n\n"

# Use in a FastAPI endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/v1/chat/completions")  # The OpenAI API uses POST for this route
async def openai_compatible_endpoint(messages: list):
    thread = Thread()
    for msg in messages:
        thread.add_message(Message(role=msg["role"], content=msg["content"]))

    async def generate():
        async for chunk in agent.stream(thread, mode="openai"):
            yield serialize_chunk_to_sse(chunk)

    return StreamingResponse(generate(), media_type="text/event-stream")
```
See examples/005_openai_streaming.py for a complete working example.
ExecutionEvent objects provide detailed information about the agent's execution:
```python
from tyler import EventType

async for event in agent.stream(thread):
    if event.type == EventType.LLM_STREAM_CHUNK:
        # Text being generated
        print(event.data.get("content_chunk", ""), end="", flush=True)

    elif event.type == EventType.TOOL_SELECTED:
        # Tool is about to be called
        print(f"\n🔧 Calling tool: {event.data['tool_name']}")

    elif event.type == EventType.MESSAGE_CREATED:
        # New message added to thread
        msg = event.data["message"]
        if msg.role == "tool":
            print(f"\n✅ Tool {msg.name} completed")

    elif event.type == EventType.EXECUTION_COMPLETE:
        # All processing complete
        print(f"\n✅ Complete in {event.data['duration_ms']:.0f}ms!")
```
Requires LiteLLM >= 1.63.0 and a reasoning-capable model like OpenAI o1 or Anthropic Claude with extended thinking.
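You could guard thinking-token handling behind a runtime version check. A minimal sketch (`version_at_least` and `litellm_supports_thinking` are hypothetical helpers; the 1.63.0 threshold comes from the requirement above):

```python
from importlib import metadata

def version_at_least(version: str, minimum=(1, 63, 0)) -> bool:
    """Compare a dotted version string against a minimum (major, minor, patch)."""
    parts = []
    for token in version.split(".")[:3]:
        digits = "".join(ch for ch in token if ch.isdigit())
        parts.append(int(digits) if digits else 0)
    return tuple(parts) >= minimum

def litellm_supports_thinking() -> bool:
    """True if the installed LiteLLM is new enough to expose reasoning_content."""
    try:
        return version_at_least(metadata.version("litellm"))
    except metadata.PackageNotFoundError:
        return False
```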
Models like OpenAI o1 and Anthropic Claude can emit their reasoning process as separate "thinking tokens" alongside the response content. Tyler's streaming API exposes these as dedicated LLM_THINKING_CHUNK events, allowing you to display reasoning separately from the final answer.
OpenAI mode preserves all thinking fields from LiteLLM:
```python
async for chunk in agent.stream(thread, mode="openai"):
    if hasattr(chunk, 'choices') and chunk.choices:
        delta = chunk.choices[0].delta

        # LiteLLM standardized field (v1.63.0+)
        if hasattr(delta, 'reasoning_content') and delta.reasoning_content:
            print(f"[Reasoning] {delta.reasoning_content}")

        # Anthropic-specific field
        if hasattr(delta, 'thinking') and delta.thinking:
            print(f"[Thinking] {delta.thinking}")

        # Regular content
        if hasattr(delta, 'content') and delta.content:
            print(delta.content, end="")
```
A common pattern is showing thinking in a collapsible section:
```python
thinking_section = []
response_section = []

async for event in agent.stream(thread):
    if event.type == EventType.LLM_THINKING_CHUNK:
        thinking_section.append(event.data['thinking_chunk'])
    elif event.type == EventType.LLM_STREAM_CHUNK:
        response_section.append(event.data['content_chunk'])

# Display in UI
print("─── Thinking Process (click to expand) ───")
print(''.join(thinking_section))
print("─── Response ───")
print(''.join(response_section))
```
Models without thinking support work unchanged, since no LLM_THINKING_CHUNK events are emitted:
```python
agent = Agent(name="regular", model_name="gpt-4o")  # No thinking

async for event in agent.stream(thread):
    if event.type == EventType.LLM_THINKING_CHUNK:
        # This won't execute for non-reasoning models
        pass
    elif event.type == EventType.LLM_STREAM_CHUNK:
        # Regular content streaming works as before
        print(event.data['content_chunk'], end="")
```
See packages/tyler/examples/006_thinking_tokens.py for complete working examples.
Larger chunks reduce overhead but decrease responsiveness:
```python
# Configure in your agent if supported
agent = Agent(
    name="optimized-streamer",
    model_name="gpt-4",
    purpose="To stream efficiently",
    # streaming_chunk_size=10  # If available
)
```
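If the agent does not expose such a knob, you can coalesce chunks on the consumer side instead. A minimal sketch (the `batch_chunks` helper is hypothetical, not a Tyler API):

```python
def batch_chunks(chunks, min_size=10):
    """Coalesce small text chunks into batches of at least min_size chars.

    Fewer, larger batches mean fewer UI repaints or network writes, at
    the cost of slightly less immediate feedback.
    """
    buffer = []
    buffered = 0
    for chunk in chunks:
        buffer.append(chunk)
        buffered += len(chunk)
        if buffered >= min_size:
            yield "".join(buffer)
            buffer, buffered = [], 0
    if buffer:  # Flush whatever remains at end of stream
        yield "".join(buffer)
```

The same pattern applies unchanged to an async iterator of `content_chunk` strings pulled from LLM_STREAM_CHUNK events.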