xtablo-source/backend/app/main.py
2025-03-23 10:57:30 +01:00

276 lines
8.7 KiB
Python

import os
import json
import random
import inspect
from typing import Annotated, Dict, List, Optional
from contextlib import contextmanager
from .auth import get_supabase
from fastapi import FastAPI, Depends, HTTPException, status, Request
from fastapi.exceptions import RequestValidationError
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, EmailStr, field_validator, ValidationInfo, Field, SecretStr
from pydantic_core.core_schema import FieldValidationInfo
from dotenv import load_dotenv
from supabase import Client
from .auth import get_current_user
from uuid import uuid4
from datetime import datetime
# Initialize FastAPI app
app = FastAPI(title="XTablo API")
# CORS Middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["X-Error-Code", "X-Error-Message"]
)
# Security
security = HTTPBearer()
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
errors = exc.errors()
custom_errors = []
for error in errors:
custom_message = error["msg"]
if custom_message.startswith("Value error, "):
custom_message = custom_message.split(", ")[1]
custom_errors.append({
**error,
"form_location": error["loc"][-1],
"human_error": custom_message
})
return JSONResponse(status_code=422, content=jsonable_encoder(custom_errors))
# ======================================
# 🚀 MODELS
# ======================================
class UserCreate(BaseModel):
email: EmailStr
first_name: str
last_name: str
password: str
confirm_password: str
business_name: str
@field_validator("email")
def email_must_contain_at_symbol(cls, v):
if '@' not in v:
raise ValueError("Entrer un email valide")
return v
@field_validator("password")
def password_must_contain_at_least_8_characters(cls, v):
if len(v) < 8:
raise ValueError("Le mot de passe doit contenir au moins 8 caractères")
return v
@field_validator("business_name")
def business_name_must_contain_at_least_3_characters(cls, v):
if len(v) < 3:
raise ValueError("Le nom de la société doit contenir au moins 3 caractères")
return v
@field_validator('confirm_password', mode='before')
def passwords_match(cls, v, info: FieldValidationInfo):
if 'password' in info.data and v != info.data['password']:
raise ValueError('Les mots de passe ne correspondent pas')
return v
class UserLogin(BaseModel):
email: EmailStr
password: str
class UserOut(BaseModel):
email: EmailStr
business_name: str
class GameState(BaseModel):
id: str
word: str
guessed_letters: List[str] = []
attempts_left: int
status: str
wrong_guesses: List[str] = []
correct_guesses: List[str] = []
created_by: str
created_at: str
hints: List[str] = []
class PublicGameState(BaseModel):
id: str
masked_word: str
guessed_letters: List[str] = []
attempts_left: int
status: str
wrong_guesses: List[str] = []
correct_guesses: List[str] = []
created_by: str
created_at: str
hints: List[str] = []
class PublicGameResponse(BaseModel):
game_id: str
status: str
created_by: str
created_at: str
attempts_left: int
class LetterGuess(BaseModel):
letter: str
class CreateGame(BaseModel):
word: str
class HintRequest(BaseModel):
hint: str
class RefreshToken(BaseModel):
refresh_token: str
class RefreshResponse(BaseModel):
access_token: str
refresh_token: str
expires_at: int
user: dict
# ======================================
# 🔥 AUTH ROUTES
# ======================================
@app.post("/auth/register")
async def register(user: UserCreate, supabase: Client = Depends(get_supabase)):
try:
return supabase.auth.sign_up({
"email": user.email,
"password": user.password,
"options": {"data": {"first_name": user.first_name, "last_name": user.last_name, "business_name": user.business_name}}
})
except Exception as e:
headers = {}
if e.code == "user_already_exists":
headers = {"X-Error-Code": e.code, "X-Error-Message": "Cette adresse email est déjà utilisée"}
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
@app.post("/auth/login")
async def login(credentials: UserLogin, supabase: Client = Depends(get_supabase)):
try:
print("Login attempt for:", credentials.email) # Debug log
response = supabase.auth.sign_in_with_password({
"email": credentials.email.strip(),
"password": credentials.password.strip()
})
print("Login response:", response)
return {
"access_token": response.session.access_token,
"token_type": "bearer"
}
except Exception as e:
headers = {}
if e.code == "invalid_credentials":
headers = {"X-Error-Code": e.code, "X-Error-Message": "Email ou mot de passe incorrect"}
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials",
headers=headers
)
@app.get("/auth/login/google")
async def login_with_google(supabase: Client = Depends(get_supabase)):
try:
response = supabase.auth.sign_in_with_oauth({
"provider": "google",
"options": {
"redirect_to": "https://mhcafqvzbrrwvahpvvzd.supabase.co/auth/v1/callback"
}
})
return {"auth_url": response.url}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e)
)
@app.post("/auth/logout")
async def logout(user=Depends(get_current_user), supabase: Client = Depends(get_supabase)):
try:
supabase.auth.sign_out()
return {"message": "Successfully logged out"}
except Exception as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
@app.get("/me", response_model=UserOut)
async def get_me(
user = Depends(get_current_user), # Now properly imported
supabase: Client = Depends(get_supabase)
):
try:
# Get user details from public.users table
db_user = supabase.table("users").select("*").eq("id", user.user.id).execute().data[0]
return {
"username": db_user["username"],
"email": user.user.email,
"business_name": db_user["business_name"]
}
except IndexError:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found in database"
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e)
)
@app.post("/auth/refresh", response_model=RefreshResponse)
async def refresh_token(refresh_request: RefreshToken, supabase: Client = Depends(get_supabase)):
"""Refresh the access token using a valid refresh token."""
try:
# Validate the refresh token and get new tokens
response = supabase.auth.refresh_session(refresh_request.refresh_token)
# Extract user data
user_data = {
"id": response.user.id,
"email": response.user.email,
"first_name": response.user.user_metadata.get("first_name", "Unknown"),
"last_name": response.user.user_metadata.get("last_name", "Unknown"),
"business_name": response.user.user_metadata.get("business_name", "Unknown")
}
# Return the new tokens and user data
return {
"access_token": response.session.access_token,
"refresh_token": response.session.refresh_token,
"expires_at": int(response.session.expires_at),
"user": user_data
}
except Exception as e:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Failed to refresh token: {str(e)}")
# ======================================
# 🔍 HEALTH CHECK ROUTES
# ======================================
@app.get("/ping")
async def ping():
"""Health check endpoint that returns a success status."""
return {"status": "success", "message": "API is running"}