# utils.py
import logging
from datetime import datetime, timedelta, timezone
from typing import Union, Optional

import jwt
import requests
from fastapi import HTTPException, status, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from passlib.context import CryptContext
from pydantic import BaseModel

import config
from apps.web.models.users import Users
from constants import ERROR_MESSAGES
  13. logging.getLogger("passlib").setLevel(logging.ERROR)
  14. JWT_SECRET_KEY = config.WEBUI_JWT_SECRET_KEY
  15. ALGORITHM = "HS256"
  16. ##############
  17. # Auth Utils
  18. ##############
  19. bearer_scheme = HTTPBearer()
  20. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  21. def verify_password(plain_password, hashed_password):
  22. return (
  23. pwd_context.verify(plain_password, hashed_password) if hashed_password else None
  24. )
  25. def get_password_hash(password):
  26. return pwd_context.hash(password)
  27. def create_token(data: dict, expires_delta: Union[timedelta, None] = None) -> str:
  28. payload = data.copy()
  29. if expires_delta:
  30. expire = datetime.utcnow() + expires_delta
  31. payload.update({"exp": expire})
  32. encoded_jwt = jwt.encode(payload, JWT_SECRET_KEY, algorithm=ALGORITHM)
  33. return encoded_jwt
  34. def decode_token(token: str) -> Optional[dict]:
  35. try:
  36. decoded = jwt.decode(token, JWT_SECRET_KEY, options={"verify_signature": False})
  37. return decoded
  38. except Exception as e:
  39. return None
  40. def extract_token_from_auth_header(auth_header: str):
  41. return auth_header[len("Bearer ") :]
  42. def get_current_user(auth_token: HTTPAuthorizationCredentials = Depends(HTTPBearer())):
  43. data = decode_token(auth_token.credentials)
  44. if data != None and "email" in data:
  45. user = Users.get_user_by_email(data["email"])
  46. if user is None:
  47. raise HTTPException(
  48. status_code=status.HTTP_401_UNAUTHORIZED,
  49. detail=ERROR_MESSAGES.INVALID_TOKEN,
  50. )
  51. return user
  52. else:
  53. raise HTTPException(
  54. status_code=status.HTTP_401_UNAUTHORIZED,
  55. detail=ERROR_MESSAGES.UNAUTHORIZED,
  56. )