# users.py
import logging
import time
from typing import List, Union, Optional

from peewee import *
from playhouse.shortcuts import model_to_dict
from pydantic import BaseModel

from utils.utils import decode_token
from utils.misc import get_gravatar_url
from apps.web.internal.db import DB
from apps.web.models.chats import Chats
####################
# User DB Schema
####################
  13. class User(Model):
  14. id = CharField(unique=True)
  15. name = CharField()
  16. email = CharField()
  17. role = CharField()
  18. profile_image_url = CharField()
  19. timestamp = DateField()
  20. class Meta:
  21. database = DB
  22. class UserModel(BaseModel):
  23. id: str
  24. name: str
  25. email: str
  26. role: str = "pending"
  27. profile_image_url: str = "/user.png"
  28. timestamp: int # timestamp in epoch
####################
# Forms
####################
  32. class UserRoleUpdateForm(BaseModel):
  33. id: str
  34. role: str
  35. class UsersTable:
  36. def __init__(self, db):
  37. self.db = db
  38. self.db.create_tables([User])
  39. def insert_new_user(
  40. self, id: str, name: str, email: str, role: str = "pending"
  41. ) -> Optional[UserModel]:
  42. user = UserModel(
  43. **{
  44. "id": id,
  45. "name": name,
  46. "email": email,
  47. "role": role,
  48. "profile_image_url": get_gravatar_url(email),
  49. "timestamp": int(time.time()),
  50. }
  51. )
  52. result = User.create(**user.model_dump())
  53. if result:
  54. return user
  55. else:
  56. return None
  57. def get_user_by_id(self, id: str) -> Optional[UserModel]:
  58. try:
  59. user = User.get(User.id == id)
  60. return UserModel(**model_to_dict(user))
  61. except:
  62. return None
  63. def get_user_by_email(self, email: str) -> Optional[UserModel]:
  64. try:
  65. user = User.get(User.email == email)
  66. return UserModel(**model_to_dict(user))
  67. except:
  68. return None
  69. def get_user_by_token(self, token: str) -> Optional[UserModel]:
  70. data = decode_token(token)
  71. if data != None and "email" in data:
  72. return self.get_user_by_email(data["email"])
  73. else:
  74. return None
  75. def get_users(self, skip: int = 0, limit: int = 50) -> List[UserModel]:
  76. return [
  77. UserModel(**model_to_dict(user))
  78. for user in User.select().limit(limit).offset(skip)
  79. ]
  80. def get_num_users(self) -> Optional[int]:
  81. return User.select().count()
  82. def update_user_role_by_id(self, id: str, role: str) -> Optional[UserModel]:
  83. try:
  84. query = User.update(role=role).where(User.id == id)
  85. query.execute()
  86. user = User.get(User.id == id)
  87. return UserModel(**model_to_dict(user))
  88. except:
  89. return None
  90. def delete_user_by_id(self, id: str) -> bool:
  91. try:
  92. # Delete User Chats
  93. result = Chats.delete_chats_by_user_id(id)
  94. if result:
  95. # Delete User
  96. query = User.delete().where(User.id == id)
  97. query.execute() # Remove the rows, return number of rows removed.
  98. return True
  99. else:
  100. return False
  101. except:
  102. return False
  103. Users = UsersTable(DB)