""" HTML parsing module for extracting oil price data from web pages. """ import logging import re from urllib.parse import urlparse, parse_qs from bs4 import BeautifulSoup from .config import STATE_ABBREV_MAP def parse_zone_slug_to_int(zone_slug_str: str) -> int | None: """ Extract the numeric part of a zone slug. Examples: "zone1" -> 1 "zonema5" -> 5 Args: zone_slug_str: Zone slug string like "zone1", "zonema5" Returns: Integer zone number or None if parsing fails """ if not zone_slug_str: return None match = re.search(r'\d+$', zone_slug_str) if match: return int(match.group(0)) logging.warning(f"Could not parse numeric zone from slug: '{zone_slug_str}'") return None def _find_price_table_columns(thead) -> dict | None: """ Find column indices for company, price, and date in a table header. Args: thead: BeautifulSoup thead element Returns: Dictionary with column indices or None if not a price table """ headers_lower = [th.get_text(strip=True).lower() for th in thead.find_all('th')] column_indices = {} try: column_indices['company'] = headers_lower.index('company name') price_col_name_part = 'price' column_indices['price'] = next( i for i, header in enumerate(headers_lower) if price_col_name_part in header ) column_indices['date'] = headers_lower.index('date') return column_indices except (ValueError, StopIteration): return None def _smart_title(name: str) -> str: """ Convert a company name to title case, preserving common abbreviations. Handles: LLC, INC, CO, LP, HVAC, A1, etc. """ # Common abbreviations that should stay uppercase keep_upper = {"LLC", "INC", "LP", "HVAC", "II", "III", "IV", "USA", "CT", "MA", "NH", "ME", "RI", "VT"} words = name.title().split() result = [] for word in words: if word.upper() in keep_upper: result.append(word.upper()) else: result.append(word) return " ".join(result) def _extract_company_url(company_link) -> str | None: """ Extract the actual company URL from a link. Handles: 1. Redirects: click.asp?x=http://example.com&... -> http://example.com 2. Direct links: http://example.com -> http://example.com """ if not company_link: return None href = company_link.get('href', '') if not href: return None url_candidate = None if 'click.asp' in href: # Parse the x parameter which contains the actual URL try: parsed = urlparse(href) params = parse_qs(parsed.query) extracted = params.get('x', [None])[0] if extracted: url_candidate = extracted except Exception: pass elif href.startswith(('http://', 'https://')): # Direct link url_candidate = href # Validate the candidate URL if url_candidate: try: # Basic validation if not url_candidate.startswith(('http://', 'https://')): return None lower_url = url_candidate.lower() # Filter out internal or competitor site loops if 'newenglandoil.com' in lower_url or 'cheapestoil.com' in lower_url: return None return url_candidate except Exception: pass return None def _extract_phone_link(cells: list) -> dict | None: """ Extract the phone page link info from a row's phone cell. Phone link format: phones.asp?zone=1&ID=10&a=MA1 Returns dict with {url, company_neo_id} or None. """ for cell in cells: link = cell.find('a', href=lambda h: h and 'phones.asp' in h) if link: href = link.get('href', '') try: parsed = urlparse(href) params = parse_qs(parsed.query) neo_id = params.get('ID', [None])[0] return { "phone_page_path": href, "neo_id": neo_id, } except Exception: pass return None def _parse_row(cells: list, column_indices: dict, state_name: str, zone: int) -> dict | None: """ Parse a single table row into a price record. Args: cells: List of td elements column_indices: Dictionary mapping column names to indices state_name: State name string (lowercase key like "connecticut") zone: Zone number Returns: Dictionary with parsed data or None if parsing fails """ max_required_index = max(column_indices.values()) if len(cells) <= max_required_index: return None # Extract company name (prefer link text if available) company_cell = cells[column_indices['company']] company_name = company_cell.get_text(strip=True) company_link = company_cell.find('a') if company_link: company_name = company_link.get_text(strip=True) # Apply title case normalization company_name = _smart_title(company_name) # Extract company URL from click.asp link company_url = _extract_company_url(company_link) # Extract phone page link info phone_info = _extract_phone_link(cells) # Extract and parse price price_str = cells[column_indices['price']].get_text(strip=True) price_float = None try: cleaned_price_str = ''.join(filter(lambda x: x.isdigit() or x == '.', price_str)) if cleaned_price_str: price_float = float(cleaned_price_str) except ValueError: logging.warning(f"Could not parse price: '{price_str}' for {company_name} in {state_name}/zone{zone}.") except Exception as e: logging.error(f"Unexpected error parsing price: '{price_str}' for {company_name}. Error: {e}") # Extract date date_posted_str = cells[column_indices['date']].get_text(strip=True) # Convert state name to 2-letter abbreviation state_abbr = STATE_ABBREV_MAP.get(state_name.lower()) if not state_abbr: logging.warning(f"Unknown state key: {state_name}, using capitalized form") state_abbr = state_name.capitalize() return { "state": state_abbr, "zone": zone, "name": company_name, "price": price_float, "date": date_posted_str, "url": company_url, "phone_info": phone_info, } def parse_price_table(soup: BeautifulSoup, state_name_key: str, zone_slug_str: str, site_name: str = "NewEnglandOil") -> list[dict]: """ Parse price tables from a BeautifulSoup page. Args: soup: BeautifulSoup object of the page state_name_key: State key like "connecticut", "maine" zone_slug_str: Zone slug like "zone1", "zonema5" Returns: List of dictionaries containing price data """ data_dicts = [] all_tables = soup.find_all('table') logging.info(f"[{site_name}] Found {len(all_tables)} table(s) on page for {state_name_key} - {zone_slug_str}.") if not all_tables: logging.warning(f"[{site_name}] No HTML tables found at all for {state_name_key} - {zone_slug_str}.") return data_dicts # Parse zone number from slug zone_int = parse_zone_slug_to_int(zone_slug_str) if zone_int is None: logging.error(f"[{site_name}] Cannot parse zone number for {state_name_key} - {zone_slug_str}. Skipping.") return data_dicts candidate_tables_found = 0 for table_index, table in enumerate(all_tables): thead = table.find('thead') if not thead: logging.debug(f"Table {table_index} has no thead.") continue # Check if this is a price table column_indices = _find_price_table_columns(thead) if column_indices is None: logging.debug(f"Table {table_index} headers do not contain all key columns.") continue logging.debug(f"Table {table_index} identified as price table. Indices: {column_indices}") candidate_tables_found += 1 # Parse table body tbody = table.find('tbody') if not tbody: logging.warning(f"[{site_name}] Price table identified by headers has no tbody. Skipping. State: {state_name_key}, Zone: {zone_slug_str}") continue rows = tbody.find_all('tr') if not rows: logging.debug(f"No rows found in tbody for price table in {state_name_key}/{zone_slug_str}") continue # Parse each row for row_index, row in enumerate(rows): cells = row.find_all('td') record = _parse_row(cells, column_indices, state_name_key, zone_int) if record: data_dicts.append(record) elif len(cells) > 0: max_required = max(column_indices.values()) + 1 logging.warning( f"[{site_name}] Skipping row {row_index+1} with insufficient cells ({len(cells)}, need {max_required}) " f"in {state_name_key}/{zone_slug_str}" ) if candidate_tables_found == 0: logging.warning(f"[{site_name}] No tables matching expected price table structure found for {state_name_key} - {zone_slug_str}.") return data_dicts