64 lines
2.6 KiB
Python
64 lines
2.6 KiB
Python
from datetime import timedelta
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from fastapi.security import OAuth2PasswordRequestForm
|
|
from sqlalchemy.orm import Session
|
|
|
|
from .. import schemas, database
|
|
from ..security import authenticate_user, create_access_token, get_password_hash, require_roles
|
|
from ..models import User
|
|
|
|
|
|
auth = APIRouter(prefix="/auth", tags=["auth"])
|
|
|
|
|
|
@auth.post("/token", response_model=schemas.Token)
|
|
def login_for_access_token(
|
|
form_data: OAuth2PasswordRequestForm = Depends(),
|
|
db: Session = Depends(database.get_db),
|
|
):
|
|
user = authenticate_user(db, form_data.username, form_data.password)
|
|
if not user:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password")
|
|
access_token_expires = timedelta(minutes=60)
|
|
access_token = create_access_token(data={"sub": user.username, "role": user.role}, expires_delta=access_token_expires)
|
|
return {"access_token": access_token, "token_type": "bearer"}
|
|
|
|
|
|
@auth.post("/users", response_model=schemas.UserRead, dependencies=[Depends(require_roles(["admin"]))])
|
|
def create_user(item: schemas.UserCreate, db: Session = Depends(database.get_db)):
|
|
if db.query(User).filter(User.username == item.username).first():
|
|
raise HTTPException(status_code=400, detail="Username already exists")
|
|
obj = User(username=item.username, password_hash=get_password_hash(item.password), role=item.role)
|
|
db.add(obj)
|
|
db.commit()
|
|
db.refresh(obj)
|
|
return obj
|
|
|
|
|
|
@auth.post("/users/admin", response_model=schemas.UserRead, dependencies=[Depends(require_roles(["admin"]))])
|
|
def create_admin_user(item: schemas.UserCreate, db: Session = Depends(database.get_db)):
|
|
if db.query(User).filter(User.username == item.username).first():
|
|
raise HTTPException(status_code=400, detail="Username already exists")
|
|
obj = User(username=item.username, password_hash=get_password_hash(item.password), role="admin")
|
|
db.add(obj)
|
|
db.commit()
|
|
db.refresh(obj)
|
|
return obj
|
|
|
|
|
|
@auth.get("/users", response_model=list[schemas.UserRead], dependencies=[Depends(require_roles(["admin"]))])
|
|
def list_users(db: Session = Depends(database.get_db)):
|
|
return db.query(User).all()
|
|
|
|
|
|
@auth.patch("/users/{user_id}/role", response_model=schemas.UserRead, dependencies=[Depends(require_roles(["admin"]))])
|
|
def update_user_role(user_id: int, payload: schemas.UserRoleUpdate, db: Session = Depends(database.get_db)):
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
user.role = payload.role
|
|
db.commit()
|
|
db.refresh(user)
|
|
return user
|