Files
crawler/fuel_scraper.py
2025-06-08 13:17:33 -04:00

281 lines
13 KiB
Python

#!/usr/bin/env python3
import requests
from bs4 import BeautifulSoup
from datetime import datetime
import logging
import os
import re # For parsing zone number from slug
from sqlalchemy.orm import Session
from database import SessionLocal, init_db
import models
# --- SITES CONFIGURATION ---
SITES_CONFIG = [
{
"site_name": "NewEnglandOil",
"base_url": "https://www.newenglandoil.com",
"url_template": "{base_url}/{state_slug}/{zone_slug}.asp?type={oil_type}",
"oil_type": 0,
"locations": {
"connecticut": [
"zone1", "zone2", "zone3", "zone4", "zone5", "zone6", "zone7",
"zone8", "zone9", "zone10"
],
"massachusetts": [
"zone1", "zone2", "zone3", "zone4", "zone5", "zone6",
"zone7", "zone8", "zone9", "zone10", "zone11", "zone12",
"zone13","zone14","zone15"
],
"newhampshire": [
"zone1", "zone2", "zone3", "zone4", "zone5", "zone6"
],
"rhodeisland": [
"zone1", "zone2", "zone3", "zone4", "zone5"
],
}
},
{
"site_name": "MaineOil",
"base_url": "https://www.maineoil.com",
# URL template for MaineOil using numeric zones like zone1.asp, zone2.asp
# {zone_slug} will be "zone1", "zone2", etc.
# No {state_slug} is needed in this part of the path for maineoil.com
"url_template": "{base_url}/{zone_slug}.asp?type={oil_type}",
"oil_type": 0,
"locations": {
# "maine" is our internal key for the state.
# The zone_slugs are "zone1", "zone2", etc.
# YOU NEED TO VERIFY THE ACTUAL ZONE SLUGS AND COUNT FOR MAINEOIL.COM
"maine": [
"zone1", "zone2", "zone3", "zone4", "zone5",
"zone6", "zone7" # Example: Add/remove based on actual zones on maineoil.com
]
}
}
]
LOG_FILE = "oil_scraper.log"
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
)
# --- Helper Functions ---
def make_request(url):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
try:
response = requests.get(url, headers=headers, timeout=20)
response.raise_for_status()
return BeautifulSoup(response.content, 'html.parser')
except requests.exceptions.RequestException as e:
logging.error(f"Error fetching {url}: {e}")
return None
def parse_zone_slug_to_int(zone_slug_str):
"""Extracts the numeric part of a zone slug (e.g., "zone1" -> 1, "zonema5" -> 5)."""
if not zone_slug_str: return None
match = re.search(r'\d+$', zone_slug_str)
if match:
return int(match.group(0))
logging.warning(f"Could not parse numeric zone from slug: '{zone_slug_str}'")
return None
def parse_price_table(soup, state_name_key, zone_slug_str):
"""Parses price tables. state_name_key is "connecticut", "maine", etc. zone_slug_str is "zone1", "zonema5", etc."""
data_dicts = []
all_tables_on_page = soup.find_all('table')
logging.info(f"Found {len(all_tables_on_page)} table(s) on page for {state_name_key} - {zone_slug_str}.")
if not all_tables_on_page:
logging.warning(f"No HTML tables found at all for {state_name_key} - {zone_slug_str}.")
return data_dicts
# --- Convert zone_slug_str to integer ---
zone_int = parse_zone_slug_to_int(zone_slug_str)
if zone_int is None:
logging.error(f"Cannot parse zone number for {state_name_key} - {zone_slug_str}. Skipping.")
return data_dicts
candidate_tables_found = 0
for table_index, table in enumerate(all_tables_on_page):
thead = table.find('thead')
is_price_table = False
actual_column_indices = {}
if thead:
headers_lower = [th.get_text(strip=True).lower() for th in thead.find_all('th')]
logging.debug(f"Table {table_index} on {state_name_key}/{zone_slug_str} - headers: {headers_lower}")
try:
actual_column_indices['company'] = headers_lower.index('company name')
price_col_name_part = 'price'
actual_column_indices['price'] = next(i for i, header in enumerate(headers_lower) if price_col_name_part in header)
actual_column_indices['date'] = headers_lower.index('date')
is_price_table = True
logging.debug(f"Table {table_index} identified as price table. Indices: {actual_column_indices}")
except (ValueError, StopIteration):
logging.debug(f"Table {table_index} headers do not contain all key columns.")
else:
logging.debug(f"Table {table_index} has no thead.")
if not is_price_table:
continue
candidate_tables_found += 1
tbody = table.find('tbody')
if not tbody:
logging.warning(f"Price table identified by headers has no tbody. Skipping. State: {state_name_key}, Zone: {zone_slug_str}")
continue
rows = tbody.find_all('tr')
if not rows:
logging.debug(f"No rows found in tbody for price table in {state_name_key}/{zone_slug_str}")
continue
for row_index, row in enumerate(rows):
cells = row.find_all('td')
max_required_index = max(actual_column_indices.values()) if actual_column_indices else -1
if max_required_index == -1:
logging.error(f"Logic error: is_price_table true but no column indices for {state_name_key}/{zone_slug_str}")
continue
if len(cells) > max_required_index:
company_name_scraped = cells[actual_column_indices['company']].get_text(strip=True)
price_str = cells[actual_column_indices['price']].get_text(strip=True)
date_posted_str = cells[actual_column_indices['date']].get_text(strip=True)
company_link = cells[actual_column_indices['company']].find('a')
if company_link:
company_name_scraped = company_link.get_text(strip=True)
price_float = None
try:
cleaned_price_str = ''.join(filter(lambda x: x.isdigit() or x == '.', price_str))
if cleaned_price_str:
price_float = float(cleaned_price_str)
except ValueError:
logging.warning(f"Could not parse price: '{price_str}' for {company_name_scraped} in {state_name_key}/{zone_slug_str}.")
except Exception as e:
logging.error(f"Unexpected error parsing price: '{price_str}' for {company_name_scraped}. Error: {e}")
data_dicts.append({
"state": state_name_key.capitalize(), # Use the passed state_name_key
"zone": zone_int, # Use the parsed integer zone
"name": company_name_scraped,
"price": price_float,
"date": date_posted_str,
})
elif len(cells) > 0:
logging.warning(f"Skipping row {row_index+1} with insufficient cells ({len(cells)}, need {max_required_index+1}) in {state_name_key}/{zone_slug_str}")
if candidate_tables_found == 0:
logging.warning(f"No tables matching expected price table structure found for {state_name_key} - {zone_slug_str}.")
return data_dicts
# --- Main Script ---
def main():
logging.info("Starting oil price scraper job.")
try:
init_db()
logging.info("Database initialized/checked successfully.")
except Exception as e:
logging.error(f"Failed to initialize database: {e}", exc_info=True)
return
db_session: Session = SessionLocal()
total_records_added_this_run = 0
try:
for site_config in SITES_CONFIG:
site_name = site_config["site_name"]
base_url = site_config["base_url"]
url_template = site_config["url_template"]
oil_type = site_config["oil_type"]
logging.info(f"--- Processing site: {site_name} ---")
for state_key_in_config, zone_slugs_list in site_config["locations"].items():
# state_key_in_config is "connecticut", "maine", etc.
for zone_slug_from_list in zone_slugs_list: # e.g., "zone1", "zonema5"
format_params = {
"base_url": base_url,
"state_slug": state_key_in_config, # Used if {state_slug} in template
"zone_slug": zone_slug_from_list, # This is "zone1", "zonema5", etc.
"oil_type": oil_type
}
target_url = url_template.format(**format_params)
logging.info(f"Scraping: {target_url} (State: {state_key_in_config}, Zone Slug: {zone_slug_from_list})")
soup = make_request(target_url)
if soup:
# Pass state_key_in_config as state_name_key
# Pass zone_slug_from_list (e.g. "zone1") as zone_slug_str for parsing to int
parsed_items = parse_price_table(soup, state_key_in_config, zone_slug_from_list)
if parsed_items:
for item_dict in parsed_items: # item_dict["zone"] will be an integer
# Check if a record with the same name, state, and zone already exists
existing_record = db_session.query(models.OilPrice).filter(
models.OilPrice.name == item_dict["name"],
models.OilPrice.state == item_dict["state"],
models.OilPrice.zone == item_dict["zone"]
).first()
if existing_record:
# If record exists, check if company_id is not null
if existing_record.company_id is not None:
logging.debug(f"Skipping update for {item_dict['name']} in {item_dict['state']} zone {item_dict['zone']} due to non-null company_id")
else:
# If company_id is null, check if price is different
if existing_record.price != item_dict["price"]:
existing_record.price = item_dict["price"]
existing_record.date = item_dict["date"]
existing_record.scrapetimestamp = datetime.utcnow()
logging.info(f"Updated price for {item_dict['name']} in {item_dict['state']} zone {item_dict['zone']} to {item_dict['price']}")
else:
logging.debug(f"Price unchanged for {item_dict['name']} in {item_dict['state']} zone {item_dict['zone']}")
else:
# If no record exists, create a new one
oil_price_record = models.OilPrice(
state=item_dict["state"],
zone=item_dict["zone"],
name=item_dict["name"],
price=item_dict["price"],
date=item_dict["date"],
scrapetimestamp=datetime.utcnow()
)
db_session.add(oil_price_record)
logging.info(f"Added new record for {item_dict['name']} in {item_dict['state']} zone {item_dict['zone']}")
total_records_added_this_run += len(parsed_items)
logging.info(f"Queued {len(parsed_items)} records from {site_name} - {state_key_in_config}/{zone_slug_from_list} for DB insertion.")
else:
logging.info(f"No data extracted from {target_url}")
else:
logging.warning(f"Failed to retrieve or parse {target_url}. Skipping.")
if total_records_added_this_run > 0:
db_session.commit()
logging.info(f"Successfully committed {total_records_added_this_run} records to the database.")
else:
logging.info("No new records were queued for database insertion in this run.")
except Exception as e:
logging.error(f"An error occurred during scraping or DB operation: {e}", exc_info=True)
db_session.rollback()
logging.info("Database transaction rolled back due to error.")
finally:
db_session.close()
logging.info("Database session closed.")
logging.info("Oil price scraper job finished.")
if __name__ == "__main__":
main()