Files
crawler/newenglandoil/db_operations.py
Edwin Eames 1592e6d685 refactor: replace fuel_scraper with newenglandoil + cheapestoil scrapers
- Add newenglandoil/ package as the primary scraper (replaces fuel_scraper)
- Add cheapestoil/ package as a secondary market price scraper
- Add app.py entry point for direct execution
- Update run.py: new scrape_cheapest(), migrate command, --state filter,
  --refresh-metadata flag for overwriting existing phone/URL data
- Update models.py with latest schema fields
- Update requirements.txt dependencies
- Update Dockerfile and docker-compose.yml for new structure
- Remove deprecated fuel_scraper module, test.py, and log file

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-06 11:34:21 -05:00

132 lines
4.8 KiB
Python

"""
Database operations module for oil price CRUD operations.
"""
import logging
import sys
import os
from datetime import datetime
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sqlalchemy.orm import Session
from sqlalchemy import func
import models
def upsert_oil_price(db_session: Session, item_dict: dict, force_update_metadata: bool = False) -> bool:
    """
    Insert or update an oil price record.

    Logic:
    - Match by (name, state, county_id); the name comparison is case insensitive.
    - If county_id is None, match by (name, state, zone) instead.
    - If county_id is given but nothing matches, fall back to a row with the
      same (name, state, zone) whose county_id is still NULL, so that such a
      row gets its county_id backfilled instead of being duplicated.
    - If match found:
        - If company_id is set: SKIP (vendor managed).
        - Update name to formatted version (e.g. "Leblanc Oil" vs "LEBLANC OIL").
        - Backfill county_id when the matched row does not carry it yet.
        - Update phone/url if missing OR force_update_metadata is True.
        - Update price/date if the price changed by more than 0.001.
    - If no match: INSERT.

    The session is only mutated here (attribute changes / ``add``); this
    function does not commit.

    Args:
        db_session: SQLAlchemy session
        item_dict: Dictionary with state, zone, name, price, date, county_id;
            optional keys are phone, url and site_name (used as log prefix).
        force_update_metadata: If True, overwrite existing phone/url

    Returns:
        True if a record was added or modified, False if the record was
        skipped (non-null company_id) or nothing changed.

    Raises:
        KeyError: If a required key (name, state, price, date, or zone when
            it is needed for matching or inserting) is missing from item_dict.
    """
    county_id = item_dict.get("county_id")
    site_name = item_dict.get("site_name", "NewEnglandOil")
    name_clean = item_dict["name"].strip()
    zone = item_dict.get("zone")
    new_phone = item_dict.get("phone")
    new_url = item_dict.get("url")

    # Shared part of every lookup: case-insensitive name + exact state.
    base_query = db_session.query(models.OilPrice).filter(
        func.lower(models.OilPrice.name) == name_clean.lower(),
        models.OilPrice.state == item_dict["state"],
    )

    if county_id is not None:
        existing_record = base_query.filter(
            models.OilPrice.county_id == county_id
        ).first()
        if existing_record is None and zone is not None:
            # Fall back to a row in the same zone that has no county_id yet.
            # Without this, the county_id backfill below could never trigger
            # and a duplicate row would be inserted instead.
            existing_record = base_query.filter(
                models.OilPrice.zone == zone,
                models.OilPrice.county_id.is_(None),
            ).first()
    else:
        existing_record = base_query.filter(
            models.OilPrice.zone == item_dict["zone"]
        ).first()

    if existing_record is None:
        # No match at all: create a new record.
        oil_price_record = models.OilPrice(
            state=item_dict["state"],
            zone=item_dict["zone"],
            name=name_clean,
            price=item_dict["price"],
            date=item_dict["date"],
            county_id=county_id,
            phone=new_phone,
            url=new_url,
            scrapetimestamp=datetime.utcnow(),
        )
        db_session.add(oil_price_record)
        logging.info(
            f"[{site_name}] Added new record for {name_clean} in {item_dict['state']} zone {item_dict['zone']} "
            f"(county_id={county_id})"
        )
        return True

    # Records linked to a company are left untouched by the scraper.
    if existing_record.company_id is not None:
        logging.debug(
            f"[{site_name}] Skipping update for {name_clean} (ID={existing_record.id}) "
            "due to non-null company_id"
        )
        return False

    updated = False

    # 1. Adopt the scraped name formatting (the match itself ignored case).
    if existing_record.name != name_clean:
        existing_record.name = name_clean
        updated = True

    # 2. Backfill county_id when the record was found via the zone fallback.
    if county_id is not None and existing_record.county_id != county_id:
        existing_record.county_id = county_id
        updated = True

    # 3. Backfill phone/url, or overwrite them when forced and different.
    if new_phone and (
        not existing_record.phone
        or (force_update_metadata and existing_record.phone != new_phone)
    ):
        existing_record.phone = new_phone
        updated = True
    if new_url and (
        not existing_record.url
        or (force_update_metadata and existing_record.url != new_url)
    ):
        existing_record.url = new_url
        updated = True

    # 4. Price change. A stored NULL price always counts as changed, and both
    #    sides are coerced to float so a Decimal column value does not raise
    #    TypeError when compared with a float from the scraper.
    new_price = item_dict["price"]
    stored_price = existing_record.price
    price_changed = (
        stored_price is None
        or abs(float(stored_price) - float(new_price)) > 0.001
    )

    if price_changed:
        existing_record.price = new_price
        existing_record.date = item_dict["date"]
        existing_record.scrapetimestamp = datetime.utcnow()
        logging.info(
            f"[{site_name}] Updated price for {name_clean} (ID={existing_record.id}) "
            f"to {item_dict['price']}"
        )
        return True

    if updated:
        existing_record.scrapetimestamp = datetime.utcnow()
        logging.info(
            f"[{site_name}] Updated metadata for {name_clean} (ID={existing_record.id})"
        )
        return True

    # No meaningful change. Use the .get() value so this debug line cannot
    # raise KeyError on a path that never needed the zone.
    logging.debug(
        f"[{site_name}] Price unchanged for {name_clean} in {item_dict['state']} zone {zone}"
    )
    return False