auths.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. from pydantic import BaseModel
  2. from typing import Optional
  3. import uuid
  4. import logging
  5. from sqlalchemy import String, Column, Boolean
  6. from apps.webui.models.users import UserModel, Users
  7. from utils.utils import verify_password
  8. from apps.webui.internal.db import Base, Session
  9. from config import SRC_LOG_LEVELS
  # Module-level logger; its verbosity is taken from the "MODELS" entry of
  # the SRC_LOG_LEVELS mapping imported from config.
  log = logging.getLogger(__name__)
  log.setLevel(SRC_LOG_LEVELS["MODELS"])
  12. ####################
  13. # DB MODEL
  14. ####################
  class Auth(Base):
      """SQLAlchemy mapping for the ``auth`` table (one row per credential)."""

      __tablename__ = "auth"

      # Primary key; AuthsTable.insert_new_auth fills it with a UUID4 string
      # and passes the same value to Users.insert_new_user.
      id = Column(String, primary_key=True)
      # E-mail address used to look the row up at sign-in.
      email = Column(String)
      # Stored credential, checked with verify_password() at sign-in
      # (presumably a hash rather than plaintext -- confirm with the caller).
      password = Column(String)
      # Sign-in queries only match rows where this flag is True.
      active = Column(Boolean)
  class AuthModel(BaseModel):
      """Pydantic counterpart of an ``auth`` row.

      Used to validate the values before they are unpacked into the ``Auth``
      ORM class.
      """

      id: str
      email: str
      password: str
      # New credentials are active unless stated otherwise.
      active: bool = True
  26. ####################
  27. # Forms
  28. ####################
  class Token(BaseModel):
      """A token string together with its type label."""

      token: str
      token_type: str
  class ApiKey(BaseModel):
      """Wrapper around an optional API key; ``None`` when no key is present."""

      api_key: Optional[str] = None
  class UserResponse(BaseModel):
      """User fields: id, e-mail, display name, role and profile image URL."""

      id: str
      email: str
      name: str
      role: str
      profile_image_url: str
  class SigninResponse(Token, UserResponse):
      """Combines the ``Token`` fields with the ``UserResponse`` fields.

      Adds no fields of its own.
      """
  class SigninForm(BaseModel):
      """Sign-in payload: an e-mail address and a password."""

      email: str
      password: str
  class ProfileImageUrlForm(BaseModel):
      """Payload carrying only a profile image URL."""

      profile_image_url: str
  class UpdateProfileForm(BaseModel):
      """Profile update payload: profile image URL and display name."""

      profile_image_url: str
      name: str
  class UpdatePasswordForm(BaseModel):
      """Password change payload."""

      # Presumably the user's current password, sent for verification --
      # confirm against the route that consumes this form.
      password: str
      # The replacement password.
      new_password: str
  class SignupForm(BaseModel):
      """Sign-up payload: name, e-mail, password and an optional avatar URL."""

      name: str
      email: str
      password: str
      # Falls back to the stock avatar path when the client sends none.
      profile_image_url: Optional[str] = "/user.png"
  class AddUserForm(SignupForm):
      """``SignupForm`` extended with a role, which defaults to ``"pending"``."""

      role: Optional[str] = "pending"
  class AuthsTable:
      """Data-access helper for the ``auth`` table and its paired user records.

      Every method works through the module-level ``Session`` and delegates
      user-row operations to ``Users``. Lookup and update methods never
      raise on database errors: they log the exception and return ``None`` /
      ``False`` instead.

      NOTE(review): only ``insert_new_auth`` calls ``Session.commit()``. The
      update/delete methods leave committing to whatever manages the session
      lifecycle -- confirm that something does commit after them.
      """

      def insert_new_auth(
          self,
          email: str,
          password: str,
          name: str,
          profile_image_url: str = "/user.png",
          role: str = "pending",
          oauth_sub: Optional[str] = None,
      ) -> Optional[UserModel]:
          """Create an auth row plus the matching user and commit both.

          Args:
              email: E-mail address stored on the auth row and the user.
              password: Value stored in the ``password`` column as given
                  (presumably already hashed by the caller -- confirm).
              name: Display name forwarded to ``Users.insert_new_user``.
              profile_image_url: Avatar URL forwarded to the user record.
              role: Role forwarded to the user record.
              oauth_sub: Optional OAuth subject forwarded to the user record.

          Returns:
              The user returned by ``Users.insert_new_user``, or ``None`` if
              that call produced a falsy result.
          """
          log.info("insert_new_auth")

          # The same generated id keys both the auth row and the user row.
          auth_id = str(uuid.uuid4())

          auth = AuthModel(id=auth_id, email=email, password=password, active=True)
          result = Auth(**auth.model_dump())
          Session.add(result)

          user = Users.insert_new_user(
              auth_id, name, email, profile_image_url, role, oauth_sub
          )

          Session.commit()
          Session.refresh(result)

          return user if result and user else None

      def authenticate_user(self, email: str, password: str) -> Optional[UserModel]:
          """Return the user for an active auth row whose password verifies.

          Returns ``None`` when no active row matches ``email``, when
          ``verify_password`` rejects ``password``, or when any error occurs.
          """
          log.info(f"authenticate_user: {email}")
          try:
              auth = Session.query(Auth).filter_by(email=email, active=True).first()
              if auth and verify_password(password, auth.password):
                  return Users.get_user_by_id(auth.id)
              return None
          except Exception:
              # Best effort: a failed lookup is reported as "not authenticated".
              log.exception("authenticate_user failed")
              return None

      def authenticate_user_by_api_key(self, api_key: str) -> Optional[UserModel]:
          """Return the user owning ``api_key``, or ``None``.

          An empty/``None`` key short-circuits to ``None`` without a lookup.
          """
          # The key itself is a secret, so it is deliberately not logged.
          log.info("authenticate_user_by_api_key")

          if not api_key:
              return None

          try:
              user = Users.get_user_by_api_key(api_key)
              return user if user else None
          except Exception:
              log.exception("authenticate_user_by_api_key failed")
              # None (not False) to honour the declared Optional[UserModel].
              return None

      def authenticate_user_by_trusted_header(self, email: str) -> Optional[UserModel]:
          """Return the user for an active auth row matching ``email``.

          No password check is performed; the caller is trusted to have
          authenticated the e-mail. Returns ``None`` when no active row
          matches or when any error occurs.
          """
          log.info(f"authenticate_user_by_trusted_header: {email}")
          try:
              # filter_by() takes keyword criteria; filter() does not accept
              # them and would raise TypeError on every call.
              auth = Session.query(Auth).filter_by(email=email, active=True).first()
              if auth:
                  return Users.get_user_by_id(auth.id)
              return None
          except Exception:
              log.exception("authenticate_user_by_trusted_header failed")
              return None

      def update_user_password_by_id(self, id: str, new_password: str) -> bool:
          """Overwrite the stored password of the auth row with the given id.

          ``id`` shadows the builtin but is kept because it is part of the
          public signature.

          Returns:
              ``True`` only if exactly one row was updated; ``False``
              otherwise or on error.
          """
          try:
              updated = (
                  Session.query(Auth).filter_by(id=id).update({"password": new_password})
              )
              return updated == 1
          except Exception:
              log.exception("update_user_password_by_id failed")
              return False

      def update_email_by_id(self, id: str, email: str) -> bool:
          """Overwrite the e-mail of the auth row with the given id.

          Returns:
              ``True`` only if exactly one row was updated; ``False``
              otherwise or on error.
          """
          try:
              updated = Session.query(Auth).filter_by(id=id).update({"email": email})
              return updated == 1
          except Exception:
              log.exception("update_email_by_id failed")
              return False

      def delete_auth_by_id(self, id: str) -> bool:
          """Delete the user with the given id and then its auth row.

          The auth row is only removed when ``Users.delete_user_by_id``
          reports success.

          Returns:
              ``True`` if the user deletion succeeded and the auth delete was
              issued; ``False`` otherwise or on error.
          """
          try:
              # Delete the user first; keep the auth row if that fails.
              if not Users.delete_user_by_id(id):
                  return False

              Session.query(Auth).filter_by(id=id).delete()
              return True
          except Exception:
              log.exception("delete_auth_by_id failed")
              return False


  # Shared module-level instance of AuthsTable.
  Auths = AuthsTable()