auths.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. from pydantic import BaseModel
  2. from typing import Optional
  3. import uuid
  4. import logging
  5. from sqlalchemy import String, Column, Boolean
  6. from sqlalchemy.orm import Session
  7. from apps.webui.models.users import UserModel, Users
  8. from utils.utils import verify_password
  9. from apps.webui.internal.db import Base, get_session
  10. from config import SRC_LOG_LEVELS
  11. log = logging.getLogger(__name__)
  12. log.setLevel(SRC_LOG_LEVELS["MODELS"])
  13. ####################
  14. # DB MODEL
  15. ####################
  16. class Auth(Base):
  17. __tablename__ = "auth"
  18. id = Column(String, primary_key=True)
  19. email = Column(String)
  20. password = Column(String)
  21. active = Column(Boolean)
  22. class AuthModel(BaseModel):
  23. id: str
  24. email: str
  25. password: str
  26. active: bool = True
  27. ####################
  28. # Forms
  29. ####################
  30. class Token(BaseModel):
  31. token: str
  32. token_type: str
  33. class ApiKey(BaseModel):
  34. api_key: Optional[str] = None
  35. class UserResponse(BaseModel):
  36. id: str
  37. email: str
  38. name: str
  39. role: str
  40. profile_image_url: str
  41. class SigninResponse(Token, UserResponse):
  42. pass
  43. class SigninForm(BaseModel):
  44. email: str
  45. password: str
  46. class ProfileImageUrlForm(BaseModel):
  47. profile_image_url: str
  48. class UpdateProfileForm(BaseModel):
  49. profile_image_url: str
  50. name: str
  51. class UpdatePasswordForm(BaseModel):
  52. password: str
  53. new_password: str
  54. class SignupForm(BaseModel):
  55. name: str
  56. email: str
  57. password: str
  58. profile_image_url: Optional[str] = "/user.png"
  59. class AddUserForm(SignupForm):
  60. role: Optional[str] = "pending"
  61. class AuthsTable:
  62. def insert_new_auth(
  63. self,
  64. email: str,
  65. password: str,
  66. name: str,
  67. profile_image_url: str = "/user.png",
  68. role: str = "pending",
  69. oauth_sub: Optional[str] = None,
  70. ) -> Optional[UserModel]:
  71. with get_session() as db:
  72. log.info("insert_new_auth")
  73. id = str(uuid.uuid4())
  74. auth = AuthModel(
  75. **{"id": id, "email": email, "password": password, "active": True}
  76. )
  77. result = Auth(**auth.model_dump())
  78. db.add(result)
  79. user = Users.insert_new_user(
  80. id, name, email, profile_image_url, role, oauth_sub
  81. )
  82. db.commit()
  83. db.refresh(result)
  84. if result and user:
  85. return user
  86. else:
  87. return None
  88. def authenticate_user(
  89. self, email: str, password: str
  90. ) -> Optional[UserModel]:
  91. log.info(f"authenticate_user: {email}")
  92. with get_session() as db:
  93. try:
  94. auth = db.query(Auth).filter_by(email=email, active=True).first()
  95. if auth:
  96. if verify_password(password, auth.password):
  97. user = Users.get_user_by_id(auth.id)
  98. return user
  99. else:
  100. return None
  101. else:
  102. return None
  103. except:
  104. return None
  105. def authenticate_user_by_api_key(
  106. self, api_key: str
  107. ) -> Optional[UserModel]:
  108. log.info(f"authenticate_user_by_api_key: {api_key}")
  109. with get_session() as db:
  110. # if no api_key, return None
  111. if not api_key:
  112. return None
  113. try:
  114. user = Users.get_user_by_api_key(api_key)
  115. return user if user else None
  116. except:
  117. return False
  118. def authenticate_user_by_trusted_header(
  119. self, email: str
  120. ) -> Optional[UserModel]:
  121. log.info(f"authenticate_user_by_trusted_header: {email}")
  122. with get_session() as db:
  123. try:
  124. auth = db.query(Auth).filter(email=email, active=True).first()
  125. if auth:
  126. user = Users.get_user_by_id(auth.id)
  127. return user
  128. except:
  129. return None
  130. def update_user_password_by_id(
  131. self, id: str, new_password: str
  132. ) -> bool:
  133. with get_session() as db:
  134. try:
  135. result = db.query(Auth).filter_by(id=id).update({"password": new_password})
  136. return True if result == 1 else False
  137. except:
  138. return False
  139. def update_email_by_id(self, id: str, email: str) -> bool:
  140. with get_session() as db:
  141. try:
  142. result = db.query(Auth).filter_by(id=id).update({"email": email})
  143. return True if result == 1 else False
  144. except:
  145. return False
  146. def delete_auth_by_id(self, id: str) -> bool:
  147. with get_session() as db:
  148. try:
  149. # Delete User
  150. result = Users.delete_user_by_id(id)
  151. if result:
  152. db.query(Auth).filter_by(id=id).delete()
  153. return True
  154. else:
  155. return False
  156. except:
  157. return False
  158. Auths = AuthsTable()