126 lines
4.1 KiB
Python
126 lines
4.1 KiB
Python
import secrets
|
|
import smtplib
|
|
from email.mime.text import MIMEText
|
|
from email.mime.multipart import MIMEMultipart
|
|
from datetime import datetime, timedelta
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select
|
|
from database import get_db
|
|
from models import Account_User
|
|
from schemas import ForgotPasswordRequest, ResetPasswordRequest
|
|
from passlib.context import CryptContext
|
|
from config import load_config
|
|
|
|
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
|
|
|
|
# Load configuration from environment
|
|
ApplicationConfig = load_config()
|
|
|
|
# Email configuration from environment variables
|
|
SMTP_SERVER = ApplicationConfig.SMTP_SERVER
|
|
SMTP_PORT = ApplicationConfig.SMTP_PORT
|
|
SMTP_USERNAME = ApplicationConfig.SMTP_USERNAME
|
|
SMTP_PASSWORD = ApplicationConfig.SMTP_PASSWORD
|
|
FROM_EMAIL = ApplicationConfig.SMTP_FROM_EMAIL
|
|
|
|
router = APIRouter()
|
|
|
|
def get_password_hash(password):
|
|
return pwd_context.hash(password)
|
|
|
|
def generate_reset_token():
|
|
return secrets.token_urlsafe(32)
|
|
|
|
def send_reset_email(to_email: str, reset_token: str):
|
|
"""Send password reset email with link"""
|
|
reset_url = f"{ApplicationConfig.FRONTEND_URL}/reset-password?token={reset_token}"
|
|
|
|
msg = MIMEMultipart()
|
|
msg['From'] = FROM_EMAIL
|
|
msg['To'] = to_email
|
|
msg['Subject'] = "Password Reset - Oil Customer Gateway"
|
|
|
|
body = f"""
|
|
You have requested a password reset for your Oil Customer Gateway account.
|
|
|
|
Click the following link to reset your password:
|
|
{reset_url}
|
|
|
|
This link will expire in 24 hours.
|
|
|
|
If you didn't request this reset, please ignore this email.
|
|
|
|
Best regards,
|
|
Oil Customer Gateway Team
|
|
"""
|
|
|
|
msg.attach(MIMEText(body, 'plain'))
|
|
|
|
try:
|
|
server = smtplib.SMTP(SMTP_SERVER, SMTP_PORT)
|
|
server.starttls()
|
|
server.login(SMTP_USERNAME, SMTP_PASSWORD)
|
|
text = msg.as_string()
|
|
server.sendmail(FROM_EMAIL, to_email, text)
|
|
server.quit()
|
|
return True
|
|
except Exception as e:
|
|
print(f"Email sending failed: {e}")
|
|
return False
|
|
|
|
@router.post("/forgot-password")
|
|
async def forgot_password(request: ForgotPasswordRequest, db: AsyncSession = Depends(get_db)):
|
|
"""Request password reset - sends email with reset link"""
|
|
# Find user by email
|
|
result = await db.execute(select(Account_User).where(Account_User.email == request.email))
|
|
user = result.scalar_one_or_none()
|
|
|
|
if not user:
|
|
# Don't reveal if email exists or not for security
|
|
return {"message": "If an account with that email exists, a password reset link has been sent."}
|
|
|
|
# Generate reset token and expiry (24 hours)
|
|
reset_token = generate_reset_token()
|
|
reset_expires = datetime.utcnow() + timedelta(hours=24)
|
|
|
|
# Update user with reset token
|
|
user.password_reset_token = reset_token
|
|
user.password_reset_expires = reset_expires
|
|
await db.commit()
|
|
|
|
# Send email
|
|
if send_reset_email(user.email, reset_token):
|
|
return {"message": "Password reset link sent to your email."}
|
|
else:
|
|
raise HTTPException(status_code=500, detail="Failed to send reset email")
|
|
|
|
@router.post("/reset-password")
|
|
async def reset_password(request: ResetPasswordRequest, db: AsyncSession = Depends(get_db)):
|
|
"""Reset password using token"""
|
|
# Verify passwords match
|
|
if request.password != request.confirm_password:
|
|
raise HTTPException(status_code=400, detail="Passwords do not match")
|
|
|
|
# Find user by reset token
|
|
result = await db.execute(
|
|
select(Account_User).where(
|
|
Account_User.password_reset_token == request.token,
|
|
Account_User.password_reset_expires > datetime.utcnow()
|
|
)
|
|
)
|
|
user = result.scalar_one_or_none()
|
|
|
|
if not user:
|
|
raise HTTPException(status_code=400, detail="Invalid or expired reset token")
|
|
|
|
# Update password
|
|
hashed_password = get_password_hash(request.password)
|
|
user.password_hash = hashed_password
|
|
user.password_reset_token = None
|
|
user.password_reset_expires = None
|
|
user.last_seen = datetime.utcnow()
|
|
|
|
await db.commit()
|
|
|
|
return {"message": "Password reset successfully"} |