Why Use Lye Standalone?
- Add powerful capabilities to existing applications
- No AI or agent dependencies required
- Async-first design for modern Python
- Consistent API across all tool categories (see the sketch below)
- Well-tested and production-ready
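The "consistent API" point is easy to see in code: every tool is an async function that you import from its category module and await. A minimal sketch using two tools covered later in this guide:

import asyncio

from lye.web import search       # web category
from lye.files import read_file  # files category

async def main():
    # The call shape is identical across categories: import, then await
    results = await search("async Python")
    notes = await read_file("notes.txt")
    print(results, notes)

asyncio.run(main())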
Quick Start
Installation
# Using uv (recommended)
uv add slide-lye
# Using pip (fallback)
pip install slide-lye
Optional System Dependencies
While Lye works out of the box, some advanced features benefit from additional system libraries:
- File type detection: install libmagic for more accurate MIME type detection
- PDF OCR: install poppler to process scanned PDFs
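Package names vary by platform; as a rough sketch for common systems (verify against your distribution's package index):

# macOS (Homebrew)
brew install libmagic poppler

# Debian/Ubuntu
sudo apt-get install libmagic1 poppler-utils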
Basic Usage
import asyncio
from lye.web import search, fetch

async def main():
    # Search the web
    results = await search("Python async programming")
    print(f"Found: {results}")

    # Fetch a webpage
    content = await fetch("https://example.com")
    print(f"Page content: {content[:200]}...")

asyncio.run(main())
Tool Categories
Web Tools
Tools for interacting with the web:
import asyncio

from lye.web import search, fetch
from lye import WEB_TOOLS  # All web tools as a list

# Search the web
results = await search("climate change 2024")

# Fetch webpage content
html = await fetch("https://example.com/article")

# Fetch several URLs with a polite delay between requests
async def fetch_with_delay(urls):
    results = []
    for url in urls:
        content = await fetch(url)
        results.append(content)
        await asyncio.sleep(1)  # Be respectful of rate limits
    return results
File Tools
File system operations:
from lye.files import read_file, write_file, list_files
from lye import FILES_TOOLS

# Read a file
content = await read_file("data.json")

# Write to a file
await write_file("output.txt", "Hello, World!")

# List directory contents
files = await list_files("./documents", pattern="*.pdf")

# Batch file processing
async def process_csv_files(directory):
    csv_files = await list_files(directory, pattern="*.csv")
    for file in csv_files:
        data = await read_file(file)
        processed = transform_data(data)  # transform_data is your own processing logic
        await write_file(f"processed_{file}", processed)
Image Tools
Image analysis and processing:
from lye.image import analyze_image, extract_text_from_image
from lye.files import list_files  # used in the batch example below
from lye import IMAGE_TOOLS

# Analyze an image
with open("photo.jpg", "rb") as f:
    analysis = await analyze_image(f.read())
print(f"Image contains: {analysis}")

# Extract text (OCR)
with open("document.png", "rb") as f:
    text = await extract_text_from_image(f.read())
print(f"Extracted text: {text}")

# Batch image processing
async def catalog_images(image_dir):
    images = await list_files(image_dir, pattern="*.jpg")
    catalog = {}
    for img_path in images:
        with open(img_path, "rb") as f:
            image_data = f.read()
        catalog[img_path] = {
            "analysis": await analyze_image(image_data),
            "text": await extract_text_from_image(image_data),
        }
    return catalog
Audio Tools
Audio processing and transcription:
from lye.audio import transcribe, text_to_speech
from lye.files import write_file  # used in the podcast example below
from lye import AUDIO_TOOLS

# Transcribe audio
with open("recording.mp3", "rb") as f:
    transcript = await transcribe(f.read())
print(f"Transcript: {transcript}")

# Generate speech
audio_data = await text_to_speech(
    "Hello, this is a test of text to speech.",
    voice="alloy"  # or "echo", "fable", "onyx", "nova", "shimmer"
)

# Save the audio
with open("output.mp3", "wb") as f:
    f.write(audio_data)

# Process podcast episodes
async def transcribe_podcast(episode_files):
    transcripts = {}
    for episode in episode_files:
        with open(episode, "rb") as f:
            audio = f.read()
        transcript = await transcribe(audio)
        transcripts[episode] = transcript
        # Save the transcript alongside the episode
        await write_file(f"{episode}_transcript.txt", transcript)
    return transcripts
Browser Tools
Web automation and scraping:
import asyncio
from datetime import datetime

from lye.browser import screenshot, extract_text_from_webpage
from lye import BROWSER_TOOLS

# Take a screenshot
image_data = await screenshot("https://example.com")
with open("screenshot.png", "wb") as f:
    f.write(image_data)

# Extract clean text from a webpage
text = await extract_text_from_webpage("https://example.com/article")
print(f"Article text: {text}")

# Monitor a website for changes
async def monitor_website(url, interval=3600):
    previous_content = None
    while True:
        current_content = await extract_text_from_webpage(url)
        if previous_content and current_content != previous_content:
            print(f"Website changed at {datetime.now()}")
            # Send notification, save diff, etc.
        previous_content = current_content
        await asyncio.sleep(interval)
Real-World Applications
Web Scraper
Build a comprehensive web scraper:
import json
from datetime import datetime

from lye.web import search
from lye.browser import extract_text_from_webpage
from lye.files import write_file

class WebScraper:
    def __init__(self):
        self.results = []

    async def scrape_topic(self, topic, max_results=10):
        # Search for URLs
        search_results = await search(f"{topic} site:medium.com")
        urls = self.extract_urls(search_results)[:max_results]

        # Scrape each URL
        for url in urls:
            try:
                content = await extract_text_from_webpage(url)
                self.results.append({
                    "url": url,
                    "content": content,
                    "word_count": len(content.split()),
                    "scraped_at": datetime.now().isoformat(),
                })
            except Exception as e:
                print(f"Error scraping {url}: {e}")

        # Save results
        await write_file(
            f"{topic}_articles.json",
            json.dumps(self.results, indent=2)
        )
        return self.results

    def extract_urls(self, search_results):
        # Parse URLs from the search results
        # (implementation depends on the search result format)
        return []

# Usage (inside an async context)
scraper = WebScraper()
articles = await scraper.scrape_topic("machine learning", max_results=20)
Media Processor
Process multimedia files:
import json
import os

from lye.image import analyze_image, extract_text_from_image
from lye.audio import transcribe
from lye.files import list_files, write_file

class MediaProcessor:
    def __init__(self, input_dir, output_dir):
        self.input_dir = input_dir
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)

    async def process_all(self):
        # Process images
        # (brace patterns assume list_files supports them; adjust if needed)
        images = await list_files(self.input_dir, pattern="*.{jpg,png,jpeg}")
        for img in images:
            await self.process_image(img)

        # Process audio
        audio_files = await list_files(self.input_dir, pattern="*.{mp3,wav,m4a}")
        for audio in audio_files:
            await self.process_audio(audio)

    async def process_image(self, image_path):
        with open(image_path, "rb") as f:
            image_data = f.read()

        # Analyze and extract text
        analysis = await analyze_image(image_data)
        text = await extract_text_from_image(image_data)

        # Save results
        base_name = os.path.basename(image_path)
        output_path = os.path.join(self.output_dir, f"{base_name}_analysis.json")
        await write_file(output_path, json.dumps({
            "file": image_path,
            "analysis": analysis,
            "extracted_text": text,
        }, indent=2))

    async def process_audio(self, audio_path):
        with open(audio_path, "rb") as f:
            audio_data = f.read()

        # Transcribe
        transcript = await transcribe(audio_data)

        # Save transcript
        base_name = os.path.basename(audio_path)
        output_path = os.path.join(self.output_dir, f"{base_name}_transcript.txt")
        await write_file(output_path, transcript)

# Usage (inside an async context)
processor = MediaProcessor("./media_files", "./processed")
await processor.process_all()
Research Assistant
Automated research tool:
import json
import os
from datetime import datetime

from lye.web import search
from lye.files import write_file
from lye.browser import extract_text_from_webpage

class ResearchAssistant:
    def __init__(self, cache_dir="./research_cache"):
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)

    async def research_topic(self, topic, questions):
        research_data = {
            "topic": topic,
            "timestamp": datetime.now().isoformat(),
            "questions": {},
            "sources": [],
        }

        for question in questions:
            # Search for answers
            query = f"{topic} {question}"
            search_results = await search(query)

            # Extract URLs and fetch content
            urls = self.extract_urls(search_results)[:3]
            answers = []
            for url in urls:
                try:
                    content = await extract_text_from_webpage(url)
                    answers.append({
                        "url": url,
                        "content": content[:1000],  # First 1000 chars
                        "relevance": self.calculate_relevance(content, question),
                    })
                    research_data["sources"].append(url)
                except Exception:
                    continue
            research_data["questions"][question] = answers

        # Save research
        filename = f"{topic.replace(' ', '_')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        await write_file(
            os.path.join(self.cache_dir, filename),
            json.dumps(research_data, indent=2)
        )
        return research_data

    def extract_urls(self, search_results):
        # Parse URLs from the search results
        # (implementation depends on the search result format)
        return []

    def calculate_relevance(self, content, question):
        # Simple keyword matching (can be improved)
        keywords = question.lower().split()
        content_lower = content.lower()
        matches = sum(1 for keyword in keywords if keyword in content_lower)
        return matches / len(keywords)

# Usage (inside an async context)
assistant = ResearchAssistant()
research = await assistant.research_topic(
    "renewable energy",
    [
        "What are the latest breakthroughs?",
        "Which countries lead in adoption?",
        "What are the main challenges?",
    ]
)
Advanced Patterns
Rate Limiting
Implement rate limiting for API calls:
import asyncio
from datetime import datetime, timedelta

from lye.web import search

class RateLimiter:
    def __init__(self, calls_per_minute=60):
        self.calls_per_minute = calls_per_minute
        self.calls = []

    async def wait_if_needed(self):
        now = datetime.now()
        minute_ago = now - timedelta(minutes=1)

        # Drop calls older than a minute
        self.calls = [call for call in self.calls if call > minute_ago]

        if len(self.calls) >= self.calls_per_minute:
            # Wait until the oldest call is more than a minute old
            wait_time = (self.calls[0] + timedelta(minutes=1) - now).total_seconds()
            await asyncio.sleep(wait_time)

        self.calls.append(now)

# Use with Lye tools
rate_limiter = RateLimiter(calls_per_minute=30)

async def search_with_limit(query):
    await rate_limiter.wait_if_needed()
    return await search(query)
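Every call routed through search_with_limit now respects the 30-calls-per-minute budget, even from a loop. A usage sketch with placeholder queries:

queries = ["solar power", "wind power", "geothermal power"]
results = [await search_with_limit(q) for q in queries]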
Error Handling
Robust error handling for tools:
import asyncio
import logging
from typing import Optional

from lye.web import fetch

logger = logging.getLogger(__name__)

async def safe_fetch(url: str, retries: int = 3) -> Optional[str]:
    """Fetch a URL with retry logic and exponential backoff."""
    for attempt in range(retries):
        try:
            return await fetch(url)
        except Exception as e:
            logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}")
            if attempt < retries - 1:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
    logger.error(f"Failed to fetch {url} after {retries} attempts")
    return None
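A brief usage sketch (the URLs are placeholders): fetch a batch of pages and keep only those that succeeded.

urls = ["https://example.com/a", "https://example.com/b"]
pages = [await safe_fetch(url) for url in urls]
successful_pages = [page for page in pages if page is not None]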
Parallel Processing
Process multiple items concurrently:
import asyncio

from lye.browser import extract_text_from_webpage

async def process_urls_parallel(urls, max_concurrent=5):
    """Process URLs with controlled concurrency."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_with_semaphore(url):
        async with semaphore:
            return await extract_text_from_webpage(url)

    tasks = [process_with_semaphore(url) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Separate successes from failures
    successful = []
    failed = []
    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            failed.append((url, str(result)))
        else:
            successful.append((url, result))
    return successful, failed
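For example (inside an async context, with placeholder URLs):

successful, failed = await process_urls_parallel(
    ["https://example.com/1", "https://example.com/2", "https://example.com/3"],
    max_concurrent=2,
)
for url, error in failed:
    print(f"{url} failed: {error}")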
Integration Examples
With FastAPI
from fastapi import FastAPI, UploadFile

from lye.image import analyze_image
from lye.audio import transcribe

app = FastAPI()

@app.post("/analyze-image")
async def analyze_uploaded_image(file: UploadFile):
    contents = await file.read()
    analysis = await analyze_image(contents)
    return {"filename": file.filename, "analysis": analysis}

@app.post("/transcribe-audio")
async def transcribe_uploaded_audio(file: UploadFile):
    contents = await file.read()
    transcript = await transcribe(contents)
    return {"filename": file.filename, "transcript": transcript}
With Django
# views.py
import asyncio

from django.http import JsonResponse
from django.views import View

from lye.web import search

class SearchView(View):
    def get(self, request):
        query = request.GET.get('q', '')
        if not query:
            return JsonResponse({'error': 'No query provided'}, status=400)

        # Run the async tool from this sync view
        # (asyncio.run creates and cleans up an event loop per call)
        results = asyncio.run(search(query))
        return JsonResponse({'query': query, 'results': results})
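If you are on Django 3.1 or newer, an async function-based view avoids the sync/async bridging entirely (a sketch; the URL wiring is assumed to be set up elsewhere):

# views.py (async alternative, Django 3.1+)
from django.http import JsonResponse
from lye.web import search

async def search_view(request):
    query = request.GET.get('q', '')
    if not query:
        return JsonResponse({'error': 'No query provided'}, status=400)
    results = await search(query)
    return JsonResponse({'query': query, 'results': results})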
Performance Tips
- Use asyncio.gather() for parallel operations:

results = await asyncio.gather(
    search("topic 1"),
    search("topic 2"),
    search("topic 3"),
)
- Implement caching for expensive operations. Note that functools.lru_cache does not work for async functions: it caches the coroutine object, which can only be awaited once. A dict-based cache avoids this:

_search_cache = {}

async def cached_search(query):
    if query not in _search_cache:
        _search_cache[query] = await search(query)
    return _search_cache[query]
- Stream large files rather than reading them into memory at once:

async def process_large_file(filepath):
    # Process in chunks instead of loading the entire file
    chunk_size = 1024 * 1024  # 1 MB
    with open(filepath, 'rb') as f:
        while chunk := f.read(chunk_size):
            # Process chunk
            pass
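Plain open()/read() calls block the event loop while they run; if that matters for your workload, a sketch using the third-party aiofiles package (an assumption, not a Lye dependency) keeps file I/O non-blocking:

import aiofiles  # third-party: pip install aiofiles

async def process_large_file_async(filepath):
    # Same chunked pattern, but reads happen off the event loop thread
    chunk_size = 1024 * 1024  # 1 MB
    async with aiofiles.open(filepath, 'rb') as f:
        while chunk := await f.read(chunk_size):
            # Process chunk
            pass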