Files
crawler/cheapestoil/scraper.py
Edwin Eames 1592e6d685 refactor: replace fuel_scraper with newenglandoil + cheapestoil scrapers
- Add newenglandoil/ package as the primary scraper (replaces fuel_scraper)
- Add cheapestoil/ package as a secondary market price scraper
- Add app.py entry point for direct execution
- Update run.py: new scrape_cheapest(), migrate command, --state filter,
  --refresh-metadata flag for overwriting existing phone/URL data
- Update models.py with latest schema fields
- Update requirements.txt dependencies
- Update Dockerfile and docker-compose.yml for new structure
- Remove deprecated fuel_scraper module, test.py, and log file

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-06 11:34:21 -05:00

218 lines
8.2 KiB
Python

"""
Main orchestrator for the CheapestOil scraper.
"""
import logging
import time
from datetime import datetime
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sqlalchemy.orm import Session
import models
from .config import STATE_COUNTIES, STATE_API_NAMES, SCRAPE_DELAY
from .api_client import fetch_company_details, fetch_county_prices
from .parsers import parse_company_record
from .company_matcher import find_existing_record
from .town_lookup import resolve_county_from_service_area
def _resolve_county_id(
county_name: str | None,
service_area: str,
state_abbr: str,
county_lookup: dict,
) -> int | None:
"""
Resolve a county_id from either a direct county name or service area text.
For MA/CT/ME: county_name comes directly from the API query parameter.
For NH/RI/VT: parse service_area text to find a town -> county mapping.
"""
# Direct county match (MA/CT/ME)
if county_name:
county_id = county_lookup.get((state_abbr, county_name))
if county_id is None:
logging.warning(f"County not in DB: ({state_abbr}, {county_name})")
return county_id
# Service area parsing (NH/RI/VT)
if service_area:
resolved = resolve_county_from_service_area(service_area, state_abbr)
if resolved:
county_id = county_lookup.get((state_abbr, resolved))
if county_id is not None:
return county_id
logging.warning(f"Resolved county '{resolved}' not in DB for {state_abbr}")
return None
def scrape_state(state_abbr: str, db_session: Session, county_lookup: dict, refresh_metadata: bool = False) -> dict:
"""
Scrape all CheapestOil data for a single state.
Args:
state_abbr: Two-letter state code (MA, CT, ME, NH, RI, VT)
db_session: SQLAlchemy session
county_lookup: Dict of (state_abbr, county_name) -> county_id
refresh_metadata: If True, force re-fetch details (phone/url) and overwrite DB.
Returns:
Summary dict with {state, counties_scraped, records_added, records_updated, records_skipped}
"""
state_abbr = state_abbr.upper()
if state_abbr not in STATE_API_NAMES:
raise ValueError(f"Unknown state: {state_abbr}. Must be one of {list(STATE_API_NAMES.keys())}")
api_name = STATE_API_NAMES[state_abbr]
counties = STATE_COUNTIES[state_abbr]
summary = {
"state": state_abbr,
"counties_scraped": 0,
"records_added": 0,
"records_updated": 0,
"records_skipped": 0,
}
details_cache = {} # cache for detail pages: slug -> {url, phone}
for i, county_name in enumerate(counties):
if i > 0:
time.sleep(SCRAPE_DELAY)
label = county_name or "(state-level)"
logging.info(f"[CheapestOil] Fetching: {state_abbr} / {label}")
rows = fetch_county_prices(api_name, county_name)
if not rows:
logging.info(f"No results for {state_abbr} / {label}")
continue
logging.info(f"[CheapestOil] Processing {len(rows)} records from {state_abbr} / {label} (Size: {len(rows)})")
summary["counties_scraped"] += 1
for row in rows:
record = parse_company_record(row, county_name)
if not record or record["price"] is None:
summary["records_skipped"] += 1
continue
# Resolve county_id
county_id = _resolve_county_id(
record["county_name"],
record["service_area"],
state_abbr,
county_lookup,
)
# Check for existing record (cross-source dedup)
existing = find_existing_record(
db_session, record["name"], state_abbr, county_id
)
# Fetch details logic:
slug = record.get("slug")
real_url = record.get("url")
phone = None
# Determine if we need to fetch details
# If refresh_metadata is True, we want to fetch to ensure fresh data.
# If not, we fetch if we are missing info (which is handled if we don't have existing record or existing record missing info)
# Simplest approach: fetch if we have slug and (refresh_metadata OR missing basic info)
should_fetch_details = False
if slug:
if refresh_metadata:
should_fetch_details = True
elif existing:
if not existing.url or not existing.phone:
should_fetch_details = True
else:
# New record, always fetch
should_fetch_details = True
if should_fetch_details:
if slug in details_cache:
cached = details_cache[slug]
real_url = cached["url"]
phone = cached["phone"]
else:
details = fetch_company_details(slug)
details_cache[slug] = details
real_url = details["url"]
phone = details["phone"]
time.sleep(1.0) # Polite delay between detail pages
if existing:
# Skip vendor-managed records
if existing.company_id is not None:
logging.debug(f"Skipping vendor-managed: {record['name']}")
summary["records_skipped"] += 1
continue
updated = False
# Backfill or Force Update url
if real_url:
if not existing.url or (refresh_metadata and existing.url != real_url):
existing.url = real_url
updated = True
logging.info(f"Updated/Backfilled URL for {record['name']}")
# Backfill or Force Update phone
if phone:
if not existing.phone or (refresh_metadata and existing.phone != phone):
existing.phone = phone
updated = True
logging.info(f"Updated/Backfilled Phone for {record['name']}")
# Backfill county_id if we have it now
if county_id is not None and existing.county_id != county_id:
existing.county_id = county_id
updated = True
logging.info(f"Updated county_id for {record['name']}")
# Update if price changed, otherwise just touch timestamp
if existing.price != record["price"]:
existing.price = record["price"]
existing.date = record["date"]
existing.scrapetimestamp = datetime.utcnow()
summary["records_updated"] += 1
logging.info(f"Updated price: {record['name']} ${existing.price:.2f} → ${record['price']:.2f}")
elif updated:
existing.scrapetimestamp = datetime.utcnow()
summary["records_updated"] += 1
else:
existing.scrapetimestamp = datetime.utcnow()
summary["records_skipped"] += 1
logging.debug(f"No changes for {record['name']} (${record['price']:.2f})")
else:
# Insert new record (zone=0 for cheapestoil)
oil_price = models.OilPrice(
state=state_abbr,
zone=0,
name=record["name"],
price=record["price"],
date=record["date"],
county_id=county_id,
url=real_url,
phone=phone,
scrapetimestamp=datetime.utcnow(),
)
db_session.add(oil_price)
summary["records_added"] += 1
logging.info(f"Added: {record['name']} in {state_abbr} (county_id={county_id}, phone={phone})")
db_session.commit()
logging.info(
f"[CheapestOil] State {state_abbr} complete: "
f"{summary['records_added']} added, {summary['records_updated']} updated, "
f"{summary['records_skipped']} skipped (no changes)"
)
return summary