Files
eamco_office_api/app/auth/views.py
2025-09-01 16:42:59 -04:00

218 lines
6.2 KiB
Python
Executable File

from flask import request, jsonify
from flask_login import current_user, logout_user, login_required
from app.auth import auth
from app import db, bcrypt
from datetime import datetime
from uuid import uuid4
from app.classes.auth import Auth_User
from app.classes.employee import Employee_Employee
@auth.route("/whoami", methods=["GET"])
def check_session():
    """
    Resolve the caller's API token to a user and the linked employee record.

    The token is read from the ``Authorization`` header. A ``bearer`` scheme
    prefix is accepted in any letter case, and whitespace or double quotes
    surrounding the token are ignored.

    Returns:
        A ``(response, status)`` pair:

        * 401 when the header is missing, the token is empty after cleanup,
          or no ``Auth_User`` row has a matching ``api_key``.
        * 404 when the user exists but no ``Employee_Employee`` row has a
          ``user_id`` equal to the user's ``id``.
        * 200 with ``user`` and ``employee`` objects otherwise.
    """
    # Function-scope import so the module's import block stays untouched.
    import re

    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return jsonify({"ok": False, "error": "Authorization header missing"}), 401

    # HTTP auth scheme names are case-insensitive, so "Bearer ", "bearer " and
    # "BEARER " are all removed. Only the first occurrence is dropped.
    api_key = re.sub(r'bearer ', '', auth_header, count=1, flags=re.IGNORECASE)
    api_key = api_key.strip().strip('"').strip()
    if not api_key:
        # Never look up an empty key: it could match a row whose api_key
        # column happens to be blank.
        return jsonify({"ok": False, "error": "Invalid token"}), 401

    user = (
        db.session.query(Auth_User)
        .filter(Auth_User.api_key == api_key)
        .first()
    )
    if not user:
        return jsonify({"ok": False, "error": "Invalid token"}), 401

    # The employee row is linked through Employee_Employee.user_id.
    employee = (
        db.session.query(Employee_Employee)
        .filter(Employee_Employee.user_id == user.id)
        .first()
    )
    # A user may exist without an employee record; report that explicitly.
    if not employee:
        return jsonify({"ok": False, "error": "User found, but no associated employee record"}), 404

    return jsonify({
        "ok": True,
        'user': {
            'user_name': user.username,
            'user_id': user.id,
            'user_email': user.email,
            'user_admin': user.admin_role,
            'token': user.api_key,
            'confirmed': user.confirmed,
        },
        'employee': {
            'id': employee.id,
            'employee_first_name': employee.employee_first_name,
            'employee_last_name': employee.employee_last_name,
        },
    }), 200
@auth.route("/amiconfirmed", methods=["GET"])
@login_required
def check_confirmed():
    """
    Report whether the logged-in user's account is confirmed.

    Returns:
        ``{"status": <bool>}`` with HTTP 200. The flag is ``False`` only
        when the user's ``confirmed`` column equals 0.
    """
    account = (
        db.session.query(Auth_User)
        .filter(Auth_User.id == current_user.id)
        .first()
    )
    # Anything other than 0 counts as confirmed.
    is_confirmed = not (account.confirmed == 0)
    return jsonify({"status": is_confirmed}), 200
@auth.route("/logout", methods=["POST"])
def logout():
    """
    End the caller's login session on the backend.

    Returns:
        ``{"status": "logged out"}`` with HTTP 200 on success, or
        ``{"error": "error"}`` with HTTP 400 if ``logout_user`` raises.
    """
    try:
        logout_user()
    except Exception:
        # The payload must be a dict; a set literal such as {"error", "error"}
        # is not JSON-serializable and would make this handler fail too.
        return jsonify({"error": "error"}), 400
    else:
        return jsonify({'status': 'logged out'}), 200
@auth.route("/login", methods=["POST"])
def login():
    """
    Authenticate a user from a JSON body with ``username`` and ``password``.

    Returns:
        ``{"error": True}`` with HTTP 200 when the username is unknown or the
        password does not match the stored bcrypt hash (the same payload is
        used for both cases). On success, HTTP 200 with ``ok``, a ``user``
        object and the user's API ``token``.
    """
    username = request.json["username"]
    password = request.json["password"]

    # One lookup is enough: the row is needed for the hash check anyway.
    user = (
        db.session.query(Auth_User)
        .filter_by(username=username)
        .first()
    )
    if user is None:
        return jsonify({"error": True}), 200
    if not bcrypt.check_password_hash(user.password_hash, password):
        return jsonify({"error": True}), 200

    # No attribute is modified above; the add/commit is kept so the
    # session/transaction handling of this endpoint stays as it was.
    db.session.add(user)
    db.session.commit()

    return jsonify({
        "ok": True,
        'user': {
            # 'user_id' carries the primary key; it appears exactly once so
            # no other value is silently overwritten.
            'user_id': user.id,
            'user_email': user.email,
            'admin_role': user.admin_role,
            'token': user.api_key,
        },
        'token': user.api_key,
    }), 200
@auth.route("/register", methods=["POST"])
def register_user():
    """
    Create a new account from a JSON body.

    Reads ``username``, ``email`` and ``password`` from the request body.
    The account is stored with ``admin``, ``admin_role`` and ``confirmed``
    all set to 0 and an API key made of three concatenated UUID4 hex strings.

    Returns:
        HTTP 200 in every case: ``{"error": "Email already exists"}`` or
        ``{"error": "User already exists"}`` when the email or username is
        taken, otherwise ``ok``, a ``user`` object and the new ``token``.
    """
    created_at = datetime.utcnow()

    payload = request.json
    username = payload["username"]
    email = payload["email"]
    password = payload["password"]

    # Reject duplicates: email first, then username.
    email_owner = db.session.query(Auth_User).filter_by(email=email).first()
    if email_owner is not None:
        return jsonify({"error": "Email already exists"}), 200

    name_owner = db.session.query(Auth_User).filter_by(username=username).first()
    if name_owner is not None:
        return jsonify({"error": "User already exists"}), 200

    # API key: three UUID4 hex digests joined back to back.
    api_key = "".join(uuid4().hex for _ in range(3))

    # Only the bcrypt hash of the password is stored.
    password_hash = bcrypt.generate_password_hash(password).decode('utf-8')

    account = Auth_User(
        username=username,
        email=email,
        password_hash=password_hash,
        member_since=created_at,
        api_key=api_key,
        last_seen=created_at,
        admin=0,
        admin_role=0,
        confirmed=0,
    )
    db.session.add(account)
    db.session.commit()

    # The user is not logged into a server-side session here; the caller
    # receives the API token instead.
    user_payload = {
        'user_email': account.email,
        'admin_role': account.admin_role,
        'token': account.api_key,
        'confirmed': account.confirmed,
    }
    return jsonify({
        "ok": True,
        'user': user_payload,
        'token': account.api_key,
    }), 200
@auth.route('/change-password', methods=['POST'])
@login_required
def change_password():
    """
    Replace the logged-in user's password.

    Reads ``new_password`` and ``password_confirm`` from the JSON body and
    compares their string forms.

    Returns:
        HTTP 200 in both cases: ``{"error": "Error: Incorrect Passwords"}``
        when the two values differ, otherwise ``{"ok": "success"}`` after the
        new bcrypt hash is stored and ``passwordpinallowed`` is reset to 0.
    """
    payload = request.json
    requested = payload["new_password"]
    confirmation = payload["password_confirm"]

    account = (
        db.session.query(Auth_User)
        .filter(Auth_User.id == current_user.id)
        .first()
    )

    # Both values are compared as strings.
    if str(requested) != str(confirmation):
        return jsonify({"error": "Error: Incorrect Passwords"}), 200

    account.password_hash = bcrypt.generate_password_hash(requested).decode('utf8')
    account.passwordpinallowed = 0

    db.session.add(account)
    db.session.commit()
    return jsonify({"ok": "success"}), 200