Overview
The `ThreadStore` class provides a unified interface for persisting conversation threads, with support for multiple storage backends: in-memory (for development), SQLite (for local storage), and PostgreSQL (for production).
Creating a ThreadStore
Recommended: Factory Method
```python
from narrator import ThreadStore

# In-memory storage (default)
store = await ThreadStore.create()

# SQLite storage
store = await ThreadStore.create("sqlite+aiosqlite:///threads.db")

# PostgreSQL storage
store = await ThreadStore.create("postgresql+asyncpg://user:pass@localhost/dbname")
```
The factory method validates the connection immediately, catching configuration errors early.
Direct Constructor
```python
# Creates the store but doesn't connect until the first operation
store = ThreadStore("postgresql+asyncpg://localhost/db")

# Connection happens on first use
thread = await store.get("thread-123")  # Connects here
```
Storage Backends
In-Memory (Default)
```python
# No configuration needed
store = await ThreadStore.create()
```
Perfect for:
- Development and testing
- Temporary conversation storage
- Single-instance applications
SQLite
```python
# Local file storage
store = await ThreadStore.create("sqlite+aiosqlite:///path/to/threads.db")

# In-memory SQLite (for tests)
store = await ThreadStore.create("sqlite+aiosqlite:///:memory:")
```
Perfect for:
- Desktop applications
- Single-user apps
- Local development with persistence
PostgreSQL
```python
# Production database
store = await ThreadStore.create(
    "postgresql+asyncpg://user:password@host:5432/database"
)

# Connection pooling is configured via environment variables, e.g.:
# NARRATOR_DB_POOL_SIZE=10
# NARRATOR_DB_MAX_OVERFLOW=20
# NARRATOR_DB_POOL_TIMEOUT=30
# NARRATOR_DB_POOL_RECYCLE=300
```
Perfect for:
- Multi-user applications
- Production deployments
- Horizontal scaling
Quick Docker Setup
For local development with PostgreSQL:
```shell
# One-command setup (starts PostgreSQL and initializes tables)
uv run narrator docker-setup
```
Then use it in your code:
```python
store = await ThreadStore.create(
    "postgresql+asyncpg://narrator:narrator_dev@localhost:5432/narrator"
)
```
To manage the database:
```shell
# Stop the container (preserves data)
uv run narrator docker-stop

# Stop and remove all data
uv run narrator docker-stop --remove-volumes
```
Connection Pool Configuration
Configure connection pooling via environment variables:

| Variable | Description |
| --- | --- |
| `NARRATOR_DB_POOL_SIZE` | Maximum number of connections to maintain in the pool |
| `NARRATOR_DB_MAX_OVERFLOW` | Maximum overflow connections above the pool size |
| `NARRATOR_DB_POOL_TIMEOUT` | Seconds to wait for a connection from the pool |
| `NARRATOR_DB_POOL_RECYCLE` | Seconds after which to recycle connections |
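For example, these can be exported in the shell that launches the application (the values below are illustrative, not recommended production settings):

```shell
export NARRATOR_DB_POOL_SIZE=10
export NARRATOR_DB_MAX_OVERFLOW=20
export NARRATOR_DB_POOL_TIMEOUT=30
export NARRATOR_DB_POOL_RECYCLE=300
```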
Basic Operations
Saving Threads
```python
from narrator import Thread, Message

# Create a thread
thread = Thread(title="Customer Support")
thread.add_message(Message(role="user", content="I need help"))
thread.add_message(Message(role="assistant", content="I'm here to help!"))

# Save to storage
saved_thread = await store.save(thread)

# Note: system messages are NOT persisted by design.
# They remain in memory but aren't stored in the database.
```
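Since system messages aren't stored, re-attach the system prompt each time a thread is loaded. A minimal sketch of the pattern, using illustrative stand-in classes rather than narrator's real `Thread`/`Message` (real code would import those instead, and this assumes the thread exposes its messages as an ordered list):

```python
from dataclasses import dataclass, field

# Illustrative stand-ins for narrator's Thread and Message classes
@dataclass
class Message:
    role: str
    content: str

@dataclass
class Thread:
    title: str
    messages: list = field(default_factory=list)

def with_system_prompt(thread, prompt):
    """Prepend a system message unless one is already present."""
    if not thread.messages or thread.messages[0].role != "system":
        thread.messages.insert(0, Message(role="system", content=prompt))
    return thread

# After loading a thread from the store, restore the system prompt:
thread = with_system_prompt(Thread(title="Support"), "You are a helpful support agent.")
```

The guard against an existing system message makes the helper safe to call on every load.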
Retrieving Threads
```python
# Get by ID
thread = await store.get("thread-123")
if thread:
    print(f"Found thread: {thread.title}")
else:
    print("Thread not found")

# Get or create: returns the existing thread or creates a new one with the given ID
thread = await store.get_or_create("thread-123")
```
Listing Threads
```python
# Get recent threads
threads = await store.list(limit=10)
for thread in threads:
    print(f"{thread.title} - {thread.updated_at}")

# Pagination
page1 = await store.list(limit=20, offset=0)
page2 = await store.list(limit=20, offset=20)
```
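The offset arithmetic above extends naturally to a loop that pages through every thread. A sketch of the pattern against a stub store (`StubStore` here is illustrative, standing in for `ThreadStore.list()`'s limit/offset semantics, not part of narrator):

```python
import asyncio

class StubStore:
    """Illustrative stand-in mimicking ThreadStore.list(limit, offset)."""
    def __init__(self, items):
        self._items = items

    async def list(self, limit=10, offset=0):
        return self._items[offset:offset + limit]

async def iter_all_threads(store, page_size=20):
    """Collect every thread by paging until a short page signals the end."""
    offset, results = 0, []
    while True:
        page = await store.list(limit=page_size, offset=offset)
        results.extend(page)
        if len(page) < page_size:
            return results
        offset += page_size

threads = asyncio.run(iter_all_threads(StubStore(list(range(45))), page_size=20))
```

Stopping on a short page avoids one extra round trip compared with looping until an empty page.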
Searching Threads
```python
# Search in thread titles
results = await store.search_by_title("customer")
for thread in results:
    print(f"Found: {thread.title}")

# Search in message content
results = await store.search_by_content("refund", limit=5)
for thread in results:
    print(f"Thread {thread.id} contains 'refund'")

# Search by platform reference
slack_threads = await store.search_by_platform("slack", "C123456")
for thread in slack_threads:
    print(f"Slack thread: {thread.title}")
```
Deleting Threads
```python
# Delete a single thread
success = await store.delete("thread-123")
print(f"Deleted: {success}")

# Delete multiple threads
await store.delete_many(["thread-1", "thread-2", "thread-3"])
```
Thread Aliases
ThreadStore supports aliases: alternative IDs that resolve to the same thread.
```python
# Create a thread and register its Slack channel as an alias
thread = Thread(title="Team Discussion")
await store.save(thread)
await store.save_thread_alias(thread.id, "slack-C123456")

# Retrieve using the alias
thread = await store.get_by_alias("slack-C123456")

# Manage aliases
await store.delete_thread_alias("slack-C123456")
aliases = await store.get_thread_aliases(thread.id)
```
Usage Statistics
```python
# Get storage statistics
stats = await store.get_usage_stats()
print(f"Total threads: {stats['total_threads']}")
print(f"Total messages: {stats['total_messages']}")
print(f"Average messages per thread: {stats['avg_messages_per_thread']}")

# Token usage across all threads
print(f"Total tokens used: {stats['total_tokens']['total']}")
print(f"By model: {stats['total_tokens']['by_model']}")

# Recent activity
print(f"Threads updated in last hour: {stats['threads_last_hour']}")
print(f"Threads updated in last 24h: {stats['threads_last_24h']}")
print(f"Threads updated in last 7d: {stats['threads_last_7d']}")
```
Example: Multi-User Chat Application
```python
import os

from narrator import ThreadStore, Thread, Message

# Initialize the store (PostgreSQL in production, SQLite as a local fallback)
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///chat.db")
store = await ThreadStore.create(DATABASE_URL)

# Create a support thread
thread = Thread(
    title="Order #12345 Support",
    attributes={
        "customer_id": "cust_789",
        "order_id": "12345",
        "priority": "high"
    },
    platforms={
        "slack": {
            "channel": "C123456",
            "thread_ts": "1234567890.123"
        }
    }
)

# Add the conversation
thread.add_message(Message(
    role="user",
    content="My order hasn't arrived yet",
    source={"type": "user", "id": "cust_789", "name": "John Doe"}
))
thread.add_message(Message(
    role="assistant",
    content="I'll check on that order for you right away.",
    source={"type": "agent", "id": "support_bot", "name": "Support Bot"}
))

# Save the thread
await store.save(thread)

# Create an alias for easy lookup
await store.save_thread_alias(thread.id, f"order-{thread.attributes['order_id']}")

# Later: retrieve by order
order_thread = await store.get_by_alias("order-12345")

# Find threads whose messages mention orders
# (search_by_content matches message text, not thread attributes)
order_threads = await store.search_by_content("order", limit=10)

# Get usage stats
stats = await store.get_usage_stats()
print(f"Active support threads: {stats['total_threads']}")
```
Error Handling
```python
try:
    store = await ThreadStore.create("postgresql+asyncpg://bad-host/db")
except RuntimeError as e:
    print(f"Failed to connect: {e}")
    # Fall back to in-memory storage
    store = await ThreadStore.create()

# Handle individual operation failures
try:
    thread = await store.get("thread-123")
except Exception as e:
    print(f"Failed to retrieve thread: {e}")
```
Best Practices
- Use the factory method (`ThreadStore.create()`) for immediate connection validation
- Configure connection pools for production PostgreSQL deployments
- Use aliases for integrating with external systems (Slack, Discord, etc.)
- Don’t store sensitive data in thread attributes; they aren’t encrypted
- System messages aren’t persisted; add them dynamically when needed
- Use search sparingly; it can be expensive on large datasets
Migration Between Backends
```python
# Export from SQLite (note: list() returns at most `limit` threads;
# page with offset for larger datasets)
sqlite_store = await ThreadStore.create("sqlite+aiosqlite:///old.db")
threads = await sqlite_store.list(limit=1000)

# Import into PostgreSQL
pg_store = await ThreadStore.create("postgresql+asyncpg://localhost/new_db")
for thread in threads:
    await pg_store.save(thread)

print(f"Migrated {len(threads)} threads")
```
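A single `list()` call only exports up to its `limit`; for larger datasets, page through the source store in batches. A runnable sketch of that batched migration, using an illustrative `StubStore` in place of the real backends (not part of narrator):

```python
import asyncio

class StubStore:
    """Illustrative in-memory stand-in for a ThreadStore backend."""
    def __init__(self, threads=()):
        self.threads = list(threads)

    async def list(self, limit=10, offset=0):
        return self.threads[offset:offset + limit]

    async def save(self, thread):
        self.threads.append(thread)
        return thread

async def migrate(src, dst, batch_size=500):
    """Copy every thread from src to dst, one batch at a time."""
    moved, offset = 0, 0
    while True:
        batch = await src.list(limit=batch_size, offset=offset)
        if not batch:
            return moved
        for thread in batch:
            await dst.save(thread)
        moved += len(batch)
        offset += batch_size

src = StubStore(threads=[f"thread-{i}" for i in range(1200)])
dst = StubStore()
moved = asyncio.run(migrate(src, dst))
```

Batching keeps memory bounded regardless of how many threads the source store holds.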