Examples

Real-world code examples for common use cases and patterns

Background Jobs

Track cron jobs, scheduled tasks, and background workers:

import observable

@observable.track("daily_backup")
def backup_database():
    """Nightly database backup job, tracked as "daily_backup"."""
    # Placeholder: put the real backup logic here.
    return None

@observable.track("hourly_sync")
def sync_data():
    """Hourly sync of data from an external API, tracked as "hourly_sync"."""
    # Trace a couple of example values for this run.
    for key, value in (
        ("records_fetched", 1234),
        ("source", "api.example.com"),
    ):
        observable.trace(key, value)
    # Placeholder: put the real sync logic here.
    return None

Web Scraping & Data Collection

Monitor scraping jobs and data pipelines:

import observable
import requests
from bs4 import BeautifulSoup

@observable.track("scrape_prices")
def scrape_product_prices():
    """Daily price monitoring.

    Fetches every URL returned by ``get_product_urls()``, extracts a price
    from each response body with ``extract_price()``, and traces how many
    products were checked, how many prices were collected, and their average.

    Returns:
        list: The extracted prices, in the same order as the URLs.

    Raises:
        requests.exceptions.RequestException: If a request fails or exceeds
            the timeout.
    """
    urls = get_product_urls()

    observable.trace("products_to_check", len(urls))

    prices = []
    for url in urls:
        # Without a timeout, requests waits indefinitely on a stalled server.
        response = requests.get(url, timeout=10)
        prices.append(extract_price(response.text))

    observable.trace("prices_found", len(prices))
    # Skip the average when nothing was collected; dividing by len(prices)
    # would otherwise raise ZeroDivisionError.
    if prices:
        observable.trace("avg_price", sum(prices) / len(prices))

    return prices

ETL & Data Pipelines

Track data transformation and loading jobs:

import observable
import pandas as pd

@observable.track("etl_pipeline")
def run_etl():
    """Run the extract / transform / load pipeline, tracking each stage.

    The whole run is tracked as "etl_pipeline"; each stage is additionally
    wrapped in its own ``observable.track`` block and traces a row count.

    NOTE(review): ``clean_data`` and ``connection`` are not defined in this
    snippet -- presumably supplied by the surrounding application.
    """
    # Stage 1: read the source CSV.
    with observable.track("extract_data"):
        raw = pd.read_csv("s3://bucket/data.csv")
        extracted_rows = len(raw)
        observable.trace("rows_extracted", extracted_rows)

    # Stage 2: clean the extracted frame.
    with observable.track("transform_data"):
        cleaned = clean_data(raw)
        remaining_rows = len(cleaned)
        observable.trace("rows_after_cleaning", remaining_rows)

    # Stage 3: write the cleaned frame to the "analytics" table.
    with observable.track("load_data"):
        cleaned.to_sql("analytics", connection)
        loaded_rows = len(cleaned)
        observable.trace("rows_loaded", loaded_rows)

Async Operations

Track async functions and concurrent tasks:

import observable
import asyncio
import aiohttp

@observable.track("fetch_all_apis")
async def fetch_all():
    """Call several APIs concurrently and return their results.

    One ``aiohttp.ClientSession`` is shared by all requests; the number of
    endpoints called and the number of results received are both traced.
    """
    endpoints = [
        "https://api1.com",
        "https://api2.com",
        "https://api3.com",
    ]

    async with aiohttp.ClientSession() as http:
        # gather() returns the results in the same order as the endpoints.
        responses = await asyncio.gather(
            *(fetch_one(http, endpoint) for endpoint in endpoints)
        )

    observable.trace("apis_called", len(endpoints))
    observable.trace("results_received", len(responses))
    return responses

@observable.track("fetch_one_api")
async def fetch_one(session, url):
    """GET ``url`` using ``session`` and return the decoded JSON body."""
    async with session.get(url) as resp:
        body = await resp.json()
    return body

Webhook Handlers

Monitor webhook processing and external integrations:

import observable
from flask import Flask, request

# Flask application object on which the webhook route is registered.
app = Flask(__name__)

@app.route("/webhook/stripe", methods=["POST"])
def stripe_webhook():
    """Process Stripe webhook events.

    Parses the JSON request body, traces the event type and id, and
    dispatches ``payment_intent.succeeded`` and
    ``customer.subscription.created`` events to their handlers. Other event
    types are acknowledged without further processing.

    Returns:
        tuple: ``({"status": "ok"}, 200)`` on success, or
        ``({"status": "invalid payload"}, 400)`` when the JSON body is not
        an object.
    """
    payload = request.get_json()
    # A JSON body that is not an object (e.g. a list or null) has no .get().
    if not isinstance(payload, dict):
        return {"status": "invalid payload"}, 400

    event_type = payload.get("type")

    # NOTE(review): the payload is trusted as-is. In production, verify the
    # Stripe-Signature header before acting on the event.
    with observable.track("process_stripe_webhook"):
        # Trace inside the tracked block, as the other examples do.
        observable.trace("event_type", event_type)
        observable.trace("webhook_id", payload.get("id"))

        if event_type == "payment_intent.succeeded":
            process_payment(payload)
        elif event_type == "customer.subscription.created":
            process_subscription(payload)

    return {"status": "ok"}, 200

ML Model Training

Track machine learning training runs:

import observable
from sklearn.ensemble import RandomForestClassifier

@observable.track("train_model")
def train_model(X_train, y_train, X_test=None, y_test=None):
    """Train a random-forest classifier and trace basic run details.

    Args:
        X_train: Training feature matrix; must support ``len()`` and expose
            ``.shape``.
        y_train: Training labels.
        X_test: Optional held-out feature matrix used for scoring.
        y_test: Optional held-out labels used for scoring.

    Returns:
        RandomForestClassifier: The fitted model.
    """
    observable.trace("training_samples", len(X_train))
    observable.trace("features", X_train.shape[1])

    model = RandomForestClassifier(n_estimators=100)
    model.fit(X_train, y_train)

    # The test set is passed in explicitly; the original referenced
    # undefined X_test / y_test names. Accuracy is traced only when both
    # are given.
    if X_test is not None and y_test is not None:
        score = model.score(X_test, y_test)
        observable.trace("accuracy", round(score, 4))

    return model

Error Handling

Exceptions are automatically captured and reported:

import observable

@observable.track("risky_operation")
def process_data(data):
    """Transform ``data``, raising when the input or the result is unusable.

    Raises:
        ValueError: If ``data`` is empty (falsy).
        RuntimeError: If ``transform`` returns ``None``.
    """
    if data:
        outcome = transform(data)
        if outcome is not None:
            return outcome
        raise RuntimeError("Transformation failed")

    raise ValueError("Data cannot be empty")

# When an exception occurs:
# - Event is sent with status="failed"
# - error_type and error_message are captured
# - Exception is re-raised (your code behaves normally)

Manual Events

Send standalone events for important milestones:

import observable

# Application lifecycle
observable.event(
    "app_started",
    version="1.2.3",
    environment="production",
)

# Business milestone
observable.event("payment_received", amount=99.99, currency="USD", customer_id="cus_123")

# System alert, explicitly sent with status="failed"
observable.event("disk_space_low", status="failed", available_gb=5.2, threshold_gb=10)

# Deployment record
observable.event("deployment_complete", commit_sha="a1b2c3d", deployed_by="alice")

Context Manager Pattern

Track blocks of code without wrapping entire functions:

import observable

def complex_workflow():
    """Run a multi-step workflow in which only selected sections are tracked.

    NOTE(review): ``users`` is not defined in this snippet -- presumably
    supplied by the surrounding application.
    """
    # Untracked: setup.
    settings = load_config()

    # Tracked: schema migration followed by the data backfill.
    with observable.track("database_migration"):
        migrate_schema()
        backfill_data()

    # Untracked: housekeeping between the two tracked sections.
    cleanup()

    # Tracked: notify every user and trace how many there were.
    with observable.track("send_notifications"):
        observable.trace("recipients", len(users))
        for recipient in users:
            send_email(recipient)

    return "done"

More Examples?

Have a use case you'd like to see documented? These examples show the most common patterns, but Observable Code works with any Python code that runs in the background.