# auths.py
  1. from pydantic import BaseModel
  2. from typing import List, Union, Optional
  3. import time
  4. import uuid
  5. import logging
  6. from peewee import *
  7. from apps.web.models.users import UserModel, Users
  8. from utils.utils import verify_password
  9. from apps.web.internal.db import DB
  10. from config import SRC_LOG_LEVELS
  11. log = logging.getLogger(__name__)
  12. log.setLevel(SRC_LOG_LEVELS["MODELS"])
  13. ####################
  14. # DB MODEL
  15. ####################
  16. class Auth(Model):
  17. id = CharField(unique=True)
  18. email = CharField()
  19. password = CharField()
  20. active = BooleanField()
  21. api_key = CharField(null=True, unique=True)
  22. class Meta:
  23. database = DB
  24. class AuthModel(BaseModel):
  25. id: str
  26. email: str
  27. password: str
  28. active: bool = True
  29. api_key: Optional[str] = None
  30. ####################
  31. # Forms
  32. ####################
  33. class Token(BaseModel):
  34. token: str
  35. token_type: str
  36. class ApiKey(BaseModel):
  37. api_key: Optional[str] = None
  38. class UserResponse(BaseModel):
  39. id: str
  40. email: str
  41. name: str
  42. role: str
  43. profile_image_url: str
  44. class SigninResponse(Token, UserResponse):
  45. pass
  46. class SigninForm(BaseModel):
  47. email: str
  48. password: str
  49. class ProfileImageUrlForm(BaseModel):
  50. profile_image_url: str
  51. class UpdateProfileForm(BaseModel):
  52. profile_image_url: str
  53. name: str
  54. class UpdatePasswordForm(BaseModel):
  55. password: str
  56. new_password: str
  57. class SignupForm(BaseModel):
  58. name: str
  59. email: str
  60. password: str
  61. class AuthsTable:
  62. def __init__(self, db):
  63. self.db = db
  64. self.db.create_tables([Auth])
  65. def insert_new_auth(
  66. self, email: str, password: str, name: str, role: str = "pending"
  67. ) -> Optional[UserModel]:
  68. log.info("insert_new_auth")
  69. id = str(uuid.uuid4())
  70. auth = AuthModel(
  71. **{"id": id, "email": email, "password": password, "active": True}
  72. )
  73. result = Auth.create(**auth.model_dump())
  74. user = Users.insert_new_user(id, name, email, role)
  75. if result and user:
  76. return user
  77. else:
  78. return None
  79. def authenticate_user(self, email: str, password: str) -> Optional[UserModel]:
  80. log.info(f"authenticate_user: {email}")
  81. try:
  82. auth = Auth.get(Auth.email == email, Auth.active == True)
  83. if auth:
  84. if verify_password(password, auth.password):
  85. user = Users.get_user_by_id(auth.id)
  86. return user
  87. else:
  88. return None
  89. else:
  90. return None
  91. except:
  92. return None
  93. def authenticate_user_by_api_key(self, api_key: str) -> Optional[UserModel]:
  94. log.info(f"authenticate_user_by_api_key: {api_key}")
  95. # if no api_key, return None
  96. if not api_key:
  97. return None
  98. try:
  99. auth = Auth.get(Auth.api_key == api_key, Auth.active == True)
  100. if auth:
  101. user = Users.get_user_by_id(auth.id)
  102. return user
  103. else:
  104. return None
  105. except:
  106. return None
  107. def update_user_password_by_id(self, id: str, new_password: str) -> bool:
  108. try:
  109. query = Auth.update(password=new_password).where(Auth.id == id)
  110. result = query.execute()
  111. return True if result == 1 else False
  112. except:
  113. return False
  114. def update_email_by_id(self, id: str, email: str) -> bool:
  115. try:
  116. query = Auth.update(email=email).where(Auth.id == id)
  117. result = query.execute()
  118. return True if result == 1 else False
  119. except:
  120. return False
  121. def update_api_key_by_id(self, id: str, api_key: str) -> str:
  122. try:
  123. query = Auth.update(api_key=api_key).where(Auth.id == id)
  124. result = query.execute()
  125. return True if result == 1 else False
  126. except:
  127. return False
  128. def get_api_key_by_id(self, id: str) -> Optional[str]:
  129. try:
  130. auth = Auth.get(Auth.id == id)
  131. return auth.api_key
  132. except:
  133. return None
  134. def delete_auth_by_id(self, id: str) -> bool:
  135. try:
  136. # Delete User
  137. result = Users.delete_user_by_id(id)
  138. if result:
  139. # Delete Auth
  140. query = Auth.delete().where(Auth.id == id)
  141. query.execute() # Remove the rows, return number of rows removed.
  142. return True
  143. else:
  144. return False
  145. except:
  146. return False
  147. Auths = AuthsTable(DB)