"""Authentication routes: session check, login, logout, registration and
password changes for the ``auth`` blueprint."""
import logging
import re
import secrets
from datetime import datetime
from uuid import uuid4

from flask import request
from flask_login import current_user, logout_user, login_required

from app.auth import auth
from app import db, bcrypt
from app.common.responses import error_response, success_response
from app.classes.auth import Auth_User
from app.classes.employee import Employee_Employee
from app.schemas import LoginSchema, RegisterSchema, ChangePasswordSchema, validate_request

logger = logging.getLogger(__name__)

# Case-insensitive "Bearer " prefix ("Bearer ", "bearer ", "BEARER ", ...).
_BEARER_PREFIX = re.compile(r'^bearer\s+', flags=re.IGNORECASE)


def _extract_api_key(auth_header):
    """Return the API key carried by an ``Authorization`` header value.

    Removes a leading case-insensitive ``bearer`` prefix (plus the whitespace
    after it) and any surrounding double quotes.
    """
    return _BEARER_PREFIX.sub('', auth_header).strip('"')


def _authenticate_request():
    """Resolve the current request's ``Authorization`` header to a user.

    Returns:
        A ``(user, failure)`` pair. On success ``user`` is the matching
        ``Auth_User`` row and ``failure`` is ``None``. On failure ``user`` is
        ``None`` and ``failure`` is the 401 error response the route should
        return as-is.
    """
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return None, error_response("Authorization header missing", 401)

    api_key = _extract_api_key(auth_header)
    if not api_key:
        # A header such as "Bearer " leaves an empty key; never let it reach
        # the query, where it could match a row whose api_key is empty.
        logger.warning("Authentication failed: empty API key in Authorization header")
        return None, error_response("Invalid token", 401)

    user = db.session.query(Auth_User).filter(Auth_User.api_key == api_key).first()
    if not user:
        logger.warning("Authentication failed: no user found with provided API key")
        return None, error_response("Invalid token", 401)

    return user, None


@auth.route("/whoami", methods=["GET"])
def check_session():
    """
    Checks auth token and returns the data of the user it belongs to.

    Responds 401 when the Authorization header is missing or the token does
    not match any user.
    """
    user, failure = _authenticate_request()
    if failure is not None:
        return failure

    return success_response({
        'user': {
            'user_name': user.username,
            'user_id': user.id,
            'user_email': user.email,
            'user_admin': user.admin_role,
            'token': user.api_key,
            'confirmed': user.confirmed
        },
    })


@auth.route("/amiconfirmed", methods=["GET"])
@login_required
def check_confirmed():
    """
    Checks to see if user confirmed with email or key
    """
    user = db.session\
        .query(Auth_User)\
        .filter(Auth_User.id == current_user.id)\
        .first()
    if user is None:
        # The session can outlive the database row (e.g. a deleted account);
        # answer with a 401 instead of crashing on the attribute access below.
        return error_response("User not found", 401)

    # Any value other than 0 counts as confirmed.
    confirmed = user.confirmed != 0
    return success_response({"status": confirmed})


@auth.route("/logout", methods=["POST"])
def logout():
    """
    Logs a user out of session on backend
    """
    try:
        logout_user()
    except Exception:
        # Route-level boundary: keep the best-effort 400 response, but record
        # the failure instead of swallowing it silently.
        logger.exception("Logout failed")
        return error_response("Logout failed", 400)
    return success_response({'status': 'logged out'})


@auth.route("/login", methods=["POST"])
@validate_request(LoginSchema)
def login():
    """
    Verifies username and password and returns the user's data and API token.

    Responds 401 when the user does not exist, the password does not match,
    or the account is not active.
    """
    data = request.validated_data
    username = data["username"]
    password = data["password"]

    user = db.session.query(Auth_User).filter_by(username=username).first()

    # NOTE(review): the two distinct messages below reveal whether a username
    # exists. Kept unchanged because clients may depend on them; consider a
    # single generic message.
    if not user:
        return error_response("User not found", 401)

    if not bcrypt.check_password_hash(user.password_hash, password):
        return error_response("Invalid password", 401)

    # Only active accounts may log in.
    if user.active != 1:
        return error_response("Please contact a manager. Login rejected", 401)

    return success_response({
        'user': {
            'user_name': user.username,
            'user_id': user.id,
            'user_email': user.email,
            'admin_role': user.admin_role,
        },
        'token': user.api_key
    })


@auth.route("/register", methods=["POST"])
@validate_request(RegisterSchema)
def register_user():
    """
    Main post function to register a user

    Rejects the request when the email or the username is already taken;
    otherwise stores a new non-admin, unconfirmed user with a bcrypt password
    hash and a freshly generated API key.
    """
    data = request.validated_data
    now = datetime.utcnow()

    username = data["username"]
    email = data["email"]
    password = data["password"]

    # Generate a cryptographically secure 64-byte (128 hex chars) API key
    key = secrets.token_hex(64)

    # check if email exists
    user_exists_email = db.session\
        .query(Auth_User)\
        .filter_by(email=email)\
        .first() is not None
    if user_exists_email:
        return error_response("Email already exists")

    # check if username exists
    user_exists_username = db.session\
        .query(Auth_User)\
        .filter_by(username=username)\
        .first() is not None
    if user_exists_username:
        return error_response("User already exists")

    # hash password for database
    hashed_password = bcrypt.generate_password_hash(password).decode('utf-8')

    new_user = Auth_User(
        username=username,
        email=email,
        password_hash=hashed_password,
        member_since=now,
        api_key=key,
        last_seen=now,
        admin=0,
        admin_role=0,
        confirmed=0,
    )
    db.session.add(new_user)
    db.session.commit()

    return success_response({
        'user': {
            'user_email': new_user.email,
            'admin_role': new_user.admin_role,
            'token': new_user.api_key,
            'confirmed': new_user.confirmed,
        },
        'token': new_user.api_key
    })


@auth.route('/change-password', methods=['POST'])
@validate_request(ChangePasswordSchema)
def change_password():
    """
    Changes the password of the user identified by the Authorization token.

    Responds 401 on a missing or invalid token and returns an error response
    when the new password and its confirmation differ.
    """
    user, failure = _authenticate_request()
    if failure is not None:
        return failure

    data = request.validated_data
    new_password = data["new_password"]
    new_password_confirm = data["password_confirm"]

    if str(new_password) != str(new_password_confirm):
        return error_response("Error: Incorrect Passwords")

    user.password_hash = bcrypt.generate_password_hash(new_password).decode('utf-8')
    user.passwordpinallowed = 0

    db.session.add(user)
    db.session.commit()
    return success_response()


@auth.route('/admin-change-password', methods=['POST'])
def admin_change_password():
    """
    Lets an admin set a new password for the user linked to an employee.

    Expects a JSON object with ``employee_id``, ``new_password`` and
    ``password_confirm``. Responds 401 on a missing or invalid token, 403 when
    the caller is not an admin, 400 on missing fields or mismatching
    passwords, and 404 when the employee or its user cannot be found.
    """
    user, failure = _authenticate_request()
    if failure is not None:
        return failure

    # Fail closed: an unset (None) admin_role is treated like 0, not as admin.
    if not user.admin_role:
        return error_response("Admin access required", 403)

    # silent=True returns None for a missing or malformed JSON body instead of
    # raising, so such requests get the "Missing required fields" answer.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    employee_id = payload.get("employee_id")
    new_password = payload.get("new_password")
    new_password_confirm = payload.get("password_confirm")

    if not employee_id or not new_password or not new_password_confirm:
        return error_response("Missing required fields", 400)

    if str(new_password) != str(new_password_confirm):
        return error_response("Passwords do not match", 400)

    employee = db.session.query(Employee_Employee).filter(Employee_Employee.id == employee_id).first()
    if not employee:
        return error_response("Employee not found", 404)

    target_user = db.session.query(Auth_User).filter(Auth_User.id == employee.user_id).first()
    if not target_user:
        return error_response("User not found", 404)

    target_user.password_hash = bcrypt.generate_password_hash(new_password).decode('utf-8')
    target_user.passwordpinallowed = 0

    db.session.add(target_user)
    db.session.commit()
    return success_response()