from datetime import datetime, timezone from typing import Optional from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from sqlalchemy import func from .. import schemas, database, models from ..security import get_current_user inspections = APIRouter(prefix="/inspections", tags=["inspections"]) @inspections.post("/sessions", response_model=schemas.InspectionSessionRead) async def create_inspection_session( payload: schemas.InspectionSessionCreate, db: Session = Depends(database.get_db), current_user: models.User = Depends(get_current_user) ): """Начать новую сессию проверки""" session = models.InspectionSession( user_id=current_user.id, started_at=datetime.now(timezone.utc), aud_id=payload.aud_id ) db.add(session) db.commit() db.refresh(session) return session class CheckBarcodeRequest(schemas.BaseModel): inv_number: str @inspections.post("/sessions/{session_id}/check", response_model=schemas.CheckBarcodeResponse) async def check_barcode( session_id: int, payload: CheckBarcodeRequest, db: Session = Depends(database.get_db), current_user: models.User = Depends(get_current_user) ): """Отсканировать штрихкод""" inv_number = payload.inv_number # Проверка существования сессии session = db.query(models.InspectionSession).filter(models.InspectionSession.id == session_id).first() if not session: raise HTTPException(status_code=404, detail="Session not found") # Проверка, что сессия не завершена if session.completed_at is not None: raise HTTPException(status_code=400, detail="Session already completed") # Поиск оборудования по invNumber (точное совпадение) # invNumber может быть строкой или числом, преобразуем для поиска try: inv_num_int = int(inv_number) oborud = db.query(models.Oboruds).filter(models.Oboruds.invNumber == inv_num_int).first() except ValueError: oborud = None if oborud: # Оборудование найдено - создаём/обновляем запись (UPSERT) record = db.query(models.InspectionRecord).filter_by( session_id=session_id, oborud_id=oborud.id ).first() if record: # Обновить время проверки record.checked_at = datetime.now(timezone.utc) else: # Создать новую запись record = models.InspectionRecord( session_id=session_id, oborud_id=oborud.id, checked_at=datetime.now(timezone.utc) ) db.add(record) db.commit() db.refresh(oborud) return schemas.CheckBarcodeResponse( status="found", equipment=schemas.OborudRead.model_validate(oborud), message=f"Оборудование найдено: {oborud.nazvanie}" ) else: # Оборудование не найдено - сохранить в неизвестные штрихкоды unknown = models.UnknownBarcode( session_id=session_id, barcode=inv_number, scanned_at=datetime.now(timezone.utc) ) db.add(unknown) db.commit() return schemas.CheckBarcodeResponse( status="not_found", equipment=None, message=f"Оборудование с номером {inv_number} не найдено" ) @inspections.get("/sessions/{session_id}", response_model=schemas.InspectionSessionStats) async def get_inspection_session_stats( session_id: int, db: Session = Depends(database.get_db), current_user: models.User = Depends(get_current_user) ): """Получить статистику сессии проверки""" session = db.query(models.InspectionSession).filter(models.InspectionSession.id == session_id).first() if not session: raise HTTPException(status_code=404, detail="Session not found") # Подсчёт статистики total_checked = db.query(models.InspectionRecord).filter( models.InspectionRecord.session_id == session_id ).count() total_unknown = db.query(models.UnknownBarcode).filter( models.UnknownBarcode.session_id == session_id ).count() # Подсчёт ожидаемого количества оборудования if session.aud_id: # Проверка по аудитории total_expected = db.query(models.Oboruds).filter( models.Oboruds.aud_id == session.aud_id ).count() else: # Проверка всего оборудования total_expected = db.query(models.Oboruds).count() # Расчёт прогресса progress_percent = round((total_checked / total_expected * 100), 2) if total_expected > 0 else 0.0 return schemas.InspectionSessionStats( session=schemas.InspectionSessionRead.model_validate(session), total_expected=total_expected, total_checked=total_checked, total_unknown=total_unknown, progress_percent=progress_percent ) @inspections.post("/sessions/{session_id}/complete", response_model=schemas.InspectionSessionRead) async def complete_inspection_session( session_id: int, db: Session = Depends(database.get_db), current_user: models.User = Depends(get_current_user) ): """Завершить сессию проверки""" session = db.query(models.InspectionSession).filter(models.InspectionSession.id == session_id).first() if not session: raise HTTPException(status_code=404, detail="Session not found") if session.completed_at is not None: raise HTTPException(status_code=400, detail="Session already completed") session.completed_at = datetime.now(timezone.utc) db.commit() db.refresh(session) return session @inspections.get("/sessions", response_model=list[schemas.InspectionSessionRead]) async def list_inspection_sessions( user_id: Optional[int] = None, aud_id: Optional[int] = None, db: Session = Depends(database.get_db), current_user: models.User = Depends(get_current_user) ): """Получить список всех сессий проверок (история)""" query = db.query(models.InspectionSession) if user_id is not None: query = query.filter(models.InspectionSession.user_id == user_id) if aud_id is not None: query = query.filter(models.InspectionSession.aud_id == aud_id) # Сортировка по дате (новые сверху) sessions = query.order_by(models.InspectionSession.started_at.desc()).all() return sessions @inspections.get("/sessions/{session_id}/records", response_model=schemas.InspectionDetailReport) async def get_inspection_session_records( session_id: int, db: Session = Depends(database.get_db), current_user: models.User = Depends(get_current_user) ): """Получить детальный отчёт по сессии проверки""" session = db.query(models.InspectionSession).filter(models.InspectionSession.id == session_id).first() if not session: raise HTTPException(status_code=404, detail="Session not found") # Получить все записи проверок с информацией об оборудовании records = db.query(models.InspectionRecord).filter( models.InspectionRecord.session_id == session_id ).order_by(models.InspectionRecord.checked_at.desc()).all() # Получить неизвестные штрихкоды unknown_barcodes = db.query(models.UnknownBarcode).filter( models.UnknownBarcode.session_id == session_id ).order_by(models.UnknownBarcode.scanned_at.desc()).all() return schemas.InspectionDetailReport( records=[schemas.InspectionRecordRead.model_validate(r) for r in records], unknown_barcodes=[schemas.UnknownBarcodeRead.model_validate(ub) for ub in unknown_barcodes] )