from datetime import datetime

from fastapi import APIRouter, Depends, HTTPException, status
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_db
from models import Account_User, Customer_Customer
from schemas import UserCreate, UserResponse

pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")


def get_password_hash(password: str) -> str:
    """Hash a plaintext password with the configured passlib context."""
    return pwd_context.hash(password)


def escape_like_pattern(value: str) -> str:
    r"""Escape special characters for SQL LIKE patterns.

    Escapes %, _, and \ which have special meaning in LIKE clauses, so that
    user-supplied wildcards cannot broaden the match.
    """
    # Escape backslash first, then the wildcards
    return value.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')


router = APIRouter()


@router.post("/register", response_model=UserResponse)
async def register(user: UserCreate, db: AsyncSession = Depends(get_db)):
    # Verify passwords match
    if user.password != user.confirm_password:
        raise HTTPException(status_code=400, detail="Passwords do not match")

    # Check if customer exists in Customer_Customer table.
    # Escape SQL LIKE wildcards so the house number is matched literally.
    escaped_house_number = escape_like_pattern(user.house_number)
    customer_result = await db.execute(
        select(Customer_Customer).where(
            (Customer_Customer.account_number == user.account_number)
            & (Customer_Customer.customer_address.like(f'{escaped_house_number} %', escape='\\'))
        )
    )
    customer = customer_result.scalar_one_or_none()
    if customer is None:
        raise HTTPException(
            status_code=400,
            detail="Customer not found with provided account and house number",
        )

    # Check if email already registered
    result = await db.execute(select(Account_User).where(Account_User.email == user.email))
    if result.scalar_one_or_none():
        raise HTTPException(status_code=400, detail="Email already registered")

    username = f"{user.account_number}-{user.house_number}"
    hashed_password = get_password_hash(user.password)
    # Use one timestamp so member_since and last_seen agree on creation
    now = datetime.utcnow()

    db_user = Account_User(
        username=username,
        account_number=user.account_number,
        house_number=user.house_number,
        password_hash=hashed_password,
        member_since=now,
        email=user.email,
        last_seen=now,
        admin=0,
        admin_role=0,
        confirmed=1,
        active=1,
        user_id=customer.id,
    )
    db.add(db_user)
    await db.commit()
    await db.refresh(db_user)
    return db_user
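

# --- Illustrative sketch (not part of the registration flow) -----------------
# A minimal, hedged check of how an escaped LIKE pattern such as the one built
# in `register` behaves. It uses the standard-library sqlite3 module as a
# stand-in for the real database, so results on other backends may differ
# (for example in case sensitivity). The helper name `match_house_number`,
# the `customer` table, and the sample addresses are hypothetical.
import sqlite3


def match_house_number(escaper, house_number, addresses):
    """Return the addresses that start with `house_number` followed by a space.

    `escaper` is the function applied to `house_number` before it is placed
    in the LIKE pattern (for instance `escape_like_pattern`).
    """
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE customer (customer_address TEXT)")
        conn.executemany(
            "INSERT INTO customer (customer_address) VALUES (?)",
            [(address,) for address in addresses],
        )
        # Same shape as the pattern in `register`: "<house number> %"
        pattern = f"{escaper(house_number)} %"
        rows = conn.execute(
            "SELECT customer_address FROM customer "
            "WHERE customer_address LIKE ? ESCAPE '\\' ORDER BY rowid",
            (pattern,),
        ).fetchall()
        return [row[0] for row in rows]
    finally:
        conn.close()


# Possible usage, assuming the sample data below:
#   match_house_number(escape_like_pattern, "1%", ["12 Oak St", "1% Elm Rd"])
# With escaping, only the address that literally starts with "1% " matches;
# with an identity escaper (lambda v: v), "12 Oak St" would match as well.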