from datetime import timedelta from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import OAuth2PasswordRequestForm from sqlalchemy.orm import Session from .. import schemas, database from ..security import authenticate_user, create_access_token, get_password_hash, require_roles from ..models import User auth = APIRouter(prefix="/auth", tags=["auth"]) @auth.post("/token", response_model=schemas.Token) def login_for_access_token( form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(database.get_db), ): user = authenticate_user(db, form_data.username, form_data.password) if not user: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password") access_token_expires = timedelta(minutes=60) access_token = create_access_token(data={"sub": user.username, "role": user.role}, expires_delta=access_token_expires) return {"access_token": access_token, "token_type": "bearer"} @auth.post("/users", response_model=schemas.UserRead, dependencies=[Depends(require_roles(["admin"]))]) def create_user(item: schemas.UserCreate, db: Session = Depends(database.get_db)): if db.query(User).filter(User.username == item.username).first(): raise HTTPException(status_code=400, detail="Username already exists") obj = User(username=item.username, password_hash=get_password_hash(item.password), role=item.role) db.add(obj) db.commit() db.refresh(obj) return obj @auth.post("/users/admin", response_model=schemas.UserRead, dependencies=[Depends(require_roles(["admin"]))]) def create_admin_user(item: schemas.UserCreate, db: Session = Depends(database.get_db)): if db.query(User).filter(User.username == item.username).first(): raise HTTPException(status_code=400, detail="Username already exists") obj = User(username=item.username, password_hash=get_password_hash(item.password), role="admin") db.add(obj) db.commit() db.refresh(obj) return obj @auth.get("/users", response_model=list[schemas.UserRead], dependencies=[Depends(require_roles(["admin"]))]) def list_users(db: Session = Depends(database.get_db)): return db.query(User).all() @auth.patch("/users/{user_id}/role", response_model=schemas.UserRead, dependencies=[Depends(require_roles(["admin"]))]) def update_user_role(user_id: int, payload: schemas.UserRoleUpdate, db: Session = Depends(database.get_db)): user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") user.role = payload.role db.commit() db.refresh(user) return user