Overview

The ThreadStore class provides a unified interface for persisting conversation threads with support for multiple storage backends: in-memory (for development), SQLite (for local storage), and PostgreSQL (for production).

Creating a ThreadStore

Recommended: Factory Method

from narrator import ThreadStore

# In-memory storage (default)
store = await ThreadStore.create()

# SQLite storage
store = await ThreadStore.create("sqlite+aiosqlite:///threads.db")

# PostgreSQL storage
store = await ThreadStore.create("postgresql+asyncpg://user:pass@localhost/dbname")
The factory method validates the connection immediately, catching configuration errors early.

Direct Constructor

# Creates store but doesn't connect until first operation
store = ThreadStore("postgresql+asyncpg://localhost/db")

# Connection happens on first use
thread = await store.get("thread-123")  # Connects here

Storage Backends

In-Memory (Default)

# No configuration needed
store = await ThreadStore.create()

# Perfect for:
# - Development and testing
# - Temporary conversation storage
# - Single-instance applications

SQLite

# Local file storage
store = await ThreadStore.create("sqlite+aiosqlite:///path/to/threads.db")

# In-memory SQLite (for tests)
store = await ThreadStore.create("sqlite+aiosqlite:///:memory:")

# Perfect for:
# - Desktop applications
# - Single-user apps
# - Local development with persistence

PostgreSQL

# Production database
store = await ThreadStore.create(
    "postgresql+asyncpg://user:password@host:5432/database"
)

# With connection pooling (configured via environment)
# NARRATOR_DB_POOL_SIZE=10
# NARRATOR_DB_MAX_OVERFLOW=20
# NARRATOR_DB_POOL_TIMEOUT=30
# NARRATOR_DB_POOL_RECYCLE=300

# Perfect for:
# - Multi-user applications
# - Production deployments
# - Horizontal scaling

Quick Docker Setup

For local development with PostgreSQL:
# One-command setup (starts PostgreSQL and initializes tables)
uv run narrator docker-setup

# Then use in your code:
store = await ThreadStore.create(
    "postgresql+asyncpg://narrator:narrator_dev@localhost:5432/narrator"
)
To manage the database:
# Stop container (preserves data)
uv run narrator docker-stop

# Stop and remove all data
uv run narrator docker-stop --remove-volumes

Connection Pool Configuration

Configure connection pooling via environment variables:
- NARRATOR_DB_POOL_SIZE (int, default: 5): Maximum number of connections to maintain in the pool
- NARRATOR_DB_MAX_OVERFLOW (int, default: 10): Maximum overflow connections above pool_size
- NARRATOR_DB_POOL_TIMEOUT (int, default: 30): Seconds to wait for a connection from the pool
- NARRATOR_DB_POOL_RECYCLE (int, default: 300): Seconds after which to recycle connections

Basic operations

Saving Threads

from narrator import Thread, Message

# Create a thread
thread = Thread(title="Customer Support")
thread.add_message(Message(role="user", content="I need help"))
thread.add_message(Message(role="assistant", content="I'm here to help!"))

# Save to storage
saved_thread = await store.save(thread)

# Note: System messages are NOT persisted by design
# They remain in memory but aren't stored in the database

Retrieving Threads

# Get by ID
thread = await store.get("thread-123")
if thread:
    print(f"Found thread: {thread.title}")
else:
    print("Thread not found")

# Get or create
thread = await store.get_or_create("thread-123")
# Returns existing thread or creates new one with given ID

Listing Threads

# Get recent threads
threads = await store.list(limit=10)
for thread in threads:
    print(f"{thread.title} - {thread.updated_at}")

# Pagination
page1 = await store.list(limit=20, offset=0)
page2 = await store.list(limit=20, offset=20)

Searching Threads

# Search in thread titles
results = await store.search_by_title("customer")
for thread in results:
    print(f"Found: {thread.title}")

# Search in message content
results = await store.search_by_content("refund", limit=5)
for thread in results:
    print(f"Thread {thread.id} contains 'refund'")

# Search by platform reference
slack_threads = await store.search_by_platform("slack", "C123456")
for thread in slack_threads:
    print(f"Slack thread: {thread.title}")

Deleting Threads

# Delete a single thread
success = await store.delete("thread-123")
print(f"Deleted: {success}")

# Delete multiple threads
await store.delete_many(["thread-1", "thread-2", "thread-3"])

Thread Aliases

ThreadStore supports aliases - alternative IDs for threads:
# Create thread with Slack channel as alias
thread = Thread(title="Team Discussion")
await store.save(thread)
await store.save_thread_alias(thread.id, "slack-C123456")

# Retrieve using alias
thread = await store.get_by_alias("slack-C123456")

# Manage aliases
await store.delete_thread_alias("slack-C123456")
aliases = await store.get_thread_aliases(thread.id)

Usage Statistics

# Get storage statistics
stats = await store.get_usage_stats()
print(f"Total threads: {stats['total_threads']}")
print(f"Total messages: {stats['total_messages']}")
print(f"Average messages per thread: {stats['avg_messages_per_thread']}")

# Token usage across all threads
print(f"Total tokens used: {stats['total_tokens']['total']}")
print(f"By model: {stats['total_tokens']['by_model']}")

# Recent activity
print(f"Threads updated in last hour: {stats['threads_last_hour']}")
print(f"Threads updated in last 24h: {stats['threads_last_24h']}")
print(f"Threads updated in last 7d: {stats['threads_last_7d']}")

Example: Multi-User Chat Application

from narrator import ThreadStore, Thread, Message
import os

# Initialize store from DATABASE_URL (e.g. PostgreSQL in production); falls back to a local SQLite file if unset
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///chat.db")
store = await ThreadStore.create(DATABASE_URL)

# Create a support thread
thread = Thread(
    title="Order #12345 Support",
    attributes={
        "customer_id": "cust_789",
        "order_id": "12345",
        "priority": "high"
    },
    platforms={
        "slack": {
            "channel": "C123456",
            "thread_ts": "1234567890.123"
        }
    }
)

# Add conversation
thread.add_message(Message(
    role="user",
    content="My order hasn't arrived yet",
    source={"type": "user", "id": "cust_789", "name": "John Doe"}
))

thread.add_message(Message(
    role="assistant",
    content="I'll check on that order for you right away.",
    source={"type": "agent", "id": "support_bot", "name": "Support Bot"}
))

# Save thread
await store.save(thread)

# Create alias for easy lookup
await store.save_thread_alias(thread.id, f"order-{thread.attributes['order_id']}")

# Later: retrieve by order
order_thread = await store.get_by_alias("order-12345")

# Search message content for mentions of high priority (search_by_content searches message text; see "Searching Threads")
high_priority = await store.search_by_content("priority.*high", limit=10)

# Get usage stats
stats = await store.get_usage_stats()
print(f"Active support threads: {stats['total_threads']}")

Error handling

try:
    store = await ThreadStore.create("postgresql+asyncpg://bad-host/db")
except RuntimeError as e:
    print(f"Failed to connect: {e}")
    # Fall back to in-memory
    store = await ThreadStore.create()

# Handle individual operation failures
try:
    thread = await store.get("thread-123")
except Exception as e:
    print(f"Failed to retrieve thread: {e}")

Best practices

  1. Use the factory method (ThreadStore.create()) for immediate validation
  2. Configure connection pools for production PostgreSQL deployments
  3. Use aliases for integrating with external systems (Slack, Discord, etc.)
  4. Don’t store sensitive data in thread attributes - they’re not encrypted
  5. System messages aren’t persisted - add them dynamically when needed
  6. Use search sparingly - it can be expensive on large datasets

Migration Between Backends

# Export from SQLite (this fetches at most 1000 threads; page through with offset for larger stores)
sqlite_store = await ThreadStore.create("sqlite+aiosqlite:///old.db")
threads = await sqlite_store.list(limit=1000)

# Import to PostgreSQL
pg_store = await ThreadStore.create("postgresql+asyncpg://localhost/new_db")
for thread in threads:
    await pg_store.save(thread)
    
print(f"Migrated {len(threads)} threads")