feat: add full project - backend, frontend, docker, specs and configs

This commit is contained in:
MatheusAlves96 2026-04-20 23:59:45 -03:00
parent b77c7d5a01
commit e6cb06255b
24489 changed files with 61341 additions and 36 deletions

View file

1007
backend/app/routes/admin.py Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,92 @@
from flask import Blueprint, jsonify, request
from pydantic import ValidationError
from app.extensions import db
from app.models.agent import Agent
from app.schemas.agent import AgentIn, AgentOut
from app.utils.auth import require_admin
agents_public_bp = Blueprint("agents_public", __name__, url_prefix="/api/v1")
agents_admin_bp = Blueprint("agents_admin", __name__, url_prefix="/api/v1/admin")
# ── Public ────────────────────────────────────────────────────────────────────
@agents_public_bp.get("/agents")
def list_agents():
agents = (
Agent.query.filter_by(is_active=True)
.order_by(Agent.display_order.asc(), Agent.id.asc())
.all()
)
return jsonify([AgentOut.model_validate(a).model_dump() for a in agents])
# ── Admin ─────────────────────────────────────────────────────────────────────
@agents_admin_bp.get("/agents")
@require_admin
def admin_list_agents():
agents = Agent.query.order_by(Agent.display_order.asc(), Agent.id.asc()).all()
return jsonify([AgentOut.model_validate(a).model_dump() for a in agents])
@agents_admin_bp.post("/agents")
@require_admin
def admin_create_agent():
data = request.get_json(silent=True) or {}
try:
agent_in = AgentIn.model_validate(data)
except ValidationError as exc:
return jsonify({"error": "Dados inválidos", "details": exc.errors()}), 422
agent = Agent(
name=agent_in.name,
photo_url=agent_in.photo_url,
creci=agent_in.creci,
email=agent_in.email,
phone=agent_in.phone,
bio=agent_in.bio,
is_active=agent_in.is_active,
display_order=agent_in.display_order,
)
db.session.add(agent)
db.session.commit()
return jsonify(AgentOut.model_validate(agent).model_dump()), 201
@agents_admin_bp.put("/agents/<int:agent_id>")
@require_admin
def admin_update_agent(agent_id: int):
agent = Agent.query.get(agent_id)
if agent is None:
return jsonify({"error": "Corretor não encontrado"}), 404
data = request.get_json(silent=True) or {}
try:
agent_in = AgentIn.model_validate(data)
except ValidationError as exc:
return jsonify({"error": "Dados inválidos", "details": exc.errors()}), 422
agent.name = agent_in.name
agent.photo_url = agent_in.photo_url
agent.creci = agent_in.creci
agent.email = agent_in.email
agent.phone = agent_in.phone
agent.bio = agent_in.bio
agent.is_active = agent_in.is_active
agent.display_order = agent_in.display_order
db.session.commit()
return jsonify(AgentOut.model_validate(agent).model_dump())
@agents_admin_bp.delete("/agents/<int:agent_id>")
@require_admin
def admin_delete_agent(agent_id: int):
agent = Agent.query.get(agent_id)
if agent is None:
return jsonify({"error": "Corretor não encontrado"}), 404
agent.is_active = False
db.session.commit()
return jsonify({"message": "Corretor desativado com sucesso"})

View file

@ -0,0 +1,173 @@
import hashlib
import os
import re
from datetime import datetime, timedelta, timezone
from flask import Blueprint, request, jsonify
from sqlalchemy import func, text
from app.extensions import db
from app.models.page_view import PageView
from app.models.property import Property
from app.utils.auth import require_admin
analytics_bp = Blueprint("analytics", __name__)
# ─── Helpers ─────────────────────────────────────────────────────────────────
_PROPERTY_PATH_RE = re.compile(
r"^/api/v1/properties/([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})",
re.IGNORECASE,
)
_SKIP_PREFIXES = ("/api/v1/admin", "/api/v1/auth", "/static")
def _should_track(path: str, method: str) -> bool:
if method != "GET":
return False
for prefix in _SKIP_PREFIXES:
if path.startswith(prefix):
return False
return True
def _ip_hash(ip: str) -> str:
salt = os.environ.get("IP_SALT", "default-salt")
return hashlib.sha256(f"{ip}{salt}".encode()).hexdigest()
def record_page_view(path: str, ip: str, user_agent: str) -> None:
"""Insert a page_view row. Silently swallows errors to never break requests."""
try:
match = _PROPERTY_PATH_RE.match(path)
property_id = match.group(1) if match else None
pv = PageView(
path=path,
property_id=property_id,
ip_hash=_ip_hash(ip),
user_agent=(user_agent or "")[:512],
)
db.session.add(pv)
db.session.commit()
except Exception:
db.session.rollback()
# ─── Admin endpoints ──────────────────────────────────────────────────────────
def _period_start(days: int) -> datetime:
return datetime.now(timezone.utc) - timedelta(days=days)
@analytics_bp.get("/summary")
@require_admin
def analytics_summary():
"""Cards: total today / this week / this month + daily series."""
days = int(request.args.get("days", 30))
if days not in (7, 30, 90):
days = 30
now = datetime.now(timezone.utc)
start_day = now.replace(hour=0, minute=0, second=0, microsecond=0)
start_week = start_day - timedelta(days=now.weekday())
start_month = start_day.replace(day=1)
start_period = _period_start(days)
def count_since(dt: datetime) -> int:
return (
db.session.query(func.count(PageView.id))
.filter(PageView.accessed_at >= dt)
.scalar()
or 0
)
# daily series for sparkline/chart
daily = (
db.session.query(
func.date_trunc("day", PageView.accessed_at).label("day"),
func.count(PageView.id).label("views"),
)
.filter(PageView.accessed_at >= start_period)
.group_by(text("day"))
.order_by(text("day"))
.all()
)
series = [
{"date": row.day.strftime("%Y-%m-%d"), "views": row.views} for row in daily
]
return jsonify(
{
"today": count_since(start_day),
"this_week": count_since(start_week),
"this_month": count_since(start_month),
"period_days": days,
"period_total": count_since(start_period),
"series": series,
}
)
@analytics_bp.get("/top-pages")
@require_admin
def analytics_top_pages():
"""Top 10 most-viewed paths in the given period."""
days = int(request.args.get("days", 30))
if days not in (7, 30, 90):
days = 30
rows = (
db.session.query(PageView.path, func.count(PageView.id).label("views"))
.filter(PageView.accessed_at >= _period_start(days))
.group_by(PageView.path)
.order_by(func.count(PageView.id).desc())
.limit(10)
.all()
)
return jsonify([{"path": r.path, "views": r.views} for r in rows])
@analytics_bp.get("/top-properties")
@require_admin
def analytics_top_properties():
"""Top 10 most-viewed properties in the given period."""
days = int(request.args.get("days", 30))
if days not in (7, 30, 90):
days = 30
rows = (
db.session.query(PageView.property_id, func.count(PageView.id).label("views"))
.filter(
PageView.accessed_at >= _period_start(days),
PageView.property_id.isnot(None),
)
.group_by(PageView.property_id)
.order_by(func.count(PageView.id).desc())
.limit(10)
.all()
)
result = []
for row in rows:
prop = db.session.get(Property, row.property_id)
if prop:
photos = prop.photos or []
result.append(
{
"property_id": row.property_id,
"views": row.views,
"title": prop.title,
"cover": photos[0] if photos else None,
"city": prop.city.name if prop.city else None,
"neighborhood": (
prop.neighborhood.name if prop.neighborhood else None
),
}
)
return jsonify(result)

100
backend/app/routes/auth.py Normal file
View file

@ -0,0 +1,100 @@
import bcrypt
import jwt
from datetime import datetime, timedelta, timezone
from flask import Blueprint, request, jsonify, g, current_app
from pydantic import ValidationError
from app.extensions import db
from app.models.user import ClientUser
from app.schemas.auth import RegisterIn, LoginIn, UserOut, AuthTokenOut
from app.utils.auth import require_auth
auth_bp = Blueprint("auth", __name__)
_DUMMY_HASH = bcrypt.hashpw(b"dummy-password-for-timing", bcrypt.gensalt())
def _generate_token(user_id: str, secret: str) -> str:
payload = {
"sub": user_id,
"exp": datetime.now(timezone.utc) + timedelta(days=7),
"iat": datetime.now(timezone.utc),
}
return jwt.encode(payload, secret, algorithm="HS256")
@auth_bp.post("/register")
def register():
try:
data = RegisterIn.model_validate(request.get_json() or {})
except ValidationError as e:
return jsonify({"error": e.errors(include_url=False)}), 422
existing = ClientUser.query.filter_by(email=data.email).first()
if existing:
return jsonify({"error": "E-mail já cadastrado"}), 409
pwd_hash = bcrypt.hashpw(data.password.encode(), bcrypt.gensalt()).decode()
user = ClientUser(
name=data.name,
email=data.email,
password_hash=pwd_hash,
phone=data.phone,
whatsapp=data.whatsapp,
cpf=data.cpf,
birth_date=data.birth_date,
address_street=data.address_street,
address_number=data.address_number,
address_complement=data.address_complement,
address_neighborhood=data.address_neighborhood,
address_city=data.address_city,
address_state=data.address_state,
address_zip=data.address_zip,
)
db.session.add(user)
db.session.commit()
token = _generate_token(user.id, current_app.config["JWT_SECRET_KEY"])
user_out = UserOut.model_validate(user)
return (
jsonify(
AuthTokenOut(access_token=token, user=user_out).model_dump(mode="json")
),
201,
)
@auth_bp.post("/login")
def login():
try:
data = LoginIn.model_validate(request.get_json() or {})
except ValidationError as e:
return jsonify({"error": e.errors(include_url=False)}), 422
user = ClientUser.query.filter_by(email=data.email).first()
# Always run bcrypt to prevent timing attacks / email enumeration (SC-006)
candidate_hash = user.password_hash.encode() if user else _DUMMY_HASH
password_ok = bcrypt.checkpw(data.password.encode(), candidate_hash)
if not user or not password_ok:
return jsonify({"error": "E-mail ou senha inválidos"}), 401
token = _generate_token(user.id, current_app.config["JWT_SECRET_KEY"])
user_out = UserOut.model_validate(user)
return (
jsonify(
AuthTokenOut(access_token=token, user=user_out).model_dump(mode="json")
),
200,
)
@auth_bp.get("/me")
@require_auth
def me():
user = ClientUser.query.get(g.current_user_id)
if not user:
return jsonify({"error": "Não autorizado."}), 401
user_out = UserOut.model_validate(user)
return jsonify(user_out.model_dump(mode="json")), 200

View file

@ -0,0 +1,63 @@
from flask import Blueprint, jsonify
from sqlalchemy import func
from app.extensions import db
from app.models.catalog import Amenity, PropertyType
from app.models.imobiliaria import Imobiliaria
from app.models.property import Property
from app.schemas.catalog import AmenityOut, ImobiliariaOut, PropertyTypeOut
catalog_bp = Blueprint("catalog", __name__, url_prefix="/api/v1")
@catalog_bp.get("/property-types")
def list_property_types():
"""Return top-level categories with their subtypes nested."""
# Build count_map for subtypes (leaf nodes with parent_id IS NOT NULL)
rows = (
db.session.query(PropertyType.id, func.count(Property.id).label("cnt"))
.filter(PropertyType.parent_id.isnot(None))
.outerjoin(
Property,
(Property.subtype_id == PropertyType.id) & (Property.is_active.is_(True)),
)
.group_by(PropertyType.id)
.all()
)
count_map: dict[int, int] = {row.id: row.cnt for row in rows}
categories = (
PropertyType.query.filter_by(parent_id=None).order_by(PropertyType.id).all()
)
def serialize_category(cat) -> dict:
data = PropertyTypeOut.model_validate(cat).model_dump(mode="json")
data["subtypes"] = [
{**sub, "property_count": count_map.get(sub["id"], 0)}
for sub in data["subtypes"]
]
return data
return jsonify([serialize_category(c) for c in categories])
@catalog_bp.get("/amenities")
def list_amenities():
"""Return all amenities grouped."""
amenities = Amenity.query.order_by(Amenity.group, Amenity.name).all()
return jsonify(
[AmenityOut.model_validate(a).model_dump(mode="json") for a in amenities]
)
@catalog_bp.get("/imobiliarias")
def list_imobiliarias():
"""Return active imobiliárias ordered by display_order."""
items = (
Imobiliaria.query.filter_by(is_active=True)
.order_by(Imobiliaria.display_order, Imobiliaria.name)
.all()
)
return jsonify(
[ImobiliariaOut.model_validate(i).model_dump(mode="json") for i in items]
)

View file

@ -0,0 +1,109 @@
import uuid as _uuid
from flask import Blueprint, request, jsonify, g
from pydantic import ValidationError
from sqlalchemy.exc import IntegrityError
from app import db
from app.models.saved_property import SavedProperty
from app.models.visit_request import VisitRequest
from app.models.boleto import Boleto
from app.models.property import Property
from app.schemas.client_area import (
SavedPropertyOut,
FavoriteIn,
VisitRequestOut,
BoletoOut,
)
from app.utils.auth import require_auth
client_bp = Blueprint("client", __name__)
@client_bp.get("/favorites")
@require_auth
def get_favorites():
saved = (
SavedProperty.query.filter_by(user_id=g.current_user_id)
.order_by(SavedProperty.created_at.desc())
.all()
)
return (
jsonify(
[SavedPropertyOut.model_validate(s).model_dump(mode="json") for s in saved]
),
200,
)
@client_bp.post("/favorites")
@require_auth
def add_favorite():
try:
data = FavoriteIn.model_validate(request.get_json() or {})
except ValidationError as e:
return jsonify({"error": e.errors(include_url=False)}), 422
try:
prop_uuid = _uuid.UUID(data.property_id)
except ValueError:
return jsonify({"error": "property_id inválido"}), 422
prop = db.session.get(Property, prop_uuid)
if not prop:
return jsonify({"error": "Imóvel não encontrado"}), 404
saved = SavedProperty(user_id=g.current_user_id, property_id=prop_uuid)
db.session.add(saved)
try:
db.session.commit()
except IntegrityError:
db.session.rollback()
return jsonify({"error": "Imóvel já está nos favoritos"}), 409
return jsonify(SavedPropertyOut.model_validate(saved).model_dump(mode="json")), 201
@client_bp.delete("/favorites/<property_id>")
@require_auth
def remove_favorite(property_id: str):
try:
prop_uuid = _uuid.UUID(property_id)
except ValueError:
return jsonify({"error": "property_id inválido"}), 422
saved = SavedProperty.query.filter_by(
user_id=g.current_user_id, property_id=prop_uuid
).first()
if not saved:
return jsonify({"error": "Favorito não encontrado"}), 404
db.session.delete(saved)
db.session.commit()
return "", 204
@client_bp.get("/visits")
@require_auth
def get_visits():
visits = (
VisitRequest.query.filter_by(user_id=g.current_user_id)
.order_by(VisitRequest.created_at.desc())
.all()
)
return (
jsonify(
[VisitRequestOut.model_validate(v).model_dump(mode="json") for v in visits]
),
200,
)
@client_bp.get("/boletos")
@require_auth
def get_boletos():
boletos = (
Boleto.query.filter_by(user_id=g.current_user_id)
.order_by(Boleto.due_date.asc())
.all()
)
return (
jsonify([BoletoOut.model_validate(b).model_dump(mode="json") for b in boletos]),
200,
)

View file

@ -0,0 +1,11 @@
import os
from flask import Blueprint, jsonify
config_bp = Blueprint("config", __name__)
@config_bp.get("/api/v1/config/whatsapp")
def get_whatsapp_config():
"""Returns the configured WhatsApp number (no auth required)."""
number = os.environ.get("WHATSAPP_NUMBER", "")
return jsonify({"whatsapp_number": number})

View file

@ -0,0 +1,14 @@
from flask import Blueprint, jsonify
from app.models.homepage import HomepageConfig
from app.schemas.homepage import HomepageConfigOut
homepage_bp = Blueprint("homepage", __name__, url_prefix="/api/v1")
@homepage_bp.get("/homepage-config")
def get_homepage_config():
config = HomepageConfig.query.first()
if config is None:
return jsonify({"error": "Homepage config not found"}), 404
return jsonify(HomepageConfigOut.model_validate(config).model_dump())

View file

@ -0,0 +1,155 @@
import unicodedata
import re
from flask import Blueprint, jsonify, request
from sqlalchemy import func
from app.extensions import db
from app.models.location import City, Neighborhood
from app.models.property import Property
from app.schemas.catalog import CityOut, NeighborhoodOut
locations_bp = Blueprint("locations", __name__, url_prefix="/api/v1")
def _slugify(text: str) -> str:
text = unicodedata.normalize("NFD", text)
text = "".join(c for c in text if unicodedata.category(c) != "Mn")
text = text.lower()
text = re.sub(r"[^a-z0-9\s-]", "", text)
text = re.sub(r"[\s/]+", "-", text.strip())
return text
# ── Public endpoints ──────────────────────────────────────────────────────────
@locations_bp.get("/cities")
def list_cities():
"""List all cities ordered by state + name, with property_count."""
rows = (
db.session.query(City, func.count(Property.id).label("cnt"))
.outerjoin(
Property,
(Property.city_id == City.id) & (Property.is_active.is_(True)),
)
.group_by(City.id)
.order_by(City.state, City.name)
.all()
)
return jsonify(
[
{**CityOut.model_validate(city).model_dump(), "property_count": cnt}
for city, cnt in rows
]
)
@locations_bp.get("/neighborhoods")
def list_neighborhoods():
"""List neighborhoods, optionally filtered by city_id, with property_count."""
city_id = request.args.get("city_id")
q = (
db.session.query(Neighborhood, func.count(Property.id).label("cnt"))
.outerjoin(
Property,
(Property.neighborhood_id == Neighborhood.id)
& (Property.is_active.is_(True)),
)
.group_by(Neighborhood.id)
)
if city_id:
try:
q = q.filter(Neighborhood.city_id == int(city_id))
except ValueError:
pass
rows = q.order_by(Neighborhood.name).all()
return jsonify(
[
{
**NeighborhoodOut.model_validate(nbh).model_dump(),
"property_count": cnt,
}
for nbh, cnt in rows
]
)
# ── Admin endpoints ───────────────────────────────────────────────────────────
@locations_bp.post("/admin/cities")
def create_city():
data = request.get_json(force=True) or {}
name = (data.get("name") or "").strip()
state = (data.get("state") or "").strip().upper()[:2]
if not name or not state:
return jsonify({"error": "name e state são obrigatórios"}), 400
slug = data.get("slug") or _slugify(name)
if City.query.filter_by(slug=slug).first():
return jsonify({"error": "Já existe uma cidade com esse slug"}), 409
city = City(name=name, slug=slug, state=state)
db.session.add(city)
db.session.commit()
return jsonify(CityOut.model_validate(city).model_dump()), 201
@locations_bp.put("/admin/cities/<int:city_id>")
def update_city(city_id: int):
city = City.query.get_or_404(city_id)
data = request.get_json(force=True) or {}
if "name" in data:
city.name = data["name"].strip()
if "state" in data:
city.state = data["state"].strip().upper()[:2]
if "slug" in data:
city.slug = data["slug"].strip()
db.session.commit()
return jsonify(CityOut.model_validate(city).model_dump())
@locations_bp.delete("/admin/cities/<int:city_id>")
def delete_city(city_id: int):
city = City.query.get_or_404(city_id)
db.session.delete(city)
db.session.commit()
return "", 204
@locations_bp.post("/admin/neighborhoods")
def create_neighborhood():
data = request.get_json(force=True) or {}
name = (data.get("name") or "").strip()
city_id = data.get("city_id")
if not name or not city_id:
return jsonify({"error": "name e city_id são obrigatórios"}), 400
city = City.query.get_or_404(int(city_id))
slug = data.get("slug") or _slugify(name)
if Neighborhood.query.filter_by(slug=slug, city_id=city.id).first():
return jsonify({"error": "Já existe um bairro com esse slug nessa cidade"}), 409
neighborhood = Neighborhood(name=name, slug=slug, city_id=city.id)
db.session.add(neighborhood)
db.session.commit()
return jsonify(NeighborhoodOut.model_validate(neighborhood).model_dump()), 201
@locations_bp.put("/admin/neighborhoods/<int:neighborhood_id>")
def update_neighborhood(neighborhood_id: int):
n = Neighborhood.query.get_or_404(neighborhood_id)
data = request.get_json(force=True) or {}
if "name" in data:
n.name = data["name"].strip()
if "slug" in data:
n.slug = data["slug"].strip()
if "city_id" in data:
n.city_id = int(data["city_id"])
db.session.commit()
return jsonify(NeighborhoodOut.model_validate(n).model_dump())
@locations_bp.delete("/admin/neighborhoods/<int:neighborhood_id>")
def delete_neighborhood(neighborhood_id: int):
n = Neighborhood.query.get_or_404(neighborhood_id)
db.session.delete(n)
db.session.commit()
return "", 204

View file

@ -0,0 +1,262 @@
import math
from flask import Blueprint, jsonify, request
from pydantic import ValidationError
from sqlalchemy import and_, or_
from sqlalchemy.orm import aliased
from app.extensions import db
from app.models.catalog import Amenity, property_amenity
from app.models.location import Neighborhood
from app.models.property import Property
from app.schemas.property import PaginatedPropertiesOut, PropertyDetailOut, PropertyOut
properties_bp = Blueprint("properties", __name__, url_prefix="/api/v1")
def _parse_int(value: str | None, default: int | None = None) -> int | None:
if value is None:
return default
try:
return int(value)
except (ValueError, TypeError):
return default
def _parse_float(value: str | None, default: float | None = None) -> float | None:
if value is None:
return default
try:
return float(value)
except (ValueError, TypeError):
return default
@properties_bp.get("/properties")
def list_properties():
args = request.args
# ── Featured shortcut (homepage) ────────────────────────────────────────
if args.get("featured", "").lower() == "true":
from app.models.homepage import HomepageConfig
config = HomepageConfig.query.first()
limit = config.featured_properties_limit if config else 6
props = (
Property.query.filter_by(is_active=True, is_featured=True)
.order_by(Property.created_at.desc())
.limit(limit)
.all()
)
return jsonify(
[PropertyOut.model_validate(p).model_dump(mode="json") for p in props]
)
# ── Base query ───────────────────────────────────────────────────────────
query = Property.query.filter_by(is_active=True)
# Listing type (venda | aluguel)
listing_type = args.get("listing_type", "").lower()
if listing_type in ("venda", "aluguel"):
query = query.filter(Property.type == listing_type)
# Subtype — supports subtype_ids (comma-sep) or legacy subtype_id
subtype_ids_raw = args.get("subtype_ids", "")
if subtype_ids_raw:
try:
subtype_ids_list = [int(x) for x in subtype_ids_raw.split(",") if x.strip()]
except ValueError:
subtype_ids_list = []
if subtype_ids_list:
query = query.filter(Property.subtype_id.in_(subtype_ids_list))
else:
subtype_id = _parse_int(args.get("subtype_id"))
if subtype_id is not None:
query = query.filter(Property.subtype_id == subtype_id)
# City
city_id = _parse_int(args.get("city_id"))
if city_id is not None:
query = query.filter(Property.city_id == city_id)
# Neighborhood — supports neighborhood_ids (comma-sep) or legacy neighborhood_id
neighborhood_ids_raw = args.get("neighborhood_ids", "")
if neighborhood_ids_raw:
try:
neighborhood_ids_list = [int(x) for x in neighborhood_ids_raw.split(",") if x.strip()]
except ValueError:
neighborhood_ids_list = []
if neighborhood_ids_list:
query = query.filter(Property.neighborhood_id.in_(neighborhood_ids_list))
else:
neighborhood_id = _parse_int(args.get("neighborhood_id"))
if neighborhood_id is not None:
query = query.filter(Property.neighborhood_id == neighborhood_id)
# Imobiliária
imobiliaria_id = _parse_int(args.get("imobiliaria_id"))
if imobiliaria_id is not None:
query = query.filter(Property.imobiliaria_id == imobiliaria_id)
# Price range
price_min = _parse_float(args.get("price_min"))
price_max = _parse_float(args.get("price_max"))
include_condo = args.get("include_condo", "").lower() == "true"
if price_min is not None or price_max is not None:
if include_condo:
# Effective price = price + coalesce(condo_fee, 0)
from sqlalchemy import func
effective = Property.price + func.coalesce(Property.condo_fee, 0)
else:
effective = Property.price
if price_min is not None:
query = query.filter(effective >= price_min)
if price_max is not None:
query = query.filter(effective <= price_max)
# Bedrooms
bedrooms_min = _parse_int(args.get("bedrooms_min"))
bedrooms_max = _parse_int(args.get("bedrooms_max"))
if bedrooms_min is not None:
query = query.filter(Property.bedrooms >= bedrooms_min)
if bedrooms_max is not None:
query = query.filter(Property.bedrooms <= bedrooms_max)
# Bathrooms
bathrooms_min = _parse_int(args.get("bathrooms_min"))
bathrooms_max = _parse_int(args.get("bathrooms_max"))
if bathrooms_min is not None:
query = query.filter(Property.bathrooms >= bathrooms_min)
if bathrooms_max is not None:
query = query.filter(Property.bathrooms <= bathrooms_max)
# Parking spots
parking_min = _parse_int(args.get("parking_min"))
parking_max = _parse_int(args.get("parking_max"))
if parking_min is not None:
query = query.filter(Property.parking_spots >= parking_min)
if parking_max is not None:
query = query.filter(Property.parking_spots <= parking_max)
# Area m²
area_min = _parse_int(args.get("area_min"))
area_max = _parse_int(args.get("area_max"))
if area_min is not None:
query = query.filter(Property.area_m2 >= area_min)
if area_max is not None:
query = query.filter(Property.area_m2 <= area_max)
# Amenities (AND logic — property must have ALL selected amenities)
amenity_ids_raw = args.get("amenity_ids", "")
if amenity_ids_raw:
try:
amenity_ids = [int(x) for x in amenity_ids_raw.split(",") if x.strip()]
except ValueError:
amenity_ids = []
for aid in amenity_ids:
query = query.filter(
Property.id.in_(
property_amenity.select()
.where(property_amenity.c.amenity_id == aid)
.with_only_columns(property_amenity.c.property_id)
)
)
# ── Text search ──────────────────────────────────────────────────────────
q_raw = args.get("q", "").strip()[:200]
if q_raw:
pattern = f"%{q_raw}%"
NeighborhoodAlias = aliased(Neighborhood)
query = (
query
.outerjoin(NeighborhoodAlias, Property.neighborhood_id == NeighborhoodAlias.id)
.filter(
or_(
Property.title.ilike(pattern),
Property.address.ilike(pattern),
Property.code.ilike(pattern),
NeighborhoodAlias.name.ilike(pattern),
)
)
)
# ── Sort ─────────────────────────────────────────────────────────────────
_sort_map = {
"price_asc": Property.price.asc(),
"price_desc": Property.price.desc(),
"area_desc": Property.area_m2.desc(),
"newest": Property.created_at.desc(),
"relevance": Property.created_at.desc(),
}
sort_key = args.get("sort", "relevance")
sort_order = _sort_map.get(sort_key, Property.created_at.desc())
# ── Pagination ───────────────────────────────────────────────────────────
page = max(1, _parse_int(args.get("page"), 1))
per_page = min(48, max(1, _parse_int(args.get("per_page"), 24)))
total = query.count()
pages = math.ceil(total / per_page) if total > 0 else 1
props = (
query.order_by(sort_order)
.offset((page - 1) * per_page)
.limit(per_page)
.all()
)
result = PaginatedPropertiesOut(
items=[PropertyOut.model_validate(p) for p in props],
total=total,
page=page,
per_page=per_page,
pages=pages,
)
return jsonify(result.model_dump(mode="json"))
@properties_bp.get("/properties/<slug>")
def get_property(slug: str):
prop = Property.query.filter_by(slug=slug, is_active=True).first()
if prop is None:
return jsonify({"error": "Im\u00f3vel n\u00e3o encontrado"}), 404
return jsonify(PropertyDetailOut.model_validate(prop).model_dump(mode="json"))
@properties_bp.post("/properties/<slug>/contact")
def contact_property(slug: str):
from app.models.lead import ContactLead
from app.schemas.lead import ContactLeadIn, ContactLeadCreatedOut
prop = Property.query.filter_by(slug=slug, is_active=True).first()
if prop is None:
return jsonify({"error": "Im\u00f3vel n\u00e3o encontrado"}), 404
data = request.get_json(silent=True) or {}
try:
lead_in = ContactLeadIn.model_validate(data)
except ValidationError as exc:
return jsonify({"error": "Dados inv\u00e1lidos", "details": exc.errors()}), 422
lead = ContactLead(
property_id=prop.id,
name=lead_in.name,
email=lead_in.email,
phone=lead_in.phone,
message=lead_in.message,
)
db.session.add(lead)
db.session.commit()
return (
jsonify(
ContactLeadCreatedOut(
id=lead.id, message="Mensagem enviada com sucesso!"
).model_dump()
),
201,
)