From 0b9c0915a1a6bc9cc08568132343342f55136791 Mon Sep 17 00:00:00 2001 From: Edwin Eames Date: Sun, 18 Jan 2026 17:53:26 -0500 Subject: [PATCH] first commit --- .env.example | 39 ++++ .gitignore | 132 +++++++++++ Dockerfile | 45 ++++ Dockerfile.dev | 14 ++ Dockerfile.local | 40 ++++ Dockerfile.prod | 49 ++++ README.md | 3 + app/__init__.py | 1 + app/agent.py | 516 ++++++++++++++++++++++++++++++++++++++++++ app/config.py | 184 +++++++++++++++ app/main.py | 558 +++++++++++++++++++++++++++++++++++++++++++++ app/models.py | 127 +++++++++++ app/streets.py | 572 +++++++++++++++++++++++++++++++++++++++++++++++ app/tools.py | 389 ++++++++++++++++++++++++++++++++ requirements.txt | 23 ++ 15 files changed, 2692 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 Dockerfile.dev create mode 100644 Dockerfile.local create mode 100644 Dockerfile.prod create mode 100644 README.md create mode 100644 app/__init__.py create mode 100644 app/agent.py create mode 100644 app/config.py create mode 100644 app/main.py create mode 100644 app/models.py create mode 100644 app/streets.py create mode 100644 app/tools.py create mode 100644 requirements.txt diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..ffa784a --- /dev/null +++ b/.env.example @@ -0,0 +1,39 @@ +# ============================================================================= +# eamco_address_checker Environment Configuration +# ============================================================================= +# Copy this file to .env and adjust values as needed. +# All values have sensible defaults; only override what you need. + +# ============================================================================= +# DATABASE +# ============================================================================= +# Override the default PostgreSQL connection string +# Default: postgresql+psycopg2://postgres:password@192.168.1.204/eamco +# DATABASE_URL=postgresql+psycopg2://user:pass@host:5432/database + +# ============================================================================= +# BATCH PROCESSING +# ============================================================================= +# Maximum records to process per batch run (default: 150) +# BATCH_SIZE=150 + +# Number of records to process before committing to database (default: 20) +# COMMIT_BATCH_SIZE=20 + +# ============================================================================= +# RATE LIMITING (Nominatim) +# ============================================================================= +# Minimum sleep between geocoding requests in seconds (default: 1.2) +# MIN_SLEEP=1.2 + +# Maximum sleep between geocoding requests in seconds (default: 1.8) +# MAX_SLEEP=1.8 + +# Geocoding request timeout in seconds (default: 10) +# GEOCODE_TIMEOUT=10 + +# ============================================================================= +# LOGGING +# ============================================================================= +# Log level: DEBUG, INFO, WARNING, ERROR, CRITICAL (default: INFO) +# LOG_LEVEL=INFO diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..70c12d9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,132 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ 
+.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having sub-dependencies with platform-specific binaries, it is better to ignore the Pipfile.lock. +# Pipfile.lock + +# PEP 582; __pypackages__ +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyderworkspace + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Environments +.env.local +.env.prod \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..7454574 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,45 @@ +# eamco_address_checker Dockerfile +# Lightweight Python 3.11 image for Unraid Docker deployment + +FROM python:3.11-slim + +# Set environment variables +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + PYTHONPATH=/app \ + PIP_NO_CACHE_DIR=1 \ + PIP_DISABLE_PIP_VERSION_CHECK=1 + +# Set working directory +WORKDIR /app + +# Install system dependencies (psycopg2 requirements) +RUN apt-get update && apt-get install -y --no-install-recommends \ + libpq-dev \ + gcc \ + && rm -rf /var/lib/apt/lists/* \ + && apt-get clean + +# Copy requirements first (for better layer caching) +COPY requirements.txt . + +# Install Python dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY app/ ./app/ + +# Create non-root user for security +RUN useradd --create-home --shell /bin/bash appuser && \ + chown -R appuser:appuser /app +USER appuser + +# Expose port +EXPOSE 8000 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1 + +# Run the application +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/Dockerfile.dev b/Dockerfile.dev new file mode 100644 index 0000000..8c165e5 --- /dev/null +++ b/Dockerfile.dev @@ -0,0 +1,14 @@ +FROM python:3.11 + +ENV PYTHONFAULTHANDLER=1 +ENV PYTHONUNBUFFERED=1 +ENV MODE="DEVELOPMENT" + +WORKDIR /app + +COPY requirements.txt requirements.txt +RUN pip install -r requirements.txt + +COPY . . 
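+
+# Unlike Dockerfile and Dockerfile.prod, this dev image runs as root and has
+# no HEALTHCHECK; the hardened settings live in those images.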
+ +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/Dockerfile.local b/Dockerfile.local new file mode 100644 index 0000000..e43fafe --- /dev/null +++ b/Dockerfile.local @@ -0,0 +1,40 @@ +# eamco_address_checker - DEVELOPMENT Dockerfile +# Used by docker-compose.local.yml +# Features: Hot reload via volume mount, debug logging + +FROM python:3.11-slim + +# Set environment variables +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + PYTHONPATH=/app \ + PIP_NO_CACHE_DIR=1 \ + PIP_DISABLE_PIP_VERSION_CHECK=1 + +# Set working directory +WORKDIR /app + +# Install system dependencies (psycopg2 requirements) +RUN apt-get update && apt-get install -y --no-install-recommends \ + libpq-dev \ + gcc \ + && rm -rf /var/lib/apt/lists/* \ + && apt-get clean + +# Copy requirements first (for better layer caching) +COPY requirements.txt . + +# Install Python dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Copy environment file for local development +COPY .env.local .env + +# Copy application code (will be overridden by volume mount in compose) +COPY app/ ./app/ + +# Expose port +EXPOSE 8000 + +# Development: Run with reload enabled +CMD ["uvicorn", "app.main:app", "--reload", "--host", "0.0.0.0", "--port", "8000"] diff --git a/Dockerfile.prod b/Dockerfile.prod new file mode 100644 index 0000000..56dd728 --- /dev/null +++ b/Dockerfile.prod @@ -0,0 +1,49 @@ +# eamco_address_checker - PRODUCTION Dockerfile +# Used by docker-compose.prod.yml +# Features: Optimized for production, non-root user, health checks + +FROM python:3.11-slim + +# Set environment variables +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + PYTHONPATH=/app \ + PIP_NO_CACHE_DIR=1 \ + PIP_DISABLE_PIP_VERSION_CHECK=1 + +# Set working directory +WORKDIR /app + +# Install system dependencies (psycopg2 requirements) +RUN apt-get update && apt-get install -y --no-install-recommends \ + libpq-dev \ + gcc \ + && rm -rf /var/lib/apt/lists/* \ + && apt-get clean + +# Copy requirements first (for better layer caching) +COPY requirements.txt . + +# Install Python dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Copy environment file for production +COPY .env.prod .env + +# Copy application code +COPY app/ ./app/ + +# Create non-root user for security +RUN useradd --create-home --shell /bin/bash appuser && \ + chown -R appuser:appuser /app +USER appuser + +# Expose port +EXPOSE 8000 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1 + +# Production: Run without reload, with workers +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..a5f2f72 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# EAMCO Address Checker + +This service checks addresses. diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..90d8b00 --- /dev/null +++ b/app/__init__.py @@ -0,0 +1 @@ +# eamco_address_checker app package diff --git a/app/agent.py b/app/agent.py new file mode 100644 index 0000000..9e0b321 --- /dev/null +++ b/app/agent.py @@ -0,0 +1,516 @@ +""" +Agentic Address Verification Orchestrator. + +This module implements a lightweight ReAct-inspired autonomous agent for batch +address verification. The agent follows a structured workflow: + +1. PLANNING PHASE: Query records needing verification +2. 
EXECUTION PHASE: For each record, follow think-act-observe-reflect cycle + - If geocoding fails, attempt fuzzy matching to correct misspellings + - Retry geocoding with corrected address +3. REFLECTION PHASE: Summarize batch results and statistics + +The agent is designed for resilience - individual record failures don't stop +the batch, and progress is committed incrementally. +""" + +import logging +from dataclasses import dataclass, field +from datetime import datetime, date +from typing import List, Optional + +from geopy.geocoders import Nominatim +from sqlalchemy import or_, func +from sqlalchemy.orm import Session + +from app.config import ( + BATCH_SIZE, + COMMIT_BATCH_SIZE, + NOMINATIM_USER_AGENT, +) +from app.models import CustomerCustomer +from app.tools import ( + build_address, + validate_address_components, + format_address_string, + geocode_address, + validate_geocode_result, + update_record, + rate_limit_sleep, + GeocodeResult, + get_state_abbreviation, +) +from app.streets import correct_address, get_town_street_count + +logger = logging.getLogger(__name__) + + +@dataclass +class BatchStats: + """Statistics for a batch verification run.""" + total_queried: int = 0 + processed: int = 0 + updated: int = 0 + corrected: int = 0 # Addresses fixed via fuzzy matching + failed: int = 0 + skipped: int = 0 + rate_limited: int = 0 + errors: List[str] = field(default_factory=list) + corrections: List[str] = field(default_factory=list) # Log of corrections made + start_time: Optional[datetime] = None + end_time: Optional[datetime] = None + + @property + def duration_seconds(self) -> float: + """Calculate batch duration in seconds.""" + if self.start_time and self.end_time: + return (self.end_time - self.start_time).total_seconds() + return 0.0 + + def to_dict(self) -> dict: + """Convert stats to dictionary for JSON response.""" + return { + "total_queried": self.total_queried, + "processed": self.processed, + "updated": self.updated, + "corrected": self.corrected, + "failed": self.failed, + "skipped": self.skipped, + "rate_limited": self.rate_limited, + "duration_seconds": round(self.duration_seconds, 2), + "errors_count": len(self.errors), + "sample_errors": self.errors[:5] if self.errors else [], + "sample_corrections": self.corrections[:5] if self.corrections else [], + } + + +class AddressVerificationAgent: + """ + Lightweight autonomous agent for address verification. + + Implements a ReAct-inspired workflow where each record goes through: + - OBSERVE: Examine the address data + - THINK: Decide if geocoding should be attempted + - ACT: Call geocoding API + - OBSERVE: Examine the result + - REFLECT: Log decision and update database + + Attributes: + session: SQLAlchemy database session + batch_size: Maximum records per batch + commit_size: Records between commits + stats: Running statistics for the batch + geocoder: Nominatim geocoder instance + """ + + def __init__( + self, + session: Session, + batch_size: int = BATCH_SIZE, + commit_size: int = COMMIT_BATCH_SIZE, + ): + """ + Initialize the address verification agent. 
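+
+        Typical usage mirrors app.main (illustrative sketch):
+
+            agent = AddressVerificationAgent(session=SessionLocal())
+            summary = agent.run()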
+ + Args: + session: SQLAlchemy session for database operations + batch_size: Max records to process (default from config) + commit_size: Records before intermediate commit + """ + self.session = session + self.batch_size = batch_size + self.commit_size = commit_size + self.stats = BatchStats() + self.geocoder = Nominatim(user_agent=NOMINATIM_USER_AGENT) + + logger.info( + f"Agent initialized: batch_size={batch_size}, commit_size={commit_size}" + ) + + # ========================================================================= + # PHASE 1: PLANNING + # ========================================================================= + + def plan_batch(self) -> List[CustomerCustomer]: + """ + PLANNING PHASE: Query records that need address verification. + + Criteria for selection: + - correct_address = FALSE, OR + - verified_at IS NULL, OR + - verified_at < today (not verified today) + + Returns: + List of CustomerCustomer records to process + """ + logger.info("=" * 60) + logger.info("PLANNING PHASE: Querying records needing verification") + logger.info("=" * 60) + + today = date.today() + + # Build query for records needing verification + query = self.session.query(CustomerCustomer).filter( + or_( + CustomerCustomer.correct_address == False, # noqa: E712 + CustomerCustomer.verified_at.is_(None), + func.date(CustomerCustomer.verified_at) < today, + ) + ).limit(self.batch_size) + + records = query.all() + self.stats.total_queried = len(records) + + logger.info( + f"PLAN RESULT: Found {len(records)} records needing verification", + extra={"record_count": len(records), "batch_limit": self.batch_size} + ) + + # Log sample of record IDs for debugging + if records: + sample_ids = [r.id for r in records[:10]] + logger.debug(f"Sample record IDs: {sample_ids}") + + return records + + # ========================================================================= + # PHASE 2: EXECUTION (ReAct-style per record) + # ========================================================================= + + def process_record(self, customer: CustomerCustomer) -> bool: + """ + EXECUTION PHASE: Process a single record with ReAct-style workflow. + + Steps: + 1. OBSERVE: Build address from record components + 2. THINK: Validate address - skip if obviously invalid + 3. ACT: Call Nominatim geocoder + 4. OBSERVE: Examine geocoding result + 5. 
REFLECT: Log decision and update database + + Args: + customer: CustomerCustomer record to process + + Returns: + True if record was successfully updated, False otherwise + """ + logger.info("-" * 40) + logger.info(f"Processing record ID: {customer.id}") + + # ----------------------------------------------------------------- + # STEP 1: OBSERVE - Build address from components + # ----------------------------------------------------------------- + logger.debug(f"[OBSERVE] Building address for customer {customer.id}") + address_components = build_address(customer) + + # ----------------------------------------------------------------- + # STEP 2: THINK - Validate address components + # ----------------------------------------------------------------- + logger.debug(f"[THINK] Validating address components") + address_components = validate_address_components(address_components) + + if not address_components.is_valid: + # REFLECT: Skip invalid addresses + logger.info( + f"[REFLECT] Skipping record {customer.id}: " + f"{address_components.validation_error}" + ) + self.stats.skipped += 1 + + # Still update the record to mark it as processed + geocode_result = GeocodeResult( + success=False, + skipped=True, + skip_reason=address_components.validation_error, + error_message=address_components.validation_error, + ) + update_record(self.session, customer, geocode_result, is_valid=False) + return False + + # Format address for geocoding + address_string = format_address_string(address_components) + logger.debug(f"[THINK] Formatted address: {address_string}") + + # ----------------------------------------------------------------- + # STEP 3: ACT - Call geocoding API + # ----------------------------------------------------------------- + logger.debug(f"[ACT] Calling Nominatim geocoder") + geocode_result = geocode_address(address_string, self.geocoder) + + # ----------------------------------------------------------------- + # STEP 4: OBSERVE - Examine geocoding result + # ----------------------------------------------------------------- + logger.debug(f"[OBSERVE] Geocoding result: success={geocode_result.success}") + + if not geocode_result.success: + # ----------------------------------------------------------------- + # STEP 4a: THINK - Try fuzzy matching to correct address + # ----------------------------------------------------------------- + logger.info( + f"[THINK] Geocoding failed, attempting fuzzy street matching..." 
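+                    # Correction path (below): only attempted when
+                    # StreetReference rows exist for this town; geocoding is
+                    # retried once with the corrected street name.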
+ ) + + # Get state abbreviation for fuzzy matching + state_abbr = get_state_abbreviation(customer.customer_state) + town = address_components.city + + if state_abbr and town: + # Check if we have street data for this town + street_count = get_town_street_count(self.session, town, state_abbr) + + if street_count > 0: + # Try to correct the address + match = correct_address( + session=self.session, + full_address=address_components.street or "", + town=town, + state=state_abbr, + min_confidence=75.0, + ) + + if match and match.corrected_address: + logger.info( + f"[ACT] Found correction: '{address_components.street}' " + f"-> '{match.corrected_address}' " + f"(confidence: {match.confidence_score:.1f}%)" + ) + + # Build corrected address string + corrected_components = address_components + corrected_components.street = match.corrected_address + corrected_address_string = format_address_string(corrected_components) + + logger.info(f"[ACT] Retrying with corrected address: {corrected_address_string}") + + # Rate limit before retry + rate_limit_sleep() + + # Retry geocoding with corrected address + geocode_result = geocode_address(corrected_address_string, self.geocoder) + + if geocode_result.success: + logger.info( + f"[OBSERVE] Corrected address geocoded successfully!" + ) + self.stats.corrected += 1 + self.stats.corrections.append( + f"ID {customer.id}: '{address_components.street}' " + f"-> '{match.corrected_address}'" + ) + else: + logger.info( + f"[OBSERVE] Corrected address still failed to geocode" + ) + else: + logger.debug( + f"[THINK] No confident fuzzy match found" + ) + else: + logger.debug( + f"[THINK] No street reference data for {town}, {state_abbr}. " + f"Use POST /streets/{town}/{state_abbr} to populate." + ) + + # If still failed after correction attempt + if not geocode_result.success: + # ----------------------------------------------------------------- + # STEP 5a: REFLECT - Handle failed geocoding + # ----------------------------------------------------------------- + logger.info( + f"[REFLECT] Geocoding failed for record {customer.id}: " + f"{geocode_result.error_message}" + ) + self.stats.failed += 1 + self.stats.errors.append( + f"ID {customer.id}: {geocode_result.error_message}" + ) + + update_record(self.session, customer, geocode_result, is_valid=False) + return False + + # Validate geocode result quality + is_valid, validation_reason = validate_geocode_result(geocode_result) + logger.debug(f"[OBSERVE] Validation: valid={is_valid}, reason={validation_reason}") + + # ----------------------------------------------------------------- + # STEP 5b: REFLECT - Update database with result + # ----------------------------------------------------------------- + if is_valid: + logger.info( + f"[REFLECT] Success for record {customer.id}: " + f"lat={geocode_result.latitude}, lon={geocode_result.longitude}" + ) + self.stats.updated += 1 + else: + logger.info( + f"[REFLECT] Invalid result for record {customer.id}: {validation_reason}" + ) + self.stats.failed += 1 + self.stats.errors.append(f"ID {customer.id}: {validation_reason}") + + update_record(self.session, customer, geocode_result, is_valid=is_valid) + return is_valid + + def execute_batch(self, records: List[CustomerCustomer]) -> None: + """ + Execute the batch processing loop with rate limiting. + + Processes records sequentially with proper rate limiting between + geocoding calls. Commits to database periodically. 
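+        Commits are issued every `commit_size` records (20 by default), with
+        a final commit for any remainder, so a mid-batch failure loses at
+        most one commit window of updates.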
+ + Args: + records: List of CustomerCustomer records to process + """ + logger.info("=" * 60) + logger.info("EXECUTION PHASE: Processing records") + logger.info("=" * 60) + + uncommitted_count = 0 + + for i, customer in enumerate(records): + try: + # Process the record + self.process_record(customer) + self.stats.processed += 1 + uncommitted_count += 1 + + # Commit in batches + if uncommitted_count >= self.commit_size: + logger.info(f"Committing batch of {uncommitted_count} records") + self.session.commit() + uncommitted_count = 0 + + # Rate limiting (skip on last record) + if i < len(records) - 1: + rate_limit_sleep() + + except Exception as e: + # Handle unexpected errors - continue processing + logger.error( + f"Unexpected error processing record {customer.id}: {e}", + exc_info=True + ) + self.stats.failed += 1 + self.stats.errors.append(f"ID {customer.id}: Unexpected error: {str(e)}") + self.stats.processed += 1 + + # Rollback the current transaction and continue + self.session.rollback() + uncommitted_count = 0 + + # Final commit for any remaining records + if uncommitted_count > 0: + logger.info(f"Final commit of {uncommitted_count} records") + self.session.commit() + + # ========================================================================= + # PHASE 3: REFLECTION + # ========================================================================= + + def reflect(self) -> dict: + """ + REFLECTION PHASE: Summarize batch results and statistics. + + Logs comprehensive statistics about the batch run and returns + a summary dictionary suitable for API response. + + Returns: + Dictionary with batch statistics + """ + self.stats.end_time = datetime.utcnow() + + logger.info("=" * 60) + logger.info("REFLECTION PHASE: Batch Summary") + logger.info("=" * 60) + + stats_dict = self.stats.to_dict() + + logger.info(f"Total queried: {stats_dict['total_queried']}") + logger.info(f"Processed: {stats_dict['processed']}") + logger.info(f"Updated (valid): {stats_dict['updated']}") + logger.info(f"Corrected: {stats_dict['corrected']}") + logger.info(f"Failed: {stats_dict['failed']}") + logger.info(f"Skipped: {stats_dict['skipped']}") + logger.info(f"Duration: {stats_dict['duration_seconds']}s") + + if stats_dict['errors_count'] > 0: + logger.warning(f"Errors encountered: {stats_dict['errors_count']}") + for error in stats_dict['sample_errors']: + logger.warning(f" - {error}") + + if stats_dict['corrected'] > 0: + logger.info(f"Addresses corrected via fuzzy matching: {stats_dict['corrected']}") + for correction in stats_dict['sample_corrections']: + logger.info(f" - {correction}") + + # Calculate success rate + if stats_dict['processed'] > 0: + success_rate = (stats_dict['updated'] / stats_dict['processed']) * 100 + logger.info(f"Success rate: {success_rate:.1f}%") + stats_dict['success_rate'] = round(success_rate, 1) + else: + stats_dict['success_rate'] = 0.0 + + logger.info("=" * 60) + + return stats_dict + + # ========================================================================= + # MAIN ENTRY POINT + # ========================================================================= + + def run(self) -> dict: + """ + Execute the full agent workflow. + + Runs through all three phases: + 1. Planning - Query records + 2. Execution - Process each record + 3. 
Reflection - Summarize results + + Returns: + Dictionary with batch statistics and message + """ + logger.info("*" * 60) + logger.info("ADDRESS VERIFICATION AGENT STARTING") + logger.info("*" * 60) + + self.stats.start_time = datetime.utcnow() + + try: + # Phase 1: Planning + records = self.plan_batch() + + if not records: + logger.info("No records to process - batch complete") + self.stats.end_time = datetime.utcnow() + return { + "status": "success", + "message": "No records needed verification", + **self.stats.to_dict(), + } + + # Phase 2: Execution + self.execute_batch(records) + + # Phase 3: Reflection + stats = self.reflect() + + logger.info("*" * 60) + logger.info("ADDRESS VERIFICATION AGENT COMPLETE") + logger.info("*" * 60) + + return { + "status": "success", + "message": f"Batch complete: {stats['updated']} addresses updated", + **stats, + } + + except Exception as e: + logger.error(f"Agent failed with error: {e}", exc_info=True) + self.stats.end_time = datetime.utcnow() + return { + "status": "error", + "message": f"Agent failed: {str(e)}", + **self.stats.to_dict(), + } diff --git a/app/config.py b/app/config.py new file mode 100644 index 0000000..d6c2060 --- /dev/null +++ b/app/config.py @@ -0,0 +1,184 @@ +""" +Configuration settings for eamco_address_checker. + +This module provides configuration with environment-based switching: +- DEVELOPMENT: Uses 'eamco' database, localhost CORS origins +- PRODUCTION: Uses 'auburnoil' database, production domain CORS origins + +Environment variables are loaded from .env.local or .env.prod depending +on the Docker compose file used. +""" + +import os +from typing import List + +from dotenv import load_dotenv + +# Load environment variables from .env file if present +load_dotenv() + +# ============================================================================= +# ENVIRONMENT MODE +# ============================================================================= + +MODE = os.getenv("MODE", "LOCAL") +CURRENT_SETTINGS = os.getenv("CURRENT_SETTINGS", "DEVELOPMENT") + +if CURRENT_SETTINGS == "PRODUCTION": + print("USING PRODUCTION APPLICATIONCONFIG!!!!!") +else: + print("USING DEVELOPMENT APPLICATIONCONFIG!!!!!") + +# ============================================================================= +# DATABASE CONFIGURATION +# ============================================================================= + +# Database connection components (can be overridden individually) +POSTGRES_USERNAME = os.getenv("POSTGRES_USERNAME", "postgres") +POSTGRES_PW = os.getenv("POSTGRES_PW", "password") +POSTGRES_SERVER = os.getenv("POSTGRES_SERVER", "192.168.1.204") +POSTGRES_PORT = os.getenv("POSTGRES_PORT", "5432") + +# Database name differs by environment +if CURRENT_SETTINGS == "PRODUCTION": + POSTGRES_DBNAME = os.getenv("POSTGRES_DBNAME", "auburnoil") +else: + POSTGRES_DBNAME = os.getenv("POSTGRES_DBNAME", "eamco") + +# Build connection URI from components (fallback) +_DEFAULT_DATABASE_URI = "postgresql+psycopg2://{}:{}@{}:{}/{}".format( + POSTGRES_USERNAME, + POSTGRES_PW, + POSTGRES_SERVER, + POSTGRES_PORT, + POSTGRES_DBNAME +) + +# Allow full DATABASE_URL override +DATABASE_URL: str = os.getenv("DATABASE_URL", _DEFAULT_DATABASE_URI) + +# SQLAlchemy binds (for compatibility) +SQLALCHEMY_DATABASE_URI = DATABASE_URL +SQLALCHEMY_BINDS = {POSTGRES_DBNAME: SQLALCHEMY_DATABASE_URI} + +# ============================================================================= +# CORS CONFIGURATION +# ============================================================================= + 
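+# Example .env override (illustrative values):
+#   CORS_ORIGINS=https://app.example.com,https://admin.example.com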
+# Parse CORS origins from environment (comma-separated) or use defaults +_cors_env = os.getenv("CORS_ORIGINS", "") + +if _cors_env: + CORS_ORIGINS: List[str] = [origin.strip() for origin in _cors_env.split(",")] +elif CURRENT_SETTINGS == "PRODUCTION": + # Production CORS origins + CORS_ORIGINS = [ + "https://oil.edwineames.com", + "https://edwineames.com", + ] +else: + # Development CORS origins + CORS_ORIGINS = [ + "http://localhost:9000", + "https://localhost:9513", + "http://localhost:9514", + "http://localhost:9512", + "http://localhost:9511", + "http://localhost:5173", # Frontend port + "http://localhost:9616", # Authorize service port + ] + +# ============================================================================= +# BATCH PROCESSING CONFIGURATION +# ============================================================================= + +# Maximum records to process in a single batch run +BATCH_SIZE: int = int(os.getenv("BATCH_SIZE", "150")) + +# Records to process before committing to database +COMMIT_BATCH_SIZE: int = int(os.getenv("COMMIT_BATCH_SIZE", "20")) + +# ============================================================================= +# GEOCODING CONFIGURATION (Nominatim) +# ============================================================================= + +# User agent for Nominatim API (required - identifies your application) +NOMINATIM_USER_AGENT: str = "Unraid-EamcoAddressChecker/1.0 (eeames214@gmail.com)" + +# Rate limiting: Sleep range between requests (Nominatim requires 1 req/sec max) +MIN_SLEEP_SECONDS: float = float(os.getenv("MIN_SLEEP", "1.2")) +MAX_SLEEP_SECONDS: float = float(os.getenv("MAX_SLEEP", "1.8")) + +# Geocoding timeout in seconds +GEOCODE_TIMEOUT: int = int(os.getenv("GEOCODE_TIMEOUT", "10")) + +# ============================================================================= +# STATE MAPPING +# ============================================================================= + +# Integer -> US State Abbreviation mapping +# Replace with proper states table lookup when available +STATE_MAPPING: dict[int, str] = { + 1: "AL", # Alabama + 2: "AK", # Alaska + 3: "AS", # American Samoa + 4: "AZ", # Arizona + 5: "AR", # Arkansas + 6: "CA", # California + 7: "CO", # Colorado + 8: "CT", # Connecticut + 9: "DE", # Delaware + 10: "DC", # District of Columbia + 11: "FL", # Florida + 12: "GA", # Georgia + 13: "GU", # Guam + 14: "HI", # Hawaii + 15: "ID", # Idaho + 16: "IL", # Illinois + 17: "IN", # Indiana + 18: "IA", # Iowa + 19: "KS", # Kansas + 20: "KY", # Kentucky + 21: "LA", # Louisiana + 22: "ME", # Maine + 23: "MD", # Maryland + 24: "MA", # Massachusetts + 25: "MI", # Michigan + 26: "MN", # Minnesota + 27: "MS", # Mississippi + 28: "MO", # Missouri + 29: "MT", # Montana + 30: "NE", # Nebraska + 31: "NV", # Nevada + 32: "NH", # New Hampshire + 33: "NJ", # New Jersey + 34: "NM", # New Mexico + 35: "NY", # New York + 36: "NC", # North Carolina + 37: "ND", # North Dakota + 38: "OH", # Ohio + 39: "OK", # Oklahoma + 40: "OR", # Oregon + 41: "PA", # Pennsylvania + 42: "PR", # Puerto Rico + 43: "RI", # Rhode Island + 44: "SC", # South Carolina + 45: "SD", # South Dakota + 46: "TN", # Tennessee + 47: "TX", # Texas + 48: "UT", # Utah + 49: "VT", # Vermont + 50: "VA", # Virginia + 51: "VI", # Virgin Islands + 52: "WA", # Washington + 53: "WV", # West Virginia + 54: "WI", # Wisconsin + 55: "WY", # Wyoming +} + +# ============================================================================= +# LOGGING CONFIGURATION +# 
============================================================================= + +LOG_LEVEL: str = os.getenv("LOG_LEVEL", "INFO") +LOG_FORMAT: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..f3fc810 --- /dev/null +++ b/app/main.py @@ -0,0 +1,558 @@ +""" +eamco_address_checker - FastAPI Address Verification Microservice. + +This microservice provides a batch job endpoint for verifying customer addresses +using geocoding. Designed to be triggered via cron from Unraid. + +Endpoints: + GET /health - Health check with database connectivity status + POST /verify-addresses - Trigger batch address verification + POST /reset-verifications - Clear all verification data for re-checking + POST /streets/{town}/{state} - Fetch and store streets from OSM for a town + GET /streets/{town}/{state} - Get street count for a town + +Usage: + # Development + uvicorn app.main:app --reload --host 0.0.0.0 --port 8000 + + # Production (Docker) + docker run -p 8000:8000 eamco_address_checker + + # Trigger from cron + curl -X POST http://localhost:8000/verify-addresses +""" + +import logging +import sys +from contextlib import contextmanager +from typing import Generator + +from fastapi import FastAPI, Depends, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel +from sqlalchemy import create_engine, text +from sqlalchemy.orm import sessionmaker, Session +from sqlalchemy.exc import SQLAlchemyError + +from app.config import ( + DATABASE_URL, + CORS_ORIGINS, + LOG_LEVEL, + LOG_FORMAT, + BATCH_SIZE, + COMMIT_BATCH_SIZE, +) +from app.agent import AddressVerificationAgent +from app.models import CustomerCustomer, StreetReference, Base +from app.streets import ( + populate_streets_for_town, + get_town_street_count, +) + +# ============================================================================= +# LOGGING CONFIGURATION +# ============================================================================= + +logging.basicConfig( + level=getattr(logging, LOG_LEVEL.upper(), logging.INFO), + format=LOG_FORMAT, + handlers=[ + logging.StreamHandler(sys.stdout), + ] +) +logger = logging.getLogger(__name__) + +# ============================================================================= +# DATABASE SETUP +# ============================================================================= + +# Create SQLAlchemy engine with connection pooling +engine = create_engine( + DATABASE_URL, + pool_pre_ping=True, # Verify connections before use + pool_size=5, + max_overflow=10, + echo=False, # Set to True for SQL debugging +) + +# Session factory +SessionLocal = sessionmaker( + autocommit=False, + autoflush=False, + bind=engine, +) + + +def get_db() -> Generator[Session, None, None]: + """ + Dependency that provides a database session. + + Yields a SQLAlchemy session and ensures proper cleanup. + """ + db = SessionLocal() + try: + yield db + finally: + db.close() + + +@contextmanager +def get_db_session() -> Generator[Session, None, None]: + """ + Context manager for database sessions (non-dependency use). + """ + db = SessionLocal() + try: + yield db + finally: + db.close() + + +def check_db_connection() -> bool: + """ + Test database connectivity. 
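+    The probe is a bare `SELECT 1`, so it verifies pool connectivity without
+    touching application tables.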
+ + Returns: + True if database is reachable, False otherwise + """ + try: + with get_db_session() as db: + db.execute(text("SELECT 1")) + return True + except SQLAlchemyError as e: + logger.error(f"Database connection failed: {e}") + return False + + +# ============================================================================= +# FASTAPI APPLICATION +# ============================================================================= + +app = FastAPI( + title="eamco_address_checker", + description="Address verification microservice using Nominatim geocoding", + version="1.0.0", + docs_url="/docs", + redoc_url="/redoc", +) + +# ============================================================================= +# CORS MIDDLEWARE +# ============================================================================= + +app.add_middleware( + CORSMiddleware, + allow_origins=CORS_ORIGINS, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# ============================================================================= +# PYDANTIC MODELS (Response Schemas) +# ============================================================================= + + +class HealthResponse(BaseModel): + """Health check response schema.""" + status: str + db_connected: bool + + +class VerificationResponse(BaseModel): + """Address verification batch response schema.""" + status: str + message: str + total_queried: int + processed: int + updated: int + corrected: int + failed: int + skipped: int + rate_limited: int + duration_seconds: float + success_rate: float + errors_count: int + sample_errors: list + sample_corrections: list + + +class ResetResponse(BaseModel): + """Reset verifications response schema.""" + status: str + message: str + records_reset: int + + +class StreetPopulateResponse(BaseModel): + """Response for street population endpoint.""" + status: str + message: str + town: str + state: str + streets_added: int + streets_updated: int + total_found: int + errors: list + + +class StreetInfoResponse(BaseModel): + """Response for street info endpoint.""" + town: str + state: str + street_count: int + message: str + + +# ============================================================================= +# ENDPOINTS +# ============================================================================= + + +@app.get("/", include_in_schema=False) +async def root(): + """Root endpoint - redirect to docs.""" + return { + "service": "eamco_address_checker", + "version": "1.0.0", + "docs": "/docs", + } + + +@app.get("/health", response_model=HealthResponse, tags=["Health"]) +async def health_check(): + """ + Health check endpoint. + + Returns service status and database connectivity. + Use this endpoint for container health checks and monitoring. + + Returns: + HealthResponse with status and db_connected flag + """ + db_connected = check_db_connection() + + return HealthResponse( + status="healthy" if db_connected else "degraded", + db_connected=db_connected, + ) + + +@app.post( + "/verify-addresses", + response_model=VerificationResponse, + tags=["Verification"], +) +async def verify_addresses(db: Session = Depends(get_db)): + """ + Trigger batch address verification. + + This endpoint runs a synchronous batch job that: + 1. Queries records needing verification (max BATCH_SIZE) + 2. Geocodes each address using Nominatim + 3. Updates records with lat/long and verification status + 4. 
Returns statistics about the batch run + + The batch respects Nominatim rate limits (1 req/sec) so execution + time is approximately BATCH_SIZE * 1.5 seconds. + + Use this endpoint from Unraid cron: + curl -X POST http://localhost:8000/verify-addresses + + Returns: + VerificationResponse with batch statistics + """ + logger.info("=" * 60) + logger.info("VERIFY-ADDRESSES ENDPOINT CALLED") + logger.info("=" * 60) + logger.info(f"Configuration: BATCH_SIZE={BATCH_SIZE}, COMMIT_SIZE={COMMIT_BATCH_SIZE}") + + try: + # Initialize and run the agent + agent = AddressVerificationAgent( + session=db, + batch_size=BATCH_SIZE, + commit_size=COMMIT_BATCH_SIZE, + ) + + result = agent.run() + + logger.info(f"Batch complete: {result.get('message', 'No message')}") + + return VerificationResponse( + status=result.get("status", "unknown"), + message=result.get("message", ""), + total_queried=result.get("total_queried", 0), + processed=result.get("processed", 0), + updated=result.get("updated", 0), + corrected=result.get("corrected", 0), + failed=result.get("failed", 0), + skipped=result.get("skipped", 0), + rate_limited=result.get("rate_limited", 0), + duration_seconds=result.get("duration_seconds", 0.0), + success_rate=result.get("success_rate", 0.0), + errors_count=result.get("errors_count", 0), + sample_errors=result.get("sample_errors", []), + sample_corrections=result.get("sample_corrections", []), + ) + + except SQLAlchemyError as e: + logger.error(f"Database error during verification: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Database error: {str(e)}" + ) + + except Exception as e: + logger.error(f"Unexpected error during verification: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Verification failed: {str(e)}" + ) + + +@app.post( + "/reset-verifications", + response_model=ResetResponse, + tags=["Verification"], +) +async def reset_verifications(db: Session = Depends(get_db)): + """ + Reset all address verifications for re-checking. + + This endpoint clears verification data for ALL customer records: + - Sets correct_address = FALSE + - Sets verified_at = NULL + - Clears customer_latitude and customer_longitude + + After calling this endpoint, all addresses will be eligible for + re-verification on the next /verify-addresses call. + + WARNING: This is a mass update operation. Use with caution. + + Returns: + ResetResponse with count of records reset + """ + logger.info("=" * 60) + logger.info("RESET-VERIFICATIONS ENDPOINT CALLED") + logger.info("=" * 60) + + try: + # Count records before update + total_records = db.query(CustomerCustomer).count() + logger.info(f"Total customer records: {total_records}") + + # Mass update to reset all verification data + updated_count = db.query(CustomerCustomer).update( + { + CustomerCustomer.correct_address: False, + CustomerCustomer.verified_at: None, + CustomerCustomer.customer_latitude: None, + CustomerCustomer.customer_longitude: None, + }, + synchronize_session=False + ) + + db.commit() + + logger.info(f"Reset {updated_count} records successfully") + + return ResetResponse( + status="success", + message=f"Reset {updated_count} address verifications. 
All addresses are now eligible for re-verification.", + records_reset=updated_count, + ) + + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error during reset: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Database error: {str(e)}" + ) + + except Exception as e: + db.rollback() + logger.error(f"Unexpected error during reset: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Reset failed: {str(e)}" + ) + + +# ============================================================================= +# STREET REFERENCE ENDPOINTS +# ============================================================================= + + +@app.post( + "/streets/{town}/{state}", + response_model=StreetPopulateResponse, + tags=["Streets"], +) +async def populate_streets( + town: str, + state: str, + clear_existing: bool = False, + db: Session = Depends(get_db) +): + """ + Fetch and store all streets for a town from OpenStreetMap. + + This endpoint queries the OSM Overpass API to get all named streets + in the specified town and stores them in the street_reference table + for fuzzy matching during address verification. + + Args: + town: Town/city name (e.g., "Boston") + state: 2-letter state abbreviation (e.g., "MA") + clear_existing: If true, delete existing streets for this town first + + Example: + curl -X POST http://localhost:8000/streets/Boston/MA + + Returns: + StreetPopulateResponse with count of streets added + """ + logger.info("=" * 60) + logger.info(f"POPULATE STREETS: {town}, {state}") + logger.info("=" * 60) + + # Validate state abbreviation (2 letters) + if len(state) != 2 or not state.isalpha(): + raise HTTPException( + status_code=400, + detail="State must be a 2-letter abbreviation (e.g., MA, NY, CA)" + ) + + try: + # Ensure the street_reference table exists + Base.metadata.create_all(bind=engine, tables=[StreetReference.__table__]) + + result = populate_streets_for_town( + session=db, + town=town, + state=state.upper(), + clear_existing=clear_existing, + ) + + return StreetPopulateResponse( + status="success" if result.success else "partial", + message=result.message, + town=town, + state=state.upper(), + streets_added=result.streets_added, + streets_updated=result.streets_updated, + total_found=result.total_found, + errors=result.errors, + ) + + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error populating streets: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Database error: {str(e)}" + ) + + except Exception as e: + logger.error(f"Error populating streets: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Failed to populate streets: {str(e)}" + ) + + +@app.get( + "/streets/{town}/{state}", + response_model=StreetInfoResponse, + tags=["Streets"], +) +async def get_street_info( + town: str, + state: str, + db: Session = Depends(get_db) +): + """ + Get information about streets stored for a town. + + Returns the count of streets in the reference table for the + specified town/state combination. 
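+
+    A populated town returns a payload shaped like (values illustrative):
+
+        {"town": "Boston", "state": "MA", "street_count": 812,
+         "message": "Found 812 streets for Boston, MA"}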
+ + Args: + town: Town/city name + state: 2-letter state abbreviation + + Example: + curl http://localhost:8000/streets/Boston/MA + + Returns: + StreetInfoResponse with street count + """ + # Validate state abbreviation + if len(state) != 2 or not state.isalpha(): + raise HTTPException( + status_code=400, + detail="State must be a 2-letter abbreviation" + ) + + count = get_town_street_count(db, town, state.upper()) + + if count == 0: + message = f"No streets found for {town}, {state}. Use POST to populate." + else: + message = f"Found {count} streets for {town}, {state}" + + return StreetInfoResponse( + town=town, + state=state.upper(), + street_count=count, + message=message, + ) + + +# ============================================================================= +# STARTUP/SHUTDOWN EVENTS +# ============================================================================= + + +@app.on_event("startup") +async def startup_event(): + """Application startup - log configuration and test DB connection.""" + logger.info("*" * 60) + logger.info("eamco_address_checker STARTING") + logger.info("*" * 60) + logger.info(f"Database URL: {DATABASE_URL[:50]}...") + logger.info(f"CORS Origins: {CORS_ORIGINS}") + logger.info(f"Batch Size: {BATCH_SIZE}") + logger.info(f"Commit Batch Size: {COMMIT_BATCH_SIZE}") + + # Test database connection + if check_db_connection(): + logger.info("Database connection: OK") + else: + logger.warning("Database connection: FAILED - service may be degraded") + + +@app.on_event("shutdown") +async def shutdown_event(): + """Application shutdown - cleanup.""" + logger.info("eamco_address_checker SHUTTING DOWN") + engine.dispose() + logger.info("Database connections closed") + + +# ============================================================================= +# MAIN ENTRY POINT (for direct execution) +# ============================================================================= + +if __name__ == "__main__": + import uvicorn + + uvicorn.run( + "app.main:app", + host="0.0.0.0", + port=8000, + reload=True, + log_level="info", + ) diff --git a/app/models.py b/app/models.py new file mode 100644 index 0000000..ac6683c --- /dev/null +++ b/app/models.py @@ -0,0 +1,127 @@ +""" +SQLAlchemy 2.x ORM Models for eamco_address_checker. + +This module defines the database models using SQLAlchemy's DeclarativeBase. + +Models: + CustomerCustomer: Customer records with address fields for geocoding + StreetReference: Known streets by town/state for fuzzy matching corrections +""" + +from sqlalchemy import Column, Integer, String, VARCHAR, TIMESTAMP, BOOLEAN, Index +from sqlalchemy.orm import DeclarativeBase + + +class Base(DeclarativeBase): + """Base class for all SQLAlchemy models.""" + pass + + +class CustomerCustomer(Base): + """ + Customer model representing address and contact information. + + The verified_at timestamp tracks when the address was last geocoded. + The correct_address boolean indicates if geocoding was successful. 
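+    A record is re-queued for verification when correct_address is false,
+    verified_at is NULL, or verified_at predates the current day (see
+    AddressVerificationAgent.plan_batch).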
+
+    Attributes:
+        id: Primary key
+        auth_net_profile_id: Authorize.net customer profile ID
+        account_number: Customer account number (max 25 chars)
+        customer_last_name: Customer's last name (max 250 chars)
+        customer_first_name: Customer's first name (max 250 chars)
+        customer_town: City/town name (max 140 chars)
+        customer_state: Integer mapping to US state abbreviation
+        customer_zip: ZIP code (max 25 chars)
+        customer_first_call: Timestamp of first customer contact
+        customer_email: Customer email address (max 500 chars)
+        customer_automatic: Automatic billing flag
+        customer_phone_number: Phone number (max 25 chars)
+        customer_home_type: Type of residence
+        customer_apt: Apartment/unit number (max 140 chars)
+        customer_address: Street address (max 1000 chars)
+        company_id: Associated company ID
+        customer_latitude: Geocoded latitude as string (max 250 chars)
+        customer_longitude: Geocoded longitude as string (max 250 chars)
+        correct_address: Flag indicating successful geocoding
+        verified_at: Timestamp of last verification attempt
+    """
+    __tablename__ = "customer_customer"
+    __table_args__ = {"schema": "public"}
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    auth_net_profile_id = Column(String, unique=True, index=True, nullable=True)
+    account_number = Column(VARCHAR(25))
+    customer_last_name = Column(VARCHAR(250))
+    customer_first_name = Column(VARCHAR(250))
+    customer_town = Column(VARCHAR(140))
+    customer_state = Column(Integer)  # Integer -> 2-letter US state abbreviation
+    customer_zip = Column(VARCHAR(25))
+    customer_first_call = Column(TIMESTAMP)
+    customer_email = Column(VARCHAR(500))
+    customer_automatic = Column(Integer)
+    customer_phone_number = Column(VARCHAR(25))
+    customer_home_type = Column(Integer)
+    customer_apt = Column(VARCHAR(140))
+    customer_address = Column(VARCHAR(1000))
+    company_id = Column(Integer)
+    customer_latitude = Column(VARCHAR(250))
+    customer_longitude = Column(VARCHAR(250))
+    correct_address = Column(BOOLEAN, default=False, nullable=False)
+    verified_at = Column(TIMESTAMP, nullable=True)  # NEW: Tracks verification timestamp
+
+    def __repr__(self) -> str:
+        return (
+            f"<CustomerCustomer(id={self.id}, "
+            f"account_number={self.account_number!r}, "
+            f"correct_address={self.correct_address})>"
+        )
+
+
+class StreetReference(Base):
+    """
+    Reference table of known streets for fuzzy matching address corrections.
+
+    Streets are populated per town/state from OpenStreetMap data.
+    Used to correct misspellings and wrong street suffixes (rd vs dr, etc.)
+    when geocoding fails.
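+    For example, a customer street stored as "Mian Stret" can be matched to
+    the "Main Street" reference row and the full address re-geocoded.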
+
+    Attributes:
+        id: Primary key
+        street_name: Full street name (e.g., "Main Street")
+        street_name_normalized: Lowercase, cleaned for matching
+        street_number_low: Lowest known street number (if available)
+        street_number_high: Highest known street number (if available)
+        town: Town/city name
+        town_normalized: Lowercase town name for matching
+        state: 2-letter state abbreviation (e.g., "MA")
+        zip_codes: Comma-separated ZIP codes this street spans
+        osm_id: OpenStreetMap way ID for reference
+        created_at: When this record was added
+    """
+    __tablename__ = "street_reference"
+    __table_args__ = (
+        Index("ix_street_ref_town_state", "town_normalized", "state"),
+        Index("ix_street_ref_name_town", "street_name_normalized", "town_normalized"),
+        {"schema": "public"},
+    )
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    street_name = Column(VARCHAR(500), nullable=False)
+    street_name_normalized = Column(VARCHAR(500), nullable=False, index=True)
+    street_number_low = Column(Integer, nullable=True)
+    street_number_high = Column(Integer, nullable=True)
+    town = Column(VARCHAR(140), nullable=False)
+    town_normalized = Column(VARCHAR(140), nullable=False)
+    state = Column(VARCHAR(2), nullable=False)
+    zip_codes = Column(VARCHAR(100), nullable=True)
+    osm_id = Column(String, nullable=True, index=True)
+    created_at = Column(TIMESTAMP, nullable=False)
+
+    def __repr__(self) -> str:
+        return (
+            f"<StreetReference(id={self.id}, "
+            f"street_name={self.street_name!r}, "
+            f"town={self.town!r}, state={self.state!r})>"
+        )
diff --git a/app/streets.py b/app/streets.py
new file mode 100644
index 0000000..cce6d25
--- /dev/null
+++ b/app/streets.py
@@ -0,0 +1,572 @@
+"""
+Street reference tools for address correction.
+
+This module provides functionality to:
+1. Fetch streets from OpenStreetMap Overpass API for a given town/state
+2. Store streets in the StreetReference table
+3. 
Perform fuzzy matching to correct misspelled addresses + +The fuzzy matching handles common issues like: +- Misspelled street names ("Mian St" -> "Main St") +- Wrong suffixes ("Main Rd" -> "Main St") +- Missing/extra spaces +- Abbreviated vs full names ("St" vs "Street") +""" + +import logging +import re +import time +from dataclasses import dataclass +from datetime import datetime +from typing import List, Optional, Tuple + +import requests +from rapidfuzz import fuzz, process +from sqlalchemy.orm import Session + +from app.config import STATE_MAPPING +from app.models import StreetReference + +logger = logging.getLogger(__name__) + +# Overpass API endpoints (multiple for fallback) +OVERPASS_API_URLS = [ + "https://overpass-api.de/api/interpreter", + "https://overpass.kumi.systems/api/interpreter", + "https://maps.mail.ru/osm/tools/overpass/api/interpreter", +] + +# Common street suffix variations for normalization +STREET_SUFFIXES = { + # Standard -> variations + "street": ["st", "str", "strt"], + "avenue": ["ave", "av", "aven"], + "road": ["rd", "rod"], + "drive": ["dr", "drv", "driv"], + "lane": ["ln", "lne"], + "court": ["ct", "crt", "cour"], + "circle": ["cir", "circ", "crcl"], + "boulevard": ["blvd", "boul", "blv"], + "place": ["pl", "plc"], + "terrace": ["ter", "terr", "trc"], + "way": ["wy"], + "highway": ["hwy", "hiway", "hgwy"], + "parkway": ["pkwy", "pky", "pkway"], + "square": ["sq", "sqr"], + "trail": ["trl", "tr"], + "crossing": ["xing", "crssng"], + "heights": ["hts", "hgts"], + "point": ["pt", "pnt"], + "ridge": ["rdg", "rdge"], + "valley": ["vly", "vlly"], + "view": ["vw", "viw"], + "center": ["ctr", "cntr", "centre"], + "north": ["n"], + "south": ["s"], + "east": ["e"], + "west": ["w"], + "northeast": ["ne"], + "northwest": ["nw"], + "southeast": ["se"], + "southwest": ["sw"], +} + +# Build reverse lookup: abbreviation -> full form +SUFFIX_TO_FULL = {} +for full, abbrevs in STREET_SUFFIXES.items(): + for abbr in abbrevs: + SUFFIX_TO_FULL[abbr] = full + SUFFIX_TO_FULL[full] = full # Also map full to itself + + +@dataclass +class StreetMatch: + """Result of fuzzy street matching.""" + original_street: str + matched_street: str + confidence_score: float + town: str + state: str + street_ref_id: int + corrected_address: Optional[str] = None + + +@dataclass +class FetchResult: + """Result of fetching streets from OSM.""" + success: bool + streets_added: int + streets_updated: int + total_found: int + message: str + errors: List[str] + + +def normalize_street_name(street: str) -> str: + """ + Normalize a street name for fuzzy matching. + + - Lowercase + - Remove extra whitespace + - Expand common abbreviations to full form + - Remove punctuation + + Args: + street: Raw street name + + Returns: + Normalized street name + """ + if not street: + return "" + + # Lowercase and strip + normalized = street.lower().strip() + + # Remove punctuation except hyphens + normalized = re.sub(r"[.,']", "", normalized) + + # Normalize whitespace + normalized = re.sub(r"\s+", " ", normalized) + + # Split into words and expand abbreviations + words = normalized.split() + expanded_words = [] + for word in words: + if word in SUFFIX_TO_FULL: + expanded_words.append(SUFFIX_TO_FULL[word]) + else: + expanded_words.append(word) + + return " ".join(expanded_words) + + +def extract_street_number(address: str) -> Tuple[Optional[str], str]: + """ + Extract street number from an address string. 
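+
+    For example:
+
+        >>> extract_street_number("123A Main Street")
+        ('123A', 'Main Street')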
+ + Args: + address: Full address like "123 Main Street" + + Returns: + Tuple of (street_number, remaining_address) + """ + if not address: + return None, "" + + # Match leading number (possibly with letter suffix like "123A") + match = re.match(r"^(\d+[A-Za-z]?)\s+(.+)$", address.strip()) + if match: + return match.group(1), match.group(2) + + return None, address.strip() + + +def get_state_name(state_abbr: str) -> str: + """ + Get full state name from abbreviation for Overpass query. + + Args: + state_abbr: 2-letter state abbreviation + + Returns: + Full state name + """ + state_names = { + "AL": "Alabama", "AK": "Alaska", "AZ": "Arizona", "AR": "Arkansas", + "CA": "California", "CO": "Colorado", "CT": "Connecticut", "DE": "Delaware", + "DC": "District of Columbia", "FL": "Florida", "GA": "Georgia", "HI": "Hawaii", + "ID": "Idaho", "IL": "Illinois", "IN": "Indiana", "IA": "Iowa", + "KS": "Kansas", "KY": "Kentucky", "LA": "Louisiana", "ME": "Maine", + "MD": "Maryland", "MA": "Massachusetts", "MI": "Michigan", "MN": "Minnesota", + "MS": "Mississippi", "MO": "Missouri", "MT": "Montana", "NE": "Nebraska", + "NV": "Nevada", "NH": "New Hampshire", "NJ": "New Jersey", "NM": "New Mexico", + "NY": "New York", "NC": "North Carolina", "ND": "North Dakota", "OH": "Ohio", + "OK": "Oklahoma", "OR": "Oregon", "PA": "Pennsylvania", "RI": "Rhode Island", + "SC": "South Carolina", "SD": "South Dakota", "TN": "Tennessee", "TX": "Texas", + "UT": "Utah", "VT": "Vermont", "VA": "Virginia", "WA": "Washington", + "WV": "West Virginia", "WI": "Wisconsin", "WY": "Wyoming", + "PR": "Puerto Rico", "VI": "Virgin Islands", "GU": "Guam", "AS": "American Samoa", + } + return state_names.get(state_abbr.upper(), state_abbr) + + +def fetch_streets_from_osm(town: str, state: str) -> Tuple[List[dict], str]: + """ + Fetch all streets in a town from OpenStreetMap using Overpass API. 
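+
+    Three query forms are tried in order on each mirror. Note that the
+    {{geocodeArea:...}} shortcut is an Overpass Turbo macro rather than raw
+    Overpass QL, so the plain API endpoints are expected to reject that
+    variant and succeed on the simple or fallback form instead.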

    Args:
        town: Town/city name
        state: 2-letter state abbreviation

    Returns:
        Tuple of (list of street dicts, error message or empty string)
    """
    state_name = get_state_name(state)
    state_upper = state.upper()

    # geocodeArea-based query. NOTE: {{geocodeArea:...}} is an Overpass
    # Turbo shorthand; the raw API endpoints above may reject it, so it
    # is only tried after the plain-QL query below.
    query = f"""
    [out:json][timeout:120];

    // Use geocodeArea for city lookup with state context
    {{{{geocodeArea:{town}, {state_name}, United States}}}}->.city;

    // Get all named streets in the city
    way["highway"]["name"](area.city);
    out tags;
    """

    # Alternative query if geocodeArea fails (more explicit)
    fallback_query = f"""
    [out:json][timeout:120];

    // Find state by ISO code
    area["ISO3166-2"="US-{state_upper}"]->.state;

    // Find city/town within state
    (
      relation["name"="{town}"]["type"="boundary"](area.state);
      way["name"="{town}"]["place"](area.state);
      node["name"="{town}"]["place"](area.state);
    );
    map_to_area->.city;

    // Get streets
    way["highway"]["name"](area.city);
    out tags;
    """

    # Tried first: resolve the state admin area by name (admin_level=4),
    # then the town area inside it. Plain Overpass QL that every
    # endpoint understands, so it is the most dependable of the three.
    simple_query = f"""
    [out:json][timeout:60];
    area["name"="{state_name}"]["boundary"="administrative"]["admin_level"="4"]->.state;
    area["name"="{town}"](area.state)->.city;
    way["highway"]["name"](area.city);
    out tags;
    """

    queries = [simple_query, query, fallback_query]
    query_names = ["simple", "geocodeArea", "fallback"]

    logger.info(f"Fetching streets from OSM for {town}, {state_name}")

    last_error = ""

    for api_url in OVERPASS_API_URLS:
        for q, q_name in zip(queries, query_names):
            try:
                logger.info(f"Trying {q_name} query on {api_url.split('/')[2]}...")
                logger.debug(f"Query: {q}")

                response = requests.post(
                    api_url,
                    data={"data": q},
                    timeout=120,
                    headers={"User-Agent": "EamcoAddressChecker/1.0"}
                )

                if response.status_code == 429:
                    logger.warning("Rate limited, waiting 30s...")
                    time.sleep(30)
                    continue

                if response.status_code == 504:
                    logger.warning(f"Timeout on {q_name} query, trying next...")
                    continue

                response.raise_for_status()

                data = response.json()
                elements = data.get("elements", [])

                if elements:
                    logger.info(f"Success with {q_name} query: {len(elements)} street segments")
                    # Process and return results
                    streets = []
                    seen_names = set()

                    for element in elements:
                        tags = element.get("tags", {})
                        name = tags.get("name")

                        if name and name.lower() not in seen_names:
                            seen_names.add(name.lower())
                            streets.append({
                                "name": name,
                                "osm_id": str(element.get("id", "")),
                                "highway_type": tags.get("highway", ""),
                            })

                    logger.info(f"Extracted {len(streets)} unique street names")
                    return streets, ""
                else:
                    logger.debug(f"No results from {q_name} query")

            except requests.exceptions.Timeout:
                last_error = f"Timeout on {api_url}"
                logger.warning(last_error)
                continue

            except requests.exceptions.RequestException as e:
                last_error = f"Request error: {str(e)}"
                logger.warning(last_error)
                continue

            except Exception as e:
                last_error = f"Error: {str(e)}"
                logger.warning(last_error)
                continue

    # All attempts failed
    error = f"All Overpass queries failed for {town}, {state}. 
Last error: {last_error}" + logger.error(error) + return [], error + + +def populate_streets_for_town( + session: Session, + town: str, + state: str, + clear_existing: bool = False +) -> FetchResult: + """ + Fetch streets from OSM and populate the StreetReference table. + + Args: + session: SQLAlchemy session + town: Town/city name + state: 2-letter state abbreviation + clear_existing: If True, delete existing streets for this town first + + Returns: + FetchResult with statistics + """ + state = state.upper() + town_normalized = town.lower().strip() + errors = [] + + logger.info(f"Populating streets for {town}, {state}") + + # Optionally clear existing streets for this town + if clear_existing: + deleted = session.query(StreetReference).filter( + StreetReference.town_normalized == town_normalized, + StreetReference.state == state + ).delete(synchronize_session=False) + session.commit() + logger.info(f"Cleared {deleted} existing street records") + + # Fetch from OSM + streets, error = fetch_streets_from_osm(town, state) + + if error: + errors.append(error) + + if not streets: + return FetchResult( + success=len(errors) == 0, + streets_added=0, + streets_updated=0, + total_found=0, + message=f"No streets found for {town}, {state}", + errors=errors, + ) + + # Check for existing streets to avoid duplicates + existing_streets = session.query(StreetReference).filter( + StreetReference.town_normalized == town_normalized, + StreetReference.state == state + ).all() + + existing_names = {s.street_name_normalized for s in existing_streets} + + added = 0 + now = datetime.utcnow() + + for street_data in streets: + name = street_data["name"] + name_normalized = normalize_street_name(name) + + if name_normalized in existing_names: + continue + + street_ref = StreetReference( + street_name=name, + street_name_normalized=name_normalized, + town=town, + town_normalized=town_normalized, + state=state, + osm_id=street_data.get("osm_id"), + created_at=now, + ) + session.add(street_ref) + existing_names.add(name_normalized) + added += 1 + + session.commit() + + logger.info(f"Added {added} new streets for {town}, {state}") + + return FetchResult( + success=True, + streets_added=added, + streets_updated=0, + total_found=len(streets), + message=f"Successfully added {added} streets for {town}, {state}", + errors=errors, + ) + + +def find_matching_street( + session: Session, + street_input: str, + town: str, + state: str, + min_confidence: float = 70.0 +) -> Optional[StreetMatch]: + """ + Find the best matching street for a potentially misspelled input. + + Uses fuzzy string matching with rapidfuzz to find the closest + match in the StreetReference table. 
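
    Illustrative call (assumes a "Main Street" row already exists in the
    reference table for this town; the exact score depends on the
    scorers described below):

        match = find_matching_street(session, "Mian St", "Springfield", "MA")
        # match.matched_street   -> "Main Street"
        # match.confidence_score -> roughly 90+ for a small transposition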

    Args:
        session: SQLAlchemy session
        street_input: The street name to match (may be misspelled)
        town: Town/city to search within
        state: State abbreviation
        min_confidence: Minimum match confidence (0-100)

    Returns:
        StreetMatch if found above threshold, None otherwise
    """
    state = state.upper()
    town_normalized = town.lower().strip()

    # Normalize the input for matching
    input_normalized = normalize_street_name(street_input)

    # Get all streets for this town
    streets = session.query(StreetReference).filter(
        StreetReference.town_normalized == town_normalized,
        StreetReference.state == state
    ).all()

    if not streets:
        logger.debug(f"No reference streets found for {town}, {state}")
        return None

    # Build list of (normalized_name, street_object) for matching
    choices = [(s.street_name_normalized, s) for s in streets]

    # Use rapidfuzz to find the best match. Each candidate is scored with
    # several scorers and the max is kept: token_sort/token_set tolerate
    # word-order differences, while partial_ratio rewards substring
    # matches, so the resulting confidence is optimistic.
    best_match = None
    best_score = 0

    for normalized_name, street_obj in choices:
        # Try multiple scoring methods and take the best
        scores = [
            fuzz.ratio(input_normalized, normalized_name),
            fuzz.partial_ratio(input_normalized, normalized_name),
            fuzz.token_sort_ratio(input_normalized, normalized_name),
            fuzz.token_set_ratio(input_normalized, normalized_name),
        ]
        score = max(scores)

        if score > best_score:
            best_score = score
            best_match = street_obj

    if best_match and best_score >= min_confidence:
        logger.info(
            f"Fuzzy match: '{street_input}' -> '{best_match.street_name}' "
            f"(confidence: {best_score:.1f}%)"
        )
        return StreetMatch(
            original_street=street_input,
            matched_street=best_match.street_name,
            confidence_score=best_score,
            town=best_match.town,
            state=best_match.state,
            street_ref_id=best_match.id,
        )

    logger.debug(
        f"No confident match for '{street_input}' "
        f"(best: {best_score:.1f}%, threshold: {min_confidence}%)"
    )
    return None


def correct_address(
    session: Session,
    full_address: str,
    town: str,
    state: str,
    min_confidence: float = 75.0
) -> Optional[StreetMatch]:
    """
    Attempt to correct a full address using fuzzy street matching.

    Extracts the street portion, finds a match, and returns
    a corrected address with the matched street name.

    Args:
        session: SQLAlchemy session
        full_address: Full street address (e.g., "123 Mian St")
        town: Town/city name
        state: State abbreviation
        min_confidence: Minimum match confidence

    Returns:
        StreetMatch with corrected_address if match found, None otherwise
    """
    # Extract street number and street name
    street_number, street_name = extract_street_number(full_address)

    if not street_name:
        return None

    # Find matching street
    match = find_matching_street(
        session=session,
        street_input=street_name,
        town=town,
        state=state,
        min_confidence=min_confidence,
    )

    if match:
        # Build corrected address
        if street_number:
            match.corrected_address = f"{street_number} {match.matched_street}"
        else:
            match.corrected_address = match.matched_street

        logger.info(
            f"Address correction: '{full_address}' -> '{match.corrected_address}'"
        )

    return match


def get_town_street_count(session: Session, town: str, state: str) -> int:
    """
    Get the number of streets in the reference table for a town.
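
    Useful as a pre-flight check before fuzzy matching, for example
    (town/state values invented):

        if get_town_street_count(session, "Springfield", "MA") == 0:
            populate_streets_for_town(session, "Springfield", "MA")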
+ + Args: + session: SQLAlchemy session + town: Town/city name + state: State abbreviation + + Returns: + Number of streets in the reference table + """ + return session.query(StreetReference).filter( + StreetReference.town_normalized == town.lower().strip(), + StreetReference.state == state.upper() + ).count() diff --git a/app/tools.py b/app/tools.py new file mode 100644 index 0000000..7dac715 --- /dev/null +++ b/app/tools.py @@ -0,0 +1,389 @@ +""" +Geocoding tools for eamco_address_checker. + +This module provides modular tool functions for the agentic address verification +workflow. Each function represents a discrete action in the ReAct-style pipeline. + +Tools: + - build_address(): Constructs full US address string from components + - validate_address_components(): Validates required address fields + - geocode_address(): Calls Nominatim API to get lat/long + - validate_geocode_result(): Checks quality of geocoding result + - update_record(): Updates database record with geocoding results +""" + +import logging +import random +import time +from dataclasses import dataclass +from datetime import datetime +from typing import Optional, Tuple + +from geopy.geocoders import Nominatim +from geopy.exc import GeocoderTimedOut, GeocoderServiceError, GeocoderUnavailable +from sqlalchemy.orm import Session + +from app.config import ( + NOMINATIM_USER_AGENT, + MIN_SLEEP_SECONDS, + MAX_SLEEP_SECONDS, + GEOCODE_TIMEOUT, + STATE_MAPPING, +) +from app.models import CustomerCustomer + +logger = logging.getLogger(__name__) + + +@dataclass +class GeocodeResult: + """Result from geocoding operation.""" + success: bool + latitude: Optional[str] = None + longitude: Optional[str] = None + raw_address: Optional[str] = None + country_code: Optional[str] = None + error_message: Optional[str] = None + skipped: bool = False + skip_reason: Optional[str] = None + + +@dataclass +class AddressComponents: + """Structured address components for geocoding.""" + street: Optional[str] + apt: Optional[str] + city: Optional[str] + state: Optional[str] + zip_code: Optional[str] + is_valid: bool = True + validation_error: Optional[str] = None + + +def get_state_abbreviation(state_id: Optional[int]) -> Optional[str]: + """ + Convert state integer ID to 2-letter US state abbreviation. + + Args: + state_id: Integer ID from database + + Returns: + 2-letter state abbreviation or None if not found + + Note: + Replace with proper states table lookup when available + """ + if state_id is None: + return None + return STATE_MAPPING.get(state_id) + + +def build_address(customer: CustomerCustomer) -> AddressComponents: + """ + TOOL: Build full US address string from customer record components. + + Constructs a normalized address string suitable for geocoding. 
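
    Illustrative mapping (field values invented; assumes state id 21
    resolves to "MA" via STATE_MAPPING):

        customer_address="123 Main St", customer_apt="Apt 2",
        customer_town="Springfield", customer_state=21, customer_zip="01101"
        -> AddressComponents(street="123 Main St", apt="Apt 2",
                             city="Springfield", state="MA", zip_code="01101")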
+ Format: "street, apt, city, state zip" + + Args: + customer: CustomerCustomer record with address fields + + Returns: + AddressComponents dataclass with parsed components and validation status + """ + # Extract and clean components + street = (customer.customer_address or "").strip() + apt = (customer.customer_apt or "").strip() + city = (customer.customer_town or "").strip() + state = get_state_abbreviation(customer.customer_state) + zip_code = (customer.customer_zip or "").strip() + + logger.debug( + "Building address", + extra={ + "customer_id": customer.id, + "street": street, + "apt": apt, + "city": city, + "state": state, + "zip": zip_code, + } + ) + + return AddressComponents( + street=street if street else None, + apt=apt if apt else None, + city=city if city else None, + state=state, + zip_code=zip_code if zip_code else None, + ) + + +def validate_address_components(components: AddressComponents) -> AddressComponents: + """ + TOOL: Validate that address has minimum required components. + + An address is considered valid for geocoding if it has: + - Street address (required) + - City (required) + - ZIP code (required) + - State is recommended but not strictly required + + Args: + components: AddressComponents to validate + + Returns: + Updated AddressComponents with is_valid flag and validation_error + """ + missing = [] + + if not components.street: + missing.append("street") + if not components.city: + missing.append("city") + if not components.zip_code: + missing.append("zip") + + if missing: + components.is_valid = False + components.validation_error = f"Missing required fields: {', '.join(missing)}" + logger.debug(f"Address validation failed: {components.validation_error}") + else: + components.is_valid = True + logger.debug("Address validation passed") + + return components + + +def format_address_string(components: AddressComponents) -> str: + """ + Format address components into a single string for geocoding. + + Args: + components: Validated AddressComponents + + Returns: + Formatted address string + """ + parts = [] + + # Street + Apt + if components.street: + if components.apt: + parts.append(f"{components.street}, {components.apt}") + else: + parts.append(components.street) + + # City + if components.city: + parts.append(components.city) + + # State + ZIP + if components.state and components.zip_code: + parts.append(f"{components.state} {components.zip_code}") + elif components.state: + parts.append(components.state) + elif components.zip_code: + parts.append(components.zip_code) + + # Add country for better accuracy + parts.append("USA") + + return ", ".join(parts) + + +def geocode_address( + address_string: str, + geocoder: Optional[Nominatim] = None +) -> GeocodeResult: + """ + TOOL: Call Nominatim API to geocode an address. + + Uses geopy's Nominatim geocoder with proper rate limiting. + Respects Nominatim's 1 request/second policy. 
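
    The pacing itself is the caller's job via rate_limit_sleep(); each
    call here performs exactly one lookup. Typical loop body (address
    invented; coordinates come from live Nominatim data):

        result = geocode_address("123 Main St, Springfield, MA 01101, USA")
        if result.success:
            print(result.latitude, result.longitude)
        rate_limit_sleep()  # stay under 1 req/s before the next record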

    Args:
        address_string: Full formatted address to geocode
        geocoder: Optional pre-initialized Nominatim instance

    Returns:
        GeocodeResult with lat/long or error information
    """
    if geocoder is None:
        geocoder = Nominatim(user_agent=NOMINATIM_USER_AGENT)

    logger.info(f"Geocoding address: {address_string}")

    try:
        # Call Nominatim API with timeout
        location = geocoder.geocode(
            address_string,
            timeout=GEOCODE_TIMEOUT,
            addressdetails=True,
            country_codes="us",  # Limit to USA
        )

        if location is None:
            logger.warning(f"No geocoding result for: {address_string}")
            return GeocodeResult(
                success=False,
                error_message="No location found for address"
            )

        # Extract country code from raw response if available
        country_code = None
        if hasattr(location, 'raw') and 'address' in location.raw:
            country_code = location.raw['address'].get('country_code', '').upper()

        logger.info(
            f"Geocoding successful: lat={location.latitude}, lon={location.longitude}",
            extra={
                "latitude": location.latitude,
                "longitude": location.longitude,
                "raw_address": location.address,
                "country_code": country_code,
            }
        )

        return GeocodeResult(
            success=True,
            latitude=str(location.latitude),
            longitude=str(location.longitude),
            raw_address=location.address,
            country_code=country_code,
        )

    except GeocoderTimedOut as e:
        logger.error(f"Geocoding timeout: {e}")
        return GeocodeResult(
            success=False,
            error_message=f"Geocoding timed out after {GEOCODE_TIMEOUT}s"
        )

    # GeocoderUnavailable subclasses GeocoderServiceError in geopy, so it
    # must be caught first or this branch would be unreachable.
    except GeocoderUnavailable as e:
        logger.error(f"Geocoder unavailable: {e}")
        return GeocodeResult(
            success=False,
            error_message=f"Geocoder unavailable: {str(e)}"
        )

    except GeocoderServiceError as e:
        logger.error(f"Geocoder service error: {e}")
        return GeocodeResult(
            success=False,
            error_message=f"Geocoder service error: {str(e)}"
        )

    except Exception as e:
        logger.error(f"Unexpected geocoding error: {e}", exc_info=True)
        return GeocodeResult(
            success=False,
            error_message=f"Unexpected error: {str(e)}"
        )


def validate_geocode_result(result: GeocodeResult) -> Tuple[bool, str]:
    """
    TOOL: Validate quality of geocoding result.

    Checks:
    - Result was successful
    - Country is USA (if available)
    - Coordinates are within reasonable US bounds

    Args:
        result: GeocodeResult to validate

    Returns:
        Tuple of (is_valid, reason_string)
    """
    if not result.success:
        return False, f"Geocoding failed: {result.error_message}"

    # Check country code if available
    if result.country_code and result.country_code != "US":
        logger.warning(f"Non-US country code: {result.country_code}")
        return False, f"Result is outside USA (country: {result.country_code})"

    # Basic bounds check for continental US + Alaska + Hawaii
    try:
        lat = float(result.latitude)
        lon = float(result.longitude)

        # Rough US bounds (including Alaska and Hawaii)
        if not (18.0 <= lat <= 72.0):
            return False, f"Latitude {lat} outside US bounds"
        if not (-180.0 <= lon <= -65.0):
            return False, f"Longitude {lon} outside US bounds"

    except (ValueError, TypeError) as e:
        return False, f"Invalid coordinates: {e}"

    return True, "Valid US geocode result"


def update_record(
    session: Session,
    customer: CustomerCustomer,
    geocode_result: GeocodeResult,
    is_valid: bool
) -> bool:
    """
    TOOL: Update customer record with geocoding results.

    Sets latitude, longitude, correct_address flag, and verified_at timestamp.
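
    Both branches stamp verified_at; only a validated result also sets
    the coordinates and correct_address=True. The function does not
    commit; the caller batches commits (see COMMIT_BATCH_SIZE). Sketch
    of the success path (field values illustrative):

        ok = update_record(session, customer, result, is_valid=True)
        # customer.customer_latitude  == result.latitude
        # customer.customer_longitude == result.longitude
        # customer.correct_address    is True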
+ + Args: + session: SQLAlchemy session + customer: CustomerCustomer record to update + geocode_result: Result from geocoding operation + is_valid: Whether the geocode result passed validation + + Returns: + True if update successful, False otherwise + """ + try: + now = datetime.utcnow() + + if is_valid and geocode_result.success: + # Successful geocoding - update all fields + customer.customer_latitude = geocode_result.latitude + customer.customer_longitude = geocode_result.longitude + customer.correct_address = True + customer.verified_at = now + + logger.info( + f"Updated record {customer.id}: lat={geocode_result.latitude}, " + f"lon={geocode_result.longitude}, correct_address=True" + ) + else: + # Failed geocoding - mark as verified but not correct + customer.correct_address = False + customer.verified_at = now + + logger.info( + f"Updated record {customer.id}: correct_address=False " + f"(reason: {geocode_result.error_message or 'validation failed'})" + ) + + return True + + except Exception as e: + logger.error(f"Failed to update record {customer.id}: {e}", exc_info=True) + return False + + +def rate_limit_sleep() -> float: + """ + Sleep for a random duration to respect Nominatim rate limits. + + Nominatim requires max 1 request per second. We sleep between + MIN_SLEEP_SECONDS and MAX_SLEEP_SECONDS (default 1.2-1.8s). + + Returns: + Actual sleep duration in seconds + """ + sleep_time = random.uniform(MIN_SLEEP_SECONDS, MAX_SLEEP_SECONDS) + logger.debug(f"Rate limiting: sleeping {sleep_time:.2f}s") + time.sleep(sleep_time) + return sleep_time diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..220a70b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,23 @@ +# eamco_address_checker dependencies +# FastAPI web framework and server +fastapi>=0.109.0,<1.0.0 +uvicorn[standard]>=0.27.0,<1.0.0 +pydantic>=2.5.0,<3.0.0 + +# Database +sqlalchemy>=2.0.0,<3.0.0 +psycopg2-binary>=2.9.9,<3.0.0 + +# Geocoding +geopy>=2.4.1,<3.0.0 + +# Fuzzy string matching for address correction +rapidfuzz>=3.5.0,<4.0.0 + +# HTTP client (for OSM Overpass API and geopy) +requests>=2.31.0,<3.0.0 +urllib3>=2.0.0,<3.0.0 +certifi>=2023.0.0 + +# Configuration +python-dotenv>=1.0.0,<2.0.0
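
# Typical local install (assumes a POSIX shell):
#   python -m venv .venv && . .venv/bin/activate
#   pip install -r requirements.txt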