"""
eamco_address_checker - FastAPI Address Verification Microservice.

This microservice provides a batch job endpoint for verifying customer
addresses using geocoding. Designed to be triggered via cron from Unraid.

Endpoints:
    GET  /health                  - Health check with database connectivity status
    POST /verify-addresses        - Trigger batch address verification
    POST /reset-verifications     - Clear all verification data for re-checking
    POST /streets/{town}/{state}  - Fetch and store streets from OSM for a town
    GET  /streets/{town}/{state}  - Get street count for a town
    GET  /towns/search            - Autocomplete town names from existing customer data
    GET  /streets/search          - Autocomplete street names (with zip codes) within a town

Usage:
    # Development
    uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

    # Production (Docker)
    docker run -p 8000:8000 eamco_address_checker

    # Trigger from cron
    curl -X POST http://localhost:8000/verify-addresses
"""

import logging
import os
import sys
from contextlib import contextmanager
from typing import Generator, List, Optional

from fastapi import FastAPI, Depends, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.exc import SQLAlchemyError

from app.config import (
    DATABASE_URL,
    CORS_ORIGINS,
    LOG_LEVEL,
    LOG_FORMAT,
    BATCH_SIZE,
    COMMIT_BATCH_SIZE,
    STATE_MAPPING,
)
from app.agent import AddressVerificationAgent
from app.models import CustomerCustomer, StreetReference, Base
from app.streets import (
    populate_streets_for_town,
    get_town_street_count,
)

# =============================================================================
# LOGGING CONFIGURATION
# =============================================================================

logging.basicConfig(
    # An unrecognised LOG_LEVEL name falls back to INFO instead of crashing.
    level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
    format=LOG_FORMAT,
    handlers=[
        # Log to stdout so container runtimes (Docker/Unraid) capture it.
        logging.StreamHandler(sys.stdout),
    ]
)
logger = logging.getLogger(__name__)

# =============================================================================
# DATABASE SETUP
# =============================================================================

# Create SQLAlchemy engine with connection pooling
engine = create_engine(
    DATABASE_URL,
    pool_pre_ping=True,  # Verify connections before use
    pool_size=5,
    max_overflow=10,
    echo=False,  # Set to True for SQL debugging
)

# Session factory
SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine,
)


def get_db() -> Generator[Session, None, None]:
    """
    FastAPI dependency that provides a database session.

    Yields a SQLAlchemy session and always closes it afterwards.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@contextmanager
def get_db_session() -> Generator[Session, None, None]:
    """
    Context manager for database sessions (non-dependency use).

    The session is closed when the ``with`` block exits.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


def check_db_connection() -> bool:
    """
    Test database connectivity by executing ``SELECT 1``.

    Returns:
        True if database is reachable, False otherwise (the error is logged).
    """
    try:
        with get_db_session() as db:
            db.execute(text("SELECT 1"))
        return True
    except SQLAlchemyError as e:
        logger.error(f"Database connection failed: {e}")
        return False


# =============================================================================
# FASTAPI APPLICATION
# =============================================================================

app = FastAPI(
    title="eamco_address_checker",
    description="Address verification microservice using Nominatim geocoding",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)

# =============================================================================
# CORS MIDDLEWARE
# =============================================================================

app.add_middleware(
    CORSMiddleware,
    allow_origins=CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# =============================================================================
# PYDANTIC MODELS (Response Schemas)
# =============================================================================

class HealthResponse(BaseModel):
    """Health check response schema."""
    status: str
    db_connected: bool


class VerificationResponse(BaseModel):
    """Address verification batch response schema."""
    status: str
    message: str
    total_queried: int
    processed: int
    updated: int
    corrected: int
    failed: int
    skipped: int
    rate_limited: int
    duration_seconds: float
    success_rate: float
    errors_count: int
    sample_errors: list
    sample_corrections: list


class ResetResponse(BaseModel):
    """Reset verifications response schema."""
    status: str
    message: str
    records_reset: int


class StreetPopulateResponse(BaseModel):
    """Response for street population endpoint."""
    status: str
    message: str
    town: str
    state: str
    streets_added: int
    streets_updated: int
    total_found: int
    errors: list


class StreetInfoResponse(BaseModel):
    """Response for street info endpoint."""
    town: str
    state: str
    street_count: int
    message: str


class TownSuggestion(BaseModel):
    """Single town suggestion."""
    town: str
    state: str
    state_id: int
    customer_count: int


class TownSearchResponse(BaseModel):
    """Response for town search endpoint."""
    ok: bool
    suggestions: List[TownSuggestion]
    query: str


class StreetSuggestion(BaseModel):
    """Single street suggestion."""
    street_name: str
    full_address: str
    zip: str


class StreetSearchResponse(BaseModel):
    """Response for street search endpoint."""
    ok: bool
    suggestions: List[StreetSuggestion]
    town: str
    state: str
    query: str


# =============================================================================
# ENDPOINTS
# =============================================================================
# NOTE: every endpoint that touches the database or an external service is a
# plain ``def`` rather than ``async def``. They only make synchronous calls
# (SQLAlchemy sessions, AddressVerificationAgent.run, populate_streets_for_town),
# and FastAPI runs ``def`` endpoints in its worker threadpool. Declared as
# ``async def`` they would block the event loop, e.g. a multi-minute
# /verify-addresses batch would stall /health and every other request.

@app.get("/", include_in_schema=False)
async def root():
    """Root endpoint - return service name, version and the docs path."""
    return {
        "service": "eamco_address_checker",
        "version": "1.0.0",
        "docs": "/docs",
    }


@app.get("/health", response_model=HealthResponse, tags=["Health"])
def health_check():
    """
    Health check endpoint.

    Returns service status and database connectivity.
    Use this endpoint for container health checks and monitoring.

    Returns:
        HealthResponse with status ("healthy" or "degraded") and db_connected flag
    """
    db_connected = check_db_connection()
    return HealthResponse(
        status="healthy" if db_connected else "degraded",
        db_connected=db_connected,
    )


@app.post(
    "/verify-addresses",
    response_model=VerificationResponse,
    tags=["Verification"],
)
def verify_addresses(db: Session = Depends(get_db)):
    """
    Trigger batch address verification.

    This endpoint runs a synchronous batch job that:
    1. Queries records needing verification (max BATCH_SIZE)
    2. Geocodes each address using Nominatim
    3. Updates records with lat/long and verification status
    4. Returns statistics about the batch run

    The batch respects Nominatim rate limits (1 req/sec) so execution
    time is approximately BATCH_SIZE * 1.5 seconds. The endpoint runs in
    FastAPI's threadpool, so other requests are still served meanwhile.

    Use this endpoint from Unraid cron:
        curl -X POST http://localhost:8000/verify-addresses

    Returns:
        VerificationResponse with batch statistics

    Raises:
        HTTPException(500): on database or any other failure; any
            uncommitted work in the session is rolled back first.
    """
    logger.info("=" * 60)
    logger.info("VERIFY-ADDRESSES ENDPOINT CALLED")
    logger.info("=" * 60)
    logger.info(f"Configuration: BATCH_SIZE={BATCH_SIZE}, COMMIT_SIZE={COMMIT_BATCH_SIZE}")

    try:
        # Initialize and run the agent
        agent = AddressVerificationAgent(
            session=db,
            batch_size=BATCH_SIZE,
            commit_size=COMMIT_BATCH_SIZE,
        )

        result = agent.run()

        logger.info(f"Batch complete: {result.get('message', 'No message')}")

        return VerificationResponse(
            status=result.get("status", "unknown"),
            message=result.get("message", ""),
            total_queried=result.get("total_queried", 0),
            processed=result.get("processed", 0),
            updated=result.get("updated", 0),
            corrected=result.get("corrected", 0),
            failed=result.get("failed", 0),
            skipped=result.get("skipped", 0),
            rate_limited=result.get("rate_limited", 0),
            duration_seconds=result.get("duration_seconds", 0.0),
            success_rate=result.get("success_rate", 0.0),
            errors_count=result.get("errors_count", 0),
            sample_errors=result.get("sample_errors", []),
            sample_corrections=result.get("sample_corrections", []),
        )

    except SQLAlchemyError as e:
        # Discard the partially-applied batch (anything not yet committed).
        db.rollback()
        logger.error(f"Database error during verification: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Database error: {str(e)}"
        ) from e
    except Exception as e:
        db.rollback()
        logger.error(f"Unexpected error during verification: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Verification failed: {str(e)}"
        ) from e


@app.post(
    "/reset-verifications",
    response_model=ResetResponse,
    tags=["Verification"],
)
def reset_verifications(db: Session = Depends(get_db)):
    """
    Reset all address verifications for re-checking.

    This endpoint clears verification data for ALL customer records:
    - Sets correct_address = FALSE
    - Sets verified_at = NULL
    - Clears customer_latitude and customer_longitude

    After calling this endpoint, all addresses will be eligible for
    re-verification on the next /verify-addresses call.

    WARNING: This is a mass update operation. Use with caution.

    Returns:
        ResetResponse with count of records reset

    Raises:
        HTTPException(500): on failure, after rolling the update back.
    """
    logger.info("=" * 60)
    logger.info("RESET-VERIFICATIONS ENDPOINT CALLED")
    logger.info("=" * 60)

    try:
        # Count records before update (for the log only)
        total_records = db.query(CustomerCustomer).count()
        logger.info(f"Total customer records: {total_records}")

        # Mass update to reset all verification data
        updated_count = db.query(CustomerCustomer).update(
            {
                CustomerCustomer.correct_address: False,
                CustomerCustomer.verified_at: None,
                CustomerCustomer.customer_latitude: None,
                CustomerCustomer.customer_longitude: None,
            },
            synchronize_session=False
        )

        db.commit()

        logger.info(f"Reset {updated_count} records successfully")

        return ResetResponse(
            status="success",
            message=f"Reset {updated_count} address verifications. All addresses are now eligible for re-verification.",
            records_reset=updated_count,
        )

    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error during reset: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Database error: {str(e)}"
        ) from e
    except Exception as e:
        db.rollback()
        logger.error(f"Unexpected error during reset: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Reset failed: {str(e)}"
        ) from e


# =============================================================================
# STREET REFERENCE ENDPOINTS
# =============================================================================

@app.post(
    "/streets/{town}/{state}",
    response_model=StreetPopulateResponse,
    tags=["Streets"],
)
def populate_streets(
    town: str,
    state: str,
    clear_existing: bool = False,
    db: Session = Depends(get_db)
):
    """
    Fetch and store all streets for a town from OpenStreetMap.

    This endpoint queries the OSM Overpass API to get all named streets
    in the specified town and stores them in the street_reference table
    for fuzzy matching during address verification.

    Args:
        town: Town/city name (e.g., "Boston")
        state: 2-letter state abbreviation (e.g., "MA")
        clear_existing: If true, delete existing streets for this town first

    Example:
        curl -X POST http://localhost:8000/streets/Boston/MA

    Returns:
        StreetPopulateResponse with count of streets added; status is
        "partial" when the populate result reports failure.

    Raises:
        HTTPException(400): state is not a 2-letter alphabetic abbreviation.
        HTTPException(500): on failure, after rolling back the session.
    """
    logger.info("=" * 60)
    logger.info(f"POPULATE STREETS: {town}, {state}")
    logger.info("=" * 60)

    # Validate state abbreviation (2 letters)
    if len(state) != 2 or not state.isalpha():
        raise HTTPException(
            status_code=400,
            detail="State must be a 2-letter abbreviation (e.g., MA, NY, CA)"
        )

    state_abbr = state.upper()

    try:
        # Ensure the street_reference table exists
        Base.metadata.create_all(bind=engine, tables=[StreetReference.__table__])

        result = populate_streets_for_town(
            session=db,
            town=town,
            state=state_abbr,
            clear_existing=clear_existing,
        )

        return StreetPopulateResponse(
            status="success" if result.success else "partial",
            message=result.message,
            town=town,
            state=state_abbr,
            streets_added=result.streets_added,
            streets_updated=result.streets_updated,
            total_found=result.total_found,
            errors=result.errors,
        )

    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error populating streets: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Database error: {str(e)}"
        ) from e
    except Exception as e:
        # Non-DB failures (e.g. from the OSM fetch) can happen after rows were
        # deleted/added in this session; roll those back too.
        db.rollback()
        logger.error(f"Error populating streets: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to populate streets: {str(e)}"
        ) from e


@app.get(
    "/streets/{town}/{state}",
    response_model=StreetInfoResponse,
    tags=["Streets"],
)
def get_street_info(
    town: str,
    state: str,
    db: Session = Depends(get_db)
):
    """
    Get information about streets stored for a town.

    Returns the count of streets in the reference table for the
    specified town/state combination.

    Args:
        town: Town/city name
        state: 2-letter state abbreviation

    Example:
        curl http://localhost:8000/streets/Boston/MA

    Returns:
        StreetInfoResponse with street count

    Raises:
        HTTPException(400): state is not a 2-letter alphabetic abbreviation.
    """
    # Validate state abbreviation
    if len(state) != 2 or not state.isalpha():
        raise HTTPException(
            status_code=400,
            detail="State must be a 2-letter abbreviation"
        )

    state_abbr = state.upper()
    count = get_town_street_count(db, town, state_abbr)

    # Use the same upper-cased state in the message as in the response field.
    if count == 0:
        message = f"No streets found for {town}, {state_abbr}. Use POST to populate."
    else:
        message = f"Found {count} streets for {town}, {state_abbr}"

    return StreetInfoResponse(
        town=town,
        state=state_abbr,
        street_count=count,
        message=message,
    )


# =============================================================================
# ADDRESS AUTOCOMPLETE ENDPOINTS
# =============================================================================

@app.get(
    "/towns/search",
    response_model=TownSearchResponse,
    tags=["Autocomplete"],
)
def search_towns(
    q: str = Query(..., min_length=2, description="Search query for town name"),
    limit: int = Query(10, ge=1, le=50, description="Maximum results to return"),
    db: Session = Depends(get_db)
):
    """
    Search for towns based on existing customer data.

    This endpoint searches the customer database for unique town/state
    combinations whose town contains the query (case-insensitive).
    Returns towns sorted by customer count, highest first.

    Args:
        q: Partial town name to search for (min 2 characters)
        limit: Maximum number of suggestions to return (default 10)

    Example:
        GET /towns/search?q=spring

    Returns:
        TownSearchResponse with list of matching towns
    """
    from sqlalchemy import func

    query_lower = q.lower().strip()

    # Group on the trimmed town name so "Auburn" and "Auburn " collapse into
    # a single suggestion instead of two identical-looking entries.
    town_trimmed = func.trim(CustomerCustomer.customer_town)
    customer_count = func.count(CustomerCustomer.id)

    results = (
        db.query(
            town_trimmed.label('customer_town'),
            CustomerCustomer.customer_state,
            customer_count.label('customer_count'),
        )
        .filter(CustomerCustomer.customer_town.isnot(None))
        .filter(town_trimmed != '')
        # autoescape: treat '%' / '_' typed by the user literally, not as
        # LIKE wildcards.
        .filter(func.lower(town_trimmed).contains(query_lower, autoescape=True))
        .group_by(town_trimmed, CustomerCustomer.customer_state)
        .order_by(customer_count.desc())
        .limit(limit)
        .all()
    )

    suggestions = []
    for town, state_id, count in results:
        if not town:
            continue
        # state_id 0 / None has no mapping; fall back to "MA" below.
        state_abbr = STATE_MAPPING.get(state_id, "") if state_id else ""
        suggestions.append(TownSuggestion(
            town=town.strip(),
            state=state_abbr or "MA",
            state_id=state_id or 0,
            customer_count=count,
        ))

    return TownSearchResponse(
        ok=True,
        suggestions=suggestions,
        query=q
    )


# Street-type suffixes removed from a street name to get its "base" (e.g.
# "Main Street" -> "main") when matching against free-text customer addresses.
# Checked in order; only the first matching suffix is removed.
_STREET_SUFFIXES = (
    ' street', ' st', ' avenue', ' ave', ' road', ' rd', ' drive', ' dr',
    ' lane', ' ln', ' court', ' ct', ' circle', ' cir', ' boulevard', ' blvd',
    ' way', ' place', ' pl', ' terrace', ' ter',
)


def _street_base_name(street_name: str) -> str:
    """Return street_name lower-cased with its first matching suffix removed."""
    base = street_name.lower()
    for suffix in _STREET_SUFFIXES:
        if base.endswith(suffix):
            return base[:-len(suffix)]
    return base


def _rank_street_names(street_names: List[str], street_query: str) -> List[str]:
    """
    Order street names best-match first.

    Ranking tiers: exact match > starts with query > merely contains query.
    Within a tier, names are ordered by rapidfuzz ``fuzz.ratio`` (descending);
    equal keys keep their input order.
    """
    from rapidfuzz import fuzz

    def rank_key(name: str):
        lowered = name.lower()
        if lowered == street_query:
            tier = 2
        elif lowered.startswith(street_query):
            tier = 1
        else:
            tier = 0
        # A tuple key keeps tiers strictly separated; a summed score would let
        # a long starts-with match outrank an exact one.
        return (tier, fuzz.ratio(street_query, lowered))

    return sorted(street_names, key=rank_key, reverse=True)


def _lookup_street_zip(db: Session, town_normalized: str, state_ids: List[int], street_name: str) -> str:
    """Return a zip code from any customer on street_name in the town, or ""."""
    from sqlalchemy import func

    street_base = _street_base_name(street_name)
    row = (
        db.query(CustomerCustomer.customer_zip)
        .filter(func.lower(func.trim(CustomerCustomer.customer_town)) == town_normalized)
        .filter(CustomerCustomer.customer_state.in_(state_ids))
        .filter(func.lower(CustomerCustomer.customer_address).contains(street_base, autoescape=True))
        .filter(CustomerCustomer.customer_zip.isnot(None))
        .filter(CustomerCustomer.customer_zip != '')
        .first()
    )
    return row[0] if row else ""


@app.get(
    "/streets/search",
    response_model=StreetSearchResponse,
    tags=["Autocomplete"],
)
def search_streets(
    town: str = Query(..., min_length=1, description="Town name to search within"),
    state: str = Query(..., min_length=2, max_length=2, description="2-letter state abbreviation"),
    q: str = Query(..., min_length=1, description="Partial street name to search"),
    limit: int = Query(10, ge=1, le=50, description="Maximum results to return"),
    db: Session = Depends(get_db)
):
    """
    Search for streets within a specific town using fuzzy matching.

    This endpoint searches the StreetReference table for streets containing
    the partial query within the specified town/state, ranks them
    (exact > starts-with > contains, then fuzzy ratio), and then looks up
    zip codes from existing customer data. If no reference street matches,
    it falls back to matching existing customer addresses directly.

    Args:
        town: Town/city name to search within
        state: 2-letter state abbreviation (e.g., "MA", "NY")
        q: Partial street name to search for; a leading house number
           (e.g. "32 Roch") is kept and prefixed to each suggestion
        limit: Maximum number of suggestions to return (default 10)

    Example:
        GET /streets/search?town=Auburn&state=MA&q=main

    Returns:
        StreetSearchResponse with list of matching streets and zip codes
    """
    import re

    from sqlalchemy import func

    state = state.upper()
    town_normalized = town.lower().strip()
    query_raw = q.strip()
    query_lower = query_raw.lower()

    # Customer rows store a numeric state id; 0 is included for unmapped rows.
    state_ids = [0] + [sid for sid, abbr in STATE_MAPPING.items() if abbr == state]

    # Split off a leading house number (e.g., "32 Roch" or "32B Rochdale").
    house_number = ""
    street_query = query_lower
    house_match = re.match(r'^(\d+[A-Za-z]?)\s+(.+)$', query_raw)
    if house_match:
        house_number = house_match.group(1)
        street_query = house_match.group(2).lower()

    # Fetch more candidates than needed so ranking has something to choose from.
    matching_street_refs = (
        db.query(StreetReference)
        .filter(StreetReference.town_normalized == town_normalized)
        .filter(StreetReference.state == state)
        .filter(func.lower(StreetReference.street_name).contains(street_query, autoescape=True))
        .limit(limit * 3)
        .all()
    )

    ranked_names = _rank_street_names(
        [street.street_name for street in matching_street_refs], street_query
    )

    suggestions = []
    for street_name in ranked_names[:limit]:
        zip_code = _lookup_street_zip(db, town_normalized, state_ids, street_name)
        display_street = f"{house_number} {street_name}" if house_number else street_name
        suggestions.append(StreetSuggestion(
            street_name=display_street,
            full_address=f"{display_street}, {town}, {state}",
            zip=zip_code
        ))

    if not suggestions:
        # Fallback: search existing customer addresses directly, using the
        # full query (including any house number).
        customer_addresses = (
            db.query(
                CustomerCustomer.customer_address,
                CustomerCustomer.customer_zip,
                func.count(CustomerCustomer.id).label('count')
            )
            .filter(func.lower(func.trim(CustomerCustomer.customer_town)) == town_normalized)
            .filter(CustomerCustomer.customer_state.in_(state_ids))
            .filter(func.lower(CustomerCustomer.customer_address).contains(query_lower, autoescape=True))
            .filter(CustomerCustomer.customer_address.isnot(None))
            .filter(CustomerCustomer.customer_address != '')
            .group_by(CustomerCustomer.customer_address, CustomerCustomer.customer_zip)
            .order_by(func.count(CustomerCustomer.id).desc())
            .limit(limit)
            .all()
        )

        for address, zip_code, _ in customer_addresses:
            if address:
                suggestions.append(StreetSuggestion(
                    street_name=address,
                    full_address=f"{address}, {town}, {state}",
                    zip=zip_code or ""
                ))

    return StreetSearchResponse(
        ok=True,
        suggestions=suggestions,
        town=town,
        state=state,
        query=q
    )


# =============================================================================
# STARTUP/SHUTDOWN EVENTS
# =============================================================================

@app.on_event("startup")
async def startup_event():
    """Application startup - log configuration and test DB connection."""
    logger.info("🚀 eamco_address_checker STARTING")
    mode = os.environ.get('MODE', 'DEVELOPMENT').upper()
    if mode in ['DEVELOPMENT', 'DEV']:
        logger.info("🤖🤖🤖🤖🤖 Mode: Development 🤖🤖🤖🤖🤖")
    elif mode in ['PRODUCTION', 'PROD']:
        logger.info("💀💀💀💀💀💀💀💀💀💀 ⚠️ WARNING PRODUCTION 💀💀💀💀💀💀💀💀💀💀")
    # repr() of the engine URL masks the password. A raw prefix of
    # DATABASE_URL ("scheme://user:password@...") would leak credentials.
    logger.info(f"DB: {engine.url!r}")
    logger.info(f"CORS: {len(CORS_ORIGINS)} origins configured")
    logger.info(f"Batch: {BATCH_SIZE} | Commit: {COMMIT_BATCH_SIZE}")

    # Test database connection
    if check_db_connection():
        logger.info("DB Connection: ✅ OK")
    else:
        logger.error("DB Connection: ❌ FAILED")


@app.on_event("shutdown")
async def shutdown_event():
    """Application shutdown - dispose of the engine's connection pool."""
    logger.info("eamco_address_checker SHUTTING DOWN")
    engine.dispose()
    logger.info("Database connections closed")


# =============================================================================
# MAIN ENTRY POINT (for direct execution)
# =============================================================================

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "app.main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_level="info",
    )