Real code, realistic data. Every sample below is a working example of what we deliver.
| Plan | Price | Features | vs. Market |
|---|---|---|---|
| Starter | $49/mo | 5 users, 10GB | ↓ 18% below avg |
| Professional | $149/mo | 25 users, 50GB | At market |
| Enterprise | $499/mo | Unlimited | ↑ 12% above avg |
Input: sales_data.csv (2,847 rows, Jan–Dec 2025)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
REGIONAL SALES BREAKDOWN
Region           Revenue      Share    YoY
──────────────────────────────────────────
Pacific NW       $842,300     34.2%    +12.4%
California       $624,100     25.4%    +7.1%
Mountain West    $418,500     17.0%    +18.9%
Southwest        $312,800     12.7%    -3.2%
Other            $263,400     10.7%    +5.5%
──────────────────────────────────────────
TOTAL            $2,461,100   100.0%   +8.7%

MONTHLY TREND (Q4 2025)
Oct  ████████████████████ $218,400
Nov  ██████████████████ $196,200
Dec  ████████████████████████ $261,700 ← peak

KEY STATISTICS
Total Revenue:    $2,461,100
YoY Growth:       +8.7%
Best Region:      Pacific NW ($842,300)
Best Month:       December ($261,700)
Avg Order Value:  $864.42
Transactions:     2,847

ANOMALIES DETECTED
[!] Southwest region declining -3.2% — investigate Q3 drop
[!] 14 duplicate transaction IDs found and flagged
[✓] Data quality score: 97.2% (cleaned 81 malformed rows)
# csv_monitor.py — Watch a folder for new CSV files,
# process them automatically, and send a summary email.
import os, time, csv, smtplib, logging
from pathlib import Path
from datetime import datetime
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# ── Config ────────────────────────────────────────────
# Every setting is read from the environment so the script can be deployed
# without editing the source; the values below are the fallbacks.
WATCH_DIR = Path(os.getenv("WATCH_DIR", "./incoming"))
DONE_DIR = Path(os.getenv("DONE_DIR", "./processed"))
SMTP_HOST = os.getenv("SMTP_HOST", "smtp.gmail.com")
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
# The three values below have no default: when any of them is unset,
# send_summary logs a warning and skips the notification.
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASS = os.getenv("SMTP_PASS")
NOTIFY_TO = os.getenv("NOTIFY_EMAIL")
# Seconds between directory scans; overridable like the other settings.
POLL_SEC = int(os.getenv("POLL_SEC", "30"))

logging.basicConfig(
    filename="csv_monitor.log",
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
def process_csv(path: Path) -> dict:
    """Read a CSV file and return basic statistics about it.

    A data row is counted as an error when it does not match the header:
    csv.DictReader stores surplus fields under the key ``None`` and fills
    missing fields with the value ``None``, so either marker means the row
    has the wrong number of columns. Records the csv module cannot parse at
    all (csv.Error) are counted as errors too.

    Args:
        path: Location of the CSV file. It is decoded as UTF-8 and a leading
            byte-order mark is ignored.

    Returns:
        A dict with the keys ``file`` (file name), ``rows`` (well-formed data
        rows), ``cols`` (number of header columns, 0 for an empty file) and
        ``errors`` (malformed data rows).

    Raises:
        OSError: The file cannot be opened or read.
        UnicodeDecodeError: The file is not valid UTF-8.
        csv.Error: The header line itself cannot be parsed.
    """
    good_rows, errors = 0, 0
    with open(path, newline="", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)
        # Take the column count from the header so that a header-only file
        # still reports its real width.
        header = reader.fieldnames or []
        while True:
            try:
                row = next(reader)
            except StopIteration:
                break
            except csv.Error:
                # One unparseable record should not discard the whole file.
                errors += 1
                continue
            if None in row or None in row.values():
                errors += 1
            else:
                good_rows += 1
    return {
        "file": path.name,
        "rows": good_rows,
        "cols": len(header),
        "errors": errors,
    }
def send_summary(stats: list[dict], *, timeout: float = 30.0) -> None:
    """Email a plain-text summary of processed files to NOTIFY_TO.

    When SMTP_USER, SMTP_PASS or NOTIFY_TO is unset, a warning is logged and
    nothing is sent.

    Args:
        stats: One dict per processed file with the keys ``file``, ``rows``,
            ``cols`` and ``errors``.
        timeout: Seconds to wait on the SMTP connection before giving up, so
            an unresponsive mail server cannot block the caller indefinitely.

    Raises:
        smtplib.SMTPException: The server rejected the TLS upgrade, the login
            or the message.
        OSError: The connection failed or timed out (includes ssl.SSLError
            when the server certificate cannot be verified).
    """
    if not all([SMTP_USER, SMTP_PASS, NOTIFY_TO]):
        logging.warning("Email not configured — skipping notification")
        return

    # Imported here so this function does not depend on the file's import
    # block being changed.
    import ssl

    lines = ["CSV Processing Summary", "=" * 40]
    for entry in stats:
        line = f" {entry['file']}: {entry['rows']} rows, {entry['cols']} columns"
        if entry["errors"]:
            line += f" ({entry['errors']} errors)"
        lines.append(line)
    body = "\n".join(lines) + "\n"

    msg = MIMEMultipart()
    msg["From"] = SMTP_USER
    msg["To"] = NOTIFY_TO
    msg["Subject"] = f"[CSV Monitor] {len(stats)} file(s) processed"
    msg.attach(MIMEText(body, "plain"))

    with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=timeout) as server:
        # A default context verifies the certificate and host name before
        # the credentials are sent; starttls() without one skips that check.
        server.starttls(context=ssl.create_default_context())
        server.login(SMTP_USER, SMTP_PASS)
        server.send_message(msg)
    logging.info(f"Summary email sent for {len(stats)} file(s)")
def _unique_destination(directory: Path, name: str) -> Path:
    """Return a path in *directory* for *name* that does not exist yet.

    A numeric suffix (``report_1.csv``, ``report_2.csv``, ...) is appended
    when the plain name is taken, so an earlier processed file is never
    overwritten.
    """
    candidate = directory / name
    stem, suffix = Path(name).stem, Path(name).suffix
    counter = 1
    while candidate.exists():
        candidate = directory / f"{stem}_{counter}{suffix}"
        counter += 1
    return candidate


def watch() -> None:
    """Poll WATCH_DIR forever, processing each CSV file that appears.

    Every POLL_SEC seconds the ``*.csv`` files in WATCH_DIR are processed in
    name order, moved to DONE_DIR, and summarised in one email per scan.
    Processed files leave WATCH_DIR, so a later file with the same name is
    treated as new work. A file that fails is logged once and left in place;
    it is retried only if its modification time changes.
    """
    WATCH_DIR.mkdir(parents=True, exist_ok=True)
    DONE_DIR.mkdir(parents=True, exist_ok=True)
    logging.info(f"Watching {WATCH_DIR} every {POLL_SEC}s")
    # (name, mtime) pairs of files that already failed, so a broken file is
    # not re-processed and re-logged on every scan.
    failed = set()
    while True:
        batch = []
        for f in sorted(WATCH_DIR.glob("*.csv")):
            try:
                key = (f.name, f.stat().st_mtime_ns)
            except OSError:
                # The file vanished between glob() and stat().
                continue
            if key in failed:
                continue
            try:
                stats = process_csv(f)
                f.rename(_unique_destination(DONE_DIR, f.name))
            except Exception:
                # Daemon boundary: one bad file must not stop the monitor.
                failed.add(key)
                logging.exception(f"Failed to process {f.name}; left in {WATCH_DIR}")
                continue
            batch.append(stats)
            logging.info(f"Processed: {stats}")
        if batch:
            try:
                send_summary(batch)
            except Exception:
                # The files are already moved; a mail failure is only logged.
                logging.exception("Could not send summary email")
        time.sleep(POLL_SEC)


if __name__ == "__main__":
    try:
        watch()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the monitor; exit without a traceback.
        logging.info("Stopped by user")
[
{
"id": "prod_001",
"name": "TrailMaster Pro Hiking Boot",
"price": "$189.99",
"original_price": "$229.99",
"discount": "17%",
"rating": 4.7,
"review_count": 1284,
"availability": "in_stock",
"sku": "TM-PRO-M10",
"scraped_at": "2026-05-30T07:14:22Z"
},
{
"id": "prod_002",
"name": "Alpine Summit 3-Season Tent",
"price": "$349.00",
"original_price": "$349.00",
"discount": null,
"rating": 4.9,
"review_count": 437,
"availability": "in_stock",
"sku": "AS-TENT-2P",
"scraped_at": "2026-05-30T07:14:23Z"
},
{
"id": "prod_003",
"name": "Merino Wool Base Layer Set",
"price": "$94.95",
"original_price": "$119.00",
"discount": "20%",
"rating": 4.5,
"review_count": 2891,
"availability": "low_stock",
"sku": "MW-BASE-M-L",
"scraped_at": "2026-05-30T07:14:24Z"
},
{
"id": "prod_004",
"name": "Carbon Fiber Trekking Poles (pair)",
"price": "$79.99",
"original_price": "$79.99",
"discount": null,
"rating": 4.3,
"review_count": 618,
"availability": "in_stock",
"sku": "CF-POLES-ADJ",
"scraped_at": "2026-05-30T07:14:24Z"
},
{
"id": "prod_005",
"name": "Hydration Pack 2.5L",
"price": "$67.50",
"original_price": "$89.99",
"discount": "25%",
"rating": 4.6,
"review_count": 943,
"availability": "out_of_stock",
"sku": "HP-25L-BLK",
"scraped_at": "2026-05-30T07:14:25Z"
}
]
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Execution stats:
Pages scraped: 847
Products found: 4,231
Runtime: 12.3s
Errors: 3 (rate-limited, retried)
Output: products_2026-05-30.json
# weather_collector.py — Fetch 7-day forecast from Open-Meteo API,
# transform to business-friendly format, write to CSV.
# No API key required. The only third-party dependency is `requests`.
import csv, requests
from datetime import datetime, timedelta
# Open-Meteo forecast endpoint; fetch_forecast sends its GET request here.
BASE_URL = "https://api.open-meteo.com/v1/forecast"
# WMO weather interpretation codes as listed in the Open-Meteo documentation,
# reduced to short labels. Intensity variants of one phenomenon share a label.
_WMO_CONDITIONS = {
    0: "Clear",
    1: "Mostly Clear",
    2: "Partly Cloudy",
    3: "Overcast",
    45: "Fog", 48: "Fog",
    51: "Drizzle", 53: "Drizzle", 55: "Drizzle",
    56: "Freezing Drizzle", 57: "Freezing Drizzle",
    61: "Rain", 63: "Rain", 65: "Rain",
    66: "Freezing Rain", 67: "Freezing Rain",
    71: "Snow", 73: "Snow", 75: "Snow", 77: "Snow",
    80: "Rain Showers", 81: "Rain Showers", 82: "Rain Showers",
    85: "Snow Showers", 86: "Snow Showers",
    95: "Thunderstorm", 96: "Thunderstorm", 99: "Thunderstorm",
}


def fetch_forecast(
    lat: float,
    lon: float,
    city: str,
    timezone: str = "America/Los_Angeles",
) -> list[dict]:
    """Fetch the 7-day daily forecast for one location as flat row dicts.

    Args:
        lat: Latitude sent to the API.
        lon: Longitude sent to the API.
        city: Label copied into every returned row.
        timezone: Time zone name sent to the API as its ``timezone`` parameter.

    Returns:
        One dict per forecast day with the keys ``city``, ``date``,
        ``high_f``, ``low_f``, ``precip_in``, ``wind_mph``, ``condition`` and
        ``fetched_at``. Weather codes missing from the lookup table are
        reported as ``"Unknown"``.

    Raises:
        requests.RequestException: The request failed, timed out, or returned
            an HTTP error status.
        KeyError: The response lacks the ``daily`` section or one of the
            requested series.
    """
    params = {
        "latitude": lat,
        "longitude": lon,
        "daily": [
            "temperature_2m_max",
            "temperature_2m_min",
            "precipitation_sum",
            "windspeed_10m_max",
            "weathercode",
        ],
        "temperature_unit": "fahrenheit",
        "windspeed_unit": "mph",
        "precipitation_unit": "inch",
        "timezone": timezone,
        "forecast_days": 7,
    }
    resp = requests.get(BASE_URL, params=params, timeout=10)
    resp.raise_for_status()
    daily = resp.json()["daily"]

    # Stamp once so every row from this call carries the same fetch time.
    fetched_at = datetime.now().isoformat(timespec="seconds")
    series = zip(
        daily["time"],
        daily["temperature_2m_max"],
        daily["temperature_2m_min"],
        daily["precipitation_sum"],
        daily["windspeed_10m_max"],
        daily["weathercode"],
    )
    return [
        {
            "city": city,
            "date": date,
            "high_f": high,
            "low_f": low,
            "precip_in": precip,
            "wind_mph": wind,
            "condition": _WMO_CONDITIONS.get(code, "Unknown"),
            "fetched_at": fetched_at,
        }
        for date, high, low, precip, wind, code in series
    ]
def main():
    """Fetch forecasts for the configured cities and write them to one CSV.

    The output file is named ``weather_YYYYMMDD.csv`` after the current local
    date and is written to the working directory. Errors raised by
    fetch_forecast propagate unchanged.

    Raises:
        SystemExit: No forecast rows were returned, so there is nothing to
            write and no header can be derived.
    """
    locations = [
        (47.6062, -122.3321, "Seattle"),
        (45.5231, -122.6765, "Portland"),
        (47.2529, -122.4443, "Tacoma"),
    ]
    all_rows = []
    for lat, lon, city in locations:
        print(f"Fetching forecast for {city}...")
        all_rows.extend(fetch_forecast(lat, lon, city))

    if not all_rows:
        # Without this guard the header lookup below fails with a bare
        # IndexError that says nothing about the cause.
        raise SystemExit("No forecast rows returned; nothing written.")

    outfile = f"weather_{datetime.now().strftime('%Y%m%d')}.csv"
    # Explicit encoding keeps the file identical across platforms.
    with open(outfile, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=list(all_rows[0]))
        writer.writeheader()
        writer.writerows(all_rows)
    print(f"Done. {len(all_rows)} rows written to {outfile}")


if __name__ == "__main__":
    main()