#!/usr/bin/env python3 import requests from bs4 import BeautifulSoup from datetime import datetime import logging import os import re # For parsing zone number from slug from sqlalchemy.orm import Session from database import SessionLocal, init_db import models # --- SITES CONFIGURATION --- SITES_CONFIG = [ { "site_name": "NewEnglandOil", "base_url": "https://www.newenglandoil.com", "url_template": "{base_url}/{state_slug}/{zone_slug}.asp?type={oil_type}", "oil_type": 0, "locations": { "connecticut": [ "zone1", "zone2", "zone3", "zone4", "zone5", "zone6", "zone7", "zone8", "zone9", "zone10" ], "massachusetts": [ "zone1", "zone2", "zone3", "zone4", "zone5", "zone6", "zone7", "zone8", "zone9", "zone10", "zone11", "zone12", "zone13","zone14","zone15" ], "newhampshire": [ "zone1", "zone2", "zone3", "zone4", "zone5", "zone6" ], "rhodeisland": [ "zone1", "zone2", "zone3", "zone4", "zone5" ], } }, { "site_name": "MaineOil", "base_url": "https://www.maineoil.com", # URL template for MaineOil using numeric zones like zone1.asp, zone2.asp # {zone_slug} will be "zone1", "zone2", etc. # No {state_slug} is needed in this part of the path for maineoil.com "url_template": "{base_url}/{zone_slug}.asp?type={oil_type}", "oil_type": 0, "locations": { # "maine" is our internal key for the state. # The zone_slugs are "zone1", "zone2", etc. # YOU NEED TO VERIFY THE ACTUAL ZONE SLUGS AND COUNT FOR MAINEOIL.COM "maine": [ "zone1", "zone2", "zone3", "zone4", "zone5", "zone6", "zone7" # Example: Add/remove based on actual zones on maineoil.com ] } } ] LOG_FILE = "oil_scraper.log" logging.basicConfig( filename=LOG_FILE, level=logging.INFO, format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s' ) # --- Helper Functions --- def make_request(url): headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } try: response = requests.get(url, headers=headers, timeout=20) response.raise_for_status() return BeautifulSoup(response.content, 'html.parser') except requests.exceptions.RequestException as e: logging.error(f"Error fetching {url}: {e}") return None def parse_zone_slug_to_int(zone_slug_str): """Extracts the numeric part of a zone slug (e.g., "zone1" -> 1, "zonema5" -> 5).""" if not zone_slug_str: return None match = re.search(r'\d+$', zone_slug_str) if match: return int(match.group(0)) logging.warning(f"Could not parse numeric zone from slug: '{zone_slug_str}'") return None def parse_price_table(soup, state_name_key, zone_slug_str): """Parses price tables. state_name_key is "connecticut", "maine", etc. zone_slug_str is "zone1", "zonema5", etc.""" data_dicts = [] all_tables_on_page = soup.find_all('table') logging.info(f"Found {len(all_tables_on_page)} table(s) on page for {state_name_key} - {zone_slug_str}.") if not all_tables_on_page: logging.warning(f"No HTML tables found at all for {state_name_key} - {zone_slug_str}.") return data_dicts # --- Convert zone_slug_str to integer --- zone_int = parse_zone_slug_to_int(zone_slug_str) if zone_int is None: logging.error(f"Cannot parse zone number for {state_name_key} - {zone_slug_str}. Skipping.") return data_dicts candidate_tables_found = 0 for table_index, table in enumerate(all_tables_on_page): thead = table.find('thead') is_price_table = False actual_column_indices = {} if thead: headers_lower = [th.get_text(strip=True).lower() for th in thead.find_all('th')] logging.debug(f"Table {table_index} on {state_name_key}/{zone_slug_str} - headers: {headers_lower}") try: actual_column_indices['company'] = headers_lower.index('company name') price_col_name_part = 'price' actual_column_indices['price'] = next(i for i, header in enumerate(headers_lower) if price_col_name_part in header) actual_column_indices['date'] = headers_lower.index('date') is_price_table = True logging.debug(f"Table {table_index} identified as price table. Indices: {actual_column_indices}") except (ValueError, StopIteration): logging.debug(f"Table {table_index} headers do not contain all key columns.") else: logging.debug(f"Table {table_index} has no thead.") if not is_price_table: continue candidate_tables_found += 1 tbody = table.find('tbody') if not tbody: logging.warning(f"Price table identified by headers has no tbody. Skipping. State: {state_name_key}, Zone: {zone_slug_str}") continue rows = tbody.find_all('tr') if not rows: logging.debug(f"No rows found in tbody for price table in {state_name_key}/{zone_slug_str}") continue for row_index, row in enumerate(rows): cells = row.find_all('td') max_required_index = max(actual_column_indices.values()) if actual_column_indices else -1 if max_required_index == -1: logging.error(f"Logic error: is_price_table true but no column indices for {state_name_key}/{zone_slug_str}") continue if len(cells) > max_required_index: company_name_scraped = cells[actual_column_indices['company']].get_text(strip=True) price_str = cells[actual_column_indices['price']].get_text(strip=True) date_posted_str = cells[actual_column_indices['date']].get_text(strip=True) company_link = cells[actual_column_indices['company']].find('a') if company_link: company_name_scraped = company_link.get_text(strip=True) price_float = None try: cleaned_price_str = ''.join(filter(lambda x: x.isdigit() or x == '.', price_str)) if cleaned_price_str: price_float = float(cleaned_price_str) except ValueError: logging.warning(f"Could not parse price: '{price_str}' for {company_name_scraped} in {state_name_key}/{zone_slug_str}.") except Exception as e: logging.error(f"Unexpected error parsing price: '{price_str}' for {company_name_scraped}. Error: {e}") data_dicts.append({ "state": state_name_key.capitalize(), # Use the passed state_name_key "zone": zone_int, # Use the parsed integer zone "name": company_name_scraped, "price": price_float, "date": date_posted_str, }) elif len(cells) > 0: logging.warning(f"Skipping row {row_index+1} with insufficient cells ({len(cells)}, need {max_required_index+1}) in {state_name_key}/{zone_slug_str}") if candidate_tables_found == 0: logging.warning(f"No tables matching expected price table structure found for {state_name_key} - {zone_slug_str}.") return data_dicts # --- Main Script --- def main(): logging.info("Starting oil price scraper job.") try: init_db() logging.info("Database initialized/checked successfully.") except Exception as e: logging.error(f"Failed to initialize database: {e}", exc_info=True) return db_session: Session = SessionLocal() total_records_added_this_run = 0 try: for site_config in SITES_CONFIG: site_name = site_config["site_name"] base_url = site_config["base_url"] url_template = site_config["url_template"] oil_type = site_config["oil_type"] logging.info(f"--- Processing site: {site_name} ---") for state_key_in_config, zone_slugs_list in site_config["locations"].items(): # state_key_in_config is "connecticut", "maine", etc. for zone_slug_from_list in zone_slugs_list: # e.g., "zone1", "zonema5" format_params = { "base_url": base_url, "state_slug": state_key_in_config, # Used if {state_slug} in template "zone_slug": zone_slug_from_list, # This is "zone1", "zonema5", etc. "oil_type": oil_type } target_url = url_template.format(**format_params) logging.info(f"Scraping: {target_url} (State: {state_key_in_config}, Zone Slug: {zone_slug_from_list})") soup = make_request(target_url) if soup: # Pass state_key_in_config as state_name_key # Pass zone_slug_from_list (e.g. "zone1") as zone_slug_str for parsing to int parsed_items = parse_price_table(soup, state_key_in_config, zone_slug_from_list) if parsed_items: for item_dict in parsed_items: # item_dict["zone"] will be an integer # Check if a record with the same name, state, and zone already exists existing_record = db_session.query(models.OilPrice).filter( models.OilPrice.name == item_dict["name"], models.OilPrice.state == item_dict["state"], models.OilPrice.zone == item_dict["zone"] ).first() if existing_record: # If record exists, check if company_id is not null if existing_record.company_id is not None: logging.debug(f"Skipping update for {item_dict['name']} in {item_dict['state']} zone {item_dict['zone']} due to non-null company_id") else: # If company_id is null, check if price is different if existing_record.price != item_dict["price"]: existing_record.price = item_dict["price"] existing_record.date = item_dict["date"] existing_record.scrapetimestamp = datetime.utcnow() logging.info(f"Updated price for {item_dict['name']} in {item_dict['state']} zone {item_dict['zone']} to {item_dict['price']}") else: logging.debug(f"Price unchanged for {item_dict['name']} in {item_dict['state']} zone {item_dict['zone']}") else: # If no record exists, create a new one oil_price_record = models.OilPrice( state=item_dict["state"], zone=item_dict["zone"], name=item_dict["name"], price=item_dict["price"], date=item_dict["date"], scrapetimestamp=datetime.utcnow() ) db_session.add(oil_price_record) logging.info(f"Added new record for {item_dict['name']} in {item_dict['state']} zone {item_dict['zone']}") total_records_added_this_run += len(parsed_items) logging.info(f"Queued {len(parsed_items)} records from {site_name} - {state_key_in_config}/{zone_slug_from_list} for DB insertion.") else: logging.info(f"No data extracted from {target_url}") else: logging.warning(f"Failed to retrieve or parse {target_url}. Skipping.") if total_records_added_this_run > 0: db_session.commit() logging.info(f"Successfully committed {total_records_added_this_run} records to the database.") else: logging.info("No new records were queued for database insertion in this run.") except Exception as e: logging.error(f"An error occurred during scraping or DB operation: {e}", exc_info=True) db_session.rollback() logging.info("Database transaction rolled back due to error.") finally: db_session.close() logging.info("Database session closed.") logging.info("Oil price scraper job finished.") if __name__ == "__main__": main()