users.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. from pydantic import BaseModel
  2. from typing import List, Union, Optional
  3. from pymongo import ReturnDocument
  4. import time
  5. from utils.utils import decode_token
  6. from utils.misc import get_gravatar_url
  7. from config import DB
####################
# User DB Schema
####################
  11. class UserModel(BaseModel):
  12. id: str
  13. name: str
  14. email: str
  15. role: str = "pending"
  16. profile_image_url: str = "/user.png"
  17. created_at: int # timestamp in epoch
####################
# Forms
####################
  21. class UserRoleUpdateForm(BaseModel):
  22. id: str
  23. role: str
  24. class UsersTable:
  25. def __init__(self, db):
  26. self.db = db
  27. self.table = db.users
  28. def insert_new_user(
  29. self, id: str, name: str, email: str, role: str = "pending"
  30. ) -> Optional[UserModel]:
  31. user = UserModel(
  32. **{
  33. "id": id,
  34. "name": name,
  35. "email": email,
  36. "role": role,
  37. "profile_image_url": get_gravatar_url(email),
  38. "created_at": int(time.time()),
  39. }
  40. )
  41. result = self.table.insert_one(user.model_dump())
  42. if result:
  43. return user
  44. else:
  45. return None
  46. def get_user_by_email(self, email: str) -> Optional[UserModel]:
  47. user = self.table.find_one({"email": email}, {"_id": False})
  48. if user:
  49. return UserModel(**user)
  50. else:
  51. return None
  52. def get_user_by_token(self, token: str) -> Optional[UserModel]:
  53. data = decode_token(token)
  54. if data != None and "email" in data:
  55. return self.get_user_by_email(data["email"])
  56. else:
  57. return None
  58. def get_users(self, skip: int = 0, limit: int = 50) -> List[UserModel]:
  59. return [
  60. UserModel(**user)
  61. for user in list(
  62. self.table.find({}, {"_id": False}).skip(skip).limit(limit)
  63. )
  64. ]
  65. def get_num_users(self) -> Optional[int]:
  66. return self.table.count_documents({})
  67. def update_user_by_id(self, id: str, updated: dict) -> Optional[UserModel]:
  68. user = self.table.find_one_and_update(
  69. {"id": id}, {"$set": updated}, return_document=ReturnDocument.AFTER
  70. )
  71. return UserModel(**user)
  72. def update_user_role_by_id(self, id: str, role: str) -> Optional[UserModel]:
  73. return self.update_user_by_id(id, {"role": role})
  74. Users = UsersTable(DB)