Lye is Slide’s comprehensive tool library that provides ready-to-use utilities for web interaction, file handling, image processing, audio manipulation, and more. While designed to work seamlessly with Tyler agents, Lye can be used independently in any Python application.

Why Use Lye Standalone?

  • Add powerful capabilities to existing applications
  • No AI or agent dependencies required
  • Async-first design for modern Python
  • Consistent API across all tool categories
  • Well-tested and production-ready

Quick Start

Installation

# Using uv (recommended)
uv add slide-lye

# Using pip (fallback)
pip install slide-lye

Optional system dependencies

While Lye works out of the box, some advanced features benefit from additional system libraries:
  • File type detection: Install libmagic for more accurate MIME type detection
  • PDF OCR: Install poppler to process scanned PDFs
See the Lye package documentation for installation instructions.

Basic usage

import asyncio
from lye.web import search, fetch

async def main():
    # Search the web
    results = await search("Python async programming")
    print(f"Found: {results}")
    
    # Fetch a webpage
    content = await fetch("https://example.com")
    print(f"Page content: {content[:200]}...")

asyncio.run(main())

Tool Categories

Web Tools

Tools for interacting with the web:
from lye.web import search, fetch
from lye import WEB_TOOLS  # All web tools as a list

# Search the web
results = await search("climate change 2024")

# Fetch webpage content
html = await fetch("https://example.com/article")

# Using with rate limiting
async def fetch_with_delay(urls):
    results = []
    for url in urls:
        content = await fetch(url)
        results.append(content)
        await asyncio.sleep(1)  # Be respectful
    return results

File Tools

File system operations:
from lye.files import read_file, write_file, list_files
from lye import FILES_TOOLS

# Read a file
content = await read_file("data.json")

# Write to a file
await write_file("output.txt", "Hello, World!")

# List directory contents
files = await list_files("./documents", pattern="*.pdf")

# Batch file processing
async def process_csv_files(directory):
    csv_files = await list_files(directory, pattern="*.csv")
    for file in csv_files:
        data = await read_file(file)
        # Process data
        processed = transform_data(data)
        await write_file(f"processed_{file}", processed)

Image Tools

Image analysis and processing:
from lye.image import analyze_image, extract_text_from_image
from lye import IMAGE_TOOLS

# Analyze an image
with open("photo.jpg", "rb") as f:
    analysis = await analyze_image(f.read())
    print(f"Image contains: {analysis}")

# Extract text (OCR)
with open("document.png", "rb") as f:
    text = await extract_text_from_image(f.read())
    print(f"Extracted text: {text}")

# Batch image processing
async def catalog_images(image_dir):
    images = await list_files(image_dir, pattern="*.jpg")
    catalog = {}
    
    for img_path in images:
        with open(img_path, "rb") as f:
            image_data = f.read()
        
        catalog[img_path] = {
            "analysis": await analyze_image(image_data),
            "text": await extract_text_from_image(image_data)
        }
    
    return catalog

Audio Tools

Audio processing and transcription:
from lye.audio import transcribe, text_to_speech
from lye import AUDIO_TOOLS

# Transcribe audio
with open("recording.mp3", "rb") as f:
    transcript = await transcribe(f.read())
    print(f"Transcript: {transcript}")

# Generate speech
audio_data = await text_to_speech(
    "Hello, this is a test of text to speech.",
    voice="alloy"  # or "echo", "fable", "onyx", "nova", "shimmer"
)

# Save the audio
with open("output.mp3", "wb") as f:
    f.write(audio_data)

# Process podcast episodes
async def transcribe_podcast(episode_files):
    transcripts = {}
    
    for episode in episode_files:
        with open(episode, "rb") as f:
            audio = f.read()
        
        transcript = await transcribe(audio)
        transcripts[episode] = transcript
        
        # Save transcript
        await write_file(
            f"{episode}_transcript.txt",
            transcript
        )
    
    return transcripts

Browser Tools

Web automation and scraping:
from lye.browser import screenshot, extract_text_from_webpage
from lye import BROWSER_TOOLS

# Take a screenshot
image_data = await screenshot("https://example.com")
with open("screenshot.png", "wb") as f:
    f.write(image_data)

# Extract clean text from webpage
text = await extract_text_from_webpage("https://example.com/article")
print(f"Article text: {text}")

# Monitor website changes
async def monitor_website(url, interval=3600):
    previous_content = None
    
    while True:
        current_content = await extract_text_from_webpage(url)
        
        if previous_content and current_content != previous_content:
            print(f"Website changed at {datetime.now()}")
            # Send notification, save diff, etc.
        
        previous_content = current_content
        await asyncio.sleep(interval)

Real-World Applications

Web Scraper

Build a comprehensive web scraper:
from lye.web import search, fetch
from lye.browser import extract_text_from_webpage
from lye.files import write_file
import json

class WebScraper:
    def __init__(self):
        self.results = []
    
    async def scrape_topic(self, topic, max_results=10):
        # Search for URLs
        search_results = await search(f"{topic} site:medium.com")
        urls = self.extract_urls(search_results)[:max_results]
        
        # Scrape each URL
        for url in urls:
            try:
                content = await extract_text_from_webpage(url)
                self.results.append({
                    "url": url,
                    "content": content,
                    "word_count": len(content.split()),
                    "scraped_at": datetime.now().isoformat()
                })
            except Exception as e:
                print(f"Error scraping {url}: {e}")
        
        # Save results
        await write_file(
            f"{topic}_articles.json",
            json.dumps(self.results, indent=2)
        )
        
        return self.results
    
    def extract_urls(self, search_results):
        # Parse URLs from search results
        # Implementation depends on search result format
        return []

# Usage
scraper = WebScraper()
articles = await scraper.scrape_topic("machine learning", max_results=20)

Media Processor

Process multimedia files:
from lye.image import analyze_image, extract_text_from_image
from lye.audio import transcribe
from lye.files import list_files, write_file
import os

class MediaProcessor:
    def __init__(self, input_dir, output_dir):
        self.input_dir = input_dir
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)
    
    async def process_all(self):
        # Process images
        images = await list_files(self.input_dir, pattern="*.{jpg,png,jpeg}")
        for img in images:
            await self.process_image(img)
        
        # Process audio
        audio_files = await list_files(self.input_dir, pattern="*.{mp3,wav,m4a}")
        for audio in audio_files:
            await self.process_audio(audio)
    
    async def process_image(self, image_path):
        with open(image_path, "rb") as f:
            image_data = f.read()
        
        # Analyze and extract text
        analysis = await analyze_image(image_data)
        text = await extract_text_from_image(image_data)
        
        # Save results
        base_name = os.path.basename(image_path)
        output_path = os.path.join(self.output_dir, f"{base_name}_analysis.json")
        
        await write_file(output_path, json.dumps({
            "file": image_path,
            "analysis": analysis,
            "extracted_text": text
        }, indent=2))
    
    async def process_audio(self, audio_path):
        with open(audio_path, "rb") as f:
            audio_data = f.read()
        
        # Transcribe
        transcript = await transcribe(audio_data)
        
        # Save transcript
        base_name = os.path.basename(audio_path)
        output_path = os.path.join(self.output_dir, f"{base_name}_transcript.txt")
        
        await write_file(output_path, transcript)

# Usage
processor = MediaProcessor("./media_files", "./processed")
await processor.process_all()

Research assistant

Automated research tool:
from lye.web import search, fetch
from lye.files import write_file, read_file
from lye.browser import extract_text_from_webpage
from datetime import datetime
import json

class ResearchAssistant:
    def __init__(self, cache_dir="./research_cache"):
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)
    
    async def research_topic(self, topic, questions):
        research_data = {
            "topic": topic,
            "timestamp": datetime.now().isoformat(),
            "questions": {},
            "sources": []
        }
        
        for question in questions:
            # Search for answers
            query = f"{topic} {question}"
            search_results = await search(query)
            
            # Extract URLs and fetch content
            urls = self.extract_urls(search_results)[:3]
            answers = []
            
            for url in urls:
                try:
                    content = await extract_text_from_webpage(url)
                    answers.append({
                        "url": url,
                        "content": content[:1000],  # First 1000 chars
                        "relevance": self.calculate_relevance(content, question)
                    })
                    research_data["sources"].append(url)
                except:
                    continue
            
            research_data["questions"][question] = answers
        
        # Save research
        filename = f"{topic.replace(' ', '_')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        await write_file(
            os.path.join(self.cache_dir, filename),
            json.dumps(research_data, indent=2)
        )
        
        return research_data
    
    def calculate_relevance(self, content, question):
        # Simple keyword matching (can be improved)
        keywords = question.lower().split()
        content_lower = content.lower()
        matches = sum(1 for keyword in keywords if keyword in content_lower)
        return matches / len(keywords)

# Usage
assistant = ResearchAssistant()
research = await assistant.research_topic(
    "renewable energy",
    [
        "What are the latest breakthroughs?",
        "Which countries lead in adoption?",
        "What are the main challenges?"
    ]
)

Advanced patterns

Rate Limiting

Implement rate limiting for API calls:
import asyncio
from datetime import datetime, timedelta

class RateLimiter:
    def __init__(self, calls_per_minute=60):
        self.calls_per_minute = calls_per_minute
        self.calls = []
    
    async def wait_if_needed(self):
        now = datetime.now()
        minute_ago = now - timedelta(minutes=1)
        
        # Remove old calls
        self.calls = [call for call in self.calls if call > minute_ago]
        
        if len(self.calls) >= self.calls_per_minute:
            # Wait until the oldest call is more than a minute old
            wait_time = (self.calls[0] + timedelta(minutes=1) - now).total_seconds()
            await asyncio.sleep(wait_time)
        
        self.calls.append(now)

# Use with Lye tools
rate_limiter = RateLimiter(calls_per_minute=30)

async def search_with_limit(query):
    await rate_limiter.wait_if_needed()
    return await search(query)

Error Handling

Robust error handling for tools:
from typing import Optional
import logging

logger = logging.getLogger(__name__)

async def safe_fetch(url: str, retries: int = 3) -> Optional[str]:
    """Fetch URL with retry logic"""
    for attempt in range(retries):
        try:
            content = await fetch(url)
            return content
        except Exception as e:
            logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}")
            if attempt < retries - 1:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
    
    logger.error(f"Failed to fetch {url} after {retries} attempts")
    return None

Parallel Processing

Process multiple items concurrently:
async def process_urls_parallel(urls, max_concurrent=5):
    """Process URLs with controlled concurrency"""
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def process_with_semaphore(url):
        async with semaphore:
            return await extract_text_from_webpage(url)
    
    tasks = [process_with_semaphore(url) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Handle results and errors
    successful = []
    failed = []
    
    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            failed.append((url, str(result)))
        else:
            successful.append((url, result))
    
    return successful, failed

Integration Examples

With FastAPI

from fastapi import FastAPI, UploadFile
from lye.image import analyze_image
from lye.audio import transcribe

app = FastAPI()

@app.post("/analyze-image")
async def analyze_uploaded_image(file: UploadFile):
    contents = await file.read()
    analysis = await analyze_image(contents)
    return {"filename": file.filename, "analysis": analysis}

@app.post("/transcribe-audio")
async def transcribe_uploaded_audio(file: UploadFile):
    contents = await file.read()
    transcript = await transcribe(contents)
    return {"filename": file.filename, "transcript": transcript}

With Django

# views.py
from django.http import JsonResponse
from django.views import View
from lye.web import search
import asyncio

class SearchView(View):
    def get(self, request):
        query = request.GET.get('q', '')
        if not query:
            return JsonResponse({'error': 'No query provided'}, status=400)
        
        # Run async function in sync context
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        results = loop.run_until_complete(search(query))
        
        return JsonResponse({'query': query, 'results': results})

Performance tips

  1. Use asyncio.gather() for parallel operations
    results = await asyncio.gather(
        search("topic 1"),
        search("topic 2"),
        search("topic 3")
    )
    
  2. Implement caching for expensive operations
    from functools import lru_cache
    
    @lru_cache(maxsize=100)
    async def cached_search(query):
        return await search(query)
    
  3. Stream large files
    async def process_large_file(filepath):
        # Process in chunks instead of loading entire file
        chunk_size = 1024 * 1024  # 1MB
        
        with open(filepath, 'rb') as f:
            while chunk := f.read(chunk_size):
                # Process chunk
                pass
    

Next steps