users.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. from pydantic import BaseModel
  2. from peewee import *
  3. from playhouse.shortcuts import model_to_dict
  4. from typing import List, Union, Optional
  5. import time
  6. from utils.misc import get_gravatar_url
  7. from apps.web.internal.db import DB
  8. from apps.web.models.chats import Chats
  9. ####################
  10. # User DB Schema
  11. ####################
  12. class User(Model):
  13. id = CharField(unique=True)
  14. name = CharField()
  15. email = CharField()
  16. role = CharField()
  17. profile_image_url = TextField()
  18. last_active_at = BigIntegerField()
  19. updated_at = BigIntegerField()
  20. created_at = BigIntegerField()
  21. api_key = CharField(null=True, unique=True)
  22. class Meta:
  23. database = DB
  24. class UserModel(BaseModel):
  25. id: str
  26. name: str
  27. email: str
  28. role: str = "pending"
  29. profile_image_url: str
  30. last_active_at: int # timestamp in epoch
  31. updated_at: int # timestamp in epoch
  32. created_at: int # timestamp in epoch
  33. api_key: Optional[str] = None
  34. ####################
  35. # Forms
  36. ####################
  37. class UserRoleUpdateForm(BaseModel):
  38. id: str
  39. role: str
  40. class UserUpdateForm(BaseModel):
  41. name: str
  42. email: str
  43. profile_image_url: str
  44. password: Optional[str] = None
  45. class UsersTable:
  46. def __init__(self, db):
  47. self.db = db
  48. self.db.create_tables([User])
  49. def insert_new_user(
  50. self,
  51. id: str,
  52. name: str,
  53. email: str,
  54. profile_image_url: str = "/user.png",
  55. role: str = "pending",
  56. ) -> Optional[UserModel]:
  57. user = UserModel(
  58. **{
  59. "id": id,
  60. "name": name,
  61. "email": email,
  62. "role": role,
  63. "profile_image_url": profile_image_url,
  64. "last_active_at": int(time.time()),
  65. "created_at": int(time.time()),
  66. "updated_at": int(time.time()),
  67. }
  68. )
  69. result = User.create(**user.model_dump())
  70. if result:
  71. return user
  72. else:
  73. return None
  74. def get_user_by_id(self, id: str) -> Optional[UserModel]:
  75. try:
  76. user = User.get(User.id == id)
  77. return UserModel(**model_to_dict(user))
  78. except:
  79. return None
  80. def get_user_by_api_key(self, api_key: str) -> Optional[UserModel]:
  81. try:
  82. user = User.get(User.api_key == api_key)
  83. return UserModel(**model_to_dict(user))
  84. except:
  85. return None
  86. def get_user_by_email(self, email: str) -> Optional[UserModel]:
  87. try:
  88. user = User.get(User.email == email)
  89. return UserModel(**model_to_dict(user))
  90. except:
  91. return None
  92. def get_users(self, skip: int = 0, limit: int = 50) -> List[UserModel]:
  93. return [
  94. UserModel(**model_to_dict(user))
  95. for user in User.select()
  96. # .limit(limit).offset(skip)
  97. ]
  98. def get_num_users(self) -> Optional[int]:
  99. return User.select().count()
  100. def update_user_role_by_id(self, id: str, role: str) -> Optional[UserModel]:
  101. try:
  102. query = User.update(role=role).where(User.id == id)
  103. query.execute()
  104. user = User.get(User.id == id)
  105. return UserModel(**model_to_dict(user))
  106. except:
  107. return None
  108. def update_user_profile_image_url_by_id(
  109. self, id: str, profile_image_url: str
  110. ) -> Optional[UserModel]:
  111. try:
  112. query = User.update(profile_image_url=profile_image_url).where(
  113. User.id == id
  114. )
  115. query.execute()
  116. user = User.get(User.id == id)
  117. return UserModel(**model_to_dict(user))
  118. except:
  119. return None
  120. def update_user_last_active_by_id(self, id: str) -> Optional[UserModel]:
  121. try:
  122. query = User.update(last_active_at=int(time.time())).where(User.id == id)
  123. query.execute()
  124. user = User.get(User.id == id)
  125. return UserModel(**model_to_dict(user))
  126. except:
  127. return None
  128. def update_user_by_id(self, id: str, updated: dict) -> Optional[UserModel]:
  129. try:
  130. query = User.update(**updated).where(User.id == id)
  131. query.execute()
  132. user = User.get(User.id == id)
  133. return UserModel(**model_to_dict(user))
  134. except:
  135. return None
  136. def delete_user_by_id(self, id: str) -> bool:
  137. try:
  138. # Delete User Chats
  139. result = Chats.delete_chats_by_user_id(id)
  140. if result:
  141. # Delete User
  142. query = User.delete().where(User.id == id)
  143. query.execute() # Remove the rows, return number of rows removed.
  144. return True
  145. else:
  146. return False
  147. except:
  148. return False
  149. def update_user_api_key_by_id(self, id: str, api_key: str) -> str:
  150. try:
  151. query = User.update(api_key=api_key).where(User.id == id)
  152. result = query.execute()
  153. return True if result == 1 else False
  154. except:
  155. return False
  156. def get_user_api_key_by_id(self, id: str) -> Optional[str]:
  157. try:
  158. user = User.get(User.id == id)
  159. return user.api_key
  160. except:
  161. return None
  162. Users = UsersTable(DB)