utils.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. from fastapi.security import HTTPBasicCredentials, HTTPBearer
  2. from pydantic import BaseModel
  3. from typing import Union, Optional
  4. from passlib.context import CryptContext
  5. from datetime import datetime, timedelta
  6. import requests
  7. import jwt
  8. import config
  9. JWT_SECRET_KEY = config.OLLAMA_WEBUI_JWT_SECRET_KEY
  10. ALGORITHM = "HS256"
  11. ##############
  12. # Auth Utils
  13. ##############
  14. bearer_scheme = HTTPBearer()
  15. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  16. def verify_password(plain_password, hashed_password):
  17. return (
  18. pwd_context.verify(plain_password, hashed_password) if hashed_password else None
  19. )
  20. def get_password_hash(password):
  21. return pwd_context.hash(password)
  22. def create_token(data: dict, expires_delta: Union[timedelta, None] = None) -> str:
  23. payload = data.copy()
  24. if expires_delta:
  25. expire = datetime.utcnow() + expires_delta
  26. payload.update({"exp": expire})
  27. encoded_jwt = jwt.encode(payload, JWT_SECRET_KEY, algorithm=ALGORITHM)
  28. return encoded_jwt
  29. def decode_token(token: str) -> Optional[dict]:
  30. try:
  31. decoded = jwt.decode(token, JWT_SECRET_KEY, options={"verify_signature": False})
  32. return decoded
  33. except Exception as e:
  34. return None
  35. def extract_token_from_auth_header(auth_header: str):
  36. return auth_header[len("Bearer ") :]
  37. def verify_token(request):
  38. try:
  39. bearer = request.headers["authorization"]
  40. if bearer:
  41. token = bearer[len("Bearer ") :]
  42. decoded = jwt.decode(
  43. token, JWT_SECRET_KEY, options={"verify_signature": False}
  44. )
  45. return decoded
  46. else:
  47. return None
  48. except Exception as e:
  49. return None