utils.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
  2. from fastapi import HTTPException, status, Depends
  3. from apps.web.models.users import Users
  4. from pydantic import BaseModel
  5. from typing import Union, Optional
  6. from constants import ERROR_MESSAGES
  7. from passlib.context import CryptContext
  8. from datetime import datetime, timedelta
  9. import requests
  10. import jwt
  11. import config
  12. JWT_SECRET_KEY = config.WEBUI_JWT_SECRET_KEY
  13. ALGORITHM = "HS256"
  14. ##############
  15. # Auth Utils
  16. ##############
  17. bearer_scheme = HTTPBearer()
  18. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  19. def verify_password(plain_password, hashed_password):
  20. return (
  21. pwd_context.verify(plain_password, hashed_password) if hashed_password else None
  22. )
  23. def get_password_hash(password):
  24. return pwd_context.hash(password)
  25. def create_token(data: dict, expires_delta: Union[timedelta, None] = None) -> str:
  26. payload = data.copy()
  27. if expires_delta:
  28. expire = datetime.utcnow() + expires_delta
  29. payload.update({"exp": expire})
  30. encoded_jwt = jwt.encode(payload, JWT_SECRET_KEY, algorithm=ALGORITHM)
  31. return encoded_jwt
  32. def decode_token(token: str) -> Optional[dict]:
  33. try:
  34. decoded = jwt.decode(token, JWT_SECRET_KEY, options={"verify_signature": False})
  35. return decoded
  36. except Exception as e:
  37. return None
  38. def extract_token_from_auth_header(auth_header: str):
  39. return auth_header[len("Bearer ") :]
  40. def verify_auth_token(auth_token: HTTPAuthorizationCredentials = Depends(HTTPBearer())):
  41. data = decode_token(auth_token.credentials)
  42. if data != None and "email" in data:
  43. user = Users.get_user_by_email(data["email"])
  44. if user is None:
  45. raise HTTPException(
  46. status_code=status.HTTP_401_UNAUTHORIZED,
  47. detail=ERROR_MESSAGES.INVALID_TOKEN,
  48. )
  49. return
  50. else:
  51. raise HTTPException(
  52. status_code=status.HTTP_401_UNAUTHORIZED,
  53. detail=ERROR_MESSAGES.UNAUTHORIZED,
  54. )
  55. def get_current_user(auth_token: HTTPAuthorizationCredentials = Depends(HTTPBearer())):
  56. data = decode_token(auth_token.credentials)
  57. return Users.get_user_by_email(data["email"])