import bcrypt import jwt from datetime import datetime, timedelta, timezone from flask import Blueprint, request, jsonify, g, current_app from pydantic import ValidationError from app.extensions import db from app.models.user import ClientUser from app.schemas.auth import RegisterIn, LoginIn, UserOut, AuthTokenOut from app.utils.auth import require_auth auth_bp = Blueprint("auth", __name__) _DUMMY_HASH = bcrypt.hashpw(b"dummy-password-for-timing", bcrypt.gensalt()) def _generate_token(user_id: str, secret: str) -> str: payload = { "sub": user_id, "exp": datetime.now(timezone.utc) + timedelta(days=7), "iat": datetime.now(timezone.utc), } return jwt.encode(payload, secret, algorithm="HS256") @auth_bp.post("/register") def register(): try: data = RegisterIn.model_validate(request.get_json() or {}) except ValidationError as e: return jsonify({"error": e.errors(include_url=False)}), 422 existing = ClientUser.query.filter_by(email=data.email).first() if existing: return jsonify({"error": "E-mail já cadastrado"}), 409 pwd_hash = bcrypt.hashpw(data.password.encode(), bcrypt.gensalt()).decode() user = ClientUser( name=data.name, email=data.email, password_hash=pwd_hash, phone=data.phone, whatsapp=data.whatsapp, cpf=data.cpf, birth_date=data.birth_date, address_street=data.address_street, address_number=data.address_number, address_complement=data.address_complement, address_neighborhood=data.address_neighborhood, address_city=data.address_city, address_state=data.address_state, address_zip=data.address_zip, ) db.session.add(user) db.session.commit() token = _generate_token(user.id, current_app.config["JWT_SECRET_KEY"]) user_out = UserOut.model_validate(user) return ( jsonify( AuthTokenOut(access_token=token, user=user_out).model_dump(mode="json") ), 201, ) @auth_bp.post("/login") def login(): try: data = LoginIn.model_validate(request.get_json() or {}) except ValidationError as e: return jsonify({"error": e.errors(include_url=False)}), 422 user = ClientUser.query.filter_by(email=data.email).first() # Always run bcrypt to prevent timing attacks / email enumeration (SC-006) candidate_hash = user.password_hash.encode() if user else _DUMMY_HASH password_ok = bcrypt.checkpw(data.password.encode(), candidate_hash) if not user or not password_ok: return jsonify({"error": "E-mail ou senha inválidos"}), 401 token = _generate_token(user.id, current_app.config["JWT_SECRET_KEY"]) user_out = UserOut.model_validate(user) return ( jsonify( AuthTokenOut(access_token=token, user=user_out).model_dump(mode="json") ), 200, ) @auth_bp.get("/me") @require_auth def me(): user = ClientUser.query.get(g.current_user_id) if not user: return jsonify({"error": "Não autorizado."}), 401 user_out = UserOut.model_validate(user) return jsonify(user_out.model_dump(mode="json")), 200