utils.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. import logging
  2. import uuid
  3. import jwt
  4. from datetime import UTC, datetime, timedelta
  5. from typing import Optional, Union, List, Dict
  6. from open_webui.apps.webui.models.users import Users
  7. from open_webui.constants import ERROR_MESSAGES
  8. from open_webui.env import WEBUI_SECRET_KEY
  9. from fastapi import Depends, HTTPException, Request, Response, status
  10. from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
  11. from passlib.context import CryptContext
  # Only ERROR-and-above records from the "passlib" logger are let through.
  logging.getLogger("passlib").setLevel(logging.ERROR)

  # Key and algorithm name handed to jwt.encode / jwt.decode further down.
  SESSION_SECRET = WEBUI_SECRET_KEY
  ALGORITHM = "HS256"

  ##############
  # Auth Utils
  ##############

  # Bearer-credential dependency used by get_current_user.
  # NOTE(review): auto_error=False presumably makes a missing header resolve
  # to None (get_current_user checks for None) -- confirm against FastAPI docs.
  bearer_security = HTTPBearer(auto_error=False)

  # passlib context configured with bcrypt as its only scheme.
  pwd_context = CryptContext(
      schemes=["bcrypt"],
      deprecated="auto",
  )
  def verify_password(plain_password, hashed_password):
      """Check a plaintext password against a stored hash.

      Args:
          plain_password: The candidate password.
          hashed_password: The stored hash; may be empty or None.

      Returns:
          The result of ``pwd_context.verify`` when a hash is supplied,
          otherwise ``None`` (no verification is attempted).
      """
      # Nothing to compare against: skip the verifier entirely.
      if not hashed_password:
          return None
      return pwd_context.verify(plain_password, hashed_password)
  def get_password_hash(password):
      """Hash ``password`` with the module-level ``pwd_context``.

      Args:
          password: The plaintext password to hash.

      Returns:
          Whatever ``pwd_context.hash`` produces for ``password``.
      """
      hashed = pwd_context.hash(password)
      return hashed
  def create_token(data: dict, expires_delta: Union[timedelta, None] = None) -> str:
      """Encode ``data`` as a JWT signed with ``SESSION_SECRET``.

      Args:
          data: Claims to embed. The dict is copied, so the caller's object
              is not modified.
          expires_delta: Lifetime of the token. When it is falsy (``None`` or
              a zero ``timedelta``) no ``exp`` claim is added.

      Returns:
          The value returned by ``jwt.encode`` for the resulting claims.
      """
      claims = data.copy()

      if expires_delta:
          # Expiry is computed from the current UTC time.
          claims["exp"] = datetime.now(UTC) + expires_delta

      return jwt.encode(claims, SESSION_SECRET, algorithm=ALGORITHM)
  def decode_token(token: str) -> Optional[dict]:
      """Decode a JWT with ``SESSION_SECRET`` and ``ALGORITHM``.

      Args:
          token: The encoded token.

      Returns:
          The decoded payload, or ``None`` if ``jwt.decode`` raises any
          ``Exception``.
      """
      try:
          claims = jwt.decode(token, SESSION_SECRET, algorithms=[ALGORITHM])
      except Exception:
          # Every decoding failure is reported to the caller as "no payload".
          return None
      return claims
  def extract_token_from_auth_header(auth_header: str):
      """Return the credentials part of a ``Bearer <token>`` header value.

      The scheme name is matched case-insensitively. Whitespace surrounding
      the token is stripped.

      Args:
          auth_header: Raw value of the ``Authorization`` header.

      Returns:
          The token string, or ``""`` when the header is empty or does not
          use the Bearer scheme. Checking the scheme first means a header
          with some other prefix is never sliced into a bogus token.
      """
      if not auth_header:
          return ""

      # Split on the first space only: "<scheme> <credentials>".
      scheme, _, token = auth_header.partition(" ")
      if scheme.lower() != "bearer":
          return ""

      return token.strip()
  def create_api_key():
      """Generate a new API key.

      Returns:
          The string ``"sk-"`` followed by the 32 hexadecimal characters of a
          freshly generated UUID4 (no hyphens).
      """
      # UUID.hex is the hyphen-free hexadecimal form of the UUID.
      return "sk-" + uuid.uuid4().hex
  def get_http_authorization_cred(auth_header: str):
      """Build ``HTTPAuthorizationCredentials`` from a raw header value.

      Args:
          auth_header: Header text expected to be exactly two
              single-space-separated parts: ``"<scheme> <credentials>"``.

      Returns:
          An ``HTTPAuthorizationCredentials`` holding the two parts.

      Raises:
          ValueError: With ``ERROR_MESSAGES.INVALID_TOKEN`` if splitting,
              unpacking, or constructing the credentials object fails.
      """
      try:
          parts = auth_header.split(" ")
          # Unpacking fails unless there are exactly two parts.
          scheme, credentials = parts
          return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)
      except Exception:
          raise ValueError(ERROR_MESSAGES.INVALID_TOKEN)
  def get_api_key_auth_config():
      """Return ``ENABLE_API_KEY_AUTH`` from ``open_webui.config``.

      The import happens at call time rather than at module import time.
      NOTE(review): presumably this avoids a circular import -- confirm.
      """
      from open_webui.config import ENABLE_API_KEY_AUTH as api_key_auth_setting

      return api_key_auth_setting
  def get_current_user(
      request: Request,
      auth_token: HTTPAuthorizationCredentials = Depends(bearer_security),
      api_key_auth_enabled: bool = Depends(get_api_key_auth_config),
  ):
      """Resolve the user making the request.

      The token is taken from the bearer credentials when present, otherwise
      from the ``token`` cookie. Tokens starting with ``sk-`` are treated as
      API keys; everything else is decoded as a JWT whose ``id`` claim
      identifies the user.

      Args:
          request: Incoming request; only its cookies are read.
          auth_token: Bearer credentials, or ``None`` when absent.
          api_key_auth_enabled: Whether API-key authentication is permitted.

      Returns:
          The user found for the API key or for the JWT's ``id`` claim.

      Raises:
          HTTPException: 403 when no token is supplied or when an API key is
              used while API-key auth is disabled; 401 when the token cannot
              be decoded, carries no ``id`` claim, or matches no user.
      """
      # Header credentials win; the cookie is only a fallback.
      token = auth_token.credentials if auth_token is not None else None
      if token is None:
          token = request.cookies.get("token")

      if token is None:
          raise HTTPException(status_code=403, detail="Not authenticated")

      # auth by api key
      if token.startswith("sk-"):
          if api_key_auth_enabled:
              return get_current_user_by_api_key(token)
          raise HTTPException(
              status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.API_KEY_NOT_ALLOWED
          )

      # auth by jwt token
      try:
          payload = decode_token(token)
      except Exception:
          raise HTTPException(
              status_code=status.HTTP_401_UNAUTHORIZED,
              detail="Invalid token",
          )

      if payload is None or "id" not in payload:
          raise HTTPException(
              status_code=status.HTTP_401_UNAUTHORIZED,
              detail=ERROR_MESSAGES.UNAUTHORIZED,
          )

      user = Users.get_user_by_id(payload["id"])
      if user is None:
          raise HTTPException(
              status_code=status.HTTP_401_UNAUTHORIZED,
              detail=ERROR_MESSAGES.INVALID_TOKEN,
          )

      # Record the activity before handing the user back.
      Users.update_user_last_active_by_id(user.id)
      return user
  def get_current_user_by_api_key(api_key: str):
      """Look up the user owning ``api_key`` and mark them as active.

      Args:
          api_key: The API key presented by the client.

      Returns:
          The user returned by ``Users.get_user_by_api_key``.

      Raises:
          HTTPException: 401 with ``ERROR_MESSAGES.INVALID_TOKEN`` when no
              user is found for the key.
      """
      user = Users.get_user_by_api_key(api_key)

      if user is None:
          raise HTTPException(
              status_code=status.HTTP_401_UNAUTHORIZED,
              detail=ERROR_MESSAGES.INVALID_TOKEN,
          )

      Users.update_user_last_active_by_id(user.id)
      return user
  def get_verified_user(user=Depends(get_current_user)):
      """Return ``user`` only if their role is ``"user"`` or ``"admin"``.

      Args:
          user: The user resolved by ``get_current_user``.

      Returns:
          The same ``user`` object.

      Raises:
          HTTPException: 401 with ``ERROR_MESSAGES.ACCESS_PROHIBITED`` for
              any other role.
      """
      if user.role in {"user", "admin"}:
          return user

      raise HTTPException(
          status_code=status.HTTP_401_UNAUTHORIZED,
          detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
      )
  def get_admin_user(user=Depends(get_current_user)):
      """Return ``user`` only if their role is ``"admin"``.

      Args:
          user: The user resolved by ``get_current_user``.

      Returns:
          The same ``user`` object.

      Raises:
          HTTPException: 401 with ``ERROR_MESSAGES.ACCESS_PROHIBITED`` when
              the role is anything other than ``"admin"``.
      """
      if user.role == "admin":
          return user

      raise HTTPException(
          status_code=status.HTTP_401_UNAUTHORIZED,
          detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
      )