auths.py 4.6 KB


  1. from pydantic import BaseModel
  2. from typing import Optional
  3. import uuid
  4. import logging
  5. from sqlalchemy import String, Column, Boolean
  6. from sqlalchemy.orm import Session
  7. from apps.webui.models.users import UserModel, Users
  8. from utils.utils import verify_password
  9. from apps.webui.internal.db import Base
  10. from config import SRC_LOG_LEVELS
  # Module-level logger; its level is taken from the "MODELS" entry of
  # SRC_LOG_LEVELS.
  log = logging.getLogger(__name__)
  log.setLevel(SRC_LOG_LEVELS["MODELS"])
  13. ####################
  14. # DB MODEL
  15. ####################
  # SQLAlchemy declarative model mapped to the "auth" table.
  # Columns: id (string primary key), email, password, active.
  class Auth(Base):
      __tablename__ = "auth"

      id = Column(String, primary_key=True)
      email = Column(String)
      password = Column(String)
      active = Column(Boolean)
  # Pydantic counterpart of an "auth" row; `active` defaults to True when
  # it is not supplied.
  class AuthModel(BaseModel):
      id: str
      email: str
      password: str
      active: bool = True
  27. ####################
  28. # Forms
  29. ####################
  # A token string together with a string naming its type.
  class Token(BaseModel):
      token: str
      token_type: str
  # Wrapper around an optional API key string; the field defaults to None.
  class ApiKey(BaseModel):
      api_key: Optional[str] = None
  # User fields exposed in responses: id, email, name, role and
  # profile image URL (all required strings).
  class UserResponse(BaseModel):
      id: str
      email: str
      name: str
      role: str
      profile_image_url: str
  # Combines the Token fields with the UserResponse fields; adds nothing
  # of its own. Base-class order is kept as (Token, UserResponse).
  class SigninResponse(Token, UserResponse):
      pass
  # Sign-in payload: an email and a password, both required strings.
  class SigninForm(BaseModel):
      email: str
      password: str
  # Payload carrying a single required profile image URL string.
  class ProfileImageUrlForm(BaseModel):
      profile_image_url: str
  # Profile update payload: profile image URL and name, both required.
  class UpdateProfileForm(BaseModel):
      profile_image_url: str
      name: str
  # Password change payload: `password` and `new_password`, both required.
  # NOTE(review): `password` is presumably the current password - confirm
  # against the caller.
  class UpdatePasswordForm(BaseModel):
      password: str
      new_password: str
  # Sign-up payload: name, email and password are required;
  # profile_image_url falls back to "/user.png".
  class SignupForm(BaseModel):
      name: str
      email: str
      password: str
      profile_image_url: Optional[str] = "/user.png"
  # SignupForm extended with a role, which defaults to "pending".
  class AddUserForm(SignupForm):
      role: Optional[str] = "pending"
  class AuthsTable:
      """Data-access helpers for the ``auth`` table and its paired user records.

      Every method receives the SQLAlchemy ``Session`` to work on as ``db``.
      Only ``insert_new_auth`` commits; the update and delete helpers leave
      the commit to whoever owns the session.
      """

      def insert_new_auth(
          self,
          db: Session,
          email: str,
          password: str,
          name: str,
          profile_image_url: str = "/user.png",
          role: str = "pending",
          oauth_sub: Optional[str] = None,
      ) -> Optional[UserModel]:
          """Create an auth row plus the matching user and commit both.

          A fresh UUID4 string is generated and used as the id of the auth
          row and passed to ``Users.insert_new_user`` as the user id.

          Args:
              db: Session used for the insert and the commit.
              email: Email stored on the auth row and passed to the user.
              password: Value stored as-is in the ``password`` column.
                  NOTE(review): presumably already hashed by the caller,
                  since ``authenticate_user`` checks it with
                  ``verify_password`` - confirm.
              name: Passed through to ``Users.insert_new_user``.
              profile_image_url: Passed through to ``Users.insert_new_user``.
              role: Passed through to ``Users.insert_new_user``.
              oauth_sub: Passed through to ``Users.insert_new_user``.

          Returns:
              The value returned by ``Users.insert_new_user`` when both it
              and the auth row are truthy, otherwise ``None``.
          """
          log.info("insert_new_auth")

          # Named auth_id (not id) so the builtin is not shadowed.
          auth_id = str(uuid.uuid4())

          auth = AuthModel(
              id=auth_id, email=email, password=password, active=True
          )
          result = Auth(**auth.model_dump())
          db.add(result)

          user = Users.insert_new_user(
              db, auth_id, name, email, profile_image_url, role, oauth_sub
          )

          db.commit()
          db.refresh(result)

          return user if result and user else None

      def authenticate_user(
          self, db: Session, email: str, password: str
      ) -> Optional[UserModel]:
          """Return the user for ``email`` when ``password`` verifies.

          Looks up an active auth row by email and checks the password
          against the stored value with ``verify_password``.

          Returns:
              The result of ``Users.get_user_by_id`` on success; ``None``
              when no active row matches, the password does not verify, or
              the lookup raises.
          """
          log.info(f"authenticate_user: {email}")
          try:
              auth = db.query(Auth).filter_by(email=email, active=True).first()
              if auth and verify_password(password, auth.password):
                  return Users.get_user_by_id(db, auth.id)
              return None
          except Exception:
              # Best-effort: a failed lookup is reported as "not
              # authenticated", but the cause is logged instead of hidden.
              log.exception("authenticate_user failed")
              return None

      def authenticate_user_by_api_key(
          self, db: Session, api_key: str
      ) -> Optional[UserModel]:
          """Return the user owning ``api_key``, or ``None``.

          Returns:
              The result of ``Users.get_user_by_api_key`` when it is truthy;
              ``None`` when ``api_key`` is empty, no user matches, or the
              lookup raises.
          """
          # The key is a credential, so it is deliberately kept out of the log.
          log.info("authenticate_user_by_api_key")

          # if no api_key, return None
          if not api_key:
              return None

          try:
              user = Users.get_user_by_api_key(db, api_key)
              return user if user else None
          except Exception:
              log.exception("authenticate_user_by_api_key failed")
              # None (not False) keeps the declared Optional[UserModel] contract.
              return None

      def authenticate_user_by_trusted_header(
          self, db: Session, email: str
      ) -> Optional[UserModel]:
          """Return the user for ``email`` without checking a password.

          Looks up an active auth row by email only.

          Returns:
              The result of ``Users.get_user_by_id`` when an active row
              exists; ``None`` when none exists or the lookup raises.
          """
          log.info(f"authenticate_user_by_trusted_header: {email}")
          try:
              # filter_by takes keyword criteria; Query.filter does not.
              auth = db.query(Auth).filter_by(email=email, active=True).first()
              if auth:
                  return Users.get_user_by_id(db, auth.id)
              return None
          except Exception:
              log.exception("authenticate_user_by_trusted_header failed")
              return None

      def update_user_password_by_id(
          self, db: Session, id: str, new_password: str
      ) -> bool:
          """Set the ``password`` column of the auth row with the given id.

          ``new_password`` is written as given. No commit is issued here.
          NOTE(review): presumably the session owner commits - confirm.

          Returns:
              ``True`` when exactly one row matched the update, ``False``
              otherwise or when the update raises.
          """
          try:
              result = (
                  db.query(Auth).filter_by(id=id).update({"password": new_password})
              )
              return result == 1
          except Exception:
              log.exception("update_user_password_by_id failed")
              return False

      def update_email_by_id(self, db: Session, id: str, email: str) -> bool:
          """Set the ``email`` column of the auth row with the given id.

          No commit is issued here.
          NOTE(review): presumably the session owner commits - confirm.

          Returns:
              ``True`` when exactly one row matched the update, ``False``
              otherwise or when the update raises.
          """
          try:
              result = db.query(Auth).filter_by(id=id).update({"email": email})
              return result == 1
          except Exception:
              log.exception("update_email_by_id failed")
              return False

      def delete_auth_by_id(self, db: Session, id: str) -> bool:
          """Delete the user and then the auth row with the given id.

          The auth row is only removed when ``Users.delete_user_by_id``
          reports success. No commit is issued here.
          NOTE(review): presumably the session owner commits - confirm.

          Returns:
              ``True`` when the user deletion succeeded and the auth delete
              was issued; ``False`` when the user deletion returned a falsy
              value or anything raised.
          """
          try:
              # Delete User
              result = Users.delete_user_by_id(db, id)
              if not result:
                  return False

              db.query(Auth).filter_by(id=id).delete()
              return True
          except Exception:
              log.exception("delete_auth_by_id failed")
              return False
  # Module-level AuthsTable instance, created once at import time.
  Auths = AuthsTable()