Overview

The ExecutionEvent class represents individual events that occur during agent execution. These events provide real-time visibility into what the agent is doing, making them essential for streaming responses, debugging, and monitoring.

Class Definition

@dataclass
class ExecutionEvent:
    type: EventType               # The type of event
    timestamp: datetime           # When the event occurred
    data: Dict[str, Any]         # Event-specific data
    metadata: Optional[Dict[str, Any]] = None  # Additional metadata

Properties

type
EventType
The type of event that occurred (e.g., LLM_REQUEST, TOOL_SELECTED)
timestamp
datetime
UTC timestamp of when the event occurred
data
Dict[str, Any]
Event-specific data. Structure varies by event type
metadata
Optional[Dict[str, Any]]
Optional additional metadata for the event

Event Types and Data

Each event type includes specific data fields:

LLM Events

# LLM_REQUEST
{
    "message_count": int,      # Number of messages in context
    "model": str,              # Model being used
    "temperature": float       # Temperature setting
}

# LLM_RESPONSE
{
    "content": str,            # Response content
    "tool_calls": List[Dict],  # Tool calls if any
    "tokens": {                # Token usage
        "prompt_tokens": int,
        "completion_tokens": int,
        "total_tokens": int
    },
    "latency_ms": float        # Response time
}

# LLM_STREAM_CHUNK
{
    "content_chunk": str       # Streaming content chunk
}

Tool Events

# TOOL_SELECTED
{
    "tool_name": str,          # Name of the tool
    "arguments": Dict,         # Tool arguments
    "tool_call_id": str        # Unique tool call ID
}

# TOOL_RESULT
{
    "tool_name": str,          # Name of the tool
    "result": Any,             # Tool execution result
    "duration_ms": float,      # Execution time
    "tool_call_id": str        # Tool call ID
}

# TOOL_ERROR
{
    "tool_name": str,          # Name of the tool
    "error": str,              # Error message
    "tool_call_id": str        # Tool call ID
}

Message Events

# MESSAGE_CREATED
{
    "message": Message         # The created message object
}

Control Flow Events

# ITERATION_START
{
    "iteration_number": int,   # Current iteration
    "max_iterations": int      # Maximum allowed
}

# ITERATION_LIMIT
{
    "iterations_used": int     # Total iterations used
}

# EXECUTION_ERROR
{
    "error_type": str,         # Type of error
    "message": str,            # Error message
    "traceback": Optional[str] # Stack trace if available
}

# EXECUTION_COMPLETE
{
    "duration_ms": float,      # Total execution time
    "total_tokens": int        # Total tokens used
}

Usage Examples

Streaming Responses

from tyler import Agent, Thread, EventType

async def stream_response(agent: Agent, thread: Thread):
    async for event in agent.go(thread, stream=True):
        if event.type == EventType.LLM_STREAM_CHUNK:
            # Print content as it arrives
            print(event.data["content_chunk"], end="", flush=True)
        
        elif event.type == EventType.TOOL_SELECTED:
            print(f"\n[Using {event.data['tool_name']}...]\n")
        
        elif event.type == EventType.EXECUTION_COMPLETE:
            print(f"\n[Done in {event.data['duration_ms']:.0f}ms]")

Event Logging

import logging

async def log_execution(agent: Agent, thread: Thread):
    async for event in agent.go(thread, stream=True):
        # Log different event types
        if event.type == EventType.LLM_REQUEST:
            logging.info(f"Sending request to {event.data['model']}")
        
        elif event.type == EventType.TOOL_RESULT:
            logging.info(
                f"Tool {event.data['tool_name']} completed in "
                f"{event.data['duration_ms']:.0f}ms"
            )
        
        elif event.type == EventType.EXECUTION_ERROR:
            logging.error(
                f"Error: {event.data['error_type']} - "
                f"{event.data['message']}"
            )

Building Event History

async def collect_events(agent: Agent, thread: Thread) -> List[ExecutionEvent]:
    """Collect all events from an execution"""
    events = []
    
    async for event in agent.go(thread, stream=True):
        events.append(event)
        
        # Process specific events
        if event.type == EventType.MESSAGE_CREATED:
            msg = event.data["message"]
            print(f"New {msg.role} message added")
    
    return events

Real-time UI Updates

class AgentUI:
    async def process_with_ui_updates(self, agent: Agent, thread: Thread):
        async for event in agent.go(thread, stream=True):
            await self.handle_event(event)
    
    async def handle_event(self, event: ExecutionEvent):
        if event.type == EventType.LLM_STREAM_CHUNK:
            await self.append_to_output(event.data["content_chunk"])
        
        elif event.type == EventType.TOOL_SELECTED:
            await self.show_tool_indicator(
                event.data["tool_name"],
                event.data["arguments"]
            )
        
        elif event.type == EventType.TOOL_RESULT:
            await self.hide_tool_indicator()
            await self.show_tool_result(event.data["result"])
        
        elif event.type == EventType.EXECUTION_ERROR:
            await self.show_error(event.data["message"])

Event Filtering

from typing import AsyncGenerator

async def filter_events(
    agent: Agent, 
    thread: Thread,
    event_types: List[EventType]
) -> AsyncGenerator[ExecutionEvent, None]:
    """Filter events by type"""
    async for event in agent.go(thread, stream=True):
        if event.type in event_types:
            yield event

# Only get tool and error events
async for event in filter_events(
    agent, thread, 
    [EventType.TOOL_SELECTED, EventType.TOOL_ERROR]
):
    print(f"{event.type}: {event.data}")

Timing Analysis

def analyze_timing(events: List[ExecutionEvent]) -> Dict[str, float]:
    """Analyze timing from events"""
    timings = {}
    
    # Find start and end
    start = events[0].timestamp
    end = events[-1].timestamp
    timings["total_ms"] = (end - start).total_seconds() * 1000
    
    # LLM timing
    llm_starts = {}
    for event in events:
        if event.type == EventType.LLM_REQUEST:
            llm_starts[event.timestamp] = event
        elif event.type == EventType.LLM_RESPONSE:
            timings["llm_latency_ms"] = event.data["latency_ms"]
    
    # Tool timing
    tool_times = []
    for event in events:
        if event.type == EventType.TOOL_RESULT:
            tool_times.append(event.data["duration_ms"])
    
    if tool_times:
        timings["tool_total_ms"] = sum(tool_times)
        timings["tool_average_ms"] = sum(tool_times) / len(tool_times)
    
    return timings

See Also

  • EventType - All available event types
  • Thread - Thread methods for accessing aggregated information
  • Agent - The main agent class