Files
eamco_auto_api/app/script/fuel_estimator.py
Edwin Eames c134c05947 feat: rewrite K-factor engine with history tracking and outlier detection
Replace simple exponential smoothing with a rolling-average K-factor
system backed by a new auto_kfactor_history table. Budget fills are
detected and excluded from calculations, outliers beyond 2-sigma are
flagged, and confidence scores track data quality per customer.
Adds backfill endpoint, auto-create for missing estimation records,
and manual house_factor PUT endpoints for both auto and regular customers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 17:54:27 -05:00

300 lines
12 KiB
Python

import logging
from sqlalchemy.orm import Session
from sqlalchemy import func
from datetime import date, timedelta
from decimal import Decimal
import statistics
logger = logging.getLogger(__name__)
from app.models.auto import Auto_Delivery, Auto_Temp, Auto_Update, Tickets_Auto_Delivery, KFactorHistory
# --- Constants ---

# Gallons per day added to usage for customers whose hot_water_summer flag is 1.
HOT_WATER_DAILY_USAGE = Decimal("1.0")

# Tank size -> gallon level the estimate is reset to after a full delivery.
TANK_MAX_FILLS = {
    275: 240,
    330: 280,
    500: 475,
    550: 500,
}

# K-Factor rolling average settings
K_FACTOR_ROLLING_WINDOW = 5     # most recent history entries that get averaged
K_FACTOR_HISTORY_SIZE = 10      # history entries fetched per customer
OUTLIER_SIGMA_THRESHOLD = 2.0   # standard deviations from the mean before an entry is an outlier

# Budget fill detection: a delivery within the tolerance (in gallons) of one
# of these amounts is treated as a budget fill.
BUDGET_FILL_AMOUNTS = {100, 125, 150, 200}
BUDGET_FILL_TOLERANCE = 0.5

# Confidence scoring
CONFIDENCE_PER_DELIVERY = 8        # points earned per qualifying delivery
CONFIDENCE_MAX = 100               # upper clamp
CONFIDENCE_MIN = 20                # lower clamp
CONFIDENCE_VARIANCE_PENALTY = 10   # points removed per variance tier

# Default K-factor when no data available
DEFAULT_K_FACTOR = Decimal("0.12")
class FuelEstimator:
    """Estimates tank levels and maintains K-factors for automatic delivery.

    A K-factor (stored on the customer as ``house_factor``) is gallons used
    per heating degree day. The daily update burns the estimated level down
    using that factor; each delivery adds an observation to the K-factor
    history and refreshes the factor from a rolling average.

    Every method stages its changes on the injected session. The only
    session write issued here is a ``flush()``; committing is left to the
    caller.
    """

    # Base temperature for heating degree days: a day contributes
    # max(0, base - temp_avg) degree days.
    HDD_BASE_TEMP = Decimal('65')

    def __init__(self, session: "Session"):
        self.session = session

    def _get_weather_for_date(self, target_date: date) -> "Auto_Temp | None":
        """Return the Auto_Temp row whose todays_date is target_date, or None."""
        return self.session.query(Auto_Temp).filter(
            Auto_Temp.todays_date == target_date
        ).first()

    @staticmethod
    def _heating_degree_days(temp_avg) -> Decimal:
        """Return max(0, HDD_BASE_TEMP - temp_avg) as an exact Decimal.

        The value goes through float() so any numeric input is accepted, and
        then through its shortest repr rather than Decimal(float), so binary
        floating point noise never reaches the Decimal arithmetic done on
        house_factor and the tank estimate.
        """
        average = Decimal(repr(float(temp_avg)))
        return max(Decimal('0'), FuelEstimator.HDD_BASE_TEMP - average)

    @staticmethod
    def _outlier_flags(values, sigma_threshold) -> list:
        """Flag each value lying more than sigma_threshold stdevs from the mean.

        Mean and sample standard deviation are taken over ``values`` itself.
        Returns one bool per value, in order. Nothing is flagged when there
        are fewer than two values or when they are all identical.
        """
        if len(values) < 2:
            return [False] * len(values)
        center = statistics.mean(values)
        spread = statistics.stdev(values)
        if spread <= 0:
            return [False] * len(values)
        limit = sigma_threshold * spread
        return [abs(value - center) > limit for value in values]

    @staticmethod
    def _is_budget_fill(gallons) -> bool:
        """Returns True if gallons is within +/-0.5 of a budget fill amount."""
        gal = float(gallons)
        return any(
            abs(gal - amount) <= BUDGET_FILL_TOLERANCE
            for amount in BUDGET_FILL_AMOUNTS
        )

    def _get_division_average_k(self) -> Decimal:
        """Average K from all valid (non-budget, non-outlier) history entries.

        Fallback for customers without usable history. Returns
        DEFAULT_K_FACTOR when the average is missing or not positive.
        """
        result = self.session.query(func.avg(KFactorHistory.k_factor)).filter(
            KFactorHistory.is_budget_fill == False,
            KFactorHistory.is_outlier == False,
            KFactorHistory.k_factor.isnot(None),
            KFactorHistory.k_factor > 0
        ).scalar()
        if result and result > 0:
            return Decimal(str(round(float(result), 4)))
        return DEFAULT_K_FACTOR

    def _calculate_rolling_k_factor(self, customer_id: int):
        """Returns (k_factor, confidence_score, source).

        1. Query the last K_FACTOR_HISTORY_SIZE non-budget history entries.
        2. Flag entries more than OUTLIER_SIGMA_THRESHOLD standard deviations
           from the mean of that fetched history, and clear stale flags.
        3. Average the unflagged entries among the newest
           K_FACTOR_ROLLING_WINDOW (all of them if every one was flagged).
        4. Confidence = unflagged entries * CONFIDENCE_PER_DELIVERY, minus a
           penalty for high variance, clamped to CONFIDENCE_MIN..CONFIDENCE_MAX.

        The outlier statistics deliberately come from the whole fetched
        history rather than the rolling window. A sample of n values can
        never contain a point more than (n - 1) / sqrt(n) sample standard
        deviations from its own mean, which is about 1.79 for a window of 5,
        so a 2-sigma test computed over the window alone can never trigger.

        With no history the division average is returned with source
        'division_avg'; otherwise the source is 'calculated'.
        """
        # Most recent first, so the head of the list is the rolling window.
        history = self.session.query(KFactorHistory).filter(
            KFactorHistory.customer_id == customer_id,
            KFactorHistory.is_budget_fill == False,
            KFactorHistory.k_factor.isnot(None),
            KFactorHistory.k_factor > 0
        ).order_by(KFactorHistory.fill_date.desc()).limit(K_FACTOR_HISTORY_SIZE).all()

        if not history:
            div_avg = self._get_division_average_k()
            return (div_avg, CONFIDENCE_MIN, 'division_avg')

        history_k = [float(entry.k_factor) for entry in history]

        if len(history_k) < 2:
            # A single observation has no spread to test against.
            k = Decimal(str(round(history_k[0], 4)))
            confidence = min(CONFIDENCE_MAX, CONFIDENCE_PER_DELIVERY)
            return (k, max(CONFIDENCE_MIN, confidence), 'calculated')

        flags = self._outlier_flags(history_k, OUTLIER_SIGMA_THRESHOLD)

        # Persist the flags, touching only rows whose state actually changes.
        for entry, flagged in zip(history, flags):
            if bool(entry.is_outlier) != flagged:
                entry.is_outlier = flagged

        window_k = history_k[:K_FACTOR_ROLLING_WINDOW]
        window_flags = flags[:K_FACTOR_ROLLING_WINDOW]
        filtered = [k for k, flagged in zip(window_k, window_flags) if not flagged]
        if not filtered:
            # All were outliers - use full set
            filtered = window_k

        mean_filtered = statistics.mean(filtered)
        final_k = Decimal(str(round(mean_filtered, 4)))

        # Confidence scoring
        qualifying = sum(1 for flagged in flags if not flagged)
        confidence = min(CONFIDENCE_MAX, qualifying * CONFIDENCE_PER_DELIVERY)

        # Penalty for high variance (coefficient of variation)
        if len(filtered) >= 2:
            cv = statistics.stdev(filtered) / mean_filtered if mean_filtered > 0 else 0
            if cv > 0.3:
                confidence -= CONFIDENCE_VARIANCE_PENALTY * 2
            elif cv > 0.15:
                confidence -= CONFIDENCE_VARIANCE_PENALTY

        confidence = max(CONFIDENCE_MIN, confidence)
        return (final_k, confidence, 'calculated')

    def run_daily_update(self):
        """
        Main function to run once per day. Updates estimated fuel level
        for all active automatic delivery customers.

        Returns a dict with "ok" and "message" keys. The run is skipped when
        an Auto_Update row for today already exists, and refused when there
        is no weather row for today.
        """
        today = date.today()
        if self.session.query(Auto_Update).filter(Auto_Update.last_updated == today).first():
            logger.info(f"Daily update for {today} has already been completed.")
            return {"ok": True, "message": "Update already run today."}

        todays_weather = self._get_weather_for_date(today)
        if not todays_weather:
            logger.error(f"Error: Weather data for {today} not found. Cannot run update.")
            return {"ok": False, "message": f"Weather data for {today} not found."}

        degree_day = self._heating_degree_days(todays_weather.temp_avg)

        auto_customers = self.session.query(Auto_Delivery).filter(
            Auto_Delivery.auto_status == 1
        ).all()
        if not auto_customers:
            logger.info("No active automatic delivery customers found.")
            return {"ok": True, "message": "No active customers to update."}

        logger.info(f"Staging daily fuel update for {len(auto_customers)} customers...")
        for customer in auto_customers:
            heating_usage = customer.house_factor * degree_day
            hot_water_usage = Decimal('0.0')
            if customer.hot_water_summer == 1:
                hot_water_usage = HOT_WATER_DAILY_USAGE
            gallons_used_today = heating_usage + hot_water_usage

            # Keep yesterday's level before overwriting it; the estimate
            # never drops below zero.
            customer.estimated_gallons_left_prev_day = customer.estimated_gallons_left
            new_estimated_gallons = customer.estimated_gallons_left - gallons_used_today
            customer.estimated_gallons_left = max(Decimal('0.0'), new_estimated_gallons)
            customer.last_updated = today
            if customer.days_since_last_fill is not None:
                customer.days_since_last_fill += 1

        # Marker row that makes a second run on the same day a no-op.
        new_update_log = Auto_Update(last_updated=today)
        self.session.add(new_update_log)
        logger.info("Daily update staged. Awaiting commit.")
        return {"ok": True, "message": f"Successfully staged updates for {len(auto_customers)} customers."}

    def refine_factor_after_delivery(self, ticket: "Tickets_Auto_Delivery"):
        """
        Recalculates and refines the customer's K-Factor after a delivery.
        Uses K-factor history with rolling averages and outlier detection.

        Sets ticket.is_budget_fill, records a KFactorHistory row for the
        interval since the customer's last fill, updates house_factor,
        confidence_score and k_factor_source, and resets the tank estimate.
        Returns None. When the customer has no previous fill, or the ticket's
        fill date is not after it, only the tank status is updated (a first
        delivery also gets the division-average K-factor).
        """
        customer = self.session.query(Auto_Delivery).filter(
            Auto_Delivery.customer_id == ticket.customer_id
        ).first()
        if not customer:
            logger.warning(f"Customer {ticket.customer_id} not found.")
            return

        # 1. Detect and flag budget fill
        is_budget = self._is_budget_fill(ticket.gallons_delivered)
        ticket.is_budget_fill = is_budget
        if is_budget:
            logger.info(f"Budget fill detected for customer {ticket.customer_id}: {ticket.gallons_delivered} gal")

        # 2. First delivery - no previous fill to compare against
        if not customer.last_fill:
            logger.info(f"First delivery for customer {ticket.customer_id}. Setting division average K-factor.")
            div_avg = self._get_division_average_k()
            customer.house_factor = div_avg
            customer.confidence_score = CONFIDENCE_MIN
            customer.k_factor_source = 'division_avg'
            self._update_tank_after_fill(customer, ticket, is_budget)
            return

        start_date = customer.last_fill
        end_date = ticket.fill_date
        if start_date >= end_date:
            logger.warning(f"Cannot refine K-Factor for customer {ticket.customer_id}: fill date not after last fill. Resetting tank only.")
            self._update_tank_after_fill(customer, ticket, is_budget)
            return

        # 3. Calculate HDD for interval (after the last fill, through this fill)
        interval_temps = self.session.query(Auto_Temp).filter(
            Auto_Temp.todays_date > start_date,
            Auto_Temp.todays_date <= end_date
        ).all()
        total_hdd = sum(
            (self._heating_degree_days(temp.temp_avg) for temp in interval_temps),
            Decimal('0'),
        )

        # Hot water adjustment
        num_days = (end_date - start_date).days
        total_hot_water_usage = Decimal('0.0')
        if customer.hot_water_summer == 1:
            total_hot_water_usage = Decimal(num_days) * HOT_WATER_DAILY_USAGE
        gallons_for_heating = ticket.gallons_delivered - total_hot_water_usage

        # Calculate K-factor for this observation; it stays None when there
        # was no heating usage or no degree days in the interval.
        k_factor_obs = None
        if gallons_for_heating > 0 and total_hdd > 0:
            k_factor_obs = gallons_for_heating / total_hdd

        # 4. Store K-factor observation in history (even budget fills, flagged)
        history_entry = KFactorHistory(
            customer_id=ticket.customer_id,
            ticket_id=ticket.id,
            fill_date=ticket.fill_date,
            gallons_delivered=ticket.gallons_delivered,
            total_hdd=total_hdd,
            days_in_period=num_days,
            k_factor=k_factor_obs,
            is_budget_fill=is_budget,
            is_outlier=False,
            created_at=date.today()
        )
        self.session.add(history_entry)

        # 5. Run rolling K-factor calculation
        # Flush so the new entry is visible to the query
        self.session.flush()
        new_k, confidence, source = self._calculate_rolling_k_factor(ticket.customer_id)

        # Format the previous factor defensively: a missing house_factor must
        # not abort the refinement just because of a log line.
        old_k = customer.house_factor
        old_k_text = f"{old_k:.4f}" if old_k is not None else "None"
        logger.info(f"Refining K-Factor for Customer {ticket.customer_id}:")
        logger.info(f" Old K: {old_k_text}, New K: {new_k:.4f}, Confidence: {confidence}, Source: {source}")

        # 6. Update customer
        customer.house_factor = new_k
        customer.confidence_score = confidence
        customer.k_factor_source = source

        # 7. Update tank after fill
        self._update_tank_after_fill(customer, ticket, is_budget)
        logger.info(f"K-Factor and tank status for Customer {customer.customer_id} staged for update.")

    def _update_tank_after_fill(self, customer: "Auto_Delivery", ticket: "Tickets_Auto_Delivery", is_budget: bool = False):
        """Update customer tank status after a fill-up.

        A budget fill adds the delivered gallons to the current estimate,
        capped at the tank's max fill; any other delivery resets the estimate
        to the max fill. The customer is also marked active (auto_status = 1)
        and its fill date and day counter are reset.
        """
        customer.last_fill = ticket.fill_date
        customer.days_since_last_fill = 0

        if customer.tank_size and Decimal(customer.tank_size) > 0:
            tank_size = float(Decimal(customer.tank_size))
            # Sizes missing from the table fall back to the tank size itself.
            max_fill = TANK_MAX_FILLS.get(tank_size, tank_size)
        else:
            # Unknown tank size: same level as the TANK_MAX_FILLS entry for 275.
            max_fill = 240.0
        capacity = Decimal(str(max_fill))

        if is_budget:
            # Budget fill: ADD gallons to current level, cap at max_fill.
            # A missing estimate is treated as an empty tank.
            current_level = customer.estimated_gallons_left
            if current_level is None:
                current_level = Decimal('0')
            customer.estimated_gallons_left = min(
                current_level + ticket.gallons_delivered, capacity
            )
        else:
            # Full delivery: RESET to max_fill
            customer.estimated_gallons_left = capacity

        customer.estimated_gallons_left_prev_day = customer.estimated_gallons_left
        customer.last_updated = date.today()
        customer.auto_status = 1