- Add newenglandoil/ package as the primary scraper (replaces fuel_scraper) - Add cheapestoil/ package as a secondary market price scraper - Add app.py entry point for direct execution - Update run.py: new scrape_cheapest(), migrate command, --state filter, --refresh-metadata flag for overwriting existing phone/URL data - Update models.py with latest schema fields - Update requirements.txt dependencies - Update Dockerfile and docker-compose.yml for new structure - Remove deprecated fuel_scraper module, test.py, and log file Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
218 lines
8.2 KiB
Python
"""
|
|
Main orchestrator for the CheapestOil scraper.
|
|
"""
|
|
import logging
|
|
import time
|
|
from datetime import datetime
|
|
|
|
import sys
|
|
import os
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from sqlalchemy.orm import Session
|
|
import models
|
|
|
|
from .config import STATE_COUNTIES, STATE_API_NAMES, SCRAPE_DELAY
|
|
from .api_client import fetch_company_details, fetch_county_prices
|
|
from .parsers import parse_company_record
|
|
from .company_matcher import find_existing_record
|
|
from .town_lookup import resolve_county_from_service_area
|
|
|
|
|
|
def _resolve_county_id(
|
|
county_name: str | None,
|
|
service_area: str,
|
|
state_abbr: str,
|
|
county_lookup: dict,
|
|
) -> int | None:
|
|
"""
|
|
Resolve a county_id from either a direct county name or service area text.
|
|
|
|
For MA/CT/ME: county_name comes directly from the API query parameter.
|
|
For NH/RI/VT: parse service_area text to find a town -> county mapping.
|
|
"""
|
|
# Direct county match (MA/CT/ME)
|
|
if county_name:
|
|
county_id = county_lookup.get((state_abbr, county_name))
|
|
if county_id is None:
|
|
logging.warning(f"County not in DB: ({state_abbr}, {county_name})")
|
|
return county_id
|
|
|
|
# Service area parsing (NH/RI/VT)
|
|
if service_area:
|
|
resolved = resolve_county_from_service_area(service_area, state_abbr)
|
|
if resolved:
|
|
county_id = county_lookup.get((state_abbr, resolved))
|
|
if county_id is not None:
|
|
return county_id
|
|
logging.warning(f"Resolved county '{resolved}' not in DB for {state_abbr}")
|
|
|
|
return None
|
|
|
|
|
|
def scrape_state(state_abbr: str, db_session: Session, county_lookup: dict, refresh_metadata: bool = False) -> dict:
    """
    Scrape all CheapestOil data for a single state.

    Args:
        state_abbr: Two-letter state code (MA, CT, ME, NH, RI, VT)
        db_session: SQLAlchemy session
        county_lookup: Dict of (state_abbr, county_name) -> county_id
        refresh_metadata: If True, force re-fetch details (phone/url) and overwrite DB.

    Returns:
        Summary dict with {state, counties_scraped, records_added, records_updated, records_skipped}

    Raises:
        ValueError: If state_abbr is not one of the configured states.
    """
    state_abbr = state_abbr.upper()
    if state_abbr not in STATE_API_NAMES:
        raise ValueError(f"Unknown state: {state_abbr}. Must be one of {list(STATE_API_NAMES.keys())}")

    api_name = STATE_API_NAMES[state_abbr]
    counties = STATE_COUNTIES[state_abbr]

    summary = {
        "state": state_abbr,
        "counties_scraped": 0,
        "records_added": 0,
        "records_updated": 0,
        "records_skipped": 0,
    }

    details_cache = {}  # cache for detail pages: slug -> {url, phone}

    for i, county_name in enumerate(counties):
        # Rate-limit between county requests; no delay before the first one.
        if i > 0:
            time.sleep(SCRAPE_DELAY)

        label = county_name or "(state-level)"
        logging.info(f"[CheapestOil] Fetching: {state_abbr} / {label}")

        rows = fetch_county_prices(api_name, county_name)
        if not rows:
            logging.info(f"No results for {state_abbr} / {label}")
            continue

        logging.info(f"[CheapestOil] Processing {len(rows)} records from {state_abbr} / {label}")

        summary["counties_scraped"] += 1

        for row in rows:
            record = parse_company_record(row, county_name)
            if not record or record["price"] is None:
                summary["records_skipped"] += 1
                continue

            # Resolve county_id (direct name for MA/CT/ME, service-area text
            # for NH/RI/VT — see _resolve_county_id).
            county_id = _resolve_county_id(
                record["county_name"],
                record["service_area"],
                state_abbr,
                county_lookup,
            )

            # Check for existing record (cross-source dedup)
            existing = find_existing_record(
                db_session, record["name"], state_abbr, county_id
            )

            slug = record.get("slug")
            real_url = record.get("url")
            phone = None

            # Fetch the detail page (url/phone) only when we have a slug and:
            #   - refresh_metadata forces a re-fetch, or
            #   - an existing row is missing url/phone (backfill), or
            #   - the record is brand new.
            should_fetch_details = False
            if slug:
                if refresh_metadata:
                    should_fetch_details = True
                elif existing:
                    if not existing.url or not existing.phone:
                        should_fetch_details = True
                else:
                    # New record, always fetch
                    should_fetch_details = True

            if should_fetch_details:
                if slug in details_cache:
                    cached = details_cache[slug]
                    real_url = cached["url"]
                    phone = cached["phone"]
                else:
                    details = fetch_company_details(slug)
                    details_cache[slug] = details
                    real_url = details["url"]
                    phone = details["phone"]
                    time.sleep(1.0)  # Polite delay between detail pages

            if existing:
                # Skip vendor-managed records
                if existing.company_id is not None:
                    logging.debug(f"Skipping vendor-managed: {record['name']}")
                    summary["records_skipped"] += 1
                    continue

                updated = False

                # Backfill or Force Update url
                if real_url:
                    if not existing.url or (refresh_metadata and existing.url != real_url):
                        existing.url = real_url
                        updated = True
                        logging.info(f"Updated/Backfilled URL for {record['name']}")

                # Backfill or Force Update phone
                if phone:
                    if not existing.phone or (refresh_metadata and existing.phone != phone):
                        existing.phone = phone
                        updated = True
                        logging.info(f"Updated/Backfilled Phone for {record['name']}")

                # Backfill county_id if we have it now
                if county_id is not None and existing.county_id != county_id:
                    existing.county_id = county_id
                    updated = True
                    logging.info(f"Updated county_id for {record['name']}")

                # Update if price changed, otherwise just touch timestamp
                if existing.price != record["price"]:
                    # BUGFIX: capture the old price BEFORE overwriting it, so
                    # the "old -> new" log actually shows the old value.
                    # Guard against a NULL stored price (":.2f" would raise).
                    old_price_txt = (
                        f"${existing.price:.2f}" if existing.price is not None else "n/a"
                    )
                    existing.price = record["price"]
                    existing.date = record["date"]
                    # NOTE(review): datetime.utcnow() is deprecated (3.12+);
                    # kept for schema compatibility — confirm the DB column
                    # expects naive UTC before switching to datetime.now(UTC).
                    existing.scrapetimestamp = datetime.utcnow()
                    summary["records_updated"] += 1
                    logging.info(
                        f"Updated price: {record['name']} {old_price_txt} → ${record['price']:.2f}"
                    )
                elif updated:
                    existing.scrapetimestamp = datetime.utcnow()
                    summary["records_updated"] += 1
                else:
                    existing.scrapetimestamp = datetime.utcnow()
                    summary["records_skipped"] += 1
                    logging.debug(f"No changes for {record['name']} (${record['price']:.2f})")
            else:
                # Insert new record (zone=0 for cheapestoil)
                oil_price = models.OilPrice(
                    state=state_abbr,
                    zone=0,
                    name=record["name"],
                    price=record["price"],
                    date=record["date"],
                    county_id=county_id,
                    url=real_url,
                    phone=phone,
                    scrapetimestamp=datetime.utcnow(),
                )
                db_session.add(oil_price)
                summary["records_added"] += 1
                logging.info(f"Added: {record['name']} in {state_abbr} (county_id={county_id}, phone={phone})")

    db_session.commit()
    logging.info(
        f"[CheapestOil] State {state_abbr} complete: "
        f"{summary['records_added']} added, {summary['records_updated']} updated, "
        f"{summary['records_skipped']} skipped (no changes)"
    )
    return summary
|