Overview
The `AgentResult` class encapsulates the complete result of an agent’s execution in non-streaming mode. It provides access to the updated thread, the new messages, and the final output.
Class Definition
@dataclass
class AgentResult:
    """Complete result of an agent's execution in non-streaming mode.

    Attributes:
        thread: Updated thread containing all messages, including those
            added during this execution.
        new_messages: Messages added during this execution (pre-existing
            messages are excluded).
        content: Final assistant response content; ``None`` if no
            assistant message was generated.
    """

    thread: Thread
    new_messages: List[Message]
    content: Optional[str]
Properties
- `thread` (`Thread`): The updated thread containing all messages, including those added during this execution.
- `new_messages` (`List[Message]`): List of new messages added during this execution (excludes pre-existing messages).
- `content` (`Optional[str]`): The final assistant response content. `None` if no assistant message was generated.
Usage Examples
Basic Usage
import asyncio

from tyler import Agent, Thread, Message


async def main():
    """Run one non-streaming agent turn and print the assistant's reply."""
    agent = Agent(name="MyAgent", purpose="To help users")
    thread = Thread()
    thread.add_message(Message(role="user", content="Hello!"))

    # Execute and get result
    result = await agent.go(thread)

    # Access the response
    print(f"Response: {result.content}")


# `await` is only valid inside a coroutine, so the example is driven
# through asyncio.run() to be runnable as a plain script.
asyncio.run(main())
Accessing Metrics
# Get token usage from thread
token_stats = result.thread.get_total_tokens()
print(f"Tokens used: {token_stats['overall']['total_tokens']}")

# Calculate duration from message timestamps.
# This is the span between the earliest and the latest new message,
# so it is 0 when only a single message was added.
if result.new_messages:
    start_time = min(msg.timestamp for msg in result.new_messages)
    end_time = max(msg.timestamp for msg in result.new_messages)
    duration_ms = (end_time - start_time).total_seconds() * 1000
    print(f"Duration: {duration_ms:.0f}ms")

# Check tool usage
tool_usage = result.thread.get_tool_usage()
if tool_usage['total_calls'] > 0:
    # Plain string: there is nothing to interpolate here
    print("\nTools used:")
    for tool_name, count in tool_usage['tools'].items():
        print(f" {tool_name}: {count} calls")
Working with Messages
# Walk through every message produced by this execution
for msg in result.new_messages:
    print(f"{msg.role}: {msg.content}")

    # Report the number of tool calls carried by the message, if any
    tool_calls = msg.tool_calls
    if tool_calls:
        print(f" Tool calls: {len(tool_calls)}")

    # Report token usage when metrics are attached to the message
    metrics = msg.metrics
    if metrics:
        usage = metrics.get("usage", {})
        total_tokens = usage.get('total_tokens', 0)
        print(f" Tokens: {total_tokens}")
Error Handling
try:
result = await agent.go(thread)
print(f"Response: {result.content}")
except Exception as e:
print(f"Error: {e}")
# The thread may still have partial messages
if thread.messages:
last_message = thread.messages[-1]
print(f"Last message: {last_message.content}")
Thread Management
# The thread is updated in-place
original_message_count = len(thread.messages)
result = await agent.go(thread)
new_message_count = len(result.thread.messages)
print(f"Added {new_message_count - original_message_count} messages")
# You can also access the thread directly
assert result.thread is thread # Same object, modified in-place
Common Patterns
Conversation Loop
async def chat_loop(agent: Agent, thread: Thread):
    """Run an interactive chat on ``thread`` until the user quits.

    Each line read from stdin is added to the thread as a user message,
    the agent is executed, and the assistant's reply is printed together
    with the time span covered by the newly added messages.

    Args:
        agent: Agent used to process each user message.
        thread: Conversation thread; it receives the user messages and is
            passed to ``agent.go()`` on every turn.
    """
    while True:
        try:
            user_input = input("You: ")
        except EOFError:
            # stdin was closed (Ctrl-D or end of piped input): leave the
            # loop cleanly instead of crashing with a traceback.
            break

        # Tolerate surrounding whitespace so e.g. "quit " also exits
        if user_input.strip().lower() == 'quit':
            break

        thread.add_message(Message(role="user", content=user_input))
        result = await agent.go(thread)
        print(f"Assistant: {result.content}")

        # Calculate duration (span between first and last new message)
        if result.new_messages:
            start = min(msg.timestamp for msg in result.new_messages)
            end = max(msg.timestamp for msg in result.new_messages)
            duration_ms = (end - start).total_seconds() * 1000
            print(f"(Took {duration_ms:.0f}ms)")
Result Analysis
def analyze_result(result: AgentResult) -> dict:
    """Analyze agent execution results.

    Args:
        result: The ``AgentResult`` of a non-streaming agent execution.

    Returns:
        A dict with the keys ``duration_ms`` (float; span between the
        earliest and latest new-message timestamps, ``0.0`` when no
        messages were added), ``tokens``, ``tool_calls``,
        ``messages_added``, ``has_content`` and ``used_tools``.
    """
    # Get token usage
    token_stats = result.thread.get_total_tokens()

    # Calculate duration; start from a float so the value has the same
    # type whether or not any messages were added.
    duration_ms = 0.0
    if result.new_messages:
        start = min(msg.timestamp for msg in result.new_messages)
        end = max(msg.timestamp for msg in result.new_messages)
        duration_ms = (end - start).total_seconds() * 1000

    # Get tool usage
    tool_usage = result.thread.get_tool_usage()

    stats = {
        "duration_ms": duration_ms,
        "tokens": token_stats['overall']['total_tokens'],
        "tool_calls": tool_usage['total_calls'],
        "messages_added": len(result.new_messages),
        "has_content": result.content is not None,
        "used_tools": tool_usage['total_calls'] > 0,
    }
    return stats
Persisting Results
from narrator import ThreadStore
store = ThreadStore()
# Execute agent
result = await agent.go(thread)
# Save the updated thread
await store.save(result.thread)
# Store execution metadata
token_stats = result.thread.get_total_tokens()
duration_ms = 0
if result.new_messages:
start = min(msg.timestamp for msg in result.new_messages)
end = max(msg.timestamp for msg in result.new_messages)
duration_ms = (end - start).total_seconds() * 1000
await store.update_metadata(
thread_id=result.thread.id,
metadata={
"last_execution_ms": duration_ms,
"last_tokens_used": token_stats['overall']['total_tokens'],
"last_response": result.content
}
)
See Also
- Thread - Conversation management
- Agent - The main agent class
- ExecutionEvent - Individual execution events (for streaming mode)