Narrator is Slide’s conversation persistence and storage system. While it integrates seamlessly with Tyler agents, you can also use Narrator independently for any application that needs to manage conversations and files.
Documentation Index
Fetch the complete documentation index at: https://slide.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
Why Use Narrator Standalone?
- Add conversation history to existing AI applications
- Manage chat threads across multiple sessions
- Store and retrieve file attachments
- Switch between storage backends easily
- No dependency on Tyler or other Slide packages
Quick Start
Installation
# Using uv (recommended)
uv add slide-narrator
# Using pip (fallback)
pip install slide-narrator
Basic usage
import asyncio
from narrator import Thread, Message, ThreadStore
async def main():
    """Round-trip a two-message conversation through the default thread store."""
    # No URL argument: the store is in-memory by default
    thread_store = await ThreadStore.create()

    # Build the conversation under a fixed ID so it can be looked up again
    conversation = Thread(id="chat-001")
    opening_turns = (
        ("user", "Hello!"),
        ("assistant", "Hi there! How can I help?"),
    )
    for role, text in opening_turns:
        conversation.add_message(Message(role=role, content=text))

    # Persist the thread, then read it back by the same ID
    await thread_store.save_thread(conversation)
    restored = await thread_store.get_thread("chat-001")
    print(f"Messages in thread: {len(restored.messages)}")
asyncio.run(main())
Storage Backends
In-Memory Storage
Perfect for testing and temporary conversations:
# Default - no URL needed
store = await ThreadStore.create()
# Explicit
store = await ThreadStore.create("memory://")
SQLite Storage
Great for local applications and development:
# SQLite with async support
store = await ThreadStore.create("sqlite+aiosqlite:///conversations.db")
# Custom options
store = await ThreadStore.create(
"sqlite+aiosqlite:///app.db",
pool_size=5,
max_overflow=10
)
PostgreSQL Storage
For production applications with multiple users:
# PostgreSQL with asyncpg
store = await ThreadStore.create(
"postgresql+asyncpg://user:pass@localhost/dbname"
)
# With connection pool settings
store = await ThreadStore.create(
"postgresql+asyncpg://localhost/chat_app",
pool_size=20,
max_overflow=30,
pool_timeout=30
)
Thread Management
Creating Threads
import uuid
from datetime import datetime

from narrator import Thread, Message
# Auto-generated ID
thread = Thread()
print(f"Thread ID: {thread.id}")
# Custom ID
thread = Thread(id="user-123-support-chat")
# With metadata
thread = Thread(
id=f"session-{uuid.uuid4()}",
metadata={
"user_id": "user-123",
"channel": "web",
"tags": ["support", "billing"],
"created_at": datetime.now().isoformat()
}
)
Managing Messages
# Add messages
thread.add_message(Message(
role="user",
content="What's the weather like?",
metadata={"timestamp": datetime.now().isoformat()}
))
thread.add_message(Message(
role="assistant",
content="I'll help you check the weather. What's your location?"
))
# Access messages
for msg in thread.messages:
print(f"{msg.role}: {msg.content}")
# Get last message
last_message = thread.messages[-1] if thread.messages else None
# Filter messages
user_messages = [m for m in thread.messages if m.role == "user"]
Thread Operations
# Save thread
await store.save_thread(thread)
# Load thread
thread = await store.get_thread("thread-id")
# List all threads
threads = await store.list_threads()
for t in threads:
print(f"Thread {t.id}: {len(t.messages)} messages")
# Delete thread
await store.delete_thread("thread-id")
# Search threads by metadata
user_threads = await store.list_threads(
filters={"metadata.user_id": "user-123"}
)
File Storage
Setting Up File Storage
from narrator import FileStore
# Local file storage
file_store = await FileStore.create(base_path="./uploads")
# S3-compatible storage (requires boto3)
file_store = await FileStore.create(
backend="s3",
bucket="my-app-files",
region="us-east-1"
)
Working with Attachments
from narrator import Attachment
# Create attachment from file
with open("document.pdf", "rb") as f:
attachment = Attachment(
filename="document.pdf",
content=f.read(),
mime_type="application/pdf"
)
# Add to message
message = Message(
role="user",
content="Please review this document"
)
message.add_attachment(attachment)
# Save with file store
await file_store.save_attachment(thread.id, attachment)
# Retrieve attachments
attachments = await file_store.get_attachments(thread.id)
for att in attachments:
print(f"File: {att.filename} ({att.mime_type})")
Integration Examples
With OpenAI
import openai
from narrator import Thread, Message, ThreadStore
async def chat_with_persistence():
    """Run one user/assistant exchange against OpenAI and persist it.

    Loads the thread ``user-session-123`` from a SQLite-backed store (starting
    a new thread if it cannot be loaded), appends the user's input, sends the
    full history to the chat completions endpoint, appends the reply, saves
    the thread, and prints the reply.
    """
    store = await ThreadStore.create("sqlite+aiosqlite:///chat.db")
    thread_id = "user-session-123"

    # Load or create thread
    try:
        thread = await store.get_thread(thread_id)
    except Exception:
        # `except Exception` instead of a bare `except:` so Ctrl-C and task
        # cancellation still propagate.
        # NOTE(review): narrow this to the store's "not found" error if it
        # defines one, so real storage failures are not masked.
        thread = None
    if thread is None:
        # Also covers a store that reports a missing thread by returning None
        thread = Thread(id=thread_id)

    # Get user input
    user_input = input("You: ")
    thread.add_message(Message(role="user", content=user_input))

    # Prepare messages for OpenAI
    messages = [
        {"role": msg.role, "content": msg.content}
        for msg in thread.messages
    ]

    # Call OpenAI (openai>=1.0 client API; `openai.ChatCompletion` was removed
    # in 1.0). The client reads OPENAI_API_KEY from the environment.
    client = openai.AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4",
        messages=messages,
    )

    # Add response to thread
    assistant_message = Message(
        role="assistant",
        content=response.choices[0].message.content,
    )
    thread.add_message(assistant_message)

    # Save thread
    await store.save_thread(thread)
    print(f"Assistant: {assistant_message.content}")
With LangChain
from langchain.memory import ConversationSummaryBufferMemory
from langchain.schema import BaseMessage, HumanMessage, AIMessage
from narrator import ThreadStore, Thread, Message
class NarratorMemory(ConversationSummaryBufferMemory):
    """LangChain memory that loads from and saves to a Narrator thread.

    NOTE(review): the LangChain base class appears to be a pydantic model that
    requires an ``llm``; depending on the LangChain/pydantic version,
    ``thread_store`` and ``thread_id`` may need to be declared as model fields
    before they can be assigned in ``__init__`` - confirm against the
    installed version.
    """

    def __init__(self, thread_store: ThreadStore, thread_id: str, **kwargs):
        """Bind the memory to one thread in ``thread_store``.

        Extra keyword arguments (for example ``llm=...``) are forwarded to the
        LangChain base class unchanged.
        """
        super().__init__(**kwargs)
        self.thread_store = thread_store
        self.thread_id = thread_id

    async def load_memory(self):
        """Copy the stored thread's user/assistant messages into chat memory.

        Best-effort: if the thread cannot be loaded, chat memory is left as
        it is.
        """
        try:
            thread = await self.thread_store.get_thread(self.thread_id)
        except Exception:
            # Deliberately best-effort, but no longer a bare `except:` so that
            # Ctrl-C and task cancellation are not swallowed.
            return
        if thread is None:
            # Nothing stored yet for this thread ID
            return

        for msg in thread.messages:
            if msg.role == "user":
                self.chat_memory.add_user_message(msg.content)
            elif msg.role == "assistant":
                self.chat_memory.add_ai_message(msg.content)

    async def save_context(self, inputs: dict, outputs: dict):
        """Persist the current chat memory as the thread's messages.

        A fresh ``Thread`` with the bound ID is built from the human and AI
        messages currently in ``chat_memory`` and then saved; ``inputs`` and
        ``outputs`` are not read here.

        NOTE(review): the base-class ``save_context`` is not called, so
        ``inputs``/``outputs`` are presumably expected to be in ``chat_memory``
        already - confirm with callers.
        """
        thread = Thread(id=self.thread_id)

        # Add messages from current context
        for msg in self.chat_memory.messages:
            if isinstance(msg, HumanMessage):
                thread.add_message(Message(role="user", content=msg.content))
            elif isinstance(msg, AIMessage):
                thread.add_message(Message(role="assistant", content=msg.content))

        await self.thread_store.save_thread(thread)
With Custom AI Systems
from narrator import Thread, Message, ThreadStore
from typing import List, Dict
class ChatBot:
    """Minimal chat service that keeps one persisted thread per user."""

    def __init__(self, thread_store: ThreadStore):
        """Store the thread store and start with no active sessions."""
        self.thread_store = thread_store
        self.active_sessions = {}  # user_id -> Thread

    @staticmethod
    def _now_iso() -> str:
        """Return the current local time as an ISO 8601 string."""
        # Local import keeps this snippet runnable without extra top-level imports
        from datetime import datetime

        return datetime.now().isoformat()

    async def start_session(self, user_id: str) -> str:
        """Start or resume a chat session.

        Returns the thread ID, which is derived from ``user_id``.
        """
        thread_id = f"user-{user_id}-chat"

        try:
            thread = await self.thread_store.get_thread(thread_id)
        except Exception:
            # `except Exception` instead of a bare `except:` so Ctrl-C and
            # task cancellation still propagate.
            # NOTE(review): narrow to the store's "not found" error if it
            # defines one, so real storage failures are not masked.
            thread = None

        if thread is None:
            # Also covers a store that reports a missing thread by returning None
            thread = Thread(
                id=thread_id,
                metadata={"user_id": user_id, "started": self._now_iso()},
            )
            print("Starting new session")
        else:
            print(f"Resuming session with {len(thread.messages)} messages")

        self.active_sessions[user_id] = thread
        return thread_id

    async def process_message(self, user_id: str, content: str) -> str:
        """Process a user message and generate response.

        Starts a session for ``user_id`` first if none is active, appends the
        user message and the generated reply to the thread, saves the thread,
        and returns the reply text.
        """
        thread = self.active_sessions.get(user_id)
        if not thread:
            await self.start_session(user_id)
            thread = self.active_sessions[user_id]

        # Add user message
        thread.add_message(Message(
            role="user",
            content=content,
            metadata={"timestamp": self._now_iso()},
        ))

        # Generate response (your AI logic here)
        response = await self.generate_response(thread.messages)

        # Add assistant message
        thread.add_message(Message(
            role="assistant",
            content=response,
            metadata={"timestamp": self._now_iso()},
        ))

        # Save thread
        await self.thread_store.save_thread(thread)
        return response

    async def generate_response(self, messages: List[Message]) -> str:
        """Return the assistant reply for ``messages`` (placeholder)."""
        # Your AI logic here
        return "This is where your AI generates a response"
Advanced patterns
Thread Archival
async def archive_old_threads(store: ThreadStore, days: int = 30):
    """Archive threads older than specified days.

    A thread's age is taken from the ``"timestamp"`` entry in the metadata of
    its last message. Threads with no messages, no timestamp, or a timestamp
    that is not valid ISO 8601 are left untouched, as are threads already
    marked archived. Archiving sets ``metadata["archived"]`` and
    ``metadata["archived_at"]`` and saves the thread. The number of threads
    archived is printed.
    """
    # Local import keeps this snippet runnable on its own
    from datetime import datetime, timedelta

    max_age = timedelta(days=days)
    archived_count = 0

    for thread in await store.list_threads():
        if not thread.messages:
            continue
        if thread.metadata.get("archived"):
            # Already archived: do not re-save or overwrite archived_at
            continue

        # Check last message time
        last_msg_time = thread.messages[-1].metadata.get("timestamp", "")
        if not last_msg_time:
            continue
        try:
            last_seen = datetime.fromisoformat(last_msg_time)
        except (TypeError, ValueError):
            # One malformed timestamp should not abort the whole run
            continue

        # Compare in the timestamp's own timezone so naive and aware values
        # are never mixed (mixing them raises TypeError).
        if datetime.now(last_seen.tzinfo) - last_seen <= max_age:
            continue

        # Mark as archived
        thread.metadata["archived"] = True
        thread.metadata["archived_at"] = datetime.now().isoformat()
        await store.save_thread(thread)
        archived_count += 1

    print(f"Archived {archived_count} threads")
Message Search
async def search_messages(
    store: ThreadStore,
    query: str,
    user_id: str = None
) -> List[Dict]:
    """Search messages across threads.

    Performs a case-insensitive substring match of ``query`` against each
    message's content. When ``user_id`` is given, only threads returned for
    the ``metadata.user_id`` filter are searched. Messages whose content is
    not a string are skipped.

    Returns a list of dicts with keys ``"thread_id"``, ``"message"`` and
    ``"context"`` (the thread's metadata), in thread and message order.
    """
    # Get relevant threads
    if user_id:
        threads = await store.list_threads(
            filters={"metadata.user_id": user_id}
        )
    else:
        threads = await store.list_threads()

    # Lower-case the query once instead of once per message
    needle = query.lower()

    # Search messages
    results = []
    for thread in threads:
        for msg in thread.messages:
            text = msg.content
            if not isinstance(text, str):
                # No text to match against (e.g. content is None)
                continue
            if needle in text.lower():
                results.append({
                    "thread_id": thread.id,
                    "message": msg,
                    "context": thread.metadata
                })
    return results
Export/Import
import json
async def export_thread(thread: Thread, file_path: str):
    """Export thread to JSON file.

    The file holds the thread's id and metadata and, for each message, its
    role, content, metadata and the filename / MIME type of each attachment.
    Attachment contents are not written.
    """
    payload = {
        "id": thread.id,
        "metadata": thread.metadata,
        "messages": [],
    }

    for msg in thread.messages:
        # Attachments are summarised by name and type only
        attachment_summaries = []
        for att in msg.attachments:
            attachment_summaries.append({
                "filename": att.filename,
                "mime_type": att.mime_type,
            })

        payload["messages"].append({
            "role": msg.role,
            "content": msg.content,
            "metadata": msg.metadata,
            "attachments": attachment_summaries,
        })

    with open(file_path, "w") as out:
        json.dump(payload, out, indent=2)
async def import_thread(store: ThreadStore, file_path: str) -> Thread:
    """Import thread from JSON file.

    Reads a JSON object with an ``"id"`` and optional ``"metadata"`` and
    ``"messages"`` entries, rebuilds the thread with each message's role,
    content and optional metadata, saves it to ``store`` and returns it.
    Any ``"attachments"`` entries in the file are not restored.

    Raises:
        KeyError: if the file has no ``"id"``, or a message lacks ``"role"``
            or ``"content"``.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Thread-level metadata is optional, matching the per-message handling below
    thread = Thread(id=data["id"], metadata=data.get("metadata", {}))

    for msg_data in data.get("messages", []):
        message = Message(
            role=msg_data["role"],
            content=msg_data["content"],
            metadata=msg_data.get("metadata", {})
        )
        thread.add_message(message)

    await store.save_thread(thread)
    return thread
Performance tips
1. Connection Pooling
# Configure connection pool for PostgreSQL
store = await ThreadStore.create(
"postgresql+asyncpg://localhost/chat",
pool_size=20, # Number of connections
max_overflow=10, # Additional connections when needed
pool_timeout=30, # Timeout for getting connection
pool_recycle=3600 # Recycle connections after 1 hour
)
2. Batch Operations
# Save multiple threads efficiently
threads = [thread1, thread2, thread3]
await asyncio.gather(*[
store.save_thread(thread) for thread in threads
])
3. Lazy Loading
# Load only thread metadata first
thread_list = await store.list_threads(load_messages=False)
# Load full thread only when needed
full_thread = await store.get_thread(thread_id)
Migration Guide
From File-based Storage
async def migrate_from_files(file_dir: str, store: ThreadStore):
    """Migrate from file-based storage to database.

    Every entry in ``file_dir`` whose name ends in ``.json`` is read as a
    thread export with ``"id"`` and ``"messages"`` keys; each message's role
    and content are copied into a new thread, which is saved to ``store``.
    """
    import os
    import json

    for entry in os.listdir(file_dir):
        # Only JSON exports are migrated; everything else is ignored
        if not entry.endswith(".json"):
            continue

        source_path = os.path.join(file_dir, entry)
        with open(source_path, "r") as handle:
            record = json.load(handle)

        migrated = Thread(id=record["id"])
        for item in record["messages"]:
            migrated.add_message(Message(
                role=item["role"],
                content=item["content"]
            ))

        await store.save_thread(migrated)
        print(f"Migrated thread {migrated.id}")
Next steps
Conversation Persistence
Advanced persistence patterns
API Reference
Complete API documentation
Tyler Integration
Use with Tyler agents
Examples
See more examples