Files
asuinventory/backend/routers/auth.py
2025-11-10 11:28:49 +03:00

64 lines
2.6 KiB
Python

from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from .. import schemas, database
from ..security import authenticate_user, create_access_token, get_password_hash, require_roles
from ..models import User
auth = APIRouter(prefix="/auth", tags=["auth"])
@auth.post("/token", response_model=schemas.Token)
def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(database.get_db),
):
user = authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=60)
access_token = create_access_token(data={"sub": user.username, "role": user.role}, expires_delta=access_token_expires)
return {"access_token": access_token, "token_type": "bearer"}
@auth.post("/users", response_model=schemas.UserRead, dependencies=[Depends(require_roles(["admin"]))])
def create_user(item: schemas.UserCreate, db: Session = Depends(database.get_db)):
if db.query(User).filter(User.username == item.username).first():
raise HTTPException(status_code=400, detail="Username already exists")
obj = User(username=item.username, password_hash=get_password_hash(item.password), role=item.role)
db.add(obj)
db.commit()
db.refresh(obj)
return obj
@auth.post("/users/admin", response_model=schemas.UserRead, dependencies=[Depends(require_roles(["admin"]))])
def create_admin_user(item: schemas.UserCreate, db: Session = Depends(database.get_db)):
if db.query(User).filter(User.username == item.username).first():
raise HTTPException(status_code=400, detail="Username already exists")
obj = User(username=item.username, password_hash=get_password_hash(item.password), role="admin")
db.add(obj)
db.commit()
db.refresh(obj)
return obj
@auth.get("/users", response_model=list[schemas.UserRead], dependencies=[Depends(require_roles(["admin"]))])
def list_users(db: Session = Depends(database.get_db)):
return db.query(User).all()
@auth.patch("/users/{user_id}/role", response_model=schemas.UserRead, dependencies=[Depends(require_roles(["admin"]))])
def update_user_role(user_id: int, payload: schemas.UserRoleUpdate, db: Session = Depends(database.get_db)):
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
user.role = payload.role
db.commit()
db.refresh(user)
return user