auths.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. from pydantic import BaseModel
  2. from typing import Optional
  3. import uuid
  4. import logging
  5. from sqlalchemy import String, Column, Boolean, Text
  6. from apps.webui.models.users import UserModel, Users
  7. from utils.utils import verify_password
  8. from apps.webui.internal.db import Base, Session
  9. from config import SRC_LOG_LEVELS
  10. log = logging.getLogger(__name__)
  11. log.setLevel(SRC_LOG_LEVELS["MODELS"])
  12. ####################
  13. # DB MODEL
  14. ####################
  15. class Auth(Base):
  16. __tablename__ = "auth"
  17. id = Column(String, primary_key=True)
  18. email = Column(String)
  19. password = Column(Text)
  20. active = Column(Boolean)
  21. class AuthModel(BaseModel):
  22. id: str
  23. email: str
  24. password: str
  25. active: bool = True
  26. ####################
  27. # Forms
  28. ####################
  29. class Token(BaseModel):
  30. token: str
  31. token_type: str
  32. class ApiKey(BaseModel):
  33. api_key: Optional[str] = None
  34. class UserResponse(BaseModel):
  35. id: str
  36. email: str
  37. name: str
  38. role: str
  39. profile_image_url: str
  40. class SigninResponse(Token, UserResponse):
  41. pass
  42. class SigninForm(BaseModel):
  43. email: str
  44. password: str
  45. class ProfileImageUrlForm(BaseModel):
  46. profile_image_url: str
  47. class UpdateProfileForm(BaseModel):
  48. profile_image_url: str
  49. name: str
  50. class UpdatePasswordForm(BaseModel):
  51. password: str
  52. new_password: str
  53. class SignupForm(BaseModel):
  54. name: str
  55. email: str
  56. password: str
  57. profile_image_url: Optional[str] = "/user.png"
  58. class AddUserForm(SignupForm):
  59. role: Optional[str] = "pending"
  60. class AuthsTable:
  61. def insert_new_auth(
  62. self,
  63. email: str,
  64. password: str,
  65. name: str,
  66. profile_image_url: str = "/user.png",
  67. role: str = "pending",
  68. oauth_sub: Optional[str] = None,
  69. ) -> Optional[UserModel]:
  70. log.info("insert_new_auth")
  71. id = str(uuid.uuid4())
  72. auth = AuthModel(
  73. **{"id": id, "email": email, "password": password, "active": True}
  74. )
  75. result = Auth(**auth.model_dump())
  76. Session.add(result)
  77. user = Users.insert_new_user(
  78. id, name, email, profile_image_url, role, oauth_sub
  79. )
  80. Session.commit()
  81. Session.refresh(result)
  82. if result and user:
  83. return user
  84. else:
  85. return None
  86. def authenticate_user(self, email: str, password: str) -> Optional[UserModel]:
  87. log.info(f"authenticate_user: {email}")
  88. try:
  89. auth = Session.query(Auth).filter_by(email=email, active=True).first()
  90. if auth:
  91. if verify_password(password, auth.password):
  92. user = Users.get_user_by_id(auth.id)
  93. return user
  94. else:
  95. return None
  96. else:
  97. return None
  98. except:
  99. return None
  100. def authenticate_user_by_api_key(self, api_key: str) -> Optional[UserModel]:
  101. log.info(f"authenticate_user_by_api_key: {api_key}")
  102. # if no api_key, return None
  103. if not api_key:
  104. return None
  105. try:
  106. user = Users.get_user_by_api_key(api_key)
  107. return user if user else None
  108. except:
  109. return False
  110. def authenticate_user_by_trusted_header(self, email: str) -> Optional[UserModel]:
  111. log.info(f"authenticate_user_by_trusted_header: {email}")
  112. try:
  113. auth = Session.query(Auth).filter(email=email, active=True).first()
  114. if auth:
  115. user = Users.get_user_by_id(auth.id)
  116. return user
  117. except:
  118. return None
  119. def update_user_password_by_id(self, id: str, new_password: str) -> bool:
  120. try:
  121. result = (
  122. Session.query(Auth).filter_by(id=id).update({"password": new_password})
  123. )
  124. return True if result == 1 else False
  125. except:
  126. return False
  127. def update_email_by_id(self, id: str, email: str) -> bool:
  128. try:
  129. result = Session.query(Auth).filter_by(id=id).update({"email": email})
  130. return True if result == 1 else False
  131. except:
  132. return False
  133. def delete_auth_by_id(self, id: str) -> bool:
  134. try:
  135. # Delete User
  136. result = Users.delete_user_by_id(id)
  137. if result:
  138. Session.query(Auth).filter_by(id=id).delete()
  139. return True
  140. else:
  141. return False
  142. except:
  143. return False
  144. Auths = AuthsTable()