Updates the user registration and new account creation endpoints to require email confirmation. - Sets the 'confirmed' flag to 'false' by default for all new user accounts. - Generates a unique confirmation token for each new user. - Logs the confirmation link to the console for development purposes. This change ensures that users cannot log in without first verifying their email address, enhancing account security.
86 lines
3.1 KiB
Python
86 lines
3.1 KiB
Python
import secrets
|
|
from fastapi.responses import JSONResponse
|
|
|
|
# Existing imports...
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select
|
|
from database import get_db
|
|
from models import Account_User, Customer_Customer
|
|
from schemas import UserCreate
|
|
from passlib.context import CryptContext
|
|
from datetime import datetime, timezone
|
|
|
|
# Existing pwd_context and helper functions...
|
|
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
|
|
|
|
def get_password_hash(password):
|
|
return pwd_context.hash(password)
|
|
|
|
|
|
def escape_like_pattern(value: str) -> str:
|
|
"""Escape special characters for SQL LIKE patterns.
|
|
|
|
Escapes %, _, and \ which have special meaning in LIKE clauses
|
|
to prevent SQL injection via wildcards.
|
|
"""
|
|
# Escape backslash first, then the wildcards
|
|
return value.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
|
|
|
|
|
|
router = APIRouter()
|
|
|
|
@router.post("/register")
|
|
async def register(user: UserCreate, db: AsyncSession = Depends(get_db)):
|
|
# Verify passwords match
|
|
if user.password != user.confirm_password:
|
|
raise HTTPException(status_code=400, detail="Passwords do not match")
|
|
|
|
# Check if customer exists in Customer_Customer table
|
|
escaped_house_number = escape_like_pattern(user.house_number)
|
|
customer_result = await db.execute(
|
|
select(Customer_Customer).where(
|
|
(Customer_Customer.account_number == user.account_number) &
|
|
(Customer_Customer.customer_address.like(f'{escaped_house_number} %', escape='\\'))
|
|
)
|
|
)
|
|
customer = customer_result.scalar_one_or_none()
|
|
if not customer:
|
|
raise HTTPException(status_code=400, detail="Customer not found with provided account and house number")
|
|
|
|
# Check if email already registered
|
|
result = await db.execute(select(Account_User).where(Account_User.email == user.email))
|
|
if result.scalar_one_or_none():
|
|
raise HTTPException(status_code=400, detail="Email already registered")
|
|
|
|
username = f"{user.account_number}-{user.house_number}"
|
|
hashed_password = get_password_hash(user.password)
|
|
token = secrets.token_urlsafe(32)
|
|
|
|
db_user = Account_User(
|
|
username=username,
|
|
account_number=user.account_number,
|
|
house_number=user.house_number,
|
|
password_hash=hashed_password,
|
|
member_since=datetime.now(timezone.utc),
|
|
email=user.email,
|
|
last_seen=datetime.now(timezone.utc),
|
|
admin=0,
|
|
admin_role=0,
|
|
confirmed=0,
|
|
active=1,
|
|
user_id=customer.id,
|
|
confirmation_token=token,
|
|
confirmation_sent_at=datetime.now(timezone.utc)
|
|
)
|
|
db.add(db_user)
|
|
await db.commit()
|
|
await db.refresh(db_user)
|
|
|
|
# In a real application, you would send an email here
|
|
# For now, we'll just print the confirmation URL to the console
|
|
confirmation_url = f"http://localhost:3000/confirm-email?token={token}"
|
|
print(f"Confirmation URL: {confirmation_url}")
|
|
|
|
return JSONResponse(status_code=201, content={"message": "Registration successful. Please check your email to confirm your account."})
|