Conversation persistence is crucial for building agents that maintain context across sessions. With Slide’s Narrator integration, your agents can store and retrieve past interactions, maintain conversation history, and provide contextual experiences.

Why Conversation Persistence Matters

Without persistence, every interaction starts from scratch:
  • No context from previous messages
  • Can’t remember user preferences
  • Can’t track ongoing tasks
  • Poor user experience
With persistence, your agent becomes truly useful:
  • Stores conversation history
  • Maintains context across sessions
  • Can resume interrupted tasks
  • Provides contextual responses

Quick Start with Persistence

import asyncio
from tyler import Agent, Thread, Message, ThreadStore, FileStore

async def create_agent_with_persistence():
    # Set up persistent storage
    # Option 1: SQLite (simple, local)
    thread_store = await ThreadStore.create("sqlite+aiosqlite:///conversations.db")
    
    # Option 2: PostgreSQL with Docker (production-ready)
    # Run: uv run narrator docker-setup
    # thread_store = await ThreadStore.create(
    #     "postgresql+asyncpg://narrator:narrator_dev@localhost:5432/narrator"
    # )
    
    file_store = await FileStore.create(base_path="./conversation_files")
    
    # Create agent with persistence
    agent = Agent(
        name="assistant",
        model_name="gpt-4",
        purpose="To be a helpful assistant that maintains conversation history",
        thread_store=thread_store,
        file_store=file_store
    )
    
    return agent, thread_store, file_store

# Use the agent
async def main():
    agent, thread_store, _ = await create_agent_with_persistence()
    
    # Create or resume a thread
    thread_id = "user-123-main"
    try:
        thread = await thread_store.get_thread(thread_id)
        print("Welcome back!")
    except:
        thread = Thread(id=thread_id)
        print("Nice to meet you!")
    
    # Continue the conversation...

Storage Backends

Narrator supports multiple storage backends:

SQLite (Development)

Perfect for local development and single-user applications:
thread_store = await ThreadStore.create("sqlite+aiosqlite:///app.db")

PostgreSQL (Production)

Scalable for multi-user applications:
thread_store = await ThreadStore.create(
    "postgresql+asyncpg://user:pass@localhost/dbname"
)

Quick PostgreSQL Setup with Docker

Narrator includes built-in Docker commands for easy PostgreSQL setup:
# One-command setup that starts PostgreSQL and initializes tables
uv run narrator docker-setup

# This creates a PostgreSQL database at:
# postgresql+asyncpg://narrator:narrator_dev@localhost:5432/narrator
To manage the database:
# Stop container (keeps data)
uv run narrator docker-stop

# Stop and remove all data
uv run narrator docker-stop --remove-volumes

# Start again
uv run narrator docker-start

In-Memory (Testing)

For unit tests and temporary storage:
thread_store = await ThreadStore.create()  # No URL = in-memory

Thread Management

Threads are containers for conversations. Each thread has a unique ID and contains messages.

Creating Threads

# Auto-generated ID
thread = Thread()

# Custom ID (useful for user-specific threads)
thread = Thread(id="user-123-support")

# With metadata
thread = Thread(
    id="project-research",
    metadata={"project": "quantum-computing", "created_by": "alice"}
)

Saving and Loading Threads

# Save a thread
await thread_store.save_thread(thread)

# Load a thread
thread = await thread_store.get_thread("thread-id")

# List all threads
threads = await thread_store.list_threads()

# Delete a thread
await thread_store.delete_thread("thread-id")

Message History

Messages in threads maintain full conversation context:
# Add messages to thread
thread.add_message(Message(role="user", content="What's the capital of France?"))
thread.add_message(Message(role="assistant", content="The capital of France is Paris."))
thread.add_message(Message(role="user", content="What about Germany?"))

# Access message history
for msg in thread.messages:
    print(f"{msg.role}: {msg.content}")

# The agent sees all previous messages when processing
result = await agent.go(thread)

Conversation Patterns

Pattern 1: User-Specific Threads

async def get_user_thread(user_id: str, thread_store: ThreadStore):
    thread_id = f"user-{user_id}-main"
    try:
        return await thread_store.get_thread(thread_id)
    except:
        return Thread(id=thread_id)

# Usage
thread = await get_user_thread("alice@example.com", thread_store)

Pattern 2: Topic-Based Threads

async def create_research_thread(topic: str, thread_store: ThreadStore):
    thread = Thread(
        id=f"research-{topic.lower().replace(' ', '-')}",
        metadata={"type": "research", "topic": topic}
    )
    
    # Add initial context
    thread.add_message(Message(
        role="system",
        content=f"This thread is for researching: {topic}"
    ))
    
    await thread_store.save_thread(thread)
    return thread

Pattern 3: Session Management

class ConversationSession:
    def __init__(self, agent, thread_store):
        self.agent = agent
        self.thread_store = thread_store
        self.thread = None
    
    async def start_or_resume(self, session_id: str):
        try:
            self.thread = await self.thread_store.get_thread(session_id)
            return "resumed"
        except:
            self.thread = Thread(id=session_id)
            return "new"
    
    async def send_message(self, content: str):
        message = Message(role="user", content=content)
        self.thread.add_message(message)
        
        result = await self.agent.go(self.thread)
        self.thread = result.thread
        await self.thread_store.save_thread(self.thread)
        
        return result.new_messages

File Attachments

FileStore handles attachments in conversations:
from tyler import Attachment

# Create agent with file storage
file_store = await FileStore.create(base_path="./uploads")
agent = Agent(
    name="file-assistant",
    model_name="gpt-4",
    purpose="To help with file processing",
    file_store=file_store
)

# Add message with attachment
message = Message(
    role="user",
    content="Please analyze this image"
)

with open("chart.png", "rb") as f:
    attachment = Attachment(
        filename="chart.png",
        content=f.read(),
        mime_type="image/png"
    )
    message.add_attachment(attachment)

thread.add_message(message)

# Files are automatically saved and managed
result = await agent.go(thread)

Advanced Persistence Patterns

Conversation Summarization

async def summarize_old_conversations(thread: Thread, max_messages: int = 50):
    if len(thread.messages) > max_messages:
        # Get messages to summarize
        old_messages = thread.messages[:-max_messages]
        
        # Create summary request
        summary_thread = Thread()
        summary_content = "\n".join([
            f"{msg.role}: {msg.content}" 
            for msg in old_messages
        ])
        
        summary_thread.add_message(Message(
            role="user",
            content=f"Summarize this conversation:\n\n{summary_content}"
        ))
        
        # Get summary
        summary_agent = Agent(
            name="summarizer",
            model_name="gpt-3.5-turbo",
            purpose="To create concise summaries"
        )
        summary_result = await summary_agent.go(summary_thread)
        
        # Replace old messages with summary
        thread.messages = [
            Message(
                role="system",
                content=f"Previous conversation summary: {summary_result.new_messages[-1].content}"
            )
        ] + thread.messages[-max_messages:]

Context Injection

async def inject_user_context(thread: Thread, user_id: str):
    # Load user preferences
    user_prefs = await load_user_preferences(user_id)
    
    # Add context at the beginning
    context_message = Message(
        role="system",
        content=f"User preferences: {json.dumps(user_prefs)}"
    )
    
    thread.messages.insert(0, context_message)
    return thread

Persistence Management Tips

1. Thread Naming Conventions

# Good thread IDs
"user-123-main"           # User-specific main thread
"support-ticket-456"      # Support conversation
"research-2024-01-15"     # Date-based research

# Avoid
"thread1"                 # Not descriptive
"my-thread"              # Not unique

2. Metadata Usage

thread = Thread(
    id="customer-support-789",
    metadata={
        "customer_id": "cust-123",
        "issue_type": "billing",
        "priority": "high",
        "created_at": datetime.now().isoformat()
    }
)

3. Cleanup Strategies

async def cleanup_old_threads(thread_store: ThreadStore, days: int = 30):
    threads = await thread_store.list_threads()
    cutoff = datetime.now() - timedelta(days=days)
    
    for thread in threads:
        if thread.metadata.get("last_updated") < cutoff.isoformat():
            await thread_store.delete_thread(thread.id)

Real-World Example: Customer Support Agent

import asyncio
from datetime import datetime
from tyler import Agent, Thread, Message, ThreadStore, FileStore
from lye import WEB_TOOLS

class SupportAgent:
    def __init__(self):
        self.agent = None
        self.thread_store = None
        self.file_store = None
    
    async def initialize(self):
        # Use Narrator's Docker PostgreSQL (after running: uv run narrator docker-setup)
        self.thread_store = await ThreadStore.create(
            "postgresql+asyncpg://narrator:narrator_dev@localhost:5432/narrator"
        )
        # Or use your own PostgreSQL instance:
        # self.thread_store = await ThreadStore.create(
        #     "postgresql+asyncpg://localhost/support"
        # )
        self.file_store = await FileStore.create("./support_files")
        
        self.agent = Agent(
            name="support-agent",
            model_name="gpt-4",
            purpose="To help customers with product issues",
            tools=[*WEB_TOOLS],
            thread_store=self.thread_store,
            file_store=self.file_store
        )
    
    async def handle_ticket(self, ticket_id: str, customer_id: str, issue: str):
        # Create thread for this ticket
        thread = Thread(
            id=f"ticket-{ticket_id}",
            metadata={
                "ticket_id": ticket_id,
                "customer_id": customer_id,
                "status": "open",
                "created_at": datetime.now().isoformat()
            }
        )
        
        # Add initial message
        thread.add_message(Message(
            role="user",
            content=f"Customer {customer_id} reports: {issue}"
        ))
        
        # Process with agent
        result = await self.agent.go(thread)
        thread = result.thread
        
        # Save thread
        await self.thread_store.save_thread(thread)
        
        # Return response
        return messages[-1].content if messages else "No response"
    
    async def get_ticket_history(self, ticket_id: str):
        thread = await self.thread_store.get_thread(f"ticket-{ticket_id}")
        return thread.messages

# Usage
    agent = SupportAgent()
    await agent.initialize()
    
    response = await agent.handle_ticket(
    ticket_id="12345",
    customer_id="cust-789",
    issue="Cannot login to my account"
)

Database Setup and Configuration

Environment Variables

When using PostgreSQL, you can configure the connection via environment variables:
# Set the database URL
export NARRATOR_DATABASE_URL="postgresql+asyncpg://narrator:narrator_dev@localhost:5432/narrator"

# Then initialize tables (one-time setup)
uv run narrator init

# Check database status
uv run narrator status

Connection Pooling

For production applications, Narrator automatically configures connection pooling:
# Optional pool configuration
export NARRATOR_DB_POOL_SIZE=5           # Max connections (default: 5)
export NARRATOR_DB_MAX_OVERFLOW=10       # Max overflow connections (default: 10)
export NARRATOR_DB_POOL_TIMEOUT=30       # Connection timeout in seconds (default: 30)
export NARRATOR_DB_POOL_RECYCLE=300      # Recycle connections after seconds (default: 300)

Performance Considerations

1. Message Limits

Keep threads manageable:
MAX_MESSAGES = 100

if len(thread.messages) > MAX_MESSAGES:
    # Archive old messages or summarize
    thread.messages = thread.messages[-MAX_MESSAGES:]

2. Batch Operations

# Save multiple threads efficiently
threads = [thread1, thread2, thread3]
await asyncio.gather(*[
    thread_store.save_thread(t) for t in threads
])

3. Caching

from functools import lru_cache

@lru_cache(maxsize=100)
async def get_cached_thread(thread_id: str):
    return await thread_store.get_thread(thread_id)

Next steps