""" Web scraping module for New England Oil prices. This module handles scraping oil price data from the New England Oil website for Zone 10 (Central Massachusetts). """ import logging import time from datetime import date from typing import List, Dict, Optional from decimal import Decimal import requests from bs4 import BeautifulSoup from app.config import ( NEWENGLAND_OIL_ZONE10_URL, SCRAPER_USER_AGENT, SCRAPER_TIMEOUT, SCRAPER_DELAY_SECONDS, ) logger = logging.getLogger(__name__) class ScraperError(Exception): """Custom exception for scraper errors.""" pass def scrape_newengland_oil() -> List[Dict[str, any]]: """ Scrape oil prices from New England Oil Zone 10 page. Fetches the page, parses the HTML table, and extracts company names, towns, and prices. Returns: List of dictionaries with keys: company_name, town, price_decimal, scrape_date, zone Raises: ScraperError: If the request fails or parsing fails """ logger.info(f"Starting scrape of {NEWENGLAND_OIL_ZONE10_URL}") headers = { "User-Agent": SCRAPER_USER_AGENT, "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Accept-Encoding": "gzip, deflate", "Connection": "keep-alive", } try: # Make the request response = requests.get( NEWENGLAND_OIL_ZONE10_URL, headers=headers, timeout=SCRAPER_TIMEOUT ) response.raise_for_status() logger.info(f"Successfully fetched page (status: {response.status_code})") # Parse HTML soup = BeautifulSoup(response.content, 'lxml') # Find the price table # The table typically has company names, towns, and prices # We need to inspect the actual HTML structure prices = [] today = date.today() # Look for table rows with price data # The structure appears to be: company links followed by town and price info # We'll look for patterns in the HTML # Find all table rows tables = soup.find_all('table') if not tables: logger.warning("No tables found on page") # Debug: Save HTMl to file with open("debug_page.html", "wb") as f: f.write(response.content) raise ScraperError("No price table found on page") # The main price table is usually the largest one or contains specific markers # Let's find rows that contain price information for table in tables: rows = table.find_all('tr') for row in rows: cells = row.find_all(['td', 'th']) if len(cells) >= 3: # Expect at least company, town, price # Try to extract company name (usually in a link) company_link = row.find('a') if company_link: company_name = company_link.get_text(strip=True) # Extract text from all cells cell_texts = [cell.get_text(strip=True) for cell in cells] # Look for price pattern (e.g., "$2.599" or "2.599") price_value = None town_value = None for text in cell_texts: # Check if this looks like a price text_clean = text.replace('$', '').replace(',', '').strip() try: # Try to parse as decimal if text_clean and '.' in text_clean: potential_price = Decimal(text_clean) # Reasonable price range for heating oil (0.50 to 10.00) if Decimal('0.50') <= potential_price <= Decimal('10.00'): price_value = potential_price break except (ValueError, ArithmeticError): # Not a valid price, might be town name if text and not text.startswith('$') and len(text) > 2: if not town_value: # Take first non-price text as town town_value = text if price_value: prices.append({ "company_name": company_name, "town": town_value, "price_decimal": price_value, "scrape_date": today, "zone": "zone10" }) logger.debug(f"Found: {company_name} - {town_value} - ${price_value}") if not prices: logger.warning("No prices extracted from page") raise ScraperError("Failed to extract any price data from page") logger.info(f"Successfully scraped {len(prices)} price records") return prices except requests.RequestException as e: logger.error(f"Request failed: {e}") raise ScraperError(f"Failed to fetch page: {str(e)}") except Exception as e: logger.error(f"Scraping failed: {e}", exc_info=True) raise ScraperError(f"Failed to parse page: {str(e)}") def scrape_and_delay() -> List[Dict[str, any]]: """ Scrape prices and apply rate limiting delay. This is a convenience function that scrapes and then sleeps to respect rate limits. Returns: List of price dictionaries """ prices = scrape_newengland_oil() # Apply rate limiting delay if SCRAPER_DELAY_SECONDS > 0: logger.debug(f"Sleeping {SCRAPER_DELAY_SECONDS}s for rate limiting") time.sleep(SCRAPER_DELAY_SECONDS) return prices