Files
eamco_address_checker/app/main.py
2026-01-18 17:53:26 -05:00

559 lines
16 KiB
Python

"""
eamco_address_checker - FastAPI Address Verification Microservice.

This microservice provides a batch job endpoint for verifying customer
addresses using geocoding. It is designed to be triggered via cron from Unraid.

Endpoints:
    GET  /health                  - Health check with database connectivity status
    POST /verify-addresses        - Trigger batch address verification
    POST /reset-verifications     - Clear all verification data for re-checking
    POST /streets/{town}/{state}  - Fetch and store streets from OSM for a town
    GET  /streets/{town}/{state}  - Get street count for a town

Usage:
    # Development
    uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

    # Production (Docker)
    docker run -p 8000:8000 eamco_address_checker

    # Trigger from cron
    curl -X POST http://localhost:8000/verify-addresses
"""
import logging
import sys
from contextlib import contextmanager
from typing import Generator
from fastapi import FastAPI, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.exc import SQLAlchemyError
from app.config import (
DATABASE_URL,
CORS_ORIGINS,
LOG_LEVEL,
LOG_FORMAT,
BATCH_SIZE,
COMMIT_BATCH_SIZE,
)
from app.agent import AddressVerificationAgent
from app.models import CustomerCustomer, StreetReference, Base
from app.streets import (
populate_streets_for_town,
get_town_street_count,
)
# =============================================================================
# LOGGING CONFIGURATION
# =============================================================================
# Route all log records to stdout. An unrecognised LOG_LEVEL name falls back
# to INFO because of the getattr default.
logging.basicConfig(
    format=LOG_FORMAT,
    level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
    handlers=[logging.StreamHandler(sys.stdout)],
)

# Module-level logger used by the endpoints and helpers in this file.
logger = logging.getLogger(__name__)
# =============================================================================
# DATABASE SETUP
# =============================================================================
# Pooled SQLAlchemy engine shared by every session created in this module.
engine = create_engine(
    DATABASE_URL,
    echo=False,  # switch to True to log emitted SQL while debugging
    pool_pre_ping=True,  # test a pooled connection before handing it out
    pool_size=5,
    max_overflow=10,
)

# Session factory bound to the engine above, with autocommit and autoflush
# both switched off.
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
def get_db() -> Generator[Session, None, None]:
    """
    Provide a database session for a single request.

    Meant to be used through ``Depends``: a session is created from
    ``SessionLocal``, handed to the consumer, and closed afterwards.

    Yields:
        A SQLAlchemy session created by ``SessionLocal``.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs whether or not the consumer raised, so the session never leaks.
        session.close()
@contextmanager
def get_db_session() -> Generator[Session, None, None]:
    """
    Open a database session for use in a ``with`` statement.

    This is the counterpart of ``get_db`` for code that runs outside of
    FastAPI's dependency injection.

    Yields:
        A SQLAlchemy session created by ``SessionLocal``; it is closed when
        the ``with`` block exits, including on error.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
def check_db_connection() -> bool:
    """
    Probe the database with a trivial query.

    Opens a short-lived session and executes ``SELECT 1``. Any
    ``SQLAlchemyError`` raised along the way is logged and reported as a
    failed check instead of being propagated.

    Returns:
        True if the probe query ran, False if a SQLAlchemyError occurred.
    """
    try:
        with get_db_session() as session:
            session.execute(text("SELECT 1"))
    except SQLAlchemyError as exc:
        logger.error(f"Database connection failed: {exc}")
        return False
    return True
# =============================================================================
# FASTAPI APPLICATION
# =============================================================================
# The ASGI application object; Swagger UI is served at /docs and ReDoc at
# /redoc.
app = FastAPI(
    title="eamco_address_checker",
    version="1.0.0",
    description="Address verification microservice using Nominatim geocoding",
    docs_url="/docs",
    redoc_url="/redoc",
)
# =============================================================================
# CORS MIDDLEWARE
# =============================================================================
# Allow cross-origin requests from the configured origins, with credentials,
# for every HTTP method and request header.
app.add_middleware(
    CORSMiddleware,
    allow_origins=CORS_ORIGINS,
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
# =============================================================================
# PYDANTIC MODELS (Response Schemas)
# =============================================================================
class HealthResponse(BaseModel):
    """Payload returned by ``GET /health``."""

    # "healthy" when the database probe succeeded, otherwise "degraded".
    status: str
    # Result of check_db_connection().
    db_connected: bool
class VerificationResponse(BaseModel):
    """
    Payload returned by ``POST /verify-addresses``.

    Every field is copied from the dict returned by
    ``AddressVerificationAgent.run()``; the endpoint substitutes a default
    (``"unknown"``, an empty string, zero or an empty list) for any key the
    agent leaves out.
    """

    # Overall outcome and human-readable summary reported by the agent.
    status: str
    message: str

    # Per-batch counters reported by the agent.
    total_queried: int
    processed: int
    updated: int
    corrected: int
    failed: int
    skipped: int
    rate_limited: int

    # Timing and aggregate figures reported by the agent.
    duration_seconds: float
    success_rate: float
    errors_count: int

    # Samples reported by the agent; element types are not constrained here.
    sample_errors: list
    sample_corrections: list
class ResetResponse(BaseModel):
    """Payload returned by ``POST /reset-verifications``."""

    status: str
    message: str
    # Row count returned by the bulk UPDATE issued by the endpoint.
    records_reset: int
class StreetPopulateResponse(BaseModel):
    """Payload returned by ``POST /streets/{town}/{state}``."""

    # "success" when the populate result reports success, otherwise "partial".
    status: str
    message: str

    # Town as given in the path; state is upper-cased by the endpoint.
    town: str
    state: str

    # Counters and error list copied from the populate_streets_for_town result.
    streets_added: int
    streets_updated: int
    total_found: int
    errors: list
class StreetInfoResponse(BaseModel):
    """Payload returned by ``GET /streets/{town}/{state}``."""

    # Town as given in the path; state is upper-cased by the endpoint.
    town: str
    state: str
    # Value returned by get_town_street_count() for this town/state.
    street_count: int
    message: str
# =============================================================================
# ENDPOINTS
# =============================================================================
@app.get("/", include_in_schema=False)
async def root():
    """Return the service name, version and the path of the API docs."""
    return dict(
        service="eamco_address_checker",
        version="1.0.0",
        docs="/docs",
    )
@app.get("/health", response_model=HealthResponse, tags=["Health"])
async def health_check():
    """
    Report service health.

    Probes the database and reflects the outcome in the response body, so
    the endpoint can be polled by container health checks and monitoring.

    Returns:
        HealthResponse whose status is "healthy" when the database probe
        succeeded and "degraded" otherwise, plus the raw db_connected flag.
    """
    is_reachable = check_db_connection()
    if is_reachable:
        label = "healthy"
    else:
        label = "degraded"
    return HealthResponse(status=label, db_connected=is_reachable)
@app.post(
    "/verify-addresses",
    response_model=VerificationResponse,
    tags=["Verification"],
)
async def verify_addresses(db: Session = Depends(get_db)):
    """
    Trigger batch address verification.

    Builds an ``AddressVerificationAgent`` for the request's session and runs
    it once. According to the service design the agent:
        1. Queries records needing verification (max BATCH_SIZE)
        2. Geocodes each address using Nominatim
        3. Updates records with lat/long and verification status
        4. Returns statistics about the batch run

    The batch respects Nominatim rate limits (1 req/sec), so execution time
    is approximately BATCH_SIZE * 1.5 seconds. Because ``agent.run()`` is a
    synchronous call, it is executed in a worker thread; the event loop stays
    free to answer other requests (for example ``/health``) while the batch
    is in progress.

    Use this endpoint from Unraid cron:
        curl -X POST http://localhost:8000/verify-addresses

    Args:
        db: Request-scoped SQLAlchemy session injected by ``get_db``.

    Returns:
        VerificationResponse with batch statistics. Keys missing from the
        agent's result fall back to neutral defaults.

    Raises:
        HTTPException: 500 if the agent raises; the session is rolled back
            first.
    """
    # Local import keeps this handler self-contained; fastapi is already a
    # dependency of this module.
    from fastapi.concurrency import run_in_threadpool

    logger.info("=" * 60)
    logger.info("VERIFY-ADDRESSES ENDPOINT CALLED")
    logger.info("=" * 60)
    logger.info(f"Configuration: BATCH_SIZE={BATCH_SIZE}, COMMIT_SIZE={COMMIT_BATCH_SIZE}")
    try:
        # Initialize and run the agent
        agent = AddressVerificationAgent(
            session=db,
            batch_size=BATCH_SIZE,
            commit_size=COMMIT_BATCH_SIZE,
        )
        # Calling agent.run() directly inside this coroutine would block the
        # event loop for the whole batch, so hand it to the thread pool.
        result = await run_in_threadpool(agent.run)
        logger.info(f"Batch complete: {result.get('message', 'No message')}")
        return VerificationResponse(
            status=result.get("status", "unknown"),
            message=result.get("message", ""),
            total_queried=result.get("total_queried", 0),
            processed=result.get("processed", 0),
            updated=result.get("updated", 0),
            corrected=result.get("corrected", 0),
            failed=result.get("failed", 0),
            skipped=result.get("skipped", 0),
            rate_limited=result.get("rate_limited", 0),
            duration_seconds=result.get("duration_seconds", 0.0),
            success_rate=result.get("success_rate", 0.0),
            errors_count=result.get("errors_count", 0),
            sample_errors=result.get("sample_errors", []),
            sample_corrections=result.get("sample_corrections", []),
        )
    except SQLAlchemyError as e:
        logger.error(f"Database error during verification: {e}", exc_info=True)
        # Discard any uncommitted work, matching the other write endpoints.
        db.rollback()
        raise HTTPException(
            status_code=500,
            detail=f"Database error: {str(e)}"
        ) from e
    except Exception as e:
        logger.error(f"Unexpected error during verification: {e}", exc_info=True)
        db.rollback()
        raise HTTPException(
            status_code=500,
            detail=f"Verification failed: {str(e)}"
        ) from e
@app.post(
    "/reset-verifications",
    response_model=ResetResponse,
    tags=["Verification"],
)
async def reset_verifications(db: Session = Depends(get_db)):
    """
    Wipe the verification state of every customer record.

    A single bulk UPDATE is issued against ALL customer rows, setting:
        - correct_address to FALSE
        - verified_at to NULL
        - customer_latitude and customer_longitude to NULL

    After this call every address is eligible for re-verification on the
    next /verify-addresses run.

    WARNING: This is a mass update operation. Use with caution.

    Args:
        db: Request-scoped SQLAlchemy session injected by ``get_db``.

    Returns:
        ResetResponse carrying the number of rows the UPDATE reported.

    Raises:
        HTTPException: 500 on any failure; the session is rolled back first.
    """
    banner = "=" * 60
    logger.info(banner)
    logger.info("RESET-VERIFICATIONS ENDPOINT CALLED")
    logger.info(banner)

    try:
        # Logged for reference only; the UPDATE below reports its own count.
        customer_total = db.query(CustomerCustomer).count()
        logger.info(f"Total customer records: {customer_total}")

        # Column values that return a record to its unverified state.
        cleared_columns = {
            CustomerCustomer.correct_address: False,
            CustomerCustomer.verified_at: None,
            CustomerCustomer.customer_latitude: None,
            CustomerCustomer.customer_longitude: None,
        }
        rows_reset = db.query(CustomerCustomer).update(
            cleared_columns,
            synchronize_session=False,
        )
        db.commit()
        logger.info(f"Reset {rows_reset} records successfully")

        summary = (
            f"Reset {rows_reset} address verifications. "
            "All addresses are now eligible for re-verification."
        )
        return ResetResponse(
            status="success",
            message=summary,
            records_reset=rows_reset,
        )
    except SQLAlchemyError as db_error:
        db.rollback()
        logger.error(f"Database error during reset: {db_error}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Database error: {str(db_error)}",
        )
    except Exception as error:
        db.rollback()
        logger.error(f"Unexpected error during reset: {error}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Reset failed: {str(error)}",
        )
# =============================================================================
# STREET REFERENCE ENDPOINTS
# =============================================================================
@app.post(
    "/streets/{town}/{state}",
    response_model=StreetPopulateResponse,
    tags=["Streets"],
)
async def populate_streets(
    town: str,
    state: str,
    clear_existing: bool = False,
    db: Session = Depends(get_db)
):
    """
    Fetch and store all streets for a town from OpenStreetMap.

    This endpoint queries the OSM Overpass API (via
    ``populate_streets_for_town``) to get all named streets in the specified
    town and stores them in the street_reference table for fuzzy matching
    during address verification. The table is created first if it does not
    exist yet.

    Args:
        town: Town/city name (e.g., "Boston")
        state: 2-letter state abbreviation (e.g., "MA"); upper-cased before use
        clear_existing: If true, delete existing streets for this town first
        db: Request-scoped SQLAlchemy session injected by ``get_db``.

    Example:
        curl -X POST http://localhost:8000/streets/Boston/MA

    Returns:
        StreetPopulateResponse with count of streets added. ``status`` is
        "success" when the populate result reports success, else "partial".

    Raises:
        HTTPException: 400 if ``state`` is not exactly two letters; 500 on
            any failure while populating (the session is rolled back first).
    """
    logger.info("=" * 60)
    logger.info(f"POPULATE STREETS: {town}, {state}")
    logger.info("=" * 60)
    # Validate state abbreviation (2 letters)
    if len(state) != 2 or not state.isalpha():
        raise HTTPException(
            status_code=400,
            detail="State must be a 2-letter abbreviation (e.g., MA, NY, CA)"
        )
    # Normalise once so the lookup and the response always agree.
    state_code = state.upper()
    try:
        # Ensure the street_reference table exists
        Base.metadata.create_all(bind=engine, tables=[StreetReference.__table__])
        # NOTE(review): this call is synchronous and presumably performs
        # network I/O against Overpass; confirm whether it should be moved
        # off the event loop.
        result = populate_streets_for_town(
            session=db,
            town=town,
            state=state_code,
            clear_existing=clear_existing,
        )
        return StreetPopulateResponse(
            status="success" if result.success else "partial",
            message=result.message,
            town=town,
            state=state_code,
            streets_added=result.streets_added,
            streets_updated=result.streets_updated,
            total_found=result.total_found,
            errors=result.errors,
        )
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error populating streets: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Database error: {str(e)}"
        ) from e
    except Exception as e:
        # A non-database failure can still leave pending changes in the
        # session; discard them just like the database-error branch does.
        db.rollback()
        logger.error(f"Error populating streets: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to populate streets: {str(e)}"
        ) from e
@app.get(
    "/streets/{town}/{state}",
    response_model=StreetInfoResponse,
    tags=["Streets"],
)
async def get_street_info(
    town: str,
    state: str,
    db: Session = Depends(get_db)
):
    """
    Get information about streets stored for a town.

    Returns the count of streets in the reference table for the specified
    town/state combination. The state is upper-cased once and that same
    value is used for the lookup, the message and the response payload.

    Args:
        town: Town/city name
        state: 2-letter state abbreviation (case-insensitive)
        db: Request-scoped SQLAlchemy session injected by ``get_db``.

    Example:
        curl http://localhost:8000/streets/Boston/MA

    Returns:
        StreetInfoResponse with street count

    Raises:
        HTTPException: 400 if ``state`` is not exactly two letters.
    """
    # Validate state abbreviation
    if len(state) != 2 or not state.isalpha():
        raise HTTPException(
            status_code=400,
            detail="State must be a 2-letter abbreviation"
        )

    # Normalise once; previously the message echoed the raw input while the
    # payload carried the upper-cased value.
    state_code = state.upper()
    count = get_town_street_count(db, town, state_code)
    if count == 0:
        message = f"No streets found for {town}, {state_code}. Use POST to populate."
    else:
        message = f"Found {count} streets for {town}, {state_code}"
    return StreetInfoResponse(
        town=town,
        state=state_code,
        street_count=count,
        message=message,
    )
# =============================================================================
# STARTUP/SHUTDOWN EVENTS
# =============================================================================
@app.on_event("startup")
async def startup_event():
    """
    Application startup - log configuration and test DB connection.

    The database URL is logged with its password masked so that credentials
    never end up in log output. A failed connectivity probe is only logged
    as a warning; startup continues either way.
    """
    logger.info("*" * 60)
    logger.info("eamco_address_checker STARTING")
    logger.info("*" * 60)
    # Truncating the raw DATABASE_URL can still expose "user:password@", so
    # render the engine's parsed URL with the password hidden instead.
    safe_db_url = engine.url.render_as_string(hide_password=True)
    logger.info(f"Database URL: {safe_db_url}")
    logger.info(f"CORS Origins: {CORS_ORIGINS}")
    logger.info(f"Batch Size: {BATCH_SIZE}")
    logger.info(f"Commit Batch Size: {COMMIT_BATCH_SIZE}")
    # Test database connection
    if check_db_connection():
        logger.info("Database connection: OK")
    else:
        logger.warning("Database connection: FAILED - service may be degraded")
@app.on_event("shutdown")
async def shutdown_event():
    """Log the shutdown and dispose of the engine's connection pool."""
    logger.info("eamco_address_checker SHUTTING DOWN")
    # Dispose of the pool held by the module-level engine.
    engine.dispose()
    logger.info("Database connections closed")
# =============================================================================
# MAIN ENTRY POINT (for direct execution)
# =============================================================================
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces, port
    # 8000, with auto-reload enabled.
    import uvicorn

    uvicorn.run(
        "app.main:app",
        port=8000,
        host="0.0.0.0",
        log_level="info",
        reload=True,
    )