Files
eamco_office_api/app/stats/views.py
Edwin Eames 3066754821 feat: add admin settings system and improve customer/pricing endpoints
Add centralized admin settings (company info, social links, quick calls,
sidebar visibility toggles, theme, logo upload) with singleton pattern
and full CRUD API. Add active/dedicated customer count endpoints for
dashboard stats. Fix automatic assignment route to use PUT instead of
GET. Refactor oil price endpoint to use schema serialization with null
safety.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 18:45:06 -05:00

616 lines
22 KiB
Python
Executable File

import logging
from datetime import date
from app.stats import stats
import datetime
from flask import request
from app import db
from app.common.responses import success_response
from app.classes.delivery import Delivery_Delivery
from app.classes.stats_company import Stats_Company, Stats_Company_schema
from sqlalchemy import func, and_, extract
from app.classes.stats_customer import Stats_Customer, Stats_Customer_schema
logger = logging.getLogger(__name__)
def get_monday_date(date_object):
"""Gets the date of the Monday for the given date."""
# Get the day of the week as an integer (0 = Monday, 6 = Sunday)
day_of_week = date_object.weekday()
# Calculate the number of days to subtract to get to Monday
days_to_monday = day_of_week - 0 # Monday is 0
# Subtract the days from the given date to get Monday's date
monday_date = date_object - datetime.timedelta(days=days_to_monday)
return monday_date
@stats.route("/calls/add", methods=["PUT"])
def total_calls_post():
logger.info("PUT /stats/calls/add - Incrementing call count")
total_calls_today = (db.session
.query(Stats_Company)
.filter(Stats_Company.expected_delivery_date == date.today())
.first())
current_call_count = total_calls_today.total_calls
new_call = current_call_count + 1
total_calls_today.total_calls = new_call
db.session.add(total_calls_today)
db.session.commit()
return success_response()
@stats.route("/calls/count/today", methods=["GET"])
def total_calls_today():
logger.info("GET /stats/calls/count/today - Getting today's call count")
total_calls_today = (db.session
.query(Stats_Company)
.filter(Stats_Company.expected_delivery_date == date.today())
.count())
return success_response({'data': total_calls_today})
@stats.route("/gallons/total/<int:driver_id>", methods=["GET"])
def total_gallons_delivered_driver(driver_id):
logger.info(f"GET /stats/gallons/total/{driver_id} - Calculating total gallons for driver")
gallons_list = []
total_gallons = db.session\
.query(Delivery_Delivery)\
.filter(Delivery_Delivery.driver_employee_id == driver_id)\
.all()
for f in total_gallons:
gallons_list.append(f.gallons_delivered)
sum_of_gallons = (sum(gallons_list))
return success_response({'data': sum_of_gallons})
@stats.route("/delivery/total/<int:driver_id>", methods=["GET"])
def total_deliveries_driver(driver_id):
logger.info(f"GET /stats/delivery/total/{driver_id} - Counting total deliveries for driver")
total_stops = (db.session
.query(Delivery_Delivery)
.filter(Delivery_Delivery.driver_employee_id == driver_id)
.count())
return success_response({'data': total_stops})
@stats.route("/primes/total/<int:driver_id>", methods=["GET"])
def total_primes_driver(driver_id):
logger.info(f"GET /stats/primes/total/{driver_id} - Counting prime deliveries for driver")
total_stops = (db.session
.query(Delivery_Delivery)
.filter(Delivery_Delivery.driver_employee_id == driver_id)
.filter(Delivery_Delivery.prime == 1)
.count())
return success_response({'data': total_stops})
@stats.route("/delivery/count/today", methods=["GET"])
def total_deliveries_today():
logger.info("GET /stats/delivery/count/today - Counting today's deliveries")
total_stops = (db.session
.query(Delivery_Delivery)
.filter(Delivery_Delivery.expected_delivery_date == date.today())
.count())
return success_response({'data': total_stops})
@stats.route("/delivery/count/delivered/today", methods=["GET"])
def total_deliveries_today_finished():
logger.info("GET /stats/delivery/count/delivered/today - Counting completed deliveries today")
total_stops = (db.session
.query(Delivery_Delivery)
.filter(Delivery_Delivery.expected_delivery_date == date.today())
.filter((Delivery_Delivery.delivery_status == 10))
.count())
return success_response({'data': total_stops})
@stats.route("/user/<int:user_id>", methods=["GET"])
def get_user_stats(user_id):
"""
gets stats of user
"""
logger.info(f"GET /stats/user/{user_id} - Fetching user statistics")
get_user = db.session \
.query(Stats_Customer) \
.filter(Stats_Customer.customer_id == user_id) \
.first()
if get_user is None:
new_stats = Stats_Customer(
customer_id = user_id,
total_calls = 1,
service_calls_total = 0,
service_calls_total_spent = 0,
service_calls_total_profit = 0,
oil_deliveries = 0,
oil_total_gallons = 0,
oil_total_spent = 0,
oil_total_profit = 0,
)
db.session.add(new_stats)
db.session.commit()
get_user = db.session \
.query(Stats_Customer) \
.filter(Stats_Customer.customer_id == user_id) \
.first()
user_schema = Stats_Customer_schema(many=False)
return success_response({"user_stats": user_schema.dump(get_user)})
@stats.route("/user/lastdelivery/<int:user_id>", methods=["GET"])
def get_user_last_delivery(user_id):
"""
gets users last delivery. used on profile page
"""
logger.info(f"GET /stats/user/lastdelivery/{user_id} - Fetching user's last delivery date")
get_delivery= db.session \
.query(Delivery_Delivery) \
.filter(Delivery_Delivery.customer_id == user_id) \
.filter(Delivery_Delivery.delivery_status == 10) \
.order_by(Delivery_Delivery.id.desc())\
.first()
if get_delivery:
date_delivered = get_delivery.when_delivered
else:
date_delivered = "no deliveries on record"
return success_response({'date': str(date_delivered)})
@stats.route("/gallons/week", methods=["GET"])
def total_gallons_delivered_this_week():
logger.info("GET /stats/gallons/week - Calculating weekly gallons delivered")
# Get today's date
total_gallons = 0
today = datetime.date.today()
# Get the date of the Monday for today
monday = get_monday_date(today)
get_total = (db.session
.query(Delivery_Delivery)
.filter(Delivery_Delivery.when_delivered >= monday)
.filter(Delivery_Delivery.when_delivered <= today)
.all())
for f in get_total:
total_gallons = total_gallons + f.gallons_delivered
return success_response({'total': total_gallons})
@stats.route("/gallons/check/total/<int:user_id>", methods=["GET"])
def calculate_gallons_user(user_id):
logger.info(f"GET /stats/gallons/check/total/{user_id} - Recalculating user total gallons")
# Get today's date
total_gallons = 0
# Get the date of the Monday for today
get_total = (db.session
.query(Delivery_Delivery)
.filter(Delivery_Delivery.customer_id == user_id)
.all())
get_user = db.session \
.query(Stats_Customer) \
.filter(Stats_Customer.customer_id == user_id) \
.first()
for f in get_total:
total_gallons = total_gallons + f.gallons_delivered
get_user.oil_total_gallons = total_gallons
db.session.add(get_user)
db.session.commit()
return success_response()
@stats.route("/deliveries/daily", methods=["GET"])
def get_daily_delivery_stats():
"""
Get daily delivery stats for a date range.
Query Params: start_date (YYYY-MM-DD), end_date (YYYY-MM-DD)
"""
start_date_str = request.args.get('start_date')
end_date_str = request.args.get('end_date')
logger.info(f"GET /stats/deliveries/daily - Fetching stats from {start_date_str} to {end_date_str}")
try:
start_date = datetime.datetime.strptime(start_date_str, '%Y-%m-%d').date()
end_date = datetime.datetime.strptime(end_date_str, '%Y-%m-%d').date()
except (ValueError, TypeError):
return success_response({'error': 'Invalid date format. Use YYYY-MM-DD'}, 400)
# Query daily aggregates
daily_stats = db.session.query(
Delivery_Delivery.when_delivered,
func.sum(Delivery_Delivery.gallons_delivered).label('total_gallons'),
func.count(Delivery_Delivery.id).label('total_count')
).filter(
Delivery_Delivery.delivery_status == 10, # Delivered status
Delivery_Delivery.when_delivered >= start_date,
Delivery_Delivery.when_delivered <= end_date
).group_by(
Delivery_Delivery.when_delivered
).order_by(
Delivery_Delivery.when_delivered
).all()
result = []
for day_stat in daily_stats:
result.append({
'date': day_stat.when_delivered.strftime('%Y-%m-%d'),
'gallons': float(day_stat.total_gallons or 0),
'count': int(day_stat.total_count or 0)
})
return success_response({'daily_stats': result})
@stats.route("/deliveries/totals", methods=["GET"])
def get_delivery_totals():
"""
Get aggregated totals for a specific period compared to previous years.
Query Params: period (day, month, year), date (YYYY-MM-DD)
"""
period = request.args.get('period', 'day')
date_str = request.args.get('date')
logger.info(f"GET /stats/deliveries/totals - Fetching {period} totals for {date_str}")
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d').date()
except (ValueError, TypeError):
# Default to today if invalid
target_date = date.today()
response_data = []
# specific years to check - current year and last 3 years
years_to_check = [target_date.year, target_date.year - 1, target_date.year - 2, target_date.year - 3]
for year in years_to_check:
# Calculate start/end based on period for this specific year
if period == 'day':
start = datetime.date(year, target_date.month, target_date.day)
end = start
elif period == 'month':
start = datetime.date(year, target_date.month, 1)
# Logic to get end of month
next_month = start.replace(day=28) + datetime.timedelta(days=4)
end = next_month - datetime.timedelta(days=next_month.day)
elif period == 'year':
start = datetime.date(year, 1, 1)
end = datetime.date(year, 12, 31)
else:
return success_response({'error': 'Invalid period'}, 400)
stats = db.session.query(
func.sum(Delivery_Delivery.gallons_delivered).label('total_gallons'),
func.count(Delivery_Delivery.id).label('total_count')
).filter(
Delivery_Delivery.delivery_status == 10,
Delivery_Delivery.when_delivered >= start,
Delivery_Delivery.when_delivered <= end
).first()
response_data.append({
'year': year,
'gallons': float(stats.total_gallons or 0) if stats else 0,
'count': int(stats.total_count or 0) if stats else 0,
'period_label': start.strftime('%Y-%m-%d') if period == 'day' else start.strftime('%b %Y') if period == 'month' else str(year)
})
return success_response({'totals': response_data})
# ============================================
# Stats Dashboard Endpoints (for Charts)
# ============================================
MONTH_NAMES = [
'', 'January', 'February', 'March', 'April', 'May', 'June',
'July', 'August', 'September', 'October', 'November', 'December'
]
@stats.route("/gallons/daily", methods=["GET"])
def get_gallons_daily():
"""
Get daily gallons delivered for time-series chart.
Query Params:
- start_date: YYYY-MM-DD
- end_date: YYYY-MM-DD
- years: comma-separated list of years (e.g., 2024,2025,2026)
Returns daily gallons grouped by date for each requested year.
"""
from flask import request
start_date_str = request.args.get('start_date')
end_date_str = request.args.get('end_date')
years_str = request.args.get('years', str(date.today().year))
logger.info(f"GET /stats/gallons/daily - start={start_date_str}, end={end_date_str}, years={years_str}")
try:
start_date = datetime.datetime.strptime(start_date_str, '%Y-%m-%d').date()
end_date = datetime.datetime.strptime(end_date_str, '%Y-%m-%d').date()
except (ValueError, TypeError):
return success_response({'ok': False, 'error': 'Invalid date format. Use YYYY-MM-DD'})
try:
years = [int(y.strip()) for y in years_str.split(',')]
except ValueError:
return success_response({'ok': False, 'error': 'Invalid years format'})
result_years = []
for year in years:
# Adjust dates to the specific year while keeping month/day
year_start = start_date.replace(year=year)
year_end = end_date.replace(year=year)
daily_data = db.session.query(
func.date(Delivery_Delivery.when_delivered).label('delivery_date'),
func.sum(Delivery_Delivery.gallons_delivered).label('gallons'),
func.count(Delivery_Delivery.id).label('deliveries')
).filter(
Delivery_Delivery.delivery_status == 10,
Delivery_Delivery.when_delivered >= year_start,
Delivery_Delivery.when_delivered <= year_end
).group_by(
func.date(Delivery_Delivery.when_delivered)
).order_by(
func.date(Delivery_Delivery.when_delivered)
).all()
data_points = []
for row in daily_data:
data_points.append({
'date': row.delivery_date.strftime('%Y-%m-%d') if row.delivery_date else None,
'gallons': float(row.gallons or 0),
'deliveries': int(row.deliveries or 0)
})
result_years.append({
'year': year,
'data': data_points
})
return success_response({'years': result_years})
@stats.route("/gallons/weekly", methods=["GET"])
def get_gallons_weekly():
"""
Get weekly aggregated gallons delivered.
Query Params:
- start_date: YYYY-MM-DD
- end_date: YYYY-MM-DD
- years: comma-separated list of years
Returns weekly gallons grouped by week number for each requested year.
"""
from flask import request
start_date_str = request.args.get('start_date')
end_date_str = request.args.get('end_date')
years_str = request.args.get('years', str(date.today().year))
logger.info(f"GET /stats/gallons/weekly - start={start_date_str}, end={end_date_str}, years={years_str}")
try:
start_date = datetime.datetime.strptime(start_date_str, '%Y-%m-%d').date()
end_date = datetime.datetime.strptime(end_date_str, '%Y-%m-%d').date()
except (ValueError, TypeError):
return success_response({'ok': False, 'error': 'Invalid date format. Use YYYY-MM-DD'})
try:
years = [int(y.strip()) for y in years_str.split(',')]
except ValueError:
return success_response({'ok': False, 'error': 'Invalid years format'})
result_years = []
for year in years:
year_start = start_date.replace(year=year)
year_end = end_date.replace(year=year)
# Group by ISO week number
weekly_data = db.session.query(
extract('week', Delivery_Delivery.when_delivered).label('week_num'),
func.min(Delivery_Delivery.when_delivered).label('week_start'),
func.max(Delivery_Delivery.when_delivered).label('week_end'),
func.sum(Delivery_Delivery.gallons_delivered).label('gallons'),
func.count(Delivery_Delivery.id).label('deliveries')
).filter(
Delivery_Delivery.delivery_status == 10,
Delivery_Delivery.when_delivered >= year_start,
Delivery_Delivery.when_delivered <= year_end
).group_by(
extract('week', Delivery_Delivery.when_delivered)
).order_by(
extract('week', Delivery_Delivery.when_delivered)
).all()
data_points = []
for row in weekly_data:
data_points.append({
'week_start': row.week_start.strftime('%Y-%m-%d') if row.week_start else None,
'week_end': row.week_end.strftime('%Y-%m-%d') if row.week_end else None,
'week_number': int(row.week_num) if row.week_num else 0,
'gallons': float(row.gallons or 0),
'deliveries': int(row.deliveries or 0)
})
result_years.append({
'year': year,
'data': data_points
})
return success_response({'years': result_years})
@stats.route("/gallons/monthly", methods=["GET"])
def get_gallons_monthly():
"""
Get monthly aggregated gallons delivered.
Query Params:
- year: primary year to display
- compare_years: comma-separated list of years to compare
Returns monthly gallons for each requested year.
"""
from flask import request
year = request.args.get('year', type=int, default=date.today().year)
compare_years_str = request.args.get('compare_years', '')
logger.info(f"GET /stats/gallons/monthly - year={year}, compare_years={compare_years_str}")
years = [year]
if compare_years_str:
try:
compare_years = [int(y.strip()) for y in compare_years_str.split(',')]
years.extend(compare_years)
except ValueError:
pass
result_years = []
for y in years:
monthly_data = db.session.query(
extract('month', Delivery_Delivery.when_delivered).label('month_num'),
func.sum(Delivery_Delivery.gallons_delivered).label('gallons'),
func.count(Delivery_Delivery.id).label('deliveries')
).filter(
Delivery_Delivery.delivery_status == 10,
extract('year', Delivery_Delivery.when_delivered) == y
).group_by(
extract('month', Delivery_Delivery.when_delivered)
).order_by(
extract('month', Delivery_Delivery.when_delivered)
).all()
data_points = []
for row in monthly_data:
month_num = int(row.month_num) if row.month_num else 0
data_points.append({
'month': month_num,
'month_name': MONTH_NAMES[month_num] if 1 <= month_num <= 12 else '',
'gallons': float(row.gallons or 0),
'deliveries': int(row.deliveries or 0)
})
result_years.append({
'year': y,
'data': data_points
})
return success_response({'years': result_years})
@stats.route("/totals/comparison", methods=["GET"])
def get_totals_comparison():
"""
Get today/week-to-date/month-to-date/year-to-date totals with previous year comparison.
Returns totals for current period compared to same period last year with % change.
"""
logger.info("GET /stats/totals/comparison - Fetching comparison totals")
today = date.today()
current_year = today.year
compare_year = current_year - 1
def get_period_totals(start_date, end_date):
"""Helper to get gallons, deliveries, revenue for a date range."""
result = db.session.query(
func.coalesce(func.sum(Delivery_Delivery.gallons_delivered), 0).label('gallons'),
func.count(Delivery_Delivery.id).label('deliveries'),
func.coalesce(func.sum(Delivery_Delivery.final_price), 0).label('revenue')
).filter(
Delivery_Delivery.delivery_status == 10,
Delivery_Delivery.when_delivered >= start_date,
Delivery_Delivery.when_delivered <= end_date
).first()
return {
'gallons': float(result.gallons or 0),
'deliveries': int(result.deliveries or 0),
'revenue': float(result.revenue or 0)
}
def calculate_comparison(current_val, previous_val):
"""Calculate % change and direction."""
if previous_val == 0:
change_percent = 100.0 if current_val > 0 else 0.0
else:
change_percent = ((current_val - previous_val) / previous_val) * 100
if change_percent > 0:
direction = 'up'
elif change_percent < 0:
direction = 'down'
else:
direction = 'neutral'
return {
'current': current_val,
'previous': previous_val,
'change_percent': round(change_percent, 1),
'change_direction': direction
}
def build_period_comparison(current_start, current_end, prev_start, prev_end):
"""Build comparison object for a period."""
current = get_period_totals(current_start, current_end)
previous = get_period_totals(prev_start, prev_end)
return {
'gallons': calculate_comparison(current['gallons'], previous['gallons']),
'deliveries': calculate_comparison(current['deliveries'], previous['deliveries']),
'revenue': calculate_comparison(current['revenue'], previous['revenue'])
}
# Today comparison
today_current = today
today_prev = today.replace(year=compare_year)
today_comparison = build_period_comparison(today_current, today_current, today_prev, today_prev)
# Week-to-date comparison (Monday to today)
monday_current = get_monday_date(today)
monday_prev = monday_current.replace(year=compare_year)
today_prev_week = today.replace(year=compare_year)
wtd_comparison = build_period_comparison(monday_current, today, monday_prev, today_prev_week)
# Month-to-date comparison
first_of_month_current = today.replace(day=1)
first_of_month_prev = first_of_month_current.replace(year=compare_year)
today_prev_month = today.replace(year=compare_year)
mtd_comparison = build_period_comparison(first_of_month_current, today, first_of_month_prev, today_prev_month)
# Year-to-date comparison
first_of_year_current = today.replace(month=1, day=1)
first_of_year_prev = first_of_year_current.replace(year=compare_year)
today_prev_year = today.replace(year=compare_year)
ytd_comparison = build_period_comparison(first_of_year_current, today, first_of_year_prev, today_prev_year)
comparison_data = {
'today': today_comparison,
'week_to_date': wtd_comparison,
'month_to_date': mtd_comparison,
'year_to_date': ytd_comparison
}
return success_response({
'comparison': comparison_data,
'current_year': current_year,
'compare_year': compare_year
})