users.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. from pydantic import BaseModel
  2. from typing import List, Union, Optional
  3. from pymongo import ReturnDocument
  4. import time
  5. from utils.utils import decode_token
  6. from utils.misc import get_gravatar_url
  7. from config import DB
  8. ####################
  9. # User DB Schema
  10. ####################
  11. class UserModel(BaseModel):
  12. id: str
  13. name: str
  14. email: str
  15. role: str = "pending"
  16. profile_image_url: str = "/user.png"
  17. created_at: int # timestamp in epoch
  18. ####################
  19. # Forms
  20. ####################
  21. class UsersTable:
  22. def __init__(self, db):
  23. self.db = db
  24. self.table = db.users
  25. def insert_new_user(
  26. self, id: str, name: str, email: str, role: str = "pending"
  27. ) -> Optional[UserModel]:
  28. user = UserModel(
  29. **{
  30. "id": id,
  31. "name": name,
  32. "email": email,
  33. "role": role,
  34. "profile_image_url": get_gravatar_url(email),
  35. "created_at": int(time.time()),
  36. }
  37. )
  38. result = self.table.insert_one(user.model_dump())
  39. if result:
  40. return user
  41. else:
  42. return None
  43. def get_user_by_email(self, email: str) -> Optional[UserModel]:
  44. user = self.table.find_one({"email": email}, {"_id": False})
  45. if user:
  46. return UserModel(**user)
  47. else:
  48. return None
  49. def get_user_by_token(self, token: str) -> Optional[UserModel]:
  50. data = decode_token(token)
  51. if data != None and "email" in data:
  52. return self.get_user_by_email(data["email"])
  53. else:
  54. return None
  55. def get_users(self, skip: int = 0, limit: int = 50) -> Optional[UserModel]:
  56. return [
  57. UserModel(**user)
  58. for user in list(self.table.find({}, {"_id": False}))
  59. .skip(skip)
  60. .limit(limit)
  61. ]
  62. Users = UsersTable(DB)