390 lines
11 KiB
Python
390 lines
11 KiB
Python
"""
|
|
Geocoding tools for eamco_address_checker.
|
|
|
|
This module provides modular tool functions for the agentic address verification
|
|
workflow. Each function represents a discrete action in the ReAct-style pipeline.
|
|
|
|
Tools:
|
|
- build_address(): Constructs full US address string from components
|
|
- validate_address_components(): Validates required address fields
|
|
- geocode_address(): Calls Nominatim API to get lat/long
|
|
- validate_geocode_result(): Checks quality of geocoding result
|
|
- update_record(): Updates database record with geocoding results
|
|
"""
|
|
|
|
import logging
|
|
import random
|
|
import time
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from typing import Optional, Tuple
|
|
|
|
from geopy.geocoders import Nominatim
|
|
from geopy.exc import GeocoderTimedOut, GeocoderServiceError, GeocoderUnavailable
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.config import (
|
|
NOMINATIM_USER_AGENT,
|
|
MIN_SLEEP_SECONDS,
|
|
MAX_SLEEP_SECONDS,
|
|
GEOCODE_TIMEOUT,
|
|
STATE_MAPPING,
|
|
)
|
|
from app.models import CustomerCustomer
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class GeocodeResult:
    """
    Outcome of a single geocoding attempt.

    Only ``success`` is mandatory; every other field defaults to "absent"
    so failure results can be built from just a flag and an error message.
    """

    # True when the geocoder returned a location.
    success: bool

    # Coordinates kept as strings (geocode_address() stringifies the floats).
    latitude: Optional[str] = None
    longitude: Optional[str] = None

    # Display address reported by the geocoder for the match.
    raw_address: Optional[str] = None

    # Country code of the match; geocode_address() upper-cases it.
    country_code: Optional[str] = None

    # Human-readable failure description when success is False.
    error_message: Optional[str] = None

    # Skip bookkeeping. Not set by any function in this module; presumably
    # populated by the calling workflow -- TODO confirm.
    skipped: bool = False
    skip_reason: Optional[str] = None
|
|
|
|
|
|
@dataclass
class AddressComponents:
    """
    Individual pieces of a postal address plus their validation status.

    The five address parts are required constructor arguments (each may be
    None); the validation fields start out as "valid, no error".
    """

    # Address parts; None means the part is missing.
    street: Optional[str]
    apt: Optional[str]
    city: Optional[str]
    state: Optional[str]
    zip_code: Optional[str]

    # Validation outcome, written by validate_address_components().
    is_valid: bool = True
    validation_error: Optional[str] = None
|
|
|
|
|
|
def get_state_abbreviation(state_id: Optional[int]) -> Optional[str]:
    """
    Look up the 2-letter US state abbreviation for a database state ID.

    Args:
        state_id: Integer state ID from the database, or None

    Returns:
        The value stored in STATE_MAPPING for the ID, or None when the ID
        is None or has no entry

    Note:
        Replace with proper states table lookup when available
    """
    # A missing ID short-circuits so the mapping is never queried with None.
    return None if state_id is None else STATE_MAPPING.get(state_id)
|
|
|
|
|
|
def build_address(customer: CustomerCustomer) -> AddressComponents:
    """
    TOOL: Extract cleaned address components from a customer record.

    Text fields are stripped of surrounding whitespace and the state ID is
    translated to an abbreviation via get_state_abbreviation(). Empty text
    fields become None in the result. No validation is performed here; the
    returned object keeps its default is_valid / validation_error values.

    Args:
        customer: CustomerCustomer record with address fields

    Returns:
        AddressComponents dataclass holding the cleaned components
    """

    def _stripped(raw):
        # None (or any falsy value) is treated as an empty string.
        return (raw or "").strip()

    street_text = _stripped(customer.customer_address)
    apt_text = _stripped(customer.customer_apt)
    city_text = _stripped(customer.customer_town)
    state_abbr = get_state_abbreviation(customer.customer_state)
    zip_text = _stripped(customer.customer_zip)

    # Structured log payload carries the stripped strings ("" when missing).
    log_fields = {
        "customer_id": customer.id,
        "street": street_text,
        "apt": apt_text,
        "city": city_text,
        "state": state_abbr,
        "zip": zip_text,
    }
    logger.debug("Building address", extra=log_fields)

    # Empty strings are normalised to None in the returned components.
    return AddressComponents(
        street=street_text or None,
        apt=apt_text or None,
        city=city_text or None,
        state=state_abbr,
        zip_code=zip_text or None,
    )
|
|
|
|
|
|
def validate_address_components(components: AddressComponents) -> AddressComponents:
    """
    TOOL: Validate that address has minimum required components.

    An address is considered valid for geocoding if it has:
    - Street address (required)
    - City (required)
    - ZIP code (required)
    - State is recommended but not strictly required

    A field that is None, empty, or whitespace-only counts as missing.

    Args:
        components: AddressComponents to validate (mutated in place)

    Returns:
        The same AddressComponents instance with is_valid and
        validation_error updated; validation_error is reset to None when
        the address is valid
    """
    required_fields = (
        ("street", components.street),
        ("city", components.city),
        ("zip", components.zip_code),
    )

    # Whitespace-only values carry no information, so treat them as absent.
    missing = [
        label
        for label, value in required_fields
        if not (value and str(value).strip())
    ]

    if missing:
        components.is_valid = False
        components.validation_error = f"Missing required fields: {', '.join(missing)}"
        logger.debug(f"Address validation failed: {components.validation_error}")
    else:
        components.is_valid = True
        # Clear any error left over from an earlier failed validation so the
        # object never reports "valid" together with a stale error message.
        components.validation_error = None
        logger.debug("Address validation passed")

    return components
|
|
|
|
|
|
def format_address_string(components: AddressComponents) -> str:
    """
    Join address components into one comma-separated geocoding query.

    Layout: "street[, apt], city, state zip, USA". Missing parts are left
    out; the apartment is only emitted together with a street, and "USA"
    is always appended.

    Args:
        components: Validated AddressComponents

    Returns:
        Formatted address string
    """
    segments = []

    # Street line, with the apartment attached when both are present.
    street = components.street
    if street:
        segments.append(f"{street}, {components.apt}" if components.apt else street)

    if components.city:
        segments.append(components.city)

    # State and ZIP share one segment, separated by a space.
    region = " ".join(
        piece for piece in (components.state, components.zip_code) if piece
    )
    if region:
        segments.append(region)

    # Add country for better accuracy
    segments.append("USA")

    return ", ".join(segments)
|
|
|
|
|
|
def geocode_address(
    address_string: str,
    geocoder: Optional[Nominatim] = None
) -> GeocodeResult:
    """
    TOOL: Call Nominatim API to geocode an address.

    Uses geopy's Nominatim geocoder, restricted to US results. No throttling
    happens inside this function; pace calls (e.g. with rate_limit_sleep())
    to respect Nominatim's 1 request/second policy.

    All errors are converted into a failed GeocodeResult; nothing is raised
    to the caller.

    Args:
        address_string: Full formatted address to geocode
        geocoder: Optional pre-initialized Nominatim instance

    Returns:
        GeocodeResult with lat/long or error information
    """
    # A blank query cannot match anything; fail fast without a network call.
    if not address_string or not address_string.strip():
        logger.warning("Skipping geocoding: empty address string")
        return GeocodeResult(
            success=False,
            error_message="Empty address string"
        )

    if geocoder is None:
        geocoder = Nominatim(user_agent=NOMINATIM_USER_AGENT)

    logger.info(f"Geocoding address: {address_string}")

    try:
        # Call Nominatim API with timeout
        location = geocoder.geocode(
            address_string,
            timeout=GEOCODE_TIMEOUT,
            addressdetails=True,
            country_codes="us",  # Limit to USA
        )

        if location is None:
            logger.warning(f"No geocoding result for: {address_string}")
            return GeocodeResult(
                success=False,
                error_message="No location found for address"
            )

        # Extract country code from raw response if available. A null
        # country_code value is tolerated instead of crashing on .upper().
        country_code = None
        raw = getattr(location, "raw", None)
        if raw and "address" in raw:
            country_code = (raw["address"].get("country_code") or "").upper()

        logger.info(
            f"Geocoding successful: lat={location.latitude}, lon={location.longitude}",
            extra={
                "latitude": location.latitude,
                "longitude": location.longitude,
                "raw_address": location.address,
                "country_code": country_code,
            }
        )

        return GeocodeResult(
            success=True,
            latitude=str(location.latitude),
            longitude=str(location.longitude),
            raw_address=location.address,
            country_code=country_code,
        )

    # NOTE: GeocoderTimedOut and GeocoderUnavailable both derive from
    # GeocoderServiceError, so the specific handlers must come first or
    # they would never run.
    except GeocoderTimedOut as e:
        logger.error(f"Geocoding timeout: {e}")
        return GeocodeResult(
            success=False,
            error_message=f"Geocoding timed out after {GEOCODE_TIMEOUT}s"
        )

    except GeocoderUnavailable as e:
        logger.error(f"Geocoder unavailable: {e}")
        return GeocodeResult(
            success=False,
            error_message=f"Geocoder unavailable: {str(e)}"
        )

    except GeocoderServiceError as e:
        logger.error(f"Geocoder service error: {e}")
        return GeocodeResult(
            success=False,
            error_message=f"Geocoder service error: {str(e)}"
        )

    except Exception as e:
        # Boundary handler: the tool contract is to report, not raise.
        logger.error(f"Unexpected geocoding error: {e}", exc_info=True)
        return GeocodeResult(
            success=False,
            error_message=f"Unexpected error: {str(e)}"
        )
|
|
|
|
|
|
def validate_geocode_result(result: GeocodeResult) -> Tuple[bool, str]:
    """
    TOOL: Validate quality of geocoding result.

    Checks:
    - Result was successful
    - Country is USA (if available; compared case-insensitively)
    - Coordinates are within reasonable US bounds

    Args:
        result: GeocodeResult to validate

    Returns:
        Tuple of (is_valid, reason_string)
    """
    if not result.success:
        return False, f"Geocoding failed: {result.error_message}"

    # Check country code if available. Normalise case and whitespace first:
    # Nominatim reports lowercase codes ("us"), so a result that was not
    # upper-cased upstream must not be rejected as foreign.
    country_code = str(result.country_code or "").strip().upper()
    if country_code and country_code != "US":
        logger.warning(f"Non-US country code: {result.country_code}")
        return False, f"Result is outside USA (country: {result.country_code})"

    # Basic bounds check for continental US + Alaska + Hawaii
    try:
        lat = float(result.latitude)
        lon = float(result.longitude)
    except (ValueError, TypeError) as e:
        # Covers missing (None) and non-numeric coordinate strings.
        return False, f"Invalid coordinates: {e}"

    # Rough US bounds (including Alaska and Hawaii).
    # NOTE: the western Aleutian Islands lie at positive longitudes and
    # therefore fall outside this box.
    if not (18.0 <= lat <= 72.0):
        return False, f"Latitude {lat} outside US bounds"
    if not (-180.0 <= lon <= -65.0):
        return False, f"Longitude {lon} outside US bounds"

    return True, "Valid US geocode result"
|
|
|
|
|
|
def update_record(
    session: Session,
    customer: CustomerCustomer,
    geocode_result: GeocodeResult,
    is_valid: bool
) -> bool:
    """
    TOOL: Update customer record with geocoding results.

    Sets latitude, longitude, correct_address flag, and verified_at timestamp
    on the customer object. On a failed or invalid result only
    correct_address and verified_at are written; existing coordinates are
    left untouched.

    The session argument is not used by this function: nothing is flushed
    or committed here, so persisting the change is left to the caller.

    Args:
        session: SQLAlchemy session (currently unused)
        customer: CustomerCustomer record to update
        geocode_result: Result from geocoding operation
        is_valid: Whether the geocode result passed validation

    Returns:
        True if the attributes were assigned, False if an exception occurred
    """
    # Local import keeps this block self-contained; the module only imports
    # `datetime` from the datetime package.
    from datetime import timezone

    try:
        # datetime.utcnow() is deprecated since Python 3.12. This yields the
        # same naive UTC timestamp, so the stored value format is unchanged.
        now = datetime.now(timezone.utc).replace(tzinfo=None)

        if is_valid and geocode_result.success:
            # Successful geocoding - update all fields
            customer.customer_latitude = geocode_result.latitude
            customer.customer_longitude = geocode_result.longitude
            customer.correct_address = True
            customer.verified_at = now

            logger.info(
                f"Updated record {customer.id}: lat={geocode_result.latitude}, "
                f"lon={geocode_result.longitude}, correct_address=True"
            )
        else:
            # Failed geocoding - mark as verified but not correct
            customer.correct_address = False
            customer.verified_at = now

            logger.info(
                f"Updated record {customer.id}: correct_address=False "
                f"(reason: {geocode_result.error_message or 'validation failed'})"
            )

        return True

    except Exception as e:
        # Boundary handler: report failure to the pipeline instead of raising.
        logger.error(f"Failed to update record {customer.id}: {e}", exc_info=True)
        return False
|
|
|
|
|
|
def rate_limit_sleep() -> float:
    """
    Pause for a randomised interval between geocoding requests.

    Nominatim allows at most 1 request per second, so the pause length is
    drawn uniformly from [MIN_SLEEP_SECONDS, MAX_SLEEP_SECONDS] as
    configured in app.config.

    Returns:
        Actual sleep duration in seconds
    """
    pause = random.uniform(MIN_SLEEP_SECONDS, MAX_SLEEP_SECONDS)
    logger.debug(f"Rate limiting: sleeping {pause:.2f}s")
    time.sleep(pause)
    return pause
|