# auths.py (4.5 KB)
  1. from pydantic import BaseModel
  2. from typing import List, Union, Optional
  3. import time
  4. import uuid
  5. import logging
  6. from peewee import *
  7. from apps.web.models.users import UserModel, Users
  8. from utils.utils import verify_password
  9. from apps.web.internal.db import DB
  10. from config import SRC_LOG_LEVELS
  # Module-level logger; its threshold is taken from the "MODELS" entry of
  # SRC_LOG_LEVELS.
  log = logging.getLogger(name=__name__)
  log.setLevel(level=SRC_LOG_LEVELS["MODELS"])
  ####################
  # DB MODEL
  ####################
  class Auth(Model):
      """Database model for a credential row, bound to the shared ``DB`` handle.

      Columns: ``id`` (unique string), ``email``, ``password`` (text) and
      ``active`` (boolean).
      """

      # Declared unique; AuthsTable.insert_new_auth fills it with a UUID4 string.
      id = CharField(unique=True)
      email = CharField()
      # Stored exactly as handed to Auth.create / Auth.update.
      # NOTE(review): presumably a password hash (it is compared through
      # verify_password) -- confirm with the callers.
      password = TextField()
      active = BooleanField()

      class Meta:
          # Every query on this model goes through the shared DB object.
          database = DB
  class AuthModel(BaseModel):
      """Pydantic counterpart of an ``Auth`` row.

      Carries the same four fields as the database model; ``active`` falls
      back to ``True`` when it is not supplied.
      """

      id: str
      email: str
      password: str
      active: bool = True
  ####################
  # Forms
  ####################
  class Token(BaseModel):
      """Pydantic model pairing a ``token`` string with its ``token_type``."""

      token: str
      token_type: str
  class ApiKey(BaseModel):
      """Pydantic model wrapping an optional ``api_key``; defaults to ``None``."""

      api_key: Optional[str] = None
  class UserResponse(BaseModel):
      """Pydantic model with a user's id, email, name, role and profile image URL.

      All five fields are required strings.
      """

      id: str
      email: str
      name: str
      role: str
      profile_image_url: str
  class SigninResponse(Token, UserResponse):
      """Combination of ``Token`` and ``UserResponse``; declares no fields of its own."""
  class SigninForm(BaseModel):
      """Pydantic model with two required strings: ``email`` and ``password``.

      NOTE(review): presumably the sign-in request payload -- confirm against
      the routes that consume it.
      """

      email: str
      password: str
  class ProfileImageUrlForm(BaseModel):
      """Pydantic model carrying a single required ``profile_image_url`` string."""

      profile_image_url: str
  class UpdateProfileForm(BaseModel):
      """Pydantic model with required ``profile_image_url`` and ``name`` strings."""

      profile_image_url: str
      name: str
  class UpdatePasswordForm(BaseModel):
      """Pydantic model with required ``password`` and ``new_password`` strings.

      NOTE(review): ``password`` is presumably the current password used to
      authorise the change -- confirm against the consuming route.
      """

      password: str
      new_password: str
  class SignupForm(BaseModel):
      """Pydantic model with ``name``, ``email``, ``password`` and an optional image URL.

      ``profile_image_url`` defaults to ``"/user.png"`` when omitted.
      """

      name: str
      email: str
      password: str
      profile_image_url: Optional[str] = "/user.png"
  class AddUserForm(SignupForm):
      """``SignupForm`` extended with an optional ``role`` that defaults to ``"pending"``."""

      role: Optional[str] = "pending"
  class AuthsTable:
      """Data-access helper for the ``Auth`` table and the matching ``Users`` records.

      The lookup, update and delete methods report failure through their
      return value (``None`` or ``False``) and log unexpected errors instead
      of raising. ``insert_new_auth`` lets exceptions propagate.
      """

      def __init__(self, db):
          """Keep a handle to ``db`` and ask it to create the ``Auth`` table."""
          self.db = db
          self.db.create_tables([Auth])

      def insert_new_auth(
          self,
          email: str,
          password: str,
          name: str,
          profile_image_url: str = "/user.png",
          role: str = "pending",
      ) -> Optional[UserModel]:
          """Create an ``Auth`` row and the user record that shares its id.

          Both inserts run inside one ``self.db.atomic()`` block, so a user
          insert that fails (returns a falsy value or raises) does not leave
          an orphaned credential row behind.

          Args:
              email: Stored on the ``Auth`` row and passed to the user insert.
              password: Stored on the ``Auth`` row exactly as received.
              name: Passed to ``Users.insert_new_user``.
              profile_image_url: Passed to ``Users.insert_new_user``.
              role: Passed to ``Users.insert_new_user``.

          Returns:
              Whatever ``Users.insert_new_user`` returned when both inserts
              succeeded, otherwise ``None``.
          """
          log.info("insert_new_auth")

          # One freshly generated UUID4 string is shared by the Auth row and the user.
          user_id = str(uuid.uuid4())
          auth = AuthModel(id=user_id, email=email, password=password, active=True)

          with self.db.atomic() as txn:
              result = Auth.create(**auth.model_dump())
              user = Users.insert_new_user(
                  user_id, name, email, profile_image_url, role
              )
              if result and user:
                  return user
              # The user insert reported failure: undo the Auth insert as well.
              txn.rollback()
          return None

      def authenticate_user(self, email: str, password: str) -> Optional[UserModel]:
          """Return the user for ``email`` when ``password`` verifies, else ``None``.

          Only rows with ``active`` set are considered. A missing row, a
          password mismatch and any unexpected error all yield ``None``.
          """
          log.info("authenticate_user: %s", email)
          try:
              # `== True` is required: peewee builds the SQL from the comparison.
              auth = Auth.get(Auth.email == email, Auth.active == True)  # noqa: E712
              if not verify_password(password, auth.password):
                  return None
              return Users.get_user_by_id(auth.id)
          except Auth.DoesNotExist:
              # Unknown or inactive account: an ordinary failed sign-in.
              return None
          except Exception:
              log.exception("authenticate_user failed")
              return None

      def authenticate_user_by_api_key(self, api_key: str) -> Optional[UserModel]:
          """Return the user owning ``api_key``, or ``None``.

          ``None`` is returned for an empty key, an unknown key, or when the
          lookup raises.
          """
          # The key is a credential, so it is deliberately kept out of the log.
          log.info("authenticate_user_by_api_key")
          if not api_key:
              return None
          try:
              user = Users.get_user_by_api_key(api_key)
          except Exception:
              log.exception("authenticate_user_by_api_key failed")
              return None
          return user if user else None

      def authenticate_user_by_trusted_header(self, email: str) -> Optional[UserModel]:
          """Return the user whose active ``Auth`` row matches ``email``, else ``None``.

          No password check is performed here.
          """
          log.info("authenticate_user_by_trusted_header: %s", email)
          try:
              # `== True` is required: peewee builds the SQL from the comparison.
              auth = Auth.get(Auth.email == email, Auth.active == True)  # noqa: E712
              return Users.get_user_by_id(auth.id)
          except Auth.DoesNotExist:
              return None
          except Exception:
              log.exception("authenticate_user_by_trusted_header failed")
              return None

      def update_user_password_by_id(self, id: str, new_password: str) -> bool:
          """Overwrite the stored password of the ``Auth`` row with this ``id``.

          ``new_password`` is written as received. Returns ``True`` only when
          exactly one row was updated; errors are logged and yield ``False``.
          """
          try:
              updated = (
                  Auth.update(password=new_password).where(Auth.id == id).execute()
              )
              return updated == 1
          except Exception:
              log.exception("update_user_password_by_id failed")
              return False

      def update_email_by_id(self, id: str, email: str) -> bool:
          """Overwrite the email of the ``Auth`` row with this ``id``.

          Returns ``True`` only when exactly one row was updated; errors are
          logged and yield ``False``.
          """
          try:
              updated = Auth.update(email=email).where(Auth.id == id).execute()
              return updated == 1
          except Exception:
              log.exception("update_email_by_id failed")
              return False

      def delete_auth_by_id(self, id: str) -> bool:
          """Delete the user and then the ``Auth`` row identified by ``id``.

          The ``Auth`` row is removed only after ``Users.delete_user_by_id``
          reports success. Returns ``False`` when the user delete fails or
          when anything raises.
          """
          try:
              if not Users.delete_user_by_id(id):
                  return False
              Auth.delete().where(Auth.id == id).execute()
              return True
          except Exception:
              log.exception("delete_auth_by_id failed")
              return False
  # Shared module-level instance; constructing it asks DB to create the Auth table.
  Auths = AuthsTable(db=DB)