auths.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. from pydantic import BaseModel
  2. from typing import List, Union, Optional
  3. import time
  4. import uuid
  5. from apps.web.models.users import UserModel, Users
  6. from utils.utils import (
  7. verify_password,
  8. get_password_hash,
  9. bearer_scheme,
  10. create_token,
  11. )
  12. import config
  13. DB = config.DB
  14. ####################
  15. # DB MODEL
  16. ####################
  17. class AuthModel(BaseModel):
  18. id: str
  19. email: str
  20. password: str
  21. active: bool = True
  22. ####################
  23. # Forms
  24. ####################
  25. class Token(BaseModel):
  26. token: str
  27. token_type: str
  28. class UserResponse(BaseModel):
  29. id: str
  30. email: str
  31. name: str
  32. role: str
  33. profile_image_url: str
  34. class SigninResponse(Token, UserResponse):
  35. pass
  36. class SigninForm(BaseModel):
  37. email: str
  38. password: str
  39. class SignupForm(BaseModel):
  40. name: str
  41. email: str
  42. password: str
  43. class AuthsTable:
  44. def __init__(self, db):
  45. self.db = db
  46. self.table = db.auths
  47. def insert_new_auth(
  48. self, email: str, password: str, name: str, role: str = "pending"
  49. ) -> Optional[UserModel]:
  50. print("insert_new_auth")
  51. id = str(uuid.uuid4())
  52. auth = AuthModel(
  53. **{"id": id, "email": email, "password": password, "active": True}
  54. )
  55. result = self.table.insert_one(auth.model_dump())
  56. user = Users.insert_new_user(id, name, email, role)
  57. print(result, user)
  58. if result and user:
  59. return user
  60. else:
  61. return None
  62. def authenticate_user(self, email: str, password: str) -> Optional[UserModel]:
  63. print("authenticate_user")
  64. auth = self.table.find_one({"email": email, "active": True})
  65. if auth:
  66. if verify_password(password, auth["password"]):
  67. user = self.db.users.find_one({"id": auth["id"]})
  68. return UserModel(**user)
  69. else:
  70. return None
  71. else:
  72. return None
  73. Auths = AuthsTable(DB)