add new ui
This commit is contained in:
63
backend/routers/auth.py
Normal file
63
backend/routers/auth.py
Normal file
@@ -0,0 +1,63 @@
|
||||
from datetime import timedelta
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from fastapi.security import OAuth2PasswordRequestForm
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from .. import schemas, database
|
||||
from ..security import authenticate_user, create_access_token, get_password_hash, require_roles
|
||||
from ..models import User
|
||||
|
||||
|
||||
auth = APIRouter(prefix="/auth", tags=["auth"])
|
||||
|
||||
|
||||
@auth.post("/token", response_model=schemas.Token)
|
||||
def login_for_access_token(
|
||||
form_data: OAuth2PasswordRequestForm = Depends(),
|
||||
db: Session = Depends(database.get_db),
|
||||
):
|
||||
user = authenticate_user(db, form_data.username, form_data.password)
|
||||
if not user:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password")
|
||||
access_token_expires = timedelta(minutes=60)
|
||||
access_token = create_access_token(data={"sub": user.username, "role": user.role}, expires_delta=access_token_expires)
|
||||
return {"access_token": access_token, "token_type": "bearer"}
|
||||
|
||||
|
||||
@auth.post("/users", response_model=schemas.UserRead, dependencies=[Depends(require_roles(["admin"]))])
|
||||
def create_user(item: schemas.UserCreate, db: Session = Depends(database.get_db)):
|
||||
if db.query(User).filter(User.username == item.username).first():
|
||||
raise HTTPException(status_code=400, detail="Username already exists")
|
||||
obj = User(username=item.username, password_hash=get_password_hash(item.password), role=item.role)
|
||||
db.add(obj)
|
||||
db.commit()
|
||||
db.refresh(obj)
|
||||
return obj
|
||||
|
||||
|
||||
@auth.post("/users/admin", response_model=schemas.UserRead, dependencies=[Depends(require_roles(["admin"]))])
|
||||
def create_admin_user(item: schemas.UserCreate, db: Session = Depends(database.get_db)):
|
||||
if db.query(User).filter(User.username == item.username).first():
|
||||
raise HTTPException(status_code=400, detail="Username already exists")
|
||||
obj = User(username=item.username, password_hash=get_password_hash(item.password), role="admin")
|
||||
db.add(obj)
|
||||
db.commit()
|
||||
db.refresh(obj)
|
||||
return obj
|
||||
|
||||
|
||||
@auth.get("/users", response_model=list[schemas.UserRead], dependencies=[Depends(require_roles(["admin"]))])
|
||||
def list_users(db: Session = Depends(database.get_db)):
|
||||
return db.query(User).all()
|
||||
|
||||
|
||||
@auth.patch("/users/{user_id}/role", response_model=schemas.UserRead, dependencies=[Depends(require_roles(["admin"]))])
|
||||
def update_user_role(user_id: int, payload: schemas.UserRoleUpdate, db: Session = Depends(database.get_db)):
|
||||
user = db.query(User).filter(User.id == user_id).first()
|
||||
if not user:
|
||||
raise HTTPException(status_code=404, detail="User not found")
|
||||
user.role = payload.role
|
||||
db.commit()
|
||||
db.refresh(user)
|
||||
return user
|
||||
Reference in New Issue
Block a user