auths.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. from pydantic import BaseModel
  2. from typing import Optional
  3. import uuid
  4. import logging
  5. from sqlalchemy import String, Column, Boolean, Text
  6. from apps.webui.models.users import UserModel, Users
  7. from utils.utils import verify_password
  8. from apps.webui.internal.db import Base, get_db
  9. from config import SRC_LOG_LEVELS
# Module-level logger named after this module; its level is taken from the
# "MODELS" entry of the SRC_LOG_LEVELS mapping imported from config.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])
  12. ####################
  13. # DB MODEL
  14. ####################
# SQLAlchemy declarative model mapped to the "auth" table.
# One row holds the login credentials for a single account.
class Auth(Base):
    __tablename__ = "auth"

    # Primary key. AuthsTable.insert_new_auth generates it as a UUID4 string
    # and passes the same value to Users.insert_new_user.
    id = Column(String, primary_key=True)
    # Login e-mail; no unique constraint is declared on this column here.
    email = Column(String)
    # Stored password value. It is compared through verify_password(), so it
    # is presumably a hash rather than plaintext -- TODO confirm with callers.
    password = Column(Text)
    # Only rows with active=True are matched by the authenticate_* lookups.
    active = Column(Boolean)
# Pydantic model with the same four fields as the Auth table row; used by
# AuthsTable.insert_new_auth to build the values passed to Auth(...).
class AuthModel(BaseModel):
    id: str
    email: str
    password: str
    # New records are active unless stated otherwise.
    active: bool = True
  26. ####################
  27. # Forms
  28. ####################
# Token payload: the token string itself plus a label for its type.
# NOTE(review): token_type is presumably something like "Bearer" -- confirm
# against the code that constructs this model.
class Token(BaseModel):
    token: str
    token_type: str
# Wrapper for an optional API key; api_key defaults to None when omitted.
class ApiKey(BaseModel):
    api_key: Optional[str] = None
# User fields exposed in responses: identity, display name, role and avatar
# URL. All five fields are required strings.
class UserResponse(BaseModel):
    id: str
    email: str
    name: str
    role: str
    profile_image_url: str
# Combines the Token fields with the UserResponse fields through multiple
# inheritance; declares no additional fields of its own.
class SigninResponse(Token, UserResponse):
    pass
# Sign-in payload: e-mail and password, both required.
class SigninForm(BaseModel):
    email: str
    password: str
# Payload carrying only a profile image URL.
class ProfileImageUrlForm(BaseModel):
    profile_image_url: str
# Profile-update payload: profile image URL and display name, both required.
class UpdateProfileForm(BaseModel):
    profile_image_url: str
    name: str
# Password-change payload.
# NOTE(review): `password` is presumably the current password supplied for
# verification and `new_password` its replacement -- confirm in the handler.
class UpdatePasswordForm(BaseModel):
    password: str
    new_password: str
# Sign-up payload: name, e-mail and password are required.
class SignupForm(BaseModel):
    name: str
    email: str
    password: str
    # Falls back to "/user.png" when the client sends no image URL.
    profile_image_url: Optional[str] = "/user.png"
# SignupForm extended with a role field.
class AddUserForm(SignupForm):
    # Defaults to "pending" when no role is supplied.
    role: Optional[str] = "pending"
  60. class AuthsTable:
  61. def insert_new_auth(
  62. self,
  63. email: str,
  64. password: str,
  65. name: str,
  66. profile_image_url: str = "/user.png",
  67. role: str = "pending",
  68. oauth_sub: Optional[str] = None,
  69. ) -> Optional[UserModel]:
  70. with get_db() as db:
  71. log.info("insert_new_auth")
  72. id = str(uuid.uuid4())
  73. auth = AuthModel(
  74. **{"id": id, "email": email, "password": password, "active": True}
  75. )
  76. result = Auth(**auth.model_dump())
  77. db.add(result)
  78. user = Users.insert_new_user(
  79. id, name, email, profile_image_url, role, oauth_sub
  80. )
  81. db.commit()
  82. db.refresh(result)
  83. if result and user:
  84. return user
  85. else:
  86. return None
  87. def authenticate_user(self, email: str, password: str) -> Optional[UserModel]:
  88. log.info(f"authenticate_user: {email}")
  89. try:
  90. with get_db() as db:
  91. auth = db.query(Auth).filter_by(email=email, active=True).first()
  92. if auth:
  93. if verify_password(password, auth.password):
  94. user = Users.get_user_by_id(auth.id)
  95. return user
  96. else:
  97. return None
  98. else:
  99. return None
  100. except:
  101. return None
  102. def authenticate_user_by_api_key(self, api_key: str) -> Optional[UserModel]:
  103. log.info(f"authenticate_user_by_api_key: {api_key}")
  104. # if no api_key, return None
  105. if not api_key:
  106. return None
  107. try:
  108. user = Users.get_user_by_api_key(api_key)
  109. return user if user else None
  110. except:
  111. return False
  112. def authenticate_user_by_trusted_header(self, email: str) -> Optional[UserModel]:
  113. log.info(f"authenticate_user_by_trusted_header: {email}")
  114. try:
  115. with get_db() as db:
  116. auth = db.query(Auth).filter(email=email, active=True).first()
  117. if auth:
  118. user = Users.get_user_by_id(auth.id)
  119. return user
  120. except:
  121. return None
  122. def update_user_password_by_id(self, id: str, new_password: str) -> bool:
  123. try:
  124. with get_db() as db:
  125. result = (
  126. db.query(Auth).filter_by(id=id).update({"password": new_password})
  127. )
  128. db.commit()
  129. return True if result == 1 else False
  130. except:
  131. return False
  132. def update_email_by_id(self, id: str, email: str) -> bool:
  133. try:
  134. with get_db() as db:
  135. result = db.query(Auth).filter_by(id=id).update({"email": email})
  136. db.commit()
  137. return True if result == 1 else False
  138. except:
  139. return False
  140. def delete_auth_by_id(self, id: str) -> bool:
  141. try:
  142. with get_db() as db:
  143. # Delete User
  144. result = Users.delete_user_by_id(id)
  145. if result:
  146. db.query(Auth).filter_by(id=id).delete()
  147. db.commit()
  148. return True
  149. else:
  150. return False
  151. except:
  152. return False
# Module-level AuthsTable instance created at import time.
Auths = AuthsTable()