Overview
TheExecutionEvent
class represents individual events that occur during agent execution. These events provide real-time visibility into what the agent is doing, making them essential for streaming responses, debugging, and monitoring.
Class Definition
Copy
@dataclass
class ExecutionEvent:
type: EventType # The type of event
timestamp: datetime # When the event occurred
data: Dict[str, Any] # Event-specific data
metadata: Optional[Dict[str, Any]] = None # Additional metadata
Properties
The type of event that occurred (e.g., LLM_REQUEST, TOOL_SELECTED)
UTC timestamp of when the event occurred
Event-specific data. Structure varies by event type
Optional additional metadata for the event
Event Types and Data
Each event type includes specific data fields:LLM Events
Copy
# LLM_REQUEST
{
"message_count": int, # Number of messages in context
"model": str, # Model being used
"temperature": float # Temperature setting
}
# LLM_RESPONSE
{
"content": str, # Response content
"tool_calls": List[Dict], # Tool calls if any
"tokens": { # Token usage
"prompt_tokens": int,
"completion_tokens": int,
"total_tokens": int
},
"latency_ms": float # Response time
}
# LLM_STREAM_CHUNK
{
"content_chunk": str # Streaming content chunk
}
Tool Events
Copy
# TOOL_SELECTED
{
"tool_name": str, # Name of the tool
"arguments": Dict, # Tool arguments
"tool_call_id": str # Unique tool call ID
}
# TOOL_RESULT
{
"tool_name": str, # Name of the tool
"result": Any, # Tool execution result
"duration_ms": float, # Execution time
"tool_call_id": str # Tool call ID
}
# TOOL_ERROR
{
"tool_name": str, # Name of the tool
"error": str, # Error message
"tool_call_id": str # Tool call ID
}
Message Events
Copy
# MESSAGE_CREATED
{
"message": Message # The created message object
}
Control Flow Events
Copy
# ITERATION_START
{
"iteration_number": int, # Current iteration
"max_iterations": int # Maximum allowed
}
# ITERATION_LIMIT
{
"iterations_used": int # Total iterations used
}
# EXECUTION_ERROR
{
"error_type": str, # Type of error
"message": str, # Error message
"traceback": Optional[str] # Stack trace if available
}
# EXECUTION_COMPLETE
{
"duration_ms": float, # Total execution time
"total_tokens": int # Total tokens used
}
Usage Examples
Streaming Responses
Copy
from tyler import Agent, Thread, EventType
async def stream_response(agent: Agent, thread: Thread):
async for event in agent.go(thread, stream=True):
if event.type == EventType.LLM_STREAM_CHUNK:
# Print content as it arrives
print(event.data["content_chunk"], end="", flush=True)
elif event.type == EventType.TOOL_SELECTED:
print(f"\n[Using {event.data['tool_name']}...]\n")
elif event.type == EventType.EXECUTION_COMPLETE:
print(f"\n[Done in {event.data['duration_ms']:.0f}ms]")
Event Logging
Copy
import logging
async def log_execution(agent: Agent, thread: Thread):
async for event in agent.go(thread, stream=True):
# Log different event types
if event.type == EventType.LLM_REQUEST:
logging.info(f"Sending request to {event.data['model']}")
elif event.type == EventType.TOOL_RESULT:
logging.info(
f"Tool {event.data['tool_name']} completed in "
f"{event.data['duration_ms']:.0f}ms"
)
elif event.type == EventType.EXECUTION_ERROR:
logging.error(
f"Error: {event.data['error_type']} - "
f"{event.data['message']}"
)
Building Event History
Copy
async def collect_events(agent: Agent, thread: Thread) -> List[ExecutionEvent]:
"""Collect all events from an execution"""
events = []
async for event in agent.go(thread, stream=True):
events.append(event)
# Process specific events
if event.type == EventType.MESSAGE_CREATED:
msg = event.data["message"]
print(f"New {msg.role} message added")
return events
Real-time UI Updates
Copy
class AgentUI:
async def process_with_ui_updates(self, agent: Agent, thread: Thread):
async for event in agent.go(thread, stream=True):
await self.handle_event(event)
async def handle_event(self, event: ExecutionEvent):
if event.type == EventType.LLM_STREAM_CHUNK:
await self.append_to_output(event.data["content_chunk"])
elif event.type == EventType.TOOL_SELECTED:
await self.show_tool_indicator(
event.data["tool_name"],
event.data["arguments"]
)
elif event.type == EventType.TOOL_RESULT:
await self.hide_tool_indicator()
await self.show_tool_result(event.data["result"])
elif event.type == EventType.EXECUTION_ERROR:
await self.show_error(event.data["message"])
Event Filtering
Copy
from typing import AsyncGenerator
async def filter_events(
agent: Agent,
thread: Thread,
event_types: List[EventType]
) -> AsyncGenerator[ExecutionEvent, None]:
"""Filter events by type"""
async for event in agent.go(thread, stream=True):
if event.type in event_types:
yield event
# Only get tool and error events
async for event in filter_events(
agent, thread,
[EventType.TOOL_SELECTED, EventType.TOOL_ERROR]
):
print(f"{event.type}: {event.data}")
Timing Analysis
Copy
def analyze_timing(events: List[ExecutionEvent]) -> Dict[str, float]:
"""Analyze timing from events"""
timings = {}
# Find start and end
start = events[0].timestamp
end = events[-1].timestamp
timings["total_ms"] = (end - start).total_seconds() * 1000
# LLM timing
llm_starts = {}
for event in events:
if event.type == EventType.LLM_REQUEST:
llm_starts[event.timestamp] = event
elif event.type == EventType.LLM_RESPONSE:
timings["llm_latency_ms"] = event.data["latency_ms"]
# Tool timing
tool_times = []
for event in events:
if event.type == EventType.TOOL_RESULT:
tool_times.append(event.data["duration_ms"])
if tool_times:
timings["tool_total_ms"] = sum(tool_times)
timings["tool_average_ms"] = sum(tool_times) / len(tool_times)
return timings