559 lines
16 KiB
Python
559 lines
16 KiB
Python
"""
eamco_address_checker - FastAPI Address Verification Microservice.

This microservice provides a batch job endpoint for verifying customer addresses
using geocoding. Designed to be triggered via cron from Unraid.

Endpoints:
    GET  /health                  - Health check with database connectivity status
    POST /verify-addresses        - Trigger batch address verification
    POST /reset-verifications     - Clear all verification data for re-checking
    POST /streets/{town}/{state}  - Fetch and store streets from OSM for a town
    GET  /streets/{town}/{state}  - Get street count for a town

Usage:
    # Development
    uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

    # Production (Docker)
    docker run -p 8000:8000 eamco_address_checker

    # Trigger from cron
    curl -X POST http://localhost:8000/verify-addresses
"""
|
|
|
|
import logging
|
|
import sys
|
|
from contextlib import contextmanager
|
|
from typing import Generator
|
|
|
|
from fastapi import FastAPI, Depends, HTTPException
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import create_engine, text
|
|
from sqlalchemy.orm import sessionmaker, Session
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
|
|
from app.config import (
|
|
DATABASE_URL,
|
|
CORS_ORIGINS,
|
|
LOG_LEVEL,
|
|
LOG_FORMAT,
|
|
BATCH_SIZE,
|
|
COMMIT_BATCH_SIZE,
|
|
)
|
|
from app.agent import AddressVerificationAgent
|
|
from app.models import CustomerCustomer, StreetReference, Base
|
|
from app.streets import (
|
|
populate_streets_for_town,
|
|
get_town_street_count,
|
|
)
|
|
|
|
# =============================================================================
|
|
# LOGGING CONFIGURATION
|
|
# =============================================================================
|
|
|
|
# Root logger writes to stdout so container runtimes (Docker/Unraid) capture it.
# Unknown LOG_LEVEL names fall back to INFO instead of raising.
logging.basicConfig(
    format=LOG_FORMAT,
    level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
    handlers=[logging.StreamHandler(sys.stdout)],
)

# Module-level logger used by every endpoint in this file.
logger = logging.getLogger(__name__)
|
|
|
|
# =============================================================================
|
|
# DATABASE SETUP
|
|
# =============================================================================
|
|
|
|
# Shared SQLAlchemy engine backed by a connection pool.
# pool_pre_ping tests each pooled connection before handing it out, so
# connections dropped by the server are replaced transparently.
engine = create_engine(
    DATABASE_URL,
    echo=False,  # flip to True to log every emitted SQL statement
    pool_pre_ping=True,
    pool_size=5,
    max_overflow=10,
)

# Session factory bound to the engine above. Transactions are not
# auto-committed and pending changes are not auto-flushed; callers commit
# explicitly (see reset_verifications).
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
|
|
|
|
|
|
def get_db() -> Generator[Session, None, None]:
    """
    FastAPI dependency that yields one SQLAlchemy session per request.

    The session is closed when FastAPI finalizes the dependency, whether
    the endpoint returned normally or raised.
    """
    db_session = SessionLocal()
    try:
        yield db_session
    finally:
        # Return the underlying connection to the pool no matter what.
        db_session.close()
|
|
|
|
|
|
@contextmanager
def get_db_session() -> Generator[Session, None, None]:
    """
    Open a standalone database session for use in a ``with`` statement.

    Meant for code paths that are not FastAPI dependencies (for example
    check_db_connection). The session is closed when the ``with`` block
    exits, normally or via an exception.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
|
|
|
|
|
|
def check_db_connection() -> bool:
    """
    Probe the database with a trivial ``SELECT 1``.

    Any SQLAlchemyError raised while opening, using, or closing the session
    is logged and reported as a failed probe rather than propagated.

    Returns:
        True when the probe query succeeds, False otherwise.
    """
    try:
        with get_db_session() as session:
            session.execute(text("SELECT 1"))
    except SQLAlchemyError as exc:
        logger.error(f"Database connection failed: {exc}")
        return False
    return True
|
|
|
|
|
|
# =============================================================================
|
|
# FASTAPI APPLICATION
|
|
# =============================================================================
|
|
|
|
# The ASGI application. Interactive API docs are served at /docs (Swagger UI)
# and /redoc (ReDoc).
app = FastAPI(
    title="eamco_address_checker",
    version="1.0.0",
    description="Address verification microservice using Nominatim geocoding",
    redoc_url="/redoc",
    docs_url="/docs",
)
|
|
|
|
# =============================================================================
|
|
# CORS MIDDLEWARE
|
|
# =============================================================================
|
|
|
|
# Allow browser clients from the configured origins to call every route with
# any method and header, including credentialed requests.
# NOTE(review): allow_credentials=True combined with a "*" entry in
# CORS_ORIGINS could let any site make credentialed requests; confirm the
# configured origin list is explicit.
app.add_middleware(
    CORSMiddleware,
    allow_origins=CORS_ORIGINS,
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
|
|
|
|
# =============================================================================
|
|
# PYDANTIC MODELS (Response Schemas)
|
|
# =============================================================================
|
|
|
|
|
|
class HealthResponse(BaseModel):
    """Health check response schema, returned by ``GET /health``."""

    # "healthy" when the database probe succeeds, "degraded" otherwise.
    status: str
    # Result of check_db_connection() (a SELECT 1 probe).
    db_connected: bool
|
|
|
|
|
|
class VerificationResponse(BaseModel):
    """
    Address verification batch response schema.

    Every field is copied from the dict returned by
    AddressVerificationAgent.run() in verify_addresses; keys missing from
    that dict default to "unknown" / "" / 0 / 0.0 / [] respectively.
    NOTE(review): the exact meaning of each counter is defined by the
    agent, not here; confirm against app.agent.
    """

    # Overall batch outcome as reported by the agent.
    status: str
    # Human-readable summary of the batch run.
    message: str
    # Batch counters reported by the agent.
    total_queried: int
    processed: int
    updated: int
    corrected: int
    failed: int
    skipped: int
    rate_limited: int
    # Timing and ratio figures reported by the agent.
    duration_seconds: float
    success_rate: float
    # Number of errors encountered, plus a sample of them.
    errors_count: int
    sample_errors: list
    # Sample of address corrections made during the batch.
    sample_corrections: list
|
|
|
|
|
|
class ResetResponse(BaseModel):
    """Reset verifications response schema, returned by ``POST /reset-verifications``."""

    # Always "success" when returned; failures raise HTTP 500 instead.
    status: str
    # Human-readable summary including the reset count.
    message: str
    # Row count reported by the mass UPDATE on customer records.
    records_reset: int
|
|
|
|
|
|
class StreetPopulateResponse(BaseModel):
    """Response for the street population endpoint (``POST /streets/{town}/{state}``)."""

    # "success" if the population result reported success, else "partial".
    status: str
    # Message passed through from populate_streets_for_town().
    message: str
    # Town as given in the request path.
    town: str
    # Upper-cased 2-letter state abbreviation.
    state: str
    # Counters passed through from populate_streets_for_town().
    streets_added: int
    streets_updated: int
    total_found: int
    # Non-fatal errors reported by populate_streets_for_town().
    errors: list
|
|
|
|
|
|
class StreetInfoResponse(BaseModel):
    """Response for the street info endpoint (``GET /streets/{town}/{state}``)."""

    # Town as given in the request path.
    town: str
    # Upper-cased 2-letter state abbreviation.
    state: str
    # Number of stored streets, from get_town_street_count().
    street_count: int
    # Human-readable summary; hints to POST when no streets are stored.
    message: str
|
|
|
|
|
|
# =============================================================================
|
|
# ENDPOINTS
|
|
# =============================================================================
|
|
|
|
|
|
@app.get("/", include_in_schema=False)
async def root():
    """
    Root endpoint: return basic service metadata.

    This does not redirect. It returns a small JSON payload that names the
    service and version and points at the interactive docs.
    """
    return dict(
        service="eamco_address_checker",
        version="1.0.0",
        docs="/docs",
    )
|
|
|
|
|
|
@app.get("/health", response_model=HealthResponse, tags=["Health"])
def health_check():
    """
    Health check endpoint.

    Returns service status and database connectivity.
    Use this endpoint for container health checks and monitoring.

    Declared as a plain ``def`` rather than ``async def`` on purpose: the
    database probe is a blocking call. FastAPI runs sync endpoints in its
    worker threadpool, so a slow or unreachable database (e.g. waiting on a
    connect timeout) no longer stalls the event loop for every other request.

    Returns:
        HealthResponse with status ("healthy" or "degraded") and the
        db_connected flag.
    """
    db_connected = check_db_connection()

    return HealthResponse(
        status="healthy" if db_connected else "degraded",
        db_connected=db_connected,
    )
|
|
|
|
|
|
@app.post(
    "/verify-addresses",
    response_model=VerificationResponse,
    tags=["Verification"],
)
def verify_addresses(db: Session = Depends(get_db)):
    """
    Trigger batch address verification.

    This endpoint runs a synchronous batch job that:
    1. Queries records needing verification (max BATCH_SIZE)
    2. Geocodes each address using Nominatim
    3. Updates records with lat/long and verification status
    4. Returns statistics about the batch run

    The batch respects Nominatim rate limits (1 req/sec) so execution
    time is approximately BATCH_SIZE * 1.5 seconds.

    Declared as a plain ``def`` (not ``async def``) on purpose:
    ``agent.run()`` is synchronous and runs for the whole batch. Inside an
    ``async def`` it would block the event loop, freezing every other request
    (including /health) until the batch finished. FastAPI runs sync endpoints
    in its worker threadpool instead.

    Use this endpoint from Unraid cron:
        curl -X POST http://localhost:8000/verify-addresses

    Returns:
        VerificationResponse with batch statistics

    Raises:
        HTTPException: 500 on database errors or any unexpected failure.
    """
    logger.info("=" * 60)
    logger.info("VERIFY-ADDRESSES ENDPOINT CALLED")
    logger.info("=" * 60)
    logger.info(f"Configuration: BATCH_SIZE={BATCH_SIZE}, COMMIT_SIZE={COMMIT_BATCH_SIZE}")

    try:
        # Initialize and run the agent
        agent = AddressVerificationAgent(
            session=db,
            batch_size=BATCH_SIZE,
            commit_size=COMMIT_BATCH_SIZE,
        )

        result = agent.run()

        logger.info(f"Batch complete: {result.get('message', 'No message')}")

        return VerificationResponse(
            status=result.get("status", "unknown"),
            message=result.get("message", ""),
            total_queried=result.get("total_queried", 0),
            processed=result.get("processed", 0),
            updated=result.get("updated", 0),
            corrected=result.get("corrected", 0),
            failed=result.get("failed", 0),
            skipped=result.get("skipped", 0),
            rate_limited=result.get("rate_limited", 0),
            duration_seconds=result.get("duration_seconds", 0.0),
            success_rate=result.get("success_rate", 0.0),
            errors_count=result.get("errors_count", 0),
            sample_errors=result.get("sample_errors", []),
            sample_corrections=result.get("sample_corrections", []),
        )

    except SQLAlchemyError as e:
        # Discard any uncommitted work left by the failed batch, matching the
        # error handling of the other write endpoints.
        db.rollback()
        logger.error(f"Database error during verification: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Database error: {str(e)}"
        ) from e

    except Exception as e:
        db.rollback()
        logger.error(f"Unexpected error during verification: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Verification failed: {str(e)}"
        ) from e
|
|
|
|
|
|
@app.post(
    "/reset-verifications",
    response_model=ResetResponse,
    tags=["Verification"],
)
def reset_verifications(db: Session = Depends(get_db)):
    """
    Reset all address verifications for re-checking.

    This endpoint clears verification data for ALL customer records:
    - Sets correct_address = FALSE
    - Sets verified_at = NULL
    - Clears customer_latitude and customer_longitude

    After calling this endpoint, all addresses will be eligible for
    re-verification on the next /verify-addresses call.

    WARNING: This is a mass update operation. Use with caution.

    Declared as a plain ``def`` so the blocking table-wide UPDATE runs in
    FastAPI's threadpool instead of stalling the event loop.

    Returns:
        ResetResponse with count of records reset

    Raises:
        HTTPException: 500 on database errors or any unexpected failure;
            the transaction is rolled back first.
    """
    logger.info("=" * 60)
    logger.info("RESET-VERIFICATIONS ENDPOINT CALLED")
    logger.info("=" * 60)

    try:
        # Count records before update (only used for logging)
        total_records = db.query(CustomerCustomer).count()
        logger.info(f"Total customer records: {total_records}")

        # Mass update to reset all verification data. synchronize_session=False
        # skips reconciling in-session objects; none are loaded here.
        updated_count = db.query(CustomerCustomer).update(
            {
                CustomerCustomer.correct_address: False,
                CustomerCustomer.verified_at: None,
                CustomerCustomer.customer_latitude: None,
                CustomerCustomer.customer_longitude: None,
            },
            synchronize_session=False
        )

        db.commit()

        logger.info(f"Reset {updated_count} records successfully")

        return ResetResponse(
            status="success",
            message=f"Reset {updated_count} address verifications. All addresses are now eligible for re-verification.",
            records_reset=updated_count,
        )

    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error during reset: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Database error: {str(e)}"
        ) from e

    except Exception as e:
        db.rollback()
        logger.error(f"Unexpected error during reset: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Reset failed: {str(e)}"
        ) from e
|
|
|
|
|
|
# =============================================================================
|
|
# STREET REFERENCE ENDPOINTS
|
|
# =============================================================================
|
|
|
|
|
|
@app.post(
    "/streets/{town}/{state}",
    response_model=StreetPopulateResponse,
    tags=["Streets"],
)
def populate_streets(
    town: str,
    state: str,
    clear_existing: bool = False,
    db: Session = Depends(get_db)
):
    """
    Fetch and store all streets for a town from OpenStreetMap.

    This endpoint queries the OSM Overpass API to get all named streets
    in the specified town and stores them in the street_reference table
    for fuzzy matching during address verification.

    Declared as a plain ``def`` on purpose: populate_streets_for_town() is
    called synchronously (it is not awaited) and does network and database
    work. Running it inside ``async def`` would block the event loop for
    the duration of the Overpass request.

    Args:
        town: Town/city name (e.g., "Boston")
        state: 2-letter state abbreviation (e.g., "MA"); case-insensitive
        clear_existing: If true, delete existing streets for this town first

    Example:
        curl -X POST http://localhost:8000/streets/Boston/MA

    Returns:
        StreetPopulateResponse with count of streets added

    Raises:
        HTTPException: 400 for an invalid state abbreviation; 500 on
            database or unexpected errors (after rolling back).
    """
    logger.info("=" * 60)
    logger.info(f"POPULATE STREETS: {town}, {state}")
    logger.info("=" * 60)

    # Validate state abbreviation (2 letters)
    if len(state) != 2 or not state.isalpha():
        raise HTTPException(
            status_code=400,
            detail="State must be a 2-letter abbreviation (e.g., MA, NY, CA)"
        )
    state_code = state.upper()

    try:
        # Ensure the street_reference table exists
        Base.metadata.create_all(bind=engine, tables=[StreetReference.__table__])

        result = populate_streets_for_town(
            session=db,
            town=town,
            state=state_code,
            clear_existing=clear_existing,
        )

        return StreetPopulateResponse(
            status="success" if result.success else "partial",
            message=result.message,
            town=town,
            state=state_code,
            streets_added=result.streets_added,
            streets_updated=result.streets_updated,
            total_found=result.total_found,
            errors=result.errors,
        )

    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error populating streets: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Database error: {str(e)}"
        ) from e

    except Exception as e:
        # A non-DB failure (e.g. during the OSM fetch) may still leave
        # pending changes in the session; discard them like the DB branch does.
        db.rollback()
        logger.error(f"Error populating streets: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to populate streets: {str(e)}"
        ) from e
|
|
|
|
|
|
@app.get(
    "/streets/{town}/{state}",
    response_model=StreetInfoResponse,
    tags=["Streets"],
)
def get_street_info(
    town: str,
    state: str,
    db: Session = Depends(get_db)
):
    """
    Get information about streets stored for a town.

    Returns the count of streets in the reference table for the
    specified town/state combination.

    Declared as a plain ``def`` so the blocking count query runs in
    FastAPI's threadpool rather than on the event loop.

    Args:
        town: Town/city name
        state: 2-letter state abbreviation (case-insensitive)

    Example:
        curl http://localhost:8000/streets/Boston/MA

    Returns:
        StreetInfoResponse with street count

    Raises:
        HTTPException: 400 for an invalid state abbreviation.
    """
    # Validate state abbreviation
    if len(state) != 2 or not state.isalpha():
        raise HTTPException(
            status_code=400,
            detail="State must be a 2-letter abbreviation"
        )
    state_code = state.upper()

    count = get_town_street_count(db, town, state_code)

    # Use the normalized state in the message so it matches the `state` field.
    if count == 0:
        message = f"No streets found for {town}, {state_code}. Use POST to populate."
    else:
        message = f"Found {count} streets for {town}, {state_code}"

    return StreetInfoResponse(
        town=town,
        state=state_code,
        street_count=count,
        message=message,
    )
|
|
|
|
|
|
# =============================================================================
|
|
# STARTUP/SHUTDOWN EVENTS
|
|
# =============================================================================
|
|
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """
    Application startup: log the effective configuration and probe the database.

    A failed probe is only logged as a warning; the service still starts
    and reports "degraded" via /health.
    """
    logger.info("*" * 60)
    logger.info("eamco_address_checker STARTING")
    logger.info("*" * 60)
    # Never log the raw DATABASE_URL. It usually embeds credentials
    # (driver://user:password@host/db), and the old 50-character slice
    # typically still contained the password. SQLAlchemy's URL repr masks it.
    logger.info(f"Database URL: {engine.url!r}")
    logger.info(f"CORS Origins: {CORS_ORIGINS}")
    logger.info(f"Batch Size: {BATCH_SIZE}")
    logger.info(f"Commit Batch Size: {COMMIT_BATCH_SIZE}")

    # Test database connection
    if check_db_connection():
        logger.info("Database connection: OK")
    else:
        logger.warning("Database connection: FAILED - service may be degraded")
|
|
|
|
|
|
@app.on_event("shutdown")
async def shutdown_event():
    """Log the shutdown, then release every pooled database connection."""
    logger.info("eamco_address_checker SHUTTING DOWN")

    # Dispose of the module-level engine's connection pool.
    engine.dispose()

    logger.info("Database connections closed")
|
|
|
|
|
|
# =============================================================================
|
|
# MAIN ENTRY POINT (for direct execution)
|
|
# =============================================================================
|
|
|
|
if __name__ == "__main__":
    # Local development entry point: serve the app with auto-reload.
    # The "app.main:app" import string requires running from the project root.
    import uvicorn

    uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True, log_level="info")
|