users.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. from pydantic import BaseModel
  2. from peewee import *
  3. from playhouse.shortcuts import model_to_dict
  4. from typing import List, Union, Optional
  5. import time
  6. from utils.misc import get_gravatar_url
  7. from apps.web.internal.db import DB
  8. from apps.web.models.chats import Chats
  9. ####################
  10. # User DB Schema
  11. ####################
  12. class User(Model):
  13. id = CharField(unique=True)
  14. name = CharField()
  15. email = CharField()
  16. role = CharField()
  17. profile_image_url = CharField()
  18. timestamp = DateField()
  19. class Meta:
  20. database = DB
  21. class UserModel(BaseModel):
  22. id: str
  23. name: str
  24. email: str
  25. role: str = "pending"
  26. profile_image_url: str
  27. timestamp: int # timestamp in epoch
  28. ####################
  29. # Forms
  30. ####################
  31. class UserRoleUpdateForm(BaseModel):
  32. id: str
  33. role: str
  34. class UserUpdateForm(BaseModel):
  35. name: str
  36. email: str
  37. profile_image_url: str
  38. password: Optional[str] = None
  39. class UsersTable:
  40. def __init__(self, db):
  41. self.db = db
  42. self.db.create_tables([User])
  43. def insert_new_user(
  44. self,
  45. id: str,
  46. name: str,
  47. email: str,
  48. profile_image_url: str = "/user.png",
  49. role: str = "pending",
  50. ) -> Optional[UserModel]:
  51. user = UserModel(
  52. **{
  53. "id": id,
  54. "name": name,
  55. "email": email,
  56. "role": role,
  57. "profile_image_url": profile_image_url,
  58. "timestamp": int(time.time()),
  59. }
  60. )
  61. result = User.create(**user.model_dump())
  62. if result:
  63. return user
  64. else:
  65. return None
  66. def get_user_by_id(self, id: str) -> Optional[UserModel]:
  67. try:
  68. user = User.get(User.id == id)
  69. return UserModel(**model_to_dict(user))
  70. except:
  71. return None
  72. def get_user_by_email(self, email: str) -> Optional[UserModel]:
  73. try:
  74. user = User.get(User.email == email)
  75. return UserModel(**model_to_dict(user))
  76. except:
  77. return None
  78. def get_users(self, skip: int = 0, limit: int = 50) -> List[UserModel]:
  79. return [
  80. UserModel(**model_to_dict(user))
  81. for user in User.select()
  82. # .limit(limit).offset(skip)
  83. ]
  84. def get_num_users(self) -> Optional[int]:
  85. return User.select().count()
  86. def update_user_role_by_id(self, id: str, role: str) -> Optional[UserModel]:
  87. try:
  88. query = User.update(role=role).where(User.id == id)
  89. query.execute()
  90. user = User.get(User.id == id)
  91. return UserModel(**model_to_dict(user))
  92. except:
  93. return None
  94. def update_user_profile_image_url_by_id(
  95. self, id: str, profile_image_url: str
  96. ) -> Optional[UserModel]:
  97. try:
  98. query = User.update(profile_image_url=profile_image_url).where(
  99. User.id == id
  100. )
  101. query.execute()
  102. user = User.get(User.id == id)
  103. return UserModel(**model_to_dict(user))
  104. except:
  105. return None
  106. def update_user_by_id(self, id: str, updated: dict) -> Optional[UserModel]:
  107. try:
  108. query = User.update(**updated).where(User.id == id)
  109. query.execute()
  110. user = User.get(User.id == id)
  111. return UserModel(**model_to_dict(user))
  112. except:
  113. return None
  114. def delete_user_by_id(self, id: str) -> bool:
  115. try:
  116. # Delete User Chats
  117. result = Chats.delete_chats_by_user_id(id)
  118. if result:
  119. # Delete User
  120. query = User.delete().where(User.id == id)
  121. query.execute() # Remove the rows, return number of rows removed.
  122. return True
  123. else:
  124. return False
  125. except:
  126. return False
  127. Users = UsersTable(DB)