216 lines
5.8 KiB
Python
216 lines
5.8 KiB
Python
from flask import request, jsonify
|
|
from flask_login import current_user, logout_user, login_user, login_required
|
|
from app.auth import auth
|
|
from app import db, bcrypt
|
|
from datetime import datetime
|
|
from uuid import uuid4
|
|
from app.classes.auth import Auth_User
|
|
|
|
|
|
@auth.route("/whoami", methods=["GET"])
|
|
def check_session():
|
|
"""
|
|
Checks auth token to ensure user is authenticated
|
|
"""
|
|
|
|
api_key = request.headers.get('Authorization')
|
|
if not api_key:
|
|
return jsonify({"error": "True"}), 200
|
|
else:
|
|
api_key = api_key.replace('bearer ', '', 1)
|
|
api_key = api_key.replace('"', '')
|
|
user_exists = (db.session
|
|
.query(Auth_User)
|
|
.filter(Auth_User.api_key == api_key)
|
|
.first())
|
|
if not user_exists:
|
|
return jsonify({"error": True}), 200
|
|
else:
|
|
user = db.session\
|
|
.query(Auth_User)\
|
|
.filter(Auth_User.api_key == api_key)\
|
|
.first()
|
|
return jsonify({
|
|
"ok": True,
|
|
'user': {
|
|
'user_name': user.username,
|
|
'user_id': user.id,
|
|
'user_email': user.email,
|
|
'user_admin': user.admin_role,
|
|
'token': user.api_key,
|
|
'confirmed': user.confirmed
|
|
},
|
|
'token': user.api_key
|
|
}), 200
|
|
|
|
|
|
@auth.route("/amiconfirmed", methods=["GET"])
|
|
@login_required
|
|
def check_confirmed():
|
|
"""
|
|
Checks to see if user confirmed with email or key
|
|
"""
|
|
user = db.session\
|
|
.query(Auth_User)\
|
|
.filter(Auth_User.id == current_user.id)\
|
|
.first()
|
|
if user.confirmed == 0:
|
|
confirmed = False
|
|
else:
|
|
confirmed = True
|
|
|
|
return jsonify({"status": confirmed}), 200
|
|
|
|
|
|
@auth.route("/logout", methods=["POST"])
|
|
def logout():
|
|
"""
|
|
Logs a user out of session on backend
|
|
"""
|
|
try:
|
|
logout_user()
|
|
return jsonify({'status': 'logged out'}), 200
|
|
except Exception as e:
|
|
return jsonify({"error", 'error'}), 400
|
|
|
|
|
|
@auth.route("/login", methods=["POST"])
|
|
def login():
|
|
"""
|
|
Main post function to a user
|
|
"""
|
|
|
|
username = request.json["username"]
|
|
password = request.json["password"]
|
|
|
|
user = db.session\
|
|
.query(Auth_User)\
|
|
.filter_by(username=username)\
|
|
.first() is not None
|
|
if not user:
|
|
return jsonify({"error": True}), 200
|
|
user = db.session\
|
|
.query(Auth_User)\
|
|
.filter_by(username=username)\
|
|
.first()
|
|
if not bcrypt.check_password_hash(user.password_hash, password):
|
|
|
|
current_fails = int(user.fails)
|
|
new_fails = current_fails + 1
|
|
user.fails = new_fails
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
|
|
return jsonify({"error": True}), 200
|
|
user.locked = 0
|
|
user.fails = 0
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
|
|
|
|
return jsonify({
|
|
"ok": True,
|
|
'user': {'user_id': user.uuid,
|
|
'user_id': user.id,
|
|
'user_email': user.email,
|
|
'admin_role': user.admin_role,
|
|
'token': user.api_key
|
|
},
|
|
'token': user.api_key
|
|
}), 200
|
|
|
|
@auth.route("/register", methods=["POST"])
|
|
def register_user():
|
|
"""
|
|
Main post function to register a user
|
|
"""
|
|
now = datetime.utcnow()
|
|
|
|
username = request.json["username"]
|
|
email = request.json["email"]
|
|
password = request.json["password"]
|
|
|
|
part_one_code = uuid4().hex
|
|
part_two_code = uuid4().hex
|
|
part_three_code = uuid4().hex
|
|
key = part_one_code + part_two_code + part_three_code
|
|
|
|
# check if email exists
|
|
user_exists_email = db.session\
|
|
.query(Auth_User)\
|
|
.filter_by(email=email)\
|
|
.first() is not None
|
|
if user_exists_email:
|
|
return jsonify({"error": "Email already exists"}), 200
|
|
|
|
# check if username exists
|
|
user_exists_username = db.session\
|
|
.query(Auth_User)\
|
|
.filter_by(username=username)\
|
|
.first() is not None
|
|
if user_exists_username:
|
|
return jsonify({"error": "User already exists"}), 200
|
|
|
|
# hash password for database
|
|
hashed_password = bcrypt.generate_password_hash(password).decode('utf-8')
|
|
|
|
new_user = Auth_User(
|
|
username=username,
|
|
email=email,
|
|
password_hash=hashed_password,
|
|
member_since=now,
|
|
api_key=key,
|
|
last_seen=now,
|
|
admin=0,
|
|
admin_role=0,
|
|
confirmed=0,
|
|
)
|
|
|
|
db.session.add(new_user)
|
|
|
|
# commit to database
|
|
db.session.commit()
|
|
|
|
# log user in as active not sure if needed with frontend
|
|
#login_user(new_user)
|
|
# current_user.is_authenticated()
|
|
# current_user.is_active()
|
|
|
|
return jsonify({
|
|
"ok": True,
|
|
'user': {
|
|
'user_email': new_user.email,
|
|
'admin_role': new_user.admin_role,
|
|
'token': new_user.api_key,
|
|
'confirmed': new_user.confirmed,
|
|
},
|
|
'token': new_user.api_key
|
|
}), 200
|
|
|
|
|
|
@auth.route('/change-password', methods=['POST'])
|
|
@login_required
|
|
def change_password():
|
|
|
|
new_password = request.json["new_password"]
|
|
new_password_confirm = request.json["password_confirm"]
|
|
|
|
user = db.session\
|
|
.query(Auth_User) \
|
|
.filter(Auth_User.id == current_user.id) \
|
|
.first()
|
|
|
|
if str(new_password) != str(new_password_confirm):
|
|
return jsonify({"error": "Error: Incorrect Passwords"}), 200
|
|
|
|
hashed_password = bcrypt.generate_password_hash(
|
|
new_password).decode('utf8')
|
|
|
|
user.password_hash = hashed_password
|
|
user.passwordpinallowed = 0
|
|
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
return jsonify({"ok": "success"}), 200
|
|
|