# coding=utf-8 import logging import sys from flask import Flask, jsonify from flask_bcrypt import Bcrypt from flask_cors import CORS from flask_marshmallow import Marshmallow from flask_sqlalchemy import SQLAlchemy from flask_migrate import Migrate from flask_session import Session from flask_login import LoginManager from sqlalchemy.orm import sessionmaker from werkzeug.routing import BaseConverter from flask_mail import Mail from config import load_config import re from sqlalchemy import text ApplicationConfig = load_config() # Configure logging def setup_logging(): """Configure structured logging for the application.""" log_level = logging.DEBUG if ApplicationConfig.CURRENT_SETTINGS != 'PRODUCTION' else logging.INFO # Create formatter formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # Configure root logger root_logger = logging.getLogger() root_logger.setLevel(log_level) # Remove existing handlers to avoid duplicates root_logger.handlers.clear() # Console handler console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(log_level) console_handler.setFormatter(formatter) root_logger.addHandler(console_handler) # Reduce noise from third-party libraries logging.getLogger('werkzeug').setLevel(logging.WARNING) logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING) return logging.getLogger('eamco_office_api') logger = setup_logging() app = Flask(__name__, static_url_path='', static_folder='static', template_folder='templates') app.config.from_object(ApplicationConfig) session = sessionmaker() class RegexConverter(BaseConverter): def __init__(self, url_map, *items): super(RegexConverter, self).__init__(url_map) self.regex = items[0] app.url_map.converters['regex'] = RegexConverter app.jinja_env.autoescape = True # configuration app.config['CORS_ALLOWED_ORIGINS'] = ApplicationConfig.CORS_ALLOWED_ORIGINS app.config['UPLOADED_FILES_DEST_ITEM'] = ApplicationConfig.UPLOADED_FILES_DEST_ITEM app.config['UPLOADED_FILES_ALLOW'] = ApplicationConfig.UPLOADED_FILES_ALLOW app.config['MAX_CONTENT_LENGTH'] = ApplicationConfig.MAX_CONTENT_LENGTH app.config['SESSION_COOKIE_NAME'] = ApplicationConfig.SESSION_COOKIE_NAME app.config['SESSION_COOKIE_SECURE'] = ApplicationConfig.SESSION_COOKIE_SECURE app.config['SESSION_COOKIE_HTTPONLY'] = ApplicationConfig.SESSION_COOKIE_HTTPONLY app.config['SESSION_COOKIE_SAMESITE'] = ApplicationConfig.SESSION_COOKIE_SAMESITE app.config['SESSION_PERMANENT'] = ApplicationConfig.SESSION_PERMANENT app.config['SESSION_USE_SIGNER'] = ApplicationConfig.SESSION_USE_SIGNER app.config['CURRENT_SETTINGS'] = ApplicationConfig.CURRENT_SETTINGS app.config['SECRET_KEY'] = ApplicationConfig.SECRET_KEY session.configure(bind=ApplicationConfig.SQLALCHEMY_DATABASE_URI) db = SQLAlchemy(app) migrate = Migrate(app, db) bcrypt = Bcrypt(app) app.config['SESSION_SQLALCHEMY'] = db server_session = Session(app) ma = Marshmallow(app) mail = Mail(app) login_manager = LoginManager(app) login_manager.session_protection = 'strong' login_manager.anonymous_user = "Guest" @login_manager.request_loader def load_user_from_request(request): from app.classes.auth import Auth_User # Check for Authorization header first, as it's the standard auth_header = request.headers.get('Authorization') if auth_header: # --- THIS IS THE FIX --- # Use a case-insensitive regular expression to strip "bearer " api_key = re.sub(r'^bearer\s+', '', auth_header, flags=re.IGNORECASE).strip('"') if api_key: user = db.session.query(Auth_User).filter_by(api_key=api_key).first() if user: return user # As a fallback, check for api_key in URL args (less secure, but keeps existing logic) api_key_arg = request.args.get('api_key') if api_key_arg: user = db.session.query(Auth_User).filter_by(api_key=api_key_arg).first() if user: return user # If no valid key is found in header or args, return None return None cors = CORS(app, supports_credentials=True, resources={r"/*": {"origins": ApplicationConfig.CORS_ALLOWED_ORIGINS} }) # bind a function after each request, even if an exception is encountered. @app.teardown_request def teardown_request(error): db.session.remove() @app.teardown_appcontext def teardown_appcontext(error): db.session.remove() @app.errorhandler(500) def internal_error500(error): return jsonify({"error": "Internal Error 500"}), 500 @app.errorhandler(502) def internal_error502(error): return jsonify({"error": "Internal Error 502"}), 502 @app.errorhandler(404) def internal_error404(error): return jsonify({"error": "Internal Error 400"}), 400 @app.errorhandler(401) def internal_error401(error): return jsonify({"error": "Internal Error 401"}), 401 @app.errorhandler(400) def internal_error400(error): return jsonify({"error": "Internal Error 400"}), 400 @app.errorhandler(413) def to_large_file(error): return jsonify({"error": "File is too large. Use a smaller image/file."}), 413 @app.errorhandler(403) def internal_error403(error): return jsonify({"error": "Internal Error 403"}), 403 @app.errorhandler(405) def internal_error(error): return jsonify({"error": "Internal Error 405"}), 405 # link locations from .admin import admin as admin_blueprint app.register_blueprint(admin_blueprint, url_prefix='/admin') from .main import main as main_blueprint app.register_blueprint(main_blueprint, url_prefix='/main') from .customer import customer as customer_blueprint app.register_blueprint(customer_blueprint, url_prefix='/customer') from .delivery import delivery as delivery_blueprint app.register_blueprint(delivery_blueprint, url_prefix='/delivery') from .delivery_data import delivery_data as delivery_data_blueprint app.register_blueprint(delivery_data_blueprint, url_prefix='/deliverydata') from .delivery_status import deliverystatus as delivery_status_blueprint app.register_blueprint(delivery_status_blueprint, url_prefix='/deliverystatus') from .search import search as search_blueprint app.register_blueprint(search_blueprint, url_prefix='/search') from .reports import reports as reports_blueprint app.register_blueprint(reports_blueprint, url_prefix='/report') from .query import query as query_blueprint app.register_blueprint(query_blueprint, url_prefix='/query') from .payment import payment as payment_blueprint app.register_blueprint(payment_blueprint, url_prefix='/payment') from .money import money as money_blueprint app.register_blueprint(money_blueprint, url_prefix='/money') from .auth import auth as auth_blueprint app.register_blueprint(auth_blueprint, url_prefix='/auth') from .employees import employees as employees_blueprint app.register_blueprint(employees_blueprint, url_prefix='/employee') from .info import info as info_blueprint app.register_blueprint(info_blueprint, url_prefix='/info') from .stats import stats as stats_blueprint app.register_blueprint(stats_blueprint, url_prefix='/stats') from .ticket import ticket as ticket_blueprint app.register_blueprint(ticket_blueprint, url_prefix='/ticket') from .promo import promo as promo_blueprint app.register_blueprint(promo_blueprint, url_prefix='/promo') from .social import social as social_blueprint app.register_blueprint(social_blueprint, url_prefix='/social') from .service import service as service_blueprint app.register_blueprint(service_blueprint, url_prefix='/service') def check_db_connection(): """ Test database connectivity. """ try: db.session.execute(text("SELECT 1")) return True except Exception: return False with app.app_context(): db.configure_mappers() db.session.commit() # Startup logging logger.info("🚀 eamco_office_api STARTING") mode = ApplicationConfig.CURRENT_SETTINGS.upper() if mode in ['DEVELOPMENT', 'DEV']: logger.info("🤖🤖🤖🤖🤖 Mode: Development 🤖🤖🤖🤖🤖") elif mode in ['PRODUCTION', 'PROD']: logger.info("💀💀💀💀💀💀💀💀💀💀 ⚠️ WARNING PRODUCTION 💀💀💀💀💀💀💀💀💀💀") logger.info(f"DB: {ApplicationConfig.SQLALCHEMY_DATABASE_URI[:30]}...") logger.info(f"CORS: {len(ApplicationConfig.CORS_ALLOWED_ORIGINS)} origins configured") # Test database connection if check_db_connection(): logger.info("DB Connection: ✅ OK") else: logger.info("DB Connection: ❌ FAILED")