sass-imobiliaria/backend/app/routes/auth.py

100 lines
3.1 KiB
Python

import bcrypt
import jwt
from datetime import datetime, timedelta, timezone
from flask import Blueprint, request, jsonify, g, current_app
from pydantic import ValidationError
from app.extensions import db
from app.models.user import ClientUser
from app.schemas.auth import RegisterIn, LoginIn, UserOut, AuthTokenOut
from app.utils.auth import require_auth
auth_bp = Blueprint("auth", __name__)
_DUMMY_HASH = bcrypt.hashpw(b"dummy-password-for-timing", bcrypt.gensalt())
def _generate_token(user_id: str, secret: str) -> str:
payload = {
"sub": user_id,
"exp": datetime.now(timezone.utc) + timedelta(days=7),
"iat": datetime.now(timezone.utc),
}
return jwt.encode(payload, secret, algorithm="HS256")
@auth_bp.post("/register")
def register():
try:
data = RegisterIn.model_validate(request.get_json() or {})
except ValidationError as e:
return jsonify({"error": e.errors(include_url=False)}), 422
existing = ClientUser.query.filter_by(email=data.email).first()
if existing:
return jsonify({"error": "E-mail já cadastrado"}), 409
pwd_hash = bcrypt.hashpw(data.password.encode(), bcrypt.gensalt()).decode()
user = ClientUser(
name=data.name,
email=data.email,
password_hash=pwd_hash,
phone=data.phone,
whatsapp=data.whatsapp,
cpf=data.cpf,
birth_date=data.birth_date,
address_street=data.address_street,
address_number=data.address_number,
address_complement=data.address_complement,
address_neighborhood=data.address_neighborhood,
address_city=data.address_city,
address_state=data.address_state,
address_zip=data.address_zip,
)
db.session.add(user)
db.session.commit()
token = _generate_token(user.id, current_app.config["JWT_SECRET_KEY"])
user_out = UserOut.model_validate(user)
return (
jsonify(
AuthTokenOut(access_token=token, user=user_out).model_dump(mode="json")
),
201,
)
@auth_bp.post("/login")
def login():
try:
data = LoginIn.model_validate(request.get_json() or {})
except ValidationError as e:
return jsonify({"error": e.errors(include_url=False)}), 422
user = ClientUser.query.filter_by(email=data.email).first()
# Always run bcrypt to prevent timing attacks / email enumeration (SC-006)
candidate_hash = user.password_hash.encode() if user else _DUMMY_HASH
password_ok = bcrypt.checkpw(data.password.encode(), candidate_hash)
if not user or not password_ok:
return jsonify({"error": "E-mail ou senha inválidos"}), 401
token = _generate_token(user.id, current_app.config["JWT_SECRET_KEY"])
user_out = UserOut.model_validate(user)
return (
jsonify(
AuthTokenOut(access_token=token, user=user_out).model_dump(mode="json")
),
200,
)
@auth_bp.get("/me")
@require_auth
def me():
user = ClientUser.query.get(g.current_user_id)
if not user:
return jsonify({"error": "Não autorizado."}), 401
user_out = UserOut.model_validate(user)
return jsonify(user_out.model_dump(mode="json")), 200