Examples
Real-world code examples for common use cases and patterns
Background Jobs
Track cron jobs, scheduled tasks, and background workers:
import observable
@observable.track("daily_backup")
def backup_database():
    """Nightly database backup, tracked under the name "daily_backup"."""
    # Placeholder: put the actual backup steps here.
    ...
@observable.track("hourly_sync")
def sync_data():
    """Sync data from the external API; intended to run hourly."""
    # Example values attached to this run; a real job would compute them.
    fetched_count = 1234
    api_host = "api.example.com"
    observable.trace("records_fetched", fetched_count)
    observable.trace("source", api_host)
    # Placeholder: put the actual sync steps here.
    pass
Web Scraping & Data Collection
Monitor scraping jobs and data pipelines:
import observable
import requests
from bs4 import BeautifulSoup
@observable.track("scrape_prices")
def scrape_product_prices():
    """Daily price monitoring.

    Fetches every product page, extracts its price, and records how many
    products were checked, how many prices were found, and the average
    price. The average is recorded only when at least one price was found.
    """
    urls = get_product_urls()
    observable.trace("products_to_check", len(urls))
    prices = []
    for url in urls:
        # Without a timeout, requests waits indefinitely on a stalled server.
        response = requests.get(url, timeout=30)
        price = extract_price(response.text)
        prices.append(price)
    observable.trace("prices_found", len(prices))
    # An empty URL list would otherwise divide by zero here.
    if prices:
        observable.trace("avg_price", sum(prices) / len(prices))
    return prices
ETL & Data Pipelines
Track data transformation and loading jobs:
import observable
import pandas as pd
@observable.track("etl_pipeline")
def run_etl():
    """Run the extract, transform, and load stages, tracking each one."""
    # Stage 1 - extract: read the source CSV and record its size.
    with observable.track("extract_data"):
        raw_df = pd.read_csv("s3://bucket/data.csv")
        extracted_rows = len(raw_df)
        observable.trace("rows_extracted", extracted_rows)
    # Stage 2 - transform: clean the frame and record how many rows remain.
    with observable.track("transform_data"):
        df_clean = clean_data(raw_df)
        cleaned_rows = len(df_clean)
        observable.trace("rows_after_cleaning", cleaned_rows)
    # Stage 3 - load: write the cleaned rows to the "analytics" table.
    # NOTE(review): `connection` is not defined in this example; presumably
    # it is created elsewhere - confirm.
    with observable.track("load_data"):
        df_clean.to_sql("analytics", connection)
        observable.trace("rows_loaded", len(df_clean))
Async Operations
Track async functions and concurrent tasks:
import observable
import asyncio
import aiohttp
@observable.track("fetch_all_apis")
async def fetch_all():
    """Call several APIs concurrently and return the gathered results."""
    endpoints = ["https://api1.com", "https://api2.com", "https://api3.com"]
    async with aiohttp.ClientSession() as session:
        # One coroutine per endpoint, all sharing the same HTTP session.
        pending = [fetch_one(session, endpoint) for endpoint in endpoints]
        results = await asyncio.gather(*pending)
    call_count = len(endpoints)
    result_count = len(results)
    observable.trace("apis_called", call_count)
    observable.trace("results_received", result_count)
    return results
@observable.track("fetch_one_api")
async def fetch_one(session, url):
"""Individual API call"""
async with session.get(url) as response:
        return await response.json()
Webhook Handlers
Monitor webhook processing and external integrations:
import observable
from flask import Flask, request
app = Flask(__name__)
@app.route("/webhook/stripe", methods=["POST"])
def stripe_webhook():
    """Process Stripe webhook events.

    Records the event type and id, then dispatches the payload to the
    matching handler inside a tracked "process_stripe_webhook" block.
    Event types without a handler are ignored. A body that is valid JSON
    but not an object is rejected with a 400 response.
    """
    payload = request.get_json()
    # Valid JSON is not necessarily an object (e.g. `null` or a list);
    # reject those instead of failing on `.get` below.
    if not isinstance(payload, dict):
        return {"status": "invalid payload"}, 400
    event_type = payload.get("type")
    observable.trace("event_type", event_type)
    observable.trace("webhook_id", payload.get("id"))
    with observable.track("process_stripe_webhook"):
        if event_type == "payment_intent.succeeded":
            process_payment(payload)
        elif event_type == "customer.subscription.created":
            process_subscription(payload)
    return {"status": "ok"}, 200
ML Model Training
Track machine learning training runs:
import observable
from sklearn.ensemble import RandomForestClassifier
@observable.track("train_model")
def train_model(X_train, y_train, X_test=None, y_test=None):
    """Train ML model.

    Args:
        X_train: Training feature matrix; must support ``len()`` and
            expose ``shape[1]``.
        y_train: Training labels.
        X_test: Optional held-out features used to score the model.
        y_test: Optional held-out labels used to score the model.

    The "accuracy" trace is recorded only when both ``X_test`` and
    ``y_test`` are supplied.
    """
    observable.trace("training_samples", len(X_train))
    observable.trace("features", X_train.shape[1])
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X_train, y_train)
    # The evaluation set was previously read from names that were never
    # defined; it is now an explicit, optional argument.
    if X_test is not None and y_test is not None:
        score = model.score(X_test, y_test)
        observable.trace("accuracy", round(score, 4))
    return model
Error Handling
Exceptions are automatically captured and reported:
import observable
@observable.track("risky_operation")
def process_data(data):
    """Transform ``data``, raising on empty input or a failed transform.

    Raises:
        ValueError: If ``data`` is falsy (for example, empty).
        RuntimeError: If ``transform`` returns ``None``.
    """
    if not data:
        raise ValueError("Data cannot be empty")
    transformed = transform(data)
    if transformed is not None:
        return transformed
    raise RuntimeError("Transformation failed")
# When an exception occurs:
# - Event is sent with status="failed"
# - error_type and error_message are captured
# - Exception is re-raised (your code behaves normally)
Manual Events
Send standalone events for important milestones:
import observable
# Lifecycle: announce that the application has started
observable.event(
    "app_started",
    version="1.2.3",
    environment="production",
)
# Business: record an incoming payment
observable.event("payment_received", amount=99.99, currency="USD", customer_id="cus_123")
# System: report low disk space with a "failed" status
observable.event("disk_space_low", status="failed", available_gb=5.2, threshold_gb=10)
# Deployment events
observable.event(
"deployment_complete",
commit_sha="a1b2c3d",
deployed_by="alice"
)
Context Manager Pattern
Track blocks of code without wrapping entire functions:
import observable
def complex_workflow():
    """Run a multi-step workflow, tracking only selected sections."""
    # Untracked: setup
    config = load_config()
    # Tracked: schema migration followed by the data backfill
    with observable.track("database_migration"):
        migrate_schema()
        backfill_data()
    # Untracked: cleanup between the two tracked sections
    cleanup()
    # Tracked: notify every user
    # NOTE(review): `users` is not defined in this example; presumably it is
    # supplied by the surrounding module - confirm.
    with observable.track("send_notifications"):
        recipient_count = len(users)
        observable.trace("recipients", recipient_count)
        for recipient in users:
            send_email(recipient)
    return "done"
More Examples?
Have a use case you'd like to see documented? These examples show the most common patterns, but Observable Code works with any Python code that runs in the background.