Narrator is Slide’s conversation persistence and storage system. While it integrates seamlessly with Tyler agents, you can also use Narrator independently for any application that needs to manage conversations and files.

Why Use Narrator Standalone?

  • Add conversation history to existing AI applications
  • Manage chat threads across multiple sessions
  • Store and retrieve file attachments
  • Switch between storage backends easily
  • No dependency on Tyler or other Slide packages

Quick Start

Installation

# Using uv (recommended)
uv add slide-narrator

# Using pip (fallback)
pip install slide-narrator

Basic usage

import asyncio
from narrator import Thread, Message, ThreadStore

async def main():
    """Round-trip a two-message thread through a default thread store."""
    # No URL is passed, so the store uses its default (in-memory) backend
    thread_store = await ThreadStore.create()

    # Start the conversation under a fixed, known ID
    conversation = Thread(id="chat-001")

    # Append the opening exchange, one turn at a time
    opening_exchange = (
        ("user", "Hello!"),
        ("assistant", "Hi there! How can I help?"),
    )
    for speaker, text in opening_exchange:
        conversation.add_message(Message(role=speaker, content=text))

    # Persist the conversation
    await thread_store.save_thread(conversation)

    # Fetch it back by ID and report how many messages came back
    restored = await thread_store.get_thread("chat-001")
    message_count = len(restored.messages)
    print(f"Messages in thread: {message_count}")

asyncio.run(main())

Storage Backends

In-Memory Storage

Perfect for testing and temporary conversations:
# Default - no URL needed
store = await ThreadStore.create()

# Explicit
store = await ThreadStore.create("memory://")

SQLite Storage

Great for local applications and development:
# SQLite with async support
store = await ThreadStore.create("sqlite+aiosqlite:///conversations.db")

# Custom options
store = await ThreadStore.create(
    "sqlite+aiosqlite:///app.db",
    pool_size=5,
    max_overflow=10
)

PostgreSQL Storage

For production applications with multiple users:
# PostgreSQL with asyncpg
store = await ThreadStore.create(
    "postgresql+asyncpg://user:pass@localhost/dbname"
)

# With connection pool settings
store = await ThreadStore.create(
    "postgresql+asyncpg://localhost/chat_app",
    pool_size=20,
    max_overflow=30,
    pool_timeout=30
)

Thread Management

Creating Threads

from datetime import datetime  # needed for the "created_at" metadata below
import uuid

from narrator import Thread, Message

# Auto-generated ID
thread = Thread()
print(f"Thread ID: {thread.id}")

# Custom ID
thread = Thread(id="user-123-support-chat")

# With metadata
thread = Thread(
    id=f"session-{uuid.uuid4()}",
    metadata={
        "user_id": "user-123",
        "channel": "web",
        "tags": ["support", "billing"],
        "created_at": datetime.now().isoformat()
    }
)

Managing Messages

# Add messages
thread.add_message(Message(
    role="user",
    content="What's the weather like?",
    metadata={"timestamp": datetime.now().isoformat()}
))

thread.add_message(Message(
    role="assistant",
    content="I'll help you check the weather. What's your location?"
))

# Access messages
for msg in thread.messages:
    print(f"{msg.role}: {msg.content}")

# Get last message
last_message = thread.messages[-1] if thread.messages else None

# Filter messages
user_messages = [m for m in thread.messages if m.role == "user"]

Thread Operations

# Save thread
await store.save_thread(thread)

# Load thread
thread = await store.get_thread("thread-id")

# List all threads
threads = await store.list_threads()
for t in threads:
    print(f"Thread {t.id}: {len(t.messages)} messages")

# Delete thread
await store.delete_thread("thread-id")

# Search threads by metadata
user_threads = await store.list_threads(
    filters={"metadata.user_id": "user-123"}
)

File Storage

Setting Up File Storage

from narrator import FileStore

# Local file storage
file_store = await FileStore.create(base_path="./uploads")

# S3-compatible storage (requires boto3)
file_store = await FileStore.create(
    backend="s3",
    bucket="my-app-files",
    region="us-east-1"
)

Working with Attachments

from narrator import Attachment

# Create attachment from file
with open("document.pdf", "rb") as f:
    attachment = Attachment(
        filename="document.pdf",
        content=f.read(),
        mime_type="application/pdf"
    )

# Add to message
message = Message(
    role="user",
    content="Please review this document"
)
message.add_attachment(attachment)

# Save with file store
await file_store.save_attachment(thread.id, attachment)

# Retrieve attachments
attachments = await file_store.get_attachments(thread.id)
for att in attachments:
    print(f"File: {att.filename} ({att.mime_type})")

Integration Examples

With OpenAI

import openai
from narrator import Thread, Message, ThreadStore

async def chat_with_persistence():
    """Run a single chat turn whose history is persisted in SQLite.

    Loads the thread ``user-session-123`` (starting a fresh one when it
    cannot be loaded), appends one line of user input, sends the full
    history to OpenAI, records the reply, saves the thread and prints the
    assistant's answer.
    """
    store = await ThreadStore.create("sqlite+aiosqlite:///chat.db")
    thread_id = "user-session-123"

    # Load or create thread. ``except Exception`` (not a bare ``except:``)
    # lets task cancellation and Ctrl-C propagate instead of being swallowed.
    try:
        thread = await store.get_thread(thread_id)
    except Exception:
        thread = None
    if thread is None:
        # Also covers a store that signals "not found" by returning None
        thread = Thread(id=thread_id)

    # Get user input
    user_input = input("You: ")
    thread.add_message(Message(role="user", content=user_input))

    # Prepare messages for OpenAI
    messages = [
        {"role": msg.role, "content": msg.content}
        for msg in thread.messages
    ]

    # Call OpenAI
    # NOTE(review): ChatCompletion.acreate looks like the pre-1.0 openai SDK
    # interface — confirm against the installed SDK version.
    response = await openai.ChatCompletion.acreate(
        model="gpt-4",
        messages=messages
    )

    # Add response to thread
    assistant_message = Message(
        role="assistant",
        content=response.choices[0].message.content
    )
    thread.add_message(assistant_message)

    # Save thread
    await store.save_thread(thread)

    print(f"Assistant: {assistant_message.content}")

With LangChain

from langchain.memory import ConversationSummaryBufferMemory
from langchain.schema import BaseMessage, HumanMessage, AIMessage
from narrator import ThreadStore, Thread, Message

class NarratorMemory(ConversationSummaryBufferMemory):
    """LangChain memory that mirrors its chat history into a Narrator thread."""

    def __init__(self, thread_store: ThreadStore, thread_id: str):
        """Remember which store and thread ID back this memory."""
        # NOTE(review): the LangChain base class may require extra arguments
        # (e.g. an ``llm``) — confirm against the installed LangChain version.
        super().__init__()
        self.thread_store = thread_store
        self.thread_id = thread_id

    async def load_memory(self):
        """Replay stored user/assistant messages into ``chat_memory``.

        Best-effort: if the thread cannot be loaded or replayed, the memory
        is left as it is and the failure is only logged at debug level.
        """
        import logging

        try:
            thread = await self.thread_store.get_thread(self.thread_id)
            for msg in thread.messages:
                if msg.role == "user":
                    self.chat_memory.add_user_message(msg.content)
                elif msg.role == "assistant":
                    self.chat_memory.add_ai_message(msg.content)
        except Exception:
            # Not a bare ``except:`` — cancellation and Ctrl-C must propagate.
            logging.getLogger(__name__).debug(
                "No stored history loaded for thread %s",
                self.thread_id,
                exc_info=True,
            )

    async def save_context(self, inputs: dict, outputs: dict):
        """Rebuild the thread from the current chat history and save it.

        ``inputs`` and ``outputs`` are not read here; the thread is built
        from ``chat_memory.messages`` only.
        """
        thread = Thread(id=self.thread_id)

        # Add messages from current context
        for msg in self.chat_memory.messages:
            if isinstance(msg, HumanMessage):
                thread.add_message(Message(role="user", content=msg.content))
            elif isinstance(msg, AIMessage):
                thread.add_message(Message(role="assistant", content=msg.content))

        await self.thread_store.save_thread(thread)

With Custom AI Systems

from datetime import datetime
from typing import List, Dict

from narrator import Thread, Message, ThreadStore


class ChatBot:
    """Chat front end that keeps one persisted Narrator thread per user."""

    def __init__(self, thread_store: ThreadStore):
        """Store the thread store and start with no active sessions."""
        self.thread_store = thread_store
        # Maps user_id -> the Thread currently in use for that user
        self.active_sessions = {}

    async def start_session(self, user_id: str) -> str:
        """Start or resume a chat session and return its thread ID."""
        thread_id = f"user-{user_id}-chat"

        # ``except Exception`` (not a bare ``except:``) so that task
        # cancellation and Ctrl-C are not mistaken for "thread not found".
        try:
            thread = await self.thread_store.get_thread(thread_id)
        except Exception:
            thread = None

        if thread is not None:
            print(f"Resuming session with {len(thread.messages)} messages")
        else:
            thread = Thread(
                id=thread_id,
                metadata={"user_id": user_id, "started": datetime.now().isoformat()}
            )
            print("Starting new session")

        self.active_sessions[user_id] = thread
        return thread_id

    async def process_message(self, user_id: str, content: str) -> str:
        """Process a user message, save the exchange and return the response."""
        thread = self.active_sessions.get(user_id)
        if thread is None:
            # No session yet for this user: create or resume one first
            await self.start_session(user_id)
            thread = self.active_sessions[user_id]

        # Add user message
        thread.add_message(Message(
            role="user",
            content=content,
            metadata={"timestamp": datetime.now().isoformat()}
        ))

        # Generate response (your AI logic here)
        response = await self.generate_response(thread.messages)

        # Add assistant message
        thread.add_message(Message(
            role="assistant",
            content=response,
            metadata={"timestamp": datetime.now().isoformat()}
        ))

        # Save thread
        await self.thread_store.save_thread(thread)

        return response

    async def generate_response(self, messages: List[Message]) -> str:
        """Placeholder hook: returns a fixed string; replace with real AI logic."""
        # Your AI logic here
        return "This is where your AI generates a response"

Advanced patterns

Thread Archival

async def archive_old_threads(store: ThreadStore, days: int = 30):
    """Archive threads whose last message is older than ``days`` days.

    A thread is archived by setting ``metadata["archived"]`` and
    ``metadata["archived_at"]`` and saving it. Threads that have no
    messages, no ``timestamp`` on their last message, or are already
    archived are left untouched. Prints the number of threads archived.
    """
    # Local import keeps this snippet runnable when pasted on its own
    from datetime import datetime, timedelta

    cutoff = datetime.now() - timedelta(days=days)
    archived_count = 0

    for thread in await store.list_threads():
        if not thread.messages:
            continue
        if thread.metadata.get("archived"):
            # Already archived: do not re-save or count it again
            continue

        # Check last message time
        last_msg_time = thread.messages[-1].metadata.get("timestamp", "")
        if not last_msg_time:
            continue
        try:
            is_stale = datetime.fromisoformat(last_msg_time) < cutoff
        except (TypeError, ValueError):
            # Malformed timestamp, or a timezone-aware one that cannot be
            # compared with the naive cutoff: skip instead of aborting the sweep
            continue
        if not is_stale:
            continue

        # Mark as archived
        thread.metadata["archived"] = True
        thread.metadata["archived_at"] = datetime.now().isoformat()
        await store.save_thread(thread)
        archived_count += 1

    print(f"Archived {archived_count} threads")
async def search_messages(
    store: ThreadStore, 
    query: str, 
    user_id: str = None
) -> List[Dict]:
    """Search messages across threads with a case-insensitive substring match.

    When ``user_id`` is given, only threads whose ``metadata.user_id``
    matches are searched; otherwise every thread is searched. Each hit is a
    dict with the keys ``thread_id``, ``message`` and ``context`` (the
    thread's metadata). Messages whose content is not a string are skipped.
    """
    results = []
    # Lower-case the query once instead of once per message
    needle = query.lower()

    # Get relevant threads
    if user_id:
        threads = await store.list_threads(
            filters={"metadata.user_id": user_id}
        )
    else:
        threads = await store.list_threads()

    # Search messages
    for thread in threads:
        for msg in thread.messages:
            # Non-text content (e.g. None) has no .lower(); skip it
            if not isinstance(msg.content, str):
                continue
            if needle in msg.content.lower():
                results.append({
                    "thread_id": thread.id,
                    "message": msg,
                    "context": thread.metadata
                })

    return results

Export/Import

import json

async def export_thread(thread: Thread, file_path: str):
    """Write a thread to ``file_path`` as indented JSON.

    The file holds the thread's id and metadata plus, per message, its role,
    content, metadata and the filename/MIME type of each attachment
    (attachment contents are not written).
    """
    payload = {
        "id": thread.id,
        "metadata": thread.metadata,
        "messages": [],
    }

    for entry in thread.messages:
        record = {
            "role": entry.role,
            "content": entry.content,
            "metadata": entry.metadata,
            "attachments": [],
        }
        # Only descriptive fields of each attachment are exported
        for item in entry.attachments:
            record["attachments"].append({
                "filename": item.filename,
                "mime_type": item.mime_type,
            })
        payload["messages"].append(record)

    with open(file_path, "w") as out_file:
        json.dump(payload, out_file, indent=2)

async def import_thread(store: ThreadStore, file_path: str) -> Thread:
    """Import a thread from a JSON file, save it to ``store`` and return it.

    Expects top-level ``id``, ``metadata`` and ``messages`` keys; each
    message needs ``role`` and ``content`` and may carry ``metadata``.
    Any ``attachments`` entries in the file are not restored.
    """
    # JSON is UTF-8 by specification; do not rely on the locale's default
    # encoding, which differs between platforms
    with open(file_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    thread = Thread(id=data["id"], metadata=data["metadata"])

    for msg_data in data["messages"]:
        message = Message(
            role=msg_data["role"],
            content=msg_data["content"],
            metadata=msg_data.get("metadata", {})
        )
        thread.add_message(message)

    await store.save_thread(thread)
    return thread

Performance tips

1. Connection Pooling

# Configure connection pool for PostgreSQL
store = await ThreadStore.create(
    "postgresql+asyncpg://localhost/chat",
    pool_size=20,  # Number of connections
    max_overflow=10,  # Additional connections when needed
    pool_timeout=30,  # Timeout for getting connection
    pool_recycle=3600  # Recycle connections after 1 hour
)

2. Batch Operations

# Save multiple threads efficiently
threads = [thread1, thread2, thread3]
await asyncio.gather(*[
    store.save_thread(thread) for thread in threads
])

3. Lazy Loading

# Load only thread metadata first
thread_list = await store.list_threads(load_messages=False)

# Load full thread only when needed
full_thread = await store.get_thread(thread_id)

Migration Guide

From File-based Storage

async def migrate_from_files(file_dir: str, store: ThreadStore):
    """Migrate from file-based storage to database.

    Every ``*.json`` file directly inside ``file_dir`` is read as one thread
    with an ``id`` and a list of ``messages`` (each with ``role`` and
    ``content``), rebuilt as a Thread and saved to ``store``. Other fields
    in the files are not carried over.
    """
    import os
    import json

    for filename in os.listdir(file_dir):
        if not filename.endswith(".json"):
            continue

        # JSON is UTF-8 by specification; do not rely on the locale's
        # default encoding, which differs between platforms
        with open(os.path.join(file_dir, filename), "r", encoding="utf-8") as f:
            data = json.load(f)

        thread = Thread(id=data["id"])
        for msg in data["messages"]:
            thread.add_message(Message(
                role=msg["role"],
                content=msg["content"]
            ))

        await store.save_thread(thread)
        print(f"Migrated thread {thread.id}")

Next steps