"""Authentication routes: session check, login/logout, registration and password changes."""
import logging
import re
from datetime import datetime
from uuid import uuid4

from flask import request, jsonify
from flask_login import current_user, logout_user, login_required

from app.auth import auth
from app import db, bcrypt
from app.classes.auth import Auth_User
from app.classes.employee import Employee_Employee

logger = logging.getLogger(__name__)

# Matches a leading "Bearer " scheme in any letter case ("bearer ", "BEARER ", ...).
_BEARER_PREFIX = re.compile(r'^bearer\s+', flags=re.IGNORECASE)


def _extract_api_key(auth_header):
    """Return the bare token from an Authorization header value.

    Strips an optional case-insensitive "Bearer " prefix and any surrounding
    double quotes. Returns None when the header is missing or the remaining
    token is empty, so callers never query the database with an empty key.
    """
    if not auth_header:
        return None
    token = _BEARER_PREFIX.sub('', auth_header).strip('"')
    return token or None


def _authenticate_request():
    """Resolve the Auth_User identified by the request's Authorization header.

    Returns:
        (user, None) when a user with a matching api_key exists, otherwise
        (None, message) where message is the error text to send to the client.
    """
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return None, "Authorization header missing"

    api_key = _extract_api_key(auth_header)
    if api_key is None:
        return None, "Invalid token"

    user = db.session.query(Auth_User).filter(Auth_User.api_key == api_key).first()
    if user is None:
        return None, "Invalid token"
    return user, None


def _json_body():
    """Return the request's JSON object, or {} if the body is absent or not a JSON object."""
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


@auth.route("/whoami", methods=["GET"])
def check_session():
    """
    Checks the auth token and returns the matching user's data.

    Responds 401 when the Authorization header is missing or the token does
    not match any user.
    """
    user, error = _authenticate_request()
    if user is None:
        logger.warning("whoami rejected: %s", error)
        return jsonify({"ok": False, "error": error}), 401

    return jsonify({
        "ok": True,
        'user': {
            'user_name': user.username,
            'user_id': user.id,
            'user_email': user.email,
            'user_admin': user.admin_role,
            'token': user.api_key,
            'confirmed': user.confirmed
        },
    }), 200


@auth.route("/amiconfirmed", methods=["GET"])
@login_required
def check_confirmed():
    """
    Checks to see if the logged-in user is confirmed (by email or key).

    Returns {"status": bool}; the status is False only when the stored
    `confirmed` value equals 0.
    """
    user = db.session\
        .query(Auth_User)\
        .filter(Auth_User.id == current_user.id)\
        .first()
    return jsonify({"status": user.confirmed != 0}), 200


@auth.route("/logout", methods=["POST"])
def logout():
    """
    Logs a user out of the session on the backend.

    Responds 400 with {"error": "error"} if logging out raises.
    """
    try:
        logout_user()
    except Exception:
        # Boundary handler: record the failure and still answer with valid JSON.
        logger.exception("logout failed")
        return jsonify({"error": "error"}), 400
    return jsonify({'status': 'logged out'}), 200


@auth.route("/login", methods=["POST"])
def login():
    """
    Verifies username/password and returns the user's data and API token.

    Responds 400 when either field is missing from the JSON body, and 401 for
    an unknown user, a wrong password, or an account whose `active` is not 1.
    """
    payload = _json_body()
    username = payload.get("username")
    password = payload.get("password")
    if not username or not password:
        return jsonify({"error": "Missing required fields"}), 400

    user = db.session.query(Auth_User).filter_by(username=username).first()
    if user is None:
        return jsonify({"error": "User not found"}), 401

    if not bcrypt.check_password_hash(user.password_hash, password):
        return jsonify({"error": "Invalid password"}), 401

    # Only active accounts may log in.
    if user.active != 1:
        return jsonify({"error": "Please contact a manager. Login rejected"}), 401

    return jsonify({
        "ok": True,
        'user': {
            'user_name': user.username,
            'user_id': user.id,
            'user_email': user.email,
            'admin_role': user.admin_role,
        },
        'token': user.api_key
    }), 200


@auth.route("/register", methods=["POST"])
def register_user():
    """
    Main post function to register a user.

    Creates an unconfirmed, non-admin Auth_User with a freshly generated API
    key and returns it. Responds 400 when a required field is missing.
    Duplicate email/username responses keep their historical 200 status with
    an "error" key so existing clients continue to work.
    """
    payload = _json_body()
    username = payload.get("username")
    email = payload.get("email")
    password = payload.get("password")
    if not username or not email or not password:
        return jsonify({"error": "Missing required fields"}), 400

    email_taken = db.session\
        .query(Auth_User)\
        .filter_by(email=email)\
        .first() is not None
    if email_taken:
        return jsonify({"error": "Email already exists"}), 200

    username_taken = db.session\
        .query(Auth_User)\
        .filter_by(username=username)\
        .first() is not None
    if username_taken:
        return jsonify({"error": "User already exists"}), 200

    now = datetime.utcnow()
    # API key is three concatenated random UUID4 hex strings (96 hex chars).
    key = "".join(uuid4().hex for _ in range(3))
    hashed_password = bcrypt.generate_password_hash(password).decode('utf-8')

    new_user = Auth_User(
        username=username,
        email=email,
        password_hash=hashed_password,
        member_since=now,
        api_key=key,
        last_seen=now,
        admin=0,
        admin_role=0,
        confirmed=0,
    )
    db.session.add(new_user)
    db.session.commit()

    return jsonify({
        "ok": True,
        'user': {
            'user_email': new_user.email,
            'admin_role': new_user.admin_role,
            'token': new_user.api_key,
            'confirmed': new_user.confirmed,
        },
        'token': new_user.api_key
    }), 200


@auth.route('/change-password', methods=['POST'])
def change_password():
    """
    Changes the password of the user identified by the Authorization token.

    Responds 401 for a missing/invalid token and 400 when a password field is
    missing. A mismatch between the two passwords keeps its historical 200
    status with an "error" key.
    """
    user, error = _authenticate_request()
    if user is None:
        return jsonify({"error": error}), 401

    payload = _json_body()
    new_password = payload.get("new_password")
    new_password_confirm = payload.get("password_confirm")
    if not new_password or not new_password_confirm:
        return jsonify({"error": "Missing required fields"}), 400

    if str(new_password) != str(new_password_confirm):
        return jsonify({"error": "Error: Incorrect Passwords"}), 200

    user.password_hash = bcrypt.generate_password_hash(new_password).decode('utf8')
    user.passwordpinallowed = 0
    db.session.add(user)
    db.session.commit()

    return jsonify({"ok": True}), 200


@auth.route('/admin-change-password', methods=['POST'])
def admin_change_password():
    """
    Lets an authorised caller set a new password for an employee's user account.

    The target is looked up via Employee_Employee.id -> employee.user_id ->
    Auth_User.id. Responds 401 for a missing/invalid token, 403 when the role
    check fails, 400 for missing or mismatched fields, and 404 when the
    employee or its user cannot be found.
    """
    user, error = _authenticate_request()
    if user is None:
        return jsonify({"error": error}), 401

    # NOTE(review): this admits only callers with admin_role == 0, yet
    # register_user creates every new account with admin_role=0. If 0 means
    # "not an admin", this check is inverted and any registered user can reset
    # other users' passwords. Confirm the role semantics before relying on it.
    if user.admin_role != 0:
        return jsonify({"error": "Admin access required"}), 403

    payload = _json_body()
    employee_id = payload.get("employee_id")
    new_password = payload.get("new_password")
    new_password_confirm = payload.get("password_confirm")

    if not employee_id or not new_password or not new_password_confirm:
        return jsonify({"error": "Missing required fields"}), 400

    if str(new_password) != str(new_password_confirm):
        return jsonify({"error": "Passwords do not match"}), 400

    employee = db.session.query(Employee_Employee).filter(Employee_Employee.id == employee_id).first()
    if employee is None:
        return jsonify({"error": "Employee not found"}), 404

    target_user = db.session.query(Auth_User).filter(Auth_User.id == employee.user_id).first()
    if target_user is None:
        return jsonify({"error": "User not found"}), 404

    target_user.password_hash = bcrypt.generate_password_hash(new_password).decode('utf-8')
    target_user.passwordpinallowed = 0
    db.session.add(target_user)
    db.session.commit()

    return jsonify({"ok": True}), 200