210 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			210 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| from flask import request, jsonify
 | |
| from flask_login import current_user, logout_user, login_user, login_required
 | |
| from app.auth import auth
 | |
| from app import db, bcrypt
 | |
| from datetime import datetime
 | |
| from uuid import uuid4
 | |
| from app.classes.auth import Auth_User
 | |
| 
 | |
| 
 | |
| @auth.route("/whoami", methods=["GET"])
 | |
| def check_session():
 | |
|     """
 | |
|     Checks auth token to ensure user is authenticated
 | |
|     """
 | |
| 
 | |
|     api_key = request.headers.get('Authorization')
 | |
|     if not api_key:
 | |
|         return jsonify({"error": "True"}), 200
 | |
|     else:
 | |
|         api_key = api_key.replace('bearer ', '', 1)
 | |
|         api_key = api_key.replace('"', '')
 | |
|         
 | |
|         user_exists = (db.session
 | |
|                           .query(Auth_User)
 | |
|                           .filter(Auth_User.api_key == api_key)
 | |
|                           .first())
 | |
|         if not user_exists:
 | |
|             return jsonify({"error": True}), 200
 | |
|         else:
 | |
|             user = db.session\
 | |
|                 .query(Auth_User)\
 | |
|                 .filter(Auth_User.api_key == api_key)\
 | |
|                 .first()
 | |
|               
 | |
|             return jsonify({
 | |
|                 "ok": True,
 | |
|                 'user': {
 | |
|                     'user_name': user.username,
 | |
|                     'user_id': user.id,
 | |
|                     'user_email': user.email,
 | |
|                     'user_admin': user.admin_role,
 | |
|                     'token': user.api_key,
 | |
|                     'confirmed': user.confirmed
 | |
|                          },
 | |
|                 'token': user.api_key
 | |
|             }), 200
 | |
| 
 | |
| 
 | |
| @auth.route("/amiconfirmed", methods=["GET"])
 | |
| @login_required
 | |
| def check_confirmed():
 | |
|     """
 | |
|     Checks to see if user confirmed with email or key
 | |
|     """
 | |
|     user = db.session\
 | |
|         .query(Auth_User)\
 | |
|         .filter(Auth_User.id == current_user.id)\
 | |
|         .first()
 | |
|     if user.confirmed == 0:
 | |
|         confirmed = False
 | |
|     else:
 | |
|         confirmed = True
 | |
| 
 | |
|     return jsonify({"status": confirmed}), 200
 | |
| 
 | |
| 
 | |
| @auth.route("/logout", methods=["POST"])
 | |
| def logout():
 | |
|     """
 | |
|     Logs a user out of session on backend
 | |
|     """
 | |
|     try:
 | |
|         logout_user()
 | |
|         return jsonify({'status': 'logged out'}), 200
 | |
|     except Exception as e:
 | |
|         return jsonify({"error", 'error'}), 400
 | |
| 
 | |
| 
 | |
| @auth.route("/login", methods=["POST"])
 | |
| def login():
 | |
|     """
 | |
|     Main post function to  a user
 | |
|     """
 | |
|    
 | |
|     username = request.json["username"]
 | |
|     password = request.json["password"]
 | |
| 
 | |
|     user = db.session\
 | |
|                 .query(Auth_User)\
 | |
|                 .filter_by(username=username)\
 | |
|                 .first() is not None
 | |
|     if not user:
 | |
|         return jsonify({"error": True}), 200
 | |
|     user = db.session\
 | |
|         .query(Auth_User)\
 | |
|         .filter_by(username=username)\
 | |
|         .first()
 | |
|     if not bcrypt.check_password_hash(user.password_hash, password):
 | |
|         return jsonify({"error": True}), 200
 | |
| 
 | |
|     db.session.add(user)
 | |
|     db.session.commit()
 | |
| 
 | |
| 
 | |
|     return jsonify({
 | |
|         "ok": True,
 | |
|         'user': {'user_id': user.uuid,
 | |
|                     'user_id': user.id,
 | |
|                     'user_email': user.email,
 | |
|                     'admin_role': user.admin_role,
 | |
|                     'token': user.api_key
 | |
|                     },
 | |
|         'token': user.api_key
 | |
|     }), 200
 | |
| 
 | |
| @auth.route("/register", methods=["POST"])
 | |
| def register_user():
 | |
|     """
 | |
|     Main post function to register a user
 | |
|     """
 | |
|     now = datetime.utcnow()
 | |
| 
 | |
|     username = request.json["username"]
 | |
|     email = request.json["email"]
 | |
|     password = request.json["password"]
 | |
|     
 | |
|     part_one_code = uuid4().hex
 | |
|     part_two_code = uuid4().hex
 | |
|     part_three_code = uuid4().hex
 | |
|     key = part_one_code + part_two_code + part_three_code
 | |
| 
 | |
|     # check if email exists
 | |
|     user_exists_email = db.session\
 | |
|         .query(Auth_User)\
 | |
|         .filter_by(email=email)\
 | |
|         .first() is not None
 | |
|     if user_exists_email:
 | |
|         return jsonify({"error": "Email already exists"}), 200
 | |
|     
 | |
|     # check if username exists
 | |
|     user_exists_username = db.session\
 | |
|         .query(Auth_User)\
 | |
|         .filter_by(username=username)\
 | |
|         .first() is not None
 | |
|     if user_exists_username:
 | |
|         return jsonify({"error": "User already exists"}), 200
 | |
|     
 | |
|     # hash password for database
 | |
|     hashed_password = bcrypt.generate_password_hash(password).decode('utf-8')
 | |
| 
 | |
|     new_user = Auth_User(
 | |
|         username=username,
 | |
|         email=email,
 | |
|         password_hash=hashed_password,
 | |
|         member_since=now,
 | |
|         api_key=key,
 | |
|         last_seen=now,
 | |
|         admin=0,
 | |
|         admin_role=0,
 | |
|         confirmed=0,
 | |
|     )
 | |
| 
 | |
|     db.session.add(new_user)
 | |
| 
 | |
|     # commit to database
 | |
|     db.session.commit()
 | |
|     
 | |
|     # log user in as active not sure if needed with frontend
 | |
|     #login_user(new_user)
 | |
|     # current_user.is_authenticated()
 | |
|     # current_user.is_active()
 | |
| 
 | |
|     return jsonify({
 | |
|         "ok": True,
 | |
|         'user': {
 | |
|                  'user_email': new_user.email,
 | |
|                  'admin_role': new_user.admin_role,
 | |
|                  'token': new_user.api_key,
 | |
|                  'confirmed': new_user.confirmed,
 | |
|                  },
 | |
|         'token':  new_user.api_key
 | |
|     }), 200
 | |
| 
 | |
| 
 | |
| @auth.route('/change-password', methods=['POST'])
 | |
| @login_required
 | |
| def change_password():
 | |
| 
 | |
|     new_password = request.json["new_password"]
 | |
|     new_password_confirm = request.json["password_confirm"]
 | |
| 
 | |
|     user = db.session\
 | |
|         .query(Auth_User) \
 | |
|         .filter(Auth_User.id == current_user.id) \
 | |
|         .first()
 | |
| 
 | |
|     if str(new_password) != str(new_password_confirm):
 | |
|         return jsonify({"error": "Error: Incorrect Passwords"}), 200
 | |
| 
 | |
|     hashed_password = bcrypt.generate_password_hash(
 | |
|         new_password).decode('utf8')
 | |
|     
 | |
|     user.password_hash = hashed_password
 | |
|     user.passwordpinallowed = 0
 | |
|     
 | |
|     db.session.add(user)
 | |
|     db.session.commit()
 | |
|     return jsonify({"ok": "success"}), 200
 | |
| 
 |