100 lines
3.1 KiB
Python
100 lines
3.1 KiB
Python
import bcrypt
|
|
import jwt
|
|
from datetime import datetime, timedelta, timezone
|
|
from flask import Blueprint, request, jsonify, g, current_app
|
|
from pydantic import ValidationError
|
|
|
|
from app.extensions import db
|
|
from app.models.user import ClientUser
|
|
from app.schemas.auth import RegisterIn, LoginIn, UserOut, AuthTokenOut
|
|
from app.utils.auth import require_auth
|
|
|
|
# Blueprint that groups the authentication endpoints defined in this module.
auth_bp = Blueprint("auth", __name__)
|
|
|
|
# Placeholder bcrypt hash, computed once when the module is imported.
# login() checks the submitted password against it when no user matches the
# e-mail, so a bcrypt verification runs on every login attempt.
_DUMMY_HASH = bcrypt.hashpw(b"dummy-password-for-timing", bcrypt.gensalt())
|
|
|
|
|
|
def _generate_token(user_id: str, secret: str) -> str:
    """Create a signed HS256 JWT identifying *user_id*.

    The payload carries three claims: ``sub`` (the user id), ``iat`` (the
    current UTC time) and ``exp`` (seven days after ``iat``).

    Args:
        user_id: Value stored in the ``sub`` claim.
        secret: Key used to sign the token.

    Returns:
        The encoded token as produced by ``jwt.encode``.
    """
    # Read the clock once so ``exp`` is exactly seven days after ``iat``.
    # Two separate ``now()`` calls can straddle a second boundary, which
    # would make the encoded lifetime one second short.
    issued_at = datetime.now(timezone.utc)
    payload = {
        # NOTE(review): recent PyJWT releases reject a non-string ``sub`` on
        # decode -- confirm callers always pass a ``str`` id.
        "sub": user_id,
        "exp": issued_at + timedelta(days=7),
        "iat": issued_at,
    }
    return jwt.encode(payload, secret, algorithm="HS256")
|
|
|
|
|
|
@auth_bp.post("/register")
def register():
    """Create a client account and return a freshly issued access token.

    Responses:
        201: serialized ``AuthTokenOut`` holding the token and the new user.
        409: a ``ClientUser`` with the submitted e-mail already exists.
        422: the JSON body failed ``RegisterIn`` validation.
    """
    raw_body = request.get_json() or {}
    try:
        form = RegisterIn.model_validate(raw_body)
    except ValidationError as exc:
        return jsonify({"error": exc.errors(include_url=False)}), 422

    # Reject duplicates before spending time on the bcrypt hash.
    if ClientUser.query.filter_by(email=form.email).first():
        return jsonify({"error": "E-mail já cadastrado"}), 409

    hashed = bcrypt.hashpw(form.password.encode(), bcrypt.gensalt())
    new_user = ClientUser(
        name=form.name,
        email=form.email,
        password_hash=hashed.decode(),
        phone=form.phone,
        whatsapp=form.whatsapp,
        cpf=form.cpf,
        birth_date=form.birth_date,
        address_street=form.address_street,
        address_number=form.address_number,
        address_complement=form.address_complement,
        address_neighborhood=form.address_neighborhood,
        address_city=form.address_city,
        address_state=form.address_state,
        address_zip=form.address_zip,
    )
    db.session.add(new_user)
    db.session.commit()

    # The token is built after the commit, once the row has been persisted.
    signing_key = current_app.config["JWT_SECRET_KEY"]
    response_body = AuthTokenOut(
        access_token=_generate_token(new_user.id, signing_key),
        user=UserOut.model_validate(new_user),
    )
    return jsonify(response_body.model_dump(mode="json")), 201
|
|
|
|
|
|
@auth_bp.post("/login")
def login():
    """Check e-mail/password credentials and issue an access token.

    Responses:
        200: serialized ``AuthTokenOut`` with the token and the user.
        401: unknown e-mail or wrong password (same message for both).
        422: the JSON body failed ``LoginIn`` validation.
    """
    raw_body = request.get_json() or {}
    try:
        credentials = LoginIn.model_validate(raw_body)
    except ValidationError as exc:
        return jsonify({"error": exc.errors(include_url=False)}), 422

    account = ClientUser.query.filter_by(email=credentials.email).first()

    # bcrypt runs on every request, even for unknown e-mails, so the response
    # time does not reveal whether an account exists (SC-006).
    if account:
        stored_hash = account.password_hash.encode()
    else:
        stored_hash = _DUMMY_HASH
    matches = bcrypt.checkpw(credentials.password.encode(), stored_hash)

    if not (account and matches):
        return jsonify({"error": "E-mail ou senha inválidos"}), 401

    signing_key = current_app.config["JWT_SECRET_KEY"]
    response_body = AuthTokenOut(
        access_token=_generate_token(account.id, signing_key),
        user=UserOut.model_validate(account),
    )
    return jsonify(response_body.model_dump(mode="json")), 200
|
|
|
|
|
|
@auth_bp.get("/me")
@require_auth
def me():
    """Return the profile of the authenticated user.

    Reads ``g.current_user_id``; presumably ``require_auth`` sets it before
    this view runs (its implementation is not visible in this module).

    Responses:
        200: the user serialized through ``UserOut``.
        401: no ``ClientUser`` row matches ``g.current_user_id``.
    """
    # ``Query.get`` is legacy API in SQLAlchemy 2.0 and emits a
    # LegacyAPIWarning; ``Session.get`` is the supported primary-key lookup.
    user = db.session.get(ClientUser, g.current_user_id)
    if user is None:
        # The id points at a row that no longer exists (e.g. the account was
        # removed), so treat the request as unauthenticated.
        return jsonify({"error": "Não autorizado."}), 401
    return jsonify(UserOut.model_validate(user).model_dump(mode="json")), 200
|