auths.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. from pydantic import BaseModel
  2. from typing import List, Union, Optional
  3. import time
  4. import uuid
  5. from peewee import *
  6. from apps.web.models.users import UserModel, Users
  7. from utils.utils import (
  8. verify_password,
  9. get_password_hash,
  10. bearer_scheme,
  11. create_token,
  12. )
  13. from apps.web.internal.db import DB
  14. ####################
  15. # DB MODEL
  16. ####################
  class Auth(Model):
      """Peewee model for one row of stored login credentials.

      The ``id`` value is the same identifier that ``AuthsTable`` hands to
      ``Users`` when an account is created, so a credentials row and its user
      profile share one key.
      """

      # Account identifier; the column is declared unique.
      id = CharField(unique=True)
      # Address matched against the submitted email at sign-in.
      email = CharField()
      # Checked with ``verify_password`` at sign-in, so presumably a hash rather
      # than plaintext -- TODO confirm that callers hash before inserting.
      password = CharField()
      # Sign-in only matches rows whose ``active`` flag is true.
      active = BooleanField()

      class Meta:
          database = DB
  class AuthModel(BaseModel):
      """Pydantic schema carrying the same four fields as an ``Auth`` row."""

      id: str
      email: str
      password: str
      # A record is enabled unless the caller says otherwise.
      active: bool = True
  29. ####################
  30. # Forms
  31. ####################
  class Token(BaseModel):
      """Schema holding a token string together with its type label."""

      token: str
      token_type: str
  class UserResponse(BaseModel):
      """Schema describing a user: id, email, name, role and profile image URL."""

      id: str
      email: str
      name: str
      role: str
      profile_image_url: str
  class SigninResponse(Token, UserResponse):
      """Combined schema: every ``Token`` field plus every ``UserResponse`` field."""
  class SigninForm(BaseModel):
      """Form schema with the two sign-in fields: email and password."""

      email: str
      password: str
  class ProfileImageUrlForm(BaseModel):
      """Form schema carrying a single profile image URL."""

      profile_image_url: str
  class UpdateProfileForm(BaseModel):
      """Form schema for a profile edit: profile image URL and display name."""

      profile_image_url: str
      name: str
  class UpdatePasswordForm(BaseModel):
      """Form schema for a password change."""

      # Presumably the current password, supplied for verification -- TODO
      # confirm against the route that consumes this form.
      password: str
      # The replacement password.
      new_password: str
  class SignupForm(BaseModel):
      """Form schema with the three sign-up fields: name, email and password."""

      name: str
      email: str
      password: str
  class AuthsTable:
      """Data-access helper for the ``Auth`` credentials table.

      Each method pairs an ``Auth`` query with the matching ``Users`` call so
      that a credentials row and its user profile are created, looked up and
      removed together under one shared id.
      """

      def __init__(self, db):
          """Keep the database handle and ask it to create the ``Auth`` table.

          Args:
              db: Database object exposing ``create_tables`` and ``atomic``
                  (the module passes ``DB``).
          """
          self.db = db
          self.db.create_tables([Auth])

      def insert_new_auth(
          self, email: str, password: str, name: str, role: str = "pending"
      ) -> Optional[UserModel]:
          """Create a credentials row and the matching user profile.

          A fresh UUID4 string is generated and used as the id for both records.

          Args:
              email: Address stored on the credentials row and the user.
              password: Value stored verbatim in ``Auth.password``; this method
                  does not hash it, so callers presumably pass a hash -- TODO
                  confirm.
              name: Display name forwarded to ``Users.insert_new_user``.
              role: Role forwarded to ``Users.insert_new_user``.

          Returns:
              The object returned by ``Users.insert_new_user`` when both inserts
              succeed, otherwise ``None``.
          """
          print("insert_new_auth")

          new_id = str(uuid.uuid4())
          auth = AuthModel(id=new_id, email=email, password=password, active=True)

          # Both inserts run in one transaction so that a failed user insert
          # cannot leave an orphaned credentials row behind. An exception
          # raised inside the block rolls the transaction back automatically.
          # NOTE(review): assumes ``Users`` writes through the same database as
          # ``Auth``; if not, only the ``Auth`` insert is covered.
          with self.db.atomic() as txn:
              result = Auth.create(**auth.model_dump())
              user = Users.insert_new_user(new_id, name, email, role)
              if not (result and user):
                  txn.rollback()
                  return None
          return user

      def authenticate_user(self, email: str, password: str) -> Optional[UserModel]:
          """Look up an active credentials row by email and check the password.

          Args:
              email: Address to match against ``Auth.email``.
              password: Candidate password passed to ``verify_password`` together
                  with the stored ``Auth.password`` value.

          Returns:
              The result of ``Users.get_user_by_id`` for the matched row when the
              password verifies; ``None`` when no row matches, the password is
              rejected, or any error occurs along the way.
          """
          print("authenticate_user", email)
          try:
              # peewee builds the SQL condition from the overloaded ``==``, so
              # this comparison must not be rewritten as ``is True``.
              auth = Auth.get(Auth.email == email, Auth.active == True)  # noqa: E712
              if auth and verify_password(password, auth.password):
                  return Users.get_user_by_id(auth.id)
              return None
          except Exception:
              # Deliberate best-effort: a missing row or a verification error is
              # reported as a failed sign-in. ``Exception`` (rather than a bare
              # ``except``) lets KeyboardInterrupt/SystemExit propagate.
              return None

      def update_user_password_by_id(self, id: str, new_password: str) -> bool:
          """Overwrite the stored password of the row with the given id.

          Args:
              id: Identifier of the credentials row.
              new_password: Value written verbatim to ``Auth.password``.

          Returns:
              ``True`` only when exactly one row was updated; ``False`` otherwise
              or when the query raises.
          """
          try:
              query = Auth.update(password=new_password).where(Auth.id == id)
              return query.execute() == 1
          except Exception:
              return False

      def update_email_by_id(self, id: str, email: str) -> bool:
          """Overwrite the stored email of the row with the given id.

          Args:
              id: Identifier of the credentials row.
              email: New address written to ``Auth.email``.

          Returns:
              ``True`` only when exactly one row was updated; ``False`` otherwise
              or when the query raises.
          """
          try:
              query = Auth.update(email=email).where(Auth.id == id)
              return query.execute() == 1
          except Exception:
              return False

      def delete_auth_by_id(self, id: str) -> bool:
          """Delete the user profile and then the credentials row for ``id``.

          Args:
              id: Identifier shared by the user and the credentials row.

          Returns:
              ``True`` when ``Users.delete_user_by_id`` reports success and the
              ``Auth`` delete ran without error; ``False`` otherwise.
          """
          try:
              # One transaction for both deletes: if removing the credentials
              # row raises, the user deletion is rolled back with it.
              # NOTE(review): assumes ``Users`` deletes through the same
              # database as ``Auth`` -- confirm.
              with self.db.atomic():
                  # Delete User
                  if not Users.delete_user_by_id(id):
                      return False
                  # Delete Auth
                  Auth.delete().where(Auth.id == id).execute()
                  return True
          except Exception:
              return False
  # Shared module-level table helper bound to DB; constructing it also asks
  # the database to create the Auth table.
  Auths = AuthsTable(db=DB)