Files
eamco_office_api/app/__init__.py
Edwin Eames 43a14eba2c Remove service endpoints and migrate to dedicated service API
- Delete app/service module (moved to eamco_service)
- Update delivery views to remove service-related imports
- Clean up delivery_data views
- Part of microservices architecture refactoring
2026-02-01 19:03:37 -05:00

280 lines
8.7 KiB
Python
Executable File

# coding=utf-8
import logging
import sys
from flask import Flask, jsonify
from flask_bcrypt import Bcrypt
from flask_cors import CORS
from flask_marshmallow import Marshmallow
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_session import Session
from flask_login import LoginManager
from sqlalchemy.orm import sessionmaker
from werkzeug.routing import BaseConverter
from flask_mail import Mail
from config import load_config
import re
from sqlalchemy import text
ApplicationConfig = load_config()
# Configure logging
def setup_logging():
"""Configure structured logging for the application."""
log_level = logging.DEBUG if ApplicationConfig.CURRENT_SETTINGS != 'PRODUCTION' else logging.INFO
# Create formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Configure root logger
root_logger = logging.getLogger()
root_logger.setLevel(log_level)
# Remove existing handlers to avoid duplicates
root_logger.handlers.clear()
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(log_level)
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
# Reduce noise from third-party libraries
logging.getLogger('werkzeug').setLevel(logging.WARNING)
logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING)
return logging.getLogger('eamco_office_api')
logger = setup_logging()
app = Flask(__name__,
static_url_path='',
static_folder='static',
template_folder='templates')
app.config.from_object(ApplicationConfig)
session = sessionmaker()
class RegexConverter(BaseConverter):
def __init__(self, url_map, *items):
super(RegexConverter, self).__init__(url_map)
self.regex = items[0]
app.url_map.converters['regex'] = RegexConverter
app.jinja_env.autoescape = True
# configuration
app.config['CORS_ALLOWED_ORIGINS'] = ApplicationConfig.CORS_ALLOWED_ORIGINS
app.config['UPLOADED_FILES_DEST_ITEM'] = ApplicationConfig.UPLOADED_FILES_DEST_ITEM
app.config['UPLOADED_FILES_ALLOW'] = ApplicationConfig.UPLOADED_FILES_ALLOW
app.config['MAX_CONTENT_LENGTH'] = ApplicationConfig.MAX_CONTENT_LENGTH
app.config['SESSION_COOKIE_NAME'] = ApplicationConfig.SESSION_COOKIE_NAME
app.config['SESSION_COOKIE_SECURE'] = ApplicationConfig.SESSION_COOKIE_SECURE
app.config['SESSION_COOKIE_HTTPONLY'] = ApplicationConfig.SESSION_COOKIE_HTTPONLY
app.config['SESSION_COOKIE_SAMESITE'] = ApplicationConfig.SESSION_COOKIE_SAMESITE
app.config['SESSION_PERMANENT'] = ApplicationConfig.SESSION_PERMANENT
app.config['SESSION_USE_SIGNER'] = ApplicationConfig.SESSION_USE_SIGNER
app.config['CURRENT_SETTINGS'] = ApplicationConfig.CURRENT_SETTINGS
app.config['SECRET_KEY'] = ApplicationConfig.SECRET_KEY
session.configure(bind=ApplicationConfig.SQLALCHEMY_DATABASE_URI)
db = SQLAlchemy(app)
migrate = Migrate(app, db)
bcrypt = Bcrypt(app)
app.config['SESSION_SQLALCHEMY'] = db
server_session = Session(app)
ma = Marshmallow(app)
mail = Mail(app)
login_manager = LoginManager(app)
login_manager.session_protection = 'strong'
login_manager.anonymous_user = "Guest"
@login_manager.request_loader
def load_user_from_request(request):
from app.classes.auth import Auth_User
# Check for Authorization header first, as it's the standard
auth_header = request.headers.get('Authorization')
if auth_header:
# --- THIS IS THE FIX ---
# Use a case-insensitive regular expression to strip "bearer "
api_key = re.sub(r'^bearer\s+', '', auth_header, flags=re.IGNORECASE).strip('"')
if api_key:
user = db.session.query(Auth_User).filter_by(api_key=api_key).first()
if user:
return user
# As a fallback, check for api_key in URL args (less secure, but keeps existing logic)
api_key_arg = request.args.get('api_key')
if api_key_arg:
user = db.session.query(Auth_User).filter_by(api_key=api_key_arg).first()
if user:
return user
# If no valid key is found in header or args, return None
return None
cors = CORS(app,
supports_credentials=True,
resources={r"/*": {"origins": ApplicationConfig.CORS_ALLOWED_ORIGINS}
})
# bind a function after each request, even if an exception is encountered.
@app.teardown_request
def teardown_request(error):
db.session.remove()
@app.teardown_appcontext
def teardown_appcontext(error):
db.session.remove()
@app.errorhandler(500)
def internal_error500(error):
return jsonify({"ok": False, "error": "Internal Error 500"}), 500
@app.errorhandler(502)
def internal_error502(error):
return jsonify({"ok": False, "error": "Internal Error 502"}), 502
@app.errorhandler(404)
def internal_error404(error):
return jsonify({"ok": False, "error": "Not Found"}), 404
@app.errorhandler(401)
def internal_error401(error):
return jsonify({"ok": False, "error": "Unauthorized"}), 401
@app.errorhandler(400)
def internal_error400(error):
return jsonify({"ok": False, "error": "Bad Request"}), 400
@app.errorhandler(413)
def to_large_file(error):
return jsonify({"ok": False, "error": "File is too large. Use a smaller image/file."}), 413
@app.errorhandler(403)
def internal_error403(error):
return jsonify({"ok": False, "error": "Forbidden"}), 403
@app.errorhandler(405)
def internal_error(error):
return jsonify({"ok": False, "error": "Method Not Allowed"}), 405
# link locations
from .admin import admin as admin_blueprint
app.register_blueprint(admin_blueprint, url_prefix='/admin')
from .main import main as main_blueprint
app.register_blueprint(main_blueprint, url_prefix='/main')
from .customer import customer as customer_blueprint
app.register_blueprint(customer_blueprint, url_prefix='/customer')
from .delivery import delivery as delivery_blueprint
app.register_blueprint(delivery_blueprint, url_prefix='/delivery')
from .delivery_data import delivery_data as delivery_data_blueprint
app.register_blueprint(delivery_data_blueprint, url_prefix='/deliverydata')
from .delivery_status import deliverystatus as delivery_status_blueprint
app.register_blueprint(delivery_status_blueprint, url_prefix='/deliverystatus')
from .search import search as search_blueprint
app.register_blueprint(search_blueprint, url_prefix='/search')
from .reports import reports as reports_blueprint
app.register_blueprint(reports_blueprint, url_prefix='/report')
from .query import query as query_blueprint
app.register_blueprint(query_blueprint, url_prefix='/query')
from .payment import payment as payment_blueprint
app.register_blueprint(payment_blueprint, url_prefix='/payment')
from .money import money as money_blueprint
app.register_blueprint(money_blueprint, url_prefix='/money')
from .auth import auth as auth_blueprint
app.register_blueprint(auth_blueprint, url_prefix='/auth')
from .employees import employees as employees_blueprint
app.register_blueprint(employees_blueprint, url_prefix='/employee')
from .info import info as info_blueprint
app.register_blueprint(info_blueprint, url_prefix='/info')
from .stats import stats as stats_blueprint
app.register_blueprint(stats_blueprint, url_prefix='/stats')
from .ticket import ticket as ticket_blueprint
app.register_blueprint(ticket_blueprint, url_prefix='/ticket')
from .promo import promo as promo_blueprint
app.register_blueprint(promo_blueprint, url_prefix='/promo')
from .social import social as social_blueprint
app.register_blueprint(social_blueprint, url_prefix='/social')
def check_db_connection():
"""
Test database connectivity.
"""
try:
db.session.execute(text("SELECT 1"))
return True
except Exception:
return False
with app.app_context():
db.configure_mappers()
db.session.commit()
# Startup logging
logger.info("🚀 eamco_office_api STARTING")
mode = ApplicationConfig.CURRENT_SETTINGS.upper()
if mode in ['DEVELOPMENT', 'DEV']:
logger.info("🤖🤖🤖🤖🤖 Mode: Development 🤖🤖🤖🤖🤖")
elif mode in ['PRODUCTION', 'PROD']:
logger.info("💀💀💀💀💀💀💀💀💀💀 ⚠️ WARNING PRODUCTION 💀💀💀💀💀💀💀💀💀💀")
# Sanitize DB URI to avoid logging credentials
db_uri = ApplicationConfig.SQLALCHEMY_DATABASE_URI
if '@' in db_uri:
db_uri = db_uri.split('@')[-1]
logger.info(f"DB: ...@{db_uri[:50]}")
logger.info(f"CORS: {len(ApplicationConfig.CORS_ALLOWED_ORIGINS)} origins configured")
# Test database connection
if check_db_connection():
logger.info("DB Connection: ✅ OK")
else:
logger.info("DB Connection: ❌ FAILED")