Why Use Narrator Standalone?
- Add conversation history to existing AI applications
- Manage chat threads across multiple sessions
- Store and retrieve file attachments
- Switch between storage backends easily
- No dependency on Tyler or other Slide packages
Quick Start
Installation
Copy
# Using uv (recommended)
uv add slide-narrator
# Using pip (fallback)
pip install slide-narrator
Basic usage
Copy
import asyncio
from narrator import Thread, Message, ThreadStore
async def main():
# Create a thread store (in-memory by default)
store = await ThreadStore.create()
# Create a conversation thread
thread = Thread(id="chat-001")
# Add messages
thread.add_message(Message(role="user", content="Hello!"))
thread.add_message(Message(role="assistant", content="Hi there! How can I help?"))
# Save the thread
await store.save_thread(thread)
# Load it later
loaded_thread = await store.get_thread("chat-001")
print(f"Messages in thread: {len(loaded_thread.messages)}")
asyncio.run(main())
Storage Backends
In-Memory Storage
Perfect for testing and temporary conversations:Copy
# Default - no URL needed
store = await ThreadStore.create()
# Explicit
store = await ThreadStore.create("memory://")
SQLite Storage
Great for local applications and development:Copy
# SQLite with async support
store = await ThreadStore.create("sqlite+aiosqlite:///conversations.db")
# Custom options
store = await ThreadStore.create(
"sqlite+aiosqlite:///app.db",
pool_size=5,
max_overflow=10
)
PostgreSQL Storage
For production applications with multiple users:Copy
# PostgreSQL with asyncpg
store = await ThreadStore.create(
"postgresql+asyncpg://user:pass@localhost/dbname"
)
# With connection pool settings
store = await ThreadStore.create(
"postgresql+asyncpg://localhost/chat_app",
pool_size=20,
max_overflow=30,
pool_timeout=30
)
Thread Management
Creating Threads
Copy
from narrator import Thread, Message
import uuid
# Auto-generated ID
thread = Thread()
print(f"Thread ID: {thread.id}")
# Custom ID
thread = Thread(id="user-123-support-chat")
# With metadata
thread = Thread(
id=f"session-{uuid.uuid4()}",
metadata={
"user_id": "user-123",
"channel": "web",
"tags": ["support", "billing"],
"created_at": datetime.now().isoformat()
}
)
Managing Messages
Copy
# Add messages
thread.add_message(Message(
role="user",
content="What's the weather like?",
metadata={"timestamp": datetime.now().isoformat()}
))
thread.add_message(Message(
role="assistant",
content="I'll help you check the weather. What's your location?"
))
# Access messages
for msg in thread.messages:
print(f"{msg.role}: {msg.content}")
# Get last message
last_message = thread.messages[-1] if thread.messages else None
# Filter messages
user_messages = [m for m in thread.messages if m.role == "user"]
Thread Operations
Copy
# Save thread
await store.save_thread(thread)
# Load thread
thread = await store.get_thread("thread-id")
# List all threads
threads = await store.list_threads()
for t in threads:
print(f"Thread {t.id}: {len(t.messages)} messages")
# Delete thread
await store.delete_thread("thread-id")
# Search threads by metadata
user_threads = await store.list_threads(
filters={"metadata.user_id": "user-123"}
)
File Storage
Setting Up File Storage
Copy
from narrator import FileStore
# Local file storage
file_store = await FileStore.create(base_path="./uploads")
# S3-compatible storage (requires boto3)
file_store = await FileStore.create(
backend="s3",
bucket="my-app-files",
region="us-east-1"
)
Working with Attachments
Copy
from narrator import Attachment
# Create attachment from file
with open("document.pdf", "rb") as f:
attachment = Attachment(
filename="document.pdf",
content=f.read(),
mime_type="application/pdf"
)
# Add to message
message = Message(
role="user",
content="Please review this document"
)
message.add_attachment(attachment)
# Save with file store
await file_store.save_attachment(thread.id, attachment)
# Retrieve attachments
attachments = await file_store.get_attachments(thread.id)
for att in attachments:
print(f"File: {att.filename} ({att.mime_type})")
Integration Examples
With OpenAI
Copy
import openai
from narrator import Thread, Message, ThreadStore
async def chat_with_persistence():
store = await ThreadStore.create("sqlite+aiosqlite:///chat.db")
thread_id = "user-session-123"
# Load or create thread
try:
thread = await store.get_thread(thread_id)
except:
thread = Thread(id=thread_id)
# Get user input
user_input = input("You: ")
thread.add_message(Message(role="user", content=user_input))
# Prepare messages for OpenAI
messages = [
{"role": msg.role, "content": msg.content}
for msg in thread.messages
]
# Call OpenAI
response = await openai.ChatCompletion.acreate(
model="gpt-4",
messages=messages
)
# Add response to thread
assistant_message = Message(
role="assistant",
content=response.choices[0].message.content
)
thread.add_message(assistant_message)
# Save thread
await store.save_thread(thread)
print(f"Assistant: {assistant_message.content}")
With LangChain
Copy
from langchain.memory import ConversationSummaryBufferMemory
from langchain.schema import BaseMessage, HumanMessage, AIMessage
from narrator import ThreadStore, Thread, Message
class NarratorMemory(ConversationSummaryBufferMemory):
def __init__(self, thread_store: ThreadStore, thread_id: str):
super().__init__()
self.thread_store = thread_store
self.thread_id = thread_id
async def load_memory(self):
try:
thread = await self.thread_store.get_thread(self.thread_id)
for msg in thread.messages:
if msg.role == "user":
self.chat_memory.add_user_message(msg.content)
elif msg.role == "assistant":
self.chat_memory.add_ai_message(msg.content)
except:
pass
async def save_context(self, inputs: dict, outputs: dict):
thread = Thread(id=self.thread_id)
# Add messages from current context
for msg in self.chat_memory.messages:
if isinstance(msg, HumanMessage):
thread.add_message(Message(role="user", content=msg.content))
elif isinstance(msg, AIMessage):
thread.add_message(Message(role="assistant", content=msg.content))
await self.thread_store.save_thread(thread)
With Custom AI Systems
Copy
from narrator import Thread, Message, ThreadStore
from typing import List, Dict
class ChatBot:
def __init__(self, thread_store: ThreadStore):
self.thread_store = thread_store
self.active_sessions = {}
async def start_session(self, user_id: str) -> str:
"""Start or resume a chat session"""
thread_id = f"user-{user_id}-chat"
try:
thread = await self.thread_store.get_thread(thread_id)
print(f"Resuming session with {len(thread.messages)} messages")
except:
thread = Thread(
id=thread_id,
metadata={"user_id": user_id, "started": datetime.now().isoformat()}
)
print("Starting new session")
self.active_sessions[user_id] = thread
return thread_id
async def process_message(self, user_id: str, content: str) -> str:
"""Process a user message and generate response"""
thread = self.active_sessions.get(user_id)
if not thread:
await self.start_session(user_id)
thread = self.active_sessions[user_id]
# Add user message
thread.add_message(Message(
role="user",
content=content,
metadata={"timestamp": datetime.now().isoformat()}
))
# Generate response (your AI logic here)
response = await self.generate_response(thread.messages)
# Add assistant message
thread.add_message(Message(
role="assistant",
content=response,
metadata={"timestamp": datetime.now().isoformat()}
))
# Save thread
await self.thread_store.save_thread(thread)
return response
async def generate_response(self, messages: List[Message]) -> str:
# Your AI logic here
return "This is where your AI generates a response"
Advanced patterns
Thread Archival
Copy
async def archive_old_threads(store: ThreadStore, days: int = 30):
"""Archive threads older than specified days"""
cutoff = datetime.now() - timedelta(days=days)
threads = await store.list_threads()
archived_count = 0
for thread in threads:
# Check last message time
if thread.messages:
last_msg_time = thread.messages[-1].metadata.get("timestamp", "")
if last_msg_time and datetime.fromisoformat(last_msg_time) < cutoff:
# Mark as archived
thread.metadata["archived"] = True
thread.metadata["archived_at"] = datetime.now().isoformat()
await store.save_thread(thread)
archived_count += 1
print(f"Archived {archived_count} threads")
Message Search
Copy
async def search_messages(
store: ThreadStore,
query: str,
user_id: str = None
) -> List[Dict]:
"""Search messages across threads"""
results = []
# Get relevant threads
if user_id:
threads = await store.list_threads(
filters={"metadata.user_id": user_id}
)
else:
threads = await store.list_threads()
# Search messages
for thread in threads:
for msg in thread.messages:
if query.lower() in msg.content.lower():
results.append({
"thread_id": thread.id,
"message": msg,
"context": thread.metadata
})
return results
Export/Import
Copy
import json
async def export_thread(thread: Thread, file_path: str):
"""Export thread to JSON file"""
data = {
"id": thread.id,
"metadata": thread.metadata,
"messages": [
{
"role": msg.role,
"content": msg.content,
"metadata": msg.metadata,
"attachments": [
{
"filename": att.filename,
"mime_type": att.mime_type
} for att in msg.attachments
]
} for msg in thread.messages
]
}
with open(file_path, "w") as f:
json.dump(data, f, indent=2)
async def import_thread(store: ThreadStore, file_path: str) -> Thread:
"""Import thread from JSON file"""
with open(file_path, "r") as f:
data = json.load(f)
thread = Thread(id=data["id"], metadata=data["metadata"])
for msg_data in data["messages"]:
message = Message(
role=msg_data["role"],
content=msg_data["content"],
metadata=msg_data.get("metadata", {})
)
thread.add_message(message)
await store.save_thread(thread)
return thread
Performance tips
1. Connection Pooling
Copy
# Configure connection pool for PostgreSQL
store = await ThreadStore.create(
"postgresql+asyncpg://localhost/chat",
pool_size=20, # Number of connections
max_overflow=10, # Additional connections when needed
pool_timeout=30, # Timeout for getting connection
pool_recycle=3600 # Recycle connections after 1 hour
)
2. Batch Operations
Copy
# Save multiple threads efficiently
threads = [thread1, thread2, thread3]
await asyncio.gather(*[
store.save_thread(thread) for thread in threads
])
3. Lazy Loading
Copy
# Load only thread metadata first
thread_list = await store.list_threads(load_messages=False)
# Load full thread only when needed
full_thread = await store.get_thread(thread_id)
Migration Guide
From File-based Storage
Copy
async def migrate_from_files(file_dir: str, store: ThreadStore):
"""Migrate from file-based storage to database"""
import os
import json
for filename in os.listdir(file_dir):
if filename.endswith(".json"):
with open(os.path.join(file_dir, filename), "r") as f:
data = json.load(f)
thread = Thread(id=data["id"])
for msg in data["messages"]:
thread.add_message(Message(
role=msg["role"],
content=msg["content"]
))
await store.save_thread(thread)
print(f"Migrated thread {thread.id}")