""" HTML parsing module for extracting oil price data from web pages. """ import logging import re from bs4 import BeautifulSoup def parse_zone_slug_to_int(zone_slug_str: str) -> int | None: """ Extract the numeric part of a zone slug. Examples: "zone1" -> 1 "zonema5" -> 5 Args: zone_slug_str: Zone slug string like "zone1", "zonema5" Returns: Integer zone number or None if parsing fails """ if not zone_slug_str: return None match = re.search(r'\d+$', zone_slug_str) if match: return int(match.group(0)) logging.warning(f"Could not parse numeric zone from slug: '{zone_slug_str}'") return None def _find_price_table_columns(thead) -> dict | None: """ Find column indices for company, price, and date in a table header. Args: thead: BeautifulSoup thead element Returns: Dictionary with column indices or None if not a price table """ headers_lower = [th.get_text(strip=True).lower() for th in thead.find_all('th')] column_indices = {} try: column_indices['company'] = headers_lower.index('company name') price_col_name_part = 'price' column_indices['price'] = next( i for i, header in enumerate(headers_lower) if price_col_name_part in header ) column_indices['date'] = headers_lower.index('date') return column_indices except (ValueError, StopIteration): return None def _parse_row(cells: list, column_indices: dict, state_name: str, zone: int) -> dict | None: """ Parse a single table row into a price record. Args: cells: List of td elements column_indices: Dictionary mapping column names to indices state_name: State name string zone: Zone number Returns: Dictionary with parsed data or None if parsing fails """ max_required_index = max(column_indices.values()) if len(cells) <= max_required_index: return None # Extract company name (prefer link text if available) company_cell = cells[column_indices['company']] company_name = company_cell.get_text(strip=True) company_link = company_cell.find('a') if company_link: company_name = company_link.get_text(strip=True) # Extract and parse price price_str = cells[column_indices['price']].get_text(strip=True) price_float = None try: cleaned_price_str = ''.join(filter(lambda x: x.isdigit() or x == '.', price_str)) if cleaned_price_str: price_float = float(cleaned_price_str) except ValueError: logging.warning(f"Could not parse price: '{price_str}' for {company_name} in {state_name}/zone{zone}.") except Exception as e: logging.error(f"Unexpected error parsing price: '{price_str}' for {company_name}. Error: {e}") # Extract date date_posted_str = cells[column_indices['date']].get_text(strip=True) return { "state": state_name.capitalize(), "zone": zone, "name": company_name, "price": price_float, "date": date_posted_str, } def parse_price_table(soup: BeautifulSoup, state_name_key: str, zone_slug_str: str) -> list[dict]: """ Parse price tables from a BeautifulSoup page. Args: soup: BeautifulSoup object of the page state_name_key: State key like "connecticut", "maine" zone_slug_str: Zone slug like "zone1", "zonema5" Returns: List of dictionaries containing price data """ data_dicts = [] all_tables = soup.find_all('table') logging.info(f"Found {len(all_tables)} table(s) on page for {state_name_key} - {zone_slug_str}.") if not all_tables: logging.warning(f"No HTML tables found at all for {state_name_key} - {zone_slug_str}.") return data_dicts # Parse zone number from slug zone_int = parse_zone_slug_to_int(zone_slug_str) if zone_int is None: logging.error(f"Cannot parse zone number for {state_name_key} - {zone_slug_str}. Skipping.") return data_dicts candidate_tables_found = 0 for table_index, table in enumerate(all_tables): thead = table.find('thead') if not thead: logging.debug(f"Table {table_index} has no thead.") continue # Check if this is a price table column_indices = _find_price_table_columns(thead) if column_indices is None: logging.debug(f"Table {table_index} headers do not contain all key columns.") continue logging.debug(f"Table {table_index} identified as price table. Indices: {column_indices}") candidate_tables_found += 1 # Parse table body tbody = table.find('tbody') if not tbody: logging.warning(f"Price table identified by headers has no tbody. Skipping. State: {state_name_key}, Zone: {zone_slug_str}") continue rows = tbody.find_all('tr') if not rows: logging.debug(f"No rows found in tbody for price table in {state_name_key}/{zone_slug_str}") continue # Parse each row for row_index, row in enumerate(rows): cells = row.find_all('td') record = _parse_row(cells, column_indices, state_name_key, zone_int) if record: data_dicts.append(record) elif len(cells) > 0: max_required = max(column_indices.values()) + 1 logging.warning( f"Skipping row {row_index+1} with insufficient cells ({len(cells)}, need {max_required}) " f"in {state_name_key}/{zone_slug_str}" ) if candidate_tables_found == 0: logging.warning(f"No tables matching expected price table structure found for {state_name_key} - {zone_slug_str}.") return data_dicts